diff --git a/apis/libadmin_test.go b/apis/libadmin_test.go index 266473d13..20a31276d 100644 --- a/apis/libadmin_test.go +++ b/apis/libadmin_test.go @@ -52,7 +52,7 @@ func TestCallCacheForFilter(t *testing.T) { ID: "TH1", FilterIDs: []string{flt.ID}, } - if err := dm.SetThresholdProfile(th, true); err != nil { + if err := dm.SetThresholdProfile(context.TODO(), th, true); err != nil { t.Fatal(err) } dsp := &engine.DispatcherProfile{ diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ab3b475b2..190cf033d 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -155,18 +155,18 @@ func startRPC(server *cores.Server, internalAdminSChan, // internalCdrSChan <- cdrs // case smg := <-internalSMGChan: // internalSMGChan <- smg - // case rls := <-internalRsChan: - // internalRsChan <- rls - // case statS := <-internalStatSChan: - // internalStatSChan <- statS + case rls := <-internalRsChan: + internalRsChan <- rls + case statS := <-internalStatSChan: + internalStatSChan <- statS case admS := <-internalAdminSChan: internalAdminSChan <- admS case attrS := <-internalAttrSChan: internalAttrSChan <- attrS // case chrgS := <-internalChargerSChan: // internalChargerSChan <- chrgS - // case thS := <-internalThdSChan: - // internalThdSChan <- thS + case thS := <-internalThdSChan: + internalThdSChan <- thS // case splS := <-internalSuplSChan: // internalSuplSChan <- splS // case analyzerS := <-internalAnalyzerSChan: diff --git a/engine/datadbmock.go b/engine/datadbmock.go index e60653c25..5c26aa6d3 100644 --- a/engine/datadbmock.go +++ b/engine/datadbmock.go @@ -80,43 +80,27 @@ func (dbM *DataDBMock) HasDataDrv(*context.Context, string, string, string) (boo return false, utils.ErrNotImplemented } -func (dbM *DataDBMock) RemoveDestinationDrv(string, string) error { - return utils.ErrNotImplemented -} - -func (dbM *DataDBMock) RemoveReverseDestinationDrv(string, string, string) error { - return utils.ErrNotImplemented -} - -func (dbM *DataDBMock) SetReverseDestinationDrv(string, []string, string) error { - return utils.ErrNotImplemented -} - -func (dbM *DataDBMock) GetReverseDestinationDrv(string, string) ([]string, error) { +func (dbM *DataDBMock) GetResourceProfileDrv(*context.Context, string, string) (*ResourceProfile, error) { return nil, utils.ErrNotImplemented } -func (dbM *DataDBMock) GetResourceProfileDrv(string, string) (*ResourceProfile, error) { +func (dbM *DataDBMock) SetResourceProfileDrv(*context.Context, *ResourceProfile) error { + return utils.ErrNotImplemented +} + +func (dbM *DataDBMock) RemoveResourceProfileDrv(*context.Context, string, string) error { + return utils.ErrNotImplemented +} + +func (dbM *DataDBMock) GetResourceDrv(*context.Context, string, string) (*Resource, error) { return nil, utils.ErrNotImplemented } -func (dbM *DataDBMock) SetResourceProfileDrv(*ResourceProfile) error { +func (dbM *DataDBMock) SetResourceDrv(*context.Context, *Resource) error { return utils.ErrNotImplemented } -func (dbM *DataDBMock) RemoveResourceProfileDrv(string, string) error { - return utils.ErrNotImplemented -} - -func (dbM *DataDBMock) GetResourceDrv(string, string) (*Resource, error) { - return nil, utils.ErrNotImplemented -} - -func (dbM *DataDBMock) SetResourceDrv(*Resource) error { - return utils.ErrNotImplemented -} - -func (dbM *DataDBMock) RemoveResourceDrv(string, string) error { +func (dbM *DataDBMock) RemoveResourceDrv(*context.Context, string, string) error { return utils.ErrNotImplemented } @@ -147,51 +131,51 @@ func (dbM *DataDBMock) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err return utils.ErrNotImplemented } -func (dbM *DataDBMock) GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error) { +func (dbM *DataDBMock) GetStatQueueProfileDrv(ctx *context.Context, tenant string, ID string) (sq *StatQueueProfile, err error) { return nil, utils.ErrNotImplemented } -func (dbM *DataDBMock) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { +func (dbM *DataDBMock) SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error) { return utils.ErrNotImplemented } -func (dbM *DataDBMock) RemStatQueueProfileDrv(tenant, id string) (err error) { +func (dbM *DataDBMock) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) { return utils.ErrNotImplemented } -func (dbM *DataDBMock) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) { +func (dbM *DataDBMock) GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *StatQueue, err error) { return nil, utils.ErrNotImplemented } -func (dbM *DataDBMock) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) { +func (dbM *DataDBMock) SetStatQueueDrv(ctx *context.Context, ssq *StoredStatQueue, sq *StatQueue) (err error) { return utils.ErrNotImplemented } -func (dbM *DataDBMock) RemStatQueueDrv(tenant, id string) (err error) { +func (dbM *DataDBMock) RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error) { return utils.ErrNotImplemented } -func (dbM *DataDBMock) GetThresholdProfileDrv(tenant string, ID string) (tp *ThresholdProfile, err error) { +func (dbM *DataDBMock) GetThresholdProfileDrv(ctx *context.Context, tenant string, ID string) (tp *ThresholdProfile, err error) { return nil, utils.ErrNotImplemented } -func (dbM *DataDBMock) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) { +func (dbM *DataDBMock) SetThresholdProfileDrv(ctx *context.Context, tp *ThresholdProfile) (err error) { return utils.ErrNotImplemented } -func (dbM *DataDBMock) RemThresholdProfileDrv(tenant, id string) (err error) { +func (dbM *DataDBMock) RemThresholdProfileDrv(ctx *context.Context, tenant, id string) (err error) { return utils.ErrNotImplemented } -func (dbM *DataDBMock) GetThresholdDrv(string, string) (*Threshold, error) { +func (dbM *DataDBMock) GetThresholdDrv(*context.Context, string, string) (*Threshold, error) { return nil, utils.ErrNotImplemented } -func (dbM *DataDBMock) SetThresholdDrv(*Threshold) error { +func (dbM *DataDBMock) SetThresholdDrv(*context.Context, *Threshold) error { return utils.ErrNotImplemented } -func (dbM *DataDBMock) RemoveThresholdDrv(string, string) error { +func (dbM *DataDBMock) RemoveThresholdDrv(*context.Context, string, string) error { return utils.ErrNotImplemented } diff --git a/engine/libindex.go b/engine/libindex.go index a2d23e2ef..cc7009ee1 100644 --- a/engine/libindex.go +++ b/engine/libindex.go @@ -605,7 +605,7 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt idxSlice := indx.AsSlice() if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items &idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) { - th, e := dm.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional) + th, e := dm.GetThresholdProfile(apiCtx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -625,7 +625,7 @@ func UpdateFilterIndex(apiCtx *context.Context, dm *DataManager, oldFlt, newFlt idxSlice := indx.AsSlice() if _, err = ComputeIndexes(apiCtx, dm, newFlt.Tenant, utils.EmptyString, idxItmType, // compute all the indexes for afected items &idxSlice, utils.NonTransactional, func(tnt, id, ctx string) (*[]string, error) { - sq, e := dm.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional) + sq, e := dm.GetStatQueueProfile(apiCtx, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } diff --git a/engine/stats_test.go b/engine/stats_test.go index 670fd7a9b..08ab950fe 100644 --- a/engine/stats_test.go +++ b/engine/stats_test.go @@ -232,21 +232,21 @@ func TestMatchingStatQueuesForEvent(t *testing.T) { } dmSTS.SetFilter(context.Background(), fltrSts3, true) for _, statQueueProfile := range sqps { - dmSTS.SetStatQueueProfile(statQueueProfile, true) + dmSTS.SetStatQueueProfile(context.TODO(), statQueueProfile, true) } for _, statQueue := range stqs { - dmSTS.SetStatQueue(statQueue, nil, 0, nil, 0, true) + dmSTS.SetStatQueue(context.TODO(), statQueue, nil, 0, nil, 0, true) } //Test each statQueueProfile from cache for _, sqp := range sqps { - if tempStat, err := dmSTS.GetStatQueueProfile(sqp.Tenant, + if tempStat, err := dmSTS.GetStatQueueProfile(context.TODO(), sqp.Tenant, sqp.ID, true, false, utils.NonTransactional); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(sqp, tempStat) { t.Errorf("Expecting: %+v, received: %+v", sqp, tempStat) } } - msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0].StatIDs, + msq, err := statService.matchingStatQueuesForEvent(context.TODO(), statsEvs[0].Tenant, statsEvs[0].StatIDs, utils.MapStorage{utils.MetaReq: statsEvs[0].Event, utils.MetaOpts: statsEvs[0].APIOpts}) if err != nil { t.Errorf("Error: %+v", err) @@ -258,7 +258,7 @@ func TestMatchingStatQueuesForEvent(t *testing.T) { } else if !reflect.DeepEqual(stqs[0].sqPrfl, msq[0].sqPrfl) { t.Errorf("Expecting: %+v, received: %+v", stqs[0].sqPrfl, msq[0].sqPrfl) } - msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1].StatIDs, + msq, err = statService.matchingStatQueuesForEvent(context.TODO(), statsEvs[1].Tenant, statsEvs[1].StatIDs, utils.MapStorage{utils.MetaReq: statsEvs[1].Event, utils.MetaOpts: statsEvs[1].APIOpts}) if err != nil { t.Errorf("Error: %+v", err) @@ -270,7 +270,7 @@ func TestMatchingStatQueuesForEvent(t *testing.T) { } else if !reflect.DeepEqual(stqs[1].sqPrfl, msq[0].sqPrfl) { t.Errorf("Expecting: %+v, received: %+v", stqs[1].sqPrfl, msq[0].sqPrfl) } - msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2].StatIDs, + msq, err = statService.matchingStatQueuesForEvent(context.TODO(), statsEvs[2].Tenant, statsEvs[2].StatIDs, utils.MapStorage{utils.MetaReq: statsEvs[2].Event, utils.MetaOpts: statsEvs[2].APIOpts}) if err != nil { t.Errorf("Error: %+v", err) @@ -463,14 +463,14 @@ func TestStatQueuesProcessEvent(t *testing.T) { } dmSTS.SetFilter(context.Background(), fltrSts3, true) for _, statQueueProfile := range sqps { - dmSTS.SetStatQueueProfile(statQueueProfile, true) + dmSTS.SetStatQueueProfile(context.TODO(), statQueueProfile, true) } for _, statQueue := range stqs { - dmSTS.SetStatQueue(statQueue, nil, 0, nil, 0, true) + dmSTS.SetStatQueue(context.TODO(), statQueue, nil, 0, nil, 0, true) } //Test each statQueueProfile from cache for _, sqp := range sqps { - if tempStat, err := dmSTS.GetStatQueueProfile(sqp.Tenant, + if tempStat, err := dmSTS.GetStatQueueProfile(context.TODO(), sqp.Tenant, sqp.ID, true, false, utils.NonTransactional); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(sqp, tempStat) { @@ -480,37 +480,37 @@ func TestStatQueuesProcessEvent(t *testing.T) { stq := map[string]string{} reply := []string{} expected := []string{"StatQueueProfile1"} - err := statService.V1ProcessEvent(statsEvs[0], &reply) + err := statService.V1ProcessEvent(context.TODO(), statsEvs[0], &reply) if err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(reply, expected) { t.Errorf("Expecting: %+v, received: %+v", expected, reply) } - err = statService.V1GetQueueStringMetrics(&utils.TenantID{Tenant: stqs[0].Tenant, ID: stqs[0].ID}, &stq) + err = statService.V1GetQueueStringMetrics(context.TODO(), &utils.TenantID{Tenant: stqs[0].Tenant, ID: stqs[0].ID}, &stq) if err != nil { t.Errorf("Error: %+v", err) } expected = []string{"StatQueueProfile2"} - err = statService.V1ProcessEvent(statsEvs[1], &reply) + err = statService.V1ProcessEvent(context.TODO(), statsEvs[1], &reply) if err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(reply, expected) { t.Errorf("Expecting: %+v, received: %+v", expected, reply) } - err = statService.V1GetQueueStringMetrics(&utils.TenantID{Tenant: stqs[1].Tenant, ID: stqs[1].ID}, &stq) + err = statService.V1GetQueueStringMetrics(context.TODO(), &utils.TenantID{Tenant: stqs[1].Tenant, ID: stqs[1].ID}, &stq) if err != nil { t.Errorf("Error: %+v", err) } expected = []string{"StatQueueProfilePrefix"} - err = statService.V1ProcessEvent(statsEvs[2], &reply) + err = statService.V1ProcessEvent(context.TODO(), statsEvs[2], &reply) if err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(reply, expected) { t.Errorf("Expecting: %+v, received: %+v", expected, reply) } - err = statService.V1GetQueueStringMetrics(&utils.TenantID{Tenant: stqs[2].Tenant, ID: stqs[2].ID}, &stq) + err = statService.V1GetQueueStringMetrics(context.TODO(), &utils.TenantID{Tenant: stqs[2].Tenant, ID: stqs[2].ID}, &stq) if err != nil { t.Errorf("Error: %+v", err) } @@ -695,14 +695,14 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) { } dmSTS.SetFilter(context.Background(), fltrSts3, true) for _, statQueueProfile := range sqps { - dmSTS.SetStatQueueProfile(statQueueProfile, true) + dmSTS.SetStatQueueProfile(context.TODO(), statQueueProfile, true) } for _, statQueue := range stqs { - dmSTS.SetStatQueue(statQueue, nil, 0, nil, 0, true) + dmSTS.SetStatQueue(context.TODO(), statQueue, nil, 0, nil, 0, true) } //Test each statQueueProfile from cache for _, sqp := range sqps { - if tempStat, err := dmSTS.GetStatQueueProfile(sqp.Tenant, + if tempStat, err := dmSTS.GetStatQueueProfile(context.TODO(), sqp.Tenant, sqp.ID, true, false, utils.NonTransactional); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(sqp, tempStat) { @@ -710,7 +710,7 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) { } } statService.cgrcfg.StatSCfg().IndexedSelects = false - msq, err := statService.matchingStatQueuesForEvent(statsEvs[0].Tenant, statsEvs[0].StatIDs, + msq, err := statService.matchingStatQueuesForEvent(context.TODO(), statsEvs[0].Tenant, statsEvs[0].StatIDs, utils.MapStorage{utils.MetaReq: statsEvs[0].Event, utils.MetaOpts: statsEvs[0].APIOpts}) if err != nil { t.Errorf("Error: %+v", err) @@ -722,7 +722,7 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) { } else if !reflect.DeepEqual(stqs[0].sqPrfl, msq[0].sqPrfl) { t.Errorf("Expecting: %+v, received: %+v", stqs[0].sqPrfl, msq[0].sqPrfl) } - msq, err = statService.matchingStatQueuesForEvent(statsEvs[1].Tenant, statsEvs[1].StatIDs, + msq, err = statService.matchingStatQueuesForEvent(context.TODO(), statsEvs[1].Tenant, statsEvs[1].StatIDs, utils.MapStorage{utils.MetaReq: statsEvs[1].Event, utils.MetaOpts: statsEvs[1].APIOpts}) if err != nil { t.Errorf("Error: %+v", err) @@ -734,7 +734,7 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) { } else if !reflect.DeepEqual(stqs[1].sqPrfl, msq[0].sqPrfl) { t.Errorf("Expecting: %+v, received: %+v", stqs[1].sqPrfl, msq[0].sqPrfl) } - msq, err = statService.matchingStatQueuesForEvent(statsEvs[2].Tenant, statsEvs[2].StatIDs, + msq, err = statService.matchingStatQueuesForEvent(context.TODO(), statsEvs[2].Tenant, statsEvs[2].StatIDs, utils.MapStorage{utils.MetaReq: statsEvs[2].Event, utils.MetaOpts: statsEvs[2].APIOpts}) if err != nil { t.Errorf("Error: %+v", err) @@ -927,14 +927,14 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) { } dmSTS.SetFilter(context.Background(), fltrSts3, true) for _, statQueueProfile := range sqps { - dmSTS.SetStatQueueProfile(statQueueProfile, true) + dmSTS.SetStatQueueProfile(context.TODO(), statQueueProfile, true) } for _, statQueue := range stqs { - dmSTS.SetStatQueue(statQueue, nil, 0, nil, 0, true) + dmSTS.SetStatQueue(context.TODO(), statQueue, nil, 0, nil, 0, true) } //Test each statQueueProfile from cache for _, sqp := range sqps { - if tempStat, err := dmSTS.GetStatQueueProfile(sqp.Tenant, + if tempStat, err := dmSTS.GetStatQueueProfile(context.TODO(), sqp.Tenant, sqp.ID, true, false, utils.NonTransactional); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(sqp, tempStat) { @@ -958,13 +958,13 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) { MinItems: 1, } sq := &StatQueue{Tenant: "cgrates.org", ID: "StatQueueProfile3", sqPrfl: sqPrf} - if err := dmSTS.SetStatQueueProfile(sqPrf, true); err != nil { + if err := dmSTS.SetStatQueueProfile(context.TODO(), sqPrf, true); err != nil { t.Error(err) } - if err := dmSTS.SetStatQueue(sq, nil, 0, nil, 0, true); err != nil { + if err := dmSTS.SetStatQueue(context.TODO(), sq, nil, 0, nil, 0, true); err != nil { t.Error(err) } - if tempStat, err := dmSTS.GetStatQueueProfile(sqPrf.Tenant, + if tempStat, err := dmSTS.GetStatQueueProfile(context.TODO(), sqPrf.Tenant, sqPrf.ID, true, false, utils.NonTransactional); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(sqPrf, tempStat) { @@ -977,7 +977,7 @@ func TestStatQueuesV1ProcessEvent(t *testing.T) { reply := []string{} expected := []string{"StatQueueProfile1", "StatQueueProfile3"} expectedRev := []string{"StatQueueProfile3", "StatQueueProfile1"} - if err := statService.V1ProcessEvent(ev, &reply); err != nil { + if err := statService.V1ProcessEvent(context.TODO(), ev, &reply); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(reply, expected) && !reflect.DeepEqual(reply, expectedRev) { t.Errorf("Expecting: %+v, received: %+v", expected, reply) diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 6290a5128..236a41dd3 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -179,39 +179,39 @@ func (iDB *InternalDB) HasDataDrv(ctx *context.Context, category, subject, tenan return false, errors.New("Unsupported HasData category") } -func (iDB *InternalDB) GetResourceProfileDrv(tenant, id string) (rp *ResourceProfile, err error) { +func (iDB *InternalDB) GetResourceProfileDrv(ctx *context.Context, tenant, id string) (rp *ResourceProfile, err error) { if x, ok := Cache.Get(utils.CacheResourceProfiles, utils.ConcatenatedKey(tenant, id)); ok && x != nil { return x.(*ResourceProfile), nil } return nil, utils.ErrNotFound } -func (iDB *InternalDB) SetResourceProfileDrv(rp *ResourceProfile) (err error) { +func (iDB *InternalDB) SetResourceProfileDrv(ctx *context.Context, rp *ResourceProfile) (err error) { Cache.SetWithoutReplicate(utils.CacheResourceProfiles, rp.TenantID(), rp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) RemoveResourceProfileDrv(tenant, id string) (err error) { +func (iDB *InternalDB) RemoveResourceProfileDrv(ctx *context.Context, tenant, id string) (err error) { Cache.RemoveWithoutReplicate(utils.CacheResourceProfiles, utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) GetResourceDrv(tenant, id string) (r *Resource, err error) { +func (iDB *InternalDB) GetResourceDrv(ctx *context.Context, tenant, id string) (r *Resource, err error) { if x, ok := Cache.Get(utils.CacheResources, utils.ConcatenatedKey(tenant, id)); ok && x != nil { return x.(*Resource), nil } return nil, utils.ErrNotFound } -func (iDB *InternalDB) SetResourceDrv(r *Resource) (err error) { +func (iDB *InternalDB) SetResourceDrv(ctx *context.Context, r *Resource) (err error) { Cache.SetWithoutReplicate(utils.CacheResources, r.TenantID(), r, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) RemoveResourceDrv(tenant, id string) (err error) { +func (iDB *InternalDB) RemoveResourceDrv(ctx *context.Context, tenant, id string) (err error) { Cache.RemoveWithoutReplicate(utils.CacheResources, utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return @@ -225,7 +225,7 @@ func (iDB *InternalDB) AddLoadHistory(*utils.LoadInstance, int, string) error { return nil } -func (iDB *InternalDB) GetStatQueueProfileDrv(tenant string, id string) (sq *StatQueueProfile, err error) { +func (iDB *InternalDB) GetStatQueueProfileDrv(ctx *context.Context, tenant string, id string) (sq *StatQueueProfile, err error) { x, ok := Cache.Get(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound @@ -233,26 +233,26 @@ func (iDB *InternalDB) GetStatQueueProfileDrv(tenant string, id string) (sq *Sta return x.(*StatQueueProfile), nil } -func (iDB *InternalDB) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { +func (iDB *InternalDB) SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error) { Cache.SetWithoutReplicate(utils.CacheStatQueueProfiles, sq.TenantID(), sq, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) RemStatQueueProfileDrv(tenant, id string) (err error) { +func (iDB *InternalDB) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) { Cache.RemoveWithoutReplicate(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) { +func (iDB *InternalDB) GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *StatQueue, err error) { x, ok := Cache.Get(utils.CacheStatQueues, utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound } return x.(*StatQueue), nil } -func (iDB *InternalDB) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) { +func (iDB *InternalDB) SetStatQueueDrv(ctx *context.Context, ssq *StoredStatQueue, sq *StatQueue) (err error) { if sq == nil { sq, err = ssq.AsStatQueue(iDB.ms) if err != nil { @@ -263,13 +263,13 @@ func (iDB *InternalDB) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) RemStatQueueDrv(tenant, id string) (err error) { +func (iDB *InternalDB) RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error) { Cache.RemoveWithoutReplicate(utils.CacheStatQueues, utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) GetThresholdProfileDrv(tenant, id string) (tp *ThresholdProfile, err error) { +func (iDB *InternalDB) GetThresholdProfileDrv(ctx *context.Context, tenant, id string) (tp *ThresholdProfile, err error) { x, ok := Cache.Get(utils.CacheThresholdProfiles, utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound @@ -277,19 +277,19 @@ func (iDB *InternalDB) GetThresholdProfileDrv(tenant, id string) (tp *ThresholdP return x.(*ThresholdProfile), nil } -func (iDB *InternalDB) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) { +func (iDB *InternalDB) SetThresholdProfileDrv(ctx *context.Context, tp *ThresholdProfile) (err error) { Cache.SetWithoutReplicate(utils.CacheThresholdProfiles, tp.TenantID(), tp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) RemThresholdProfileDrv(tenant, id string) (err error) { +func (iDB *InternalDB) RemThresholdProfileDrv(ctx *context.Context, tenant, id string) (err error) { Cache.RemoveWithoutReplicate(utils.CacheThresholdProfiles, utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) GetThresholdDrv(tenant, id string) (th *Threshold, err error) { +func (iDB *InternalDB) GetThresholdDrv(ctx *context.Context, tenant, id string) (th *Threshold, err error) { x, ok := Cache.Get(utils.CacheThresholds, utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { return nil, utils.ErrNotFound @@ -297,13 +297,13 @@ func (iDB *InternalDB) GetThresholdDrv(tenant, id string) (th *Threshold, err er return x.(*Threshold), nil } -func (iDB *InternalDB) SetThresholdDrv(th *Threshold) (err error) { +func (iDB *InternalDB) SetThresholdDrv(ctx *context.Context, th *Threshold) (err error) { Cache.SetWithoutReplicate(utils.CacheThresholds, th.TenantID(), th, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } -func (iDB *InternalDB) RemoveThresholdDrv(tenant, id string) (err error) { +func (iDB *InternalDB) RemoveThresholdDrv(ctx *context.Context, tenant, id string) (err error) { Cache.RemoveWithoutReplicate(utils.CacheThresholds, utils.ConcatenatedKey(tenant, id), cacheCommit(utils.NonTransactional), utils.NonTransactional) return diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index bd8927c39..29a9b64d4 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -638,61 +638,6 @@ func (ms *MongoStorage) HasDataDrv(ctx *context.Context, category, subject, tena return has, err } -func (ms *MongoStorage) RemoveDestinationDrv(destID string, - transactionID string) (err error) { - return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { - dr, err := ms.getCol(ColDst).DeleteOne(sctx, bson.M{"key": destID}) - if dr.DeletedCount == 0 { - return utils.ErrNotFound - } - return err - }) -} - -func (ms *MongoStorage) RemoveReverseDestinationDrv(dstID, prfx, transactionID string) (err error) { - return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRds).UpdateOne(sctx, bson.M{"key": prfx}, - bson.M{"$pull": bson.M{"value": dstID}}) - return err - }) -} - -func (ms *MongoStorage) GetReverseDestinationDrv(prefix, transactionID string) (ids []string, err error) { - var result struct { - Key string - Value []string - } - if err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRds).FindOne(sctx, bson.M{"key": prefix}) - if err := cur.Decode(&result); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err - } - return nil - }); err != nil { - return nil, err - } - ids = result.Value - return -} - -func (ms *MongoStorage) SetReverseDestinationDrv(destID string, prefixes []string, transactionID string) (err error) { - for _, p := range prefixes { - if err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRds).UpdateOne(sctx, bson.M{"key": p}, - bson.M{"$addToSet": bson.M{"value": destID}}, - options.Update().SetUpsert(true), - ) - return err - }); err != nil { - return err - } - } - return nil -} - // Limit will only retrieve the last n items out of history, newest first func (ms *MongoStorage) GetLoadHistory(limit int, skipCache bool, transactionID string) (loadInsts []*utils.LoadInstance, err error) { @@ -798,7 +743,7 @@ func (ms *MongoStorage) AddLoadHistory(ldInst *utils.LoadInstance, return err } -func (ms *MongoStorage) GetResourceProfileDrv(tenant, id string) (rp *ResourceProfile, err error) { +func (ms *MongoStorage) GetResourceProfileDrv(ctx *context.Context, tenant, id string) (rp *ResourceProfile, err error) { rp = new(ResourceProfile) err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur := ms.getCol(ColRsP).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) @@ -814,7 +759,7 @@ func (ms *MongoStorage) GetResourceProfileDrv(tenant, id string) (rp *ResourcePr return } -func (ms *MongoStorage) SetResourceProfileDrv(rp *ResourceProfile) (err error) { +func (ms *MongoStorage) SetResourceProfileDrv(ctx *context.Context, rp *ResourceProfile) (err error) { return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColRsP).UpdateOne(sctx, bson.M{"tenant": rp.Tenant, "id": rp.ID}, bson.M{"$set": rp}, @@ -824,7 +769,7 @@ func (ms *MongoStorage) SetResourceProfileDrv(rp *ResourceProfile) (err error) { }) } -func (ms *MongoStorage) RemoveResourceProfileDrv(tenant, id string) (err error) { +func (ms *MongoStorage) RemoveResourceProfileDrv(ctx *context.Context, tenant, id string) (err error) { return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { dr, err := ms.getCol(ColRsP).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { @@ -834,7 +779,7 @@ func (ms *MongoStorage) RemoveResourceProfileDrv(tenant, id string) (err error) }) } -func (ms *MongoStorage) GetResourceDrv(tenant, id string) (r *Resource, err error) { +func (ms *MongoStorage) GetResourceDrv(ctx *context.Context, tenant, id string) (r *Resource, err error) { r = new(Resource) err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur := ms.getCol(ColRes).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) @@ -850,7 +795,7 @@ func (ms *MongoStorage) GetResourceDrv(tenant, id string) (r *Resource, err erro return } -func (ms *MongoStorage) SetResourceDrv(r *Resource) (err error) { +func (ms *MongoStorage) SetResourceDrv(ctx *context.Context, r *Resource) (err error) { return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColRes).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, @@ -871,7 +816,7 @@ func (ms *MongoStorage) RemoveResourceDrv(ctx *context.Context, tenant, id strin } // GetStatQueueProfileDrv retrieves a StatQueueProfile from dataDB -func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *StatQueueProfile, err error) { +func (ms *MongoStorage) GetStatQueueProfileDrv(ctx *context.Context, tenant string, id string) (sq *StatQueueProfile, err error) { sq = new(StatQueueProfile) err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur := ms.getCol(ColSqp).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) @@ -888,7 +833,7 @@ func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *St } // SetStatQueueProfileDrv stores a StatsQueue into DataDB -func (ms *MongoStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { +func (ms *MongoStorage) SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error) { return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColSqp).UpdateOne(sctx, bson.M{"tenant": sq.Tenant, "id": sq.ID}, bson.M{"$set": sq}, @@ -899,7 +844,7 @@ func (ms *MongoStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) } // RemStatQueueProfileDrv removes a StatsQueue from dataDB -func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) (err error) { +func (ms *MongoStorage) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) { return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { dr, err := ms.getCol(ColSqp).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { @@ -910,7 +855,7 @@ func (ms *MongoStorage) RemStatQueueProfileDrv(tenant, id string) (err error) { } // GetStatQueueDrv retrieves a StoredStatQueue -func (ms *MongoStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) { +func (ms *MongoStorage) GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *StatQueue, err error) { ssq := new(StoredStatQueue) if err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur := ms.getCol(ColSqs).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) @@ -930,7 +875,7 @@ func (ms *MongoStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err e } // SetStatQueueDrv stores the metrics for a StoredStatQueue -func (ms *MongoStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) { +func (ms *MongoStorage) SetStatQueueDrv(ctx *context.Context, ssq *StoredStatQueue, sq *StatQueue) (err error) { if ssq == nil { if ssq, err = NewStoredStatQueue(sq, ms.ms); err != nil { return @@ -946,7 +891,7 @@ func (ms *MongoStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (er } // RemStatQueueDrv removes stored metrics for a StoredStatQueue -func (ms *MongoStorage) RemStatQueueDrv(tenant, id string) (err error) { +func (ms *MongoStorage) RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error) { return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { dr, err := ms.getCol(ColSqs).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { @@ -957,7 +902,7 @@ func (ms *MongoStorage) RemStatQueueDrv(tenant, id string) (err error) { } // GetThresholdProfileDrv retrieves a ThresholdProfile from dataDB -func (ms *MongoStorage) GetThresholdProfileDrv(tenant, ID string) (tp *ThresholdProfile, err error) { +func (ms *MongoStorage) GetThresholdProfileDrv(ctx *context.Context, tenant, ID string) (tp *ThresholdProfile, err error) { tp = new(ThresholdProfile) err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur := ms.getCol(ColTps).FindOne(sctx, bson.M{"tenant": tenant, "id": ID}) @@ -974,7 +919,7 @@ func (ms *MongoStorage) GetThresholdProfileDrv(tenant, ID string) (tp *Threshold } // SetThresholdProfileDrv stores a ThresholdProfile into DataDB -func (ms *MongoStorage) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) { +func (ms *MongoStorage) SetThresholdProfileDrv(ctx *context.Context, tp *ThresholdProfile) (err error) { return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColTps).UpdateOne(sctx, bson.M{"tenant": tp.Tenant, "id": tp.ID}, bson.M{"$set": tp}, options.Update().SetUpsert(true), @@ -984,7 +929,7 @@ func (ms *MongoStorage) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) } // RemoveThresholdProfile removes a ThresholdProfile from dataDB/cache -func (ms *MongoStorage) RemThresholdProfileDrv(tenant, id string) (err error) { +func (ms *MongoStorage) RemThresholdProfileDrv(ctx *context.Context, tenant, id string) (err error) { return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { dr, err := ms.getCol(ColTps).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { @@ -994,7 +939,7 @@ func (ms *MongoStorage) RemThresholdProfileDrv(tenant, id string) (err error) { }) } -func (ms *MongoStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) { +func (ms *MongoStorage) GetThresholdDrv(ctx *context.Context, tenant, id string) (r *Threshold, err error) { r = new(Threshold) err = ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { cur := ms.getCol(ColThs).FindOne(sctx, bson.M{"tenant": tenant, "id": id}) @@ -1010,7 +955,7 @@ func (ms *MongoStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err er return } -func (ms *MongoStorage) SetThresholdDrv(r *Threshold) (err error) { +func (ms *MongoStorage) SetThresholdDrv(ctx *context.Context, r *Threshold) (err error) { return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { _, err = ms.getCol(ColThs).UpdateOne(sctx, bson.M{"tenant": r.Tenant, "id": r.ID}, bson.M{"$set": r}, @@ -1020,7 +965,7 @@ func (ms *MongoStorage) SetThresholdDrv(r *Threshold) (err error) { }) } -func (ms *MongoStorage) RemoveThresholdDrv(tenant, id string) (err error) { +func (ms *MongoStorage) RemoveThresholdDrv(ctx *context.Context, tenant, id string) (err error) { return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) { dr, err := ms.getCol(ColThs).DeleteOne(sctx, bson.M{"tenant": tenant, "id": id}) if dr.DeletedCount == 0 { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 7449b1832..b3067d0ec 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -333,7 +333,7 @@ func (rs *RedisStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize return } -func (rs *RedisStorage) GetResourceProfileDrv(tenant, id string) (rsp *ResourceProfile, err error) { +func (rs *RedisStorage) GetResourceProfileDrv(ctx *context.Context, tenant, id string) (rsp *ResourceProfile, err error) { var values []byte if err = rs.Cmd(&values, redisGET, utils.ResourceProfilesPrefix+utils.ConcatenatedKey(tenant, id)); err != nil { return @@ -345,7 +345,7 @@ func (rs *RedisStorage) GetResourceProfileDrv(tenant, id string) (rsp *ResourceP return } -func (rs *RedisStorage) SetResourceProfileDrv(rsp *ResourceProfile) (err error) { +func (rs *RedisStorage) SetResourceProfileDrv(ctx *context.Context, rsp *ResourceProfile) (err error) { var result []byte if result, err = rs.ms.Marshal(rsp); err != nil { return @@ -353,11 +353,11 @@ func (rs *RedisStorage) SetResourceProfileDrv(rsp *ResourceProfile) (err error) return rs.Cmd(nil, redisSET, utils.ResourceProfilesPrefix+rsp.TenantID(), string(result)) } -func (rs *RedisStorage) RemoveResourceProfileDrv(tenant, id string) (err error) { +func (rs *RedisStorage) RemoveResourceProfileDrv(ctx *context.Context, tenant, id string) (err error) { return rs.Cmd(nil, redisDEL, utils.ResourceProfilesPrefix+utils.ConcatenatedKey(tenant, id)) } -func (rs *RedisStorage) GetResourceDrv(tenant, id string) (r *Resource, err error) { +func (rs *RedisStorage) GetResourceDrv(ctx *context.Context, tenant, id string) (r *Resource, err error) { var values []byte if err = rs.Cmd(&values, redisGET, utils.ResourcesPrefix+utils.ConcatenatedKey(tenant, id)); err != nil { return @@ -369,7 +369,7 @@ func (rs *RedisStorage) GetResourceDrv(tenant, id string) (r *Resource, err erro return } -func (rs *RedisStorage) SetResourceDrv(r *Resource) (err error) { +func (rs *RedisStorage) SetResourceDrv(ctx *context.Context, r *Resource) (err error) { var result []byte if result, err = rs.ms.Marshal(r); err != nil { return @@ -377,7 +377,7 @@ func (rs *RedisStorage) SetResourceDrv(r *Resource) (err error) { return rs.Cmd(nil, redisSET, utils.ResourcesPrefix+r.TenantID(), string(result)) } -func (rs *RedisStorage) RemoveResourceDrv(tenant, id string) (err error) { +func (rs *RedisStorage) RemoveResourceDrv(ctx *context.Context, tenant, id string) (err error) { return rs.Cmd(nil, redisDEL, utils.ResourcesPrefix+utils.ConcatenatedKey(tenant, id)) } @@ -428,7 +428,7 @@ func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) { } // GetStatQueueProfileDrv retrieves a StatQueueProfile from dataDB -func (rs *RedisStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *StatQueueProfile, err error) { +func (rs *RedisStorage) GetStatQueueProfileDrv(ctx *context.Context, tenant string, id string) (sq *StatQueueProfile, err error) { var values []byte if err = rs.Cmd(&values, redisGET, utils.StatQueueProfilePrefix+utils.ConcatenatedKey(tenant, id)); err != nil { return @@ -441,7 +441,7 @@ func (rs *RedisStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *St } // SetStatQueueProfileDrv stores a StatsQueue into DataDB -func (rs *RedisStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { +func (rs *RedisStorage) SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error) { var result []byte if result, err = rs.ms.Marshal(sq); err != nil { return @@ -450,12 +450,12 @@ func (rs *RedisStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) } // RemStatQueueProfileDrv removes a StatsQueue from dataDB -func (rs *RedisStorage) RemStatQueueProfileDrv(tenant, id string) (err error) { +func (rs *RedisStorage) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) { return rs.Cmd(nil, redisDEL, utils.StatQueueProfilePrefix+utils.ConcatenatedKey(tenant, id)) } // GetStatQueueDrv retrieves the stored metrics for a StatsQueue -func (rs *RedisStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) { +func (rs *RedisStorage) GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *StatQueue, err error) { var values []byte if err = rs.Cmd(&values, redisGET, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id)); err != nil { return @@ -472,7 +472,7 @@ func (rs *RedisStorage) GetStatQueueDrv(tenant, id string) (sq *StatQueue, err e } // SetStatQueueDrv stores the metrics for a StatsQueue -func (rs *RedisStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) { +func (rs *RedisStorage) SetStatQueueDrv(ctx *context.Context, ssq *StoredStatQueue, sq *StatQueue) (err error) { if ssq == nil { if ssq, err = NewStoredStatQueue(sq, rs.ms); err != nil { return @@ -486,12 +486,12 @@ func (rs *RedisStorage) SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (er } // RemStatQueueDrv removes a StatsQueue -func (rs *RedisStorage) RemStatQueueDrv(tenant, id string) (err error) { +func (rs *RedisStorage) RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error) { return rs.Cmd(nil, redisDEL, utils.StatQueuePrefix+utils.ConcatenatedKey(tenant, id)) } // GetThresholdProfileDrv retrieves a ThresholdProfile from dataDB -func (rs *RedisStorage) GetThresholdProfileDrv(tenant, ID string) (tp *ThresholdProfile, err error) { +func (rs *RedisStorage) GetThresholdProfileDrv(ctx *context.Context, tenant, ID string) (tp *ThresholdProfile, err error) { var values []byte if err = rs.Cmd(&values, redisGET, utils.ThresholdProfilePrefix+utils.ConcatenatedKey(tenant, ID)); err != nil { return @@ -504,7 +504,7 @@ func (rs *RedisStorage) GetThresholdProfileDrv(tenant, ID string) (tp *Threshold } // SetThresholdProfileDrv stores a ThresholdProfile into DataDB -func (rs *RedisStorage) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) { +func (rs *RedisStorage) SetThresholdProfileDrv(ctx *context.Context, tp *ThresholdProfile) (err error) { var result []byte if result, err = rs.ms.Marshal(tp); err != nil { return @@ -513,11 +513,11 @@ func (rs *RedisStorage) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) } // RemThresholdProfileDrv removes a ThresholdProfile from dataDB/cache -func (rs *RedisStorage) RemThresholdProfileDrv(tenant, id string) (err error) { +func (rs *RedisStorage) RemThresholdProfileDrv(ctx *context.Context, tenant, id string) (err error) { return rs.Cmd(nil, redisDEL, utils.ThresholdProfilePrefix+utils.ConcatenatedKey(tenant, id)) } -func (rs *RedisStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err error) { +func (rs *RedisStorage) GetThresholdDrv(ctx *context.Context, tenant, id string) (r *Threshold, err error) { var values []byte if err = rs.Cmd(&values, redisGET, utils.ThresholdPrefix+utils.ConcatenatedKey(tenant, id)); err != nil { return @@ -529,7 +529,7 @@ func (rs *RedisStorage) GetThresholdDrv(tenant, id string) (r *Threshold, err er return } -func (rs *RedisStorage) SetThresholdDrv(r *Threshold) (err error) { +func (rs *RedisStorage) SetThresholdDrv(ctx *context.Context, r *Threshold) (err error) { var result []byte if result, err = rs.ms.Marshal(r); err != nil { return @@ -537,7 +537,7 @@ func (rs *RedisStorage) SetThresholdDrv(r *Threshold) (err error) { return rs.Cmd(nil, redisSET, utils.ThresholdPrefix+utils.ConcatenatedKey(r.Tenant, r.ID), string(result)) } -func (rs *RedisStorage) RemoveThresholdDrv(tenant, id string) (err error) { +func (rs *RedisStorage) RemoveThresholdDrv(ctx *context.Context, tenant, id string) (err error) { return rs.Cmd(nil, redisDEL, utils.ThresholdPrefix+utils.ConcatenatedKey(tenant, id)) } diff --git a/engine/thresholds_test.go b/engine/thresholds_test.go index 289f32cd9..9a602e721 100644 --- a/engine/thresholds_test.go +++ b/engine/thresholds_test.go @@ -154,13 +154,13 @@ func TestThresholdsCache(t *testing.T) { } dmTH.SetFilter(context.Background(), fltrTh3, true) for _, th := range tPrfls { - if err = dmTH.SetThresholdProfile(th, true); err != nil { + if err = dmTH.SetThresholdProfile(context.TODO(), th, true); err != nil { t.Errorf("Error: %+v", err) } } //Test each threshold profile from cache for _, th := range tPrfls { - if temptTh, err := dmTH.GetThresholdProfile(th.Tenant, + if temptTh, err := dmTH.GetThresholdProfile(context.TODO(), th.Tenant, th.ID, true, false, utils.NonTransactional); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(th, temptTh) { @@ -168,13 +168,13 @@ func TestThresholdsCache(t *testing.T) { } } for _, th := range ths { - if err = dmTH.SetThreshold(th, 0, true); err != nil { + if err = dmTH.SetThreshold(context.TODO(), th, 0, true); err != nil { t.Errorf("Error: %+v", err) } } //Test each threshold profile from cache for _, th := range ths { - if temptTh, err := dmTH.GetThreshold(th.Tenant, + if temptTh, err := dmTH.GetThreshold(context.TODO(), th.Tenant, th.ID, true, false, utils.NonTransactional); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(th, temptTh) { @@ -327,13 +327,13 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) { dmTH.SetFilter(context.TODO(), fltrTh2, true) dmTH.SetFilter(context.TODO(), fltrTh3, true) for _, th := range tPrfls { - if err = dmTH.SetThresholdProfile(th, true); err != nil { + if err = dmTH.SetThresholdProfile(context.TODO(), th, true); err != nil { t.Errorf("Error: %+v", err) } } //Test each threshold profile from cache for _, th := range tPrfls { - if temptTh, err := dmTH.GetThresholdProfile(th.Tenant, + if temptTh, err := dmTH.GetThresholdProfile(context.TODO(), th.Tenant, th.ID, true, false, utils.NonTransactional); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(th, temptTh) { @@ -341,20 +341,20 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) { } } for _, th := range ths { - if err = dmTH.SetThreshold(th, 0, true); err != nil { + if err = dmTH.SetThreshold(context.TODO(), th, 0, true); err != nil { t.Errorf("Error: %+v", err) } } //Test each threshold profile from cache for _, th := range ths { - if temptTh, err := dmTH.GetThreshold(th.Tenant, + if temptTh, err := dmTH.GetThreshold(context.TODO(), th.Tenant, th.ID, true, false, utils.NonTransactional); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(th, temptTh) { t.Errorf("Expecting: %+v, received: %+v", th, temptTh) } } - if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != nil { + if thMatched, err := thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[0].Tenant, argsGetThresholds[0]); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(ths[0].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[0].Tenant, thMatched[0].Tenant) @@ -364,7 +364,7 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", ths[0].Hits, thMatched[0].Hits) } - if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != nil { + if thMatched, err := thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[1].Tenant, argsGetThresholds[1]); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(ths[1].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[1].Tenant, thMatched[0].Tenant) @@ -374,7 +374,7 @@ func TestThresholdsmatchingThresholdsForEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", ths[1].Hits, thMatched[0].Hits) } - if thMatched, err := thServ.matchingThresholdsForEvent(argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != nil { + if thMatched, err := thServ.matchingThresholdsForEvent(context.TODO(), argsGetThresholds[2].Tenant, argsGetThresholds[2]); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(ths[2].Tenant, thMatched[0].Tenant) { t.Errorf("Expecting: %+v, received: %+v", ths[2].Tenant, thMatched[0].Tenant) diff --git a/engine/tpreader.go b/engine/tpreader.go index 573272175..60c13358d 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -409,7 +409,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { if rsp, err = APItoResource(tpRsp, tpr.timezone); err != nil { return } - if err = tpr.dm.SetResourceProfile(rsp, true); err != nil { + if err = tpr.dm.SetResourceProfile(context.TODO(), rsp, true); err != nil { return } if verbose { @@ -440,7 +440,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { } } // for non stored we do not save the resource - if err = tpr.dm.SetResource( + if err = tpr.dm.SetResource(context.TODO(), &Resource{ Tenant: rTid.Tenant, ID: rTid.ID, @@ -463,7 +463,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { if st, err = APItoStats(tpST, tpr.timezone); err != nil { return } - if err = tpr.dm.SetStatQueueProfile(st, true); err != nil { + if err = tpr.dm.SetStatQueueProfile(context.TODO(), st, true); err != nil { return } if verbose { @@ -505,7 +505,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { } } // for non stored we do not save the metrics - if err = tpr.dm.SetStatQueue(sq, metrics, + if err = tpr.dm.SetStatQueue(context.TODO(), sq, metrics, tpr.sqProfiles[*sqTntID].MinItems, ttl, tpr.sqProfiles[*sqTntID].QueueLength, !tpr.sqProfiles[*sqTntID].Stored); err != nil { @@ -526,7 +526,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { if th, err = APItoThresholdProfile(tpTH, tpr.timezone); err != nil { return } - if err = tpr.dm.SetThresholdProfile(th, true); err != nil { + if err = tpr.dm.SetThresholdProfile(context.TODO(), th, true); err != nil { return } if verbose { @@ -546,7 +546,7 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { return } } - if err = tpr.dm.SetThreshold(&Threshold{Tenant: thd.Tenant, ID: thd.ID}, minSleep, false); err != nil { + if err = tpr.dm.SetThreshold(context.TODO(), &Threshold{Tenant: thd.Tenant, ID: thd.ID}, minSleep, false); err != nil { return } if verbose { @@ -852,7 +852,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error log.Print("ResourceProfiles:") } for _, tpRsp := range tpr.resProfiles { - if err = tpr.dm.RemoveResourceProfile(tpRsp.Tenant, tpRsp.ID, utils.NonTransactional, true); err != nil { + if err = tpr.dm.RemoveResourceProfile(context.TODO(), tpRsp.Tenant, tpRsp.ID, utils.NonTransactional, true); err != nil { return } if verbose { @@ -863,7 +863,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error log.Print("Resources:") } for _, rTid := range tpr.resources { - if err = tpr.dm.RemoveResource(rTid.Tenant, rTid.ID, utils.NonTransactional); err != nil { + if err = tpr.dm.RemoveResource(context.TODO(), rTid.Tenant, rTid.ID, utils.NonTransactional); err != nil { return } if verbose { @@ -874,7 +874,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error log.Print("StatQueueProfiles:") } for _, tpST := range tpr.sqProfiles { - if err = tpr.dm.RemoveStatQueueProfile(tpST.Tenant, tpST.ID, utils.NonTransactional, true); err != nil { + if err = tpr.dm.RemoveStatQueueProfile(context.TODO(), tpST.Tenant, tpST.ID, utils.NonTransactional, true); err != nil { return } if verbose { @@ -885,7 +885,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error log.Print("StatQueues:") } for _, sqTntID := range tpr.statQueues { - if err = tpr.dm.RemoveStatQueue(sqTntID.Tenant, sqTntID.ID, utils.NonTransactional); err != nil { + if err = tpr.dm.RemoveStatQueue(context.TODO(), sqTntID.Tenant, sqTntID.ID, utils.NonTransactional); err != nil { return } if verbose { @@ -896,7 +896,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error log.Print("ThresholdProfiles:") } for _, tpTH := range tpr.thProfiles { - if err = tpr.dm.RemoveThresholdProfile(tpTH.Tenant, tpTH.ID, utils.NonTransactional, true); err != nil { + if err = tpr.dm.RemoveThresholdProfile(context.TODO(), tpTH.Tenant, tpTH.ID, utils.NonTransactional, true); err != nil { return } if verbose { @@ -907,7 +907,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error log.Print("Thresholds:") } for _, thd := range tpr.thresholds { - if err = tpr.dm.RemoveThreshold(thd.Tenant, thd.ID, utils.NonTransactional); err != nil { + if err = tpr.dm.RemoveThreshold(context.TODO(), thd.Tenant, thd.ID, utils.NonTransactional); err != nil { return } if verbose { diff --git a/engine/z_resources_test.go b/engine/z_resources_test.go index a051c2397..73bfb29ab 100644 --- a/engine/z_resources_test.go +++ b/engine/z_resources_test.go @@ -793,19 +793,19 @@ func TestResourceAllocateResource(t *testing.T) { rs.clearUsage(ru2.ID) ru1.ExpiryTime = time.Now().Add(time.Second) ru2.ExpiryTime = time.Now().Add(time.Second) - if alcMessage, err := rs.allocateResource(ru1, false); err != nil { + if alcMessage, err := rs.allocateResource(context.TODO(), ru1, false); err != nil { t.Error(err.Error()) } else { if alcMessage != "ALLOC" { t.Errorf("Wrong allocation message: %v", alcMessage) } } - if _, err := rs.allocateResource(ru2, false); err != utils.ErrResourceUnavailable { + if _, err := rs.allocateResource(context.TODO(), ru2, false); err != utils.ErrResourceUnavailable { t.Error("Did not receive " + utils.ErrResourceUnavailable.Error() + " error") } rs[0].rPrf.Limit = 1 rs[1].rPrf.Limit = 4 - if alcMessage, err := rs.allocateResource(ru1, false); err != nil { + if alcMessage, err := rs.allocateResource(context.TODO(), ru1, false); err != nil { t.Error(err.Error()) } else { if alcMessage != "ALLOC" { @@ -813,7 +813,7 @@ func TestResourceAllocateResource(t *testing.T) { } } - if alcMessage, err := rs.allocateResource(ru2, false); err != nil { + if alcMessage, err := rs.allocateResource(context.TODO(), ru2, false); err != nil { t.Error(err.Error()) } else { if alcMessage != "RL2" { @@ -822,7 +822,7 @@ func TestResourceAllocateResource(t *testing.T) { } ru2.Units = 0 - if _, err := rs.allocateResource(ru2, false); err != nil { + if _, err := rs.allocateResource(context.TODO(), ru2, false); err != nil { t.Error(err) } } @@ -891,10 +891,10 @@ func TestResourceV1AuthorizeResourceMissingStruct(t *testing.T) { }, Units: 20, } - if err := resService.V1AuthorizeResources(argsMissingTenant, reply); err != nil && err.Error() != "MANDATORY_IE_MISSING: [Event]" { + if err := resService.V1AuthorizeResources(context.TODO(), argsMissingTenant, reply); err != nil && err.Error() != "MANDATORY_IE_MISSING: [Event]" { t.Error(err.Error()) } - if err := resService.V1AuthorizeResources(argsMissingUsageID, reply); err != nil && err.Error() != "MANDATORY_IE_MISSING: [Event]" { + if err := resService.V1AuthorizeResources(context.TODO(), argsMissingUsageID, reply); err != nil && err.Error() != "MANDATORY_IE_MISSING: [Event]" { t.Error(err.Error()) } } @@ -1029,14 +1029,14 @@ func TestResourceAddResourceProfile(t *testing.T) { }, } for _, resProfile := range resprf { - dmRES.SetResourceProfile(resProfile, true) + dmRES.SetResourceProfile(context.TODO(), resProfile, true) } for _, res := range resourceTest { - dmRES.SetResource(res, nil, 0, true) + dmRES.SetResource(context.TODO(), res, nil, 0, true) } //Test each resourceProfile from cache for _, resPrf := range resprf { - if tempRes, err := dmRES.GetResourceProfile(resPrf.Tenant, + if tempRes, err := dmRES.GetResourceProfile(context.TODO(), resPrf.Tenant, resPrf.ID, true, false, utils.NonTransactional); err != nil { t.Errorf("Error: %+v", err) } else if !reflect.DeepEqual(resPrf, tempRes) { @@ -1213,12 +1213,12 @@ func TestResourceMatchingResourcesForEvent(t *testing.T) { } timeDurationExample := 10 * time.Second for _, resProfile := range resprf { - dmRES.SetResourceProfile(resProfile, true) + dmRES.SetResourceProfile(context.TODO(), resProfile, true) } for _, res := range resourceTest { - dmRES.SetResource(res, nil, 0, true) + dmRES.SetResource(context.TODO(), res, nil, 0, true) } - mres, err := resService.matchingResourcesForEvent(resEvs[0].Tenant, resEvs[0], + mres, err := resService.matchingResourcesForEvent(context.TODO(), resEvs[0].Tenant, resEvs[0], "TestResourceMatchingResourcesForEvent1", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) @@ -1231,7 +1231,7 @@ func TestResourceMatchingResourcesForEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", resourceTest[0].rPrf, mres[0].rPrf) } - mres, err = resService.matchingResourcesForEvent(resEvs[1].Tenant, resEvs[1], + mres, err = resService.matchingResourcesForEvent(context.TODO(), resEvs[1].Tenant, resEvs[1], "TestResourceMatchingResourcesForEvent2", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) @@ -1244,7 +1244,7 @@ func TestResourceMatchingResourcesForEvent(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", resourceTest[1].rPrf, mres[0].rPrf) } - mres, err = resService.matchingResourcesForEvent(resEvs[2].Tenant, resEvs[2], + mres, err = resService.matchingResourcesForEvent(context.TODO(), resEvs[2].Tenant, resEvs[2], "TestResourceMatchingResourcesForEvent3", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) @@ -1431,13 +1431,13 @@ func TestResourceUsageTTLCase1(t *testing.T) { resprf[0].UsageTTL = 0 resourceTest[0].rPrf = resprf[0] resourceTest[0].ttl = &timeDurationExample - if err := dmRES.SetResourceProfile(resprf[0], true); err != nil { + if err := dmRES.SetResourceProfile(context.TODO(), resprf[0], true); err != nil { t.Error(err) } - if err := dmRES.SetResource(resourceTest[0], nil, 0, true); err != nil { + if err := dmRES.SetResource(context.TODO(), resourceTest[0], nil, 0, true); err != nil { t.Error(err) } - mres, err := resService.matchingResourcesForEvent(resEvs[0].Tenant, resEvs[0], + mres, err := resService.matchingResourcesForEvent(context.TODO(), resEvs[0].Tenant, resEvs[0], "TestResourceUsageTTLCase1", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) @@ -1624,13 +1624,13 @@ func TestResourceUsageTTLCase2(t *testing.T) { resprf[0].UsageTTL = 0 resourceTest[0].rPrf = resprf[0] resourceTest[0].ttl = &resprf[0].UsageTTL - if err := dmRES.SetResourceProfile(resprf[0], true); err != nil { + if err := dmRES.SetResourceProfile(context.TODO(), resprf[0], true); err != nil { t.Error(err) } - if err := dmRES.SetResource(resourceTest[0], nil, 0, true); err != nil { + if err := dmRES.SetResource(context.TODO(), resourceTest[0], nil, 0, true); err != nil { t.Error(err) } - mres, err := resService.matchingResourcesForEvent(resEvs[0].Tenant, resEvs[0], + mres, err := resService.matchingResourcesForEvent(context.TODO(), resEvs[0].Tenant, resEvs[0], "TestResourceUsageTTLCase2", nil) if err != nil { t.Errorf("Error: %+v", err) @@ -1817,13 +1817,13 @@ func TestResourceUsageTTLCase3(t *testing.T) { resprf[0].UsageTTL = 0 resourceTest[0].rPrf = resprf[0] resourceTest[0].ttl = nil - if err := dmRES.SetResourceProfile(resprf[0], true); err != nil { + if err := dmRES.SetResourceProfile(context.TODO(), resprf[0], true); err != nil { t.Error(err) } - if err := dmRES.SetResource(resourceTest[0], nil, 0, true); err != nil { + if err := dmRES.SetResource(context.TODO(), resourceTest[0], nil, 0, true); err != nil { t.Error(err) } - mres, err := resService.matchingResourcesForEvent(resEvs[0].Tenant, resEvs[0], + mres, err := resService.matchingResourcesForEvent(context.TODO(), resEvs[0].Tenant, resEvs[0], "TestResourceUsageTTLCase3", utils.DurationPointer(0)) if err != nil { t.Errorf("Error: %+v", err) @@ -2011,13 +2011,13 @@ func TestResourceUsageTTLCase4(t *testing.T) { resprf[0].UsageTTL = 5 resourceTest[0].rPrf = resprf[0] resourceTest[0].ttl = &timeDurationExample - if err := dmRES.SetResourceProfile(resprf[0], true); err != nil { + if err := dmRES.SetResourceProfile(context.TODO(), resprf[0], true); err != nil { t.Error(err) } - if err := dmRES.SetResource(resourceTest[0], nil, 0, true); err != nil { + if err := dmRES.SetResource(context.TODO(), resourceTest[0], nil, 0, true); err != nil { t.Error(err) } - mres, err := resService.matchingResourcesForEvent(resEvs[0].Tenant, resEvs[0], + mres, err := resService.matchingResourcesForEvent(context.TODO(), resEvs[0].Tenant, resEvs[0], "TestResourceUsageTTLCase4", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) @@ -2398,13 +2398,13 @@ func TestResourceMatchWithIndexFalse(t *testing.T) { } timeDurationExample := 10 * time.Second for _, resProfile := range resprf { - dmRES.SetResourceProfile(resProfile, true) + dmRES.SetResourceProfile(context.TODO(), resProfile, true) } for _, res := range resourceTest { - dmRES.SetResource(res, nil, 0, true) + dmRES.SetResource(context.TODO(), res, nil, 0, true) } resService.cgrcfg.ResourceSCfg().IndexedSelects = false - mres, err := resService.matchingResourcesForEvent(resEvs[0].Tenant, resEvs[0], + mres, err := resService.matchingResourcesForEvent(context.TODO(), resEvs[0].Tenant, resEvs[0], "TestResourceMatchWithIndexFalse1", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) @@ -2418,7 +2418,7 @@ func TestResourceMatchWithIndexFalse(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", resourceTest[0].rPrf, mres[0].rPrf) } - mres, err = resService.matchingResourcesForEvent(resEvs[1].Tenant, resEvs[1], + mres, err = resService.matchingResourcesForEvent(context.TODO(), resEvs[1].Tenant, resEvs[1], "TestResourceMatchWithIndexFalse2", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) @@ -2431,7 +2431,7 @@ func TestResourceMatchWithIndexFalse(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", resourceTest[1].rPrf, mres[0].rPrf) } - mres, err = resService.matchingResourcesForEvent(resEvs[2].Tenant, resEvs[2], + mres, err = resService.matchingResourcesForEvent(context.TODO(), resEvs[2].Tenant, resEvs[2], "TestResourceMatchWithIndexFalse3", &timeDurationExample) if err != nil { t.Errorf("Error: %+v", err) @@ -2576,10 +2576,10 @@ func TestResourceCaching(t *testing.T) { } for _, resProfile := range resprf { - dmRES.SetResourceProfile(resProfile, true) + dmRES.SetResourceProfile(context.TODO(), resProfile, true) } for _, res := range resourceTest { - dmRES.SetResource(res, nil, 0, true) + dmRES.SetResource(context.TODO(), res, nil, 0, true) } //clear the cache Cache.Clear(nil) @@ -2631,7 +2631,7 @@ func TestResourceCaching(t *testing.T) { "Destination": "3002"}, } - mres, err := resService.matchingResourcesForEvent(ev.Tenant, ev, + mres, err := resService.matchingResourcesForEvent(context.TODO(), ev.Tenant, ev, "TestResourceCaching", nil) if err != nil { t.Errorf("Error: %+v", err) @@ -2922,10 +2922,10 @@ func TestResourceAllocateResourceOtherDB(t *testing.T) { dm := NewDataManager(NewInternalDB(nil, nil, true), cfg.CacheCfg(), nil) fltS := NewFilterS(cfg, nil, dm) rs := NewResourceService(dm, cfg, fltS, nil) - if err := dm.SetResourceProfile(rProf, true); err != nil { + if err := dm.SetResourceProfile(context.TODO(), rProf, true); err != nil { t.Fatal(err) } - if err := dm.SetResource(&Resource{ + if err := dm.SetResource(context.TODO(), &Resource{ Tenant: "cgrates.org", ID: "RL_DB", Usages: map[string]*ResourceUsage{ @@ -2942,7 +2942,7 @@ func TestResourceAllocateResourceOtherDB(t *testing.T) { } var reply string exp := rProf.ID - if err := rs.V1AllocateResources(utils.ArgRSv1ResourceUsage{ + if err := rs.V1AllocateResources(context.TODO(), utils.ArgRSv1ResourceUsage{ CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", ID: "ef0f554", @@ -3008,7 +3008,7 @@ func TestResourcesAllocateResourceErrRsUnavailable(t *testing.T) { ru := &ResourceUsage{} experr := utils.ErrResourceUnavailable - rcv, err := rs.allocateResource(ru, false) + rcv, err := rs.allocateResource(context.TODO(), ru, false) if err == nil || !errors.Is(err, experr) { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", experr, err) @@ -3045,7 +3045,7 @@ func TestResourcesAllocateResourceEmptyConfiguration(t *testing.T) { } experr := fmt.Sprintf("empty configuration for resourceID: %s", rs[0].TenantID()) - rcv, err := rs.allocateResource(ru, false) + rcv, err := rs.allocateResource(context.TODO(), ru, false) if err == nil || err.Error() != experr { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", experr, err) @@ -3086,7 +3086,7 @@ func TestResourcesAllocateResourceDryRun(t *testing.T) { } exp := "ResGroup1" - rcv, err := rs.allocateResource(ru, true) + rcv, err := rs.allocateResource(context.TODO(), ru, true) if err != nil { t.Errorf("\nexpected nil, got %+v", err) @@ -3123,7 +3123,7 @@ func TestResourcesShutdown(t *testing.T) { utils.ResourceS), } exp := utils.StringSet{} - rS.Shutdown() + rS.Shutdown(context.TODO()) if !reflect.DeepEqual(rS.storedResources, exp) { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", @@ -3184,7 +3184,7 @@ func TestResourcesStoreResources(t *testing.T) { "Res1": struct{}{}, }, } - rS.storeResources() + rS.storeResources(context.TODO()) if !reflect.DeepEqual(rS, exp) { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", exp, rS) @@ -3203,7 +3203,7 @@ func TestResourcesStoreResourceNotDirty(t *testing.T) { dirty: utils.BoolPointer(false), } - err := rS.StoreResource(r) + err := rS.StoreResource(context.TODO(), r) if err != nil { t.Errorf("\nexpected nil, received %+v", err) @@ -3219,7 +3219,7 @@ func TestResourcesStoreResourceOK(t *testing.T) { dirty: utils.BoolPointer(true), } - err := rS.StoreResource(r) + err := rS.StoreResource(context.TODO(), r) if err != nil { t.Errorf("\nexpected nil, received %+v", err) @@ -3272,7 +3272,7 @@ func TestResourcesAllocateResourceEmptyKey(t *testing.T) { ru := &ResourceUsage{} exp := "allocation msg" - rcv, err := rs.allocateResource(ru, false) + rcv, err := rs.allocateResource(context.TODO(), ru, false) if err != nil { t.Errorf("\nexpected nil, received %+v", err) @@ -3294,7 +3294,7 @@ func TestResourcesProcessThresholdsNoConns(t *testing.T) { } opts := map[string]interface{}{} - err := rS.processThresholds(r, opts) + err := rS.processThresholds(context.TODO(), r, opts) if err != nil { t.Errorf("\nexpected nil, received %+v", err) @@ -3355,7 +3355,7 @@ func TestResourcesProcessThresholdsOK(t *testing.T) { }, } - err := rS.processThresholds(r, nil) + err := rS.processThresholds(context.TODO(), r, nil) if err != nil { t.Errorf("\nexpected nil, received %+v", err) @@ -3430,7 +3430,7 @@ func TestResourcesProcessThresholdsCallErr(t *testing.T) { } experr := utils.ErrExists - err := rS.processThresholds(r, nil) + err := rS.processThresholds(context.TODO(), r, nil) if err == nil || err != experr { t.Errorf("\nexpected: <%+v>, \nreceived: <%+v>", experr, err) @@ -3456,7 +3456,7 @@ func TestResourcesProcessThresholdsThdConnMetaNone(t *testing.T) { } opts := map[string]interface{}{} - err := rS.processThresholds(r, opts) + err := rS.processThresholds(context.TODO(), r, opts) if err != nil { t.Errorf("\nexpected nil, received: %+v", err) diff --git a/loaders/lib_test.go b/loaders/lib_test.go index d7c51c393..a7dacde03 100644 --- a/loaders/lib_test.go +++ b/loaders/lib_test.go @@ -939,7 +939,7 @@ type dataDBMockError struct { } //For Threshold -func (dbM *dataDBMockError) RemThresholdProfileDrv(tenant, id string) (err error) { +func (dbM *dataDBMockError) RemThresholdProfileDrv(ctx *context.Context, tenant, id string) (err error) { return } @@ -948,11 +948,11 @@ func (dbM *dataDBMockError) SetIndexesDrv(ctx *context.Context, idxItmType, tntC return } -func (dbM *dataDBMockError) RemoveThresholdDrv(string, string) error { +func (dbM *dataDBMockError) RemoveThresholdDrv(*context.Context, string, string) error { return utils.ErrNoDatabaseConn } -func (dbM *dataDBMockError) GetThresholdProfileDrv(tenant string, ID string) (tp *engine.ThresholdProfile, err error) { +func (dbM *dataDBMockError) GetThresholdProfileDrv(ctx *context.Context, tenant string, ID string) (tp *engine.ThresholdProfile, err error) { expThresholdPrf := &engine.ThresholdProfile{ Tenant: "cgrates.org", ID: "REM_THRESHOLDS_1", @@ -960,11 +960,11 @@ func (dbM *dataDBMockError) GetThresholdProfileDrv(tenant string, ID string) (tp return expThresholdPrf, nil } -func (dbM *dataDBMockError) SetThresholdProfileDrv(tp *engine.ThresholdProfile) (err error) { +func (dbM *dataDBMockError) SetThresholdProfileDrv(ctx *context.Context, tp *engine.ThresholdProfile) (err error) { return } -func (dbM *dataDBMockError) GetThresholdDrv(string, string) (*engine.Threshold, error) { +func (dbM *dataDBMockError) GetThresholdDrv(*context.Context, string, string) (*engine.Threshold, error) { return nil, utils.ErrNoDatabaseConn } @@ -973,40 +973,40 @@ func (dbM *dataDBMockError) HasDataDrv(*context.Context, string, string, string) } //For StatQueue -func (dbM *dataDBMockError) GetStatQueueProfileDrv(tenant string, ID string) (sq *engine.StatQueueProfile, err error) { +func (dbM *dataDBMockError) GetStatQueueProfileDrv(ctx *context.Context, tenant string, ID string) (sq *engine.StatQueueProfile, err error) { return nil, nil } -func (dbM *dataDBMockError) RemStatQueueProfileDrv(tenant, id string) (err error) { +func (dbM *dataDBMockError) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) { return nil } -func (dbM *dataDBMockError) RemStatQueueDrv(tenant, id string) (err error) { +func (dbM *dataDBMockError) RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error) { return utils.ErrNoDatabaseConn } -func (dbM *dataDBMockError) GetStatQueueDrv(tenant, id string) (sq *engine.StatQueue, err error) { +func (dbM *dataDBMockError) GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *engine.StatQueue, err error) { return nil, utils.ErrNoDatabaseConn } -func (dbM *dataDBMockError) SetStatQueueDrv(ssq *engine.StoredStatQueue, sq *engine.StatQueue) (err error) { +func (dbM *dataDBMockError) SetStatQueueDrv(ctx *context.Context, ssq *engine.StoredStatQueue, sq *engine.StatQueue) (err error) { return utils.ErrNoDatabaseConn } -func (dbM *dataDBMockError) SetStatQueueProfileDrv(sq *engine.StatQueueProfile) (err error) { +func (dbM *dataDBMockError) SetStatQueueProfileDrv(ctx *context.Context, sq *engine.StatQueueProfile) (err error) { return nil } //For Resources -func (dbM *dataDBMockError) GetResourceProfileDrv(string, string) (*engine.ResourceProfile, error) { +func (dbM *dataDBMockError) GetResourceProfileDrv(*context.Context, string, string) (*engine.ResourceProfile, error) { return nil, nil } -func (dbM *dataDBMockError) RemoveResourceProfileDrv(string, string) error { +func (dbM *dataDBMockError) RemoveResourceProfileDrv(*context.Context, string, string) error { return nil } -func (dbM *dataDBMockError) RemoveResourceDrv(tenant, id string) (err error) { +func (dbM *dataDBMockError) RemoveResourceDrv(ctx *context.Context, tenant, id string) (err error) { return utils.ErrNoDatabaseConn } @@ -1014,10 +1014,10 @@ func (dbM *dataDBMockError) GetIndexesDrv(ctx *context.Context, idxItmType, tntC return nil, nil } -func (dbM *dataDBMockError) SetResourceProfileDrv(*engine.ResourceProfile) error { +func (dbM *dataDBMockError) SetResourceProfileDrv(*context.Context, *engine.ResourceProfile) error { return nil } -func (dbM *dataDBMockError) SetResourceDrv(*engine.Resource) error { +func (dbM *dataDBMockError) SetResourceDrv(*context.Context, *engine.Resource) error { return utils.ErrNoDatabaseConn } diff --git a/loaders/loader.go b/loaders/loader.go index d25028532..59bdd52e0 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -330,7 +330,7 @@ func (ldr *Loader) storeLoadedData(ctx *context.Context, loaderType string, } // get IDs so we can reload in cache ids = append(ids, res.TenantID()) - if err := ldr.dm.SetResourceProfile(res, true); err != nil { + if err := ldr.dm.SetResourceProfile(ctx, res, true); err != nil { return err } var ttl *time.Duration @@ -338,7 +338,7 @@ func (ldr *Loader) storeLoadedData(ctx *context.Context, loaderType string, ttl = &res.UsageTTL } // for non stored we do not save the resource - if err := ldr.dm.SetResource( + if err := ldr.dm.SetResource(ctx, &engine.Resource{ Tenant: res.Tenant, ID: res.ID, @@ -402,7 +402,7 @@ func (ldr *Loader) storeLoadedData(ctx *context.Context, loaderType string, } // get IDs so we can reload in cache ids = append(ids, stsPrf.TenantID()) - if err := ldr.dm.SetStatQueueProfile(stsPrf, true); err != nil { + if err := ldr.dm.SetStatQueueProfile(ctx, stsPrf, true); err != nil { return err } var sq *engine.StatQueue @@ -416,7 +416,7 @@ func (ldr *Loader) storeLoadedData(ctx *context.Context, loaderType string, } // for non stored we do not save the metrics - if err := ldr.dm.SetStatQueue(sq, stsPrf.Metrics, + if err := ldr.dm.SetStatQueue(ctx, sq, stsPrf.Metrics, stsPrf.MinItems, ttl, stsPrf.QueueLength, !stsPrf.Stored); err != nil { return err @@ -448,10 +448,10 @@ func (ldr *Loader) storeLoadedData(ctx *context.Context, loaderType string, } // get IDs so we can reload in cache ids = append(ids, thPrf.TenantID()) - if err := ldr.dm.SetThresholdProfile(thPrf, true); err != nil { + if err := ldr.dm.SetThresholdProfile(ctx, thPrf, true); err != nil { return err } - if err := ldr.dm.SetThreshold(&engine.Threshold{Tenant: thPrf.Tenant, ID: thPrf.ID}, thPrf.MinSleep, false); err != nil { + if err := ldr.dm.SetThreshold(ctx, &engine.Threshold{Tenant: thPrf.Tenant, ID: thPrf.ID}, thPrf.MinSleep, false); err != nil { return err } cacheArgs[utils.ThresholdProfileIDs] = ids @@ -781,11 +781,11 @@ func (ldr *Loader) removeLoadedData(ctx *context.Context, loaderType string, lds tntIDStruct := utils.NewTenantID(tntID) // get IDs so we can reload in cache ids = append(ids, tntID) - if err := ldr.dm.RemoveResourceProfile(tntIDStruct.Tenant, + if err := ldr.dm.RemoveResourceProfile(ctx, tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional, true); err != nil { return err } - if err := ldr.dm.RemoveResource(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { + if err := ldr.dm.RemoveResource(ctx, tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { return err } cacheArgs[utils.ResourceProfileIDs] = ids @@ -820,11 +820,11 @@ func (ldr *Loader) removeLoadedData(ctx *context.Context, loaderType string, lds tntIDStruct := utils.NewTenantID(tntID) // get IDs so we can reload in cache ids = append(ids, tntID) - if err := ldr.dm.RemoveStatQueueProfile(tntIDStruct.Tenant, + if err := ldr.dm.RemoveStatQueueProfile(ctx, tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional, true); err != nil { return err } - if err := ldr.dm.RemoveStatQueue(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { + if err := ldr.dm.RemoveStatQueue(ctx, tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { return err } cacheArgs[utils.StatsQueueProfileIDs] = ids @@ -842,11 +842,11 @@ func (ldr *Loader) removeLoadedData(ctx *context.Context, loaderType string, lds tntIDStruct := utils.NewTenantID(tntID) // get IDs so we can reload in cache ids = append(ids, tntID) - if err := ldr.dm.RemoveThresholdProfile(tntIDStruct.Tenant, + if err := ldr.dm.RemoveThresholdProfile(ctx, tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional, true); err != nil { return err } - if err := ldr.dm.RemoveThreshold(tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { + if err := ldr.dm.RemoveThreshold(ctx, tntIDStruct.Tenant, tntIDStruct.ID, utils.NonTransactional); err != nil { return err } cacheArgs[utils.ThresholdProfileIDs] = ids diff --git a/loaders/loader_test.go b/loaders/loader_test.go index a30913e38..8db0001e0 100644 --- a/loaders/loader_test.go +++ b/loaders/loader_test.go @@ -331,14 +331,14 @@ func TestLoaderProcessResource(t *testing.T) { if len(ldr.bufLoaderData) != 0 { t.Errorf("wrong buffer content: %+v", ldr.bufLoaderData) } - if resPrf, err := ldr.dm.GetResourceProfile("cgrates.org", "ResGroup21", + if resPrf, err := ldr.dm.GetResourceProfile(context.TODO(), "cgrates.org", "ResGroup21", true, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(eResPrf1, resPrf) { t.Errorf("expecting: %s, received: %s", utils.ToJSON(eResPrf1), utils.ToJSON(resPrf)) } - if resPrf, err := ldr.dm.GetResourceProfile("cgrates.org", "ResGroup22", + if resPrf, err := ldr.dm.GetResourceProfile(context.TODO(), "cgrates.org", "ResGroup22", true, false, utils.NonTransactional); err != nil { t.Error(err) } else if !reflect.DeepEqual(eResPrf2, resPrf) { @@ -560,7 +560,7 @@ func TestLoaderProcessThresholds(t *testing.T) { ActionIDs: []string{"THRESH1"}, Async: true, } - aps, err := ldr.dm.GetThresholdProfile("cgrates.org", "Threshold1", + aps, err := ldr.dm.GetThresholdProfile(context.TODO(), "cgrates.org", "Threshold1", true, false, utils.NonTransactional) sort.Strings(eTh1.FilterIDs) sort.Strings(aps.FilterIDs) @@ -686,7 +686,7 @@ func TestLoaderProcessStats(t *testing.T) { MinItems: 2, } - aps, err := ldr.dm.GetStatQueueProfile("cgrates.org", "TestStats", + aps, err := ldr.dm.GetStatQueueProfile(context.TODO(), "cgrates.org", "TestStats", true, false, utils.NonTransactional) //sort the slices of Metrics sort.Slice(eSt1.Metrics, func(i, j int) bool { return eSt1.Metrics[i].MetricID < eSt1.Metrics[j].MetricID }) @@ -3242,7 +3242,7 @@ cgrates.org,NewRes1 rdr: rdr, csvRdr: rdrCsv}}, } //empty database - if _, err := ldr.dm.GetResourceProfile("cgrates.org", "NewRes1", false, false, utils.NonTransactional); err != utils.ErrNotFound { + if _, err := ldr.dm.GetResourceProfile(context.TODO(), "cgrates.org", "NewRes1", false, false, utils.NonTransactional); err != utils.ErrNotFound { t.Error(err) } @@ -3269,7 +3269,7 @@ cgrates.org,NewRes1 ThresholdIDs: []string{}, } //NOT_FOUND because is resourceProfile is not set - if _, err := ldr.dm.GetResourceProfile("cgrates.org", "NewRes1", false, false, utils.NonTransactional); err != utils.ErrNotFound { + if _, err := ldr.dm.GetResourceProfile(context.TODO(), "cgrates.org", "NewRes1", false, false, utils.NonTransactional); err != utils.ErrNotFound { t.Error(err) } @@ -3277,7 +3277,7 @@ cgrates.org,NewRes1 t.Error(err) } - rcv, err := ldr.dm.GetResourceProfile("cgrates.org", "NewRes1", false, false, utils.NonTransactional) + rcv, err := ldr.dm.GetResourceProfile(context.TODO(), "cgrates.org", "NewRes1", false, false, utils.NonTransactional) if err != nil { t.Error(err) } else if !reflect.DeepEqual(resPrf, rcv) { @@ -3315,7 +3315,7 @@ cgrates.org,NewRes1 } //nothing in database - if _, err := ldr.dm.GetResourceProfile("cgrates.org", "NewRes1", false, false, utils.NonTransactional); err != utils.ErrNotFound { + if _, err := ldr.dm.GetResourceProfile(context.TODO(), "cgrates.org", "NewRes1", false, false, utils.NonTransactional); err != utils.ErrNotFound { t.Error(err) } @@ -3475,7 +3475,7 @@ cgrates.org,REM_STATS_1 Tenant: "cgrates.org", ID: "REM_STATS_1", } - if err := ldr.dm.SetStatQueueProfile(expStats, true); err != nil { + if err := ldr.dm.SetStatQueueProfile(context.TODO(), expStats, true); err != nil { t.Error(err) } if err := ldr.removeContent(context.Background(), utils.MetaStats, utils.EmptyString); err != nil { @@ -3568,7 +3568,7 @@ cgrates.org,REM_THRESHOLDS_1, Tenant: "cgrates.org", ID: "REM_THRESHOLDS_1", } - if err := ldr.dm.SetThresholdProfile(expThresholdPrf, true); err != nil { + if err := ldr.dm.SetThresholdProfile(context.TODO(), expThresholdPrf, true); err != nil { t.Error(err) } if err := ldr.removeContent(context.Background(), utils.MetaThresholds, utils.EmptyString); err != nil { @@ -4537,7 +4537,7 @@ cgrates.org,REM_THRESHOLDS_1, ID: "REM_THRESHOLDS_1", } - if err := ldr.dm.SetThresholdProfile(expThresholdPrf, true); err != nil { + if err := ldr.dm.SetThresholdProfile(context.TODO(), expThresholdPrf, true); err != nil { t.Error(err) } @@ -4595,7 +4595,7 @@ cgrates.org,REM_STATS_1 ID: "REM_STATS_1", } - if err := ldr.dm.SetStatQueueProfile(expStats, true); err != nil { + if err := ldr.dm.SetStatQueueProfile(context.TODO(), expStats, true); err != nil { t.Error(err) } @@ -4650,7 +4650,7 @@ cgrates.org,NewRes1 ID: "NewRes1", } - if err := ldr.dm.SetResourceProfile(resPrf, true); err != nil { + if err := ldr.dm.SetResourceProfile(context.TODO(), resPrf, true); err != nil { t.Error(err) } diff --git a/migrator/filters.go b/migrator/filters.go index 5bb2f258b..d441fb188 100644 --- a/migrator/filters.go +++ b/migrator/filters.go @@ -372,7 +372,7 @@ func (m *Migrator) migrateResourceProfileFiltersV1() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating filter for resourceProfile", id) } - res, err := m.dmIN.DataManager().GetResourceProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) + res, err := m.dmIN.DataManager().GetResourceProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } @@ -382,7 +382,7 @@ func (m *Migrator) migrateResourceProfileFiltersV1() (err error) { for i, fl := range res.FilterIDs { res.FilterIDs[i] = migrateInlineFilter(fl) } - if err := m.dmOut.DataManager().SetResourceProfile(res, true); err != nil { + if err := m.dmOut.DataManager().SetResourceProfile(context.TODO(), res, true); err != nil { return err } m.stats[utils.RQF]++ @@ -401,7 +401,7 @@ func (m *Migrator) migrateStatQueueProfileFiltersV1() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating filter for statQueueProfile", id) } - sgs, err := m.dmIN.DataManager().GetStatQueueProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) + sgs, err := m.dmIN.DataManager().GetStatQueueProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } @@ -411,7 +411,7 @@ func (m *Migrator) migrateStatQueueProfileFiltersV1() (err error) { for i, fl := range sgs.FilterIDs { sgs.FilterIDs[i] = migrateInlineFilter(fl) } - if err = m.dmOut.DataManager().SetStatQueueProfile(sgs, true); err != nil { + if err = m.dmOut.DataManager().SetStatQueueProfile(context.TODO(), sgs, true); err != nil { return err } m.stats[utils.RQF]++ @@ -430,7 +430,7 @@ func (m *Migrator) migrateThresholdsProfileFiltersV1() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating filter for thresholdProfile", id) } - ths, err := m.dmIN.DataManager().GetThresholdProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) + ths, err := m.dmIN.DataManager().GetThresholdProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } @@ -440,7 +440,7 @@ func (m *Migrator) migrateThresholdsProfileFiltersV1() (err error) { for i, fl := range ths.FilterIDs { ths.FilterIDs[i] = migrateInlineFilter(fl) } - if err := m.dmOut.DataManager().SetThresholdProfile(ths, true); err != nil { + if err := m.dmOut.DataManager().SetThresholdProfile(context.TODO(), ths, true); err != nil { return err } m.stats[utils.RQF]++ @@ -582,7 +582,7 @@ func (m *Migrator) migrateResourceProfileFiltersV2() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating filter for resourcerProfile", id) } - res, err := m.dmIN.DataManager().GetResourceProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) + res, err := m.dmIN.DataManager().GetResourceProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return fmt.Errorf("error: <%s> when getting resource profile with tenant: <%s> and id: <%s>", err.Error(), tntID[0], tntID[1]) @@ -593,7 +593,7 @@ func (m *Migrator) migrateResourceProfileFiltersV2() (err error) { for i, fl := range res.FilterIDs { res.FilterIDs[i] = migrateInlineFilterV2(fl) } - if err := m.dmOut.DataManager().SetResourceProfile(res, true); err != nil { + if err := m.dmOut.DataManager().SetResourceProfile(context.TODO(), res, true); err != nil { return fmt.Errorf("error: <%s> when setting resource profile with tenant: <%s> and id: <%s>", err.Error(), tntID[0], tntID[1]) } @@ -613,7 +613,7 @@ func (m *Migrator) migrateStatQueueProfileFiltersV2() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating filter for statQueueProfile", id) } - sgs, err := m.dmIN.DataManager().GetStatQueueProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) + sgs, err := m.dmIN.DataManager().GetStatQueueProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return fmt.Errorf("error: <%s> when getting statQueue profile with tenant: <%s> and id: <%s>", err.Error(), tntID[0], tntID[1]) @@ -624,7 +624,7 @@ func (m *Migrator) migrateStatQueueProfileFiltersV2() (err error) { for i, fl := range sgs.FilterIDs { sgs.FilterIDs[i] = migrateInlineFilterV2(fl) } - if err = m.dmOut.DataManager().SetStatQueueProfile(sgs, true); err != nil { + if err = m.dmOut.DataManager().SetStatQueueProfile(context.TODO(), sgs, true); err != nil { return fmt.Errorf("error: <%s> when setting statQueue profile with tenant: <%s> and id: <%s>", err.Error(), tntID[0], tntID[1]) } @@ -644,7 +644,7 @@ func (m *Migrator) migrateThresholdsProfileFiltersV2() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating filter for thresholdProfile", id) } - ths, err := m.dmIN.DataManager().GetThresholdProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) + ths, err := m.dmIN.DataManager().GetThresholdProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return fmt.Errorf("error: <%s> when getting threshold profile with tenant: <%s> and id: <%s>", err.Error(), tntID[0], tntID[1]) @@ -655,7 +655,7 @@ func (m *Migrator) migrateThresholdsProfileFiltersV2() (err error) { for i, fl := range ths.FilterIDs { ths.FilterIDs[i] = migrateInlineFilterV2(fl) } - if err := m.dmOut.DataManager().SetThresholdProfile(ths, true); err != nil { + if err := m.dmOut.DataManager().SetThresholdProfile(context.TODO(), ths, true); err != nil { return fmt.Errorf("error: <%s> when setting threshold profile with tenant: <%s> and id: <%s>", err.Error(), tntID[0], tntID[1]) } diff --git a/migrator/resource.go b/migrator/resource.go index 8613cb1bf..d6ecf4e6a 100644 --- a/migrator/resource.go +++ b/migrator/resource.go @@ -38,17 +38,17 @@ func (m *Migrator) migrateCurrentResource() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating resource profiles", id) } - res, err := m.dmIN.DataManager().GetResourceProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) + res, err := m.dmIN.DataManager().GetResourceProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } if res == nil || m.dryRun { continue } - if err := m.dmOut.DataManager().SetResourceProfile(res, true); err != nil { + if err := m.dmOut.DataManager().SetResourceProfile(context.TODO(), res, true); err != nil { return err } - if err := m.dmIN.DataManager().RemoveResourceProfile(tntID[0], tntID[1], utils.NonTransactional, false); err != nil { + if err := m.dmIN.DataManager().RemoveResourceProfile(context.TODO(), tntID[0], tntID[1], utils.NonTransactional, false); err != nil { return err } m.stats[utils.Resource]++ diff --git a/migrator/stats.go b/migrator/stats.go index 6b507de2a..7f7c98a32 100644 --- a/migrator/stats.go +++ b/migrator/stats.go @@ -71,17 +71,17 @@ func (m *Migrator) moveStatQueueProfile() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating stat queue profiles", id) } - sgs, err := m.dmIN.DataManager().GetStatQueueProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) + sgs, err := m.dmIN.DataManager().GetStatQueueProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } if sgs == nil || m.dryRun { continue } - if err := m.dmOut.DataManager().SetStatQueueProfile(sgs, true); err != nil { + if err := m.dmOut.DataManager().SetStatQueueProfile(context.TODO(), sgs, true); err != nil { return err } - if err := m.dmIN.DataManager().RemoveStatQueueProfile(tntID[0], tntID[1], utils.NonTransactional, false); err != nil { + if err := m.dmIN.DataManager().RemoveStatQueueProfile(context.TODO(), tntID[0], tntID[1], utils.NonTransactional, false); err != nil { return err } } @@ -99,7 +99,7 @@ func (m *Migrator) migrateCurrentStats() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating stat queues", id) } - sgs, err := m.dmIN.DataManager().GetStatQueue(tntID[0], tntID[1], false, false, utils.NonTransactional) + sgs, err := m.dmIN.DataManager().GetStatQueue(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err @@ -107,10 +107,10 @@ func (m *Migrator) migrateCurrentStats() (err error) { if sgs == nil || m.dryRun { continue } - if err := m.dmOut.DataManager().SetStatQueue(sgs, nil, 0, nil, 0, true); err != nil { + if err := m.dmOut.DataManager().SetStatQueue(context.TODO(), sgs, nil, 0, nil, 0, true); err != nil { return err } - if err := m.dmIN.DataManager().RemoveStatQueue(tntID[0], tntID[1], utils.NonTransactional); err != nil { + if err := m.dmIN.DataManager().RemoveStatQueue(context.TODO(), tntID[0], tntID[1], utils.NonTransactional); err != nil { return err } m.stats[utils.StatS]++ @@ -223,10 +223,10 @@ func (m *Migrator) migrateStats() (err error) { } } // Set the fresh-migrated Stats into DB - if err = m.dmOut.DataManager().SetStatQueueProfile(v4sts, true); err != nil { + if err = m.dmOut.DataManager().SetStatQueueProfile(context.TODO(), v4sts, true); err != nil { return } - if err = m.dmOut.DataManager().SetStatQueue(v3Stats, nil, 0, nil, 0, true); err != nil { + if err = m.dmOut.DataManager().SetStatQueue(context.TODO(), v3Stats, nil, 0, nil, 0, true); err != nil { return } } diff --git a/migrator/thresholds.go b/migrator/thresholds.go index 4c6f67964..130344e49 100644 --- a/migrator/thresholds.go +++ b/migrator/thresholds.go @@ -40,17 +40,17 @@ func (m *Migrator) migrateCurrentThresholds() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating thresholds", id) } - ths, err := m.dmIN.DataManager().GetThreshold(tntID[0], tntID[1], false, false, utils.NonTransactional) + ths, err := m.dmIN.DataManager().GetThreshold(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } if ths == nil || m.dryRun { continue } - if err := m.dmOut.DataManager().SetThreshold(ths, 0, true); err != nil { + if err := m.dmOut.DataManager().SetThreshold(context.TODO(), ths, 0, true); err != nil { return err } - if err := m.dmIN.DataManager().RemoveThreshold(tntID[0], tntID[1], utils.NonTransactional); err != nil { + if err := m.dmIN.DataManager().RemoveThreshold(context.TODO(), tntID[0], tntID[1], utils.NonTransactional); err != nil { return err } m.stats[utils.Thresholds]++ @@ -65,17 +65,17 @@ func (m *Migrator) migrateCurrentThresholds() (err error) { if len(tntID) < 2 { return fmt.Errorf("Invalid key <%s> when migrating threshold profiles", id) } - ths, err := m.dmIN.DataManager().GetThresholdProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) + ths, err := m.dmIN.DataManager().GetThresholdProfile(context.TODO(), tntID[0], tntID[1], false, false, utils.NonTransactional) if err != nil { return err } if ths == nil || m.dryRun { continue } - if err := m.dmOut.DataManager().SetThresholdProfile(ths, true); err != nil { + if err := m.dmOut.DataManager().SetThresholdProfile(context.TODO(), ths, true); err != nil { return err } - if err := m.dmIN.DataManager().RemoveThresholdProfile(tntID[0], tntID[1], utils.NonTransactional, false); err != nil { + if err := m.dmIN.DataManager().RemoveThresholdProfile(context.TODO(), tntID[0], tntID[1], utils.NonTransactional, false); err != nil { return err } } @@ -167,11 +167,11 @@ func (m *Migrator) migrateThresholds() (err error) { if err = m.dmOut.DataManager().SetFilter(context.TODO(), filter, true); err != nil { return } - if err = m.dmOut.DataManager().SetThreshold(th, 0, true); err != nil { + if err = m.dmOut.DataManager().SetThreshold(context.TODO(), th, 0, true); err != nil { return } } - if err = m.dmOut.DataManager().SetThresholdProfile(v4, true); err != nil { + if err = m.dmOut.DataManager().SetThresholdProfile(context.TODO(), v4, true); err != nil { return } diff --git a/services/resources.go b/services/resources.go index a5de0e8b6..61322d5ee 100644 --- a/services/resources.go +++ b/services/resources.go @@ -23,6 +23,8 @@ import ( "sync" "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -58,8 +60,8 @@ type ResourceService struct { filterSChan chan *engine.FilterS server *cores.Server - reS *engine.ResourceService - // rpc *v1.ResourceSv1 + reS *engine.ResourceService + rpc *apis.ResourceSv1 connChan chan birpc.ClientConnector connMgr *engine.ConnManager anz *AnalyzerService @@ -86,19 +88,20 @@ func (reS *ResourceService) Start() (err error) { defer reS.Unlock() reS.reS = engine.NewResourceService(datadb, reS.cfg, filterS, reS.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ResourceS)) - reS.reS.StartLoop() - // reS.rpc = v1.NewResourceSv1(reS.reS) - // if !reS.cfg.DispatcherSCfg().Enabled { - // reS.server.RpcRegister(reS.rpc) - // } - // reS.connChan <- reS.anz.GetInternalCodec(reS.rpc, utils.ResourceS) + reS.reS.StartLoop(context.TODO()) + reS.rpc = apis.NewResourceSv1(reS.reS) + srv, _ := birpc.NewService(reS.rpc, "", false) + if !reS.cfg.DispatcherSCfg().Enabled { + reS.server.RpcRegister(srv) + } + reS.connChan <- reS.anz.GetInternalCodec(srv, utils.ResourceS) return } // Reload handles the change of config func (reS *ResourceService) Reload() (err error) { reS.Lock() - reS.reS.Reload() + reS.reS.Reload(context.TODO()) reS.Unlock() return } @@ -108,10 +111,10 @@ func (reS *ResourceService) Shutdown() (err error) { defer reS.srvDep[utils.DataDB].Done() reS.Lock() defer reS.Unlock() - reS.reS.Shutdown() //we don't verify the error because shutdown never returns an error + reS.reS.Shutdown(context.TODO()) //we don't verify the error because shutdown never returns an error reS.reS = nil - // reS.rpc = nil - //<-reS.connChan + reS.rpc = nil + <-reS.connChan return } diff --git a/services/stats.go b/services/stats.go index ea90b4f68..9514b64ac 100644 --- a/services/stats.go +++ b/services/stats.go @@ -23,6 +23,8 @@ import ( "sync" "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -59,8 +61,8 @@ type StatService struct { server *cores.Server connMgr *engine.ConnManager - sts *engine.StatService - // rpc *v1.StatSv1 + sts *engine.StatService + rpc *apis.StatSv1 connChan chan birpc.ClientConnector anz *AnalyzerService srvDep map[string]*sync.WaitGroup @@ -89,19 +91,20 @@ func (sts *StatService) Start() (err error) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.StatS)) - sts.sts.StartLoop() - // sts.rpc = v1.NewStatSv1(sts.sts) - // if !sts.cfg.DispatcherSCfg().Enabled { - // sts.server.RpcRegister(sts.rpc) - // } - // sts.connChan <- sts.anz.GetInternalCodec(sts.rpc, utils.StatS) + sts.sts.StartLoop(context.TODO()) + sts.rpc = apis.NewStatSv1(sts.sts) + srv, _ := birpc.NewService(sts.rpc, "", false) + if !sts.cfg.DispatcherSCfg().Enabled { + sts.server.RpcRegister(srv) + } + sts.connChan <- sts.anz.GetInternalCodec(srv, utils.StatS) return } // Reload handles the change of config func (sts *StatService) Reload() (err error) { sts.Lock() - sts.sts.Reload() + sts.sts.Reload(context.TODO()) sts.Unlock() return } @@ -111,10 +114,10 @@ func (sts *StatService) Shutdown() (err error) { defer sts.srvDep[utils.DataDB].Done() sts.Lock() defer sts.Unlock() - sts.sts.Shutdown() + sts.sts.Shutdown(context.TODO()) sts.sts = nil - // sts.rpc = nil - //<-sts.connChan + sts.rpc = nil + <-sts.connChan return } diff --git a/services/thresholds.go b/services/thresholds.go index 574fe9cb7..5089e47df 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -23,6 +23,8 @@ import ( "sync" "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -56,8 +58,8 @@ type ThresholdService struct { filterSChan chan *engine.FilterS server *cores.Server - thrs *engine.ThresholdService - // rpc *v1.ThresholdSv1 + thrs *engine.ThresholdService + rpc *apis.ThresholdSv1 connChan chan birpc.ClientConnector anz *AnalyzerService srvDep map[string]*sync.WaitGroup @@ -85,19 +87,20 @@ func (thrs *ThresholdService) Start() (err error) { thrs.thrs = engine.NewThresholdService(datadb, thrs.cfg, filterS) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.ThresholdS)) - thrs.thrs.StartLoop() - // thrs.rpc = v1.NewThresholdSv1(thrs.thrs) - // if !thrs.cfg.DispatcherSCfg().Enabled { - // thrs.server.RpcRegister(thrs.rpc) - // } - // thrs.connChan <- thrs.anz.GetInternalCodec(thrs.rpc, utils.ThresholdS) + thrs.thrs.StartLoop(context.TODO()) + thrs.rpc = apis.NewThresholdSv1(thrs.thrs) + srv, _ := birpc.NewService(thrs.rpc, "", false) + if !thrs.cfg.DispatcherSCfg().Enabled { + thrs.server.RpcRegister(srv) + } + thrs.connChan <- thrs.anz.GetInternalCodec(srv, utils.ThresholdS) return } // Reload handles the change of config func (thrs *ThresholdService) Reload() (err error) { thrs.Lock() - thrs.thrs.Reload() + thrs.thrs.Reload(context.TODO()) thrs.Unlock() return } @@ -107,10 +110,10 @@ func (thrs *ThresholdService) Shutdown() (err error) { defer thrs.srvDep[utils.DataDB].Done() thrs.Lock() defer thrs.Unlock() - thrs.thrs.Shutdown() + thrs.thrs.Shutdown(context.TODO()) thrs.thrs = nil - // thrs.rpc = nil - //<-thrs.connChan + thrs.rpc = nil + <-thrs.connChan return }