From 146b4fff6e6c289f1fe50dc5627957b3e2de8a86 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 31 Jul 2016 20:35:38 +0200 Subject: [PATCH] Finished PartialCDR processing, fixes #139 --- cdrc/partial_cdr.go | 2 +- cdrc/partialcsv_it_test.go | 54 ++++++++++++++++----- data/conf/samples/cdrc_partcsv/cgrates.json | 34 ++++++++++++- 3 files changed, 74 insertions(+), 16 deletions(-) diff --git a/cdrc/partial_cdr.go b/cdrc/partial_cdr.go index 0bbd1d0d1..8222ed8a5 100644 --- a/cdrc/partial_cdr.go +++ b/cdrc/partial_cdr.go @@ -38,7 +38,7 @@ const ( ) func NewPartialRecordsCache(ttl time.Duration, expiryAction string, cdrOutDir string, csvSep rune, roundDecimals int, timezone string, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection) (*PartialRecordsCache, error) { - return &PartialRecordsCache{ttl: ttl, expiryAction: expiryAction, cdrOutDir: cdrOutDir, csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck, + return &PartialRecordsCache{ttl: ttl, expiryAction: expiryAction, cdrOutDir: cdrOutDir, csvSep: csvSep, roundDecimals: roundDecimals, timezone: timezone, httpSkipTlsCheck: httpSkipTlsCheck, cdrs: cdrs, partialRecords: make(map[string]*PartialCDRRecord), dumpTimers: make(map[string]*time.Timer), guard: engine.Guardian}, nil } diff --git a/cdrc/partialcsv_it_test.go b/cdrc/partialcsv_it_test.go index eba1f1b80..99f73b482 100644 --- a/cdrc/partialcsv_it_test.go +++ b/cdrc/partialcsv_it_test.go @@ -36,13 +36,14 @@ import ( var partpartcsvCfgPath string var partcsvCfg *config.CGRConfig var partcsvRPC *rpc.Client -var partcsvCDRCDirIn, partcsvCDRCDirOut string +var partcsvCDRCDirIn1, partcsvCDRCDirOut1, partcsvCDRCDirIn2, partcsvCDRCDirOut2 string var partCsvFileContent1 = `4986517174963,004986517174964,DE-National,04.07.2016 18:58:55,04.07.2016 18:58:55,1,65,Peak,0.014560,498651,partial 4986517174964,004986517174963,DE-National,04.07.2016 20:58:55,04.07.2016 20:58:55,0,74,Offpeak,0.003360,498651,complete ` var 
partCsvFileContent2 = `4986517174963,004986517174964,DE-National,04.07.2016 19:00:00,04.07.2016 18:58:55,0,15,Offpeak,0.003360,498651,partial` +var partCsvFileContent3 = `4986517174964,004986517174960,DE-National,04.07.2016 19:05:55,04.07.2016 19:05:55,0,23,Offpeak,0.003360,498651,partial` var eCacheDumpFile1 = `4986517174963_004986517174964_04.07.2016 18:58:55,1467651600,*rated,086517174963,+4986517174964,2016-07-04T18:58:55+02:00,2016-07-04T18:58:55+02:00,15,-1.00000 4986517174963_004986517174964_04.07.2016 18:58:55,1467651535,*rated,086517174963,+4986517174964,2016-07-04T18:58:55+02:00,2016-07-04T18:58:55+02:00,65,-1.00000 @@ -73,10 +74,12 @@ func TestPartcsvITCreateCdrDirs(t *testing.T) { if !*testIT { return } - for _, cdrcProfiles := range partcsvCfg.CdrcProfiles { - for i, cdrcInst := range cdrcProfiles { - if i == 0 { - partcsvCDRCDirIn, partcsvCDRCDirOut = cdrcInst.CdrInDir, cdrcInst.CdrOutDir + for path, cdrcProfiles := range partcsvCfg.CdrcProfiles { + for _, cdrcInst := range cdrcProfiles { + if path == "/tmp/cdrctests/partcsv1/in" { + partcsvCDRCDirIn1, partcsvCDRCDirOut1 = cdrcInst.CdrInDir, cdrcInst.CdrOutDir + } else if path == "/tmp/cdrctests/partcsv2/in" { + partcsvCDRCDirIn2, partcsvCDRCDirOut2 = cdrcInst.CdrInDir, cdrcInst.CdrOutDir } for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} { if err := os.RemoveAll(dir); err != nil { @@ -121,7 +124,7 @@ func TestPartcsvITHandleCdr1File(t *testing.T) { if err := ioutil.WriteFile(tmpFilePath, []byte(partCsvFileContent1), 0644); err != nil { t.Fatal(err.Error) } - if err := os.Rename(tmpFilePath, path.Join(partcsvCDRCDirIn, fileName)); err != nil { + if err := os.Rename(tmpFilePath, path.Join(partcsvCDRCDirIn1, fileName)); err != nil { t.Fatal("Error moving file to processing directory: ", err) } } @@ -136,7 +139,22 @@ func TestPartcsvITHandleCdr2File(t *testing.T) { if err := ioutil.WriteFile(tmpFilePath, []byte(partCsvFileContent2), 0644); err != nil { t.Fatal(err.Error) } - if err := 
os.Rename(tmpFilePath, path.Join(partcsvCDRCDirIn, fileName)); err != nil { + if err := os.Rename(tmpFilePath, path.Join(partcsvCDRCDirIn1, fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +// Scenario out of second .json config +func TestPartcsvITHandleCdr3File(t *testing.T) { + if !*testIT { + return + } + fileName := "file3.csv" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(partCsvFileContent3), 0644); err != nil { + t.Fatal(err.Error) + } + if err := os.Rename(tmpFilePath, path.Join(partcsvCDRCDirIn2, fileName)); err != nil { t.Fatal("Error moving file to processing directory: ", err) } } @@ -145,18 +163,18 @@ func TestPartcsvITProcessedFiles(t *testing.T) { if !*testIT { return } - time.Sleep(time.Duration(2 * time.Second)) - if outContent1, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut, "file1.csv")); err != nil { + time.Sleep(time.Duration(3 * time.Second)) + if outContent1, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut1, "file1.csv")); err != nil { t.Error(err) } else if partCsvFileContent1 != string(outContent1) { t.Errorf("Expecting: %q, received: %q", partCsvFileContent1, string(outContent1)) } - if outContent2, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut, "file2.csv")); err != nil { + if outContent2, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut1, "file2.csv")); err != nil { t.Error(err) } else if partCsvFileContent2 != string(outContent2) { t.Errorf("Expecting: %q, received: %q", partCsvFileContent2, string(outContent2)) } - filesInDir, _ := ioutil.ReadDir(partcsvCDRCDirOut) + filesInDir, _ := ioutil.ReadDir(partcsvCDRCDirOut1) var fileName string for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config if strings.HasPrefix(file.Name(), "4986517174963_004986517174964") { @@ -164,11 +182,16 @@ func TestPartcsvITProcessedFiles(t *testing.T) { break } } - if contentCacheDump, err 
:= ioutil.ReadFile(path.Join(partcsvCDRCDirOut, fileName)); err != nil { + if contentCacheDump, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut1, fileName)); err != nil { t.Error(err) } else if eCacheDumpFile1 != string(contentCacheDump) { t.Errorf("Expecting: %q, received: %q", eCacheDumpFile1, string(contentCacheDump)) } + if outContent3, err := ioutil.ReadFile(path.Join(partcsvCDRCDirOut2, "file3.csv")); err != nil { + t.Error(err) + } else if partCsvFileContent3 != string(outContent3) { + t.Errorf("Expecting: %q, received: %q", partCsvFileContent3, string(outContent3)) + } } func TestPartcsvITAnalyseCDRs(t *testing.T) { @@ -178,7 +201,7 @@ func TestPartcsvITAnalyseCDRs(t *testing.T) { var reply []*engine.ExternalCDR if err := partcsvRPC.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{}, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) - } else if len(reply) != 1 { + } else if len(reply) != 2 { t.Error("Unexpected number of CDRs returned: ", len(reply)) } if err := partcsvRPC.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4986517174963"}}, &reply); err != nil { @@ -186,6 +209,11 @@ func TestPartcsvITAnalyseCDRs(t *testing.T) { } else if len(reply) != 1 { t.Error("Unexpected number of CDRs returned: ", len(reply)) } + if err := partcsvRPC.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4986517174960"}}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } } diff --git a/data/conf/samples/cdrc_partcsv/cgrates.json b/data/conf/samples/cdrc_partcsv/cgrates.json index 33aaa2b53..aaf1c8316 100644 --- a/data/conf/samples/cdrc_partcsv/cgrates.json +++ b/data/conf/samples/cdrc_partcsv/cgrates.json @@ -23,10 +23,11 @@ "id": "*default", "enabled": true, "cdr_format": "partial_csv", - "cdr_in_dir": "/tmp/cdrctests/partcsv/in", // absolute path towards the directory where the CDRs are stored - 
"cdr_out_dir": "/tmp/cdrctests/partcsv/out", // absolute path towards the directory where processed CDRs will be moved + "cdr_in_dir": "/tmp/cdrctests/partcsv1/in", // absolute path towards the directory where the CDRs are stored + "cdr_out_dir": "/tmp/cdrctests/partcsv1/out", // absolute path towards the directory where processed CDRs will be moved "cdr_source_id": "partial_csv_test", // free form field, tag identifying the source of the CDRs within CDRS database "partial_record_cache": "1s", // duration to cache partial records when not pairing + "partial_cache_expiry_action": "*dump_to_file", "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true}, {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "0"}, @@ -58,6 +59,35 @@ {"tag": "Cost", "type": "*composed", "value": "Cost"}, ], }, + { + "id": "post_on_expiry", + "enabled": true, + "cdr_format": "partial_csv", + "cdr_in_dir": "/tmp/cdrctests/partcsv2/in", // absolute path towards the directory where the CDRs are stored + "cdr_out_dir": "/tmp/cdrctests/partcsv2/out", // absolute path towards the directory where processed CDRs will be moved + "cdr_source_id": "partial_csv_test2", // free form field, tag identifying the source of the CDRs within CDRS database + "partial_record_cache": "1s", // duration to cache partial records when not pairing + "partial_cache_expiry_action": "*post_cdr", + "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true}, + {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "0"}, + {"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": 
"^_"}, + {"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "1"}, + {"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "^_"}, + {"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "4"}, + {"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "3"}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "^*rated", "mandatory": true}, + {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "^cgrates.org", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "~0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "~1:s/^00(\\d+)$/+$1/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "4", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "4", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "~6:s/^(\\d+)$/${1}s/", "mandatory": true}, + {"tag": "Partial", "field_id": "Partial", "type": "*composed", "value": "^true", "field_filter": "10(partial)"}, + ], + }, ],