From f562de49df5577ba6daefb110dcbdcf6a3cfb7c8 Mon Sep 17 00:00:00 2001 From: adi Date: Tue, 28 Jun 2022 16:55:31 +0300 Subject: [PATCH] ExtraData for exporters interface --- ees/amqp.go | 4 +++- ees/amqpv1.go | 4 +++- ees/ee.go | 11 ++++++----- ees/ees.go | 7 +++---- ees/elastic.go | 11 ++++++++++- ees/filecsv.go | 4 +++- ees/filefwv.go | 4 +++- ees/httpjsonmap.go | 4 +++- ees/httppost.go | 4 +++- ees/kafka.go | 12 ++++++++++-- ees/log.go | 7 ++++--- ees/nats.go | 4 +++- ees/rpc.go | 3 ++- ees/s3.go | 13 +++++++++++-- ees/sql.go | 4 +++- ees/sqs.go | 4 +++- ees/virtualee.go | 13 +++++++------ 17 files changed, 80 insertions(+), 33 deletions(-) diff --git a/ees/amqp.go b/ees/amqp.go index 6a79888b8..99ca90024 100644 --- a/ees/amqp.go +++ b/ees/amqp.go @@ -137,7 +137,7 @@ func (pstr *AMQPee) Connect() (err error) { return } -func (pstr *AMQPee) ExportEvent(_ *context.Context, content interface{}, _ string) (err error) { +func (pstr *AMQPee) ExportEvent(_ *context.Context, content, _ interface{}) (err error) { pstr.reqs.get() pstr.RLock() if pstr.postChan == nil { @@ -175,3 +175,5 @@ func (pstr *AMQPee) Close() (err error) { } func (pstr *AMQPee) GetMetrics() *utils.SafeMapStorage { return pstr.dc } + +func (pstr *AMQPee) ExtraData(*utils.CGREvent) interface{} { return nil } diff --git a/ees/amqpv1.go b/ees/amqpv1.go index e1325334b..21226589b 100644 --- a/ees/amqpv1.go +++ b/ees/amqpv1.go @@ -78,7 +78,7 @@ func (pstr *AMQPv1EE) Connect() (err error) { return } -func (pstr *AMQPv1EE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) { +func (pstr *AMQPv1EE) ExportEvent(ctx *context.Context, content, _ interface{}) (err error) { pstr.reqs.get() pstr.RLock() defer func() { @@ -115,3 +115,5 @@ func (pstr *AMQPv1EE) Close() (err error) { } func (pstr *AMQPv1EE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } + +func (pstr *AMQPv1EE) ExtraData(*utils.CGREvent) interface{} { return nil } diff --git a/ees/ee.go b/ees/ee.go index f39cbc214..404888d75 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -31,13 +31,14 @@ import ( ) type EventExporter interface { - Cfg() *config.EventExporterCfg // return the config - Connect() error // called before exporting an event to make sure it is connected - ExportEvent(*context.Context, interface{}, string) error // called on each event to be exported - Close() error // called when the exporter needs to terminate - GetMetrics() *utils.SafeMapStorage // called to get metrics + Cfg() *config.EventExporterCfg // return the config + Connect() error // called before exporting an event to make sure it is connected + ExportEvent(ctx *context.Context, content interface{}, extraData interface{}) error // called on each event to be exported + Close() error // called when the exporter needs to terminate + GetMetrics() *utils.SafeMapStorage // called to get metrics PrepareMap(*utils.CGREvent) (interface{}, error) PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error) + ExtraData(*utils.CGREvent) interface{} } // NewEventExporter produces exporters diff --git a/ees/ees.go b/ees/ees.go index 201d49888..8fbf7e18e 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -274,13 +274,12 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, ev *utils. return } } - key := utils.ConcatenatedKey(utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()), - utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaRunID), utils.MetaDefault)) + extraData := exp.ExtraData(ev) - return ExportWithAttempts(ctx, exp, eEv, key) + return ExportWithAttempts(ctx, exp, eEv, extraData) } -func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key string) (err error) { +func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key interface{}) (err error) { if exp.Cfg().FailedPostsDir != utils.MetaNone { defer func() { if err != nil { diff --git a/ees/elastic.go b/ees/elastic.go index 9a31df589..3b6a3b09c 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -29,6 +29,7 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" elasticsearch "github.com/elastic/go-elasticsearch" ) @@ -100,7 +101,7 @@ func (eEe *ElasticEE) Connect() (err error) { } // ExportEvent implements EventExporter -func (eEe *ElasticEE) ExportEvent(ctx *context.Context, ev interface{}, key string) (err error) { +func (eEe *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData interface{}) (err error) { eEe.reqs.get() eEe.RLock() defer func() { @@ -110,6 +111,7 @@ func (eEe *ElasticEE) ExportEvent(ctx *context.Context, ev interface{}, key stri if eEe.eClnt == nil { return utils.ErrDisconnected } + key := extraData.(string) eReq := esapi.IndexRequest{ Index: eEe.opts.Index, DocumentID: key, @@ -151,3 +153,10 @@ func (eEe *ElasticEE) Close() (_ error) { } func (eEe *ElasticEE) GetMetrics() *utils.SafeMapStorage { return eEe.dc } + +func (eEE *ElasticEE) ExtraData(ev *utils.CGREvent) interface{} { + return utils.ConcatenatedKey( + utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()), + utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaRunID), utils.MetaDefault), + ) +} diff --git a/ees/filecsv.go b/ees/filecsv.go index 751d1daf8..05bc47e38 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -110,7 +110,7 @@ func (fCsv *FileCSVee) Cfg() *config.EventExporterCfg { return fCsv.cfg } func (fCsv *FileCSVee) Connect() (_ error) { return } -func (fCsv *FileCSVee) ExportEvent(_ *context.Context, ev interface{}, _ string) error { +func (fCsv *FileCSVee) ExportEvent(_ *context.Context, ev, _ interface{}) error { fCsv.Lock() // make sure that only one event is writen in file at once defer fCsv.Unlock() return fCsv.csvWriter.Write(ev.([]string)) @@ -134,6 +134,8 @@ func (fCsv *FileCSVee) Close() (err error) { func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage { return fCsv.dc } +func (fCsv *FileCSVee) ExtraData(ev *utils.CGREvent) interface{} { return nil } + // Buffers cannot be closed, they just Reset. We implement our struct and used it for writer field in FileCSVee to be available for WriterCloser interface type buffer struct { io.Writer diff --git a/ees/filefwv.go b/ees/filefwv.go index ce5c60b32..ba6af2c29 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -112,7 +112,7 @@ func (fFwv *FileFWVee) Cfg() *config.EventExporterCfg { return fFwv.cfg } func (fFwv *FileFWVee) Connect() (_ error) { return } -func (fFwv *FileFWVee) ExportEvent(_ *context.Context, records interface{}, _ string) (err error) { +func (fFwv *FileFWVee) ExportEvent(_ *context.Context, records, _ interface{}) (err error) { fFwv.Lock() // make sure that only one event is writen in file at once defer fFwv.Unlock() for _, record := range records.([]string) { @@ -140,3 +140,5 @@ func (fFwv *FileFWVee) Close() (err error) { } func (fFwv *FileFWVee) GetMetrics() *utils.SafeMapStorage { return fFwv.dc } + +func (fFwv *FileFWVee) ExtraData(ev *utils.CGREvent) interface{} { return nil } diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index c94f001ff..4b17bfd33 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -78,7 +78,7 @@ func (httpEE *HTTPjsonMapEE) Cfg() *config.EventExporterCfg { return httpEE.cfg func (httpEE *HTTPjsonMapEE) Connect() (_ error) { return } -func (httpEE *HTTPjsonMapEE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) { +func (httpEE *HTTPjsonMapEE) ExportEvent(ctx *context.Context, content, _ interface{}) (err error) { httpEE.reqs.get() defer httpEE.reqs.done() pReq := content.(*HTTPPosterRequest) @@ -94,6 +94,8 @@ func (httpEE *HTTPjsonMapEE) Close() (_ error) { return } func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage { return httpEE.dc } +func (httpEE *HTTPjsonMapEE) ExtraData(ev *utils.CGREvent) interface{} { return nil } + func (httpEE *HTTPjsonMapEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) { body, err := json.Marshal(mp.Event) return &HTTPPosterRequest{ diff --git a/ees/httppost.go b/ees/httppost.go index b1e6b48be..054bcd8c0 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -78,7 +78,7 @@ func (httpPost *HTTPPostEE) Cfg() *config.EventExporterCfg { return httpPost.cfg func (httpPost *HTTPPostEE) Connect() (_ error) { return } -func (httpPost *HTTPPostEE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) { +func (httpPost *HTTPPostEE) ExportEvent(ctx *context.Context, content, _ interface{}) (err error) { httpPost.reqs.get() defer httpPost.reqs.done() pReq := content.(*HTTPPosterRequest) @@ -94,6 +94,8 @@ func (httpPost *HTTPPostEE) Close() (_ error) { return } func (httpPost *HTTPPostEE) GetMetrics() *utils.SafeMapStorage { return httpPost.dc } +func (httpPost *HTTPPostEE) ExtraData(ev *utils.CGREvent) interface{} { return nil } + func (httpPost *HTTPPostEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) { urlVals := url.Values{} for k, v := range mp.Event { diff --git a/ees/kafka.go b/ees/kafka.go index 132744ca3..73b7874e8 100644 --- a/ees/kafka.go +++ b/ees/kafka.go @@ -22,6 +22,7 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" kafka "github.com/segmentio/kafka-go" ) @@ -72,7 +73,7 @@ func (pstr *KafkaEE) Connect() (_ error) { return } -func (pstr *KafkaEE) ExportEvent(ctx *context.Context, content interface{}, key string) (err error) { +func (pstr *KafkaEE) ExportEvent(ctx *context.Context, content interface{}, extraData interface{}) (err error) { pstr.reqs.get() pstr.RLock() if pstr.writer == nil { @@ -80,8 +81,9 @@ func (pstr *KafkaEE) ExportEvent(ctx *context.Context, content interface{}, key pstr.reqs.done() return utils.ErrDisconnected } + kafkaKey := extraData.(string) err = pstr.writer.WriteMessages(ctx, kafka.Message{ - Key: []byte(key), + Key: []byte(kafkaKey), Value: content.([]byte), }) pstr.RUnlock() @@ -100,3 +102,9 @@ func (pstr *KafkaEE) Close() (err error) { } func (pstr *KafkaEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (pstr *KafkaEE) ExtraData(ev *utils.CGREvent) interface{} { + return utils.ConcatenatedKey( + utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()), + utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaRunID), utils.MetaDefault), + ) +} diff --git a/ees/log.go b/ees/log.go index c2526aaf7..7ba258f18 100644 --- a/ees/log.go +++ b/ees/log.go @@ -42,14 +42,15 @@ type LogEE struct { func (vEe *LogEE) Cfg() *config.EventExporterCfg { return vEe.cfg } func (vEe *LogEE) Connect() error { return nil } -func (vEe *LogEE) ExportEvent(_ *context.Context, mp interface{}, _ string) error { +func (vEe *LogEE) ExportEvent(_ *context.Context, mp, _ interface{}) error { utils.Logger.Info( fmt.Sprintf("<%s> <%s> exported: <%s>", utils.EEs, vEe.Cfg().ID, utils.ToJSON(mp))) return nil } -func (vEe *LogEE) Close() error { return nil } -func (vEe *LogEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } +func (vEe *LogEE) Close() error { return nil } +func (vEe *LogEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } +func (vEe *LogEE) ExtraData(ev *utils.CGREvent) interface{} { return nil } func (vEe *LogEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) { return mp.Event, nil } diff --git a/ees/nats.go b/ees/nats.go index cf757c773..97ede37f1 100644 --- a/ees/nats.go +++ b/ees/nats.go @@ -94,7 +94,7 @@ func (pstr *NatsEE) Connect() (err error) { return } -func (pstr *NatsEE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) { +func (pstr *NatsEE) ExportEvent(ctx *context.Context, content, _ interface{}) (err error) { pstr.reqs.get() pstr.RLock() if pstr.poster == nil { @@ -124,6 +124,8 @@ func (pstr *NatsEE) Close() (err error) { func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } +func (pstr *NatsEE) ExtraData(ev *utils.CGREvent) interface{} { return nil } + func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { nop = make([]nats.Option, 0, 7) nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID), diff --git a/ees/rpc.go b/ees/rpc.go index 59f6ab840..b351b9ecc 100644 --- a/ees/rpc.go +++ b/ees/rpc.go @@ -67,7 +67,7 @@ func (e *RPCee) Connect() (err error) { return } -func (e *RPCee) ExportEvent(ctx *context.Context, args interface{}, _ string) (err error) { +func (e *RPCee) ExportEvent(ctx *context.Context, args, _ interface{}) (err error) { e.Lock() defer e.Unlock() var rply string @@ -84,6 +84,7 @@ func (e *RPCee) Close() (err error) { func (e *RPCee) GetMetrics() (mp *utils.SafeMapStorage) { return e.dc } +func (e *RPCee) ExtraData(ev *utils.CGREvent) interface{} { return nil } func (e *RPCee) PrepareMap(mp *utils.CGREvent) (interface{}, error) { for i, v := range e.Cfg().Opts.RPCAPIOpts { diff --git a/ees/s3.go b/ees/s3.go index 45d05d59c..eef1b3f5a 100644 --- a/ees/s3.go +++ b/ees/s3.go @@ -29,6 +29,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -112,16 +113,17 @@ func (pstr *S3EE) Connect() (err error) { return } -func (pstr *S3EE) ExportEvent(ctx *context.Context, message interface{}, key string) (err error) { +func (pstr *S3EE) ExportEvent(ctx *context.Context, message, extraData interface{}) (err error) { pstr.reqs.get() pstr.RLock() + sKey := extraData.(string) _, err = pstr.up.UploadWithContext(ctx, &s3manager.UploadInput{ Bucket: aws.String(pstr.bucket), // Can also use the `filepath` standard library package to modify the // filename as need for an S3 object key. Such as turning absolute path // to a relative path. - Key: aws.String(fmt.Sprintf("%s/%s.json", pstr.folderPath, key)), + Key: aws.String(fmt.Sprintf("%s/%s.json", pstr.folderPath, sKey)), // The file to be uploaded. io.ReadSeeker is preferred as the Uploader // will be able to optimize memory when uploading large content. io.Reader @@ -137,3 +139,10 @@ func (pstr *S3EE) ExportEvent(ctx *context.Context, message interface{}, key str func (pstr *S3EE) Close() (_ error) { return } func (pstr *S3EE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } + +func (pstr *S3EE) ExtraData(ev *utils.CGREvent) interface{} { + return utils.ConcatenatedKey( + utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()), + utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaRunID), utils.MetaDefault), + ) +} diff --git a/ees/sql.go b/ees/sql.go index c608fab15..63ad9d270 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -132,7 +132,7 @@ func (sqlEe *SQLEe) Connect() (err error) { return } -func (sqlEe *SQLEe) ExportEvent(_ *context.Context, req interface{}, _ string) error { +func (sqlEe *SQLEe) ExportEvent(_ *context.Context, req, _ interface{}) error { sqlEe.reqs.get() sqlEe.RLock() defer func() { @@ -159,6 +159,8 @@ func (sqlEe *SQLEe) Close() (err error) { func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc } +func (sqlEe *SQLEe) ExtraData(ev *utils.CGREvent) interface{} { return nil } + func (sqlEe *SQLEe) PrepareMap(mp *utils.CGREvent) (interface{}, error) { return nil, nil } func (sqlEe *SQLEe) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) { diff --git a/ees/sqs.go b/ees/sqs.go index eb0b5a858..85116e122 100644 --- a/ees/sqs.go +++ b/ees/sqs.go @@ -130,7 +130,7 @@ func (pstr *SQSee) Connect() (err error) { return } -func (pstr *SQSee) ExportEvent(ctx *context.Context, message interface{}, _ string) (err error) { +func (pstr *SQSee) ExportEvent(ctx *context.Context, message, _ interface{}) (err error) { pstr.reqs.get() pstr.RLock() _, err = pstr.svc.SendMessageWithContext(ctx, @@ -147,3 +147,5 @@ func (pstr *SQSee) ExportEvent(ctx *context.Context, message interface{}, _ stri func (pstr *SQSee) Close() (_ error) { return } func (pstr *SQSee) GetMetrics() *utils.SafeMapStorage { return pstr.dc } + +func (pstr *SQSee) ExtraData(ev *utils.CGREvent) interface{} { return nil } diff --git a/ees/virtualee.go b/ees/virtualee.go index 29d72b0ff..4ed8e3829 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -37,12 +37,13 @@ type VirtualEE struct { dc *utils.SafeMapStorage } -func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg } -func (vEe *VirtualEE) Connect() error { return nil } -func (vEe *VirtualEE) ExportEvent(*context.Context, interface{}, string) error { return nil } -func (vEe *VirtualEE) Close() error { return nil } -func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } -func (vEe *VirtualEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) { return nil, nil } +func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg } +func (vEe *VirtualEE) Connect() error { return nil } +func (vEe *VirtualEE) ExportEvent(*context.Context, interface{}, interface{}) error { return nil } +func (vEe *VirtualEE) Close() error { return nil } +func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } +func (vEe *VirtualEE) ExtraData(*utils.CGREvent) interface{} { return nil } +func (vEe *VirtualEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) { return nil, nil } func (vEe *VirtualEE) PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error) { return nil, nil }