mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Fixed ers tests
This commit is contained in:
committed by
Dan Christian Bogos
parent
d53b6175d5
commit
1f80d0df01
@@ -382,7 +382,7 @@ const CGRATES_CFG_JSON = `
|
||||
// "fstFailedCallsPrefix": "" // Used in case of flatstore CDRs to avoid searching for BYE records
|
||||
// "fstRecordCacheTTL": "1s" // Duration to cache partial records when not pairing
|
||||
// "fstLazyQuotes": false, // if a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field
|
||||
"fstMethod": "~*req.1", // the rsr parser that will determine the method of the current record
|
||||
"fstMethod": "~*req.0", // the rsr parser that will determine the method of the current record
|
||||
"fstOriginID": "~*req.3;~*req.1;~*req.2", // the rsr parser that will determine the originID of the current record
|
||||
"fstMadatoryACK": false, // if we should receive the ACK before processing the record
|
||||
|
||||
|
||||
@@ -716,7 +716,7 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
}
|
||||
if rdr.Type == utils.MetaPartialCSV {
|
||||
if act, has := rdr.Opts[utils.PartialCSVCacheExpiryActionOpt]; has && (utils.IfaceAsString(act) != utils.MetaDumpToFile ||
|
||||
if act, has := rdr.Opts[utils.PartialCSVCacheExpiryActionOpt]; has && (utils.IfaceAsString(act) != utils.MetaDumpToFile &&
|
||||
utils.IfaceAsString(act) != utils.MetaPostCDR) {
|
||||
return fmt.Errorf("<%s> wrong partial expiry action for reader with ID: %s", utils.ERs, rdr.ID)
|
||||
}
|
||||
|
||||
@@ -347,16 +347,16 @@
|
||||
"fields":[
|
||||
{"tag": "Tor", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable","value":"~*req.3;~*req.1;~*req.2", "mandatory": true},
|
||||
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true},
|
||||
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*invite.7", "mandatory": true},
|
||||
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true},
|
||||
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true},
|
||||
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true},
|
||||
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.8", "mandatory": true},
|
||||
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.9", "mandatory": true},
|
||||
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.6", "mandatory": true},
|
||||
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.6", "mandatory": true},
|
||||
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*invite.8", "mandatory": true},
|
||||
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*invite.8", "mandatory": true},
|
||||
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*invite.9", "mandatory": true},
|
||||
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*invite.6", "mandatory": true},
|
||||
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*invite.6", "mandatory": true},
|
||||
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*constant","value": "0", "mandatory": true, "filters": ["*prefix:~*vars.FileName:missed_calls"]},
|
||||
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "mandatory": true}, // Value for Usage is composed based on record
|
||||
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*usage_difference","value": "~*bye.6;~*invite.6", "mandatory": true, "filters": ["*notprefix:~*vars.FileName:missed_calls"]},
|
||||
{"tag": "DisconnectCause", "path": "*cgreq.DisconnectCause", "type": "*variable", "value": "~*req.4; ;~*req.5", "mandatory": true},
|
||||
{"tag": "DialogId", "path": "*cgreq.DialogId", "type": "*variable", "value": "~*req.11"}
|
||||
],
|
||||
|
||||
@@ -348,15 +348,16 @@
|
||||
"fields":[
|
||||
{"tag": "Tor", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable","value":"~*req.3;~*req.1;~*req.2", "mandatory": true},
|
||||
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true},
|
||||
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*invite.7", "mandatory": true},
|
||||
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true},
|
||||
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true},
|
||||
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true},
|
||||
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.8", "mandatory": true},
|
||||
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.9", "mandatory": true},
|
||||
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.6", "mandatory": true},
|
||||
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.6", "mandatory": true},
|
||||
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "mandatory": true}, // Value for Usage is composed based on record
|
||||
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*invite.8", "mandatory": true},
|
||||
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*invite.8", "mandatory": true},
|
||||
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*invite.9", "mandatory": true},
|
||||
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*invite.6", "mandatory": true},
|
||||
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*invite.6", "mandatory": true},
|
||||
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*constant","value": "0", "mandatory": true, "filters": ["*prefix:~*vars.FileName:missed_calls"]},
|
||||
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*usage_difference","value": "~*bye.6;~*invite.6", "mandatory": true, "filters": ["*notprefix:~*vars.FileName:missed_calls"]},
|
||||
{"tag": "DisconnectCause", "path": "*cgreq.DisconnectCause", "type": "*variable", "value": "~*req.4; ;~*req.5", "mandatory": true},
|
||||
{"tag": "DialogId", "path": "*cgreq.DialogId", "type": "*variable", "value": "~*req.11"}
|
||||
],
|
||||
|
||||
@@ -345,15 +345,16 @@
|
||||
"fields":[
|
||||
{"tag": "Tor", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable","value":"~*req.3;~*req.1;~*req.2", "mandatory": true},
|
||||
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true},
|
||||
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*invite.7", "mandatory": true},
|
||||
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true},
|
||||
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true},
|
||||
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true},
|
||||
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.8", "mandatory": true},
|
||||
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.9", "mandatory": true},
|
||||
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.6", "mandatory": true},
|
||||
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.6", "mandatory": true},
|
||||
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "mandatory": true}, // Value for Usage is composed based on record
|
||||
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*invite.8", "mandatory": true},
|
||||
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*invite.8", "mandatory": true},
|
||||
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*invite.9", "mandatory": true},
|
||||
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*invite.6", "mandatory": true},
|
||||
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*invite.6", "mandatory": true},
|
||||
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*constant","value": "0", "mandatory": true, "filters": ["*prefix:~*vars.FileName:missed_calls"]},
|
||||
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*usage_difference","value": "~*bye.6;~*invite.6", "mandatory": true, "filters": ["*notprefix:~*vars.FileName:missed_calls"]},
|
||||
{"tag": "DisconnectCause", "path": "*cgreq.DisconnectCause", "type": "*variable", "value": "~*req.4; ;~*req.5", "mandatory": true},
|
||||
{"tag": "DialogId", "path": "*cgreq.DialogId", "type": "*variable", "value": "~*req.11"}
|
||||
],
|
||||
|
||||
@@ -342,15 +342,16 @@
|
||||
"fields":[
|
||||
{"tag": "Tor", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable","value":"~*req.3;~*req.1;~*req.2", "mandatory": true},
|
||||
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true},
|
||||
{"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*invite.7", "mandatory": true},
|
||||
{"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true},
|
||||
{"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true},
|
||||
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true},
|
||||
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.8", "mandatory": true},
|
||||
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.9", "mandatory": true},
|
||||
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.6", "mandatory": true},
|
||||
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.6", "mandatory": true},
|
||||
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "mandatory": true}, // Value for Usage is composed based on record
|
||||
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*invite.8", "mandatory": true},
|
||||
{"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*invite.8", "mandatory": true},
|
||||
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*invite.9", "mandatory": true},
|
||||
{"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*invite.6", "mandatory": true},
|
||||
{"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*invite.6", "mandatory": true},
|
||||
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*constant","value": "0", "mandatory": true, "filters": ["*prefix:~*vars.FileName:missed_calls"]},
|
||||
{"tag": "Usage", "path": "*cgreq.Usage", "type": "*usage_difference","value": "~*bye.6;~*invite.6", "mandatory": true, "filters": ["*notprefix:~*vars.FileName:missed_calls"]},
|
||||
{"tag": "DisconnectCause", "path": "*cgreq.DisconnectCause", "type": "*variable", "value": "~*req.4; ;~*req.5", "mandatory": true},
|
||||
{"tag": "DialogId", "path": "*cgreq.DialogId", "type": "*variable", "value": "~*req.11"}
|
||||
],
|
||||
|
||||
@@ -66,7 +66,8 @@ var initialDPPrefixes = utils.NewStringSet([]string{
|
||||
utils.MetaCgrep, utils.MetaRep, utils.MetaAct,
|
||||
utils.MetaEC, utils.MetaUCH, utils.MetaOpts,
|
||||
utils.MetaHdr, utils.MetaTrl, utils.MetaCfg,
|
||||
utils.MetaTenant})
|
||||
utils.MetaTenant, utils.MetaInvite, utils.MetaBye,
|
||||
utils.MetaAck})
|
||||
|
||||
func (dDP *dynamicDP) FieldAsInterface(fldPath []string) (val interface{}, err error) {
|
||||
if len(fldPath) == 0 {
|
||||
|
||||
@@ -190,7 +190,7 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) {
|
||||
} else if method != utils.FstInvite &&
|
||||
method != utils.FstBye &&
|
||||
method != utils.FstAck {
|
||||
return fmt.Errorf("Unsuported method<%s>", method)
|
||||
return fmt.Errorf("unsupported method: <%q>", method)
|
||||
}
|
||||
|
||||
var originID string
|
||||
@@ -198,21 +198,28 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
cacheKey := utils.ConcatenatedKey(originID, method)
|
||||
if rdr.cache.HasItem(cacheKey) {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> Overwriting the %s method for record <%s>", utils.ERs, method, originID))
|
||||
rdr.cache.Set(cacheKey, &fstRecord{method: method, values: record, fileName: fName}, []string{originID})
|
||||
continue
|
||||
}
|
||||
records := rdr.cache.GetGroupItems(originID)
|
||||
|
||||
if lrecords := len(records); !failedCallsFile && // do not set in cache if we know that the calls are failed
|
||||
(lrecords == 0 ||
|
||||
(mandatoryAcK && lrecords != 2) ||
|
||||
(!mandatoryAcK && lrecords != 1)) {
|
||||
rdr.cache.Set(utils.ConcatenatedKey(originID, method), &fstRecord{method: method, values: record, fileName: fName}, []string{originID})
|
||||
rdr.cache.Set(cacheKey, &fstRecord{method: method, values: record, fileName: fName}, []string{originID})
|
||||
continue
|
||||
}
|
||||
rdr.cache.RemoveGroup(originID)
|
||||
extraDP := map[string]utils.DataProvider{method: req}
|
||||
extraDP := map[string]utils.DataProvider{utils.FstMethodToPrfx[method]: req}
|
||||
for _, record := range records {
|
||||
req := record.(*fstRecord)
|
||||
extraDP[req.method] = config.NewSliceDP(req.values, nil)
|
||||
rdr.cache.Set(utils.ConcatenatedKey(originID, req.method), nil, []string{originID})
|
||||
extraDP[utils.FstMethodToPrfx[req.method]] = config.NewSliceDP(req.values, nil)
|
||||
}
|
||||
rdr.cache.RemoveGroup(originID)
|
||||
|
||||
rowNr++ // increment the rowNr after checking if it's not the end of file
|
||||
agReq := agents.NewAgentRequest(
|
||||
@@ -259,72 +266,11 @@ func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
/*
|
||||
func NewUnpairedRecord(record []string, timezone string, fileName string) (*UnpairedRecord, error) {
|
||||
if len(record) < 7 {
|
||||
return nil, errors.New("MISSING_IE")
|
||||
}
|
||||
pr := &UnpairedRecord{Method: record[0], OriginID: record[3] + record[1] + record[2], Values: record, FileName: fileName}
|
||||
var err error
|
||||
if pr.Timestamp, err = utils.ParseTimeDetectLayout(record[6], timezone); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pr, nil
|
||||
}
|
||||
|
||||
// UnpairedRecord is a partial record received from Flatstore, can be INVITE or BYE and it needs to be paired in order to produce duration
|
||||
type UnpairedRecord struct {
|
||||
Method string // INVITE or BYE
|
||||
OriginID string // Copute here the OriginID
|
||||
Timestamp time.Time // Timestamp of the event, as written by db_flastore module
|
||||
Values []string // Can contain original values or updated via UpdateValues
|
||||
FileName string
|
||||
}
|
||||
|
||||
// Pairs INVITE and BYE into final record containing as last element the duration
|
||||
func pairToRecord(part1, part2 *UnpairedRecord) ([]string, error) {
|
||||
var invite, bye *UnpairedRecord
|
||||
if part1.Method == "INVITE" {
|
||||
invite = part1
|
||||
} else if part2.Method == "INVITE" {
|
||||
invite = part2
|
||||
} else {
|
||||
return nil, errors.New("MISSING_INVITE")
|
||||
}
|
||||
if part1.Method == "BYE" {
|
||||
bye = part1
|
||||
} else if part2.Method == "BYE" {
|
||||
bye = part2
|
||||
} else {
|
||||
return nil, errors.New("MISSING_BYE")
|
||||
}
|
||||
if len(invite.Values) != len(bye.Values) {
|
||||
return nil, errors.New("INCONSISTENT_VALUES_LENGTH")
|
||||
}
|
||||
record := invite.Values
|
||||
for idx := range record {
|
||||
switch idx {
|
||||
case 0, 1, 2, 3, 6: // Leave these values as they are
|
||||
case 4, 5:
|
||||
record[idx] = bye.Values[idx] // Update record with status from bye
|
||||
default:
|
||||
if bye.Values[idx] != "" { // Any value higher than 6 is dynamically inserted, overwrite if non empty
|
||||
record[idx] = bye.Values[idx]
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
callDur := bye.Timestamp.Sub(invite.Timestamp)
|
||||
record = append(record, strconv.FormatFloat(callDur.Seconds(), 'f', -1, 64))
|
||||
return record, nil
|
||||
}
|
||||
*/
|
||||
func (rdr *FlatstoreER) dumpToFile(itmID string, value interface{}) {
|
||||
if value == nil {
|
||||
return
|
||||
}
|
||||
unpRcd := value.(*fstRecord)
|
||||
|
||||
dumpFilePath := path.Join(rdr.Config().ProcessedPath, unpRcd.fileName+utils.TmpSuffix)
|
||||
fileOut, err := os.Create(dumpFilePath)
|
||||
if err != nil {
|
||||
|
||||
@@ -177,7 +177,11 @@ func testFlatstoreITHandleCdr1File(t *testing.T) {
|
||||
// check the files to be processed
|
||||
filesInDir, _ := os.ReadDir("/tmp/flatstoreErs/in")
|
||||
if len(filesInDir) != 0 {
|
||||
t.Errorf("Files in ersInDir: %+v", filesInDir)
|
||||
fls := make([]string, len(filesInDir))
|
||||
for i, fs := range filesInDir {
|
||||
fls[i] = fs.Name()
|
||||
}
|
||||
t.Errorf("Files in ersInDir: %+v", fls)
|
||||
}
|
||||
filesOutDir, _ := os.ReadDir("/tmp/flatstoreErs/out")
|
||||
if len(filesOutDir) != 5 {
|
||||
@@ -218,6 +222,7 @@ func testFlatstoreITKillEngine(t *testing.T) {
|
||||
func TestFlatstoreProcessEvent(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
cfg.ERsCfg().Readers[0].ProcessedPath = ""
|
||||
cfg.ERsCfg().Readers[0].Opts[utils.FstFailedCallsPrefixOpt] = "file"
|
||||
fltrs := &engine.FilterS{}
|
||||
filePath := "/tmp/TestFlatstoreProcessEvent/"
|
||||
fname := "file1.csv"
|
||||
@@ -228,7 +233,7 @@ func TestFlatstoreProcessEvent(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
file.Write([]byte(",a,ToR,b,c,d,e,f,g,h,i,j,k,l"))
|
||||
file.Write([]byte("INVITE,a,ToR,b,c,d,e,f,g,h,i,j,k,l"))
|
||||
file.Close()
|
||||
eR := &FlatstoreER{
|
||||
cgrCfg: cfg,
|
||||
@@ -240,6 +245,7 @@ func TestFlatstoreProcessEvent(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
eR.cache = ltcache.NewCache(-1, 0, false, eR.dumpToFile)
|
||||
expEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
@@ -253,7 +259,7 @@ func TestFlatstoreProcessEvent(t *testing.T) {
|
||||
utils.Subject: "h",
|
||||
utils.Tenant: "e",
|
||||
utils.ToR: "ToR",
|
||||
utils.Usage: "0",
|
||||
utils.Usage: "l",
|
||||
},
|
||||
APIOpts: map[string]interface{}{},
|
||||
}
|
||||
@@ -280,6 +286,15 @@ func TestFlatstoreProcessEvent(t *testing.T) {
|
||||
func TestFlatstoreProcessEvent2(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
cfg.ERsCfg().Readers[0].ProcessedPath = ""
|
||||
cfg.ERsCfg().Readers[0].Fields = append(cfg.ERsCfg().Readers[0].Fields, &config.FCTemplate{
|
||||
Tag: "Usage",
|
||||
Path: "*cgreq.Usage",
|
||||
Type: utils.MetaUsageDifference,
|
||||
Value: config.NewRSRParsersMustCompile("~*bye.6;~*invite.6", utils.InfieldSep),
|
||||
})
|
||||
for _, v := range cfg.ERsCfg().Readers[0].Fields {
|
||||
v.ComputePath()
|
||||
}
|
||||
fltrs := &engine.FilterS{}
|
||||
filePath := "/tmp/TestFlatstoreProcessEvent/"
|
||||
fname := "file1.csv"
|
||||
@@ -303,10 +318,10 @@ func TestFlatstoreProcessEvent2(t *testing.T) {
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
record := []string{utils.ByeCgr, "a", "ToR", "b", "c", "d", "2013-12-30T16:00:01Z", "f", "g", "h", "i", "j", "k", "l"}
|
||||
record := []string{utils.FstBye, "a", "ToR", "b", "c", "d", "2013-12-30T16:00:01Z", "f", "g", "h", "i", "j", "k", "l"}
|
||||
pr := &fstRecord{method: utils.FstBye, values: record, fileName: fname}
|
||||
eR.cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, eR.dumpToFile)
|
||||
eR.cache.Set("baToR", pr, nil)
|
||||
eR.cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, nil)
|
||||
eR.cache.Set(utils.ConcatenatedKey("baToR", utils.FstBye), pr, []string{"baToR"})
|
||||
expEvent := &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
Event: map[string]interface{}{
|
||||
@@ -320,7 +335,7 @@ func TestFlatstoreProcessEvent2(t *testing.T) {
|
||||
utils.Subject: "h",
|
||||
utils.Tenant: "2013-12-30T15:00:01Z",
|
||||
utils.ToR: "ToR",
|
||||
utils.Usage: "3600",
|
||||
utils.Usage: "1h0m0s",
|
||||
},
|
||||
APIOpts: map[string]interface{}{},
|
||||
}
|
||||
@@ -383,60 +398,6 @@ func TestFlatstoreProcessEvent2CacheNotSet(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
//Test unsupported time format err while making the unpaired record.
|
||||
func TestFlatstoreProcessEvent2Error1(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
cfg.ERsCfg().Readers[0].ProcessedPath = ""
|
||||
fltrs := &engine.FilterS{}
|
||||
filePath := "/tmp/TestFlatstoreProcessEvent/"
|
||||
fname := "file1.csv"
|
||||
if err := os.MkdirAll(filePath, 0777); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
file, err := os.Create(path.Join(filePath, fname))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
//Create new logger
|
||||
utils.Logger, err = utils.Newlogger(utils.MetaStdLog, utils.EmptyString)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
utils.Logger.SetLogLevel(7)
|
||||
buf := new(bytes.Buffer)
|
||||
log.SetOutput(buf)
|
||||
file.Write([]byte("INVITE,a,ToR,b,c,d,invalid_time,f,g,h,i,j,k,l"))
|
||||
file.Close()
|
||||
eR := &FlatstoreER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: 0,
|
||||
fltrS: fltrs,
|
||||
rdrDir: "/tmp/flatstoreErs/out",
|
||||
rdrEvents: make(chan *erEvent, 1),
|
||||
rdrError: make(chan error, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
}
|
||||
record := []string{utils.ByeCgr, "a", "ToR", "b", "c", "d", "invalid_time", "f", "g", "h", "i", "j", "k", "l"}
|
||||
pr := &fstRecord{method: utils.FstBye, values: record, fileName: fname}
|
||||
eR.cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, eR.dumpToFile)
|
||||
eR.cache.Set("baToR", pr, nil)
|
||||
|
||||
eR.conReqs <- struct{}{}
|
||||
eR.Config().Opts[utils.FstFailedCallsPrefixOpt] = "x"
|
||||
if err := eR.processFile(filePath, fname); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
errExpect := "[WARNING] <ERs> Converting row : <[INVITE a ToR b c d invalid_time f g h i j k l]> to unpairedRecord , ignoring due to error: <Unsupported time format>"
|
||||
if rcv := buf.String(); !strings.Contains(rcv, errExpect) {
|
||||
t.Errorf("\nExpected %v but \nreceived %v", errExpect, rcv)
|
||||
}
|
||||
if err := os.RemoveAll(filePath); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
buf.Reset()
|
||||
}
|
||||
|
||||
//Test pairToRecord() error, where both methods of unpaired record object are INVITE
|
||||
func TestFlatstoreProcessEvent2Error2(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
@@ -475,13 +436,13 @@ func TestFlatstoreProcessEvent2Error2(t *testing.T) {
|
||||
record := []string{"INVITE", "a", "ToR", "b", "c", "d", "2013-12-30T16:00:01Z", "f", "g", "h", "i", "j", "k", "l"}
|
||||
pr := &fstRecord{method: utils.FstInvite, values: record, fileName: fname}
|
||||
eR.cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, eR.dumpToFile)
|
||||
eR.cache.Set("baToR", pr, nil)
|
||||
eR.cache.Set("baToR:INVITE", pr, []string{"baToR"})
|
||||
eR.conReqs <- struct{}{}
|
||||
eR.Config().Opts[utils.FstFailedCallsPrefixOpt] = "x"
|
||||
if err := eR.processFile(filePath, fname); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
errExpect := "[WARNING] <ERs> Merging unpairedRecords"
|
||||
errExpect := "[WARNING] <ERs> Overwriting the INVITE method for record <baToR>"
|
||||
if rcv := buf.String(); !strings.Contains(rcv, errExpect) {
|
||||
t.Errorf("\nExpected %v but \nreceived %v", errExpect, rcv)
|
||||
}
|
||||
@@ -506,7 +467,7 @@ func TestFlatstoreProcessEventError2(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
file.Write([]byte(`#ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage
|
||||
,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`))
|
||||
,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`))
|
||||
file.Close()
|
||||
eR := &FlatstoreER{
|
||||
cgrCfg: cfg,
|
||||
@@ -524,7 +485,7 @@ func TestFlatstoreProcessEventError2(t *testing.T) {
|
||||
{},
|
||||
}
|
||||
|
||||
errExpect := "unsupported type: <>"
|
||||
errExpect := `unsupported method: <"">`
|
||||
if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect {
|
||||
t.Errorf("Expected %v but received %v", errExpect, err)
|
||||
}
|
||||
@@ -551,7 +512,7 @@ func TestFlatstoreProcessEventError3(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
file.Write([]byte(`#ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage
|
||||
,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`))
|
||||
BYE,,*voice,OriginCDR1,*prepaid,,cgrates.org,*call,1001,SUBJECT_TEST_1001,1002,2021-01-07 17:00:02 +0000 UTC,2021-01-07 17:00:04 +0000 UTC,1h2m`))
|
||||
file.Close()
|
||||
eR := &FlatstoreER{
|
||||
cgrCfg: cfg,
|
||||
@@ -562,9 +523,10 @@ func TestFlatstoreProcessEventError3(t *testing.T) {
|
||||
rdrError: make(chan error, 1),
|
||||
rdrExit: make(chan struct{}),
|
||||
conReqs: make(chan struct{}, 1),
|
||||
cache: ltcache.NewCache(-1, 0, false, nil),
|
||||
}
|
||||
eR.conReqs <- struct{}{}
|
||||
|
||||
eR.cache.Set("OriginCDR1*voice:INVITE", &fstRecord{method: utils.FstInvite, values: []string{}}, []string{"OriginCDR1*voice"})
|
||||
//
|
||||
eR.Config().Filters = []string{"Filter1"}
|
||||
errExpect := "NOT_FOUND:Filter1"
|
||||
@@ -573,6 +535,8 @@ func TestFlatstoreProcessEventError3(t *testing.T) {
|
||||
}
|
||||
|
||||
//
|
||||
eR.cache.Set("OriginCDR1*voice:INVITE", &fstRecord{method: utils.FstInvite, values: []string{}}, []string{"OriginCDR1*voice"})
|
||||
|
||||
eR.Config().Filters = []string{"*exists:~*req..Account:"}
|
||||
errExpect = "Invalid fieldPath [ Account]"
|
||||
if err := eR.processFile(filePath, fname); err == nil || err.Error() != errExpect {
|
||||
|
||||
@@ -2568,6 +2568,10 @@ const (
|
||||
FstBye = "BYE"
|
||||
FstAck = "ACK"
|
||||
|
||||
MetaInvite = "*invite"
|
||||
MetaBye = "*bye"
|
||||
MetaAck = "*ack"
|
||||
|
||||
FstFailedCallsPrefixOpt = "fstFailedCallsPrefix"
|
||||
FstPartialRecordCacheOpt = "fstRecordCacheTTL"
|
||||
FstMethodOpt = "fstMethod"
|
||||
@@ -2597,6 +2601,15 @@ const (
|
||||
KafkaMaxWait = "kafkaMaxWait"
|
||||
)
|
||||
|
||||
var (
|
||||
// FstMethodToPrfx used for flatstore to convert the method in DP prefix
|
||||
FstMethodToPrfx = map[string]string{
|
||||
FstInvite: MetaInvite,
|
||||
FstBye: MetaBye,
|
||||
FstAck: MetaAck,
|
||||
}
|
||||
)
|
||||
|
||||
// Analyzers constants
|
||||
const (
|
||||
MetaScorch = "*scorch"
|
||||
|
||||
Reference in New Issue
Block a user