From 64426affc4dd8dcb0302938cf7f802fa8c11b6bb Mon Sep 17 00:00:00 2001 From: TeoV Date: Fri, 11 May 2018 11:30:24 -0400 Subject: [PATCH] Update Asterisk Agent --- agents/asterisk_event.go | 194 ++++++++++++++++ agents/asteriskagent.go | 210 +++++++++++------- cmd/cgr-engine/cgr-engine.go | 2 +- .../asterisk/etc/asterisk/pjsip.conf | 2 +- .../cgrates/etc/cgrates/cgrates.json | 135 ++++++++--- utils/consts.go | 1 + 6 files changed, 428 insertions(+), 116 deletions(-) diff --git a/agents/asterisk_event.go b/agents/asterisk_event.go index c2d4ea221..f29ecceb1 100644 --- a/agents/asterisk_event.go +++ b/agents/asterisk_event.go @@ -21,6 +21,7 @@ package agents import ( "strings" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" ) @@ -227,6 +228,7 @@ func (smaEv *SMAsteriskEvent) UpdateSMGEvent(smgEv *sessions.SMGenericEvent) err if _, hasIt := resSMGEv[utils.AnswerTime]; !hasIt { resSMGEv[utils.Usage] = "0s" } else { + if aTime, err := smgEv.GetAnswerTime(utils.META_DEFAULT, ""); err != nil { return err } else if aTime.IsZero() { @@ -243,3 +245,195 @@ func (smaEv *SMAsteriskEvent) UpdateSMGEvent(smgEv *sessions.SMGenericEvent) err *smgEv = resSMGEv return nil } + +func (smaEv *SMAsteriskEvent) UpdateCGREvent(cgrEv *utils.CGREvent) error { + resCGREv := *cgrEv + switch smaEv.EventType() { + case ARIChannelStateChange: + if smaEv.ChannelState() == channelUp { + resCGREv.Event[utils.EVENT_NAME] = SMASessionStart + resCGREv.Event[utils.AnswerTime] = smaEv.Timestamp() + } + case ARIChannelDestroyed: + resCGREv.Event[utils.EVENT_NAME] = SMASessionTerminate + resCGREv.Event[utils.DISCONNECT_CAUSE] = smaEv.DisconnectCause() + if _, hasIt := resCGREv.Event[utils.AnswerTime]; !hasIt { + resCGREv.Event[utils.Usage] = "0s" + } else { + if aTime, err := utils.IfaceAsTime(resCGREv.Event[utils.AnswerTime], config.CgrConfig().DefaultTimezone); err != nil { + return err + } else if aTime.IsZero() { + resCGREv.Event[utils.Usage] = "0s" + } else { + actualTime, err := utils.ParseTimeDetectLayout(smaEv.Timestamp(), "") + if err != nil { + return err + } + resCGREv.Event[utils.Usage] = actualTime.Sub(aTime).String() + } + } + } + *cgrEv = resCGREv + return nil +} + +func (smaEv *SMAsteriskEvent) AsMapStringInterface() (mp map[string]interface{}) { + mp = make(map[string]interface{}) + var evName string + switch smaEv.EventType() { + case ARIStasisStart: + evName = SMAAuthorization + case ARIChannelStateChange: + evName = SMASessionStart + case ARIChannelDestroyed: + evName = SMASessionTerminate + } + mp[utils.EVENT_NAME] = evName + mp[utils.OriginID] = smaEv.ChannelID() + if smaEv.RequestType() != "" { + mp[utils.RequestType] = smaEv.RequestType() + } + if smaEv.Tenant() != "" { + mp[utils.Tenant] = smaEv.Tenant() + } + if smaEv.Category() != "" { + mp[utils.Category] = smaEv.Category() + } + if smaEv.Subject() != "" { + mp[utils.Subject] = smaEv.Subject() + } + mp[utils.OriginHost] = smaEv.OriginatorIP() + mp[utils.Account] = smaEv.Account() + mp[utils.Destination] = smaEv.Destination() + mp[utils.SetupTime] = smaEv.Timestamp() + if smaEv.Supplier() != "" { + mp[utils.SUPPLIER] = smaEv.Supplier() + } + for extraKey, extraVal := range smaEv.ExtraParameters() { // Append extraParameters + mp[extraKey] = extraVal + } + return +} + +// AsCDR converts KamEvent into CGREvent +func (smaEv *SMAsteriskEvent) AsCGREvent(timezone string) (cgrEv *utils.CGREvent, err error) { + setupTime, err := utils.ParseTimeDetectLayout( + smaEv.Timestamp(), timezone) + if err != nil { + return + } + cgrEv = &utils.CGREvent{ + Tenant: utils.FirstNonEmpty(smaEv.Tenant(), + config.CgrConfig().DefaultTenant), + ID: utils.UUIDSha1Prefix(), + Time: &setupTime, + Event: smaEv.AsMapStringInterface(), + } + return cgrEv, nil +} + +func (smaEv *SMAsteriskEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs) { + cgrEv, err := smaEv.AsCGREvent(config.CgrConfig().DefaultTimezone) + if err != nil { + return + } + args = &sessions.V1AuthorizeArgs{ + GetMaxUsage: true, + CGREvent: *cgrEv, + } + // For the moment hardcoded only GetMaxUsage : true + /* + subsystems, has := kev[KamCGRSubsystems] + if !has { + return + } + if strings.Index(subsystems, utils.MetaAccounts) == -1 { + args.GetMaxUsage = false + } + if strings.Index(subsystems, utils.MetaResources) != -1 { + args.AuthorizeResources = true + } + if strings.Index(subsystems, utils.MetaSuppliers) != -1 { + args.GetSuppliers = true + if strings.Index(subsystems, utils.MetaSuppliersEventCost) != -1 { + args.SuppliersMaxCost = utils.MetaEventCost + } + if strings.Index(subsystems, utils.MetaSuppliersIgnoreErrors) != -1 { + args.SuppliersIgnoreErrors = true + } + } + if strings.Index(subsystems, utils.MetaAttributes) != -1 { + args.GetAttributes = true + } + if strings.Index(subsystems, utils.MetaThresholds) != -1 { + args.ProcessThresholds = utils.BoolPointer(true) + } + if strings.Index(subsystems, utils.MetaStats) != -1 { + args.ProcessStatQueues = utils.BoolPointer(true) + } + */ + return +} + +func (smaEv *SMAsteriskEvent) V1InitSessionArgs() (args *sessions.V1InitSessionArgs) { + cgrEv, err := smaEv.AsCGREvent(config.CgrConfig().DefaultTimezone) + if err != nil { + return + } + args = &sessions.V1InitSessionArgs{ // defaults + InitSession: true, + CGREvent: *cgrEv, + } + /* + subsystems, has := kev[KamCGRSubsystems] + if !has { + return + } + if strings.Index(subsystems, utils.MetaAccounts) == -1 { + args.InitSession = false + } + if strings.Index(subsystems, utils.MetaResources) != -1 { + args.AllocateResources = true + } + if strings.Index(subsystems, utils.MetaAttributes) != -1 { + args.GetAttributes = true + } + if strings.Index(subsystems, utils.MetaThresholds) != -1 { + args.ProcessThresholds = utils.BoolPointer(true) + } + if strings.Index(subsystems, utils.MetaStats) != -1 { + args.ProcessStatQueues = utils.BoolPointer(true) + } + */ + return +} + +func (smaEv *SMAsteriskEvent) V1TerminateSessionArgs() (args *sessions.V1TerminateSessionArgs) { + cgrEv, err := smaEv.AsCGREvent(config.CgrConfig().DefaultTimezone) + if err != nil { + return + } + args = &sessions.V1TerminateSessionArgs{ // defaults + TerminateSession: true, + CGREvent: *cgrEv, + } + /* + subsystems, has := kev[KamCGRSubsystems] + if !has { + return + } + if strings.Index(subsystems, utils.MetaAccounts) == -1 { + args.TerminateSession = false + } + if strings.Index(subsystems, utils.MetaResources) != -1 { + args.ReleaseResources = true + } + if strings.Index(subsystems, utils.MetaThresholds) != -1 { + args.ProcessThresholds = utils.BoolPointer(true) + } + if strings.Index(subsystems, utils.MetaStats) != -1 { + args.ProcessStatQueues = utils.BoolPointer(true) + } + */ + return +} diff --git a/agents/asteriskagent.go b/agents/asteriskagent.go index 5ece52e6c..44bc50312 100644 --- a/agents/asteriskagent.go +++ b/agents/asteriskagent.go @@ -48,30 +48,33 @@ const ( SMASessionTerminate = "SMA_SESSION_TERMINATE" ) -func NewSMAsterisk(cgrCfg *config.CGRConfig, astConnIdx int, smgConn *utils.BiRPCInternalClient) (*SMAsterisk, error) { - sma := &SMAsterisk{cgrCfg: cgrCfg, smg: smgConn, - eventsCache: make(map[string]*sessions.SMGenericEvent)} +func NewAsteriskAgent(cgrCfg *config.CGRConfig, astConnIdx int, + smgConn *utils.BiRPCInternalClient) (*AsteriskAgent, error) { + sma := &AsteriskAgent{cgrCfg: cgrCfg, smg: smgConn, + eventsCache: make(map[string]*utils.CGREvent)} sma.smg.SetClientConn(sma) // pass the connection to SMA back into smg so we can receive the disconnects return sma, nil } -type SMAsterisk struct { +type AsteriskAgent struct { cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple astConnIdx int smg *utils.BiRPCInternalClient astConn *aringo.ARInGO astEvChan chan map[string]interface{} astErrChan chan error - eventsCache map[string]*sessions.SMGenericEvent // used to gather information about events during various phases - evCacheMux sync.RWMutex // Protect eventsCache + eventsCache map[string]*utils.CGREvent // used to gather information about events during various phases + evCacheMux sync.RWMutex // Protect eventsCache } -func (sma *SMAsterisk) connectAsterisk() (err error) { +func (sma *AsteriskAgent) connectAsterisk() (err error) { connCfg := sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx] sma.astEvChan = make(chan map[string]interface{}) sma.astErrChan = make(chan error) - sma.astConn, err = aringo.NewARInGO(fmt.Sprintf("ws://%s/ari/events?api_key=%s:%s&app=%s", connCfg.Address, connCfg.User, connCfg.Password, CGRAuthAPP), "http://cgrates.org", - connCfg.User, connCfg.Password, fmt.Sprintf("%s %s", utils.CGRateS, utils.VERSION), sma.astEvChan, sma.astErrChan, connCfg.ConnectAttempts, connCfg.Reconnects) + sma.astConn, err = aringo.NewARInGO(fmt.Sprintf("ws://%s/ari/events?api_key=%s:%s&app=%s", + connCfg.Address, connCfg.User, connCfg.Password, CGRAuthAPP), "http://cgrates.org", + connCfg.User, connCfg.Password, fmt.Sprintf("%s %s", utils.CGRateS, utils.VERSION), + sma.astEvChan, sma.astErrChan, connCfg.ConnectAttempts, connCfg.Reconnects) if err != nil { return err } @@ -79,7 +82,7 @@ func (sma *SMAsterisk) connectAsterisk() (err error) { } // Called to start the service -func (sma *SMAsterisk) ListenAndServe() (err error) { +func (sma *AsteriskAgent) ListenAndServe() (err error) { if err := sma.connectAsterisk(); err != nil { return err } @@ -88,7 +91,8 @@ func (sma *SMAsterisk) ListenAndServe() (err error) { case err = <-sma.astErrChan: return case astRawEv := <-sma.astEvChan: - smAsteriskEvent := NewSMAsteriskEvent(astRawEv, strings.Split(sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, ":")[0]) + smAsteriskEvent := NewSMAsteriskEvent(astRawEv, + strings.Split(sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, ":")[0]) switch smAsteriskEvent.EventType() { case ARIStasisStart: go sma.handleStasisStart(smAsteriskEvent) @@ -99,52 +103,71 @@ func (sma *SMAsterisk) ListenAndServe() (err error) { } } } - panic(" ListenAndServe out of select") + panic(" ListenAndServe out of select") } // hangupChannel will disconnect from CGRateS side with congestion reason -func (sma *SMAsterisk) hangupChannel(channelID string) (err error) { +func (sma *AsteriskAgent) hangupChannel(channelID string) (err error) { _, err = sma.astConn.Call(aringo.HTTP_DELETE, fmt.Sprintf("http://%s/ari/channels/%s", sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, channelID), url.Values{"reason": {"congestion"}}) return } -func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) { +func (sma *AsteriskAgent) handleStasisStart(ev *SMAsteriskEvent) { // Subscribe for channel updates even after we leave Stasis - if _, err := sma.astConn.Call(aringo.HTTP_POST, fmt.Sprintf("http://%s/ari/applications/%s/subscription?eventSource=channel:%s", - sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, CGRAuthAPP, ev.ChannelID()), nil); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when subscribing to events for channelID: %s", err.Error(), ev.ChannelID())) + if _, err := sma.astConn.Call(aringo.HTTP_POST, + fmt.Sprintf("http://%s/ari/applications/%s/subscription?eventSource=channel:%s", + sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, + CGRAuthAPP, ev.ChannelID()), nil); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when subscribingto events for channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) // Since we got error, disconnect channel if err := sma.hangupChannel(ev.ChannelID()); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) + utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) } return } - // Query the SMG via RPC for maxUsage - var maxUsage float64 - smgEv := ev.AsSMGenericEvent() - if err := sma.smg.Call("SMGenericV1.GetMaxUsage", *smgEv, &maxUsage); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to authorize session for channelID: %s", err.Error(), ev.ChannelID())) + //authorize Session + authArgs := ev.V1AuthorizeArgs() + if authArgs == nil { + utils.Logger.Err(fmt.Sprintf("<%s> event: %s cannot generate auth session arguments", + utils.AsteriskAgent, ev.ChannelID())) + return + } + // var maxUsage float64 + // smgEv := ev.AsSMGenericEvent() + var authReply sessions.V1AuthorizeReply + if err := sma.smg.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to authorize session for channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) if err := sma.hangupChannel(ev.ChannelID()); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) + utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) } return } - if maxUsage == 0 { + if authReply.MaxUsage != nil && *authReply.MaxUsage == time.Duration(0) { if err := sma.hangupChannel(ev.ChannelID()); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) + utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) } return - } else if maxUsage != -1 { + } else if *authReply.MaxUsage != time.Duration(-1) { // Set absolute timeout for non-postpaid calls - if _, err := sma.astConn.Call(aringo.HTTP_POST, fmt.Sprintf("http://%s/ari/channels/%s/variable?variable=%s", // Asterisk having issue with variable terminating empty so harcoding param in url - sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, ev.ChannelID(), CGRMaxSessionTime), - url.Values{"value": {strconv.FormatFloat(maxUsage*1000, 'f', -1, 64)}}); err != nil { // Asterisk expects value in ms - utils.Logger.Err(fmt.Sprintf(" Error: %s when setting %s for channelID: %s", err.Error(), CGRMaxSessionTime, ev.ChannelID())) + if _, err := sma.astConn.Call(aringo.HTTP_POST, + fmt.Sprintf("http://%s/ari/channels/%s/variable?variable=%s", // Asterisk having issue with variable terminating empty so harcoding param in url + sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, + ev.ChannelID(), CGRMaxSessionTime), + url.Values{"value": {strconv.FormatFloat( + float64(authReply.MaxUsage.Nanoseconds())*1000, 'f', -1, 64)}}); err != nil { // Asterisk expects value in ms + utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when setting %s for channelID: %s", + utils.AsteriskAgent, err.Error(), CGRMaxSessionTime, ev.ChannelID())) // Since we got error, disconnect channel if err := sma.hangupChannel(ev.ChannelID()); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) + utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) } return } @@ -152,108 +175,141 @@ func (sma *SMAsterisk) handleStasisStart(ev *SMAsteriskEvent) { // Exit channel from stasis if _, err := sma.astConn.Call(aringo.HTTP_POST, fmt.Sprintf("http://%s/ari/channels/%s/continue", - sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, ev.ChannelID()), nil); err != nil { + sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, + ev.ChannelID()), nil); err != nil { } // Done with processing event, cache it for later use sma.evCacheMux.Lock() - sma.eventsCache[ev.ChannelID()] = smgEv + sma.eventsCache[ev.ChannelID()] = &authArgs.CGREvent sma.evCacheMux.Unlock() } // Ussually channelUP -func (sma *SMAsterisk) handleChannelStateChange(ev *SMAsteriskEvent) { +func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) { if ev.ChannelState() != channelUp { return } sma.evCacheMux.RLock() - smgEv, hasIt := sma.eventsCache[ev.ChannelID()] + _, hasIt := sma.eventsCache[ev.ChannelID()] sma.evCacheMux.RUnlock() if !hasIt { // Not handled by us return } - sma.evCacheMux.Lock() - err := ev.UpdateSMGEvent(smgEv) // Updates the event directly in the cache - sma.evCacheMux.Unlock() - if err != nil { + + initSessionArgs := ev.V1InitSessionArgs() + if initSessionArgs == nil { + utils.Logger.Err(fmt.Sprintf("<%s> event: %s cannot generate init session arguments", + utils.AsteriskAgent, ev.ChannelID())) + return + } + //initit Session + var initReply sessions.V1InitSessionReply + if err := sma.smg.Call(utils.SessionSv1InitiateSession, + initSessionArgs, &initReply); err != nil { utils.Logger.Err( - fmt.Sprintf(" Error: %s when attempting to initiate session for channelID: %s", - err.Error(), ev.ChannelID())) + fmt.Sprintf("<%s> Error: %s when attempting to initiate session for channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) if err := sma.hangupChannel(ev.ChannelID()); err != nil { utils.Logger.Err( - fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", - err.Error(), ev.ChannelID())) + fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) + } + return + } else if initReply.MaxUsage != nil && *initReply.MaxUsage == time.Duration(0) { + if err := sma.hangupChannel(ev.ChannelID()); err != nil { + utils.Logger.Err( + fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) } return } - var maxUsage time.Duration - if err := sma.smg.Call(utils.SMGenericV2InitiateSession, - *smgEv, &maxUsage); err != nil { + sma.evCacheMux.Lock() + err := ev.UpdateCGREvent(&initSessionArgs.CGREvent) // Updates the event directly in the cache + sma.evCacheMux.Unlock() + if err != nil { utils.Logger.Err( - fmt.Sprintf(" Error: %s when attempting to initiate session for channelID: %s", - err.Error(), ev.ChannelID())) + fmt.Sprintf("<%s> Error: %s when attempting to initiate session for channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) if err := sma.hangupChannel(ev.ChannelID()); err != nil { utils.Logger.Err( - fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", - err.Error(), ev.ChannelID())) - } - return - } else if maxUsage == 0 { - if err := sma.hangupChannel(ev.ChannelID()); err != nil { - utils.Logger.Err( - fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", - err.Error(), ev.ChannelID())) + fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) } return } } // Channel disconnect -func (sma *SMAsterisk) handleChannelDestroyed(ev *SMAsteriskEvent) { +func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) { sma.evCacheMux.RLock() - smgEv, hasIt := sma.eventsCache[ev.ChannelID()] + _, hasIt := sma.eventsCache[ev.ChannelID()] sma.evCacheMux.RUnlock() if !hasIt { // Not handled by us return } - sma.evCacheMux.Lock() - err := ev.UpdateSMGEvent(smgEv) // Updates the event directly in the cache - sma.evCacheMux.Unlock() - if err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to initiate session for channelID: %s", err.Error(), ev.ChannelID())) - if err := sma.hangupChannel(ev.ChannelID()); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", err.Error(), ev.ChannelID())) - } + + //terminate session + tsArgs := ev.V1TerminateSessionArgs() + if tsArgs == nil { + utils.Logger.Err(fmt.Sprintf("<%s> event: %s cannot generate terminate session arguments", + utils.AsteriskAgent, ev.ChannelID())) return } var reply string - if err := sma.smg.Call("SMGenericV1.TerminateSession", *smgEv, &reply); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to terminate session for channelID: %s", err.Error(), ev.ChannelID())) + if err := sma.smg.Call(utils.SessionSv1TerminateSession, + tsArgs, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to terminate session for channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) } if sma.cgrCfg.AsteriskAgentCfg().CreateCDR { - if err := sma.smg.Call("SMGenericV1.ProcessCDR", *smgEv, &reply); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error: %s when attempting to process CDR for channelID: %s", err.Error(), ev.ChannelID())) + setupTime, err := utils.ParseTimeDetectLayout( + ev.SetupTime(), config.CgrConfig().DefaultTimezone) + if err != nil { + return } + cgrEv := utils.CGREvent{ + Tenant: ev.Tenant(), + ID: utils.UUIDSha1Prefix(), + Time: &setupTime, + Event: ev.AsMapStringInterface(), + } + if err := sma.smg.Call(utils.SessionSv1ProcessCDR, cgrEv, &reply); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to process CDR for channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) + } + } + sma.evCacheMux.Lock() + err := ev.UpdateCGREvent(&tsArgs.CGREvent) // Updates the event directly in the cache + sma.evCacheMux.Unlock() + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to initiate session for channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) + if err := sma.hangupChannel(ev.ChannelID()); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s", + utils.AsteriskAgent, err.Error(), ev.ChannelID())) + } + return } } // Called to shutdown the service -func (sma *SMAsterisk) ServiceShutdown() error { +func (sma *AsteriskAgent) ServiceShutdown() error { return nil } // Internal method to disconnect session in asterisk -func (sma *SMAsterisk) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) error { +func (sma *AsteriskAgent) V1DisconnectSession(args utils.AttrDisconnectSession, reply *string) error { channelID := sessions.SMGenericEvent(args.EventStart).GetOriginID(utils.META_DEFAULT) if err := sma.hangupChannel(channelID); err != nil { utils.Logger.Err( - fmt.Sprintf(" Error: %s when attempting to disconnect channelID: %s", - err.Error(), channelID)) + fmt.Sprintf("<%s> Error: %s when attempting to disconnect channelID: %s", + utils.AsteriskAgent, err.Error(), channelID)) } *reply = utils.OK return nil } // rpcclient.RpcClientConnection interface -func (sma *SMAsterisk) Call(serviceMethod string, args interface{}, reply interface{}) error { +func (sma *AsteriskAgent) Call(serviceMethod string, args interface{}, reply interface{}) error { return utils.RPCCall(sma, serviceMethod, args, reply) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index a75dec9cc..1ebe22f02 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -241,7 +241,7 @@ func startAsteriskAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit internalSMGChan <- smgRpcConn birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessions.SMGeneric)) for connIdx := range cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers - sma, err := agents.NewSMAsterisk(cfg, connIdx, birpcClnt) + sma, err := agents.NewAsteriskAgent(cfg, connIdx, birpcClnt) if err != nil { utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) exitChan <- true diff --git a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/pjsip.conf b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/pjsip.conf index e99652665..b2649d605 100755 --- a/data/tutorials/asterisk_ari/asterisk/etc/asterisk/pjsip.conf +++ b/data/tutorials/asterisk_ari/asterisk/etc/asterisk/pjsip.conf @@ -1,7 +1,7 @@ [simpletrans] type=transport protocol=udp -bind=0.0.0.0 +bind=192.168.56.203 [1001] type = endpoint diff --git a/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json b/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json index 036565064..f28abc914 100644 --- a/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json +++ b/data/tutorials/asterisk_ari/cgrates/etc/cgrates/cgrates.json @@ -24,44 +24,70 @@ }, +"stor_db": { // database used to store offline tariff plans and CDRs + "db_password": "CGRateS.org", // password to use when connecting to stordb +}, + + "rals": { "enabled": true, - "cdrstats_conns": [ - {"address": "*internal"} + "thresholds_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "stats_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} ], "pubsubs_conns": [ {"address": "*internal"} ], - "users_conns": [ - {"address": "*internal"} - ], - "aliases_conns": [ - {"address": "*internal"} + "attributes_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} ], }, -"stor_db": { // database used to store offline tariff plans and CDRs - "db_password": "CGRateS.org", // password to use when connecting to stordb + +"cdrs": { + "enabled": true, + "sessions_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "stats_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "sessions_cost_retries": 5, +}, + + +"sessions": { + "enabled": true, + "rals_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "cdrs_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "resources_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "suppliers_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "attributes_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "stats_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "thresholds_conns": [ + {"address": "127.0.0.1:2012", "transport": "*json"} + ], + "debit_interval": "10s", }, "scheduler": { "enabled": true, }, - -"cdrs": { - "enabled": true, // start the CDR Server service: - "cdrstats_conns": [ - {"address": "*internal"} - ], -}, - - -"cdrstats": { - "enabled": true, // starts the cdrstats service: -}, - - "cdre": { "*default": { "cdr_format": "csv", // exported CDRs format @@ -125,33 +151,68 @@ }, -"sessions": { - "enabled": true, - "debit_interval": "5s", // interval to perform debits on. -}, - -"sm_asterisk": { - "enabled": true, // starts Asterisk SessionManager service: +"asterisk_agent": { + "enabled": true, + "sessions_conns": [ + {"address": "*internal"} + ], "create_cdr": true, - "asterisk_conns":[ // instantiate connections to multiple Asterisk servers + "asterisk_conns":[ {"address": "127.0.0.1:8088", "user": "cgrates", "password": "CGRateS.org", "connect_attempts": 3,"reconnects": 10} ], }, "pubsubs": { - "enabled": true, // starts PubSub service: . + "enabled": true, }, -"aliases": { - "enabled": true, // starts PubSub service: . +"attributes": { + "enabled": true, + "string_indexed_fields": ["Account"], }, -"users": { - "enabled": true, // starts User service: . - "indexes": ["Uuid"], // user profile field indexes +"resources": { + "enabled": true, + "thresholds_conns": [ + {"address": "*internal"} + ], + "string_indexed_fields": ["Account"], + "prefix_indexed_fields": ["Destination"], }, + +"stats": { + "enabled": true, + "thresholds_conns": [ + {"address": "*internal"} + ], + "string_indexed_fields": ["Account"], +}, + + +"thresholds": { + "enabled": true, + "string_indexed_fields": ["Account"], +}, + + +"suppliers": { + "enabled": true, + "rals_conns": [ + {"address": "*internal"} + ], + "resources_conns": [ + {"address": "*internal"} + ], + "stats_conns": [ + {"address": "*internal"} + ], + "string_indexed_fields": ["Account"], + "prefix_indexed_fields": ["Destination"], +}, + + } diff --git a/utils/consts.go b/utils/consts.go index 68e980181..ac55ac920 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -842,6 +842,7 @@ const ( RadiusAgent = "RadiusAgent" DiameterAgent = "DiameterAgent" FreeSWITCHAgent = "FreeSWITCHAgent" + AsteriskAgent = "AsteriskAgent" ) func buildCacheInstRevPrefixes() {