Added EeSV1 APIs back

This commit is contained in:
Trial97
2021-09-06 17:03:34 +03:00
committed by Dan Christian Bogos
parent f147617401
commit a3ebbe38ec
42 changed files with 195 additions and 288 deletions

View File

@@ -63,7 +63,7 @@ func (aL *actHTTPPost) cfg() *engine.APAction {
}
// execute implements actioner interface
func (aL *actHTTPPost) execute(_ *context.Context, data utils.MapStorage, _ string) (err error) {
func (aL *actHTTPPost) execute(ctx *context.Context, data utils.MapStorage, _ string) (err error) {
var body []byte
if body, err = json.Marshal(data); err != nil {
return
@@ -71,8 +71,8 @@ func (aL *actHTTPPost) execute(_ *context.Context, data utils.MapStorage, _ stri
var partExec bool
for _, pstr := range aL.pstrs {
if async, has := aL.cfg().Opts[utils.MetaAsync]; has && utils.IfaceAsString(async) == utils.TrueStr {
go ees.ExportWithAttempts(pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString)
} else if err = ees.ExportWithAttempts(pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString); err != nil {
go ees.ExportWithAttempts(context.Background(), pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString)
} else if err = ees.ExportWithAttempts(ctx, pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString); err != nil {
if pstr.Cfg().FailedPostsDir != utils.MetaNone {
err = nil
} else {

View File

@@ -25,6 +25,7 @@ import (
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -65,7 +66,7 @@ func TestACHTTPPostExecute(t *testing.T) {
log.SetOutput(buff)
expected := `<EEs> Exporter <TEST_ACTION_HTTPPOST> could not export because err: <Post "~*balance.TestBalance.Value": unsupported protocol scheme "">`
if err := http.execute(nil, dataStorage, utils.EmptyString); err != nil {
if err := http.execute(context.Background(), dataStorage, utils.EmptyString); err != nil {
t.Error(err)
} else if rcv := buff.String(); !strings.Contains(rcv, expected) {
t.Errorf("Expected %+v, received %+v", expected, rcv)
@@ -76,7 +77,7 @@ func TestACHTTPPostExecute(t *testing.T) {
// channels cannot be marshaled
dataStorage[utils.MetaReq] = make(chan struct{}, 1)
expected = "json: unsupported type: chan struct {}"
if err := http.execute(nil, dataStorage, utils.EmptyString); err == nil || err.Error() != expected {
if err := http.execute(context.Background(), dataStorage, utils.EmptyString); err == nil || err.Error() != expected {
t.Errorf("Expected %+v, received %+v", expected, err)
}
@@ -88,7 +89,7 @@ func TestACHTTPPostExecute(t *testing.T) {
http.aCfg.Opts = make(map[string]interface{})
http.aCfg.Opts[utils.MetaAsync] = true
http.config.GeneralCfg().FailedPostsDir = utils.MetaNone
if err := http.execute(nil, dataStorage, utils.EmptyString); err != nil {
if err := http.execute(context.Background(), dataStorage, utils.EmptyString); err != nil {
t.Error(err)
}
}
@@ -114,7 +115,7 @@ func TestACHTTPPostValues(t *testing.T) {
},
}
if err := http.execute(nil, dataStorage,
if err := http.execute(context.Background(), dataStorage,
utils.EmptyString); err == nil || err != utils.ErrPartiallyExecuted {
t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err)
}

View File

@@ -89,7 +89,7 @@ func (aL *actCDRLog) execute(ctx *context.Context, data utils.MapStorage, _ stri
}, aL.config.GeneralCfg().DefaultTenant,
aL.filterS, oNm)
if err = cdrLogReq.SetFields(template); err != nil {
if err = cdrLogReq.SetFields(ctx, template); err != nil {
return
}
var rply string

36
apis/ees.go Normal file
View File

@@ -0,0 +1,36 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package apis
import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/ees"
"github.com/cgrates/cgrates/utils"
)
func NewEeSv1(eesV1 *ees.EventExporterS) *EeSv1 {
return &EeSv1{ees: eesV1}
}
// EeSv1 exports RPC from RLs
type EeSv1 struct {
ees *ees.EventExporterS
ping
}
func (cS *EeSv1) ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEeIDs, rply *map[string]map[string]interface{}) error {
return cS.ees.V1ProcessEvent(ctx, cgrEv, rply)
}

View File

@@ -423,6 +423,15 @@
"natsSubject": "processed_cdrs",
}
},
{
"id": "NatsJsonMapExporter2",
"type": "*natsJSONMap",
"export_path": "nats://localhost:4222",
"attempts": 1,
"opts": {
"natsSubject": "processed_cdrs",
}
},
{
"id": "SQLExportCDR",
"type": "*sql",

View File

@@ -65,120 +65,6 @@ func TestDspCDRsV1PingNilError(t *testing.T) {
}
}
func TestDspCDRsV1GetCDRsError(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"}
CGREvent := &utils.RPCCDRsFilterWithAPIOpts{}
var reply *[]*engine.CDR
result := dspSrv.CDRsV1GetCDRs(CGREvent, reply)
expected := "MANDATORY_IE_MISSING: [ApiKey]"
if result == nil || result.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspCDRsV1GetCDRsNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
CGREvent := &utils.RPCCDRsFilterWithAPIOpts{
Tenant: "tenant",
}
var reply *[]*engine.CDR
result := dspSrv.CDRsV1GetCDRs(CGREvent, reply)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if result == nil || result.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspCDRsV1GetCDRsCountError(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"}
CGREvent := &utils.RPCCDRsFilterWithAPIOpts{}
var reply *int64
result := dspSrv.CDRsV1GetCDRsCount(CGREvent, reply)
expected := "MANDATORY_IE_MISSING: [ApiKey]"
if result == nil || result.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspCDRsV1GetCDRsCountNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
CGREvent := &utils.RPCCDRsFilterWithAPIOpts{
Tenant: "tenant",
}
var reply *int64
result := dspSrv.CDRsV1GetCDRsCount(CGREvent, reply)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if result == nil || result.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspCDRsV1RateCDRsError(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"}
CGREvent := &engine.ArgRateCDRs{}
var reply *string
result := dspSrv.CDRsV1RateCDRs(CGREvent, reply)
expected := "MANDATORY_IE_MISSING: [ApiKey]"
if result == nil || result.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspCDRsV1RateCDRsNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
CGREvent := &engine.ArgRateCDRs{
Tenant: "tenant",
}
var reply *string
result := dspSrv.CDRsV1RateCDRs(CGREvent, reply)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if result == nil || result.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspCDRsV1ProcessExternalCDRError(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"}
CGREvent := &engine.ExternalCDRWithAPIOpts{
ExternalCDR: &engine.ExternalCDR{
Tenant: "tenant",
},
}
var reply *string
result := dspSrv.CDRsV1ProcessExternalCDR(CGREvent, reply)
expected := "MANDATORY_IE_MISSING: [ApiKey]"
if result == nil || result.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspCDRsV1ProcessExternalCDRNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
CGREvent := &engine.ExternalCDRWithAPIOpts{
ExternalCDR: &engine.ExternalCDR{
Tenant: "tenant",
},
}
var reply *string
result := dspSrv.CDRsV1ProcessExternalCDR(CGREvent, reply)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if result == nil || result.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspCDRsV1ProcessEventError(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
@@ -211,40 +97,6 @@ func TestDspCDRsV1ProcessEventNil(t *testing.T) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspCDRsV1ProcessCDRError(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
cgrCfg.DispatcherSCfg().AttributeSConns = []string{"test"}
CGREvent := &engine.CDRWithAPIOpts{
CDR: &engine.CDR{
Tenant: "tenant",
},
}
var reply *string
result := dspSrv.CDRsV1ProcessCDR(CGREvent, reply)
expected := "MANDATORY_IE_MISSING: [ApiKey]"
if result == nil || result.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspCDRsV1ProcessCDRNil(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)
CGREvent := &engine.CDRWithAPIOpts{
CDR: &engine.CDR{
Tenant: "tenant",
},
}
var reply *string
result := dspSrv.CDRsV1ProcessCDR(CGREvent, reply)
expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION"
if result == nil || result.Error() != expected {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestDspCDRsV2ProcessEventError(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
dspSrv := NewDispatcherService(nil, cgrCfg, nil, nil)

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/streadway/amqp"
@@ -136,7 +137,7 @@ func (pstr *AMQPee) Connect() (err error) {
return
}
func (pstr *AMQPee) ExportEvent(content interface{}, _ string) (err error) {
func (pstr *AMQPee) ExportEvent(_ *context.Context, content interface{}, _ string) (err error) {
pstr.reqs.get()
pstr.RLock()
if pstr.postChan == nil {

View File

@@ -19,10 +19,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"context"
"sync"
amqpv1 "github.com/Azure/go-amqp"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -78,7 +78,7 @@ func (pstr *AMQPv1EE) Connect() (err error) {
return
}
func (pstr *AMQPv1EE) ExportEvent(content interface{}, _ string) (err error) {
func (pstr *AMQPv1EE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) {
pstr.reqs.get()
pstr.RLock()
defer func() {
@@ -95,7 +95,6 @@ func (pstr *AMQPv1EE) ExportEvent(content interface{}, _ string) (err error) {
return
}
// Send message
ctx := context.Background()
err = sender.Send(ctx, amqpv1.NewMessage(content.([]byte)))
sender.Close(ctx)
return

View File

@@ -24,17 +24,18 @@ import (
"strings"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
type EventExporter interface {
Cfg() *config.EventExporterCfg // return the config
Connect() error // called before exporting an event to make sure it is connected
ExportEvent(interface{}, string) error // called on each event to be exported
Close() error // called when the exporter needs to terminate
GetMetrics() *utils.SafeMapStorage // called to get metrics
Cfg() *config.EventExporterCfg // return the config
Connect() error // called before exporting an event to make sure it is connected
ExportEvent(*context.Context, interface{}, string) error // called on each event to be exported
Close() error // called when the exporter needs to terminate
GetMetrics() *utils.SafeMapStorage // called to get metrics
PrepareMap(map[string]interface{}) (interface{}, error)
PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error)
}
@@ -110,26 +111,13 @@ func (c *concReq) done() {
}
// composeHeaderTrailer will return the orderNM for *hdr or *trl
func composeHeaderTrailer(prfx string, fields []*config.FCTemplate, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) {
func composeHeaderTrailer(ctx *context.Context, prfx string, fields []*config.FCTemplate, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) {
r = utils.NewOrderedNavigableMap()
err = engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: dc,
utils.MetaCfg: cfg.GetDataProvider(),
}, cfg.GeneralCfg().DefaultTenant, fltS,
map[string]*utils.OrderedNavigableMap{prfx: r}).SetFields(fields)
return
}
func composeExp(fields []*config.FCTemplate, cgrEv *utils.CGREvent, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) {
r = utils.NewOrderedNavigableMap()
err = engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: dc,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: cfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, cfg.GeneralCfg().DefaultTenant),
fltS,
map[string]*utils.OrderedNavigableMap{utils.MetaExp: r}).SetFields(fields)
map[string]*utils.OrderedNavigableMap{prfx: r}).SetFields(ctx, fields)
return
}

View File

@@ -98,18 +98,21 @@ func (eeS *EventExporterS) setupCache(chCfgs map[string]*config.CacheParamCfg) {
eeS.eesMux.Unlock()
}
func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []string, ctx string) (err error) {
func (eeS *EventExporterS) attrSProcessEvent(ctx *context.Context, cgrEv *utils.CGREvent, attrIDs []string, attributeSCtx string) (err error) {
var rplyEv engine.AttrSProcessEventReply
if cgrEv.APIOpts == nil {
cgrEv.APIOpts = make(map[string]interface{})
}
cgrEv.APIOpts[utils.Subsys] = utils.MetaEEs
cgrEv.APIOpts[utils.OptsContext] = ctx
cgrEv.APIOpts[utils.OptsContext] = utils.FirstNonEmpty(
attributeSCtx,
utils.IfaceAsString(cgrEv.APIOpts[utils.OptsContext]),
utils.MetaEEs)
attrArgs := &engine.AttrArgsProcessEvent{
AttributeIDs: attrIDs,
CGREvent: cgrEv,
}
if err = eeS.connMgr.Call(context.TODO(),
if err = eeS.connMgr.Call(ctx,
eeS.cfg.EEsNoLksCfg().AttributeSConns,
utils.AttributeSv1ProcessEvent,
attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 {
@@ -122,7 +125,7 @@ func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []st
// V1ProcessEvent will be called each time a new event is received from readers
// rply -> map[string]map[string]interface{}
func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *map[string]map[string]interface{}) (err error) {
func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEeIDs, rply *map[string]map[string]interface{}) (err error) {
eeS.cfg.RLocks(config.EEsJSON)
defer eeS.cfg.RUnlocks(config.EEsJSON)
@@ -146,7 +149,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
if len(eeCfg.Filters) != 0 {
tnt := utils.FirstNonEmpty(cgrEv.Tenant, eeS.cfg.GeneralCfg().DefaultTenant)
if pass, errPass := eeS.filterS.Pass(context.TODO(), tnt,
if pass, errPass := eeS.filterS.Pass(ctx, tnt,
eeCfg.Filters, cgrDp); errPass != nil {
return errPass
} else if !pass {
@@ -155,12 +158,8 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
}
if eeCfg.Flags.GetBool(utils.MetaAttributes) {
if err = eeS.attrSProcessEvent(
cgrEv.CGREvent,
eeCfg.AttributeSIDs, utils.FirstNonEmpty(
eeCfg.AttributeSCtx,
utils.IfaceAsString(cgrEv.APIOpts[utils.OptsContext]),
utils.MetaEEs)); err != nil {
if err = eeS.attrSProcessEvent(ctx, cgrEv.CGREvent,
eeCfg.AttributeSIDs, eeCfg.AttributeSCtx); err != nil {
return
}
}
@@ -188,9 +187,11 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
metricMapLock.Lock()
metricsMap[ee.Cfg().ID] = utils.MapStorage{} // will return the ID for all processed exporters
metricMapLock.Unlock()
ctx := ctx
if eeCfg.Synchronous {
wg.Add(1) // wait for synchronous or file ones since these need to be done before continuing
} else {
ctx = context.Background() // is async so lose the API context
}
// log the message before starting the gorutine, but still execute the exporter
if hasVerbose && !eeCfg.Synchronous {
@@ -199,7 +200,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
utils.EEs, ee.Cfg().ID))
}
go func(evict, sync bool, ee EventExporter) {
if err := exportEventWithExporter(ee, cgrEv.CGREvent, evict, eeS.cfg, eeS.filterS); err != nil {
if err := exportEventWithExporter(ctx, ee, cgrEv.CGREvent, evict, eeS.cfg, eeS.filterS); err != nil {
withErr = true
}
if sync {
@@ -243,7 +244,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
return
}
func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS) (err error) {
func exportEventWithExporter(ctx *context.Context, exp EventExporter, ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS) (err error) {
defer func() {
updateEEMetrics(exp.GetMetrics(), ev.ID, ev.Event, err != nil, utils.FirstNonEmpty(exp.Cfg().Timezone,
cfg.GeneralCfg().DefaultTimezone))
@@ -269,7 +270,7 @@ func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool
utils.MetaCfg: cfg.GetDataProvider(),
}, utils.FirstNonEmpty(ev.Tenant, cfg.GeneralCfg().DefaultTenant),
filterS,
map[string]*utils.OrderedNavigableMap{utils.MetaExp: expNM}).SetFields(exp.Cfg().ContentFields())
map[string]*utils.OrderedNavigableMap{utils.MetaExp: expNM}).SetFields(ctx, exp.Cfg().ContentFields())
if eEv, err = exp.PrepareOrderMap(expNM); err != nil {
return
}
@@ -277,10 +278,10 @@ func exportEventWithExporter(exp EventExporter, ev *utils.CGREvent, oneTime bool
key := utils.ConcatenatedKey(utils.FirstNonEmpty(engine.MapEvent(ev.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID()),
utils.FirstNonEmpty(engine.MapEvent(ev.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault))
return ExportWithAttempts(exp, eEv, key)
return ExportWithAttempts(ctx, exp, eEv, key)
}
func ExportWithAttempts(exp EventExporter, eEv interface{}, key string) (err error) {
func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key string) (err error) {
if exp.Cfg().FailedPostsDir != utils.MetaNone {
defer func() {
if err != nil {
@@ -307,7 +308,7 @@ func ExportWithAttempts(exp EventExporter, eEv interface{}, key string) (err err
return
}
for i := 0; i < exp.Cfg().Attempts; i++ {
if err = exp.ExportEvent(eEv, key); err == nil ||
if err = exp.ExportEvent(ctx, eEv, key); err == nil ||
err == utils.ErrDisconnected { // special error in case the exporter was closed
break
}

View File

@@ -115,7 +115,7 @@ func TestAttrSProcessEvent(t *testing.T) {
})
eeS := NewEventExporterS(cgrCfg, filterS, connMgr)
// cgrEv := &utils.CGREvent{}
if err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil {
if err := eeS.attrSProcessEvent(context.TODO(), cgrEv, []string{}, utils.EmptyString); err != nil {
t.Error(err)
}
}
@@ -141,7 +141,7 @@ func TestAttrSProcessEvent2(t *testing.T) {
})
eeS := NewEventExporterS(cgrCfg, filterS, connMgr)
cgrEv := &utils.CGREvent{}
if err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil {
if err := eeS.attrSProcessEvent(context.TODO(), cgrEv, []string{}, utils.EmptyString); err != nil {
t.Error(err)
}
}
@@ -189,7 +189,7 @@ func TestV1ProcessEvent(t *testing.T) {
rplyExpect := map[string]map[string]interface{}{
"SQLExporterFull": {},
}
if err := eeS.V1ProcessEvent(cgrEv, &rply); err != nil {
if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(rply, rplyExpect) {
t.Errorf("Expected %q but received %q", rplyExpect, rply)
@@ -225,13 +225,13 @@ func TestV1ProcessEvent2(t *testing.T) {
}
var rply map[string]map[string]interface{}
errExpect := "NOT_FOUND"
if err := eeS.V1ProcessEvent(cgrEv, &rply); err == nil || err.Error() != errExpect {
if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err == nil || err.Error() != errExpect {
t.Errorf("Expecting %q but received %q", errExpect, err)
}
errExpect = "NOT_FOUND:test"
eeS.cfg.EEsCfg().Exporters[0].Filters = []string{"test"}
if err := eeS.V1ProcessEvent(cgrEv, &rply); err == nil || err.Error() != errExpect {
if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err == nil || err.Error() != errExpect {
t.Errorf("Expecting %q but received %q", errExpect, err)
}
}
@@ -257,7 +257,7 @@ func TestV1ProcessEvent3(t *testing.T) {
}
var rply map[string]map[string]interface{}
errExpect := "MANDATORY_IE_MISSING: [connIDs]"
if err := eeS.V1ProcessEvent(cgrEv, &rply); err == nil || err.Error() != errExpect {
if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err == nil || err.Error() != errExpect {
t.Errorf("Expecting %q but received %q", errExpect, err)
}
}
@@ -293,7 +293,7 @@ func TestV1ProcessEvent4(t *testing.T) {
}
var rply map[string]map[string]interface{}
errExpect := "PARTIALLY_EXECUTED"
if err := eeS.V1ProcessEvent(cgrEv, &rply); err == nil || err.Error() != errExpect {
if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err == nil || err.Error() != errExpect {
t.Errorf("Expecting %q but received %q", errExpect, err)
} else if len(rply) != 0 {
t.Error("Unexpected reply result")
@@ -318,9 +318,9 @@ func (m mockEventExporter) GetMetrics() *utils.SafeMapStorage {
return m.dc
}
func (mockEventExporter) Cfg() *config.EventExporterCfg { return new(config.EventExporterCfg) }
func (mockEventExporter) Connect() error { return nil }
func (mockEventExporter) ExportEvent(interface{}, string) error { return nil }
func (mockEventExporter) Cfg() *config.EventExporterCfg { return new(config.EventExporterCfg) }
func (mockEventExporter) Connect() error { return nil }
func (mockEventExporter) ExportEvent(*context.Context, interface{}, string) error { return nil }
func (mockEventExporter) Close() error {
utils.Logger.Warning("NOT IMPLEMENTED")
return nil
@@ -354,7 +354,7 @@ func TestV1ProcessEventMockMetrics(t *testing.T) {
}
var rply map[string]map[string]interface{}
errExpect := "cannot cast to map[string]interface{} 5 for positive exports"
if err := eeS.V1ProcessEvent(cgrEv, &rply); err == nil || err.Error() != errExpect {
if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err == nil || err.Error() != errExpect {
t.Errorf("Expecting %q but received %q", errExpect, err)
}
}
@@ -386,7 +386,7 @@ func TestV1ProcessEvent5(t *testing.T) {
eeS := NewEventExporterS(cgrCfg, filterS, nil)
var rply map[string]map[string]interface{}
errExpect := "unsupported exporter type: <invalid_type>"
if err := eeS.V1ProcessEvent(cgrEv, &rply); err == nil || err.Error() != errExpect {
if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err == nil || err.Error() != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
@@ -411,7 +411,7 @@ func TestV1ProcessEvent6(t *testing.T) {
},
}
var rply map[string]map[string]interface{}
if err := eeS.V1ProcessEvent(cgrEv, &rply); err != nil {
if err := eeS.V1ProcessEvent(context.TODO(), cgrEv, &rply); err != nil {
t.Error(err)
}
}

View File

@@ -20,7 +20,6 @@ package ees
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
@@ -28,6 +27,7 @@ import (
"github.com/elastic/go-elasticsearch/esapi"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
elasticsearch "github.com/elastic/go-elasticsearch"
@@ -120,7 +120,7 @@ func (eEe *ElasticEE) Connect() (err error) {
}
// ExportEvent implements EventExporter
func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) {
func (eEe *ElasticEE) ExportEvent(ctx *context.Context, ev interface{}, key string) (err error) {
eEe.reqs.get()
eEe.RLock()
defer func() {
@@ -148,7 +148,7 @@ func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) {
}
var resp *esapi.Response
if resp, err = eReq.Do(context.Background(), eEe.eClnt); err != nil {
if resp, err = eReq.Do(ctx, eEe.eClnt); err != nil {
return
}
defer resp.Body.Close()

View File

@@ -24,6 +24,7 @@ import (
"reflect"
"testing"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -261,7 +262,7 @@ func TestElasticExportEvent(t *testing.T) {
t.Error(err)
}
eEe.eClnt.Transport = new(mockClientErr)
if err := eEe.ExportEvent([]byte{}, ""); err != nil {
if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err != nil {
t.Error(err)
}
}
@@ -293,7 +294,7 @@ func TestElasticExportEvent2(t *testing.T) {
eEe.eClnt.Transport = new(mockClientErr2)
errExpect := io.EOF
if err := eEe.ExportEvent([]byte{}, ""); err == nil || err != errExpect {
if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err != errExpect {
t.Errorf("Expected %v but received %v", errExpect, err)
}
}
@@ -324,7 +325,7 @@ func TestElasticExportEvent3(t *testing.T) {
eEe.eClnt.Transport = new(mockClient)
// errExpect := `unsupported protocol scheme ""`
cgrCfg.EEsCfg().Exporters[0].ComputeFields()
if err := eEe.ExportEvent([]byte{}, ""); err != nil {
if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err != nil {
t.Error(err)
}
}
@@ -343,7 +344,7 @@ func TestElasticExportEvent4(t *testing.T) {
t.Error(err)
}
errExpect := `unsupported protocol scheme ""`
if err := eEe.ExportEvent([]byte{}, ""); err == nil || err.Error() != errExpect {
if err := eEe.ExportEvent(context.Background(), []byte{}, ""); err == nil || err.Error() != errExpect {
t.Errorf("Expected %q but got %q", errExpect, err)
}
}

View File

@@ -26,6 +26,7 @@ import (
"path"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/config"
@@ -83,7 +84,7 @@ func (fCsv *FileCSVee) init() (err error) {
func (fCsv *FileCSVee) composeHeader() (err error) {
if len(fCsv.Cfg().HeaderFields()) != 0 {
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaHdr, fCsv.Cfg().HeaderFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil {
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, fCsv.Cfg().HeaderFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil {
return
}
return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings())
@@ -95,7 +96,7 @@ func (fCsv *FileCSVee) composeHeader() (err error) {
func (fCsv *FileCSVee) composeTrailer() (err error) {
if len(fCsv.Cfg().TrailerFields()) != 0 {
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaTrl, fCsv.Cfg().TrailerFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil {
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaTrl, fCsv.Cfg().TrailerFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil {
return
}
return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings())
@@ -107,7 +108,7 @@ func (fCsv *FileCSVee) Cfg() *config.EventExporterCfg { return fCsv.cfg }
func (fCsv *FileCSVee) Connect() (_ error) { return }
func (fCsv *FileCSVee) ExportEvent(ev interface{}, _ string) error {
func (fCsv *FileCSVee) ExportEvent(_ *context.Context, ev interface{}, _ string) error {
fCsv.Lock() // make sure that only one event is writen in file at once
defer fCsv.Unlock()
return fCsv.csvWriter.Write(ev.([]string))

View File

@@ -25,6 +25,7 @@ import (
"reflect"
"testing"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -200,7 +201,7 @@ func TestFileCsvExportEvent(t *testing.T) {
dc: dc,
}
if err := fCsv.ExportEvent([]string{"value", "3"}, ""); err != nil {
if err := fCsv.ExportEvent(context.Background(), []string{"value", "3"}, ""); err != nil {
t.Error(err)
}
csvNW.Flush()

View File

@@ -25,6 +25,7 @@ import (
"path"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -75,7 +76,7 @@ func (fFwv *FileFWVee) composeHeader() (err error) {
return
}
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaHdr, fFwv.Cfg().HeaderFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil {
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, fFwv.Cfg().HeaderFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil {
return
}
for _, record := range exp.OrderedFieldsAsStrings() {
@@ -93,7 +94,7 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
return
}
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaTrl, fFwv.Cfg().TrailerFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil {
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaTrl, fFwv.Cfg().TrailerFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil {
return
}
for _, record := range exp.OrderedFieldsAsStrings() {
@@ -109,7 +110,7 @@ func (fFwv *FileFWVee) Cfg() *config.EventExporterCfg { return fFwv.cfg }
func (fFwv *FileFWVee) Connect() (_ error) { return }
func (fFwv *FileFWVee) ExportEvent(records interface{}, _ string) (err error) {
func (fFwv *FileFWVee) ExportEvent(_ *context.Context, records interface{}, _ string) (err error) {
fFwv.Lock() // make sure that only one event is writen in file at once
defer fFwv.Unlock()
for _, record := range records.([]string) {

View File

@@ -25,6 +25,7 @@ import (
"reflect"
"testing"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -184,7 +185,7 @@ func TestFileFwvExportEvent(t *testing.T) {
file: nopCloser{byteBuff},
dc: dc,
}
if err := fFwv.ExportEvent([]string{"value", "3"}, ""); err != nil {
if err := fFwv.ExportEvent(context.Background(), []string{"value", "3"}, ""); err != nil {
t.Error(err)
}
csvNW.Flush()
@@ -220,7 +221,7 @@ func TestFileFwvExportEventWriteError(t *testing.T) {
file: nopCloserWrite{byteBuff},
dc: dc,
}
if err := fFwv.ExportEvent([]string{""}, ""); err == nil || err != utils.ErrNotImplemented {
if err := fFwv.ExportEvent(context.Background(), []string{""}, ""); err == nil || err != utils.ErrNotImplemented {
t.Errorf("Expected %q but received %q", utils.ErrNotImplemented, err)
}
}

View File

@@ -27,6 +27,7 @@ import (
"net/url"
"strings"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -61,7 +62,7 @@ func (httpEE *HTTPjsonMapEE) composeHeader(cgrCfg *config.CGRConfig, filterS *en
return
}
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaHdr, httpEE.Cfg().HeaderFields(), httpEE.dc, cgrCfg, filterS); err != nil {
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, httpEE.Cfg().HeaderFields(), httpEE.dc, cgrCfg, filterS); err != nil {
return
}
for el := exp.GetFirstElement(); el != nil; el = el.Next() {
@@ -77,12 +78,12 @@ func (httpEE *HTTPjsonMapEE) Cfg() *config.EventExporterCfg { return httpEE.cfg
func (httpEE *HTTPjsonMapEE) Connect() (_ error) { return }
func (httpEE *HTTPjsonMapEE) ExportEvent(content interface{}, _ string) (err error) {
func (httpEE *HTTPjsonMapEE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) {
httpEE.reqs.get()
defer httpEE.reqs.done()
pReq := content.(*HTTPPosterRequest)
var req *http.Request
if req, err = prepareRequest(httpEE.Cfg().ExportPath, utils.ContentJSON, pReq.Body, pReq.Header); err != nil {
if req, err = prepareRequest(ctx, httpEE.Cfg().ExportPath, utils.ContentJSON, pReq.Body, pReq.Header); err != nil {
return
}
_, err = sendHTTPReq(httpEE.client, req)
@@ -116,7 +117,7 @@ func (httpEE *HTTPjsonMapEE) PrepareOrderMap(mp *utils.OrderedNavigableMap) (int
}, err
}
func prepareRequest(addr, cType string, content interface{}, hdr http.Header) (req *http.Request, err error) {
func prepareRequest(ctx *context.Context, addr, cType string, content interface{}, hdr http.Header) (req *http.Request, err error) {
var body io.Reader
if cType == utils.ContentForm {
body = strings.NewReader(content.(url.Values).Encode())
@@ -128,7 +129,7 @@ func prepareRequest(addr, cType string, content interface{}, hdr http.Header) (r
contentType = "application/json"
}
hdr.Set("Content-Type", contentType)
if req, err = http.NewRequest(http.MethodPost, addr, body); err != nil {
if req, err = http.NewRequestWithContext(ctx, http.MethodPost, addr, body); err != nil {
return
}
req.Header = hdr

View File

@@ -27,6 +27,7 @@ import (
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -54,7 +55,7 @@ func TestHttpJsonMapExportEvent1(t *testing.T) {
t.Error(err)
}
errExpect := `Post "/var/spool/cgrates/ees": unsupported protocol scheme ""`
if err := httpEE.ExportEvent(&HTTPPosterRequest{Body: []byte{}, Header: make(http.Header)}, ""); err == nil || err.Error() != errExpect {
if err := httpEE.ExportEvent(context.Background(), &HTTPPosterRequest{Body: []byte{}, Header: make(http.Header)}, ""); err == nil || err.Error() != errExpect {
t.Errorf("Expected %q but received %q", errExpect, err)
}
}
@@ -84,7 +85,7 @@ func TestHttpJsonMapExportEvent2(t *testing.T) {
t.Error(err)
}
if err := httpEE.ExportEvent(&HTTPPosterRequest{Body: []byte(`{"2": "*req.field2"}`), Header: make(http.Header)}, ""); err != nil {
if err := httpEE.ExportEvent(context.Background(), &HTTPPosterRequest{Body: []byte(`{"2": "*req.field2"}`), Header: make(http.Header)}, ""); err != nil {
t.Error(err)
}
}
@@ -119,7 +120,7 @@ func TestHttpJsonMapSync(t *testing.T) {
}
for i := 0; i < 3; i++ {
go exp.ExportEvent(&HTTPPosterRequest{Body: []byte(`{"2": "*req.field2"}`), Header: make(http.Header)}, "")
go exp.ExportEvent(context.Background(), &HTTPPosterRequest{Body: []byte(`{"2": "*req.field2"}`), Header: make(http.Header)}, "")
}
select {
@@ -160,7 +161,7 @@ func TestHttpJsonMapSyncLimit(t *testing.T) {
}
for i := 0; i < 3; i++ {
go exp.ExportEvent(&HTTPPosterRequest{Body: []byte(`{"2": "*req.field2"}`), Header: make(http.Header)}, "")
go exp.ExportEvent(context.Background(), &HTTPPosterRequest{Body: []byte(`{"2": "*req.field2"}`), Header: make(http.Header)}, "")
}
select {

View File

@@ -23,6 +23,7 @@ import (
"net/url"
"strings"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -61,7 +62,7 @@ func (httpPost *HTTPPostEE) composeHeader(cgrCfg *config.CGRConfig, filterS *eng
return
}
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaHdr, httpPost.Cfg().HeaderFields(), httpPost.dc, cgrCfg, filterS); err != nil {
if exp, err = composeHeaderTrailer(context.Background(), utils.MetaHdr, httpPost.Cfg().HeaderFields(), httpPost.dc, cgrCfg, filterS); err != nil {
return
}
for el := exp.GetFirstElement(); el != nil; el = el.Next() {
@@ -77,12 +78,12 @@ func (httpPost *HTTPPostEE) Cfg() *config.EventExporterCfg { return httpPost.cfg
func (httpPost *HTTPPostEE) Connect() (_ error) { return }
func (httpPost *HTTPPostEE) ExportEvent(content interface{}, _ string) (err error) {
func (httpPost *HTTPPostEE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) {
httpPost.reqs.get()
defer httpPost.reqs.done()
pReq := content.(*HTTPPosterRequest)
var req *http.Request
if req, err = prepareRequest(httpPost.Cfg().ExportPath, utils.ContentForm, pReq.Body, pReq.Header); err != nil {
if req, err = prepareRequest(ctx, httpPost.Cfg().ExportPath, utils.ContentForm, pReq.Body, pReq.Header); err != nil {
return
}
_, err = sendHTTPReq(httpPost.client, req)

View File

@@ -28,6 +28,7 @@ import (
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -61,7 +62,7 @@ func TestHttpPostExportEvent(t *testing.T) {
"Test1": 3,
}
errExpect := `Post "/var/spool/cgrates/ees": unsupported protocol scheme ""`
if err := httpPost.ExportEvent(&HTTPPosterRequest{Body: url.Values{}, Header: make(http.Header)}, ""); err == nil || err.Error() != errExpect {
if err := httpPost.ExportEvent(context.Background(), &HTTPPosterRequest{Body: url.Values{}, Header: make(http.Header)}, ""); err == nil || err.Error() != errExpect {
t.Errorf("Expected %q but received %q", errExpect, err)
}
}
@@ -91,7 +92,7 @@ func TestHttpPostExportEvent2(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := httpPost.ExportEvent(vals, ""); err != nil {
if err := httpPost.ExportEvent(context.Background(), vals, ""); err != nil {
t.Error(err)
}
}
@@ -135,7 +136,7 @@ func TestHttpPostSync(t *testing.T) {
}
for i := 0; i < 3; i++ {
go exp.ExportEvent(vals, "")
go exp.ExportEvent(context.Background(), vals, "")
}
select {
@@ -188,7 +189,7 @@ func TestHttpPostSyncLimit(t *testing.T) {
}
for i := 0; i < 3; i++ {
go exp.ExportEvent(vals, "")
go exp.ExportEvent(context.Background(), vals, "")
}
select {
case <-test:

View File

@@ -18,9 +18,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"context"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
kafka "github.com/segmentio/kafka-go"
@@ -72,7 +72,7 @@ func (pstr *KafkaEE) Connect() (_ error) {
return
}
func (pstr *KafkaEE) ExportEvent(content interface{}, key string) (err error) {
func (pstr *KafkaEE) ExportEvent(ctx *context.Context, content interface{}, key string) (err error) {
pstr.reqs.get()
pstr.RLock()
if pstr.writer == nil {
@@ -80,7 +80,7 @@ func (pstr *KafkaEE) ExportEvent(content interface{}, key string) (err error) {
pstr.reqs.done()
return utils.ErrDisconnected
}
err = pstr.writer.WriteMessages(context.Background(), kafka.Message{
err = pstr.writer.WriteMessages(ctx, kafka.Message{
Key: []byte(key),
Value: content.([]byte),
})

View File

@@ -173,7 +173,7 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export
keyFunc = utils.UUIDSha1Prefix
}
for _, ev := range expEv.Events {
if err = ExportWithAttempts(ee, ev, keyFunc()); err != nil {
if err = ExportWithAttempts(context.Background(), ee, ev, keyFunc()); err != nil {
failedEvents.AddEvent(ev)
}
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"strings"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -41,7 +42,7 @@ type LogEE struct {
func (vEe *LogEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
func (vEe *LogEE) Connect() error { return nil }
func (vEe *LogEE) ExportEvent(mp interface{}, _ string) error {
func (vEe *LogEE) ExportEvent(_ *context.Context, mp interface{}, _ string) error {
utils.Logger.Info(
fmt.Sprintf("<%s> <%s> exported: <%s>",
utils.EEs, vEe.Cfg().ID, utils.ToJSON(mp)))

View File

@@ -26,6 +26,7 @@ import (
"sync"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/nats-io/nats.go"
@@ -99,7 +100,7 @@ func (pstr *NatsEE) Connect() (err error) {
return
}
func (pstr *NatsEE) ExportEvent(content interface{}, _ string) (err error) {
func (pstr *NatsEE) ExportEvent(ctx *context.Context, content interface{}, _ string) (err error) {
pstr.reqs.get()
pstr.RLock()
if pstr.poster == nil {
@@ -108,7 +109,7 @@ func (pstr *NatsEE) ExportEvent(content interface{}, _ string) (err error) {
return utils.ErrDisconnected
}
if pstr.jetStream {
_, err = pstr.posterJS.Publish(pstr.subject, content.([]byte))
_, err = pstr.posterJS.Publish(pstr.subject, content.([]byte), nats.Context(ctx))
} else {
err = pstr.poster.Publish(pstr.subject, content.([]byte))
}

View File

@@ -27,6 +27,7 @@ import (
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -105,7 +106,7 @@ func TestNatsEE(t *testing.T) {
"Destination": "1002",
},
}
if err := exportEventWithExporter(evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil {
if err := exportEventWithExporter(context.Background(), evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil {
t.Fatal(err)
}
testCleanDirectory(t)
@@ -171,7 +172,7 @@ func TestNatsEE2(t *testing.T) {
"Destination": "1002",
},
}
if err := exportEventWithExporter(evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil {
if err := exportEventWithExporter(context.Background(), evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil {
t.Fatal(err)
}
testCleanDirectory(t)

View File

@@ -21,7 +21,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"context"
"encoding/json"
"flag"
"net/http"
@@ -38,6 +37,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -76,7 +76,7 @@ func TestHttpJsonPoster(t *testing.T) {
if err != nil {
t.Error(err)
}
if err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: jsn, Header: make(http.Header)}, ""); err == nil {
if err = ExportWithAttempts(context.Background(), pstr, &HTTPPosterRequest{Body: jsn, Header: make(http.Header)}, ""); err == nil {
t.Error("Expected error")
}
AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.MetaHTTPjsonMap, "test1", jsn, make(map[string]interface{}))
@@ -112,7 +112,7 @@ func TestHttpBytesPoster(t *testing.T) {
if err != nil {
t.Error(err)
}
if err = ExportWithAttempts(pstr, &HTTPPosterRequest{Body: content, Header: make(http.Header)}, ""); err == nil {
if err = ExportWithAttempts(context.Background(), pstr, &HTTPPosterRequest{Body: content, Header: make(http.Header)}, ""); err == nil {
t.Error("Expected error")
}
AddFailedPost("/tmp", "http://localhost:8080/invalid", utils.ContentJSON, "test2", content, make(map[string]interface{}))
@@ -166,7 +166,7 @@ func TestSQSPoster(t *testing.T) {
Attempts: 5,
Opts: opts,
}, nil)
if err := ExportWithAttempts(pstr, []byte(body), ""); err != nil {
if err := ExportWithAttempts(context.Background(), pstr, []byte(body), ""); err != nil {
t.Fatal(err)
}
@@ -249,7 +249,7 @@ func TestS3Poster(t *testing.T) {
Attempts: 5,
Opts: opts,
}, nil)
if err := ExportWithAttempts(pstr, []byte(body), key); err != nil {
if err := ExportWithAttempts(context.Background(), pstr, []byte(body), key); err != nil {
t.Fatal(err)
}
key += ".json"
@@ -308,7 +308,7 @@ func TestAMQPv1Poster(t *testing.T) {
Attempts: 5,
Opts: opts,
}, nil)
if err := ExportWithAttempts(pstr, []byte(body), ""); err != nil {
if err := ExportWithAttempts(context.Background(), pstr, []byte(body), ""); err != nil {
t.Fatal(err)
}
// Create client

View File

@@ -27,6 +27,7 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -111,10 +112,10 @@ func (pstr *S3EE) Connect() (err error) {
return
}
func (pstr *S3EE) ExportEvent(message interface{}, key string) (err error) {
func (pstr *S3EE) ExportEvent(ctx *context.Context, message interface{}, key string) (err error) {
pstr.reqs.get()
pstr.RLock()
_, err = pstr.up.Upload(&s3manager.UploadInput{
_, err = pstr.up.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(pstr.bucket),
// Can also use the `filepath` standard library package to modify the

View File

@@ -29,6 +29,7 @@ import (
"gorm.io/driver/postgres"
"gorm.io/gorm"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -142,7 +143,7 @@ func (sqlEe *SQLEe) Connect() (err error) {
return
}
func (sqlEe *SQLEe) ExportEvent(req interface{}, _ string) error {
func (sqlEe *SQLEe) ExportEvent(_ *context.Context, req interface{}, _ string) error {
sqlEe.reqs.get()
sqlEe.RLock()
defer func() {

View File

@@ -28,6 +28,7 @@ import (
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
"gorm.io/driver/mysql"
@@ -297,7 +298,7 @@ func TestSQLExportEvent1(t *testing.T) {
if err := sqlEe.Connect(); err != nil {
t.Fatal(err)
}
if err := sqlEe.ExportEvent(&sqlPosterRequest{Querry: "INSERT INTO cdrs VALUES (); ", Values: []interface{}{}}, ""); err != nil {
if err := sqlEe.ExportEvent(context.Background(), &sqlPosterRequest{Querry: "INSERT INTO cdrs VALUES (); ", Values: []interface{}{}}, ""); err != nil {
t.Error(err)
}
sqlEe.Close()

View File

@@ -26,6 +26,7 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -129,10 +130,10 @@ func (pstr *SQSee) Connect() (err error) {
return
}
func (pstr *SQSee) ExportEvent(message interface{}, _ string) (err error) {
func (pstr *SQSee) ExportEvent(ctx *context.Context, message interface{}, _ string) (err error) {
pstr.reqs.get()
pstr.RLock()
_, err = pstr.svc.SendMessage(
_, err = pstr.svc.SendMessageWithContext(ctx,
&sqs.SendMessageInput{
MessageBody: aws.String(string(message.([]byte))),
QueueUrl: pstr.queueURL,

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -36,12 +37,12 @@ type VirtualEE struct {
dc *utils.SafeMapStorage
}
func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
func (vEe *VirtualEE) Connect() error { return nil }
func (vEe *VirtualEE) ExportEvent(interface{}, string) error { return nil }
func (vEe *VirtualEE) Close() error { return nil }
func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
func (vEe *VirtualEE) PrepareMap(map[string]interface{}) (interface{}, error) { return nil, nil }
func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
func (vEe *VirtualEE) Connect() error { return nil }
func (vEe *VirtualEE) ExportEvent(*context.Context, interface{}, string) error { return nil }
func (vEe *VirtualEE) Close() error { return nil }
func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
func (vEe *VirtualEE) PrepareMap(map[string]interface{}) (interface{}, error) { return nil, nil }
func (vEe *VirtualEE) PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error) {
return nil, nil
}

View File

@@ -22,6 +22,7 @@ import (
"reflect"
"testing"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
)
@@ -40,7 +41,7 @@ func TestVirtualEeGetMetrics(t *testing.T) {
}
func TestVirtualEeExportEvent(t *testing.T) {
vEe := &VirtualEE{}
if err := vEe.ExportEvent([]byte{}, ""); err != nil {
if err := vEe.ExportEvent(context.Background(), []byte{}, ""); err != nil {
t.Error(err)
}
vEe.Close()

View File

@@ -96,9 +96,9 @@ func (eeR *ExportRequest) FieldAsString(fldPath []string) (val string, err error
}
//SetFields will populate fields of AgentRequest out of templates
func (eeR *ExportRequest) SetFields(tplFlds []*config.FCTemplate) (err error) {
func (eeR *ExportRequest) SetFields(ctx *context.Context, tplFlds []*config.FCTemplate) (err error) {
for _, tplFld := range tplFlds {
if pass, err := eeR.filterS.Pass(context.TODO(), eeR.tnt,
if pass, err := eeR.filterS.Pass(ctx, eeR.tnt,
tplFld.Filters, eeR); err != nil {
return err
} else if !pass {

View File

@@ -167,7 +167,7 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) {
utils.ERs, msg.MessageId, err.Error()))
}
if rdr.poster != nil { // post it
if err := ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString); err != nil {
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Body, utils.EmptyString); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, msg.MessageId, err.Error()))

View File

@@ -145,7 +145,7 @@ func (rdr *AMQPv1ER) readLoop(recv *amqpv1.Receiver) (err error) {
utils.ERs, err.Error()))
}
if rdr.poster != nil { // post it
if err := ees.ExportWithAttempts(rdr.poster, body, utils.EmptyString); err != nil {
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, body, utils.EmptyString); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message error: %s",
utils.ERs, err.Error()))

View File

@@ -370,7 +370,7 @@ func (erS *ERService) onEvicted(id string, value interface{}) {
utils.MetaExp: utils.NewOrderedNavigableMap(),
})
if err = eeReq.SetFields(eEvs.rdrCfg.CacheDumpFields); err != nil {
if err = eeReq.SetFields(context.Background(), eEvs.rdrCfg.CacheDumpFields); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> Converting CDR with CGRID: <%s> to record , ignoring due to error: <%s>",
utils.ERs, id, err.Error()))

View File

@@ -141,7 +141,7 @@ func (rdr *KafkaER) readLoop(r *kafka.Reader) {
utils.ERs, string(msg.Key), err.Error()))
}
if rdr.poster != nil { // post it
if err := ees.ExportWithAttempts(rdr.poster, msg.Value, string(msg.Key)); err != nil {
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Value, string(msg.Key)); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, string(msg.Key), err.Error()))

View File

@@ -136,7 +136,7 @@ func (rdr *NatsER) Serve() (err error) {
utils.ERs, string(msg.Data), err.Error()))
}
if rdr.poster != nil { // post it
if err := ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString); err != nil {
if err := ees.ExportWithAttempts(context.Background(), rdr.poster, msg.Data, utils.EmptyString); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, string(msg.Data), err.Error()))

View File

@@ -246,7 +246,7 @@ func (rdr *S3ER) readMsg(scv *s3.S3, key string) (err error) {
}
if rdr.poster != nil { // post it
if err = ees.ExportWithAttempts(rdr.poster, msg, key); err != nil {
if err = ees.ExportWithAttempts(context.Background(), rdr.poster, msg, key); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, key, err.Error()))

View File

@@ -261,7 +261,7 @@ func (rdr *SQSER) readMsg(scv sqsClient, msg *sqs.Message) (err error) {
}
if rdr.poster != nil { // post it
if err = ees.ExportWithAttempts(rdr.poster, body, key); err != nil {
if err = ees.ExportWithAttempts(context.Background(), rdr.poster, body, key); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, key, err.Error()))

View File

@@ -23,6 +23,7 @@ import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/ees"
@@ -60,8 +61,8 @@ type EventExporterService struct {
rldChan chan struct{}
stopChan chan struct{}
eeS *ees.EventExporterS
// rpc *v1.EeSv1
eeS *ees.EventExporterS
rpc *apis.EeSv1
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
}
@@ -96,7 +97,7 @@ func (es *EventExporterService) Shutdown() (err error) {
close(es.stopChan)
es.eeS.Shutdown()
es.eeS = nil
//<-es.intConnChan
<-es.intConnChan
return
}
@@ -118,10 +119,11 @@ func (es *EventExporterService) Start() (err error) {
es.stopChan = make(chan struct{})
go es.eeS.ListenAndServe(es.stopChan, es.rldChan)
// es.rpc = v1.NewEeSv1(es.eeS)
// if !es.cfg.DispatcherSCfg().Enabled {
// es.server.RpcRegister(es.rpc)
// }
// es.intConnChan <- es.anz.GetInternalCodec(es.eeS, utils.EventExporterS)
es.rpc = apis.NewEeSv1(es.eeS)
srv, _ := birpc.NewService(es.rpc, "", false)
if !es.cfg.DispatcherSCfg().Enabled {
es.server.RpcRegister(srv)
}
es.intConnChan <- es.anz.GetInternalCodec(srv, utils.EEs)
return
}