From 3471d868b6b8a23d8a436d5305475402782f4c46 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 24 Feb 2022 19:06:17 +0200 Subject: [PATCH] Add rpcAPIOpts in ees cfg and change PrepareMap structure --- config/config_defaults.go | 1 + config/eescfg.go | 12 +++++ config/libconfig_json.go | 95 ++++++++++++++++++++------------------- ees/ee.go | 12 ++--- ees/ees.go | 2 +- ees/httpjsonmap.go | 4 +- ees/httpjsonmap_test.go | 4 +- ees/httppost.go | 4 +- ees/httppost_test.go | 22 ++++++--- ees/log.go | 4 +- ees/rpc.go | 5 ++- ees/rpc_test.go | 4 +- ees/sql.go | 2 +- ees/virtualee.go | 12 ++--- engine/cdrs.go | 35 +++++++++++++++ utils/consts.go | 12 ++++- 16 files changed, 151 insertions(+), 79 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 345832707..6b1290548 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -562,6 +562,7 @@ const CGRATES_CFG_JSON = ` // "connIDs": [], // connections for connManager to this exporter // "rpcConnTimeout" : "1s", // connection unsuccesfull on timeout // "rpcReplyTimeout":"2s", // connection down at replies if taking longer that this value + // "rpcAPIOpts": {}, }, // extra options for exporter "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> "filters": [], // limit parsing based on the filters diff --git a/config/eescfg.go b/config/eescfg.go index 0c1fee4a0..39fb9e865 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -201,6 +201,7 @@ type EventExporterOpts struct { ConnIDs *[]string RPCConnTimeout *time.Duration RPCReplyTimeout *time.Duration + RPCAPIOpts map[string]interface{} } // EventExporterCfg the config for a Event Exporter @@ -390,6 +391,10 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson) } eeOpts.RPCReplyTimeout = utils.DurationPointer(rpcReplyTimeout) } + if jsnCfg.RPCAPIOpts != nil { + eeOpts.RPCAPIOpts = make(map[string]interface{}) + eeOpts.RPCAPIOpts = jsnCfg.RPCAPIOpts + } return } @@ -634,6 +639,10 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts { if eeOpts.RPCReplyTimeout != nil { cln.RPCReplyTimeout = utils.DurationPointer(*eeOpts.RPCReplyTimeout) } + if eeOpts.RPCAPIOpts != nil { + cln.RPCAPIOpts = make(map[string]interface{}) + cln.RPCAPIOpts = eeOpts.RPCAPIOpts + } return cln } @@ -829,6 +838,9 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str if eeC.Opts.RPCReplyTimeout != nil { opts[utils.RpcReplyTimeout] = eeC.Opts.RPCReplyTimeout.String() } + if eeC.Opts.RPCAPIOpts != nil { + opts[utils.RPCAPIOpts] = eeC.Opts.RPCAPIOpts + } flgs := eeC.Flags.SliceFlags() if flgs == nil { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index ef37d7209..97ae23c22 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -270,53 +270,54 @@ type EEsJsonCfg struct { } type EventExporterOptsJson struct { - CSVFieldSeparator *string `json:"csvFieldSeparator"` - ElsIndex *string `json:"elsIndex"` - ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"` - ElsIfSeqNo *int `json:"elsIfSeqNo"` - ElsOpType *string `json:"elsOpType"` - ElsPipeline *string `json:"elsPipeline"` - ElsRouting *string `json:"elsRouting"` - ElsTimeout *string `json:"elsTimeout"` - ElsVersion *int `json:"elsVersion"` - ElsVersionType *string `json:"elsVersionType"` - ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"` - SQLMaxIdleConns *int `json:"sqlMaxIdleConns"` - SQLMaxOpenConns *int `json:"sqlMaxOpenConns"` - SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"` - MYSQLDSNParams map[string]string `json:"mysqlDSNParams"` - SQLTableName *string `json:"sqlTableName"` - SQLDBName *string `json:"sqlDBName"` - SSLMode *string `json:"sslMode"` - KafkaTopic *string `json:"kafkaTopic"` - AMQPQueueID *string `json:"amqpQueueID"` - AMQPRoutingKey *string `json:"amqpRoutingKey"` - AMQPExchange *string `json:"amqpExchange"` - AMQPExchangeType *string `json:"amqpExchangeType"` - AWSRegion *string `json:"awsRegion"` - AWSKey *string `json:"awsKey"` - AWSSecret *string `json:"awsSecret"` - AWSToken *string `json:"awsToken"` - SQSQueueID *string `json:"sqsQueueID"` - S3BucketID *string `json:"s3BucketID"` - S3FolderPath *string `json:"s3FolderPath"` - NATSJetStream *bool `json:"natsJetStream"` - NATSSubject *string `json:"natsSubject"` - NATSJWTFile *string `json:"natsJWTFile"` - NATSSeedFile *string `json:"natsSeedFile"` - NATSCertificateAuthority *string `json:"natsCertificateAuthority"` - NATSClientCertificate *string `json:"natsClientCertificate"` - NATSClientKey *string `json:"natsClientKey"` - NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"` - RPCCodec *string `json:"rpcCodec"` - ServiceMethod *string `json:"serviceMethod"` - KeyPath *string `json:"keyPath"` - CertPath *string `json:"certPath"` - CAPath *string `json:"caPath"` - ConnIDs *[]string `json:"connIDs"` - TLS *bool `json:"tls"` - RPCConnTimeout *string `json:"rpcConnTimeout"` - RPCReplyTimeout *string `json:"rpcReplyTimeout"` + CSVFieldSeparator *string `json:"csvFieldSeparator"` + ElsIndex *string `json:"elsIndex"` + ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"` + ElsIfSeqNo *int `json:"elsIfSeqNo"` + ElsOpType *string `json:"elsOpType"` + ElsPipeline *string `json:"elsPipeline"` + ElsRouting *string `json:"elsRouting"` + ElsTimeout *string `json:"elsTimeout"` + ElsVersion *int `json:"elsVersion"` + ElsVersionType *string `json:"elsVersionType"` + ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"` + SQLMaxIdleConns *int `json:"sqlMaxIdleConns"` + SQLMaxOpenConns *int `json:"sqlMaxOpenConns"` + SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"` + MYSQLDSNParams map[string]string `json:"mysqlDSNParams"` + SQLTableName *string `json:"sqlTableName"` + SQLDBName *string `json:"sqlDBName"` + SSLMode *string `json:"sslMode"` + KafkaTopic *string `json:"kafkaTopic"` + AMQPQueueID *string `json:"amqpQueueID"` + AMQPRoutingKey *string `json:"amqpRoutingKey"` + AMQPExchange *string `json:"amqpExchange"` + AMQPExchangeType *string `json:"amqpExchangeType"` + AWSRegion *string `json:"awsRegion"` + AWSKey *string `json:"awsKey"` + AWSSecret *string `json:"awsSecret"` + AWSToken *string `json:"awsToken"` + SQSQueueID *string `json:"sqsQueueID"` + S3BucketID *string `json:"s3BucketID"` + S3FolderPath *string `json:"s3FolderPath"` + NATSJetStream *bool `json:"natsJetStream"` + NATSSubject *string `json:"natsSubject"` + NATSJWTFile *string `json:"natsJWTFile"` + NATSSeedFile *string `json:"natsSeedFile"` + NATSCertificateAuthority *string `json:"natsCertificateAuthority"` + NATSClientCertificate *string `json:"natsClientCertificate"` + NATSClientKey *string `json:"natsClientKey"` + NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"` + RPCCodec *string `json:"rpcCodec"` + ServiceMethod *string `json:"serviceMethod"` + KeyPath *string `json:"keyPath"` + CertPath *string `json:"certPath"` + CAPath *string `json:"caPath"` + ConnIDs *[]string `json:"connIDs"` + TLS *bool `json:"tls"` + RPCConnTimeout *string `json:"rpcConnTimeout"` + RPCReplyTimeout *string `json:"rpcReplyTimeout"` + RPCAPIOpts map[string]interface{} `json:"rpcAPIOpts"` } // EventExporterJsonCfg is the configuration of a single EventExporter diff --git a/ees/ee.go b/ees/ee.go index 10838f589..804077e29 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -35,7 +35,7 @@ type EventExporter interface { ExportEvent(interface{}, string) error // called on each event to be exported Close() error // called when the exporter needs to terminate GetMetrics() *utils.SafeMapStorage // called to get metrics - PrepareMap(map[string]interface{}) (interface{}, error) + PrepareMap(*utils.CGREvent) (interface{}, error) PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error) } @@ -230,8 +230,8 @@ func updateEEMetrics(dc *utils.SafeMapStorage, cgrID string, ev engine.MapEvent, type bytePreparing struct{} -func (bytePreparing) PrepareMap(mp map[string]interface{}) (interface{}, error) { - return json.Marshal(mp) +func (bytePreparing) PrepareMap(mp *utils.CGREvent) (interface{}, error) { + return json.Marshal(mp.Event) } func (bytePreparing) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) { valMp := make(map[string]interface{}) @@ -246,9 +246,9 @@ func (bytePreparing) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{} type slicePreparing struct{} -func (slicePreparing) PrepareMap(mp map[string]interface{}) (interface{}, error) { - csvRecord := make([]string, 0, len(mp)) - for _, val := range mp { +func (slicePreparing) PrepareMap(mp *utils.CGREvent) (interface{}, error) { + csvRecord := make([]string, 0, len(mp.Event)) + for _, val := range mp.Event { csvRecord = append(csvRecord, utils.IfaceAsString(val)) } return csvRecord, nil diff --git a/ees/ees.go b/ees/ees.go index 956093738..5cf6b8d7d 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -260,7 +260,7 @@ func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool exp.GetMetrics().MapStorage[utils.NumberOfEvents] = exp.GetMetrics().MapStorage[utils.NumberOfEvents].(int64) + 1 exp.GetMetrics().Unlock() if len(exp.Cfg().ContentFields()) == 0 { - if eEv, err = exp.PrepareMap(ev.Event); err != nil { + if eEv, err = exp.PrepareMap(ev); err != nil { return } } else { diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index 064cf5d41..5a82739da 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -93,8 +93,8 @@ func (httpEE *HTTPjsonMapEE) Close() (_ error) { return } func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage { return httpEE.dc } -func (httpEE *HTTPjsonMapEE) PrepareMap(mp map[string]interface{}) (interface{}, error) { - body, err := json.Marshal(mp) +func (httpEE *HTTPjsonMapEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) { + body, err := json.Marshal(mp.Event) return &HTTPPosterRequest{ Header: httpEE.hdr, Body: body, diff --git a/ees/httpjsonmap_test.go b/ees/httpjsonmap_test.go index 23ccac5ef..98bf8ba79 100644 --- a/ees/httpjsonmap_test.go +++ b/ees/httpjsonmap_test.go @@ -204,7 +204,9 @@ func TestHTTPJsonMapPrepareMap(t *testing.T) { valMp := map[string]interface{}{ "*req.*tenant": "value1", } - rcv, err := httpEE.PrepareMap(valMp) + rcv, err := httpEE.PrepareMap(&utils.CGREvent{ + Event: valMp, + }) if err != nil { t.Error(err) } diff --git a/ees/httppost.go b/ees/httppost.go index 3a0953bef..04a76a207 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -93,9 +93,9 @@ func (httpPost *HTTPPostEE) Close() (_ error) { return } func (httpPost *HTTPPostEE) GetMetrics() *utils.SafeMapStorage { return httpPost.dc } -func (httpPost *HTTPPostEE) PrepareMap(mp map[string]interface{}) (interface{}, error) { +func (httpPost *HTTPPostEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) { urlVals := url.Values{} - for k, v := range mp { + for k, v := range mp.Event { urlVals.Set(k, utils.IfaceAsString(v)) } return &HTTPPosterRequest{ diff --git a/ees/httppost_test.go b/ees/httppost_test.go index fbb9743f5..be7179a6a 100644 --- a/ees/httppost_test.go +++ b/ees/httppost_test.go @@ -87,7 +87,11 @@ func TestHttpPostExportEvent2(t *testing.T) { if err != nil { t.Error(err) } - vals, err := httpPost.PrepareMap(map[string]interface{}{"2": "*req.field2"}) + vals, err := httpPost.PrepareMap(&utils.CGREvent{ + Event: map[string]interface{}{ + "2": "*req.field2", + }, + }) if err != nil { t.Fatal(err) } @@ -126,9 +130,11 @@ func TestHttpPostSync(t *testing.T) { t.Error(err) } - vals, err := exp.PrepareMap(map[string]interface{}{ - "Account": "1001", - "Destination": "1002", + vals, err := exp.PrepareMap(&utils.CGREvent{ + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, }) if err != nil { t.Fatal(err) @@ -179,9 +185,11 @@ func TestHttpPostSyncLimit(t *testing.T) { t.Error(err) } - vals, err := exp.PrepareMap(map[string]interface{}{ - "Account": "1001", - "Destination": "1002", + vals, err := exp.PrepareMap(&utils.CGREvent{ + Event: map[string]interface{}{ + "Account": "1001", + "Destination": "1002", + }, }) if err != nil { t.Fatal(err) diff --git a/ees/log.go b/ees/log.go index 112c5dd70..ebcfba477 100644 --- a/ees/log.go +++ b/ees/log.go @@ -49,8 +49,8 @@ func (vEe *LogEE) ExportEvent(mp interface{}, _ string) error { } func (vEe *LogEE) Close() error { return nil } func (vEe *LogEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } -func (vEe *LogEE) PrepareMap(mp map[string]interface{}) (interface{}, error) { - return mp, nil +func (vEe *LogEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) { + return mp.Event, nil } func (vEe *LogEE) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) { valMp := make(map[string]interface{}) diff --git a/ees/rpc.go b/ees/rpc.go index a42936dbb..b9e330911 100644 --- a/ees/rpc.go +++ b/ees/rpc.go @@ -84,7 +84,10 @@ func (e *RPCee) GetMetrics() (mp *utils.SafeMapStorage) { return e.dc } -func (e *RPCee) PrepareMap(mp map[string]interface{}) (interface{}, error) { +func (e *RPCee) PrepareMap(mp *utils.CGREvent) (interface{}, error) { + for i, v := range e.Cfg().Opts.RPCAPIOpts { + mp.APIOpts[i] = v + } return mp, nil } diff --git a/ees/rpc_test.go b/ees/rpc_test.go index 006f53ca2..bc0eb1f59 100644 --- a/ees/rpc_test.go +++ b/ees/rpc_test.go @@ -179,9 +179,9 @@ func TestRPCPrepareMap(t *testing.T) { }, } - exp := cgrEv.Event + exp := cgrEv - rcv, err := rpcEe.PrepareMap(cgrEv.Event) + rcv, err := rpcEe.PrepareMap(cgrEv) if err != nil { t.Error(err) } diff --git a/ees/sql.go b/ees/sql.go index 9345e03a0..70ea41e29 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -158,7 +158,7 @@ func (sqlEe *SQLEe) Close() (err error) { func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc } -func (sqlEe *SQLEe) PrepareMap(map[string]interface{}) (interface{}, error) { return nil, nil } +func (sqlEe *SQLEe) PrepareMap(*utils.CGREvent) (interface{}, error) { return nil, nil } func (sqlEe *SQLEe) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) { var vals []interface{} diff --git a/ees/virtualee.go b/ees/virtualee.go index 105d0bd69..782ff125f 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -36,12 +36,12 @@ type VirtualEE struct { dc *utils.SafeMapStorage } -func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg } -func (vEe *VirtualEE) Connect() error { return nil } -func (vEe *VirtualEE) ExportEvent(interface{}, string) error { return nil } -func (vEe *VirtualEE) Close() error { return nil } -func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } -func (vEe *VirtualEE) PrepareMap(map[string]interface{}) (interface{}, error) { return nil, nil } +func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg } +func (vEe *VirtualEE) Connect() error { return nil } +func (vEe *VirtualEE) ExportEvent(interface{}, string) error { return nil } +func (vEe *VirtualEE) Close() error { return nil } +func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } +func (vEe *VirtualEE) PrepareMap(*utils.CGREvent) (interface{}, error) { return nil, nil } func (vEe *VirtualEE) PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error) { return nil, nil } diff --git a/engine/cdrs.go b/engine/cdrs.go index 4b1978540..a222de7fa 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -771,6 +771,11 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er // processing options flgs := utils.FlagsWithParamsFromSlice(arg.Flags) attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0 + if v, has := arg.APIOpts[utils.OptsAttributeS]; has { + if attrS, err = utils.IfaceAsBool(v); err != nil { + return + } + } if flgs.Has(utils.MetaAttributes) { attrS = flgs.GetBool(utils.MetaAttributes) } @@ -783,28 +788,58 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er export = flgs.GetBool(utils.MetaExport) } thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0 + if v, has := arg.APIOpts[utils.OptsThresholdS]; has { + if attrS, err = utils.IfaceAsBool(v); err != nil { + return + } + } if flgs.Has(utils.MetaThresholds) { thdS = flgs.GetBool(utils.MetaThresholds) } stS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0 + if v, has := arg.APIOpts[utils.OptsStatS]; has { + if attrS, err = utils.IfaceAsBool(v); err != nil { + return + } + } if flgs.Has(utils.MetaStats) { stS = flgs.GetBool(utils.MetaStats) } chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 // activate charging for the Event + if v, has := arg.APIOpts[utils.OptsChargerS]; has { + if attrS, err = utils.IfaceAsBool(v); err != nil { + return + } + } if flgs.Has(utils.MetaChargers) { chrgS = flgs.GetBool(utils.MetaChargers) } var ralS bool // activate single rating for the CDR + if v, has := arg.APIOpts[utils.OptsRALs]; has { + if attrS, err = utils.IfaceAsBool(v); err != nil { + return + } + } if flgs.Has(utils.MetaRALs) { ralS = flgs.GetBool(utils.MetaRALs) } var reRate bool + if v, has := arg.APIOpts[utils.OptsRerate]; has { + if attrS, err = utils.IfaceAsBool(v); err != nil { + return + } + } if flgs.Has(utils.MetaRerate) { if reRate = flgs.GetBool(utils.MetaRerate); reRate { ralS = true } } var refund bool + if v, has := arg.APIOpts[utils.OptsRefund]; has { + if attrS, err = utils.IfaceAsBool(v); err != nil { + return + } + } if flgs.Has(utils.MetaRefund) { refund = flgs.GetBool(utils.MetaRefund) } diff --git a/utils/consts.go b/utils/consts.go index e8ee934c7..d737f6144 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2428,7 +2428,8 @@ var CGROptionsSet = NewStringSet([]string{OptsSessionsTTL, OptsRoutesProfileCount, OptsDispatchersProfilesCount, OptsAttributesProfileRuns, OptsAttributesProfileIgnoreFilters, OptsStatsProfileIDs, OptsStatsProfileIgnoreFilters, OptsThresholdsProfileIDs, OptsThresholdsProfileIgnoreFilters, OptsResourcesUsageID, OptsResourcesUsageTTL, - OptsResourcesUnits}) + OptsResourcesUnits, OptsAttributeS, OptsThresholdS, OptsChargerS, OptsStatS, OptsRALs, OptsRerate, + OptsRefund}) // EventExporter metrics const ( @@ -2489,6 +2490,14 @@ const ( // Thresholds OptsThresholdsProfileIDs = "*thdProfileIDs" OptsThresholdsProfileIgnoreFilters = "*thdProfileIgnoreFilters" + //CDRs and Sessions + OptsAttributeS = "*attributeS" + OptsChargerS = "*chargerS" + OptsStatS = "*statS" + OptsThresholdS = "*thresholdS" + OptsRALs = "*ralS" + OptsRerate = "*rerate" + OptsRefund = "*refund" // Others OptsContext = "*context" Subsys = "*subsys" @@ -2628,6 +2637,7 @@ const ( ConnIDs = "connIDs" RpcConnTimeout = "rpcConnTimeout" RpcReplyTimeout = "rpcReplyTimeout" + RPCAPIOpts = "rpcAPIOpts" // processed opts AMQPQueueIDProcessedCfg = "amqpQueueIDProcessed"