Updated indexes computing in favor of cache

This commit is contained in:
porosnicuadrian
2021-05-10 16:38:57 +03:00
committed by Dan Christian Bogos
parent 36f9be70c3
commit b9e328bf32
7 changed files with 153 additions and 37 deletions

View File

@@ -82,8 +82,8 @@ func (adms *AdminSv1) RemoveFilterIndexes(ctx *context.Context, arg *AttrRemFilt
map[string]int64{arg.ItemType: time.Now().UnixNano()}); err != nil {
return utils.APIErrorHandler(err)
}
if err := adms.callCacheForIndexes(ctx, utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant,
arg.ItemType, arg.APIOpts); err != nil {
if err := adms.callCacheForRemoveIndexes(ctx, utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant,
arg.ItemType, []string{utils.MetaAny}, arg.APIOpts); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
@@ -221,10 +221,13 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A
if tnt == utils.EmptyString {
tnt = adms.cfg.GeneralCfg().DefaultTenant
}
cacheIDs := make(map[string][]string)
var indexes utils.StringSet
//ThresholdProfile Indexes
if args.ThresholdS {
if args.ThresholdS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheThresholdFilterIndexes,
cacheIDs[utils.ThresholdFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheThresholdFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
th, e := adms.dm.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -238,10 +241,12 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.ThresholdS = indexes.Size() != 0
}
//StatQueueProfile Indexes
if args.StatS {
if args.StatS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheStatFilterIndexes,
cacheIDs[utils.StatFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheStatFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
sq, e := adms.dm.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -255,10 +260,12 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.StatS = indexes.Size() != 0
}
//ResourceProfile Indexes
if args.ResourceS {
if args.ResourceS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheResourceFilterIndexes,
cacheIDs[utils.ResourceFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheResourceFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
rp, e := adms.dm.GetResourceProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -272,10 +279,12 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.ResourceS = indexes.Size() != 0
}
//SupplierProfile Indexes
//RouteSProfile Indexes
if args.RouteS {
if args.RouteS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheRouteFilterIndexes,
cacheIDs[utils.RouteFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheRouteFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
rp, e := adms.dm.GetRouteProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -289,10 +298,12 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.RouteS = indexes.Size() != 0
}
//AttributeProfile Indexes
if args.AttributeS {
if args.AttributeS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheAttributeFilterIndexes,
cacheIDs[utils.AttributeFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheAttributeFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
ap, e := adms.dm.GetAttributeProfile(cntxt, tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -310,10 +321,12 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.AttributeS = indexes.Size() != 0
}
//ChargerProfile Indexes
if args.ChargerS {
if args.ChargerS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheChargerFilterIndexes,
cacheIDs[utils.ChargerFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheChargerFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
ap, e := adms.dm.GetChargerProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -327,10 +340,12 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.ChargerS = indexes.Size() != 0
}
//DispatcherProfile Indexes
if args.DispatcherS {
if args.DispatcherS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheDispatcherFilterIndexes,
cacheIDs[utils.DispatcherFilterIndexIDs] = []string{utils.MetaAny}
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheDispatcherFilterIndexes,
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
dsp, e := adms.dm.GetDispatcherProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -347,6 +362,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
args.DispatcherS = indexes.Size() != 0
}
tntCtx := args.Tenant
@@ -396,6 +412,20 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A
return
}
}
//generate a load
//ID for CacheFilterIndexes and store it in database
loadIDs := make(map[string]int64)
timeNow := time.Now().UnixNano()
for idx := range cacheIDs {
loadIDs[utils.ArgCacheToInstance[idx]] = timeNow
}
if err := adms.dm.SetLoadIDs(cntxt, loadIDs); err != nil {
return utils.APIErrorHandler(err)
}
if err := adms.callCacheForComputeIndexes(cntxt, utils.IfaceAsString(args.APIOpts[utils.CacheOpt]),
args.Tenant, cacheIDs, args.APIOpts); err != nil {
return err
}
*reply = utils.OK
return nil
}
@@ -407,8 +437,10 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils.
if tnt == utils.EmptyString {
tnt = adms.cfg.GeneralCfg().DefaultTenant
}
indexes := make(utils.StringSet)
cacheIDs := make(map[string][]string)
//ThresholdProfile Indexes
if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheThresholdFilterIndexes,
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheThresholdFilterIndexes,
&args.ThresholdIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
th, e := adms.dm.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -422,13 +454,17 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils.
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.ThresholdFilterIndexIDs] = indexes.AsSlice()
}
//StatQueueProfile Indexes
if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheStatFilterIndexes,
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheStatFilterIndexes,
&args.StatIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
sq, e := adms.dm.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
cacheIDs[utils.StatFilterIndexIDs] = []string{sq.ID}
fltrIDs := make([]string, len(sq.FilterIDs))
for i, fltrID := range sq.FilterIDs {
fltrIDs[i] = fltrID
@@ -437,13 +473,17 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils.
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.StatFilterIndexIDs] = indexes.AsSlice()
}
//ResourceProfile Indexes
if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheResourceFilterIndexes,
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheResourceFilterIndexes,
&args.ResourceIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
rp, e := adms.dm.GetResourceProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
cacheIDs[utils.ResourceFilterIndexIDs] = []string{rp.ID}
fltrIDs := make([]string, len(rp.FilterIDs))
for i, fltrID := range rp.FilterIDs {
fltrIDs[i] = fltrID
@@ -452,13 +492,17 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils.
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.ResourceFilterIndexIDs] = indexes.AsSlice()
}
//RouteProfile Indexes
if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheRouteFilterIndexes,
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheRouteFilterIndexes,
&args.RouteIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
rp, e := adms.dm.GetRouteProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
return nil, e
}
cacheIDs[utils.RouteFilterIndexIDs] = []string{rp.ID}
fltrIDs := make([]string, len(rp.FilterIDs))
for i, fltrID := range rp.FilterIDs {
fltrIDs[i] = fltrID
@@ -467,8 +511,11 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils.
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.RouteFilterIndexIDs] = indexes.AsSlice()
}
//AttributeProfile Indexes
if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheAttributeFilterIndexes,
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheAttributeFilterIndexes,
&args.AttributeIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
ap, e := adms.dm.GetAttributeProfile(cntxt, tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -485,8 +532,11 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils.
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.AttributeFilterIndexIDs] = indexes.AsSlice()
}
//ChargerProfile Indexes
if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheChargerFilterIndexes,
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheChargerFilterIndexes,
&args.ChargerIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
ap, e := adms.dm.GetChargerProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -500,8 +550,11 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils.
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.ChargerFilterIndexIDs] = indexes.AsSlice()
}
//DispatcherProfile Indexes
if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheDispatcherFilterIndexes,
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheDispatcherFilterIndexes,
&args.DispatcherIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
dsp, e := adms.dm.GetDispatcherProfile(tnt, id, true, false, utils.NonTransactional)
if e != nil {
@@ -518,6 +571,22 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils.
}, nil); err != nil && err != utils.ErrNotFound {
return utils.APIErrorHandler(err)
}
if indexes.Size() != 0 {
cacheIDs[utils.DispatcherFilterIndexIDs] = indexes.AsSlice()
}
loadIDs := make(map[string]int64)
timeNow := time.Now().UnixNano()
for idx := range cacheIDs {
loadIDs[utils.ArgCacheToInstance[idx]] = timeNow
}
if err := adms.dm.SetLoadIDs(cntxt, loadIDs); err != nil {
return utils.APIErrorHandler(err)
}
if err := adms.callCacheForComputeIndexes(cntxt, utils.IfaceAsString(args.APIOpts[utils.CacheOpt]),
args.Tenant, cacheIDs, args.APIOpts); err != nil {
return err
}
*reply = utils.OK
return nil
}

