Sync functionality between APIer and TPReader

This commit is contained in:
TeoV
2019-04-15 16:33:59 +03:00
committed by Dan Christian Bogos
parent dbd9bf243b
commit 8ed82eab05
6 changed files with 59 additions and 80 deletions

View File

@@ -132,20 +132,26 @@ func (apierV1 *ApierV1) SetResourceProfile(arg *ResourceWithCache, reply *string
if err := apierV1.CallCache(GetCacheOpt(arg.Cache), argCache); err != nil {
return utils.APIErrorHandler(err)
}
if err := apierV1.DataManager.SetResource(
&engine.Resource{Tenant: arg.Tenant,
ID: arg.ID,
Usages: make(map[string]*engine.ResourceUsage)}); err != nil {
return utils.APIErrorHandler(err)
}
//handle caching for Resource
argCache = engine.ArgsGetCacheItem{
CacheID: utils.CacheResources,
ItemID: arg.TenantID(),
}
if err := apierV1.CallCache(GetCacheOpt(arg.Cache), argCache); err != nil {
return utils.APIErrorHandler(err)
//add the resource only if it's not present
if has, err := apierV1.DataManager.HasData(utils.ResourcesPrefix, arg.ID, arg.Tenant); err != nil {
return err
} else if !has {
if err := apierV1.DataManager.SetResource(
&engine.Resource{Tenant: arg.Tenant,
ID: arg.ID,
Usages: make(map[string]*engine.ResourceUsage)}); err != nil {
return utils.APIErrorHandler(err)
}
//handle caching for Resource
argCache = engine.ArgsGetCacheItem{
CacheID: utils.CacheResources,
ItemID: arg.TenantID(),
}
if err := apierV1.CallCache(GetCacheOpt(arg.Cache), argCache); err != nil {
return utils.APIErrorHandler(err)
}
}
*reply = utils.OK
return nil
}

View File

@@ -84,25 +84,29 @@ func (apierV1 *ApierV1) SetStatQueueProfile(arg *StatQueueWithCache, reply *stri
if err := apierV1.CallCache(GetCacheOpt(arg.Cache), argCache); err != nil {
return utils.APIErrorHandler(err)
}
//compose metrics for StatQueue
metrics := make(map[string]engine.StatMetric)
for _, metric := range arg.Metrics {
if stsMetric, err := engine.NewStatMetric(metric.MetricID, arg.MinItems, metric.FilterIDs); err != nil {
return utils.APIErrorHandler(err)
} else {
metrics[metric.MetricID] = stsMetric
if has, err := apierV1.DataManager.HasData(utils.StatQueuePrefix, arg.ID, arg.Tenant); err != nil {
return err
} else if !has {
//compose metrics for StatQueue
metrics := make(map[string]engine.StatMetric)
for _, metric := range arg.Metrics {
if stsMetric, err := engine.NewStatMetric(metric.MetricID, arg.MinItems, metric.FilterIDs); err != nil {
return utils.APIErrorHandler(err)
} else {
metrics[metric.MetricID] = stsMetric
}
}
if err := apierV1.DataManager.SetStatQueue(&engine.StatQueue{Tenant: arg.Tenant, ID: arg.ID, SQMetrics: metrics}); err != nil {
return utils.APIErrorHandler(err)
}
//handle caching for StatQueues
argCache = engine.ArgsGetCacheItem{
CacheID: utils.CacheStatQueues,
ItemID: arg.TenantID(),
}
if err := apierV1.CallCache(GetCacheOpt(arg.Cache), argCache); err != nil {
return utils.APIErrorHandler(err)
}
}
if err := apierV1.DataManager.SetStatQueue(&engine.StatQueue{Tenant: arg.Tenant, ID: arg.ID, SQMetrics: metrics}); err != nil {
return utils.APIErrorHandler(err)
}
//handle caching for StatQueues
argCache = engine.ArgsGetCacheItem{
CacheID: utils.CacheStatQueues,
ItemID: arg.TenantID(),
}
if err := apierV1.CallCache(GetCacheOpt(arg.Cache), argCache); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK

View File

@@ -118,17 +118,23 @@ func (apierV1 *ApierV1) SetThresholdProfile(args *ThresholdWithCache, reply *str
if err := apierV1.CallCache(GetCacheOpt(args.Cache), argCache); err != nil {
return utils.APIErrorHandler(err)
}
if err := apierV1.DataManager.SetThreshold(&engine.Threshold{Tenant: args.Tenant, ID: args.ID}); err != nil {
if has, err := apierV1.DataManager.HasData(utils.ThresholdPrefix, args.ID, args.Tenant); err != nil {
return err
} else if !has {
if err := apierV1.DataManager.SetThreshold(&engine.Threshold{Tenant: args.Tenant, ID: args.ID}); err != nil {
return err
}
//handle caching for Threshold
argCache = engine.ArgsGetCacheItem{
CacheID: utils.CacheThresholds,
ItemID: args.TenantID(),
}
if err := apierV1.CallCache(GetCacheOpt(args.Cache), argCache); err != nil {
return utils.APIErrorHandler(err)
}
}
//handle caching for Threshold
argCache = engine.ArgsGetCacheItem{
CacheID: utils.CacheThresholds,
ItemID: args.TenantID(),
}
if err := apierV1.CallCache(GetCacheOpt(args.Cache), argCache); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil
}

View File

@@ -670,6 +670,8 @@ func (ms *MongoStorage) HasDataDrv(category, subject, tenant string) (has bool,
case utils.StatQueueProfilePrefix:
count, err = ms.getCol(colSqp).Count(sctx, bson.M{"tenant": tenant, "id": subject})
case utils.ThresholdPrefix:
count, err = ms.getCol(colThs).Count(sctx, bson.M{"tenant": tenant, "id": subject})
case utils.ThresholdProfilePrefix:
count, err = ms.getCol(colTps).Count(sctx, bson.M{"tenant": tenant, "id": subject})
case utils.FilterPrefix:
count, err = ms.getCol(colFlt).Count(sctx, bson.M{"tenant": tenant, "id": subject})

View File

@@ -60,11 +60,6 @@ type TpReader struct {
resources []*utils.TenantID // IDs of resources which need creation based on resourceProfiles
statQueues []*utils.TenantID // IDs of statQueues which need creation based on statQueueProfiles
thresholds []*utils.TenantID // IDs of thresholds which need creation based on thresholdProfiles
suppliers []*utils.TenantID // IDs of suppliers which need creation based on sppProfiles
attrTntID []*utils.TenantID // IDs of suppliers which need creation based on attributeProfiles
chargers []*utils.TenantID // IDs of chargers which need creation based on chargerProfiles
dpps []*utils.TenantID // IDs of dispatchers which need creation based on dispatcherProfiles
dphs []*utils.TenantID // IDs of dispatcherHosts which need creation based on dispatcherHosts
revDests,
acntActionPlans map[string][]string
cacheS rpcclient.RpcClientConnection
@@ -1208,13 +1203,6 @@ func (tpr *TpReader) LoadSupplierProfilesFiltered(tag string) (err error) {
mapRsPfls[utils.TenantID{Tenant: rl.Tenant, ID: rl.ID}] = rl
}
tpr.sppProfiles = mapRsPfls
for tntID := range mapRsPfls {
if has, err := tpr.dm.HasData(utils.SupplierProfilePrefix, tntID.ID, tntID.Tenant); err != nil {
return err
} else if !has {
tpr.suppliers = append(tpr.suppliers, &utils.TenantID{Tenant: tntID.Tenant, ID: tntID.ID})
}
}
return nil
}
@@ -1232,13 +1220,6 @@ func (tpr *TpReader) LoadAttributeProfilesFiltered(tag string) (err error) {
mapAttrPfls[utils.TenantID{Tenant: attr.Tenant, ID: attr.ID}] = attr
}
tpr.attributeProfiles = mapAttrPfls
for tntID := range mapAttrPfls {
if has, err := tpr.dm.HasData(utils.AttributeProfilePrefix, tntID.ID, tntID.Tenant); err != nil {
return err
} else if !has {
tpr.attrTntID = append(tpr.attrTntID, &utils.TenantID{Tenant: tntID.Tenant, ID: tntID.ID})
}
}
return nil
}
@@ -1256,13 +1237,6 @@ func (tpr *TpReader) LoadChargerProfilesFiltered(tag string) (err error) {
mapChargerProfile[utils.TenantID{Tenant: rl.Tenant, ID: rl.ID}] = rl
}
tpr.chargerProfiles = mapChargerProfile
for tntID := range mapChargerProfile {
if has, err := tpr.dm.HasData(utils.ChargerProfilePrefix, tntID.ID, tntID.Tenant); err != nil {
return err
} else if !has {
tpr.chargers = append(tpr.chargers, &utils.TenantID{Tenant: tntID.Tenant, ID: tntID.ID})
}
}
return nil
}
@@ -1280,13 +1254,6 @@ func (tpr *TpReader) LoadDispatcherProfilesFiltered(tag string) (err error) {
mapDispatcherProfile[utils.TenantID{Tenant: rl.Tenant, ID: rl.ID}] = rl
}
tpr.dispatcherProfiles = mapDispatcherProfile
for tntID := range mapDispatcherProfile {
if has, err := tpr.dm.HasData(utils.DispatcherProfilePrefix, tntID.ID, tntID.Tenant); err != nil {
return err
} else if !has {
tpr.dpps = append(tpr.dpps, &utils.TenantID{Tenant: tntID.Tenant, ID: tntID.ID})
}
}
return nil
}
@@ -1304,13 +1271,6 @@ func (tpr *TpReader) LoadDispatcherHostsFiltered(tag string) (err error) {
mapDispatcherHost[utils.TenantID{Tenant: rl.Tenant, ID: rl.ID}] = rl
}
tpr.dispatcherHosts = mapDispatcherHost
for tntID := range mapDispatcherHost {
if has, err := tpr.dm.HasData(utils.DispatcherHostPrefix, tntID.ID, tntID.Tenant); err != nil {
return err
} else if !has {
tpr.dphs = append(tpr.dphs, &utils.TenantID{Tenant: tntID.Tenant, ID: tntID.ID})
}
}
return nil
}

View File

@@ -254,6 +254,7 @@ func testV1AccSendToThreshold(t *testing.T) {
MaxHits: -1,
MinSleep: time.Duration(1 * time.Second),
Weight: 20.0,
Async: true,
ActionIDs: []string{"DISABLE_LOG"},
}