Update calls test for asterisk

This commit is contained in:
TeoV
2018-11-13 10:02:37 -05:00
committed by Dan Christian Bogos
parent ead9bb2e27
commit ba5e0b7cd7
10 changed files with 113 additions and 99 deletions

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package agents
import (
"fmt"
"strings"
"github.com/cgrates/cgrates/config"
@@ -273,37 +274,17 @@ func (smaEv *SMAsteriskEvent) V1AuthorizeArgs() (args *sessions.V1AuthorizeArgs)
GetMaxUsage: true,
CGREvent: *cgrEv,
}
// For the moment hardcoded only GetMaxUsage : true
/*
subsystems, has := kev[KamCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.GetMaxUsage = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AuthorizeResources = true
}
if strings.Index(subsystems, utils.MetaSuppliers) != -1 {
args.GetSuppliers = true
if strings.Index(subsystems, utils.MetaSuppliersEventCost) != -1 {
args.SuppliersMaxCost = utils.MetaEventCost
}
if strings.Index(subsystems, utils.MetaSuppliersIgnoreErrors) != -1 {
args.SuppliersIgnoreErrors = true
}
}
if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
if strings.Index(subsystems, utils.MetaThresholds) != -1 {
args.ProcessThresholds = utils.BoolPointer(true)
}
if strings.Index(subsystems, utils.MetaStats) != -1 {
args.ProcessStatQueues = utils.BoolPointer(true)
}
*/
args.GetMaxUsage = strings.Index(smaEv.Subsystems(), utils.MetaAccounts) != -1
args.AuthorizeResources = strings.Index(smaEv.Subsystems(), utils.MetaResources) != -1
args.GetSuppliers = strings.Index(smaEv.Subsystems(), utils.MetaSuppliers) != -1
args.SuppliersIgnoreErrors = strings.Index(smaEv.Subsystems(), utils.MetaSuppliersIgnoreErrors) != -1
if strings.Index(smaEv.Subsystems(), utils.MetaSuppliersEventCost) != -1 {
args.SuppliersMaxCost = utils.MetaEventCost
}
args.GetAttributes = strings.Index(smaEv.Subsystems(), utils.MetaAttributes) != -1
args.ProcessThresholds = strings.Index(smaEv.Subsystems(), utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(smaEv.Subsystems(), utils.MetaStats) != -1
return
}
@@ -312,27 +293,17 @@ func (smaEv *SMAsteriskEvent) V1InitSessionArgs(cgrEv utils.CGREvent) (args *ses
InitSession: true,
CGREvent: cgrEv,
}
/*
subsystems, has := kev[KamCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.InitSession = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.AllocateResources = true
}
if strings.Index(subsystems, utils.MetaAttributes) != -1 {
args.GetAttributes = true
}
if strings.Index(subsystems, utils.MetaThresholds) != -1 {
args.ProcessThresholds = utils.BoolPointer(true)
}
if strings.Index(subsystems, utils.MetaStats) != -1 {
args.ProcessStatQueues = utils.BoolPointer(true)
}
*/
subsystems, err := cgrEv.FieldAsString(utils.CGRSubsystems)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> event: %s don't have cgr_subsystems variable",
utils.AsteriskAgent, utils.ToJSON(cgrEv)))
return nil
}
args.InitSession = strings.Index(subsystems, utils.MetaAccounts) != -1
args.AllocateResources = strings.Index(subsystems, utils.MetaResources) != -1
args.GetAttributes = strings.Index(subsystems, utils.MetaAttributes) != -1
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
return
}
@@ -341,23 +312,15 @@ func (smaEv *SMAsteriskEvent) V1TerminateSessionArgs(cgrEv utils.CGREvent) (args
TerminateSession: true,
CGREvent: cgrEv,
}
/*
subsystems, has := kev[KamCGRSubsystems]
if !has {
return
}
if strings.Index(subsystems, utils.MetaAccounts) == -1 {
args.TerminateSession = false
}
if strings.Index(subsystems, utils.MetaResources) != -1 {
args.ReleaseResources = true
}
if strings.Index(subsystems, utils.MetaThresholds) != -1 {
args.ProcessThresholds = utils.BoolPointer(true)
}
if strings.Index(subsystems, utils.MetaStats) != -1 {
args.ProcessStatQueues = utils.BoolPointer(true)
}
*/
subsystems, err := cgrEv.FieldAsString(utils.CGRSubsystems)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> event: %s don't have cgr_subsystems variable",
utils.AsteriskAgent, utils.ToJSON(cgrEv)))
return nil
}
args.TerminateSession = strings.Index(subsystems, utils.MetaAccounts) != -1
args.ReleaseResources = strings.Index(subsystems, utils.MetaResources) != -1
args.ProcessThresholds = strings.Index(subsystems, utils.MetaThresholds) != -1
args.ProcessStats = strings.Index(subsystems, utils.MetaStats) != -1
return
}

