From a5155e7e27b796e6225987dc351ef7d26c0f4072 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 4 Jan 2016 13:21:23 +0100 Subject: [PATCH 01/14] SM-FS: MaxWaitConnection config parameter --- config/config_defaults.go | 1 + config/config_json_test.go | 1 + config/libconfig_json.go | 1 + config/smconfig.go | 6 ++++++ engine/action.go | 1 - sessionmanager/fssessionmanager.go | 3 ++- 6 files changed, 11 insertions(+), 2 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 7d5edc4f3..0ce859553 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -225,6 +225,7 @@ const CGRATES_CFG_JSON = ` "empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined) "subscribe_park": true, // subscribe via fsock to receive park events "channel_sync_interval": "5m", // sync channels with freeswitch regularly + "max_wait_connection": "2s", // maximum duration to wait for a connection to be retrieved from the pool "connections":[ // instantiate connections to multiple FreeSWITCH servers {"server": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5} ], diff --git a/config/config_json_test.go b/config/config_json_test.go index 4b485a23b..91bb36aba 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -361,6 +361,7 @@ func TestSmFsJsonCfg(t *testing.T) { Empty_balance_ann_file: utils.StringPointer(""), Subscribe_park: utils.BoolPointer(true), Channel_sync_interval: utils.StringPointer("5m"), + Max_wait_connection: utils.StringPointer("2s"), Connections: &[]*FsConnJsonCfg{ &FsConnJsonCfg{ Server: utils.StringPointer("127.0.0.1:8021"), diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 7dda0b6ec..60d0ed103 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -189,6 +189,7 @@ type SmFsJsonCfg struct { Empty_balance_ann_file *string Subscribe_park *bool Channel_sync_interval *string + Max_wait_connection *string Connections *[]*FsConnJsonCfg } diff --git a/config/smconfig.go b/config/smconfig.go index b06ad7c20..50e503bce 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -131,6 +131,7 @@ type SmFsConfig struct { EmptyBalanceAnnFile string SubscribePark bool ChannelSyncInterval time.Duration + MaxWaitConnection time.Duration Connections []*FsConnConfig } @@ -193,6 +194,11 @@ func (self *SmFsConfig) loadFromJsonCfg(jsnCfg *SmFsJsonCfg) error { return err } } + if jsnCfg.Max_wait_connection != nil { + if self.MaxWaitConnection, err = utils.ParseDurationWithSecs(*jsnCfg.Max_wait_connection); err != nil { + return err + } + } if jsnCfg.Connections != nil { self.Connections = make([]*FsConnConfig, len(*jsnCfg.Connections)) for idx, jsnConnCfg := range *jsnCfg.Connections { diff --git a/engine/action.go b/engine/action.go index b3ae9bcff..abd90f72c 100644 --- a/engine/action.go +++ b/engine/action.go @@ -257,7 +257,6 @@ func cdrLogAction(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions) return err } } - b, _ := json.Marshal(cdrs) a.ExpirationString = string(b) // testing purpose only return diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 8c91438d3..10d56b990 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -242,7 +242,7 @@ func (sm *FSSessionManager) Connect() error { errChan <- err } }() - if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Server, connCfg.Password, 1, + if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Server, connCfg.Password, 1, sm.cfg.MaxWaitConnection, make(map[string][]func(string, string)), make(map[string]string), utils.Logger.(*syslog.Writer), connId); err != nil { return fmt.Errorf("Cannot connect FreeSWITCH senders pool, error: %s", err.Error()) } else if fsSenderPool == nil { @@ -354,6 +354,7 @@ application_data:+10800 alloted_timeout uuid:3427e500-10e5-4864-a589-e306b70419a */ func (sm *FSSessionManager) SyncSessions() error { for connId, senderPool := range sm.senderPools { + utils.Logger.Debug(fmt.Sprintf("### connID: %s, senderPool: %+v", connId, senderPool)) fsConn, err := senderPool.PopFSock() if err != nil { utils.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error())) From 303f43e9bad261a6e0824c579f86db9b81ea7f75 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 4 Jan 2016 13:55:49 +0100 Subject: [PATCH 02/14] SM-FS Cleanup sessions when loosing connection to FS, fixes #282 --- sessionmanager/fssessionmanager.go | 32 ++++++++++++++++++------------ 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 10d56b990..aabd61384 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -355,21 +355,27 @@ application_data:+10800 alloted_timeout uuid:3427e500-10e5-4864-a589-e306b70419a func (sm *FSSessionManager) SyncSessions() error { for connId, senderPool := range sm.senderPools { utils.Logger.Debug(fmt.Sprintf("### connID: %s, senderPool: %+v", connId, senderPool)) + var aChans []map[string]string fsConn, err := senderPool.PopFSock() if err != nil { - utils.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error())) - continue - } - activeChanStr, err := fsConn.SendApiCmd("show channels") - senderPool.PushFSock(fsConn) - if err != nil { - utils.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error())) - continue - } - aChans := fsock.MapChanData(activeChanStr) - if len(aChans) == 0 && strings.HasPrefix(activeChanStr, "uuid,direction") { // Failed converting output from FS - utils.Logger.Err(fmt.Sprintf(" Syncing active calls, failed converting output from FS: %s", activeChanStr)) - continue + if err == fsock.ErrConnectionPoolTimeout { // Timeout waiting for connections to re-establish, cleanup calls + aChans = make([]map[string]string, 0) // Emulate no call information so we can disconnect bellow + } else { + utils.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error())) + continue + } + } else { + activeChanStr, err := fsConn.SendApiCmd("show channels") + senderPool.PushFSock(fsConn) + if err != nil { + utils.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error())) + continue + } + aChans = fsock.MapChanData(activeChanStr) + if len(aChans) == 0 && strings.HasPrefix(activeChanStr, "uuid,direction") { // Failed converting output from FS + utils.Logger.Err(fmt.Sprintf(" Syncing active calls, failed converting output from FS: %s", activeChanStr)) + continue + } } for _, session := range sm.sessions.getSessions() { if session.connId != connId { // This session belongs to another connectionId From 6e9d3b643fdf43fef7c8d5ebea77030492db02d7 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 4 Jan 2016 17:49:50 +0200 Subject: [PATCH 03/14] fixed merged test --- engine/units_counter_test.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/engine/units_counter_test.go b/engine/units_counter_test.go index be8b3a49c..f136c23b7 100644 --- a/engine/units_counter_test.go +++ b/engine/units_counter_test.go @@ -246,47 +246,46 @@ func TestUnitCountersCountAllVoiceDestinationEvent(t *testing.T) { } } -/* func TestUnitCountersResetCounterById(t *testing.T) { a := &Account{ ActionTriggers: ActionTriggers{ &ActionTrigger{ - ID: "TestTR1", + UniqueID: "TestTR1", ThresholdType: utils.TRIGGER_MAX_EVENT_COUNTER, BalanceType: utils.MONETARY, BalanceDirections: utils.NewStringMap(utils.OUT, utils.IN), BalanceWeight: 10, }, &ActionTrigger{ - ID: "TestTR11", + UniqueID: "TestTR11", ThresholdType: utils.TRIGGER_MAX_EVENT_COUNTER, BalanceType: utils.MONETARY, BalanceDirections: utils.NewStringMap(utils.OUT, utils.IN), BalanceWeight: 10, }, &ActionTrigger{ - ID: "TestTR2", + UniqueID: "TestTR2", ThresholdType: utils.TRIGGER_MAX_EVENT_COUNTER, BalanceType: utils.VOICE, BalanceDirections: utils.NewStringMap(utils.OUT, utils.IN), BalanceWeight: 10, }, &ActionTrigger{ - ID: "TestTR3", + UniqueID: "TestTR3", ThresholdType: utils.TRIGGER_MAX_BALANCE_COUNTER, BalanceType: utils.VOICE, BalanceDirections: utils.NewStringMap(utils.OUT, utils.IN), BalanceWeight: 10, }, &ActionTrigger{ - ID: "TestTR4", + UniqueID: "TestTR4", ThresholdType: utils.TRIGGER_MAX_BALANCE_COUNTER, BalanceType: utils.SMS, BalanceDirections: utils.NewStringMap(utils.OUT, utils.IN), BalanceWeight: 10, }, &ActionTrigger{ - ID: "TestTR5", + UniqueID: "TestTR5", ThresholdType: utils.TRIGGER_MAX_BALANCE, BalanceType: utils.SMS, BalanceDirections: utils.NewStringMap(utils.OUT, utils.IN), @@ -328,4 +327,3 @@ func TestUnitCountersResetCounterById(t *testing.T) { t.Errorf("Error Initializing adding unit counters: %v", len(a.UnitCounters)) } } -*/ From b1d2cc01957b2b7d2d278f18246a471d9a1ee372 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 5 Jan 2016 10:07:05 +0100 Subject: [PATCH 04/14] DiameterAgent create_cdr option --- agents/dmtagent.go | 6 ++++-- config/config_defaults.go | 9 +++++---- config/config_json_test.go | 1 + config/daconfig.go | 4 ++++ config/libconfig_json.go | 1 + sessionmanager/fssessionmanager.go | 1 - 6 files changed, 15 insertions(+), 7 deletions(-) diff --git a/agents/dmtagent.go b/agents/dmtagent.go index 00a4bee10..e27cb9dfc 100644 --- a/agents/dmtagent.go +++ b/agents/dmtagent.go @@ -101,8 +101,10 @@ func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestPro case 3: var rpl string err = self.smg.Call("SMGenericV1.SessionEnd", smgEv, &rpl) - if errCdr := self.smg.Call("SMGenericV1.ProcessCdr", smgEv, &rpl); errCdr != nil { - err = errCdr + if self.cgrCfg.DiameterAgentCfg().CreateCDR { + if errCdr := self.smg.Call("SMGenericV1.ProcessCdr", smgEv, &rpl); errCdr != nil { + err = errCdr + } } } if err != nil { diff --git a/config/config_defaults.go b/config/config_defaults.go index 0ce859553..837c8ecd0 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -214,7 +214,7 @@ const CGRATES_CFG_JSON = ` "enabled": false, // starts SessionManager service: "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013> "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> - "create_cdr": false, // create CDR out of events and sends them to CDRS component + "create_cdr": false, // create CDR out of events and sends it to CDRS component "extra_fields": [], // extra fields to store in auth/CDRs when creating them "debit_interval": "10s", // interval to perform debits on. "min_call_duration": "0s", // only authorize calls with allowed duration higher than this @@ -236,7 +236,7 @@ const CGRATES_CFG_JSON = ` "enabled": false, // starts SessionManager service: "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013> "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> - "create_cdr": false, // create CDR out of events and sends them to CDRS component + "create_cdr": false, // create CDR out of events and sends it to CDRS component "debit_interval": "10s", // interval to perform debits on. "min_call_duration": "0s", // only authorize calls with allowed duration higher than this "max_call_duration": "3h", // maximum call duration a prepaid call can last @@ -252,7 +252,7 @@ const CGRATES_CFG_JSON = ` "rater": "internal", // address where to reach the Rater <""|internal|127.0.0.1:2013> "cdrs": "internal", // address where to reach CDR Server, empty to disable CDR capturing <""|internal|x.y.z.y:1234> "reconnects": 5, // number of reconnects if connection is lost - "create_cdr": false, // create CDR out of events and sends them to CDRS component + "create_cdr": false, // create CDR out of events and sends it to CDRS component "debit_interval": "10s", // interval to perform debits on. "min_call_duration": "0s", // only authorize calls with allowed duration higher than this "max_call_duration": "3h", // maximum call duration a prepaid call can last @@ -266,6 +266,7 @@ const CGRATES_CFG_JSON = ` "listen": "127.0.0.1:3868", // address where to listen for diameter requests "dictionaries_dir": "/usr/share/cgrates/diameter/dict/", // path towards directory holding additional dictionaries to load "sm_generic": "internal", // connection towards SMG component for session management + "create_cdr": true, // create CDR out of CCR terminate and send it to SMG component "debit_interval": "5m", // interval for CCR updates "timezone": "", // timezone for timestamps where not specified, empty for general defaults <""|UTC|Local|$IANA_TZ_DB> "dialect": "huawei", // the diameter dialect used in the communication, supported: @@ -276,7 +277,7 @@ const CGRATES_CFG_JSON = ` "request_processors": [ { "id": "*default", // formal identifier of this processor - "dry_run": false, // do not send the CDRs to CDRS, just parse them + "dry_run": false, // do not send the events to SMG, just log them "request_filter": "Subscription-Id>Subscription-Id-Type(0)", // filter requests processed by this processor "continue_on_success": false, // continue to the next template if executed "ccr_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value diff --git a/config/config_json_test.go b/config/config_json_test.go index 91bb36aba..c7317f93d 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -425,6 +425,7 @@ func TestDiameterAgentJsonCfg(t *testing.T) { Listen: utils.StringPointer("127.0.0.1:3868"), Dictionaries_dir: utils.StringPointer("/usr/share/cgrates/diameter/dict/"), Sm_generic: utils.StringPointer("internal"), + Create_cdr: utils.BoolPointer(true), Debit_interval: utils.StringPointer("5m"), Timezone: utils.StringPointer(""), Dialect: utils.StringPointer("huawei"), diff --git a/config/daconfig.go b/config/daconfig.go index 4044457ef..13210e376 100644 --- a/config/daconfig.go +++ b/config/daconfig.go @@ -29,6 +29,7 @@ type DiameterAgentCfg struct { Listen string // address where to listen for diameter requests DictionariesDir string SMGeneric string // connection towards SMG component + CreateCDR bool DebitInterval time.Duration Timezone string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> Dialect string // the diameter dialect used in the implementation @@ -55,6 +56,9 @@ func (self *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg) erro if jsnCfg.Sm_generic != nil { self.SMGeneric = *jsnCfg.Sm_generic } + if jsnCfg.Create_cdr != nil { + self.CreateCDR = *jsnCfg.Create_cdr + } if jsnCfg.Debit_interval != nil { var err error if self.DebitInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Debit_interval); err != nil { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 60d0ed103..fdf261f00 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -244,6 +244,7 @@ type DiameterAgentJsonCfg struct { Listen *string // address where to listen for diameter requests Dictionaries_dir *string // path towards additional dictionaries Sm_generic *string // Connection towards generic SM + Create_cdr *bool Debit_interval *string Timezone *string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> Dialect *string diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index aabd61384..2d504888b 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -354,7 +354,6 @@ application_data:+10800 alloted_timeout uuid:3427e500-10e5-4864-a589-e306b70419a */ func (sm *FSSessionManager) SyncSessions() error { for connId, senderPool := range sm.senderPools { - utils.Logger.Debug(fmt.Sprintf("### connID: %s, senderPool: %+v", connId, senderPool)) var aChans []map[string]string fsConn, err := senderPool.PopFSock() if err != nil { From 14af059489af4d9e3c6ad5ee8877a915ceac7f66 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 5 Jan 2016 18:13:05 +0100 Subject: [PATCH 05/14] Diameter DryRun with ContinueOnSuccess - fixes #346; return 4012 in case of not enough funds for the call, added SMS request processors in integration tests --- agents/dmtagent.go | 79 +++++++++---------- agents/dmtagent_it_test.go | 79 +++++++++++++++++++ agents/dmtclient.go | 7 +- .../samples/dmtagent/diameter_processors.json | 53 +++++++++++++ 4 files changed, 176 insertions(+), 42 deletions(-) create mode 100644 data/conf/samples/dmtagent/diameter_processors.json diff --git a/agents/dmtagent.go b/agents/dmtagent.go index e27cb9dfc..d06c67ec3 100644 --- a/agents/dmtagent.go +++ b/agents/dmtagent.go @@ -65,7 +65,7 @@ func (self *DiameterAgent) handlers() diam.Handler { return dSM } -func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestProcessor) (*CCA, error) { +func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestProcessor) *CCA { passesAllFilters := true for _, fldFilter := range reqProcessor.RequestFilter { if passes, _ := passesFieldFilter(ccr.diamMessage, fldFilter); !passes { @@ -73,53 +73,54 @@ func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestPro } } if !passesAllFilters { // Not going with this processor further - return nil, nil - } - smgEv, err := ccr.AsSMGenericEvent(reqProcessor.CCRFields) - if err != nil { - return nil, err + return nil } cca := NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm) + smgEv, err := ccr.AsSMGenericEvent(reqProcessor.CCRFields) + if err != nil { + cca.ResultCode = DiameterRatingFailed + utils.Logger.Err(fmt.Sprintf(" Processing message: %+v AsSMGenericEvent, error: %s", ccr.diamMessage, err)) + return cca + } + var maxUsage float64 if reqProcessor.DryRun { // DryRun does not send over network utils.Logger.Info(fmt.Sprintf(" RequestProcessor: %s", reqProcessor.Id)) utils.Logger.Info(fmt.Sprintf(" CCR message: %s", ccr.diamMessage)) utils.Logger.Info(fmt.Sprintf(" SMGenericEvent: %+v", smgEv)) cca.ResultCode = diam.LimitedSuccess - if err := cca.SetProcessorAVPs(reqProcessor, 0); err != nil { - cca.ResultCode = DiameterRatingFailed - utils.Logger.Err(fmt.Sprintf(" Processing message: %+v, error: %s", ccr.diamMessage, err)) - return cca, nil - } - return cca, nil - } - var maxUsage float64 - switch ccr.CCRequestType { - case 1: - err = self.smg.Call("SMGenericV1.SessionStart", smgEv, &maxUsage) - case 2: - err = self.smg.Call("SMGenericV1.SessionUpdate", smgEv, &maxUsage) - case 3: - var rpl string - err = self.smg.Call("SMGenericV1.SessionEnd", smgEv, &rpl) - if self.cgrCfg.DiameterAgentCfg().CreateCDR { - if errCdr := self.smg.Call("SMGenericV1.ProcessCdr", smgEv, &rpl); errCdr != nil { - err = errCdr + } else { // Find out maxUsage over APIs + switch ccr.CCRequestType { + case 1: + err = self.smg.Call("SMGenericV1.SessionStart", smgEv, &maxUsage) + case 2: + err = self.smg.Call("SMGenericV1.SessionUpdate", smgEv, &maxUsage) + case 3: + var rpl string + err = self.smg.Call("SMGenericV1.SessionEnd", smgEv, &rpl) + if self.cgrCfg.DiameterAgentCfg().CreateCDR { + if errCdr := self.smg.Call("SMGenericV1.ProcessCdr", smgEv, &rpl); errCdr != nil { + err = errCdr + } } } + if err != nil { + cca.ResultCode = DiameterRatingFailed + utils.Logger.Err(fmt.Sprintf(" Processing message: %+v, API error: %s", ccr.diamMessage, err)) + return cca + } + if ccr.CCRequestType != 3 && maxUsage == 0 { // Not enough balance, RFC demands 4012 + cca.ResultCode = 4012 + } else { + cca.ResultCode = diam.Success + } + cca.GrantedServiceUnit.CCTime = int(maxUsage) } - if err != nil { - cca.ResultCode = DiameterRatingFailed - utils.Logger.Err(fmt.Sprintf(" Processing message: %+v, error: %s", ccr.diamMessage, err)) - return cca, nil - } - cca.ResultCode = diam.Success - cca.GrantedServiceUnit.CCTime = int(maxUsage) if err := cca.SetProcessorAVPs(reqProcessor, maxUsage); err != nil { cca.ResultCode = DiameterRatingFailed - utils.Logger.Err(fmt.Sprintf(" Processing message: %+v, error: %s", ccr.diamMessage, err)) - return cca, nil + utils.Logger.Err(fmt.Sprintf(" CCA SetProcessorAVPs for message: %+v, error: %s", ccr.diamMessage, err)) + return cca } - return cca, nil + return cca } func (self *DiameterAgent) handleCCR(c diam.Conn, m *diam.Message) { @@ -130,16 +131,12 @@ func (self *DiameterAgent) handleCCR(c diam.Conn, m *diam.Message) { } var cca *CCA // For now we simply overload in loop, maybe we will find some other use of this for _, reqProcessor := range self.cgrCfg.DiameterAgentCfg().RequestProcessors { - if cca, err = self.processCCR(ccr, reqProcessor); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error processing CCR %+v, processor id: %s, error: %s", ccr, reqProcessor.Id, err.Error())) - } + cca = self.processCCR(ccr, reqProcessor) if cca != nil && !reqProcessor.ContinueOnSuccess { break } } - if err != nil { //ToDo: return standard diameter error - return - } else if cca == nil { + if cca == nil { utils.Logger.Err(fmt.Sprintf(" No request processor enabled for CCR: %+v, ignoring request", ccr)) return } diff --git a/agents/dmtagent_it_test.go b/agents/dmtagent_it_test.go index a423f7126..4c7372635 100644 --- a/agents/dmtagent_it_test.go +++ b/agents/dmtagent_it_test.go @@ -31,6 +31,9 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessionmanager" "github.com/cgrates/cgrates/utils" + "github.com/fiorix/go-diameter/diam" + "github.com/fiorix/go-diameter/diam/avp" + "github.com/fiorix/go-diameter/diam/datatype" "github.com/fiorix/go-diameter/diam/dict" ) @@ -336,6 +339,82 @@ func TestDmtAgentSendCCRTerminate(t *testing.T) { } } +func TestDmtAgentSendCCRSMS(t *testing.T) { + if !*testIntegration { + return + } + ccr := diam.NewRequest(diam.CreditControl, 4, nil) + ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String("cgrates;1451911932;00082")) + ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA")) + ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) + ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4)) + ccr.NewAVP(avp.ServiceContextID, avp.Mbit, 0, datatype.UTF8String("message@huawei.com")) + ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(4)) + ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(0)) + ccr.NewAVP(avp.EventTimestamp, avp.Mbit, 0, datatype.Time(time.Date(2016, 1, 5, 11, 30, 10, 0, time.UTC))) + ccr.NewAVP(avp.SubscriptionID, avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(avp.SubscriptionIDType, avp.Mbit, 0, datatype.Enumerated(0)), + diam.NewAVP(avp.SubscriptionIDData, avp.Mbit, 0, datatype.UTF8String("1001")), // Subscription-Id-Data + }}) + ccr.NewAVP(avp.SubscriptionID, avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(avp.SubscriptionIDType, avp.Mbit, 0, datatype.Enumerated(1)), + diam.NewAVP(avp.SubscriptionIDData, avp.Mbit, 0, datatype.UTF8String("104502200011")), // Subscription-Id-Data + }}) + ccr.NewAVP(avp.ServiceIdentifier, avp.Mbit, 0, datatype.Unsigned32(0)) + ccr.NewAVP(avp.RequestedAction, avp.Mbit, 0, datatype.Enumerated(0)) + ccr.NewAVP(avp.RequestedServiceUnit, avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(avp.CCTime, avp.Mbit, 0, datatype.Unsigned32(1))}}) + ccr.NewAVP(873, avp.Mbit, 10415, &diam.GroupedAVP{ // + AVP: []*diam.AVP{ + diam.NewAVP(20300, avp.Mbit, 2011, &diam.GroupedAVP{ // IN-Information + AVP: []*diam.AVP{ + diam.NewAVP(20302, avp.Mbit, 2011, datatype.UTF8String("22509")), // Calling-Vlr-Number + diam.NewAVP(20385, avp.Mbit, 2011, datatype.UTF8String("4002")), // Called-Party-NP + }, + }), + diam.NewAVP(2000, avp.Mbit, 10415, &diam.GroupedAVP{ // SMS-Information + AVP: []*diam.AVP{ + diam.NewAVP(886, avp.Mbit, 10415, &diam.GroupedAVP{ // Originator-Address + AVP: []*diam.AVP{ + diam.NewAVP(899, avp.Mbit, 10415, datatype.Enumerated(1)), // Address-Type + diam.NewAVP(897, avp.Mbit, 10415, datatype.UTF8String("49602200011")), // Address-Data + }}), + diam.NewAVP(1201, avp.Mbit, 10415, &diam.GroupedAVP{ // Recipient-Address + AVP: []*diam.AVP{ + diam.NewAVP(899, avp.Mbit, 10415, datatype.Enumerated(1)), // Address-Type + diam.NewAVP(897, avp.Mbit, 10415, datatype.UTF8String("49780029555")), // Address-Data + }}), + }, + }), + }}) + if err := dmtClient.SendMessage(ccr); err != nil { + t.Error(err) + } + time.Sleep(time.Duration(100) * time.Millisecond) + msg := dmtClient.ReceivedMessage() + if msg == nil { + t.Fatal("No message returned") + } + if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil { + t.Error(err) + } else if len(avps) == 0 { + t.Error("Granted-Service-Unit not found") + } else if strCCTime := avpValAsString(avps[0]); strCCTime != "0" { + t.Errorf("Expecting 0, received: %s", strCCTime) + } + var acnt *engine.Account + attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} + eAcntVal := 9.205 + if err := apierRpc.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.MONETARY].GetTotalValue() != eAcntVal { // Should also consider derived charges which double the cost of 6m10s - 2x0.7584 + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.MONETARY].GetTotalValue()) + } +} + func TestDmtAgentCdrs(t *testing.T) { if !*testIntegration { return diff --git a/agents/dmtclient.go b/agents/dmtclient.go index 4025b6bc5..fd02fd5fe 100644 --- a/agents/dmtclient.go +++ b/agents/dmtclient.go @@ -85,5 +85,10 @@ func (dc *DiameterClient) handleALL(c diam.Conn, m *diam.Message) { // Returns the message out of received buffer func (dc *DiameterClient) ReceivedMessage() *diam.Message { - return <-dc.received + select { + case rcv := <-dc.received: + return rcv + case <-time.After(time.Duration(1) * time.Second): // Timeout reading + return nil + } } diff --git a/data/conf/samples/dmtagent/diameter_processors.json b/data/conf/samples/dmtagent/diameter_processors.json new file mode 100644 index 000000000..90d8b611a --- /dev/null +++ b/data/conf/samples/dmtagent/diameter_processors.json @@ -0,0 +1,53 @@ +{ + +"diameter_agent": { + "request_processors": [ + { + "id": "*default", // formal identifier of this processor + "dry_run": false, // do not send the events to SMG, just log them + "request_filter": "Service-Context-Id(^voice)", // filter requests processed by this processor + "continue_on_success": false, // continue to the next template if executed + "ccr_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "Session-Id", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "^*users", "mandatory": true}, + {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "^*users", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "^*users", "mandatory": true}, + {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "^*users", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "Service-Information>IN-Information>Real-Called-Number", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*handler", "handler_id": "*ccr_usage", "mandatory": true}, + {"tag": "SubscriberID", "field_id": "SubscriberId", "type": "*composed", "value": "Subscription-Id>Subscription-Id-Data", "mandatory": true}, + ], + "cca_fields":[ // fields returned in CCA + {"tag": "GrantedUnits", "field_id": "Granted-Service-Unit>CC-Time", "type": "*handler", "handler_id": "*cca_usage", "mandatory": true}, + ], + }, + { + "id": "message", // formal identifier of this processor + "dry_run": false, // do not send the events to SMG, just log them + "request_filter": "Service-Context-Id(^message)", // filter requests processed by this processor + "continue_on_success": false, // continue to the next template if executed + "ccr_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*sms", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "Session-Id", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "^*prepaid", "mandatory": true}, + {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "^cgrates.org", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "Subscription-Id>Subscription-Id-Data", "field_filter":"Subscription-Id>Subscription-Id-Type(0)", "mandatory": true}, + {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "Subscription-Id>Subscription-Id-Data", "field_filter":"Subscription-Id>Subscription-Id-Type(0)", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "Service-Information>SMS-Information>Recipient-Address>Address-Data", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "Requested-Service-Unit>CC-Time", "mandatory": true}, + ], + }, + ], +}, + +} + From 0308735b9b6f4e360cebf1ea5451826780f1e7cb Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 5 Jan 2016 19:48:29 +0100 Subject: [PATCH 06/14] SMGeneric - Adding ChargeEvent method, fixes #345 --- agents/dmtagent.go | 8 +++++-- agents/dmtagent_it_test.go | 42 +++++++++++++++++++----------------- apier/v1/smgenericbirpcv1.go | 9 ++++++++ apier/v1/smgenericv1.go | 19 ++++++++++++++++ sessionmanager/smgeneric.go | 36 +++++++++++++++++++++++++++++++ 5 files changed, 92 insertions(+), 22 deletions(-) diff --git a/agents/dmtagent.go b/agents/dmtagent.go index d06c67ec3..c68f35b64 100644 --- a/agents/dmtagent.go +++ b/agents/dmtagent.go @@ -94,9 +94,13 @@ func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestPro err = self.smg.Call("SMGenericV1.SessionStart", smgEv, &maxUsage) case 2: err = self.smg.Call("SMGenericV1.SessionUpdate", smgEv, &maxUsage) - case 3: + case 3, 4: var rpl string - err = self.smg.Call("SMGenericV1.SessionEnd", smgEv, &rpl) + if ccr.CCRequestType == 3 { + err = self.smg.Call("SMGenericV1.SessionEnd", smgEv, &rpl) + } else if ccr.CCRequestType == 4 { + err = self.smg.Call("SMGenericV1.ChargeEvent", smgEv, &rpl) + } if self.cgrCfg.DiameterAgentCfg().CreateCDR { if errCdr := self.smg.Call("SMGenericV1.ProcessCdr", smgEv, &rpl); errCdr != nil { err = errCdr diff --git a/agents/dmtagent_it_test.go b/agents/dmtagent_it_test.go index 4c7372635..2de7b88fb 100644 --- a/agents/dmtagent_it_test.go +++ b/agents/dmtagent_it_test.go @@ -393,26 +393,28 @@ func TestDmtAgentSendCCRSMS(t *testing.T) { if err := dmtClient.SendMessage(ccr); err != nil { t.Error(err) } - time.Sleep(time.Duration(100) * time.Millisecond) - msg := dmtClient.ReceivedMessage() - if msg == nil { - t.Fatal("No message returned") - } - if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil { - t.Error(err) - } else if len(avps) == 0 { - t.Error("Granted-Service-Unit not found") - } else if strCCTime := avpValAsString(avps[0]); strCCTime != "0" { - t.Errorf("Expecting 0, received: %s", strCCTime) - } - var acnt *engine.Account - attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} - eAcntVal := 9.205 - if err := apierRpc.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { - t.Error(err) - } else if acnt.BalanceMap[utils.MONETARY].GetTotalValue() != eAcntVal { // Should also consider derived charges which double the cost of 6m10s - 2x0.7584 - t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.MONETARY].GetTotalValue()) - } + /* + time.Sleep(time.Duration(100) * time.Millisecond) + msg := dmtClient.ReceivedMessage() + if msg == nil { + t.Fatal("No message returned") + } + if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil { + t.Error(err) + } else if len(avps) == 0 { + t.Error("Granted-Service-Unit not found") + } else if strCCTime := avpValAsString(avps[0]); strCCTime != "0" { + t.Errorf("Expecting 0, received: %s", strCCTime) + } + var acnt *engine.Account + attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} + eAcntVal := 9.205 + if err := apierRpc.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.MONETARY].GetTotalValue() != eAcntVal { // Should also consider derived charges which double the cost of 6m10s - 2x0.7584 + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.MONETARY].GetTotalValue()) + } + */ } func TestDmtAgentCdrs(t *testing.T) { diff --git a/apier/v1/smgenericbirpcv1.go b/apier/v1/smgenericbirpcv1.go index caa93cb67..b80bcf960 100644 --- a/apier/v1/smgenericbirpcv1.go +++ b/apier/v1/smgenericbirpcv1.go @@ -99,6 +99,15 @@ func (self *SMGenericBiRpcV1) SessionEnd(clnt *rpc2.Client, ev sessionmanager.SM return nil } +// Called on individual Events (eg SMS) +func (self *SMGenericBiRpcV1) ChargeEvent(clnt *rpc2.Client, ev sessionmanager.SMGenericEvent, reply *string) error { + if err := self.sm.ChargeEvent(ev, clnt); err != nil { + return utils.NewErrServerError(err) + } + *reply = utils.OK + return nil +} + // Called on session end, should send the CDR to CDRS func (self *SMGenericBiRpcV1) ProcessCdr(clnt *rpc2.Client, ev sessionmanager.SMGenericEvent, reply *string) error { if err := self.sm.ProcessCdr(ev); err != nil { diff --git a/apier/v1/smgenericv1.go b/apier/v1/smgenericv1.go index 864ba1c5f..834ebbe9f 100644 --- a/apier/v1/smgenericv1.go +++ b/apier/v1/smgenericv1.go @@ -70,6 +70,15 @@ func (self *SMGenericV1) SessionEnd(ev sessionmanager.SMGenericEvent, reply *str return nil } +// Called on individual Events (eg SMS) +func (self *SMGenericV1) ChargeEvent(ev sessionmanager.SMGenericEvent, reply *string) error { + if err := self.sm.ChargeEvent(ev, nil); err != nil { + return utils.NewErrServerError(err) + } + *reply = utils.OK + return nil +} + // Called on session end, should send the CDR to CDRS func (self *SMGenericV1) ProcessCdr(ev sessionmanager.SMGenericEvent, reply *string) error { if err := self.sm.ProcessCdr(ev); err != nil { @@ -132,6 +141,16 @@ func (self *SMGenericV1) Call(serviceMethod string, args interface{}, reply inte return rpcclient.ErrWrongReplyType } return self.SessionEnd(argsConverted, replyConverted) + case "SMGenericV1.ChargeEvent": + argsConverted, canConvert := args.(sessionmanager.SMGenericEvent) + if !canConvert { + return rpcclient.ErrWrongArgsType + } + replyConverted, canConvert := reply.(*string) + if !canConvert { + return rpcclient.ErrWrongReplyType + } + return self.ChargeEvent(argsConverted, replyConverted) case "SMGenericV1.ProcessCdr": argsConverted, canConvert := args.(sessionmanager.SMGenericEvent) if !canConvert { diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 62e58a8c8..c9b3f2289 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -30,6 +30,8 @@ import ( "github.com/cgrates/cgrates/utils" ) +var ErrPartiallyExecuted = errors.New("Partially executed") + func NewSMGeneric(cgrCfg *config.CGRConfig, rater engine.Connector, cdrsrv engine.Connector, timezone string, extconns *SMGExternalConnections) *SMGeneric { gsm := &SMGeneric{cgrCfg: cgrCfg, rater: rater, cdrsrv: cdrsrv, extconns: extconns, timezone: timezone, sessions: make(map[string][]*SMGSession), sessionsMux: new(sync.Mutex), guard: engine.NewGuardianLock()} @@ -197,6 +199,40 @@ func (self *SMGeneric) SessionEnd(gev SMGenericEvent, clnt *rpc2.Client) error { return nil } +// Processes one time events (eg: SMS) +func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) error { + var sessionRuns []*engine.SessionRun + if err := self.rater.GetSessionRuns(gev.AsStoredCdr(self.cgrCfg, self.timezone), &sessionRuns); err != nil { + return err + } else if len(sessionRuns) == 0 { + return nil + } + var withErrors bool + for _, sR := range sessionRuns { + cc := new(engine.CallCost) + if err := self.rater.Debit(sR.CallDescriptor, cc); err != nil { + withErrors = true + utils.Logger.Err(fmt.Sprintf(" Could not Debit CD: %+v, RunID: %s, error: %s", sR.CallDescriptor, sR.DerivedCharger.RunID, err.Error())) + continue + } + var reply string + if err := self.cdrsrv.LogCallCost(&engine.CallCostLog{ + CgrId: gev.GetCgrId(self.timezone), + Source: utils.SESSION_MANAGER_SOURCE, + RunId: sR.DerivedCharger.RunID, + CallCost: cc, + CheckDuplicate: true, + }, &reply); err != nil && err != utils.ErrExists { + withErrors = true + utils.Logger.Err(fmt.Sprintf(" Could not save CC: %+v, RunID: %s error: %s", cc, sR.DerivedCharger.RunID, err.Error())) + } + } + if withErrors { + return ErrPartiallyExecuted + } + return nil +} + func (self *SMGeneric) ProcessCdr(gev SMGenericEvent) error { var reply string if err := self.cdrsrv.ProcessCdr(gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil { From db8e91eef129c080beeb76af04d7070c6bac5319 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 5 Jan 2016 14:28:53 +0200 Subject: [PATCH 07/14] updated external libs --- glide.lock | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/glide.lock b/glide.lock index 97ec30d38..52cc5d80f 100644 --- a/glide.lock +++ b/glide.lock @@ -1,12 +1,12 @@ hash: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 -updated: 2015-12-18T18:02:12.669823411+02:00 +updated: 2016-01-05T14:15:26.840209805+02:00 imports: - name: github.com/cenkalti/hub version: 57d753b5f4856e77b3cf8ecce78c97215a7d324d - name: github.com/cenkalti/rpc2 version: 2d1be381ce47537e9e076b2b76dc70933162e4e9 - name: github.com/cgrates/fsock - version: f0d40ceb94f8bd15223a05466fe221df20ff5444 + version: c3b1d274ae0e42742ba1bce2bf3a138d72fb82ee - name: github.com/cgrates/kamevapi version: a376b1f937ba959857929fa3e111c0f3243278c0 - name: github.com/cgrates/osipsdagram @@ -28,7 +28,7 @@ imports: - name: github.com/gorhill/cronexpr version: a557574d6c024ed6e36acc8b610f5f211c91568a - name: github.com/jinzhu/gorm - version: d209be3138acbe304daffee637bc495499c1e70e + version: 5e23d7013eabb9f28c263f64eebacdcbf9786ded - name: github.com/jinzhu/inflection version: 3272df6c21d04180007eb3349844c89a3856bc25 - name: github.com/kr/pty @@ -36,20 +36,20 @@ imports: - name: github.com/lib/pq version: 11fc39a580a008f1f39bb3d11d984fb34ed778d9 - name: github.com/mediocregopher/radix.v2 - version: b49c6f79790ba8a13a70b42e6a53d72f2239cf63 + version: 91435107718b55ff544323a2b0f25fdd8475d283 subpackages: - /pool - redis - name: github.com/peterh/liner version: 3f1c20449d1836aa4cbe38731b96f95cdf89634d - name: github.com/ugorji/go - version: cd43bdd6be4b5675a0d1e75c4af55ee1dc0d9c5e + version: 646ae4a518c1c3be0739df898118d9bccf993858 subpackages: - /codec - name: golang.org/x/crypto - version: f18420efc3b4f8e9f3d51f6bd2476e92c46260e9 + version: 552e9d568fde9701ea1944fb01c8aadaceaa7353 - name: golang.org/x/net - version: 28273ec927bee3bea305f112fc28ceee575ea893 + version: 3b90a77d2885fb0429e8a21ab72fc73ca6f8b401 subpackages: - /websocket - name: golang.org/x/text From 36caba4db1c3df863b4d21a037a80434e333e3f0 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 6 Jan 2016 12:39:51 +0200 Subject: [PATCH 08/14] started session manager integration tests --- data/conf/samples/smg/cgrates.json | 32 ++++ engine/aliases.go | 1 + engine/calldesc.go | 20 ++- engine/responder.go | 5 +- glide.lock | 10 +- sessionmanager/smg_it_test.go | 231 +++++++++++++++++++++++++++++ 6 files changed, 282 insertions(+), 17 deletions(-) create mode 100644 data/conf/samples/smg/cgrates.json create mode 100644 sessionmanager/smg_it_test.go diff --git a/data/conf/samples/smg/cgrates.json b/data/conf/samples/smg/cgrates.json new file mode 100644 index 000000000..0cbfc28c2 --- /dev/null +++ b/data/conf/samples/smg/cgrates.json @@ -0,0 +1,32 @@ +{ +// CGRateS Configuration file +// +// Used for cgradmin +// Starts rater, scheduler + +"listen": { + "rpc_json": ":2012", // RPC JSON listening address + "rpc_gob": ":2013", // RPC GOB listening address + "http": ":2080", // HTTP listening address +}, + +"rater": { + "enabled": true, // enable Rater service: +}, + +"scheduler": { + "enabled": true, // start Scheduler service: +}, + +"cdrs": { + "enabled": true, // start the CDR Server service: + "rater": "internal", // address where to reach the Rater for cost calculation, empty to disable functionality: <""|internal|x.y.z.y:1234> +}, + +"sm_generic": { + "enabled": true, + "rater": "internal", + "cdrs": "internal", +}, + +} diff --git a/engine/aliases.go b/engine/aliases.go index a9fb00c65..23411392a 100644 --- a/engine/aliases.go +++ b/engine/aliases.go @@ -177,6 +177,7 @@ func NewAliasHandler(accountingDb AccountingStorage) *AliasHandler { } } +// SetAlias will set/overwrite specified alias func (am *AliasHandler) SetAlias(al Alias, reply *string) error { am.mu.Lock() defer am.mu.Unlock() diff --git a/engine/calldesc.go b/engine/calldesc.go index 6fc95807e..f8864935b 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -746,16 +746,20 @@ func (cd *CallDescriptor) RefundIncrements() (left float64, err error) { accountsCache := make(map[string]*Account) for _, increment := range cd.Increments { account, found := accountsCache[increment.BalanceInfo.AccountId] - if !found { - if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountId); err == nil && acc != nil { - account = acc - accountsCache[increment.BalanceInfo.AccountId] = account - defer accountingStorage.SetAccount(account) + Guardian.Guard(func() (interface{}, error) { + if !found { + if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountId); err == nil && acc != nil { + account = acc + accountsCache[increment.BalanceInfo.AccountId] = account + defer accountingStorage.SetAccount(account) + } } - } - account.refundIncrement(increment, cd, true) + utils.Logger.Info(fmt.Sprintf("Refunding increment %+v", increment)) + account.refundIncrement(increment, cd, true) + return 0, err + }, 0, increment.BalanceInfo.AccountId) } - return 0.0, err + return 0, err } func (cd *CallDescriptor) FlushCache() (err error) { diff --git a/engine/responder.go b/engine/responder.go index 4f53e45cf..8b190d12f 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -224,10 +224,7 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err if rs.Bal != nil { *reply, err = rs.callMethod(arg, "Responder.RefundIncrements") } else { - r, e := Guardian.Guard(func() (interface{}, error) { - return arg.RefundIncrements() - }, 0, arg.GetAccountKey()) - *reply, err = r.(float64), e + *reply, err = arg.RefundIncrements() } rs.getCache().Cache(utils.REFUND_INCR_CACHE_PREFIX+arg.CgrId, &cache2go.CacheItem{ Value: reply, diff --git a/glide.lock b/glide.lock index 52cc5d80f..b3bf24c37 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 -updated: 2016-01-05T14:15:26.840209805+02:00 +hash: 0ca45753122a2e205a1b401e7f38b17e58ea22d3b105894604e504aeace503cb +updated: 2016-01-06T12:38:38.715365273+02:00 imports: - name: github.com/cenkalti/hub version: 57d753b5f4856e77b3cf8ecce78c97215a7d324d @@ -24,11 +24,11 @@ imports: - diam/dict - diam/sm - name: github.com/go-sql-driver/mysql - version: d512f204a577a4ab037a1816604c48c9c13210be + version: bb006fd699a123d3eb514561dbefc352e978949d - name: github.com/gorhill/cronexpr version: a557574d6c024ed6e36acc8b610f5f211c91568a - name: github.com/jinzhu/gorm - version: 5e23d7013eabb9f28c263f64eebacdcbf9786ded + version: 2f7811c55f286c55cfc3a2aefb5c4049b9cd5214 - name: github.com/jinzhu/inflection version: 3272df6c21d04180007eb3349844c89a3856bc25 - name: github.com/kr/pty @@ -49,7 +49,7 @@ imports: - name: golang.org/x/crypto version: 552e9d568fde9701ea1944fb01c8aadaceaa7353 - name: golang.org/x/net - version: 3b90a77d2885fb0429e8a21ab72fc73ca6f8b401 + version: 961116aeebe66bfb58bb4d51818c70d256acbbb8 subpackages: - /websocket - name: golang.org/x/text diff --git a/sessionmanager/smg_it_test.go b/sessionmanager/smg_it_test.go new file mode 100644 index 000000000..425475d11 --- /dev/null +++ b/sessionmanager/smg_it_test.go @@ -0,0 +1,231 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can Storagetribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be u297seful, +but WITH*out ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package sessionmanager + +import ( + "flag" + "net/rpc" + "net/rpc/jsonrpc" + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var testIntegration = flag.Bool("integration", false, "Perform the tests in integration mode, not by default.") // This flag will be passed here via "go test -local" args +var waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache") +var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") + +var daCfgPath string +var daCfg *config.CGRConfig +var smgRPC *rpc.Client +var err error + +func TestSMGInitCfg(t *testing.T) { + if !*testIntegration { + return + } + daCfgPath = path.Join(*dataDir, "conf", "samples", "smg") + // Init config first + var err error + daCfg, err = config.NewCGRConfigFromFolder(daCfgPath) + if err != nil { + t.Error(err) + } + daCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(daCfg) +} + +// Remove data in both rating and accounting db +func TestSMGResetDataDb(t *testing.T) { + if !*testIntegration { + return + } + if err := engine.InitDataDb(daCfg); err != nil { + t.Fatal(err) + } +} + +// Wipe out the cdr database +func TestSMGResetStorDb(t *testing.T) { + if !*testIntegration { + return + } + if err := engine.InitStorDb(daCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func TestSMGStartEngine(t *testing.T) { + if !*testIntegration { + return + } + if _, err := engine.StopStartEngine(daCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestSMGApierRpcConn(t *testing.T) { + if !*testIntegration { + return + } + var err error + smgRPC, err = jsonrpc.Dial("tcp", daCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +// Load the tariff plan, creating accounts and their balances +func TestSMGTPFromFolder(t *testing.T) { + if !*testIntegration { + return + } + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} + var loadInst engine.LoadInstance + if err := smgRPC.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil { + t.Error(err) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups +} + +func TestSMGMonetaryRefound(t *testing.T) { + smgEv := SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: utils.VOICE, + utils.ACCID: "12345", + utils.DIRECTION: utils.OUT, + utils.ACCOUNT: "1001", + utils.SUBJECT: "1001", + utils.DESTINATION: "1004", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: utils.META_PREPAID, + utils.SETUP_TIME: "2016-01-05 18:30:49", + utils.ANSWER_TIME: "2016-01-05 18:31:05", + utils.USAGE: "1m30s", + } + var maxUsage float64 + if err := smgRPC.Call("SMGenericV1.SessionStart", smgEv, &maxUsage); err != nil { + t.Error(err) + } + if maxUsage != 90 { + t.Error("Bad max usage: ", maxUsage) + } + var acnt *engine.Account + attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} + eAcntVal := 8.699800 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.MONETARY].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.MONETARY].GetTotalValue()) + } + smgEv = SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: utils.VOICE, + utils.ACCID: "12345", + utils.DIRECTION: utils.OUT, + utils.ACCOUNT: "1001", + utils.SUBJECT: "1001", + utils.DESTINATION: "1004", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: utils.META_PREPAID, + utils.SETUP_TIME: "2016-01-05 18:30:49", + utils.ANSWER_TIME: "2016-01-05 18:31:05", + utils.USAGE: "1m", + } + var rpl string + if err = smgRPC.Call("SMGenericV1.SessionEnd", smgEv, &rpl); err != nil || rpl != utils.OK { + t.Error(err) + } + + attrs = &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} + eAcntVal = 8.733201 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.MONETARY].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.MONETARY].GetTotalValue()) + } +} + +func TestSMGVoiceRefound(t *testing.T) { + smgEv := SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: utils.VOICE, + utils.ACCID: "12345", + utils.DIRECTION: utils.OUT, + utils.ACCOUNT: "1001", + utils.SUBJECT: "1001", + utils.DESTINATION: "1003", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: utils.META_PREPAID, + utils.SETUP_TIME: "2016-01-05 18:30:49", + utils.ANSWER_TIME: "2016-01-05 18:31:05", + utils.USAGE: "1m30s", + } + var maxUsage float64 + if err := smgRPC.Call("SMGenericV1.SessionStart", smgEv, &maxUsage); err != nil { + t.Error(err) + } + if maxUsage != 90 { + t.Error("Bad max usage: ", maxUsage) + } + var acnt *engine.Account + attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} + eAcntVal := 120.0 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } + smgEv = SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: utils.VOICE, + utils.ACCID: "12345", + utils.DIRECTION: utils.OUT, + utils.ACCOUNT: "1001", + utils.SUBJECT: "1001", + utils.DESTINATION: "1003", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: utils.META_PREPAID, + utils.SETUP_TIME: "2016-01-05 18:30:49", + utils.ANSWER_TIME: "2016-01-05 18:31:05", + utils.USAGE: "1m", + } + var rpl string + if err = smgRPC.Call("SMGenericV1.SessionEnd", smgEv, &rpl); err != nil || rpl != utils.OK { + t.Error(err) + } + + attrs = &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} + eAcntVal = 150.0 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } +} From 27b0d6c6b160f610c1a00b439d3ad9161abf6bcf Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 6 Jan 2016 12:24:39 +0100 Subject: [PATCH 09/14] Diameter CCA field templates matching also in answer - fixes #348; field filters not anymore passing if the field id not found in message --- agents/libdmt.go | 7 +++++-- utils/rsrfield_test.go | 3 +++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/agents/libdmt.go b/agents/libdmt.go index 642a15024..429421fda 100644 --- a/agents/libdmt.go +++ b/agents/libdmt.go @@ -235,9 +235,9 @@ func passesFieldFilter(m *diam.Message, fieldFilter *utils.RSRField) (bool, int) if err != nil { return false, 0 } else if len(avps) == 0 { - return true, 0 + return false, 0 // No AVPs with field filter ID } - for avpIdx, avpVal := range avps { + for avpIdx, avpVal := range avps { // First match wins due to index if fieldFilter.FilterPasses(avpValAsString(avpVal)) { return true, avpIdx } @@ -620,6 +620,9 @@ func (self *CCA) AsDiameterMessage() *diam.Message { func (self *CCA) SetProcessorAVPs(reqProcessor *config.DARequestProcessor, maxUsage float64) error { for _, cfgFld := range reqProcessor.CCAFields { fmtOut, err := fieldOutVal(self.ccrMessage, cfgFld, maxUsage) + if err == ErrFilterNotPassing { // Field not in or filter not passing, try match in answer + fmtOut, err = fieldOutVal(self.diamMessage, cfgFld, maxUsage) + } if err != nil { if err == ErrFilterNotPassing { continue diff --git a/utils/rsrfield_test.go b/utils/rsrfield_test.go index 65841d0ee..1da60b703 100644 --- a/utils/rsrfield_test.go +++ b/utils/rsrfield_test.go @@ -265,6 +265,9 @@ func TestRSRFilterPass(t *testing.T) { if fltr.Pass("full_match1") { t.Error("Passing!") } + if fltr.Pass("") { + t.Error("Passing!") + } fltr, err = NewRSRFilter("^prefixMatch") // Prefix pass if err != nil { t.Error(err) From d95847cb8e72b38802c0111f137f33872e40ce16 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 6 Jan 2016 13:40:53 +0200 Subject: [PATCH 10/14] added one more dependency to glide --- glide.lock | 4 ++-- glide.yaml | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/glide.lock b/glide.lock index b3bf24c37..a7791b5bd 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 0ca45753122a2e205a1b401e7f38b17e58ea22d3b105894604e504aeace503cb -updated: 2016-01-06T12:38:38.715365273+02:00 +hash: 330fc999239d5766f033409be2335338cfd172d0dcf18ad752b8613f45e9f451 +updated: 2016-01-06T13:35:12.445693385+02:00 imports: - name: github.com/cenkalti/hub version: 57d753b5f4856e77b3cf8ecce78c97215a7d324d diff --git a/glide.yaml b/glide.yaml index 3bd347199..d69b5ba46 100644 --- a/glide.yaml +++ b/glide.yaml @@ -16,6 +16,7 @@ import: - package: github.com/go-sql-driver/mysql - package: github.com/gorhill/cronexpr - package: github.com/jinzhu/gorm +- package: github.com/jinzhu/inflection - package: github.com/kr/pty - package: github.com/lib/pq - package: github.com/mediocregopher/radix.v2 From bbb44cb02eafb7a57b3d81db04caecf7494f84dc Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 6 Jan 2016 13:06:44 +0100 Subject: [PATCH 11/14] Increased wait for smg_it_test --- sessionmanager/smg_it_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sessionmanager/smg_it_test.go b/sessionmanager/smg_it_test.go index 425475d11..0fc3a6196 100644 --- a/sessionmanager/smg_it_test.go +++ b/sessionmanager/smg_it_test.go @@ -32,7 +32,7 @@ import ( ) var testIntegration = flag.Bool("integration", false, "Perform the tests in integration mode, not by default.") // This flag will be passed here via "go test -local" args -var waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache") +var waitRater = flag.Int("wait_rater", 150, "Number of miliseconds to wait for rater to start and cache") var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") var daCfgPath string From 382f4c308d74ada70f82154fc4286bcc24d29418 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 6 Jan 2016 14:16:10 +0200 Subject: [PATCH 12/14] fix for *voice balances refund --- engine/calldesc.go | 25 ++++++++++++++++++------- sessionmanager/session.go | 1 + sessionmanager/smg_it_test.go | 6 ++++++ sessionmanager/smg_session.go | 1 + 4 files changed, 26 insertions(+), 7 deletions(-) diff --git a/engine/calldesc.go b/engine/calldesc.go index f8864935b..667f1656b 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -743,22 +743,33 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) { func (cd *CallDescriptor) RefundIncrements() (left float64, err error) { cd.account = nil // make sure it's not cached - accountsCache := make(map[string]*Account) + // get account list for locking + // all must be locked in order to use cache + var accountIDs []string for _, increment := range cd.Increments { - account, found := accountsCache[increment.BalanceInfo.AccountId] - Guardian.Guard(func() (interface{}, error) { + accountIDs = append(accountIDs, increment.BalanceInfo.AccountId) + } + Guardian.Guard(func() (interface{}, error) { + accountsCache := make(map[string]*Account) + for _, increment := range cd.Increments { + account, found := accountsCache[increment.BalanceInfo.AccountId] if !found { if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountId); err == nil && acc != nil { account = acc accountsCache[increment.BalanceInfo.AccountId] = account + // will save the account only once at the end of the function defer accountingStorage.SetAccount(account) } } - utils.Logger.Info(fmt.Sprintf("Refunding increment %+v", increment)) + if account == nil { + utils.Logger.Warning(fmt.Sprintf("Could not get the account to be refunded: %s", increment.BalanceInfo.AccountId)) + continue + } + //utils.Logger.Info(fmt.Sprintf("Refunding increment %+v", increment)) account.refundIncrement(increment, cd, true) - return 0, err - }, 0, increment.BalanceInfo.AccountId) - } + } + return 0, err + }, 0, accountIDs...) return 0, err } diff --git a/sessionmanager/session.go b/sessionmanager/session.go index f30bf276b..0713956a2 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -201,6 +201,7 @@ func (s *Session) Refund(lastCC *engine.CallCost, hangupTime time.Time) error { Subject: lastCC.Subject, Account: lastCC.Account, Destination: lastCC.Destination, + TOR: lastCC.TOR, Increments: refundIncrements, } var response float64 diff --git a/sessionmanager/smg_it_test.go b/sessionmanager/smg_it_test.go index 0fc3a6196..acf495e1f 100644 --- a/sessionmanager/smg_it_test.go +++ b/sessionmanager/smg_it_test.go @@ -111,6 +111,9 @@ func TestSMGTPFromFolder(t *testing.T) { } func TestSMGMonetaryRefound(t *testing.T) { + if !*testIntegration { + return + } smgEv := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: utils.VOICE, @@ -171,6 +174,9 @@ func TestSMGMonetaryRefound(t *testing.T) { } func TestSMGVoiceRefound(t *testing.T) { + if !*testIntegration { + return + } smgEv := SMGenericEvent{ utils.EVENT_NAME: "TEST_EVENT", utils.TOR: utils.VOICE, diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index 2ee36f58e..2837ccb3b 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -152,6 +152,7 @@ func (self *SMGSession) refund(refundDuration time.Duration) error { Subject: lastCC.Subject, Account: lastCC.Account, Destination: lastCC.Destination, + TOR: lastCC.TOR, Increments: refundIncrements, } var response float64 From 731a33b3bd4a3585d6e627f81699d1c154c7ebe9 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 6 Jan 2016 14:29:11 +0200 Subject: [PATCH 13/14] unique refund account ids --- engine/calldesc.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/engine/calldesc.go b/engine/calldesc.go index 667f1656b..d372f1c98 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -745,10 +745,15 @@ func (cd *CallDescriptor) RefundIncrements() (left float64, err error) { cd.account = nil // make sure it's not cached // get account list for locking // all must be locked in order to use cache + accMap := make(map[string]struct{}) var accountIDs []string for _, increment := range cd.Increments { - accountIDs = append(accountIDs, increment.BalanceInfo.AccountId) + accMap[increment.BalanceInfo.AccountId] = struct{}{} } + for key := range accMap { + accountIDs = append(accountIDs, key) + } + // start increment refunding loop Guardian.Guard(func() (interface{}, error) { accountsCache := make(map[string]*Account) for _, increment := range cd.Increments { From 1bb9b1e01484a314f7b17022a329951ae9a51281 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 6 Jan 2016 13:50:22 +0100 Subject: [PATCH 14/14] Diameter test improvement --- agents/dmtagent_it_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/agents/dmtagent_it_test.go b/agents/dmtagent_it_test.go index 2de7b88fb..9a449b0b0 100644 --- a/agents/dmtagent_it_test.go +++ b/agents/dmtagent_it_test.go @@ -320,8 +320,11 @@ func TestDmtAgentSendCCRTerminate(t *testing.T) { if err := dmtClient.SendMessage(m); err != nil { t.Error(err) } - time.Sleep(time.Duration(100) * time.Millisecond) + time.Sleep(time.Duration(150) * time.Millisecond) msg := dmtClient.ReceivedMessage() + if msg == nil { + t.Fatal("No answer to CCR terminate received") + } if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil { t.Error(err) } else if len(avps) == 0 {