diff --git a/agents/kamagent.go b/agents/kamagent.go index 136252a6a..52376ed63 100644 --- a/agents/kamagent.go +++ b/agents/kamagent.go @@ -63,8 +63,7 @@ type KamailioAgent struct { activeSessionIDs chan []*sessions.SessionID } -func (self *KamailioAgent) Connect() error { - var err error +func (self *KamailioAgent) Connect() (err error) { eventHandlers := map[*regexp.Regexp][]func([]byte, int){ kamAuthReqRegexp: {self.onCgrAuth}, kamCallStartRegexp: {self.onCallStart}, @@ -77,7 +76,7 @@ func (self *KamailioAgent) Connect() error { for connIdx, connCfg := range self.cfg.EvapiConns { logger := log.New(utils.Logger, "kamevapi:", 2) if self.conns[connIdx], err = kamevapi.NewKamEvapi(connCfg.Address, connIdx, connCfg.Reconnects, eventHandlers, logger); err != nil { - return err + return } go func(conn *kamevapi.KamEvapi) { // Start reading in own goroutine, return on error if err := conn.ReadEvents(); err != nil { @@ -86,7 +85,7 @@ func (self *KamailioAgent) Connect() error { }(self.conns[connIdx]) } err = <-errChan // Will keep the Connect locked until the first error in one of the connections - return err + return } func (self *KamailioAgent) Shutdown() (err error) { @@ -139,8 +138,10 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) { } authArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID var authReply sessions.V1AuthorizeReply - err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply) - if kar, err := kev.AsKamAuthReply(authArgs, &authReply, err); err != nil { + if err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed to auth event: %s, error: %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + } else if kar, err := kev.AsKamAuthReply(authArgs, &authReply, err); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed building auth reply for event: %s, error: %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) } else if err = ka.conns[connIdx].Send(kar.String()); err != nil { @@ -300,9 +301,10 @@ func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) { procEvArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID var processReply sessions.V1ProcessMessageReply - err = ka.sessionS.Call(utils.SessionSv1ProcessMessage, procEvArgs, &processReply) - - if kar, err := kev.AsKamProcessMessageReply(procEvArgs, &processReply, err); err != nil { + if err = ka.sessionS.Call(utils.SessionSv1ProcessMessage, procEvArgs, &processReply); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed to proccess message: %s, error: %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + } else if kar, err := kev.AsKamProcessMessageReply(procEvArgs, &processReply, err); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed building process session event reply for event: %s, error: %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) } else if err = ka.conns[connIdx].Send(kar.String()); err != nil { @@ -344,9 +346,10 @@ func (ka *KamailioAgent) onCgrProcessCDR(evData []byte, connIdx int) { procCDRArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID var processReply string - err = ka.sessionS.Call(utils.SessionSv1ProcessCDR, procCDRArgs, &processReply) - - if kar, err := kev.AsKamProcessCDRReply(procCDRArgs, &processReply, err); err != nil { + if err = ka.sessionS.Call(utils.SessionSv1ProcessCDR, procCDRArgs, &processReply); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed to process CDR for event: %s, error: %s", + utils.KamailioAgent, kev[utils.OriginID], err.Error())) + } else if kar, err := kev.AsKamProcessCDRReply(procCDRArgs, &processReply, err); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed building process session event reply for event: %s, error: %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) } else if err = ka.conns[connIdx].Send(kar.String()); err != nil { @@ -355,13 +358,12 @@ func (ka *KamailioAgent) onCgrProcessCDR(evData []byte, connIdx int) { } } -func (self *KamailioAgent) disconnectSession(connIdx int, dscEv *KamSessionDisconnect) error { - if err := self.conns[connIdx].Send(dscEv.String()); err != nil { +func (self *KamailioAgent) disconnectSession(connIdx int, dscEv *KamSessionDisconnect) (err error) { + if err = self.conns[connIdx].Send(dscEv.String()); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed sending disconnect request: %s, connection id: %v, error %s", utils.KamailioAgent, utils.ToJSON(dscEv), connIdx, err.Error())) - return err } - return nil + return } // Internal method to disconnect session in Kamailio @@ -394,6 +396,7 @@ func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r return } +// V1GetActiveSessionIDs returns a list of CGRIDs based on active sessions from agent func (ka *KamailioAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) (err error) { for _, evapi := range ka.conns { kamEv, _ := json.Marshal(map[string]string{utils.Event: CGR_DLG_LIST}) @@ -403,11 +406,15 @@ func (ka *KamailioAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*s return } } - select { - case *sessionIDs = <-ka.activeSessionIDs: - case <-time.After(5 * time.Second): - return errors.New("timeout executing dialog list") + for range ka.conns { + select { + case sIDs := <-ka.activeSessionIDs: + *sessionIDs = append(*sessionIDs, sIDs...) + case <-time.After(5 * time.Second): + return errors.New("timeout executing dialog list") + } } + fmt.Println(utils.ToJSON(*sessionIDs)) return } diff --git a/agents/kamevent.go b/agents/kamevent.go index e6a341960..ae0d9c434 100644 --- a/agents/kamevent.go +++ b/agents/kamevent.go @@ -234,7 +234,7 @@ func (kev KamEvent) AsKamAuthReply(authArgs *sessions.V1AuthorizeArgs, if authArgs.GetAttributes && authReply.Attributes != nil { kar.Attributes = authReply.Attributes.Digest() } - if authArgs.AuthorizeResources { + if authArgs.AuthorizeResources && authReply.ResourceAllocation != nil { kar.ResourceAllocation = *authReply.ResourceAllocation } if authArgs.GetMaxUsage { @@ -248,10 +248,10 @@ func (kev KamEvent) AsKamAuthReply(authArgs *sessions.V1AuthorizeArgs, kar.Suppliers = authReply.Suppliers.Digest() } - if authArgs.ProcessThresholds { + if authArgs.ProcessThresholds && authReply.ThresholdIDs != nil { kar.Thresholds = strings.Join(*authReply.ThresholdIDs, utils.FIELDS_SEP) } - if authArgs.ProcessStats { + if authArgs.ProcessStats && authReply.StatQueueIDs != nil { kar.StatQueues = strings.Join(*authReply.StatQueueIDs, utils.FIELDS_SEP) } return