Implement CacheLoadIDs in tpReader

This commit is contained in:
TeoV
2019-03-26 16:00:14 +02:00
committed by Dan Christian Bogos
parent ba419ec67e
commit 8e06267ca3
12 changed files with 232 additions and 20 deletions

View File

@@ -323,6 +323,17 @@ func (chS *CacheS) V1ReloadCache(attrs utils.AttrReloadCache, reply *string) (er
return
}
//get loadIDs from database for all types
loadIDs, err := chS.dm.GetItemLoadIDs(utils.EmptyString, false)
if err != nil {
return err
}
cacheLoadIDs := populateCacheLoadIDs(loadIDs, attrs)
for key, val := range cacheLoadIDs {
Cache.Set(utils.CacheLoadIDs, key, val, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
*reply = utils.OK
return nil
}
@@ -363,6 +374,16 @@ func (chS *CacheS) V1LoadCache(args utils.AttrReloadCache, reply *string) (err e
); err != nil {
return utils.NewErrServerError(err)
}
//get loadIDs for all types
loadIDs, err := chS.dm.GetItemLoadIDs(utils.EmptyString, false)
if err != nil {
return err
}
cacheLoadIDs := populateCacheLoadIDs(loadIDs, args)
for key, val := range cacheLoadIDs {
Cache.Set(utils.CacheLoadIDs, key, val, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
*reply = utils.OK
return nil
}
@@ -405,7 +426,83 @@ func (chS *CacheS) V1FlushCache(args utils.AttrReloadCache, reply *string) (err
flushCache(utils.CacheDispatcherProfiles, args.DispatcherProfileIDs)
flushCache(utils.CacheDispatcherHosts, args.DispatcherHostIDs)
flushCache(utils.CacheDispatcherRoutes, args.DispatcherRoutesIDs)
//get loadIDs for all types
loadIDs, err := chS.dm.GetItemLoadIDs(utils.EmptyString, false)
if err != nil {
return err
}
cacheLoadIDs := populateCacheLoadIDs(loadIDs, args)
for key, val := range cacheLoadIDs {
Cache.Set(utils.CacheLoadIDs, key, val, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
*reply = utils.OK
return
}
//populateCacheLoadIDs populate cacheLoadIDs based on attrs
func populateCacheLoadIDs(loadIDs map[string]string, attrs utils.AttrReloadCache) (cacheLoadIDs map[string]string) {
cacheLoadIDs = make(map[string]string)
//based on IDs of each type populate cacheLoadIDs and add into cache
if attrs.DestinationIDs == nil || len(*attrs.DestinationIDs) != 0 {
cacheLoadIDs[utils.CacheDestinations] = loadIDs[utils.CacheDestinations]
}
if attrs.ReverseDestinationIDs == nil || len(*attrs.ReverseDestinationIDs) != 0 {
cacheLoadIDs[utils.CacheReverseDestinations] = loadIDs[utils.CacheReverseDestinations]
}
if attrs.RatingPlanIDs == nil || len(*attrs.RatingPlanIDs) != 0 {
cacheLoadIDs[utils.CacheRatingPlans] = loadIDs[utils.CacheRatingPlans]
}
if attrs.RatingProfileIDs == nil || len(*attrs.RatingProfileIDs) != 0 {
cacheLoadIDs[utils.CacheRatingProfiles] = loadIDs[utils.CacheRatingProfiles]
}
if attrs.ActionIDs == nil || len(*attrs.ActionIDs) != 0 {
cacheLoadIDs[utils.CacheActions] = loadIDs[utils.CacheActions]
}
if attrs.ActionPlanIDs == nil || len(*attrs.ActionPlanIDs) != 0 {
cacheLoadIDs[utils.CacheActionPlans] = loadIDs[utils.CacheActionPlans]
}
if attrs.AccountActionPlanIDs == nil || len(*attrs.AccountActionPlanIDs) != 0 {
cacheLoadIDs[utils.CacheAccountActionPlans] = loadIDs[utils.CacheAccountActionPlans]
}
if attrs.ActionTriggerIDs == nil || len(*attrs.ActionTriggerIDs) != 0 {
cacheLoadIDs[utils.CacheActionTriggers] = loadIDs[utils.CacheActionTriggers]
}
if attrs.SharedGroupIDs == nil || len(*attrs.SharedGroupIDs) != 0 {
cacheLoadIDs[utils.CacheSharedGroups] = loadIDs[utils.CacheSharedGroups]
}
if attrs.ResourceProfileIDs == nil || len(*attrs.ResourceProfileIDs) != 0 {
cacheLoadIDs[utils.CacheResourceProfiles] = loadIDs[utils.CacheResourceProfiles]
}
if attrs.ResourceIDs == nil || len(*attrs.ResourceIDs) != 0 {
cacheLoadIDs[utils.CacheResources] = loadIDs[utils.CacheResources]
}
if attrs.StatsQueueProfileIDs == nil || len(*attrs.StatsQueueProfileIDs) != 0 {
cacheLoadIDs[utils.CacheStatQueueProfiles] = loadIDs[utils.CacheStatQueueProfiles]
}
if attrs.StatsQueueIDs == nil || len(*attrs.StatsQueueIDs) != 0 {
cacheLoadIDs[utils.CacheStatQueues] = loadIDs[utils.CacheStatQueues]
}
if attrs.ThresholdProfileIDs == nil || len(*attrs.ThresholdProfileIDs) != 0 {
cacheLoadIDs[utils.CacheThresholdProfiles] = loadIDs[utils.CacheThresholdProfiles]
}
if attrs.ThresholdIDs == nil || len(*attrs.ThresholdIDs) != 0 {
cacheLoadIDs[utils.CacheThresholds] = loadIDs[utils.CacheThresholds]
}
if attrs.FilterIDs == nil || len(*attrs.FilterIDs) != 0 {
cacheLoadIDs[utils.CacheFilters] = loadIDs[utils.CacheFilters]
}
if attrs.SupplierProfileIDs == nil || len(*attrs.SupplierProfileIDs) != 0 {
cacheLoadIDs[utils.CacheSupplierProfiles] = loadIDs[utils.CacheSupplierProfiles]
}
if attrs.AttributeProfileIDs == nil || len(*attrs.AttributeProfileIDs) != 0 {
cacheLoadIDs[utils.CacheAttributeProfiles] = loadIDs[utils.CacheAttributeProfiles]
}
if attrs.ChargerProfileIDs == nil || len(*attrs.ChargerProfileIDs) != 0 {
cacheLoadIDs[utils.CacheChargerProfiles] = loadIDs[utils.CacheChargerProfiles]
}
if attrs.DispatcherProfileIDs == nil || len(*attrs.DispatcherProfileIDs) != 0 {
cacheLoadIDs[utils.CacheDispatcherProfiles] = loadIDs[utils.CacheDispatcherProfiles]
}
return
}

View File

@@ -291,6 +291,8 @@ func (dm *DataManager) CacheDataFromDB(prfx string, ids []string, mustBeCached b
err = dm.MatchFilterIndexFromKey(utils.CacheChargerFilterIndexes, dataID)
case utils.DispatcherFilterIndexes:
err = dm.MatchFilterIndexFromKey(utils.CacheDispatcherFilterIndexes, dataID)
case utils.LoadIDPrefix:
_, err = dm.GetItemLoadIDs(utils.EmptyString, true)
}
if err != nil {
if err == utils.ErrNotFound {
@@ -1348,3 +1350,27 @@ func (dm *DataManager) RemoveDispatcherHost(tenant, id string,
}
return
}
func (dm *DataManager) GetItemLoadIDs(itemIDPrefix string, cacheWrite bool) (loadIDs map[string]string, err error) {
loadIDs, err = dm.DataDB().GetItemLoadIDsDrv(itemIDPrefix)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
for key, _ := range loadIDs {
Cache.Set(utils.CacheLoadIDs, key, nil, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
}
return nil, err
}
if cacheWrite {
for key, val := range loadIDs {
Cache.Set(utils.CacheLoadIDs, key, val, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
}
return
}
func (dm *DataManager) SetLoadIDs(loadIDs map[string]string) error {
return dm.DataDB().SetLoadIDsDrv(loadIDs)
}

View File

@@ -373,5 +373,9 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
Items: 0,
Groups: 0,
},
utils.CacheLoadIDs: {
Items: 0,
Groups: 0,
},
}
}

View File

@@ -1409,6 +1409,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
// return
// }
}
//generate a loadID
loadID := utils.UUIDSha1Prefix()
loadIDs := make(map[string]string)
if verbose {
log.Print("Destinations:")
}
@@ -1421,6 +1424,12 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", d.Id, " : ", d.Prefixes)
}
}
if len(tpr.destinations) != 0 {
loadIDs[utils.CacheDestinations] = loadID
}
if len(tpr.revDests) != 0 {
loadIDs[utils.CacheReverseDestinations] = loadID
}
if verbose {
log.Print("Reverse Destinations:")
for id, vals := range tpr.revDests {
@@ -1437,6 +1446,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", rp.Id)
}
}
if len(tpr.ratingPlans) != 0 {
loadIDs[utils.CacheRatingPlans] = loadID
}
if verbose {
log.Print("Rating Profiles:")
}
@@ -1449,6 +1461,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", rp.Id)
}
}
if len(tpr.ratingProfiles) != 0 {
loadIDs[utils.CacheRatingProfiles] = loadID
}
if verbose {
log.Print("Action Plans:")
}
@@ -1490,6 +1505,12 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Println("\t", k)
}
}
if len(tpr.actionPlans) != 0 {
loadIDs[utils.CacheActionPlans] = loadID
}
if len(tpr.acntActionPlans) != 0 {
loadIDs[utils.CacheAccountActionPlans] = loadID
}
if verbose {
log.Print("Account Action Plans:")
for id, vals := range tpr.acntActionPlans {
@@ -1508,6 +1529,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Println("\t", k)
}
}
if len(tpr.actionsTriggers) != 0 {
loadIDs[utils.CacheActionTriggers] = loadID
}
if verbose {
log.Print("Shared Groups:")
}
@@ -1520,6 +1544,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Println("\t", k)
}
}
if len(tpr.sharedGroups) != 0 {
loadIDs[utils.CacheSharedGroups] = loadID
}
if verbose {
log.Print("Actions:")
}
@@ -1532,6 +1559,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Println("\t", k)
}
}
if len(tpr.actions) != 0 {
loadIDs[utils.CacheActions] = loadID
}
if verbose {
log.Print("Account Actions:")
}
@@ -1559,6 +1589,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", th.TenantID())
}
}
if len(tpr.filters) != 0 {
loadIDs[utils.CacheFilters] = loadID
}
if verbose {
log.Print("ResourceProfiles:")
}
@@ -1574,6 +1607,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", rsp.TenantID())
}
}
if len(tpr.resProfiles) != 0 {
loadIDs[utils.CacheResourceProfiles] = loadID
}
if verbose {
log.Print("Resources:")
}
@@ -1585,6 +1621,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", rTid.TenantID())
}
}
if len(tpr.resources) != 0 {
loadIDs[utils.CacheResources] = loadID
}
if verbose {
log.Print("StatQueueProfiles:")
}
@@ -1600,6 +1639,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", st.TenantID())
}
}
if len(tpr.sqProfiles) != 0 {
loadIDs[utils.CacheStatQueueProfiles] = loadID
}
if verbose {
log.Print("StatQueues:")
}
@@ -1622,6 +1664,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", sqTntID.TenantID())
}
}
if len(tpr.statQueues) != 0 {
loadIDs[utils.CacheStatQueues] = loadID
}
if verbose {
log.Print("ThresholdProfiles:")
}
@@ -1637,6 +1682,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", th.TenantID())
}
}
if len(tpr.thProfiles) != 0 {
loadIDs[utils.CacheThresholdProfiles] = loadID
}
if verbose {
log.Print("Thresholds:")
}
@@ -1648,7 +1696,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", thd.TenantID())
}
}
if len(tpr.thresholds) != 0 {
loadIDs[utils.CacheThresholds] = loadID
}
if verbose {
log.Print("SupplierProfiles:")
}
@@ -1664,7 +1714,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", th.TenantID())
}
}
if len(tpr.sppProfiles) != 0 {
loadIDs[utils.CacheSupplierProfiles] = loadID
}
if verbose {
log.Print("AttributeProfiles:")
}
@@ -1680,7 +1732,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", th.TenantID())
}
}
if len(tpr.attributeProfiles) != 0 {
loadIDs[utils.CacheAttributeProfiles] = loadID
}
if verbose {
log.Print("ChargerProfiles:")
}
@@ -1697,7 +1751,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", th.TenantID())
}
}
if len(tpr.chargerProfiles) != 0 {
loadIDs[utils.CacheChargerProfiles] = loadID
}
if verbose {
log.Print("DispatcherProfiles:")
}
@@ -1713,7 +1769,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", th.TenantID())
}
}
if len(tpr.dispatcherProfiles) != 0 {
loadIDs[utils.CacheDispatcherProfiles] = loadID
}
if verbose {
log.Print("DispatcherHosts:")
}
@@ -1738,6 +1796,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
log.Print("\t", t.ID)
}
}
if len(tpr.timings) != 0 {
loadIDs[utils.CacheTimings] = loadID
}
if !disable_reverse {
if len(tpr.destinations) > 0 {
if verbose {
@@ -1756,6 +1817,9 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
}
}
}
if err = tpr.dm.SetLoadIDs(loadIDs); err != nil {
return err
}
return
}
@@ -2377,6 +2441,16 @@ func (tpr *TpReader) ReloadCache(flush, verbose bool) (err error) {
}
}
//get loadIDs for all types
loadIDs, err := tpr.dm.GetItemLoadIDs(utils.EmptyString, false)
if err != nil {
return err
}
cacheLoadIDs := populateCacheLoadIDs(loadIDs, cacheArgs)
for key, val := range cacheLoadIDs {
Cache.Set(utils.CacheLoadIDs, key, val, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
// release the reader with it's structures
tpr.Init()
return