diff --git a/apis/cdrs.go b/apis/cdrs.go index 2612e5dae..d654d68de 100644 --- a/apis/cdrs.go +++ b/apis/cdrs.go @@ -36,8 +36,14 @@ type CDRsV1 struct { cdrS *engine.CDRServer } -// ProcessEvent +// ProcessEvent will process the CGREvent func (cdrSv1 *CDRsV1) ProcessEvent(ctx *context.Context, args *utils.CGREvent, reply *string) error { return cdrSv1.cdrS.V1ProcessEvent(ctx, args, reply) } + +// ProcessEventWithGet has the same logic with V1ProcessEvent except it adds the proccessed events to the reply +func (cdrSv1 *CDRsV1) ProcessEventWithGet(ctx *context.Context, args *utils.CGREvent, + reply *[]*utils.EventsWithOpts) error { + return cdrSv1.cdrS.V1ProcessEventWithGet(ctx, args, reply) +} diff --git a/ees/ees.go b/ees/ees.go index bfbfade55..0e1d47655 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -325,12 +325,7 @@ func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{} type ArchiveEventsArgs struct { Tenant string APIOpts map[string]interface{} - Events []*EventsWithOpts -} - -type EventsWithOpts struct { - Event map[string]interface{} - Opts map[string]interface{} + Events []*utils.EventsWithOpts } // V1ArchiveEventsInReply should archive the events sent with existing exporters. The zipped content should be returned back as a reply. diff --git a/ees/filecsv_it_test.go b/ees/filecsv_it_test.go index 5d947497d..24c2ffd6a 100644 --- a/ees/filecsv_it_test.go +++ b/ees/filecsv_it_test.go @@ -324,7 +324,7 @@ func testCsvExportBufferedEvent(t *testing.T) { utils.MetaExporterID: "CSVExporterBuffered", utils.MetaUsage: 123 * time.Nanosecond, }, - Events: []*EventsWithOpts{ + Events: []*utils.EventsWithOpts{ { Event: map[string]interface{}{ utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), @@ -481,7 +481,7 @@ func testCsvExportBufferedEventNoExports(t *testing.T) { APIOpts: map[string]interface{}{ utils.MetaExporterID: "InexistentExport", }, - Events: []*EventsWithOpts{ + Events: []*utils.EventsWithOpts{ { Event: map[string]interface{}{ utils.AccountField: "not_exported_Acc", @@ -502,7 +502,7 @@ func testCsvExportBufferedEventNoExports(t *testing.T) { APIOpts: map[string]interface{}{ utils.MetaExporterID: "CSVExporterBuffered", }, - Events: []*EventsWithOpts{ + Events: []*utils.EventsWithOpts{ { Event: map[string]interface{}{ utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), diff --git a/engine/cdrs.go b/engine/cdrs.go index 7ed04e52b..60bab0ca1 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -185,9 +185,8 @@ func (cdrS *CDRServer) eeSProcessEvent(ctx *context.Context, cgrEv *utils.CGREve // processEvent processes a CGREvent based on arguments // in case of partially executed, both error and evs will be returned -func (cdrS *CDRServer) processEvent(ctx *context.Context, ev *utils.CGREvent) (evs []*utils.EventWithFlags, err error) { +func (cdrS *CDRServer) processEvent(ctx *context.Context, ev *utils.CGREvent) (evs []*utils.EventsWithOpts, err error) { // making the options - var attrS bool if attrS, err = GetBoolOpts(ctx, ev.Tenant, ev, cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Attributes, config.CDRsAttributesDftOpt, utils.OptsAttributeS); err != nil { @@ -306,6 +305,15 @@ func (cdrS *CDRServer) processEvent(ctx *context.Context, ev *utils.CGREvent) (e } } + // now that we did all the requested processed events, we have to build our EventsWithOpts + evs = make([]*utils.EventsWithOpts, len(cgrEvs)) + for i, cgrEv := range cgrEvs { + evs[i] = &utils.EventsWithOpts{ + Event: cgrEv.Event, + Opts: cgrEv.APIOpts, + } + } + if partiallyExecuted { err = utils.ErrPartiallyExecuted } @@ -348,7 +356,7 @@ func (cdrS *CDRServer) V1ProcessEvent(ctx *context.Context, arg *utils.CGREvent, } // V1ProcessEventWithGet has the same logic with V1ProcessEvent except it adds the proccessed events to the reply -func (cdrS *CDRServer) V1ProcessEventWithGet(ctx *context.Context, arg *utils.CGREvent, evs *[]*utils.EventWithFlags) (err error) { +func (cdrS *CDRServer) V1ProcessEventWithGet(ctx *context.Context, arg *utils.CGREvent, evs *[]*utils.EventsWithOpts) (err error) { if arg.ID == utils.EmptyString { arg.ID = utils.GenUUID() } @@ -365,7 +373,7 @@ func (cdrS *CDRServer) V1ProcessEventWithGet(ctx *context.Context, arg *utils.CG if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) if cachedResp.Error == nil { - *evs = *cachedResp.Result.(*[]*utils.EventWithFlags) + *evs = *cachedResp.Result.(*[]*utils.EventsWithOpts) } return cachedResp.Error } @@ -375,7 +383,7 @@ func (cdrS *CDRServer) V1ProcessEventWithGet(ctx *context.Context, arg *utils.CG } // end of RPC caching - var procEvs []*utils.EventWithFlags + var procEvs []*utils.EventsWithOpts if procEvs, err = cdrS.processEvent(ctx, arg); err != nil { return } diff --git a/engine/cdrs_test.go b/engine/cdrs_test.go index 4c8ca4978..578e4405d 100644 --- a/engine/cdrs_test.go +++ b/engine/cdrs_test.go @@ -18,6 +18,7 @@ along with this program. If not, see package engine import ( + "fmt" "net/http" "net/url" "reflect" @@ -2489,11 +2490,12 @@ func TestCDRsV1ProcessEventWithGetMockCache(t *testing.T) { defer func() { config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses] = defaultConf }() - var rply []*utils.EventWithFlags + var rply []*utils.EventsWithOpts err := newCDRSrv.V1ProcessEventWithGet(context.Background(), cgrEv, &rply) if err != nil { t.Errorf("\nExpected <%+v> \n, received <%+v>", nil, err) } + fmt.Println(utils.ToJSON(rply)) expected := &utils.CGREvent{ Tenant: "cgrates.org", ID: "testID", @@ -2574,7 +2576,7 @@ func TestCDRsV1ProcessEventWithGetMockCacheErr(t *testing.T) { defer func() { config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses] = defaultConf }() - var rply []*utils.EventWithFlags + var rply []*utils.EventsWithOpts err := newCDRSrv.V1ProcessEventWithGet(context.Background(), cgrEv, &rply) if err == nil || err.Error() != "cannot convert field: 1s to bool" { t.Errorf("\nExpected <%+v> \n, received <%+v>", "cannot convert field: 1s to bool", err) @@ -2712,7 +2714,7 @@ func TestCDRsV1ProcessEventWithGetCacheGet(t *testing.T) { }, } - rply := []*utils.EventWithFlags{} + rply := []*utils.EventsWithOpts{} Cache.Set(context.Background(), utils.CacheRPCResponses, "CDRsV1.ProcessEvent:testID", &utils.CachedRPCResponse{Result: &rply, Error: nil}, nil, true, utils.NonTransactional) diff --git a/services/stordb.go b/services/stordb.go index fcb6c6ff0..d14a4f38d 100644 --- a/services/stordb.go +++ b/services/stordb.go @@ -49,7 +49,7 @@ type StorDBService struct { srvDep map[string]*sync.WaitGroup } -// Start should handle the sercive start +// Start should handle the service start func (db *StorDBService) Start(*context.Context, context.CancelFunc) (err error) { if db.IsRunning() { return utils.ErrServiceAlreadyRunning diff --git a/utils/cgrevent.go b/utils/cgrevent.go index df2057b6e..61dbcaaab 100644 --- a/utils/cgrevent.go +++ b/utils/cgrevent.go @@ -153,6 +153,11 @@ func GetRoutePaginatorFromOpts(ev map[string]interface{}) (args Paginator, err e return } +type EventsWithOpts struct { + Event map[string]interface{} + Opts map[string]interface{} +} + // CGREventWithEeIDs is the CGREventWithOpts with EventExporterIDs type CGREventWithEeIDs struct { EeIDs []string