View File

@@ -138,18 +138,68 @@ func (admS *AdminSv1) composeArgsReload(ctx *context.Context, tnt, cacheID, item
}
// callCacheForIndexes will only call CacheClear because don't have access at ItemID
func (admS *AdminSv1) callCacheForIndexes(ctx *context.Context, cacheopt string, tnt, cacheID string,
opts map[string]interface{}) (err error) {
if utils.FirstNonEmpty(cacheopt, admS.cfg.GeneralCfg().DefaultCaching) == utils.MetaClear {
var reply string
return admS.connMgr.Call(ctx, admS.cfg.AdminSCfg().CachesConns,
utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{
Tenant: tnt,
CacheIDs: []string{cacheID},
APIOpts: opts,
}, &reply)
func (admS *AdminSv1) callCacheForRemoveIndexes(ctx *context.Context, cacheopt string, tnt, cacheID string,
itemIDs []string, opts map[string]interface{}) (err error) {
var reply, method string
var args interface{} = &utils.AttrReloadCacheWithAPIOpts{
Tenant: tnt,
ArgsCache: map[string][]string{
utils.CacheInstanceToArg[cacheID]: itemIDs,
},
APIOpts: opts,
}
return
switch utils.FirstNonEmpty(cacheopt, admS.cfg.GeneralCfg().DefaultCaching) {
case utils.MetaNone:
return
case utils.MetaReload:
method = utils.CacheSv1ReloadCache
case utils.MetaLoad:
method = utils.CacheSv1LoadCache
case utils.MetaRemove:
method = utils.CacheSv1RemoveItems
case utils.MetaClear:
method = utils.CacheSv1Clear
args = &utils.AttrCacheIDsWithAPIOpts{
Tenant: tnt,
CacheIDs: []string{cacheID},
APIOpts: opts,
}
}
return admS.connMgr.Call(ctx, admS.cfg.AdminSCfg().CachesConns,
method, args, &reply)
}
func (admS *AdminSv1) callCacheForComputeIndexes(ctx *context.Context, cacheopt, tnt string,
cacheItems map[string][]string, opts map[string]interface{}) (err error) {
var reply, method string
var args interface{} = &utils.AttrReloadCacheWithAPIOpts{
Tenant: tnt,
ArgsCache: cacheItems,
APIOpts: opts,
}
switch utils.FirstNonEmpty(cacheopt, admS.cfg.GeneralCfg().DefaultCaching) {
case utils.MetaNone:
return
case utils.MetaReload:
method = utils.CacheSv1ReloadCache
case utils.MetaLoad:
method = utils.CacheSv1LoadCache
case utils.MetaRemove:
method = utils.CacheSv1RemoveItems
case utils.MetaClear:
method = utils.CacheSv1Clear
cacheIDs := make([]string, 0, len(cacheItems))
for idx := range cacheItems {
cacheIDs = append(cacheIDs, utils.ArgCacheToInstance[idx])
}
args = &utils.AttrCacheIDsWithAPIOpts{
Tenant: tnt,
CacheIDs: cacheIDs,
APIOpts: opts,
}
}
return admS.connMgr.Call(ctx, admS.cfg.AdminSCfg().CachesConns,
method, args, &reply)
}
/*

View File

@@ -18,7 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package config
import (
"fmt"
"reflect"
"testing"
@@ -194,8 +193,6 @@ func TestDiameterAgentCfgAsMapInterface(t *testing.T) {
}
rcv := cgrCfg.diameterAgentCfg.AsMapInterface(utils.InfieldSep)
if !reflect.DeepEqual(rcv, eMap) {
fmt.Printf("%T \n", rcv[utils.RequestProcessorsCfg].([]map[string]interface{})[0][utils.FlagsCfg])
fmt.Printf("%T \n", eMap[utils.RequestProcessorsCfg].([]map[string]interface{})[0][utils.FlagsCfg])
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(eMap), utils.ToJSON(rcv))
}
}

View File

@@ -766,7 +766,6 @@ func (dm *DataManager) SetFilter(ctx *context.Context, fltr *Filter, withIndex b
if err = dm.DataDB().SetFilterDrv(ctx, fltr); err != nil {
return
}
fmt.Println("set fltr da", withIndex, utils.ToJSON(oldFlt), utils.ToJSON(fltr))
if withIndex {
if err = UpdateFilterIndex(ctx, dm, oldFlt, fltr); err != nil {
return
@@ -1744,7 +1743,6 @@ func (dm *DataManager) SetAttributeProfile(ctx *context.Context, ap *AttributePr
oldContexes = &oldAP.Contexts
oldFiltersIDs = &oldAP.FilterIDs
}
fmt.Println(oldContexes, oldFiltersIDs)
if err = updatedIndexesWithContexts(ctx, dm, utils.CacheAttributeFilterIndexes, ap.Tenant, ap.ID,
oldContexes, oldFiltersIDs, ap.Contexts, ap.FilterIDs); err != nil {
return

View File

@@ -387,7 +387,8 @@ func splitFilterIndex(tntCtxIdxKey string) (tntCtx, idxKey string, err error) {
// ComputeIndexes gets the indexes from the DB and ensure that the items are indexed
// getFilters returns a list of filters IDs for the given profile id
func ComputeIndexes(cntxt *context.Context, dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string,
transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error), newFltr *Filter) (processed bool, err error) {
transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error), newFltr *Filter) (indexes utils.StringSet, err error) {
indexes = make(utils.StringSet)
var profilesIDs []string
if IDs == nil { // get all items
var ids []string
@@ -423,13 +424,13 @@ func ComputeIndexes(cntxt *context.Context, dm *DataManager, tnt, ctx, idxItmTyp
return
}
// ensure that the item is in the index set
for _, idx := range index {
for key, idx := range index {
idx.Add(id)
indexes.Add(utils.ConcatenatedKey(tntCtx, key))
}
if err = dm.SetIndexes(cntxt, idxItmType, tntCtx, index, cacheCommit(transactionID), transactionID); err != nil {
return
}
processed = true
}
return
}

View File

@@ -72,7 +72,6 @@ func (m *Migrator) moveStatQueueProfile() (err error) {
return fmt.Errorf("Invalid key <%s> when migrating stat queue profiles", id)
}
sgs, err := m.dmIN.DataManager().GetStatQueueProfile(tntID[0], tntID[1], false, false, utils.NonTransactional)
fmt.Println("sgs", utils.ToJSON(sgs))
if err != nil {
return err
}

View File

@@ -544,6 +544,7 @@ func (attr *ArgRSv1ResourceUsage) Clone() *ArgRSv1ResourceUsage {
type ArgsComputeFilterIndexIDs struct {
Tenant string
Context string
APIOpts map[string]interface{}
AttributeIDs []string
ResourceIDs []string
StatIDs []string
@@ -559,6 +560,7 @@ type ArgsComputeFilterIndexIDs struct {
type ArgsComputeFilterIndexes struct {
Tenant string
Context string
APIOpts map[string]interface{}
AttributeS bool
ResourceS bool
StatS bool