Updated Kamailio connID to connIdx

This commit is contained in:
Tripon Alexandru-Ionut
2019-06-06 10:41:02 +03:00
committed by Dan Christian Bogos
parent 430010eb68
commit e7b9996a00
4 changed files with 66 additions and 51 deletions

View File

@@ -48,9 +48,8 @@ func NewKamailioAgent(kaCfg *config.KamAgentCfg,
cfg: kaCfg,
sessionS: sessionS,
timezone: timezone,
conns: make(map[string]*kamevapi.KamEvapi),
conns: make([]*kamevapi.KamEvapi, len(kaCfg.EvapiConns)),
activeSessionIDs: make(chan []*sessions.SessionID),
connAliases: make(map[string]string),
}
return
}
@@ -59,14 +58,13 @@ type KamailioAgent struct {
cfg *config.KamAgentCfg
sessionS rpcclient.RpcClientConnection
timezone string
conns map[string]*kamevapi.KamEvapi
connAliases map[string]string
conns []*kamevapi.KamEvapi
activeSessionIDs chan []*sessions.SessionID
}
func (self *KamailioAgent) Connect() error {
var err error
eventHandlers := map[*regexp.Regexp][]func([]byte, string){
eventHandlers := map[*regexp.Regexp][]func([]byte, int){
kamAuthReqRegexp: {self.onCgrAuth},
kamCallStartRegexp: {self.onCallStart},
kamCallEndRegexp: {self.onCallEnd},
@@ -74,15 +72,13 @@ func (self *KamailioAgent) Connect() error {
kamProcessEventRegex: {self.onCgrProcessEvent},
}
errChan := make(chan error)
for _, connCfg := range self.cfg.EvapiConns {
connID := utils.GenUUID()
self.connAliases[connID] = connCfg.Alias
for connIdx, connCfg := range self.cfg.EvapiConns {
logger := log.New(utils.Logger, "kamevapi:", 2)
if self.conns[connID], err = kamevapi.NewKamEvapi(connCfg.Address, connID, connCfg.Reconnects, eventHandlers, logger); err != nil {
if self.conns[connIdx], err = kamevapi.NewKamEvapi(connCfg.Address, connIdx, connCfg.Reconnects, eventHandlers, logger); err != nil {
return err
}
go func() { // Start reading in own goroutine, return on error
if err := self.conns[connID].ReadEvents(); err != nil {
if err := self.conns[connIdx].ReadEvents(); err != nil {
errChan <- err
}
}()
@@ -101,8 +97,13 @@ func (ka *KamailioAgent) Call(serviceMethod string, args interface{}, reply inte
}
// onCgrAuth is called when new event of type CGR_AUTH_REQUEST is coming
func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) {
kev, err := NewKamEvent(evData)
func (ka *KamailioAgent) onCgrAuth(evData []byte, connIdx int) {
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
return
}
kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event data: %s, error: %s",
utils.KamailioAgent, evData, err.Error()))
@@ -115,7 +116,7 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) {
if kRply, err := kev.AsKamAuthReply(nil, nil, utils.ErrMandatoryIeMissing); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed building auth reply for event: %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
} else if err = ka.conns[connID].Send(kRply.String()); err != nil {
} else if err = ka.conns[connIdx].Send(kRply.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
}
@@ -127,21 +128,25 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) {
utils.KamailioAgent, kev[utils.OriginID]))
return
}
authArgs.CGREvent.Event[utils.OriginHost] = utils.FirstNonEmpty(authArgs.CGREvent.Event[utils.OriginHost].(string), ka.connAliases[connID], ka.conns[connID].RemoteAddr().String())
authArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID
authArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID
var authReply sessions.V1AuthorizeReply
err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply)
if kar, err := kev.AsKamAuthReply(authArgs, &authReply, err); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed building auth reply for event: %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
} else if err = ka.conns[connID].Send(kar.String()); err != nil {
} else if err = ka.conns[connIdx].Send(kar.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
}
}
func (ka *KamailioAgent) onCallStart(evData []byte, connID string) {
kev, err := NewKamEvent(evData)
func (ka *KamailioAgent) onCallStart(evData []byte, connIdx int) {
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
return
}
kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event: %s, error: %s",
utils.KamailioAgent, evData, err.Error()))
@@ -151,7 +156,7 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connID string) {
return
}
if kev.MissingParameter() {
ka.disconnectSession(connID,
ka.disconnectSession(connIdx,
NewKamSessionDisconnect(kev[KamHashEntry], kev[KamHashID],
utils.ErrMandatoryIeMissing.Error()))
return
@@ -162,8 +167,7 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connID string) {
utils.KamailioAgent, kev[utils.OriginID]))
return
}
initSessionArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID so we can properly disconnect later
initSessionArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID so we can properly disconnect later
var initReply sessions.V1InitSessionReply
if err := ka.sessionS.Call(utils.SessionSv1InitiateSession,
@@ -171,15 +175,20 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connID string) {
utils.Logger.Err(
fmt.Sprintf("<%s> could not process answer for event %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
ka.disconnectSession(connID,
ka.disconnectSession(connIdx,
NewKamSessionDisconnect(kev[KamHashEntry], kev[KamHashID],
utils.ErrServerError.Error()))
return
}
}
func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
kev, err := NewKamEvent(evData)
func (ka *KamailioAgent) onCallEnd(evData []byte, connIdx int) {
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
return
}
kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event: %s, error: %s",
utils.KamailioAgent, evData, err.Error()))
@@ -200,8 +209,7 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
return
}
var reply string
tsArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID in case we need to create a session and disconnect it
tsArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID in case we need to create a session and disconnect it
if err := ka.sessionS.Call(utils.SessionSv1TerminateSession,
tsArgs, &reply); err != nil {
utils.Logger.Err(
@@ -214,7 +222,6 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
if err != nil {
return
}
cgrArgs := cgrEv.ConsumeArgs(strings.Index(kev[utils.CGRSubsystems], utils.MetaDispatchers) != -1, false)
if err := ka.sessionS.Call(utils.SessionSv1ProcessCDR,
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: cgrArgs.ArgDispatcher}, &reply); err != nil {
@@ -224,7 +231,7 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
}
}
func (ka *KamailioAgent) onDlgList(evData []byte, connID string) {
func (ka *KamailioAgent) onDlgList(evData []byte, connIdx int) {
kamDlgRpl, err := NewKamDlgReply(evData)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event data: %s, error: %s",
@@ -235,15 +242,20 @@ func (ka *KamailioAgent) onDlgList(evData []byte, connID string) {
// FixMe: find way to add OriginHost from event also, to be compatible with above implementation
for _, dlgInfo := range kamDlgRpl.Jsonrpl_body.Result {
sIDs = append(sIDs, &sessions.SessionID{
OriginHost: ka.conns[connID].RemoteAddr().String(),
OriginHost: ka.conns[connIdx].RemoteAddr().String(),
OriginID: dlgInfo.CallId + ";" + dlgInfo.Caller.Tag,
})
}
ka.activeSessionIDs <- sIDs
}
func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connID string) {
kev, err := NewKamEvent(evData)
func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connIdx int) {
if connIdx >= len(ka.conns) { // protection against index out of range panic
err := fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
return
}
kev, err := NewKamEvent(evData, ka.cfg.EvapiConns[connIdx].Alias, ka.conns[connIdx].RemoteAddr().String())
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> unmarshalling event data: %s, error: %s",
utils.KamailioAgent, evData, err.Error()))
@@ -254,7 +266,7 @@ func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connID string) {
if kRply, err := kev.AsKamProcessEventReply(nil, nil, utils.ErrMandatoryIeMissing); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed building process session event reply for event: %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
} else if err = ka.conns[connID].Send(kRply.String()); err != nil {
} else if err = ka.conns[connIdx].Send(kRply.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed sending process session event reply for event: %s, error %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
}
@@ -264,7 +276,7 @@ func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connID string) {
//in case that we don't reveice cgr_subsystems from kamailio
//we consider this as ping-pong event
if _, has := kev[utils.CGRSubsystems]; !has {
if err = ka.conns[connID].Send(kev.AsKamProcessEventEmptyReply().String()); err != nil {
if err = ka.conns[connIdx].Send(kev.AsKamProcessEventEmptyReply().String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed sending empty process event reply for event: %s, error %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
}
@@ -276,12 +288,7 @@ func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connID string) {
utils.KamailioAgent, kev[utils.OriginID]))
return
}
originHost := ka.conns[connID].RemoteAddr().String()
if oHIf, has := procEvArgs.CGREvent.Event[utils.OriginHost]; has {
originHost = oHIf.(string)
}
procEvArgs.CGREvent.Event[utils.OriginHost] = originHost
procEvArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID
procEvArgs.CGREvent.Event[EvapiConnID] = connIdx // Attach the connection ID
var processReply sessions.V1ProcessEventReply
err = ka.sessionS.Call(utils.SessionSv1ProcessEvent, procEvArgs, &processReply)
@@ -289,16 +296,16 @@ func (ka *KamailioAgent) onCgrProcessEvent(evData []byte, connID string) {
if kar, err := kev.AsKamProcessEventReply(procEvArgs, &processReply, err); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed building process session event reply for event: %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
} else if err = ka.conns[connID].Send(kar.String()); err != nil {
} else if err = ka.conns[connIdx].Send(kar.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed sending auth reply for event: %s, error: %s",
utils.KamailioAgent, kev[utils.OriginID], err.Error()))
}
}
func (self *KamailioAgent) disconnectSession(connID string, dscEv *KamSessionDisconnect) error {
if err := self.conns[connID].Send(dscEv.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed sending disconnect request: %s, connection id: %s, error %s",
utils.KamailioAgent, utils.ToJSON(dscEv), err.Error(), connID))
func (self *KamailioAgent) disconnectSession(connIdx int, dscEv *KamSessionDisconnect) error {
if err := self.conns[connIdx].Send(dscEv.String()); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed sending disconnect request: %s, connection id: %v, error %s",
utils.KamailioAgent, utils.ToJSON(dscEv), connIdx, err.Error()))
return err
}
return nil
@@ -314,7 +321,7 @@ func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r
if err != nil {
return err
}
connIDIface, has := args.EventStart[EvapiConnID]
connIdxIface, has := args.EventStart[EvapiConnID]
if !has {
utils.Logger.Err(
fmt.Sprintf("<%s> error: <%s:%s> when attempting to disconnect <%s:%s> and <%s:%s>",
@@ -322,11 +329,16 @@ func (ka *KamailioAgent) V1DisconnectSession(args utils.AttrDisconnectSession, r
KamHashEntry, hEntry, KamHashID, hID))
return
}
connID, err := utils.IfaceAsString(connIDIface)
connIdx, err := utils.IfaceAsInt64(connIdxIface)
if err != nil {
return err
}
if err = ka.disconnectSession(connID,
if int(connIdx) >= len(ka.conns) { // protection against index out of range panic
err = fmt.Errorf("Index out of range[0,%v): %v ", len(ka.conns), connIdx)
utils.Logger.Err(fmt.Sprintf("<%s> %s", utils.FreeSWITCHAgent, err.Error()))
return
}
if err = ka.disconnectSession(int(connIdx),
NewKamSessionDisconnect(hEntry, hID,
utils.ErrInsufficientCredit.Error())); err != nil {
return

View File

@@ -71,11 +71,12 @@ func (self *KamSessionDisconnect) String() string {
}
// NewKamEvent parses bytes received over the wire from Kamailio into KamEvent
func NewKamEvent(kamEvData []byte) (KamEvent, error) {
func NewKamEvent(kamEvData []byte, alias, adress string) (KamEvent, error) {
kev := make(map[string]string)
if err := json.Unmarshal(kamEvData, &kev); err != nil {
return nil, err
}
kev[utils.CGROriginHost] = utils.FirstNonEmpty(kev[utils.CGROriginHost], alias, adress)
return kev, nil
}

View File

@@ -53,8 +53,10 @@ func TestNewKamEvent(t *testing.T) {
"cgr_destination": "1002", "cgr_answertime": "1419839310",
"cgr_duration": "3", "cgr_pdd": "4",
utils.CGR_SUPPLIER: "supplier2",
utils.CGR_DISCONNECT_CAUSE: "200"}
if kamEv, err := NewKamEvent([]byte(evStr)); err != nil {
utils.CGR_DISCONNECT_CAUSE: "200",
utils.CGROriginHost: utils.KamailioAgent,
}
if kamEv, err := NewKamEvent([]byte(evStr), utils.KamailioAgent, ""); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eKamEv, kamEv) {
t.Error("Received: ", kamEv)

2
glide.lock generated
View File

@@ -12,7 +12,7 @@ imports:
- name: github.com/cgrates/fsock
version: 4759d9e84c74981872c5c2bbffe6f23ecba2ea3c
- name: github.com/cgrates/kamevapi
version: 0e0d0379606fd8f12b53c6da6aeb28544f7bfa37
version: ec679e0176542f09fa7a2503c95d12b4159b482a
- name: github.com/cgrates/ltcache
version: 92fb7fa77cca400b55d805e4a6d625443027c7f5
- name: github.com/cgrates/osipsdagram