diff --git a/agents/dmtagent.go b/agents/dmtagent.go index 3e034bf97..2f5c54cfc 100644 --- a/agents/dmtagent.go +++ b/agents/dmtagent.go @@ -65,19 +65,44 @@ func (self *DiameterAgent) handlers() diam.Handler { return dSM } -func (self *DiameterAgent) handleCCR(c diam.Conn, m *diam.Message) { - //utils.Logger.Warning(fmt.Sprintf(" Received CCR message from %s:\n%s", c.RemoteAddr(), m)) - var ccr CCR - if err := m.Unmarshal(&ccr); err != nil { - utils.Logger.Err(fmt.Sprintf(" Unmarshaling message: %s, error: %s", m, err)) - return - } - ccr.diamMessage = m // Save it for later searches inside AVPs - cca := NewCCAFromCCR(&ccr) +func (self *DiameterAgent) processCCR(ccr *CCR) (*CCA, error) { + cca := NewCCAFromCCR(ccr) cca.OriginHost = self.cgrCfg.DiameterAgentCfg().OriginHost cca.OriginRealm = self.cgrCfg.DiameterAgentCfg().OriginRealm cca.GrantedServiceUnit.CCTime = 300 cca.ResultCode = diam.Success + return cca, nil +} + +func (self *DiameterAgent) handleCCR(c diam.Conn, m *diam.Message) { + ccr, err := NewCCRFromDiameterMessage(m) + if err != nil { + utils.Logger.Err(fmt.Sprintf(" Unmarshaling message: %s, error: %s", m, err)) + return + } + var cca *CCA // For now we simply overload in loop, maybe we will find some other use of this + for _, reqProcessor := range self.cgrCfg.DiameterAgentCfg().RequestProcessors { + passesAllFilters := true + for _, fldFilter := range reqProcessor.RequestFilter { + if !ccr.passesFieldFilter(fldFilter) { + passesAllFilters = false + } + } + if !passesAllFilters { // Not going with this processor further + continue + } + cca, err = self.processCCR(ccr) + if !reqProcessor.ContinueOnSuccess { + break + } + } + if err != nil { + utils.Logger.Err(fmt.Sprintf(" Failed to generate CCA, error: %s", err.Error())) + return + } else if cca == nil { + utils.Logger.Err(fmt.Sprintf(" No request processor enabled for CCR: %+v, ignoring request", ccr)) + return + } if dmtA, err := cca.AsDiameterMessage(); err != nil { utils.Logger.Err(fmt.Sprintf(" Failed to convert cca as diameter message, error: %s", err.Error())) return @@ -85,7 +110,6 @@ func (self *DiameterAgent) handleCCR(c diam.Conn, m *diam.Message) { utils.Logger.Err(fmt.Sprintf(" Failed to write message to %s: %s\n%s\n", c.RemoteAddr(), err, dmtA)) return } - } func (self *DiameterAgent) handleALL(c diam.Conn, m *diam.Message) { diff --git a/agents/libdmt.go b/agents/libdmt.go index 5f0aaa5f3..5ac2c0951 100644 --- a/agents/libdmt.go +++ b/agents/libdmt.go @@ -143,6 +143,15 @@ func storedCdrToCCR(cdr *engine.StoredCdr, originHost, originRealm string, vendo return ccr } +func NewCCRFromDiameterMessage(m *diam.Message) (*CCR, error) { + var ccr CCR + if err := m.Unmarshal(&ccr); err != nil { + return nil, err + } + ccr.diamMessage = m + return &ccr, nil +} + // CallControl Request type CCR struct { SessionId string `avp:"Session-Id"` @@ -332,6 +341,14 @@ func avpValAsString(a *diam.AVP) string { return dataVal[startIdx+1 : endIdx] } +// Follows the implementation in the StorCdr +func (self *CCR) passesFieldFilter(fieldFilter *utils.RSRField) bool { + if fieldFilter == nil { + return true + } + return fieldFilter.FilterPasses(self.eventFieldValue(utils.RSRFields{fieldFilter})) +} + // Handler for meta functions func (self *CCR) metaHandler(tag, arg string) (string, error) { switch tag { @@ -342,9 +359,9 @@ func (self *CCR) metaHandler(tag, arg string) (string, error) { return "", nil } -func (self *CCR) eventFieldValue(cfgFld *config.CfgCdrField) string { +func (self *CCR) eventFieldValue(fldTpl utils.RSRFields) string { var outVal string - for _, rsrTpl := range cfgFld.Value { + for _, rsrTpl := range fldTpl { if rsrTpl.IsStatic() { outVal += rsrTpl.ParseValue("") } else { @@ -387,7 +404,7 @@ func (self *CCR) AsSMGenericEvent(cfgFlds []*config.CfgCdrField) (sessionmanager utils.Logger.Warning(fmt.Sprintf(" Ignoring processing of metafunction: %s, error: %s", cfgFld.HandlerId, err.Error())) } case utils.META_COMPOSED: - outVal = self.eventFieldValue(cfgFld) + outVal = self.eventFieldValue(cfgFld.Value) } fmtOut := outVal if fmtOut, err = utils.FmtFieldWidth(outVal, cfgFld.Width, cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil { diff --git a/config/config_defaults.go b/config/config_defaults.go index 522e564f9..53753034e 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -273,7 +273,7 @@ const CGRATES_CFG_JSON = ` { "id": "*default", // formal identifier of this processor "dry_run": false, // do not send the CDRs to CDRS, just parse them - "request_filter": "Subscription-Id>Subscription-Type(0)", // filter requests processed by this processor + "request_filter": "Subscription-Id>Subscription-Id-Type(0)", // filter requests processed by this processor "continue_on_success": false, // continue to the next template if executed "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "tor", "field_id": "TOR", "type": "*composed", "value": "^*voice", "mandatory": true}, diff --git a/config/config_json_test.go b/config/config_json_test.go index 39cdfd5b5..2ab9edcfa 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -429,7 +429,7 @@ func TestDiameterAgentJsonCfg(t *testing.T) { &DARequestProcessorJsnCfg{ Id: utils.StringPointer("*default"), Dry_run: utils.BoolPointer(false), - Request_filter: utils.StringPointer("Subscription-Id>Subscription-Type(0)"), + Request_filter: utils.StringPointer("Subscription-Id>Subscription-Id-Type(0)"), Continue_on_success: utils.BoolPointer(false), Content_fields: &[]*CdrFieldJsonCfg{ &CdrFieldJsonCfg{Tag: utils.StringPointer("tor"), Field_id: utils.StringPointer(utils.TOR), Type: utils.StringPointer(utils.META_COMPOSED), diff --git a/data/conf/samples/dmtagent/cgrates.json b/data/conf/samples/dmtagent/cgrates.json index 5a71f17fb..210a91b55 100644 --- a/data/conf/samples/dmtagent/cgrates.json +++ b/data/conf/samples/dmtagent/cgrates.json @@ -57,24 +57,24 @@ //"dictionaries_dir": "", // path towards directory holding additional dictionaries to load "request_processors": [ { - "id": "*default", // Identifier of this processor + "id": "*default", // formal identifier of this processor "dry_run": false, // do not send the CDRs to CDRS, just parse them - "request_filter": "Subscription-Id>Subscription-Type(0)", // filter requests processed by this processor + "request_filter": "Subscription-Id>Subscription-Id-Type(0)", // filter requests processed by this processor "continue_on_success": false, // continue to the next template if executed "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value - {"tag": "tor", "cdr_field_id": "TOR", "type": "cdrfield", "value": "^*voice", "mandatory": true}, - {"tag": "accid", "cdr_field_id": "AccId", "type": "cdrfield", "value": "Session-Id", "mandatory": true}, - {"tag": "reqtype", "cdr_field_id": "ReqType", "type": "cdrfield", "value": "^*users", "mandatory": true}, - {"tag": "direction", "cdr_field_id": "Direction", "type": "cdrfield", "value": "^*out", "mandatory": true}, - {"tag": "tenant", "cdr_field_id": "Tenant", "type": "cdrfield", "value": "^*users", "mandatory": true}, - {"tag": "category", "cdr_field_id": "Category", "type": "cdrfield", "value": "^call_;~Calling-Vlr-Number:s/^$/33000/;~Calling-Vlr-Number:s/^(\\d{5})/${1}/", "mandatory": true}, - {"tag": "account", "cdr_field_id": "Account", "type": "cdrfield", "value": "^*users", "mandatory": true}, - {"tag": "subject", "cdr_field_id": "Subject", "type": "cdrfield", "value": "^*users", "mandatory": true}, - {"tag": "destination", "cdr_field_id": "Destination", "type": "cdrfield", "value": "Real-Called-Number", "mandatory": true}, - {"tag": "setup_time", "cdr_field_id": "SetupTime", "type": "cdrfield", "value": "Event-Time", "mandatory": true}, - {"tag": "answer_time", "cdr_field_id": "AnswerTime", "type": "cdrfield", "value": "Event-Time", "mandatory": true}, - {"tag": "usage", "cdr_field_id": "Usage", "type": "cdrfield", "value": "CC-Time", "mandatory": true}, - {"tag": "subscriber_id", "cdr_field_id": "SubscriberId", "type": "cdrfield", "value": "Subscription-Id>Subscription-Id-Data", "mandatory": true}, + {"tag": "tor", "field_id": "TOR", "type": "*composed", "value": "^*voice", "mandatory": true}, + {"tag": "accid", "field_id": "AccId", "type": "*composed", "value": "Session-Id", "mandatory": true}, + {"tag": "reqtype", "field_id": "ReqType", "type": "*composed", "value": "^*users", "mandatory": true}, + {"tag": "direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true}, + {"tag": "tenant", "field_id": "Tenant", "type": "*composed", "value": "^*users", "mandatory": true}, + {"tag": "category", "field_id": "Category", "type": "*composed", "value": "^call_;~Service-Information>IN-Information>Calling-Vlr-Number:s/^$/33000/;~Service-Information>IN-Information>Calling-Vlr-Number:s/^(\\d{5})/${1}/", "mandatory": true}, + {"tag": "account", "field_id": "Account", "type": "*composed", "value": "^*users", "mandatory": true}, + {"tag": "subject", "field_id": "Subject", "type": "*composed", "value": "^*users", "mandatory": true}, + {"tag": "destination", "field_id": "Destination", "type": "*composed", "value": "Service-Information>IN-Information>Real-Called-Number", "mandatory": true}, + {"tag": "setup_time", "field_id": "SetupTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true}, + {"tag": "answer_time", "field_id": "AnswerTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true}, + {"tag": "usage", "field_id": "Usage", "type": "*composed", "value": "Requested-Service-Unit>CC-Time", "mandatory": true}, + {"tag": "subscriber_id", "field_id": "SubscriberId", "type": "*composed", "value": "Subscription-Id>Subscription-Id-Data", "mandatory": true}, ], }, ],