diff --git a/config/config_defaults.go b/config/config_defaults.go index 70a238a4c..470b40cac 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -382,7 +382,7 @@ const CGRATES_CFG_JSON = ` // "fstFailedCallsPrefix": "" // Used in case of flatstore CDRs to avoid searching for BYE records // "fstRecordCacheTTL": "1s" // Duration to cache partial records when not pairing // "fstLazyQuotes": false, // if a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field - "fstMethod": "~*req.1", // the rsr parser that will determine the method of the current record + "fstMethod": "~*req.0", // the rsr parser that will determine the method of the current record "fstOriginID": "~*req.3;~*req.1;~*req.2", // the rsr parser that will determine the originID of the current record "fstMadatoryACK": false, // if we should receive the ACK before processing the record diff --git a/config/configsanity.go b/config/configsanity.go index 03acf7ae3..d5646a574 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -716,7 +716,7 @@ func (cfg *CGRConfig) checkConfigSanity() error { } } if rdr.Type == utils.MetaPartialCSV { - if act, has := rdr.Opts[utils.PartialCSVCacheExpiryActionOpt]; has && (utils.IfaceAsString(act) != utils.MetaDumpToFile || + if act, has := rdr.Opts[utils.PartialCSVCacheExpiryActionOpt]; has && (utils.IfaceAsString(act) != utils.MetaDumpToFile && utils.IfaceAsString(act) != utils.MetaPostCDR) { return fmt.Errorf("<%s> wrong partial expiry action for reader with ID: %s", utils.ERs, rdr.ID) } diff --git a/data/conf/samples/ers_internal/cgrates.json b/data/conf/samples/ers_internal/cgrates.json index c97d4aae9..ddb16acf5 100644 --- a/data/conf/samples/ers_internal/cgrates.json +++ b/data/conf/samples/ers_internal/cgrates.json @@ -347,16 +347,16 @@ "fields":[ {"tag": "Tor", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable","value":"~*req.3;~*req.1;~*req.2", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*invite.7", "mandatory": true}, {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, {"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.8", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.9", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.6", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.6", "mandatory": true}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*invite.8", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*invite.8", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*invite.9", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*invite.6", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*invite.6", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*constant","value": "0", "mandatory": true, "filters": ["*prefix:~*vars.FileName:missed_calls"]}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "mandatory": true}, // Value for Usage is composed based on record + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*usage_difference","value": "~*bye.6;~*invite.6", "mandatory": true, "filters": ["*notprefix:~*vars.FileName:missed_calls"]}, {"tag": "DisconnectCause", "path": "*cgreq.DisconnectCause", "type": "*variable", "value": "~*req.4; ;~*req.5", "mandatory": true}, {"tag": "DialogId", "path": "*cgreq.DialogId", "type": "*variable", "value": "~*req.11"} ], diff --git a/data/conf/samples/ers_mongo/cgrates.json b/data/conf/samples/ers_mongo/cgrates.json index 7332e0b9c..dc462a86a 100644 --- a/data/conf/samples/ers_mongo/cgrates.json +++ b/data/conf/samples/ers_mongo/cgrates.json @@ -348,15 +348,16 @@ "fields":[ {"tag": "Tor", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable","value":"~*req.3;~*req.1;~*req.2", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*invite.7", "mandatory": true}, {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, {"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.8", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.9", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.6", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.6", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "mandatory": true}, // Value for Usage is composed based on record + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*invite.8", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*invite.8", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*invite.9", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*invite.6", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*invite.6", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*constant","value": "0", "mandatory": true, "filters": ["*prefix:~*vars.FileName:missed_calls"]}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*usage_difference","value": "~*bye.6;~*invite.6", "mandatory": true, "filters": ["*notprefix:~*vars.FileName:missed_calls"]}, {"tag": "DisconnectCause", "path": "*cgreq.DisconnectCause", "type": "*variable", "value": "~*req.4; ;~*req.5", "mandatory": true}, {"tag": "DialogId", "path": "*cgreq.DialogId", "type": "*variable", "value": "~*req.11"} ], diff --git a/data/conf/samples/ers_mysql/cgrates.json b/data/conf/samples/ers_mysql/cgrates.json index 9374ee050..df0497f1c 100644 --- a/data/conf/samples/ers_mysql/cgrates.json +++ b/data/conf/samples/ers_mysql/cgrates.json @@ -345,15 +345,16 @@ "fields":[ {"tag": "Tor", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable","value":"~*req.3;~*req.1;~*req.2", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*invite.7", "mandatory": true}, {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, {"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.8", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.9", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.6", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.6", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "mandatory": true}, // Value for Usage is composed based on record + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*invite.8", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*invite.8", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*invite.9", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*invite.6", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*invite.6", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*constant","value": "0", "mandatory": true, "filters": ["*prefix:~*vars.FileName:missed_calls"]}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*usage_difference","value": "~*bye.6;~*invite.6", "mandatory": true, "filters": ["*notprefix:~*vars.FileName:missed_calls"]}, {"tag": "DisconnectCause", "path": "*cgreq.DisconnectCause", "type": "*variable", "value": "~*req.4; ;~*req.5", "mandatory": true}, {"tag": "DialogId", "path": "*cgreq.DialogId", "type": "*variable", "value": "~*req.11"} ], diff --git a/data/conf/samples/ers_postgres/cgrates.json b/data/conf/samples/ers_postgres/cgrates.json index bb479a4dc..93289da56 100644 --- a/data/conf/samples/ers_postgres/cgrates.json +++ b/data/conf/samples/ers_postgres/cgrates.json @@ -342,15 +342,16 @@ "fields":[ {"tag": "Tor", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable","value":"~*req.3;~*req.1;~*req.2", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*invite.7", "mandatory": true}, {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, {"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.8", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.9", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.6", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.6", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "mandatory": true}, // Value for Usage is composed based on record + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*invite.8", "mandatory": true}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*invite.8", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*invite.9", "mandatory": true}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*invite.6", "mandatory": true}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*invite.6", "mandatory": true}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*constant","value": "0", "mandatory": true, "filters": ["*prefix:~*vars.FileName:missed_calls"]}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*usage_difference","value": "~*bye.6;~*invite.6", "mandatory": true, "filters": ["*notprefix:~*vars.FileName:missed_calls"]}, {"tag": "DisconnectCause", "path": "*cgreq.DisconnectCause", "type": "*variable", "value": "~*req.4; ;~*req.5", "mandatory": true}, {"tag": "DialogId", "path": "*cgreq.DialogId", "type": "*variable", "value": "~*req.11"} ], diff --git a/engine/dynamicdp.go b/engine/dynamicdp.go index d605c5fb2..ba68f79dc 100644 --- a/engine/dynamicdp.go +++ b/engine/dynamicdp.go @@ -66,7 +66,8 @@ var initialDPPrefixes = utils.NewStringSet([]string{ utils.MetaCgrep, utils.MetaRep, utils.MetaAct, utils.MetaEC, utils.MetaUCH, utils.MetaOpts, utils.MetaHdr, utils.MetaTrl, utils.MetaCfg, - utils.MetaTenant}) + utils.MetaTenant, utils.MetaInvite, utils.MetaBye, + utils.MetaAck}) func (dDP *dynamicDP) FieldAsInterface(fldPath []string) (val interface{}, err error) { if len(fldPath) == 0 { diff --git a/ers/flatstore.go b/ers/flatstore.go index 67d8f0c82..f6f312bc3 100644 --- a/ers/flatstore.go +++ b/ers/flatstore.go @@ -190,7 +190,7 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { } else if method != utils.FstInvite && method != utils.FstBye && method != utils.FstAck { - return fmt.Errorf("Unsuported method<%s>", method) + return fmt.Errorf("unsupported method: <%q>", method) } var originID string @@ -198,21 +198,28 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { return } + cacheKey := utils.ConcatenatedKey(originID, method) + if rdr.cache.HasItem(cacheKey) { + utils.Logger.Warning(fmt.Sprintf("<%s> Overwriting the %s method for record <%s>", utils.ERs, method, originID)) + rdr.cache.Set(cacheKey, &fstRecord{method: method, values: record, fileName: fName}, []string{originID}) + continue + } records := rdr.cache.GetGroupItems(originID) if lrecords := len(records); !failedCallsFile && // do not set in cache if we know that the calls are failed (lrecords == 0 || (mandatoryAcK && lrecords != 2) || (!mandatoryAcK && lrecords != 1)) { - rdr.cache.Set(utils.ConcatenatedKey(originID, method), &fstRecord{method: method, values: record, fileName: fName}, []string{originID}) + rdr.cache.Set(cacheKey, &fstRecord{method: method, values: record, fileName: fName}, []string{originID}) continue } - rdr.cache.RemoveGroup(originID) - extraDP := map[string]utils.DataProvider{method: req} + extraDP := map[string]utils.DataProvider{utils.FstMethodToPrfx[method]: req} for _, record := range records { req := record.(*fstRecord) - extraDP[req.method] = config.NewSliceDP(req.values, nil) + rdr.cache.Set(utils.ConcatenatedKey(originID, req.method), nil, []string{originID}) + extraDP[utils.FstMethodToPrfx[req.method]] = config.NewSliceDP(req.values, nil) } + rdr.cache.RemoveGroup(originID) rowNr++ // increment the rowNr after checking if it's not the end of file agReq := agents.NewAgentRequest( @@ -259,72 +266,11 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { return } -/* -func NewUnpairedRecord(record []string, timezone string, fileName string) (*UnpairedRecord, error) { - if len(record) < 7 { - return nil, errors.New("MISSING_IE") - } - pr := &UnpairedRecord{Method: record[0], OriginID: record[3] + record[1] + record[2], Values: record, FileName: fileName} - var err error - if pr.Timestamp, err = utils.ParseTimeDetectLayout(record[6], timezone); err != nil { - return nil, err - } - return pr, nil -} - -// UnpairedRecord is a partial record received from Flatstore, can be INVITE or BYE and it needs to be paired in order to produce duration -type UnpairedRecord struct { - Method string // INVITE or BYE - OriginID string // Copute here the OriginID - Timestamp time.Time // Timestamp of the event, as written by db_flastore module - Values []string // Can contain original values or updated via UpdateValues - FileName string -} - -// Pairs INVITE and BYE into final record containing as last element the duration -func pairToRecord(part1, part2 *UnpairedRecord) ([]string, error) { - var invite, bye *UnpairedRecord - if part1.Method == "INVITE" { - invite = part1 - } else if part2.Method == "INVITE" { - invite = part2 - } else { - return nil, errors.New("MISSING_INVITE") - } - if part1.Method == "BYE" { - bye = part1 - } else if part2.Method == "BYE" { - bye = part2 - } else { - return nil, errors.New("MISSING_BYE") - } - if len(invite.Values) != len(bye.Values) { - return nil, errors.New("INCONSISTENT_VALUES_LENGTH") - } - record := invite.Values - for idx := range record { - switch idx { - case 0, 1, 2, 3, 6: // Leave these values as they are - case 4, 5: - record[idx] = bye.Values[idx] // Update record with status from bye - default: - if bye.Values[idx] != "" { // Any value higher than 6 is dynamically inserted, overwrite if non empty - record[idx] = bye.Values[idx] - } - - } - } - callDur := bye.Timestamp.Sub(invite.Timestamp) - record = append(record, strconv.FormatFloat(callDur.Seconds(), 'f', -1, 64)) - return record, nil -} -*/ func (rdr *FlatstoreER) dumpToFile(itmID string, value interface{}) { if value == nil { return } unpRcd := value.(*fstRecord) - dumpFilePath := path.Join(rdr.Config().ProcessedPath, unpRcd.fileName+utils.TmpSuffix) fileOut, err := os.Create(dumpFilePath) if err != nil { diff --git a/ers/flatstore_it_test.go b/ers/flatstore_it_test.go index a8953e984..01e6ddcdc 100644 --- a/ers/flatstore_it_test.go +++ b/ers/flatstore_it_test.go @@ -177,7 +177,11 @@ func testFlatstoreITHandleCdr1File(t *testing.T) { // check the files to be processed filesInDir, _ := os.ReadDir("/tmp/flatstoreErs/in") if len(filesInDir) != 0 { - t.Errorf("Files in ersInDir: %+v", filesInDir) + fls := make([]string, len(filesInDir)) + for i, fs := range filesInDir { + fls[i] = fs.Name() + } + t.Errorf("Files in ersInDir: %+v", fls) } filesOutDir, _ := os.ReadDir("/tmp/flatstoreErs/out") if len(filesOutDir) != 5 { @@ -218,6 +222,7 @@ func testFlatstoreITKillEngine(t *testing.T) { func TestFlatstoreProcessEvent(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers[0].ProcessedPath = "" + cfg.ERsCfg().Readers[0].Opts[utils.FstFailedCallsPrefixOpt] = "file" fltrs := &engine.FilterS{} filePath := "/tmp/TestFlatstoreProcessEvent/" fname := "file1.csv" @@ -228,7 +233,7 @@ func TestFlatstoreProcessEvent(t *testing.T) { if err != nil { t.Error(err) } - file.Write([]byte(",a,ToR,b,c,d,e,f,g,h,i,j,k,l")) + file.Write([]byte("INVITE,a,ToR,b,c,d,e,f,g,h,i,j,k,l")) file.Close() eR := &FlatstoreER{ cgrCfg: cfg, @@ -240,6 +245,7 @@ func TestFlatstoreProcessEvent(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } + eR.cache = ltcache.NewCache(-1, 0, false, eR.dumpToFile) expEvent := &utils.CGREvent{ Tenant: "cgrates.org", Event: map[string]interface{}{ @@ -253,7 +259,7 @@ func TestFlatstoreProcessEvent(t *testing.T) { utils.Subject: "h", utils.Tenant: "e", utils.ToR: "ToR", - utils.Usage: "0", + utils.Usage: "l", }, APIOpts: map[string]interface{}{}, } @@ -280,6 +286,15 @@ func TestFlatstoreProcessEvent(t *testing.T) { func TestFlatstoreProcessEvent2(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers[0].ProcessedPath = "" + cfg.ERsCfg().Readers[0].Fields = append(cfg.ERsCfg().Readers[0].Fields, &config.FCTemplate{ + Tag: "Usage", + Path: "*cgreq.Usage", + Type: utils.MetaUsageDifference, + Value: config.NewRSRParsersMustCompile("~*bye.6;~*invite.6", utils.InfieldSep), + }) + for _, v := range cfg.ERsCfg().Readers[0].Fields { + v.ComputePath() + } fltrs := &engine.FilterS{} filePath := "/tmp/TestFlatstoreProcessEvent/" fname := "file1.csv" @@ -303,10 +318,10 @@ func TestFlatstoreProcessEvent2(t *testing.T) { rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), } - record := []string{utils.ByeCgr, "a", "ToR", "b", "c", "d", "2013-12-30T16:00:01Z", "f", "g", "h", "i", "j", "k", "l"} + record := []string{utils.FstBye, "a", "ToR", "b", "c", "d", "2013-12-30T16:00:01Z", "f", "g", "h", "i", "j", "k", "l"} pr := &fstRecord{method: utils.FstBye, values: record, fileName: fname} - eR.cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, eR.dumpToFile) - eR.cache.Set("baToR", pr, nil) + eR.cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, nil) + eR.cache.Set(utils.ConcatenatedKey("baToR", utils.FstBye), pr, []string{"baToR"}) expEvent := &utils.CGREvent{ Tenant: "cgrates.org", Event: map[string]interface{}{ @@ -320,7 +335,7 @@ func TestFlatstoreProcessEvent2(t *testing.T) { utils.Subject: "h", utils.Tenant: "2013-12-30T15:00:01Z", utils.ToR: "ToR", - utils.Usage: "3600", + utils.Usage: "1h0m0s", }, APIOpts: map[string]interface{}{}, } @@ -383,60 +398,6 @@ func TestFlatstoreProcessEvent2CacheNotSet(t *testing.T) { } } -//Test unsupported time format err while making the unpaired record. -func TestFlatstoreProcessEvent2Error1(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - cfg.ERsCfg().Readers[0].ProcessedPath = "" - fltrs := &engine.FilterS{} - filePath := "/tmp/TestFlatstoreProcessEvent/" - fname := "file1.csv" - if err := os.MkdirAll(filePath, 0777); err != nil { - t.Error(err) - } - file, err := os.Create(path.Join(filePath, fname)) - if err != nil { - t.Error(err) - } - //Create new logger - utils.Logger, err = utils.Newlogger(utils.MetaStdLog, utils.EmptyString) - if err != nil { - t.Error(err) - } - utils.Logger.SetLogLevel(7) - buf := new(bytes.Buffer) - log.SetOutput(buf) - file.Write([]byte("INVITE,a,ToR,b,c,d,invalid_time,f,g,h,i,j,k,l")) - file.Close() - eR := &FlatstoreER{ - cgrCfg: cfg, - cfgIdx: 0, - fltrS: fltrs, - rdrDir: "/tmp/flatstoreErs/out", - rdrEvents: make(chan *erEvent, 1), - rdrError: make(chan error, 1), - rdrExit: make(chan struct{}), - conReqs: make(chan struct{}, 1), - } - record := []string{utils.ByeCgr, "a", "ToR", "b", "c", "d", "invalid_time", "f", "g", "h", "i", "j", "k", "l"} - pr := &fstRecord{method: utils.FstBye, values: record, fileName: fname} - eR.cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, eR.dumpToFile) - eR.cache.Set("baToR", pr, nil) - - eR.conReqs <- struct{}{} - eR.Config().Opts[utils.FstFailedCallsPrefixOpt] = "x" - if err := eR.processFile(filePath, fname); err != nil { - t.Error(err) - } - errExpect := "[WARNING] Converting row : <[INVITE a ToR b c d invalid_time f g h i j k l]> to unpairedRecord , ignoring due to error: " - if rcv := buf.String(); !strings.Contains(rcv, errExpect) { - t.Errorf("\nExpected %v but \nreceived %v", errExpect, rcv) - } - if err := os.RemoveAll(filePath); err != nil { - t.Error(err) - } - buf.Reset() -} - //Test pairToRecord() error, where both methods of unpaired record object are INVITE func TestFlatstoreProcessEvent2Error2(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -475,13 +436,13 @@ func TestFlatstoreProcessEvent2Error2(t *testing.T) { record := []string{"INVITE", "a", "ToR", "b", "c", "d", "2013-12-30T16:00:01Z", "f", "g", "h", "i", "j", "k", "l"} pr := &fstRecord{method: utils.FstInvite, values: record, fileName: fname} eR.cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, eR.dumpToFile) - eR.cache.Set("baToR", pr, nil) + eR.cache.Set("baToR:INVITE", pr, []string{"baToR"}) eR.conReqs <- struct{}{} eR.Config().Opts[utils.FstFailedCallsPrefixOpt] = "x" if err := eR.processFile(filePath, fname); err != nil { t.Error(err) } - errExpect := "[WARNING] Merging unpairedRecords" + errExpect := "[WARNING] Overwriting the INVITE method for record " if rcv := buf.String(); !strings.Contains(rcv, errExpect) { t.Errorf("\nExpected %v but \nreceived %v", errExpect, rcv) } @@ -506,7 +467,7 @@ func TestFlatstoreProcessEventError2(t *testing.T) { t.Error(err) } file.Write([]byte(`#ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage - ,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`)) +,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`)) file.Close() eR := &FlatstoreER{ cgrCfg: cfg, @@ -524,7 +485,7 @@ func TestFlatstoreProcessEventError2(t *testing.T) { {}, } - errExpect := "unsupported type: <>" + errExpect := `unsupported method: <"">` if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } @@ -551,7 +512,7 @@ func TestFlatstoreProcessEventError3(t *testing.T) { t.Error(err) } file.Write([]byte(`#ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage - ,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`)) +BYE,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`)) file.Close() eR := &FlatstoreER{ cgrCfg: cfg, @@ -562,9 +523,10 @@ func TestFlatstoreProcessEventError3(t *testing.T) { rdrError: make(chan error, 1), rdrExit: make(chan struct{}), conReqs: make(chan struct{}, 1), + cache: ltcache.NewCache(-1, 0, false, nil), } eR.conReqs <- struct{}{} - + eR.cache.Set("OriginCDR1*voice:INVITE", &fstRecord{method: utils.FstInvite, values: []string{}}, []string{"OriginCDR1*voice"}) // eR.Config().Filters = []string{"Filter1"} errExpect := "NOT_FOUND:Filter1" @@ -573,6 +535,8 @@ func TestFlatstoreProcessEventError3(t *testing.T) { } // + eR.cache.Set("OriginCDR1*voice:INVITE", &fstRecord{method: utils.FstInvite, values: []string{}}, []string{"OriginCDR1*voice"}) + eR.Config().Filters = []string{"*exists:~*req..Account:"} errExpect = "Invalid fieldPath [ Account]" if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect { diff --git a/utils/consts.go b/utils/consts.go index 06b310bb3..841db25b5 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2568,6 +2568,10 @@ const ( FstBye = "BYE" FstAck = "ACK" + MetaInvite = "*invite" + MetaBye = "*bye" + MetaAck = "*ack" + FstFailedCallsPrefixOpt = "fstFailedCallsPrefix" FstPartialRecordCacheOpt = "fstRecordCacheTTL" FstMethodOpt = "fstMethod" @@ -2597,6 +2601,15 @@ const ( KafkaMaxWait = "kafkaMaxWait" ) +var ( + // FstMethodToPrfx used for flatstore to convert the method in DP prefix + FstMethodToPrfx = map[string]string{ + FstInvite: MetaInvite, + FstBye: MetaBye, + FstAck: MetaAck, + } +) + // Analyzers constants const ( MetaScorch = "*scorch"