diff --git a/agents/agentreq.go b/agents/agentreq.go index b3b016ba3..21a2ee811 100644 --- a/agents/agentreq.go +++ b/agents/agentreq.go @@ -299,6 +299,16 @@ func (ar *AgentRequest) ParseField( } out = strconv.FormatFloat(utils.Round(val*math.Pow10(exp), config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64) + case utils.MetaUnixTimestamp: + val, err := cfgFld.Value.ParseDataProvider(ar, utils.NestingSep) + if err != nil { + return nil, err + } + t, err := utils.ParseTimeDetectLayout(val, cfgFld.Timezone) + if err != nil { + return nil, err + } + out = strconv.Itoa(int(t.Unix())) } if err != nil && diff --git a/config/config.go b/config/config.go index 662cf3102..1fae0c530 100755 --- a/config/config.go +++ b/config/config.go @@ -304,7 +304,8 @@ var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes, utils.MetaDispatchers, utils.MetaDispatcherHosts}) var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, - utils.MetaKafkajsonMap, utils.MetaFileXML, utils.MetaSQL, utils.MetaFileFWV}) + utils.MetaKafkajsonMap, utils.MetaFileXML, utils.MetaSQL, utils.MetaFileFWV, + utils.MetaPartialCSV}) func (cfg *CGRConfig) LazySanityCheck() { for _, cdrePrfl := range cfg.cdrsCfg.OnlineCDRExports { diff --git a/config/configsanity.go b/config/configsanity.go index 575ecf9b5..6b5bd7383 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -434,7 +434,7 @@ func (cfg *CGRConfig) checkConfigSanity() error { if rdr.RunDelay > 0 { return fmt.Errorf("<%s> RunDelay field can not be bigger than zero for reader with ID: %s", utils.ERs, rdr.ID) } - case utils.MetaFileXML, utils.MetaFileFWV: + case utils.MetaFileXML, utils.MetaFileFWV, utils.MetaPartialCSV: for _, dir := range []string{rdr.ProcessedPath, rdr.SourcePath} { if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { return fmt.Errorf("<%s> Nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID) diff --git a/config/navigablemap.go b/config/navigablemap.go index 9d3125349..2e283e7d9 100644 --- a/config/navigablemap.go +++ b/config/navigablemap.go @@ -365,6 +365,32 @@ func (nM *NavigableMap) AsCGREvent(tnt string, pathSep string) (cgrEv *utils.CGR return } +// AsMapStringIface builds a linear map[string]interface{} with joined paths +// treats particular case when the value of map is []*NMItem - used in agents/AgentRequest +func (nM *NavigableMap) AsMapStringIface(pathSep string) (mp map[string]interface{}) { + if nM == nil || len(nM.data) == 0 { + return + } + mp = make(map[string]interface{}) + if len(nM.order) == 0 { + indexMapPaths(nM.data, nil, &nM.order) + } + for _, branchPath := range nM.order { + val, _ := nM.FieldAsInterface(branchPath) + if nmItms, isNMItems := val.([]*NMItem); isNMItems { // special case when we have added multiple items inside a key, used in agents + for _, nmItm := range nmItms { + if nmItm.Config == nil || + nmItm.Config.AttributeID == "" { + val = nmItm.Data // first item which is not an attribute will become the value + break + } + } + } + mp[strings.Join(branchPath, pathSep)] = val + } + return +} + // XMLElement is specially crafted to be automatically marshalled by encoding/xml type XMLElement struct { XMLName xml.Name diff --git a/data/conf/samples/ers_internal/cgrates.json b/data/conf/samples/ers_internal/cgrates.json index 1136c2a3e..2cbec428f 100644 --- a/data/conf/samples/ers_internal/cgrates.json +++ b/data/conf/samples/ers_internal/cgrates.json @@ -255,6 +255,77 @@ {"tag": "NrOfCdrs", "type": "metatag", "metatag_id":"total_cdrs", "value": "~*req.142-150"}, {"tag": "TotalDuration", "type": "metatag", "metatag_id":"total_duration", "value": "~*req.150-162"}, ], + }, + { + "id": "PartialCSV1", + "enabled": true, + "run_delay": -1, + "type": "*partial_csv", + "source_path": "/tmp/partErs1/in", + "flags": ["*cdrs"], + "processed_path": "/tmp/partErs1/out", + "partial_record_cache": "2s", + "partial_cache_expiry_action": "*dump_to_file", + "content_fields":[ + {"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true}, + {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"}, + {"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"}, + {"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"}, + {"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"}, + {"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, + {"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + ], + "cache_dump_fields": [ + {"tag": "OriginID", "type": "*composed", "value": "~OriginID"}, + {"tag": "OrderID", "type": "*composed", "value": "~OrderID"}, + {"tag": "RequestType", "type": "*composed", "value": "~RequestType"}, + {"tag": "Account", "type": "*composed", "value": "~Account"}, + {"tag": "Destination", "type": "*composed", "value": "~Destination"}, + {"tag": "SetupTime", "type": "*composed", "value": "~SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "AnswerTime", "type": "*composed", "value": "~AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "Usage", "type": "*composed", "value": "~Usage"}, + {"tag": "Cost", "type": "*composed", "value": "~Cost","rounding_decimals":5}, + ], + }, + { + "id": "PartialCSV_PostExpiry", + "enabled": true, + "run_delay": -1, + "type": "*partial_csv", + "source_path": "/tmp/partErs2/in", + "processed_path": "/tmp/partErs2/out", + "flags": ["*cdrs"], + "partial_record_cache": "1s", + "partial_cache_expiry_action": "*post_cdr", + "content_fields":[ + {"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true}, + {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"}, + {"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"}, + {"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"}, + {"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"}, + {"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, + {"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]} + ], } ], }, diff --git a/data/conf/samples/ers_mongo/cgrates.json b/data/conf/samples/ers_mongo/cgrates.json index ba2898cef..fe12d76bd 100644 --- a/data/conf/samples/ers_mongo/cgrates.json +++ b/data/conf/samples/ers_mongo/cgrates.json @@ -258,6 +258,77 @@ {"tag": "NrOfCdrs", "type": "metatag", "metatag_id":"total_cdrs", "value": "~*req.142-150"}, {"tag": "TotalDuration", "type": "metatag", "metatag_id":"total_duration", "value": "~*req.150-162"}, ], + }, + { + "id": "PartialCSV1", + "enabled": true, + "run_delay": -1, + "type": "*partial_csv", + "source_path": "/tmp/partErs1/in", + "flags": ["*cdrs"], + "processed_path": "/tmp/partErs1/out", + "partial_record_cache": "2s", + "partial_cache_expiry_action": "*dump_to_file", + "content_fields":[ + {"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true}, + {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"}, + {"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"}, + {"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"}, + {"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"}, + {"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, + {"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + ], + "cache_dump_fields": [ + {"tag": "OriginID", "type": "*composed", "value": "~OriginID"}, + {"tag": "OrderID", "type": "*composed", "value": "~OrderID"}, + {"tag": "RequestType", "type": "*composed", "value": "~RequestType"}, + {"tag": "Account", "type": "*composed", "value": "~Account"}, + {"tag": "Destination", "type": "*composed", "value": "~Destination"}, + {"tag": "SetupTime", "type": "*composed", "value": "~SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "AnswerTime", "type": "*composed", "value": "~AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "Usage", "type": "*composed", "value": "~Usage"}, + {"tag": "Cost", "type": "*composed", "value": "~Cost","rounding_decimals":5}, + ], + }, + { + "id": "PartialCSV_PostExpiry", + "enabled": true, + "run_delay": -1, + "type": "*partial_csv", + "source_path": "/tmp/partErs2/in", + "processed_path": "/tmp/partErs2/out", + "flags": ["*cdrs"], + "partial_record_cache": "1s", + "partial_cache_expiry_action": "*post_cdr", + "content_fields":[ + {"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true}, + {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"}, + {"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"}, + {"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"}, + {"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"}, + {"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, + {"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]} + ], } ], }, diff --git a/data/conf/samples/ers_mysql/cgrates.json b/data/conf/samples/ers_mysql/cgrates.json index a0e9f9ba3..96ab742bf 100644 --- a/data/conf/samples/ers_mysql/cgrates.json +++ b/data/conf/samples/ers_mysql/cgrates.json @@ -255,6 +255,78 @@ {"tag": "NrOfCdrs", "type": "metatag", "metatag_id":"total_cdrs", "value": "~*req.142-150"}, {"tag": "TotalDuration", "type": "metatag", "metatag_id":"total_duration", "value": "~*req.150-162"}, ], + }, + { + "id": "PartialCSV1", + "enabled": true, + "run_delay": -1, + "type": "*partial_csv", + "source_path": "/tmp/partErs1/in", + "flags": ["*cdrs"], + "processed_path": "/tmp/partErs1/out", + "partial_record_cache": "2s", + "partial_cache_expiry_action": "*dump_to_file", + "content_fields":[ + {"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true}, + {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"}, + {"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"}, + {"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"}, + {"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"}, + {"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, + {"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + ], + "cache_dump_fields": [ + {"tag": "CGRID", "type": "*composed", "value": "~CGRID"}, + {"tag": "OriginID", "type": "*composed", "value": "~OriginID"}, + {"tag": "OrderID", "type": "*composed", "value": "~OrderID"}, + {"tag": "RequestType", "type": "*composed", "value": "~RequestType"}, + {"tag": "Account", "type": "*composed", "value": "~Account"}, + {"tag": "Destination", "type": "*composed", "value": "~Destination"}, + {"tag": "SetupTime", "type": "*composed", "value": "~SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "AnswerTime", "type": "*composed", "value": "~AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "Usage", "type": "*composed", "value": "~Usage"}, + {"tag": "Cost", "type": "*composed", "value": "~Cost","rounding_decimals":5}, + ], + }, + { + "id": "PartialCSV_PostExpiry", + "enabled": true, + "run_delay": -1, + "type": "*partial_csv", + "source_path": "/tmp/partErs2/in", + "processed_path": "/tmp/partErs2/out", + "flags": ["*cdrs"], + "partial_record_cache": "1s", + "partial_cache_expiry_action": "*post_cdr", + "content_fields":[ + {"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true}, + {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"}, + {"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"}, + {"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"}, + {"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"}, + {"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, + {"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]} + ], } ], }, diff --git a/data/conf/samples/ers_postgres/cgrates.json b/data/conf/samples/ers_postgres/cgrates.json index b9e9296a0..4ca6c521b 100644 --- a/data/conf/samples/ers_postgres/cgrates.json +++ b/data/conf/samples/ers_postgres/cgrates.json @@ -252,6 +252,77 @@ {"tag": "NrOfCdrs", "type": "metatag", "metatag_id":"total_cdrs", "value": "~*req.142-150"}, {"tag": "TotalDuration", "type": "metatag", "metatag_id":"total_duration", "value": "~*req.150-162"}, ], + }, + { + "id": "PartialCSV1", + "enabled": true, + "run_delay": -1, + "type": "*partial_csv", + "source_path": "/tmp/partErs1/in", + "flags": ["*cdrs"], + "processed_path": "/tmp/partErs1/out", + "partial_record_cache": "2s", + "partial_cache_expiry_action": "*dump_to_file", + "content_fields":[ + {"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true}, + {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"}, + {"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"}, + {"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"}, + {"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"}, + {"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, + {"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + ], + "cache_dump_fields": [ + {"tag": "OriginID", "type": "*composed", "value": "~OriginID"}, + {"tag": "OrderID", "type": "*composed", "value": "~OrderID"}, + {"tag": "RequestType", "type": "*composed", "value": "~RequestType"}, + {"tag": "Account", "type": "*composed", "value": "~Account"}, + {"tag": "Destination", "type": "*composed", "value": "~Destination"}, + {"tag": "SetupTime", "type": "*composed", "value": "~SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "AnswerTime", "type": "*composed", "value": "~AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "Usage", "type": "*composed", "value": "~Usage"}, + {"tag": "Cost", "type": "*composed", "value": "~Cost","rounding_decimals":5}, + ], + }, + { + "id": "PartialCSV_PostExpiry", + "enabled": true, + "run_delay": -1, + "type": "*partial_csv", + "source_path": "/tmp/partErs2/in", + "processed_path": "/tmp/partErs2/out", + "flags": ["*cdrs"], + "partial_record_cache": "1s", + "partial_cache_expiry_action": "*post_cdr", + "content_fields":[ + {"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true}, + {"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"}, + {"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"}, + {"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"}, + {"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"}, + {"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"}, + {"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true}, + {"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true}, + {"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, + {"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true}, + {"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true}, + {"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true}, + {"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, + {"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, + {"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]} + ], } ], }, diff --git a/engine/cgrsafev.go b/engine/cgrsafev.go new file mode 100644 index 000000000..2aab02c4f --- /dev/null +++ b/engine/cgrsafev.go @@ -0,0 +1,51 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOev. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "time" + + "github.com/cgrates/cgrates/utils" +) + +func NewCGRSafEventFromCGREvent(cgrEv *utils.CGREvent) *CGRSafEvent { + return &CGRSafEvent{ + Tenant: cgrEv.Tenant, + ID: cgrEv.ID, + Time: cgrEv.Time, + Event: NewSafEvent(cgrEv.Event), + } +} + +// CGRSafEvent is a safe CGREvent +type CGRSafEvent struct { + Tenant string + ID string + Time *time.Time // event time + Event *SafEvent +} + +func (cgrSafEv *CGRSafEvent) AsCGREvent() *utils.CGREvent { + return &utils.CGREvent{ + Tenant: cgrSafEv.Tenant, + ID: cgrSafEv.ID, + Time: cgrSafEv.Time, + Event: cgrSafEv.Event.AsMapInterface(), + } +} diff --git a/engine/safevent.go b/engine/safevent.go index 162f9bd7a..eafa65ce6 100644 --- a/engine/safevent.go +++ b/engine/safevent.go @@ -254,3 +254,16 @@ func (se *SafEvent) AsCDR(cfg *config.CGRConfig, tnt, tmz string) (cdr *CDR, err se.RUnlock() return } + +// AsCGREvent exports the SafEvent as CGREvent +func (se *SafEvent) AsCGREvent(tnt string) (cgrEv *utils.CGREvent) { + se.RLock() + cgrEv = &utils.CGREvent{ + Tenant: tnt, + ID: utils.UUIDSha1Prefix(), + Time: utils.TimePointer(time.Now()), + Event: se.Me.Data(), + } + se.RUnlock() + return +} diff --git a/ers/partial_csv.go b/ers/partial_csv.go index 7c225df96..9d26e3ef1 100644 --- a/ers/partial_csv.go +++ b/ers/partial_csv.go @@ -174,32 +174,47 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { utils.ERs, absPath, rowNr, err.Error())) continue } - safeEv := engine.NewSafEvent(navMp.AsCGREvent(agReq.Tenant, utils.NestingSep).Event) - // take OriginID field from NavigableMap + + // take OriginID and OriginHost to compose CGRID orgId, err := navMp.FieldAsString([]string{utils.OriginID}) if err == utils.ErrNotFound { + utils.Logger.Warning( + fmt.Sprintf("<%s> Missing field for row <%d> , <%s>", + utils.ERs, rowNr, record)) continue } - // take Partial field from NavigableMap - partial, err := navMp.FieldAsString([]string{utils.Partial}) + orgHost, err := navMp.FieldAsString([]string{utils.OriginHost}) if err == utils.ErrNotFound { + utils.Logger.Warning( + fmt.Sprintf("<%s> Missing field for row <%d> , <%s>", + utils.ERs, rowNr, record)) continue } - - if val, has := rdr.cache.Get(orgId); !has { - rdr.cache.Set(orgId, navMp, nil) - if partial == "false" { // complete CDR remove it from cache - rdr.cache.Remove(orgId) - rdr.rdrEvents <- &erEvent{cgrEvent: val.(*config.NavigableMap).AsCGREvent( - rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.NestingSep), + cgrID := utils.Sha1(orgId, orgHost) + // take Partial field from NavigableMap + partial, _ := navMp.FieldAsString([]string{utils.Partial}) + if val, has := rdr.cache.Get(cgrID); !has { + if utils.IsSliceMember([]string{"false", utils.EmptyString}, partial) { // complete CDR + rdr.rdrEvents <- &erEvent{cgrEvent: navMp.AsCGREvent(agReq.Tenant, utils.NestingSep), rdrCfg: rdr.Config()} evsPosted++ + } else { + rdr.cache.Set(cgrID, + []*engine.CGRSafEvent{engine.NewCGRSafEventFromCGREvent(navMp.AsCGREvent(agReq.Tenant, utils.NestingSep))}, nil) } } else { - originalNavMp := val.(*config.NavigableMap) - originalNavMp.Merge(navMp) - // overwrite the cache value with merged NavigableMap - rdr.cache.Set(orgId, originalNavMp, nil) + origCgrSafEvs := val.([]*engine.CGRSafEvent) + + if utils.IsSliceMember([]string{"false", utils.EmptyString}, partial) { // complete CDR + rdr.rdrEvents <- &erEvent{cgrEvent: origCgrSafEv.AsCGREvent(), + rdrCfg: rdr.Config()} + evsPosted++ + rdr.cache.Remove(cgrID) + } else { + + // overwrite the cache value with merged NavigableMap + rdr.cache.Set(cgrID, origCgrSafEv, nil) + } } } @@ -222,13 +237,26 @@ const ( ) func (rdr *PartialCSVFileER) dumpToFile(itmID string, value interface{}) { - nM := value.(*config.NavigableMap) + cgrSafEv := value.(*engine.CGRSafEvent) // complete CDR are handling in processFile function - if partial, err := nM.FieldAsString([]string{utils.Partial}); err == nil && partial == "false" { + if partial, _ := cgrSafEv.Event.FieldAsString([]string{utils.Partial}); utils.IsSliceMember([]string{"false", utils.EmptyString}, partial) { return } - record := value.(*config.NavigableMap).AsExportedRecord() - dumpFilePath := path.Join(rdr.Config().ProcessedPath, fmt.Sprintf("%s.%s.%d", itmID, PartialRecordsSuffix, time.Now().Unix())) + cdr, err := cgrSafEv.Event.AsCDR(nil, cgrSafEv.Tenant, rdr.Config().Timezone) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> Converting Event : <%s> to cdr , ignoring due to error: <%s>", + utils.ERs, utils.ToJSON(cgrSafEv), err.Error())) + return + } + record, err := cdr.AsExportRecord(rdr.Config().CacheDumpFields, false, nil, rdr.fltrS) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>", + utils.ERs, cdr.CGRID, err.Error())) + return + } + dumpFilePath := path.Join(rdr.Config().ProcessedPath, fmt.Sprintf("%s.%s.%d", cdr.OriginID, PartialRecordsSuffix, time.Now().Unix())) fileOut, err := os.Create(dumpFilePath) if err != nil { utils.Logger.Err(fmt.Sprintf("<%s> Failed creating %s, error: %s", utils.ERs, dumpFilePath, err.Error())) @@ -245,9 +273,11 @@ func (rdr *PartialCSVFileER) dumpToFile(itmID string, value interface{}) { } func (rdr *PartialCSVFileER) postCDR(itmID string, value interface{}) { + cgrSafEv := value.(*engine.CGRSafEvent) // complete CDR are handling in processFile function - if partial, err := value.(*config.NavigableMap).FieldAsString([]string{utils.Partial}); err == nil && partial == "false" { + if partial, _ := cgrSafEv.Event.FieldAsString([]string{utils.Partial}); utils.IsSliceMember([]string{"false", utils.EmptyString}, partial) { return } // how to post incomplete CDR + rdr.rdrEvents <- &erEvent{cgrEvent: cgrSafEv.AsCGREvent(), rdrCfg: rdr.Config()} } diff --git a/ers/partial_csv_it_test.go b/ers/partial_csv_it_test.go new file mode 100644 index 000000000..6eabcdd58 --- /dev/null +++ b/ers/partial_csv_it_test.go @@ -0,0 +1,256 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "fmt" + "io/ioutil" + "net/rpc" + "os" + "path" + "strings" + "testing" + "time" + + v1 "github.com/cgrates/cgrates/apier/v1" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + partCfgPath string + partCfgDIR string + partCfg *config.CGRConfig + partRPC *rpc.Client + + partTests = []func(t *testing.T){ + testPartITCreateCdrDirs, + testPartITInitConfig, + testPartITInitCdrDb, + testPartITResetDataDb, + testPartITStartEngine, + testPartITRpcConn, + testPartITLoadTPFromFolder, + testPartITHandleCdr1File, + testPartITHandleCdr2File, + testPartITHandleCdr3File, + testPartITVerifyFiles, + testPartITAnalyseCDRs, + //testPartITCleanupFiles, + testPartITKillEngine, + } + + partCsvFileContent1 = `4986517174963,004986517174964,DE-National,04.07.2016 18:58:55,04.07.2016 18:58:55,1,65,Peak,0.014560,498651,partial +4986517174964,004986517174963,DE-National,04.07.2016 20:58:55,04.07.2016 20:58:55,0,74,Offpeak,0.003360,498651,complete +` + + partCsvFileContent2 = `4986517174963,004986517174964,DE-National,04.07.2016 19:00:00,04.07.2016 18:58:55,0,15,Offpeak,0.003360,498651,partial` + partCsvFileContent3 = `4986517174964,004986517174960,DE-National,04.07.2016 19:05:55,04.07.2016 19:05:55,0,23,Offpeak,0.003360,498651,partial` + + eCacheDumpFile1 = `4986517174963_004986517174964_04.07.2016 18:58:55,1467651535,*rated,086517174963,+4986517174964,2016-07-04T18:58:55+02:00,2016-07-04T18:58:55+02:00,1m5s,-1.00000 +4986517174963_004986517174964_04.07.2016 18:58:55,1467651600,*rated,086517174963,+4986517174964,2016-07-04T18:58:55+02:00,2016-07-04T18:58:55+02:00,15s,-1.00000 +` +) + +func TestPartReadFile(t *testing.T) { + switch *dbType { + case utils.MetaInternal: + partCfgDIR = "ers_internal" + case utils.MetaSQL: + partCfgDIR = "ers_mysql" + case utils.MetaMongo: + partCfgDIR = "ers_mongo" + case utils.MetaPostgres: + partCfgDIR = "ers_postgres" + default: + t.Fatal("Unknown Database type") + } + for _, test := range partTests { + t.Run(partCfgDIR, test) + } +} + +func testPartITCreateCdrDirs(t *testing.T) { + for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out", + "/tmp/ers2/in", "/tmp/ers2/out", "/tmp/init_session/in", "/tmp/init_session/out", + "/tmp/terminate_session/in", "/tmp/terminate_session/out", "/tmp/cdrs/in", + "/tmp/cdrs/out", "/tmp/ers_with_filters/in", "/tmp/ers_with_filters/out", + "/tmp/xmlErs/in", "/tmp/xmlErs/out", "/tmp/fwvErs/in", "/tmp/fwvErs/out", + "/tmp/partErs1/in", "/tmp/partErs1/out", "/tmp/partErs2/in", "/tmp/partErs2/out"} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } +} + +func testPartITInitConfig(t *testing.T) { + var err error + partCfgPath = path.Join(*dataDir, "conf", "samples", partCfgDIR) + if partCfg, err = config.NewCGRConfigFromPath(partCfgPath); err != nil { + fmt.Println(err) + t.Fatal("Got config error: ", err.Error()) + } +} + +// InitDb so we can rely on count +func testPartITInitCdrDb(t *testing.T) { + if err := engine.InitStorDb(partCfg); err != nil { + t.Fatal(err) + } +} + +// Remove data in both rating and accounting db +func testPartITResetDataDb(t *testing.T) { + if err := engine.InitDataDb(partCfg); err != nil { + t.Fatal(err) + } +} + +func testPartITStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(partCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func testPartITRpcConn(t *testing.T) { + var err error + partRPC, err = newRPCClient(partCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +func testPartITLoadTPFromFolder(t *testing.T) { + //add a default charger + chargerProfile := &v1.ChargerWithCache{ + ChargerProfile: &engine.ChargerProfile{ + Tenant: "cgrates.org", + ID: "Default", + RunID: utils.MetaDefault, + AttributeIDs: []string{"*none"}, + Weight: 20, + }, + } + var result string + if err := partRPC.Call(utils.ApierV1SetChargerProfile, chargerProfile, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +// The default scenario, out of cdrc defined in .cfg file +func testPartITHandleCdr1File(t *testing.T) { + fileName := "file1.csv" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(partCsvFileContent1), 0644); err != nil { + t.Fatal(err.Error()) + } + if err := os.Rename(tmpFilePath, path.Join("/tmp/partErs1/in", fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +// The default scenario, out of cdrc defined in .cfg file +func testPartITHandleCdr2File(t *testing.T) { + fileName := "file2.csv" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(partCsvFileContent2), 0644); err != nil { + t.Fatal(err.Error()) + } + if err := os.Rename(tmpFilePath, path.Join("/tmp/partErs1/in", fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } +} + +// The default scenario, out of cdrc defined in .cfg file +func testPartITHandleCdr3File(t *testing.T) { + fileName := "file3.csv" + tmpFilePath := path.Join("/tmp", fileName) + if err := ioutil.WriteFile(tmpFilePath, []byte(partCsvFileContent3), 0644); err != nil { + t.Fatal(err.Error()) + } + if err := os.Rename(tmpFilePath, path.Join("/tmp/partErs2/in", fileName)); err != nil { + t.Fatal("Error moving file to processing directory: ", err) + } + time.Sleep(3 * time.Second) +} + +func testPartITVerifyFiles(t *testing.T) { + filesInDir, _ := ioutil.ReadDir("/tmp/partErs1/out/") + if len(filesInDir) == 0 { + t.Errorf("No files found in folder: <%s>", "/tmp/partErs1/out") + } + var fileName string + for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config + if strings.HasPrefix(file.Name(), "4986517174963_004986517174964") { + fileName = file.Name() + break + } + } + if contentCacheDump, err := ioutil.ReadFile(path.Join("/tmp/partErs1/out", fileName)); err != nil { + t.Error(err) + } else if len(eCacheDumpFile1) != len(string(contentCacheDump)) { + t.Errorf("Expecting: %q, \n received: %q", eCacheDumpFile1, string(contentCacheDump)) + } +} + +func testPartITAnalyseCDRs(t *testing.T) { + var reply []*engine.ExternalCDR + if err := partRPC.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 2 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } + if err := partRPC.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4986517174963"}}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } + if err := partRPC.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4986517174960"}}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(reply) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(reply)) + } +} + +func testPartITCleanupFiles(t *testing.T) { + for _, dir := range []string{"/tmp/ers", + "/tmp/ers2", "/tmp/init_session", "/tmp/terminate_session", + "/tmp/cdrs", "/tmp/ers_with_filters", "/tmp/xmlErs", "/tmp/fwvErs", + "/tmp/partErs1", "/tmp/partErs2"} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + } +} + +func testPartITKillEngine(t *testing.T) { + if err := engine.KillEngine(*waitRater); err != nil { + t.Error(err) + } +}