diff --git a/apier/v1/filter_indexes.go b/apier/v1/filter_indexes.go index 07ed58715..1dad2d7a9 100644 --- a/apier/v1/filter_indexes.go +++ b/apier/v1/filter_indexes.go @@ -20,6 +20,7 @@ package v1 import ( "strings" + "time" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -39,6 +40,7 @@ type AttrRemFilterIndexes struct { Tenant string Context string ItemType string + APIOpts map[string]interface{} } func (apierSv1 *APIerSv1) RemoveFilterIndexes(arg *AttrRemFilterIndexes, reply *string) (err error) { @@ -77,6 +79,14 @@ func (apierSv1 *APIerSv1) RemoveFilterIndexes(arg *AttrRemFilterIndexes, reply * if err = apierSv1.DataManager.RemoveIndexes(arg.ItemType, tntCtx, utils.EmptyString); err != nil { return } + //generate a loadID for CacheFilterIndexes and store it in database + if err := apierSv1.DataManager.SetLoadIDs(map[string]int64{arg.ItemType: time.Now().UnixNano()}); err != nil { + return utils.APIErrorHandler(err) + } + if err := apierSv1.callCacheForRemoveIndexes(utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant, + arg.ItemType, []string{utils.MetaAny}, arg.APIOpts); err != nil { + return utils.APIErrorHandler(err) + } *reply = utils.OK return } @@ -212,10 +222,13 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde if tnt == utils.EmptyString { tnt = apierSv1.Config.GeneralCfg().DefaultTenant } + cacheIDs := make(map[string][]string) + var indexes utils.StringSet //ThresholdProfile Indexes if args.ThresholdS { - if args.ThresholdS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheThresholdFilterIndexes, + cacheIDs[utils.ThresholdFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheThresholdFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { th, e := apierSv1.DataManager.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -229,10 +242,12 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.ThresholdS = indexes.Size() != 0 } //StatQueueProfile Indexes if args.StatS { - if args.StatS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheStatFilterIndexes, + cacheIDs[utils.StatFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheStatFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { sq, e := apierSv1.DataManager.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -246,10 +261,12 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.StatS = indexes.Size() != 0 } //ResourceProfile Indexes if args.ResourceS { - if args.ResourceS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheResourceFilterIndexes, + cacheIDs[utils.ResourceFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheResourceFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { rp, e := apierSv1.DataManager.GetResourceProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -263,10 +280,12 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.ResourceS = indexes.Size() != 0 } - //SupplierProfile Indexes + //RouteSProfile Indexes if args.RouteS { - if args.RouteS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheRouteFilterIndexes, + cacheIDs[utils.RouteFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheRouteFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { rp, e := apierSv1.DataManager.GetRouteProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -280,10 +299,12 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.RouteS = indexes.Size() != 0 } //AttributeProfile Indexes if args.AttributeS { - if args.AttributeS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheAttributeFilterIndexes, + cacheIDs[utils.AttributeFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheAttributeFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { ap, e := apierSv1.DataManager.GetAttributeProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -301,10 +322,12 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.AttributeS = indexes.Size() != 0 } //ChargerProfile Indexes if args.ChargerS { - if args.ChargerS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheChargerFilterIndexes, + cacheIDs[utils.ChargerFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheChargerFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { ap, e := apierSv1.DataManager.GetChargerProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -318,10 +341,12 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.ChargerS = indexes.Size() != 0 } //DispatcherProfile Indexes if args.DispatcherS { - if args.DispatcherS, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheDispatcherFilterIndexes, + cacheIDs[utils.DispatcherFilterIndexIDs] = []string{utils.MetaAny} + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheDispatcherFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { dsp, e := apierSv1.DataManager.GetDispatcherProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -338,6 +363,7 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + args.DispatcherS = indexes.Size() != 0 } tntCtx := args.Tenant @@ -387,6 +413,20 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterInde return } } + //generate a load + //ID for CacheFilterIndexes and store it in database + loadIDs := make(map[string]int64) + timeNow := time.Now().UnixNano() + for idx := range cacheIDs { + loadIDs[utils.ArgCacheToInstance[idx]] = timeNow + } + if err := apierSv1.DataManager.SetLoadIDs(loadIDs); err != nil { + return utils.APIErrorHandler(err) + } + if err := apierSv1.callCacheForComputeIndexes(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), + args.Tenant, cacheIDs, args.APIOpts); err != nil { + return err + } *reply = utils.OK return nil } @@ -398,8 +438,10 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd if tnt == utils.EmptyString { tnt = apierSv1.Config.GeneralCfg().DefaultTenant } + indexes := make(utils.StringSet) + cacheIDs := make(map[string][]string) //ThresholdProfile Indexes - if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheThresholdFilterIndexes, + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheThresholdFilterIndexes, &args.ThresholdIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { th, e := apierSv1.DataManager.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -413,13 +455,17 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.ThresholdFilterIndexIDs] = indexes.AsSlice() + } //StatQueueProfile Indexes - if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheStatFilterIndexes, + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheStatFilterIndexes, &args.StatIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { sq, e := apierSv1.DataManager.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } + cacheIDs[utils.StatFilterIndexIDs] = []string{sq.ID} fltrIDs := make([]string, len(sq.FilterIDs)) for i, fltrID := range sq.FilterIDs { fltrIDs[i] = fltrID @@ -428,13 +474,17 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.StatFilterIndexIDs] = indexes.AsSlice() + } //ResourceProfile Indexes - if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheResourceFilterIndexes, + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheResourceFilterIndexes, &args.ResourceIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { rp, e := apierSv1.DataManager.GetResourceProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } + cacheIDs[utils.ResourceFilterIndexIDs] = []string{rp.ID} fltrIDs := make([]string, len(rp.FilterIDs)) for i, fltrID := range rp.FilterIDs { fltrIDs[i] = fltrID @@ -443,13 +493,17 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.ResourceFilterIndexIDs] = indexes.AsSlice() + } //RouteProfile Indexes - if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheRouteFilterIndexes, + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheRouteFilterIndexes, &args.RouteIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { rp, e := apierSv1.DataManager.GetRouteProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } + cacheIDs[utils.RouteFilterIndexIDs] = []string{rp.ID} fltrIDs := make([]string, len(rp.FilterIDs)) for i, fltrID := range rp.FilterIDs { fltrIDs[i] = fltrID @@ -458,8 +512,11 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.RouteFilterIndexIDs] = indexes.AsSlice() + } //AttributeProfile Indexes - if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheAttributeFilterIndexes, + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheAttributeFilterIndexes, &args.AttributeIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { ap, e := apierSv1.DataManager.GetAttributeProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -476,8 +533,11 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.AttributeFilterIndexIDs] = indexes.AsSlice() + } //ChargerProfile Indexes - if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheChargerFilterIndexes, + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheChargerFilterIndexes, &args.ChargerIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { ap, e := apierSv1.DataManager.GetChargerProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -491,8 +551,11 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.ChargerFilterIndexIDs] = indexes.AsSlice() + } //DispatcherProfile Indexes - if _, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheDispatcherFilterIndexes, + if indexes, err = engine.ComputeIndexes(apierSv1.DataManager, tnt, args.Context, utils.CacheDispatcherFilterIndexes, &args.DispatcherIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { dsp, e := apierSv1.DataManager.GetDispatcherProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -509,6 +572,22 @@ func (apierSv1 *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterInd }, nil); err != nil && err != utils.ErrNotFound { return utils.APIErrorHandler(err) } + if indexes.Size() != 0 { + cacheIDs[utils.DispatcherFilterIndexIDs] = indexes.AsSlice() + } + + loadIDs := make(map[string]int64) + timeNow := time.Now().UnixNano() + for idx := range cacheIDs { + loadIDs[utils.ArgCacheToInstance[idx]] = timeNow + } + if err := apierSv1.DataManager.SetLoadIDs(loadIDs); err != nil { + return utils.APIErrorHandler(err) + } + if err := apierSv1.callCacheForComputeIndexes(utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), + args.Tenant, cacheIDs, args.APIOpts); err != nil { + return err + } *reply = utils.OK return nil } diff --git a/apier/v1/libapier.go b/apier/v1/libapier.go index a4811cfa9..c8e7691f5 100644 --- a/apier/v1/libapier.go +++ b/apier/v1/libapier.go @@ -136,6 +136,71 @@ func (apierSv1 *APIerSv1) composeArgsReload(tnt, cacheID, itemID string, filterI return } +// callCacheForIndexes will only call CacheClear because don't have access at ItemID +func (apierSv1 *APIerSv1) callCacheForRemoveIndexes(cacheopt string, tnt, cacheID string, + itemIDs []string, opts map[string]interface{}) (err error) { + var reply, method string + var args interface{} = utils.AttrReloadCacheWithAPIOpts{ + Tenant: tnt, + ArgsCache: map[string][]string{ + utils.CacheInstanceToArg[cacheID]: itemIDs, + }, + APIOpts: opts, + } + switch utils.FirstNonEmpty(cacheopt, apierSv1.Config.GeneralCfg().DefaultCaching) { + case utils.MetaNone: + return + case utils.MetaReload: + method = utils.CacheSv1ReloadCache + case utils.MetaLoad: + method = utils.CacheSv1LoadCache + case utils.MetaRemove: + method = utils.CacheSv1RemoveItems + case utils.MetaClear: + method = utils.CacheSv1Clear + args = utils.AttrCacheIDsWithAPIOpts{ + Tenant: tnt, + CacheIDs: []string{cacheID}, + APIOpts: opts, + } + } + return apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().CachesConns, nil, + method, args, &reply) +} + +func (apierSv1 *APIerSv1) callCacheForComputeIndexes(cacheopt, tnt string, + cacheItems map[string][]string, opts map[string]interface{}) (err error) { + var reply, method string + var args interface{} = utils.AttrReloadCacheWithAPIOpts{ + Tenant: tnt, + ArgsCache: cacheItems, + APIOpts: opts, + } + switch utils.FirstNonEmpty(cacheopt, apierSv1.Config.GeneralCfg().DefaultCaching) { + case utils.MetaNone: + return + case utils.MetaReload: + method = utils.CacheSv1ReloadCache + case utils.MetaLoad: + method = utils.CacheSv1LoadCache + case utils.MetaRemove: + method = utils.CacheSv1RemoveItems + case utils.MetaClear: + method = utils.CacheSv1Clear + cacheIDs := make([]string, 0, len(cacheItems)) + for idx := range cacheItems { + cacheIDs = append(cacheIDs, utils.ArgCacheToInstance[idx]) + } + args = utils.AttrCacheIDsWithAPIOpts{ + Tenant: tnt, + CacheIDs: cacheIDs, + APIOpts: opts, + } + } + return apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().CachesConns, nil, + method, args, &reply) +} + // callCacheRevDestinations used for reverse destination, loadIDs and indexes replication func (apierSv1 *APIerSv1) callCacheMultiple(cacheopt, tnt, cacheID string, itemIDs []string, opts map[string]interface{}) (err error) { if len(itemIDs) == 0 { diff --git a/engine/libindex.go b/engine/libindex.go index 3be79595b..25fc978c7 100644 --- a/engine/libindex.go +++ b/engine/libindex.go @@ -383,7 +383,8 @@ func splitFilterIndex(tntCtxIdxKey string) (tntCtx, idxKey string, err error) { // ComputeIndexes gets the indexes from the DB and ensure that the items are indexed // getFilters returns a list of filters IDs for the given profile id func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string, - transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error), newFltr *Filter) (processed bool, err error) { + transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error), newFltr *Filter) (indexes utils.StringSet, err error) { + indexes = make(utils.StringSet) var profilesIDs []string if IDs == nil { // get all items var ids []string @@ -419,13 +420,13 @@ func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string, return } // ensure that the item is in the index set - for _, idx := range index { + for key, idx := range index { idx.Add(id) + indexes.Add(utils.ConcatenatedKey(tntCtx, key)) } if err = dm.SetIndexes(idxItmType, tntCtx, index, cacheCommit(transactionID), transactionID); err != nil { return } - processed = true } return } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 3ecb88efd..34e346c62 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -935,20 +935,25 @@ func (attr *ArgRSv1ResourceUsage) Clone() *ArgRSv1ResourceUsage { } type ArgsComputeFilterIndexIDs struct { - Tenant string - Context string - AttributeIDs []string - ResourceIDs []string - StatIDs []string - RouteIDs []string - ThresholdIDs []string - ChargerIDs []string - DispatcherIDs []string + Tenant string + Context string + APIOpts map[string]interface{} + AttributeIDs []string + ResourceIDs []string + StatIDs []string + RouteIDs []string + ThresholdIDs []string + ChargerIDs []string + DispatcherIDs []string + RateProfileIDs []string + AccountIDs []string + ActionProfileIDs []string } type ArgsComputeFilterIndexes struct { Tenant string Context string + APIOpts map[string]interface{} AttributeS bool ResourceS bool StatS bool diff --git a/utils/orderednavigablemap_test.go b/utils/orderednavigablemap_test.go index 523095b0b..9359b1cc7 100644 --- a/utils/orderednavigablemap_test.go +++ b/utils/orderednavigablemap_test.go @@ -901,8 +901,6 @@ func TestOrderedNavigableMapString(t *testing.T) { }, } onmExpect := `{"Map":{"test1":{"Value":{"Data":"data!"}}}}` - // fmt.Println(onm.nm) - // fmt.Println(onm.String()) if onm.String() != onmExpect { t.Errorf("Expected %s but received %s", onmExpect, onm.String()) }