From ee98dbe0ca3ace0aa11b958e6840b2b9acc5e1e5 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 6 Mar 2024 13:10:35 -0500 Subject: [PATCH] Update AgentV1DisconnectSession api signature Will accept utils.CGREvent instead of utils.AttrDisconnectSession as a parameter. SessionSv1.ForceDisconnect will take utils.SessionFilterWithEvent as a parameter instead of *utils.SessionFilter. Added possibility to pass DisconnectCause as an Event parameter. The forceSTerminate that's called when the session timer expires will have DisconnectCause 'SESSION_TIMEOUT' instead of 'FORCED_DISCONNECT'. Added Dispatcher methods for AlterSessions. Event will be merged with EventStart of the session before being sent to AgentV1DisconnectSession. --- agents/astagent.go | 4 +- agents/diamagent.go | 8 +-- agents/fsagent.go | 7 ++- agents/kamagent.go | 8 +-- agents/radagent.go | 6 +- analyzers/analyzers_it_test.go | 2 +- apier/v1/api_interfaces.go | 3 +- apier/v1/dispatcher.go | 7 ++- apier/v1/sessions.go | 2 +- apier/v1/sessions_thresholds_it_test.go | 4 +- apier/v1/sessionsv1_it_test.go | 4 +- cmd/cgr-tester/sessions.go | 4 +- console/session_force_disconnect.go | 10 +-- dispatchers/sessions.go | 20 +++++- dispatchers/sessions_test.go | 16 +++-- sessions/libsessions.go | 2 +- sessions/sessions.go | 82 +++++++++++++++---------- sessions/sessions_birpc_it_test.go | 8 +-- sessions/sessionscover_test.go | 12 ++-- utils/apitpdata.go | 6 -- utils/consts.go | 7 +++ 21 files changed, 139 insertions(+), 83 deletions(-) diff --git a/agents/astagent.go b/agents/astagent.go index 94a57edae..172b08cdd 100644 --- a/agents/astagent.go +++ b/agents/astagent.go @@ -338,8 +338,8 @@ func (sma *AsteriskAgent) Call(ctx *context.Context, serviceMethod string, args } // V1DisconnectSession is internal method to disconnect session in asterisk -func (sma *AsteriskAgent) V1DisconnectSession(ctx *context.Context, args utils.AttrDisconnectSession, reply *string) error { - channelID := engine.NewMapEvent(args.EventStart).GetStringIgnoreErrors(utils.OriginID) +func (sma *AsteriskAgent) V1DisconnectSession(ctx *context.Context, cgrEv utils.CGREvent, reply *string) error { + channelID := engine.NewMapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.OriginID) sma.hangupChannel(channelID, "") *reply = utils.OK return nil diff --git a/agents/diamagent.go b/agents/diamagent.go index 0819e554a..cd74b849b 100644 --- a/agents/diamagent.go +++ b/agents/diamagent.go @@ -331,12 +331,12 @@ func (da *DiameterAgent) Call(ctx *context.Context, serviceMethod string, args a } // V1DisconnectSession is part of the sessions.BiRPClient -func (da *DiameterAgent) V1DisconnectSession(ctx *context.Context, args utils.AttrDisconnectSession, reply *string) (err error) { - ssID, has := args.EventStart[utils.OriginID] +func (da *DiameterAgent) V1DisconnectSession(ctx *context.Context, cgrEv utils.CGREvent, reply *string) (err error) { + ssID, has := cgrEv.Event[utils.OriginID] if !has { utils.Logger.Info( fmt.Sprintf("<%s> cannot disconnect session, missing OriginID in event: %s", - utils.DiameterAgent, utils.ToJSON(args.EventStart))) + utils.DiameterAgent, utils.ToJSON(cgrEv.Event))) return utils.ErrMandatoryIeMissing } originID := ssID.(string) @@ -347,7 +347,7 @@ func (da *DiameterAgent) V1DisconnectSession(ctx *context.Context, args utils.At case utils.MetaASR: return da.sendASR(originID, reply) case utils.MetaRAR: - return da.V1AlterSession(ctx, utils.CGREvent{Event: args.EventStart}, reply) + return da.V1AlterSession(ctx, utils.CGREvent{Event: cgrEv.Event}, reply) default: return fmt.Errorf("Unsupported request type <%s>", da.cgrCfg.DiameterAgentCfg().ForcedDisconnect) } diff --git a/agents/fsagent.go b/agents/fsagent.go index 0dc7c0cc7..24899895c 100644 --- a/agents/fsagent.go +++ b/agents/fsagent.go @@ -394,9 +394,10 @@ func (fsa *FSsessions) Call(ctx *context.Context, serviceMethod string, args any } // V1DisconnectSession internal method to disconnect session in FreeSWITCH -func (fsa *FSsessions) V1DisconnectSession(ctx *context.Context, args utils.AttrDisconnectSession, reply *string) (err error) { - ev := engine.NewMapEvent(args.EventStart) +func (fsa *FSsessions) V1DisconnectSession(ctx *context.Context, cgrEv utils.CGREvent, reply *string) (err error) { + ev := engine.NewMapEvent(cgrEv.Event) channelID := ev.GetStringIgnoreErrors(utils.OriginID) + disconnectCause := ev.GetStringIgnoreErrors(utils.DisconnectCause) connIdx, err := ev.GetTInt64(FsConnID) if err != nil { utils.Logger.Err( @@ -411,7 +412,7 @@ func (fsa *FSsessions) V1DisconnectSession(ctx *context.Context, args utils.Attr } if err = fsa.disconnectSession(int(connIdx), channelID, utils.FirstNonEmpty(ev.GetStringIgnoreErrors(CALL_DEST_NR), ev.GetStringIgnoreErrors(SIP_REQ_USER)), - args.Reason); err != nil { + disconnectCause); err != nil { return } *reply = utils.OK diff --git a/agents/kamagent.go b/agents/kamagent.go index 1fb75f177..e0e0d8b85 100644 --- a/agents/kamagent.go +++ b/agents/kamagent.go @@ -395,10 +395,10 @@ func (self *KamailioAgent) disconnectSession(connIdx int, dscEv *KamSessionDisco } // Internal method to disconnect session in Kamailio -func (ka *KamailioAgent) V1DisconnectSession(ctx *context.Context, args utils.AttrDisconnectSession, reply *string) (err error) { - hEntry := utils.IfaceAsString(args.EventStart[KamHashEntry]) - hID := utils.IfaceAsString(args.EventStart[KamHashID]) - connIdxIface, has := args.EventStart[EvapiConnID] +func (ka *KamailioAgent) V1DisconnectSession(ctx *context.Context, cgrEv utils.CGREvent, reply *string) (err error) { + hEntry := utils.IfaceAsString(cgrEv.Event[KamHashEntry]) + hID := utils.IfaceAsString(cgrEv.Event[KamHashID]) + connIdxIface, has := cgrEv.Event[EvapiConnID] if !has { utils.Logger.Err( fmt.Sprintf("<%s> error: <%s:%s> when attempting to disconnect <%s:%s> and <%s:%s>", diff --git a/agents/radagent.go b/agents/radagent.go index ce32a2a61..005d1745d 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -493,8 +493,8 @@ func (*RadiusAgent) V1GetActiveSessionIDs(_ *context.Context, _ string, _ *[]*se } // V1DisconnectSession remotely disconnects a session by making use of the RADIUS Disconnect Message functionality. -func (ra *RadiusAgent) V1DisconnectSession(_ *context.Context, attr utils.AttrDisconnectSession, reply *string) error { - ifaceOriginID, has := attr.EventStart[utils.OriginID] +func (ra *RadiusAgent) V1DisconnectSession(_ *context.Context, cgrEv utils.CGREvent, reply *string) error { + ifaceOriginID, has := cgrEv.Event[utils.OriginID] if !has { return utils.NewErrMandatoryIeMissing(utils.OriginID) } @@ -503,7 +503,7 @@ func (ra *RadiusAgent) V1DisconnectSession(_ *context.Context, attr utils.AttrDi reqVars := &utils.DataNode{ Type: utils.NMMapType, Map: map[string]*utils.DataNode{ - utils.DisconnectCause: utils.NewLeafNode(attr.Reason), + utils.DisconnectCause: utils.NewLeafNode(cgrEv.Event[utils.DisconnectCause]), }, } diff --git a/analyzers/analyzers_it_test.go b/analyzers/analyzers_it_test.go index bde2a1714..2d6167eaf 100644 --- a/analyzers/analyzers_it_test.go +++ b/analyzers/analyzers_it_test.go @@ -270,6 +270,6 @@ func testAnalyzerSKillEngine(t *testing.T) { type smock struct{} func (*smock) DisconnectPeer(ctx *context.Context, - args *utils.AttrDisconnectSession, reply *string) error { + args *utils.DPRArgs, reply *string) error { return utils.ErrNotFound } diff --git a/apier/v1/api_interfaces.go b/apier/v1/api_interfaces.go index de4d3e720..25de9b924 100644 --- a/apier/v1/api_interfaces.go +++ b/apier/v1/api_interfaces.go @@ -90,7 +90,8 @@ type SessionSv1Interface interface { GetCost(ctx *context.Context, args *sessions.V1ProcessEventArgs, rply *sessions.V1GetCostReply) error GetActiveSessions(ctx *context.Context, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) error GetActiveSessionsCount(ctx *context.Context, args *utils.SessionFilter, rply *int) error - ForceDisconnect(ctx *context.Context, args *utils.SessionFilter, rply *string) error + ForceDisconnect(ctx *context.Context, args utils.SessionFilterWithEvent, rply *string) error + AlterSessions(ctx *context.Context, args utils.SessionFilterWithEvent, rply *string) error GetPassiveSessions(ctx *context.Context, args *utils.SessionFilter, rply *[]*sessions.ExternalSession) error GetPassiveSessionsCount(ctx *context.Context, args *utils.SessionFilter, rply *int) error Ping(ctx *context.Context, ign *utils.CGREvent, reply *string) error diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index eb880f621..48a55d045 100644 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -519,11 +519,16 @@ func (dS *DispatcherSessionSv1) GetActiveSessionsCount(ctx *context.Context, arg return dS.dS.SessionSv1GetActiveSessionsCount(ctx, args, reply) } -func (dS *DispatcherSessionSv1) ForceDisconnect(ctx *context.Context, args *utils.SessionFilter, +func (dS *DispatcherSessionSv1) ForceDisconnect(ctx *context.Context, args utils.SessionFilterWithEvent, reply *string) (err error) { return dS.dS.SessionSv1ForceDisconnect(ctx, args, reply) } +func (dS *DispatcherSessionSv1) AlterSessions(ctx *context.Context, args utils.SessionFilterWithEvent, + reply *string) (err error) { + return dS.dS.SessionSv1AlterSessions(ctx, args, reply) +} + func (dS *DispatcherSessionSv1) GetPassiveSessions(ctx *context.Context, args *utils.SessionFilter, reply *[]*sessions.ExternalSession) (err error) { return dS.dS.SessionSv1GetPassiveSessions(ctx, args, reply) diff --git a/apier/v1/sessions.go b/apier/v1/sessions.go index 290342e4f..bcc3c7541 100644 --- a/apier/v1/sessions.go +++ b/apier/v1/sessions.go @@ -100,7 +100,7 @@ func (ssv1 *SessionSv1) GetActiveSessionsCount(ctx *context.Context, args *utils return ssv1.sS.BiRPCv1GetActiveSessionsCount(ctx, args, rply) } -func (ssv1 *SessionSv1) ForceDisconnect(ctx *context.Context, args *utils.SessionFilter, +func (ssv1 *SessionSv1) ForceDisconnect(ctx *context.Context, args utils.SessionFilterWithEvent, rply *string) error { return ssv1.sS.BiRPCv1ForceDisconnect(ctx, args, rply) } diff --git a/apier/v1/sessions_thresholds_it_test.go b/apier/v1/sessions_thresholds_it_test.go index 6d8e093b8..bc50fc5fe 100644 --- a/apier/v1/sessions_thresholds_it_test.go +++ b/apier/v1/sessions_thresholds_it_test.go @@ -40,7 +40,7 @@ var ( sSv1Cfg2 *config.CGRConfig sSv1BiRpc2 *birpc.BirpcClient sSApierRpc2 *birpc.Client - disconnectEvChan2 = make(chan *utils.AttrDisconnectSession) + disconnectEvChan2 = make(chan utils.CGREvent) sessionsConfDIR string sessionsThresholdTests = []func(t *testing.T){ @@ -86,7 +86,7 @@ func TestSessionSITtests(t *testing.T) { type smock2 struct{} func (*smock2) DisconnectSession(ctx *context.Context, - args *utils.AttrDisconnectSession, reply *string) error { + args utils.CGREvent, reply *string) error { disconnectEvChan2 <- args *reply = utils.OK return nil diff --git a/apier/v1/sessionsv1_it_test.go b/apier/v1/sessionsv1_it_test.go index 8b136b4ad..32977e6d1 100644 --- a/apier/v1/sessionsv1_it_test.go +++ b/apier/v1/sessionsv1_it_test.go @@ -40,7 +40,7 @@ var ( sSv1Cfg *config.CGRConfig sSv1BiRpc *birpc.BirpcClient sSApierRpc *birpc.Client - discEvChan = make(chan *utils.AttrDisconnectSession, 1) + discEvChan = make(chan utils.CGREvent, 1) sSV1RequestType string sTestSessionSv1 = []func(t *testing.T){ @@ -90,7 +90,7 @@ func testSSv1ItInitCfgDir(t *testing.T) { type smock struct{} func (*smock) DisconnectSession(ctx *context.Context, - args *utils.AttrDisconnectSession, reply *string) error { + args utils.CGREvent, reply *string) error { discEvChan <- args // free the channel <-discEvChan diff --git a/cmd/cgr-tester/sessions.go b/cmd/cgr-tester/sessions.go index 2a319d541..29391595c 100644 --- a/cmd/cgr-tester/sessions.go +++ b/cmd/cgr-tester/sessions.go @@ -34,13 +34,13 @@ import ( var ( brpc *birpc.BirpcClient - disconnectEvChan = make(chan *utils.AttrDisconnectSession, 1) + disconnectEvChan = make(chan utils.CGREvent, 1) ) type smock struct{} func (*smock) DisconnectSession(ctx *context.Context, - args *utils.AttrDisconnectSession, reply *string) error { + args utils.CGREvent, reply *string) error { disconnectEvChan <- args *reply = utils.OK return nil diff --git a/console/session_force_disconnect.go b/console/session_force_disconnect.go index aafee0849..15a1a7d1a 100644 --- a/console/session_force_disconnect.go +++ b/console/session_force_disconnect.go @@ -26,7 +26,7 @@ func init() { c := &CmdSessionsForceDisconnect{ name: "session_force_disconnect", rpcMethod: utils.SessionSv1ForceDisconnect, - rpcParams: &utils.SessionFilter{}, + rpcParams: utils.SessionFilterWithEvent{}, } commands[c.Name()] = c c.CommandExecuter = &CommandExecuter{c} @@ -35,7 +35,7 @@ func init() { type CmdSessionsForceDisconnect struct { name string rpcMethod string - rpcParams *utils.SessionFilter + rpcParams utils.SessionFilterWithEvent *CommandExecuter } @@ -48,8 +48,10 @@ func (cmd *CmdSessionsForceDisconnect) RpcMethod() string { } func (cmd *CmdSessionsForceDisconnect) RpcParams(reset bool) any { - if reset || cmd.rpcParams == nil { - cmd.rpcParams = &utils.SessionFilter{APIOpts: make(map[string]any)} + if reset || cmd.rpcParams.SessionFilter == nil { + cmd.rpcParams.SessionFilter = &utils.SessionFilter{ + APIOpts: make(map[string]any), + } } return cmd.rpcParams } diff --git a/dispatchers/sessions.go b/dispatchers/sessions.go index 14fa89ebe..404c725c9 100644 --- a/dispatchers/sessions.go +++ b/dispatchers/sessions.go @@ -211,7 +211,7 @@ func (dS *DispatcherService) SessionSv1GetActiveSessionsCount(ctx *context.Conte }, utils.MetaSessionS, utils.SessionSv1GetActiveSessionsCount, args, reply) } -func (dS *DispatcherService) SessionSv1ForceDisconnect(ctx *context.Context, args *utils.SessionFilter, +func (dS *DispatcherService) SessionSv1ForceDisconnect(ctx *context.Context, args utils.SessionFilterWithEvent, reply *string) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant if args.Tenant != utils.EmptyString { @@ -229,6 +229,24 @@ func (dS *DispatcherService) SessionSv1ForceDisconnect(ctx *context.Context, arg }, utils.MetaSessionS, utils.SessionSv1ForceDisconnect, args, reply) } +func (dS *DispatcherService) SessionSv1AlterSessions(ctx *context.Context, args utils.SessionFilterWithEvent, + reply *string) (err error) { + tnt := dS.cfg.GeneralCfg().DefaultTenant + if args.Tenant != utils.EmptyString { + tnt = args.Tenant + } + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { + if err = dS.authorize(utils.SessionSv1AlterSessions, + tnt, utils.IfaceAsString(args.APIOpts[utils.OptsAPIKey]), utils.TimePointer(time.Now())); err != nil { + return + } + } + return dS.Dispatch(&utils.CGREvent{ + Tenant: tnt, + APIOpts: args.APIOpts, + }, utils.MetaSessionS, utils.SessionSv1AlterSessions, args, reply) +} + func (dS *DispatcherService) SessionSv1GetPassiveSessions(ctx *context.Context, args *utils.SessionFilter, reply *[]*sessions.ExternalSession) (err error) { tnt := dS.cfg.GeneralCfg().DefaultTenant diff --git a/dispatchers/sessions_test.go b/dispatchers/sessions_test.go index 43511694e..7617cbab8 100644 --- a/dispatchers/sessions_test.go +++ b/dispatchers/sessions_test.go @@ -472,11 +472,13 @@ func TestDspSessionSv1GetActiveSessionsCountErrorNil(t *testing.T) { func TestDspSessionSv1ForceDisconnectNil(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) - CGREvent := &utils.SessionFilter{ - Tenant: "tenant", + args := utils.SessionFilterWithEvent{ + SessionFilter: &utils.SessionFilter{ + Tenant: "tenant", + }, } var reply *string - result := dspSrv.SessionSv1ForceDisconnect(context.Background(), CGREvent, reply) + result := dspSrv.SessionSv1ForceDisconnect(context.Background(), args, reply) expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if result == nil || result.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) @@ -487,11 +489,13 @@ func TestDspSessionSv1ForceDisconnectErrorNil(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil) cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"} - CGREvent := &utils.SessionFilter{ - Tenant: "tenant", + args := utils.SessionFilterWithEvent{ + SessionFilter: &utils.SessionFilter{ + Tenant: "tenant", + }, } var reply *string - result := dspSrv.SessionSv1ForceDisconnect(context.Background(), CGREvent, reply) + result := dspSrv.SessionSv1ForceDisconnect(context.Background(), args, reply) expected := "MANDATORY_IE_MISSING: [ApiKey]" if result == nil || result.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) diff --git a/sessions/libsessions.go b/sessions/libsessions.go index fe3df354c..2ab88fac1 100644 --- a/sessions/libsessions.go +++ b/sessions/libsessions.go @@ -45,7 +45,7 @@ var authReqs = engine.MapEvent{ // BiRPCClient is the interface implemented by Agents which are able to // communicate bidirectionally with SessionS and remote Communication Switch type BiRPCClient interface { - V1DisconnectSession(*context.Context, utils.AttrDisconnectSession, *string) error + V1DisconnectSession(*context.Context, utils.CGREvent, *string) error V1GetActiveSessionIDs(*context.Context, string, *[]*SessionID) error V1AlterSession(*context.Context, utils.CGREvent, *string) error V1DisconnectPeer(*context.Context, *utils.DPRArgs, *string) error diff --git a/sessions/sessions.go b/sessions/sessions.go index a08c57f27..3d6acfd09 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -37,11 +37,6 @@ import ( "github.com/cgrates/cgrates/utils" ) -var ( - // ErrForcedDisconnect is used to specify the reason why the session was disconnected - ErrForcedDisconnect = errors.New("FORCED_DISCONNECT") -) - // NewSessionS constructs a new SessionS instance func NewSessionS(cgrCfg *config.CGRConfig, dm *engine.DataManager, @@ -330,7 +325,9 @@ func (sS *SessionS) setSTerminator(s *Session, opts engine.MapEvent) { lastUsage = *s.sTerminator.ttlLastUsage } sS.forceSTerminate(s, lastUsage, s.sTerminator.ttlUsage, - s.sTerminator.ttlLastUsed) + s.sTerminator.ttlLastUsed, nil, + map[string]any{ + utils.DisconnectCause: utils.SessionTimeout}) s.Unlock() case <-endChan: timer.Stop() @@ -341,7 +338,8 @@ func (sS *SessionS) setSTerminator(s *Session, opts engine.MapEvent) { // forceSTerminate is called when a session times-out or it is forced from CGRateS side // not thread safe -func (sS *SessionS) forceSTerminate(s *Session, extraUsage time.Duration, tUsage, lastUsed *time.Duration) (err error) { +func (sS *SessionS) forceSTerminate(s *Session, extraUsage time.Duration, tUsage, lastUsed *time.Duration, + apiOpts, event map[string]any) (err error) { if extraUsage != 0 { for i := range s.SRuns { if _, err = sS.debitSession(s, i, extraUsage, lastUsed); err != nil { @@ -408,13 +406,25 @@ func (sS *SessionS) forceSTerminate(s *Session, extraUsage time.Duration, tUsage sS.replicateSessions(s.CGRID, false, sS.cgrCfg.SessionSCfg().ReplicationConns) if clntConn := sS.biJClnt(s.ClientConnID); clntConn != nil { go func() { + // Merge parameter event with the session event. Losing the EventStart OriginID + // could create unwanted behaviour. + if event == nil { + event = make(map[string]any) + } + for key, val := range s.EventStart { + if _, has := event[key]; !has { + event[key] = val + } + } + disconnectArgs := utils.CGREvent{ + ID: utils.GenUUID(), + Time: utils.TimePointer(time.Now()), + APIOpts: apiOpts, + Event: event, + } var rply string if err := clntConn.conn.Call(context.TODO(), - utils.AgentV1DisconnectSession, - utils.AttrDisconnectSession{ - EventStart: s.EventStart, - Reason: ErrForcedDisconnect.Error()}, - &rply); err != nil { + utils.AgentV1DisconnectSession, disconnectArgs, &rply); err != nil { if err != utils.ErrNotImplemented { utils.Logger.Warning(fmt.Sprintf( "<%s> remotely disconnecting session with id <%s> failed: %v", @@ -573,8 +583,11 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int, fmt.Sprintf("<%s> could not disconnect session: %s, error: %s", utils.SessionS, s.cgrID(), err.Error())) } - if err = sS.forceSTerminate(s, 0, nil, nil); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%s>", utils.SessionS, s.cgrID(), err)) + if err = sS.forceSTerminate(s, 0, nil, nil, nil, + map[string]any{utils.DisconnectCause: utils.ForcedDisconnect}); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%s>", + utils.SessionS, s.cgrID(), err)) } s.Unlock() return @@ -614,9 +627,11 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int, utils.Logger.Warning( fmt.Sprintf("<%s> could not disconnect session: <%s>, error: <%s>", utils.SessionS, s.cgrID(), err.Error())) - if err = sS.forceSTerminate(s, 0, nil, nil); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%s>", - utils.SessionS, s.cgrID(), err)) + if err = sS.forceSTerminate(s, 0, nil, nil, nil, + map[string]any{utils.DisconnectCause: utils.ForcedDisconnect}); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%s>", + utils.SessionS, s.cgrID(), err)) } } return @@ -775,11 +790,15 @@ func (sS *SessionS) disconnectSession(s *Session, rsn string) (err error) { if clnt.proto == 0 { // compatibility with OpenSIPS 2.3 servMethod = "SMGClientV1.DisconnectSession" } + disconnectArgs := utils.CGREvent{ + ID: utils.GenUUID(), + Time: utils.TimePointer(time.Now()), + Event: s.EventStart, + } + disconnectArgs.Event[utils.DisconnectCause] = rsn var rply string if err = clnt.conn.Call(context.TODO(), servMethod, - utils.AttrDisconnectSession{ - EventStart: s.EventStart, - Reason: rsn}, &rply); err != nil { + disconnectArgs, &rply); err != nil { if err != utils.ErrNotImplemented { return err } @@ -1418,10 +1437,11 @@ func (sS *SessionS) terminateSyncSessions(toBeRemoved []string) { rand.Int63n(sS.cgrCfg.SessionSCfg().StaleChanMaxExtraUsage.Milliseconds()) * time.Millisecond.Nanoseconds()) } ss[0].Lock() - if err := sS.forceSTerminate(ss[0], eUsage, nil, nil); err != nil { + if err := sS.forceSTerminate(ss[0], eUsage, nil, nil, nil, + map[string]any{utils.DisconnectCause: utils.ForcedDisconnect}); err != nil { utils.Logger.Warning( - fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%s>", - utils.SessionS, cgrID, err.Error())) + fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%v>", + utils.SessionS, cgrID, err)) } ss[0].Unlock() } @@ -3660,14 +3680,14 @@ func (sS *SessionS) BiRPCv1SyncSessions(ctx *context.Context, // BiRPCv1ForceDisconnect will force disconnecting sessions matching sessions func (sS *SessionS) BiRPCv1ForceDisconnect(ctx *context.Context, - args *utils.SessionFilter, reply *string) (err error) { - if args == nil { //protection in case on nil - args = &utils.SessionFilter{} + args utils.SessionFilterWithEvent, reply *string) (err error) { + if args.SessionFilter == nil { //protection in case on nil + args.SessionFilter = &utils.SessionFilter{} } if len(args.Filters) != 0 && sS.dm == nil { return utils.ErrNoDatabaseConn } - aSs := sS.filterSessions(args, false) + aSs := sS.filterSessions(args.SessionFilter, false) if len(aSs) == 0 { return utils.ErrNotFound } @@ -3677,11 +3697,11 @@ func (sS *SessionS) BiRPCv1ForceDisconnect(ctx *context.Context, continue } ss[0].Lock() - if errTerm := sS.forceSTerminate(ss[0], 0, nil, nil); errTerm != nil { + if errTerm := sS.forceSTerminate(ss[0], 0, nil, nil, + args.APIOpts, args.Event); errTerm != nil { utils.Logger.Warning( - fmt.Sprintf( - "<%s> failed force-terminating session with id: <%s>, err: <%s>", - utils.SessionS, ss[0].cgrID(), errTerm.Error())) + fmt.Sprintf("<%s> failed force-terminating session with id: <%s>, err: <%v>", + utils.SessionS, ss[0].cgrID(), errTerm)) err = utils.ErrPartiallyExecuted } ss[0].Unlock() diff --git a/sessions/sessions_birpc_it_test.go b/sessions/sessions_birpc_it_test.go index f8aec291e..e0f0da31c 100644 --- a/sessions/sessions_birpc_it_test.go +++ b/sessions/sessions_birpc_it_test.go @@ -38,7 +38,7 @@ var ( sessionsBiRPCCfgDIR string sessionsBiRPCCfg *config.CGRConfig sessionsBiRPC *birpc.BirpcClient - disconnectEvChan = make(chan *utils.AttrDisconnectSession, 1) + disconnectEvChan = make(chan utils.CGREvent, 1) err error sessionsTests = []func(t *testing.T){ testSessionsBiRPCInitCfg, @@ -75,7 +75,7 @@ func TestSessionsBiRPC(t *testing.T) { type smock struct{} func (*smock) DisconnectSession(ctx *context.Context, - args *utils.AttrDisconnectSession, reply *string) error { + args utils.CGREvent, reply *string) error { disconnectEvChan <- args *reply = utils.OK return nil @@ -209,10 +209,10 @@ func testSessionsBiRPCSessionAutomaticDisconnects(t *testing.T) { case <-time.After(100 * time.Millisecond): t.Error("Did not receive disconnect event") case disconnectEv := <-disconnectEvChan: - if engine.NewMapEvent(disconnectEv.EventStart).GetStringIgnoreErrors(utils.OriginID) != initArgs.CGREvent.Event[utils.OriginID] { + if engine.NewMapEvent(disconnectEv.Event).GetStringIgnoreErrors(utils.OriginID) != initArgs.CGREvent.Event[utils.OriginID] { t.Errorf("Unexpected event received: %+v", disconnectEv) } - initArgs.CGREvent.Event[utils.Usage] = disconnectEv.EventStart[utils.Usage] + initArgs.CGREvent.Event[utils.Usage] = disconnectEv.Event[utils.Usage] } termArgs := &V1TerminateSessionArgs{ TerminateSession: true, diff --git a/sessions/sessionscover_test.go b/sessions/sessionscover_test.go index b0144ec97..1af7cc842 100644 --- a/sessions/sessionscover_test.go +++ b/sessions/sessionscover_test.go @@ -271,7 +271,8 @@ func TestForceSTerminatorManualTermination(t *testing.T) { sessions := NewSessionS(cfg, dm, nil) expected := "MANDATORY_IE_MISSING: [connIDs]" - if err := sessions.forceSTerminate(ss, time.Second, nil, nil); err == nil || err.Error() != expected { + if err := sessions.forceSTerminate(ss, time.Second, nil, nil, nil, + map[string]any{utils.DisconnectCause: utils.ForcedDisconnect}); err == nil || err.Error() != expected { t.Errorf("Expected %+v, receive %+v", expected, err) } } @@ -308,7 +309,8 @@ func TestForceSTerminatorPostCDRs(t *testing.T) { } expected := "INTERNALLY_DISCONNECTED" - if err := sessions.forceSTerminate(ss, time.Second, nil, nil); err == nil || err.Error() != expected { + if err := sessions.forceSTerminate(ss, time.Second, nil, nil, nil, + map[string]any{utils.DisconnectCause: utils.ForcedDisconnect}); err == nil || err.Error() != expected { t.Errorf("Expected %+v, receiveD %+v", expected, err) } } @@ -347,7 +349,8 @@ func TestForceSTerminatorReleaseSession(t *testing.T) { } expected := "MANDATORY_IE_MISSING: [connIDs]" - if err := sessions.forceSTerminate(ss, time.Second, nil, nil); err == nil || err.Error() != expected { + if err := sessions.forceSTerminate(ss, time.Second, nil, nil, nil, + map[string]any{utils.DisconnectCause: utils.ForcedDisconnect}); err == nil || err.Error() != expected { t.Errorf("Expected %+v, receiveD %+v", expected, err) } } @@ -397,7 +400,8 @@ func TestForceSTerminatorClientCall(t *testing.T) { } expected := "MANDATORY_IE_MISSING: [connIDs]" - if err := sessions.forceSTerminate(ss, time.Second, nil, nil); err == nil || err.Error() != expected { + if err := sessions.forceSTerminate(ss, time.Second, nil, nil, nil, + map[string]any{utils.DisconnectCause: utils.ForcedDisconnect}); err == nil || err.Error() != expected { t.Errorf("Expected %+v, received %+v", expected, err) } time.Sleep(10 * time.Millisecond) diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 1e9935d4d..d85080e32 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -963,12 +963,6 @@ func (ai *ActivationInterval) IsActiveAtTime(atTime time.Time) bool { (ai.ExpiryTime.IsZero() || ai.ExpiryTime.After(atTime)) } -// Attributes to send on SessionDisconnect by SMG -type AttrDisconnectSession struct { - EventStart map[string]any - Reason string -} - // MetricWithFilters is used in TPStatProfile type MetricWithFilters struct { FilterIDs []string diff --git a/utils/consts.go b/utils/consts.go index a845ca7c6..ea75e93d8 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2823,6 +2823,13 @@ const ( ExecCgr = "exec" ) +// SessionS disconnect causes + +const ( + ForcedDisconnect = "FORCED_DISCONNECT" + SessionTimeout = "SESSION_TIMEOUT" +) + var AnzIndexType = StringSet{ // AnzIndexType are the analyzers possible index types MetaScorch: {}, MetaBoltdb: {},