diff --git a/agents/asterisk_event.go b/agents/asterisk_event.go index dd553db2a..fa52c14b8 100644 --- a/agents/asterisk_event.go +++ b/agents/asterisk_event.go @@ -19,6 +19,7 @@ along with this program. If not, see package agents import ( + "fmt" "strings" "github.com/cgrates/cgrates/config" @@ -273,37 +274,17 @@ func (smaEv *SMAsteriskEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) GetMaxUsage: true, CGREvent: *cgrEv, } - // For the moment hardcoded only GetMaxUsage : true - /* - subsystems, has := kev[KamCGRSubsystems] - if !has { - return - } - if strings.Index(subsystems, utils.MetaAccounts) == -1 { - args.GetMaxUsage = false - } - if strings.Index(subsystems, utils.MetaResources) != -1 { - args.AuthorizeResources = true - } - if strings.Index(subsystems, utils.MetaSuppliers) != -1 { - args.GetSuppliers = true - if strings.Index(subsystems, utils.MetaSuppliersEventCost) != -1 { - args.SuppliersMaxCost = utils.MetaEventCost - } - if strings.Index(subsystems, utils.MetaSuppliersIgnoreErrors) != -1 { - args.SuppliersIgnoreErrors = true - } - } - if strings.Index(subsystems, utils.MetaAttributes) != -1 { - args.GetAttributes = true - } - if strings.Index(subsystems, utils.MetaThresholds) != -1 { - args.ProcessThresholds = utils.BoolPointer(true) - } - if strings.Index(subsystems, utils.MetaStats) != -1 { - args.ProcessStatQueues = utils.BoolPointer(true) - } - */ + + args.GetMaxUsage = strings.Index(smaEv.Subsystems(), utils.MetaAccounts) != -1 + args.AuthorizeResources = strings.Index(smaEv.Subsystems(), utils.MetaResources) != -1 + args.GetSuppliers = strings.Index(smaEv.Subsystems(), utils.MetaSuppliers) != -1 + args.SuppliersIgnoreErrors = strings.Index(smaEv.Subsystems(), utils.MetaSuppliersIgnoreErrors) != -1 + if strings.Index(smaEv.Subsystems(), utils.MetaSuppliersEventCost) != -1 { + args.SuppliersMaxCost = utils.MetaEventCost + } + args.GetAttributes = strings.Index(smaEv.Subsystems(), utils.MetaAttributes) != -1 + args.ProcessThresholds = strings.Index(smaEv.Subsystems(), utils.MetaThresholds) != -1 + args.ProcessStats = strings.Index(smaEv.Subsystems(), utils.MetaStats) != -1 return } @@ -312,27 +293,17 @@ func (smaEv *SMAsteriskEvent) V1InitSessionArgs(cgrEv utils.CGREvent) (args *ses InitSession: true, CGREvent: cgrEv, } - /* - subsystems, has := kev[KamCGRSubsystems] - if !has { - return - } - if strings.Index(subsystems, utils.MetaAccounts) == -1 { - args.InitSession = false - } - if strings.Index(subsystems, utils.MetaResources) != -1 { - args.AllocateResources = true - } - if strings.Index(subsystems, utils.MetaAttributes) != -1 { - args.GetAttributes = true - } - if strings.Index(subsystems, utils.MetaThresholds) != -1 { - args.ProcessThresholds = utils.BoolPointer(true) - } - if strings.Index(subsystems, utils.MetaStats) != -1 { - args.ProcessStatQueues = utils.BoolPointer(true) - } - */ + subsystems, err := cgrEv.FieldAsString(utils.CGRSubsystems) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> event: %s don't have cgr_subsystems variable", + utils.AsteriskAgent, utils.ToJSON(cgrEv))) + return nil + } + args.InitSession = strings.Index(subsystems, utils.MetaAccounts) != -1 + args.AllocateResources = strings.Index(subsystems, utils.MetaResources) != -1 + args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1 + args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 + args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 return } @@ -341,23 +312,15 @@ func (smaEv *SMAsteriskEvent) V1TerminateSessionArgs(cgrEv utils.CGREvent) (args TerminateSession: true, CGREvent: cgrEv, } - /* - subsystems, has := kev[KamCGRSubsystems] - if !has { - return - } - if strings.Index(subsystems, utils.MetaAccounts) == -1 { - args.TerminateSession = false - } - if strings.Index(subsystems, utils.MetaResources) != -1 { - args.ReleaseResources = true - } - if strings.Index(subsystems, utils.MetaThresholds) != -1 { - args.ProcessThresholds = utils.BoolPointer(true) - } - if strings.Index(subsystems, utils.MetaStats) != -1 { - args.ProcessStatQueues = utils.BoolPointer(true) - } - */ + subsystems, err := cgrEv.FieldAsString(utils.CGRSubsystems) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> event: %s don't have cgr_subsystems variable", + utils.AsteriskAgent, utils.ToJSON(cgrEv))) + return nil + } + args.TerminateSession = strings.Index(subsystems, utils.MetaAccounts) != -1 + args.ReleaseResources = strings.Index(subsystems, utils.MetaResources) != -1 + args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1 + args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1 return } diff --git a/agents/asteriskagent.go b/agents/asteriskagent.go index f3c655b94..6c46d8a5b 100644 --- a/agents/asteriskagent.go +++ b/agents/asteriskagent.go @@ -229,6 +229,12 @@ func (sma *AsteriskAgent) handleStasisStart(ev *SMAsteriskEvent) { // Ussually channelUP func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) { + // utils.Logger.Debug(fmt.Sprintf("#################handleChannelStateChange#####################")) + // utils.Logger.Debug(fmt.Sprintf("ev.ariEv : %+v", ev.ariEv)) + // utils.Logger.Debug(fmt.Sprintf("ev.asteriskIP : %+v", ev.asteriskIP)) + // utils.Logger.Debug(fmt.Sprintf("ev.cachedFields : %+v", ev.cachedFields)) + // utils.Logger.Debug(fmt.Sprintf("ev.Subsystems() : %+v", ev.Subsystems())) + if ev.ChannelState() != channelUp { return } @@ -238,6 +244,7 @@ func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) { if !hasIt { // Not handled by us return } + sma.evCacheMux.Lock() err := ev.UpdateCGREvent(cgrEv) // Updates the event directly in the cache sma.evCacheMux.Unlock() @@ -247,8 +254,10 @@ func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) { utils.AsteriskAgent, err.Error(), ev.ChannelID())) return } + // populate init session args initSessionArgs := ev.V1InitSessionArgs(*cgrEv) + if initSessionArgs == nil { utils.Logger.Err(fmt.Sprintf("<%s> event: %s cannot generate init session arguments", utils.AsteriskAgent, ev.ChannelID())) @@ -294,7 +303,6 @@ func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) { utils.AsteriskAgent, ev.ChannelID())) return } - var reply string if err := sma.smg.Call(utils.SessionSv1TerminateSession, tsArgs, &reply); err != nil { @@ -328,6 +336,27 @@ func (sma *AsteriskAgent) Call(serviceMethod string, args interface{}, reply int return utils.RPCCall(sma, serviceMethod, args, reply) } -func (fsa *AsteriskAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) (err error) { - return utils.ErrNotImplemented +func (sma *AsteriskAgent) V1GetActiveSessionIDs(ignParam string, + sessionIDs *[]*sessions.SessionID) (err error) { + utils.Logger.Debug(fmt.Sprintf("ASTERISK Enter in Sync session??")) + var sIDs []*sessions.SessionID + i := 0 + sma.evCacheMux.RLock() + originIds := make([]string, len(sma.eventsCache)) + for orgId := range sma.eventsCache { + originIds[i] = orgId + i++ + } + sma.evCacheMux.RUnlock() + fmt.Println("originIds : ", originIds) + fmt.Println("sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address : ", sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address) + for _, orgId := range originIds { + sIDs = append(sIDs, &sessions.SessionID{ + OriginHost: sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, + OriginID: orgId}, + ) + } + *sessionIDs = sIDs + return + //return utils.ErrNotImplemented } diff --git a/agents/libhttpagent_test.go b/agents/libhttpagent_test.go index c35f83931..311a42ac7 100644 --- a/agents/libhttpagent_test.go +++ b/agents/libhttpagent_test.go @@ -130,4 +130,14 @@ func TestHttpXmlDPFieldAsInterface(t *testing.T) { } else if data != "37" { t.Errorf("expecting: 37, received: <%s>", data) } + if data, err := dP.FieldAsString([]string{"complete-success-notification", "callleg", "@calllegid"}); err != nil { + t.Error(err) + } else if data != "222146" { + t.Errorf("expecting: 222146, received: <%s>", data) + } + if data, err := dP.FieldAsString([]string{"complete-success-notification", "callleg[1]", "@calllegid"}); err != nil { + t.Error(err) + } else if data != "222147" { + t.Errorf("expecting: 222147, received: <%s>", data) + } } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index c6ba5cebc..a23ebbfa5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -315,15 +315,19 @@ func startAsteriskAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit smgRpcConn := <-internalSMGChan internalSMGChan <- smgRpcConn birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessions.SMGeneric)) + var reply string for connIdx := range cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers sma, err := agents.NewAsteriskAgent(cfg, connIdx, birpcClnt) if err != nil { - utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err)) exitChan <- true return } + if err := birpcClnt.Call(utils.SessionSv1RegisterInternalBiJSONConn, "", &reply); err != nil { // for session sync + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err)) + } if err = sma.ListenAndServe(); err != nil { - utils.Logger.Err(fmt.Sprintf(" runtime error: %s!", err)) + utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err)) } } exitChan <- true diff --git a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/extensions.conf b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/extensions.conf index b2368c755..7035c57a3 100755 --- a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/extensions.conf +++ b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/extensions.conf @@ -2,7 +2,7 @@ exten => _1XXX,1,NoOp() same => n,Set(CGRMaxSessionTime=0); use it to disconnect automatically the call if CGRateS is not active same => n,DumpChan() - same => n,Stasis(cgrates_auth,cgr_reqtype=*prepaid,cgr_supplier=supplier1,cgr_subsystems=*attributes;*accounts) + same => n,Stasis(cgrates_auth,cgr_reqtype=*prepaid,cgr_supplier=supplier1,cgr_subsystems=*attributes*sessions*suppliers*thresholds*stats*accounts*resources) same => n,Dial(PJSIP/${EXTEN},30,L(${CGRMaxSessionTime})) same => n,Hangup() diff --git a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/http.conf b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/http.conf index 12b5e2e8b..9d1f51885 100644 --- a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/http.conf +++ b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/http.conf @@ -1,4 +1,4 @@ [general] enabled = yes -bindaddr = 127.0.0.1 +bindaddr = 0.0.0.0 bindport = 8088 \ No newline at end of file diff --git a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/pjsip.conf b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/pjsip.conf index e99652665..4db5b566c 100755 --- a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/pjsip.conf +++ b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/pjsip.conf @@ -1,7 +1,7 @@ [simpletrans] type=transport protocol=udp -bind=0.0.0.0 +bind=0.0.0.0:5080 [1001] type = endpoint @@ -27,16 +27,15 @@ password=CGRateS.org type = endpoint transport = simpletrans context = internal -aors = 1002 -auth = 1002 disallow = all allow = ulaw allow = alaw +aors = 1002 +auth = 1002 [1002] type = aor max_contacts = 5 -qualify_frequency = 0 [1002] type=auth diff --git a/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json b/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json index ba6c82254..778dbf093 100644 --- a/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json @@ -28,21 +28,24 @@ "rals": { "enabled": true, "thresholds_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport":"*json"}, ], "stats_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport":"*json"}, ], "attributes_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport":"*json"}, ], }, "cdrs": { "enabled": true, + "sessions_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], "stats_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"} ], "sessions_cost_retries": 5, }, @@ -51,45 +54,47 @@ "sessions": { "enabled": true, "rals_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"} ], "cdrs_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"} ], "resources_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"} ], "suppliers_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"} ], "attributes_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"} ], "stats_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"} ], "thresholds_conns": [ - {"address": "*internal"} + {"address": "127.0.0.1:2012", "transport": "*json"} ], - "debit_interval": "10s", + "debit_interval": "5s", + "channel_sync_interval":"7s", }, "asterisk_agent": { "enabled": true, "sessions_conns": [ - {"address": "*internal"} + {"address": "*internal"} ], "create_cdr": true, "asterisk_conns":[ - {"address": "192.168.56.103:8088", "user": "cgrates", + {"address": "192.168.56.203:8088", "user": "cgrates", "password": "CGRateS.org", "connect_attempts": 3,"reconnects": 10} ], }, "attributes": { - "enabled": true, + "enabled": true, + "string_indexed_fields": ["Account"], }, @@ -98,6 +103,7 @@ "thresholds_conns": [ {"address": "*internal"} ], + "string_indexed_fields": ["Account"], }, @@ -106,11 +112,13 @@ "thresholds_conns": [ {"address": "*internal"} ], + "string_indexed_fields": ["Account","RunID","Destination"], }, "thresholds": { "enabled": true, + "string_indexed_fields": ["Account"], }, @@ -125,6 +133,7 @@ "stats_conns": [ {"address": "*internal"} ], + "string_indexed_fields": ["Account"], }, diff --git a/general_tests/tutorial_calls_test.go b/general_tests/tutorial_calls_test.go index cb897ab28..7a160a653 100755 --- a/general_tests/tutorial_calls_test.go +++ b/general_tests/tutorial_calls_test.go @@ -104,14 +104,12 @@ func TestOpensipsCalls(t *testing.T) { } } -/* Need to be checked func TestAsteriskCalls(t *testing.T) { optConf = utils.Asterisk for _, stest := range sTestsCalls { t.Run("", stest) } } -*/ func testCallInitCfg(t *testing.T) { // Init config first @@ -243,7 +241,7 @@ func testCallRestartFS(t *testing.T) { // Connect rpc client to rater func testCallRpcConn(t *testing.T) { var err error - tutorialCallsRpc, err = jsonrpc.Dial("tcp", tutorialCallsCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + tutorialCallsRpc, err = jsonrpc.Dial("tcp", tutorialCallsCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal(err) } diff --git a/sessions/sessions.go b/sessions/sessions.go index 15643a6df..48e380750 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -2322,6 +2322,7 @@ func (smg *SMGeneric) OnBiJSONDisconnect(c *rpc2.Client) { } func (smg *SMGeneric) syncSessions() { + utils.Logger.Debug("Enter in sync sessions ????") var rpcClnts []rpcclient.RpcClientConnection for _, conn := range smg.intBiJSONConns { rpcClnts = append(rpcClnts, conn) @@ -2344,6 +2345,7 @@ func (smg *SMGeneric) syncSessions() { } } } + utils.Logger.Debug(fmt.Sprintf("queried CGRIDS : %+v", queriedCGRIDs)) var toBeRemoved []string smg.aSessionsMux.RLock() for cgrid := range smg.activeSessions {