Updated ProcessEventWithGet for cdrs

This commit is contained in:
porosnicuadrian
2022-01-06 17:59:25 +02:00
committed by Dan Christian Bogos
parent 8fe9805592
commit 4b1f6ab11a
7 changed files with 35 additions and 19 deletions

View File

@@ -36,8 +36,14 @@ type CDRsV1 struct {
cdrS *engine.CDRServer
}
// ProcessEvent
// ProcessEvent will process the CGREvent
func (cdrSv1 *CDRsV1) ProcessEvent(ctx *context.Context, args *utils.CGREvent,
reply *string) error {
return cdrSv1.cdrS.V1ProcessEvent(ctx, args, reply)
}
// ProcessEventWithGet has the same logic with V1ProcessEvent except it adds the proccessed events to the reply
func (cdrSv1 *CDRsV1) ProcessEventWithGet(ctx *context.Context, args *utils.CGREvent,
reply *[]*utils.EventsWithOpts) error {
return cdrSv1.cdrS.V1ProcessEventWithGet(ctx, args, reply)
}

View File

@@ -325,12 +325,7 @@ func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}
type ArchiveEventsArgs struct {
Tenant string
APIOpts map[string]interface{}
Events []*EventsWithOpts
}
type EventsWithOpts struct {
Event map[string]interface{}
Opts map[string]interface{}
Events []*utils.EventsWithOpts
}
// V1ArchiveEventsInReply should archive the events sent with existing exporters. The zipped content should be returned back as a reply.

View File

