Updated AsteriskConnCfg and KamConnCfg

This commit is contained in:
Tripon Alexandru-Ionut
2019-06-04 17:04:03 +03:00
committed by Dan Christian Bogos
parent 2252ca5fef
commit 9cd269f54c
7 changed files with 61 additions and 58 deletions

View File

@@ -103,7 +103,8 @@ func (sma *AsteriskAgent) ListenAndServe() (err error) {
return
case astRawEv := <-sma.astEvChan:
smAsteriskEvent := NewSMAsteriskEvent(astRawEv,
strings.Split(sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, ":")[0])
strings.Split(sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address, ":")[0],
sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Alias)
switch smAsteriskEvent.EventType() {
case ARIStasisStart:

View File

@@ -27,16 +27,17 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewSMAsteriskEvent(ariEv map[string]interface{}, asteriskIP string) *SMAsteriskEvent {
func NewSMAsteriskEvent(ariEv map[string]interface{}, asteriskIP, asteriskAlias string) *SMAsteriskEvent {
smsmaEv := &SMAsteriskEvent{ariEv: ariEv, asteriskIP: asteriskIP, cachedFields: make(map[string]string)}
smsmaEv.parseStasisArgs() // Populate appArgs
return smsmaEv
}
type SMAsteriskEvent struct { // Standalone struct so we can cache the fields while we parse them
ariEv map[string]interface{} // stasis event
asteriskIP string
cachedFields map[string]string // Cache replies here
ariEv map[string]interface{} // stasis event
asteriskIP string
asteriskAlias string
cachedFields map[string]string // Cache replies here
}
// parseStasisArgs will convert the args passed to Stasis into CGRateS attribute/value pairs understood by CGRateS and store them in cachedFields
@@ -239,7 +240,7 @@ func (smaEv *SMAsteriskEvent) AsMapStringInterface() (mp map[string]interface{})
if smaEv.Subject() != "" {
mp[utils.Subject] = smaEv.Subject()
}
mp[utils.OriginHost] = utils.FirstNonEmpty(smaEv.OriginHost(), smaEv.OriginatorIP())
mp[utils.OriginHost] = utils.FirstNonEmpty(smaEv.OriginHost(), smaEv.asteriskAlias, smaEv.OriginatorIP())
mp[utils.Account] = smaEv.Account()
mp[utils.Destination] = smaEv.Destination()
mp[utils.SetupTime] = smaEv.SetupTime()

View File

@@ -40,7 +40,7 @@ func TestSMAParseStasisArgs(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
expAppArgs := map[string]string{"cgr_reqtype": "*prepaid", "cgr_supplier": "supplier1", "extra1": "val1", "extra2": "val2"}
if !reflect.DeepEqual(smaEv.cachedFields, expAppArgs) {
t.Errorf("Expecting: %+v, received: %+v", smaEv.cachedFields, expAppArgs)
@@ -49,7 +49,7 @@ func TestSMAParseStasisArgs(t *testing.T) {
if err := json.Unmarshal([]byte("{}"), &ev); err != nil {
t.Error(err)
}
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
expAppArgs = map[string]string{}
if !reflect.DeepEqual(smaEv.cachedFields, expAppArgs) {
t.Errorf("Expecting: %+v, received: %+v", smaEv.cachedFields, expAppArgs)
@@ -61,7 +61,7 @@ func TestSMAEventType(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.EventType() != "StasisStart" {
t.Error("Received:", smaEv.EventType())
}
@@ -69,7 +69,7 @@ func TestSMAEventType(t *testing.T) {
if err := json.Unmarshal([]byte("{}"), &ev); err != nil {
t.Error(err)
}
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.EventType() != "" {
t.Error("Received:", smaEv.EventType())
}
@@ -80,7 +80,7 @@ func TestSMAEventChannelID(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.ChannelID() != "1473681228.6" {
t.Error("Received:", smaEv.ChannelID())
}
@@ -88,7 +88,7 @@ func TestSMAEventChannelID(t *testing.T) {
if err := json.Unmarshal([]byte("{}"), &ev); err != nil {
t.Error(err)
}
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.ChannelID() != "" {
t.Error("Received:", smaEv.ChannelID())
}
@@ -99,7 +99,7 @@ func TestSMAEventOriginatorIP(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.OriginatorIP() != "127.0.0.1" {
t.Error("Received:", smaEv.OriginatorIP())
}
@@ -110,7 +110,7 @@ func TestSMAEventAccount(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Account() != "1001" {
t.Error("Received:", smaEv.Account())
}
@@ -118,7 +118,7 @@ func TestSMAEventAccount(t *testing.T) {
if err := json.Unmarshal([]byte("{}"), &ev); err != nil {
t.Error(err)
}
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Account() != "" {
t.Error("Received:", smaEv.Account())
}
@@ -129,7 +129,7 @@ func TestSMAEventDestination(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Destination() != "1002" {
t.Error("Received:", smaEv.Destination())
}
@@ -137,7 +137,7 @@ func TestSMAEventDestination(t *testing.T) {
if err := json.Unmarshal([]byte("{}"), &ev); err != nil {
t.Error(err)
}
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Destination() != "" {
t.Error("Received:", smaEv.Destination())
}
@@ -148,7 +148,7 @@ func TestSMAEventTimestamp(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Timestamp() != "2016-09-12T13:53:48.919+0200" {
t.Error("Received:", smaEv.Timestamp())
}
@@ -156,7 +156,7 @@ func TestSMAEventTimestamp(t *testing.T) {
if err := json.Unmarshal([]byte("{}"), &ev); err != nil {
t.Error(err)
}
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Timestamp() != "" {
t.Error("Received:", smaEv.Timestamp())
}
@@ -167,7 +167,7 @@ func TestSMAEventChannelState(t *testing.T) {
if err := json.Unmarshal([]byte(channelStateChange), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.ChannelState() != "Up" {
t.Error("Received:", smaEv.ChannelState())
}
@@ -175,7 +175,7 @@ func TestSMAEventChannelState(t *testing.T) {
if err := json.Unmarshal([]byte("{}"), &ev); err != nil {
t.Error(err)
}
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.ChannelState() != "" {
t.Error("Received:", smaEv.ChannelState())
}
@@ -186,7 +186,7 @@ func TestSMASetupTime(t *testing.T) {
if err := json.Unmarshal([]byte(channelStateChange), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.SetupTime() != "2016-09-12T13:53:48.918+0200" {
t.Error("Received:", smaEv.SetupTime())
}
@@ -194,7 +194,7 @@ func TestSMASetupTime(t *testing.T) {
if err := json.Unmarshal([]byte("{}"), &ev); err != nil {
t.Error(err)
}
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.SetupTime() != "" {
t.Error("Received:", smaEv.SetupTime())
}
@@ -205,12 +205,12 @@ func TestSMAEventRequestType(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.RequestType() != "*prepaid" {
t.Error("Received:", smaEv.RequestType())
}
ev = make(map[string]interface{}) // Clear previous data
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.RequestType() != "" {
t.Error("Received:", smaEv.RequestType())
}
@@ -221,12 +221,12 @@ func TestSMAEventTenant(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Tenant() != "" {
t.Error("Received:", smaEv.Tenant())
}
ev = map[string]interface{}{"args": []interface{}{"cgr_tenant=cgrates.org"}} // Clear previous data
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Tenant() != "cgrates.org" {
t.Error("Received:", smaEv.Tenant())
}
@@ -237,12 +237,12 @@ func TestSMAEventCategory(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Category() != "" {
t.Error("Received:", smaEv.Category())
}
ev = map[string]interface{}{"args": []interface{}{"cgr_category=premium_call"}} // Clear previous data
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Category() != "premium_call" {
t.Error("Received:", smaEv.Category())
}
@@ -253,12 +253,12 @@ func TestSMAEventSubject(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Subject() != "" {
t.Error("Received:", smaEv.Subject())
}
ev = map[string]interface{}{"args": []interface{}{"cgr_subject=dan"}} // Clear previous data
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Subject() != "dan" {
t.Error("Received:", smaEv.Subject())
}
@@ -269,12 +269,12 @@ func TestSMAEventPDD(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.PDD() != "" {
t.Error("Received:", smaEv.PDD())
}
ev = map[string]interface{}{"args": []interface{}{"cgr_pdd=2.1"}} // Clear previous data
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.PDD() != "2.1" {
t.Error("Received:", smaEv.PDD())
}
@@ -285,12 +285,12 @@ func TestSMAEventSupplier(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Supplier() != "supplier1" {
t.Error("Received:", smaEv.Supplier())
}
ev = map[string]interface{}{"args": []interface{}{"cgr_supplier=supplier1"}} // Clear previous data
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.Supplier() != "supplier1" {
t.Error("Received:", smaEv.Supplier())
}
@@ -301,12 +301,12 @@ func TestSMAEventDisconnectCause(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.DisconnectCause() != "" {
t.Error("Received:", smaEv.DisconnectCause())
}
ev = map[string]interface{}{"args": []interface{}{"cgr_disconnectcause=NORMAL_DISCONNECT"}} // Clear previous data
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv = NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.DisconnectCause() != "NORMAL_DISCONNECT" {
t.Error("Received:", smaEv.DisconnectCause())
}
@@ -321,7 +321,7 @@ func TestSMAEventExtraParameters(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if smaEv.PDD() != "" {
t.Error("Received:", smaEv.PDD())
}
@@ -336,7 +336,7 @@ func TestSMAEventV1AuthorizeArgs(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
cgrEv, err := smaEv.AsCGREvent(timezone)
if err != nil {
t.Error(err)
@@ -354,7 +354,7 @@ func TestSMAEventV1AuthorizeArgs(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart2), &ev2); err != nil {
t.Error(err)
}
smaEv2 := NewSMAsteriskEvent(ev2, "127.0.0.1")
smaEv2 := NewSMAsteriskEvent(ev2, "127.0.0.1", "")
smaEv2.parseStasisArgs()
cgrEv2, err := smaEv2.AsCGREvent(timezone)
if err != nil {
@@ -399,7 +399,7 @@ func TestSMAEventV1InitSessionArgs(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if rcv := smaEv.V1InitSessionArgs(utils.CGREventWithArgDispatcher{CGREvent: cgrEv}); !reflect.DeepEqual(exp, rcv) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rcv))
}
@@ -432,7 +432,7 @@ func TestSMAEventV1TerminateSessionArgs(t *testing.T) {
if err := json.Unmarshal([]byte(stasisStart), &ev); err != nil {
t.Error(err)
}
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1")
smaEv := NewSMAsteriskEvent(ev, "127.0.0.1", "")
if rcv := smaEv.V1TerminateSessionArgs(utils.CGREventWithArgDispatcher{CGREvent: cgrEv}); !reflect.DeepEqual(exp, rcv) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(exp), utils.ToJSON(rcv))
}

View File

@@ -50,6 +50,7 @@ func NewKamailioAgent(kaCfg *config.KamAgentCfg,
timezone: timezone,
conns: make(map[string]*kamevapi.KamEvapi),
activeSessionIDs: make(chan []*sessions.SessionID),
connAliases: make(map[string]string),
}
return
}
@@ -59,6 +60,7 @@ type KamailioAgent struct {
sessionS rpcclient.RpcClientConnection
timezone string
conns map[string]*kamevapi.KamEvapi
connAliases map[string]string
activeSessionIDs chan []*sessions.SessionID
}
@@ -74,6 +76,7 @@ func (self *KamailioAgent) Connect() error {
errChan := make(chan error)
for _, connCfg := range self.cfg.EvapiConns {
connID := utils.GenUUID()
self.connAliases[connID] = connCfg.Alias
logger := log.New(utils.Logger, "kamevapi:", 2)
if self.conns[connID], err = kamevapi.NewKamEvapi(connCfg.Address, connID, connCfg.Reconnects, eventHandlers, logger); err != nil {
return err
@@ -124,11 +127,7 @@ func (ka *KamailioAgent) onCgrAuth(evData []byte, connID string) {
utils.KamailioAgent, kev[utils.OriginID]))
return
}
originHost := ka.conns[connID].RemoteAddr().String()
if oHIf, has := authArgs.CGREvent.Event[utils.OriginHost]; has {
originHost = oHIf.(string)
}
authArgs.CGREvent.Event[utils.OriginHost] = originHost
authArgs.CGREvent.Event[utils.OriginHost] = utils.FirstNonEmpty(authArgs.CGREvent.Event[utils.OriginHost].(string), ka.connAliases[connID], ka.conns[connID].RemoteAddr().String())
authArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID
var authReply sessions.V1AuthorizeReply
err = ka.sessionS.Call(utils.SessionSv1AuthorizeEvent, authArgs, &authReply)
@@ -164,11 +163,7 @@ func (ka *KamailioAgent) onCallStart(evData []byte, connID string) {
return
}
initSessionArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID so we can properly disconnect later
originHost := ka.conns[connID].RemoteAddr().String()
if oHIf, has := initSessionArgs.CGREvent.Event[utils.OriginHost]; has {
originHost = oHIf.(string)
}
initSessionArgs.CGREvent.Event[utils.OriginHost] = originHost
var initReply sessions.V1InitSessionReply
if err := ka.sessionS.Call(utils.SessionSv1InitiateSession,
@@ -205,11 +200,7 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
return
}
var reply string
originHost := ka.conns[connID].RemoteAddr().String()
if oHIf, has := tsArgs.CGREvent.Event[utils.OriginHost]; has {
originHost = oHIf.(string)
}
tsArgs.CGREvent.Event[utils.OriginHost] = originHost
tsArgs.CGREvent.Event[EvapiConnID] = connID // Attach the connection ID in case we need to create a session and disconnect it
if err := ka.sessionS.Call(utils.SessionSv1TerminateSession,
tsArgs, &reply); err != nil {
@@ -223,7 +214,7 @@ func (ka *KamailioAgent) onCallEnd(evData []byte, connID string) {
if err != nil {
return
}
cgrEv.Event[utils.OriginHost] = originHost
cgrArgs := cgrEv.ConsumeArgs(strings.Index(kev[utils.CGRSubsystems], utils.MetaDispatchers) != -1, false)
if err := ka.sessionS.Call(utils.SessionSv1ProcessCDR,
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv, ArgDispatcher: cgrArgs.ArgDispatcher}, &reply); err != nil {

View File

@@ -20,6 +20,7 @@ package config
// Represents one connection instance towards Kamailio
type KamConnCfg struct {
Alias string
Address string
Reconnects int
}
@@ -31,6 +32,9 @@ func (self *KamConnCfg) loadFromJsonCfg(jsnCfg *KamConnJsonCfg) error {
if jsnCfg.Address != nil {
self.Address = *jsnCfg.Address
}
if jsnCfg.Alias != nil {
self.Alias = *jsnCfg.Alias
}
if jsnCfg.Reconnects != nil {
self.Reconnects = *jsnCfg.Reconnects
}

View File

@@ -226,6 +226,7 @@ type RemoteHostJson struct {
}
type AstConnJsonCfg struct {
Alias *string
Address *string
User *string
Password *string
@@ -259,6 +260,7 @@ type KamAgentJsonCfg struct {
// Represents one connection instance towards Kamailio
type KamConnJsonCfg struct {
Alias *string
Address *string
Reconnects *int
}

View File

@@ -410,6 +410,7 @@ func NewDefaultAsteriskConnCfg() *AsteriskConnCfg {
}
type AsteriskConnCfg struct {
Alias string
Address string
User string
Password string
@@ -424,6 +425,9 @@ func (aConnCfg *AsteriskConnCfg) loadFromJsonCfg(jsnCfg *AstConnJsonCfg) error {
if jsnCfg.Address != nil {
aConnCfg.Address = *jsnCfg.Address
}
if jsnCfg.Alias != nil {
aConnCfg.Alias = *jsnCfg.Alias
}
if jsnCfg.User != nil {
aConnCfg.User = *jsnCfg.User
}