diff --git a/agents/libdiam.go b/agents/libdiam.go index 41b074d28..54207246d 100644 --- a/agents/libdiam.go +++ b/agents/libdiam.go @@ -199,10 +199,10 @@ func headerLen(a *diam.AVP) int { return 8 } -func updateAVPLenght(avps []*diam.AVP) (l int) { +func updateAVPLength(avps []*diam.AVP) (l int) { for _, avp := range avps { if v, ok := (avp.Data).(*diam.GroupedAVP); ok { - avp.Length = headerLen(avp) + updateAVPLenght(v.AVP) + avp.Length = headerLen(avp) + updateAVPLength(v.AVP) } l += avp.Length } diff --git a/config/config.go b/config/config.go index 095977fad..8054643a3 100644 --- a/config/config.go +++ b/config/config.go @@ -320,7 +320,7 @@ var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes, var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaKafkajsonMap, utils.MetaFileXML, utils.MetaSQL, utils.MetaFileFWV, - utils.MetaPartialCSV, utils.MetaFlatstore, utils.MetaFileJSON, utils.MetaNone}) + utils.MetaFileJSON, utils.MetaNone}) var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaNone, utils.MetaFileFWV, utils.MetaHTTPPost, utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, diff --git a/config/config_defaults.go b/config/config_defaults.go index 470b40cac..976982cf6 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -354,9 +354,12 @@ const CGRATES_CFG_JSON = ` }, -"ers": { // EventReaderService - "enabled": false, // starts the EventReader service: - "sessions_conns":["*internal"], // RPC Connections IDs +"ers": { // EventReaderService + "enabled": false, // starts the EventReader service: + "sessions_conns":["*internal"], // RPC Connections IDs + "partial_cache_ttl": "1s", // the duration to cache partial records when not pairing + "partial_cache_action": "*post_cdr", // the action that will be exeuted for the partial CSVs that are not matched<*post_cdr|*dump_to_file> + // "partial_path": "/var/spool/cgrates/ers/partial", // the path were the partial events will be sent "readers": [ { "id": "*default", // identifier of the EventReader profile @@ -366,26 +369,18 @@ const CGRATES_CFG_JSON = ` "source_path": "/var/spool/cgrates/ers/in", // read data from this path "processed_path": "/var/spool/cgrates/ers/out", // move processed data here "opts": { - // FileCSV and PartialCSV + // Partial + // "partialPath": "/", // the path were the partial events will be sent + // "partialCacheAction": "*post_cdr", // the action that will be exeuted for the partial CSVs that are not matched<*post_cdr|*dump_to_file> + "partialOrderField": "~*req.AnswerTime", // the field after what the events are order when merged + // "partialcsvFieldSeparator": "," // separator used when dumping the fields + + // FileCSV "csvRowLength": 0, // Number of fields from csv file "csvFieldSeparator": ",", // separator used when reading the fields "csvHeaderDefineChar": ":", // the starting character for header definition used in case of CSV files // "csvLazyQuotes": false, // if a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field - // PartialCSV - "csvCacheExpiryAction": "*post_cdr", // the action that will be exeuted for the partial CSVs that are not matched<*post_cdr|*dump_to_file> - // "csvRecordCacheTTL": "1s" // Duration to cache partial records when not pairing - - // FlatStore - "fstRowLength": 0, // Number of fields from csv file - "fstFieldSeparator": ",", // separator used when reading the fields - // "fstFailedCallsPrefix": "" // Used in case of flatstore CDRs to avoid searching for BYE records - // "fstRecordCacheTTL": "1s" // Duration to cache partial records when not pairing - // "fstLazyQuotes": false, // if a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field - "fstMethod": "~*req.0", // the rsr parser that will determine the method of the current record - "fstOriginID": "~*req.3;~*req.1;~*req.2", // the rsr parser that will determine the originID of the current record - "fstMadatoryACK": false, // if we should receive the ACK before processing the record - // FileXML "xmlRootPath": "", // path towards one event in case of XML CDRs @@ -459,6 +454,7 @@ const CGRATES_CFG_JSON = ` {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.12", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.13", "mandatory": true}, ], + "partial_commit_fields": [], "cache_dump_fields": [], }, ], @@ -524,7 +520,7 @@ const CGRATES_CFG_JSON = ` // "awsKey": "", // AWSKey // "awsSecret": "", // AWSSecret // "awsToken": "", // AWSToken - // "s3FolderPath": "", // AWSFolderPath + // "s3FolderPath": "", // S3FolderPath }, // extra options for exporter "tenant": "", // tenant used in filterS.Pass diff --git a/config/configsanity.go b/config/configsanity.go index d5646a574..4c5811734 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -695,8 +695,12 @@ func (cfg *CGRConfig) checkConfigSanity() error { } switch rdr.Type { - case utils.MetaFileCSV, utils.MetaPartialCSV: - for _, dir := range []string{rdr.ProcessedPath, rdr.SourcePath} { + case utils.MetaFileCSV: + paths := []string{rdr.ProcessedPath, rdr.SourcePath} + if rdr.ProcessedPath == utils.EmptyString { + paths = []string{rdr.SourcePath} + } + for _, dir := range paths { if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { return fmt.Errorf("<%s> nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID) } @@ -715,42 +719,6 @@ func (cfg *CGRConfig) checkConfigSanity() error { return fmt.Errorf("<%s> error when converting %s: <%s> for reader with ID: %s", utils.ERs, utils.CSV+utils.LazyQuotes, err.Error(), rdr.ID) } } - if rdr.Type == utils.MetaPartialCSV { - if act, has := rdr.Opts[utils.PartialCSVCacheExpiryActionOpt]; has && (utils.IfaceAsString(act) != utils.MetaDumpToFile && - utils.IfaceAsString(act) != utils.MetaPostCDR) { - return fmt.Errorf("<%s> wrong partial expiry action for reader with ID: %s", utils.ERs, rdr.ID) - } - if ttl, has := rdr.Opts[utils.PartialCSVRecordCacheOpt]; has { - if _, err := utils.IfaceAsDuration(ttl); err != nil { - return fmt.Errorf("<%s> error when converting %s: <%s> for reader with ID: %s", utils.ERs, utils.PartialCSVRecordCacheOpt, err.Error(), rdr.ID) - } - } - } - case utils.MetaFlatstore: - for _, dir := range []string{rdr.ProcessedPath, rdr.SourcePath} { - if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { - return fmt.Errorf("<%s> nonexistent folder: %s for reader with ID: %s", utils.ERs, dir, rdr.ID) - } - } - if fldSep, has := rdr.Opts[utils.FlatstorePrfx+utils.FieldSepOpt]; has && - utils.IfaceAsString(fldSep) == utils.EmptyString { - return fmt.Errorf("<%s> empty %s for reader with ID: %s", utils.ERs, utils.FlatstorePrfx+utils.FieldSepOpt, rdr.ID) - } - if rowl, has := rdr.Opts[utils.FlatstorePrfx+utils.RowLengthOpt]; has { - if _, err := utils.IfaceAsTInt64(rowl); err != nil { - return fmt.Errorf("<%s> error when converting %s: <%s> for reader with ID: %s", utils.ERs, utils.FlatstorePrfx+utils.RowLengthOpt, err.Error(), rdr.ID) - } - } - if lq, has := rdr.Opts[utils.FlatstorePrfx+utils.LazyQuotes]; has { - if _, err := utils.IfaceAsBool(lq); err != nil { - return fmt.Errorf("<%s> error when converting %s: <%s> for reader with ID: %s", utils.ERs, utils.FlatstorePrfx+utils.LazyQuotes, err.Error(), rdr.ID) - } - } - if ttl, has := rdr.Opts[utils.FstPartialRecordCacheOpt]; has { - if _, err := utils.IfaceAsDuration(ttl); err != nil { - return fmt.Errorf("<%s> error when converting %s: <%s> for reader with ID: %s", utils.ERs, utils.FstPartialRecordCacheOpt, err.Error(), rdr.ID) - } - } case utils.MetaKafkajsonMap: if rdr.RunDelay > 0 { return fmt.Errorf("<%s> the RunDelay field can not be bigger than zero for reader with ID: %s", utils.ERs, rdr.ID) diff --git a/config/erscfg.go b/config/erscfg.go index 2d9845dfd..7df044283 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -26,9 +26,12 @@ import ( // ERsCfg the config for ERs type ERsCfg struct { - Enabled bool - SessionSConns []string - Readers []*EventReaderCfg + Enabled bool + SessionSConns []string + Readers []*EventReaderCfg + PartialCacheTTL time.Duration + PartialCacheAction string + PartialPath string } func (erS *ERsCfg) loadFromJSONCfg(jsnCfg *ERsJsonCfg, msgTemplates map[string][]*FCTemplate, sep string, dfltRdrCfg *EventReaderCfg, separator string) (err error) { @@ -48,6 +51,17 @@ func (erS *ERsCfg) loadFromJSONCfg(jsnCfg *ERsJsonCfg, msgTemplates map[string][ } } } + if jsnCfg.Partial_cache_ttl != nil { + if erS.PartialCacheTTL, err = utils.ParseDurationWithNanosecs(*jsnCfg.Partial_cache_ttl); err != nil { + return + } + } + if jsnCfg.Partial_cache_action != nil { + erS.PartialCacheAction = *jsnCfg.Partial_cache_action + } + if jsnCfg.Partial_path != nil { + erS.PartialPath = *jsnCfg.Partial_path + } return erS.appendERsReaders(jsnCfg.Readers, msgTemplates, sep, dfltRdrCfg) } @@ -126,19 +140,20 @@ func (erS *ERsCfg) AsMapInterface(separator string) (initialMP map[string]interf // EventReaderCfg the event for the Event Reader type EventReaderCfg struct { - ID string - Type string - RunDelay time.Duration - ConcurrentReqs int - SourcePath string - ProcessedPath string - Opts map[string]interface{} - Tenant RSRParsers - Timezone string - Filters []string - Flags utils.FlagsWithParams - Fields []*FCTemplate - CacheDumpFields []*FCTemplate + ID string + Type string + RunDelay time.Duration + ConcurrentReqs int + SourcePath string + ProcessedPath string + Opts map[string]interface{} + Tenant RSRParsers + Timezone string + Filters []string + Flags utils.FlagsWithParams + Fields []*FCTemplate + PartialCommitFields []*FCTemplate + CacheDumpFields []*FCTemplate } func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplates map[string][]*FCTemplate, sep string) (err error) { @@ -202,6 +217,16 @@ func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplat er.CacheDumpFields = tpls } } + if jsnCfg.Partial_commit_fields != nil { + if er.PartialCommitFields, err = FCTemplatesFromFCTemplatesJSONCfg(*jsnCfg.Partial_commit_fields, sep); err != nil { + return err + } + if tpls, err := InflateTemplates(er.PartialCommitFields, msgTemplates); err != nil { + return err + } else if tpls != nil { + er.PartialCommitFields = tpls + } + } if jsnCfg.Opts != nil { for k, v := range jsnCfg.Opts { er.Opts[k] = v @@ -242,6 +267,12 @@ func (er EventReaderCfg) Clone() (cln *EventReaderCfg) { cln.CacheDumpFields[idx] = fld.Clone() } } + if er.PartialCommitFields != nil { + cln.PartialCommitFields = make([]*FCTemplate, len(er.PartialCommitFields)) + for idx, fld := range er.PartialCommitFields { + cln.PartialCommitFields[idx] = fld.Clone() + } + } for k, v := range er.Opts { cln.Opts[k] = v } @@ -287,6 +318,13 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string } initialMP[utils.CacheDumpFieldsCfg] = cacheDumpFields } + if er.PartialCommitFields != nil { + parCFields := make([]map[string]interface{}, len(er.PartialCommitFields)) + for i, item := range er.PartialCommitFields { + parCFields[i] = item.AsMapInterface(separator) + } + initialMP[utils.PartialCommitFieldsCfg] = parCFields + } if er.RunDelay > 0 { initialMP[utils.RunDelayCfg] = er.RunDelay.String() diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 04f346d97..1e4e8c1bc 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -156,26 +156,30 @@ type CdrsJsonCfg struct { // EventReaderSJsonCfg contains the configuration of EventReaderService type ERsJsonCfg struct { - Enabled *bool - Sessions_conns *[]string - Readers *[]*EventReaderJsonCfg + Enabled *bool + Sessions_conns *[]string + Readers *[]*EventReaderJsonCfg + Partial_cache_ttl *string + Partial_cache_action *string + Partial_path *string } // EventReaderSJsonCfg is the configuration of a single EventReader type EventReaderJsonCfg struct { - Id *string - Type *string - Run_delay *string - Concurrent_requests *int - Source_path *string - Processed_path *string - Opts map[string]interface{} - Tenant *string - Timezone *string - Filters *[]string - Flags *[]string - Fields *[]*FcTemplateJsonCfg - Cache_dump_fields *[]*FcTemplateJsonCfg + Id *string + Type *string + Run_delay *string + Concurrent_requests *int + Source_path *string + Processed_path *string + Opts map[string]interface{} + Tenant *string + Timezone *string + Filters *[]string + Flags *[]string + Fields *[]*FcTemplateJsonCfg + Partial_commit_fields *[]*FcTemplateJsonCfg + Cache_dump_fields *[]*FcTemplateJsonCfg } // EEsJsonCfg contains the configuration of EventExporterService diff --git a/data/conf/samples/ers_internal/cgrates.json b/data/conf/samples/ers_internal/cgrates.json index ddb16acf5..c218f8515 100644 --- a/data/conf/samples/ers_internal/cgrates.json +++ b/data/conf/samples/ers_internal/cgrates.json @@ -78,6 +78,7 @@ "ers": { "enabled": true, "sessions_conns": ["*internal"], + "partial_cache_ttl": "500ms", "readers": [ { "id": "file_reader1", @@ -260,13 +261,14 @@ "id": "PartialCSV1", "enabled": true, "run_delay": "-1", - "type": "*partial_csv", + "type": "*file_csv", "source_path": "/tmp/partErs1/in", "flags": ["*cdrs"], - "processed_path": "/tmp/partErs1/out", + "processed_path": "", "opts": { - "csvRecordCacheTTL": "500ms", "csvCacheExpiryAction": "*dump_to_file", + "partialOrderField": "~*req.AnswerTime", + "partialPath": "/tmp/partErs1/out", }, "fields":[ {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, @@ -285,7 +287,9 @@ {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, - {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]},// keep this here for partial cdr field + {"tag": "Partial", "path": "*opts.*partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + {"tag": "Partial", "path": "*opts.*partial", "type": "*constant", "value": "false", "filters":["*notstring:~*req.10:partial"]}, ], "cache_dump_fields": [ {"tag": "OriginID", "path":"*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, @@ -303,13 +307,14 @@ "id": "PartialCSV_PostExpiry", "enabled": true, "run_delay": "-1", - "type": "*partial_csv", + "type": "*file_csv", "source_path": "/tmp/partErs2/in", - "processed_path": "/tmp/partErs2/out", + "processed_path": "", "flags": ["*cdrs"], "opts": { - "csvRecordCacheTTL": "500ms", "csvCacheExpiryAction": "*post_cdr", + "partialOrderField": "~*req.AnswerTime", + "partialPath": "/tmp/partErs2/out", }, "fields":[ {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, @@ -328,38 +333,66 @@ {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, - {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]} + {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]},// keep this here for partial cdr field + {"tag": "Partial", "path": "*opts.*partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + {"tag": "Partial", "path": "*opts.*partial", "type": "*constant", "value": "false", "filters":["*notstring:~*req.10:partial"]} ], }, { "id": "FlatstoreOsips", "enabled": true, "run_delay": "-1", - "type": "*flatstore", + "type": "*file_csv", "opts": { - "fstFieldSeparator":"|", - "fstFailedCallsPrefix": "missed_calls", - "fstRecordCacheTTL": "500ms", + "csvFieldSeparator":"|", + "partialcsvFieldSeparator": "|", + "partialCacheAction": "*dump_to_file", + "partialOrderField": "~*opts.order", + "partialPath": "/tmp/flatstoreErs/out", }, "source_path": "/tmp/flatstoreErs/in", "processed_path": "/tmp/flatstoreErs/out", "flags": ["*cdrs"], "fields":[ - {"tag": "Tor", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, + {"tag": "OriginHost", "path": "*cgreq.OriginHost", "type": "*constant","value":"flatStore", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable","value":"~*req.3;~*req.1;~*req.2", "mandatory": true}, - {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*invite.7", "mandatory": true}, - {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, - {"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true}, - {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*invite.8", "mandatory": true}, - {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*invite.8", "mandatory": true}, - {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*invite.9", "mandatory": true}, - {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*invite.6", "mandatory": true}, - {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*invite.6", "mandatory": true}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*constant","value": "0", "mandatory": true, "filters": ["*prefix:~*vars.FileName:missed_calls"]}, - {"tag": "Usage", "path": "*cgreq.Usage", "type": "*usage_difference","value": "~*bye.6;~*invite.6", "mandatory": true, "filters": ["*notprefix:~*vars.FileName:missed_calls"]}, + {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value": "~*req.7", "mandatory": true,"filters": ["*string:~*req.0:INVITE"]}, + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value": "~*req.8", "mandatory": true,"filters": ["*string:~*req.0:INVITE"]}, + {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value": "~*req.8", "mandatory": true,"filters": ["*string:~*req.0:INVITE"]}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value": "~*req.9", "mandatory": true,"filters": ["*string:~*req.0:INVITE"]}, + {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.6", "mandatory": true,"filters": ["*string:~*req.0:INVITE"]}, + {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.6", "mandatory": true,"filters": ["*string:~*req.0:INVITE"]}, + {"tag": "EndTime", "path": "*cgreq.EndTime", "type": "*variable","value": "~*req.6", "mandatory": true, "filters": ["*string:~*req.0:BYE"]}, {"tag": "DisconnectCause", "path": "*cgreq.DisconnectCause", "type": "*variable", "value": "~*req.4; ;~*req.5", "mandatory": true}, - {"tag": "DialogId", "path": "*cgreq.DialogId", "type": "*variable", "value": "~*req.11"} + {"tag": "DialogId", "path": "*cgreq.DialogId", "type": "*variable", "value": "~*req.11"}, + {"tag": "Partial", "path": "*opts.*partial", "type": "*constant", "value": "true"}, + {"tag": "Partial", "path": "*opts.*partial", "type": "*constant", "value": "false","filters": ["*prefix:~*vars.FileName:missed_calls"]}, + {"tag": "Invite", "path": "*opts.invite", "type": "*constant", "value": "true", "filters":["*string:~*req.0:INVITE"]}, + {"tag": "Bye", "path": "*opts.bye", "type": "*constant", "value": "true", "filters":["*string:~*req.0:BYE"]}, + {"tag": "Order", "path": "*opts.order", "type": "*constant", "value": "0", "filters":["*string:~*req.0:INVITE"]}, + {"tag": "Order", "path": "*opts.order", "type": "*constant", "value": "1", "filters":["*string:~*req.0:BYE"]}, ], + "partial_commit_fields": [ + // {"tag": "Tor", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, + // {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable","value":"~*req.OriginID", "mandatory": true}, + // {"tag": "RequestType", "path": "*cgreq.RequestType", "type": "*variable", "value":"~*req.RequestType", "mandatory": true,"filters": ["*string:~*req.0:INVITE"]}, + // {"tag": "Tenant", "path": "*cgreq.Tenant", "type": "*constant", "value": "cgrates.org", "mandatory": true}, + // {"tag": "Category", "path": "*cgreq.Category", "type": "*constant", "value": "call", "mandatory": true}, + // {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", "value":"~*req.Account", "mandatory": true}, + // {"tag": "Subject", "path": "*cgreq.Subject", "type": "*variable", "value":"~*req.Subject", "mandatory": true}, + // {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", "value":"~*req.Destination" , "mandatory": true}, + // {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value":"~*req.SetupTime" , "mandatory": true}, + // {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value":"~*req.AnswerTime" , "mandatory": true}, + // {"tag": "EndTime", "path": "*cgreq.EndTime", "type": "*variable","value": "~*req.EndTime"}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*constant","value": "0", "mandatory": true, "filters": ["*prefix:~*vars.FileName:missed_calls"]}, + {"tag": "Usage", "path": "*cgreq.Usage", "type": "*usage_difference","value": "~*req.EndTime;~*req.AnswerTime", "mandatory": true, "filters": ["*notprefix:~*vars.FileName:missed_calls","*exists:~*opts.invite:","*exists:~*opts.bye:"]}, + // {"tag": "DisconnectCause", "path": "*cgreq.DisconnectCause", "type": "*variable", "value":"~*req.DisconnectCause", "mandatory": true}, + // {"tag": "DialogId", "path": "*cgreq.DialogId", "type": "*variable", "value":"~*req.DialogId"}, + {"tag": "Partial", "path": "*opts.*partial", "type": "*constant", "value": "false","filters": ["*exists:~*opts.invite:","*exists:~*opts.bye:"]}, + ], + "cache_dump_fields": [ + ], + }, { "id": "JSONReader", diff --git a/data/conf/samples/ers_mongo/cgrates.json b/data/conf/samples/ers_mongo/cgrates.json index dc462a86a..e4972d42a 100644 --- a/data/conf/samples/ers_mongo/cgrates.json +++ b/data/conf/samples/ers_mongo/cgrates.json @@ -261,13 +261,13 @@ "id": "PartialCSV1", "enabled": true, "run_delay": "-1", - "type": "*partial_csv", + "type": "*file_csv", "source_path": "/tmp/partErs1/in", "flags": ["*cdrs"], "processed_path": "/tmp/partErs1/out", "opts": { - "csvRecordCacheTTL": "500ms", "csvCacheExpiryAction": "*dump_to_file", + "partialOrderField": "~*req.AnswerTime", }, "fields":[ {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, @@ -286,7 +286,9 @@ {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, - {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]},// keep this here for partial cdr field + {"tag": "Partial", "path": "*opts.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + {"tag": "Partial", "path": "*opts.Partial", "type": "*constant", "value": "false", "filters":["*notstring:~*req.10:partial"]}, ], "cache_dump_fields": [ {"tag": "OriginID", "path":"*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, @@ -304,13 +306,13 @@ "id": "PartialCSV_PostExpiry", "enabled": true, "run_delay": "-1", - "type": "*partial_csv", + "type": "*file_csv", "source_path": "/tmp/partErs2/in", "processed_path": "/tmp/partErs2/out", "flags": ["*cdrs"], "opts": { - "csvRecordCacheTTL": "500ms", "csvCacheExpiryAction": "*post_cdr", + "partialOrderField": "~*req.AnswerTime", }, "fields":[ {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, @@ -329,7 +331,9 @@ {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, - {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]} + {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]},// keep this here for partial cdr field + {"tag": "Partial", "path": "*opts.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + {"tag": "Partial", "path": "*opts.Partial", "type": "*constant", "value": "false", "filters":["*notstring:~*req.10:partial"]} ], }, { diff --git a/data/conf/samples/ers_mysql/cgrates.json b/data/conf/samples/ers_mysql/cgrates.json index df0497f1c..10f2e6bdc 100644 --- a/data/conf/samples/ers_mysql/cgrates.json +++ b/data/conf/samples/ers_mysql/cgrates.json @@ -258,13 +258,13 @@ "id": "PartialCSV1", "enabled": true, "run_delay": "-1", - "type": "*partial_csv", + "type": "*file_csv", "source_path": "/tmp/partErs1/in", "flags": ["*cdrs"], "processed_path": "/tmp/partErs1/out", "opts": { - "csvRecordCacheTTL": "500ms", "csvCacheExpiryAction": "*dump_to_file", + "partialOrderField": "~*req.AnswerTime", }, "fields":[ {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, @@ -283,7 +283,9 @@ {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, - {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]},// keep this here for partial cdr field + {"tag": "Partial", "path": "*opts.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + {"tag": "Partial", "path": "*opts.Partial", "type": "*constant", "value": "false", "filters":["*notstring:~*req.10:partial"]}, ], "cache_dump_fields": [ {"tag": "OriginID", "path":"*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, @@ -301,13 +303,13 @@ "id": "PartialCSV_PostExpiry", "enabled": true, "run_delay": "-1", - "type": "*partial_csv", + "type": "*file_csv", "source_path": "/tmp/partErs2/in", "processed_path": "/tmp/partErs2/out", "flags": ["*cdrs"], "opts": { - "csvRecordCacheTTL": "500ms", "csvCacheExpiryAction": "*post_cdr", + "partialOrderField": "~*req.AnswerTime", }, "fields":[ {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, @@ -326,7 +328,9 @@ {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, - {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]} + {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]},// keep this here for partial cdr field + {"tag": "Partial", "path": "*opts.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + {"tag": "Partial", "path": "*opts.Partial", "type": "*constant", "value": "false", "filters":["*notstring:~*req.10:partial"]} ], }, { diff --git a/data/conf/samples/ers_postgres/cgrates.json b/data/conf/samples/ers_postgres/cgrates.json index 93289da56..29033304d 100644 --- a/data/conf/samples/ers_postgres/cgrates.json +++ b/data/conf/samples/ers_postgres/cgrates.json @@ -255,13 +255,13 @@ "id": "PartialCSV1", "enabled": true, "run_delay": "-1", - "type": "*partial_csv", + "type": "*file_csv", "source_path": "/tmp/partErs1/in", "flags": ["*cdrs"], "processed_path": "/tmp/partErs1/out", "opts": { - "csvRecordCacheTTL": "500ms", "csvCacheExpiryAction": "*dump_to_file", + "partialOrderField": "~*req.AnswerTime", }, "fields":[ {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, @@ -280,7 +280,9 @@ {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, - {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]},// keep this here for partial cdr field + {"tag": "Partial", "path": "*opts.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + {"tag": "Partial", "path": "*opts.Partial", "type": "*constant", "value": "false", "filters":["*notstring:~*req.10:partial"]}, ], "cache_dump_fields": [ {"tag": "OriginID", "path":"*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"}, @@ -298,13 +300,13 @@ "id": "PartialCSV_PostExpiry", "enabled": true, "run_delay": "-1", - "type": "*partial_csv", + "type": "*file_csv", "source_path": "/tmp/partErs2/in", "processed_path": "/tmp/partErs2/out", "flags": ["*cdrs"], "opts": { - "csvRecordCacheTTL": "500ms", "csvCacheExpiryAction": "*post_cdr", + "partialOrderField": "~*req.AnswerTime", }, "fields":[ {"tag": "ToR", "path": "*cgreq.ToR", "type": "*constant", "value": "*voice", "mandatory": true}, @@ -323,7 +325,9 @@ {"tag": "SetupTime", "path": "*cgreq.SetupTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "AnswerTime", "path": "*cgreq.AnswerTime", "type": "*variable", "value": "~*req.4", "mandatory": true}, {"tag": "Usage", "path": "*cgreq.Usage", "type": "*variable", "value": "~*req.6:s/^(\\d+)$/${1}s/", "mandatory": true}, - {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]} + {"tag": "Partial", "path": "*cgreq.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]},// keep this here for partial cdr field + {"tag": "Partial", "path": "*opts.Partial", "type": "*constant", "value": "true", "filters":["*string:~*req.10:partial"]}, + {"tag": "Partial", "path": "*opts.Partial", "type": "*constant", "value": "false", "filters":["*notstring:~*req.10:partial"]} ], }, { diff --git a/docs/ers.rst b/docs/ers.rst index 5f1240f36..321addf2c 100644 --- a/docs/ers.rst +++ b/docs/ers.rst @@ -134,12 +134,6 @@ type **\*file_csv** Reader for *comma separated* files. - **\*partial_csv** - Reader for *comma separated* where content spans over multiple files. - - **\*flatstore** - Reader for Kamailio_/OpenSIPS_ *db_flatstore* files. - **\*file_xml** Reader for *.xml* formatted files. diff --git a/engine/account.go b/engine/account.go index dc8a1fed8..bb7c2c665 100644 --- a/engine/account.go +++ b/engine/account.go @@ -248,7 +248,7 @@ func (acc *Account) debitBalanceAction(a *Action, reset, resetIfNegative bool) e } acc.BalanceMap[balanceType] = append(acc.BalanceMap[balanceType], bClone) _, err := guardian.Guardian.Guard(func() (interface{}, error) { - sgs := make([]string, len(bClone.SharedGroups)) // aici + sgs := make([]string, len(bClone.SharedGroups)) i := 0 for sgID := range bClone.SharedGroups { // add shared group member diff --git a/engine/stats.go b/engine/stats.go index 4bdb1e4a9..7def4b3ae 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -299,7 +299,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st for _, sq := range matchSQs { stsIDs = append(stsIDs, sq.ID) lkID := utils.StatQueuePrefix + sq.TenantID() - guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { + guardian.Guardian.Guard(func() (_ interface{}, _ error) { err = sq.ProcessEvent(tnt, args.ID, sS.filterS, evNm) return }, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID) diff --git a/ers/amqp.go b/ers/amqp.go index 3dfa23dce..cb7d13744 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -32,15 +32,16 @@ import ( // NewAMQPER return a new amqp event reader func NewAMQPER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, + rdrEvents, partialEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { rdr := &AMQPER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrEvents: rdrEvents, - rdrExit: rdrExit, - rdrErr: rdrErr, + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrEvents: rdrEvents, + partialEvents: partialEvents, + rdrExit: rdrExit, + rdrErr: rdrErr, } if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { rdr.cap = make(chan struct{}, concReq) @@ -68,10 +69,11 @@ type AMQPER struct { exchangeType string routingKey string - rdrEvents chan *erEvent // channel to dispatch the events created to - rdrExit chan struct{} - rdrErr chan error - cap chan struct{} + rdrEvents chan *erEvent // channel to dispatch the events created to + partialEvents chan *erEvent // channel to dispatch the partial events created to + rdrExit chan struct{} + rdrErr chan error + cap chan struct{} conn *amqp.Connection channel *amqp.Channel @@ -201,7 +203,11 @@ func (rdr *AMQPER) processMessage(msg []byte) (err error) { return } cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ + rdrEv := rdr.rdrEvents + if _, isPartial := cgrEv.APIOpts[partialOpt]; isPartial { + rdrEv = rdr.partialEvents + } + rdrEv <- &erEvent{ cgrEvent: cgrEv, rdrCfg: rdr.Config(), } diff --git a/ers/amqp_it_test.go b/ers/amqp_it_test.go index 8018f79cb..dfab82caf 100644 --- a/ers/amqp_it_test.go +++ b/ers/amqp_it_test.go @@ -71,7 +71,7 @@ func TestAMQPER(t *testing.T) { rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) - if rdr, err = NewAMQPER(cfg, 1, rdrEvents, + if rdr, err = NewAMQPER(cfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit); err != nil { t.Fatal(err) } @@ -134,7 +134,7 @@ func TestAMQPERServeError(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfgIdx := 0 expected := "AMQP scheme must be either 'amqp://' or 'amqps://'" - rdr, err := NewAMQPER(cfg, cfgIdx, nil, nil, nil, nil) + rdr, err := NewAMQPER(cfg, cfgIdx, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } diff --git a/ers/amqpv1.go b/ers/amqpv1.go index 9c6328271..f9f68ccc2 100644 --- a/ers/amqpv1.go +++ b/ers/amqpv1.go @@ -33,15 +33,16 @@ import ( // NewAMQPv1ER return a new amqpv1 event reader func NewAMQPv1ER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, + rdrEvents, partialEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { rdr := &AMQPv1ER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrEvents: rdrEvents, - rdrExit: rdrExit, - rdrErr: rdrErr, + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrEvents: rdrEvents, + partialEvents: partialEvents, + rdrExit: rdrExit, + rdrErr: rdrErr, } if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { rdr.cap = make(chan struct{}, concReq) @@ -65,10 +66,11 @@ type AMQPv1ER struct { queueID string - rdrEvents chan *erEvent // channel to dispatch the events created to - rdrExit chan struct{} - rdrErr chan error - cap chan struct{} + rdrEvents chan *erEvent // channel to dispatch the events created to + partialEvents chan *erEvent // channel to dispatch the partial events created to + rdrExit chan struct{} + rdrErr chan error + cap chan struct{} conn *amqpv1.Client ses *amqpv1.Session @@ -174,7 +176,11 @@ func (rdr *AMQPv1ER) processMessage(msg []byte) (err error) { return } cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ + rdrEv := rdr.rdrEvents + if _, isPartial := cgrEv.APIOpts[partialOpt]; isPartial { + rdrEv = rdr.partialEvents + } + rdrEv <- &erEvent{ cgrEvent: cgrEv, rdrCfg: rdr.Config(), } diff --git a/ers/amqpv1_it_test.go b/ers/amqpv1_it_test.go index 50826d361..d1ff786bd 100644 --- a/ers/amqpv1_it_test.go +++ b/ers/amqpv1_it_test.go @@ -76,7 +76,7 @@ func TestAMQPERv1(t *testing.T) { rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) - if rdr, err = NewAMQPv1ER(cfg, 1, rdrEvents, + if rdr, err = NewAMQPv1ER(cfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit); err != nil { t.Fatal(err) } @@ -151,7 +151,7 @@ func TestAmqpv1NewAMQPv1ER(t *testing.T) { }, } - result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil) + result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -182,7 +182,7 @@ func TestAmqpv1NewAMQPv1ER2(t *testing.T) { }, } - result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil) + result, err := NewAMQPv1ER(cfg, cfgIdx, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } diff --git a/ers/ers.go b/ers/ers.go index 3232b24d5..133d5b4d7 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -19,13 +19,20 @@ along with this program. If not, see package ers import ( + "encoding/csv" "fmt" + "os" + "path" + "sort" "sync" + "time" + "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" ) // erEvent is passed from reader to ERs @@ -35,31 +42,37 @@ type erEvent struct { } // NewERService instantiates the ERService -func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) *ERService { - return &ERService{ - cfg: cfg, - rdrs: make(map[string]EventReader), - rdrPaths: make(map[string]string), - stopLsn: make(map[string]chan struct{}), - rdrEvents: make(chan *erEvent), - rdrErr: make(chan error), - filterS: filterS, - connMgr: connMgr, +func NewERService(cfg *config.CGRConfig, filterS *engine.FilterS, connMgr *engine.ConnManager) (ers *ERService) { + ers = &ERService{ + cfg: cfg, + rdrs: make(map[string]EventReader), + rdrPaths: make(map[string]string), + stopLsn: make(map[string]chan struct{}), + rdrEvents: make(chan *erEvent), + partialEvents: make(chan *erEvent), + rdrErr: make(chan error), + filterS: filterS, + connMgr: connMgr, } + ers.partialCache = ltcache.NewCache(ltcache.UnlimitedCaching, cfg.ERsCfg().PartialCacheTTL, false, ers.onEvicted) + return } // ERService is managing the EventReaders type ERService struct { sync.RWMutex - cfg *config.CGRConfig - rdrs map[string]EventReader // map[rdrID]EventReader - rdrPaths map[string]string // used for reloads in case of path changes - stopLsn map[string]chan struct{} // map[rdrID] chan struct{} - rdrEvents chan *erEvent // receive here the events from readers - rdrErr chan error // receive here errors which should stop the app + cfg *config.CGRConfig + rdrs map[string]EventReader // map[rdrID]EventReader + rdrPaths map[string]string // used for reloads in case of path changes + stopLsn map[string]chan struct{} // map[rdrID] chan struct{} + rdrEvents chan *erEvent // receive here the events from readers + partialEvents chan *erEvent // receive here the partial events from readers + rdrErr chan error // receive here errors which should stop the app filterS *engine.FilterS connMgr *engine.ConnManager + + partialCache *ltcache.Cache } // ListenAndServe keeps the service alive @@ -89,8 +102,14 @@ func (erS *ERService) ListenAndServe(stopChan, cfgRldChan chan struct{}) (err er case erEv := <-erS.rdrEvents: if err := erS.processEvent(erEv.cgrEvent, erEv.rdrCfg); err != nil { utils.Logger.Warning( - fmt.Sprintf("<%s> reading event: <%s> got error: <%s>", - utils.ERs, utils.ToIJSON(erEv.cgrEvent), err.Error())) + fmt.Sprintf("<%s> reading event: <%s> from reader: <%s> got error: <%s>", + utils.ERs, utils.ToJSON(erEv.cgrEvent), erEv.rdrCfg.ID, err.Error())) + } + case pEv := <-erS.partialEvents: + if err := erS.processPartialEvent(pEv.cgrEvent, pEv.rdrCfg); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> reading partial event: <%s> from reader: <%s> got error: <%s>", + utils.ERs, utils.ToJSON(pEv.cgrEvent), pEv.rdrCfg.ID, err.Error())) } case <-cfgRldChan: // handle reload cfgIDs := make(map[string]int) @@ -142,7 +161,7 @@ func (erS *ERService) addReader(rdrID string, cfgIdx int) (err error) { erS.stopLsn[rdrID] = make(chan struct{}) var rdr EventReader if rdr, err = NewEventReader(erS.cfg, cfgIdx, - erS.rdrEvents, erS.rdrErr, + erS.rdrEvents, erS.partialEvents, erS.rdrErr, erS.filterS, erS.stopLsn[rdrID]); err != nil { return } @@ -300,3 +319,200 @@ func (erS *ERService) closeAllRdrs() { close(stopL) } } + +const ( + partialOpt = "*partial" +) + +type erEvents struct { + events []*utils.CGREvent + rdrCfg *config.EventReaderCfg +} + +func (erS *ERService) processPartialEvent(ev *utils.CGREvent, rdrCfg *config.EventReaderCfg) (err error) { + orgID, err := ev.FieldAsString(utils.OriginID) + if err == utils.ErrNotFound { + utils.Logger.Warning( + fmt.Sprintf("<%s> Missing field for partial event <%s>", + utils.ERs, utils.ToJSON(ev))) + return + } + orgHost, err := ev.FieldAsString(utils.OriginHost) + if err == utils.ErrNotFound { + utils.Logger.Warning( + fmt.Sprintf("<%s> Missing field for partial event <%s>", + utils.ERs, utils.ToJSON(ev))) + return + } + cgrID := utils.Sha1(orgID, orgHost) + + evs, has := erS.partialCache.Get(cgrID) + var cgrEvs *erEvents + if !has || evs == nil { + cgrEvs = &erEvents{ + events: []*utils.CGREvent{ev}, + rdrCfg: rdrCfg, + } + } else { + cgrEvs = evs.(*erEvents) + cgrEvs.events = append(cgrEvs.events, ev) + cgrEvs.rdrCfg = rdrCfg + } + + var cgrEv *utils.CGREvent + if cgrEv, err = erS.preparePartialEvents(cgrEvs.events, cgrEvs.rdrCfg); err != nil { + return + } + if partial := cgrEv.APIOpts[partialOpt]; !utils.IsSliceMember([]string{"false", utils.EmptyString}, utils.IfaceAsString(partial)) { + erS.partialCache.Set(cgrID, cgrEvs, nil) + return + } + + // complete CDR + if len(cgrEvs.events) != 1 { + erS.partialCache.Set(cgrID, nil, nil) // set it with nil in cache to ignore when we expire the item + erS.partialCache.Remove(cgrID) + } + go func() { erS.rdrEvents <- &erEvent{cgrEvent: cgrEv, rdrCfg: rdrCfg} }() + return +} + +func (erS *ERService) preparePartialEvents(cgrEvs []*utils.CGREvent, cfg *config.EventReaderCfg) (cgrEv *utils.CGREvent, err error) { + cgrEv = cgrEvs[0] + if len(cgrEvs) != 1 { + ordFld := utils.IfaceAsString(cfg.Opts[utils.PartialOrderFieldOpt]) + if ordFld == utils.EmptyString { + return nil, utils.NewErrMandatoryIeMissing(utils.PartialOrderFieldOpt) + } + fields := make([]interface{}, len(cgrEvs)) + + var ordPath config.RSRParsers + if ordPath, err = config.NewRSRParsers(ordFld, erS.cfg.GeneralCfg().RSRSep); err != nil { + return nil, err + } + + for i, ev := range cgrEvs { + if fields[i], err = ordPath.ParseDataProviderWithInterfaces(ev.AsDataProvider()); err != nil { + return + } + if fldStr, castStr := fields[i].(string); castStr { // attempt converting string since deserialization fails here (ie: time.Time fields) + fields[i] = utils.StringToInterface(fldStr) + } + } + //sort CGREvents based on partialOrderFieldOpt + sort.Slice(cgrEvs, func(i, j int) bool { + gt, serr := utils.GreaterThan(fields[i], fields[j], true) + if serr != nil { + err = serr + } + return gt + }) + if err != nil { + return + } + + // compose the CGREvent from slice + cgrEv = &utils.CGREvent{ + Tenant: cgrEvs[0].Tenant, + ID: utils.UUIDSha1Prefix(), + Time: utils.TimePointer(time.Now()), + Event: make(map[string]interface{}), + APIOpts: make(map[string]interface{}), + } + for _, ev := range cgrEvs { + for key, value := range ev.Event { + cgrEv.Event[key] = value + } + for key, val := range ev.APIOpts { + cgrEv.APIOpts[key] = val + } + } + } + if len(cfg.PartialCommitFields) != 0 { + agReq := agents.NewAgentRequest( + utils.MapStorage(cgrEv.Event), nil, + nil, nil, cgrEv.APIOpts, cfg.Tenant, + erS.cfg.GeneralCfg().DefaultTenant, + utils.FirstNonEmpty(cfg.Timezone, + erS.cfg.GeneralCfg().DefaultTimezone), + erS.filterS, nil) // create an AgentRequest + if err = agReq.SetFields(cfg.PartialCommitFields); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing partial event: <%s>, ignoring due to error: <%s>", + utils.ERs, utils.ToJSON(cgrEv), err.Error())) + return + } + cgrEv = utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) + } + + return +} + +func (erS *ERService) onEvicted(id string, value interface{}) { + if value == nil { + return + } + eEvs := value.(*erEvents) + action := erS.cfg.ERsCfg().PartialCacheAction + if cAct, has := eEvs.rdrCfg.Opts[utils.PartialCacheAction]; has { + action = utils.IfaceAsString(cAct) + } + switch action { + case utils.MetaNone: + case utils.MetaPostCDR: + cgrEv, err := erS.preparePartialEvents(eEvs.events, eEvs.rdrCfg) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> failed posting expired parial events <%s> due error <%s>", + utils.ERs, utils.ToJSON(eEvs.events), err.Error())) + return + } + erS.rdrEvents <- &erEvent{cgrEvent: cgrEv, rdrCfg: eEvs.rdrCfg} + case utils.MetaDumpToFile: + tmz := utils.FirstNonEmpty(eEvs.rdrCfg.Timezone, erS.cfg.GeneralCfg().DefaultTimezone) + expPath := erS.cfg.ERsCfg().PartialPath + if path, has := eEvs.rdrCfg.Opts[utils.PartialPathOpt]; has { + expPath = utils.IfaceAsString(path) + } + if expPath == utils.EmptyString { // do not send the partial events to any file + return + } + dumpFilePath := path.Join(expPath, fmt.Sprintf("%s.%d%s", + id, time.Now().Unix(), utils.TmpSuffix)) + fileOut, err := os.Create(dumpFilePath) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed creating %s, error: %s", + utils.ERs, dumpFilePath, err.Error())) + return + } + defer fileOut.Close() + csvWriter := csv.NewWriter(fileOut) + if fldSep, has := eEvs.rdrCfg.Opts[utils.PartialCSVFieldSepartor]; has { + csvWriter.Comma = rune(utils.IfaceAsString(fldSep)[0]) + } + for _, ev := range eEvs.events { + oNm := map[string]*utils.OrderedNavigableMap{ + utils.MetaExp: utils.NewOrderedNavigableMap(), + } + eeReq := engine.NewEventRequest(utils.MapStorage(ev.Event), utils.MapStorage{}, ev.APIOpts, + eEvs.rdrCfg.Tenant, erS.cfg.GeneralCfg().DefaultTenant, + tmz, erS.filterS, oNm) + + if err = eeReq.SetFields(eEvs.rdrCfg.CacheDumpFields); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>", + utils.ERs, id, err.Error())) + return + } + + record := eeReq.OrdNavMP[utils.MetaExp].OrderedFieldsAsStrings() + if err = csvWriter.Write(record); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed writing partial record %v to file: %s, error: %s", + utils.ERs, record, dumpFilePath, err.Error())) + return + } + } + csvWriter.Flush() + } + +} diff --git a/ers/ers_it_test.go b/ers/ers_it_test.go index c3dac48d8..7d7a19804 100644 --- a/ers/ers_it_test.go +++ b/ers/ers_it_test.go @@ -22,7 +22,6 @@ package ers import ( "errors" "reflect" - "sync" "testing" "time" @@ -342,7 +341,6 @@ func TestERsListenAndServeCfgRldChan5(t *testing.T) { fltrS := &engine.FilterS{} srv := NewERService(cfg, fltrS, nil) exp := &CSVFileER{ - RWMutex: sync.RWMutex{}, cgrCfg: cfg, cfgIdx: 0, fltrS: nil, diff --git a/ers/filecsv.go b/ers/filecsv.go index d54de29c7..9172dd99f 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -25,7 +25,6 @@ import ( "os" "path" "strings" - "sync" "time" "github.com/cgrates/cgrates/agents" @@ -35,21 +34,22 @@ import ( ) func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, + rdrEvents, partialEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { srcPath := cfg.ERsCfg().Readers[cfgIdx].SourcePath if strings.HasSuffix(srcPath, utils.Slash) { srcPath = srcPath[:len(srcPath)-1] } csvEr := &CSVFileER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrDir: srcPath, - rdrEvents: rdrEvents, - rdrError: rdrErr, - rdrExit: rdrExit, - conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrDir: srcPath, + rdrEvents: rdrEvents, + partialEvents: partialEvents, + rdrError: rdrErr, + rdrExit: rdrExit, + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} var processFile struct{} for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { csvEr.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop @@ -59,15 +59,16 @@ func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int, // CSVFileER implements EventReader interface for .csv files type CSVFileER struct { - sync.RWMutex - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - fltrS *engine.FilterS - rdrDir string - rdrEvents chan *erEvent // channel to dispatch the events created to - rdrError chan error - rdrExit chan struct{} - conReqs chan struct{} // limit number of opened files + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + rdrDir string + rdrEvents chan *erEvent // channel to dispatch the events created to + partialEvents chan *erEvent // channel to dispatch the partial events created to + rdrError chan error + rdrExit chan struct{} + conReqs chan struct{} // limit number of opened files + } func (rdr *CSVFileER) Config() *config.EventReaderCfg { @@ -188,7 +189,11 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { return } cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ + rdrEv := rdr.rdrEvents + if _, isPartial := cgrEv.APIOpts[partialOpt]; isPartial { + rdrEv = rdr.partialEvents + } + rdrEv <- &erEvent{ cgrEvent: cgrEv, rdrCfg: rdr.Config(), } diff --git a/ers/filefwv.go b/ers/filefwv.go index 23a95ab3b..f90fc3be2 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -35,21 +35,22 @@ import ( ) func NewFWVFileER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, + rdrEvents, partialEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { srcPath := cfg.ERsCfg().Readers[cfgIdx].SourcePath if strings.HasSuffix(srcPath, utils.Slash) { srcPath = srcPath[:len(srcPath)-1] } fwvER := &FWVFileER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrDir: srcPath, - rdrEvents: rdrEvents, - rdrError: rdrErr, - rdrExit: rdrExit, - conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrDir: srcPath, + rdrEvents: rdrEvents, + partialEvents: partialEvents, + rdrError: rdrErr, + rdrExit: rdrExit, + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} var processFile struct{} for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { fwvER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop @@ -65,6 +66,7 @@ type FWVFileER struct { fltrS *engine.FilterS rdrDir string rdrEvents chan *erEvent // channel to dispatch the events created to + partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error rdrExit chan struct{} conReqs chan struct{} // limit number of opened files @@ -219,7 +221,11 @@ func (rdr *FWVFileER) processFile(fPath, fName string) (err error) { } rdr.offset += rdr.lineLen // increase the offset cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ + rdrEv := rdr.rdrEvents + if _, isPartial := cgrEv.APIOpts[partialOpt]; isPartial { + rdrEv = rdr.partialEvents + } + rdrEv <- &erEvent{ cgrEvent: cgrEv, rdrCfg: rdr.Config(), } @@ -307,7 +313,11 @@ func (rdr *FWVFileER) processTrailer(file *os.File, rowNr, evsPosted int, absPat return err } cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ + rdrEv := rdr.rdrEvents + if _, isPartial := cgrEv.APIOpts[partialOpt]; isPartial { + rdrEv = rdr.partialEvents + } + rdrEv <- &erEvent{ cgrEvent: cgrEv, rdrCfg: rdr.Config(), } @@ -349,7 +359,11 @@ func (rdr *FWVFileER) createHeaderMap(record string, rowNr, evsPosted int, absPa } rdr.offset += rdr.headerOffset // increase the offset cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ + rdrEv := rdr.rdrEvents + if _, isPartial := cgrEv.APIOpts[partialOpt]; isPartial { + rdrEv = rdr.partialEvents + } + rdrEv <- &erEvent{ cgrEvent: cgrEv, rdrCfg: rdr.Config(), } diff --git a/ers/filefwv_it_test.go b/ers/filefwv_it_test.go index 86993d87b..8d903640c 100644 --- a/ers/filefwv_it_test.go +++ b/ers/filefwv_it_test.go @@ -211,7 +211,7 @@ func TestNewFWVFileER(t *testing.T) { rdrDir: "", } cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs = 1 - result, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil) + result, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -252,7 +252,7 @@ func TestFWVFileConfig(t *testing.T) { }, } expected := cfg.ERsCfg().Readers[0] - rdr, err := NewFWVFileER(cfg, 0, nil, nil, nil, nil) + rdr, err := NewFWVFileER(cfg, 0, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -312,7 +312,7 @@ func TestFileFWVProcessEvent(t *testing.T) { func TestFileFWVServeErrTimeDuration0(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfgIdx := 0 - rdr, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil) + rdr, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -326,7 +326,7 @@ func TestFileFWVServeErrTimeDuration0(t *testing.T) { func TestFileFWVServeErrTimeDurationNeg1(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfgIdx := 0 - rdr, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil) + rdr, err := NewFWVFileER(cfg, cfgIdx, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } diff --git a/ers/filejson.go b/ers/filejson.go index 04c8aa40b..0581532f0 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -36,21 +36,22 @@ import ( ) func NewJSONFileER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, + rdrEvents, partialEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { srcPath := cfg.ERsCfg().Readers[cfgIdx].SourcePath if strings.HasSuffix(srcPath, utils.Slash) { srcPath = srcPath[:len(srcPath)-1] } jsonEr := &JSONFileER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrDir: srcPath, - rdrEvents: rdrEvents, - rdrError: rdrErr, - rdrExit: rdrExit, - conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrDir: srcPath, + rdrEvents: rdrEvents, + partialEvents: partialEvents, + rdrError: rdrErr, + rdrExit: rdrExit, + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} var processFile struct{} for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { jsonEr.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop @@ -61,14 +62,15 @@ func NewJSONFileER(cfg *config.CGRConfig, cfgIdx int, // JSONFileER implements EventReader interface for .json files type JSONFileER struct { sync.RWMutex - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - fltrS *engine.FilterS - rdrDir string - rdrEvents chan *erEvent // channel to dispatch the events created to - rdrError chan error - rdrExit chan struct{} - conReqs chan struct{} // limit number of opened files + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + rdrDir string + rdrEvents chan *erEvent // channel to dispatch the events created to + partialEvents chan *erEvent // channel to dispatch the partial events created to + rdrError chan error + rdrExit chan struct{} + conReqs chan struct{} // limit number of opened files } func (rdr *JSONFileER) Config() *config.EventReaderCfg { @@ -169,7 +171,11 @@ func (rdr *JSONFileER) processFile(fPath, fName string) (err error) { return } cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ + rdrEv := rdr.rdrEvents + if _, isPartial := cgrEv.APIOpts[partialOpt]; isPartial { + rdrEv = rdr.partialEvents + } + rdrEv <- &erEvent{ cgrEvent: cgrEv, rdrCfg: rdr.Config(), } diff --git a/ers/filejson_it_test.go b/ers/filejson_it_test.go index 0d4456080..02a252b4a 100644 --- a/ers/filejson_it_test.go +++ b/ers/filejson_it_test.go @@ -232,7 +232,7 @@ func testJSONKillEngine(t *testing.T) { func TestFileJSONServeErrTimeDuration0(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfgIdx := 0 - rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil) + rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -246,7 +246,7 @@ func TestFileJSONServeErrTimeDuration0(t *testing.T) { func TestFileJSONServeErrTimeDurationNeg1(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfgIdx := 0 - rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil) + rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } @@ -261,7 +261,7 @@ func TestFileJSONServeErrTimeDurationNeg1(t *testing.T) { // func TestFileJSONServeTimeDefault(t *testing.T) { // cfg := config.NewDefaultCGRConfig() // cfgIdx := 0 -// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil) +// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil,nil) // if err != nil { // t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) // } @@ -291,7 +291,7 @@ func TestFileJSONServeErrTimeDurationNeg1(t *testing.T) { // func TestFileJSONProcessFile(t *testing.T) { // cfg := config.NewDefaultCGRConfig() // cfgIdx := 0 -// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil) +// rdr, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil,nil) // if err != nil { // t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) // } diff --git a/ers/filejson_test.go b/ers/filejson_test.go index d2a136065..c7b9f2047 100644 --- a/ers/filejson_test.go +++ b/ers/filejson_test.go @@ -43,7 +43,7 @@ func TestNewJSONFileER(t *testing.T) { } cfg.ERsCfg().Readers[0].ConcurrentReqs = 1 cfg.ERsCfg().Readers[0].SourcePath = "/" - result, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil) + result, err := NewJSONFileER(cfg, cfgIdx, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } diff --git a/ers/filexml.go b/ers/filexml.go index 6c0793b5c..e21577d68 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -36,21 +36,22 @@ import ( ) func NewXMLFileER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, + rdrEvents, partialEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { srcPath := cfg.ERsCfg().Readers[cfgIdx].SourcePath if strings.HasSuffix(srcPath, utils.Slash) { srcPath = srcPath[:len(srcPath)-1] } xmlER := &XMLFileER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrDir: srcPath, - rdrEvents: rdrEvents, - rdrError: rdrErr, - rdrExit: rdrExit, - conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrDir: srcPath, + rdrEvents: rdrEvents, + partialEvents: partialEvents, + rdrError: rdrErr, + rdrExit: rdrExit, + conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} var processFile struct{} for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { xmlER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop @@ -61,14 +62,15 @@ func NewXMLFileER(cfg *config.CGRConfig, cfgIdx int, // XMLFileER implements EventReader interface for .xml files type XMLFileER struct { sync.RWMutex - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - fltrS *engine.FilterS - rdrDir string - rdrEvents chan *erEvent // channel to dispatch the events created to - rdrError chan error - rdrExit chan struct{} - conReqs chan struct{} // limit number of opened files + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + fltrS *engine.FilterS + rdrDir string + rdrEvents chan *erEvent // channel to dispatch the events created to + partialEvents chan *erEvent // channel to dispatch the partial events created to + rdrError chan error + rdrExit chan struct{} + conReqs chan struct{} // limit number of opened files } func (rdr *XMLFileER) Config() *config.EventReaderCfg { @@ -165,7 +167,11 @@ func (rdr *XMLFileER) processFile(fPath, fName string) (err error) { continue } cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ + rdrEv := rdr.rdrEvents + if _, isPartial := cgrEv.APIOpts[partialOpt]; isPartial { + rdrEv = rdr.partialEvents + } + rdrEv <- &erEvent{ cgrEvent: cgrEv, rdrCfg: rdr.Config(), } diff --git a/ers/filexml_it_test.go b/ers/filexml_it_test.go index bfec10df0..8a7afa1c7 100644 --- a/ers/filexml_it_test.go +++ b/ers/filexml_it_test.go @@ -304,7 +304,7 @@ func TestNewXMLFileER(t *testing.T) { } var value struct{} expEr.conReqs <- value - eR, err := NewXMLFileER(cfg, 0, nil, nil, fltrs, nil) + eR, err := NewXMLFileER(cfg, 0, nil, nil, nil, fltrs, nil) expConReq := make(chan struct{}, 1) expConReq <- struct{}{} if <-expConReq != <-eR.(*XMLFileER).conReqs { diff --git a/ers/filexml_test.go b/ers/filexml_test.go index a3a7bb92b..aebd97452 100644 --- a/ers/filexml_test.go +++ b/ers/filexml_test.go @@ -38,7 +38,7 @@ func TestERSNewXMLFileER(t *testing.T) { rdrExit: nil, conReqs: nil, } - result, err := NewXMLFileER(cfg, 0, nil, nil, nil, nil) + result, err := NewXMLFileER(cfg, 0, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected: <%+v>, \nreceived: <%+v>", nil, err) } @@ -60,7 +60,7 @@ func TestERSXMLFileERConfig(t *testing.T) { Filters: []string{}, Opts: make(map[string]interface{}), } - result1, err := NewXMLFileER(cfg, 0, nil, nil, nil, nil) + result1, err := NewXMLFileER(cfg, 0, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected: <%+v>, \nreceived: <%+v>", nil, err) } @@ -82,7 +82,7 @@ func TestERSXMLFileERServeNil(t *testing.T) { Filters: []string{}, Opts: make(map[string]interface{}), } - result1, err := NewXMLFileER(cfg, 0, nil, nil, nil, nil) + result1, err := NewXMLFileER(cfg, 0, nil, nil, nil, nil, nil) if err != nil { t.Errorf("\nExpected: <%+v>, \nreceived: <%+v>", nil, err) } diff --git a/ers/flatstore.go b/ers/flatstore.go deleted file mode 100644 index f6f312bc3..000000000 --- a/ers/flatstore.go +++ /dev/null @@ -1,290 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package ers - -import ( - "encoding/csv" - "fmt" - "io" - "os" - "path" - "strings" - "sync" - "time" - - "github.com/cgrates/ltcache" - - "github.com/cgrates/cgrates/agents" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -type fstRecord struct { - method string - values []string - fileName string -} - -func NewFlatstoreER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { - srcPath := cfg.ERsCfg().Readers[cfgIdx].SourcePath - if strings.HasSuffix(srcPath, utils.Slash) { - srcPath = srcPath[:len(srcPath)-1] - } - flatER := &FlatstoreER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrDir: srcPath, - rdrEvents: rdrEvents, - rdrError: rdrErr, - rdrExit: rdrExit, - conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs), - } - var processFile struct{} - for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { - flatER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop - } - var ttl time.Duration - if ttlOpt, has := flatER.Config().Opts[utils.FstPartialRecordCacheOpt]; has { - if ttl, err = utils.IfaceAsDuration(ttlOpt); err != nil { - return - } - } - flatER.cache = ltcache.NewCache(ltcache.UnlimitedCaching, ttl, false, flatER.dumpToFile) - return flatER, nil -} - -// FlatstoreER implements EventReader interface for Flatstore CDR -type FlatstoreER struct { - sync.RWMutex - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - fltrS *engine.FilterS - cache *ltcache.Cache - rdrDir string - rdrEvents chan *erEvent // channel to dispatch the events created to - rdrError chan error - rdrExit chan struct{} - conReqs chan struct{} // limit number of opened files -} - -func (rdr *FlatstoreER) Config() *config.EventReaderCfg { - return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] -} - -func (rdr *FlatstoreER) serveDefault() { - tm := time.NewTimer(0) - for { - // Not automated, process and sleep approach - select { - case <-rdr.rdrExit: - tm.Stop() - utils.Logger.Info( - fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.rdrDir)) - return - case <-tm.C: - } - filesInDir, _ := os.ReadDir(rdr.rdrDir) - for _, file := range filesInDir { - if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader - continue // used in order to filter the files from directory - } - go func(fileName string) { - if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing file %s, error: %s", - utils.ERs, fileName, err.Error())) - } - }(file.Name()) - } - tm.Reset(rdr.Config().RunDelay) - } -} - -func (rdr *FlatstoreER) Serve() (err error) { - switch rdr.Config().RunDelay { - case time.Duration(0): // 0 disables the automatic read, maybe done per API - return - case time.Duration(-1): - return utils.WatchDir(rdr.rdrDir, rdr.processFile, - utils.ERs, rdr.rdrExit) - default: - go rdr.serveDefault() - } - return -} - -// processFile is called for each file in a directory and dispatches erEvents from it -func (rdr *FlatstoreER) processFile(fPath, fName string) (err error) { - if cap(rdr.conReqs) != 0 { // 0 goes for no limit - processFile := <-rdr.conReqs // Queue here for maxOpenFiles - defer func() { rdr.conReqs <- processFile }() - } - absPath := path.Join(fPath, fName) - utils.Logger.Info( - fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) - var file *os.File - if file, err = os.Open(absPath); err != nil { - return - } - defer file.Close() - var csvReader *csv.Reader - if csvReader, err = newCSVReader(file, rdr.Config().Opts, utils.FlatstorePrfx); err != nil { - utils.Logger.Err( - fmt.Sprintf("<%s> failed creating flatStore reader for <%s>, due to option parsing error: <%s>", - utils.ERs, rdr.Config().ID, err.Error())) - return - } - rowNr := 0 // This counts the rows in the file, not really number of CDRs - evsPosted := 0 - timeStart := time.Now() - reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.FileName: utils.NewLeafNode(fName)}} - faildCallPrfx := utils.IfaceAsString(rdr.Config().Opts[utils.FstFailedCallsPrefixOpt]) - failedCallsFile := len(faildCallPrfx) != 0 && strings.HasPrefix(fName, faildCallPrfx) - var methodTmp config.RSRParsers - if methodTmp, err = config.NewRSRParsers(utils.IfaceAsString(rdr.Config().Opts[utils.FstMethodOpt]), rdr.cgrCfg.GeneralCfg().RSRSep); err != nil { - return - } - var originTmp config.RSRParsers - if originTmp, err = config.NewRSRParsers(utils.IfaceAsString(rdr.Config().Opts[utils.FstOriginIDOpt]), rdr.cgrCfg.GeneralCfg().RSRSep); err != nil { - return - } - var mandatoryAcK bool - if mandatoryAcK, err = utils.IfaceAsBool(rdr.Config().Opts[utils.FstMadatoryACKOpt]); err != nil { - return - } - for { - var record []string - if record, err = csvReader.Read(); err != nil { - if err == io.EOF { - err = nil //If it reaches the end of the file, return nil - break - } - return - } - req := config.NewSliceDP(record, nil) - tmpReq := utils.MapStorage{utils.MetaReq: req} - var method string - if method, err = methodTmp.ParseDataProvider(tmpReq); err != nil { - return - } else if method != utils.FstInvite && - method != utils.FstBye && - method != utils.FstAck { - return fmt.Errorf("unsupported method: <%q>", method) - } - - var originID string - if originID, err = originTmp.ParseDataProvider(tmpReq); err != nil { - return - } - - cacheKey := utils.ConcatenatedKey(originID, method) - if rdr.cache.HasItem(cacheKey) { - utils.Logger.Warning(fmt.Sprintf("<%s> Overwriting the %s method for record <%s>", utils.ERs, method, originID)) - rdr.cache.Set(cacheKey, &fstRecord{method: method, values: record, fileName: fName}, []string{originID}) - continue - } - records := rdr.cache.GetGroupItems(originID) - - if lrecords := len(records); !failedCallsFile && // do not set in cache if we know that the calls are failed - (lrecords == 0 || - (mandatoryAcK && lrecords != 2) || - (!mandatoryAcK && lrecords != 1)) { - rdr.cache.Set(cacheKey, &fstRecord{method: method, values: record, fileName: fName}, []string{originID}) - continue - } - extraDP := map[string]utils.DataProvider{utils.FstMethodToPrfx[method]: req} - for _, record := range records { - req := record.(*fstRecord) - rdr.cache.Set(utils.ConcatenatedKey(originID, req.method), nil, []string{originID}) - extraDP[utils.FstMethodToPrfx[req.method]] = config.NewSliceDP(req.values, nil) - } - rdr.cache.RemoveGroup(originID) - - rowNr++ // increment the rowNr after checking if it's not the end of file - agReq := agents.NewAgentRequest( - req, reqVars, - nil, nil, nil, rdr.Config().Tenant, - rdr.cgrCfg.GeneralCfg().DefaultTenant, - utils.FirstNonEmpty(rdr.Config().Timezone, - rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, extraDP) // create an AgentRequest - if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, - agReq); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to filter error: <%s>", - utils.ERs, absPath, rowNr, err.Error())) - return err - } else if !pass { - continue - } - if err = agReq.SetFields(rdr.Config().Fields); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", - utils.ERs, absPath, rowNr, err.Error())) - return - } - - cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ - cgrEvent: cgrEv, - rdrCfg: rdr.Config(), - } - evsPosted++ - } - if rdr.Config().ProcessedPath != "" { - // Finished with file, move it to processed folder - outPath := path.Join(rdr.Config().ProcessedPath, fName) - if err = os.Rename(absPath, outPath); err != nil { - return - } - } - - utils.Logger.Info( - fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s", - utils.ERs, absPath, rowNr, evsPosted, time.Since(timeStart))) - return -} - -func (rdr *FlatstoreER) dumpToFile(itmID string, value interface{}) { - if value == nil { - return - } - unpRcd := value.(*fstRecord) - dumpFilePath := path.Join(rdr.Config().ProcessedPath, unpRcd.fileName+utils.TmpSuffix) - fileOut, err := os.Create(dumpFilePath) - if err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed creating %s, error: %s", - utils.ERs, dumpFilePath, err.Error())) - return - } - csvWriter := csv.NewWriter(fileOut) - csvWriter.Comma = rune(utils.IfaceAsString(rdr.Config().Opts[utils.FlatstorePrfx+utils.FieldSepOpt])[0]) - if err = csvWriter.Write(unpRcd.values); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed writing partial record %v to file: %s, error: %s", - utils.ERs, unpRcd.values, dumpFilePath, err.Error())) - // return // let it close the opened file - } - csvWriter.Flush() - fileOut.Close() -} diff --git a/ers/flatstore_it_test.go b/ers/flatstore_it_test.go index 01e6ddcdc..871bdee15 100644 --- a/ers/flatstore_it_test.go +++ b/ers/flatstore_it_test.go @@ -21,19 +21,14 @@ along with this program. If not, see package ers import ( - "bytes" - "fmt" - "log" "net/rpc" "os" "path" - "reflect" - "strings" + "path/filepath" "testing" "time" v1 "github.com/cgrates/cgrates/apier/v1" - "github.com/cgrates/ltcache" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -53,16 +48,16 @@ BYE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0|200|OK|14 INVITE|36e39a5|42d996f9|3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:0|200|OK|1436454657|*prepaid|1001|1002||2407:1884881533 BYE|36e39a5|42d996f9|3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:0|200|OK|1436454661|||||2407:1884881533 INVITE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|1436454690|*prepaid|1001|1002||3099:1909036290 -BYE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|1436454692|||||3099:1909036290` +BYE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|1436454692|||||3099:1909036290` // 4 fullMissed = `INVITE|ef6c6256|da501581|0bfdd176d1b93e7df3de5c6f4873ee04@0:0:0:0:0:0:0:0|487|Request Terminated|1436454643|*prepaid|1001|1002||1224:339382783 INVITE|7905e511||81880da80a94bda81b425b09009e055c@0:0:0:0:0:0:0:0|404|Not Found|1436454668|*prepaid|1001|1002||1980:1216490844 -INVITE|324cb497|d4af7023|8deaadf2ae9a17809a391f05af31afb0@0:0:0:0:0:0:0:0|486|Busy here|1436454687|*postpaid|1002|1001||474:130115066` +INVITE|324cb497|d4af7023|8deaadf2ae9a17809a391f05af31afb0@0:0:0:0:0:0:0:0|486|Busy here|1436454687|*postpaid|1002|1001||474:130115066` // 3 - part1 = `BYE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4ccb@0:0:0:0:0:0:0:0|200|OK|1436454651|||||1877:893549742` + part1 = `BYE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4ccb@0:0:0:0:0:0:0:0|200|OK|1436454651|||||1877:893549742` //1 part2 = `INVITE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4ccb@0:0:0:0:0:0:0:0|200|OK|1436454647|*postpaid|1002|1003||1877:893549742 -INVITE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9p@0:0:0:0:0:0:0:0|200|OK|1436454408|*prepaid|1001|1002||3401:2069362475` +INVITE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9p@0:0:0:0:0:0:0:0|200|OK|1436454408|*prepaid|1001|1002||3401:2069362475` //1 flatstoreTests = []func(t *testing.T){ testCreateDirs, @@ -184,18 +179,25 @@ func testFlatstoreITHandleCdr1File(t *testing.T) { t.Errorf("Files in ersInDir: %+v", fls) } filesOutDir, _ := os.ReadDir("/tmp/flatstoreErs/out") + ids := []string{} + for _, fD := range filesOutDir { + ids = append(ids, fD.Name()) + } if len(filesOutDir) != 5 { - ids := []string{} - for _, fD := range filesOutDir { - ids = append(ids, fD.Name()) - } t.Errorf("Unexpected number of files in output directory: %+v, %q", len(filesOutDir), ids) } ePartContent := "INVITE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9p@0:0:0:0:0:0:0:0|200|OK|1436454408|*prepaid|1001|1002||3401:2069362475\n" - if partContent, err := os.ReadFile(path.Join("/tmp/flatstoreErs/out", "acc_3.log.tmp")); err != nil { + tmpl := path.Join("/tmp/flatstoreErs/out", "f7aed15c98b31fea0e3b02b52fc947879a3c5bbc.*.tmp") + if match, err := filepath.Glob(tmpl); err != nil { t.Error(err) - } else if (ePartContent) != (string(partContent)) { - t.Errorf("Expecting:\n%s\nReceived:\n%s", ePartContent, string(partContent)) + } else if len(match) != 1 { + t.Errorf("Wrong number of files matches the template: %q", match) + t.Errorf("template: %q", tmpl) + t.Errorf("files: %q", ids) + } else if partContent, err := os.ReadFile(match[0]); err != nil { + t.Error(err) + } else if ePartContent != string(partContent) { + t.Errorf("Expecting:\n%q\nReceived:\n%q", ePartContent, string(partContent)) } } @@ -205,6 +207,7 @@ func testFlatstoreITAnalyseCDRs(t *testing.T) { t.Error("Unexpected error: ", err.Error()) } else if len(reply) != 8 { t.Error("Unexpected number of CDRs returned: ", len(reply)) + t.Error(utils.ToJSON(reply)) } if err := flatstoreRPC.Call(utils.APIerSv2GetCDRs, &utils.RPCCDRsFilter{MinUsage: "1"}, &reply); err != nil { t.Error("Unexpected error: ", err.Error()) @@ -219,6 +222,7 @@ func testFlatstoreITKillEngine(t *testing.T) { } } +/* func TestFlatstoreProcessEvent(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers[0].ProcessedPath = "" @@ -659,3 +663,4 @@ func TestFlatstoreServeErrTimeDurationNeg1(t *testing.T) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } +*/ diff --git a/ers/flatstore_test.go b/ers/flatstore_test.go deleted file mode 100644 index 1c96f4517..000000000 --- a/ers/flatstore_test.go +++ /dev/null @@ -1,114 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package ers - -import ( - "reflect" - "testing" - "time" - - "github.com/cgrates/cgrates/utils" - - "github.com/cgrates/cgrates/config" -) - -func TestNewFlatstoreER(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - expected := &FlatstoreER{ - cgrCfg: cfg, - } - cfg.ERsCfg().Readers[0].SourcePath = "/" - result, err := NewFlatstoreER(cfg, 0, nil, nil, nil, nil) - if err != nil { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) - } - result.(*FlatstoreER).cache = nil - result.(*FlatstoreER).conReqs = nil - if !reflect.DeepEqual(result, expected) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestFlatstoreConfig(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - cfg.ERsCfg().Readers = []*config.EventReaderCfg{ - { - ID: "file_reader1", - Type: utils.MetaFileCSV, - RunDelay: -1, - ConcurrentReqs: 1024, - SourcePath: "/tmp/ers/in", - ProcessedPath: "/tmp/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, - Opts: map[string]interface{}{utils.FlatstorePrfx + utils.RowLengthOpt: 5}, - }, - { - ID: "file_reader2", - Type: utils.MetaFileCSV, - RunDelay: -1, - ConcurrentReqs: 1024, - SourcePath: "/tmp/ers/in", - ProcessedPath: "/tmp/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, - Opts: map[string]interface{}{utils.FlatstorePrfx + utils.RowLengthOpt: 5}, - }, - } - expected := cfg.ERsCfg().Readers[0] - rdr, err := NewFlatstoreER(cfg, 0, nil, nil, nil, nil) - if err != nil { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) - } - result := rdr.Config() - if !reflect.DeepEqual(result, expected) { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result) - } -} - -func TestFlatstoreServeNil(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - result, err := NewFlatstoreER(cfg, 0, nil, nil, nil, nil) - if err != nil { - t.Errorf("\nExpected: <%+v>, \nreceived: <%+v>", nil, err) - } - expected := &FlatstoreER{ - cgrCfg: cfg, - cfgIdx: 0, - fltrS: nil, - cache: result.(*FlatstoreER).cache, - rdrDir: "/var/spool/cgrates/ers/in", - rdrEvents: nil, - rdrError: nil, - rdrExit: nil, - conReqs: result.(*FlatstoreER).conReqs, - } - if !reflect.DeepEqual(expected, result) { - t.Errorf("\nExpected: <%+v>, \nreceived: <%+v>", expected, result) - } - result.Config().RunDelay = time.Duration(0) - err = result.Serve() - if err != nil { - t.Errorf("\nExpected: <%+v>, \nreceived: <%+v>", nil, err) - } -} diff --git a/ers/kafka.go b/ers/kafka.go index d02bd5197..be8e5de73 100644 --- a/ers/kafka.go +++ b/ers/kafka.go @@ -35,16 +35,17 @@ import ( // NewKafkaER return a new kafka event reader func NewKafkaER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, + rdrEvents, partialEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { rdr := &KafkaER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrEvents: rdrEvents, - rdrExit: rdrExit, - rdrErr: rdrErr, + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrEvents: rdrEvents, + partialEvents: partialEvents, + rdrExit: rdrExit, + rdrErr: rdrErr, } if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { rdr.cap = make(chan struct{}, concReq) @@ -72,10 +73,11 @@ type KafkaER struct { groupID string maxWait time.Duration - rdrEvents chan *erEvent // channel to dispatch the events created to - rdrExit chan struct{} - rdrErr chan error - cap chan struct{} + rdrEvents chan *erEvent // channel to dispatch the events created to + partialEvents chan *erEvent // channel to dispatch the partial events created to + rdrExit chan struct{} + rdrErr chan error + cap chan struct{} poster engine.Poster } @@ -170,7 +172,11 @@ func (rdr *KafkaER) processMessage(msg []byte) (err error) { return } cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ + rdrEv := rdr.rdrEvents + if _, isPartial := cgrEv.APIOpts[partialOpt]; isPartial { + rdrEv = rdr.partialEvents + } + rdrEv <- &erEvent{ cgrEvent: cgrEv, rdrCfg: rdr.Config(), } diff --git a/ers/kafka_it_test.go b/ers/kafka_it_test.go index 9948ea566..dd6ce2cb7 100644 --- a/ers/kafka_it_test.go +++ b/ers/kafka_it_test.go @@ -20,6 +20,19 @@ along with this program. If not, see package ers +import ( + "context" + "fmt" + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + kafka "github.com/segmentio/kafka-go" +) + var ( rdrEvents chan *erEvent rdrErr chan error @@ -27,7 +40,6 @@ var ( rdr EventReader ) -/* func TestKafkaER(t *testing.T) { cfg, err := config.NewCGRConfigFromJSONStringWithDefaults(`{ "ers": { // EventReaderService @@ -58,7 +70,7 @@ func TestKafkaER(t *testing.T) { rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) - if rdr, err = NewKafkaER(cfg, 1, rdrEvents, + if rdr, err = NewKafkaER(cfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit); err != nil { t.Fatal(err) } @@ -93,7 +105,7 @@ func TestKafkaER(t *testing.T) { Event: map[string]interface{}{ "CGRID": randomCGRID, }, - Opts: map[string]interface{}{}, + APIOpts: map[string]interface{}{}, } if !reflect.DeepEqual(ev.cgrEvent, expected) { t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent)) @@ -103,4 +115,3 @@ func TestKafkaER(t *testing.T) { } close(rdrExit) } -*/ diff --git a/ers/kafka_test.go b/ers/kafka_test.go index 2a7ffd52c..a8e909700 100644 --- a/ers/kafka_test.go +++ b/ers/kafka_test.go @@ -103,7 +103,7 @@ func TestKafkaERServe(t *testing.T) { rdrEvents := make(chan *erEvent, 1) rdrExit := make(chan struct{}, 1) rdrErr := make(chan error, 1) - rdr, err := NewKafkaER(cfg, 0, rdrEvents, rdrErr, fltrS, rdrExit) + rdr, err := NewKafkaER(cfg, 0, rdrEvents, make(chan *erEvent, 1), rdrErr, fltrS, rdrExit) if err != nil { t.Error(err) } diff --git a/ers/partial_csv.go b/ers/partial_csv.go deleted file mode 100644 index b1fa204f9..000000000 --- a/ers/partial_csv.go +++ /dev/null @@ -1,411 +0,0 @@ -/* -Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package ers - -import ( - "encoding/csv" - "fmt" - "io" - "os" - "path" - "sort" - "strings" - "sync" - "time" - - "github.com/cgrates/cgrates/agents" - - "github.com/cgrates/ltcache" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -func NewPartialCSVFileER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { - srcPath := cfg.ERsCfg().Readers[cfgIdx].SourcePath - if strings.HasSuffix(srcPath, utils.Slash) { - srcPath = srcPath[:len(srcPath)-1] - } - - pCSVFileER := &PartialCSVFileER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrDir: srcPath, - rdrEvents: rdrEvents, - rdrError: rdrErr, - rdrExit: rdrExit, - conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)} - - function := pCSVFileER.postCDR - if utils.IfaceAsString(pCSVFileER.Config().Opts[utils.PartialCSVCacheExpiryActionOpt]) == utils.MetaDumpToFile { - function = pCSVFileER.dumpToFile - } - var processFile struct{} - for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ { - pCSVFileER.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop - } - var ttl time.Duration - if ttlOpt, has := pCSVFileER.Config().Opts[utils.PartialCSVRecordCacheOpt]; has { - if ttl, err = utils.IfaceAsDuration(ttlOpt); err != nil { - return - } - } - pCSVFileER.cache = ltcache.NewCache(ltcache.UnlimitedCaching, ttl, false, function) - return pCSVFileER, nil -} - -// CSVFileER implements EventReader interface for .csv files -type PartialCSVFileER struct { - sync.RWMutex - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - fltrS *engine.FilterS - cache *ltcache.Cache - rdrDir string - rdrEvents chan *erEvent // channel to dispatch the events created to - rdrError chan error - rdrExit chan struct{} - conReqs chan struct{} // limit number of opened files -} - -func (rdr *PartialCSVFileER) Config() *config.EventReaderCfg { - return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] -} - -func (rdr *PartialCSVFileER) Serve() (err error) { - switch rdr.Config().RunDelay { - case time.Duration(0): // 0 disables the automatic read, maybe done per API - return - case time.Duration(-1): - return utils.WatchDir(rdr.rdrDir, rdr.processFile, - utils.ERs, rdr.rdrExit) - default: - go func() { - tm := time.NewTimer(0) - for { - // Not automated, process and sleep approach - select { - case <-rdr.rdrExit: - tm.Stop() - utils.Logger.Info( - fmt.Sprintf("<%s> stop monitoring path <%s>", - utils.ERs, rdr.rdrDir)) - return - case <-tm.C: - } - filesInDir, _ := os.ReadDir(rdr.rdrDir) - for _, file := range filesInDir { - if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader - continue // used in order to filter the files from directory - } - go func(fileName string) { - if err := rdr.processFile(rdr.rdrDir, fileName); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing file %s, error: %s", - utils.ERs, fileName, err.Error())) - } - }(file.Name()) - } - tm.Reset(rdr.Config().RunDelay) - } - }() - } - return -} - -// processFile is called for each file in a directory and dispatches erEvents from it -func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { - if cap(rdr.conReqs) != 0 { // 0 goes for no limit - processFile := <-rdr.conReqs // Queue here for maxOpenFiles - defer func() { rdr.conReqs <- processFile }() - } - absPath := path.Join(fPath, fName) - utils.Logger.Info( - fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath)) - var file *os.File - if file, err = os.Open(absPath); err != nil { - return - } - defer file.Close() - var csvReader *csv.Reader - if csvReader, err = newCSVReader(file, rdr.Config().Opts, utils.CSV); err != nil { - utils.Logger.Err( - fmt.Sprintf("<%s> failed creating CSV reader for <%s>, due to option parsing error: <%s>", - utils.ERs, rdr.Config().ID, err.Error())) - return - } - var indxAls map[string]int - rowNr := 0 // This counts the rows in the file, not really number of CDRs - evsPosted := 0 - timeStart := time.Now() - reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.FileName: utils.NewLeafNode(fName)}} - hdrDefChar := utils.IfaceAsString(rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].Opts[utils.HeaderDefineCharOpt]) - for { - var record []string - if record, err = csvReader.Read(); err != nil { - if err == io.EOF { - err = nil //If we reach the end of the file, return nil - break - } - return - } - if rowNr == 0 && len(record) > 0 && - strings.HasPrefix(record[0], hdrDefChar) { - record[0] = strings.TrimPrefix(record[0], hdrDefChar) - // map the templates - indxAls = make(map[string]int) - for i, hdr := range record { - indxAls[hdr] = i - } - continue - } - rowNr++ // increment the rowNr after checking if it's not the end of file - agReq := agents.NewAgentRequest( - config.NewSliceDP(record, indxAls), reqVars, - nil, nil, nil, rdr.Config().Tenant, - rdr.cgrCfg.GeneralCfg().DefaultTenant, - utils.FirstNonEmpty(rdr.Config().Timezone, - rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil) // create an AgentRequest - if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, - agReq); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to filter error: <%s>", - utils.ERs, absPath, rowNr, err.Error())) - return err - } else if !pass { - continue - } - if err = agReq.SetFields(rdr.Config().Fields); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>", - utils.ERs, absPath, rowNr, err.Error())) - return - } - - // take OriginID and OriginHost to compose CGRID - orgID, err := agReq.CGRRequest.FieldAsString([]string{utils.OriginID}) - if err == utils.ErrNotFound { - utils.Logger.Warning( - fmt.Sprintf("<%s> Missing field for row <%d> , <%s>", - utils.ERs, rowNr, record)) - continue - } - orgHost, err := agReq.CGRRequest.FieldAsString([]string{utils.OriginHost}) - if err == utils.ErrNotFound { - utils.Logger.Warning( - fmt.Sprintf("<%s> Missing field for row <%d> , <%s>", - utils.ERs, rowNr, record)) - continue - } - cgrID := utils.Sha1(orgID, orgHost) - // take Partial field from NavigableMap - partial, _ := agReq.CGRRequest.FieldAsString([]string{utils.Partial}) - cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - if val, has := rdr.cache.Get(cgrID); !has { - if utils.IsSliceMember([]string{"false", utils.EmptyString}, partial) { // complete CDR - rdr.rdrEvents <- &erEvent{ - cgrEvent: cgrEv, - rdrCfg: rdr.Config(), - } - evsPosted++ - } else { - rdr.cache.Set(cgrID, - []*utils.CGREvent{cgrEv}, nil) - } - } else { - origCgrEvs := val.([]*utils.CGREvent) - origCgrEvs = append(origCgrEvs, cgrEv) - if utils.IsSliceMember([]string{"false", utils.EmptyString}, partial) { // complete CDR - //sort CGREvents based on AnswertTime and SetupTime - sort.Slice(origCgrEvs, func(i, j int) bool { - aTime, err := origCgrEvs[i].FieldAsTime(utils.AnswerTime, agReq.Timezone) - if err != nil && err == utils.ErrNotFound { - sTime, _ := origCgrEvs[i].FieldAsTime(utils.SetupTime, agReq.Timezone) - sTime2, _ := origCgrEvs[j].FieldAsTime(utils.SetupTime, agReq.Timezone) - return sTime.Before(sTime2) - } - aTime2, _ := origCgrEvs[j].FieldAsTime(utils.AnswerTime, agReq.Timezone) - return aTime.Before(aTime2) - }) - // compose the CGREvent from slice - cgrEv := new(utils.CGREvent) - cgrEv.ID = utils.UUIDSha1Prefix() - cgrEv.Time = utils.TimePointer(time.Now()) - cgrEv.APIOpts = map[string]interface{}{} - for i, origCgrEv := range origCgrEvs { - if i == 0 { - cgrEv.Tenant = origCgrEv.Tenant - } - for key, value := range origCgrEv.Event { - cgrEv.Event[key] = value - } - for key, val := range origCgrEv.APIOpts { - cgrEv.APIOpts[key] = val - } - } - rdr.rdrEvents <- &erEvent{ - cgrEvent: cgrEv, - rdrCfg: rdr.Config(), - } - evsPosted++ - rdr.cache.Set(cgrID, nil, nil) - rdr.cache.Remove(cgrID) - } else { - // overwrite the cache value with merged NavigableMap - rdr.cache.Set(cgrID, origCgrEvs, nil) - } - } - - } - if rdr.Config().ProcessedPath != "" { - // Finished with file, move it to processed folder - outPath := path.Join(rdr.Config().ProcessedPath, fName) - if err = os.Rename(absPath, outPath); err != nil { - return - } - } - - utils.Logger.Info( - fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s", - utils.ERs, absPath, rowNr, evsPosted, time.Since(timeStart))) - return -} - -func (rdr *PartialCSVFileER) dumpToFile(itmID string, value interface{}) { - tmz := utils.FirstNonEmpty(rdr.Config().Timezone, - rdr.cgrCfg.GeneralCfg().DefaultTimezone) - origCgrEvs := value.([]*utils.CGREvent) - for _, origCgrEv := range origCgrEvs { - // complete CDR are handling in processFile function - if partial, _ := origCgrEv.FieldAsString(utils.Partial); utils.IsSliceMember([]string{"false", utils.EmptyString}, partial) { - return - } - } - // Need to process the first event separate to take the name for the file - cdr, err := engine.NewMapEvent(origCgrEvs[0].Event).AsCDR(rdr.cgrCfg, origCgrEvs[0].Tenant, tmz) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> Converting Event : <%s> to cdr , ignoring due to error: <%s>", - utils.ERs, utils.ToJSON(origCgrEvs[0].Event), err.Error())) - return - } - record, err := cdr.AsExportRecord(rdr.Config().CacheDumpFields, nil, rdr.fltrS) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>", - utils.ERs, cdr.CGRID, err.Error())) - return - } - dumpFilePath := path.Join(rdr.Config().ProcessedPath, fmt.Sprintf("%s%s.%d", - cdr.OriginID, utils.TmpSuffix, time.Now().Unix())) - fileOut, err := os.Create(dumpFilePath) - if err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed creating %s, error: %s", - utils.ERs, dumpFilePath, err.Error())) - return - } - csvWriter := csv.NewWriter(fileOut) - csvWriter.Comma = rune(utils.IfaceAsString(rdr.Config().Opts[utils.CSV+utils.FieldSepOpt])[0]) - if err = csvWriter.Write(record); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed writing partial record %v to file: %s, error: %s", - utils.ERs, record, dumpFilePath, err.Error())) - return - } - if len(origCgrEvs) > 1 { - for _, origCgrEv := range origCgrEvs[1:] { - // Need to process the first event separate to take the name for the file - cdr, err = engine.NewMapEvent(origCgrEv.Event).AsCDR(rdr.cgrCfg, origCgrEv.Tenant, tmz) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> Converting Event : <%s> to cdr , ignoring due to error: <%s>", - utils.ERs, utils.ToJSON(origCgrEv.Event), err.Error())) - return - } - record, err = cdr.AsExportRecord(rdr.Config().CacheDumpFields, nil, rdr.fltrS) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>", - utils.ERs, cdr.CGRID, err.Error())) - return - } - if err = csvWriter.Write(record); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Failed writing partial record %v to file: %s, error: %s", - utils.ERs, record, dumpFilePath, err.Error())) - return - } - } - } - - csvWriter.Flush() -} - -func (rdr *PartialCSVFileER) postCDR(itmID string, value interface{}) { - if value == nil { - return - } - tmz := utils.FirstNonEmpty(rdr.Config().Timezone, - rdr.cgrCfg.GeneralCfg().DefaultTimezone) - origCgrEvs := value.([]*utils.CGREvent) - for _, origCgrEv := range origCgrEvs { - // complete CDR are handling in processFile function - if partial, _ := origCgrEv.FieldAsString(utils.Partial); utils.IsSliceMember([]string{"false", utils.EmptyString}, partial) { - return - } - } - - // how to post incomplete CDR - //sort CGREvents based on AnswertTime and SetupTime - sort.Slice(origCgrEvs, func(i, j int) bool { - aTime, err := origCgrEvs[i].FieldAsTime(utils.AnswerTime, tmz) - if err != nil && err == utils.ErrNotFound { - sTime, _ := origCgrEvs[i].FieldAsTime(utils.SetupTime, tmz) - sTime2, _ := origCgrEvs[j].FieldAsTime(utils.SetupTime, tmz) - return sTime.Before(sTime2) - } - aTime2, _ := origCgrEvs[j].FieldAsTime(utils.AnswerTime, tmz) - return aTime.Before(aTime2) - }) - // compose the CGREvent from slice - cgrEv := &utils.CGREvent{ - ID: utils.UUIDSha1Prefix(), - Time: utils.TimePointer(time.Now()), - Event: make(map[string]interface{}), - APIOpts: make(map[string]interface{}), - } - for i, origCgrEv := range origCgrEvs { - if i == 0 { - cgrEv.Tenant = origCgrEv.Tenant - } - for key, value := range origCgrEv.Event { - cgrEv.Event[key] = value - } - for key, val := range origCgrEv.APIOpts { - cgrEv.APIOpts[key] = val - } - } - rdr.rdrEvents <- &erEvent{ - cgrEvent: cgrEv, - rdrCfg: rdr.Config(), - } -} diff --git a/ers/partial_csv_it_test.go b/ers/partial_csv_it_test.go index f020db9f3..3fefa8aaf 100644 --- a/ers/partial_csv_it_test.go +++ b/ers/partial_csv_it_test.go @@ -21,13 +21,9 @@ along with this program. If not, see package ers import ( - "bytes" - "fmt" - "log" "net/rpc" "os" "path" - "reflect" "strings" "testing" "time" @@ -229,6 +225,7 @@ func testPartITKillEngine(t *testing.T) { } } +/* func TestNewPartialCSVFileER(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltr := &engine.FilterS{} @@ -933,3 +930,4 @@ func TestPartialCSVPostCDR(t *testing.T) { t.Error("Time limit exceeded") } } +*/ diff --git a/ers/reader.go b/ers/reader.go index cdcf028cf..cce12f040 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -34,35 +34,31 @@ type EventReader interface { // NewEventReader instantiates the event reader based on configuration at index func NewEventReader(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, + rdrEvents, partialEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { switch cfg.ERsCfg().Readers[cfgIdx].Type { default: err = fmt.Errorf("unsupported reader type: <%s>", cfg.ERsCfg().Readers[cfgIdx].Type) case utils.MetaFileCSV: - return NewCSVFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) - case utils.MetaPartialCSV: - return NewPartialCSVFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + return NewCSVFileER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaFileXML: - return NewXMLFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + return NewXMLFileER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaFileFWV: - return NewFWVFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + return NewFWVFileER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaKafkajsonMap: - return NewKafkaER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + return NewKafkaER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaSQL: - return NewSQLEventReader(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) - case utils.MetaFlatstore: - return NewFlatstoreER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + return NewSQLEventReader(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaFileJSON: - return NewJSONFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + return NewJSONFileER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaAMQPjsonMap: - return NewAMQPER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + return NewAMQPER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaS3jsonMap: - return NewS3ER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + return NewS3ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaSQSjsonMap: - return NewSQSER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + return NewSQSER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) case utils.MetaAMQPV1jsonMap: - return NewAMQPv1ER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit) + return NewAMQPv1ER(cfg, cfgIdx, rdrEvents, partialEvents, rdrErr, fltrS, rdrExit) } return } diff --git a/ers/readers_test.go b/ers/readers_test.go index 72a74d528..329157224 100644 --- a/ers/readers_test.go +++ b/ers/readers_test.go @@ -25,7 +25,6 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/ltcache" ) func TestNewInvalidReader(t *testing.T) { @@ -37,7 +36,7 @@ func TestNewInvalidReader(t *testing.T) { if len(cfg.ERsCfg().Readers) != 2 { t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers)) } - if _, err := NewEventReader(cfg, 1, nil, nil, &engine.FilterS{}, nil); err == nil || err.Error() != "unsupported reader type: " { + if _, err := NewEventReader(cfg, 1, nil, nil, nil, &engine.FilterS{}, nil); err == nil || err.Error() != "unsupported reader type: " { t.Errorf("Expecting: >, received: <%+v>", err) } } @@ -62,7 +61,7 @@ func TestNewCsvReader(t *testing.T) { rdrExit: nil, conReqs: nil} var expected EventReader = exp - if rcv, err := NewEventReader(cfg, 1, nil, nil, fltr, nil); err != nil { + if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil { t.Errorf("Expecting: , received: <%+v>", err) } else { // because we use function make to init the channel when we create the EventReader reflect.DeepEqual @@ -85,11 +84,11 @@ func TestNewKafkaReader(t *testing.T) { if len(cfg.ERsCfg().Readers) != 2 { t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers)) } - expected, err := NewKafkaER(cfg, 1, nil, nil, fltr, nil) + expected, err := NewKafkaER(cfg, 1, nil, nil, nil, fltr, nil) if err != nil { t.Errorf("Expecting: , received: <%+v>", err) } - if rcv, err := NewEventReader(cfg, 1, nil, nil, fltr, nil); err != nil { + if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil { t.Errorf("Expecting: , received: <%+v>", err) } else if !reflect.DeepEqual(expected, rcv) { t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv) @@ -110,11 +109,11 @@ func TestNewSQLReader(t *testing.T) { if len(cfg.ERsCfg().Readers) != 2 { t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers)) } - expected, err := NewSQLEventReader(cfg, 1, nil, nil, fltr, nil) + expected, err := NewSQLEventReader(cfg, 1, nil, nil, nil, fltr, nil) if err != nil { t.Errorf("Expecting: , received: <%+v>", err) } - if rcv, err := NewEventReader(cfg, 1, nil, nil, fltr, nil); err != nil { + if rcv, err := NewEventReader(cfg, 1, nil, nil, nil, fltr, nil); err != nil { t.Errorf("Expecting: , received: <%+v>", err) } else if !reflect.DeepEqual(expected, rcv) { t.Errorf("Expecting: <%+v>, received: <%+v>", expected, rcv) @@ -132,45 +131,21 @@ func TestNewSQLReaderError(t *testing.T) { reader.SourcePath = "#" reader.ProcessedPath = "" expected := "unknown db_type " - _, err := NewSQLEventReader(cfg, 0, nil, nil, fltr, nil) + _, err := NewSQLEventReader(cfg, 0, nil, nil, nil, fltr, nil) if err == nil || err.Error() != expected { t.Errorf("Expecting: <%+v>, received: <%+v>", expected, err) } } -func TestNewPartialCSVReader(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - fltr := &engine.FilterS{} - cfg.ERsCfg().Readers[0].Type = utils.MetaPartialCSV - cfg.ERsCfg().Readers[0].SourcePath = "/tmp/ers/in" - cfg.ERsCfg().Readers[0].ConcurrentReqs = 1024 - expected, err := NewPartialCSVFileER(cfg, 0, nil, nil, fltr, nil) - if err != nil { - t.Errorf("Expecting: , received: <%+v>", err) - } - rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil) - if err != nil { - t.Error(err) - } else { - rcv.(*PartialCSVFileER).conReqs = nil - rcv.(*PartialCSVFileER).cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, nil) - expected.(*PartialCSVFileER).conReqs = nil - expected.(*PartialCSVFileER).cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, nil) - if !reflect.DeepEqual(expected, rcv) { - t.Errorf("Expecting %v but received %v", expected, rcv) - } - } -} - func TestNewFileXMLReader(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltr := &engine.FilterS{} cfg.ERsCfg().Readers[0].Type = utils.MetaFileXML - expected, err := NewXMLFileER(cfg, 0, nil, nil, fltr, nil) + expected, err := NewXMLFileER(cfg, 0, nil, nil, nil, fltr, nil) if err != nil { t.Error(err) } - rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) if err != nil { t.Error(err) } else { @@ -186,11 +161,11 @@ func TestNewFileFWVReader(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltr := &engine.FilterS{} cfg.ERsCfg().Readers[0].Type = utils.MetaFileFWV - expected, err := NewFWVFileER(cfg, 0, nil, nil, fltr, nil) + expected, err := NewFWVFileER(cfg, 0, nil, nil, nil, fltr, nil) if err != nil { t.Error(err) } - rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) if err != nil { t.Error(nil) } else { @@ -202,37 +177,15 @@ func TestNewFileFWVReader(t *testing.T) { } } -func TestNewFlatstoreReader(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - fltr := &engine.FilterS{} - cfg.ERsCfg().Readers[0].Type = utils.MetaFlatstore - expected, err := NewFlatstoreER(cfg, 0, nil, nil, fltr, nil) - if err != nil { - t.Error(err) - } - rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil) - if err != nil { - t.Error(err) - } else { - rcv.(*FlatstoreER).conReqs = nil - expected.(*FlatstoreER).conReqs = nil - rcv.(*FlatstoreER).cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, nil) - expected.(*FlatstoreER).cache = ltcache.NewCache(ltcache.UnlimitedCaching, 0, false, nil) - if !reflect.DeepEqual(expected, rcv) { - t.Errorf("Expecting %v but received %v", expected, rcv) - } - } -} - func TestNewJSONReader(t *testing.T) { cfg := config.NewDefaultCGRConfig() fltr := &engine.FilterS{} cfg.ERsCfg().Readers[0].Type = utils.MetaFileJSON - expected, err := NewJSONFileER(cfg, 0, nil, nil, fltr, nil) + expected, err := NewJSONFileER(cfg, 0, nil, nil, nil, fltr, nil) if err != nil { t.Error(err) } - rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) if err != nil { t.Error(err) } else { @@ -262,7 +215,7 @@ func TestNewAMQPReader(t *testing.T) { exp.setOpts(map[string]interface{}{}) exp.createPoster() var expected EventReader = exp - rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) if err != nil { t.Error(err) } else if !reflect.DeepEqual(expected, rcv) { @@ -287,7 +240,7 @@ func TestNewAMQPv1Reader(t *testing.T) { exp.Config().Opts = map[string]interface{}{} exp.createPoster() var expected EventReader = exp - rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) if err != nil { t.Error(err) } else if !reflect.DeepEqual(expected, rcv) { @@ -313,7 +266,7 @@ func TestNewS3Reader(t *testing.T) { exp.Config().Opts = map[string]interface{}{} exp.createPoster() var expected EventReader = exp - rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) if err != nil { t.Error(err) } else if !reflect.DeepEqual(expected, rcv) { @@ -350,7 +303,7 @@ func TestNewSQSReader(t *testing.T) { exp.Config().Opts = map[string]interface{}{} exp.createPoster() var expected EventReader = exp - rcv, err := NewEventReader(cfg, 0, nil, nil, fltr, nil) + rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) exp.session = rcv.(*SQSER).session if err != nil { t.Error(err) diff --git a/ers/s3.go b/ers/s3.go index e1ea1d60a..5ba01694d 100644 --- a/ers/s3.go +++ b/ers/s3.go @@ -36,16 +36,17 @@ import ( // NewS3ER return a new s3 event reader func NewS3ER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, + rdrEvents, partialEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { rdr := &S3ER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrEvents: rdrEvents, - rdrExit: rdrExit, - rdrErr: rdrErr, + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrEvents: rdrEvents, + partialEvents: partialEvents, + rdrExit: rdrExit, + rdrErr: rdrErr, } if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { rdr.cap = make(chan struct{}, concReq) @@ -65,10 +66,11 @@ type S3ER struct { cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - rdrEvents chan *erEvent // channel to dispatch the events created to - rdrExit chan struct{} - rdrErr chan error - cap chan struct{} + rdrEvents chan *erEvent // channel to dispatch the events created to + partialEvents chan *erEvent // channel to dispatch the partial events created to + rdrExit chan struct{} + rdrErr chan error + cap chan struct{} awsRegion string awsID string @@ -136,7 +138,11 @@ func (rdr *S3ER) processMessage(body []byte) (err error) { return } cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ + rdrEv := rdr.rdrEvents + if _, isPartial := cgrEv.APIOpts[partialOpt]; isPartial { + rdrEv = rdr.partialEvents + } + rdrEv <- &erEvent{ cgrEvent: cgrEv, rdrCfg: rdr.Config(), } diff --git a/ers/s3_it_test.go b/ers/s3_it_test.go index 524494b6c..c3489afdf 100644 --- a/ers/s3_it_test.go +++ b/ers/s3_it_test.go @@ -81,7 +81,7 @@ func TestS3ER(t *testing.T) { rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) - if rdr, err = NewS3ER(cfg, 1, rdrEvents, + if rdr, err = NewS3ER(cfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit); err != nil { t.Fatal(err) } @@ -172,7 +172,7 @@ func TestNewS3ER(t *testing.T) { }, } - rdr, err := NewS3ER(cfg, 1, nil, + rdr, err := NewS3ER(cfg, 1, nil, nil, nil, nil, nil) if err != nil { t.Fatal(err) @@ -214,7 +214,7 @@ func TestNewS3ERCase2(t *testing.T) { }, } - rdr, err := NewS3ER(cfg, 0, nil, + rdr, err := NewS3ER(cfg, 0, nil, nil, nil, nil, nil) if err != nil { t.Fatal(err) diff --git a/ers/s3_test.go b/ers/s3_test.go index 201bbc597..6b49fd51c 100644 --- a/ers/s3_test.go +++ b/ers/s3_test.go @@ -33,7 +33,7 @@ import ( func TestS3ERServe(t *testing.T) { cfg := config.NewDefaultCGRConfig() - rdr, err := NewS3ER(cfg, 0, nil, + rdr, err := NewS3ER(cfg, 0, nil, nil, nil, nil, nil) if err != nil { t.Error(err) diff --git a/ers/sql.go b/ers/sql.go index 872bc1b69..a2b5f1e2b 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -45,16 +45,17 @@ const ( // NewSQLEventReader return a new sql event reader func NewSQLEventReader(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, + rdrEvents, partialEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { rdr := &SQLEventReader{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrEvents: rdrEvents, - rdrExit: rdrExit, - rdrErr: rdrErr, + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrEvents: rdrEvents, + partialEvents: partialEvents, + rdrExit: rdrExit, + rdrErr: rdrErr, } if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { rdr.cap = make(chan struct{}, concReq) @@ -85,10 +86,11 @@ type SQLEventReader struct { expConnType string expTableName string - rdrEvents chan *erEvent // channel to dispatch the events created to - rdrExit chan struct{} - rdrErr chan error - cap chan struct{} + rdrEvents chan *erEvent // channel to dispatch the events created to + partialEvents chan *erEvent // channel to dispatch the partial events created to + rdrExit chan struct{} + rdrErr chan error + cap chan struct{} } // Config returns the curent configuration @@ -252,7 +254,11 @@ func (rdr *SQLEventReader) processMessage(msg map[string]interface{}) (err error return } cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ + rdrEv := rdr.rdrEvents + if _, isPartial := cgrEv.APIOpts[partialOpt]; isPartial { + rdrEv = rdr.partialEvents + } + rdrEv <- &erEvent{ cgrEvent: cgrEv, rdrCfg: rdr.Config(), } diff --git a/ers/sql_it_test.go b/ers/sql_it_test.go index 6733a73ff..c4bf0afc4 100644 --- a/ers/sql_it_test.go +++ b/ers/sql_it_test.go @@ -230,7 +230,7 @@ func testSQLReader(t *testing.T) { rdrEvents = make(chan *erEvent, 1) rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) - sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, rdrErr, new(engine.FilterS), rdrExit) + sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit) if err != nil { t.Fatal(err) } @@ -476,7 +476,7 @@ func testSQLReader3(t *testing.T) { rdrEvents = make(chan *erEvent, 1) rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) - sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, rdrErr, new(engine.FilterS), rdrExit) + sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit) if err != nil { t.Fatal(err) } @@ -678,7 +678,7 @@ func TestErsSqlPostCDRS(t *testing.T) { if len(cfg.ERsCfg().Readers) != 2 { t.Errorf("Expecting: <2>, received: <%+v>", len(cfg.ERsCfg().Readers)) } - sqlEvReader, err := NewSQLEventReader(cfg, 1, nil, nil, fltr, nil) + sqlEvReader, err := NewSQLEventReader(cfg, 1, nil, nil, nil, fltr, nil) if err != nil { t.Errorf("Expecting: , received: <%+v>", err) } diff --git a/ers/sqs.go b/ers/sqs.go index a9bad2cef..c964a7520 100644 --- a/ers/sqs.go +++ b/ers/sqs.go @@ -36,16 +36,17 @@ import ( // NewSQSER return a new sqs event reader func NewSQSER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents chan *erEvent, rdrErr chan error, + rdrEvents, partialEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { rdr := &SQSER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrEvents: rdrEvents, - rdrExit: rdrExit, - rdrErr: rdrErr, + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + rdrEvents: rdrEvents, + partialEvents: partialEvents, + rdrExit: rdrExit, + rdrErr: rdrErr, } if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { rdr.cap = make(chan struct{}, concReq) @@ -65,10 +66,11 @@ type SQSER struct { cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - rdrEvents chan *erEvent // channel to dispatch the events created to - rdrExit chan struct{} - rdrErr chan error - cap chan struct{} + rdrEvents chan *erEvent // channel to dispatch the events created to + partialEvents chan *erEvent // channel to dispatch the partial events created to + rdrExit chan struct{} + rdrErr chan error + cap chan struct{} queueURL *string awsRegion string @@ -124,7 +126,11 @@ func (rdr *SQSER) processMessage(body []byte) (err error) { return } cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdr.rdrEvents <- &erEvent{ + rdrEv := rdr.rdrEvents + if _, isPartial := cgrEv.APIOpts[partialOpt]; isPartial { + rdrEv = rdr.partialEvents + } + rdrEv <- &erEvent{ cgrEvent: cgrEv, rdrCfg: rdr.Config(), } diff --git a/ers/sqs_it_test.go b/ers/sqs_it_test.go index d24a63cd1..397ab1ec7 100644 --- a/ers/sqs_it_test.go +++ b/ers/sqs_it_test.go @@ -80,7 +80,7 @@ func TestSQSER(t *testing.T) { rdrErr = make(chan error, 1) rdrExit = make(chan struct{}, 1) - if rdr, err = NewSQSER(cfg, 1, rdrEvents, + if rdr, err = NewSQSER(cfg, 1, rdrEvents, make(chan *erEvent, 1), rdrErr, new(engine.FilterS), rdrExit); err != nil { t.Fatal(err) } diff --git a/ers/sqs_test.go b/ers/sqs_test.go index 4ec457131..8408ad7b8 100644 --- a/ers/sqs_test.go +++ b/ers/sqs_test.go @@ -51,7 +51,7 @@ func TestNewSQSER(t *testing.T) { Opts: make(map[string]interface{}), }, } - rdr, err := NewSQSER(cfg, 0, nil, + rdr, err := NewSQSER(cfg, 0, nil, nil, nil, nil, nil) if err != nil { t.Fatal(err) @@ -79,7 +79,7 @@ func TestSQSERServeRunDelay0(t *testing.T) { Opts: make(map[string]interface{}), }, } - rdr, err := NewSQSER(cfg, 0, nil, + rdr, err := NewSQSER(cfg, 0, nil, nil, nil, nil, nil) if err != nil { t.Fatal(err) @@ -105,7 +105,7 @@ func TestSQSERServe(t *testing.T) { Opts: make(map[string]interface{}), }, } - rdr, err := NewSQSER(cfg, 0, nil, + rdr, err := NewSQSER(cfg, 0, nil, nil, nil, nil, nil) if err != nil { t.Fatal(err) diff --git a/utils/consts.go b/utils/consts.go index 841db25b5..d84c82fca 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -320,7 +320,6 @@ const ( StaticValuePrefix = "^" CSV = "csv" FWV = "fwv" - MetaPartialCSV = "*partial_csv" MetaCombimed = "*combimed" MetaMongo = "*mongo" MetaPostgres = "*postgres" @@ -397,7 +396,6 @@ const ( MetaS3jsonMap = "*s3_json_map" ConfigPath = "/etc/cgrates/" DisconnectCause = "DisconnectCause" - MetaFlatstore = "*flatstore" MetaRating = "*rating" NotAvailable = "N/A" Call = "call" @@ -2340,16 +2338,17 @@ const ( // EventReaderCfg const ( - IDCfg = "id" - CacheCfg = "cache" - FieldSepCfg = "field_separator" - RunDelayCfg = "run_delay" - SourcePathCfg = "source_path" - ProcessedPathCfg = "processed_path" - TenantCfg = "tenant" - FlagsCfg = "flags" - FieldsCfg = "fields" - CacheDumpFieldsCfg = "cache_dump_fields" + IDCfg = "id" + CacheCfg = "cache" + FieldSepCfg = "field_separator" + RunDelayCfg = "run_delay" + SourcePathCfg = "source_path" + ProcessedPathCfg = "processed_path" + TenantCfg = "tenant" + FlagsCfg = "flags" + FieldsCfg = "fields" + CacheDumpFieldsCfg = "cache_dump_fields" + PartialCommitFieldsCfg = "partial_commit_fields" ) // RegistrarCCfg @@ -2599,6 +2598,12 @@ const ( KafkaTopic = "kafkaTopic" KafkaGroupID = "kafkaGroupID" KafkaMaxWait = "kafkaMaxWait" + + // partial + PartialOrderFieldOpt = "partialOrderField" + PartialCacheAction = "partialCacheAction" + PartialPathOpt = "partialPath" + PartialCSVFieldSepartor = "partialcsvFieldSeparator" ) var ( diff --git a/utils/datanode.go b/utils/datanode.go index f8b4e346d..47a19fffb 100644 --- a/utils/datanode.go +++ b/utils/datanode.go @@ -123,7 +123,7 @@ func (n *DataNode) Field(path []string) (*DataLeaf, error) { if err != nil { return nil, err } - if idx < 0 { // in case the index is negative add the slice lenght + if idx < 0 { // in case the index is negative add the slice length idx += len(n.Slice) } if idx < 0 || idx >= len(n.Slice) { // check if the index is in range [0,len(slice)) @@ -171,7 +171,7 @@ func (n *DataNode) fieldAsInterface(path []string) (interface{}, error) { if err != nil { return nil, err } - if idx < 0 { // in case the index is negative add the slice lenght + if idx < 0 { // in case the index is negative add the slice length idx += len(n.Slice) } if idx < 0 || idx >= len(n.Slice) { // check if the index is in range [0,len(slice)) @@ -233,7 +233,7 @@ func (n *DataNode) Set(path []string, val interface{}) (addedNew bool, err error return true, err } // try dynamic path instead - // if idx < 0 { // in case the index is negative add the slice lenght + // if idx < 0 { // in case the index is negative add the slice length // idx += len(n.Slice) // path[0] = strconv.Itoa(idx) // update the slice to reflect on orderNavMap // } @@ -281,7 +281,7 @@ func (n *DataNode) Remove(path []string) error { if err != nil { return err // the only error is when we expect an index but is not int } - if idx < 0 { // in case the index is negative add the slice lenght + if idx < 0 { // in case the index is negative add the slice length idx += len(n.Slice) path[0] = strconv.Itoa(idx) // update the path for OrdNavMap } @@ -341,7 +341,7 @@ func (n *DataNode) Append(path []string, val *DataLeaf) (idx int, err error) { return node.Append(path[1:], val) } // try dynamic path instead - // if idx < 0 { // in case the index is negative add the slice lenght + // if idx < 0 { // in case the index is negative add the slice length // idx += len(n.Slice) // path[0] = strconv.Itoa(idx) // update the slice to reflect on orderNavMap // } @@ -401,7 +401,7 @@ func (n *DataNode) Compose(path []string, val *DataLeaf) (err error) { return node.Compose(path[1:], val) } // try dynamic path instead - // if idx < 0 { // in case the index is negative add the slice lenght + // if idx < 0 { // in case the index is negative add the slice length // idx += len(n.Slice) // path[0] = strconv.Itoa(idx) // update the slice to reflect on orderNavMap // }