From e7b9996a0083e02ec5aefd670ae9059e14446bec Mon Sep 17 00:00:00 2001 From: Tripon Alexandru-Ionut Date: Thu, 6 Jun 2019 10:41:02 +0300 Subject: [PATCH] Updated Kamailio connID to connIdx --- agents/kamagent.go | 106 ++++++++++++++++++++++------------------ agents/kamevent.go | 3 +- agents/kamevent_test.go | 6 ++- glide.lock | 2 +- 4 files changed, 66 insertions(+), 51 deletions(-) diff --git a/agents/kamagent.go b/agents/kamagent.go index 4aa3c568a..8d0b824da 100644 --- a/agents/kamagent.go +++ b/agents/kamagent.go @@ -48,9 +48,8 @@ func NewKamailioAgent(kaCfg *config.KamAgentCfg, cfg: kaCfg, sessionS: sessionS, timezone: timezone, - conns: make(map[string]*kamevapi.KamEvapi), + conns: make([]*kamevapi.KamEvapi, len(kaCfg.EvapiConns)), activeSessionIDs: make(chan []*sessions.SessionID), - connAliases: make(map[string]string), } return } @@ -59,14 +58,13 @@ type KamailioAgent struct { cfg *config.KamAgentCfg sessionS rpcclient.RpcClientConnection timezone string - conns map[string]*kamevapi.KamEvapi - connAliases map[string]string + conns []*kamevapi.KamEvapi activeSessionIDs chan []*sessions.SessionID } func (self *KamailioAgent) Connect() error { var err error - eventHandlers := map[*regexp.Regexp][]func([]byte, string){ + eventHandlers := map[*regexp.Regexp][]func([]byte, int){ kamAuthReqRegexp: {self.onCgrAuth}, kamCallStartRegexp: {self.onCallStart}, kamCallEndRegexp: {self.onCallEnd}, @@ -74,15 +72,13 @@ func (self *KamailioAgent) Connect() error { kamProcessEventRegex: {self.onCgrProcessEvent}, } errChan := make(chan error) - for _, connCfg := range self.cfg.EvapiConns { - connID := utils.GenUUID() - self.connAliases[connID] = connCfg.Alias + for connIdx, connCfg := range self.cfg.EvapiConns { logger := log.New(utils.Logger, "kamevapi:", 2) - if self.conns[connID], err = kamevapi.NewKamEvapi(connCfg.Address, connID, connCfg.Reconnects, eventHandlers, logger); err != nil { + if self.conns[connIdx], err = kamevapi.NewKamEvapi(connCfg.Address, connIdx, connCfg.Reconnects, eventHandlers, logger); err != nil { return err } go func() { // Start reading in own goroutine, return on error - if err := self.conns[connID].ReadEvents(); err != nil { + if err := self.conns[connIdx].ReadEvents(); err != nil { errChan <- err } }() @@ -101,8 +97,13 @@ func (ka *KamailioAgent) Call(serviceMethod string, args interface{}, reply inte } // onCgrAuth is called when new event of type CGR_AUTH_REQUEST is coming -func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) { - kev, err := NewKamEvent(evData) +func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) { + if connIdx >= len(ka.conns) { // protection against index out of range panic + err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) + utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error())) + return + } + kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String()) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event data: %s, error: %s", utils.KamailioAgent, evData, err.Error())) @@ -115,7 +116,7 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) { if kRply, err := kev.AsKamAuthReply(nil, nil, utils.ErrMandatoryIeMissing); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed building auth reply for event: %s, error: %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) - } else if err = ka.conns[connID].Send(kRply.String()); err != nil { + } else if err = ka.conns[connIdx].Send(kRply.String()); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) } @@ -127,21 +128,25 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) { utils.KamailioAgent, kev[utils.OriginID])) return } - authArgs.CGREvent.Event[utils.OriginHost] = utils.FirstNonEmpty(authArgs.CGREvent.Event[utils.OriginHost].(string), ka.connAliases[connID], ka.conns[connID].RemoteAddr().String()) - authArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID + authArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID var authReply sessions.V1AuthorizeReply err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply) if kar, err := kev.AsKamAuthReply(authArgs, &authReply, err); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed building auth reply for event: %s, error: %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) - } else if err = ka.conns[connID].Send(kar.String()); err != nil { + } else if err = ka.conns[connIdx].Send(kar.String()); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error: %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) } } -func (ka *KamailioAgent) onCallStart(evData []byte, connID string) { - kev, err := NewKamEvent(evData) +func (ka *KamailioAgent) onCallStart(evData []byte, connIdx int) { + if connIdx >= len(ka.conns) { // protection against index out of range panic + err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) + utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error())) + return + } + kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String()) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event: %s, error: %s", utils.KamailioAgent, evData, err.Error())) @@ -151,7 +156,7 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connID string) { return } if kev.MissingParameter() { - ka.disconnectSession(connID, + ka.disconnectSession(connIdx, NewKamSessionDisconnect(kev[KamHashEntry], kev[KamHashID], utils.ErrMandatoryIeMissing.Error())) return @@ -162,8 +167,7 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connID string) { utils.KamailioAgent, kev[utils.OriginID])) return } - initSessionArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID so we can properly disconnect later - + initSessionArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID so we can properly disconnect later var initReply sessions.V1InitSessionReply if err := ka.sessionS.Call(utils.SessionSv1InitiateSession, @@ -171,15 +175,20 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connID string) { utils.Logger.Err( fmt.Sprintf("<%s> could not process answer for event %s, error: %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) - ka.disconnectSession(connID, + ka.disconnectSession(connIdx, NewKamSessionDisconnect(kev[KamHashEntry], kev[KamHashID], utils.ErrServerError.Error())) return } } -func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) { - kev, err := NewKamEvent(evData) +func (ka *KamailioAgent) onCallEnd(evData []byte, connIdx int) { + if connIdx >= len(ka.conns) { // protection against index out of range panic + err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) + utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error())) + return + } + kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String()) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event: %s, error: %s", utils.KamailioAgent, evData, err.Error())) @@ -200,8 +209,7 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) { return } var reply string - - tsArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID in case we need to create a session and disconnect it + tsArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID in case we need to create a session and disconnect it if err := ka.sessionS.Call(utils.SessionSv1TerminateSession, tsArgs, &reply); err != nil { utils.Logger.Err( @@ -214,7 +222,6 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) { if err != nil { return } - cgrArgs := cgrEv.ConsumeArgs(strings.Index(kev[utils.CGRSubsystems], utils.MetaDispatchers) != -1, false) if err := ka.sessionS.Call(utils.SessionSv1ProcessCDR, &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: cgrArgs.ArgDispatcher}, &reply); err != nil { @@ -224,7 +231,7 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) { } } -func (ka *KamailioAgent) onDlgList(evData []byte, connID string) { +func (ka *KamailioAgent) onDlgList(evData []byte, connIdx int) { kamDlgRpl, err := NewKamDlgReply(evData) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event data: %s, error: %s", @@ -235,15 +242,20 @@ func (ka *KamailioAgent) onDlgList(evData []byte, connID string) { // FixMe: find way to add OriginHost from event also, to be compatible with above implementation for _, dlgInfo := range kamDlgRpl.Jsonrpl_body.Result { sIDs = append(sIDs, &sessions.SessionID{ - OriginHost: ka.conns[connID].RemoteAddr().String(), + OriginHost: ka.conns[connIdx].RemoteAddr().String(), OriginID: dlgInfo.CallId + ";" + dlgInfo.Caller.Tag, }) } ka.activeSessionIDs <- sIDs } -func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connID string) { - kev, err := NewKamEvent(evData) +func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connIdx int) { + if connIdx >= len(ka.conns) { // protection against index out of range panic + err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) + utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error())) + return + } + kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String()) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event data: %s, error: %s", utils.KamailioAgent, evData, err.Error())) @@ -254,7 +266,7 @@ func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connID string) { if kRply, err := kev.AsKamProcessEventReply(nil, nil, utils.ErrMandatoryIeMissing); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed building process session event reply for event: %s, error: %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) - } else if err = ka.conns[connID].Send(kRply.String()); err != nil { + } else if err = ka.conns[connIdx].Send(kRply.String()); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed sending process session event reply for event: %s, error %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) } @@ -264,7 +276,7 @@ func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connID string) { //in case that we don't reveice cgr_subsystems from kamailio //we consider this as ping-pong event if _, has := kev[utils.CGRSubsystems]; !has { - if err = ka.conns[connID].Send(kev.AsKamProcessEventEmptyReply().String()); err != nil { + if err = ka.conns[connIdx].Send(kev.AsKamProcessEventEmptyReply().String()); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed sending empty process event reply for event: %s, error %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) } @@ -276,12 +288,7 @@ func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connID string) { utils.KamailioAgent, kev[utils.OriginID])) return } - originHost := ka.conns[connID].RemoteAddr().String() - if oHIf, has := procEvArgs.CGREvent.Event[utils.OriginHost]; has { - originHost = oHIf.(string) - } - procEvArgs.CGREvent.Event[utils.OriginHost] = originHost - procEvArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID + procEvArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID var processReply sessions.V1ProcessEventReply err = ka.sessionS.Call(utils.SessionSv1ProcessEvent, procEvArgs, &processReply) @@ -289,16 +296,16 @@ func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connID string) { if kar, err := kev.AsKamProcessEventReply(procEvArgs, &processReply, err); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed building process session event reply for event: %s, error: %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) - } else if err = ka.conns[connID].Send(kar.String()); err != nil { + } else if err = ka.conns[connIdx].Send(kar.String()); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error: %s", utils.KamailioAgent, kev[utils.OriginID], err.Error())) } } -func (self *KamailioAgent) disconnectSession(connID string, dscEv *KamSessionDisconnect) error { - if err := self.conns[connID].Send(dscEv.String()); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> failed sending disconnect request: %s, connection id: %s, error %s", - utils.KamailioAgent, utils.ToJSON(dscEv), err.Error(), connID)) +func (self *KamailioAgent) disconnectSession(connIdx int, dscEv *KamSessionDisconnect) error { + if err := self.conns[connIdx].Send(dscEv.String()); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> failed sending disconnect request: %s, connection id: %v, error %s", + utils.KamailioAgent, utils.ToJSON(dscEv), connIdx, err.Error())) return err } return nil @@ -314,7 +321,7 @@ func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r if err != nil { return err } - connIDIface, has := args.EventStart[EvapiConnID] + connIdxIface, has := args.EventStart[EvapiConnID] if !has { utils.Logger.Err( fmt.Sprintf("<%s> error: <%s:%s> when attempting to disconnect <%s:%s> and <%s:%s>", @@ -322,11 +329,16 @@ func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r KamHashEntry, hEntry, KamHashID, hID)) return } - connID, err := utils.IfaceAsString(connIDIface) + connIdx, err := utils.IfaceAsInt64(connIdxIface) if err != nil { return err } - if err = ka.disconnectSession(connID, + if int(connIdx) >= len(ka.conns) { // protection against index out of range panic + err = fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx) + utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error())) + return + } + if err = ka.disconnectSession(int(connIdx), NewKamSessionDisconnect(hEntry, hID, utils.ErrInsufficientCredit.Error())); err != nil { return diff --git a/agents/kamevent.go b/agents/kamevent.go index e6c4e0558..d206c2eb5 100644 --- a/agents/kamevent.go +++ b/agents/kamevent.go @@ -71,11 +71,12 @@ func (self *KamSessionDisconnect) String() string { } // NewKamEvent parses bytes received over the wire from Kamailio into KamEvent -func NewKamEvent(kamEvData []byte) (KamEvent, error) { +func NewKamEvent(kamEvData []byte, alias, adress string) (KamEvent, error) { kev := make(map[string]string) if err := json.Unmarshal(kamEvData, &kev); err != nil { return nil, err } + kev[utils.CGROriginHost] = utils.FirstNonEmpty(kev[utils.CGROriginHost], alias, adress) return kev, nil } diff --git a/agents/kamevent_test.go b/agents/kamevent_test.go index d1bc46bad..4d98065fa 100644 --- a/agents/kamevent_test.go +++ b/agents/kamevent_test.go @@ -53,8 +53,10 @@ func TestNewKamEvent(t *testing.T) { "cgr_destination": "1002", "cgr_answertime": "1419839310", "cgr_duration": "3", "cgr_pdd": "4", utils.CGR_SUPPLIER: "supplier2", - utils.CGR_DISCONNECT_CAUSE: "200"} - if kamEv, err := NewKamEvent([]byte(evStr)); err != nil { + utils.CGR_DISCONNECT_CAUSE: "200", + utils.CGROriginHost: utils.KamailioAgent, + } + if kamEv, err := NewKamEvent([]byte(evStr), utils.KamailioAgent, ""); err != nil { t.Error(err) } else if !reflect.DeepEqual(eKamEv, kamEv) { t.Error("Received: ", kamEv) diff --git a/glide.lock b/glide.lock index 717baaa08..78874bb3b 100644 --- a/glide.lock +++ b/glide.lock @@ -12,7 +12,7 @@ imports: - name: github.com/cgrates/fsock version: 4759d9e84c74981872c5c2bbffe6f23ecba2ea3c - name: github.com/cgrates/kamevapi - version: 0e0d0379606fd8f12b53c6da6aeb28544f7bfa37 + version: ec679e0176542f09fa7a2503c95d12b4159b482a - name: github.com/cgrates/ltcache version: 92fb7fa77cca400b55d805e4a6d625443027c7f5 - name: github.com/cgrates/osipsdagram