mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add infrastructure in config for EventExporter
This commit is contained in:
committed by
Dan Christian Bogos
parent
6b932f7ff3
commit
caa72a0b28
@@ -201,6 +201,13 @@ func NewDefaultCGRConfig() (cfg *CGRConfig, err error) {
|
||||
break
|
||||
}
|
||||
}
|
||||
// populate default EEs exporter
|
||||
for _, ersExp := range cfg.eesCfg.Exporters {
|
||||
if ersExp.ID == utils.MetaDefault {
|
||||
cfg.dfltEvExp = ersExp.Clone()
|
||||
break
|
||||
}
|
||||
}
|
||||
dfltFsConnConfig = cfg.fsAgentCfg.EventSocketConns[0] // We leave it crashing here on purpose if no Connection defaults defined
|
||||
dfltKamConnConfig = cfg.kamAgentCfg.EvapiConns[0]
|
||||
dfltAstConnCfg = cfg.asteriskAgentCfg.AsteriskConns[0]
|
||||
@@ -249,8 +256,9 @@ type CGRConfig struct {
|
||||
ConfigPath string // Path towards config
|
||||
|
||||
// Cache defaults loaded from json and needing clones
|
||||
dfltCdreProfile *CdreCfg // Default cdreConfig profile
|
||||
dfltEvRdr *EventReaderCfg // default event reader
|
||||
dfltCdreProfile *CdreCfg // Default cdreConfig profile
|
||||
dfltEvRdr *EventReaderCfg // default event reader
|
||||
dfltEvExp *EventExporterCfg // default event exporter
|
||||
|
||||
CdreProfiles map[string]*CdreCfg // Cdre config profiles
|
||||
loaderCfg LoaderSCfgs // LoaderS configs
|
||||
@@ -305,6 +313,8 @@ var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV,
|
||||
utils.MetaKafkajsonMap, utils.MetaFileXML, utils.MetaSQL, utils.MetaFileFWV,
|
||||
utils.MetaPartialCSV, utils.MetaFlatstore, utils.MetaJSON, utils.META_NONE})
|
||||
|
||||
var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.META_NONE})
|
||||
|
||||
func (cfg *CGRConfig) LazySanityCheck() {
|
||||
for _, cdrePrfl := range cfg.cdrsCfg.OnlineCDRExports {
|
||||
if cdreProfile, hasIt := cfg.CdreProfiles[cdrePrfl]; hasIt && (cdreProfile.ExportFormat == utils.MetaS3jsonMap || cdreProfile.ExportFormat == utils.MetaSQSjsonMap) {
|
||||
@@ -338,7 +348,7 @@ func (cfg *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) {
|
||||
cfg.loadThresholdSCfg, cfg.loadRouteSCfg, cfg.loadLoaderSCfg,
|
||||
cfg.loadMailerCfg, cfg.loadSureTaxCfg, cfg.loadDispatcherSCfg,
|
||||
cfg.loadLoaderCgrCfg, cfg.loadMigratorCgrCfg, cfg.loadTlsCgrCfg,
|
||||
cfg.loadAnalyzerCgrCfg, cfg.loadApierCfg, cfg.loadErsCfg} {
|
||||
cfg.loadAnalyzerCgrCfg, cfg.loadApierCfg, cfg.loadErsCfg, cfg.loadEesCfg} {
|
||||
if err = loadFunc(jsnCfg); err != nil {
|
||||
return
|
||||
}
|
||||
@@ -722,6 +732,15 @@ func (cfg *CGRConfig) loadErsCfg(jsnCfg *CgrJsonCfg) (err error) {
|
||||
return cfg.ersCfg.loadFromJsonCfg(jsnERsCfg, cfg.generalCfg.RSRSep, cfg.dfltEvRdr)
|
||||
}
|
||||
|
||||
// loadEesCfg loads the Ees section of the configuration
|
||||
func (cfg *CGRConfig) loadEesCfg(jsnCfg *CgrJsonCfg) (err error) {
|
||||
var jsnEEsCfg *EEsJsonCfg
|
||||
if jsnEEsCfg, err = jsnCfg.EEsJsonCfg(); err != nil {
|
||||
return
|
||||
}
|
||||
return cfg.eesCfg.loadFromJsonCfg(jsnEEsCfg, cfg.generalCfg.RSRSep, cfg.dfltEvExp)
|
||||
}
|
||||
|
||||
// SureTaxCfg use locking to retrieve the configuration, possibility later for runtime reload
|
||||
func (cfg *CGRConfig) SureTaxCfg() *SureTaxCfg {
|
||||
cfg.lks[SURETAX_JSON].Lock()
|
||||
@@ -1146,6 +1165,7 @@ func (cfg *CGRConfig) getLoadFunctions() map[string]func(*CgrJsonCfg) error {
|
||||
CDRS_JSN: cfg.loadCdrsCfg,
|
||||
CDRE_JSN: cfg.loadCdreCfg,
|
||||
ERsJson: cfg.loadErsCfg,
|
||||
EEsJson: cfg.loadEesCfg,
|
||||
SessionSJson: cfg.loadSessionSCfg,
|
||||
AsteriskAgentJSN: cfg.loadAsteriskAgentCfg,
|
||||
FreeSWITCHAgentJSN: cfg.loadFreeswitchAgentCfg,
|
||||
|
||||
@@ -343,6 +343,44 @@ const CGRATES_CFG_JSON = `
|
||||
},
|
||||
|
||||
|
||||
"ees": { // EventExporterService
|
||||
"enabled": false, // starts the EventReader service: <true|false>
|
||||
"attributes_conns":["*internal"], // RPC Connections IDs
|
||||
"exporters": [
|
||||
{
|
||||
"id": "*default", // identifier of the EventReader profile
|
||||
"type": "*none", // exporter type
|
||||
"export_path": "/var/spool/cgrates/ees", // path where the exported events will be placed
|
||||
"tenant": "", // tenant used in filterS.Pass
|
||||
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
|
||||
"filters": [], // limit parsing based on the filters
|
||||
"flags": [], // flags to influence the event processing
|
||||
"attribute_context": "", // context used to process request with AttributeS
|
||||
"attribute_ids": [], // ids of attributes used to avoid searching
|
||||
"synchronous": false, // block processing until export has a result
|
||||
"attempts": 1, // export attempts
|
||||
"field_separator": ",", // separator used in case of csv files
|
||||
"fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"},
|
||||
{"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"},
|
||||
{"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"},
|
||||
{"tag": "OriginID", "path": "*exp.OriginID", "type": "*variable", "value": "~*req.OriginID"},
|
||||
{"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"},
|
||||
{"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"},
|
||||
{"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"},
|
||||
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
|
||||
{"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"},
|
||||
{"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"},
|
||||
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
|
||||
{"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
|
||||
{"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"},
|
||||
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost", "rounding_decimals": 4},
|
||||
],
|
||||
},
|
||||
],
|
||||
},
|
||||
|
||||
|
||||
"sessions": {
|
||||
"enabled": false, // starts the session service: <true|false>
|
||||
"listen_bijson": "127.0.0.1:2014", // address where to listen for bidirectional JSON-RPC requests
|
||||
|
||||
@@ -221,6 +221,16 @@ func (self CgrJsonCfg) ERsJsonCfg() (erSCfg *ERsJsonCfg, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (self CgrJsonCfg) EEsJsonCfg() (erSCfg *EEsJsonCfg, err error) {
|
||||
rawCfg, hasKey := self[EEsJson]
|
||||
if !hasKey {
|
||||
return
|
||||
}
|
||||
erSCfg = new(EEsJsonCfg)
|
||||
err = json.Unmarshal(*rawCfg, &erSCfg)
|
||||
return
|
||||
}
|
||||
|
||||
func (self CgrJsonCfg) SessionSJsonCfg() (*SessionSJsonCfg, error) {
|
||||
rawCfg, hasKey := self[SessionSJson]
|
||||
if !hasKey {
|
||||
|
||||
@@ -1868,3 +1868,121 @@ func TestDfEventReaderCfg(t *testing.T) {
|
||||
t.Errorf("Expected: %+v, \nreceived: %+v", utils.ToJSON(eCfg), utils.ToJSON(cfg))
|
||||
}
|
||||
}
|
||||
|
||||
func TestDfEventExporterCfg(t *testing.T) {
|
||||
eContentFlds := []*FcTemplateJsonCfg{
|
||||
{
|
||||
Tag: utils.StringPointer(utils.CGRID),
|
||||
Path: utils.StringPointer("*exp.CGRID"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.CGRID),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.RunID),
|
||||
Path: utils.StringPointer("*exp.RunID"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.RunID),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.ToR),
|
||||
Path: utils.StringPointer("*exp.ToR"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.ToR),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.OriginID),
|
||||
Path: utils.StringPointer("*exp.OriginID"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.OriginID),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.RequestType),
|
||||
Path: utils.StringPointer("*exp.RequestType"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.RequestType),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.Tenant),
|
||||
Path: utils.StringPointer("*exp.Tenant"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.Tenant),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.Category),
|
||||
Path: utils.StringPointer("*exp.Category"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.Category),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.Account),
|
||||
Path: utils.StringPointer("*exp.Account"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.Account),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.Subject),
|
||||
Path: utils.StringPointer("*exp.Subject"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.Subject),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.Destination),
|
||||
Path: utils.StringPointer("*exp.Destination"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.Destination),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.SetupTime),
|
||||
Path: utils.StringPointer("*exp.SetupTime"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.SetupTime),
|
||||
Layout: utils.StringPointer("2006-01-02T15:04:05Z07:00"),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.AnswerTime),
|
||||
Path: utils.StringPointer("*exp.AnswerTime"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.AnswerTime),
|
||||
Layout: utils.StringPointer("2006-01-02T15:04:05Z07:00"),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.Usage),
|
||||
Path: utils.StringPointer("*exp.Usage"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.Usage),
|
||||
},
|
||||
{
|
||||
Tag: utils.StringPointer(utils.Cost),
|
||||
Path: utils.StringPointer("*exp.Cost"),
|
||||
Type: utils.StringPointer(utils.MetaVariable),
|
||||
Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.COST),
|
||||
Rounding_decimals: utils.IntPointer(4),
|
||||
},
|
||||
}
|
||||
eCfg := &EEsJsonCfg{
|
||||
Enabled: utils.BoolPointer(false),
|
||||
Attributes_conns: &[]string{utils.MetaInternal},
|
||||
Exporters: &[]*EventExporterJsonCfg{
|
||||
&EventExporterJsonCfg{
|
||||
Id: utils.StringPointer(utils.MetaDefault),
|
||||
Type: utils.StringPointer(utils.META_NONE),
|
||||
Field_separator: utils.StringPointer(","),
|
||||
Export_path: utils.StringPointer("/var/spool/cgrates/ees"),
|
||||
Attribute_context: utils.StringPointer(utils.EmptyString),
|
||||
Tenant: utils.StringPointer(utils.EmptyString),
|
||||
Timezone: utils.StringPointer(utils.EmptyString),
|
||||
Filters: &[]string{},
|
||||
Attribute_ids: &[]string{},
|
||||
Flags: &[]string{},
|
||||
Synchronous: utils.BoolPointer(false),
|
||||
Attempts: utils.IntPointer(1),
|
||||
Fields: &eContentFlds,
|
||||
},
|
||||
},
|
||||
}
|
||||
if cfg, err := dfCgrJsonCfg.EEsJsonCfg(); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eCfg, cfg) {
|
||||
t.Errorf("Expected: %+v, \nreceived: %+v", utils.ToJSON(eCfg), utils.ToJSON(cfg))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1834,6 +1834,131 @@ func TestCgrCdfEventReader(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCgrCdfEventExporter(t *testing.T) {
|
||||
eCfg := &EEsCfg{
|
||||
Enabled: false,
|
||||
AttributeSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)},
|
||||
Exporters: []*EventExporterCfg{
|
||||
&EventExporterCfg{
|
||||
ID: utils.MetaDefault,
|
||||
Type: utils.META_NONE,
|
||||
FieldSep: ",",
|
||||
Tenant: nil,
|
||||
ExportPath: "/var/spool/cgrates/ees",
|
||||
Attempts: 1,
|
||||
Timezone: utils.EmptyString,
|
||||
Filters: []string{},
|
||||
AttributeSIDs: []string{},
|
||||
Flags: utils.FlagsWithParams{},
|
||||
Fields: []*FCTemplate{
|
||||
{
|
||||
Tag: utils.CGRID,
|
||||
Path: "*exp.CGRID",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.CGRID", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.RunID,
|
||||
Path: "*exp.RunID",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.RunID", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.ToR,
|
||||
Path: "*exp.ToR",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.ToR", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.OriginID,
|
||||
Path: "*exp.OriginID",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.OriginID", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.RequestType,
|
||||
Path: "*exp.RequestType",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.RequestType", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.Tenant,
|
||||
Path: "*exp.Tenant",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Tenant", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.Category,
|
||||
Path: "*exp.Category",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Category", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.Account,
|
||||
Path: "*exp.Account",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Account", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.Subject,
|
||||
Path: "*exp.Subject",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Subject", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.Destination,
|
||||
Path: "*exp.Destination",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Destination", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.SetupTime,
|
||||
Path: "*exp.SetupTime",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.SetupTime", true, utils.INFIELD_SEP),
|
||||
Layout: "2006-01-02T15:04:05Z07:00",
|
||||
},
|
||||
{
|
||||
Tag: utils.AnswerTime,
|
||||
Path: "*exp.AnswerTime",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.AnswerTime", true, utils.INFIELD_SEP),
|
||||
Layout: "2006-01-02T15:04:05Z07:00",
|
||||
},
|
||||
{
|
||||
Tag: utils.Usage,
|
||||
Path: "*exp.Usage",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Usage", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.Cost,
|
||||
Path: "*exp.Cost",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Cost", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
RoundingDecimals: utils.IntPointer(4),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.eesCfg, eCfg) {
|
||||
t.Errorf("received: %+v,\n expecting: %+v", utils.ToJSON(cgrCfg.eesCfg), utils.ToJSON(eCfg))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCgrCfgEventReaderDefault(t *testing.T) {
|
||||
eCfg := &EventReaderCfg{
|
||||
ID: utils.MetaDefault,
|
||||
@@ -1880,6 +2005,125 @@ func TestCgrCfgEventReaderDefault(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func TestCgrCfgEventExporterDefault(t *testing.T) {
|
||||
eCfg := &EventExporterCfg{
|
||||
ID: utils.MetaDefault,
|
||||
Type: utils.META_NONE,
|
||||
FieldSep: ",",
|
||||
Tenant: nil,
|
||||
ExportPath: "/var/spool/cgrates/ees",
|
||||
Attempts: 1,
|
||||
Timezone: utils.EmptyString,
|
||||
Filters: nil,
|
||||
Flags: utils.FlagsWithParams{},
|
||||
Fields: []*FCTemplate{
|
||||
{
|
||||
Tag: utils.CGRID,
|
||||
Path: "*exp.CGRID",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.CGRID", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.RunID,
|
||||
Path: "*exp.RunID",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.RunID", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.ToR,
|
||||
Path: "*exp.ToR",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.ToR", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.OriginID,
|
||||
Path: "*exp.OriginID",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.OriginID", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.RequestType,
|
||||
Path: "*exp.RequestType",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.RequestType", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.Tenant,
|
||||
Path: "*exp.Tenant",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Tenant", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.Category,
|
||||
Path: "*exp.Category",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Category", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.Account,
|
||||
Path: "*exp.Account",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Account", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.Subject,
|
||||
Path: "*exp.Subject",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Subject", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.Destination,
|
||||
Path: "*exp.Destination",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Destination", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.SetupTime,
|
||||
Path: "*exp.SetupTime",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.SetupTime", true, utils.INFIELD_SEP),
|
||||
Layout: "2006-01-02T15:04:05Z07:00",
|
||||
},
|
||||
{
|
||||
Tag: utils.AnswerTime,
|
||||
Path: "*exp.AnswerTime",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.AnswerTime", true, utils.INFIELD_SEP),
|
||||
Layout: "2006-01-02T15:04:05Z07:00",
|
||||
},
|
||||
{
|
||||
Tag: utils.Usage,
|
||||
Path: "*exp.Usage",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Usage", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
},
|
||||
{
|
||||
Tag: utils.Cost,
|
||||
Path: "*exp.Cost",
|
||||
Type: "*variable",
|
||||
Value: NewRSRParsersMustCompile("~*req.Cost", true, utils.INFIELD_SEP),
|
||||
Layout: time.RFC3339,
|
||||
RoundingDecimals: utils.IntPointer(4),
|
||||
},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(cgrCfg.dfltEvExp, eCfg) {
|
||||
t.Errorf("received: %+v,\n expecting: %+v", utils.ToJSON(cgrCfg.dfltEvExp), utils.ToJSON(eCfg))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestRpcConnsDefaults(t *testing.T) {
|
||||
eCfg := make(map[string]*RPCConn)
|
||||
// hardoded the *internal and *localhost connections
|
||||
|
||||
@@ -427,6 +427,34 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
}
|
||||
}
|
||||
// EventExporter sanity checks
|
||||
if cfg.eesCfg.Enabled {
|
||||
for _, connID := range cfg.eesCfg.AttributeSConns {
|
||||
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.attributeSCfg.Enabled {
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.AttributeS, utils.EEs)
|
||||
}
|
||||
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
|
||||
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.EEs, connID)
|
||||
}
|
||||
}
|
||||
for _, exp := range cfg.eesCfg.Exporters {
|
||||
if !possibleExporterTypes.Has(exp.Type) {
|
||||
return fmt.Errorf("<%s> unsupported data type: %s for exporter with ID: %s", utils.EEs, exp.Type, exp.ID)
|
||||
}
|
||||
|
||||
switch exp.Type {
|
||||
case utils.MetaFileCSV:
|
||||
for _, dir := range []string{exp.ExportPath} {
|
||||
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
|
||||
return fmt.Errorf("<%s> nonexistent folder: %s for reader with ID: %s", utils.EEs, dir, exp.ID)
|
||||
}
|
||||
}
|
||||
if exp.FieldSep == utils.EmptyString {
|
||||
return fmt.Errorf("<%s> empty FieldSep for exporter with ID: %s", utils.EEs, exp.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// StorDB sanity checks
|
||||
if cfg.storDbCfg.Type == utils.POSTGRES {
|
||||
if !utils.IsSliceMember([]string{utils.PostgressSSLModeDisable, utils.PostgressSSLModeAllow,
|
||||
|
||||
164
config/eescfg.go
164
config/eescfg.go
@@ -28,6 +28,73 @@ type EEsCfg struct {
|
||||
Exporters []*EventExporterCfg
|
||||
}
|
||||
|
||||
func (eeS *EEsCfg) loadFromJsonCfg(jsnCfg *EEsJsonCfg, sep string, dfltExpCfg *EventExporterCfg) (err error) {
|
||||
if jsnCfg == nil {
|
||||
return
|
||||
}
|
||||
if jsnCfg.Enabled != nil {
|
||||
eeS.Enabled = *jsnCfg.Enabled
|
||||
}
|
||||
if jsnCfg.Attributes_conns != nil {
|
||||
eeS.AttributeSConns = make([]string, len(*jsnCfg.Attributes_conns))
|
||||
for i, fID := range *jsnCfg.Attributes_conns {
|
||||
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
|
||||
if fID == utils.MetaInternal {
|
||||
eeS.AttributeSConns[i] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)
|
||||
} else {
|
||||
eeS.AttributeSConns[i] = fID
|
||||
}
|
||||
}
|
||||
}
|
||||
return eeS.appendEEsExporters(jsnCfg.Exporters, sep, dfltExpCfg)
|
||||
}
|
||||
|
||||
func (eeS *EEsCfg) appendEEsExporters(exporters *[]*EventExporterJsonCfg, separator string, dfltExpCfg *EventExporterCfg) (err error) {
|
||||
if exporters == nil {
|
||||
return
|
||||
}
|
||||
for _, jsnExp := range *exporters {
|
||||
exp := new(EventExporterCfg)
|
||||
if dfltExpCfg != nil {
|
||||
exp = dfltExpCfg.Clone()
|
||||
}
|
||||
var haveID bool
|
||||
if jsnExp.Id != nil {
|
||||
for _, exporter := range eeS.Exporters {
|
||||
if exporter.ID == *jsnExp.Id {
|
||||
exp = exporter
|
||||
haveID = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := exp.loadFromJsonCfg(jsnExp, separator); err != nil {
|
||||
return err
|
||||
}
|
||||
if !haveID {
|
||||
eeS.Exporters = append(eeS.Exporters, exp)
|
||||
}
|
||||
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Clone itself into a new EEsCfg
|
||||
func (eeS *EEsCfg) Clone() (cln *EEsCfg) {
|
||||
cln = new(EEsCfg)
|
||||
cln.Enabled = eeS.Enabled
|
||||
cln.AttributeSConns = make([]string, len(eeS.AttributeSConns))
|
||||
for idx, sConn := range eeS.AttributeSConns {
|
||||
cln.AttributeSConns[idx] = sConn
|
||||
}
|
||||
cln.Exporters = make([]*EventExporterCfg, len(eeS.Exporters))
|
||||
for idx, exp := range eeS.Exporters {
|
||||
cln.Exporters[idx] = exp.Clone()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type EventExporterCfg struct {
|
||||
ID string
|
||||
Type string
|
||||
@@ -40,6 +107,101 @@ type EventExporterCfg struct {
|
||||
AttributeSIDs []string // selective AttributeS profiles
|
||||
Synchronous bool
|
||||
Attempts int
|
||||
FieldSep rune
|
||||
FieldSep string
|
||||
Fields []*FCTemplate
|
||||
}
|
||||
|
||||
func (eeC *EventExporterCfg) loadFromJsonCfg(jsnEec *EventExporterJsonCfg, separator string) (err error) {
|
||||
if jsnEec == nil {
|
||||
return
|
||||
}
|
||||
if jsnEec.Id != nil {
|
||||
eeC.ID = *jsnEec.Id
|
||||
}
|
||||
if jsnEec.Type != nil {
|
||||
eeC.Type = *jsnEec.Type
|
||||
}
|
||||
if jsnEec.Export_path != nil {
|
||||
eeC.ExportPath = *jsnEec.Export_path
|
||||
}
|
||||
if jsnEec.Tenant != nil {
|
||||
if eeC.Tenant, err = NewRSRParsers(*jsnEec.Tenant, true, separator); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if jsnEec.Timezone != nil {
|
||||
eeC.Timezone = *jsnEec.Timezone
|
||||
}
|
||||
if jsnEec.Filters != nil {
|
||||
eeC.Filters = make([]string, len(*jsnEec.Filters))
|
||||
for i, fltr := range *jsnEec.Filters {
|
||||
eeC.Filters[i] = fltr
|
||||
}
|
||||
}
|
||||
if jsnEec.Flags != nil {
|
||||
if eeC.Flags, err = utils.FlagsWithParamsFromSlice(*jsnEec.Flags); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if jsnEec.Attribute_context != nil {
|
||||
eeC.AttributeSCtx = *jsnEec.Attribute_context
|
||||
}
|
||||
if jsnEec.Attribute_ids != nil {
|
||||
eeC.AttributeSIDs = make([]string, len(*jsnEec.Attribute_ids))
|
||||
for i, fltr := range *jsnEec.Attribute_ids {
|
||||
eeC.AttributeSIDs[i] = fltr
|
||||
}
|
||||
}
|
||||
if jsnEec.Synchronous != nil {
|
||||
eeC.Synchronous = *jsnEec.Synchronous
|
||||
}
|
||||
if jsnEec.Attempts != nil {
|
||||
eeC.Attempts = *jsnEec.Attempts
|
||||
}
|
||||
if jsnEec.Field_separator != nil {
|
||||
eeC.FieldSep = *jsnEec.Field_separator
|
||||
}
|
||||
if jsnEec.Fields != nil {
|
||||
if eeC.Fields, err = FCTemplatesFromFCTemplatesJsonCfg(*jsnEec.Fields, separator); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (eeC *EventExporterCfg) Clone() (cln *EventExporterCfg) {
|
||||
cln = new(EventExporterCfg)
|
||||
cln.ID = eeC.ID
|
||||
cln.Type = eeC.Type
|
||||
cln.ExportPath = eeC.ExportPath
|
||||
if len(eeC.Tenant) != 0 {
|
||||
cln.Tenant = make(RSRParsers, len(eeC.Tenant))
|
||||
for idx, val := range eeC.Tenant {
|
||||
clnVal := *val
|
||||
cln.Tenant[idx] = &clnVal
|
||||
}
|
||||
}
|
||||
cln.Timezone = eeC.Timezone
|
||||
if len(eeC.Filters) != 0 {
|
||||
cln.Filters = make([]string, len(eeC.Filters))
|
||||
for idx, val := range eeC.Filters {
|
||||
cln.Filters[idx] = val
|
||||
}
|
||||
}
|
||||
cln.Flags = eeC.Flags
|
||||
cln.AttributeSCtx = eeC.AttributeSCtx
|
||||
if len(eeC.AttributeSIDs) != 0 {
|
||||
cln.AttributeSIDs = make([]string, len(eeC.AttributeSIDs))
|
||||
for idx, val := range eeC.AttributeSIDs {
|
||||
cln.AttributeSIDs[idx] = val
|
||||
}
|
||||
}
|
||||
cln.Synchronous = eeC.Synchronous
|
||||
cln.Attempts = eeC.Attempts
|
||||
cln.FieldSep = eeC.FieldSep
|
||||
cln.Fields = make([]*FCTemplate, len(eeC.Fields))
|
||||
for idx, fld := range eeC.Fields {
|
||||
cln.Fields[idx] = fld.Clone()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -195,7 +195,31 @@ type EventReaderJsonCfg struct {
|
||||
Cache_dump_fields *[]*FcTemplateJsonCfg
|
||||
}
|
||||
|
||||
// SM-Generic config section
|
||||
// EEsJsonCfg contains the configuration of EventExporterService
|
||||
type EEsJsonCfg struct {
|
||||
Enabled *bool
|
||||
Attributes_conns *[]string
|
||||
Exporters *[]*EventExporterJsonCfg
|
||||
}
|
||||
|
||||
// EventExporterJsonCfg is the configuration of a single EventExporter
|
||||
type EventExporterJsonCfg struct {
|
||||
Id *string
|
||||
Type *string
|
||||
Export_path *string
|
||||
Tenant *string
|
||||
Timezone *string
|
||||
Filters *[]string
|
||||
Flags *[]string
|
||||
Attribute_context *string
|
||||
Attribute_ids *[]string
|
||||
Synchronous *bool
|
||||
Attempts *int
|
||||
Field_separator *string
|
||||
Fields *[]*FcTemplateJsonCfg
|
||||
}
|
||||
|
||||
// SessionSJsonCfg config section
|
||||
type SessionSJsonCfg struct {
|
||||
Enabled *bool
|
||||
Listen_bijson *string
|
||||
|
||||
@@ -670,6 +670,7 @@ const (
|
||||
MetaRatingPlanCost = "*rating_plan_cost"
|
||||
RatingPlanIDs = "RatingPlanIDs"
|
||||
ERs = "ERs"
|
||||
EEs = "EEs"
|
||||
Ratio = "Ratio"
|
||||
Load = "Load"
|
||||
Slash = "/"
|
||||
|
||||
Reference in New Issue
Block a user