mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
ExtraData for exporters interface
This commit is contained in:
@@ -137,7 +137,7 @@ func (pstr *AMQPee) Connect() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *AMQPee) ExportEvent(_ *context.Context, content interface{}, _ string) (err error) {
|
||||
func (pstr *AMQPee) ExportEvent(_ *context.Context, content, _ interface{}) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
if pstr.postChan == nil {
|
||||
@@ -175,3 +175,5 @@ func (pstr *AMQPee) Close() (err error) {
|
||||
}
|
||||
|
||||
func (pstr *AMQPee) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
|
||||
|
||||
func (pstr *AMQPee) ExtraData(*utils.CGREvent) interface{} { return nil }
|
||||
|
||||
@@ -78,7 +78,7 @@ func (pstr *AMQPv1EE) Connect() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *AMQPv1EE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) {
|
||||
func (pstr *AMQPv1EE) ExportEvent(ctx *context.Context, content, _ interface{}) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
defer func() {
|
||||
@@ -115,3 +115,5 @@ func (pstr *AMQPv1EE) Close() (err error) {
|
||||
}
|
||||
|
||||
func (pstr *AMQPv1EE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
|
||||
|
||||
func (pstr *AMQPv1EE) ExtraData(*utils.CGREvent) interface{} { return nil }
|
||||
|
||||
11
ees/ee.go
11
ees/ee.go
@@ -31,13 +31,14 @@ import (
|
||||
)
|
||||
|
||||
type EventExporter interface {
|
||||
Cfg() *config.EventExporterCfg // return the config
|
||||
Connect() error // called before exporting an event to make sure it is connected
|
||||
ExportEvent(*context.Context, interface{}, string) error // called on each event to be exported
|
||||
Close() error // called when the exporter needs to terminate
|
||||
GetMetrics() *utils.SafeMapStorage // called to get metrics
|
||||
Cfg() *config.EventExporterCfg // return the config
|
||||
Connect() error // called before exporting an event to make sure it is connected
|
||||
ExportEvent(ctx *context.Context, content interface{}, extraData interface{}) error // called on each event to be exported
|
||||
Close() error // called when the exporter needs to terminate
|
||||
GetMetrics() *utils.SafeMapStorage // called to get metrics
|
||||
PrepareMap(*utils.CGREvent) (interface{}, error)
|
||||
PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error)
|
||||
ExtraData(*utils.CGREvent) interface{}
|
||||
}
|
||||
|
||||
// NewEventExporter produces exporters
|
||||
|
||||
@@ -274,13 +274,12 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, ev *utils.
|
||||
return
|
||||
}
|
||||
}
|
||||
key := utils.ConcatenatedKey(utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()),
|
||||
utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaRunID), utils.MetaDefault))
|
||||
extraData := exp.ExtraData(ev)
|
||||
|
||||
return ExportWithAttempts(ctx, exp, eEv, key)
|
||||
return ExportWithAttempts(ctx, exp, eEv, extraData)
|
||||
}
|
||||
|
||||
func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key string) (err error) {
|
||||
func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key interface{}) (err error) {
|
||||
if exp.Cfg().FailedPostsDir != utils.MetaNone {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
elasticsearch "github.com/elastic/go-elasticsearch"
|
||||
)
|
||||
@@ -100,7 +101,7 @@ func (eEe *ElasticEE) Connect() (err error) {
|
||||
}
|
||||
|
||||
// ExportEvent implements EventExporter
|
||||
func (eEe *ElasticEE) ExportEvent(ctx *context.Context, ev interface{}, key string) (err error) {
|
||||
func (eEe *ElasticEE) ExportEvent(ctx *context.Context, ev, extraData interface{}) (err error) {
|
||||
eEe.reqs.get()
|
||||
eEe.RLock()
|
||||
defer func() {
|
||||
@@ -110,6 +111,7 @@ func (eEe *ElasticEE) ExportEvent(ctx *context.Context, ev interface{}, key stri
|
||||
if eEe.eClnt == nil {
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
key := extraData.(string)
|
||||
eReq := esapi.IndexRequest{
|
||||
Index: eEe.opts.Index,
|
||||
DocumentID: key,
|
||||
@@ -151,3 +153,10 @@ func (eEe *ElasticEE) Close() (_ error) {
|
||||
}
|
||||
|
||||
func (eEe *ElasticEE) GetMetrics() *utils.SafeMapStorage { return eEe.dc }
|
||||
|
||||
func (eEE *ElasticEE) ExtraData(ev *utils.CGREvent) interface{} {
|
||||
return utils.ConcatenatedKey(
|
||||
utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()),
|
||||
utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaRunID), utils.MetaDefault),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -110,7 +110,7 @@ func (fCsv *FileCSVee) Cfg() *config.EventExporterCfg { return fCsv.cfg }
|
||||
|
||||
func (fCsv *FileCSVee) Connect() (_ error) { return }
|
||||
|
||||
func (fCsv *FileCSVee) ExportEvent(_ *context.Context, ev interface{}, _ string) error {
|
||||
func (fCsv *FileCSVee) ExportEvent(_ *context.Context, ev, _ interface{}) error {
|
||||
fCsv.Lock() // make sure that only one event is writen in file at once
|
||||
defer fCsv.Unlock()
|
||||
return fCsv.csvWriter.Write(ev.([]string))
|
||||
@@ -134,6 +134,8 @@ func (fCsv *FileCSVee) Close() (err error) {
|
||||
|
||||
func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage { return fCsv.dc }
|
||||
|
||||
func (fCsv *FileCSVee) ExtraData(ev *utils.CGREvent) interface{} { return nil }
|
||||
|
||||
// Buffers cannot be closed, they just Reset. We implement our struct and used it for writer field in FileCSVee to be available for WriterCloser interface
|
||||
type buffer struct {
|
||||
io.Writer
|
||||
|
||||
@@ -112,7 +112,7 @@ func (fFwv *FileFWVee) Cfg() *config.EventExporterCfg { return fFwv.cfg }
|
||||
|
||||
func (fFwv *FileFWVee) Connect() (_ error) { return }
|
||||
|
||||
func (fFwv *FileFWVee) ExportEvent(_ *context.Context, records interface{}, _ string) (err error) {
|
||||
func (fFwv *FileFWVee) ExportEvent(_ *context.Context, records, _ interface{}) (err error) {
|
||||
fFwv.Lock() // make sure that only one event is writen in file at once
|
||||
defer fFwv.Unlock()
|
||||
for _, record := range records.([]string) {
|
||||
@@ -140,3 +140,5 @@ func (fFwv *FileFWVee) Close() (err error) {
|
||||
}
|
||||
|
||||
func (fFwv *FileFWVee) GetMetrics() *utils.SafeMapStorage { return fFwv.dc }
|
||||
|
||||
func (fFwv *FileFWVee) ExtraData(ev *utils.CGREvent) interface{} { return nil }
|
||||
|
||||
@@ -78,7 +78,7 @@ func (httpEE *HTTPjsonMapEE) Cfg() *config.EventExporterCfg { return httpEE.cfg
|
||||
|
||||
func (httpEE *HTTPjsonMapEE) Connect() (_ error) { return }
|
||||
|
||||
func (httpEE *HTTPjsonMapEE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) {
|
||||
func (httpEE *HTTPjsonMapEE) ExportEvent(ctx *context.Context, content, _ interface{}) (err error) {
|
||||
httpEE.reqs.get()
|
||||
defer httpEE.reqs.done()
|
||||
pReq := content.(*HTTPPosterRequest)
|
||||
@@ -94,6 +94,8 @@ func (httpEE *HTTPjsonMapEE) Close() (_ error) { return }
|
||||
|
||||
func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage { return httpEE.dc }
|
||||
|
||||
func (httpEE *HTTPjsonMapEE) ExtraData(ev *utils.CGREvent) interface{} { return nil }
|
||||
|
||||
func (httpEE *HTTPjsonMapEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) {
|
||||
body, err := json.Marshal(mp.Event)
|
||||
return &HTTPPosterRequest{
|
||||
|
||||
@@ -78,7 +78,7 @@ func (httpPost *HTTPPostEE) Cfg() *config.EventExporterCfg { return httpPost.cfg
|
||||
|
||||
func (httpPost *HTTPPostEE) Connect() (_ error) { return }
|
||||
|
||||
func (httpPost *HTTPPostEE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) {
|
||||
func (httpPost *HTTPPostEE) ExportEvent(ctx *context.Context, content, _ interface{}) (err error) {
|
||||
httpPost.reqs.get()
|
||||
defer httpPost.reqs.done()
|
||||
pReq := content.(*HTTPPosterRequest)
|
||||
@@ -94,6 +94,8 @@ func (httpPost *HTTPPostEE) Close() (_ error) { return }
|
||||
|
||||
func (httpPost *HTTPPostEE) GetMetrics() *utils.SafeMapStorage { return httpPost.dc }
|
||||
|
||||
func (httpPost *HTTPPostEE) ExtraData(ev *utils.CGREvent) interface{} { return nil }
|
||||
|
||||
func (httpPost *HTTPPostEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) {
|
||||
urlVals := url.Values{}
|
||||
for k, v := range mp.Event {
|
||||
|
||||
12
ees/kafka.go
12
ees/kafka.go
@@ -22,6 +22,7 @@ import (
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
kafka "github.com/segmentio/kafka-go"
|
||||
)
|
||||
@@ -72,7 +73,7 @@ func (pstr *KafkaEE) Connect() (_ error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *KafkaEE) ExportEvent(ctx *context.Context, content interface{}, key string) (err error) {
|
||||
func (pstr *KafkaEE) ExportEvent(ctx *context.Context, content interface{}, extraData interface{}) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
if pstr.writer == nil {
|
||||
@@ -80,8 +81,9 @@ func (pstr *KafkaEE) ExportEvent(ctx *context.Context, content interface{}, key
|
||||
pstr.reqs.done()
|
||||
return utils.ErrDisconnected
|
||||
}
|
||||
kafkaKey := extraData.(string)
|
||||
err = pstr.writer.WriteMessages(ctx, kafka.Message{
|
||||
Key: []byte(key),
|
||||
Key: []byte(kafkaKey),
|
||||
Value: content.([]byte),
|
||||
})
|
||||
pstr.RUnlock()
|
||||
@@ -100,3 +102,9 @@ func (pstr *KafkaEE) Close() (err error) {
|
||||
}
|
||||
|
||||
func (pstr *KafkaEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
|
||||
func (pstr *KafkaEE) ExtraData(ev *utils.CGREvent) interface{} {
|
||||
return utils.ConcatenatedKey(
|
||||
utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()),
|
||||
utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaRunID), utils.MetaDefault),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -42,14 +42,15 @@ type LogEE struct {
|
||||
|
||||
func (vEe *LogEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
|
||||
func (vEe *LogEE) Connect() error { return nil }
|
||||
func (vEe *LogEE) ExportEvent(_ *context.Context, mp interface{}, _ string) error {
|
||||
func (vEe *LogEE) ExportEvent(_ *context.Context, mp, _ interface{}) error {
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> <%s> exported: <%s>",
|
||||
utils.EEs, vEe.Cfg().ID, utils.ToJSON(mp)))
|
||||
return nil
|
||||
}
|
||||
func (vEe *LogEE) Close() error { return nil }
|
||||
func (vEe *LogEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
|
||||
func (vEe *LogEE) Close() error { return nil }
|
||||
func (vEe *LogEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
|
||||
func (vEe *LogEE) ExtraData(ev *utils.CGREvent) interface{} { return nil }
|
||||
func (vEe *LogEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) {
|
||||
return mp.Event, nil
|
||||
}
|
||||
|
||||
@@ -94,7 +94,7 @@ func (pstr *NatsEE) Connect() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *NatsEE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) {
|
||||
func (pstr *NatsEE) ExportEvent(ctx *context.Context, content, _ interface{}) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
if pstr.poster == nil {
|
||||
@@ -124,6 +124,8 @@ func (pstr *NatsEE) Close() (err error) {
|
||||
|
||||
func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
|
||||
|
||||
func (pstr *NatsEE) ExtraData(ev *utils.CGREvent) interface{} { return nil }
|
||||
|
||||
func GetNatsOpts(opts *config.EventExporterOpts, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) {
|
||||
nop = make([]nats.Option, 0, 7)
|
||||
nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID),
|
||||
|
||||
@@ -67,7 +67,7 @@ func (e *RPCee) Connect() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (e *RPCee) ExportEvent(ctx *context.Context, args interface{}, _ string) (err error) {
|
||||
func (e *RPCee) ExportEvent(ctx *context.Context, args, _ interface{}) (err error) {
|
||||
e.Lock()
|
||||
defer e.Unlock()
|
||||
var rply string
|
||||
@@ -84,6 +84,7 @@ func (e *RPCee) Close() (err error) {
|
||||
func (e *RPCee) GetMetrics() (mp *utils.SafeMapStorage) {
|
||||
return e.dc
|
||||
}
|
||||
func (e *RPCee) ExtraData(ev *utils.CGREvent) interface{} { return nil }
|
||||
|
||||
func (e *RPCee) PrepareMap(mp *utils.CGREvent) (interface{}, error) {
|
||||
for i, v := range e.Cfg().Opts.RPCAPIOpts {
|
||||
|
||||
13
ees/s3.go
13
ees/s3.go
@@ -29,6 +29,7 @@ import (
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
@@ -112,16 +113,17 @@ func (pstr *S3EE) Connect() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *S3EE) ExportEvent(ctx *context.Context, message interface{}, key string) (err error) {
|
||||
func (pstr *S3EE) ExportEvent(ctx *context.Context, message, extraData interface{}) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
sKey := extraData.(string)
|
||||
_, err = pstr.up.UploadWithContext(ctx, &s3manager.UploadInput{
|
||||
Bucket: aws.String(pstr.bucket),
|
||||
|
||||
// Can also use the `filepath` standard library package to modify the
|
||||
// filename as need for an S3 object key. Such as turning absolute path
|
||||
// to a relative path.
|
||||
Key: aws.String(fmt.Sprintf("%s/%s.json", pstr.folderPath, key)),
|
||||
Key: aws.String(fmt.Sprintf("%s/%s.json", pstr.folderPath, sKey)),
|
||||
|
||||
// The file to be uploaded. io.ReadSeeker is preferred as the Uploader
|
||||
// will be able to optimize memory when uploading large content. io.Reader
|
||||
@@ -137,3 +139,10 @@ func (pstr *S3EE) ExportEvent(ctx *context.Context, message interface{}, key str
|
||||
func (pstr *S3EE) Close() (_ error) { return }
|
||||
|
||||
func (pstr *S3EE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
|
||||
|
||||
func (pstr *S3EE) ExtraData(ev *utils.CGREvent) interface{} {
|
||||
return utils.ConcatenatedKey(
|
||||
utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaOriginID), utils.GenUUID()),
|
||||
utils.FirstNonEmpty(engine.MapEvent(ev.APIOpts).GetStringIgnoreErrors(utils.MetaRunID), utils.MetaDefault),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -132,7 +132,7 @@ func (sqlEe *SQLEe) Connect() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (sqlEe *SQLEe) ExportEvent(_ *context.Context, req interface{}, _ string) error {
|
||||
func (sqlEe *SQLEe) ExportEvent(_ *context.Context, req, _ interface{}) error {
|
||||
sqlEe.reqs.get()
|
||||
sqlEe.RLock()
|
||||
defer func() {
|
||||
@@ -159,6 +159,8 @@ func (sqlEe *SQLEe) Close() (err error) {
|
||||
|
||||
func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc }
|
||||
|
||||
func (sqlEe *SQLEe) ExtraData(ev *utils.CGREvent) interface{} { return nil }
|
||||
|
||||
func (sqlEe *SQLEe) PrepareMap(mp *utils.CGREvent) (interface{}, error) { return nil, nil }
|
||||
|
||||
func (sqlEe *SQLEe) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) {
|
||||
|
||||
@@ -130,7 +130,7 @@ func (pstr *SQSee) Connect() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (pstr *SQSee) ExportEvent(ctx *context.Context, message interface{}, _ string) (err error) {
|
||||
func (pstr *SQSee) ExportEvent(ctx *context.Context, message, _ interface{}) (err error) {
|
||||
pstr.reqs.get()
|
||||
pstr.RLock()
|
||||
_, err = pstr.svc.SendMessageWithContext(ctx,
|
||||
@@ -147,3 +147,5 @@ func (pstr *SQSee) ExportEvent(ctx *context.Context, message interface{}, _ stri
|
||||
func (pstr *SQSee) Close() (_ error) { return }
|
||||
|
||||
func (pstr *SQSee) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
|
||||
|
||||
func (pstr *SQSee) ExtraData(ev *utils.CGREvent) interface{} { return nil }
|
||||
|
||||
@@ -37,12 +37,13 @@ type VirtualEE struct {
|
||||
dc *utils.SafeMapStorage
|
||||
}
|
||||
|
||||
func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
|
||||
func (vEe *VirtualEE) Connect() error { return nil }
|
||||
func (vEe *VirtualEE) ExportEvent(*context.Context, interface{}, string) error { return nil }
|
||||
func (vEe *VirtualEE) Close() error { return nil }
|
||||
func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
|
||||
func (vEe *VirtualEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) { return nil, nil }
|
||||
func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
|
||||
func (vEe *VirtualEE) Connect() error { return nil }
|
||||
func (vEe *VirtualEE) ExportEvent(*context.Context, interface{}, interface{}) error { return nil }
|
||||
func (vEe *VirtualEE) Close() error { return nil }
|
||||
func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
|
||||
func (vEe *VirtualEE) ExtraData(*utils.CGREvent) interface{} { return nil }
|
||||
func (vEe *VirtualEE) PrepareMap(mp *utils.CGREvent) (interface{}, error) { return nil, nil }
|
||||
func (vEe *VirtualEE) PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user