mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 12:49:54 +05:00
Added context for Action apis
This commit is contained in:
committed by
Dan Christian Bogos
parent
6c2a2c3a98
commit
13c102e8b8
@@ -94,11 +94,11 @@ func (aS *ActionS) schedInit() {
|
||||
},
|
||||
}
|
||||
}
|
||||
aS.scheduleActions(cgrEvs, nil, true)
|
||||
aS.scheduleActions(context.Background(), cgrEvs, nil, true)
|
||||
}
|
||||
|
||||
// scheduleActions will set up cron and load the matching data
|
||||
func (aS *ActionS) scheduleActions(cgrEvs []*utils.CGREvent, aPrflIDs []string, crnReset bool) (err error) {
|
||||
func (aS *ActionS) scheduleActions(ctx *context.Context, cgrEvs []*utils.CGREvent, aPrflIDs []string, crnReset bool) (err error) {
|
||||
aS.crnLk.Lock() // make sure we don't have parallel processes running setu
|
||||
defer aS.crnLk.Unlock()
|
||||
crn := aS.crn
|
||||
@@ -108,7 +108,7 @@ func (aS *ActionS) scheduleActions(cgrEvs []*utils.CGREvent, aPrflIDs []string,
|
||||
var partExec bool
|
||||
for _, cgrEv := range cgrEvs {
|
||||
var schedActSet []*scheduledActs
|
||||
if schedActSet, err = aS.scheduledActions(cgrEv.Tenant, cgrEv, aPrflIDs, false); err != nil {
|
||||
if schedActSet, err = aS.scheduledActions(ctx, cgrEv.Tenant, cgrEv, aPrflIDs, false); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> scheduler init, ignoring tenant: <%s>, error: <%s>",
|
||||
@@ -118,7 +118,7 @@ func (aS *ActionS) scheduleActions(cgrEvs []*utils.CGREvent, aPrflIDs []string,
|
||||
}
|
||||
for _, sActs := range schedActSet {
|
||||
if sActs.schedule == utils.MetaASAP {
|
||||
go aS.asapExecuteActions(sActs)
|
||||
go aS.asapExecuteActions(ctx, sActs)
|
||||
continue
|
||||
}
|
||||
if _, err = crn.AddFunc(sActs.schedule, sActs.ScheduledExecute); err != nil {
|
||||
@@ -145,12 +145,12 @@ func (aS *ActionS) scheduleActions(cgrEvs []*utils.CGREvent, aPrflIDs []string,
|
||||
}
|
||||
|
||||
// matchingActionProfilesForEvent returns the matched ActionProfiles for the given event
|
||||
func (aS *ActionS) matchingActionProfilesForEvent(tnt string,
|
||||
func (aS *ActionS) matchingActionProfilesForEvent(ctx *context.Context, tnt string,
|
||||
evNm utils.MapStorage, actTime *time.Time, aPrflIDs []string) (aPfs engine.ActionProfiles, err error) {
|
||||
if len(aPrflIDs) == 0 {
|
||||
var aPfIDMp utils.StringSet
|
||||
if aPfIDMp, err = engine.MatchingItemIDsForEvent(
|
||||
context.TODO(),
|
||||
ctx,
|
||||
evNm,
|
||||
aS.cfg.ActionSCfg().StringIndexedFields,
|
||||
aS.cfg.ActionSCfg().PrefixIndexedFields,
|
||||
@@ -167,7 +167,7 @@ func (aS *ActionS) matchingActionProfilesForEvent(tnt string,
|
||||
}
|
||||
for _, aPfID := range aPrflIDs {
|
||||
var aPf *engine.ActionProfile
|
||||
if aPf, err = aS.dm.GetActionProfile(tnt, aPfID,
|
||||
if aPf, err = aS.dm.GetActionProfile(ctx, tnt, aPfID,
|
||||
true, true, utils.NonTransactional); err != nil {
|
||||
if err == utils.ErrNotFound {
|
||||
err = nil
|
||||
@@ -176,7 +176,7 @@ func (aS *ActionS) matchingActionProfilesForEvent(tnt string,
|
||||
return
|
||||
}
|
||||
var pass bool
|
||||
if pass, err = aS.fltrS.Pass(context.TODO(), tnt, aPf.FilterIDs, evNm); err != nil {
|
||||
if pass, err = aS.fltrS.Pass(ctx, tnt, aPf.FilterIDs, evNm); err != nil {
|
||||
return
|
||||
} else if !pass {
|
||||
continue
|
||||
@@ -191,14 +191,14 @@ func (aS *ActionS) matchingActionProfilesForEvent(tnt string,
|
||||
}
|
||||
|
||||
// scheduledActions is responsible for scheduling the action profiles matching cgrEv
|
||||
func (aS *ActionS) scheduledActions(tnt string, cgrEv *utils.CGREvent, aPrflIDs []string,
|
||||
func (aS *ActionS) scheduledActions(ctx *context.Context, tnt string, cgrEv *utils.CGREvent, aPrflIDs []string,
|
||||
forceASAP bool) (schedActs []*scheduledActs, err error) {
|
||||
var aPfs engine.ActionProfiles
|
||||
evNm := utils.MapStorage{
|
||||
utils.MetaReq: cgrEv.Event,
|
||||
utils.MetaOpts: cgrEv.APIOpts,
|
||||
}
|
||||
if aPfs, err = aS.matchingActionProfilesForEvent(tnt, evNm, cgrEv.Time, aPrflIDs); err != nil {
|
||||
if aPfs, err = aS.matchingActionProfilesForEvent(ctx, tnt, evNm, cgrEv.Time, aPrflIDs); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -239,10 +239,10 @@ func (aS *ActionS) scheduledActions(tnt string, cgrEv *utils.CGREvent, aPrflIDs
|
||||
|
||||
// asapExecuteActions executes the scheduledActs and removes the executed from database
|
||||
// uses locks to avoid concurrent access
|
||||
func (aS *ActionS) asapExecuteActions(sActs *scheduledActs) (err error) {
|
||||
_, err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) (gRes interface{}, gErr error) {
|
||||
func (aS *ActionS) asapExecuteActions(ctx *context.Context, sActs *scheduledActs) (err error) {
|
||||
_, err = guardian.Guardian.Guard(ctx, func(_ *context.Context) (gRes interface{}, gErr error) {
|
||||
var ap *engine.ActionProfile
|
||||
if ap, gErr = aS.dm.GetActionProfile(sActs.tenant, sActs.apID, true, true, utils.NonTransactional); gErr != nil {
|
||||
if ap, gErr = aS.dm.GetActionProfile(ctx, sActs.tenant, sActs.apID, true, true, utils.NonTransactional); gErr != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> querying ActionProfile with id: <%s:%s>, error: <%s>",
|
||||
@@ -253,7 +253,7 @@ func (aS *ActionS) asapExecuteActions(sActs *scheduledActs) (err error) {
|
||||
return
|
||||
}
|
||||
delete(ap.Targets[sActs.trgTyp], sActs.trgID)
|
||||
if gErr = aS.dm.SetActionProfile(ap, true); gErr != nil {
|
||||
if gErr = aS.dm.SetActionProfile(ctx, ap, true); gErr != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> saving ActionProfile with id: <%s:%s>, error: <%s>",
|
||||
@@ -265,8 +265,8 @@ func (aS *ActionS) asapExecuteActions(sActs *scheduledActs) (err error) {
|
||||
}
|
||||
|
||||
// V1ScheduleActions will be called to schedule actions matching the arguments
|
||||
func (aS *ActionS) V1ScheduleActions(args *utils.ArgActionSv1ScheduleActions, rpl *string) (err error) {
|
||||
if err = aS.scheduleActions([]*utils.CGREvent{args.CGREvent},
|
||||
func (aS *ActionS) V1ScheduleActions(ctx *context.Context, args *utils.ArgActionSv1ScheduleActions, rpl *string) (err error) {
|
||||
if err = aS.scheduleActions(ctx, []*utils.CGREvent{args.CGREvent},
|
||||
args.ActionProfileIDs, false); err != nil {
|
||||
return
|
||||
}
|
||||
@@ -275,16 +275,16 @@ func (aS *ActionS) V1ScheduleActions(args *utils.ArgActionSv1ScheduleActions, rp
|
||||
}
|
||||
|
||||
// V1ExecuteActions will be called to execute ASAP action profiles, ignoring their Schedule field
|
||||
func (aS *ActionS) V1ExecuteActions(args *utils.ArgActionSv1ScheduleActions, rpl *string) (err error) {
|
||||
func (aS *ActionS) V1ExecuteActions(ctx *context.Context, args *utils.ArgActionSv1ScheduleActions, rpl *string) (err error) {
|
||||
var schedActSet []*scheduledActs
|
||||
if schedActSet, err = aS.scheduledActions(args.CGREvent.Tenant,
|
||||
if schedActSet, err = aS.scheduledActions(ctx, args.CGREvent.Tenant,
|
||||
args.CGREvent, args.ActionProfileIDs, true); err != nil {
|
||||
return
|
||||
}
|
||||
var partExec bool
|
||||
// execute the actions
|
||||
for _, sActs := range schedActSet {
|
||||
if err = aS.asapExecuteActions(sActs); err != nil {
|
||||
if err = aS.asapExecuteActions(ctx, sActs); err != nil {
|
||||
partExec = true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,13 +71,14 @@ func TestMatchingActionProfilesForEvent(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := acts.dm.SetActionProfile(actPrf, true); err != nil {
|
||||
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
expActionPrf := engine.ActionProfiles{actPrf}
|
||||
|
||||
if rcv, err := acts.matchingActionProfilesForEvent("cgrates.org", evNM, utils.TimePointer(time.Now()), []string{}); err != nil {
|
||||
if rcv, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org",
|
||||
evNM, utils.TimePointer(time.Now()), []string{}); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(rcv, expActionPrf) {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ToJSON(expActionPrf), utils.ToJSON(rcv))
|
||||
@@ -90,7 +91,8 @@ func TestMatchingActionProfilesForEvent(t *testing.T) {
|
||||
utils.MetaOpts: map[string]interface{}{},
|
||||
}
|
||||
//This Event is not matching with our filter
|
||||
if _, err := acts.matchingActionProfilesForEvent("cgrates.org", evNM, utils.TimePointer(time.Now()), []string{}); err == nil || err != utils.ErrNotFound {
|
||||
if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org",
|
||||
evNM, utils.TimePointer(time.Now()), []string{}); err == nil || err != utils.ErrNotFound {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
|
||||
}
|
||||
|
||||
@@ -102,38 +104,44 @@ func TestMatchingActionProfilesForEvent(t *testing.T) {
|
||||
}
|
||||
actPrfIDs := []string{"inexisting_id"}
|
||||
//Unable to get from database an ActionProfile if the ID won't match
|
||||
if _, err := acts.matchingActionProfilesForEvent("cgrates.org", evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err != utils.ErrNotFound {
|
||||
if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org",
|
||||
evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err != utils.ErrNotFound {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
|
||||
}
|
||||
|
||||
actPrfIDs = []string{"test_id1"}
|
||||
if _, err := acts.matchingActionProfilesForEvent("cgrates.org", evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err != utils.ErrNotFound {
|
||||
if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org",
|
||||
evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err != utils.ErrNotFound {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
|
||||
}
|
||||
actPrf.FilterIDs = append(actPrf.FilterIDs, "*ai:~*req.AnswerTime:2012-07-21T00:00:00Z|2012-08-21T00:00:00Z")
|
||||
//this event is not active in this interval time
|
||||
if _, err := acts.matchingActionProfilesForEvent("cgrates.org", evNM, utils.TimePointer(time.Date(2012, 6, 21, 0, 0, 0, 0, time.UTC)), actPrfIDs); err == nil || err != utils.ErrNotFound {
|
||||
if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org",
|
||||
evNM, utils.TimePointer(time.Date(2012, 6, 21, 0, 0, 0, 0, time.UTC)), actPrfIDs); err == nil || err != utils.ErrNotFound {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
|
||||
}
|
||||
|
||||
//when dataManager is nil, it won't be able to get ActionsProfile from database
|
||||
acts.dm = nil
|
||||
if _, err := acts.matchingActionProfilesForEvent("INVALID_TENANT", evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err != utils.ErrNoDatabaseConn {
|
||||
if _, err := acts.matchingActionProfilesForEvent(context.Background(), "INVALID_TENANT",
|
||||
evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err != utils.ErrNoDatabaseConn {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrNoDatabaseConn, err)
|
||||
}
|
||||
|
||||
acts.dm = engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
|
||||
actPrf.FilterIDs = []string{"invalid_filters"}
|
||||
//Set in database and invalid filter, so it won t pass
|
||||
if err := acts.dm.SetActionProfile(actPrf, false); err != nil {
|
||||
if err := acts.dm.SetActionProfile(context.Background(), actPrf, false); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
expected := "NOT_FOUND:invalid_filters"
|
||||
if _, err := acts.matchingActionProfilesForEvent("cgrates.org", evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err.Error() != expected {
|
||||
if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org",
|
||||
evNM, utils.TimePointer(time.Now()), actPrfIDs); err == nil || err.Error() != expected {
|
||||
t.Errorf("Expected %+v, received %+v", expected, err)
|
||||
}
|
||||
|
||||
if err := acts.dm.RemoveActionProfile(actPrf.Tenant, actPrf.ID, utils.NonTransactional, false); err != nil {
|
||||
if err := acts.dm.RemoveActionProfile(context.Background(), actPrf.Tenant,
|
||||
actPrf.ID, utils.NonTransactional, false); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
@@ -175,11 +183,11 @@ func TestScheduledActions(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := acts.dm.SetActionProfile(actPrf, true); err != nil {
|
||||
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if rcv, err := acts.scheduledActions(cgrEv.Tenant, cgrEv, []string{}, false); err != nil {
|
||||
if rcv, err := acts.scheduledActions(context.Background(), cgrEv.Tenant, cgrEv, []string{}, false); err != nil {
|
||||
t.Error(err)
|
||||
} else {
|
||||
expSchedActs := newScheduledActs(context.Background(), cgrEv.Tenant, cgrEv.ID, utils.MetaNone, utils.EmptyString,
|
||||
@@ -196,7 +204,7 @@ func TestScheduledActions(t *testing.T) {
|
||||
utils.Accounts: "10",
|
||||
},
|
||||
}
|
||||
if _, err := acts.scheduledActions(cgrEv.Tenant, cgrEv, []string{}, false); err == nil || err != utils.ErrNotFound {
|
||||
if _, err := acts.scheduledActions(context.Background(), cgrEv.Tenant, cgrEv, []string{}, false); err == nil || err != utils.ErrNotFound {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
|
||||
}
|
||||
}
|
||||
@@ -236,34 +244,34 @@ func TestScheduleAction(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := acts.dm.SetActionProfile(actPrf, true); err != nil {
|
||||
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if err := acts.scheduleActions(cgrEv, []string{}, true); err != nil {
|
||||
if err := acts.scheduleActions(context.Background(), cgrEv, []string{}, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//Cannot schedule an action if the ID is invalid
|
||||
if err := acts.scheduleActions(cgrEv, []string{"INVALID_ID1"}, true); err == nil || err != utils.ErrPartiallyExecuted {
|
||||
if err := acts.scheduleActions(context.Background(), cgrEv, []string{"INVALID_ID1"}, true); err == nil || err != utils.ErrPartiallyExecuted {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err)
|
||||
}
|
||||
|
||||
//When schedule is "*asap", the action will execute immediately
|
||||
actPrf.Schedule = utils.MetaASAP
|
||||
if err := acts.dm.SetActionProfile(actPrf, true); err != nil {
|
||||
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := acts.scheduleActions(cgrEv, []string{}, true); err != nil {
|
||||
if err := acts.scheduleActions(context.Background(), cgrEv, []string{}, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
//Cannot execute the action if the cron is invalid
|
||||
actPrf.Schedule = "* * * *"
|
||||
if err := acts.dm.SetActionProfile(actPrf, true); err != nil {
|
||||
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := acts.scheduleActions(cgrEv, []string{}, true); err == nil || err != utils.ErrPartiallyExecuted {
|
||||
if err := acts.scheduleActions(context.Background(), cgrEv, []string{}, true); err == nil || err != utils.ErrPartiallyExecuted {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
@@ -294,7 +302,7 @@ func TestAsapExecuteActions(t *testing.T) {
|
||||
expSchedActs := newScheduledActs(context.Background(), cgrEv[0].Tenant, cgrEv[0].ID, utils.MetaNone, utils.EmptyString,
|
||||
utils.EmptyString, evNM, nil)
|
||||
|
||||
if err := acts.asapExecuteActions(expSchedActs); err == nil || err != utils.ErrNoDatabaseConn {
|
||||
if err := acts.asapExecuteActions(context.Background(), expSchedActs); err == nil || err != utils.ErrNoDatabaseConn {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrNoDatabaseConn, err)
|
||||
}
|
||||
|
||||
@@ -302,7 +310,7 @@ func TestAsapExecuteActions(t *testing.T) {
|
||||
acts.dm = engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
|
||||
expSchedActs = newScheduledActs(context.Background(), cgrEv[0].Tenant, "another_id", utils.MetaNone, utils.EmptyString,
|
||||
utils.EmptyString, evNM, nil)
|
||||
if err := acts.asapExecuteActions(expSchedActs); err == nil || err != utils.ErrNotFound {
|
||||
if err := acts.asapExecuteActions(context.Background(), expSchedActs); err == nil || err != utils.ErrNotFound {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
|
||||
}
|
||||
}
|
||||
@@ -376,22 +384,22 @@ func TestV1ScheduleActions(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := acts.dm.SetActionProfile(actPrf, true); err != nil {
|
||||
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if err := acts.V1ScheduleActions(newArgs, &reply); err != nil {
|
||||
if err := acts.V1ScheduleActions(context.Background(), newArgs, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Unexpected reply %+v", reply)
|
||||
}
|
||||
|
||||
newArgs.ActionProfileIDs = []string{"invalid_id"}
|
||||
if err := acts.V1ScheduleActions(newArgs, &reply); err == nil || err != utils.ErrPartiallyExecuted {
|
||||
if err := acts.V1ScheduleActions(context.Background(), newArgs, &reply); err == nil || err != utils.ErrPartiallyExecuted {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err)
|
||||
}
|
||||
|
||||
if err := acts.dm.RemoveActionProfile(actPrf.Tenant, actPrf.ID, utils.NonTransactional, true); err != nil {
|
||||
if err := acts.dm.RemoveActionProfile(context.Background(), actPrf.Tenant, actPrf.ID, utils.NonTransactional, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
@@ -433,18 +441,18 @@ func TestV1ExecuteActions(t *testing.T) {
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := acts.dm.SetActionProfile(actPrf, true); err != nil {
|
||||
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if err := acts.V1ExecuteActions(newArgs, &reply); err != nil {
|
||||
if err := acts.V1ExecuteActions(context.Background(), newArgs, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Unexpected reply %+v", reply)
|
||||
}
|
||||
|
||||
newArgs.ActionProfileIDs = []string{"invalid_id"}
|
||||
if err := acts.V1ExecuteActions(newArgs, &reply); err == nil || err != utils.ErrNotFound {
|
||||
if err := acts.V1ExecuteActions(context.Background(), newArgs, &reply); err == nil || err != utils.ErrNotFound {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
|
||||
}
|
||||
|
||||
@@ -452,11 +460,11 @@ func TestV1ExecuteActions(t *testing.T) {
|
||||
newDm := engine.NewDataManager(newData, config.CgrConfig().CacheCfg(), nil)
|
||||
newActs := NewActionS(defaultCfg, filters, newDm, nil)
|
||||
newArgs.ActionProfileIDs = []string{}
|
||||
if err := newActs.V1ExecuteActions(newArgs, &reply); err == nil || err != utils.ErrPartiallyExecuted {
|
||||
if err := newActs.V1ExecuteActions(context.Background(), newArgs, &reply); err == nil || err != utils.ErrPartiallyExecuted {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err)
|
||||
}
|
||||
|
||||
if err := acts.dm.RemoveActionProfile(actPrf.Tenant, actPrf.ID, utils.NonTransactional, true); err != nil {
|
||||
if err := acts.dm.RemoveActionProfile(context.Background(), actPrf.Tenant, actPrf.ID, utils.NonTransactional, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
@@ -491,7 +499,7 @@ type dataDBMockError struct {
|
||||
*engine.DataDBMock
|
||||
}
|
||||
|
||||
func (dbM *dataDBMockError) GetActionProfileDrv(string, string) (*engine.ActionProfile, error) {
|
||||
func (dbM *dataDBMockError) GetActionProfileDrv(*context.Context, string, string) (*engine.ActionProfile, error) {
|
||||
return &engine.ActionProfile{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "test_id1",
|
||||
@@ -510,7 +518,7 @@ func (dbM *dataDBMockError) GetActionProfileDrv(string, string) (*engine.ActionP
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (dbM *dataDBMockError) SetActionProfileDrv(*engine.ActionProfile) error {
|
||||
func (dbM *dataDBMockError) SetActionProfileDrv(*context.Context, *engine.ActionProfile) error {
|
||||
return utils.ErrNoDatabaseConn
|
||||
}
|
||||
|
||||
@@ -1061,7 +1069,7 @@ func TestACScheduledActions(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := dm.SetActionProfile(actPrf, true); err != nil {
|
||||
if err := dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -1084,7 +1092,7 @@ func TestACScheduledActions(t *testing.T) {
|
||||
|
||||
acts := NewActionS(cfg, fltrs, dm, nil)
|
||||
expected := "WARNING] <ActionS> ignoring ActionProfile with id: <cgrates.org:TestACScheduledActions> creating action: <TOPUP>, error: <unsupported action type: <inexistent_type>>"
|
||||
if _, err := acts.scheduledActions("cgrates.org", cgrEv, []string{}, true); err != nil {
|
||||
if _, err := acts.scheduledActions(context.Background(), "cgrates.org", cgrEv, []string{}, true); err != nil {
|
||||
t.Error(err)
|
||||
} else if rcv := buff.String(); !strings.Contains(rcv, expected) {
|
||||
t.Errorf("Expected %+v, received %+v", expected, rcv)
|
||||
@@ -1097,7 +1105,7 @@ func TestACScheduledActions(t *testing.T) {
|
||||
"ID_TEST": {},
|
||||
},
|
||||
}
|
||||
if err := dm.SetActionProfile(actPrf, true); err != nil {
|
||||
if err := dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
@@ -1117,14 +1125,14 @@ func TestACScheduledActions(t *testing.T) {
|
||||
},
|
||||
}
|
||||
var schedActs []*scheduledActs
|
||||
if schedActs, err = acts.scheduledActions("cgrates.org", cgrEv, []string{}, true); err != nil {
|
||||
if schedActs, err = acts.scheduledActions(context.Background(), "cgrates.org", cgrEv, []string{}, true); err != nil {
|
||||
t.Error(err)
|
||||
} else {
|
||||
|
||||
}
|
||||
//execute asap the actions
|
||||
schedActs[0].trgID = "invalid_type"
|
||||
if err := acts.asapExecuteActions(schedActs[0]); err == nil || err != utils.ErrPartiallyExecuted {
|
||||
if err := acts.asapExecuteActions(context.Background(), schedActs[0]); err == nil || err != utils.ErrPartiallyExecuted {
|
||||
t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -290,15 +290,15 @@ func (dbM *DataDBMock) RemoveRateProfileDrv(*context.Context, string, string) er
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (dbM *DataDBMock) GetActionProfileDrv(string, string) (*ActionProfile, error) {
|
||||
func (dbM *DataDBMock) GetActionProfileDrv(*context.Context, string, string) (*ActionProfile, error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (dbM *DataDBMock) SetActionProfileDrv(*ActionProfile) error {
|
||||
func (dbM *DataDBMock) SetActionProfileDrv(*context.Context, *ActionProfile) error {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
func (dbM *DataDBMock) RemoveActionProfileDrv(string, string) error {
|
||||
func (dbM *DataDBMock) RemoveActionProfileDrv(*context.Context, string, string) error {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
|
||||
@@ -186,7 +186,7 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids []
|
||||
_, err = dm.GetRateProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
case utils.ActionProfilePrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = dm.GetActionProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
_, err = dm.GetActionProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
case utils.AttributeFilterIndexes:
|
||||
var tntCtx, idxKey string
|
||||
if tntCtx, idxKey, err = splitFilterIndex(dataID); err != nil {
|
||||
@@ -2247,7 +2247,7 @@ func (dm *DataManager) SetRateProfileRates(ctx *context.Context, rpp *utils.Rate
|
||||
return
|
||||
}
|
||||
|
||||
func (dm *DataManager) GetActionProfile(tenant, id string, cacheRead, cacheWrite bool,
|
||||
func (dm *DataManager) GetActionProfile(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool,
|
||||
transactionID string) (ap *ActionProfile, err error) {
|
||||
tntID := utils.ConcatenatedKey(tenant, id)
|
||||
if cacheRead {
|
||||
@@ -2262,10 +2262,10 @@ func (dm *DataManager) GetActionProfile(tenant, id string, cacheRead, cacheWrite
|
||||
err = utils.ErrNoDatabaseConn
|
||||
return
|
||||
}
|
||||
ap, err = dm.dataDB.GetActionProfileDrv(tenant, id)
|
||||
ap, err = dm.dataDB.GetActionProfileDrv(ctx, tenant, id)
|
||||
if err != nil {
|
||||
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionProfiles]; err == utils.ErrNotFound && itm.Remote {
|
||||
if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns,
|
||||
if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns,
|
||||
utils.ReplicatorSv1GetActionProfile,
|
||||
&utils.TenantIDWithAPIOpts{
|
||||
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
|
||||
@@ -2273,13 +2273,13 @@ func (dm *DataManager) GetActionProfile(tenant, id string, cacheRead, cacheWrite
|
||||
utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID,
|
||||
config.CgrConfig().GeneralCfg().NodeID)),
|
||||
}, &ap); err == nil {
|
||||
err = dm.dataDB.SetActionProfileDrv(ap)
|
||||
err = dm.dataDB.SetActionProfileDrv(ctx, ap)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
err = utils.CastRPCErr(err)
|
||||
if err == utils.ErrNotFound && cacheWrite && dm.dataDB.GetStorageType() != utils.Internal {
|
||||
if errCh := Cache.Set(context.TODO(), utils.CacheActionProfiles, tntID, nil, nil,
|
||||
if errCh := Cache.Set(ctx, utils.CacheActionProfiles, tntID, nil, nil,
|
||||
cacheCommit(transactionID), transactionID); errCh != nil {
|
||||
return nil, errCh
|
||||
}
|
||||
@@ -2289,7 +2289,7 @@ func (dm *DataManager) GetActionProfile(tenant, id string, cacheRead, cacheWrite
|
||||
}
|
||||
}
|
||||
if cacheWrite {
|
||||
if errCh := Cache.Set(context.TODO(), utils.CacheActionProfiles, tntID, ap, nil,
|
||||
if errCh := Cache.Set(ctx, utils.CacheActionProfiles, tntID, ap, nil,
|
||||
cacheCommit(transactionID), transactionID); errCh != nil {
|
||||
return nil, errCh
|
||||
}
|
||||
@@ -2297,22 +2297,22 @@ func (dm *DataManager) GetActionProfile(tenant, id string, cacheRead, cacheWrite
|
||||
return
|
||||
}
|
||||
|
||||
func (dm *DataManager) SetActionProfile(ap *ActionProfile, withIndex bool) (err error) {
|
||||
func (dm *DataManager) SetActionProfile(ctx *context.Context, ap *ActionProfile, withIndex bool) (err error) {
|
||||
if dm == nil {
|
||||
return utils.ErrNoDatabaseConn
|
||||
}
|
||||
if withIndex {
|
||||
if brokenReference := dm.checkFilters(context.TODO(), ap.Tenant, ap.FilterIDs); len(brokenReference) != 0 {
|
||||
if brokenReference := dm.checkFilters(ctx, ap.Tenant, ap.FilterIDs); len(brokenReference) != 0 {
|
||||
// if we get a broken filter do not set the profile
|
||||
return fmt.Errorf("broken reference to filter: %+v for item with ID: %+v",
|
||||
brokenReference, ap.TenantID())
|
||||
}
|
||||
}
|
||||
oldRpp, err := dm.GetActionProfile(ap.Tenant, ap.ID, true, false, utils.NonTransactional)
|
||||
oldRpp, err := dm.GetActionProfile(ctx, ap.Tenant, ap.ID, true, false, utils.NonTransactional)
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
if err = dm.DataDB().SetActionProfileDrv(ap); err != nil {
|
||||
if err = dm.DataDB().SetActionProfileDrv(ctx, ap); err != nil {
|
||||
return err
|
||||
}
|
||||
if withIndex {
|
||||
@@ -2320,13 +2320,13 @@ func (dm *DataManager) SetActionProfile(ap *ActionProfile, withIndex bool) (err
|
||||
if oldRpp != nil {
|
||||
oldFiltersIDs = &oldRpp.FilterIDs
|
||||
}
|
||||
if err := updatedIndexes(context.TODO(), dm, utils.CacheActionProfilesFilterIndexes, ap.Tenant,
|
||||
if err := updatedIndexes(ctx, dm, utils.CacheActionProfilesFilterIndexes, ap.Tenant,
|
||||
utils.EmptyString, ap.ID, oldFiltersIDs, ap.FilterIDs, false); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionProfiles]; itm.Replicate {
|
||||
err = replicate(context.TODO(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
|
||||
err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
|
||||
config.CgrConfig().DataDbCfg().RplFiltered,
|
||||
utils.ActionProfilePrefix, ap.TenantID(), // this are used to get the host IDs from cache
|
||||
utils.ReplicatorSv1SetActionProfile,
|
||||
@@ -2338,29 +2338,29 @@ func (dm *DataManager) SetActionProfile(ap *ActionProfile, withIndex bool) (err
|
||||
return
|
||||
}
|
||||
|
||||
func (dm *DataManager) RemoveActionProfile(tenant, id string,
|
||||
func (dm *DataManager) RemoveActionProfile(ctx *context.Context, tenant, id string,
|
||||
transactionID string, withIndex bool) (err error) {
|
||||
if dm == nil {
|
||||
return utils.ErrNoDatabaseConn
|
||||
}
|
||||
oldRpp, err := dm.GetActionProfile(tenant, id, true, false, utils.NonTransactional)
|
||||
oldRpp, err := dm.GetActionProfile(ctx, tenant, id, true, false, utils.NonTransactional)
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
if err = dm.DataDB().RemoveActionProfileDrv(tenant, id); err != nil {
|
||||
if err = dm.DataDB().RemoveActionProfileDrv(ctx, tenant, id); err != nil {
|
||||
return
|
||||
}
|
||||
if oldRpp == nil {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
if withIndex {
|
||||
if err = removeItemFromFilterIndex(context.TODO(), dm, utils.CacheActionProfilesFilterIndexes,
|
||||
if err = removeItemFromFilterIndex(ctx, dm, utils.CacheActionProfilesFilterIndexes,
|
||||
tenant, utils.EmptyString, id, oldRpp.FilterIDs); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionProfiles]; itm.Replicate {
|
||||
replicate(context.TODO(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
|
||||
replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
|
||||
config.CgrConfig().DataDbCfg().RplFiltered,
|
||||
utils.ActionProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
|
||||
utils.ReplicatorSv1RemoveActionProfile,
|
||||
|
||||
@@ -725,7 +725,7 @@ func UpdateFilterIndex(ctx *context.Context, dm *DataManager, oldFlt, newFlt *Fi
|
||||
idxSlice := indx.AsSlice()
|
||||
if _, err = ComputeIndexes(ctx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items
|
||||
&idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) {
|
||||
acp, e := dm.GetActionProfile(tnt, id, true, false, utils.NonTransactional)
|
||||
acp, e := dm.GetActionProfile(context.TODO(), tnt, id, true, false, utils.NonTransactional)
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
|
||||
@@ -99,9 +99,9 @@ type DataDB interface {
|
||||
GetRateProfileDrv(*context.Context, string, string) (*utils.RateProfile, error)
|
||||
SetRateProfileDrv(*context.Context, *utils.RateProfile) error
|
||||
RemoveRateProfileDrv(*context.Context, string, string) error
|
||||
GetActionProfileDrv(string, string) (*ActionProfile, error)
|
||||
SetActionProfileDrv(*ActionProfile) error
|
||||
RemoveActionProfileDrv(string, string) error
|
||||
GetActionProfileDrv(*context.Context, string, string) (*ActionProfile, error)
|
||||
SetActionProfileDrv(*context.Context, *ActionProfile) error
|
||||
RemoveActionProfileDrv(*context.Context, string, string) error
|
||||
GetAccountDrv(string, string) (*utils.Account, error)
|
||||
SetAccountDrv(profile *utils.Account) error
|
||||
RemoveAccountDrv(string, string) error
|
||||
|
||||
@@ -500,7 +500,7 @@ func (iDB *InternalDB) RemoveRateProfileDrv(ctx *context.Context, tenant, id str
|
||||
return
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) GetActionProfileDrv(tenant, id string) (ap *ActionProfile, err error) {
|
||||
func (iDB *InternalDB) GetActionProfileDrv(ctx *context.Context, tenant, id string) (ap *ActionProfile, err error) {
|
||||
x, ok := Cache.Get(utils.CacheActionProfiles, utils.ConcatenatedKey(tenant, id))
|
||||
if !ok || x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
@@ -508,13 +508,13 @@ func (iDB *InternalDB) GetActionProfileDrv(tenant, id string) (ap *ActionProfile
|
||||
return x.(*ActionProfile), nil
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) SetActionProfileDrv(ap *ActionProfile) (err error) {
|
||||
func (iDB *InternalDB) SetActionProfileDrv(ctx *context.Context, ap *ActionProfile) (err error) {
|
||||
Cache.SetWithoutReplicate(utils.CacheActionProfiles, ap.TenantID(), ap, nil,
|
||||
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
||||
return
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) RemoveActionProfileDrv(tenant, id string) (err error) {
|
||||
func (iDB *InternalDB) RemoveActionProfileDrv(ctx *context.Context, tenant, id string) (err error) {
|
||||
Cache.RemoveWithoutReplicate(utils.CacheActionProfiles, utils.ConcatenatedKey(tenant, id),
|
||||
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
||||
return
|
||||
|
||||
@@ -1382,9 +1382,9 @@ func (ms *MongoStorage) RemoveRateProfileDrv(ctx *context.Context, tenant, id st
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetActionProfileDrv(tenant, id string) (ap *ActionProfile, err error) {
|
||||
func (ms *MongoStorage) GetActionProfileDrv(ctx *context.Context, tenant, id string) (ap *ActionProfile, err error) {
|
||||
ap = new(ActionProfile)
|
||||
err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
err = ms.query(ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
cur := ms.getCol(ColApp).FindOne(sctx, bson.M{"tenant": tenant, "id": id})
|
||||
if err := cur.Decode(ap); err != nil {
|
||||
ap = nil
|
||||
@@ -1398,8 +1398,8 @@ func (ms *MongoStorage) GetActionProfileDrv(tenant, id string) (ap *ActionProfil
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetActionProfileDrv(ap *ActionProfile) (err error) {
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
func (ms *MongoStorage) SetActionProfileDrv(ctx *context.Context, ap *ActionProfile) (err error) {
|
||||
return ms.query(ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
_, err = ms.getCol(ColApp).UpdateOne(sctx, bson.M{"tenant": ap.Tenant, "id": ap.ID},
|
||||
bson.M{"$set": ap},
|
||||
options.Update().SetUpsert(true),
|
||||
@@ -1408,8 +1408,8 @@ func (ms *MongoStorage) SetActionProfileDrv(ap *ActionProfile) (err error) {
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) RemoveActionProfileDrv(tenant, id string) (err error) {
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
func (ms *MongoStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id string) (err error) {
|
||||
return ms.query(ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
dr, err := ms.getCol(ColApp).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id})
|
||||
if dr.DeletedCount == 0 {
|
||||
return utils.ErrNotFound
|
||||
|
||||
@@ -773,7 +773,7 @@ func (rs *RedisStorage) RemoveRateProfileDrv(ctx *context.Context, tenant, id st
|
||||
return rs.Cmd(nil, redisDEL, utils.RateProfilePrefix+utils.ConcatenatedKey(tenant, id))
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetActionProfileDrv(tenant, id string) (ap *ActionProfile, err error) {
|
||||
func (rs *RedisStorage) GetActionProfileDrv(ctx *context.Context, tenant, id string) (ap *ActionProfile, err error) {
|
||||
var values []byte
|
||||
if err = rs.Cmd(&values, redisGET, utils.ActionProfilePrefix+utils.ConcatenatedKey(tenant, id)); err != nil {
|
||||
return
|
||||
@@ -785,7 +785,7 @@ func (rs *RedisStorage) GetActionProfileDrv(tenant, id string) (ap *ActionProfil
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetActionProfileDrv(ap *ActionProfile) (err error) {
|
||||
func (rs *RedisStorage) SetActionProfileDrv(ctx *context.Context, ap *ActionProfile) (err error) {
|
||||
var result []byte
|
||||
if result, err = rs.ms.Marshal(ap); err != nil {
|
||||
return
|
||||
@@ -793,7 +793,7 @@ func (rs *RedisStorage) SetActionProfileDrv(ap *ActionProfile) (err error) {
|
||||
return rs.Cmd(nil, redisSET, utils.ActionProfilePrefix+utils.ConcatenatedKey(ap.Tenant, ap.ID), string(result))
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) RemoveActionProfileDrv(tenant, id string) (err error) {
|
||||
func (rs *RedisStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id string) (err error) {
|
||||
return rs.Cmd(nil, redisDEL, utils.ActionProfilePrefix+utils.ConcatenatedKey(tenant, id))
|
||||
}
|
||||
|
||||
|
||||
@@ -696,7 +696,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
|
||||
if ap, err = APItoActionProfile(tpAP, tpr.timezone); err != nil {
|
||||
return
|
||||
}
|
||||
if err = tpr.dm.SetActionProfile(ap, true); err != nil {
|
||||
if err = tpr.dm.SetActionProfile(context.TODO(), ap, true); err != nil {
|
||||
return
|
||||
}
|
||||
if verbose {
|
||||
@@ -1043,7 +1043,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
|
||||
log.Print("ActionProfiles:")
|
||||
}
|
||||
for _, tpAp := range tpr.actionProfiles {
|
||||
if err = tpr.dm.RemoveActionProfile(tpAp.Tenant, tpAp.ID,
|
||||
if err = tpr.dm.RemoveActionProfile(context.TODO(), tpAp.Tenant, tpAp.ID,
|
||||
utils.NonTransactional, true); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -634,7 +634,7 @@ func (ldr *Loader) storeLoadedData(loaderType string,
|
||||
}
|
||||
// get IDs so we can reload in cache
|
||||
ids = append(ids, acp.TenantID())
|
||||
if err := ldr.dm.SetActionProfile(acp, true); err != nil {
|
||||
if err := ldr.dm.SetActionProfile(context.TODO(), acp, true); err != nil {
|
||||
return err
|
||||
}
|
||||
cacheArgs[utils.ActionProfileIDs] = ids
|
||||
@@ -969,7 +969,7 @@ func (ldr *Loader) removeLoadedData(loaderType string, lds map[string][]LoaderDa
|
||||
tntIDStruct := utils.NewTenantID(tntID)
|
||||
// get IDs so we can reload in cache
|
||||
ids = append(ids, tntID)
|
||||
if err := ldr.dm.RemoveActionProfile(tntIDStruct.Tenant,
|
||||
if err := ldr.dm.RemoveActionProfile(context.TODO(), tntIDStruct.Tenant,
|
||||
tntIDStruct.ID, utils.NonTransactional, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -2513,7 +2513,7 @@ func TestLoaderActionProfile(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
aps, err := ldr.dm.GetActionProfile("cgrates.org", "ONE_TIME_ACT",
|
||||
aps, err := ldr.dm.GetActionProfile(context.Background(), "cgrates.org", "ONE_TIME_ACT",
|
||||
true, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -4185,7 +4185,7 @@ cgrates.org,REM_ACTPROFILE_1
|
||||
Tenant: "cgrates.org",
|
||||
ID: "REM_ACTPROFILE_1",
|
||||
}
|
||||
if err := ldr.dm.SetActionProfile(actRtPrf, true); err != nil {
|
||||
if err := ldr.dm.SetActionProfile(context.Background(), actRtPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
} else if err := ldr.removeContent(utils.MetaActionProfiles, utils.EmptyString); err != nil {
|
||||
t.Error(err)
|
||||
@@ -4339,7 +4339,7 @@ cgrates.org,REM_ACTPROFILE_s
|
||||
ID: "REM_ACTPROFILE_s",
|
||||
}
|
||||
expectedErr := "NOT_FOUND"
|
||||
if err := ldr.dm.SetActionProfile(actRtPrf, true); err != nil {
|
||||
if err := ldr.dm.SetActionProfile(context.Background(), actRtPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
} else if err := ldr.removeContent(utils.MetaActionProfiles, utils.EmptyString); err == nil || err.Error() != expectedErr {
|
||||
t.Errorf("Expected %+v, received %+v", expectedErr, err)
|
||||
@@ -4391,7 +4391,7 @@ cgrates.org,REM_ACTPROFILE_s
|
||||
ID: "REM_ACTPROFILE_s",
|
||||
}
|
||||
expectedErr := "NOT_FOUND"
|
||||
if err := ldr.dm.SetActionProfile(actRtPrf, true); err != nil {
|
||||
if err := ldr.dm.SetActionProfile(context.Background(), actRtPrf, true); err != nil {
|
||||
t.Error(err)
|
||||
} else if err := ldr.removeContent(utils.MetaActionProfiles, utils.EmptyString); err == nil || err.Error() != expectedErr {
|
||||
t.Errorf("Expected %+v, received %+v", expectedErr, err)
|
||||
|
||||
@@ -39,17 +39,17 @@ func (m *Migrator) migrateCurrentActionProfiles() (err error) {
|
||||
if len(tntID) < 2 {
|
||||
return fmt.Errorf("Invalid key <%s> when migrating from action profiles", id)
|
||||
}
|
||||
ap, err := m.dmIN.DataManager().GetActionProfile(tntID[0], tntID[1], false, false, utils.NonTransactional)
|
||||
ap, err := m.dmIN.DataManager().GetActionProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ap == nil || m.dryRun {
|
||||
continue
|
||||
}
|
||||
if err := m.dmOut.DataManager().SetActionProfile(ap, true); err != nil {
|
||||
if err := m.dmOut.DataManager().SetActionProfile(context.TODO(), ap, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := m.dmIN.DataManager().RemoveActionProfile(tntID[0], tntID[1], utils.NonTransactional, false); err != nil {
|
||||
if err := m.dmIN.DataManager().RemoveActionProfile(context.TODO(), tntID[0], tntID[1], utils.NonTransactional, false); err != nil {
|
||||
return err
|
||||
}
|
||||
m.stats[utils.ActionProfiles]++
|
||||
|
||||
Reference in New Issue
Block a user