Updated indexes apis in favor of cache

This commit is contained in:
porosnicuadrian
2021-05-11 09:08:46 +03:00
committed by Dan Christian Bogos
parent 7e7a1ded59
commit 91eda67c4a
5 changed files with 177 additions and 29 deletions

View File

@@ -20,6 +20,7 @@ package v1
import (
"strings"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -39,6 +40,7 @@ type AttrRemFilterIndexes struct {
Tenant string
Context string
ItemType string
APIOpts map[string]interface{}
}
func (apierSv1 *APIerSv1) RemoveFilterIndexes(arg *AttrRemFilterIndexes, reply *string) (err error) {
@@ -77,6 +79,14 @@ func (apierSv1 *APIerSv1) RemoveFilterIndexes(arg *AttrRemFilterIndexes, reply *
if err = apierSv1.DataManager.RemoveIndexes(arg.ItemType, tntCtx, utils.EmptyString); err != nil {
return
}
//generate a loadID for CacheFilterIndexes and store it in database
if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{arg.ItemType: time.Now().UnixNano()}); err != nil {
return utils.APIErrorHandler(err)
}
if err := apierSv1.callCacheForRemoveIndexes(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant,
arg.ItemType, []string{utils.MetaAny}, arg.APIOpts); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return
}
@@ -212,10 +222,13 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde
if tnt == utils.EmptyString {
tnt = apierSv1.Config.GeneralCfg().DefaultTenant
}
cacheIDs := make(map[string][]string)
var indexes utils.StringSet
//ThresholdProfile Indexes
if args.ThresholdS {
if args.ThresholdS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheThresholdFilterIndexes,
cacheIDs[utils.ThresholdFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheThresholdFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
th, e := apierSv1.DataManager.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -229,10 +242,12 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.ThresholdS = indexes.Size() != 0
}
//StatQueueProfile Indexes
if args.StatS {
if args.StatS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheStatFilterIndexes,
cacheIDs[utils.StatFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheStatFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
sq, e := apierSv1.DataManager.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -246,10 +261,12 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.StatS = indexes.Size() != 0
}
//ResourceProfile Indexes
if args.ResourceS {
if args.ResourceS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheResourceFilterIndexes,
cacheIDs[utils.ResourceFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheResourceFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
rp, e := apierSv1.DataManager.GetResourceProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -263,10 +280,12 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.ResourceS = indexes.Size() != 0
}
//SupplierProfile Indexes
//RouteSProfile Indexes
if args.RouteS {
if args.RouteS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheRouteFilterIndexes,
cacheIDs[utils.RouteFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheRouteFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
rp, e := apierSv1.DataManager.GetRouteProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -280,10 +299,12 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.RouteS = indexes.Size() != 0
}
//AttributeProfile Indexes
if args.AttributeS {
if args.AttributeS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheAttributeFilterIndexes,
cacheIDs[utils.AttributeFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheAttributeFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
ap, e := apierSv1.DataManager.GetAttributeProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -301,10 +322,12 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.AttributeS = indexes.Size() != 0
}
//ChargerProfile Indexes
if args.ChargerS {
if args.ChargerS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheChargerFilterIndexes,
cacheIDs[utils.ChargerFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheChargerFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
ap, e := apierSv1.DataManager.GetChargerProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -318,10 +341,12 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.ChargerS = indexes.Size() != 0
}
//DispatcherProfile Indexes
if args.DispatcherS {
if args.DispatcherS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheDispatcherFilterIndexes,
cacheIDs[utils.DispatcherFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheDispatcherFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
dsp, e := apierSv1.DataManager.GetDispatcherProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -338,6 +363,7 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.DispatcherS = indexes.Size() != 0
}
tntCtx := args.Tenant
@@ -387,6 +413,20 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde
return
}
}
//generate a load
//ID for CacheFilterIndexes and store it in database
loadIDs := make(map[string]int64)
timeNow := time.Now().UnixNano()
for idx := range cacheIDs {
loadIDs[utils.ArgCacheToInstance[idx]] = timeNow
}
if err := apierSv1.DataManager.SetLoadIDs(loadIDs); err != nil {
return utils.APIErrorHandler(err)
}
if err := apierSv1.callCacheForComputeIndexes(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]),
args.Tenant, cacheIDs, args.APIOpts); err != nil {
return err
}
*reply = utils.OK
return nil
}
@@ -398,8 +438,10 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd
if tnt == utils.EmptyString {
tnt = apierSv1.Config.GeneralCfg().DefaultTenant
}
indexes := make(utils.StringSet)
cacheIDs := make(map[string][]string)
//ThresholdProfile Indexes
if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheThresholdFilterIndexes,
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheThresholdFilterIndexes,
&args.ThresholdIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
th, e := apierSv1.DataManager.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -413,13 +455,17 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.ThresholdFilterIndexIDs] = indexes.AsSlice()
}
//StatQueueProfile Indexes
if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheStatFilterIndexes,
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheStatFilterIndexes,
&args.StatIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
sq, e := apierSv1.DataManager.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
cacheIDs[utils.StatFilterIndexIDs] = []string{sq.ID}
fltrIDs := make([]string, len(sq.FilterIDs))
for i, fltrID := range sq.FilterIDs {
fltrIDs[i] = fltrID
@@ -428,13 +474,17 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.StatFilterIndexIDs] = indexes.AsSlice()
}
//ResourceProfile Indexes
if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheResourceFilterIndexes,
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheResourceFilterIndexes,
&args.ResourceIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
rp, e := apierSv1.DataManager.GetResourceProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
cacheIDs[utils.ResourceFilterIndexIDs] = []string{rp.ID}
fltrIDs := make([]string, len(rp.FilterIDs))
for i, fltrID := range rp.FilterIDs {
fltrIDs[i] = fltrID
@@ -443,13 +493,17 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.ResourceFilterIndexIDs] = indexes.AsSlice()
}
//RouteProfile Indexes
if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheRouteFilterIndexes,
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheRouteFilterIndexes,
&args.RouteIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
rp, e := apierSv1.DataManager.GetRouteProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
cacheIDs[utils.RouteFilterIndexIDs] = []string{rp.ID}
fltrIDs := make([]string, len(rp.FilterIDs))
for i, fltrID := range rp.FilterIDs {
fltrIDs[i] = fltrID
@@ -458,8 +512,11 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.RouteFilterIndexIDs] = indexes.AsSlice()
}
//AttributeProfile Indexes
if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheAttributeFilterIndexes,
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheAttributeFilterIndexes,
&args.AttributeIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
ap, e := apierSv1.DataManager.GetAttributeProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -476,8 +533,11 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.AttributeFilterIndexIDs] = indexes.AsSlice()
}
//ChargerProfile Indexes
if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheChargerFilterIndexes,
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheChargerFilterIndexes,
&args.ChargerIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
ap, e := apierSv1.DataManager.GetChargerProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -491,8 +551,11 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.ChargerFilterIndexIDs] = indexes.AsSlice()
}
//DispatcherProfile Indexes
if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheDispatcherFilterIndexes,
if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheDispatcherFilterIndexes,
&args.DispatcherIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
dsp, e := apierSv1.DataManager.GetDispatcherProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -509,6 +572,22 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.DispatcherFilterIndexIDs] = indexes.AsSlice()
}
loadIDs := make(map[string]int64)
timeNow := time.Now().UnixNano()
for idx := range cacheIDs {
loadIDs[utils.ArgCacheToInstance[idx]] = timeNow
}
if err := apierSv1.DataManager.SetLoadIDs(loadIDs); err != nil {
return utils.APIErrorHandler(err)
}
if err := apierSv1.callCacheForComputeIndexes(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]),
args.Tenant, cacheIDs, args.APIOpts); err != nil {
return err
}
*reply = utils.OK
return nil
}

