Add rpcAPIOpts in ees cfg and change PrepareMap structure

This commit is contained in:
ionutboangiu
2022-02-24 19:06:17 +02:00
committed by Dan Christian Bogos
parent c0c3b87b83
commit 3471d868b6
16 changed files with 151 additions and 79 deletions

View File

@@ -562,6 +562,7 @@ const CGRATES_CFG_JSON = `
// "connIDs": [], // connections for connManager to this exporter
// "rpcConnTimeout" : "1s", // connection unsuccesfull on timeout
// "rpcReplyTimeout":"2s", // connection down at replies if taking longer that this value
// "rpcAPIOpts": {},
}, // extra options for exporter
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"filters": [], // limit parsing based on the filters

View File

@@ -201,6 +201,7 @@ type EventExporterOpts struct {
ConnIDs *[]string
RPCConnTimeout *time.Duration
RPCReplyTimeout *time.Duration
RPCAPIOpts map[string]interface{}
}
// EventExporterCfg the config for a Event Exporter
@@ -390,6 +391,10 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
}
eeOpts.RPCReplyTimeout = utils.DurationPointer(rpcReplyTimeout)
}
if jsnCfg.RPCAPIOpts != nil {
eeOpts.RPCAPIOpts = make(map[string]interface{})
eeOpts.RPCAPIOpts = jsnCfg.RPCAPIOpts
}
return
}
@@ -634,6 +639,10 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts {
if eeOpts.RPCReplyTimeout != nil {
cln.RPCReplyTimeout = utils.DurationPointer(*eeOpts.RPCReplyTimeout)
}
if eeOpts.RPCAPIOpts != nil {
cln.RPCAPIOpts = make(map[string]interface{})
cln.RPCAPIOpts = eeOpts.RPCAPIOpts
}
return cln
}
@@ -829,6 +838,9 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
if eeC.Opts.RPCReplyTimeout != nil {
opts[utils.RpcReplyTimeout] = eeC.Opts.RPCReplyTimeout.String()
}
if eeC.Opts.RPCAPIOpts != nil {
opts[utils.RPCAPIOpts] = eeC.Opts.RPCAPIOpts
}
flgs := eeC.Flags.SliceFlags()
if flgs == nil {

View File

@@ -270,53 +270,54 @@ type EEsJsonCfg struct {
}
type EventExporterOptsJson struct {
CSVFieldSeparator *string `json:"csvFieldSeparator"`
ElsIndex *string `json:"elsIndex"`
ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"`
ElsIfSeqNo *int `json:"elsIfSeqNo"`
ElsOpType *string `json:"elsOpType"`
ElsPipeline *string `json:"elsPipeline"`
ElsRouting *string `json:"elsRouting"`
ElsTimeout *string `json:"elsTimeout"`
ElsVersion *int `json:"elsVersion"`
ElsVersionType *string `json:"elsVersionType"`
ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"`
SQLMaxIdleConns *int `json:"sqlMaxIdleConns"`
SQLMaxOpenConns *int `json:"sqlMaxOpenConns"`
SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"`
MYSQLDSNParams map[string]string `json:"mysqlDSNParams"`
SQLTableName *string `json:"sqlTableName"`
SQLDBName *string `json:"sqlDBName"`
SSLMode *string `json:"sslMode"`
KafkaTopic *string `json:"kafkaTopic"`
AMQPQueueID *string `json:"amqpQueueID"`
AMQPRoutingKey *string `json:"amqpRoutingKey"`
AMQPExchange *string `json:"amqpExchange"`
AMQPExchangeType *string `json:"amqpExchangeType"`
AWSRegion *string `json:"awsRegion"`
AWSKey *string `json:"awsKey"`
AWSSecret *string `json:"awsSecret"`
AWSToken *string `json:"awsToken"`
SQSQueueID *string `json:"sqsQueueID"`
S3BucketID *string `json:"s3BucketID"`
S3FolderPath *string `json:"s3FolderPath"`
NATSJetStream *bool `json:"natsJetStream"`
NATSSubject *string `json:"natsSubject"`
NATSJWTFile *string `json:"natsJWTFile"`
NATSSeedFile *string `json:"natsSeedFile"`
NATSCertificateAuthority *string `json:"natsCertificateAuthority"`
NATSClientCertificate *string `json:"natsClientCertificate"`
NATSClientKey *string `json:"natsClientKey"`
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
RPCCodec *string `json:"rpcCodec"`
ServiceMethod *string `json:"serviceMethod"`
KeyPath *string `json:"keyPath"`
CertPath *string `json:"certPath"`
CAPath *string `json:"caPath"`
ConnIDs *[]string `json:"connIDs"`
TLS *bool `json:"tls"`
RPCConnTimeout *string `json:"rpcConnTimeout"`
RPCReplyTimeout *string `json:"rpcReplyTimeout"`
CSVFieldSeparator *string `json:"csvFieldSeparator"`
ElsIndex *string `json:"elsIndex"`
ElsIfPrimaryTerm *int `json:"elsIfPrimaryTerm"`
ElsIfSeqNo *int `json:"elsIfSeqNo"`
ElsOpType *string `json:"elsOpType"`
ElsPipeline *string `json:"elsPipeline"`
ElsRouting *string `json:"elsRouting"`
ElsTimeout *string `json:"elsTimeout"`
ElsVersion *int `json:"elsVersion"`
ElsVersionType *string `json:"elsVersionType"`
ElsWaitForActiveShards *string `json:"elsWaitForActiveShards"`
SQLMaxIdleConns *int `json:"sqlMaxIdleConns"`
SQLMaxOpenConns *int `json:"sqlMaxOpenConns"`
SQLConnMaxLifetime *string `json:"sqlConnMaxLifetime"`
MYSQLDSNParams map[string]string `json:"mysqlDSNParams"`
SQLTableName *string `json:"sqlTableName"`
SQLDBName *string `json:"sqlDBName"`
SSLMode *string `json:"sslMode"`
KafkaTopic *string `json:"kafkaTopic"`
AMQPQueueID *string `json:"amqpQueueID"`
AMQPRoutingKey *string `json:"amqpRoutingKey"`
AMQPExchange *string `json:"amqpExchange"`
AMQPExchangeType *string `json:"amqpExchangeType"`
AWSRegion *string `json:"awsRegion"`
AWSKey *string `json:"awsKey"`
AWSSecret *string `json:"awsSecret"`
AWSToken *string `json:"awsToken"`
SQSQueueID *string `json:"sqsQueueID"`
S3BucketID *string `json:"s3BucketID"`
S3FolderPath *string `json:"s3FolderPath"`
NATSJetStream *bool `json:"natsJetStream"`
NATSSubject *string `json:"natsSubject"`
NATSJWTFile *string `json:"natsJWTFile"`
NATSSeedFile *string `json:"natsSeedFile"`
NATSCertificateAuthority *string `json:"natsCertificateAuthority"`
NATSClientCertificate *string `json:"natsClientCertificate"`
NATSClientKey *string `json:"natsClientKey"`
NATSJetStreamMaxWait *string `json:"natsJetStreamMaxWait"`
RPCCodec *string `json:"rpcCodec"`
ServiceMethod *string `json:"serviceMethod"`
KeyPath *string `json:"keyPath"`
CertPath *string `json:"certPath"`
CAPath *string `json:"caPath"`
ConnIDs *[]string `json:"connIDs"`
TLS *bool `json:"tls"`
RPCConnTimeout *string `json:"rpcConnTimeout"`
RPCReplyTimeout *string `json:"rpcReplyTimeout"`
RPCAPIOpts map[string]interface{} `json:"rpcAPIOpts"`
}
// EventExporterJsonCfg is the configuration of a single EventExporter

