diff --git a/agents/dmtagent.go b/agents/dmtagent.go index bc247aeeb..61737c216 100644 --- a/agents/dmtagent.go +++ b/agents/dmtagent.go @@ -65,7 +65,7 @@ func (self *DiameterAgent) handlers() diam.Handler { return dSM } -func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestProcessor) (*CCA, error) { +func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestProcessor) *CCA { passesAllFilters := true for _, fldFilter := range reqProcessor.RequestFilter { if passes, _ := passesFieldFilter(ccr.diamMessage, fldFilter); !passes { @@ -73,51 +73,58 @@ func (self DiameterAgent) processCCR(ccr *CCR, reqProcessor *config.DARequestPro } } if !passesAllFilters { // Not going with this processor further - return nil, nil - } - smgEv, err := ccr.AsSMGenericEvent(reqProcessor.CCRFields) - if err != nil { - return nil, err + return nil } cca := NewBareCCAFromCCR(ccr, self.cgrCfg.DiameterAgentCfg().OriginHost, self.cgrCfg.DiameterAgentCfg().OriginRealm) + smgEv, err := ccr.AsSMGenericEvent(reqProcessor.CCRFields) + if err != nil { + cca.ResultCode = DiameterRatingFailed + utils.Logger.Err(fmt.Sprintf(" Processing message: %+v AsSMGenericEvent, error: %s", ccr.diamMessage, err)) + return cca + } + var maxUsage float64 if reqProcessor.DryRun { // DryRun does not send over network utils.Logger.Info(fmt.Sprintf(" RequestProcessor: %s", reqProcessor.Id)) utils.Logger.Info(fmt.Sprintf(" CCR message: %s", ccr.diamMessage)) utils.Logger.Info(fmt.Sprintf(" SMGenericEvent: %+v", smgEv)) cca.ResultCode = diam.LimitedSuccess - if err := cca.SetProcessorAVPs(reqProcessor, 0); err != nil { + } else { // Find out maxUsage over APIs + switch ccr.CCRequestType { + case 1: + err = self.smg.Call("SMGenericV1.SessionStart", smgEv, &maxUsage) + case 2: + err = self.smg.Call("SMGenericV1.SessionUpdate", smgEv, &maxUsage) + case 3, 4: + var rpl string + if ccr.CCRequestType == 3 { + err = self.smg.Call("SMGenericV1.SessionEnd", smgEv, &rpl) + } else if ccr.CCRequestType == 4 { + err = self.smg.Call("SMGenericV1.ChargeEvent", smgEv, &rpl) + } + if self.cgrCfg.DiameterAgentCfg().CreateCDR { + if errCdr := self.smg.Call("SMGenericV1.ProcessCdr", smgEv, &rpl); errCdr != nil { + err = errCdr + } + } + } + if err != nil { cca.ResultCode = DiameterRatingFailed - utils.Logger.Err(fmt.Sprintf(" Processing message: %+v, error: %s", ccr.diamMessage, err)) - return cca, nil + utils.Logger.Err(fmt.Sprintf(" Processing message: %+v, API error: %s", ccr.diamMessage, err)) + return cca } - return cca, nil - } - var maxUsage float64 - switch ccr.CCRequestType { - case 1: - err = self.smg.Call("SMGenericV1.SessionStart", smgEv, &maxUsage) - case 2: - err = self.smg.Call("SMGenericV1.SessionUpdate", smgEv, &maxUsage) - case 3: - var rpl string - err = self.smg.Call("SMGenericV1.SessionEnd", smgEv, &rpl) - if errCdr := self.smg.Call("SMGenericV1.ProcessCdr", smgEv, &rpl); errCdr != nil { - err = errCdr + if ccr.CCRequestType != 3 && maxUsage == 0 { // Not enough balance, RFC demands 4012 + cca.ResultCode = 4012 + } else { + cca.ResultCode = diam.Success } + cca.GrantedServiceUnit.CCTime = int(maxUsage) } - if err != nil { - cca.ResultCode = DiameterRatingFailed - utils.Logger.Err(fmt.Sprintf(" Processing message: %+v, error: %s", ccr.diamMessage, err)) - return cca, nil - } - cca.ResultCode = diam.Success - cca.GrantedServiceUnit.CCTime = int(maxUsage) if err := cca.SetProcessorAVPs(reqProcessor, maxUsage); err != nil { cca.ResultCode = DiameterRatingFailed - utils.Logger.Err(fmt.Sprintf(" Processing message: %+v, error: %s", ccr.diamMessage, err)) - return cca, nil + utils.Logger.Err(fmt.Sprintf(" CCA SetProcessorAVPs for message: %+v, error: %s", ccr.diamMessage, err)) + return cca } - return cca, nil + return cca } func (self *DiameterAgent) handleCCR(c diam.Conn, m *diam.Message) { @@ -128,16 +135,12 @@ func (self *DiameterAgent) handleCCR(c diam.Conn, m *diam.Message) { } var cca *CCA // For now we simply overload in loop, maybe we will find some other use of this for _, reqProcessor := range self.cgrCfg.DiameterAgentCfg().RequestProcessors { - if cca, err = self.processCCR(ccr, reqProcessor); err != nil { - utils.Logger.Err(fmt.Sprintf(" Error processing CCR %+v, processor id: %s, error: %s", ccr, reqProcessor.Id, err.Error())) - } + cca = self.processCCR(ccr, reqProcessor) if cca != nil && !reqProcessor.ContinueOnSuccess { break } } - if err != nil { //ToDo: return standard diameter error - return - } else if cca == nil { + if cca == nil { utils.Logger.Err(fmt.Sprintf(" No request processor enabled for CCR: %+v, ignoring request", ccr)) return } diff --git a/agents/dmtagent_it_test.go b/agents/dmtagent_it_test.go index a423f7126..9a449b0b0 100644 --- a/agents/dmtagent_it_test.go +++ b/agents/dmtagent_it_test.go @@ -31,6 +31,9 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessionmanager" "github.com/cgrates/cgrates/utils" + "github.com/fiorix/go-diameter/diam" + "github.com/fiorix/go-diameter/diam/avp" + "github.com/fiorix/go-diameter/diam/datatype" "github.com/fiorix/go-diameter/diam/dict" ) @@ -317,8 +320,11 @@ func TestDmtAgentSendCCRTerminate(t *testing.T) { if err := dmtClient.SendMessage(m); err != nil { t.Error(err) } - time.Sleep(time.Duration(100) * time.Millisecond) + time.Sleep(time.Duration(150) * time.Millisecond) msg := dmtClient.ReceivedMessage() + if msg == nil { + t.Fatal("No answer to CCR terminate received") + } if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil { t.Error(err) } else if len(avps) == 0 { @@ -336,6 +342,84 @@ func TestDmtAgentSendCCRTerminate(t *testing.T) { } } +func TestDmtAgentSendCCRSMS(t *testing.T) { + if !*testIntegration { + return + } + ccr := diam.NewRequest(diam.CreditControl, 4, nil) + ccr.NewAVP(avp.SessionID, avp.Mbit, 0, datatype.UTF8String("cgrates;1451911932;00082")) + ccr.NewAVP(avp.OriginHost, avp.Mbit, 0, datatype.DiameterIdentity("CGR-DA")) + ccr.NewAVP(avp.OriginRealm, avp.Mbit, 0, datatype.DiameterIdentity("cgrates.org")) + ccr.NewAVP(avp.AuthApplicationID, avp.Mbit, 0, datatype.Unsigned32(4)) + ccr.NewAVP(avp.ServiceContextID, avp.Mbit, 0, datatype.UTF8String("message@huawei.com")) + ccr.NewAVP(avp.CCRequestType, avp.Mbit, 0, datatype.Enumerated(4)) + ccr.NewAVP(avp.CCRequestNumber, avp.Mbit, 0, datatype.Unsigned32(0)) + ccr.NewAVP(avp.EventTimestamp, avp.Mbit, 0, datatype.Time(time.Date(2016, 1, 5, 11, 30, 10, 0, time.UTC))) + ccr.NewAVP(avp.SubscriptionID, avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(avp.SubscriptionIDType, avp.Mbit, 0, datatype.Enumerated(0)), + diam.NewAVP(avp.SubscriptionIDData, avp.Mbit, 0, datatype.UTF8String("1001")), // Subscription-Id-Data + }}) + ccr.NewAVP(avp.SubscriptionID, avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(avp.SubscriptionIDType, avp.Mbit, 0, datatype.Enumerated(1)), + diam.NewAVP(avp.SubscriptionIDData, avp.Mbit, 0, datatype.UTF8String("104502200011")), // Subscription-Id-Data + }}) + ccr.NewAVP(avp.ServiceIdentifier, avp.Mbit, 0, datatype.Unsigned32(0)) + ccr.NewAVP(avp.RequestedAction, avp.Mbit, 0, datatype.Enumerated(0)) + ccr.NewAVP(avp.RequestedServiceUnit, avp.Mbit, 0, &diam.GroupedAVP{ + AVP: []*diam.AVP{ + diam.NewAVP(avp.CCTime, avp.Mbit, 0, datatype.Unsigned32(1))}}) + ccr.NewAVP(873, avp.Mbit, 10415, &diam.GroupedAVP{ // + AVP: []*diam.AVP{ + diam.NewAVP(20300, avp.Mbit, 2011, &diam.GroupedAVP{ // IN-Information + AVP: []*diam.AVP{ + diam.NewAVP(20302, avp.Mbit, 2011, datatype.UTF8String("22509")), // Calling-Vlr-Number + diam.NewAVP(20385, avp.Mbit, 2011, datatype.UTF8String("4002")), // Called-Party-NP + }, + }), + diam.NewAVP(2000, avp.Mbit, 10415, &diam.GroupedAVP{ // SMS-Information + AVP: []*diam.AVP{ + diam.NewAVP(886, avp.Mbit, 10415, &diam.GroupedAVP{ // Originator-Address + AVP: []*diam.AVP{ + diam.NewAVP(899, avp.Mbit, 10415, datatype.Enumerated(1)), // Address-Type + diam.NewAVP(897, avp.Mbit, 10415, datatype.UTF8String("49602200011")), // Address-Data + }}), + diam.NewAVP(1201, avp.Mbit, 10415, &diam.GroupedAVP{ // Recipient-Address + AVP: []*diam.AVP{ + diam.NewAVP(899, avp.Mbit, 10415, datatype.Enumerated(1)), // Address-Type + diam.NewAVP(897, avp.Mbit, 10415, datatype.UTF8String("49780029555")), // Address-Data + }}), + }, + }), + }}) + if err := dmtClient.SendMessage(ccr); err != nil { + t.Error(err) + } + /* + time.Sleep(time.Duration(100) * time.Millisecond) + msg := dmtClient.ReceivedMessage() + if msg == nil { + t.Fatal("No message returned") + } + if avps, err := msg.FindAVPsWithPath([]interface{}{"Granted-Service-Unit", "CC-Time"}, dict.UndefinedVendorID); err != nil { + t.Error(err) + } else if len(avps) == 0 { + t.Error("Granted-Service-Unit not found") + } else if strCCTime := avpValAsString(avps[0]); strCCTime != "0" { + t.Errorf("Expecting 0, received: %s", strCCTime) + } + var acnt *engine.Account + attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} + eAcntVal := 9.205 + if err := apierRpc.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.MONETARY].GetTotalValue() != eAcntVal { // Should also consider derived charges which double the cost of 6m10s - 2x0.7584 + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.MONETARY].GetTotalValue()) + } + */ +} + func TestDmtAgentCdrs(t *testing.T) { if !*testIntegration { return diff --git a/agents/dmtclient.go b/agents/dmtclient.go index 4025b6bc5..fd02fd5fe 100644 --- a/agents/dmtclient.go +++ b/agents/dmtclient.go @@ -85,5 +85,10 @@ func (dc *DiameterClient) handleALL(c diam.Conn, m *diam.Message) { // Returns the message out of received buffer func (dc *DiameterClient) ReceivedMessage() *diam.Message { - return <-dc.received + select { + case rcv := <-dc.received: + return rcv + case <-time.After(time.Duration(1) * time.Second): // Timeout reading + return nil + } } diff --git a/agents/libdmt.go b/agents/libdmt.go index 642a15024..429421fda 100644 --- a/agents/libdmt.go +++ b/agents/libdmt.go @@ -235,9 +235,9 @@ func passesFieldFilter(m *diam.Message, fieldFilter *utils.RSRField) (bool, int) if err != nil { return false, 0 } else if len(avps) == 0 { - return true, 0 + return false, 0 // No AVPs with field filter ID } - for avpIdx, avpVal := range avps { + for avpIdx, avpVal := range avps { // First match wins due to index if fieldFilter.FilterPasses(avpValAsString(avpVal)) { return true, avpIdx } @@ -620,6 +620,9 @@ func (self *CCA) AsDiameterMessage() *diam.Message { func (self *CCA) SetProcessorAVPs(reqProcessor *config.DARequestProcessor, maxUsage float64) error { for _, cfgFld := range reqProcessor.CCAFields { fmtOut, err := fieldOutVal(self.ccrMessage, cfgFld, maxUsage) + if err == ErrFilterNotPassing { // Field not in or filter not passing, try match in answer + fmtOut, err = fieldOutVal(self.diamMessage, cfgFld, maxUsage) + } if err != nil { if err == ErrFilterNotPassing { continue diff --git a/apier/v1/smgenericbirpcv1.go b/apier/v1/smgenericbirpcv1.go index caa93cb67..b80bcf960 100644 --- a/apier/v1/smgenericbirpcv1.go +++ b/apier/v1/smgenericbirpcv1.go @@ -99,6 +99,15 @@ func (self *SMGenericBiRpcV1) SessionEnd(clnt *rpc2.Client, ev sessionmanager.SM return nil } +// Called on individual Events (eg SMS) +func (self *SMGenericBiRpcV1) ChargeEvent(clnt *rpc2.Client, ev sessionmanager.SMGenericEvent, reply *string) error { + if err := self.sm.ChargeEvent(ev, clnt); err != nil { + return utils.NewErrServerError(err) + } + *reply = utils.OK + return nil +} + // Called on session end, should send the CDR to CDRS func (self *SMGenericBiRpcV1) ProcessCdr(clnt *rpc2.Client, ev sessionmanager.SMGenericEvent, reply *string) error { if err := self.sm.ProcessCdr(ev); err != nil { diff --git a/apier/v1/smgenericv1.go b/apier/v1/smgenericv1.go index 864ba1c5f..834ebbe9f 100644 --- a/apier/v1/smgenericv1.go +++ b/apier/v1/smgenericv1.go @@ -70,6 +70,15 @@ func (self *SMGenericV1) SessionEnd(ev sessionmanager.SMGenericEvent, reply *str return nil } +// Called on individual Events (eg SMS) +func (self *SMGenericV1) ChargeEvent(ev sessionmanager.SMGenericEvent, reply *string) error { + if err := self.sm.ChargeEvent(ev, nil); err != nil { + return utils.NewErrServerError(err) + } + *reply = utils.OK + return nil +} + // Called on session end, should send the CDR to CDRS func (self *SMGenericV1) ProcessCdr(ev sessionmanager.SMGenericEvent, reply *string) error { if err := self.sm.ProcessCdr(ev); err != nil { @@ -132,6 +141,16 @@ func (self *SMGenericV1) Call(serviceMethod string, args interface{}, reply inte return rpcclient.ErrWrongReplyType } return self.SessionEnd(argsConverted, replyConverted) + case "SMGenericV1.ChargeEvent": + argsConverted, canConvert := args.(sessionmanager.SMGenericEvent) + if !canConvert { + return rpcclient.ErrWrongArgsType + } + replyConverted, canConvert := reply.(*string) + if !canConvert { + return rpcclient.ErrWrongReplyType + } + return self.ChargeEvent(argsConverted, replyConverted) case "SMGenericV1.ProcessCdr": argsConverted, canConvert := args.(sessionmanager.SMGenericEvent) if !canConvert { diff --git a/config/config_defaults.go b/config/config_defaults.go index 564f370bc..8040aec98 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -235,6 +235,7 @@ const CGRATES_CFG_JSON = ` "empty_balance_ann_file": "", // file to be played before disconnecting prepaid calls on empty balance (applies only if no context defined) "subscribe_park": true, // subscribe via fsock to receive park events "channel_sync_interval": "5m", // sync channels with freeswitch regularly + "max_wait_connection": "2s", // maximum duration to wait for a connection to be retrieved from the pool "connections":[ // instantiate connections to multiple FreeSWITCH servers {"server": "127.0.0.1:8021", "password": "ClueCon", "reconnects": 5} ], @@ -269,7 +270,7 @@ const CGRATES_CFG_JSON = ` {"server": "internal"} // address where to reach CDR Server, empty to disable CDR capturing ], "reconnects": 5, // number of reconnects if connection is lost - "create_cdr": false, // create CDR out of events and sends them to CDRS component + "create_cdr": false, // create CDR out of events and sends it to CDRS component "debit_interval": "10s", // interval to perform debits on. "min_call_duration": "0s", // only authorize calls with allowed duration higher than this "max_call_duration": "3h", // maximum call duration a prepaid call can last @@ -284,7 +285,8 @@ const CGRATES_CFG_JSON = ` "dictionaries_dir": "/usr/share/cgrates/diameter/dict/", // path towards directory holding additional dictionaries to load "sm_generic_conns": [ {"server": "internal"} // connection towards SMG component for session management - ], + ], // connection towards SMG component for session management + "create_cdr": true, // create CDR out of CCR terminate and send it to SMG component "debit_interval": "5m", // interval for CCR updates "timezone": "", // timezone for timestamps where not specified, empty for general defaults <""|UTC|Local|$IANA_TZ_DB> "dialect": "huawei", // the diameter dialect used in the communication, supported: @@ -295,7 +297,7 @@ const CGRATES_CFG_JSON = ` "request_processors": [ { "id": "*default", // formal identifier of this processor - "dry_run": false, // do not send the CDRs to CDRS, just parse them + "dry_run": false, // do not send the events to SMG, just log them "request_filter": "Subscription-Id>Subscription-Id-Type(0)", // filter requests processed by this processor "continue_on_success": false, // continue to the next template if executed "ccr_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value diff --git a/config/config_json_test.go b/config/config_json_test.go index a1fca484a..30f6f6fbc 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -376,6 +376,7 @@ func TestSmFsJsonCfg(t *testing.T) { Empty_balance_ann_file: utils.StringPointer(""), Subscribe_park: utils.BoolPointer(true), Channel_sync_interval: utils.StringPointer("5m"), + Max_wait_connection: utils.StringPointer("2s"), Connections: &[]*FsConnJsonCfg{ &FsConnJsonCfg{ Server: utils.StringPointer("127.0.0.1:8021"), @@ -454,6 +455,7 @@ func TestDiameterAgentJsonCfg(t *testing.T) { &HaPoolJsonCfg{ Server: utils.StringPointer("internal"), }}, + Create_cdr: utils.BoolPointer(true), Debit_interval: utils.StringPointer("5m"), Timezone: utils.StringPointer(""), Dialect: utils.StringPointer("huawei"), diff --git a/config/daconfig.go b/config/daconfig.go index 89a7584a7..0a2588240 100644 --- a/config/daconfig.go +++ b/config/daconfig.go @@ -29,6 +29,7 @@ type DiameterAgentCfg struct { Listen string // address where to listen for diameter requests DictionariesDir string SMGenericConns []*HaPoolConfig // connections towards SMG component + CreateCDR bool DebitInterval time.Duration Timezone string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> Dialect string // the diameter dialect used in the implementation @@ -59,6 +60,9 @@ func (self *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg) erro self.SMGenericConns[idx].loadFromJsonCfg(jsnHaCfg) } } + if jsnCfg.Create_cdr != nil { + self.CreateCDR = *jsnCfg.Create_cdr + } if jsnCfg.Debit_interval != nil { var err error if self.DebitInterval, err = utils.ParseDurationWithSecs(*jsnCfg.Debit_interval); err != nil { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 1aa99a41a..24195518c 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -189,6 +189,7 @@ type SmFsJsonCfg struct { Empty_balance_ann_file *string Subscribe_park *bool Channel_sync_interval *string + Max_wait_connection *string Connections *[]*FsConnJsonCfg } @@ -248,6 +249,7 @@ type DiameterAgentJsonCfg struct { Listen *string // address where to listen for diameter requests Dictionaries_dir *string // path towards additional dictionaries Sm_generic_conns *[]*HaPoolJsonCfg // Connections towards generic SM + Create_cdr *bool Debit_interval *string Timezone *string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> Dialect *string diff --git a/config/smconfig.go b/config/smconfig.go index 1feebd934..299d6cd56 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -148,6 +148,7 @@ type SmFsConfig struct { EmptyBalanceAnnFile string SubscribePark bool ChannelSyncInterval time.Duration + MaxWaitConnection time.Duration Connections []*FsConnConfig } @@ -218,6 +219,11 @@ func (self *SmFsConfig) loadFromJsonCfg(jsnCfg *SmFsJsonCfg) error { return err } } + if jsnCfg.Max_wait_connection != nil { + if self.MaxWaitConnection, err = utils.ParseDurationWithSecs(*jsnCfg.Max_wait_connection); err != nil { + return err + } + } if jsnCfg.Connections != nil { self.Connections = make([]*FsConnConfig, len(*jsnCfg.Connections)) for idx, jsnConnCfg := range *jsnCfg.Connections { diff --git a/data/conf/samples/dmtagent/diameter_processors.json b/data/conf/samples/dmtagent/diameter_processors.json new file mode 100644 index 000000000..90d8b611a --- /dev/null +++ b/data/conf/samples/dmtagent/diameter_processors.json @@ -0,0 +1,53 @@ +{ + +"diameter_agent": { + "request_processors": [ + { + "id": "*default", // formal identifier of this processor + "dry_run": false, // do not send the events to SMG, just log them + "request_filter": "Service-Context-Id(^voice)", // filter requests processed by this processor + "continue_on_success": false, // continue to the next template if executed + "ccr_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "Session-Id", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "^*users", "mandatory": true}, + {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "^*users", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "^*users", "mandatory": true}, + {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "^*users", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "Service-Information>IN-Information>Real-Called-Number", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*handler", "handler_id": "*ccr_usage", "mandatory": true}, + {"tag": "SubscriberID", "field_id": "SubscriberId", "type": "*composed", "value": "Subscription-Id>Subscription-Id-Data", "mandatory": true}, + ], + "cca_fields":[ // fields returned in CCA + {"tag": "GrantedUnits", "field_id": "Granted-Service-Unit>CC-Time", "type": "*handler", "handler_id": "*cca_usage", "mandatory": true}, + ], + }, + { + "id": "message", // formal identifier of this processor + "dry_run": false, // do not send the events to SMG, just log them + "request_filter": "Service-Context-Id(^message)", // filter requests processed by this processor + "continue_on_success": false, // continue to the next template if executed + "ccr_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*sms", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "Session-Id", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "^*prepaid", "mandatory": true}, + {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "^cgrates.org", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "Subscription-Id>Subscription-Id-Data", "field_filter":"Subscription-Id>Subscription-Id-Type(0)", "mandatory": true}, + {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "Subscription-Id>Subscription-Id-Data", "field_filter":"Subscription-Id>Subscription-Id-Type(0)", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "Service-Information>SMS-Information>Recipient-Address>Address-Data", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "Requested-Service-Unit>CC-Time", "mandatory": true}, + ], + }, + ], +}, + +} + diff --git a/data/conf/samples/smg/cgrates.json b/data/conf/samples/smg/cgrates.json new file mode 100644 index 000000000..0cbfc28c2 --- /dev/null +++ b/data/conf/samples/smg/cgrates.json @@ -0,0 +1,32 @@ +{ +// CGRateS Configuration file +// +// Used for cgradmin +// Starts rater, scheduler + +"listen": { + "rpc_json": ":2012", // RPC JSON listening address + "rpc_gob": ":2013", // RPC GOB listening address + "http": ":2080", // HTTP listening address +}, + +"rater": { + "enabled": true, // enable Rater service: +}, + +"scheduler": { + "enabled": true, // start Scheduler service: +}, + +"cdrs": { + "enabled": true, // start the CDR Server service: + "rater": "internal", // address where to reach the Rater for cost calculation, empty to disable functionality: <""|internal|x.y.z.y:1234> +}, + +"sm_generic": { + "enabled": true, + "rater": "internal", + "cdrs": "internal", +}, + +} diff --git a/engine/action.go b/engine/action.go index b3ae9bcff..abd90f72c 100644 --- a/engine/action.go +++ b/engine/action.go @@ -257,7 +257,6 @@ func cdrLogAction(acc *Account, sq *StatsQueueTriggered, a *Action, acs Actions) return err } } - b, _ := json.Marshal(cdrs) a.ExpirationString = string(b) // testing purpose only return diff --git a/engine/aliases.go b/engine/aliases.go index d4fdc736b..3621ad2f7 100644 --- a/engine/aliases.go +++ b/engine/aliases.go @@ -172,6 +172,7 @@ type AttrAddAlias struct { Overwrite bool } +// SetAlias will set/overwrite specified alias func (am *AliasHandler) SetAlias(attr *AttrAddAlias, reply *string) error { am.mu.Lock() defer am.mu.Unlock() diff --git a/engine/calldesc.go b/engine/calldesc.go index afc0a794f..a45c4b1c6 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -743,19 +743,39 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) { func (cd *CallDescriptor) RefundIncrements() (left float64, err error) { cd.account = nil // make sure it's not cached - accountsCache := make(map[string]*Account) + // get account list for locking + // all must be locked in order to use cache + accMap := make(map[string]struct{}) + var accountIDs []string for _, increment := range cd.Increments { - account, found := accountsCache[increment.BalanceInfo.AccountId] - if !found { - if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountId); err == nil && acc != nil { - account = acc - accountsCache[increment.BalanceInfo.AccountId] = account - defer accountingStorage.SetAccount(account) - } - } - account.refundIncrement(increment, cd, true) + accMap[increment.BalanceInfo.AccountId] = struct{}{} } - return 0.0, err + for key := range accMap { + accountIDs = append(accountIDs, key) + } + // start increment refunding loop + Guardian.Guard(func() (interface{}, error) { + accountsCache := make(map[string]*Account) + for _, increment := range cd.Increments { + account, found := accountsCache[increment.BalanceInfo.AccountId] + if !found { + if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountId); err == nil && acc != nil { + account = acc + accountsCache[increment.BalanceInfo.AccountId] = account + // will save the account only once at the end of the function + defer accountingStorage.SetAccount(account) + } + } + if account == nil { + utils.Logger.Warning(fmt.Sprintf("Could not get the account to be refunded: %s", increment.BalanceInfo.AccountId)) + continue + } + //utils.Logger.Info(fmt.Sprintf("Refunding increment %+v", increment)) + account.refundIncrement(increment, cd, true) + } + return 0, err + }, 0, accountIDs...) + return 0, err } func (cd *CallDescriptor) FlushCache() (err error) { diff --git a/engine/responder.go b/engine/responder.go index 00a605356..6a9955e4c 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -220,10 +220,7 @@ func (rs *Responder) RefundIncrements(arg *CallDescriptor, reply *float64) (err if rs.Bal != nil { *reply, err = rs.callMethod(arg, "Responder.RefundIncrements") } else { - r, e := Guardian.Guard(func() (interface{}, error) { - return arg.RefundIncrements() - }, 0, arg.GetAccountKey()) - *reply, err = r.(float64), e + *reply, err = arg.RefundIncrements() } rs.getCache().Cache(cacheKey, &cache2go.CacheItem{Value: reply, Err: err}) return diff --git a/engine/units_counter_test.go b/engine/units_counter_test.go index be8b3a49c..f136c23b7 100644 --- a/engine/units_counter_test.go +++ b/engine/units_counter_test.go @@ -246,47 +246,46 @@ func TestUnitCountersCountAllVoiceDestinationEvent(t *testing.T) { } } -/* func TestUnitCountersResetCounterById(t *testing.T) { a := &Account{ ActionTriggers: ActionTriggers{ &ActionTrigger{ - ID: "TestTR1", + UniqueID: "TestTR1", ThresholdType: utils.TRIGGER_MAX_EVENT_COUNTER, BalanceType: utils.MONETARY, BalanceDirections: utils.NewStringMap(utils.OUT, utils.IN), BalanceWeight: 10, }, &ActionTrigger{ - ID: "TestTR11", + UniqueID: "TestTR11", ThresholdType: utils.TRIGGER_MAX_EVENT_COUNTER, BalanceType: utils.MONETARY, BalanceDirections: utils.NewStringMap(utils.OUT, utils.IN), BalanceWeight: 10, }, &ActionTrigger{ - ID: "TestTR2", + UniqueID: "TestTR2", ThresholdType: utils.TRIGGER_MAX_EVENT_COUNTER, BalanceType: utils.VOICE, BalanceDirections: utils.NewStringMap(utils.OUT, utils.IN), BalanceWeight: 10, }, &ActionTrigger{ - ID: "TestTR3", + UniqueID: "TestTR3", ThresholdType: utils.TRIGGER_MAX_BALANCE_COUNTER, BalanceType: utils.VOICE, BalanceDirections: utils.NewStringMap(utils.OUT, utils.IN), BalanceWeight: 10, }, &ActionTrigger{ - ID: "TestTR4", + UniqueID: "TestTR4", ThresholdType: utils.TRIGGER_MAX_BALANCE_COUNTER, BalanceType: utils.SMS, BalanceDirections: utils.NewStringMap(utils.OUT, utils.IN), BalanceWeight: 10, }, &ActionTrigger{ - ID: "TestTR5", + UniqueID: "TestTR5", ThresholdType: utils.TRIGGER_MAX_BALANCE, BalanceType: utils.SMS, BalanceDirections: utils.NewStringMap(utils.OUT, utils.IN), @@ -328,4 +327,3 @@ func TestUnitCountersResetCounterById(t *testing.T) { t.Errorf("Error Initializing adding unit counters: %v", len(a.UnitCounters)) } } -*/ diff --git a/glide.lock b/glide.lock index 94e964ee3..57fb59cd8 100644 --- a/glide.lock +++ b/glide.lock @@ -1,12 +1,17 @@ +<<<<<<< HEAD hash: 0ca45753122a2e205a1b401e7f38b17e58ea22d3b105894604e504aeace503cb updated: 2015-12-24T18:29:52.686317738+02:00 +======= +hash: 330fc999239d5766f033409be2335338cfd172d0dcf18ad752b8613f45e9f451 +updated: 2016-01-06T13:35:12.445693385+02:00 +>>>>>>> master imports: - name: github.com/cenkalti/hub version: 57d753b5f4856e77b3cf8ecce78c97215a7d324d - name: github.com/cenkalti/rpc2 version: 2d1be381ce47537e9e076b2b76dc70933162e4e9 - name: github.com/cgrates/fsock - version: f0d40ceb94f8bd15223a05466fe221df20ff5444 + version: c3b1d274ae0e42742ba1bce2bf3a138d72fb82ee - name: github.com/cgrates/kamevapi version: a376b1f937ba959857929fa3e111c0f3243278c0 - name: github.com/cgrates/osipsdagram @@ -24,11 +29,11 @@ imports: - diam/dict - diam/sm - name: github.com/go-sql-driver/mysql - version: d512f204a577a4ab037a1816604c48c9c13210be + version: bb006fd699a123d3eb514561dbefc352e978949d - name: github.com/gorhill/cronexpr version: a557574d6c024ed6e36acc8b610f5f211c91568a - name: github.com/jinzhu/gorm - version: 4a821a5beff551a4f445283fc232e892e2cff324 + version: 2f7811c55f286c55cfc3a2aefb5c4049b9cd5214 - name: github.com/jinzhu/inflection version: 3272df6c21d04180007eb3349844c89a3856bc25 repo: https://github.com/jinzhu/inflection @@ -48,9 +53,9 @@ imports: subpackages: - /codec - name: golang.org/x/crypto - version: f18420efc3b4f8e9f3d51f6bd2476e92c46260e9 + version: 552e9d568fde9701ea1944fb01c8aadaceaa7353 - name: golang.org/x/net - version: ea6dba8c93880aa07d6ebed83c3c680cd9faa63a + version: 961116aeebe66bfb58bb4d51818c70d256acbbb8 subpackages: - /websocket - name: golang.org/x/text diff --git a/glide.yaml b/glide.yaml index 3bd347199..d69b5ba46 100644 --- a/glide.yaml +++ b/glide.yaml @@ -16,6 +16,7 @@ import: - package: github.com/go-sql-driver/mysql - package: github.com/gorhill/cronexpr - package: github.com/jinzhu/gorm +- package: github.com/jinzhu/inflection - package: github.com/kr/pty - package: github.com/lib/pq - package: github.com/mediocregopher/radix.v2 diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 3bd1bc215..6332bbe1d 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -246,7 +246,7 @@ func (sm *FSSessionManager) Connect() error { errChan <- err } }() - if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Server, connCfg.Password, 1, + if fsSenderPool, err := fsock.NewFSockPool(5, connCfg.Server, connCfg.Password, 1, sm.cfg.MaxWaitConnection, make(map[string][]func(string, string)), make(map[string]string), utils.Logger.(*syslog.Writer), connId); err != nil { return fmt.Errorf("Cannot connect FreeSWITCH senders pool, error: %s", err.Error()) } else if fsSenderPool == nil { @@ -358,21 +358,27 @@ application_data:+10800 alloted_timeout uuid:3427e500-10e5-4864-a589-e306b70419a */ func (sm *FSSessionManager) SyncSessions() error { for connId, senderPool := range sm.senderPools { + var aChans []map[string]string fsConn, err := senderPool.PopFSock() if err != nil { - utils.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error())) - continue - } - activeChanStr, err := fsConn.SendApiCmd("show channels") - senderPool.PushFSock(fsConn) - if err != nil { - utils.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error())) - continue - } - aChans := fsock.MapChanData(activeChanStr) - if len(aChans) == 0 && strings.HasPrefix(activeChanStr, "uuid,direction") { // Failed converting output from FS - utils.Logger.Err(fmt.Sprintf(" Syncing active calls, failed converting output from FS: %s", activeChanStr)) - continue + if err == fsock.ErrConnectionPoolTimeout { // Timeout waiting for connections to re-establish, cleanup calls + aChans = make([]map[string]string, 0) // Emulate no call information so we can disconnect bellow + } else { + utils.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error())) + continue + } + } else { + activeChanStr, err := fsConn.SendApiCmd("show channels") + senderPool.PushFSock(fsConn) + if err != nil { + utils.Logger.Err(fmt.Sprintf(" Error on syncing active calls, senderPool: %+v, error: %s", senderPool, err.Error())) + continue + } + aChans = fsock.MapChanData(activeChanStr) + if len(aChans) == 0 && strings.HasPrefix(activeChanStr, "uuid,direction") { // Failed converting output from FS + utils.Logger.Err(fmt.Sprintf(" Syncing active calls, failed converting output from FS: %s", activeChanStr)) + continue + } } for _, session := range sm.sessions.getSessions() { if session.connId != connId { // This session belongs to another connectionId diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 30f36905c..16523bc3e 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -202,6 +202,7 @@ func (s *Session) Refund(lastCC *engine.CallCost, hangupTime time.Time) error { Subject: lastCC.Subject, Account: lastCC.Account, Destination: lastCC.Destination, + TOR: lastCC.TOR, Increments: refundIncrements, } var response float64 diff --git a/sessionmanager/smg_it_test.go b/sessionmanager/smg_it_test.go new file mode 100644 index 000000000..acf495e1f --- /dev/null +++ b/sessionmanager/smg_it_test.go @@ -0,0 +1,237 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can Storagetribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be u297seful, +but WITH*out ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package sessionmanager + +import ( + "flag" + "net/rpc" + "net/rpc/jsonrpc" + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var testIntegration = flag.Bool("integration", false, "Perform the tests in integration mode, not by default.") // This flag will be passed here via "go test -local" args +var waitRater = flag.Int("wait_rater", 150, "Number of miliseconds to wait for rater to start and cache") +var dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") + +var daCfgPath string +var daCfg *config.CGRConfig +var smgRPC *rpc.Client +var err error + +func TestSMGInitCfg(t *testing.T) { + if !*testIntegration { + return + } + daCfgPath = path.Join(*dataDir, "conf", "samples", "smg") + // Init config first + var err error + daCfg, err = config.NewCGRConfigFromFolder(daCfgPath) + if err != nil { + t.Error(err) + } + daCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(daCfg) +} + +// Remove data in both rating and accounting db +func TestSMGResetDataDb(t *testing.T) { + if !*testIntegration { + return + } + if err := engine.InitDataDb(daCfg); err != nil { + t.Fatal(err) + } +} + +// Wipe out the cdr database +func TestSMGResetStorDb(t *testing.T) { + if !*testIntegration { + return + } + if err := engine.InitStorDb(daCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func TestSMGStartEngine(t *testing.T) { + if !*testIntegration { + return + } + if _, err := engine.StopStartEngine(daCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestSMGApierRpcConn(t *testing.T) { + if !*testIntegration { + return + } + var err error + smgRPC, err = jsonrpc.Dial("tcp", daCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +// Load the tariff plan, creating accounts and their balances +func TestSMGTPFromFolder(t *testing.T) { + if !*testIntegration { + return + } + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")} + var loadInst engine.LoadInstance + if err := smgRPC.Call("ApierV2.LoadTariffPlanFromFolder", attrs, &loadInst); err != nil { + t.Error(err) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups +} + +func TestSMGMonetaryRefound(t *testing.T) { + if !*testIntegration { + return + } + smgEv := SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: utils.VOICE, + utils.ACCID: "12345", + utils.DIRECTION: utils.OUT, + utils.ACCOUNT: "1001", + utils.SUBJECT: "1001", + utils.DESTINATION: "1004", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: utils.META_PREPAID, + utils.SETUP_TIME: "2016-01-05 18:30:49", + utils.ANSWER_TIME: "2016-01-05 18:31:05", + utils.USAGE: "1m30s", + } + var maxUsage float64 + if err := smgRPC.Call("SMGenericV1.SessionStart", smgEv, &maxUsage); err != nil { + t.Error(err) + } + if maxUsage != 90 { + t.Error("Bad max usage: ", maxUsage) + } + var acnt *engine.Account + attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} + eAcntVal := 8.699800 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.MONETARY].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.MONETARY].GetTotalValue()) + } + smgEv = SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: utils.VOICE, + utils.ACCID: "12345", + utils.DIRECTION: utils.OUT, + utils.ACCOUNT: "1001", + utils.SUBJECT: "1001", + utils.DESTINATION: "1004", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: utils.META_PREPAID, + utils.SETUP_TIME: "2016-01-05 18:30:49", + utils.ANSWER_TIME: "2016-01-05 18:31:05", + utils.USAGE: "1m", + } + var rpl string + if err = smgRPC.Call("SMGenericV1.SessionEnd", smgEv, &rpl); err != nil || rpl != utils.OK { + t.Error(err) + } + + attrs = &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} + eAcntVal = 8.733201 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.MONETARY].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.MONETARY].GetTotalValue()) + } +} + +func TestSMGVoiceRefound(t *testing.T) { + if !*testIntegration { + return + } + smgEv := SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: utils.VOICE, + utils.ACCID: "12345", + utils.DIRECTION: utils.OUT, + utils.ACCOUNT: "1001", + utils.SUBJECT: "1001", + utils.DESTINATION: "1003", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: utils.META_PREPAID, + utils.SETUP_TIME: "2016-01-05 18:30:49", + utils.ANSWER_TIME: "2016-01-05 18:31:05", + utils.USAGE: "1m30s", + } + var maxUsage float64 + if err := smgRPC.Call("SMGenericV1.SessionStart", smgEv, &maxUsage); err != nil { + t.Error(err) + } + if maxUsage != 90 { + t.Error("Bad max usage: ", maxUsage) + } + var acnt *engine.Account + attrs := &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} + eAcntVal := 120.0 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } + smgEv = SMGenericEvent{ + utils.EVENT_NAME: "TEST_EVENT", + utils.TOR: utils.VOICE, + utils.ACCID: "12345", + utils.DIRECTION: utils.OUT, + utils.ACCOUNT: "1001", + utils.SUBJECT: "1001", + utils.DESTINATION: "1003", + utils.CATEGORY: "call", + utils.TENANT: "cgrates.org", + utils.REQTYPE: utils.META_PREPAID, + utils.SETUP_TIME: "2016-01-05 18:30:49", + utils.ANSWER_TIME: "2016-01-05 18:31:05", + utils.USAGE: "1m", + } + var rpl string + if err = smgRPC.Call("SMGenericV1.SessionEnd", smgEv, &rpl); err != nil || rpl != utils.OK { + t.Error(err) + } + + attrs = &utils.AttrGetAccount{Tenant: "cgrates.org", Account: "1001"} + eAcntVal = 150.0 + if err := smgRPC.Call("ApierV2.GetAccount", attrs, &acnt); err != nil { + t.Error(err) + } else if acnt.BalanceMap[utils.VOICE].GetTotalValue() != eAcntVal { + t.Errorf("Expected: %f, received: %f", eAcntVal, acnt.BalanceMap[utils.VOICE].GetTotalValue()) + } +} diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index c12e1e5c8..2fa99c2db 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -153,6 +153,7 @@ func (self *SMGSession) refund(refundDuration time.Duration) error { Subject: lastCC.Subject, Account: lastCC.Account, Destination: lastCC.Destination, + TOR: lastCC.TOR, Increments: refundIncrements, } var response float64 diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index ede6f4b3f..81143d87d 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -31,7 +31,10 @@ import ( "github.com/cgrates/rpcclient" ) +var ErrPartiallyExecuted = errors.New("Partially executed") + func NewSMGeneric(cgrCfg *config.CGRConfig, rater rpcclient.RpcClientConnection, cdrsrv rpcclient.RpcClientConnection, timezone string, extconns *SMGExternalConnections) *SMGeneric { + gsm := &SMGeneric{cgrCfg: cgrCfg, rater: rater, cdrsrv: cdrsrv, extconns: extconns, timezone: timezone, sessions: make(map[string][]*SMGSession), sessionsMux: new(sync.Mutex), guard: engine.NewGuardianLock()} return gsm @@ -199,6 +202,40 @@ func (self *SMGeneric) SessionEnd(gev SMGenericEvent, clnt *rpc2.Client) error { return nil } +// Processes one time events (eg: SMS) +func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) error { + var sessionRuns []*engine.SessionRun + if err := self.rater.Call("Responder.GetSessionRuns", gev.AsStoredCdr(self.cgrCfg, self.timezone), &sessionRuns); err != nil { + return err + } else if len(sessionRuns) == 0 { + return nil + } + var withErrors bool + for _, sR := range sessionRuns { + cc := new(engine.CallCost) + if err := self.rater.Call("Responder.Debit", sR.CallDescriptor, cc); err != nil { + withErrors = true + utils.Logger.Err(fmt.Sprintf(" Could not Debit CD: %+v, RunID: %s, error: %s", sR.CallDescriptor, sR.DerivedCharger.RunID, err.Error())) + continue + } + var reply string + if err := self.cdrsrv.Call("CdrServer.LogCallCost", &engine.CallCostLog{ + CgrId: gev.GetCgrId(self.timezone), + Source: utils.SESSION_MANAGER_SOURCE, + RunId: sR.DerivedCharger.RunID, + CallCost: cc, + CheckDuplicate: true, + }, &reply); err != nil && err != utils.ErrExists { + withErrors = true + utils.Logger.Err(fmt.Sprintf(" Could not save CC: %+v, RunID: %s error: %s", cc, sR.DerivedCharger.RunID, err.Error())) + } + } + if withErrors { + return ErrPartiallyExecuted + } + return nil +} + func (self *SMGeneric) ProcessCdr(gev SMGenericEvent) error { var reply string if err := self.cdrsrv.Call("CdrServer.ProcessCdr", gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil { diff --git a/utils/rsrfield_test.go b/utils/rsrfield_test.go index 65841d0ee..1da60b703 100644 --- a/utils/rsrfield_test.go +++ b/utils/rsrfield_test.go @@ -265,6 +265,9 @@ func TestRSRFilterPass(t *testing.T) { if fltr.Pass("full_match1") { t.Error("Passing!") } + if fltr.Pass("") { + t.Error("Passing!") + } fltr, err = NewRSRFilter("^prefixMatch") // Prefix pass if err != nil { t.Error(err)