Add asterisk and FS agents session restoration support

This commit is contained in:
arberkatellari
2024-06-11 17:14:33 +02:00
committed by Dan Christian Bogos
parent a0f94c6804
commit b061770e68
9 changed files with 343 additions and 41 deletions

View File

@@ -106,6 +106,9 @@ func (sma *AsteriskAgent) ListenAndServe(stopChan <-chan struct{}) (err error) {
}
utils.Logger.Info(fmt.Sprintf("<%s> successfully connected to Asterisk at: <%s>",
utils.AsteriskAgent, sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address))
// make a call asterisk -> sessions_conns to create an active client needed for syncSessions when restoring sessions, since prior clients are lost when engine shuts down
var reply string
sma.connMgr.Call(sma.ctx, sma.cgrCfg.AsteriskAgentCfg().SessionSConns, utils.SessionSv1Ping, &utils.CGREvent{}, &reply)
for {
select {
case <-stopChan:
@@ -230,6 +233,19 @@ func (sma *AsteriskAgent) handleStasisStart(ev *SMAsteriskEvent) {
}
}
}
// set cached fields as variables to the channel,
// to be retrieved if engine shuts down while session is active
for key, val := range ev.cachedFields {
if !primaryFields.Has(key) {
if !sma.setChannelVar(ev.ChannelID(), key, val) {
return
}
}
}
// cgr_reqtype is part of primaryFields but needs to be attached to the channel since we check for it on ChannelDestroyed
if !sma.setChannelVar(ev.ChannelID(), utils.CGRReqType, ev.cachedFields[utils.CGRReqType]) {
return
}
// Exit channel from stasis
if _, err := sma.astConn.Call(
aringo.HTTP_POST,
@@ -293,18 +309,39 @@ func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) {
sma.evCacheMux.RLock()
cgrEvDisp, hasIt := sma.eventsCache[chID]
sma.evCacheMux.RUnlock()
if !hasIt { // Not handled by us
return
if !hasIt {
if cgrReqType, _ := ev.ariEv["channel"].(map[string]any)["channelvars"].(map[string]any)[utils.CGRReqType].(string); cgrReqType == utils.EmptyString {
return // Not handled by us
}
// convert received event to CGREvent
var err error
cgrEvDisp, err = ev.AsCGREvent(sma.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<AsteriskAgent> Error converting Asterisk event to CGREvent: <%v>", err))
return
}
// Populate event with needed fields recovered from channel variables
sma.evCacheMux.Lock()
err = ev.RestoreAndUpdateFields(cgrEvDisp)
sma.evCacheMux.Unlock()
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s when attempting to destroy session for channelID: %s",
utils.AsteriskAgent, err.Error(), chID))
return
}
}
sma.evCacheMux.Lock()
delete(sma.eventsCache, chID) // delete the event from cache as we do not need to keep it here forever
err := ev.UpdateCGREvent(cgrEvDisp) // Updates the event directly in the cache
sma.evCacheMux.Unlock()
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s when attempting to destroy session for channelID: %s",
utils.AsteriskAgent, err.Error(), chID))
return
if hasIt {
sma.evCacheMux.Lock()
delete(sma.eventsCache, chID) // delete the event from cache as we do not need to keep it here forever
err := ev.UpdateCGREvent(cgrEvDisp) // Updates the event directly in the cache
sma.evCacheMux.Unlock()
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s when attempting to destroy session for channelID: %s",
utils.AsteriskAgent, err.Error(), chID))
return
}
}
// populate terminate session args
tsArgs := ev.V1TerminateSessionArgs(*cgrEvDisp)
@@ -313,7 +350,6 @@ func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) {
utils.AsteriskAgent, chID))
return
}
var reply string
if err := sma.connMgr.Call(sma.ctx, sma.cgrCfg.AsteriskAgentCfg().SessionSConns,
utils.SessionSv1TerminateSession,
@@ -329,7 +365,6 @@ func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) {
utils.AsteriskAgent, err.Error(), chID))
}
}
}
// Call implements birpc.ClientConnector interface

View File

