diff --git a/cdrc/csv.go b/cdrc/csv.go index f3ab090a9..d277275fc 100644 --- a/cdrc/csv.go +++ b/cdrc/csv.go @@ -142,6 +142,21 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con var err error var lazyHttpFields []*config.CfgCdrField for _, cdrFldCfg := range cdrcCfg.ContentFields { + filterBreak := false + for _, rsrFilter := range cdrFldCfg.FieldFilter { + if rsrFilter == nil { // Nil filter does not need to match anything + continue + } + if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx { + return nil, fmt.Errorf("Ignoring record: %v - cannot compile field filter %+v", record, rsrFilter) + } else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) { + filterBreak = true + break + } + } + if filterBreak { // Stop processing this field template since it's filters are not matching + continue + } if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.dfltCdrcCfg.CdrFormat) { // Hardcode some values in case of flatstore switch cdrFldCfg.FieldId { case utils.ACCID: @@ -152,7 +167,8 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con } var fieldVal string - if cdrFldCfg.Type == utils.META_COMPOSED { + switch cdrFldCfg.Type { + case utils.META_COMPOSED, utils.MetaUnixTimestamp: for _, cfgFieldRSR := range cdrFldCfg.Value { if cfgFieldRSR.IsStatic() { fieldVal += cfgFieldRSR.ParseValue("") @@ -160,13 +176,18 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con if cfgFieldIdx, _ := strconv.Atoi(cfgFieldRSR.Id); len(record) <= cfgFieldIdx { return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, cdrFldCfg.Tag) } else { - fieldVal += cfgFieldRSR.ParseValue(record[cfgFieldIdx]) + strVal := cfgFieldRSR.ParseValue(record[cfgFieldIdx]) + if cdrFldCfg.Type == utils.MetaUnixTimestamp { + t, _ := utils.ParseTimeDetectLayout(strVal, self.timezone) + strVal = strconv.Itoa(int(t.Unix())) + } + fieldVal += strVal } } } - } else if cdrFldCfg.Type == utils.META_HTTP_POST { + case utils.META_HTTP_POST: lazyHttpFields = append(lazyHttpFields, cdrFldCfg) // Will process later so we can send an estimation of storedCdr to http server - } else { + default: return nil, fmt.Errorf("Unsupported field type: %s", cdrFldCfg.Type) } if err := storedCdr.ParseFieldValue(cdrFldCfg.FieldId, fieldVal, self.timezone); err != nil { diff --git a/cdrc/partial_cdr.go b/cdrc/partial_cdr.go index cf465d0e1..8504d6b75 100644 --- a/cdrc/partial_cdr.go +++ b/cdrc/partial_cdr.go @@ -33,12 +33,12 @@ import ( ) const ( - PartialRecordsSuffix = ".partial" + PartialRecordsSuffix = "partial" ) func NewPartialRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune, roundDecimals int, timezone string, httpSkipTlsCheck bool) (*PartialRecordsCache, error) { return &PartialRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck, - partialRecords: make(map[string]*PartialCDRRecord), guard: engine.Guardian}, nil + partialRecords: make(map[string]*PartialCDRRecord), dumpTimers: make(map[string]*time.Timer), guard: engine.Guardian}, nil } type PartialRecordsCache struct { @@ -120,7 +120,7 @@ func (prc *PartialRecordsCache) MergePartialCDRRecord(pCDR *PartialCDRRecord) (* originID := pCDR.cdrs[0].OriginID pCDRIf, err := prc.guard.Guard(func() (interface{}, error) { if _, hasIt := prc.partialRecords[originID]; !hasIt && pCDR.Len() == 1 && !pCDR.cdrs[0].Partial { - return pCDR, nil // Special case when not a partial CDR and not having cached CDRs on same OriginID + return pCDR.cdrs[0], nil // Special case when not a partial CDR and not having cached CDRs on same OriginID } cachedPartialCDR := prc.cachePartialCDR(pCDR) var final bool @@ -136,6 +136,9 @@ func (prc *PartialRecordsCache) MergePartialCDRRecord(pCDR *PartialCDRRecord) (* prc.uncachePartialCDR(cachedPartialCDR) return cachedPartialCDR.MergeCDRs(), nil }, 0, originID) + if pCDRIf == nil { + return nil, err + } return pCDRIf.(*engine.CDR), err } diff --git a/cdrc/partialcsv_it_test.go b/cdrc/partialcsv_it_test.go new file mode 100644 index 000000000..eba1f1b80 --- /dev/null +++ b/cdrc/partialcsv_it_test.go @@ -0,0 +1,199 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012-2015 ITsysCOM + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package cdrc + +import ( + "io/ioutil" + "net/rpc" + "net/rpc/jsonrpc" + "os" + "path" + "strings" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var partpartcsvCfgPath string +var partcsvCfg *config.CGRConfig +var partcsvRPC *rpc.Client +var partcsvCDRCDirIn, partcsvCDRCDirOut string + +var partCsvFileContent1 = `4986517174963,004986517174964,DE-National,04.07.2016 18:58:55,04.07.2016 18:58:55,1,65,Peak,0.014560,498651,partial +4986517174964,004986517174963,DE-National,04.07.2016 20:58:55,04.07.2016 20:58:55,0,74,Offpeak,0.003360,498651,complete +` + +var partCsvFileContent2 = `4986517174963,004986517174964,DE-National,04.07.2016 19:00:00,04.07.2016 18:58:55,0,15,Offpeak,0.003360,498651,partial` + +var eCacheDumpFile1 = `4986517174963_004986517174964_04.07.2016 18:58:55,1467651600,*rated,086517174963,+4986517174964,2016-07-04T18:58:55+02:00,2016-07-04T18:58:55+02:00,15,-1.00000 +4986517174963_004986517174964_04.07.2016 18:58:55,1467651535,*rated,086517174963,+4986517174964,2016-07-04T18:58:55+02:00,2016-07-04T18:58:55+02:00,65,-1.00000 +` + +func TestPartcsvITInitConfig(t *testing.T) { + if !*testIT { + return + } + var err error + partpartcsvCfgPath = path.Join(*dataDir, "conf", "samples", "cdrc_partcsv") + if partcsvCfg, err = config.NewCGRConfigFromFolder(partpartcsvCfgPath); err != nil { + t.Fatal("Got config error: ", err.Error()) + } +} + +// InitDb so we can rely on count +func TestPartcsvITInitCdrDb(t *testing.T) { + if !*testIT { + return + } + if err := engine.InitStorDb(partcsvCfg); err != nil { + t.Fatal(err) + } +} + +func TestPartcsvITCreateCdrDirs(t *testing.T) { + if !*testIT { + return + } + for _, cdrcProfiles := range partcsvCfg.CdrcProfiles { + for i, cdrcInst := range cdrcProfiles { + if i == 0 { + partcsvCDRCDirIn, partcsvCDRCDirOut = cdrcInst.CdrInDir, cdrcInst.CdrOutDir + } + for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } + } + } +} + +func TestPartcsvITStartEngine(t *testing.T) { + if !*testIT { + return + } + if _, err := engine.StopStartEngine(partpartcsvCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func TestPartcsvITRpcConn(t *testing.T) { + if !*testIT { + return + } + var err error + partcsvRPC, err = jsonrpc.Dial("tcp", partcsvCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +// The default scenario, out of cdrc defined in .cfg file +func TestPartcsvITHandleCdr1File(t *testing.T) { + if !*testIT { + return + } + fileName := "file1.csv" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(partCsvFileContent1), 0644); err != nil { + t.Fatal(err.Error) + } + if err := os.Rename(tmpFilePath, path.Join(partcsvCDRCDirIn, fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +// Scenario out of first .xml config +func TestPartcsvITHandleCdr2File(t *testing.T) { + if !*testIT { + return + } + fileName := "file2.csv" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(partCsvFileContent2), 0644); err != nil { + t.Fatal(err.Error) + } + if err := os.Rename(tmpFilePath, path.Join(partcsvCDRCDirIn, fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +func TestPartcsvITProcessedFiles(t *testing.T) { + if !*testIT { + return + } + time.Sleep(time.Duration(2 * time.Second)) + if outContent1, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut, "file1.csv")); err != nil { + t.Error(err) + } else if partCsvFileContent1 != string(outContent1) { + t.Errorf("Expecting: %q, received: %q", partCsvFileContent1, string(outContent1)) + } + if outContent2, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut, "file2.csv")); err != nil { + t.Error(err) + } else if partCsvFileContent2 != string(outContent2) { + t.Errorf("Expecting: %q, received: %q", partCsvFileContent2, string(outContent2)) + } + filesInDir, _ := ioutil.ReadDir(partcsvCDRCDirOut) + var fileName string + for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config + if strings.HasPrefix(file.Name(), "4986517174963_004986517174964") { + fileName = file.Name() + break + } + } + if contentCacheDump, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut, fileName)); err != nil { + t.Error(err) + } else if eCacheDumpFile1 != string(contentCacheDump) { + t.Errorf("Expecting: %q, received: %q", eCacheDumpFile1, string(contentCacheDump)) + } +} + +func TestPartcsvITAnalyseCDRs(t *testing.T) { + if !*testIT { + return + } + var reply []*engine.ExternalCDR + if err := partcsvRPC.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } + if err := partcsvRPC.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4986517174963"}}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } + +} + +func TestPartcsvITKillEngine(t *testing.T) { + if !*testIT { + return + } + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +} diff --git a/config/config_defaults.go b/config/config_defaults.go index 118974b12..1d9856bf7 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -142,7 +142,7 @@ const CGRATES_CFG_JSON = ` "cdrs_conns": [ {"address": "*internal"} // address where to reach CDR server. <*internal|x.y.z.y:1234> ], - "cdr_format": "csv", // CDR file format + "cdr_format": "csv", // CDR file format "field_separator": ",", // separator used in case of csv files "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify @@ -156,6 +156,7 @@ const CGRATES_CFG_JSON = ` "cdr_filter": "", // filter CDR records to import "continue_on_success": false, // continue to the next template if executed "partial_record_cache": "10s", // duration to cache partial records when not pairing + "cache_expiry_action": "*post_cdr", // action taken when cache when records in cache are timed-out "header_fields": [], // template of the import header fields "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "2", "mandatory": true}, diff --git a/data/conf/samples/cdrc_partcsv/cgrates.json b/data/conf/samples/cdrc_partcsv/cgrates.json new file mode 100644 index 000000000..33aaa2b53 --- /dev/null +++ b/data/conf/samples/cdrc_partcsv/cgrates.json @@ -0,0 +1,64 @@ +{ + +// Real-time Charging System for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +// +// This file contains the default configuration hardcoded into CGRateS. +// This is what you get when you load CGRateS with an empty configuration file. + + + "rals": { + "enabled": true // so we can query CDRs + }, + + "cdrs": { + "enabled": true, + "rals_conns": [], // no rating support, just *raw CDR testing +}, + + + + "cdrc": [ + { + "id": "*default", + "enabled": true, + "cdr_format": "partial_csv", + "cdr_in_dir": "/tmp/cdrctests/partcsv/in", // absolute path towards the directory where the CDRs are stored + "cdr_out_dir": "/tmp/cdrctests/partcsv/out", // absolute path towards the directory where processed CDRs will be moved + "cdr_source_id": "partial_csv_test", // free form field, tag identifying the source of the CDRs within CDRS database + "partial_record_cache": "1s", // duration to cache partial records when not pairing + "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true}, + {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "0"}, + {"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "^_"}, + {"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "1"}, + {"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "^_"}, + {"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "4"}, + {"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "3"}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "^*rated", "mandatory": true}, + {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "^cgrates.org", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "~0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "~1:s/^00(\\d+)$/+$1/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "4", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "4", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "~6:s/^(\\d+)$/${1}s/", "mandatory": true}, + {"tag": "Partial", "field_id": "Partial", "type": "*composed", "value": "^true", "field_filter": "10(partial)"}, + ], + "cache_dump_fields": [ + {"tag": "OriginID", "type": "*composed", "value": "OriginID"}, + {"tag": "OrderID", "type": "*composed", "value": "OrderID"}, + {"tag": "RequestType", "type": "*composed", "value": "RequestType"}, + {"tag": "Account", "type": "*composed", "value": "Account"}, + {"tag": "Destination", "type": "*composed", "value": "Destination"}, + {"tag": "SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "Usage", "type": "*composed", "value": "Usage"}, + {"tag": "Cost", "type": "*composed", "value": "Cost"}, + ], + }, +], + + +} \ No newline at end of file diff --git a/utils/consts.go b/utils/consts.go index 7ef6103c1..a95efad0e 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -299,4 +299,5 @@ const ( MetaJSONrpc = "*json" MetaDateTime = "*datetime" MetaMaskedDestination = "*masked_destination" + MetaUnixTimestamp = "*unix_timestamp" )