Added *dump_to_json as partial cache action

This commit is contained in:
Trial97
2021-10-13 12:28:53 +03:00
committed by Dan Christian Bogos
parent 3e2ff64ee1
commit 6ffe61ddf1
4 changed files with 105 additions and 34 deletions

View File

@@ -703,7 +703,8 @@ func (cfg *CGRConfig) checkConfigSanity() error {
pAct := utils.IfaceAsString(rdr.Opts[utils.PartialCacheActionOpt])
if pAct != utils.MetaDumpToFile &&
pAct != utils.MetaNone &&
pAct != utils.MetaPostCDR {
pAct != utils.MetaPostCDR &&
pAct != utils.MetaDumpToJSON {
return fmt.Errorf("<%s> wrong partial expiry action for reader with ID: %s", utils.ERs, rdr.ID)
}
if pAct != utils.MetaNone { // if is *none we do not process the evicted events
@@ -712,7 +713,8 @@ func (cfg *CGRConfig) checkConfigSanity() error {
return fmt.Errorf("<%s> empty %s for reader with ID: %s", utils.ERs, utils.PartialOrderFieldOpt, rdr.ID)
}
}
if pAct == utils.MetaDumpToFile { // only if the action is *dump_to_file
if pAct == utils.MetaDumpToFile ||
pAct == utils.MetaDumpToJSON { // only if the action is *dump_to_file
path := rdr.ProcessedPath
if pathVal, has := rdr.Opts[utils.PartialPathOpt]; has { // the path from options needs to exists if overwriten by reader
path = utils.IfaceAsString(pathVal)
@@ -720,12 +722,11 @@ func (cfg *CGRConfig) checkConfigSanity() error {
if _, err := os.Stat(utils.IfaceAsString(path)); err != nil && os.IsNotExist(err) {
return fmt.Errorf("<%s> nonexistent partial folder: %s for reader with ID: %s", utils.ERs, path, rdr.ID)
}
if fldSep, has := rdr.Opts[utils.PartialCSVFieldSepartorOpt]; has && // the separtor must not be empty
utils.IfaceAsString(fldSep) == utils.EmptyString {
return fmt.Errorf("<%s> empty %s for reader with ID: %s", utils.ERs, utils.PartialCSVFieldSepartorOpt, rdr.ID)
}
if len(rdr.CacheDumpFields) == 0 {
return fmt.Errorf("<%s> empty %s for reader with ID: %s", utils.ERs, utils.CacheDumpFieldsCfg, rdr.ID)
if pAct == utils.MetaDumpToFile {
if fldSep, has := rdr.Opts[utils.PartialCSVFieldSepartorOpt]; has && // the separtor must not be empty
utils.IfaceAsString(fldSep) == utils.EmptyString {
return fmt.Errorf("<%s> empty %s for reader with ID: %s", utils.ERs, utils.PartialCSVFieldSepartorOpt, rdr.ID)
}
}
}
switch rdr.Type {

View File

@@ -2018,15 +2018,6 @@ func TestConfigSanityErs(t *testing.T) {
t.Errorf("expected: <%v>,\n received: <%v>", expected, err)
}
cfg.ersCfg.Readers[0].Opts = map[string]interface{}{
utils.PartialCacheActionOpt: utils.MetaDumpToFile,
utils.PartialCSVFieldSepartorOpt: utils.FieldsSep,
}
expected = "<ERs> empty cache_dump_fields for reader with ID: rdrID"
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("expected: <%v>,\n received: <%v>", expected, err)
}
cfg.ersCfg.Readers[0].Opts = map[string]interface{}{
utils.PartialOrderFieldOpt: "non_empty",
utils.PartialCacheActionOpt: utils.MetaDumpToFile,

View File

@@ -20,9 +20,12 @@ package ers
import (
"encoding/csv"
"encoding/json"
"fmt"
"os"
"path"
"sort"
"strings"
"sync"
"time"
@@ -413,25 +416,37 @@ func (erS *ERService) onEvicted(id string, value interface{}) {
utils.ERs, utils.ToJSON(eEvs.events), err.Error()))
return
}
// convert the event to record
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: erS.cfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, erS.cfg.GeneralCfg().DefaultTenant),
erS.filterS, map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
})
var record []string
if len(eEvs.rdrCfg.CacheDumpFields) != 0 {
// convert the event to record
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: erS.cfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, erS.cfg.GeneralCfg().DefaultTenant),
erS.filterS, map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
})
if err = eeReq.SetFields(eEvs.rdrCfg.CacheDumpFields); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>",
utils.ERs, id, err.Error()))
return
if err = eeReq.SetFields(eEvs.rdrCfg.CacheDumpFields); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>",
utils.ERs, id, err.Error()))
return
}
record = eeReq.ExpData[utils.MetaExp].OrderedFieldsAsStrings()
} else {
keys := make([]string, 0, len(cgrEv.Event))
for k := range cgrEv.Event {
keys = append(keys, k)
}
sort.Strings(keys)
record = make([]string, len(keys))
for i, k := range keys {
record[i] = utils.IfaceAsString(cgrEv.Event[k])
}
}
record := eeReq.ExpData[utils.MetaExp].OrderedFieldsAsStrings()
// open the file and write the record
dumpFilePath := path.Join(expPath, fmt.Sprintf("%s.%d%s",
id, time.Now().Unix(), utils.TmpSuffix))
@@ -452,6 +467,69 @@ func (erS *ERService) onEvicted(id string, value interface{}) {
}
csvWriter.Flush()
fileOut.Close()
case utils.MetaDumpToJSON: // apply the cacheDumpFields to the united events and write the record to file
expPath := eEvs.rdrCfg.ProcessedPath
if pathVal, has := eEvs.rdrCfg.Opts[utils.PartialPathOpt]; has {
expPath = utils.IfaceAsString(pathVal)
}
if expPath == utils.EmptyString { // do not write the partial event to file
return
}
cgrEv, err := mergePartialEvents(eEvs.events, eEvs.rdrCfg, erS.filterS, // merge the partial events
erS.cfg.GeneralCfg().DefaultTenant,
erS.cfg.GeneralCfg().DefaultTimezone,
erS.cfg.GeneralCfg().RSRSep)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed posting expired parial events <%s> due error <%s>",
utils.ERs, utils.ToJSON(eEvs.events), err.Error()))
return
}
var record map[string]interface{}
if len(eEvs.rdrCfg.CacheDumpFields) != 0 {
// convert the event to record
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: erS.cfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, erS.cfg.GeneralCfg().DefaultTenant),
erS.filterS, map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
})
if err = eeReq.SetFields(eEvs.rdrCfg.CacheDumpFields); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>",
utils.ERs, id, err.Error()))
return
}
record = make(map[string]interface{})
for el := eeReq.ExpData[utils.MetaExp].GetFirstElement(); el != nil; el = el.Next() {
path := el.Value
nmIt, _ := eeReq.ExpData[utils.MetaExp].Field(path)
path = path[:len(path)-1] // remove the last index
record[strings.Join(path, utils.NestingSep)] = nmIt.Data
}
} else {
record = cgrEv.Event
}
// open the file and write the record
dumpFilePath := path.Join(expPath, fmt.Sprintf("%s.%d%s",
id, time.Now().Unix(), utils.TmpSuffix))
fileOut, err := os.Create(dumpFilePath)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Failed creating %s, error: %s",
utils.ERs, dumpFilePath, err.Error()))
return
}
if err = json.NewEncoder(fileOut).Encode(record); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Failed writing partial record %v to file: %s, error: %s",
utils.ERs, record, dumpFilePath, err.Error()))
}
fileOut.Close()
}
}

View File

@@ -369,6 +369,7 @@ const (
MetaUnixTimestamp = "*unix_timestamp"
MetaPostCDR = "*post_cdr"
MetaDumpToFile = "*dump_to_file"
MetaDumpToJSON = "*dump_to_json"
NonTransactional = ""
DataDB = "data_db"
StorDB = "stor_db"