View File

@@ -229,6 +229,12 @@ func (sma *AsteriskAgent) handleStasisStart(ev *SMAsteriskEvent) {
// Ussually channelUP
func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) {
// utils.Logger.Debug(fmt.Sprintf("#################handleChannelStateChange#####################"))
// utils.Logger.Debug(fmt.Sprintf("ev.ariEv : %+v", ev.ariEv))
// utils.Logger.Debug(fmt.Sprintf("ev.asteriskIP : %+v", ev.asteriskIP))
// utils.Logger.Debug(fmt.Sprintf("ev.cachedFields : %+v", ev.cachedFields))
// utils.Logger.Debug(fmt.Sprintf("ev.Subsystems() : %+v", ev.Subsystems()))
if ev.ChannelState() != channelUp {
return
}
@@ -238,6 +244,7 @@ func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) {
if !hasIt { // Not handled by us
return
}
sma.evCacheMux.Lock()
err := ev.UpdateCGREvent(cgrEv) // Updates the event directly in the cache
sma.evCacheMux.Unlock()
@@ -247,8 +254,10 @@ func (sma *AsteriskAgent) handleChannelStateChange(ev *SMAsteriskEvent) {
utils.AsteriskAgent, err.Error(), ev.ChannelID()))
return
}
// populate init session args
initSessionArgs := ev.V1InitSessionArgs(*cgrEv)
if initSessionArgs == nil {
utils.Logger.Err(fmt.Sprintf("<%s> event: %s cannot generate init session arguments",
utils.AsteriskAgent, ev.ChannelID()))
@@ -294,7 +303,6 @@ func (sma *AsteriskAgent) handleChannelDestroyed(ev *SMAsteriskEvent) {
utils.AsteriskAgent, ev.ChannelID()))
return
}
var reply string
if err := sma.smg.Call(utils.SessionSv1TerminateSession,
tsArgs, &reply); err != nil {
@@ -328,6 +336,27 @@ func (sma *AsteriskAgent) Call(serviceMethod string, args interface{}, reply int
return utils.RPCCall(sma, serviceMethod, args, reply)
}
func (fsa *AsteriskAgent) V1GetActiveSessionIDs(ignParam string, sessionIDs *[]*sessions.SessionID) (err error) {
return utils.ErrNotImplemented
func (sma *AsteriskAgent) V1GetActiveSessionIDs(ignParam string,
sessionIDs *[]*sessions.SessionID) (err error) {
utils.Logger.Debug(fmt.Sprintf("ASTERISK Enter in Sync session??"))
var sIDs []*sessions.SessionID
i := 0
sma.evCacheMux.RLock()
originIds := make([]string, len(sma.eventsCache))
for orgId := range sma.eventsCache {
originIds[i] = orgId
i++
}
sma.evCacheMux.RUnlock()
fmt.Println("originIds : ", originIds)
fmt.Println("sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address : ", sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address)
for _, orgId := range originIds {
sIDs = append(sIDs, &sessions.SessionID{
OriginHost: sma.cgrCfg.AsteriskAgentCfg().AsteriskConns[sma.astConnIdx].Address,
OriginID: orgId},
)
}
*sessionIDs = sIDs
return
//return utils.ErrNotImplemented
}

View File

@@ -130,4 +130,14 @@ func TestHttpXmlDPFieldAsInterface(t *testing.T) {
} else if data != "37" {
t.Errorf("expecting: 37, received: <%s>", data)
}
if data, err := dP.FieldAsString([]string{"complete-success-notification", "callleg", "@calllegid"}); err != nil {
t.Error(err)
} else if data != "222146" {
t.Errorf("expecting: 222146, received: <%s>", data)
}
if data, err := dP.FieldAsString([]string{"complete-success-notification", "callleg[1]", "@calllegid"}); err != nil {
t.Error(err)
} else if data != "222147" {
t.Errorf("expecting: 222147, received: <%s>", data)
}
}

View File

@@ -315,15 +315,19 @@ func startAsteriskAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit
smgRpcConn := <-internalSMGChan
internalSMGChan <- smgRpcConn
birpcClnt := utils.NewBiRPCInternalClient(smgRpcConn.(*sessions.SMGeneric))
var reply string
for connIdx := range cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers
sma, err := agents.NewAsteriskAgent(cfg, connIdx, birpcClnt)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> error: %s!", err))
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err))
exitChan <- true
return
}
if err := birpcClnt.Call(utils.SessionSv1RegisterInternalBiJSONConn, "", &reply); err != nil { // for session sync
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.AsteriskAgent, err))
}
if err = sma.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMAsterisk> runtime error: %s!", err))
utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err))
}
}
exitChan <- true

