Updated Resource handling in datamanager

This commit is contained in:
Trial97
2021-05-21 10:49:05 +03:00
committed by Dan Christian Bogos
parent e1b21983c4
commit 30b6cc2c70
11 changed files with 77 additions and 214 deletions

View File

@@ -35,14 +35,14 @@ type DataDBMock struct {
RemoveAttributeProfileDrvF func(ctx *context.Context, str1 string, str2 string) error
SetLoadIDsDrvF func(ctx *context.Context, loadIDs map[string]int64) error
GetFilterDrvF func(ctx *context.Context, str1 string, str2 string) (*Filter, error)
GetResourceProfileDrvF func(*context.Context, string, string) (*ResourceProfile, error)
SetResourceProfileDrvF func(*context.Context, *ResourceProfile) error
RemoveResourceProfileDrvF func(*context.Context, string, string) error
GetChargerProfileDrvF func(*context.Context, string, string) (*ChargerProfile, error)
GetThresholdProfileDrvF func(ctx *context.Context, tenant, id string) (tp *ThresholdProfile, err error)
SetThresholdProfileDrvF func(ctx *context.Context, tp *ThresholdProfile) (err error)
RemThresholdProfileDrvF func(ctx *context.Context, tenant, id string) (err error)
GetThresholdDrvF func(ctx *context.Context, tenant, id string) (*Threshold, error)
GetResourceProfileDrvF func(ctx *context.Context, tnt, id string) (*ResourceProfile, error)
SetResourceProfileDrvF func(ctx *context.Context, rp *ResourceProfile) error
RemoveResourceProfileDrvF func(ctx *context.Context, tnt, id string) error
}
//Storage methods
@@ -88,7 +88,7 @@ func (dbM *DataDBMock) HasDataDrv(*context.Context, string, string, string) (boo
return false, utils.ErrNotImplemented
}
func (dbM *DataDBMock) GetResourceProfileDrv(ctx *context.Context, tnt string, id string) (*ResourceProfile, error) {
func (dbM *DataDBMock) GetResourceProfileDrv(ctx *context.Context, tnt, id string) (*ResourceProfile, error) {
if dbM.GetResourceProfileDrvF != nil {
return dbM.GetResourceProfileDrvF(ctx, tnt, id)
}

View File

@@ -752,7 +752,7 @@ func (dm *DataManager) SetThresholdProfile(ctx *context.Context, th *ThresholdPr
}
}
if oldTh == nil || // create the threshold if it didn't exit before
if oldTh == nil || // create the threshold if it didn't exist before
oldTh.MaxHits != th.MaxHits ||
oldTh.MinHits != th.MinHits ||
oldTh.MinSleep != th.MinSleep { // reset the threshold if the profile changed this fields
@@ -989,36 +989,10 @@ func (dm *DataManager) GetResource(ctx *context.Context, tenant, id string, cach
return
}
func (dm *DataManager) SetResource(ctx *context.Context, rs *Resource, ttl *time.Duration, usageLimit float64, simpleSet bool) (err error) {
func (dm *DataManager) SetResource(ctx *context.Context, rs *Resource) (err error) {
if dm == nil {
return utils.ErrNoDatabaseConn
}
if !simpleSet {
// do stuff
tnt := rs.Tenant // save the tenant
id := rs.ID // save the ID from the initial StatQueue
// handle metrics for statsQueue
rs, err = dm.GetResource(ctx, tnt, id, true, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return
}
if err == utils.ErrNotFound {
rs = &Resource{Tenant: tnt, ID: id, Usages: make(map[string]*ResourceUsage)}
// if the resource didn't exists simply initiate the Usages
} else {
rs.ttl = ttl
rs.removeExpiredUnits()
for rsUsage := range rs.Usages {
if rs.totalUsage() > usageLimit {
if err = rs.clearUsage(rsUsage); err != nil {
return
}
} else {
break
}
}
}
}
if err = dm.DataDB().SetResourceDrv(ctx, rs); err != nil {
return
}
@@ -1134,14 +1108,45 @@ func (dm *DataManager) SetResourceProfile(ctx *context.Context, rp *ResourceProf
Cache.Clear([]string{utils.CacheEventResources})
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile]; itm.Replicate {
err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
if err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ResourceProfilesPrefix, rp.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetResourceProfile,
&ResourceProfileWithAPIOpts{
ResourceProfile: rp,
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil {
return
}
}
if oldRes == nil || // create the resource if it didn't exist before
oldRes.UsageTTL != rp.UsageTTL ||
oldRes.Limit != rp.Limit ||
oldRes.Stored != rp.Stored { // reset the resource if the profile changed this fields
var ttl *time.Duration
if rp.UsageTTL > 0 {
ttl = &rp.UsageTTL
}
err = dm.SetResource(ctx, &Resource{
Tenant: rp.Tenant,
ID: rp.ID,
Usages: make(map[string]*ResourceUsage),
ttl: ttl,
rPrf: rp,
})
} else if _, errRs := dm.GetResource(ctx, rp.Tenant, rp.ID, // do not try to get the resource if the configuration changed
true, false, utils.NonTransactional); errRs == utils.ErrNotFound { // the resource does not exist
var ttl *time.Duration
if rp.UsageTTL > 0 {
ttl = &rp.UsageTTL
}
err = dm.SetResource(ctx, &Resource{
Tenant: rp.Tenant,
ID: rp.ID,
Usages: make(map[string]*ResourceUsage),
ttl: ttl,
rPrf: rp,
})
}
return
}
@@ -1179,7 +1184,7 @@ func (dm *DataManager) RemoveResourceProfile(ctx *context.Context, tenant, id, t
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)})
}
return
return dm.RemoveResource(ctx, tenant, id, transactionID)
}
func (dm *DataManager) HasData(category, subject, tenant string) (has bool, err error) {

View File

@@ -677,16 +677,6 @@ func TestLoadDispatcherHosts(t *testing.T) {
}
}
func TestLoadResource(t *testing.T) {
eResources := []*utils.TenantID{
{Tenant: "cgrates.org", ID: "ResGroup21"},
{Tenant: "cgrates.org", ID: "ResGroup22"},
}
if len(csvr.resources) != len(eResources) {
t.Errorf("Failed to load resources expecting 2 but received : %+v", len(csvr.resources))
}
}
func TestLoadstatQueues(t *testing.T) {
eStatQueues := []*utils.TenantID{
{Tenant: "cgrates.org", ID: "TestStats"},

View File

@@ -357,7 +357,7 @@ func (rS *ResourceService) StoreResource(ctx *context.Context, r *Resource) (err
if r.dirty == nil || !*r.dirty {
return
}
if err = rS.dm.SetResource(ctx, r, nil, 0, true); err != nil {
if err = rS.dm.SetResource(ctx, r); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ResourceS> failed saving Resource with ID: %s, error: %s",
r.ID, err.Error()))

View File

@@ -22,7 +22,6 @@ import (
"errors"
"fmt"
"log"
"strconv"
"time"
"github.com/cgrates/birpc/context"
@@ -47,8 +46,8 @@ type TpReader struct {
rateProfiles map[utils.TenantID]*utils.TPRateProfile
actionProfiles map[utils.TenantID]*utils.TPActionProfile
accounts map[utils.TenantID]*utils.TPAccount
resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles
statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles
// resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles
statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles
// thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles
// acntActionPlans map[string][]string
cacheConns []string
@@ -98,7 +97,6 @@ func (tpr *TpReader) LoadResourceProfilesFiltered(tag string) (err error) {
if err = verifyInlineFilterS(rl.FilterIDs); err != nil {
return
}
tpr.resources = append(tpr.resources, &utils.TenantID{Tenant: rl.Tenant, ID: rl.ID})
mapRsPfls[utils.TenantID{Tenant: rl.Tenant, ID: rl.ID}] = rl
}
tpr.resProfiles = mapRsPfls
@@ -413,41 +411,6 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
}
if len(tpr.resProfiles) != 0 {
loadIDs[utils.CacheResourceProfiles] = loadID
}
if verbose {
log.Print("Resources:")
}
for _, rTid := range tpr.resources {
var ttl *time.Duration
if tpr.resProfiles[*rTid].UsageTTL != utils.EmptyString {
ttl = new(time.Duration)
if *ttl, err = utils.ParseDurationWithNanosecs(tpr.resProfiles[*rTid].UsageTTL); err != nil {
return
}
if *ttl <= 0 {
ttl = nil
}
}
var limit float64
if tpr.resProfiles[*rTid].Limit != utils.EmptyString {
if limit, err = strconv.ParseFloat(tpr.resProfiles[*rTid].Limit, 64); err != nil {
return
}
}
// for non stored we do not save the resource
if err = tpr.dm.SetResource(context.TODO(),
&Resource{
Tenant: rTid.Tenant,
ID: rTid.ID,
Usages: make(map[string]*ResourceUsage),
}, ttl, limit, !tpr.resProfiles[*rTid].Stored); err != nil {
return
}
if verbose {
log.Print("\t", rTid.TenantID())
}
}
if len(tpr.resources) != 0 {
loadIDs[utils.CacheResources] = loadID
}
if verbose {
@@ -708,12 +671,6 @@ func (tpr *TpReader) ShowStatistics() {
// GetLoadedIds returns the identities loaded for a specific category, useful for cache reloads
func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) {
switch categ {
case utils.ResourcesPrefix:
keys := make([]string, len(tpr.resources))
for i, k := range tpr.resources {
keys[i] = k.TenantID()
}
return keys, nil
case utils.StatQueuePrefix:
keys := make([]string, len(tpr.statQueues))
for i, k := range tpr.statQueues {
@@ -829,17 +786,6 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
log.Print("\t", utils.ConcatenatedKey(tpRsp.Tenant, tpRsp.ID))
}
}
if verbose {
log.Print("Resources:")
}
for _, rTid := range tpr.resources {
if err = tpr.dm.RemoveResource(context.TODO(), rTid.Tenant, rTid.ID, utils.NonTransactional); err != nil {
return
}
if verbose {
log.Print("\t", rTid.TenantID())
}
}
if verbose {
log.Print("StatQueueProfiles:")
}
@@ -993,8 +939,6 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
}
if len(tpr.resProfiles) != 0 {
loadIDs[utils.CacheResourceProfiles] = loadID
}
if len(tpr.resources) != 0 {
loadIDs[utils.CacheResources] = loadID
}
if len(tpr.sqProfiles) != 0 {
@@ -1044,7 +988,6 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
}
// take IDs for each type
rspIDs, _ := tpr.GetLoadedIds(utils.ResourceProfilesPrefix)
resIDs, _ := tpr.GetLoadedIds(utils.ResourcesPrefix)
stqIDs, _ := tpr.GetLoadedIds(utils.StatQueuePrefix)
stqpIDs, _ := tpr.GetLoadedIds(utils.StatQueueProfilePrefix)
trspfIDs, _ := tpr.GetLoadedIds(utils.ThresholdProfilePrefix)
@@ -1061,7 +1004,7 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
//compose Reload Cache argument
cacheArgs := map[string][]string{
utils.ResourceProfileIDs: rspIDs,
utils.ResourceIDs: resIDs,
utils.ResourceIDs: rspIDs,
utils.StatsQueueIDs: stqIDs,
utils.StatsQueueProfileIDs: stqpIDs,
utils.ThresholdIDs: trspfIDs,

View File

@@ -411,29 +411,6 @@ func TestCallCacheClear(t *testing.T) {
}
}
func TestGetLoadedIdsResources(t *testing.T) {
tpr := &TpReader{
resources: []*utils.TenantID{
{
Tenant: "cgrates.org",
ID: "resourcesID",
},
{
Tenant: "tenant.com",
ID: "mytenantID",
},
},
}
rcv, err := tpr.GetLoadedIds(utils.ResourcesPrefix)
if err != nil {
t.Error(err)
}
expRcv := []string{"cgrates.org:resourcesID", "tenant.com:mytenantID"}
if !reflect.DeepEqual(expRcv, rcv) {
t.Errorf("\nExpected %v but received \n%v", expRcv, rcv)
}
}
func TestGetLoadedIdsStatQueues(t *testing.T) {
tpr := &TpReader{
statQueues: []*utils.TenantID{
@@ -703,7 +680,7 @@ func TestReloadCache(t *testing.T) {
"ChargerProfileIDs": {"cgrates.org:chargerProfilesID"},
"DispatcherProfileIDs": {"cgrates.org:dispatcherProfilesID"},
"DispatcherHostIDs": {"cgrates.org:dispatcherHostsID"},
"ResourceIDs": {"cgrates.org:resourcesID"},
"ResourceIDs": {"cgrates.org:resourceProfilesID"},
"StatsQueueIDs": {"cgrates.org:statQueuesID"},
"ThresholdIDs": {"cgrates.org:thresholdProfilesID"},
},
@@ -754,12 +731,6 @@ func TestReloadCache(t *testing.T) {
dispatcherHosts: map[utils.TenantID]*utils.TPDispatcherHost{
{Tenant: "cgrates.org", ID: "dispatcherHostsID"}: {},
},
resources: []*utils.TenantID{
{
Tenant: "cgrates.org",
ID: "resourcesID",
},
},
statQueues: []*utils.TenantID{
{
Tenant: "cgrates.org",

View File

@@ -199,7 +199,7 @@ func testOnStorITResource(t *testing.T) {
true, false, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
if err := onStor.SetResource(context.TODO(), res, nil, 0, true); err != nil {
if err := onStor.SetResource(context.TODO(), res); err != nil {
t.Error(err)
}
//get from database
@@ -217,7 +217,7 @@ func testOnStorITResource(t *testing.T) {
}
//update
res.TTLIdx = []string{"RU1", "RU2"}
if err := onStor.SetResource(context.TODO(), res, nil, 0, true); err != nil {
if err := onStor.SetResource(context.TODO(), res); err != nil {
t.Error(err)
}

View File

@@ -1032,7 +1032,7 @@ func TestResourceAddResourceProfile(t *testing.T) {
dmRES.SetResourceProfile(context.TODO(), resProfile, true)
}
for _, res := range resourceTest {
dmRES.SetResource(context.TODO(), res, nil, 0, true)
dmRES.SetResource(context.TODO(), res)
}
//Test each resourceProfile from cache
for _, resPrf := range resprf {
@@ -1216,7 +1216,7 @@ func TestResourceMatchingResourcesForEvent(t *testing.T) {
dmRES.SetResourceProfile(context.TODO(), resProfile, true)
}
for _, res := range resourceTest {
dmRES.SetResource(context.TODO(), res, nil, 0, true)
dmRES.SetResource(context.TODO(), res)
}
mres, err := resService.matchingResourcesForEvent(context.TODO(), resEvs[0].Tenant, resEvs[0],
"TestResourceMatchingResourcesForEvent1", &timeDurationExample)
@@ -1434,7 +1434,7 @@ func TestResourceUsageTTLCase1(t *testing.T) {
if err := dmRES.SetResourceProfile(context.TODO(), resprf[0], true); err != nil {
t.Error(err)
}
if err := dmRES.SetResource(context.TODO(), resourceTest[0], nil, 0, true); err != nil {
if err := dmRES.SetResource(context.TODO(), resourceTest[0]); err != nil {
t.Error(err)
}
mres, err := resService.matchingResourcesForEvent(context.TODO(), resEvs[0].Tenant, resEvs[0],
@@ -1627,7 +1627,7 @@ func TestResourceUsageTTLCase2(t *testing.T) {
if err := dmRES.SetResourceProfile(context.TODO(), resprf[0], true); err != nil {
t.Error(err)
}
if err := dmRES.SetResource(context.TODO(), resourceTest[0], nil, 0, true); err != nil {
if err := dmRES.SetResource(context.TODO(), resourceTest[0]); err != nil {
t.Error(err)
}
mres, err := resService.matchingResourcesForEvent(context.TODO(), resEvs[0].Tenant, resEvs[0],
@@ -1820,7 +1820,7 @@ func TestResourceUsageTTLCase3(t *testing.T) {
if err := dmRES.SetResourceProfile(context.TODO(), resprf[0], true); err != nil {
t.Error(err)
}
if err := dmRES.SetResource(context.TODO(), resourceTest[0], nil, 0, true); err != nil {
if err := dmRES.SetResource(context.TODO(), resourceTest[0]); err != nil {
t.Error(err)
}
mres, err := resService.matchingResourcesForEvent(context.TODO(), resEvs[0].Tenant, resEvs[0],
@@ -2014,7 +2014,7 @@ func TestResourceUsageTTLCase4(t *testing.T) {
if err := dmRES.SetResourceProfile(context.TODO(), resprf[0], true); err != nil {
t.Error(err)
}
if err := dmRES.SetResource(context.TODO(), resourceTest[0], nil, 0, true); err != nil {
if err := dmRES.SetResource(context.TODO(), resourceTest[0]); err != nil {
t.Error(err)
}
mres, err := resService.matchingResourcesForEvent(context.TODO(), resEvs[0].Tenant, resEvs[0],
@@ -2401,7 +2401,7 @@ func TestResourceMatchWithIndexFalse(t *testing.T) {
dmRES.SetResourceProfile(context.TODO(), resProfile, true)
}
for _, res := range resourceTest {
dmRES.SetResource(context.TODO(), res, nil, 0, true)
dmRES.SetResource(context.TODO(), res)
}
resService.cgrcfg.ResourceSCfg().IndexedSelects = false
mres, err := resService.matchingResourcesForEvent(context.TODO(), resEvs[0].Tenant, resEvs[0],
@@ -2579,7 +2579,7 @@ func TestResourceCaching(t *testing.T) {
dmRES.SetResourceProfile(context.TODO(), resProfile, true)
}
for _, res := range resourceTest {
dmRES.SetResource(context.TODO(), res, nil, 0, true)
dmRES.SetResource(context.TODO(), res)
}
//clear the cache
Cache.Clear(nil)
@@ -2937,7 +2937,7 @@ func TestResourceAllocateResourceOtherDB(t *testing.T) {
},
},
TTLIdx: []string{"RU1"},
}, nil, 2, true); err != nil { // simulate how the resource is stored in redis or mongo(non-exported fields are not populated)
}); err != nil { // simulate how the resource is stored in redis or mongo(non-exported fields are not populated)
t.Fatal(err)
}
var reply string
@@ -2956,6 +2956,7 @@ func TestResourceAllocateResourceOtherDB(t *testing.T) {
} else if reply != exp {
t.Errorf("Expected: %q, received: %q", exp, reply)
}
}
func TestResourceClearUsageErr(t *testing.T) {