View File

@@ -136,6 +136,71 @@ func (apierSv1 *APIerSv1) composeArgsReload(tnt, cacheID, itemID string, filterI
return
}
// callCacheForIndexes will only call CacheClear because don't have access at ItemID
func (apierSv1 *APIerSv1) callCacheForRemoveIndexes(cacheopt string, tnt, cacheID string,
itemIDs []string, opts map[string]interface{}) (err error) {
var reply, method string
var args interface{} = utils.AttrReloadCacheWithAPIOpts{
Tenant: tnt,
ArgsCache: map[string][]string{
utils.CacheInstanceToArg[cacheID]: itemIDs,
},
APIOpts: opts,
}
switch utils.FirstNonEmpty(cacheopt, apierSv1.Config.GeneralCfg().DefaultCaching) {
case utils.MetaNone:
return
case utils.MetaReload:
method = utils.CacheSv1ReloadCache
case utils.MetaLoad:
method = utils.CacheSv1LoadCache
case utils.MetaRemove:
method = utils.CacheSv1RemoveItems
case utils.MetaClear:
method = utils.CacheSv1Clear
args = utils.AttrCacheIDsWithAPIOpts{
Tenant: tnt,
CacheIDs: []string{cacheID},
APIOpts: opts,
}
}
return apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().CachesConns, nil,
method, args, &reply)
}
func (apierSv1 *APIerSv1) callCacheForComputeIndexes(cacheopt, tnt string,
cacheItems map[string][]string, opts map[string]interface{}) (err error) {
var reply, method string
var args interface{} = utils.AttrReloadCacheWithAPIOpts{
Tenant: tnt,
ArgsCache: cacheItems,
APIOpts: opts,
}
switch utils.FirstNonEmpty(cacheopt, apierSv1.Config.GeneralCfg().DefaultCaching) {
case utils.MetaNone:
return
case utils.MetaReload:
method = utils.CacheSv1ReloadCache
case utils.MetaLoad:
method = utils.CacheSv1LoadCache
case utils.MetaRemove:
method = utils.CacheSv1RemoveItems
case utils.MetaClear:
method = utils.CacheSv1Clear
cacheIDs := make([]string, 0, len(cacheItems))
for idx := range cacheItems {
cacheIDs = append(cacheIDs, utils.ArgCacheToInstance[idx])
}
args = utils.AttrCacheIDsWithAPIOpts{
Tenant: tnt,
CacheIDs: cacheIDs,
APIOpts: opts,
}
}
return apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().CachesConns, nil,
method, args, &reply)
}
// callCacheRevDestinations used for reverse destination, loadIDs and indexes replication
func (apierSv1 *APIerSv1) callCacheMultiple(cacheopt, tnt, cacheID string, itemIDs []string, opts map[string]interface{}) (err error) {
if len(itemIDs) == 0 {

View File

@@ -383,7 +383,8 @@ func splitFilterIndex(tntCtxIdxKey string) (tntCtx, idxKey string, err error) {
// ComputeIndexes gets the indexes from the DB and ensure that the items are indexed
// getFilters returns a list of filters IDs for the given profile id
func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string,
transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error), newFltr *Filter) (processed bool, err error) {
transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error), newFltr *Filter) (indexes utils.StringSet, err error) {
indexes = make(utils.StringSet)
var profilesIDs []string
if IDs == nil { // get all items
var ids []string
@@ -419,13 +420,13 @@ func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string,
return
}
// ensure that the item is in the index set
for _, idx := range index {
for key, idx := range index {
idx.Add(id)
indexes.Add(utils.ConcatenatedKey(tntCtx, key))
}
if err = dm.SetIndexes(idxItmType, tntCtx, index, cacheCommit(transactionID), transactionID); err != nil {
return
}
processed = true
}
return
}

View File

@@ -935,20 +935,25 @@ func (attr *ArgRSv1ResourceUsage) Clone() *ArgRSv1ResourceUsage {
}
type ArgsComputeFilterIndexIDs struct {
Tenant string
Context string
AttributeIDs []string
ResourceIDs []string
StatIDs []string
RouteIDs []string
ThresholdIDs []string
ChargerIDs []string
DispatcherIDs []string
Tenant string
Context string
APIOpts map[string]interface{}
AttributeIDs []string
ResourceIDs []string
StatIDs []string
RouteIDs []string
ThresholdIDs []string
ChargerIDs []string
DispatcherIDs []string
RateProfileIDs []string
AccountIDs []string
ActionProfileIDs []string
}
type ArgsComputeFilterIndexes struct {
Tenant string
Context string
APIOpts map[string]interface{}
AttributeS bool
ResourceS bool
StatS bool

View File

@@ -901,8 +901,6 @@ func TestOrderedNavigableMapString(t *testing.T) {
},
}
onmExpect := `{"Map":{"test1":{"Value":{"Data":"data!"}}}}`
// fmt.Println(onm.nm)
// fmt.Println(onm.String())
if onm.String() != onmExpect {
t.Errorf("Expected %s but received %s", onmExpect, onm.String())
}