diff --git a/apis/filter_indexes.go b/apis/filter_indexes.go index fab5f135d..1f15cc875 100644 --- a/apis/filter_indexes.go +++ b/apis/filter_indexes.go @@ -82,8 +82,8 @@ func (adms *AdminSv1) RemoveFilterIndexes(ctx *context.Context, arg *AttrRemFilt map[string]int64{arg.ItemType: time.Now().UnixNano()}); err != nil { return utils.APIErrorHandler(err) } - if err := adms.callCacheForIndexes(ctx, utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant, - arg.ItemType, arg.APIOpts); err != nil { + if err := adms.callCacheForRemoveIndexes(ctx, utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant, + arg.ItemType, []string{utils.MetaAny}, arg.APIOpts); err != nil { return utils.APIErrorHandler(err) } *reply = utils.OK @@ -221,10 +221,13 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A if tnt == utils.EmptyString { tnt = adms.cfg.GeneralCfg().DefaultTenant } + cacheIDs := make(map[string][]string) + var indexes utils.StringSet //ThresholdProfile Indexes if args.ThresholdS { - if args.ThresholdS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheThresholdFilterIndexes, + cacheIDs[utils.ThresholdFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheThresholdFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { th, e := adms.dm.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -238,10 +241,12 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.ThresholdS = indexes.Size() != 0 } //StatQueueProfile Indexes if args.StatS { - if args.StatS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheStatFilterIndexes, + cacheIDs[utils.StatFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheStatFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { sq, e := adms.dm.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -255,10 +260,12 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.StatS = indexes.Size() != 0 } //ResourceProfile Indexes if args.ResourceS { - if args.ResourceS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheResourceFilterIndexes, + cacheIDs[utils.ResourceFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheResourceFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { rp, e := adms.dm.GetResourceProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -272,10 +279,12 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.ResourceS = indexes.Size() != 0 } - //SupplierProfile Indexes + //RouteSProfile Indexes if args.RouteS { - if args.RouteS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheRouteFilterIndexes, + cacheIDs[utils.RouteFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheRouteFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { rp, e := adms.dm.GetRouteProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -289,10 +298,12 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.RouteS = indexes.Size() != 0 } //AttributeProfile Indexes if args.AttributeS { - if args.AttributeS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheAttributeFilterIndexes, + cacheIDs[utils.AttributeFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheAttributeFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { ap, e := adms.dm.GetAttributeProfile(cntxt, tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -310,10 +321,12 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.AttributeS = indexes.Size() != 0 } //ChargerProfile Indexes if args.ChargerS { - if args.ChargerS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheChargerFilterIndexes, + cacheIDs[utils.ChargerFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheChargerFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { ap, e := adms.dm.GetChargerProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -327,10 +340,12 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.ChargerS = indexes.Size() != 0 } //DispatcherProfile Indexes if args.DispatcherS { - if args.DispatcherS, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheDispatcherFilterIndexes, + cacheIDs[utils.DispatcherFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheDispatcherFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { dsp, e := adms.dm.GetDispatcherProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -347,6 +362,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.DispatcherS = indexes.Size() != 0 } tntCtx := args.Tenant @@ -396,6 +412,20 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A return } } + //generate a load + //ID for CacheFilterIndexes and store it in database + loadIDs := make(map[string]int64) + timeNow := time.Now().UnixNano() + for idx := range cacheIDs { + loadIDs[utils.ArgCacheToInstance[idx]] = timeNow + } + if err := adms.dm.SetLoadIDs(cntxt, loadIDs); err != nil { + return utils.APIErrorHandler(err) + } + if err := adms.callCacheForComputeIndexes(cntxt, utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), + args.Tenant, cacheIDs, args.APIOpts); err != nil { + return err + } *reply = utils.OK return nil } @@ -407,8 +437,10 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. if tnt == utils.EmptyString { tnt = adms.cfg.GeneralCfg().DefaultTenant } + indexes := make(utils.StringSet) + cacheIDs := make(map[string][]string) //ThresholdProfile Indexes - if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheThresholdFilterIndexes, + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheThresholdFilterIndexes, &args.ThresholdIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { th, e := adms.dm.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -422,13 +454,17 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.ThresholdFilterIndexIDs] = indexes.AsSlice() + } //StatQueueProfile Indexes - if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheStatFilterIndexes, + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheStatFilterIndexes, &args.StatIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { sq, e := adms.dm.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } + cacheIDs[utils.StatFilterIndexIDs] = []string{sq.ID} fltrIDs := make([]string, len(sq.FilterIDs)) for i, fltrID := range sq.FilterIDs { fltrIDs[i] = fltrID @@ -437,13 +473,17 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.StatFilterIndexIDs] = indexes.AsSlice() + } //ResourceProfile Indexes - if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheResourceFilterIndexes, + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheResourceFilterIndexes, &args.ResourceIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { rp, e := adms.dm.GetResourceProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } + cacheIDs[utils.ResourceFilterIndexIDs] = []string{rp.ID} fltrIDs := make([]string, len(rp.FilterIDs)) for i, fltrID := range rp.FilterIDs { fltrIDs[i] = fltrID @@ -452,13 +492,17 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.ResourceFilterIndexIDs] = indexes.AsSlice() + } //RouteProfile Indexes - if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheRouteFilterIndexes, + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheRouteFilterIndexes, &args.RouteIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { rp, e := adms.dm.GetRouteProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } + cacheIDs[utils.RouteFilterIndexIDs] = []string{rp.ID} fltrIDs := make([]string, len(rp.FilterIDs)) for i, fltrID := range rp.FilterIDs { fltrIDs[i] = fltrID @@ -467,8 +511,11 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.RouteFilterIndexIDs] = indexes.AsSlice() + } //AttributeProfile Indexes - if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheAttributeFilterIndexes, + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheAttributeFilterIndexes, &args.AttributeIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { ap, e := adms.dm.GetAttributeProfile(cntxt, tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -485,8 +532,11 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.AttributeFilterIndexIDs] = indexes.AsSlice() + } //ChargerProfile Indexes - if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheChargerFilterIndexes, + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheChargerFilterIndexes, &args.ChargerIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { ap, e := adms.dm.GetChargerProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -500,8 +550,11 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.ChargerFilterIndexIDs] = indexes.AsSlice() + } //DispatcherProfile Indexes - if _, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheDispatcherFilterIndexes, + if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Context, utils.CacheDispatcherFilterIndexes, &args.DispatcherIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { dsp, e := adms.dm.GetDispatcherProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -518,6 +571,22 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.DispatcherFilterIndexIDs] = indexes.AsSlice() + } + + loadIDs := make(map[string]int64) + timeNow := time.Now().UnixNano() + for idx := range cacheIDs { + loadIDs[utils.ArgCacheToInstance[idx]] = timeNow + } + if err := adms.dm.SetLoadIDs(cntxt, loadIDs); err != nil { + return utils.APIErrorHandler(err) + } + if err := adms.callCacheForComputeIndexes(cntxt, utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), + args.Tenant, cacheIDs, args.APIOpts); err != nil { + return err + } *reply = utils.OK return nil } diff --git a/apis/libadmin.go b/apis/libadmin.go index ee7d74101..727afae55 100644 --- a/apis/libadmin.go +++ b/apis/libadmin.go @@ -138,18 +138,68 @@ func (admS *AdminSv1) composeArgsReload(ctx *context.Context, tnt, cacheID, item } // callCacheForIndexes will only call CacheClear because don't have access at ItemID -func (admS *AdminSv1) callCacheForIndexes(ctx *context.Context, cacheopt string, tnt, cacheID string, - opts map[string]interface{}) (err error) { - if utils.FirstNonEmpty(cacheopt, admS.cfg.GeneralCfg().DefaultCaching) == utils.MetaClear { - var reply string - return admS.connMgr.Call(ctx, admS.cfg.AdminSCfg().CachesConns, - utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{ - Tenant: tnt, - CacheIDs: []string{cacheID}, - APIOpts: opts, - }, &reply) +func (admS *AdminSv1) callCacheForRemoveIndexes(ctx *context.Context, cacheopt string, tnt, cacheID string, + itemIDs []string, opts map[string]interface{}) (err error) { + var reply, method string + var args interface{} = &utils.AttrReloadCacheWithAPIOpts{ + Tenant: tnt, + ArgsCache: map[string][]string{ + utils.CacheInstanceToArg[cacheID]: itemIDs, + }, + APIOpts: opts, } - return + switch utils.FirstNonEmpty(cacheopt, admS.cfg.GeneralCfg().DefaultCaching) { + case utils.MetaNone: + return + case utils.MetaReload: + method = utils.CacheSv1ReloadCache + case utils.MetaLoad: + method = utils.CacheSv1LoadCache + case utils.MetaRemove: + method = utils.CacheSv1RemoveItems + case utils.MetaClear: + method = utils.CacheSv1Clear + args = &utils.AttrCacheIDsWithAPIOpts{ + Tenant: tnt, + CacheIDs: []string{cacheID}, + APIOpts: opts, + } + } + return admS.connMgr.Call(ctx, admS.cfg.AdminSCfg().CachesConns, + method, args, &reply) +} + +func (admS *AdminSv1) callCacheForComputeIndexes(ctx *context.Context, cacheopt, tnt string, + cacheItems map[string][]string, opts map[string]interface{}) (err error) { + var reply, method string + var args interface{} = &utils.AttrReloadCacheWithAPIOpts{ + Tenant: tnt, + ArgsCache: cacheItems, + APIOpts: opts, + } + switch utils.FirstNonEmpty(cacheopt, admS.cfg.GeneralCfg().DefaultCaching) { + case utils.MetaNone: + return + case utils.MetaReload: + method = utils.CacheSv1ReloadCache + case utils.MetaLoad: + method = utils.CacheSv1LoadCache + case utils.MetaRemove: + method = utils.CacheSv1RemoveItems + case utils.MetaClear: + method = utils.CacheSv1Clear + cacheIDs := make([]string, 0, len(cacheItems)) + for idx := range cacheItems { + cacheIDs = append(cacheIDs, utils.ArgCacheToInstance[idx]) + } + args = &utils.AttrCacheIDsWithAPIOpts{ + Tenant: tnt, + CacheIDs: cacheIDs, + APIOpts: opts, + } + } + return admS.connMgr.Call(ctx, admS.cfg.AdminSCfg().CachesConns, + method, args, &reply) } /* diff --git a/config/diametercfg_test.go b/config/diametercfg_test.go index 5445d256d..53606a716 100644 --- a/config/diametercfg_test.go +++ b/config/diametercfg_test.go @@ -18,7 +18,6 @@ along with this program. If not, see package config import ( - "fmt" "reflect" "testing" @@ -194,8 +193,6 @@ func TestDiameterAgentCfgAsMapInterface(t *testing.T) { } rcv := cgrCfg.diameterAgentCfg.AsMapInterface(utils.InfieldSep) if !reflect.DeepEqual(rcv, eMap) { - fmt.Printf("%T \n", rcv[utils.RequestProcessorsCfg].([]map[string]interface{})[0][utils.FlagsCfg]) - fmt.Printf("%T \n", eMap[utils.RequestProcessorsCfg].([]map[string]interface{})[0][utils.FlagsCfg]) t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(eMap), utils.ToJSON(rcv)) } } diff --git a/engine/datamanager.go b/engine/datamanager.go index 4f95fe5ab..6a3978275 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -766,7 +766,6 @@ func (dm *DataManager) SetFilter(ctx *context.Context, fltr *Filter, withIndex b if err = dm.DataDB().SetFilterDrv(ctx, fltr); err != nil { return } - fmt.Println("set fltr da", withIndex, utils.ToJSON(oldFlt), utils.ToJSON(fltr)) if withIndex { if err = UpdateFilterIndex(ctx, dm, oldFlt, fltr); err != nil { return @@ -1744,7 +1743,6 @@ func (dm *DataManager) SetAttributeProfile(ctx *context.Context, ap *AttributePr oldContexes = &oldAP.Contexts oldFiltersIDs = &oldAP.FilterIDs } - fmt.Println(oldContexes, oldFiltersIDs) if err = updatedIndexesWithContexts(ctx, dm, utils.CacheAttributeFilterIndexes, ap.Tenant, ap.ID, oldContexes, oldFiltersIDs, ap.Contexts, ap.FilterIDs); err != nil { return diff --git a/engine/libindex.go b/engine/libindex.go index b7969efec..35c411087 100644 --- a/engine/libindex.go +++ b/engine/libindex.go @@ -387,7 +387,8 @@ func splitFilterIndex(tntCtxIdxKey string) (tntCtx, idxKey string, err error) { // ComputeIndexes gets the indexes from the DB and ensure that the items are indexed // getFilters returns a list of filters IDs for the given profile id func ComputeIndexes(cntxt *context.Context, dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string, - transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error), newFltr *Filter) (processed bool, err error) { + transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error), newFltr *Filter) (indexes utils.StringSet, err error) { + indexes = make(utils.StringSet) var profilesIDs []string if IDs == nil { // get all items var ids []string @@ -423,13 +424,13 @@ func ComputeIndexes(cntxt *context.Context, dm *DataManager, tnt, ctx, idxItmTyp return } // ensure that the item is in the index set - for _, idx := range index { + for key, idx := range index { idx.Add(id) + indexes.Add(utils.ConcatenatedKey(tntCtx, key)) } if err = dm.SetIndexes(cntxt, idxItmType, tntCtx, index, cacheCommit(transactionID), transactionID); err != nil { return } - processed = true } return } diff --git a/migrator/stats.go b/migrator/stats.go index 9d49c1fd9..6b507de2a 100644 --- a/migrator/stats.go +++ b/migrator/stats.go @@ -72,7 +72,6 @@ func (m *Migrator) moveStatQueueProfile() (err error) { return fmt.Errorf("Invalid key <%s> when migrating stat queue profiles", id) } sgs, err := m.dmIN.DataManager().GetStatQueueProfile(tntID[0], tntID[1], false, false, utils.NonTransactional) - fmt.Println("sgs", utils.ToJSON(sgs)) if err != nil { return err } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 1799819fc..7f950d938 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -544,6 +544,7 @@ func (attr *ArgRSv1ResourceUsage) Clone() *ArgRSv1ResourceUsage { type ArgsComputeFilterIndexIDs struct { Tenant string Context string + APIOpts map[string]interface{} AttributeIDs []string ResourceIDs []string StatIDs []string @@ -559,6 +560,7 @@ type ArgsComputeFilterIndexIDs struct { type ArgsComputeFilterIndexes struct { Tenant string Context string + APIOpts map[string]interface{} AttributeS bool ResourceS bool StatS bool