Add *partial_csv type in eventReader ( almost complete)

This commit is contained in:
TeoV
2020-01-17 15:58:47 +02:00
parent 35caf1390f
commit c074fd2369
12 changed files with 694 additions and 22 deletions

View File

@@ -299,6 +299,16 @@ func (ar *AgentRequest) ParseField(
}
out = strconv.FormatFloat(utils.Round(val*math.Pow10(exp),
config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64)
case utils.MetaUnixTimestamp:
val, err := cfgFld.Value.ParseDataProvider(ar, utils.NestingSep)
if err != nil {
return nil, err
}
t, err := utils.ParseTimeDetectLayout(val, cfgFld.Timezone)
if err != nil {
return nil, err
}
out = strconv.Itoa(int(t.Unix()))
}
if err != nil &&

View File

@@ -304,7 +304,8 @@ var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes,
utils.MetaDispatchers, utils.MetaDispatcherHosts})
var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV,
utils.MetaKafkajsonMap, utils.MetaFileXML, utils.MetaSQL, utils.MetaFileFWV})
utils.MetaKafkajsonMap, utils.MetaFileXML, utils.MetaSQL, utils.MetaFileFWV,
utils.MetaPartialCSV})
func (cfg *CGRConfig) LazySanityCheck() {
for _, cdrePrfl := range cfg.cdrsCfg.OnlineCDRExports {

View File

@@ -434,7 +434,7 @@ func (cfg *CGRConfig) checkConfigSanity() error {
if rdr.RunDelay > 0 {
return fmt.Errorf("<%s> RunDelay field can not be bigger than zero for reader with ID: %s", utils.ERs, rdr.ID)
}
case utils.MetaFileXML, utils.MetaFileFWV:
case utils.MetaFileXML, utils.MetaFileFWV, utils.MetaPartialCSV:
for _, dir := range []string{rdr.ProcessedPath, rdr.SourcePath} {
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return fmt.Errorf("<%s> Nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID)

View File

@@ -365,6 +365,32 @@ func (nM *NavigableMap) AsCGREvent(tnt string, pathSep string) (cgrEv *utils.CGR
return
}
// AsMapStringIface builds a linear map[string]interface{} with joined paths
// treats particular case when the value of map is []*NMItem - used in agents/AgentRequest
func (nM *NavigableMap) AsMapStringIface(pathSep string) (mp map[string]interface{}) {
if nM == nil || len(nM.data) == 0 {
return
}
mp = make(map[string]interface{})
if len(nM.order) == 0 {
indexMapPaths(nM.data, nil, &nM.order)
}
for _, branchPath := range nM.order {
val, _ := nM.FieldAsInterface(branchPath)
if nmItms, isNMItems := val.([]*NMItem); isNMItems { // special case when we have added multiple items inside a key, used in agents
for _, nmItm := range nmItms {
if nmItm.Config == nil ||
nmItm.Config.AttributeID == "" {
val = nmItm.Data // first item which is not an attribute will become the value
break
}
}
}
mp[strings.Join(branchPath, pathSep)] = val
}
return
}
// XMLElement is specially crafted to be automatically marshalled by encoding/xml
type XMLElement struct {
XMLName xml.Name

View File

@@ -255,6 +255,77 @@
{"tag": "NrOfCdrs", "type": "metatag", "metatag_id":"total_cdrs", "value": "~*req.142-150"},
{"tag": "TotalDuration", "type": "metatag", "metatag_id":"total_duration", "value": "~*req.150-162"},
],
},
{
"id": "PartialCSV1",
"enabled": true,
"run_delay": -1,
"type": "*partial_csv",
"source_path": "/tmp/partErs1/in",
"flags": ["*cdrs"],
"processed_path": "/tmp/partErs1/out",
"partial_record_cache": "2s",
"partial_cache_expiry_action": "*dump_to_file",
"content_fields":[
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"},
{"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"},
{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"},
{"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
{"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true},
{"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true},
{"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]},
],
"cache_dump_fields": [
{"tag": "OriginID", "type": "*composed", "value": "~OriginID"},
{"tag": "OrderID", "type": "*composed", "value": "~OrderID"},
{"tag": "RequestType", "type": "*composed", "value": "~RequestType"},
{"tag": "Account", "type": "*composed", "value": "~Account"},
{"tag": "Destination", "type": "*composed", "value": "~Destination"},
{"tag": "SetupTime", "type": "*composed", "value": "~SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "AnswerTime", "type": "*composed", "value": "~AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "Usage", "type": "*composed", "value": "~Usage"},
{"tag": "Cost", "type": "*composed", "value": "~Cost","rounding_decimals":5},
],
},
{
"id": "PartialCSV_PostExpiry",
"enabled": true,
"run_delay": -1,
"type": "*partial_csv",
"source_path": "/tmp/partErs2/in",
"processed_path": "/tmp/partErs2/out",
"flags": ["*cdrs"],
"partial_record_cache": "1s",
"partial_cache_expiry_action": "*post_cdr",
"content_fields":[
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"},
{"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"},
{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"},
{"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
{"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true},
{"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true},
{"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}
],
}
],
},

View File

@@ -258,6 +258,77 @@
{"tag": "NrOfCdrs", "type": "metatag", "metatag_id":"total_cdrs", "value": "~*req.142-150"},
{"tag": "TotalDuration", "type": "metatag", "metatag_id":"total_duration", "value": "~*req.150-162"},
],
},
{
"id": "PartialCSV1",
"enabled": true,
"run_delay": -1,
"type": "*partial_csv",
"source_path": "/tmp/partErs1/in",
"flags": ["*cdrs"],
"processed_path": "/tmp/partErs1/out",
"partial_record_cache": "2s",
"partial_cache_expiry_action": "*dump_to_file",
"content_fields":[
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"},
{"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"},
{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"},
{"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
{"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true},
{"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true},
{"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]},
],
"cache_dump_fields": [
{"tag": "OriginID", "type": "*composed", "value": "~OriginID"},
{"tag": "OrderID", "type": "*composed", "value": "~OrderID"},
{"tag": "RequestType", "type": "*composed", "value": "~RequestType"},
{"tag": "Account", "type": "*composed", "value": "~Account"},
{"tag": "Destination", "type": "*composed", "value": "~Destination"},
{"tag": "SetupTime", "type": "*composed", "value": "~SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "AnswerTime", "type": "*composed", "value": "~AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "Usage", "type": "*composed", "value": "~Usage"},
{"tag": "Cost", "type": "*composed", "value": "~Cost","rounding_decimals":5},
],
},
{
"id": "PartialCSV_PostExpiry",
"enabled": true,
"run_delay": -1,
"type": "*partial_csv",
"source_path": "/tmp/partErs2/in",
"processed_path": "/tmp/partErs2/out",
"flags": ["*cdrs"],
"partial_record_cache": "1s",
"partial_cache_expiry_action": "*post_cdr",
"content_fields":[
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"},
{"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"},
{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"},
{"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
{"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true},
{"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true},
{"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}
],
}
],
},

View File

@@ -255,6 +255,78 @@
{"tag": "NrOfCdrs", "type": "metatag", "metatag_id":"total_cdrs", "value": "~*req.142-150"},
{"tag": "TotalDuration", "type": "metatag", "metatag_id":"total_duration", "value": "~*req.150-162"},
],
},
{
"id": "PartialCSV1",
"enabled": true,
"run_delay": -1,
"type": "*partial_csv",
"source_path": "/tmp/partErs1/in",
"flags": ["*cdrs"],
"processed_path": "/tmp/partErs1/out",
"partial_record_cache": "2s",
"partial_cache_expiry_action": "*dump_to_file",
"content_fields":[
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"},
{"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"},
{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"},
{"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
{"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true},
{"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true},
{"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]},
],
"cache_dump_fields": [
{"tag": "CGRID", "type": "*composed", "value": "~CGRID"},
{"tag": "OriginID", "type": "*composed", "value": "~OriginID"},
{"tag": "OrderID", "type": "*composed", "value": "~OrderID"},
{"tag": "RequestType", "type": "*composed", "value": "~RequestType"},
{"tag": "Account", "type": "*composed", "value": "~Account"},
{"tag": "Destination", "type": "*composed", "value": "~Destination"},
{"tag": "SetupTime", "type": "*composed", "value": "~SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "AnswerTime", "type": "*composed", "value": "~AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "Usage", "type": "*composed", "value": "~Usage"},
{"tag": "Cost", "type": "*composed", "value": "~Cost","rounding_decimals":5},
],
},
{
"id": "PartialCSV_PostExpiry",
"enabled": true,
"run_delay": -1,
"type": "*partial_csv",
"source_path": "/tmp/partErs2/in",
"processed_path": "/tmp/partErs2/out",
"flags": ["*cdrs"],
"partial_record_cache": "1s",
"partial_cache_expiry_action": "*post_cdr",
"content_fields":[
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"},
{"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"},
{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"},
{"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
{"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true},
{"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true},
{"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}
],
}
],
},

View File

@@ -252,6 +252,77 @@
{"tag": "NrOfCdrs", "type": "metatag", "metatag_id":"total_cdrs", "value": "~*req.142-150"},
{"tag": "TotalDuration", "type": "metatag", "metatag_id":"total_duration", "value": "~*req.150-162"},
],
},
{
"id": "PartialCSV1",
"enabled": true,
"run_delay": -1,
"type": "*partial_csv",
"source_path": "/tmp/partErs1/in",
"flags": ["*cdrs"],
"processed_path": "/tmp/partErs1/out",
"partial_record_cache": "2s",
"partial_cache_expiry_action": "*dump_to_file",
"content_fields":[
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"},
{"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"},
{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"},
{"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
{"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true},
{"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true},
{"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]},
],
"cache_dump_fields": [
{"tag": "OriginID", "type": "*composed", "value": "~OriginID"},
{"tag": "OrderID", "type": "*composed", "value": "~OrderID"},
{"tag": "RequestType", "type": "*composed", "value": "~RequestType"},
{"tag": "Account", "type": "*composed", "value": "~Account"},
{"tag": "Destination", "type": "*composed", "value": "~Destination"},
{"tag": "SetupTime", "type": "*composed", "value": "~SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "AnswerTime", "type": "*composed", "value": "~AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
{"tag": "Usage", "type": "*composed", "value": "~Usage"},
{"tag": "Cost", "type": "*composed", "value": "~Cost","rounding_decimals":5},
],
},
{
"id": "PartialCSV_PostExpiry",
"enabled": true,
"run_delay": -1,
"type": "*partial_csv",
"source_path": "/tmp/partErs2/in",
"processed_path": "/tmp/partErs2/out",
"flags": ["*cdrs"],
"partial_record_cache": "1s",
"partial_cache_expiry_action": "*post_cdr",
"content_fields":[
{"tag": "ToR", "field_id": "ToR", "type": "*constant", "value": "*voice", "mandatory": true},
{"tag": "AccId1", "field_id": "OriginID", "type": "*composed", "value": "~*req.0"},
{"tag": "AccId2", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId3", "field_id": "OriginID", "type": "*composed", "value": "~*req.1"},
{"tag": "AccId4", "field_id": "OriginID", "type": "*composed", "value": "_"},
{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"},
{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"},
{"tag": "OriginHost", "field_id": "OriginHost", "type": "*constant", "value": "127.0.0.1", "mandatory": true},
{"tag": "RequestType", "field_id": "RequestType", "type": "*constant", "value": "*rated", "mandatory": true},
{"tag": "Tenant", "field_id": "Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true},
{"tag": "Category", "field_id": "Category", "type": "*constant", "value": "call", "mandatory": true},
{"tag": "Account", "field_id": "Account", "type": "*variable", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true},
{"tag": "Destination", "field_id": "Destination", "type": "*variable", "value": "~*req.1:s/^00(\\d+)$/+$1/", "mandatory": true},
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true},
{"tag": "Usage", "field_id": "Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true},
{"tag": "Partial", "field_id": "Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}
],
}
],
},

51
engine/cgrsafev.go Normal file
View File

@@ -0,0 +1,51 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOev. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"time"
"github.com/cgrates/cgrates/utils"
)
func NewCGRSafEventFromCGREvent(cgrEv *utils.CGREvent) *CGRSafEvent {
return &CGRSafEvent{
Tenant: cgrEv.Tenant,
ID: cgrEv.ID,
Time: cgrEv.Time,
Event: NewSafEvent(cgrEv.Event),
}
}
// CGRSafEvent is a safe CGREvent
type CGRSafEvent struct {
Tenant string
ID string
Time *time.Time // event time
Event *SafEvent
}
func (cgrSafEv *CGRSafEvent) AsCGREvent() *utils.CGREvent {
return &utils.CGREvent{
Tenant: cgrSafEv.Tenant,
ID: cgrSafEv.ID,
Time: cgrSafEv.Time,
Event: cgrSafEv.Event.AsMapInterface(),
}
}

View File

@@ -254,3 +254,16 @@ func (se *SafEvent) AsCDR(cfg *config.CGRConfig, tnt, tmz string) (cdr *CDR, err
se.RUnlock()
return
}
// AsCGREvent exports the SafEvent as CGREvent
func (se *SafEvent) AsCGREvent(tnt string) (cgrEv *utils.CGREvent) {
se.RLock()
cgrEv = &utils.CGREvent{
Tenant: tnt,
ID: utils.UUIDSha1Prefix(),
Time: utils.TimePointer(time.Now()),
Event: se.Me.Data(),
}
se.RUnlock()
return
}

View File

@@ -174,32 +174,47 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) {
utils.ERs, absPath, rowNr, err.Error()))
continue
}
safeEv := engine.NewSafEvent(navMp.AsCGREvent(agReq.Tenant, utils.NestingSep).Event)
// take OriginID field from NavigableMap
// take OriginID and OriginHost to compose CGRID
orgId, err := navMp.FieldAsString([]string{utils.OriginID})
if err == utils.ErrNotFound {
utils.Logger.Warning(
fmt.Sprintf("<%s> Missing <OriginID> field for row <%d> , <%s>",
utils.ERs, rowNr, record))
continue
}
// take Partial field from NavigableMap
partial, err := navMp.FieldAsString([]string{utils.Partial})
orgHost, err := navMp.FieldAsString([]string{utils.OriginHost})
if err == utils.ErrNotFound {
utils.Logger.Warning(
fmt.Sprintf("<%s> Missing <OriginHost> field for row <%d> , <%s>",
utils.ERs, rowNr, record))
continue
}
if val, has := rdr.cache.Get(orgId); !has {
rdr.cache.Set(orgId, navMp, nil)
if partial == "false" { // complete CDR remove it from cache
rdr.cache.Remove(orgId)
rdr.rdrEvents <- &erEvent{cgrEvent: val.(*config.NavigableMap).AsCGREvent(
rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.NestingSep),
cgrID := utils.Sha1(orgId, orgHost)
// take Partial field from NavigableMap
partial, _ := navMp.FieldAsString([]string{utils.Partial})
if val, has := rdr.cache.Get(cgrID); !has {
if utils.IsSliceMember([]string{"false", utils.EmptyString}, partial) { // complete CDR
rdr.rdrEvents <- &erEvent{cgrEvent: navMp.AsCGREvent(agReq.Tenant, utils.NestingSep),
rdrCfg: rdr.Config()}
evsPosted++
} else {
rdr.cache.Set(cgrID,
[]*engine.CGRSafEvent{engine.NewCGRSafEventFromCGREvent(navMp.AsCGREvent(agReq.Tenant, utils.NestingSep))}, nil)
}
} else {
originalNavMp := val.(*config.NavigableMap)
originalNavMp.Merge(navMp)
// overwrite the cache value with merged NavigableMap
rdr.cache.Set(orgId, originalNavMp, nil)
origCgrSafEvs := val.([]*engine.CGRSafEvent)
if utils.IsSliceMember([]string{"false", utils.EmptyString}, partial) { // complete CDR
rdr.rdrEvents <- &erEvent{cgrEvent: origCgrSafEv.AsCGREvent(),
rdrCfg: rdr.Config()}
evsPosted++
rdr.cache.Remove(cgrID)
} else {
// overwrite the cache value with merged NavigableMap
rdr.cache.Set(cgrID, origCgrSafEv, nil)
}
}
}
@@ -222,13 +237,26 @@ const (
)
func (rdr *PartialCSVFileER) dumpToFile(itmID string, value interface{}) {
nM := value.(*config.NavigableMap)
cgrSafEv := value.(*engine.CGRSafEvent)
// complete CDR are handling in processFile function
if partial, err := nM.FieldAsString([]string{utils.Partial}); err == nil && partial == "false" {
if partial, _ := cgrSafEv.Event.FieldAsString([]string{utils.Partial}); utils.IsSliceMember([]string{"false", utils.EmptyString}, partial) {
return
}
record := value.(*config.NavigableMap).AsExportedRecord()
dumpFilePath := path.Join(rdr.Config().ProcessedPath, fmt.Sprintf("%s.%s.%d", itmID, PartialRecordsSuffix, time.Now().Unix()))
cdr, err := cgrSafEv.Event.AsCDR(nil, cgrSafEv.Tenant, rdr.Config().Timezone)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> Converting Event : <%s> to cdr , ignoring due to error: <%s>",
utils.ERs, utils.ToJSON(cgrSafEv), err.Error()))
return
}
record, err := cdr.AsExportRecord(rdr.Config().CacheDumpFields, false, nil, rdr.fltrS)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>",
utils.ERs, cdr.CGRID, err.Error()))
return
}
dumpFilePath := path.Join(rdr.Config().ProcessedPath, fmt.Sprintf("%s.%s.%d", cdr.OriginID, PartialRecordsSuffix, time.Now().Unix()))
fileOut, err := os.Create(dumpFilePath)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Failed creating %s, error: %s", utils.ERs, dumpFilePath, err.Error()))
@@ -245,9 +273,11 @@ func (rdr *PartialCSVFileER) dumpToFile(itmID string, value interface{}) {
}
func (rdr *PartialCSVFileER) postCDR(itmID string, value interface{}) {
cgrSafEv := value.(*engine.CGRSafEvent)
// complete CDR are handling in processFile function
if partial, err := value.(*config.NavigableMap).FieldAsString([]string{utils.Partial}); err == nil && partial == "false" {
if partial, _ := cgrSafEv.Event.FieldAsString([]string{utils.Partial}); utils.IsSliceMember([]string{"false", utils.EmptyString}, partial) {
return
}
// how to post incomplete CDR
rdr.rdrEvents <- &erEvent{cgrEvent: cgrSafEv.AsCGREvent(), rdrCfg: rdr.Config()}
}

256
ers/partial_csv_it_test.go Normal file
View File

@@ -0,0 +1,256 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ers
import (
"fmt"
"io/ioutil"
"net/rpc"
"os"
"path"
"strings"
"testing"
"time"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
partCfgPath string
partCfgDIR string
partCfg *config.CGRConfig
partRPC *rpc.Client
partTests = []func(t *testing.T){
testPartITCreateCdrDirs,
testPartITInitConfig,
testPartITInitCdrDb,
testPartITResetDataDb,
testPartITStartEngine,
testPartITRpcConn,
testPartITLoadTPFromFolder,
testPartITHandleCdr1File,
testPartITHandleCdr2File,
testPartITHandleCdr3File,
testPartITVerifyFiles,
testPartITAnalyseCDRs,
//testPartITCleanupFiles,
testPartITKillEngine,
}
partCsvFileContent1 = `4986517174963,004986517174964,DE-National,04.07.2016 18:58:55,04.07.2016 18:58:55,1,65,Peak,0.014560,498651,partial
4986517174964,004986517174963,DE-National,04.07.2016 20:58:55,04.07.2016 20:58:55,0,74,Offpeak,0.003360,498651,complete
`
partCsvFileContent2 = `4986517174963,004986517174964,DE-National,04.07.2016 19:00:00,04.07.2016 18:58:55,0,15,Offpeak,0.003360,498651,partial`
partCsvFileContent3 = `4986517174964,004986517174960,DE-National,04.07.2016 19:05:55,04.07.2016 19:05:55,0,23,Offpeak,0.003360,498651,partial`
eCacheDumpFile1 = `4986517174963_004986517174964_04.07.2016 18:58:55,1467651535,*rated,086517174963,+4986517174964,2016-07-04T18:58:55+02:00,2016-07-04T18:58:55+02:00,1m5s,-1.00000
4986517174963_004986517174964_04.07.2016 18:58:55,1467651600,*rated,086517174963,+4986517174964,2016-07-04T18:58:55+02:00,2016-07-04T18:58:55+02:00,15s,-1.00000
`
)
func TestPartReadFile(t *testing.T) {
switch *dbType {
case utils.MetaInternal:
partCfgDIR = "ers_internal"
case utils.MetaSQL:
partCfgDIR = "ers_mysql"
case utils.MetaMongo:
partCfgDIR = "ers_mongo"
case utils.MetaPostgres:
partCfgDIR = "ers_postgres"
default:
t.Fatal("Unknown Database type")
}
for _, test := range partTests {
t.Run(partCfgDIR, test)
}
}
func testPartITCreateCdrDirs(t *testing.T) {
for _, dir := range []string{"/tmp/ers/in", "/tmp/ers/out",
"/tmp/ers2/in", "/tmp/ers2/out", "/tmp/init_session/in", "/tmp/init_session/out",
"/tmp/terminate_session/in", "/tmp/terminate_session/out", "/tmp/cdrs/in",
"/tmp/cdrs/out", "/tmp/ers_with_filters/in", "/tmp/ers_with_filters/out",
"/tmp/xmlErs/in", "/tmp/xmlErs/out", "/tmp/fwvErs/in", "/tmp/fwvErs/out",
"/tmp/partErs1/in", "/tmp/partErs1/out", "/tmp/partErs2/in", "/tmp/partErs2/out"} {
if err := os.RemoveAll(dir); err != nil {
t.Fatal("Error removing folder: ", dir, err)
}
if err := os.MkdirAll(dir, 0755); err != nil {
t.Fatal("Error creating folder: ", dir, err)
}
}
}
func testPartITInitConfig(t *testing.T) {
var err error
partCfgPath = path.Join(*dataDir, "conf", "samples", partCfgDIR)
if partCfg, err = config.NewCGRConfigFromPath(partCfgPath); err != nil {
fmt.Println(err)
t.Fatal("Got config error: ", err.Error())
}
}
// InitDb so we can rely on count
func testPartITInitCdrDb(t *testing.T) {
if err := engine.InitStorDb(partCfg); err != nil {
t.Fatal(err)
}
}
// Remove data in both rating and accounting db
func testPartITResetDataDb(t *testing.T) {
if err := engine.InitDataDb(partCfg); err != nil {
t.Fatal(err)
}
}
func testPartITStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(partCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func testPartITRpcConn(t *testing.T) {
var err error
partRPC, err = newRPCClient(partCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
}
func testPartITLoadTPFromFolder(t *testing.T) {
//add a default charger
chargerProfile := &v1.ChargerWithCache{
ChargerProfile: &engine.ChargerProfile{
Tenant: "cgrates.org",
ID: "Default",
RunID: utils.MetaDefault,
AttributeIDs: []string{"*none"},
Weight: 20,
},
}
var result string
if err := partRPC.Call(utils.ApierV1SetChargerProfile, chargerProfile, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
}
// The default scenario, out of cdrc defined in .cfg file
func testPartITHandleCdr1File(t *testing.T) {
fileName := "file1.csv"
tmpFilePath := path.Join("/tmp", fileName)
if err := ioutil.WriteFile(tmpFilePath, []byte(partCsvFileContent1), 0644); err != nil {
t.Fatal(err.Error())
}
if err := os.Rename(tmpFilePath, path.Join("/tmp/partErs1/in", fileName)); err != nil {
t.Fatal("Error moving file to processing directory: ", err)
}
}
// The default scenario, out of cdrc defined in .cfg file
func testPartITHandleCdr2File(t *testing.T) {
fileName := "file2.csv"
tmpFilePath := path.Join("/tmp", fileName)
if err := ioutil.WriteFile(tmpFilePath, []byte(partCsvFileContent2), 0644); err != nil {
t.Fatal(err.Error())
}
if err := os.Rename(tmpFilePath, path.Join("/tmp/partErs1/in", fileName)); err != nil {
t.Fatal("Error moving file to processing directory: ", err)
}
}
// The default scenario, out of cdrc defined in .cfg file
func testPartITHandleCdr3File(t *testing.T) {
fileName := "file3.csv"
tmpFilePath := path.Join("/tmp", fileName)
if err := ioutil.WriteFile(tmpFilePath, []byte(partCsvFileContent3), 0644); err != nil {
t.Fatal(err.Error())
}
if err := os.Rename(tmpFilePath, path.Join("/tmp/partErs2/in", fileName)); err != nil {
t.Fatal("Error moving file to processing directory: ", err)
}
time.Sleep(3 * time.Second)
}
func testPartITVerifyFiles(t *testing.T) {
filesInDir, _ := ioutil.ReadDir("/tmp/partErs1/out/")
if len(filesInDir) == 0 {
t.Errorf("No files found in folder: <%s>", "/tmp/partErs1/out")
}
var fileName string
for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config
if strings.HasPrefix(file.Name(), "4986517174963_004986517174964") {
fileName = file.Name()
break
}
}
if contentCacheDump, err := ioutil.ReadFile(path.Join("/tmp/partErs1/out", fileName)); err != nil {
t.Error(err)
} else if len(eCacheDumpFile1) != len(string(contentCacheDump)) {
t.Errorf("Expecting: %q, \n received: %q", eCacheDumpFile1, string(contentCacheDump))
}
}
func testPartITAnalyseCDRs(t *testing.T) {
var reply []*engine.ExternalCDR
if err := partRPC.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{}, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(reply) != 2 {
t.Error("Unexpected number of CDRs returned: ", len(reply))
}
if err := partRPC.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4986517174963"}}, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(reply) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(reply))
}
if err := partRPC.Call(utils.ApierV2GetCDRs, utils.RPCCDRsFilter{DestinationPrefixes: []string{"+4986517174960"}}, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(reply) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(reply))
}
}
func testPartITCleanupFiles(t *testing.T) {
for _, dir := range []string{"/tmp/ers",
"/tmp/ers2", "/tmp/init_session", "/tmp/terminate_session",
"/tmp/cdrs", "/tmp/ers_with_filters", "/tmp/xmlErs", "/tmp/fwvErs",
"/tmp/partErs1", "/tmp/partErs2"} {
if err := os.RemoveAll(dir); err != nil {
t.Fatal("Error removing folder: ", dir, err)
}
}
}
func testPartITKillEngine(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}
}