From b061770e683f9711342b410a2162a35e6b4daf3f Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Tue, 11 Jun 2024 17:14:33 +0200 Subject: [PATCH] Add asterisk and FS agents session restoration support --- agents/astagent.go | 61 ++++++-- agents/astagent_test.go | 63 ++++++++ agents/asterisk_event.go | 35 +++++ agents/asterisk_event_test.go | 136 ++++++++++++++++++ agents/fsagent.go | 7 +- .../asterisk/etc/asterisk/ari.conf | 1 + .../asterisk/etc/asterisk/extensions.conf | 2 +- services/asteriskagent_it_test.go | 27 ++-- services/freeswitchagent_it_test.go | 52 +++++-- 9 files changed, 343 insertions(+), 41 deletions(-) diff --git a/agents/astagent.go b/agents/astagent.go index 172b08cdd..f44f0d5db 100644 --- a/agents/astagent.go +++ b/agents/astagent.go @@ -106,6 +106,9 @@ func (sma *AsteriskAgent) ListenAndServe(stopChan <-chan struct{}) (err error) { } utils.Logger.Info(fmt.Sprintf("<%s> successfully connected to Asterisk at: <%s>", utils.AsteriskAgent, sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address)) + // make a call asterisk -> sessions_conns to create an active client needed for syncSessions when restoring sessions, since prior clients are lost when engine shuts down + var reply string + sma.connMgr.Call(sma.ctx, sma.cgrCfg.AsteriskAgentCfg().SessionSConns, utils.SessionSv1Ping, &utils.CGREvent{}, &reply) for { select { case <-stopChan: @@ -230,6 +233,19 @@ func (sma *AsteriskAgent) handleStasisStart(ev *SMAsteriskEvent) { } } } + // set cached fields as variables to the channel, + // to be retrieved if engine shuts down while session is active + for key, val := range ev.cachedFields { + if !primaryFields.Has(key) { + if !sma.setChannelVar(ev.ChannelID(), key, val) { + return + } + } + } + // cgr_reqtype is part of primaryFields but needs to be attached to the channel since we check for it on ChannelDestroyed + if !sma.setChannelVar(ev.ChannelID(), utils.CGRReqType, ev.cachedFields[utils.CGRReqType]) { + return + } // Exit channel from stasis if _, err := sma.astConn.Call( aringo.HTTP_POST, @@ -293,18 +309,39 @@ func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) { sma.evCacheMux.RLock() cgrEvDisp, hasIt := sma.eventsCache[chID] sma.evCacheMux.RUnlock() - if !hasIt { // Not handled by us - return + if !hasIt { + if cgrReqType, _ := ev.ariEv["channel"].(map[string]any)["channelvars"].(map[string]any)[utils.CGRReqType].(string); cgrReqType == utils.EmptyString { + return // Not handled by us + } + // convert received event to CGREvent + var err error + cgrEvDisp, err = ev.AsCGREvent(sma.cgrCfg.GeneralCfg().DefaultTimezone) + if err != nil { + utils.Logger.Warning(fmt.Sprintf(" Error converting Asterisk event to CGREvent: <%v>", err)) + return + } + // Populate event with needed fields recovered from channel variables + sma.evCacheMux.Lock() + err = ev.RestoreAndUpdateFields(cgrEvDisp) + sma.evCacheMux.Unlock() + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s when attempting to destroy session for channelID: %s", + utils.AsteriskAgent, err.Error(), chID)) + return + } } - sma.evCacheMux.Lock() - delete(sma.eventsCache, chID) // delete the event from cache as we do not need to keep it here forever - err := ev.UpdateCGREvent(cgrEvDisp) // Updates the event directly in the cache - sma.evCacheMux.Unlock() - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s when attempting to destroy session for channelID: %s", - utils.AsteriskAgent, err.Error(), chID)) - return + if hasIt { + sma.evCacheMux.Lock() + delete(sma.eventsCache, chID) // delete the event from cache as we do not need to keep it here forever + err := ev.UpdateCGREvent(cgrEvDisp) // Updates the event directly in the cache + sma.evCacheMux.Unlock() + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s when attempting to destroy session for channelID: %s", + utils.AsteriskAgent, err.Error(), chID)) + return + } } // populate terminate session args tsArgs := ev.V1TerminateSessionArgs(*cgrEvDisp) @@ -313,7 +350,6 @@ func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) { utils.AsteriskAgent, chID)) return } - var reply string if err := sma.connMgr.Call(sma.ctx, sma.cgrCfg.AsteriskAgentCfg().SessionSConns, utils.SessionSv1TerminateSession, @@ -329,7 +365,6 @@ func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) { utils.AsteriskAgent, err.Error(), chID)) } } - } // Call implements birpc.ClientConnector interface diff --git a/agents/astagent_test.go b/agents/astagent_test.go index 813020ef0..1dc6f991a 100644 --- a/agents/astagent_test.go +++ b/agents/astagent_test.go @@ -20,9 +20,72 @@ package agents import ( "testing" + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) func TestAAsSessionSClientIface(t *testing.T) { _ = sessions.BiRPCClient(new(AsteriskAgent)) } + +func TestHandleChannelDestroyedFail(t *testing.T) { + + cfg := config.NewDefaultCGRConfig() + internalSessionSChan := make(chan birpc.ClientConnector, 1) + cM := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan, + }) + sma, err := NewAsteriskAgent(cfg, 1, cM) + if err != nil { + t.Error(err) + } + + ariEv := map[string]any{ + "application": "cgrates_auth", + "asterisk_id": "08:00:27:18:d8:cf", + "cause": "16", + "cause_txt": "Normal Clearing", + "channel": map[string]any{ + "accountcode": "", + "caller": map[string]any{ + "name": "1001", + "number": "1001", + }, + "channelvars": map[string]any{ + "CDR(answer)": "2024-05-03 08:53:06", + "CDR(billsec)": "4", + "cgr_flags": "*accounts *attributes *resources *stats *routes *thresholds cgr_reqtype:*prepaid", + }, + "connected": map[string]any{ + "name": "", + "number": "1002", + }, + "creationtime": "2024-05-03T08:53:05.234+0200", + "dialplan": map[string]any{ + "app_data": "", + "app_name": "", + "context": "internal", + "exten": "1002", + "priority": "9", + }, + "id": "1714719185.3", + "language": "en", + "name": "PJSIP/1001-00000002", + "protocol_id": "cb1bb28866dd7d52b42484e5b38765ec@0:0:0:0:0:0:0:0", + "state": "Up", + }, + "timestamp": "2024-05-03T08:53:11.511+0200", + "type": "ChannelDestroyed", + } + ev := NewSMAsteriskEvent(ariEv, "127.0.0.1", utils.EmptyString) + evCopy := ev + sma.handleChannelDestroyed(ev) + if ev != evCopy { + t.Errorf("Expected ev to not change, received <%v>", utils.ToJSON(ev)) + } +} diff --git a/agents/asterisk_event.go b/agents/asterisk_event.go index d43b53ae5..66293f3a1 100644 --- a/agents/asterisk_event.go +++ b/agents/asterisk_event.go @@ -201,6 +201,41 @@ func (smaEv *SMAsteriskEvent) ExtraParameters() (extraParams map[string]string) return } +// Will populate CGREvent with required fields restored from the channel variables +func (smaEv *SMAsteriskEvent) RestoreAndUpdateFields(cgrEv *utils.CGREvent) error { + resCGREv := *cgrEv + // make sure the channel contains the channelvars field to be recovered + channvars, has := smaEv.ariEv["channel"].(map[string]any)["channelvars"].(map[string]any) + if !has { + return fmt.Errorf("channelvars not found in event <%+v>", smaEv.ariEv["channel"].(map[string]any)) + } + for key, val := range channvars { + switch { + case key == utils.CGRFlags: + // "+" characters are converted to " " white space characters when put in channel variables, cgr_flags dont contain white spaces so we can convert them back to "+" without a problem + cgrFlags := strings.ReplaceAll(val.(string), " ", "+") + resCGREv.Event[utils.CGRFlags] = cgrFlags + case key == "CDR(answer)": + resCGREv.Event[utils.AnswerTime] = val.(string) + case key == "CDR(billsec)": + resCGREv.Event[utils.Usage] = val.(string) + "s" + case key == utils.CGRReqType: + resCGREv.Event[utils.RequestType] = val.(string) + default: + resCGREv.Event[key] = val.(string) + } + + } + resCGREv.Event[utils.EventName] = SMASessionTerminate + resCGREv.Event[utils.DisconnectCause] = smaEv.DisconnectCause() + + for k, v := range smaEv.opts { + resCGREv.APIOpts[k] = v + } + *cgrEv = resCGREv + return nil +} + func (smaEv *SMAsteriskEvent) UpdateCGREvent(cgrEv *utils.CGREvent) error { resCGREv := *cgrEv switch smaEv.EventType() { diff --git a/agents/asterisk_event_test.go b/agents/asterisk_event_test.go index 94a917684..d7a849eb7 100644 --- a/agents/asterisk_event_test.go +++ b/agents/asterisk_event_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "reflect" "testing" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/sessions" @@ -527,3 +528,138 @@ func TestSMAsteriskEventUpdateCGREvent(t *testing.T) { } } + +func TestRestoreAndUpdateFieldsOk(t *testing.T) { + ariEv := map[string]any{ + "application": "cgrates_auth", + "asterisk_id": "08:00:27:18:d8:cf", + "cause": "16", + "cause_txt": "Normal Clearing", + "channel": map[string]any{ + "accountcode": "", + "caller": map[string]any{ + "name": "1001", + "number": "1001", + }, + "channelvars": map[string]any{ + "CDR(answer)": "2024-05-03 08:53:06", + "CDR(billsec)": "4", + "cgr_flags": "*accounts *attributes *resources *stats *routes *thresholds cgr_reqtype:*prepaid", + "cgr_reqtype": "*prepaid", + }, + "connected": map[string]any{ + "name": "", + "number": "1002", + }, + "creationtime": "2024-05-03T08:53:05.234+0200", + "dialplan": map[string]any{ + "app_data": "", + "app_name": "", + "context": "internal", + "exten": "1002", + "priority": "9", + }, + "id": "1714719185.3", + "language": "en", + "name": "PJSIP/1001-00000002", + "protocol_id": "cb1bb28866dd7d52b42484e5b38765ec@0:0:0:0:0:0:0:0", + "state": "Up", + }, + "timestamp": "2024-05-03T08:53:11.511+0200", + "type": "ChannelDestroyed", + } + smaEv := NewSMAsteriskEvent(ariEv, "127.0.0.1", utils.EmptyString) + cgrEv := utils.CGREvent{ + Tenant: "cgrates.org", + ID: "ea36649", + Event: map[string]any{ + utils.AccountField: "1001", + utils.Destination: "1002", + utils.EventName: "SMA_SESSION_TERMINATE", + utils.OriginHost: "127.0.0.1", + utils.OriginID: "1714734552.6", + utils.RequestType: utils.MetaRated, + utils.SetupTime: time.Date(2013, 12, 30, 15, 01, 31, 0, time.UTC), + utils.Source: utils.AsteriskAgent, + }, + } + exp := utils.CGREvent{ + Tenant: "cgrates.org", + ID: "ea36649", + Event: map[string]any{ + utils.AccountField: "1001", + utils.AnswerTime: "2024-05-03 08:53:06", + utils.Destination: "1002", + utils.DisconnectCause: "Normal Clearing", + utils.EventName: "SMA_SESSION_TERMINATE", + utils.OriginHost: "127.0.0.1", + utils.OriginID: "1714734552.6", + utils.RequestType: utils.MetaPrepaid, + utils.SetupTime: time.Date(2013, 12, 30, 15, 01, 31, 0, time.UTC), + utils.Source: utils.AsteriskAgent, + utils.Usage: "4s", + utils.CGRFlags: "*accounts+*attributes+*resources+*stats+*routes+*thresholds+cgr_reqtype:*prepaid", + }, + } + if err := smaEv.RestoreAndUpdateFields(&cgrEv); err != nil { + t.Error(err) + } else if utils.ToJSON(cgrEv) != utils.ToJSON(exp) { + t.Errorf("Expected <%v>, \nreceived <%v>", utils.ToJSON(exp), utils.ToJSON(cgrEv)) + } + +} + +func TestRestoreAndUpdateFieldsFail(t *testing.T) { + ariEv := map[string]any{ + "application": "cgrates_auth", + "asterisk_id": "08:00:27:18:d8:cf", + "cause": "16", + "cause_txt": "Normal Clearing", + "channel": map[string]any{ + "accountcode": "", + "caller": map[string]any{ + "name": "1001", + "number": "1001", + }, + "connected": map[string]any{ + "name": "", + "number": "1002", + }, + "creationtime": "2024-05-03T08:53:05.234+0200", + "dialplan": map[string]any{ + "app_data": "", + "app_name": "", + "context": "internal", + "exten": "1002", + "priority": "9", + }, + "id": "1714719185.3", + "language": "en", + "name": "PJSIP/1001-00000002", + "protocol_id": "cb1bb28866dd7d52b42484e5b38765ec@0:0:0:0:0:0:0:0", + "state": "Up", + }, + "timestamp": "2024-05-03T08:53:11.511+0200", + "type": "ChannelDestroyed", + } + smaEv := NewSMAsteriskEvent(ariEv, "127.0.0.1", utils.EmptyString) + cgrEv := utils.CGREvent{ + Tenant: "cgrates.org", + ID: "ea36649", + Event: map[string]any{ + utils.AccountField: "1001", + utils.Destination: "1002", + utils.EventName: "SMA_SESSION_TERMINATE", + utils.OriginHost: "127.0.0.1", + utils.OriginID: "1714734552.6", + utils.RequestType: utils.MetaRated, + utils.SetupTime: time.Date(2013, 12, 30, 15, 01, 31, 0, time.UTC), + utils.Source: utils.AsteriskAgent, + }, + } + expErr := "channelvars not found in event " + if err := smaEv.RestoreAndUpdateFields(&cgrEv); err.Error() != expErr { + t.Errorf("Expected error <%v>, \nreceived <%+v>", expErr, err) + } + +} diff --git a/agents/fsagent.go b/agents/fsagent.go index 6de8dc968..8e34ed3d4 100644 --- a/agents/fsagent.go +++ b/agents/fsagent.go @@ -301,6 +301,9 @@ func (fsa *FSsessions) onChannelHangupComplete(fsev FSEvent, connIdx int) { func (fsa *FSsessions) Connect() error { eventFilters := map[string][]string{"Call-Direction": {"inbound"}} connErr := make(chan error) + var reply string + // make a call fs -> sessions_conns to create an active client needed for syncSessions when restoring sessions, since prior clients are lost when engine shuts down + fsa.connMgr.Call(fsa.ctx, fsa.cfg.SessionSConns, utils.SessionSv1Ping, &utils.CGREvent{}, &reply) for connIdx, connCfg := range fsa.cfg.EventSocketConns { fSock, err := fsock.NewFSock( connCfg.Address, connCfg.Password, @@ -377,10 +380,6 @@ func (fsa *FSsessions) Shutdown() (err error) { utils.Logger.Err(fmt.Sprintf("<%s> Cannot shutdown sessions, fsock not connected for connection index: %v", utils.FreeSWITCHAgent, connIdx)) continue } - utils.Logger.Info(fmt.Sprintf("<%s> Shutting down all sessions on connection index: %v", utils.FreeSWITCHAgent, connIdx)) - if _, err = fSock.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype *prepaid"); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Error on calls shutdown: %s, connection index: %v", utils.FreeSWITCHAgent, err.Error(), connIdx)) - } if err = fSock.Disconnect(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Error on disconnect: %s, connection index: %v", utils.FreeSWITCHAgent, err.Error(), connIdx)) } diff --git a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/ari.conf b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/ari.conf index f78a801e7..277a8d990 100644 --- a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/ari.conf +++ b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/ari.conf @@ -1,6 +1,7 @@ [general] enabled = yes allowed_origins = http://cgrates.org +channelvars = cgr_reqtype,cgr_flags,CDR(answer),CDR(billsec) [cgrates] type = user diff --git a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/extensions.conf b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/extensions.conf index 94a6b7c66..2cc58fa57 100644 --- a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/extensions.conf +++ b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/extensions.conf @@ -5,7 +5,7 @@ same => n,Set(LIMIT_PLAYAUDIO_CALLER=YES) same => n,Set(LIMIT_PLAYAUDIO_CALLEE=YES) same => n,DumpChan() - same => n,Stasis(cgrates_auth,cgr_reqtype=*prepaid,cgr_routes=supplier1,cgr_flags=*accounts+*attributes+*resources+*stats+*routes+*thresholds) + same => n,Stasis(cgrates_auth,cgr_reqtype=*prepaid,cgr_flags=*accounts+*attributes+*resources+*stats+*routes+*thresholds) same => n,Set(CHANNEL(hangup_handler_push)=hangUp,hUP,1) same => n,Dial(PJSIP/${EXTEN},30,L(${CGRMaxSessionTime}:10000))) same => n,Hangup() diff --git a/services/asteriskagent_it_test.go b/services/asteriskagent_it_test.go index 50d6724e0..b404c3be7 100644 --- a/services/asteriskagent_it_test.go +++ b/services/asteriskagent_it_test.go @@ -33,6 +33,7 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) func TestAsteriskAgentReload(t *testing.T) { @@ -59,14 +60,17 @@ func TestAsteriskAgentReload(t *testing.T) { cacheSChan <- cacheSrv server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil) + internalSessionSChan := make(chan birpc.ClientConnector, 1) + cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan, + }) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, cm) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - db := NewDataDBService(cfg, nil, srvDep) + db := NewDataDBService(cfg, cm, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, anz, srvDep) - astService := NewAsteriskAgent(cfg, shdChan, nil, srvDep) - engine.NewConnManager(cfg, nil) + shdChan, cm, anz, srvDep) + astService := NewAsteriskAgent(cfg, shdChan, cm, srvDep) srvMngr.AddServices(astService, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) if err := srvMngr.StartServices(); err != nil { @@ -129,14 +133,17 @@ func TestAsteriskAgentReload2(t *testing.T) { cacheSChan <- cacheSrv server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil) + internalSessionSChan := make(chan birpc.ClientConnector, 1) + cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan, + }) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, cm) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - db := NewDataDBService(cfg, nil, srvDep) + db := NewDataDBService(cfg, cm, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, anz, srvDep) - astSrv := NewAsteriskAgent(cfg, shdChan, nil, srvDep) - engine.NewConnManager(cfg, nil) + shdChan, cm, anz, srvDep) + astSrv := NewAsteriskAgent(cfg, shdChan, cm, srvDep) srvMngr.AddServices(astSrv, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) if err := srvMngr.StartServices(); err != nil { diff --git a/services/freeswitchagent_it_test.go b/services/freeswitchagent_it_test.go index 033b527a5..a9d78ae17 100644 --- a/services/freeswitchagent_it_test.go +++ b/services/freeswitchagent_it_test.go @@ -34,6 +34,7 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) func TestFreeSwitchAgentReload(t *testing.T) { @@ -60,16 +61,19 @@ func TestFreeSwitchAgentReload(t *testing.T) { cacheSChan <- cacheSrv server := cores.NewServer(nil) - srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil) + internalSessionSChan := make(chan birpc.ClientConnector, 1) + cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan, + }) + srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, cm) srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - db := NewDataDBService(cfg, nil, srvDep) + db := NewDataDBService(cfg, cm, srvDep) anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), - shdChan, nil, anz, srvDep) - srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep) - engine.NewConnManager(cfg, nil) + shdChan, cm, anz, srvDep) + srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep) srvMngr.AddServices(srv, sS, - NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) + NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), cm, anz, srvDep), db) if err := srvMngr.StartServices(); err != nil { t.Fatal(err) } @@ -110,7 +114,11 @@ func TestFreeSwitchAgentReload2(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep) + internalSessionSChan := make(chan birpc.ClientConnector, 1) + cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan, + }) + srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep) if srv.IsRunning() { t.Fatalf("Expected service to be down") @@ -146,7 +154,11 @@ func TestFreeSwitchAgentReload3(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep) + internalSessionSChan := make(chan birpc.ClientConnector, 1) + cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan, + }) + srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep) if srv.IsRunning() { t.Fatalf("Expected service to be down") @@ -181,7 +193,11 @@ func TestFreeSwitchAgentReload4(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep) + internalSessionSChan := make(chan birpc.ClientConnector, 1) + cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan, + }) + srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep) if srv.IsRunning() { t.Fatalf("Expected service to be down") } @@ -228,7 +244,11 @@ func TestFreeSwitchAgentReload5(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep) + internalSessionSChan := make(chan birpc.ClientConnector, 1) + cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan, + }) + srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep) if srv.IsRunning() { t.Fatalf("Expected service to be down") } @@ -248,7 +268,9 @@ func TestFreeSwitchAgentReload6(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil shdChan := utils.NewSyncedChan() - chS := engine.NewCacheS(cfg, nil, nil) + db := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) + dm := engine.NewDataManager(db, cfg.CacheCfg(), nil) + chS := engine.NewCacheS(cfg, dm, nil) cacheSrv, err := engine.NewService(chS) if err != nil { t.Fatal(err) @@ -256,7 +278,11 @@ func TestFreeSwitchAgentReload6(t *testing.T) { cacheSChan := make(chan birpc.ClientConnector, 1) cacheSChan <- cacheSrv srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep) + internalSessionSChan := make(chan birpc.ClientConnector, 1) + cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{ + utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan, + }) + srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep) if srv.IsRunning() { t.Fatalf("Expected service to be down") } @@ -279,7 +305,7 @@ func TestFreeSwitchAgentReload6(t *testing.T) { }, }, } - srv.(*FreeswitchAgent).fS, err = agents.NewFSsessions(agentCfg, "", nil) + srv.(*FreeswitchAgent).fS, err = agents.NewFSsessions(agentCfg, "", cm) if err != nil { t.Fatal(err) }