View File

@@ -2,7 +2,7 @@
exten => _1XXX,1,NoOp()
same => n,Set(CGRMaxSessionTime=0); use it to disconnect automatically the call if CGRateS is not active
same => n,DumpChan()
same => n,Stasis(cgrates_auth,cgr_reqtype=*prepaid,cgr_supplier=supplier1,cgr_subsystems=*attributes;*accounts)
same => n,Stasis(cgrates_auth,cgr_reqtype=*prepaid,cgr_supplier=supplier1,cgr_subsystems=*attributes*sessions*suppliers*thresholds*stats*accounts*resources)
same => n,Dial(PJSIP/${EXTEN},30,L(${CGRMaxSessionTime}))
same => n,Hangup()

View File

@@ -1,4 +1,4 @@
[general]
enabled = yes
bindaddr = 127.0.0.1
bindaddr = 0.0.0.0
bindport = 8088

View File

@@ -1,7 +1,7 @@
[simpletrans]
type=transport
protocol=udp
bind=0.0.0.0
bind=0.0.0.0:5080
[1001]
type = endpoint
@@ -27,16 +27,15 @@ password=CGRateS.org
type = endpoint
transport = simpletrans
context = internal
aors = 1002
auth = 1002
disallow = all
allow = ulaw
allow = alaw
aors = 1002
auth = 1002
[1002]
type = aor
max_contacts = 5
qualify_frequency = 0
[1002]
type=auth

View File

@@ -28,21 +28,24 @@
"rals": {
"enabled": true,
"thresholds_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport":"*json"},
],
"stats_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport":"*json"},
],
"attributes_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport":"*json"},
],
},
"cdrs": {
"enabled": true,
"sessions_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"stats_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"sessions_cost_retries": 5,
},
@@ -51,45 +54,47 @@
"sessions": {
"enabled": true,
"rals_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"cdrs_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"resources_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"suppliers_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"attributes_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"stats_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"thresholds_conns": [
{"address": "*internal"}
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"debit_interval": "10s",
"debit_interval": "5s",
"channel_sync_interval":"7s",
},
"asterisk_agent": {
"enabled": true,
"sessions_conns": [
{"address": "*internal"}
{"address": "*internal"}
],
"create_cdr": true,
"asterisk_conns":[
{"address": "192.168.56.103:8088", "user": "cgrates",
{"address": "192.168.56.203:8088", "user": "cgrates",
"password": "CGRateS.org", "connect_attempts": 3,"reconnects": 10}
],
},
"attributes": {
"enabled": true,
"enabled": true,
"string_indexed_fields": ["Account"],
},
@@ -98,6 +103,7 @@
"thresholds_conns": [
{"address": "*internal"}
],
"string_indexed_fields": ["Account"],
},
@@ -106,11 +112,13 @@
"thresholds_conns": [
{"address": "*internal"}
],
"string_indexed_fields": ["Account","RunID","Destination"],
},
"thresholds": {
"enabled": true,
"string_indexed_fields": ["Account"],
},
@@ -125,6 +133,7 @@
"stats_conns": [
{"address": "*internal"}
],
"string_indexed_fields": ["Account"],
},

View File

@@ -104,14 +104,12 @@ func TestOpensipsCalls(t *testing.T) {
}
}
/* Need to be checked
func TestAsteriskCalls(t *testing.T) {
optConf = utils.Asterisk
for _, stest := range sTestsCalls {
t.Run("", stest)
}
}
*/
func testCallInitCfg(t *testing.T) {
// Init config first
@@ -243,7 +241,7 @@ func testCallRestartFS(t *testing.T) {
// Connect rpc client to rater
func testCallRpcConn(t *testing.T) {
var err error
tutorialCallsRpc, err = jsonrpc.Dial("tcp", tutorialCallsCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
tutorialCallsRpc, err = jsonrpc.Dial("tcp", tutorialCallsCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}

View File

@@ -2322,6 +2322,7 @@ func (smg *SMGeneric) OnBiJSONDisconnect(c *rpc2.Client) {
}
func (smg *SMGeneric) syncSessions() {
utils.Logger.Debug("Enter in sync sessions ????")
var rpcClnts []rpcclient.RpcClientConnection
for _, conn := range smg.intBiJSONConns {
rpcClnts = append(rpcClnts, conn)
@@ -2344,6 +2345,7 @@ func (smg *SMGeneric) syncSessions() {
}
}
}
utils.Logger.Debug(fmt.Sprintf("queried CGRIDS : %+v", queriedCGRIDs))
var toBeRemoved []string
smg.aSessionsMux.RLock()
for cgrid := range smg.activeSessions {