View File

@@ -35,7 +35,7 @@ type EventExporter interface {
ExportEvent(interface{}, string) error // called on each event to be exported
Close() error // called when the exporter needs to terminate
GetMetrics() *utils.SafeMapStorage // called to get metrics
PrepareMap(map[string]interface{}) (interface{}, error)
PrepareMap(*utils.CGREvent) (interface{}, error)
PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error)
}
@@ -230,8 +230,8 @@ func updateEEMetrics(dc *utils.SafeMapStorage, cgrID string, ev engine.MapEvent,
type bytePreparing struct{}
func (bytePreparing) PrepareMap(mp map[string]interface{}) (interface{}, error) {
return json.Marshal(mp)
func (bytePreparing) PrepareMap(mp *utils.CGREvent) (interface{}, error) {
return json.Marshal(mp.Event)
}
func (bytePreparing) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) {
valMp := make(map[string]interface{})
@@ -246,9 +246,9 @@ func (bytePreparing) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}
type slicePreparing struct{}
func (slicePreparing) PrepareMap(mp map[string]interface{}) (interface{}, error) {
csvRecord := make([]string, 0, len(mp))
for _, val := range mp {
func (slicePreparing) PrepareMap(mp *utils.CGREvent) (interface{}, error) {
csvRecord := make([]string, 0, len(mp.Event))
for _, val := range mp.Event {
csvRecord = append(csvRecord, utils.IfaceAsString(val))
}
return csvRecord, nil

View File

@@ -260,7 +260,7 @@ func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool
exp.GetMetrics().MapStorage[utils.NumberOfEvents] = exp.GetMetrics().MapStorage[utils.NumberOfEvents].(int64) + 1
exp.GetMetrics().Unlock()
if len(exp.Cfg().ContentFields()) == 0 {
if eEv, err = exp.PrepareMap(ev.Event); err != nil {
if eEv, err = exp.PrepareMap(ev); err != nil {
return
}
} else {

View File

@@ -93,8 +93,8 @@ func (httpEE *HTTPjsonMapEE) Close() (_ error) { return }
func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage { return httpEE.dc }
func (httpEE *HTTPjsonMapEE) PrepareMap(mp map[string]interface{}) (interface{}, error) {
body, err := json.Marshal(mp)
func (httpEE *HTTPjsonMapEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) {
body, err := json.Marshal(mp.Event)
return &HTTPPosterRequest{
Header: httpEE.hdr,
Body: body,

View File

@@ -204,7 +204,9 @@ func TestHTTPJsonMapPrepareMap(t *testing.T) {
valMp := map[string]interface{}{
"*req.*tenant": "value1",
}
rcv, err := httpEE.PrepareMap(valMp)
rcv, err := httpEE.PrepareMap(&utils.CGREvent{
Event: valMp,
})
if err != nil {
t.Error(err)
}

View File

@@ -93,9 +93,9 @@ func (httpPost *HTTPPostEE) Close() (_ error) { return }
func (httpPost *HTTPPostEE) GetMetrics() *utils.SafeMapStorage { return httpPost.dc }
func (httpPost *HTTPPostEE) PrepareMap(mp map[string]interface{}) (interface{}, error) {
func (httpPost *HTTPPostEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) {
urlVals := url.Values{}
for k, v := range mp {
for k, v := range mp.Event {
urlVals.Set(k, utils.IfaceAsString(v))
}
return &HTTPPosterRequest{

View File

@@ -87,7 +87,11 @@ func TestHttpPostExportEvent2(t *testing.T) {
if err != nil {
t.Error(err)
}
vals, err := httpPost.PrepareMap(map[string]interface{}{"2": "*req.field2"})
vals, err := httpPost.PrepareMap(&utils.CGREvent{
Event: map[string]interface{}{
"2": "*req.field2",
},
})
if err != nil {
t.Fatal(err)
}
@@ -126,9 +130,11 @@ func TestHttpPostSync(t *testing.T) {
t.Error(err)
}
vals, err := exp.PrepareMap(map[string]interface{}{
"Account": "1001",
"Destination": "1002",
vals, err := exp.PrepareMap(&utils.CGREvent{
Event: map[string]interface{}{
"Account": "1001",
"Destination": "1002",
},
})
if err != nil {
t.Fatal(err)
@@ -179,9 +185,11 @@ func TestHttpPostSyncLimit(t *testing.T) {
t.Error(err)
}
vals, err := exp.PrepareMap(map[string]interface{}{
"Account": "1001",
"Destination": "1002",
vals, err := exp.PrepareMap(&utils.CGREvent{
Event: map[string]interface{}{
"Account": "1001",
"Destination": "1002",
},
})
if err != nil {
t.Fatal(err)

View File

@@ -49,8 +49,8 @@ func (vEe *LogEE) ExportEvent(mp interface{}, _ string) error {
}
func (vEe *LogEE) Close() error { return nil }
func (vEe *LogEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
func (vEe *LogEE) PrepareMap(mp map[string]interface{}) (interface{}, error) {
return mp, nil
func (vEe *LogEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) {
return mp.Event, nil
}
func (vEe *LogEE) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) {
valMp := make(map[string]interface{})

View File

@@ -84,7 +84,10 @@ func (e *RPCee) GetMetrics() (mp *utils.SafeMapStorage) {
return e.dc
}
func (e *RPCee) PrepareMap(mp map[string]interface{}) (interface{}, error) {
func (e *RPCee) PrepareMap(mp *utils.CGREvent) (interface{}, error) {
for i, v := range e.Cfg().Opts.RPCAPIOpts {
mp.APIOpts[i] = v
}
return mp, nil
}

View File

@@ -179,9 +179,9 @@ func TestRPCPrepareMap(t *testing.T) {
},
}
exp := cgrEv.Event
exp := cgrEv
rcv, err := rpcEe.PrepareMap(cgrEv.Event)
rcv, err := rpcEe.PrepareMap(cgrEv)
if err != nil {
t.Error(err)
}

View File

@@ -158,7 +158,7 @@ func (sqlEe *SQLEe) Close() (err error) {
func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc }
func (sqlEe *SQLEe) PrepareMap(map[string]interface{}) (interface{}, error) { return nil, nil }
func (sqlEe *SQLEe) PrepareMap(*utils.CGREvent) (interface{}, error) { return nil, nil }
func (sqlEe *SQLEe) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) {
var vals []interface{}

View File

@@ -36,12 +36,12 @@ type VirtualEE struct {
dc *utils.SafeMapStorage
}
func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
func (vEe *VirtualEE) Connect() error { return nil }
func (vEe *VirtualEE) ExportEvent(interface{}, string) error { return nil }
func (vEe *VirtualEE) Close() error { return nil }
func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
func (vEe *VirtualEE) PrepareMap(map[string]interface{}) (interface{}, error) { return nil, nil }
func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
func (vEe *VirtualEE) Connect() error { return nil }
func (vEe *VirtualEE) ExportEvent(interface{}, string) error { return nil }
func (vEe *VirtualEE) Close() error { return nil }
func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
func (vEe *VirtualEE) PrepareMap(*utils.CGREvent) (interface{}, error) { return nil, nil }
func (vEe *VirtualEE) PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error) {
return nil, nil
}

View File

@@ -771,6 +771,11 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er
// processing options
flgs := utils.FlagsWithParamsFromSlice(arg.Flags)
attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0
if v, has := arg.APIOpts[utils.OptsAttributeS]; has {
if attrS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaAttributes) {
attrS = flgs.GetBool(utils.MetaAttributes)
}
@@ -783,28 +788,58 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er
export = flgs.GetBool(utils.MetaExport)
}
thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0
if v, has := arg.APIOpts[utils.OptsThresholdS]; has {
if attrS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaThresholds) {
thdS = flgs.GetBool(utils.MetaThresholds)
}
stS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0
if v, has := arg.APIOpts[utils.OptsStatS]; has {
if attrS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaStats) {
stS = flgs.GetBool(utils.MetaStats)
}
chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 // activate charging for the Event
if v, has := arg.APIOpts[utils.OptsChargerS]; has {
if attrS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaChargers) {
chrgS = flgs.GetBool(utils.MetaChargers)
}
var ralS bool // activate single rating for the CDR
if v, has := arg.APIOpts[utils.OptsRALs]; has {
if attrS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaRALs) {
ralS = flgs.GetBool(utils.MetaRALs)
}
var reRate bool
if v, has := arg.APIOpts[utils.OptsRerate]; has {
if attrS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaRerate) {
if reRate = flgs.GetBool(utils.MetaRerate); reRate {
ralS = true
}
}
var refund bool
if v, has := arg.APIOpts[utils.OptsRefund]; has {
if attrS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaRefund) {
refund = flgs.GetBool(utils.MetaRefund)
}

View File

@@ -2428,7 +2428,8 @@ var CGROptionsSet = NewStringSet([]string{OptsSessionsTTL,
OptsRoutesProfileCount, OptsDispatchersProfilesCount, OptsAttributesProfileRuns,
OptsAttributesProfileIgnoreFilters, OptsStatsProfileIDs, OptsStatsProfileIgnoreFilters,
OptsThresholdsProfileIDs, OptsThresholdsProfileIgnoreFilters, OptsResourcesUsageID, OptsResourcesUsageTTL,
OptsResourcesUnits})
OptsResourcesUnits, OptsAttributeS, OptsThresholdS, OptsChargerS, OptsStatS, OptsRALs, OptsRerate,
OptsRefund})
// EventExporter metrics
const (
@@ -2489,6 +2490,14 @@ const (
// Thresholds
OptsThresholdsProfileIDs = "*thdProfileIDs"
OptsThresholdsProfileIgnoreFilters = "*thdProfileIgnoreFilters"
//CDRs and Sessions
OptsAttributeS = "*attributeS"
OptsChargerS = "*chargerS"
OptsStatS = "*statS"
OptsThresholdS = "*thresholdS"
OptsRALs = "*ralS"
OptsRerate = "*rerate"
OptsRefund = "*refund"
// Others
OptsContext = "*context"
Subsys = "*subsys"
@@ -2628,6 +2637,7 @@ const (
ConnIDs = "connIDs"
RpcConnTimeout = "rpcConnTimeout"
RpcReplyTimeout = "rpcReplyTimeout"
RPCAPIOpts = "rpcAPIOpts"
// processed opts
AMQPQueueIDProcessedCfg = "amqpQueueIDProcessed"