From 35caf1390f1938d90d4473653dd5889b42ea8b0f Mon Sep 17 00:00:00 2001
From: TeoV
Date: Thu, 16 Jan 2020 16:02:52 +0200
Subject: [PATCH] Add PartialCDR in EventReader (incomplete)
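
ERs gains a *partial_csv reader which pairs partial CDR records by
OriginID: each record is kept in an ltcache instance and merged with
the records that follow it, while a record carrying Partial=false is
complete and gets dispatched as an event. Records still unpaired when
the cache expires them are either dumped to a .partial file under
processed_path or posted as CDRs (the posting path is still a stub,
hence "incomplete").

For illustration only, such a reader would be configured roughly as
below; the values are made up, and "*partial_csv"/"*dump_to_file"
assume those are the strings behind utils.MetaPartialCSV and
utils.MetaDumpToFile, which this patch does not show:

    "ers": {
        "readers": [
            {
                "id": "partial_reader",
                "type": "*partial_csv",
                "run_delay": -1,
                "source_path": "/var/spool/cgrates/ers/in",
                "processed_path": "/var/spool/cgrates/ers/out",
                "partial_record_cache": "10s",
                "partial_cache_expiry_action": "*dump_to_file",
                "cache_dump_fields": []
            }
        ]
    }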
---
 apier/v1/tp_it_test.go                      |   6 +-
 cdrc/partial_cdr.go                         |   1 -
 config/erscfg.go                            |  46 ++--
 config/libconfig_json.go                    |  33 +--
 data/conf/samples/cdrc_partcsv/cgrates.json |   2 -
 engine/storage_csv.go                       |  44 ++--
 engine/tpexporter.go                        |  24 +-
 engine/tpimporter_csv.go                    |  22 +-
 ers/partial_csv.go                          | 253 ++++++++++++++++++++
 ers/reader.go                               |   2 +
 utils/consts.go                             |  39 +--
 11 files changed, 363 insertions(+), 109 deletions(-)
 create mode 100644 ers/partial_csv.go

diff --git a/apier/v1/tp_it_test.go b/apier/v1/tp_it_test.go
index 255601bca..cf897e63f 100644
--- a/apier/v1/tp_it_test.go
+++ b/apier/v1/tp_it_test.go
@@ -122,9 +122,9 @@ func testTPExportTPToFolder(t *testing.T) {
 	expectedTPStas := &utils.ExportedTPStats{
 		Compressed: true,
 		ExportPath: "/tmp/",
-		ExportedFiles: []string{utils.RATING_PROFILES_CSV, utils.RATING_PLANS_CSV, utils.ACTIONS_CSV, utils.ACCOUNT_ACTIONS_CSV,
-			utils.ChargersCsv, utils.TIMINGS_CSV, utils.ACTION_PLANS_CSV, utils.ResourcesCsv, utils.StatsCsv, utils.ThresholdsCsv,
-			utils.DESTINATIONS_CSV, utils.RATES_CSV, utils.DESTINATION_RATES_CSV, utils.FiltersCsv, utils.SuppliersCsv, utils.AttributesCsv},
+		ExportedFiles: []string{utils.RatingProfilesCsv, utils.RatingPlansCsv, utils.ActionsCsv, utils.AccountActionsCsv,
+			utils.ChargersCsv, utils.TimingsCsv, utils.ActionPlansCsv, utils.ResourcesCsv, utils.StatsCsv, utils.ThresholdsCsv,
+			utils.DestinationsCsv, utils.RatesCsv, utils.DestinationRatesCsv, utils.FiltersCsv, utils.SuppliersCsv, utils.AttributesCsv},
 	}
 	sort.Strings(expectedTPStas.ExportedFiles)
 	tpid := "TEST_TPID2"
diff --git a/cdrc/partial_cdr.go b/cdrc/partial_cdr.go
index 07e3e6a55..23afbce29 100644
--- a/cdrc/partial_cdr.go
+++ b/cdrc/partial_cdr.go
@@ -136,7 +136,6 @@ func (prc *PartialRecordsCache) cachePartialCDR(pCDR *PartialCDRRecord) (*Partia
 	default:
 		return nil, fmt.Errorf("Unsupported PartialCacheExpiryAction: %s", prc.expiryAction)
 	}
-
 	}
 	if _, hasIt := prc.partialRecords[originID]; !hasIt {
 		prc.partialRecords[originID] = pCDR
diff --git a/config/erscfg.go b/config/erscfg.go
index bdaa599fb..f46fa5bba 100644
--- a/config/erscfg.go
+++ b/config/erscfg.go
@@ -99,21 +99,24 @@ func (erS *ERsCfg) Clone() (cln *ERsCfg) {
 }
 
 type EventReaderCfg struct {
-	ID             string
-	Type           string
-	FieldSep       string
-	RunDelay       time.Duration
-	ConcurrentReqs int
-	SourcePath     string
-	ProcessedPath  string
-	XmlRootPath    utils.HierarchyPath
-	Tenant         RSRParsers
-	Timezone       string
-	Filters        []string
-	Flags          utils.FlagsWithParams
-	HeaderFields   []*FCTemplate
-	ContentFields  []*FCTemplate
-	TrailerFields  []*FCTemplate
+	ID                       string
+	Type                     string
+	FieldSep                 string
+	RunDelay                 time.Duration
+	ConcurrentReqs           int
+	SourcePath               string
+	ProcessedPath            string
+	XmlRootPath              utils.HierarchyPath
+	Tenant                   RSRParsers
+	Timezone                 string
+	Filters                  []string
+	Flags                    utils.FlagsWithParams
+	PartialRecordCache       time.Duration // Duration to cache partial records when not pairing
+	PartialCacheExpiryAction string
+	HeaderFields             []*FCTemplate
+	ContentFields            []*FCTemplate
+	TrailerFields            []*FCTemplate
+	CacheDumpFields          []*FCTemplate
 }
 
 func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string) (err error) {
@@ -163,6 +166,14 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string
 			return
 		}
 	}
+	if jsnCfg.Partial_record_cache != nil {
+		if er.PartialRecordCache, err = utils.ParseDurationWithNanosecs(*jsnCfg.Partial_record_cache); err != nil {
+			return err
+		}
+	}
+	if jsnCfg.Partial_cache_expiry_action != nil {
+		er.PartialCacheExpiryAction = *jsnCfg.Partial_cache_expiry_action
+	}
 	if jsnCfg.Header_fields != nil {
 		if er.HeaderFields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Header_fields, sep); err != nil {
 			return err
@@ -178,6 +189,11 @@ func (er *EventReaderCfg) loadFromJsonCfg(jsnCfg *EventReaderJsonCfg, sep string
 			return err
 		}
 	}
+	if jsnCfg.Cache_dump_fields != nil {
+		if er.CacheDumpFields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnCfg.Cache_dump_fields, sep); err != nil {
+			return err
+		}
+	}
 	return
 }
 
diff --git a/config/libconfig_json.go b/config/libconfig_json.go
index 5072de192..b792b43e9 100755
--- a/config/libconfig_json.go
+++ b/config/libconfig_json.go
@@ -193,21 +193,24 @@ type ERsJsonCfg struct {
 
 // EventReaderSJsonCfg is the configuration of a single EventReader
 type EventReaderJsonCfg struct {
-	Id                  *string
-	Type                *string
-	Field_separator     *string
-	Run_delay           *int
-	Concurrent_requests *int
-	Source_path         *string
-	Processed_path      *string
-	Xml_root_path       *string
-	Tenant              *string
-	Timezone            *string
-	Filters             *[]string
-	Flags               *[]string
-	Header_fields       *[]*FcTemplateJsonCfg
-	Content_fields      *[]*FcTemplateJsonCfg
-	Trailer_fields      *[]*FcTemplateJsonCfg
+	Id                          *string
+	Type                        *string
+	Field_separator             *string
+	Run_delay                   *int
+	Concurrent_requests         *int
+	Source_path                 *string
+	Processed_path              *string
+	Xml_root_path               *string
+	Tenant                      *string
+	Timezone                    *string
+	Filters                     *[]string
+	Flags                       *[]string
+	Partial_record_cache        *string
+	Partial_cache_expiry_action *string
+	Header_fields               *[]*FcTemplateJsonCfg
+	Content_fields              *[]*FcTemplateJsonCfg
+	Trailer_fields              *[]*FcTemplateJsonCfg
+	Cache_dump_fields           *[]*FcTemplateJsonCfg
 }
 
 // SM-Generic config section
diff --git a/data/conf/samples/cdrc_partcsv/cgrates.json b/data/conf/samples/cdrc_partcsv/cgrates.json
index da06c6cbe..3a2eeddac 100644
--- a/data/conf/samples/cdrc_partcsv/cgrates.json
+++ b/data/conf/samples/cdrc_partcsv/cgrates.json
@@ -44,7 +44,6 @@
 				{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"},
 				{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"},
 				{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "*rated", "mandatory": true},
-				{"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "*out", "mandatory": true},
 				{"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "cgrates.org", "mandatory": true},
 				{"tag": "Category", "field_id": "Category", "type": "*composed", "value": "call", "mandatory": true},
 				{"tag": "Account", "field_id": "Account", "type": "*composed", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true},
@@ -84,7 +83,6 @@
 				{"tag": "AccId5", "field_id": "OriginID", "type": "*composed", "value": "~*req.4"},
 				{"tag": "OrderID", "field_id": "OrderID", "type": "*unix_timestamp", "value": "~*req.3"},
 				{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "*rated", "mandatory": true},
-				{"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "*out", "mandatory": true},
 				{"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "cgrates.org", "mandatory": true},
 				{"tag": "Category", "field_id": "Category", "type": "*composed", "value": "call", "mandatory": true},
 				{"tag": "Account", "field_id": "Account", "type": "*composed", "value": "~*req.0:s/^49([1-9]\\d+)$/0$1/", "mandatory": true},
diff --git a/engine/storage_csv.go b/engine/storage_csv.go
index 41c57dc93..815ede252 100644
--- a/engine/storage_csv.go
+++ b/engine/storage_csv.go
@@ -98,17 +98,17 @@ func NewCSVStorage(sep rune,
 }
 
 func NewFileCSVStorage(sep rune, dataPath string, recursive bool) *CSVStorage {
-	destinations_paths := []string{path.Join(dataPath, utils.DESTINATIONS_CSV)}
-	timings_paths := []string{path.Join(dataPath, utils.TIMINGS_CSV)}
-	rates_paths := []string{path.Join(dataPath, utils.RATES_CSV)}
-	destination_rates_paths := []string{path.Join(dataPath, utils.DESTINATION_RATES_CSV)}
-	rating_plans_paths := []string{path.Join(dataPath, utils.RATING_PLANS_CSV)}
-	rating_profiles_paths := []string{path.Join(dataPath, utils.RATING_PROFILES_CSV)}
-	shared_groups_paths := []string{path.Join(dataPath, utils.SHARED_GROUPS_CSV)}
-	actions_paths := []string{path.Join(dataPath, utils.ACTIONS_CSV)}
-	action_plans_paths := []string{path.Join(dataPath, utils.ACTION_PLANS_CSV)}
-	action_triggers_paths := []string{path.Join(dataPath, utils.ACTION_TRIGGERS_CSV)}
-	account_actions_paths := []string{path.Join(dataPath, utils.ACCOUNT_ACTIONS_CSV)}
+	destinations_paths := []string{path.Join(dataPath, utils.DestinationsCsv)}
+	timings_paths := []string{path.Join(dataPath, utils.TimingsCsv)}
+	rates_paths := []string{path.Join(dataPath, utils.RatesCsv)}
+	destination_rates_paths := []string{path.Join(dataPath, utils.DestinationRatesCsv)}
+	rating_plans_paths := []string{path.Join(dataPath, utils.RatingPlansCsv)}
+	rating_profiles_paths := []string{path.Join(dataPath, utils.RatingProfilesCsv)}
+	shared_groups_paths := []string{path.Join(dataPath, utils.SharedGroupsCsv)}
+	actions_paths := []string{path.Join(dataPath, utils.ActionsCsv)}
+	action_plans_paths := []string{path.Join(dataPath, utils.ActionPlansCsv)}
+	action_triggers_paths := []string{path.Join(dataPath, utils.ActionTriggersCsv)}
+	account_actions_paths := []string{path.Join(dataPath, utils.AccountActionsCsv)}
 	resources_paths := []string{path.Join(dataPath, utils.ResourcesCsv)}
 	stats_paths := []string{path.Join(dataPath, utils.StatsCsv)}
 	thresholds_paths := []string{path.Join(dataPath, utils.ThresholdsCsv)}
@@ -124,17 +124,17 @@ func NewFileCSVStorage(sep rune, dataPath string, recursive bool) *CSVStorage {
 		if err != nil {
 			log.Fatal(err)
 		}
-		destinations_paths = appendName(allFoldersPath, utils.DESTINATIONS_CSV)
-		timings_paths = appendName(allFoldersPath, utils.TIMINGS_CSV)
-		rates_paths = appendName(allFoldersPath, utils.RATES_CSV)
-		destination_rates_paths = appendName(allFoldersPath, utils.DESTINATION_RATES_CSV)
-		rating_plans_paths = appendName(allFoldersPath, utils.RATING_PLANS_CSV)
-		rating_profiles_paths = appendName(allFoldersPath, utils.RATING_PROFILES_CSV)
-		shared_groups_paths = appendName(allFoldersPath, utils.SHARED_GROUPS_CSV)
-		actions_paths = appendName(allFoldersPath, utils.ACTIONS_CSV)
-		action_plans_paths = appendName(allFoldersPath, utils.ACTION_PLANS_CSV)
-		action_triggers_paths = appendName(allFoldersPath, utils.ACTION_TRIGGERS_CSV)
-		account_actions_paths = appendName(allFoldersPath, utils.ACCOUNT_ACTIONS_CSV)
+		destinations_paths = appendName(allFoldersPath, utils.DestinationsCsv)
+		timings_paths = appendName(allFoldersPath, utils.TimingsCsv)
+		rates_paths = appendName(allFoldersPath, utils.RatesCsv)
+		destination_rates_paths = appendName(allFoldersPath, utils.DestinationRatesCsv)
+		rating_plans_paths = appendName(allFoldersPath, utils.RatingPlansCsv)
+		rating_profiles_paths = appendName(allFoldersPath, utils.RatingProfilesCsv)
+		shared_groups_paths = appendName(allFoldersPath, utils.SharedGroupsCsv)
+		actions_paths = appendName(allFoldersPath, utils.ActionsCsv)
+		action_plans_paths = appendName(allFoldersPath, utils.ActionPlansCsv)
+		action_triggers_paths = appendName(allFoldersPath, utils.ActionTriggersCsv)
+		account_actions_paths = appendName(allFoldersPath, utils.AccountActionsCsv)
 		resources_paths = appendName(allFoldersPath, utils.ResourcesCsv)
 		stats_paths = appendName(allFoldersPath, utils.StatsCsv)
 		thresholds_paths = appendName(allFoldersPath, utils.ThresholdsCsv)
diff --git a/engine/tpexporter.go b/engine/tpexporter.go
index 199309aa5..d2298ffad 100644
--- a/engine/tpexporter.go
+++ b/engine/tpexporter.go
@@ -93,9 +93,9 @@ func (self *TPExporter) Run() error {
 		return err
 	}
 	storDataModelTimings := APItoModelTimings(storDataTimings)
-	toExportMap[utils.TIMINGS_CSV] = make([]interface{}, len(storDataTimings))
+	toExportMap[utils.TimingsCsv] = make([]interface{}, len(storDataTimings))
 	for i, sd := range storDataModelTimings {
-		toExportMap[utils.TIMINGS_CSV][i] = sd
+		toExportMap[utils.TimingsCsv][i] = sd
 	}
 
 	storDataDestinations, err := self.storDb.GetTPDestinations(self.tpID, "")
@@ -105,7 +105,7 @@ func (self *TPExporter) Run() error {
 	for _, sd := range storDataDestinations {
 		sdModels := APItoModelDestination(sd)
 		for _, sdModel := range sdModels {
-			toExportMap[utils.DESTINATIONS_CSV] = append(toExportMap[utils.DESTINATIONS_CSV], sdModel)
+			toExportMap[utils.DestinationsCsv] = append(toExportMap[utils.DestinationsCsv], sdModel)
 		}
 	}
 
@@ -116,7 +116,7 @@ func (self *TPExporter) Run() error {
 	for _, sd := range storDataRates {
 		sdModels := APItoModelRate(sd)
 		for _, sdModel := range sdModels {
-			toExportMap[utils.RATES_CSV] = append(toExportMap[utils.RATES_CSV], sdModel)
+			toExportMap[utils.RatesCsv] = append(toExportMap[utils.RatesCsv], sdModel)
 		}
 	}
 
@@ -127,7 +127,7 @@ func (self *TPExporter) Run() error {
 	for _, sd := range storDataDestinationRates {
 		sdModels := APItoModelDestinationRate(sd)
 		for _, sdModel := range sdModels {
-			toExportMap[utils.DESTINATION_RATES_CSV] = append(toExportMap[utils.DESTINATION_RATES_CSV], sdModel)
+			toExportMap[utils.DestinationRatesCsv] = append(toExportMap[utils.DestinationRatesCsv], sdModel)
 		}
 	}
 
@@ -138,7 +138,7 @@ func (self *TPExporter) Run() error {
 	for _, sd := range storDataRatingPlans {
 		sdModels := APItoModelRatingPlan(sd)
 		for _, sdModel := range sdModels {
-			toExportMap[utils.RATING_PLANS_CSV] = append(toExportMap[utils.RATING_PLANS_CSV], sdModel)
+			toExportMap[utils.RatingPlansCsv] = append(toExportMap[utils.RatingPlansCsv], sdModel)
 		}
 	}
 
@@ -149,7 +149,7 @@ func (self *TPExporter) Run() error {
 	for _, sd := range storDataRatingProfiles {
 		sdModels := APItoModelRatingProfile(sd)
 		for _, sdModel := range sdModels {
-			toExportMap[utils.RATING_PROFILES_CSV] = append(toExportMap[utils.RATING_PROFILES_CSV], sdModel)
+			toExportMap[utils.RatingProfilesCsv] = append(toExportMap[utils.RatingProfilesCsv], sdModel)
 		}
 	}
 
@@ -161,7 +161,7 @@ func (self *TPExporter) Run() error {
 	for _, sd := range storDataSharedGroups {
 		sdModels := APItoModelSharedGroup(sd)
 		for _, sdModel := range sdModels {
-			toExportMap[utils.SHARED_GROUPS_CSV] = append(toExportMap[utils.SHARED_GROUPS_CSV], sdModel)
+			toExportMap[utils.SharedGroupsCsv] = append(toExportMap[utils.SharedGroupsCsv], sdModel)
 		}
 	}
 
@@ -172,7 +172,7 @@ func (self *TPExporter) Run() error {
 	for _, sd := range storDataActions {
 		sdModels := APItoModelAction(sd)
 		for _, sdModel := range sdModels {
-			toExportMap[utils.ACTIONS_CSV] = append(toExportMap[utils.ACTIONS_CSV], sdModel)
+			toExportMap[utils.ActionsCsv] = append(toExportMap[utils.ActionsCsv], sdModel)
 		}
 	}
 
@@ -183,7 +183,7 @@ func (self *TPExporter) Run() error {
 	for _, sd := range storDataActionPlans {
 		sdModels := APItoModelActionPlan(sd)
 		for _, sdModel := range sdModels {
-			toExportMap[utils.ACTION_PLANS_CSV] = append(toExportMap[utils.ACTION_PLANS_CSV], sdModel)
+			toExportMap[utils.ActionPlansCsv] = append(toExportMap[utils.ActionPlansCsv], sdModel)
 		}
 	}
 
@@ -194,7 +194,7 @@ func (self *TPExporter) Run() error {
 	for _, sd := range storDataActionTriggers {
 		sdModels := APItoModelActionTrigger(sd)
 		for _, sdModel := range sdModels {
-			toExportMap[utils.ACTION_TRIGGERS_CSV] = append(toExportMap[utils.ACTION_TRIGGERS_CSV], sdModel)
+			toExportMap[utils.ActionTriggersCsv] = append(toExportMap[utils.ActionTriggersCsv], sdModel)
 		}
 	}
 
@@ -204,7 +204,7 @@ func (self *TPExporter) Run() error {
 	}
 	for _, sd := range storDataAccountActions {
 		sdModel := APItoModelAccountAction(sd)
-		toExportMap[utils.ACCOUNT_ACTIONS_CSV] = append(toExportMap[utils.ACCOUNT_ACTIONS_CSV], sdModel)
+		toExportMap[utils.AccountActionsCsv] = append(toExportMap[utils.AccountActionsCsv], sdModel)
 	}
 
 	storDataResources, err := self.storDb.GetTPResources(self.tpID, "", "")
diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go
index 36baeb050..cb8acf184 100644
--- a/engine/tpimporter_csv.go
+++ b/engine/tpimporter_csv.go
@@ -40,17 +40,17 @@ type TPCSVImporter struct {
 // Maps csv file to handler which should process it. Defined like this since tests on 1.0.3 were failing on Travis.
 // Change it to func(string) error as soon as Travis updates.
 var fileHandlers = map[string]func(*TPCSVImporter, string) error{
-	utils.TIMINGS_CSV:           (*TPCSVImporter).importTimings,
-	utils.DESTINATIONS_CSV:      (*TPCSVImporter).importDestinations,
-	utils.RATES_CSV:             (*TPCSVImporter).importRates,
-	utils.DESTINATION_RATES_CSV: (*TPCSVImporter).importDestinationRates,
-	utils.RATING_PLANS_CSV:      (*TPCSVImporter).importRatingPlans,
-	utils.RATING_PROFILES_CSV:   (*TPCSVImporter).importRatingProfiles,
-	utils.SHARED_GROUPS_CSV:     (*TPCSVImporter).importSharedGroups,
-	utils.ACTIONS_CSV:           (*TPCSVImporter).importActions,
-	utils.ACTION_PLANS_CSV:      (*TPCSVImporter).importActionTimings,
-	utils.ACTION_TRIGGERS_CSV:   (*TPCSVImporter).importActionTriggers,
-	utils.ACCOUNT_ACTIONS_CSV:   (*TPCSVImporter).importAccountActions,
+	utils.TimingsCsv:          (*TPCSVImporter).importTimings,
+	utils.DestinationsCsv:     (*TPCSVImporter).importDestinations,
+	utils.RatesCsv:            (*TPCSVImporter).importRates,
+	utils.DestinationRatesCsv: (*TPCSVImporter).importDestinationRates,
+	utils.RatingPlansCsv:      (*TPCSVImporter).importRatingPlans,
+	utils.RatingProfilesCsv:   (*TPCSVImporter).importRatingProfiles,
+	utils.SharedGroupsCsv:     (*TPCSVImporter).importSharedGroups,
+	utils.ActionsCsv:          (*TPCSVImporter).importActions,
+	utils.ActionPlansCsv:      (*TPCSVImporter).importActionTimings,
+	utils.ActionTriggersCsv:   (*TPCSVImporter).importActionTriggers,
+	utils.AccountActionsCsv:   (*TPCSVImporter).importAccountActions,
 	utils.ResourcesCsv:        (*TPCSVImporter).importResources,
 	utils.StatsCsv:            (*TPCSVImporter).importStats,
 	utils.ThresholdsCsv:       (*TPCSVImporter).importThresholds,
diff --git a/ers/partial_csv.go b/ers/partial_csv.go
new file mode 100644
index 000000000..7c225df96
--- /dev/null
+++ b/ers/partial_csv.go
@@ -0,0 +1,253 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>
+*/
+
+package ers
+
+import (
+	"bufio"
+	"encoding/csv"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/cgrates/cgrates/agents"
+	"github.com/cgrates/cgrates/config"
+	"github.com/cgrates/cgrates/engine"
+	"github.com/cgrates/cgrates/utils"
+	"github.com/cgrates/ltcache"
+)
+
+func NewPartialCSVFileER(cfg *config.CGRConfig, cfgIdx int,
+	rdrEvents chan *erEvent, rdrErr chan error,
+	fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
+	srcPath := cfg.ERsCfg().Readers[cfgIdx].SourcePath
+	if strings.HasSuffix(srcPath, utils.Slash) {
+		srcPath = srcPath[:len(srcPath)-1]
+	}
+	pCSVFileER := &PartialCSVFileER{
+		cgrCfg:    cfg,
+		cfgIdx:    cfgIdx,
+		fltrS:     fltrS,
+		rdrDir:    srcPath,
+		rdrEvents: rdrEvents,
+		rdrError:  rdrErr,
+		rdrExit:   rdrExit}
+	// the cache expiry callback decides what happens to records still partial on expiry
+	var function func(itmID string, value interface{})
+	if cfg.ERsCfg().Readers[cfgIdx].PartialCacheExpiryAction == utils.MetaDumpToFile {
+		function = pCSVFileER.dumpToFile
+	} else {
+		function = pCSVFileER.postCDR
+	}
+	pCSVFileER.cache = ltcache.NewCache(ltcache.UnlimitedCaching,
+		cfg.ERsCfg().Readers[cfgIdx].PartialRecordCache, false, function)
+	return pCSVFileER, nil
+}
+
+// PartialCSVFileER implements EventReader interface for paired partial .csv files
+type PartialCSVFileER struct {
+	sync.RWMutex
+	cgrCfg    *config.CGRConfig
+	cfgIdx    int // index of config instance within ERsCfg.Readers
+	fltrS     *engine.FilterS
+	cache     *ltcache.Cache
+	rdrDir    string
+	rdrEvents chan *erEvent // channel to dispatch the events created to
+	rdrError  chan error
+	rdrExit   chan struct{}
+	conReqs   chan struct{} // limit number of opened files
+}
+
+func (rdr *PartialCSVFileER) Config() *config.EventReaderCfg {
+	return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
+}
+
+func (rdr *PartialCSVFileER) Serve() (err error) {
+	switch rdr.Config().RunDelay {
+	case time.Duration(0): // 0 disables the automatic read, maybe done per API
+		return
+	case time.Duration(-1):
+		return watchDir(rdr.rdrDir, rdr.processFile,
+			utils.ERs, rdr.rdrExit)
+	default:
+		go func() {
+			for {
+				// Not automated, process and sleep approach
+				select {
+				case <-rdr.rdrExit:
+					utils.Logger.Info(
+						fmt.Sprintf("<%s> stop monitoring path <%s>",
+							utils.ERs, rdr.rdrDir))
+					return
+				default:
+				}
+				filesInDir, _ := ioutil.ReadDir(rdr.rdrDir)
+				for _, file := range filesInDir {
+					if !strings.HasSuffix(file.Name(), utils.CSVSuffix) { // hardcoded file extension for csv event reader
+						continue // used in order to filter the files from directory
+					}
+					go func(fileName string) {
+						if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
+							utils.Logger.Warning(
+								fmt.Sprintf("<%s> processing file %s, error: %s",
+									utils.ERs, fileName, err.Error()))
+						}
+					}(file.Name())
+				}
+				time.Sleep(rdr.Config().RunDelay)
+			}
+		}()
+	}
+	return
+}
+
+// processFile is called for each file in a directory and dispatches erEvents from it.
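+// Records are paired by OriginID: the first leg of a record is kept in
+// the reader's cache and merged when its counterpart arrives, while a
+// record carrying Partial=false is complete and is dispatched directly.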
+func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) {
+	if cap(rdr.conReqs) != 0 { // 0 goes for no limit
+		processFile := <-rdr.conReqs // Queue here for maxOpenFiles
+		defer func() { rdr.conReqs <- processFile }()
+	}
+	absPath := path.Join(fPath, fName)
+	utils.Logger.Info(
+		fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath))
+	var file *os.File
+	if file, err = os.Open(absPath); err != nil {
+		return
+	}
+	defer file.Close()
+	csvReader := csv.NewReader(bufio.NewReader(file))
+	csvReader.Comma = ','
+	if len(rdr.Config().FieldSep) > 0 {
+		csvReader.Comma = rune(rdr.Config().FieldSep[0])
+	}
+	csvReader.Comment = '#'
+	rowNr := 0 // This counts the rows in the file, not really number of CDRs
+	evsPosted := 0
+	timeStart := time.Now()
+	reqVars := make(map[string]interface{})
+	for {
+		var record []string
+		if record, err = csvReader.Read(); err != nil {
+			if err == io.EOF {
+				break
+			}
+			return
+		}
+		rowNr++ // increment the rowNr after checking if it's not the end of file
+		agReq := agents.NewAgentRequest(
+			config.NewSliceDP(record, utils.EmptyString), reqVars,
+			nil, nil, rdr.Config().Tenant,
+			rdr.cgrCfg.GeneralCfg().DefaultTenant,
+			utils.FirstNonEmpty(rdr.Config().Timezone,
+				rdr.cgrCfg.GeneralCfg().DefaultTimezone),
+			rdr.fltrS) // create an AgentRequest
+		if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
+			agReq); err != nil || !pass {
+			continue
+		}
+		navMp, err := agReq.AsNavigableMap(rdr.Config().ContentFields)
+		if err != nil {
+			utils.Logger.Warning(
+				fmt.Sprintf("<%s> reading file: <%s> row <%d>, ignoring due to error: <%s>",
+					utils.ERs, absPath, rowNr, err.Error()))
+			continue
+		}
+		// take OriginID field from NavigableMap
+		orgId, err := navMp.FieldAsString([]string{utils.OriginID})
+		if err == utils.ErrNotFound {
+			continue
+		}
+		// take Partial field from NavigableMap
+		partial, err := navMp.FieldAsString([]string{utils.Partial})
+		if err == utils.ErrNotFound {
+			continue
+		}
+		if val, has := rdr.cache.Get(orgId); !has {
+			rdr.cache.Set(orgId, navMp, nil)
+			if partial == "false" { // complete CDR, remove it from cache and dispatch it
+				rdr.cache.Remove(orgId)
+				rdr.rdrEvents <- &erEvent{cgrEvent: navMp.AsCGREvent(
+					rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.NestingSep),
+					rdrCfg: rdr.Config()}
+				evsPosted++
+			}
+		} else {
+			originalNavMp := val.(*config.NavigableMap)
+			originalNavMp.Merge(navMp)
+			// overwrite the cache value with merged NavigableMap
+			rdr.cache.Set(orgId, originalNavMp, nil)
+			// TODO: dispatch the record here as well once this merge completes it
+		}
+	}
+	if rdr.Config().ProcessedPath != "" {
+		// Finished with file, move it to processed folder
+		outPath := path.Join(rdr.Config().ProcessedPath, fName)
+		if err = os.Rename(absPath, outPath); err != nil {
+			return
+		}
+	}
+	utils.Logger.Info(
+		fmt.Sprintf("%s finished processing file <%s>. Total records processed: %d, events posted: %d, run duration: %s",
+			utils.ERs, absPath, rowNr, evsPosted, time.Since(timeStart)))
+	return
+}
+
+const (
+	PartialRecordsSuffix = "partial"
+)
+
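+// dumpToFile is the cache expiry callback used when
+// PartialCacheExpiryAction is utils.MetaDumpToFile: it writes a
+// still-partial record to a CSV dump file under ProcessedPath.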
+func (rdr *PartialCSVFileER) dumpToFile(itmID string, value interface{}) {
+	nM := value.(*config.NavigableMap)
+	// complete CDRs are handled in the processFile function
+	if partial, err := nM.FieldAsString([]string{utils.Partial}); err == nil && partial == "false" {
+		return
+	}
+	record := nM.AsExportedRecord()
+	dumpFilePath := path.Join(rdr.Config().ProcessedPath,
+		fmt.Sprintf("%s.%s.%d", itmID, PartialRecordsSuffix, time.Now().Unix()))
+	fileOut, err := os.Create(dumpFilePath)
+	if err != nil {
+		utils.Logger.Err(fmt.Sprintf("<%s> Failed creating %s, error: %s",
+			utils.ERs, dumpFilePath, err.Error()))
+		return
+	}
+	defer fileOut.Close()
+	csvWriter := csv.NewWriter(fileOut)
+	csvWriter.Comma = utils.CSV_SEP
+	if err := csvWriter.Write(record); err != nil {
+		utils.Logger.Err(fmt.Sprintf("<%s> Failed writing partial record %v to file: %s, error: %s",
+			utils.ERs, record, dumpFilePath, err.Error()))
+		return
+	}
+	csvWriter.Flush()
+}
+
+func (rdr *PartialCSVFileER) postCDR(itmID string, value interface{}) {
+	// complete CDRs are handled in the processFile function
+	if partial, err := value.(*config.NavigableMap).FieldAsString([]string{utils.Partial}); err == nil && partial == "false" {
+		return
+	}
+	// TODO: post the incomplete CDR
+}
diff --git a/ers/reader.go b/ers/reader.go
index abd8c72ab..4aceaba3a 100644
--- a/ers/reader.go
+++ b/ers/reader.go
@@ -40,6 +40,8 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int,
 		err = fmt.Errorf("unsupported reader type: <%s>", cfg.ERsCfg().Readers[cfgIdx].Type)
 	case utils.MetaFileCSV:
 		return NewCSVFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
+	case utils.MetaPartialCSV:
+		return NewPartialCSVFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
 	case utils.MetaFileXML:
 		return NewXMLFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
 	case utils.MetaFileFWV:
diff --git a/utils/consts.go b/utils/consts.go
index 298071ae3..a93fb8ec8 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -588,17 +588,10 @@
 	MetaInit           = "*init"
 	MetaRatingPlanCost = "*rating_plan_cost"
 	RatingPlanIDs      = "RatingPlanIDs"
-	MetaAccount        = "*account"
 	ERs                = "ERs"
 	Ratio              = "Ratio"
 	Load               = "Load"
 	Slash              = "/"
-	NameLow            = "name"
-	TypeLow            = "type"
-	UserLow            = "user"
-	PassLow            = "pass"
-	SentinelLow        = "sentinel"
-	QueryLow           = "query"
 	UUID               = "UUID"
 	ActionsID          = "ActionsID"
 	MetaAct            = "*act"
@@ -764,11 +757,7 @@
 	MetaTpResources     = "*tp_resources"
 	MetaTpRates         = "*tp_rates"
 	MetaTpTimings       = "*tp_timings"
-	MetaTpResource      = "*tp_resources"
-	MetaTpCdrStats      = "*tp_cdrstats"
 	MetaTpDestinations  = "*tp_destinations"
-	MetaTpRatingPlan    = "*tp_rating_plans"
-	MetaTpRatingProfile = "*tp_rating_profiles"
 	MetaTpChargers      = "*tp_chargers"
 	MetaTpDispatchers   = "*tp_dispatchers"
 	MetaDurationSeconds = "*duration_seconds"
@@ -778,7 +767,6 @@
 	CapResourceAllocation = "ResourceAllocation"
 	CapMaxUsage           = "MaxUsage"
 	CapSuppliers          = "Suppliers"
-	CapThresholdHits      = "ThresholdHits"
 	CapThresholds         = "Thresholds"
 	CapStatQueues         = "StatQueues"
 )
@@ -800,7 +788,6 @@
 	TpRates         = "TpRates"
 	TpTiming        = "TpTiming"
 	TpResource      = "TpResource"
-	TpCdrStats      = "TpCdrStats"
 	TpDestinations  = "TpDestinations"
 	TpRatingPlan    = "TpRatingPlan"
 	TpRatingProfile = "TpRatingProfile"
@@ -813,7 +800,6 @@
 	MetaFirst     = "*first"
 	MetaRandom    = "*random"
 	MetaBroadcast = "*broadcast"
-	MetaNext      = "*next"
 	MetaRoundRobin = "*round_robin"
 	MetaRatio      = "*ratio"
 	ThresholdSv1   = "ThresholdSv1"
@@ -827,7 +813,6 @@
 	APIKey             = "APIKey"
 	RouteID            = "RouteID"
 	APIMethods         = "APIMethods"
-	APIMethod          = "APIMethod"
 	NestingSep         = "."
 	ArgDispatcherField = "ArgDispatcher"
 )
@@ -844,8 +829,6 @@
 	MetaRSR          = "*rsr"
 	MetaStatS        = "*stats"
 	MetaDestinations = "*destinations"
-	MetaMinCapPrefix = "*min_"
-	MetaMaxCapPrefix = "*max_"
 	MetaLessThan     = "*lt"
 	MetaLessOrEqual  = "*lte"
 	MetaGreaterThan  = "*gt"
@@ -1383,17 +1366,17 @@
 
 //CSV file name
 const (
-	TIMINGS_CSV           = "Timings.csv"
-	DESTINATIONS_CSV      = "Destinations.csv"
-	RATES_CSV             = "Rates.csv"
-	DESTINATION_RATES_CSV = "DestinationRates.csv"
-	RATING_PLANS_CSV      = "RatingPlans.csv"
-	RATING_PROFILES_CSV   = "RatingProfiles.csv"
-	SHARED_GROUPS_CSV     = "SharedGroups.csv"
-	ACTIONS_CSV           = "Actions.csv"
-	ACTION_PLANS_CSV      = "ActionPlans.csv"
-	ACTION_TRIGGERS_CSV   = "ActionTriggers.csv"
-	ACCOUNT_ACTIONS_CSV   = "AccountActions.csv"
+	TimingsCsv          = "Timings.csv"
+	DestinationsCsv     = "Destinations.csv"
+	RatesCsv            = "Rates.csv"
+	DestinationRatesCsv = "DestinationRates.csv"
+	RatingPlansCsv      = "RatingPlans.csv"
+	RatingProfilesCsv   = "RatingProfiles.csv"
+	SharedGroupsCsv     = "SharedGroups.csv"
+	ActionsCsv          = "Actions.csv"
+	ActionPlansCsv      = "ActionPlans.csv"
+	ActionTriggersCsv   = "ActionTriggers.csv"
+	AccountActionsCsv   = "AccountActions.csv"
 	ResourcesCsv        = "Resources.csv"
 	StatsCsv            = "Stats.csv"
 	ThresholdsCsv       = "Thresholds.csv"