From 6ed45a52fe85a4737374edb0aec5f3850afe4cb4 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 8 Feb 2015 20:53:56 +0100 Subject: [PATCH] RSRField with filterValue support, CDRC implementing instances based on processed folder, import filters --- apier/v1/apier_local_test.go | 4 +- cdrc/cdrc.go | 66 ++++++++++++++++++------ cdrc/cdrc_local_test.go | 16 ++++-- cdrc/cdrc_test.go | 16 +++--- cdre/cdrexporter.go | 6 +-- cdre/cdrexporter_test.go | 8 +-- cmd/cgr-engine/cgr-engine.go | 20 ++++--- config/cdrcconfig.go | 30 +++++++---- config/cfgcdrfield.go | 24 ++++----- config/config.go | 26 ++++++---- config/config_defaults.go | 1 + config/config_json_test.go | 1 + config/libconfig_json.go | 3 +- general_tests/multiplecdrc_local_test.go | 22 ++++---- utils/consts.go | 2 + utils/rsrfield.go | 26 +++++++--- utils/rsrfield_test.go | 18 +++++++ 17 files changed, 194 insertions(+), 95 deletions(-) diff --git a/apier/v1/apier_local_test.go b/apier/v1/apier_local_test.go index b63798d6c..1dae33ff8 100644 --- a/apier/v1/apier_local_test.go +++ b/apier/v1/apier_local_test.go @@ -79,8 +79,8 @@ func TestCreateDirs(t *testing.T) { if !*testLocal { return } - for _, pathDir := range []string{cfg.CdreProfiles[utils.META_DEFAULT].ExportDir, cfg.CdrcProfiles[utils.META_DEFAULT].CdrInDir, cfg.CdrcProfiles[utils.META_DEFAULT].CdrOutDir, - cfg.HistoryDir} { + for _, pathDir := range []string{cfg.CdreProfiles[utils.META_DEFAULT].ExportDir, "/var/log/cgrates/cdrc/in", "/var/log/cgrates/cdrc/out", cfg.HistoryDir} { + if err := os.RemoveAll(pathDir); err != nil { t.Fatal("Error removing folder: ", pathDir, err) } diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index b7fbcfc90..d66bfea3e 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -1,6 +1,6 @@ /* Real-time Charging System for Telecom & ISP environments -Copyright (C) 2012-2014 ITsysCOM GmbH +Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -81,10 +81,22 @@ func populateStoredCdrField(cdr *utils.StoredCdr, fieldId, fieldVal string) erro return nil } -func NewCdrc(cdrcCfg *config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CDRS, exitChan chan struct{}) (*Cdrc, error) { +func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CDRS, exitChan chan struct{}) (*Cdrc, error) { + var cdrcCfg *config.CdrcConfig + for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one + break + } cdrc := &Cdrc{cdrsAddress: cdrcCfg.CdrsAddress, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir, cdrSourceId: cdrcCfg.CdrSourceId, runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator, duMultiplyFactor: cdrcCfg.DataUsageMultiplyFactor, - cdrFields: cdrcCfg.CdrFields, httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan} + httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan} + cdrc.cdrFilters = make([]utils.RSRFields, len(cdrcCfgs)) + cdrc.cdrFields = make([][]*config.CfgCdrField, len(cdrcCfgs)) + idx := 0 + for _, cfg := range cdrcCfgs { + cdrc.cdrFilters[idx] = cfg.CdrFilter + cdrc.cdrFields[idx] = cfg.CdrFields + idx += 1 + } // Before processing, make sure in and out folders exist for _, dir := range []string{cdrc.cdrInDir, cdrc.cdrOutDir} { if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { @@ -104,7 +116,8 @@ type Cdrc struct { runDelay time.Duration csvSep rune duMultiplyFactor float64 - cdrFields []*config.CfgCdrField + cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes + cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters httpSkipTlsCheck bool cdrServer *engine.CDRS // Reference towards internal cdrServer if that is the case httpClient *http.Client @@ -202,20 +215,39 @@ func (self *Cdrc) processFile(filePath string) error { engine.Logger.Err(fmt.Sprintf(" Row %d - csv error: %s", procRowNr, err.Error())) continue // Other csv related errors, ignore } - storedCdr, err := self.recordToStoredCdr(record) - if err != nil { - engine.Logger.Err(fmt.Sprintf(" Row %d - failed converting to StoredCdr, error: %s", procRowNr, err.Error())) - continue - } - if self.cdrsAddress == utils.INTERNAL { - if err := self.cdrServer.ProcessCdr(storedCdr); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", procRowNr, err.Error())) + recordCdrs := make([]*utils.StoredCdr, 0) // More CDRs based on the number of filters and field templates + for idx, cdrFieldsInst := range self.cdrFields { + // Make sure filters are matching + filterBreak := false + for _, rsrFilter := range self.cdrFilters[idx] { + if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx { + return fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter) + } else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) { + filterBreak = true + break + } + } + if filterBreak { // Stop importing cdrc fields profile due to non matching filter continue } - } else { // CDRs listening on IP - if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cdrsAddress), storedCdr.AsHttpForm()); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", procRowNr, err.Error())) + if storedCdr, err := self.recordToStoredCdr(record, cdrFieldsInst); err != nil { + engine.Logger.Err(fmt.Sprintf(" Row %d - failed converting to StoredCdr, error: %s", procRowNr, err.Error())) continue + } else { + recordCdrs = append(recordCdrs, storedCdr) + } + } + for _, storedCdr := range recordCdrs { + if self.cdrsAddress == utils.INTERNAL { + if err := self.cdrServer.ProcessCdr(storedCdr); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", procRowNr, err.Error())) + continue + } + } else { // CDRs listening on IP + if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cdrsAddress), storedCdr.AsHttpForm()); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", procRowNr, err.Error())) + continue + } } } } @@ -231,11 +263,11 @@ func (self *Cdrc) processFile(filePath string) error { } // Takes the record out of csv and turns it into http form which can be posted -func (self *Cdrc) recordToStoredCdr(record []string) (*utils.StoredCdr, error) { +func (self *Cdrc) recordToStoredCdr(record []string, cdrFields []*config.CfgCdrField) (*utils.StoredCdr, error) { storedCdr := &utils.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceId, ExtraFields: make(map[string]string), Cost: -1} var err error var lazyHttpFields []*config.CfgCdrField - for _, cdrFldCfg := range self.cdrFields { + for _, cdrFldCfg := range cdrFields { var fieldVal string if utils.IsSliceMember([]string{CSV, FS_CSV}, self.CdrFormat) { if cdrFldCfg.Type == utils.CDRFIELD { diff --git a/cdrc/cdrc_local_test.go b/cdrc/cdrc_local_test.go index e6583710f..867147a9b 100644 --- a/cdrc/cdrc_local_test.go +++ b/cdrc/cdrc_local_test.go @@ -49,6 +49,7 @@ README: var cfgPath string var cfg *config.CGRConfig +var cdrcCfgs map[string]*config.CdrcConfig var cdrcCfg *config.CdrcConfig var testLocal = flag.Bool("local", false, "Perform the tests only on local test environment, not by default.") // This flag will be passed here via "go test -local" args @@ -99,7 +100,7 @@ func TestLoadConfigt(*testing.T) { cfgPath = path.Join(*dataDir, "conf", "samples", "apier") cfg, _ = config.NewCGRConfigFromFolder(cfgPath) if len(cfg.CdrcProfiles) > 0 { - cdrcCfg = cfg.CdrcProfiles[utils.META_DEFAULT] + cdrcCfgs = cfg.CdrcProfiles["/var/log/cgrates/cdrc/in"] } } @@ -134,9 +135,12 @@ func TestCreateCdrFiles(t *testing.T) { if !*testLocal { return } - if cdrcCfg == nil { + if cdrcCfgs == nil { t.Fatal("Empty default cdrc configuration") } + for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one + break + } if err := os.RemoveAll(cdrcCfg.CdrInDir); err != nil { t.Fatal("Error removing folder: ", cdrcCfg.CdrInDir, err) } @@ -162,13 +166,17 @@ func TestProcessCdrDir(t *testing.T) { if !*testLocal { return } + var cdrcCfg *config.CdrcConfig + for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one + break + } if cdrcCfg.CdrsAddress == utils.INTERNAL { // For now we only test over network cdrcCfg.CdrsAddress = "127.0.0.1:2013" } if err := startEngine(); err != nil { t.Fatal(err.Error()) } - cdrc, err := NewCdrc(cdrcCfg, true, nil, make(chan struct{})) + cdrc, err := NewCdrc(cdrcCfgs, true, nil, make(chan struct{})) if err != nil { t.Fatal(err.Error()) } @@ -204,7 +212,7 @@ func TestProcessCdr3Dir(t *testing.T) { if err := startEngine(); err != nil { t.Fatal(err.Error()) } - cdrc, err := NewCdrc(cdrcCfg, true, nil, make(chan struct{})) + cdrc, err := NewCdrc(cdrcCfgs, true, nil, make(chan struct{})) if err != nil { t.Fatal(err.Error()) } diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index c9db55b69..ca7972726 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -29,17 +29,17 @@ import ( func TestRecordForkCdr(t *testing.T) { cgrConfig, _ := config.NewDefaultCGRConfig() - cdrcConfig := cgrConfig.CdrcProfiles[utils.META_DEFAULT] + cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT] cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "SupplierTest", Type: utils.CDRFIELD, CdrFieldId: "supplier", Value: []*utils.RSRField{&utils.RSRField{Id: "14"}}}) - cdrc := &Cdrc{CdrFormat: CSV, cdrSourceId: "TEST_CDRC", cdrFields: cdrcConfig.CdrFields} + cdrc := &Cdrc{CdrFormat: CSV, cdrSourceId: "TEST_CDRC", cdrFields: [][]*config.CfgCdrField{cdrcConfig.CdrFields}} cdrRow := []string{"firstField", "secondField"} - _, err := cdrc.recordToStoredCdr(cdrRow) + _, err := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]) if err == nil { t.Error("Failed to corectly detect missing fields from record") } cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", "prepaid", "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", "2013-02-03 19:50:00", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"} - rtCdr, err := cdrc.recordToStoredCdr(cdrRow) + rtCdr, err := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]) if err != nil { t.Error("Failed to parse CDR in rated cdr", err) } @@ -70,9 +70,9 @@ func TestRecordForkCdr(t *testing.T) { func TestDataMultiplyFactor(t *testing.T) { cdrFields := []*config.CfgCdrField{&config.CfgCdrField{Tag: "TORField", Type: utils.CDRFIELD, CdrFieldId: "tor", Value: []*utils.RSRField{&utils.RSRField{Id: "0"}}}, &config.CfgCdrField{Tag: "UsageField", Type: utils.CDRFIELD, CdrFieldId: "usage", Value: []*utils.RSRField{&utils.RSRField{Id: "1"}}}} - cdrc := &Cdrc{CdrFormat: CSV, cdrSourceId: "TEST_CDRC", cdrFields: cdrFields} + cdrc := &Cdrc{CdrFormat: CSV, cdrSourceId: "TEST_CDRC", cdrFields: [][]*config.CfgCdrField{cdrFields}} cdrRow := []string{"*data", "1"} - rtCdr, err := cdrc.recordToStoredCdr(cdrRow) + rtCdr, err := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]) if err != nil { t.Error("Failed to parse CDR in rated cdr", err) } @@ -99,7 +99,7 @@ func TestDataMultiplyFactor(t *testing.T) { ExtraFields: map[string]string{}, Cost: -1, } - if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow); !reflect.DeepEqual(expectedCdr, rtCdr) { + if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]); !reflect.DeepEqual(expectedCdr, rtCdr) { t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) } cdrRow = []string{"*voice", "1"} @@ -112,7 +112,7 @@ func TestDataMultiplyFactor(t *testing.T) { ExtraFields: map[string]string{}, Cost: -1, } - if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow); !reflect.DeepEqual(expectedCdr, rtCdr) { + if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, cdrc.cdrFields[0]); !reflect.DeepEqual(expectedCdr, rtCdr) { t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) } } diff --git a/cdre/cdrexporter.go b/cdre/cdrexporter.go index 82ea89608..a6c820d33 100644 --- a/cdre/cdrexporter.go +++ b/cdre/cdrexporter.go @@ -121,7 +121,7 @@ func (cdre *CdrExporter) getCdrCostDetails(cgrId, runId string) (string, error) func (cdre *CdrExporter) getCombimedCdrFieldVal(processedCdr *utils.StoredCdr, cfgCdrFld *config.CfgCdrField) (string, error) { var combinedVal string // Will result as combination of the field values, filters must match - for _, filterRule := range cfgCdrFld.Filter { + for _, filterRule := range cfgCdrFld.FieldFilter { fltrPass, ftrPassValue := processedCdr.PassesFieldFilter(filterRule) if !fltrPass { return "", nil @@ -152,7 +152,7 @@ func (cdre *CdrExporter) getDateTimeFieldVal(cdr *utils.StoredCdr, cfgCdrFld *co if len(cfgCdrFld.Value) == 0 { return "", nil } - for _, fltrRl := range cfgCdrFld.Filter { + for _, fltrRl := range cfgCdrFld.FieldFilter { if fltrPass, _ := cdr.PassesFieldFilter(fltrRl); !fltrPass { return "", fmt.Errorf("Field: %s not matching filter rule %v", fltrRl.Id, fltrRl) } @@ -170,7 +170,7 @@ func (cdre *CdrExporter) getDateTimeFieldVal(cdr *utils.StoredCdr, cfgCdrFld *co // Extracts the value specified by cfgHdr out of cdr func (cdre *CdrExporter) cdrFieldValue(cdr *utils.StoredCdr, cfgCdrFld *config.CfgCdrField) (string, error) { - for _, fltrRl := range cfgCdrFld.Filter { + for _, fltrRl := range cfgCdrFld.FieldFilter { if fltrPass, _ := cdr.PassesFieldFilter(fltrRl); !fltrPass { return "", fmt.Errorf("Field: %s not matching filter rule %v", fltrRl.Id, fltrRl) } diff --git a/cdre/cdrexporter_test.go b/cdre/cdrexporter_test.go index 99f2300d7..41e02ff99 100644 --- a/cdre/cdrexporter_test.go +++ b/cdre/cdrexporter_test.go @@ -58,7 +58,7 @@ func TestCdreGetCombimedCdrFieldVal(t *testing.T) { } fltrRule, _ := utils.ParseRSRFields("~mediation_runid:s/default/RUN_RTL/", utils.INFIELD_SEP) val, _ := utils.ParseRSRFields("cost", utils.INFIELD_SEP) - cfgCdrFld := &config.CfgCdrField{Tag: "cost", Type: "cdrfield", CdrFieldId: "cost", Value: val, Filter: fltrRule} + cfgCdrFld := &config.CfgCdrField{Tag: "cost", Type: "cdrfield", CdrFieldId: "cost", Value: val, FieldFilter: fltrRule} if costVal, err := cdre.getCombimedCdrFieldVal(cdrs[3], cfgCdrFld); err != nil { t.Error(err) } else if costVal != "1.01" { @@ -66,7 +66,7 @@ func TestCdreGetCombimedCdrFieldVal(t *testing.T) { } fltrRule, _ = utils.ParseRSRFields("~mediation_runid:s/default/RETAIL1/", utils.INFIELD_SEP) val, _ = utils.ParseRSRFields("account", utils.INFIELD_SEP) - cfgCdrFld = &config.CfgCdrField{Tag: "account", Type: "cdrfield", CdrFieldId: "account", Value: val, Filter: fltrRule} + cfgCdrFld = &config.CfgCdrField{Tag: "account", Type: "cdrfield", CdrFieldId: "account", Value: val, FieldFilter: fltrRule} if acntVal, err := cdre.getCombimedCdrFieldVal(cdrs[3], cfgCdrFld); err != nil { t.Error(err) } else if acntVal != "1000" { @@ -91,7 +91,7 @@ func TestGetDateTimeFieldVal(t *testing.T) { } // Test filter fltr, _ := utils.ParseRSRFields("~tenant:s/(.+)/itsyscom.com/", utils.INFIELD_SEP) - cfgCdrFld = &config.CfgCdrField{Tag: "stop_time", Type: "cdrfield", CdrFieldId: "stop_time", Value: val, Filter: fltr, Layout: layout} + cfgCdrFld = &config.CfgCdrField{Tag: "stop_time", Type: "cdrfield", CdrFieldId: "stop_time", Value: val, FieldFilter: fltr, Layout: layout} if _, err := cdreTst.getDateTimeFieldVal(cdrTst, cfgCdrFld); err == nil { t.Error(err) } @@ -117,7 +117,7 @@ func TestCdreCdrFieldValue(t *testing.T) { t.Errorf("Expecting: %s, received: %s", cdr.Destination, val) } fltr, _ := utils.ParseRSRFields("~tenant:s/(.+)/itsyscom.com/", utils.INFIELD_SEP) - cfgCdrFld = &config.CfgCdrField{Tag: "destination", Type: "cdrfield", CdrFieldId: "destination", Value: val, Filter: fltr} + cfgCdrFld = &config.CfgCdrField{Tag: "destination", Type: "cdrfield", CdrFieldId: "destination", Value: val, FieldFilter: fltr} if _, err := cdre.cdrFieldValue(cdr, cfgCdrFld); err == nil { t.Error("Failed to use filter") } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 245d76f89..d1bf48d20 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -125,11 +125,15 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD } // Fires up a cdrc instance -func startCdrc(cdrsChan chan struct{}, cdrcCfg *config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CDRS, closeChan chan struct{}) { +func startCdrc(cdrsChan chan struct{}, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CDRS, closeChan chan struct{}) { + var cdrcCfg *config.CdrcConfig + for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one + break + } if cdrcCfg.CdrsAddress == utils.INTERNAL { <-cdrsChan // Wait for CDRServer to come up before start processing } - cdrc, err := cdrc.NewCdrc(cdrcCfg, httpSkipTlsCheck, cdrServer, closeChan) + cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrServer, closeChan) if err != nil { engine.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error())) exitChan <- true @@ -502,15 +506,19 @@ func main() { go shutdownSessionmanagerSingnalHandler() } var cdrcEnabled bool - for _, cdrcConfig := range cfg.CdrcProfiles { - if cdrcConfig.Enabled == false { + for _, cdrcCfgs := range cfg.CdrcProfiles { + var cdrcCfg *config.CdrcConfig + for _, cdrcCfg = range cdrcCfgs { // Take a random config out since they should be the same + break + } + if cdrcCfg.Enabled == false { continue // Ignore not enabled } else if !cdrcEnabled { cdrcEnabled = true // Mark that at least one cdrc service is active } - go startCdrc(cdrsChan, cdrcConfig, cfg.HttpSkipTlsVerify, cdrServer, cfg.ConfigReloads[utils.CDRC]) + go startCdrc(cdrsChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cdrServer, cfg.ConfigReloads[utils.CDRC]) } - if cdrcEnabled { + if len(cfg.CdrcProfiles) != 0 { engine.Logger.Info("Starting CGRateS CDR client.") } diff --git a/config/cdrcconfig.go b/config/cdrcconfig.go index f6dd6d1c7..54a885208 100644 --- a/config/cdrcconfig.go +++ b/config/cdrcconfig.go @@ -19,20 +19,22 @@ along with this program. If not, see package config import ( + "github.com/cgrates/cgrates/utils" "time" ) type CdrcConfig struct { - Enabled bool // Enable/Disable the profile - CdrsAddress string // The address where CDRs can be reached - CdrFormat string // The type of CDR file to process - FieldSeparator rune // The separator to use when reading csvs - DataUsageMultiplyFactor float64 // Conversion factor for data usage - RunDelay time.Duration // Delay between runs, 0 for inotify driven requests - CdrInDir string // Folder to process CDRs from - CdrOutDir string // Folder to move processed CDRs to - CdrSourceId string // Source identifier for the processed CDRs - CdrFields []*CfgCdrField // List of fields to be processed + Enabled bool // Enable/Disable the profile + CdrsAddress string // The address where CDRs can be reached + CdrFormat string // The type of CDR file to process + FieldSeparator rune // The separator to use when reading csvs + DataUsageMultiplyFactor float64 // Conversion factor for data usage + RunDelay time.Duration // Delay between runs, 0 for inotify driven requests + CdrInDir string // Folder to process CDRs from + CdrOutDir string // Folder to move processed CDRs to + CdrSourceId string // Source identifier for the processed CDRs + CdrFilter utils.RSRFields // Filter CDR records to import + CdrFields []*CfgCdrField // List of fields to be processed } func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error { @@ -68,6 +70,14 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error { if jsnCfg.Cdr_out_dir != nil { self.CdrOutDir = *jsnCfg.Cdr_out_dir } + if jsnCfg.Cdr_source_id != nil { + self.CdrSourceId = *jsnCfg.Cdr_source_id + } + if jsnCfg.Cdr_filter != nil { + if self.CdrFilter, err = utils.ParseRSRFields(*jsnCfg.Cdr_filter, utils.INFIELD_SEP); err != nil { + return err + } + } if jsnCfg.Cdr_fields != nil { if self.CdrFields, err = CfgCdrFieldsFromCdrFieldsJsonCfg(*jsnCfg.Cdr_fields); err != nil { return err diff --git a/config/cfgcdrfield.go b/config/cfgcdrfield.go index 6ca1e2c53..ce1168d77 100644 --- a/config/cfgcdrfield.go +++ b/config/cfgcdrfield.go @@ -39,8 +39,8 @@ func NewCfgCdrFieldFromCdrFieldJsonCfg(jsnCfgFld *CdrFieldJsonCfg) (*CfgCdrField return nil, err } } - if jsnCfgFld.Filter != nil { - if cfgFld.Filter, err = utils.ParseRSRFields(*jsnCfgFld.Filter, utils.INFIELD_SEP); err != nil { + if jsnCfgFld.Field_filter != nil { + if cfgFld.FieldFilter, err = utils.ParseRSRFields(*jsnCfgFld.Field_filter, utils.INFIELD_SEP); err != nil { return nil, err } } @@ -63,16 +63,16 @@ func NewCfgCdrFieldFromCdrFieldJsonCfg(jsnCfgFld *CdrFieldJsonCfg) (*CfgCdrField } type CfgCdrField struct { - Tag string // Identifier for the administrator - Type string // Type of field - CdrFieldId string // StoredCdr field name - Value utils.RSRFields - Filter utils.RSRFields - Width int - Strip string - Padding string - Layout string - Mandatory bool + Tag string // Identifier for the administrator + Type string // Type of field + CdrFieldId string // StoredCdr field name + Value utils.RSRFields + FieldFilter utils.RSRFields + Width int + Strip string + Padding string + Layout string + Mandatory bool } func CfgCdrFieldsFromCdrFieldsJsonCfg(jsnCfgFldss []*CdrFieldJsonCfg) ([]*CfgCdrField, error) { diff --git a/config/config.go b/config/config.go index 1f9f133fb..e499c3246 100644 --- a/config/config.go +++ b/config/config.go @@ -74,7 +74,7 @@ func NewDefaultCGRConfig() (*CGRConfig, error) { return nil, err } cfg.dfltCdreProfile = cfg.CdreProfiles[utils.META_DEFAULT].Clone() // So default will stay unique, will have nil pointer in case of no defaults loaded which is an extra check - cfg.dfltCdrcProfile = cfg.CdrcProfiles[utils.META_DEFAULT].Clone() + cfg.dfltCdrcProfile = cfg.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT].Clone() dfltFsConnConfig = cfg.SmFsConfig.Connections[0] // We leave it crashing here on purpose if no Connection defaults defined dfltKamConnConfig = cfg.SmKamConfig.Connections[0] dfltOsipsConnConfig = cfg.SmOsipsConfig.Connections[0] @@ -193,10 +193,10 @@ type CGRConfig struct { CDRStatsEnabled bool // Enable CDR Stats service CDRStatConfig *CdrStatsConfig // Active cdr stats configuration instances, platform level CdreProfiles map[string]*CdreConfig - CdrcProfiles map[string]*CdrcConfig // Number of CDRC instances running imports - SmFsConfig *SmFsConfig // SM-FreeSWITCH configuration - SmKamConfig *SmKamConfig // SM-Kamailio Configuration - SmOsipsConfig *SmOsipsConfig // SM-OpenSIPS Configuration + CdrcProfiles map[string]map[string]*CdrcConfig // Number of CDRC instances running imports, format map[dirPath]map[instanceName]{Configs} + SmFsConfig *SmFsConfig // SM-FreeSWITCH configuration + SmKamConfig *SmKamConfig // SM-Kamailio Configuration + SmOsipsConfig *SmOsipsConfig // SM-OpenSIPS Configuration SMEnabled bool SMSwitchType string SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer @@ -243,7 +243,7 @@ type CGRConfig struct { func (self *CGRConfig) checkConfigSanity() error { // CDRC sanity checks - for _, cdrcInst := range self.CdrcProfiles { + for _, cdrcInst := range self.CdrcProfiles["/var/log/cgrates/cdrc/in"] { if cdrcInst.Enabled == true { if len(cdrcInst.CdrFields) == 0 { return errors.New("CdrC enabled but no fields to be processed defined!") @@ -530,16 +530,20 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { } if jsnCdrcCfg != nil { if self.CdrcProfiles == nil { - self.CdrcProfiles = make(map[string]*CdrcConfig) + self.CdrcProfiles = make(map[string]map[string]*CdrcConfig) } for profileName, jsnCrc1Cfg := range jsnCdrcCfg { + if _, hasDir := self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir]; !hasDir { + self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir] = make(map[string]*CdrcConfig) + } if _, hasProfile := self.CdrcProfiles[profileName]; !hasProfile { - self.CdrcProfiles[profileName] = new(CdrcConfig) - if profileName != utils.META_DEFAULT { - self.CdrcProfiles[profileName] = self.dfltCdrcProfile.Clone() // Clone default so we do not inherit pointers + if profileName == utils.META_DEFAULT { + self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir][profileName] = new(CdrcConfig) + } else { + self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir][profileName] = self.dfltCdrcProfile.Clone() // Clone default so we do not inherit pointers } } - if err = self.CdrcProfiles[profileName].loadFromJsonCfg(jsnCrc1Cfg); err != nil { + if err = self.CdrcProfiles[*jsnCrc1Cfg.Cdr_in_dir][profileName].loadFromJsonCfg(jsnCrc1Cfg); err != nil { return err } } diff --git a/config/config_defaults.go b/config/config_defaults.go index 5404c825e..7785eb1e9 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -182,6 +182,7 @@ const CGRATES_CFG_JSON = ` "cdr_in_dir": "/var/log/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored "cdr_out_dir": "/var/log/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved "cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database + "cdr_filter": "", // Filter CDR records to import "cdr_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "2", "mandatory": true}, {"tag": "accid", "cdr_field_id": "accid", "type": "cdrfield", "value": "3", "mandatory": true}, diff --git a/config/config_json_test.go b/config/config_json_test.go index 1aa1e3f1e..f94c2f006 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -323,6 +323,7 @@ func TestDfCdrcJsonCfg(t *testing.T) { Cdr_in_dir: utils.StringPointer("/var/log/cgrates/cdrc/in"), Cdr_out_dir: utils.StringPointer("/var/log/cgrates/cdrc/out"), Cdr_source_id: utils.StringPointer("freeswitch_csv"), + Cdr_filter: utils.StringPointer(""), Cdr_fields: &cdrFields, }, } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index ff41b3ad4..e094d488a 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -117,7 +117,7 @@ type CdrFieldJsonCfg struct { Strip *string Padding *string Layout *string - Filter *string + Field_filter *string Mandatory *bool } @@ -149,6 +149,7 @@ type CdrcJsonCfg struct { Cdr_in_dir *string Cdr_out_dir *string Cdr_source_id *string + Cdr_filter *string Cdr_fields *[]*CdrFieldJsonCfg } diff --git a/general_tests/multiplecdrc_local_test.go b/general_tests/multiplecdrc_local_test.go index 1d6c47aae..a6a6dc90d 100644 --- a/general_tests/multiplecdrc_local_test.go +++ b/general_tests/multiplecdrc_local_test.go @@ -103,13 +103,15 @@ func TestCreateCdrDirs(t *testing.T) { if !*testLocal { return } - for _, cdrcInst := range cfg.CdrcProfiles { - for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} { - if err := os.RemoveAll(dir); err != nil { - t.Fatal("Error removing folder: ", dir, err) - } - if err := os.MkdirAll(dir, 0755); err != nil { - t.Fatal("Error creating folder: ", dir, err) + for _, cdrcProfiles := range cfg.CdrcProfiles { + for _, cdrcInst := range cdrcProfiles { + for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } } } } @@ -157,7 +159,7 @@ dbafe9c8614c785a65aabd116dd3959c3c56f7f7,default,*voice,dsafdsag,rated,*out,cgra if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1), 0644); err != nil { t.Fatal(err.Error) } - if err := os.Rename(tmpFilePath, path.Join(cfg.CdrcProfiles["CDRC-CSV1"].CdrInDir, fileName)); err != nil { + if err := os.Rename(tmpFilePath, path.Join("/tmp/cgrates/cdrc1/in", fileName)); err != nil { t.Fatal("Error moving file to processing directory: ", err) } } @@ -175,7 +177,7 @@ func TestHandleCdr2File(t *testing.T) { if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent), 0644); err != nil { t.Fatal(err.Error) } - if err := os.Rename(tmpFilePath, path.Join(cfg.CdrcProfiles["CDRC-CSV2"].CdrInDir, fileName)); err != nil { + if err := os.Rename(tmpFilePath, path.Join("/tmp/cgrates/cdrc2/in", fileName)); err != nil { t.Fatal("Error moving file to processing directory: ", err) } } @@ -192,7 +194,7 @@ func TestHandleCdr3File(t *testing.T) { if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent), 0644); err != nil { t.Fatal(err.Error) } - if err := os.Rename(tmpFilePath, path.Join(cfg.CdrcProfiles["CDRC-CSV3"].CdrInDir, fileName)); err != nil { + if err := os.Rename(tmpFilePath, path.Join("/tmp/cgrates/cdrc3/in", fileName)); err != nil { t.Fatal("Error moving file to processing directory: ", err) } } diff --git a/utils/consts.go b/utils/consts.go index 01a69bf78..bba2c3b1f 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -78,6 +78,8 @@ const ( FIELDS_SEP = "," STATIC_HDRVAL_SEP = "::" REGEXP_PREFIX = "~" + FILTER_VAL_START = "(" + FILTER_VAL_END = ")" JSON = "json" GOB = "gob" MSGPACK = "msgpack" diff --git a/utils/rsrfield.go b/utils/rsrfield.go index e89236f9e..c91f6ae1d 100644 --- a/utils/rsrfield.go +++ b/utils/rsrfield.go @@ -21,17 +21,24 @@ package utils import ( "fmt" "regexp" - "strconv" "strings" ) func NewRSRField(fldStr string) (*RSRField, error) { - if fldStrUnquoted, err := strconv.Unquote(fldStr); err == nil { - fldStr = fldStrUnquoted - } if len(fldStr) == 0 { return nil, nil } + var filterVal string + if strings.HasSuffix(fldStr, FILTER_VAL_END) { // Has filter, populate the var + fltrStart := strings.LastIndex(fldStr, FILTER_VAL_START) + if fltrStart < 1 { + return nil, fmt.Errorf("Invalid FilterStartValue in string: %s", fldStr) + } + filterVal = fldStr[fltrStart+1 : len(fldStr)-1] + fldStr = fldStr[:fltrStart] // Take the filter part out before compiling further + + } + if strings.HasPrefix(fldStr, STATIC_VALUE_PREFIX) { // Special case when RSR is defined as static header/value var staticHdr, staticVal string if splt := strings.Split(fldStr, STATIC_HDRVAL_SEP); len(splt) == 2 { // Using / as separator since ':' is often use in date/time fields @@ -44,17 +51,17 @@ func NewRSRField(fldStr string) (*RSRField, error) { } else { staticHdr, staticVal = splt[0][1:], splt[0][1:] // If no split, header will remain as original, value as header without the prefix } - return &RSRField{Id: staticHdr, staticValue: staticVal}, nil + return &RSRField{Id: staticHdr, staticValue: staticVal, filterValue: filterVal}, nil } if !strings.HasPrefix(fldStr, REGEXP_PREFIX) { - return &RSRField{Id: fldStr}, nil + return &RSRField{Id: fldStr, filterValue: filterVal}, nil } spltRgxp := regexp.MustCompile(`:s\/`) spltRules := spltRgxp.Split(fldStr, -1) if len(spltRules) < 2 { return nil, fmt.Errorf("Invalid Split of Search&Replace field rule. %s", fldStr) } - rsrField := &RSRField{Id: spltRules[0][1:]} // Original id in form ~hdr_name + rsrField := &RSRField{Id: spltRules[0][1:], filterValue: filterVal} // Original id in form ~hdr_name rulesRgxp := regexp.MustCompile(`(?:(.+[^\\])\/(.*[^\\])*\/){1,}`) for _, ruleStr := range spltRules[1:] { // :s/ already removed through split allMatches := rulesRgxp.FindStringSubmatch(ruleStr) @@ -74,6 +81,7 @@ type RSRField struct { Id string // Identifier RSRules []*ReSearchReplace // Rules to use when processing field value staticValue string // If defined, enforces parsing always to this value + filterValue string // The value to compare when used as filter } // Parse the field value from a string @@ -105,6 +113,10 @@ func (rsrf *RSRField) RegexpMatched() bool { // Investigate whether we had a reg return false } +func (rsrf *RSRField) FilterPasses(value string) bool { + return len(rsrf.filterValue) == 0 || rsrf.ParseValue(value) == rsrf.filterValue +} + // Parses list of RSRFields, used for example as multiple filters in derived charging func ParseRSRFields(fldsStr, sep string) (RSRFields, error) { //rsrRlsPattern := regexp.MustCompile(`^(~\w+:s/.+/.*/)|(\^.+(/.+/)?)(;(~\w+:s/.+/.*/)|(\^.+(/.+/)?))*$`) //ToDo:Fix here rule able to confirm the content diff --git a/utils/rsrfield_test.go b/utils/rsrfield_test.go index 78c907608..3fbdabe95 100644 --- a/utils/rsrfield_test.go +++ b/utils/rsrfield_test.go @@ -33,6 +33,14 @@ func TestNewRSRField1(t *testing.T) { } else if !reflect.DeepEqual(expRSRField1, rsrField) { t.Errorf("Expecting: %v, received: %v", expRSRField1, rsrField) } + // With filter + expRSRField2 := &RSRField{Id: "sip_redirected_to", filterValue: "086517174963", + RSRules: []*ReSearchReplace{&ReSearchReplace{SearchRegexp: regexp.MustCompile(`sip:\+49(\d+)@`), ReplaceTemplate: "0$1"}}} + if rsrField, err := NewRSRField(`~sip_redirected_to:s/sip:\+49(\d+)@/0$1/(086517174963)`); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if !reflect.DeepEqual(expRSRField2, rsrField) { + t.Errorf("Expecting: %v, received: %v", expRSRField2, rsrField) + } // Separator escaped if rsrField, err := NewRSRField(`~sip_redirected_to:s\/sip:\+49(\d+)@/0$1/`); err == nil { t.Errorf("Parse error, field rule does not contain correct number of separators, received: %v", rsrField) @@ -178,3 +186,13 @@ func TestParseCdrcDn1(t *testing.T) { t.Errorf("Expecting: +4914001888, received: %s", parsed2) } } + +func TestFilterPasses(t *testing.T) { + rl, err := NewRSRField(`~1:s/^00(\d+)(?:[a-zA-Z].{3})*0*([1-9]\d+)$/+$1$2/:s/^\+49(18\d{2})$/+491400$1/(+49630415354)`) + if err != nil { + t.Error("Unexpected error: ", err) + } + if rl.FilterPasses("0031ABOC0630415354") { + t.Error("Passing filter") + } +}