From 6ffe61ddf19feaec2dd7480a826e6917018356fd Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 13 Oct 2021 12:28:53 +0300 Subject: [PATCH] Added *dump_to_json as partial cache action --- config/configsanity.go | 17 +++--- config/configsanity_test.go | 9 --- ers/ers.go | 112 ++++++++++++++++++++++++++++++------ utils/consts.go | 1 + 4 files changed, 105 insertions(+), 34 deletions(-) diff --git a/config/configsanity.go b/config/configsanity.go index 66ea605b5..85ccb9726 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -703,7 +703,8 @@ func (cfg *CGRConfig) checkConfigSanity() error { pAct := utils.IfaceAsString(rdr.Opts[utils.PartialCacheActionOpt]) if pAct != utils.MetaDumpToFile && pAct != utils.MetaNone && - pAct != utils.MetaPostCDR { + pAct != utils.MetaPostCDR && + pAct != utils.MetaDumpToJSON { return fmt.Errorf("<%s> wrong partial expiry action for reader with ID: %s", utils.ERs, rdr.ID) } if pAct != utils.MetaNone { // if is *none we do not process the evicted events @@ -712,7 +713,8 @@ func (cfg *CGRConfig) checkConfigSanity() error { return fmt.Errorf("<%s> empty %s for reader with ID: %s", utils.ERs, utils.PartialOrderFieldOpt, rdr.ID) } } - if pAct == utils.MetaDumpToFile { // only if the action is *dump_to_file + if pAct == utils.MetaDumpToFile || + pAct == utils.MetaDumpToJSON { // only if the action is *dump_to_file path := rdr.ProcessedPath if pathVal, has := rdr.Opts[utils.PartialPathOpt]; has { // the path from options needs to exists if overwriten by reader path = utils.IfaceAsString(pathVal) @@ -720,12 +722,11 @@ func (cfg *CGRConfig) checkConfigSanity() error { if _, err := os.Stat(utils.IfaceAsString(path)); err != nil && os.IsNotExist(err) { return fmt.Errorf("<%s> nonexistent partial folder: %s for reader with ID: %s", utils.ERs, path, rdr.ID) } - if fldSep, has := rdr.Opts[utils.PartialCSVFieldSepartorOpt]; has && // the separtor must not be empty - utils.IfaceAsString(fldSep) == utils.EmptyString { - return fmt.Errorf("<%s> empty %s for reader with ID: %s", utils.ERs, utils.PartialCSVFieldSepartorOpt, rdr.ID) - } - if len(rdr.CacheDumpFields) == 0 { - return fmt.Errorf("<%s> empty %s for reader with ID: %s", utils.ERs, utils.CacheDumpFieldsCfg, rdr.ID) + if pAct == utils.MetaDumpToFile { + if fldSep, has := rdr.Opts[utils.PartialCSVFieldSepartorOpt]; has && // the separtor must not be empty + utils.IfaceAsString(fldSep) == utils.EmptyString { + return fmt.Errorf("<%s> empty %s for reader with ID: %s", utils.ERs, utils.PartialCSVFieldSepartorOpt, rdr.ID) + } } } switch rdr.Type { diff --git a/config/configsanity_test.go b/config/configsanity_test.go index 37b0b9cf0..b9313f465 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -2018,15 +2018,6 @@ func TestConfigSanityErs(t *testing.T) { t.Errorf("expected: <%v>,\n received: <%v>", expected, err) } - cfg.ersCfg.Readers[0].Opts = map[string]interface{}{ - utils.PartialCacheActionOpt: utils.MetaDumpToFile, - utils.PartialCSVFieldSepartorOpt: utils.FieldsSep, - } - expected = " empty cache_dump_fields for reader with ID: rdrID" - if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { - t.Errorf("expected: <%v>,\n received: <%v>", expected, err) - } - cfg.ersCfg.Readers[0].Opts = map[string]interface{}{ utils.PartialOrderFieldOpt: "non_empty", utils.PartialCacheActionOpt: utils.MetaDumpToFile, diff --git a/ers/ers.go b/ers/ers.go index c2fb023c6..7dbc386a1 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -20,9 +20,12 @@ package ers import ( "encoding/csv" + "encoding/json" "fmt" "os" "path" + "sort" + "strings" "sync" "time" @@ -413,25 +416,37 @@ func (erS *ERService) onEvicted(id string, value interface{}) { utils.ERs, utils.ToJSON(eEvs.events), err.Error())) return } - // convert the event to record - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaReq: utils.MapStorage(cgrEv.Event), - utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), - utils.MetaCfg: erS.cfg.GetDataProvider(), - }, utils.FirstNonEmpty(cgrEv.Tenant, erS.cfg.GeneralCfg().DefaultTenant), - erS.filterS, map[string]*utils.OrderedNavigableMap{ - utils.MetaExp: utils.NewOrderedNavigableMap(), - }) + var record []string + if len(eEvs.rdrCfg.CacheDumpFields) != 0 { + // convert the event to record + eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ + utils.MetaReq: utils.MapStorage(cgrEv.Event), + utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), + utils.MetaCfg: erS.cfg.GetDataProvider(), + }, utils.FirstNonEmpty(cgrEv.Tenant, erS.cfg.GeneralCfg().DefaultTenant), + erS.filterS, map[string]*utils.OrderedNavigableMap{ + utils.MetaExp: utils.NewOrderedNavigableMap(), + }) - if err = eeReq.SetFields(eEvs.rdrCfg.CacheDumpFields); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>", - utils.ERs, id, err.Error())) - return + if err = eeReq.SetFields(eEvs.rdrCfg.CacheDumpFields); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>", + utils.ERs, id, err.Error())) + return + } + + record = eeReq.ExpData[utils.MetaExp].OrderedFieldsAsStrings() + } else { + keys := make([]string, 0, len(cgrEv.Event)) + for k := range cgrEv.Event { + keys = append(keys, k) + } + sort.Strings(keys) + record = make([]string, len(keys)) + for i, k := range keys { + record[i] = utils.IfaceAsString(cgrEv.Event[k]) + } } - - record := eeReq.ExpData[utils.MetaExp].OrderedFieldsAsStrings() - // open the file and write the record dumpFilePath := path.Join(expPath, fmt.Sprintf("%s.%d%s", id, time.Now().Unix(), utils.TmpSuffix)) @@ -452,6 +467,69 @@ func (erS *ERService) onEvicted(id string, value interface{}) { } csvWriter.Flush() fileOut.Close() + case utils.MetaDumpToJSON: // apply the cacheDumpFields to the united events and write the record to file + expPath := eEvs.rdrCfg.ProcessedPath + if pathVal, has := eEvs.rdrCfg.Opts[utils.PartialPathOpt]; has { + expPath = utils.IfaceAsString(pathVal) + } + if expPath == utils.EmptyString { // do not write the partial event to file + return + } + cgrEv, err := mergePartialEvents(eEvs.events, eEvs.rdrCfg, erS.filterS, // merge the partial events + erS.cfg.GeneralCfg().DefaultTenant, + erS.cfg.GeneralCfg().DefaultTimezone, + erS.cfg.GeneralCfg().RSRSep) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> failed posting expired parial events <%s> due error <%s>", + utils.ERs, utils.ToJSON(eEvs.events), err.Error())) + return + } + var record map[string]interface{} + if len(eEvs.rdrCfg.CacheDumpFields) != 0 { + // convert the event to record + eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ + utils.MetaReq: utils.MapStorage(cgrEv.Event), + utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), + utils.MetaCfg: erS.cfg.GetDataProvider(), + }, utils.FirstNonEmpty(cgrEv.Tenant, erS.cfg.GeneralCfg().DefaultTenant), + erS.filterS, map[string]*utils.OrderedNavigableMap{ + utils.MetaExp: utils.NewOrderedNavigableMap(), + }) + + if err = eeReq.SetFields(eEvs.rdrCfg.CacheDumpFields); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>", + utils.ERs, id, err.Error())) + return + } + + record = make(map[string]interface{}) + for el := eeReq.ExpData[utils.MetaExp].GetFirstElement(); el != nil; el = el.Next() { + path := el.Value + nmIt, _ := eeReq.ExpData[utils.MetaExp].Field(path) + path = path[:len(path)-1] // remove the last index + record[strings.Join(path, utils.NestingSep)] = nmIt.Data + } + } else { + record = cgrEv.Event + } + // open the file and write the record + dumpFilePath := path.Join(expPath, fmt.Sprintf("%s.%d%s", + id, time.Now().Unix(), utils.TmpSuffix)) + fileOut, err := os.Create(dumpFilePath) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed creating %s, error: %s", + utils.ERs, dumpFilePath, err.Error())) + return + } + + if err = json.NewEncoder(fileOut).Encode(record); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> Failed writing partial record %v to file: %s, error: %s", + utils.ERs, record, dumpFilePath, err.Error())) + } + fileOut.Close() + } } diff --git a/utils/consts.go b/utils/consts.go index 021c46f80..b9221f9f8 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -369,6 +369,7 @@ const ( MetaUnixTimestamp = "*unix_timestamp" MetaPostCDR = "*post_cdr" MetaDumpToFile = "*dump_to_file" + MetaDumpToJSON = "*dump_to_json" NonTransactional = "" DataDB = "data_db" StorDB = "stor_db"