@@ -324,7 +324,7 @@ func testCsvExportBufferedEvent(t *testing.T) {
utils.MetaExporterID: "CSVExporterBuffered",
utils.MetaUsage: 123 * time.Nanosecond,
},
Events: []*EventsWithOpts{
Events: []*utils.EventsWithOpts{
{
Event: map[string]interface{}{
utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()),
@@ -481,7 +481,7 @@ func testCsvExportBufferedEventNoExports(t *testing.T) {
APIOpts: map[string]interface{}{
utils.MetaExporterID: "InexistentExport",
},
Events: []*EventsWithOpts{
Events: []*utils.EventsWithOpts{
{
Event: map[string]interface{}{
utils.AccountField: "not_exported_Acc",
@@ -502,7 +502,7 @@ func testCsvExportBufferedEventNoExports(t *testing.T) {
APIOpts: map[string]interface{}{
utils.MetaExporterID: "CSVExporterBuffered",
},
Events: []*EventsWithOpts{
Events: []*utils.EventsWithOpts{
{
Event: map[string]interface{}{
utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()),

View File

@@ -185,9 +185,8 @@ func (cdrS *CDRServer) eeSProcessEvent(ctx *context.Context, cgrEv *utils.CGREve
// processEvent processes a CGREvent based on arguments
// in case of partially executed, both error and evs will be returned
func (cdrS *CDRServer) processEvent(ctx *context.Context, ev *utils.CGREvent) (evs []*utils.EventWithFlags, err error) {
func (cdrS *CDRServer) processEvent(ctx *context.Context, ev *utils.CGREvent) (evs []*utils.EventsWithOpts, err error) {
// making the options
var attrS bool
if attrS, err = GetBoolOpts(ctx, ev.Tenant, ev, cdrS.fltrS, cdrS.cfg.CdrsCfg().Opts.Attributes,
config.CDRsAttributesDftOpt, utils.OptsAttributeS); err != nil {
@@ -306,6 +305,15 @@ func (cdrS *CDRServer) processEvent(ctx *context.Context, ev *utils.CGREvent) (e
}
}
// now that we did all the requested processed events, we have to build our EventsWithOpts
evs = make([]*utils.EventsWithOpts, len(cgrEvs))
for i, cgrEv := range cgrEvs {
evs[i] = &utils.EventsWithOpts{
Event: cgrEv.Event,
Opts: cgrEv.APIOpts,
}
}
if partiallyExecuted {
err = utils.ErrPartiallyExecuted
}
@@ -348,7 +356,7 @@ func (cdrS *CDRServer) V1ProcessEvent(ctx *context.Context, arg *utils.CGREvent,
}
// V1ProcessEventWithGet has the same logic with V1ProcessEvent except it adds the proccessed events to the reply
func (cdrS *CDRServer) V1ProcessEventWithGet(ctx *context.Context, arg *utils.CGREvent, evs *[]*utils.EventWithFlags) (err error) {
func (cdrS *CDRServer) V1ProcessEventWithGet(ctx *context.Context, arg *utils.CGREvent, evs *[]*utils.EventsWithOpts) (err error) {
if arg.ID == utils.EmptyString {
arg.ID = utils.GenUUID()
}
@@ -365,7 +373,7 @@ func (cdrS *CDRServer) V1ProcessEventWithGet(ctx *context.Context, arg *utils.CG
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*evs = *cachedResp.Result.(*[]*utils.EventWithFlags)
*evs = *cachedResp.Result.(*[]*utils.EventsWithOpts)
}
return cachedResp.Error
}
@@ -375,7 +383,7 @@ func (cdrS *CDRServer) V1ProcessEventWithGet(ctx *context.Context, arg *utils.CG
}
// end of RPC caching
var procEvs []*utils.EventWithFlags
var procEvs []*utils.EventsWithOpts
if procEvs, err = cdrS.processEvent(ctx, arg); err != nil {
return
}

View File

@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"fmt"
"net/http"
"net/url"
"reflect"
@@ -2489,11 +2490,12 @@ func TestCDRsV1ProcessEventWithGetMockCache(t *testing.T) {
defer func() {
config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses] = defaultConf
}()
var rply []*utils.EventWithFlags
var rply []*utils.EventsWithOpts
err := newCDRSrv.V1ProcessEventWithGet(context.Background(), cgrEv, &rply)
if err != nil {
t.Errorf("\nExpected <%+v> \n, received <%+v>", nil, err)
}
fmt.Println(utils.ToJSON(rply))
expected := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "testID",
@@ -2574,7 +2576,7 @@ func TestCDRsV1ProcessEventWithGetMockCacheErr(t *testing.T) {
defer func() {
config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses] = defaultConf
}()
var rply []*utils.EventWithFlags
var rply []*utils.EventsWithOpts
err := newCDRSrv.V1ProcessEventWithGet(context.Background(), cgrEv, &rply)
if err == nil || err.Error() != "cannot convert field: 1s to bool" {
t.Errorf("\nExpected <%+v> \n, received <%+v>", "cannot convert field: 1s to bool", err)
@@ -2712,7 +2714,7 @@ func TestCDRsV1ProcessEventWithGetCacheGet(t *testing.T) {
},
}
rply := []*utils.EventWithFlags{}
rply := []*utils.EventsWithOpts{}
Cache.Set(context.Background(), utils.CacheRPCResponses, "CDRsV1.ProcessEvent:testID",
&utils.CachedRPCResponse{Result: &rply, Error: nil},
nil, true, utils.NonTransactional)

View File

@@ -49,7 +49,7 @@ type StorDBService struct {
srvDep map[string]*sync.WaitGroup
}
// Start should handle the sercive start
// Start should handle the service start
func (db *StorDBService) Start(*context.Context, context.CancelFunc) (err error) {
if db.IsRunning() {
return utils.ErrServiceAlreadyRunning

View File

@@ -153,6 +153,11 @@ func GetRoutePaginatorFromOpts(ev map[string]interface{}) (args Paginator, err e
return
}
type EventsWithOpts struct {
Event map[string]interface{}
Opts map[string]interface{}
}
// CGREventWithEeIDs is the CGREventWithOpts with EventExporterIDs
type CGREventWithEeIDs struct {
EeIDs []string