From 13c102e8b8b53b151db2e67ce5779d93b67908f0 Mon Sep 17 00:00:00 2001 From: porosnicuadrian Date: Mon, 17 May 2021 10:18:01 +0300 Subject: [PATCH] Added context for Action apis --- actions/actions.go | 38 +++++++------- actions/actions_test.go | 84 +++++++++++++++++-------------- engine/datadbmock.go | 6 +-- engine/datamanager.go | 36 ++++++------- engine/libindex.go | 2 +- engine/storage_interface.go | 6 +-- engine/storage_internal_datadb.go | 6 +-- engine/storage_mongo_datadb.go | 12 ++--- engine/storage_redis.go | 6 +-- engine/tpreader.go | 4 +- loaders/loader.go | 4 +- loaders/loader_test.go | 8 +-- migrator/action_profiles.go | 6 +-- 13 files changed, 113 insertions(+), 105 deletions(-) diff --git a/actions/actions.go b/actions/actions.go index 1b20477a0..05d38e15f 100644 --- a/actions/actions.go +++ b/actions/actions.go @@ -94,11 +94,11 @@ func (aS *ActionS) schedInit() { }, } } - aS.scheduleActions(cgrEvs, nil, true) + aS.scheduleActions(context.Background(), cgrEvs, nil, true) } // scheduleActions will set up cron and load the matching data -func (aS *ActionS) scheduleActions(cgrEvs []*utils.CGREvent, aPrflIDs []string, crnReset bool) (err error) { +func (aS *ActionS) scheduleActions(ctx *context.Context, cgrEvs []*utils.CGREvent, aPrflIDs []string, crnReset bool) (err error) { aS.crnLk.Lock() // make sure we don't have parallel processes running setu defer aS.crnLk.Unlock() crn := aS.crn @@ -108,7 +108,7 @@ func (aS *ActionS) scheduleActions(cgrEvs []*utils.CGREvent, aPrflIDs []string, var partExec bool for _, cgrEv := range cgrEvs { var schedActSet []*scheduledActs - if schedActSet, err = aS.scheduledActions(cgrEv.Tenant, cgrEv, aPrflIDs, false); err != nil { + if schedActSet, err = aS.scheduledActions(ctx, cgrEv.Tenant, cgrEv, aPrflIDs, false); err != nil { utils.Logger.Warning( fmt.Sprintf( "<%s> scheduler init, ignoring tenant: <%s>, error: <%s>", @@ -118,7 +118,7 @@ func (aS *ActionS) scheduleActions(cgrEvs []*utils.CGREvent, aPrflIDs []string, } for _, sActs := range schedActSet { if sActs.schedule == utils.MetaASAP { - go aS.asapExecuteActions(sActs) + go aS.asapExecuteActions(ctx, sActs) continue } if _, err = crn.AddFunc(sActs.schedule, sActs.ScheduledExecute); err != nil { @@ -145,12 +145,12 @@ func (aS *ActionS) scheduleActions(cgrEvs []*utils.CGREvent, aPrflIDs []string, } // matchingActionProfilesForEvent returns the matched ActionProfiles for the given event -func (aS *ActionS) matchingActionProfilesForEvent(tnt string, +func (aS *ActionS) matchingActionProfilesForEvent(ctx *context.Context, tnt string, evNm utils.MapStorage, actTime *time.Time, aPrflIDs []string) (aPfs engine.ActionProfiles, err error) { if len(aPrflIDs) == 0 { var aPfIDMp utils.StringSet if aPfIDMp, err = engine.MatchingItemIDsForEvent( - context.TODO(), + ctx, evNm, aS.cfg.ActionSCfg().StringIndexedFields, aS.cfg.ActionSCfg().PrefixIndexedFields, @@ -167,7 +167,7 @@ func (aS *ActionS) matchingActionProfilesForEvent(tnt string, } for _, aPfID := range aPrflIDs { var aPf *engine.ActionProfile - if aPf, err = aS.dm.GetActionProfile(tnt, aPfID, + if aPf, err = aS.dm.GetActionProfile(ctx, tnt, aPfID, true, true, utils.NonTransactional); err != nil { if err == utils.ErrNotFound { err = nil @@ -176,7 +176,7 @@ func (aS *ActionS) matchingActionProfilesForEvent(tnt string, return } var pass bool - if pass, err = aS.fltrS.Pass(context.TODO(), tnt, aPf.FilterIDs, evNm); err != nil { + if pass, err = aS.fltrS.Pass(ctx, tnt, aPf.FilterIDs, evNm); err != nil { return } else if !pass { continue @@ -191,14 +191,14 @@ func (aS *ActionS) matchingActionProfilesForEvent(tnt string, } // scheduledActions is responsible for scheduling the action profiles matching cgrEv -func (aS *ActionS) scheduledActions(tnt string, cgrEv *utils.CGREvent, aPrflIDs []string, +func (aS *ActionS) scheduledActions(ctx *context.Context, tnt string, cgrEv *utils.CGREvent, aPrflIDs []string, forceASAP bool) (schedActs []*scheduledActs, err error) { var aPfs engine.ActionProfiles evNm := utils.MapStorage{ utils.MetaReq: cgrEv.Event, utils.MetaOpts: cgrEv.APIOpts, } - if aPfs, err = aS.matchingActionProfilesForEvent(tnt, evNm, cgrEv.Time, aPrflIDs); err != nil { + if aPfs, err = aS.matchingActionProfilesForEvent(ctx, tnt, evNm, cgrEv.Time, aPrflIDs); err != nil { return } @@ -239,10 +239,10 @@ func (aS *ActionS) scheduledActions(tnt string, cgrEv *utils.CGREvent, aPrflIDs // asapExecuteActions executes the scheduledActs and removes the executed from database // uses locks to avoid concurrent access -func (aS *ActionS) asapExecuteActions(sActs *scheduledActs) (err error) { - _, err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) (gRes interface{}, gErr error) { +func (aS *ActionS) asapExecuteActions(ctx *context.Context, sActs *scheduledActs) (err error) { + _, err = guardian.Guardian.Guard(ctx, func(_ *context.Context) (gRes interface{}, gErr error) { var ap *engine.ActionProfile - if ap, gErr = aS.dm.GetActionProfile(sActs.tenant, sActs.apID, true, true, utils.NonTransactional); gErr != nil { + if ap, gErr = aS.dm.GetActionProfile(ctx, sActs.tenant, sActs.apID, true, true, utils.NonTransactional); gErr != nil { utils.Logger.Warning( fmt.Sprintf( "<%s> querying ActionProfile with id: <%s:%s>, error: <%s>", @@ -253,7 +253,7 @@ func (aS *ActionS) asapExecuteActions(sActs *scheduledActs) (err error) { return } delete(ap.Targets[sActs.trgTyp], sActs.trgID) - if gErr = aS.dm.SetActionProfile(ap, true); gErr != nil { + if gErr = aS.dm.SetActionProfile(ctx, ap, true); gErr != nil { utils.Logger.Warning( fmt.Sprintf( "<%s> saving ActionProfile with id: <%s:%s>, error: <%s>", @@ -265,8 +265,8 @@ func (aS *ActionS) asapExecuteActions(sActs *scheduledActs) (err error) { } // V1ScheduleActions will be called to schedule actions matching the arguments -func (aS *ActionS) V1ScheduleActions(args *utils.ArgActionSv1ScheduleActions, rpl *string) (err error) { - if err = aS.scheduleActions([]*utils.CGREvent{args.CGREvent}, +func (aS *ActionS) V1ScheduleActions(ctx *context.Context, args *utils.ArgActionSv1ScheduleActions, rpl *string) (err error) { + if err = aS.scheduleActions(ctx, []*utils.CGREvent{args.CGREvent}, args.ActionProfileIDs, false); err != nil { return } @@ -275,16 +275,16 @@ func (aS *ActionS) V1ScheduleActions(args *utils.ArgActionSv1ScheduleActions, rp } // V1ExecuteActions will be called to execute ASAP action profiles, ignoring their Schedule field -func (aS *ActionS) V1ExecuteActions(args *utils.ArgActionSv1ScheduleActions, rpl *string) (err error) { +func (aS *ActionS) V1ExecuteActions(ctx *context.Context, args *utils.ArgActionSv1ScheduleActions, rpl *string) (err error) { var schedActSet []*scheduledActs - if schedActSet, err = aS.scheduledActions(args.CGREvent.Tenant, + if schedActSet, err = aS.scheduledActions(ctx, args.CGREvent.Tenant, args.CGREvent, args.ActionProfileIDs, true); err != nil { return } var partExec bool // execute the actions for _, sActs := range schedActSet { - if err = aS.asapExecuteActions(sActs); err != nil { + if err = aS.asapExecuteActions(ctx, sActs); err != nil { partExec = true } } diff --git a/actions/actions_test.go b/actions/actions_test.go index 5e85446d1..91f614fc4 100644 --- a/actions/actions_test.go +++ b/actions/actions_test.go @@ -71,13 +71,14 @@ func TestMatchingActionProfilesForEvent(t *testing.T) { }, } - if err := acts.dm.SetActionProfile(actPrf, true); err != nil { + if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil { t.Error(err) } expActionPrf := engine.ActionProfiles{actPrf} - if rcv, err := acts.matchingActionProfilesForEvent("cgrates.org", evNM, utils.TimePointer(time.Now()), []string{}); err != nil { + if rcv, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org", + evNM, utils.TimePointer(time.Now()), []string{}); err != nil { t.Error(err) } else if !reflect.DeepEqual(rcv, expActionPrf) { t.Errorf("Expected %+v, received %+v", utils.ToJSON(expActionPrf), utils.ToJSON(rcv)) @@ -90,7 +91,8 @@ func TestMatchingActionProfilesForEvent(t *testing.T) { utils.MetaOpts: map[string]interface{}{}, } //This Event is not matching with our filter - if _, err := acts.matchingActionProfilesForEvent("cgrates.org", evNM, utils.TimePointer(time.Now()), []string{}); err == nil || err != utils.ErrNotFound { + if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org", + evNM, utils.TimePointer(time.Now()), []string{}); err == nil || err != utils.ErrNotFound { t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err) } @@ -102,38 +104,44 @@ func TestMatchingActionProfilesForEvent(t *testing.T) { } actPrfIDs := []string{"inexisting_id"} //Unable to get from database an ActionProfile if the ID won't match - if _, err := acts.matchingActionProfilesForEvent("cgrates.org", evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err != utils.ErrNotFound { + if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org", + evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err != utils.ErrNotFound { t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err) } actPrfIDs = []string{"test_id1"} - if _, err := acts.matchingActionProfilesForEvent("cgrates.org", evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err != utils.ErrNotFound { + if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org", + evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err != utils.ErrNotFound { t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err) } actPrf.FilterIDs = append(actPrf.FilterIDs, "*ai:~*req.AnswerTime:2012-07-21T00:00:00Z|2012-08-21T00:00:00Z") //this event is not active in this interval time - if _, err := acts.matchingActionProfilesForEvent("cgrates.org", evNM, utils.TimePointer(time.Date(2012, 6, 21, 0, 0, 0, 0, time.UTC)), actPrfIDs); err == nil || err != utils.ErrNotFound { + if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org", + evNM, utils.TimePointer(time.Date(2012, 6, 21, 0, 0, 0, 0, time.UTC)), actPrfIDs); err == nil || err != utils.ErrNotFound { t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err) } //when dataManager is nil, it won't be able to get ActionsProfile from database acts.dm = nil - if _, err := acts.matchingActionProfilesForEvent("INVALID_TENANT", evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err != utils.ErrNoDatabaseConn { + if _, err := acts.matchingActionProfilesForEvent(context.Background(), "INVALID_TENANT", + evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err != utils.ErrNoDatabaseConn { t.Errorf("Expected %+v, received %+v", utils.ErrNoDatabaseConn, err) } acts.dm = engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) actPrf.FilterIDs = []string{"invalid_filters"} //Set in database and invalid filter, so it won t pass - if err := acts.dm.SetActionProfile(actPrf, false); err != nil { + if err := acts.dm.SetActionProfile(context.Background(), actPrf, false); err != nil { t.Error(err) } expected := "NOT_FOUND:invalid_filters" - if _, err := acts.matchingActionProfilesForEvent("cgrates.org", evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err.Error() != expected { + if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org", + evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err.Error() != expected { t.Errorf("Expected %+v, received %+v", expected, err) } - if err := acts.dm.RemoveActionProfile(actPrf.Tenant, actPrf.ID, utils.NonTransactional, false); err != nil { + if err := acts.dm.RemoveActionProfile(context.Background(), actPrf.Tenant, + actPrf.ID, utils.NonTransactional, false); err != nil { t.Error(err) } } @@ -175,11 +183,11 @@ func TestScheduledActions(t *testing.T) { }, } - if err := acts.dm.SetActionProfile(actPrf, true); err != nil { + if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil { t.Error(err) } - if rcv, err := acts.scheduledActions(cgrEv.Tenant, cgrEv, []string{}, false); err != nil { + if rcv, err := acts.scheduledActions(context.Background(), cgrEv.Tenant, cgrEv, []string{}, false); err != nil { t.Error(err) } else { expSchedActs := newScheduledActs(context.Background(), cgrEv.Tenant, cgrEv.ID, utils.MetaNone, utils.EmptyString, @@ -196,7 +204,7 @@ func TestScheduledActions(t *testing.T) { utils.Accounts: "10", }, } - if _, err := acts.scheduledActions(cgrEv.Tenant, cgrEv, []string{}, false); err == nil || err != utils.ErrNotFound { + if _, err := acts.scheduledActions(context.Background(), cgrEv.Tenant, cgrEv, []string{}, false); err == nil || err != utils.ErrNotFound { t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err) } } @@ -236,34 +244,34 @@ func TestScheduleAction(t *testing.T) { }, }, } - if err := acts.dm.SetActionProfile(actPrf, true); err != nil { + if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil { t.Error(err) } - if err := acts.scheduleActions(cgrEv, []string{}, true); err != nil { + if err := acts.scheduleActions(context.Background(), cgrEv, []string{}, true); err != nil { t.Error(err) } //Cannot schedule an action if the ID is invalid - if err := acts.scheduleActions(cgrEv, []string{"INVALID_ID1"}, true); err == nil || err != utils.ErrPartiallyExecuted { + if err := acts.scheduleActions(context.Background(), cgrEv, []string{"INVALID_ID1"}, true); err == nil || err != utils.ErrPartiallyExecuted { t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err) } //When schedule is "*asap", the action will execute immediately actPrf.Schedule = utils.MetaASAP - if err := acts.dm.SetActionProfile(actPrf, true); err != nil { + if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil { t.Error(err) } - if err := acts.scheduleActions(cgrEv, []string{}, true); err != nil { + if err := acts.scheduleActions(context.Background(), cgrEv, []string{}, true); err != nil { t.Error(err) } //Cannot execute the action if the cron is invalid actPrf.Schedule = "* * * *" - if err := acts.dm.SetActionProfile(actPrf, true); err != nil { + if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil { t.Error(err) } - if err := acts.scheduleActions(cgrEv, []string{}, true); err == nil || err != utils.ErrPartiallyExecuted { + if err := acts.scheduleActions(context.Background(), cgrEv, []string{}, true); err == nil || err != utils.ErrPartiallyExecuted { t.Error(err) } } @@ -294,7 +302,7 @@ func TestAsapExecuteActions(t *testing.T) { expSchedActs := newScheduledActs(context.Background(), cgrEv[0].Tenant, cgrEv[0].ID, utils.MetaNone, utils.EmptyString, utils.EmptyString, evNM, nil) - if err := acts.asapExecuteActions(expSchedActs); err == nil || err != utils.ErrNoDatabaseConn { + if err := acts.asapExecuteActions(context.Background(), expSchedActs); err == nil || err != utils.ErrNoDatabaseConn { t.Errorf("Expected %+v, received %+v", utils.ErrNoDatabaseConn, err) } @@ -302,7 +310,7 @@ func TestAsapExecuteActions(t *testing.T) { acts.dm = engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) expSchedActs = newScheduledActs(context.Background(), cgrEv[0].Tenant, "another_id", utils.MetaNone, utils.EmptyString, utils.EmptyString, evNM, nil) - if err := acts.asapExecuteActions(expSchedActs); err == nil || err != utils.ErrNotFound { + if err := acts.asapExecuteActions(context.Background(), expSchedActs); err == nil || err != utils.ErrNotFound { t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err) } } @@ -376,22 +384,22 @@ func TestV1ScheduleActions(t *testing.T) { }, } - if err := acts.dm.SetActionProfile(actPrf, true); err != nil { + if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil { t.Error(err) } - if err := acts.V1ScheduleActions(newArgs, &reply); err != nil { + if err := acts.V1ScheduleActions(context.Background(), newArgs, &reply); err != nil { t.Error(err) } else if reply != utils.OK { t.Errorf("Unexpected reply %+v", reply) } newArgs.ActionProfileIDs = []string{"invalid_id"} - if err := acts.V1ScheduleActions(newArgs, &reply); err == nil || err != utils.ErrPartiallyExecuted { + if err := acts.V1ScheduleActions(context.Background(), newArgs, &reply); err == nil || err != utils.ErrPartiallyExecuted { t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err) } - if err := acts.dm.RemoveActionProfile(actPrf.Tenant, actPrf.ID, utils.NonTransactional, true); err != nil { + if err := acts.dm.RemoveActionProfile(context.Background(), actPrf.Tenant, actPrf.ID, utils.NonTransactional, true); err != nil { t.Error(err) } } @@ -433,18 +441,18 @@ func TestV1ExecuteActions(t *testing.T) { }, }, } - if err := acts.dm.SetActionProfile(actPrf, true); err != nil { + if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil { t.Error(err) } - if err := acts.V1ExecuteActions(newArgs, &reply); err != nil { + if err := acts.V1ExecuteActions(context.Background(), newArgs, &reply); err != nil { t.Error(err) } else if reply != utils.OK { t.Errorf("Unexpected reply %+v", reply) } newArgs.ActionProfileIDs = []string{"invalid_id"} - if err := acts.V1ExecuteActions(newArgs, &reply); err == nil || err != utils.ErrNotFound { + if err := acts.V1ExecuteActions(context.Background(), newArgs, &reply); err == nil || err != utils.ErrNotFound { t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err) } @@ -452,11 +460,11 @@ func TestV1ExecuteActions(t *testing.T) { newDm := engine.NewDataManager(newData, config.CgrConfig().CacheCfg(), nil) newActs := NewActionS(defaultCfg, filters, newDm, nil) newArgs.ActionProfileIDs = []string{} - if err := newActs.V1ExecuteActions(newArgs, &reply); err == nil || err != utils.ErrPartiallyExecuted { + if err := newActs.V1ExecuteActions(context.Background(), newArgs, &reply); err == nil || err != utils.ErrPartiallyExecuted { t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err) } - if err := acts.dm.RemoveActionProfile(actPrf.Tenant, actPrf.ID, utils.NonTransactional, true); err != nil { + if err := acts.dm.RemoveActionProfile(context.Background(), actPrf.Tenant, actPrf.ID, utils.NonTransactional, true); err != nil { t.Error(err) } } @@ -491,7 +499,7 @@ type dataDBMockError struct { *engine.DataDBMock } -func (dbM *dataDBMockError) GetActionProfileDrv(string, string) (*engine.ActionProfile, error) { +func (dbM *dataDBMockError) GetActionProfileDrv(*context.Context, string, string) (*engine.ActionProfile, error) { return &engine.ActionProfile{ Tenant: "cgrates.org", ID: "test_id1", @@ -510,7 +518,7 @@ func (dbM *dataDBMockError) GetActionProfileDrv(string, string) (*engine.ActionP }, nil } -func (dbM *dataDBMockError) SetActionProfileDrv(*engine.ActionProfile) error { +func (dbM *dataDBMockError) SetActionProfileDrv(*context.Context, *engine.ActionProfile) error { return utils.ErrNoDatabaseConn } @@ -1061,7 +1069,7 @@ func TestACScheduledActions(t *testing.T) { }, } - if err := dm.SetActionProfile(actPrf, true); err != nil { + if err := dm.SetActionProfile(context.Background(), actPrf, true); err != nil { t.Error(err) } @@ -1084,7 +1092,7 @@ func TestACScheduledActions(t *testing.T) { acts := NewActionS(cfg, fltrs, dm, nil) expected := "WARNING] ignoring ActionProfile with id: creating action: , error: >" - if _, err := acts.scheduledActions("cgrates.org", cgrEv, []string{}, true); err != nil { + if _, err := acts.scheduledActions(context.Background(), "cgrates.org", cgrEv, []string{}, true); err != nil { t.Error(err) } else if rcv := buff.String(); !strings.Contains(rcv, expected) { t.Errorf("Expected %+v, received %+v", expected, rcv) @@ -1097,7 +1105,7 @@ func TestACScheduledActions(t *testing.T) { "ID_TEST": {}, }, } - if err := dm.SetActionProfile(actPrf, true); err != nil { + if err := dm.SetActionProfile(context.Background(), actPrf, true); err != nil { t.Error(err) } @@ -1117,14 +1125,14 @@ func TestACScheduledActions(t *testing.T) { }, } var schedActs []*scheduledActs - if schedActs, err = acts.scheduledActions("cgrates.org", cgrEv, []string{}, true); err != nil { + if schedActs, err = acts.scheduledActions(context.Background(), "cgrates.org", cgrEv, []string{}, true); err != nil { t.Error(err) } else { } //execute asap the actions schedActs[0].trgID = "invalid_type" - if err := acts.asapExecuteActions(schedActs[0]); err == nil || err != utils.ErrPartiallyExecuted { + if err := acts.asapExecuteActions(context.Background(), schedActs[0]); err == nil || err != utils.ErrPartiallyExecuted { t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err) } diff --git a/engine/datadbmock.go b/engine/datadbmock.go index e96d50ede..6cdcfdf92 100644 --- a/engine/datadbmock.go +++ b/engine/datadbmock.go @@ -290,15 +290,15 @@ func (dbM *DataDBMock) RemoveRateProfileDrv(*context.Context, string, string) er return utils.ErrNotImplemented } -func (dbM *DataDBMock) GetActionProfileDrv(string, string) (*ActionProfile, error) { +func (dbM *DataDBMock) GetActionProfileDrv(*context.Context, string, string) (*ActionProfile, error) { return nil, utils.ErrNotImplemented } -func (dbM *DataDBMock) SetActionProfileDrv(*ActionProfile) error { +func (dbM *DataDBMock) SetActionProfileDrv(*context.Context, *ActionProfile) error { return utils.ErrNotImplemented } -func (dbM *DataDBMock) RemoveActionProfileDrv(string, string) error { +func (dbM *DataDBMock) RemoveActionProfileDrv(*context.Context, string, string) error { return utils.ErrNotImplemented } diff --git a/engine/datamanager.go b/engine/datamanager.go index 6cd70d5ce..7c19dabc0 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -186,7 +186,7 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids [] _, err = dm.GetRateProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) case utils.ActionProfilePrefix: tntID := utils.NewTenantID(dataID) - _, err = dm.GetActionProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) + _, err = dm.GetActionProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) case utils.AttributeFilterIndexes: var tntCtx, idxKey string if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil { @@ -2247,7 +2247,7 @@ func (dm *DataManager) SetRateProfileRates(ctx *context.Context, rpp *utils.Rate return } -func (dm *DataManager) GetActionProfile(tenant, id string, cacheRead, cacheWrite bool, +func (dm *DataManager) GetActionProfile(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (ap *ActionProfile, err error) { tntID := utils.ConcatenatedKey(tenant, id) if cacheRead { @@ -2262,10 +2262,10 @@ func (dm *DataManager) GetActionProfile(tenant, id string, cacheRead, cacheWrite err = utils.ErrNoDatabaseConn return } - ap, err = dm.dataDB.GetActionProfileDrv(tenant, id) + ap, err = dm.dataDB.GetActionProfileDrv(ctx, tenant, id) if err != nil { if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionProfiles]; err == utils.ErrNotFound && itm.Remote { - if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns, + if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns, utils.ReplicatorSv1GetActionProfile, &utils.TenantIDWithAPIOpts{ TenantID: &utils.TenantID{Tenant: tenant, ID: id}, @@ -2273,13 +2273,13 @@ func (dm *DataManager) GetActionProfile(tenant, id string, cacheRead, cacheWrite utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID, config.CgrConfig().GeneralCfg().NodeID)), }, &ap); err == nil { - err = dm.dataDB.SetActionProfileDrv(ap) + err = dm.dataDB.SetActionProfileDrv(ctx, ap) } } if err != nil { err = utils.CastRPCErr(err) if err == utils.ErrNotFound && cacheWrite && dm.dataDB.GetStorageType() != utils.Internal { - if errCh := Cache.Set(context.TODO(), utils.CacheActionProfiles, tntID, nil, nil, + if errCh := Cache.Set(ctx, utils.CacheActionProfiles, tntID, nil, nil, cacheCommit(transactionID), transactionID); errCh != nil { return nil, errCh } @@ -2289,7 +2289,7 @@ func (dm *DataManager) GetActionProfile(tenant, id string, cacheRead, cacheWrite } } if cacheWrite { - if errCh := Cache.Set(context.TODO(), utils.CacheActionProfiles, tntID, ap, nil, + if errCh := Cache.Set(ctx, utils.CacheActionProfiles, tntID, ap, nil, cacheCommit(transactionID), transactionID); errCh != nil { return nil, errCh } @@ -2297,22 +2297,22 @@ func (dm *DataManager) GetActionProfile(tenant, id string, cacheRead, cacheWrite return } -func (dm *DataManager) SetActionProfile(ap *ActionProfile, withIndex bool) (err error) { +func (dm *DataManager) SetActionProfile(ctx *context.Context, ap *ActionProfile, withIndex bool) (err error) { if dm == nil { return utils.ErrNoDatabaseConn } if withIndex { - if brokenReference := dm.checkFilters(context.TODO(), ap.Tenant, ap.FilterIDs); len(brokenReference) != 0 { + if brokenReference := dm.checkFilters(ctx, ap.Tenant, ap.FilterIDs); len(brokenReference) != 0 { // if we get a broken filter do not set the profile return fmt.Errorf("broken reference to filter: %+v for item with ID: %+v", brokenReference, ap.TenantID()) } } - oldRpp, err := dm.GetActionProfile(ap.Tenant, ap.ID, true, false, utils.NonTransactional) + oldRpp, err := dm.GetActionProfile(ctx, ap.Tenant, ap.ID, true, false, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return err } - if err = dm.DataDB().SetActionProfileDrv(ap); err != nil { + if err = dm.DataDB().SetActionProfileDrv(ctx, ap); err != nil { return err } if withIndex { @@ -2320,13 +2320,13 @@ func (dm *DataManager) SetActionProfile(ap *ActionProfile, withIndex bool) (err if oldRpp != nil { oldFiltersIDs = &oldRpp.FilterIDs } - if err := updatedIndexes(context.TODO(), dm, utils.CacheActionProfilesFilterIndexes, ap.Tenant, + if err := updatedIndexes(ctx, dm, utils.CacheActionProfilesFilterIndexes, ap.Tenant, utils.EmptyString, ap.ID, oldFiltersIDs, ap.FilterIDs, false); err != nil { return err } } if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionProfiles]; itm.Replicate { - err = replicate(context.TODO(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, config.CgrConfig().DataDbCfg().RplFiltered, utils.ActionProfilePrefix, ap.TenantID(), // this are used to get the host IDs from cache utils.ReplicatorSv1SetActionProfile, @@ -2338,29 +2338,29 @@ func (dm *DataManager) SetActionProfile(ap *ActionProfile, withIndex bool) (err return } -func (dm *DataManager) RemoveActionProfile(tenant, id string, +func (dm *DataManager) RemoveActionProfile(ctx *context.Context, tenant, id string, transactionID string, withIndex bool) (err error) { if dm == nil { return utils.ErrNoDatabaseConn } - oldRpp, err := dm.GetActionProfile(tenant, id, true, false, utils.NonTransactional) + oldRpp, err := dm.GetActionProfile(ctx, tenant, id, true, false, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return err } - if err = dm.DataDB().RemoveActionProfileDrv(tenant, id); err != nil { + if err = dm.DataDB().RemoveActionProfileDrv(ctx, tenant, id); err != nil { return } if oldRpp == nil { return utils.ErrNotFound } if withIndex { - if err = removeItemFromFilterIndex(context.TODO(), dm, utils.CacheActionProfilesFilterIndexes, + if err = removeItemFromFilterIndex(ctx, dm, utils.CacheActionProfilesFilterIndexes, tenant, utils.EmptyString, id, oldRpp.FilterIDs); err != nil { return } } if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionProfiles]; itm.Replicate { - replicate(context.TODO(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, config.CgrConfig().DataDbCfg().RplFiltered, utils.ActionProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache utils.ReplicatorSv1RemoveActionProfile, diff --git a/engine/libindex.go b/engine/libindex.go index 35c411087..fef16ef18 100644 --- a/engine/libindex.go +++ b/engine/libindex.go @@ -725,7 +725,7 @@ func UpdateFilterIndex(ctx *context.Context, dm *DataManager, oldFlt, newFlt *Fi idxSlice := indx.AsSlice() if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items &idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) { - acp, e := dm.GetActionProfile(tnt, id, true, false, utils.NonTransactional) + acp, e := dm.GetActionProfile(context.TODO(), tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index c63339d43..6806d976e 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -99,9 +99,9 @@ type DataDB interface { GetRateProfileDrv(*context.Context, string, string) (*utils.RateProfile, error) SetRateProfileDrv(*context.Context, *utils.RateProfile) error RemoveRateProfileDrv(*context.Context, string, string) error - GetActionProfileDrv(string, string) (*ActionProfile, error) - SetActionProfileDrv(*ActionProfile) error - RemoveActionProfileDrv(string, string) error + GetActionProfileDrv(*context.Context, string, string) (*ActionProfile, error) + SetActionProfileDrv(*context.Context, *ActionProfile) error + RemoveActionProfileDrv(*context.Context, string, string) error GetAccountDrv(string, string) (*utils.Account, error) SetAccountDrv(profile *utils.Account) error RemoveAccountDrv(string, string) error diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 5d49ee228..86a592131 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -500,7 +500,7 @@ func (iDB *InternalDB) RemoveRateProfileDrv(ctx *context.Context, tenant, id str return } -func (iDB *InternalDB) GetActionProfileDrv(tenant, id string) (ap *ActionProfile, err error) { +func (iDB *InternalDB) GetActionProfileDrv(ctx *context.Context, tenant, id string) (ap *ActionProfile, err error) { x, ok := Cache.Get(utils.CacheActionProfiles, utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound @@ -508,13 +508,13 @@ func (iDB *InternalDB) GetActionProfileDrv(tenant, id string) (ap *ActionProfile return x.(*ActionProfile), nil } -func (iDB *InternalDB) SetActionProfileDrv(ap *ActionProfile) (err error) { +func (iDB *InternalDB) SetActionProfileDrv(ctx *context.Context, ap *ActionProfile) (err error) { Cache.SetWithoutReplicate(utils.CacheActionProfiles, ap.TenantID(), ap, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) RemoveActionProfileDrv(tenant, id string) (err error) { +func (iDB *InternalDB) RemoveActionProfileDrv(ctx *context.Context, tenant, id string) (err error) { Cache.RemoveWithoutReplicate(utils.CacheActionProfiles, utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 69d26190c..1abcd1a08 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1382,9 +1382,9 @@ func (ms *MongoStorage) RemoveRateProfileDrv(ctx *context.Context, tenant, id st }) } -func (ms *MongoStorage) GetActionProfileDrv(tenant, id string) (ap *ActionProfile, err error) { +func (ms *MongoStorage) GetActionProfileDrv(ctx *context.Context, tenant, id string) (ap *ActionProfile, err error) { ap = new(ActionProfile) - err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { + err = ms.query(ctx, func(sctx mongo.SessionContext) (err error) { cur := ms.getCol(ColApp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) if err := cur.Decode(ap); err != nil { ap = nil @@ -1398,8 +1398,8 @@ func (ms *MongoStorage) GetActionProfileDrv(tenant, id string) (ap *ActionProfil return } -func (ms *MongoStorage) SetActionProfileDrv(ap *ActionProfile) (err error) { - return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) SetActionProfileDrv(ctx *context.Context, ap *ActionProfile) (err error) { + return ms.query(ctx, func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColApp).UpdateOne(sctx, bson.M{"tenant": ap.Tenant, "id": ap.ID}, bson.M{"$set": ap}, options.Update().SetUpsert(true), @@ -1408,8 +1408,8 @@ func (ms *MongoStorage) SetActionProfileDrv(ap *ActionProfile) (err error) { }) } -func (ms *MongoStorage) RemoveActionProfileDrv(tenant, id string) (err error) { - return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { +func (ms *MongoStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id string) (err error) { + return ms.query(ctx, func(sctx mongo.SessionContext) (err error) { dr, err := ms.getCol(ColApp).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { return utils.ErrNotFound diff --git a/engine/storage_redis.go b/engine/storage_redis.go index b634a7512..5a97f4b39 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -773,7 +773,7 @@ func (rs *RedisStorage) RemoveRateProfileDrv(ctx *context.Context, tenant, id st return rs.Cmd(nil, redisDEL, utils.RateProfilePrefix+utils.ConcatenatedKey(tenant, id)) } -func (rs *RedisStorage) GetActionProfileDrv(tenant, id string) (ap *ActionProfile, err error) { +func (rs *RedisStorage) GetActionProfileDrv(ctx *context.Context, tenant, id string) (ap *ActionProfile, err error) { var values []byte if err = rs.Cmd(&values, redisGET, utils.ActionProfilePrefix+utils.ConcatenatedKey(tenant, id)); err != nil { return @@ -785,7 +785,7 @@ func (rs *RedisStorage) GetActionProfileDrv(tenant, id string) (ap *ActionProfil return } -func (rs *RedisStorage) SetActionProfileDrv(ap *ActionProfile) (err error) { +func (rs *RedisStorage) SetActionProfileDrv(ctx *context.Context, ap *ActionProfile) (err error) { var result []byte if result, err = rs.ms.Marshal(ap); err != nil { return @@ -793,7 +793,7 @@ func (rs *RedisStorage) SetActionProfileDrv(ap *ActionProfile) (err error) { return rs.Cmd(nil, redisSET, utils.ActionProfilePrefix+utils.ConcatenatedKey(ap.Tenant, ap.ID), string(result)) } -func (rs *RedisStorage) RemoveActionProfileDrv(tenant, id string) (err error) { +func (rs *RedisStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id string) (err error) { return rs.Cmd(nil, redisDEL, utils.ActionProfilePrefix+utils.ConcatenatedKey(tenant, id)) } diff --git a/engine/tpreader.go b/engine/tpreader.go index 119a45a67..462d32709 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -696,7 +696,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { if ap, err = APItoActionProfile(tpAP, tpr.timezone); err != nil { return } - if err = tpr.dm.SetActionProfile(ap, true); err != nil { + if err = tpr.dm.SetActionProfile(context.TODO(), ap, true); err != nil { return } if verbose { @@ -1043,7 +1043,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error log.Print("ActionProfiles:") } for _, tpAp := range tpr.actionProfiles { - if err = tpr.dm.RemoveActionProfile(tpAp.Tenant, tpAp.ID, + if err = tpr.dm.RemoveActionProfile(context.TODO(), tpAp.Tenant, tpAp.ID, utils.NonTransactional, true); err != nil { return } diff --git a/loaders/loader.go b/loaders/loader.go index ee2cb06d8..594185c7d 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -634,7 +634,7 @@ func (ldr *Loader) storeLoadedData(loaderType string, } // get IDs so we can reload in cache ids = append(ids, acp.TenantID()) - if err := ldr.dm.SetActionProfile(acp, true); err != nil { + if err := ldr.dm.SetActionProfile(context.TODO(), acp, true); err != nil { return err } cacheArgs[utils.ActionProfileIDs] = ids @@ -969,7 +969,7 @@ func (ldr *Loader) removeLoadedData(loaderType string, lds map[string][]LoaderDa tntIDStruct := utils.NewTenantID(tntID) // get IDs so we can reload in cache ids = append(ids, tntID) - if err := ldr.dm.RemoveActionProfile(tntIDStruct.Tenant, + if err := ldr.dm.RemoveActionProfile(context.TODO(), tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional, true); err != nil { return err } diff --git a/loaders/loader_test.go b/loaders/loader_test.go index 678633dc9..54d27c1f4 100644 --- a/loaders/loader_test.go +++ b/loaders/loader_test.go @@ -2513,7 +2513,7 @@ func TestLoaderActionProfile(t *testing.T) { }, } - aps, err := ldr.dm.GetActionProfile("cgrates.org", "ONE_TIME_ACT", + aps, err := ldr.dm.GetActionProfile(context.Background(), "cgrates.org", "ONE_TIME_ACT", true, false, utils.NonTransactional) if err != nil { t.Fatal(err) @@ -4185,7 +4185,7 @@ cgrates.org,REM_ACTPROFILE_1 Tenant: "cgrates.org", ID: "REM_ACTPROFILE_1", } - if err := ldr.dm.SetActionProfile(actRtPrf, true); err != nil { + if err := ldr.dm.SetActionProfile(context.Background(), actRtPrf, true); err != nil { t.Error(err) } else if err := ldr.removeContent(utils.MetaActionProfiles, utils.EmptyString); err != nil { t.Error(err) @@ -4339,7 +4339,7 @@ cgrates.org,REM_ACTPROFILE_s ID: "REM_ACTPROFILE_s", } expectedErr := "NOT_FOUND" - if err := ldr.dm.SetActionProfile(actRtPrf, true); err != nil { + if err := ldr.dm.SetActionProfile(context.Background(), actRtPrf, true); err != nil { t.Error(err) } else if err := ldr.removeContent(utils.MetaActionProfiles, utils.EmptyString); err == nil || err.Error() != expectedErr { t.Errorf("Expected %+v, received %+v", expectedErr, err) @@ -4391,7 +4391,7 @@ cgrates.org,REM_ACTPROFILE_s ID: "REM_ACTPROFILE_s", } expectedErr := "NOT_FOUND" - if err := ldr.dm.SetActionProfile(actRtPrf, true); err != nil { + if err := ldr.dm.SetActionProfile(context.Background(), actRtPrf, true); err != nil { t.Error(err) } else if err := ldr.removeContent(utils.MetaActionProfiles, utils.EmptyString); err == nil || err.Error() != expectedErr { t.Errorf("Expected %+v, received %+v", expectedErr, err) diff --git a/migrator/action_profiles.go b/migrator/action_profiles.go index 5e38e815e..b19dbcb06 100644 --- a/migrator/action_profiles.go +++ b/migrator/action_profiles.go @@ -39,17 +39,17 @@ func (m *Migrator) migrateCurrentActionProfiles() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating from action profiles", id) } - ap, err := m.dmIN.DataManager().GetActionProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) + ap, err := m.dmIN.DataManager().GetActionProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } if ap == nil || m.dryRun { continue } - if err := m.dmOut.DataManager().SetActionProfile(ap, true); err != nil { + if err := m.dmOut.DataManager().SetActionProfile(context.TODO(), ap, true); err != nil { return err } - if err := m.dmIN.DataManager().RemoveActionProfile(tntID[0], tntID[1], utils.NonTransactional, false); err != nil { + if err := m.dmIN.DataManager().RemoveActionProfile(context.TODO(), tntID[0], tntID[1], utils.NonTransactional, false); err != nil { return err } m.stats[utils.ActionProfiles]++