@@ -20,9 +20,72 @@ package agents
import (
"testing"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func TestAAsSessionSClientIface(t *testing.T) {
_ = sessions.BiRPCClient(new(AsteriskAgent))
}
func TestHandleChannelDestroyedFail(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
internalSessionSChan := make(chan birpc.ClientConnector, 1)
cM := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan,
})
sma, err := NewAsteriskAgent(cfg, 1, cM)
if err != nil {
t.Error(err)
}
ariEv := map[string]any{
"application": "cgrates_auth",
"asterisk_id": "08:00:27:18:d8:cf",
"cause": "16",
"cause_txt": "Normal Clearing",
"channel": map[string]any{
"accountcode": "",
"caller": map[string]any{
"name": "1001",
"number": "1001",
},
"channelvars": map[string]any{
"CDR(answer)": "2024-05-03 08:53:06",
"CDR(billsec)": "4",
"cgr_flags": "*accounts *attributes *resources *stats *routes *thresholds cgr_reqtype:*prepaid",
},
"connected": map[string]any{
"name": "",
"number": "1002",
},
"creationtime": "2024-05-03T08:53:05.234+0200",
"dialplan": map[string]any{
"app_data": "",
"app_name": "",
"context": "internal",
"exten": "1002",
"priority": "9",
},
"id": "1714719185.3",
"language": "en",
"name": "PJSIP/1001-00000002",
"protocol_id": "cb1bb28866dd7d52b42484e5b38765ec@0:0:0:0:0:0:0:0",
"state": "Up",
},
"timestamp": "2024-05-03T08:53:11.511+0200",
"type": "ChannelDestroyed",
}
ev := NewSMAsteriskEvent(ariEv, "127.0.0.1", utils.EmptyString)
evCopy := ev
sma.handleChannelDestroyed(ev)
if ev != evCopy {
t.Errorf("Expected ev to not change, received <%v>", utils.ToJSON(ev))
}
}

View File

