From 2f52829b7e4f069a98898dc502d8f56889bdd16b Mon Sep 17 00:00:00 2001 From: TeoV Date: Tue, 7 Aug 2018 10:24:35 -0400 Subject: [PATCH] Add for cdrc csv new filters --- cdrc/cdrc.go | 16 +- cdrc/csv.go | 89 +++++++-- cdrc/csv_it_test.go | 177 ++++++++++++++++++ cdrc/fwv.go | 6 +- cdrc/xml.go | 5 +- cdrc/xml_test.go | 3 +- cmd/cgr-engine/cgr-engine.go | 19 +- config/cdrcconfig.go | 15 +- config/config_defaults.go | 1 + config/config_json_test.go | 1 + config/config_test.go | 1 + config/configcdrc_test.go | 4 + config/libconfig_json.go | 1 + .../samples/cdrccsvwithfilter/cgrates.json | 81 ++++++++ dispatcher/suppliers_it_test.go | 4 +- 15 files changed, 391 insertions(+), 32 deletions(-) create mode 100755 data/conf/samples/cdrccsvwithfilter/cgrates.json diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index ddbd5485f..ee92ae270 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -55,7 +55,8 @@ Common parameters within configs processed: Parameters specific per config instance: * duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields */ -func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, closeChan chan struct{}, dfltTimezone string, roundDecimals int) (*Cdrc, error) { +func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, + closeChan chan struct{}, dfltTimezone string, roundDecimals int, filterS *engine.FilterS) (*Cdrc, error) { var cdrcCfg *config.CdrcConfig for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one break @@ -80,6 +81,7 @@ func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclien return nil, fmt.Errorf("Nonexistent folder: %s", dir) } } + cdrc.filterS = filterS cdrc.httpClient = new(http.Client) return cdrc, nil } @@ -95,6 +97,7 @@ type Cdrc struct { maxOpenFiles chan struct{} // Maximum number of simultaneous files processed unpairedRecordsCache *UnpairedRecordsCache // Shared between all files in the folder we process partialRecordsCache *PartialRecordsCache + filterS *engine.FilterS } // When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing @@ -181,12 +184,15 @@ func (self *Cdrc) processFile(filePath string) error { case CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE, utils.PartialCSV: csvReader := csv.NewReader(bufio.NewReader(file)) csvReader.Comma = self.dfltCdrcCfg.FieldSeparator - recordsProcessor = NewCsvRecordsProcessor(csvReader, self.timezone, fn, self.dfltCdrcCfg, self.cdrcCfgs, - self.httpSkipTlsCheck, self.unpairedRecordsCache, self.partialRecordsCache, self.dfltCdrcCfg.CacheDumpFields) + recordsProcessor = NewCsvRecordsProcessor(csvReader, self.timezone, fn, self.dfltCdrcCfg, + self.cdrcCfgs, self.httpSkipTlsCheck, self.unpairedRecordsCache, self.partialRecordsCache, + self.dfltCdrcCfg.CacheDumpFields, self.filterS) case utils.FWV: - recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs, self.httpClient, self.httpSkipTlsCheck, self.timezone) + recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs, + self.httpClient, self.httpSkipTlsCheck, self.timezone, self.filterS) case utils.XML: - if recordsProcessor, err = NewXMLRecordsProcessor(file, self.dfltCdrcCfg.CDRPath, self.timezone, self.httpSkipTlsCheck, self.cdrcCfgs); err != nil { + if recordsProcessor, err = NewXMLRecordsProcessor(file, self.dfltCdrcCfg.CDRPath, + self.timezone, self.httpSkipTlsCheck, self.cdrcCfgs, self.filterS); err != nil { return err } default: diff --git a/cdrc/csv.go b/cdrc/csv.go index e11240aed..1d8832e65 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -33,10 +33,11 @@ import ( func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string, dfltCdrcCfg *config.CdrcConfig, cdrcCfgs []*config.CdrcConfig, - httpSkipTlsCheck bool, unpairedRecordsCache *UnpairedRecordsCache, partialRecordsCache *PartialRecordsCache, cacheDumpFields []*config.CfgCdrField) *CsvRecordsProcessor { + httpSkipTlsCheck bool, unpairedRecordsCache *UnpairedRecordsCache, partialRecordsCache *PartialRecordsCache, + cacheDumpFields []*config.CfgCdrField, filterS *engine.FilterS) *CsvRecordsProcessor { return &CsvRecordsProcessor{csvReader: csvReader, timezone: timezone, fileName: fileName, dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs, httpSkipTlsCheck: httpSkipTlsCheck, unpairedRecordsCache: unpairedRecordsCache, - partialRecordsCache: partialRecordsCache, partialCacheDumpFields: cacheDumpFields} + partialRecordsCache: partialRecordsCache, partialCacheDumpFields: cacheDumpFields, filterS: filterS} } @@ -51,6 +52,7 @@ type CsvRecordsProcessor struct { unpairedRecordsCache *UnpairedRecordsCache // Shared by cdrc so we can cache for all files in a folder partialRecordsCache *PartialRecordsCache // Cache records which are of type "Partial" partialCacheDumpFields []*config.CfgCdrField + filterS *engine.FilterS } func (self *CsvRecordsProcessor) ProcessedRecordsNr() int64 { @@ -100,24 +102,33 @@ func (self *CsvRecordsProcessor) processFlatstoreRecord(record []string) ([]stri // Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR, error) { + utils.Logger.Debug(fmt.Sprintf("Record from CSV : %+v \n", record)) recordCdrs := make([]*engine.CDR, 0) // More CDRs based on the number of filters and field templates for _, cdrcCfg := range self.cdrcCfgs { // cdrFields coming from more templates will produce individual storCdr records // Make sure filters are matching passes := true - for _, rsrFilter := range cdrcCfg.CdrFilter { - if rsrFilter == nil { // Nil filter does not need to match anything + if len(cdrcCfg.Filters) == 0 { + for _, rsrFilter := range cdrcCfg.CdrFilter { + if rsrFilter == nil { // Nil filter does not need to match anything + continue + } + if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx { + return nil, fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter) + } else if _, err := rsrFilter.Parse(record[cfgFieldIdx]); err != nil { + passes = false + break + } + } + if !passes { // Stop importing cdrc fields profile due to non matching filter continue } - if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx { - return nil, fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter) - } else if _, err := rsrFilter.Parse(record[cfgFieldIdx]); err != nil { - passes = false - break + } else { + csvprovider, _ := newCsvProvider(record) + if pass, err := self.filterS.Pass("cgrates.org", + cdrcCfg.Filters, csvprovider); err != nil || !pass { + continue // Not passes filters, ignore this CDR } } - if !passes { // Stop importing cdrc fields profile due to non matching filter - continue - } storedCdr, err := self.recordToStoredCdr(record, cdrcCfg) if err != nil { return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error()) @@ -237,3 +248,57 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con } return storedCdr, nil } + +// newRADataProvider constructs a DataProvider +func newCsvProvider(record []string) (dP engine.DataProvider, err error) { + dP = &csvProvider{req: record, cache: engine.NewNavigableMap(nil)} + return +} + +// csvProvider implements engine.DataProvider so we can pass it to filters +type csvProvider struct { + req []string + cache *engine.NavigableMap +} + +// String is part of engine.DataProvider interface +// when called, it will display the already parsed values out of cache +func (cP *csvProvider) String() string { + return utils.ToJSON(cP) +} + +// FieldAsInterface is part of engine.DataProvider interface +func (cP *csvProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) { + if len(fldPath) != 1 { + return nil, utils.ErrNotFound + } + if data, err = cP.cache.FieldAsInterface(fldPath); err == nil || + err != utils.ErrNotFound { // item found in cache + return + } + err = nil // cancel previous err + if cfgFieldIdx, _ := strconv.Atoi(fldPath[0]); len(cP.req) <= cfgFieldIdx { + return nil, fmt.Errorf("Ignoring record: %v", cP.req) + } else { + data = cP.req[cfgFieldIdx] + } + cP.cache.Set(fldPath, data, false) + return +} + +// FieldAsString is part of engine.DataProvider interface +func (cP *csvProvider) FieldAsString(fldPath []string) (data string, err error) { + var valIface interface{} + valIface, err = cP.FieldAsInterface(fldPath) + if err != nil { + return + } + data, _ = utils.CastFieldIfToString(valIface) + return +} + +// AsNavigableMap is part of engine.DataProvider interface +func (cP *csvProvider) AsNavigableMap([]*config.CfgCdrField) ( + nm *engine.NavigableMap, err error) { + return nil, utils.ErrNotImplemented +} diff --git a/cdrc/csv_it_test.go b/cdrc/csv_it_test.go index 2f94ccb52..b80f8c291 100644 --- a/cdrc/csv_it_test.go +++ b/cdrc/csv_it_test.go @@ -165,3 +165,180 @@ func TestCsvITKillEngine(t *testing.T) { t.Error(err) } } + +// Begin tests for cdrc csv with new filters +var fileContent1_2 = `accid21;*prepaid;itsyscom.com;1002;086517174963;2013-02-03 19:54:00;62;val_extra3;"";val_extra1 +accid22;*postpaid;itsyscom.com;1002;+4986517174963;2013-02-03 19:54:00;123;val_extra3;"";val_extra1 +accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;"";val_extra1` + +func TestCsvIT2InitConfig(t *testing.T) { + var err error + csvCfgPath = path.Join(*dataDir, "conf", "samples", "cdrccsvwithfilter") + if csvCfg, err = config.NewCGRConfigFromFolder(csvCfgPath); err != nil { + t.Fatal("Got config error: ", err.Error()) + } +} + +// InitDb so we can rely on count +func TestCsvIT2InitCdrDb(t *testing.T) { + if err := engine.InitStorDb(csvCfg); err != nil { + t.Fatal(err) + } +} + +func TestCsvIT2CreateCdrDirs(t *testing.T) { + for _, cdrcProfiles := range csvCfg.CdrcProfiles { + for _, cdrcInst := range cdrcProfiles { + for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } + } + } +} + +func TestCsvIT2StartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(csvCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestCsvIT2RpcConn(t *testing.T) { + var err error + cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +// Scenario out of first .xml config +func TestCsvIT2HandleCdr2File(t *testing.T) { + fileName := "file1.csv" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1_2), 0644); err != nil { + t.Fatal(err.Error()) + } + if err := os.Rename(tmpFilePath, path.Join("/tmp/cdrctestswithfilters/csvit1/in", fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +func TestCsvIT2ProcessedFiles(t *testing.T) { + time.Sleep(time.Duration(2**waitRater) * time.Millisecond) + if outContent2, err := ioutil.ReadFile("/tmp/cdrctestswithfilters/csvit1/out/file1.csv"); err != nil { + t.Error(err) + } else if fileContent1_2 != string(outContent2) { + t.Errorf("Expecting: %q, received: %q", fileContent1_2, string(outContent2)) + } +} + +func TestCsvIT2AnalyseCDRs(t *testing.T) { + var reply []*engine.ExternalCDR + if err := cdrcRpc.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 2 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } + if err := cdrcRpc.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{RequestTypes: []string{utils.META_PREPAID}}, &reply); err != nil { + t.Error("Unexpected error: ", err) // Original 08651 was converted + } else if len(reply) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } +} + +func TestCsvIT2KillEngine(t *testing.T) { + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +} + +// Begin tests for cdrc csv with new filters +var fileContent1_3 = `accid21;*prepaid;itsyscom.com;1002;086517174963;2013-02-03 19:54:00;62;val_extra3;"";val_extra1 +accid22;*prepaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;123;val_extra3;"";val_extra1 +accid23;*prepaid;cgrates.org;1002;086517174963;2013-02-03 19:54:00;76;val_extra3;"";val_extra1` + +func TestCsvIT3InitConfig(t *testing.T) { + var err error + csvCfgPath = path.Join(*dataDir, "conf", "samples", "cdrccsvwithfilter") + if csvCfg, err = config.NewCGRConfigFromFolder(csvCfgPath); err != nil { + t.Fatal("Got config error: ", err.Error()) + } +} + +// InitDb so we can rely on count +func TestCsvIT3InitCdrDb(t *testing.T) { + if err := engine.InitStorDb(csvCfg); err != nil { + t.Fatal(err) + } +} + +func TestCsvIT3CreateCdrDirs(t *testing.T) { + for _, cdrcProfiles := range csvCfg.CdrcProfiles { + for _, cdrcInst := range cdrcProfiles { + for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } + } + } +} + +func TestCsvIT3StartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(csvCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestCsvIT3RpcConn(t *testing.T) { + var err error + cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +// Scenario out of first .xml config +func TestCsvIT3HandleCdr2File(t *testing.T) { + fileName := "file1.csv" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1_3), 0644); err != nil { + t.Fatal(err.Error()) + } + if err := os.Rename(tmpFilePath, path.Join("/tmp/cdrctestswithfilters/csvit2/in", fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +func TestCsvIT3ProcessedFiles(t *testing.T) { + time.Sleep(time.Duration(2**waitRater) * time.Millisecond) + if outContent2, err := ioutil.ReadFile("/tmp/cdrctestswithfilters/csvit2/out/file1.csv"); err != nil { + t.Error(err) + } else if fileContent1_3 != string(outContent2) { + t.Errorf("Expecting: %q, received: %q", fileContent1_3, string(outContent2)) + } +} + +func TestCsvIT3AnalyseCDRs(t *testing.T) { + var reply []*engine.ExternalCDR + if err := cdrcRpc.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } +} + +func TestCsvIT3KillEngine(t *testing.T) { + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +} diff --git a/cdrc/fwv.go b/cdrc/fwv.go index 0e38263ee..35083517e 100644 --- a/cdrc/fwv.go +++ b/cdrc/fwv.go @@ -49,8 +49,9 @@ func fwvValue(cdrLine string, indexStart, width int, padding string) string { return rawVal } -func NewFwvRecordsProcessor(file *os.File, dfltCfg *config.CdrcConfig, cdrcCfgs []*config.CdrcConfig, httpClient *http.Client, httpSkipTlsCheck bool, timezone string) *FwvRecordsProcessor { - return &FwvRecordsProcessor{file: file, cdrcCfgs: cdrcCfgs, dfltCfg: dfltCfg, httpSkipTlsCheck: httpSkipTlsCheck, timezone: timezone} +func NewFwvRecordsProcessor(file *os.File, dfltCfg *config.CdrcConfig, cdrcCfgs []*config.CdrcConfig, httpClient *http.Client, + httpSkipTlsCheck bool, timezone string, filterS *engine.FilterS) *FwvRecordsProcessor { + return &FwvRecordsProcessor{file: file, cdrcCfgs: cdrcCfgs, dfltCfg: dfltCfg, httpSkipTlsCheck: httpSkipTlsCheck, timezone: timezone, filterS: filterS} } type FwvRecordsProcessor struct { @@ -65,6 +66,7 @@ type FwvRecordsProcessor struct { processedRecordsNr int64 // Number of content records in file trailerOffset int64 // Index where trailer starts, to be used as boundary when reading cdrs headerCdr *engine.CDR // Cache here the general purpose stored CDR + filterS *engine.FilterS } // Sets the line length based on first line, sets offset back to initial after reading diff --git a/cdrc/xml.go b/cdrc/xml.go index ac7a5088b..47f3c16ce 100644 --- a/cdrc/xml.go +++ b/cdrc/xml.go @@ -92,7 +92,7 @@ func handlerSubstractUsage(xmlElmnt tree.Res, argsTpl utils.RSRFields, cdrPath u } func NewXMLRecordsProcessor(recordsReader io.Reader, cdrPath utils.HierarchyPath, timezone string, - httpSkipTlsCheck bool, cdrcCfgs []*config.CdrcConfig) (*XMLRecordsProcessor, error) { + httpSkipTlsCheck bool, cdrcCfgs []*config.CdrcConfig, filterS *engine.FilterS) (*XMLRecordsProcessor, error) { xp, err := goxpath.Parse(cdrPath.AsString("/", true)) if err != nil { return nil, err @@ -105,7 +105,7 @@ func NewXMLRecordsProcessor(recordsReader io.Reader, cdrPath utils.HierarchyPath return nil, err } xmlProc := &XMLRecordsProcessor{cdrPath: cdrPath, timezone: timezone, - httpSkipTlsCheck: httpSkipTlsCheck, cdrcCfgs: cdrcCfgs} + httpSkipTlsCheck: httpSkipTlsCheck, cdrcCfgs: cdrcCfgs, filterS: filterS} xmlProc.cdrXmlElmts = goxpath.MustExec(xp, xmlNode, nil) return xmlProc, nil } @@ -117,6 +117,7 @@ type XMLRecordsProcessor struct { timezone string httpSkipTlsCheck bool cdrcCfgs []*config.CdrcConfig // individual configs for the folder CDRC is monitoring + filterS *engine.FilterS } func (xmlProc *XMLRecordsProcessor) ProcessedRecordsNr() int64 { diff --git a/cdrc/xml_test.go b/cdrc/xml_test.go index 1e99f8996..93510fa2d 100644 --- a/cdrc/xml_test.go +++ b/cdrc/xml_test.go @@ -242,7 +242,8 @@ func TestXMLRPProcess(t *testing.T) { }, }, } - xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft), utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true, cdrcCfgs) + xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft), + utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true, cdrcCfgs, nil) if err != nil { t.Error(err) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index b841c3a7f..82790fd58 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -72,7 +72,11 @@ var ( cfg *config.CGRConfig ) -func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, exitChan chan bool) { +func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, + exitChan chan bool, filterSChan chan *engine.FilterS) { + // Not sure here if FilterS is passed correct at line 103 in fo start cdrc + filterS := <-filterSChan + filterSChan <- filterS cdrcInitialized := false // Control whether the cdrc was already initialized (so we don't reload in that case) var cdrcChildrenChan chan struct{} // Will use it to communicate with the children of one fork for { @@ -94,9 +98,9 @@ func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConn enabledCfgs = append(enabledCfgs, cdrcCfg) } } - if len(enabledCfgs) != 0 { - go startCdrc(internalCdrSChan, internalRaterChan, enabledCfgs, cfg.HttpSkipTlsVerify, cdrcChildrenChan, exitChan) + go startCdrc(internalCdrSChan, internalRaterChan, enabledCfgs, cfg.HttpSkipTlsVerify, + cdrcChildrenChan, exitChan, filterSChan) } else { utils.Logger.Info(" No enabled CDRC clients") } @@ -107,7 +111,9 @@ func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConn // Fires up a cdrc instance func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, - closeChan chan struct{}, exitChan chan bool) { + closeChan chan struct{}, exitChan chan bool, filterSChan chan *engine.FilterS) { + filterS := <-filterSChan + filterSChan <- filterS var cdrcCfg *config.CdrcConfig for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one break @@ -120,7 +126,8 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne exitChan <- true return } - cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, cfg.DefaultTimezone, cfg.RoundingDecimals) + cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, + closeChan, cfg.DefaultTimezone, cfg.RoundingDecimals, filterS) if err != nil { utils.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error())) exitChan <- true @@ -1244,7 +1251,7 @@ func main() { } // Start CDRC components if necessary - go startCdrcs(internalCdrSChan, internalRaterChan, exitChan) + go startCdrcs(internalCdrSChan, internalRaterChan, exitChan, filterSChan) // Start SM-Generic if cfg.SessionSCfg().Enabled { diff --git a/config/cdrcconfig.go b/config/cdrcconfig.go index f9d8326fb..86b19aa14 100644 --- a/config/cdrcconfig.go +++ b/config/cdrcconfig.go @@ -41,8 +41,9 @@ type CdrcConfig struct { CDRPath utils.HierarchyPath // used for XML CDRs to specify the path towards CDR elements CdrSourceId string // Source identifier for the processed CDRs CdrFilter utils.RSRFields // Filter CDR records to import - ContinueOnSuccess bool // Continue after execution - PartialRecordCache time.Duration // Duration to cache partial records when not pairing + Filters []string + ContinueOnSuccess bool // Continue after execution + PartialRecordCache time.Duration // Duration to cache partial records when not pairing PartialCacheExpiryAction string HeaderFields []*CfgCdrField ContentFields []*CfgCdrField @@ -110,6 +111,12 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error { return err } } + if jsnCfg.Filters != nil { + self.Filters = make([]string, len(*jsnCfg.Filters)) + for i, fltr := range *jsnCfg.Filters { + self.Filters[i] = fltr + } + } if jsnCfg.Continue_on_success != nil { self.ContinueOnSuccess = *jsnCfg.Continue_on_success } @@ -166,6 +173,10 @@ func (self *CdrcConfig) Clone() *CdrcConfig { for i, path := range self.CDRPath { clnCdrc.CDRPath[i] = path } + clnCdrc.Filters = make([]string, len(self.Filters)) + for i, fltr := range self.Filters { + clnCdrc.Filters[i] = fltr + } clnCdrc.CdrSourceId = self.CdrSourceId clnCdrc.PartialRecordCache = self.PartialRecordCache clnCdrc.PartialCacheExpiryAction = self.PartialCacheExpiryAction diff --git a/config/config_defaults.go b/config/config_defaults.go index 17626097f..7164c67f0 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -256,6 +256,7 @@ const CGRATES_CFG_JSON = ` "cdr_path": "", // path towards one CDR element in case of XML CDRs "cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database "cdr_filter": "", // filter CDR records to import + "filters" :[], // new filters used in FilterS subsystem "continue_on_success": false, // continue to the next template if executed "partial_record_cache": "10s", // duration to cache partial records when not pairing "partial_cache_expiry_action": "*dump_to_file", // action taken when cache when records in cache are timed-out <*dump_to_file|*post_cdr> diff --git a/config/config_json_test.go b/config/config_json_test.go index 9bdd873fa..c22de5d60 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -476,6 +476,7 @@ func TestDfCdrcJsonCfg(t *testing.T) { Cdr_path: utils.StringPointer(""), Cdr_source_id: utils.StringPointer("freeswitch_csv"), Cdr_filter: utils.StringPointer(""), + Filters: &[]string{}, Continue_on_success: utils.BoolPointer(false), Partial_record_cache: utils.StringPointer("10s"), Partial_cache_expiry_action: utils.StringPointer(utils.MetaDumpToFile), diff --git a/config/config_test.go b/config/config_test.go index 853617947..5d53430a3 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -187,6 +187,7 @@ func TestCgrCfgCDRC(t *testing.T) { FailedCallsPrefix: "missed_calls", CDRPath: utils.HierarchyPath([]string{""}), CdrSourceId: "freeswitch_csv", + Filters: []string{}, ContinueOnSuccess: false, PartialRecordCache: time.Duration(10 * time.Second), PartialCacheExpiryAction: "*dump_to_file", diff --git a/config/configcdrc_test.go b/config/configcdrc_test.go index aaea5347f..3eb3648f5 100644 --- a/config/configcdrc_test.go +++ b/config/configcdrc_test.go @@ -49,6 +49,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { CDRPath: utils.HierarchyPath([]string{""}), CdrSourceId: "freeswitch_csv", CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), + Filters: []string{}, PartialRecordCache: time.Duration(10) * time.Second, PartialCacheExpiryAction: utils.MetaDumpToFile, HeaderFields: make([]*CfgCdrField, 0), @@ -146,6 +147,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { CDRPath: utils.HierarchyPath([]string{""}), CdrSourceId: "csv1", CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), + Filters: []string{}, PartialRecordCache: time.Duration(10) * time.Second, PartialCacheExpiryAction: utils.MetaDumpToFile, HeaderFields: make([]*CfgCdrField, 0), @@ -243,6 +245,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { CDRPath: utils.HierarchyPath([]string{""}), CdrSourceId: "csv2", CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), + Filters: []string{}, PartialRecordCache: time.Duration(10) * time.Second, PartialCacheExpiryAction: utils.MetaDumpToFile, HeaderFields: make([]*CfgCdrField, 0), @@ -310,6 +313,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { CDRPath: utils.HierarchyPath([]string{""}), CdrSourceId: "csv3", CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), + Filters: []string{}, PartialRecordCache: time.Duration(10) * time.Second, PartialCacheExpiryAction: utils.MetaDumpToFile, HeaderFields: make([]*CfgCdrField, 0), diff --git a/config/libconfig_json.go b/config/libconfig_json.go index dfc9eddb3..cfc01cdc3 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -201,6 +201,7 @@ type CdrcJsonCfg struct { Cdr_path *string Cdr_source_id *string Cdr_filter *string + Filters *[]string Continue_on_success *bool Max_open_files *int Partial_record_cache *string diff --git a/data/conf/samples/cdrccsvwithfilter/cgrates.json b/data/conf/samples/cdrccsvwithfilter/cgrates.json new file mode 100755 index 000000000..cb1a74ddb --- /dev/null +++ b/data/conf/samples/cdrccsvwithfilter/cgrates.json @@ -0,0 +1,81 @@ +{ + +// Real-time Charging System for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +// +// This file contains the default configuration hardcoded into CGRateS. +// This is what you get when you load CGRateS with an empty configuration file. + + +"stor_db": { // database used to store offline tariff plans and CDRs + "db_password": "CGRateS.org", // password to use when connecting to stordb +}, + + "rals": { + "enabled": true // so we can query CDRs + }, + + "cdrs": { + "enabled": true, + "rals_conns": [], // no rating support, just *raw CDR testing +}, + + + + "cdrc": [ + { + "id": "*CSVWithFilter1", // identifier of the CDRC runner + "enabled": true, // enable CDR client functionality + "field_separator": ";", + "cdr_in_dir": "/tmp/cdrctestswithfilters/csvit1/in", // absolute path towards the directory where the CDRs are stored + "cdr_out_dir": "/tmp/cdrctestswithfilters/csvit1/out", // absolute path towards the directory where processed CDRs will be moved + "cdr_source_id": "csvit1", // free form field, tag identifying the source of the CDRs within CDRS database + "filters":["*string:3:1002"], //filter Account to be 1002 + "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "0", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "1", "mandatory": true}, + {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "2", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "3", "mandatory": true}, + {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "3", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "~4:s/0([1-9]\\d+)/+49${1}/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "5", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "5", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "6", "mandatory": true}, + {"tag": "HDRExtra3", "field_id": "HDRExtra3", "type": "*composed", "value": "6", "mandatory": true}, + {"tag": "HDRExtra2", "field_id": "HDRExtra2", "type": "*composed", "value": "6", "mandatory": true}, + {"tag": "HDRExtra1", "field_id": "HDRExtra1", "type": "*composed", "value": "6", "mandatory": true}, + ], + }, + { + "id": "*CSVWithFilter2", // identifier of the CDRC runner + "enabled": true, // enable CDR client functionality + "field_separator": ";", + "cdr_in_dir": "/tmp/cdrctestswithfilters/csvit2/in", // absolute path towards the directory where the CDRs are stored + "cdr_out_dir": "/tmp/cdrctestswithfilters/csvit2/out", // absolute path towards the directory where processed CDRs will be moved + "cdr_source_id": "csvit2", // free form field, tag identifying the source of the CDRs within CDRS database + "filters":["*string:3:1002","*string:1:*prepaid","*gte:6:70"], //filter Account to be 1002 and RequestType *prepaid + "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true}, + {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "0", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "1", "mandatory": true}, + {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "2", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "3", "mandatory": true}, + {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "3", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "~4:s/0([1-9]\\d+)/+49${1}/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "5", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "5", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "6", "mandatory": true}, + {"tag": "HDRExtra3", "field_id": "HDRExtra3", "type": "*composed", "value": "6", "mandatory": true}, + {"tag": "HDRExtra2", "field_id": "HDRExtra2", "type": "*composed", "value": "6", "mandatory": true}, + {"tag": "HDRExtra1", "field_id": "HDRExtra1", "type": "*composed", "value": "6", "mandatory": true}, + ], + }, +], + + +} \ No newline at end of file diff --git a/dispatcher/suppliers_it_test.go b/dispatcher/suppliers_it_test.go index e2e4573cd..c90c67cbb 100755 --- a/dispatcher/suppliers_it_test.go +++ b/dispatcher/suppliers_it_test.go @@ -249,7 +249,7 @@ func testDspSupTestAuthKey2(t *testing.T) { Sorting: utils.MetaLeastCost, SortedSuppliers: []*engine.SortedSupplier{ &engine.SortedSupplier{ - SupplierID: "supplier2", + SupplierID: "supplier1", SupplierParameters: "", SortingData: map[string]interface{}{ utils.Cost: 0.1166, @@ -258,7 +258,7 @@ func testDspSupTestAuthKey2(t *testing.T) { }, }, &engine.SortedSupplier{ - SupplierID: "supplier1", + SupplierID: "supplier2", SupplierParameters: "", SortingData: map[string]interface{}{ utils.Cost: 0.2334,