mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Updated KamailioAgent.V1GetActiveSessionIDs. Fixes #1717
This commit is contained in:
committed by
Dan Christian Bogos
parent
c2733c6f2e
commit
ac6bc8866e
@@ -63,8 +63,7 @@ type KamailioAgent struct {
|
||||
activeSessionIDs chan []*sessions.SessionID
|
||||
}
|
||||
|
||||
func (self *KamailioAgent) Connect() error {
|
||||
var err error
|
||||
func (self *KamailioAgent) Connect() (err error) {
|
||||
eventHandlers := map[*regexp.Regexp][]func([]byte, int){
|
||||
kamAuthReqRegexp: {self.onCgrAuth},
|
||||
kamCallStartRegexp: {self.onCallStart},
|
||||
@@ -77,7 +76,7 @@ func (self *KamailioAgent) Connect() error {
|
||||
for connIdx, connCfg := range self.cfg.EvapiConns {
|
||||
logger := log.New(utils.Logger, "kamevapi:", 2)
|
||||
if self.conns[connIdx], err = kamevapi.NewKamEvapi(connCfg.Address, connIdx, connCfg.Reconnects, eventHandlers, logger); err != nil {
|
||||
return err
|
||||
return
|
||||
}
|
||||
go func(conn *kamevapi.KamEvapi) { // Start reading in own goroutine, return on error
|
||||
if err := conn.ReadEvents(); err != nil {
|
||||
@@ -86,7 +85,7 @@ func (self *KamailioAgent) Connect() error {
|
||||
}(self.conns[connIdx])
|
||||
}
|
||||
err = <-errChan // Will keep the Connect locked until the first error in one of the connections
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
func (self *KamailioAgent) Shutdown() (err error) {
|
||||
@@ -139,8 +138,10 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) {
|
||||
}
|
||||
authArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID
|
||||
var authReply sessions.V1AuthorizeReply
|
||||
err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply)
|
||||
if kar, err := kev.AsKamAuthReply(authArgs, &authReply, err); err != nil {
|
||||
if err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed to auth event: %s, error: %s",
|
||||
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
|
||||
} else if kar, err := kev.AsKamAuthReply(authArgs, &authReply, err); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed building auth reply for event: %s, error: %s",
|
||||
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
|
||||
} else if err = ka.conns[connIdx].Send(kar.String()); err != nil {
|
||||
@@ -300,9 +301,10 @@ func (ka *KamailioAgent) onCgrProcessMessage(evData []byte, connIdx int) {
|
||||
procEvArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID
|
||||
|
||||
var processReply sessions.V1ProcessMessageReply
|
||||
err = ka.sessionS.Call(utils.SessionSv1ProcessMessage, procEvArgs, &processReply)
|
||||
|
||||
if kar, err := kev.AsKamProcessMessageReply(procEvArgs, &processReply, err); err != nil {
|
||||
if err = ka.sessionS.Call(utils.SessionSv1ProcessMessage, procEvArgs, &processReply); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed to proccess message: %s, error: %s",
|
||||
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
|
||||
} else if kar, err := kev.AsKamProcessMessageReply(procEvArgs, &processReply, err); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed building process session event reply for event: %s, error: %s",
|
||||
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
|
||||
} else if err = ka.conns[connIdx].Send(kar.String()); err != nil {
|
||||
@@ -344,9 +346,10 @@ func (ka *KamailioAgent) onCgrProcessCDR(evData []byte, connIdx int) {
|
||||
procCDRArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID
|
||||
|
||||
var processReply string
|
||||
err = ka.sessionS.Call(utils.SessionSv1ProcessCDR, procCDRArgs, &processReply)
|
||||
|
||||
if kar, err := kev.AsKamProcessCDRReply(procCDRArgs, &processReply, err); err != nil {
|
||||
if err = ka.sessionS.Call(utils.SessionSv1ProcessCDR, procCDRArgs, &processReply); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed to process CDR for event: %s, error: %s",
|
||||
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
|
||||
} else if kar, err := kev.AsKamProcessCDRReply(procCDRArgs, &processReply, err); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed building process session event reply for event: %s, error: %s",
|
||||
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
|
||||
} else if err = ka.conns[connIdx].Send(kar.String()); err != nil {
|
||||
@@ -355,13 +358,12 @@ func (ka *KamailioAgent) onCgrProcessCDR(evData []byte, connIdx int) {
|
||||
}
|
||||
}
|
||||
|
||||
func (self *KamailioAgent) disconnectSession(connIdx int, dscEv *KamSessionDisconnect) error {
|
||||
if err := self.conns[connIdx].Send(dscEv.String()); err != nil {
|
||||
func (self *KamailioAgent) disconnectSession(connIdx int, dscEv *KamSessionDisconnect) (err error) {
|
||||
if err = self.conns[connIdx].Send(dscEv.String()); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> failed sending disconnect request: %s, connection id: %v, error %s",
|
||||
utils.KamailioAgent, utils.ToJSON(dscEv), connIdx, err.Error()))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
// Internal method to disconnect session in Kamailio
|
||||
@@ -394,6 +396,7 @@ func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r
|
||||
return
|
||||
}
|
||||
|
||||
// V1GetActiveSessionIDs returns a list of CGRIDs based on active sessions from agent
|
||||
func (ka *KamailioAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) (err error) {
|
||||
for _, evapi := range ka.conns {
|
||||
kamEv, _ := json.Marshal(map[string]string{utils.Event: CGR_DLG_LIST})
|
||||
@@ -403,11 +406,15 @@ func (ka *KamailioAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*s
|
||||
return
|
||||
}
|
||||
}
|
||||
select {
|
||||
case *sessionIDs = <-ka.activeSessionIDs:
|
||||
case <-time.After(5 * time.Second):
|
||||
return errors.New("timeout executing dialog list")
|
||||
for range ka.conns {
|
||||
select {
|
||||
case sIDs := <-ka.activeSessionIDs:
|
||||
*sessionIDs = append(*sessionIDs, sIDs...)
|
||||
case <-time.After(5 * time.Second):
|
||||
return errors.New("timeout executing dialog list")
|
||||
}
|
||||
}
|
||||
fmt.Println(utils.ToJSON(*sessionIDs))
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -234,7 +234,7 @@ func (kev KamEvent) AsKamAuthReply(authArgs *sessions.V1AuthorizeArgs,
|
||||
if authArgs.GetAttributes && authReply.Attributes != nil {
|
||||
kar.Attributes = authReply.Attributes.Digest()
|
||||
}
|
||||
if authArgs.AuthorizeResources {
|
||||
if authArgs.AuthorizeResources && authReply.ResourceAllocation != nil {
|
||||
kar.ResourceAllocation = *authReply.ResourceAllocation
|
||||
}
|
||||
if authArgs.GetMaxUsage {
|
||||
@@ -248,10 +248,10 @@ func (kev KamEvent) AsKamAuthReply(authArgs *sessions.V1AuthorizeArgs,
|
||||
kar.Suppliers = authReply.Suppliers.Digest()
|
||||
}
|
||||
|
||||
if authArgs.ProcessThresholds {
|
||||
if authArgs.ProcessThresholds && authReply.ThresholdIDs != nil {
|
||||
kar.Thresholds = strings.Join(*authReply.ThresholdIDs, utils.FIELDS_SEP)
|
||||
}
|
||||
if authArgs.ProcessStats {
|
||||
if authArgs.ProcessStats && authReply.StatQueueIDs != nil {
|
||||
kar.StatQueues = strings.Join(*authReply.StatQueueIDs, utils.FIELDS_SEP)
|
||||
}
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user