@@ -201,6 +201,41 @@ func (smaEv *SMAsteriskEvent) ExtraParameters() (extraParams map[string]string)
return
}
// Will populate CGREvent with required fields restored from the channel variables
func (smaEv *SMAsteriskEvent) RestoreAndUpdateFields(cgrEv *utils.CGREvent) error {
resCGREv := *cgrEv
// make sure the channel contains the channelvars field to be recovered
channvars, has := smaEv.ariEv["channel"].(map[string]any)["channelvars"].(map[string]any)
if !has {
return fmt.Errorf("channelvars not found in event <%+v>", smaEv.ariEv["channel"].(map[string]any))
}
for key, val := range channvars {
switch {
case key == utils.CGRFlags:
// "+" characters are converted to " " white space characters when put in channel variables, cgr_flags dont contain white spaces so we can convert them back to "+" without a problem
cgrFlags := strings.ReplaceAll(val.(string), " ", "+")
resCGREv.Event[utils.CGRFlags] = cgrFlags
case key == "CDR(answer)":
resCGREv.Event[utils.AnswerTime] = val.(string)
case key == "CDR(billsec)":
resCGREv.Event[utils.Usage] = val.(string) + "s"
case key == utils.CGRReqType:
resCGREv.Event[utils.RequestType] = val.(string)
default:
resCGREv.Event[key] = val.(string)
}
}
resCGREv.Event[utils.EventName] = SMASessionTerminate
resCGREv.Event[utils.DisconnectCause] = smaEv.DisconnectCause()
for k, v := range smaEv.opts {
resCGREv.APIOpts[k] = v
}
*cgrEv = resCGREv
return nil
}
func (smaEv *SMAsteriskEvent) UpdateCGREvent(cgrEv *utils.CGREvent) error {
resCGREv := *cgrEv
switch smaEv.EventType() {

View File

@@ -21,6 +21,7 @@ import (
"encoding/json"
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/sessions"
@@ -527,3 +528,138 @@ func TestSMAsteriskEventUpdateCGREvent(t *testing.T) {
}
}
func TestRestoreAndUpdateFieldsOk(t *testing.T) {
ariEv := map[string]any{
"application": "cgrates_auth",
"asterisk_id": "08:00:27:18:d8:cf",
"cause": "16",
"cause_txt": "Normal Clearing",
"channel": map[string]any{
"accountcode": "",
"caller": map[string]any{
"name": "1001",
"number": "1001",
},
"channelvars": map[string]any{
"CDR(answer)": "2024-05-03 08:53:06",
"CDR(billsec)": "4",
"cgr_flags": "*accounts *attributes *resources *stats *routes *thresholds cgr_reqtype:*prepaid",
"cgr_reqtype": "*prepaid",
},
"connected": map[string]any{
"name": "",
"number": "1002",
},
"creationtime": "2024-05-03T08:53:05.234+0200",
"dialplan": map[string]any{
"app_data": "",
"app_name": "",
"context": "internal",
"exten": "1002",
"priority": "9",
},
"id": "1714719185.3",
"language": "en",
"name": "PJSIP/1001-00000002",
"protocol_id": "cb1bb28866dd7d52b42484e5b38765ec@0:0:0:0:0:0:0:0",
"state": "Up",
},
"timestamp": "2024-05-03T08:53:11.511+0200",
"type": "ChannelDestroyed",
}
smaEv := NewSMAsteriskEvent(ariEv, "127.0.0.1", utils.EmptyString)
cgrEv := utils.CGREvent{
Tenant: "cgrates.org",
ID: "ea36649",
Event: map[string]any{
utils.AccountField: "1001",
utils.Destination: "1002",
utils.EventName: "SMA_SESSION_TERMINATE",
utils.OriginHost: "127.0.0.1",
utils.OriginID: "1714734552.6",
utils.RequestType: utils.MetaRated,
utils.SetupTime: time.Date(2013, 12, 30, 15, 01, 31, 0, time.UTC),
utils.Source: utils.AsteriskAgent,
},
}
exp := utils.CGREvent{
Tenant: "cgrates.org",
ID: "ea36649",
Event: map[string]any{
utils.AccountField: "1001",
utils.AnswerTime: "2024-05-03 08:53:06",
utils.Destination: "1002",
utils.DisconnectCause: "Normal Clearing",
utils.EventName: "SMA_SESSION_TERMINATE",
utils.OriginHost: "127.0.0.1",
utils.OriginID: "1714734552.6",
utils.RequestType: utils.MetaPrepaid,
utils.SetupTime: time.Date(2013, 12, 30, 15, 01, 31, 0, time.UTC),
utils.Source: utils.AsteriskAgent,
utils.Usage: "4s",
utils.CGRFlags: "*accounts+*attributes+*resources+*stats+*routes+*thresholds+cgr_reqtype:*prepaid",
},
}
if err := smaEv.RestoreAndUpdateFields(&cgrEv); err != nil {
t.Error(err)
} else if utils.ToJSON(cgrEv) != utils.ToJSON(exp) {
t.Errorf("Expected <%v>, \nreceived <%v>", utils.ToJSON(exp), utils.ToJSON(cgrEv))
}
}
func TestRestoreAndUpdateFieldsFail(t *testing.T) {
ariEv := map[string]any{
"application": "cgrates_auth",
"asterisk_id": "08:00:27:18:d8:cf",
"cause": "16",
"cause_txt": "Normal Clearing",
"channel": map[string]any{
"accountcode": "",
"caller": map[string]any{
"name": "1001",
"number": "1001",
},
"connected": map[string]any{
"name": "",
"number": "1002",
},
"creationtime": "2024-05-03T08:53:05.234+0200",
"dialplan": map[string]any{
"app_data": "",
"app_name": "",
"context": "internal",
"exten": "1002",
"priority": "9",
},
"id": "1714719185.3",
"language": "en",
"name": "PJSIP/1001-00000002",
"protocol_id": "cb1bb28866dd7d52b42484e5b38765ec@0:0:0:0:0:0:0:0",
"state": "Up",
},
"timestamp": "2024-05-03T08:53:11.511+0200",
"type": "ChannelDestroyed",
}
smaEv := NewSMAsteriskEvent(ariEv, "127.0.0.1", utils.EmptyString)
cgrEv := utils.CGREvent{
Tenant: "cgrates.org",
ID: "ea36649",
Event: map[string]any{
utils.AccountField: "1001",
utils.Destination: "1002",
utils.EventName: "SMA_SESSION_TERMINATE",
utils.OriginHost: "127.0.0.1",
utils.OriginID: "1714734552.6",
utils.RequestType: utils.MetaRated,
utils.SetupTime: time.Date(2013, 12, 30, 15, 01, 31, 0, time.UTC),
utils.Source: utils.AsteriskAgent,
},
}
expErr := "channelvars not found in event <map[accountcode: caller:map[name:1001 number:1001] connected:map[name: number:1002] creationtime:2024-05-03T08:53:05.234+0200 dialplan:map[app_data: app_name: context:internal exten:1002 priority:9] id:1714719185.3 language:en name:PJSIP/1001-00000002 protocol_id:cb1bb28866dd7d52b42484e5b38765ec@0:0:0:0:0:0:0:0 state:Up]>"
if err := smaEv.RestoreAndUpdateFields(&cgrEv); err.Error() != expErr {
t.Errorf("Expected error <%v>, \nreceived <%+v>", expErr, err)
}
}

View File

@@ -301,6 +301,9 @@ func (fsa *FSsessions) onChannelHangupComplete(fsev FSEvent, connIdx int) {
func (fsa *FSsessions) Connect() error {
eventFilters := map[string][]string{"Call-Direction": {"inbound"}}
connErr := make(chan error)
var reply string
// make a call fs -> sessions_conns to create an active client needed for syncSessions when restoring sessions, since prior clients are lost when engine shuts down
fsa.connMgr.Call(fsa.ctx, fsa.cfg.SessionSConns, utils.SessionSv1Ping, &utils.CGREvent{}, &reply)
for connIdx, connCfg := range fsa.cfg.EventSocketConns {
fSock, err := fsock.NewFSock(
connCfg.Address, connCfg.Password,
@@ -377,10 +380,6 @@ func (fsa *FSsessions) Shutdown() (err error) {
utils.Logger.Err(fmt.Sprintf("<%s> Cannot shutdown sessions, fsock not connected for connection index: %v", utils.FreeSWITCHAgent, connIdx))
continue
}
utils.Logger.Info(fmt.Sprintf("<%s> Shutting down all sessions on connection index: %v", utils.FreeSWITCHAgent, connIdx))
if _, err = fSock.SendApiCmd("hupall MANAGER_REQUEST cgr_reqtype *prepaid"); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error on calls shutdown: %s, connection index: %v", utils.FreeSWITCHAgent, err.Error(), connIdx))
}
if err = fSock.Disconnect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error on disconnect: %s, connection index: %v", utils.FreeSWITCHAgent, err.Error(), connIdx))
}

View File

@@ -1,6 +1,7 @@
[general]
enabled = yes
allowed_origins = http://cgrates.org
channelvars = cgr_reqtype,cgr_flags,CDR(answer),CDR(billsec)
[cgrates]
type = user

View File

@@ -5,7 +5,7 @@
same => n,Set(LIMIT_PLAYAUDIO_CALLER=YES)
same => n,Set(LIMIT_PLAYAUDIO_CALLEE=YES)
same => n,DumpChan()
same => n,Stasis(cgrates_auth,cgr_reqtype=*prepaid,cgr_routes=supplier1,cgr_flags=*accounts+*attributes+*resources+*stats+*routes+*thresholds)
same => n,Stasis(cgrates_auth,cgr_reqtype=*prepaid,cgr_flags=*accounts+*attributes+*resources+*stats+*routes+*thresholds)
same => n,Set(CHANNEL(hangup_handler_push)=hangUp,hUP,1)
same => n,Dial(PJSIP/${EXTEN},30,L(${CGRMaxSessionTime}:10000)))
same => n,Hangup()

View File

@@ -33,6 +33,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func TestAsteriskAgentReload(t *testing.T) {
@@ -59,14 +60,17 @@ func TestAsteriskAgentReload(t *testing.T) {
cacheSChan <- cacheSrv
server := cores.NewServer(nil)
srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil)
internalSessionSChan := make(chan birpc.ClientConnector, 1)
cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan,
})
srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, cm)
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
db := NewDataDBService(cfg, nil, srvDep)
db := NewDataDBService(cfg, cm, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
shdChan, nil, anz, srvDep)
astService := NewAsteriskAgent(cfg, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
shdChan, cm, anz, srvDep)
astService := NewAsteriskAgent(cfg, shdChan, cm, srvDep)
srvMngr.AddServices(astService, sS,
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
if err := srvMngr.StartServices(); err != nil {
@@ -129,14 +133,17 @@ func TestAsteriskAgentReload2(t *testing.T) {
cacheSChan <- cacheSrv
server := cores.NewServer(nil)
srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil)
internalSessionSChan := make(chan birpc.ClientConnector, 1)
cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan,
})
srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, cm)
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
db := NewDataDBService(cfg, nil, srvDep)
db := NewDataDBService(cfg, cm, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
shdChan, nil, anz, srvDep)
astSrv := NewAsteriskAgent(cfg, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
shdChan, cm, anz, srvDep)
astSrv := NewAsteriskAgent(cfg, shdChan, cm, srvDep)
srvMngr.AddServices(astSrv, sS,
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
if err := srvMngr.StartServices(); err != nil {

View File

@@ -34,6 +34,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func TestFreeSwitchAgentReload(t *testing.T) {
@@ -60,16 +61,19 @@ func TestFreeSwitchAgentReload(t *testing.T) {
cacheSChan <- cacheSrv
server := cores.NewServer(nil)
srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, nil)
internalSessionSChan := make(chan birpc.ClientConnector, 1)
cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan,
})
srvMngr := servmanager.NewServiceManager(cfg, shdChan, shdWg, cm)
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
db := NewDataDBService(cfg, nil, srvDep)
db := NewDataDBService(cfg, cm, srvDep)
anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep)
sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1),
shdChan, nil, anz, srvDep)
srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep)
engine.NewConnManager(cfg, nil)
shdChan, cm, anz, srvDep)
srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep)
srvMngr.AddServices(srv, sS,
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), cm, anz, srvDep), db)
if err := srvMngr.StartServices(); err != nil {
t.Fatal(err)
}
@@ -110,7 +114,11 @@ func TestFreeSwitchAgentReload2(t *testing.T) {
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep)
internalSessionSChan := make(chan birpc.ClientConnector, 1)
cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan,
})
srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep)
if srv.IsRunning() {
t.Fatalf("Expected service to be down")
@@ -146,7 +154,11 @@ func TestFreeSwitchAgentReload3(t *testing.T) {
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep)
internalSessionSChan := make(chan birpc.ClientConnector, 1)
cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan,
})
srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep)
if srv.IsRunning() {
t.Fatalf("Expected service to be down")
@@ -181,7 +193,11 @@ func TestFreeSwitchAgentReload4(t *testing.T) {
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep)
internalSessionSChan := make(chan birpc.ClientConnector, 1)
cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan,
})
srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep)
if srv.IsRunning() {
t.Fatalf("Expected service to be down")
}
@@ -228,7 +244,11 @@ func TestFreeSwitchAgentReload5(t *testing.T) {
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep)
internalSessionSChan := make(chan birpc.ClientConnector, 1)
cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan,
})
srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep)
if srv.IsRunning() {
t.Fatalf("Expected service to be down")
}
@@ -248,7 +268,9 @@ func TestFreeSwitchAgentReload6(t *testing.T) {
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
shdChan := utils.NewSyncedChan()
chS := engine.NewCacheS(cfg, nil, nil)
db := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(db, cfg.CacheCfg(), nil)
chS := engine.NewCacheS(cfg, dm, nil)
cacheSrv, err := engine.NewService(chS)
if err != nil {
t.Fatal(err)
@@ -256,7 +278,11 @@ func TestFreeSwitchAgentReload6(t *testing.T) {
cacheSChan := make(chan birpc.ClientConnector, 1)
cacheSChan <- cacheSrv
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewFreeswitchAgent(cfg, shdChan, nil, srvDep)
internalSessionSChan := make(chan birpc.ClientConnector, 1)
cm := engine.NewConnManager(cfg, map[string]chan context.ClientConnector{
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): internalSessionSChan,
})
srv := NewFreeswitchAgent(cfg, shdChan, cm, srvDep)
if srv.IsRunning() {
t.Fatalf("Expected service to be down")
}
@@ -279,7 +305,7 @@ func TestFreeSwitchAgentReload6(t *testing.T) {
},
},
}
srv.(*FreeswitchAgent).fS, err = agents.NewFSsessions(agentCfg, "", nil)
srv.(*FreeswitchAgent).fS, err = agents.NewFSsessions(agentCfg, "", cm)
if err != nil {
t.Fatal(err)
}