From 8d94a2d8dca394950801c193f54af1130cd4ff50 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 11 Jun 2020 16:57:02 +0300 Subject: [PATCH] Replaced FilterIndexes functions --- apier/v1/filter_indexes.go | 28 +- apier/v1/filter_indexes_it_test.go | 14 +- apier/v1/precache_it_test.go | 10 +- apier/v2/cdrs_it_test.go | 7 + config/config_it_test.go | 1 + .../remote_replication/internal/cgrates.json | 1 + .../internal_gob/cgrates.json | 1 + dispatchers/replicator.go | 66 --- engine/datamanager.go | 126 ------ engine/filterhelpers.go | 2 +- engine/filterindexer_it_test.go | 409 +++++++++--------- engine/libcdre_test.go | 2 - engine/libindex.go | 3 +- engine/storage_interface.go | 7 - engine/storage_internal_datadb.go | 105 +---- engine/storage_mongo_datadb.go | 216 +-------- engine/storage_redis.go | 93 ---- ers/partial_csv_it_test.go | 2 - migrator/alias_it_test.go | 15 +- migrator/derived_chargers_it_test.go | 23 +- migrator/filters_it_test.go | 22 +- migrator/user_it_test.go | 12 +- utils/consts.go | 3 - utils/coreutils.go | 39 -- 24 files changed, 310 insertions(+), 897 deletions(-) diff --git a/apier/v1/filter_indexes.go b/apier/v1/filter_indexes.go index ea61db5bc..9b15813d8 100644 --- a/apier/v1/filter_indexes.go +++ b/apier/v1/filter_indexes.go @@ -202,7 +202,7 @@ func (api *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterIndexes, transactionID := utils.GenUUID() //ThresholdProfile Indexes if args.ThresholdS { - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheThresholdFilterIndexes, + if args.ThresholdS, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheThresholdFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { th, e := api.DataManager.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -219,7 +219,7 @@ func (api *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterIndexes, } //StatQueueProfile Indexes if args.StatS { - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheStatFilterIndexes, + if args.StatS, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheStatFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { sq, e := api.DataManager.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -236,7 +236,7 @@ func (api *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterIndexes, } //ResourceProfile Indexes if args.ResourceS { - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheResourceFilterIndexes, + if args.ResourceS, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheResourceFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { rp, e := api.DataManager.GetResourceProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -253,7 +253,7 @@ func (api *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterIndexes, } //SupplierProfile Indexes if args.RouteS { - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheRouteFilterIndexes, + if args.RouteS, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheRouteFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { rp, e := api.DataManager.GetRouteProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -270,7 +270,7 @@ func (api *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterIndexes, } //AttributeProfile Indexes if args.AttributeS { - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheAttributeFilterIndexes, + if args.AttributeS, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheAttributeFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { ap, e := api.DataManager.GetAttributeProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -291,7 +291,7 @@ func (api *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterIndexes, } //ChargerProfile Indexes if args.ChargerS { - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheChargerFilterIndexes, + if args.ChargerS, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheChargerFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { ap, e := api.DataManager.GetChargerProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -308,7 +308,7 @@ func (api *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterIndexes, } //DispatcherProfile Indexes if args.DispatcherS { - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheDispatcherFilterIndexes, + if args.DispatcherS, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheDispatcherFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { dsp, e := api.DataManager.GetDispatcherProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -382,7 +382,7 @@ func (api *APIerSv1) ComputeFilterIndexes(args *utils.ArgsComputeFilterIndexes, func (api *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterIndexIDs, reply *string) (err error) { transactionID := utils.NonTransactional //ThresholdProfile Indexes - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheThresholdFilterIndexes, + if _, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheThresholdFilterIndexes, &args.ThresholdIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { th, e := api.DataManager.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -397,7 +397,7 @@ func (api *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterIndexIDs return utils.APIErrorHandler(err) } //StatQueueProfile Indexes - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheStatFilterIndexes, + if _, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheStatFilterIndexes, &args.StatIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { sq, e := api.DataManager.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -412,7 +412,7 @@ func (api *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterIndexIDs return utils.APIErrorHandler(err) } //ResourceProfile Indexes - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheResourceFilterIndexes, + if _, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheResourceFilterIndexes, &args.ResourceIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { rp, e := api.DataManager.GetResourceProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -427,7 +427,7 @@ func (api *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterIndexIDs return utils.APIErrorHandler(err) } //RouteProfile Indexes - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheRouteFilterIndexes, + if _, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheRouteFilterIndexes, &args.RouteIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { rp, e := api.DataManager.GetRouteProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -442,7 +442,7 @@ func (api *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterIndexIDs return utils.APIErrorHandler(err) } //AttributeProfile Indexes - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheAttributeFilterIndexes, + if _, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheAttributeFilterIndexes, &args.AttributeIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { ap, e := api.DataManager.GetAttributeProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -460,7 +460,7 @@ func (api *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterIndexIDs return utils.APIErrorHandler(err) } //ChargerProfile Indexes - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheChargerFilterIndexes, + if _, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheChargerFilterIndexes, &args.ChargerIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { ap, e := api.DataManager.GetChargerProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { @@ -475,7 +475,7 @@ func (api *APIerSv1) ComputeFilterIndexIDs(args *utils.ArgsComputeFilterIndexIDs return utils.APIErrorHandler(err) } //DispatcherProfile Indexes - if err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheDispatcherFilterIndexes, + if _, err = engine.ComputeIndexes(api.DataManager, args.Tenant, args.Context, utils.CacheDispatcherFilterIndexes, &args.DispatcherIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { dsp, e := api.DataManager.GetDispatcherProfile(tnt, id, true, false, utils.NonTransactional) if e != nil { diff --git a/apier/v1/filter_indexes_it_test.go b/apier/v1/filter_indexes_it_test.go index 889f979df..614013cbc 100644 --- a/apier/v1/filter_indexes_it_test.go +++ b/apier/v1/filter_indexes_it_test.go @@ -334,7 +334,7 @@ func testV1FIdxSecondComputeThresholdsIndexes(t *testing.T) { var indexes []string if err := tFIdxRpc.Call(utils.APIerSv1GetFilterIndexes, &AttrGetFilterIndexes{ ItemType: utils.MetaThresholds, Tenant: tenant, FilterType: utils.MetaString}, - &indexes); err != nil && err.Error() != utils.ErrNotFound.Error() { + &indexes); err != nil { t.Error(err) } if !reflect.DeepEqual(expectedIDX, indexes) { @@ -358,7 +358,7 @@ func testV1FIdxThirdComputeThresholdsIndexes(t *testing.T) { var indexes []string if err := tFIdxRpc.Call(utils.APIerSv1GetFilterIndexes, &AttrGetFilterIndexes{ ItemType: utils.MetaThresholds, Tenant: tenant, FilterType: utils.MetaString}, - &indexes); err != nil && err.Error() != utils.ErrNotFound.Error() { + &indexes); err != nil { t.Error(err) } sort.Strings(indexes) @@ -405,7 +405,7 @@ func testV1FIdxRemoveThresholdProfile(t *testing.T) { var indexes []string if err := tFIdxRpc.Call(utils.APIerSv1GetFilterIndexes, &AttrGetFilterIndexes{ ItemType: utils.MetaThresholds, Tenant: tenant, FilterType: utils.MetaString}, - &indexes); err != nil && err.Error() != utils.ErrNotFound.Error() { + &indexes); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } @@ -654,7 +654,7 @@ func testV1FIdxRemoveStatQueueProfile(t *testing.T) { var indexes []string if err := tFIdxRpc.Call(utils.APIerSv1GetFilterIndexes, &AttrGetFilterIndexes{ ItemType: utils.MetaStats, Tenant: tenant, FilterType: utils.MetaString}, - &indexes); err != nil && err.Error() != utils.ErrNotFound.Error() { + &indexes); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } @@ -724,7 +724,7 @@ func testV1FIdxSetResourceProfileIndexes(t *testing.T) { var indexes []string if err := tFIdxRpc.Call(utils.APIerSv1GetFilterIndexes, &AttrGetFilterIndexes{ ItemType: utils.MetaResources, Tenant: tenant, FilterType: utils.MetaString}, - &indexes); err != nil && err.Error() != utils.ErrNotFound.Error() { + &indexes); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } @@ -817,7 +817,7 @@ func testV1FIdxSetSecondResourceProfileIndexes(t *testing.T) { var indexes []string if err := tFIdxRpc.Call(utils.APIerSv1GetFilterIndexes, &AttrGetFilterIndexes{ ItemType: utils.MetaResources, Tenant: tenant, FilterType: utils.MetaString}, - &indexes); err != nil && err.Error() != utils.ErrNotFound.Error() { + &indexes); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } @@ -882,7 +882,7 @@ func testV1FIdxRemoveResourceProfile(t *testing.T) { var indexes []string if err := tFIdxRpc.Call(utils.APIerSv1GetFilterIndexes, &AttrGetFilterIndexes{ ItemType: utils.MetaResources, Tenant: tenant, FilterType: utils.MetaString}, - &indexes); err != nil && err.Error() != utils.ErrNotFound.Error() { + &indexes); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } diff --git a/apier/v1/precache_it_test.go b/apier/v1/precache_it_test.go index a68172292..08026c55f 100644 --- a/apier/v1/precache_it_test.go +++ b/apier/v1/precache_it_test.go @@ -173,7 +173,7 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) { }, utils.CacheAttributeFilterIndexes: { Items: 2, - Groups: 0, + Groups: 2, }, utils.CacheAttributeProfiles: { Items: 1, @@ -215,7 +215,7 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) { }, utils.CacheResourceFilterIndexes: { Items: 6, - Groups: 0, + Groups: 1, }, utils.CacheResourceProfiles: { Items: 3, @@ -236,7 +236,7 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) { }, utils.CacheStatFilterIndexes: { Items: 2, - Groups: 0, + Groups: 1, }, utils.CacheStatQueueProfiles: { Items: 1, @@ -250,7 +250,7 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) { utils.CacheEventCharges: {}, utils.CacheRouteFilterIndexes: { Items: 6, - Groups: 0, + Groups: 1, }, utils.CacheRouteProfiles: { Items: 3, @@ -258,7 +258,7 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) { }, utils.CacheThresholdFilterIndexes: { Items: 10, - Groups: 0, + Groups: 1, }, utils.CacheThresholdProfiles: { Items: 7, diff --git a/apier/v2/cdrs_it_test.go b/apier/v2/cdrs_it_test.go index 34f6bad48..648fc5dab 100644 --- a/apier/v2/cdrs_it_test.go +++ b/apier/v2/cdrs_it_test.go @@ -846,6 +846,13 @@ func testv2CDRsGetCDRsDest(t *testing.T) { func testV2CDRsRerate(t *testing.T) { var reply string + if err := cdrsRpc.Call(utils.CacheSv1Clear, &utils.AttrCacheIDsWithArgDispatcher{ + CacheIDs: nil, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Error("Reply: ", reply) + } //add a charger chargerProfile := &v1.ChargerWithCache{ ChargerProfile: &engine.ChargerProfile{ diff --git a/config/config_it_test.go b/config/config_it_test.go index 049504591..2dce17635 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -325,6 +325,7 @@ func testCGRConfigReloadCDRs(t *testing.T) { SMCostRetries: 5, StoreCdrs: true, SchedulerConns: []string{}, + EEsConns: []string{}, } if !reflect.DeepEqual(expAttr, cfg.CdrsCfg()) { t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.CdrsCfg())) diff --git a/data/conf/samples/remote_replication/internal/cgrates.json b/data/conf/samples/remote_replication/internal/cgrates.json index 368e4f740..ec3b7be04 100644 --- a/data/conf/samples/remote_replication/internal/cgrates.json +++ b/data/conf/samples/remote_replication/internal/cgrates.json @@ -65,6 +65,7 @@ "*dispatcher_hosts":{"remote":true,"replicate":true}, "*filter_indexes" :{"remote":true,"replicate":true}, "*load_ids":{"remote":true,"replicate":true}, + "*indexes":{"remote":true, "replicate":true}, }, }, diff --git a/data/conf/samples/remote_replication/internal_gob/cgrates.json b/data/conf/samples/remote_replication/internal_gob/cgrates.json index 1a5a4596a..443a4b21f 100644 --- a/data/conf/samples/remote_replication/internal_gob/cgrates.json +++ b/data/conf/samples/remote_replication/internal_gob/cgrates.json @@ -57,6 +57,7 @@ "*dispatcher_hosts":{"remote":true,"replicate":true}, "*filter_indexes" :{"remote":true,"replicate":true}, "*load_ids":{"remote":true,"replicate":true}, + "*indexes":{"remote":true, "replicate":true}, }, }, diff --git a/dispatchers/replicator.go b/dispatchers/replicator.go index 1780b5ab9..c4fe56c19 100644 --- a/dispatchers/replicator.go +++ b/dispatchers/replicator.go @@ -606,50 +606,6 @@ func (dS *DispatcherService) ReplicatorSv1GetItemLoadIDs(args *utils.StringWithA utils.ReplicatorSv1GetItemLoadIDs, args, rpl) } -func (dS *DispatcherService) ReplicatorSv1GetFilterIndexes(args *utils.GetFilterIndexesArgWithArgDispatcher, rpl *map[string]utils.StringMap) (err error) { - if args == nil { - args = &utils.GetFilterIndexesArgWithArgDispatcher{} - } - args.TenantArg.Tenant = utils.FirstNonEmpty(args.TenantArg.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { - if args.ArgDispatcher == nil { - return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) - } - if err = dS.authorize(utils.ReplicatorSv1GetFilterIndexes, args.TenantArg.Tenant, - args.APIKey, utils.TimePointer(time.Now())); err != nil { - return - } - } - var routeID *string - if args.ArgDispatcher != nil { - routeID = args.ArgDispatcher.RouteID - } - return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaReplicator, routeID, - utils.ReplicatorSv1GetFilterIndexes, args, rpl) -} - -func (dS *DispatcherService) ReplicatorSv1MatchFilterIndex(args *utils.MatchFilterIndexArgWithArgDispatcher, rpl *utils.StringMap) (err error) { - if args == nil { - args = &utils.MatchFilterIndexArgWithArgDispatcher{} - } - args.TenantArg.Tenant = utils.FirstNonEmpty(args.TenantArg.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { - if args.ArgDispatcher == nil { - return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) - } - if err = dS.authorize(utils.ReplicatorSv1MatchFilterIndex, args.TenantArg.Tenant, - args.APIKey, utils.TimePointer(time.Now())); err != nil { - return - } - } - var routeID *string - if args.ArgDispatcher != nil { - routeID = args.ArgDispatcher.RouteID - } - return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaReplicator, routeID, - utils.ReplicatorSv1MatchFilterIndex, args, rpl) -} - func (dS *DispatcherService) ReplicatorSv1SetThresholdProfile(args *engine.ThresholdProfileWithArgDispatcher, rpl *string) (err error) { if args == nil { args = &engine.ThresholdProfileWithArgDispatcher{} @@ -694,28 +650,6 @@ func (dS *DispatcherService) ReplicatorSv1SetThreshold(args *engine.ThresholdWit utils.ReplicatorSv1SetThreshold, args, rpl) } -func (dS *DispatcherService) ReplicatorSv1SetFilterIndexes(args *utils.SetFilterIndexesArgWithArgDispatcher, rpl *string) (err error) { - if args == nil { - args = &utils.SetFilterIndexesArgWithArgDispatcher{} - } - args.Tenant = utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant) - if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { - if args.ArgDispatcher == nil { - return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField) - } - if err = dS.authorize(utils.ReplicatorSv1SetFilterIndexes, args.Tenant, - args.APIKey, utils.TimePointer(time.Now())); err != nil { - return - } - } - var routeID *string - if args.ArgDispatcher != nil { - routeID = args.ArgDispatcher.RouteID - } - return dS.Dispatch(&utils.CGREvent{Tenant: args.Tenant}, utils.MetaReplicator, routeID, - utils.ReplicatorSv1SetFilterIndexes, args, rpl) -} - func (dS *DispatcherService) ReplicatorSv1SetDestination(args *engine.DestinationWithArgDispatcher, rpl *string) (err error) { if args == nil { args = &engine.DestinationWithArgDispatcher{} diff --git a/engine/datamanager.go b/engine/datamanager.go index fc5c73ae5..bc07847bd 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -2218,132 +2218,6 @@ func (dm *DataManager) HasData(category, subject, tenant string) (has bool, err return dm.DataDB().HasDataDrv(category, subject, tenant) } -func (dm *DataManager) GetFilterIndexes(cacheID, itemIDPrefix, filterType string, - fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) { - if dm == nil { - err = utils.ErrNoDatabaseConn - return - } - if indexes, err = dm.DataDB().GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType, fldNameVal); err != nil { - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; err == utils.ErrNotFound && itm.Remote { - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, - utils.ReplicatorSv1GetFilterIndexes, - &utils.GetFilterIndexesArgWithArgDispatcher{ - GetFilterIndexesArg: &utils.GetFilterIndexesArg{ - CacheID: cacheID, - ItemIDPrefix: itemIDPrefix, - FilterType: filterType, - FldNameVal: fldNameVal}, - TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant}, - ArgDispatcher: &utils.ArgDispatcher{ - APIKey: utils.StringPointer(itm.APIKey), - RouteID: utils.StringPointer(itm.RouteID)}, - }, &indexes); err == nil { - err = dm.dataDB.SetFilterIndexesDrv(cacheID, itemIDPrefix, indexes, true, utils.NonTransactional) - } - } - if err != nil { - err = utils.CastRPCErr(err) - return nil, err - } - } - return -} - -func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string, - indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { - if dm == nil { - err = utils.ErrNoDatabaseConn - return - } - if err = dm.DataDB().SetFilterIndexesDrv(cacheID, itemIDPrefix, - indexes, commit, transactionID); err != nil { - return - } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; itm.Replicate { - var reply string - if err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RplConns, nil, - utils.ReplicatorSv1SetFilterIndexes, - &utils.SetFilterIndexesArgWithArgDispatcher{ - SetFilterIndexesArg: &utils.SetFilterIndexesArg{ - CacheID: cacheID, - ItemIDPrefix: itemIDPrefix, - Indexes: indexes}, - TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant}, - ArgDispatcher: &utils.ArgDispatcher{ - APIKey: utils.StringPointer(itm.APIKey), - RouteID: utils.StringPointer(itm.RouteID)}, - }, &reply); err != nil { - err = utils.CastRPCErr(err) - return - } - } - return -} - -func (dm *DataManager) RemoveFilterIndexes(cacheID, itemIDPrefix string) (err error) { - if dm == nil { - err = utils.ErrNoDatabaseConn - return - } - if err = dm.DataDB().RemoveFilterIndexesDrv(cacheID, itemIDPrefix); err != nil { - return - } - return -} - -func (dm *DataManager) MatchFilterIndex(cacheID, itemIDPrefix, - filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) { - if dm == nil { - err = utils.ErrNoDatabaseConn - return - } - fieldValKey := utils.ConcatenatedKey(itemIDPrefix, filterType, fieldName, fieldVal) - if x, ok := Cache.Get(cacheID, fieldValKey); ok { // Attempt to find in cache first - if x == nil { - return nil, utils.ErrNotFound - } - return x.(utils.StringMap), nil - } - // Not found in cache, check in DB - itemIDs, err = dm.DataDB().MatchFilterIndexDrv(cacheID, itemIDPrefix, filterType, fieldName, fieldVal) - if err != nil { - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilterIndexes]; err == utils.ErrNotFound && itm.Remote { - err = dm.connMgr.Call(config.CgrConfig().DataDbCfg().RmtConns, nil, - utils.ReplicatorSv1MatchFilterIndex, - &utils.MatchFilterIndexArgWithArgDispatcher{MatchFilterIndexArg: &utils.MatchFilterIndexArg{ - CacheID: cacheID, - ItemIDPrefix: itemIDPrefix, - FilterType: filterType, - FieldName: fieldName, - FieldVal: fieldVal, - }, - TenantArg: utils.TenantArg{Tenant: config.CgrConfig().GeneralCfg().DefaultTenant}, - ArgDispatcher: &utils.ArgDispatcher{ - APIKey: utils.StringPointer(itm.APIKey), - RouteID: utils.StringPointer(itm.RouteID), - }, - }, &itemIDs) - } - if err != nil { - err = utils.CastRPCErr(err) - if err == utils.ErrNotFound { - if errCh := Cache.Set(cacheID, fieldValKey, nil, nil, - true, utils.NonTransactional); errCh != nil { - return nil, errCh - } - - } - return nil, err - } - } - if errCh := Cache.Set(cacheID, fieldValKey, itemIDs, nil, - true, utils.NonTransactional); errCh != nil { - return nil, errCh - } - return -} - func (dm *DataManager) GetRouteProfile(tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rpp *RouteProfile, err error) { tntID := utils.ConcatenatedKey(tenant, id) diff --git a/engine/filterhelpers.go b/engine/filterhelpers.go index c3093b58d..b9f36d0be 100644 --- a/engine/filterhelpers.go +++ b/engine/filterhelpers.go @@ -28,7 +28,7 @@ import ( // MatchingItemIDsForEvent returns the list of item IDs matching fieldName/fieldValue for an event // fieldIDs limits the fields which are checked against indexes -// helper on top of dataDB.MatchFilterIndex, adding utils.ANY to list of fields queried +// helper on top of dataDB.GetIndexes, adding utils.ANY to list of fields queried func MatchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldIDs *[]string, dm *DataManager, cacheID, itemIDPrefix string, indexedSelects, nestedFields bool) (itemIDs utils.StringSet, err error) { itemIDs = make(utils.StringSet) diff --git a/engine/filterindexer_it_test.go b/engine/filterindexer_it_test.go index 08ce0a000..5a35da852 100644 --- a/engine/filterindexer_it_test.go +++ b/engine/filterindexer_it_test.go @@ -123,99 +123,102 @@ func testITIsDBEmpty(t *testing.T) { } func testITSetFilterIndexes(t *testing.T) { - idxes := map[string]utils.StringMap{ + idxes := map[string]utils.StringSet{ "*string:Account:1001": { - "RL1": true, + "RL1": struct{}{}, }, "*string:Account:1002": { - "RL1": true, - "RL2": true, + "RL1": struct{}{}, + "RL2": struct{}{}, }, "*string:Account:dan": { - "RL2": true, + "RL2": struct{}{}, }, "*string:Subject:dan": { - "RL2": true, - "RL3": true, + "RL2": struct{}{}, + "RL3": struct{}{}, }, utils.ConcatenatedKey(utils.META_NONE, utils.ANY, utils.ANY): { - "RL4": true, - "RL5": true, + "RL4": struct{}{}, + "RL5": struct{}{}, }, } - if err := dataManager.SetFilterIndexes( - utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org", - idxes, false, utils.NonTransactional); err != nil { + if err := dataManager.SetIndexes(utils.CacheResourceFilterIndexes, + "cgrates.org", idxes, false, utils.NonTransactional); err != nil { t.Error(err) } } func testITGetFilterIndexes(t *testing.T) { - eIdxes := map[string]utils.StringMap{ + eIdxes := map[string]utils.StringSet{ "*string:Account:1001": { - "RL1": true, + "RL1": struct{}{}, }, "*string:Account:1002": { - "RL1": true, - "RL2": true, + "RL1": struct{}{}, + "RL2": struct{}{}, }, "*string:Account:dan": { - "RL2": true, + "RL2": struct{}{}, }, "*string:Subject:dan": { - "RL2": true, - "RL3": true, + "RL2": struct{}{}, + "RL3": struct{}{}, }, utils.ConcatenatedKey(utils.META_NONE, utils.ANY, utils.ANY): { - "RL4": true, - "RL5": true, + "RL4": struct{}{}, + "RL5": struct{}{}, }, } - sbjDan := map[string]string{ - "Subject": "dan", - } - expectedsbjDan := map[string]utils.StringMap{ + expectedsbjDan := map[string]utils.StringSet{ "*string:Subject:dan": { - "RL2": true, - "RL3": true, + "RL2": struct{}{}, + "RL3": struct{}{}, }, } - if exsbjDan, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], - "cgrates.org", utils.MetaString, sbjDan); err != nil { + if exsbjDan, err := dataManager.GetIndexes( + utils.CacheResourceFilterIndexes, "cgrates.org", + utils.ConcatenatedKey(utils.MetaString, "Subject", "dan"), + false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(expectedsbjDan, exsbjDan) { t.Errorf("Expecting: %+v, received: %+v", expectedsbjDan, exsbjDan) } - if rcv, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], - "cgrates.org", utils.EmptyString, nil); err != nil { + if rcv, err := dataManager.GetIndexes( + utils.CacheResourceFilterIndexes, + "cgrates.org", utils.EmptyString, + false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(eIdxes, rcv) { t.Errorf("Expecting: %+v, received: %+v", eIdxes, rcv) } - if _, err := dataManager.GetFilterIndexes("unknown_key", "unkonwn_tenant", - utils.EmptyString, nil); err == nil || err != utils.ErrNotFound { + if _, err := dataManager.GetIndexes( + "unknown_key", "unkonwn_tenant", + utils.EmptyString, false, false); err == nil || err != utils.ErrNotFound { t.Error(err) } } func testITMatchFilterIndex(t *testing.T) { - eMp := utils.StringMap{ - "RL1": true, - "RL2": true, + eMp := map[string]utils.StringSet{ + "*string:Account:1002": { + "RL1": struct{}{}, + "RL2": struct{}{}, + }, } - if rcvMp, err := dataManager.MatchFilterIndex( + if rcvMp, err := dataManager.GetIndexes( utils.CacheResourceFilterIndexes, "cgrates.org", - utils.MetaString, "Account", "1002"); err != nil { + utils.ConcatenatedKey(utils.MetaString, "Account", "1002"), + false, true); err != nil { t.Error(err) } else if !reflect.DeepEqual(eMp, rcvMp) { t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp) } - if _, err := dataManager.MatchFilterIndex( + if _, err := dataManager.GetIndexes( utils.CacheResourceFilterIndexes, "cgrates.org", - utils.MetaString, "NonexistentField", "1002"); err == nil || + utils.ConcatenatedKey(utils.MetaString, "NonexistentField", "1002"), + true, true); err == nil || err != utils.ErrNotFound { t.Error(err) } @@ -269,20 +272,19 @@ func testITTestThresholdFilterIndexes(t *testing.T) { if err := dataManager.SetThresholdProfile(th2, true); err != nil { t.Error(err) } - eIdxes := map[string]utils.StringMap{ + eIdxes := map[string]utils.StringSet{ "*string:EventType:Event1": { - "THD_Test": true, - "THD_Test2": true, + "THD_Test": struct{}{}, + "THD_Test2": struct{}{}, }, "*string:EventType:Event2": { - "THD_Test": true, - "THD_Test2": true, + "THD_Test": struct{}{}, + "THD_Test2": struct{}{}, }, } - rfi := NewFilterIndexer(onStor, utils.ThresholdProfilePrefix, th.Tenant) - if rcvIdx, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != nil { + if rcvIdx, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.EmptyString, false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(eIdxes, rcvIdx) { t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) @@ -314,23 +316,23 @@ func testITTestThresholdFilterIndexes(t *testing.T) { if err := dataManager.SetThresholdProfile(cloneTh1, true); err != nil { t.Error(err) } - eIdxes = map[string]utils.StringMap{ + eIdxes = map[string]utils.StringSet{ "*string:Account:1001": { - "THD_Test": true, + "THD_Test": struct{}{}, }, "*string:Account:1002": { - "THD_Test": true, + "THD_Test": struct{}{}, }, "*string:EventType:Event1": { - "THD_Test2": true, + "THD_Test2": struct{}{}, }, "*string:EventType:Event2": { - "THD_Test2": true, + "THD_Test2": struct{}{}, }, } - if rcvIdx, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != nil { + if rcvIdx, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.EmptyString, false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(eIdxes, rcvIdx) { t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) @@ -362,25 +364,25 @@ func testITTestThresholdFilterIndexes(t *testing.T) { if err := dataManager.SetThresholdProfile(clone2Th1, true); err != nil { t.Error(err) } - eIdxes = map[string]utils.StringMap{ + eIdxes = map[string]utils.StringSet{ "*string:Destination:10": { - "THD_Test": true, + "THD_Test": struct{}{}, }, "*string:Destination:20": { - "THD_Test": true, + "THD_Test": struct{}{}, }, "*string:EventType:Event1": { - "THD_Test": true, - "THD_Test2": true, + "THD_Test": struct{}{}, + "THD_Test2": struct{}{}, }, "*string:EventType:Event2": { - "THD_Test": true, - "THD_Test2": true, + "THD_Test": struct{}{}, + "THD_Test2": struct{}{}, }, } - if rcvIdx, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != nil { + if rcvIdx, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.EmptyString, false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(eIdxes, rcvIdx) { t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) @@ -394,9 +396,9 @@ func testITTestThresholdFilterIndexes(t *testing.T) { th2.ID, utils.NonTransactional, true); err != nil { t.Error(err) } - if _, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != utils.ErrNotFound { + if _, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.EmptyString, false, false); err != utils.ErrNotFound { t.Error(err) } } @@ -440,20 +442,19 @@ func testITTestAttributeProfileFilterIndexes(t *testing.T) { if err := dataManager.SetAttributeProfile(attrProfile, true); err != nil { t.Error(err) } - eIdxes := map[string]utils.StringMap{ + eIdxes := map[string]utils.StringSet{ "*string:EventType:Event1": { - "AttrPrf": true, + "AttrPrf": struct{}{}, }, "*string:EventType:Event2": { - "AttrPrf": true, + "AttrPrf": struct{}{}, }, } for _, ctx := range attrProfile.Contexts { - rfi := NewFilterIndexer(onStor, utils.AttributeProfilePrefix, - utils.ConcatenatedKey(attrProfile.Tenant, ctx)) - if rcvIdx, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != nil { + if rcvIdx, err := dataManager.GetIndexes( + utils.CacheAttributeFilterIndexes, + utils.ConcatenatedKey(attrProfile.Tenant, ctx), + utils.EmptyString, false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(eIdxes, rcvIdx) { t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) @@ -466,22 +467,20 @@ func testITTestAttributeProfileFilterIndexes(t *testing.T) { t.Error(err) } //check indexes with the new context (con3) - rfi := NewFilterIndexer(onStor, utils.AttributeProfilePrefix, - utils.ConcatenatedKey(attrProfile.Tenant, "con3")) - if rcvIdx, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != nil { + if rcvIdx, err := dataManager.GetIndexes( + utils.CacheAttributeFilterIndexes, + utils.ConcatenatedKey(attrProfile.Tenant, "con3"), + utils.EmptyString, false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(eIdxes, rcvIdx) { t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) } //check if old contexts was delete for _, ctx := range []string{"con1", "con2"} { - rfi := NewFilterIndexer(onStor, utils.AttributeProfilePrefix, - utils.ConcatenatedKey(attrProfile.Tenant, ctx)) - if _, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != nil && err != utils.ErrNotFound { + if _, err := dataManager.GetIndexes( + utils.CacheAttributeFilterIndexes, + utils.ConcatenatedKey(attrProfile.Tenant, ctx), + utils.EmptyString, false, false); err != nil && err != utils.ErrNotFound { t.Error(err) } } @@ -491,11 +490,10 @@ func testITTestAttributeProfileFilterIndexes(t *testing.T) { t.Error(err) } //check if index is removed - rfi = NewFilterIndexer(onStor, utils.AttributeProfilePrefix, - utils.ConcatenatedKey("cgrates.org", "con3")) - if _, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.MetaString, nil); err != nil && err != utils.ErrNotFound { + if _, err := dataManager.GetIndexes( + utils.CacheAttributeFilterIndexes, + utils.ConcatenatedKey("cgrates.org", "con3"), + utils.MetaString, false, false); err != nil && err != utils.ErrNotFound { t.Error(err) } @@ -535,18 +533,17 @@ func testITTestThresholdInlineFilterIndexing(t *testing.T) { if err := dataManager.SetThresholdProfile(th, true); err != nil { t.Error(err) } - eIdxes := map[string]utils.StringMap{ + eIdxes := map[string]utils.StringSet{ "*string:EventType:Event1": { - "THD_Test": true, + "THD_Test": struct{}{}, }, "*string:EventType:Event2": { - "THD_Test": true, + "THD_Test": struct{}{}, }, } - rfi := NewFilterIndexer(onStor, utils.ThresholdProfilePrefix, th.Tenant) - if rcvIdx, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != nil { + if rcvIdx, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.EmptyString, false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(eIdxes, rcvIdx) { t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) @@ -557,20 +554,20 @@ func testITTestThresholdInlineFilterIndexing(t *testing.T) { if err := dataManager.SetThresholdProfile(th, true); err != nil { t.Error(err) } - eIdxes = map[string]utils.StringMap{ + eIdxes = map[string]utils.StringSet{ "*string:Account:1001": { - "THD_Test": true, + "THD_Test": struct{}{}, }, "*string:EventType:Event1": { - "THD_Test": true, + "THD_Test": struct{}{}, }, "*string:EventType:Event2": { - "THD_Test": true, + "THD_Test": struct{}{}, }, } - if rcvIdx, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != nil { + if rcvIdx, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.EmptyString, false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(eIdxes, rcvIdx) { t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) @@ -580,73 +577,71 @@ func testITTestThresholdInlineFilterIndexing(t *testing.T) { th.ID, utils.NonTransactional, true); err != nil { t.Error(err) } - if _, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != utils.ErrNotFound { + if _, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.EmptyString, false, false); err != utils.ErrNotFound { t.Error(err) } } func testITTestStoreFilterIndexesWithTransID(t *testing.T) { - idxes := map[string]utils.StringMap{ + idxes := map[string]utils.StringSet{ "*string:Account:1001": { - "RL1": true, + "RL1": struct{}{}, }, "*string:Account:1002": { - "RL1": true, - "RL2": true, + "RL1": struct{}{}, + "RL2": struct{}{}, }, "*string:Account:dan": { - "RL2": true, + "RL2": struct{}{}, }, "*string:Subject:dan": { - "RL2": true, - "RL3": true, + "RL2": struct{}{}, + "RL3": struct{}{}, }, utils.ConcatenatedKey(utils.META_NONE, utils.ANY, utils.ANY): { - "RL4": true, - "RL5": true, + "RL4": struct{}{}, + "RL5": struct{}{}, }, } - if err := dataManager.SetFilterIndexes( - utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org", - idxes, false, "transaction1"); err != nil { + if err := dataManager.SetIndexes(utils.CacheResourceFilterIndexes, + "cgrates.org", idxes, false, "transaction1"); err != nil { t.Error(err) } //commit transaction - if err := dataManager.SetFilterIndexes( - utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], + if err := dataManager.SetIndexes(utils.CacheResourceFilterIndexes, "cgrates.org", idxes, true, "transaction1"); err != nil { t.Error(err) } - eIdx := map[string]utils.StringMap{ + eIdx := map[string]utils.StringSet{ "*string:Account:1001": { - "RL1": true, + "RL1": struct{}{}, }, "*string:Account:1002": { - "RL1": true, - "RL2": true, + "RL1": struct{}{}, + "RL2": struct{}{}, }, "*string:Account:dan": { - "RL2": true, + "RL2": struct{}{}, }, "*string:Subject:dan": { - "RL2": true, - "RL3": true, + "RL2": struct{}{}, + "RL3": struct{}{}, }, utils.ConcatenatedKey(utils.META_NONE, utils.ANY, utils.ANY): { - "RL4": true, - "RL5": true, + "RL4": struct{}{}, + "RL5": struct{}{}, }, } //verify new key and check if data was moved - if rcv, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org", - utils.EmptyString, nil); err != nil { + if rcv, err := dataManager.GetIndexes( + utils.CacheResourceFilterIndexes, "cgrates.org", + utils.EmptyString, false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(eIdx, rcv) { t.Errorf("Expecting: %+v, received: %+v", eIdx, rcv) @@ -654,38 +649,36 @@ func testITTestStoreFilterIndexesWithTransID(t *testing.T) { } func testITTestStoreFilterIndexesWithTransID2(t *testing.T) { - idxes := map[string]utils.StringMap{ + idxes := map[string]utils.StringSet{ "*string:Event:Event1": { - "RL1": true, + "RL1": struct{}{}, }, "*string:Event:Event2": { - "RL1": true, - "RL2": true, + "RL1": struct{}{}, + "RL2": struct{}{}, }, } transID := "transaction1" - if err := dataManager.SetFilterIndexes( - utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org", - idxes, false, transID); err != nil { + if err := dataManager.SetIndexes(utils.CacheResourceFilterIndexes, + "cgrates.org", idxes, false, transID); err != nil { t.Error(err) } //commit transaction - if err := dataManager.SetFilterIndexes( - utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], "cgrates.org", - idxes, true, transID); err != nil { + if err := dataManager.SetIndexes(utils.CacheResourceFilterIndexes, + "cgrates.org", idxes, true, transID); err != nil { t.Error(err) } //verify if old key was deleted - if _, err := dataManager.GetFilterIndexes( - "tmp_"+utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], + if _, err := dataManager.GetIndexes( + "tmp_"+utils.CacheResourceFilterIndexes, utils.ConcatenatedKey("cgrates.org", transID), - utils.EmptyString, nil); err != utils.ErrNotFound { + utils.EmptyString, false, false); err != utils.ErrNotFound { t.Error(err) } //verify new key and check if data was moved - if rcv, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[utils.ResourceProfilesPrefix], - "cgrates.org", utils.EmptyString, nil); err != nil { + if rcv, err := dataManager.GetIndexes( + utils.CacheResourceFilterIndexes, + "cgrates.org", utils.EmptyString, false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(idxes, rcv) { t.Errorf("Expecting: %+v, received: %+v", idxes, rcv) @@ -722,28 +715,31 @@ func testITTestIndexingWithEmptyFltrID(t *testing.T) { if err := dataManager.SetThresholdProfile(th2, true); err != nil { t.Error(err) } - eIdxes := map[string]utils.StringMap{ + eIdxes := map[string]utils.StringSet{ "*none:*any:*any": { - "THD_Test": true, - "THD_Test2": true, + "THD_Test": struct{}{}, + "THD_Test2": struct{}{}, }, } - rfi := NewFilterIndexer(onStor, utils.ThresholdProfilePrefix, th.Tenant) - if rcvIdx, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.META_NONE, nil); err != nil { + if rcvIdx, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.EmptyString, false, false); err != nil { t.Error(err) } else { if !reflect.DeepEqual(eIdxes, rcvIdx) { t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) } } - eMp := utils.StringMap{ - "THD_Test": true, - "THD_Test2": true, + eMp := map[string]utils.StringSet{ + "*none:*any:*any": { + "THD_Test": struct{}{}, + "THD_Test2": struct{}{}, + }, } - if rcvMp, err := dataManager.MatchFilterIndex(utils.CacheThresholdFilterIndexes, th.Tenant, - utils.META_NONE, utils.META_ANY, utils.META_ANY); err != nil { + if rcvMp, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.ConcatenatedKey(utils.META_NONE, utils.META_ANY, utils.META_ANY), + true, true); err != nil { t.Error(err) } else if !reflect.DeepEqual(eMp, rcvMp) { t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp) @@ -804,28 +800,31 @@ func testITTestIndexingWithEmptyFltrID2(t *testing.T) { if err := dataManager.SetRouteProfile(splProfile2, true); err != nil { t.Error(err) } - eIdxes := map[string]utils.StringMap{ + eIdxes := map[string]utils.StringSet{ "*none:*any:*any": { - "SPL_Weight": true, - "SPL_Weight2": true, + "SPL_Weight": struct{}{}, + "SPL_Weight2": struct{}{}, }, } - rfi := NewFilterIndexer(onStor, utils.RouteProfilePrefix, splProfile.Tenant) - if rcvIdx, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != nil { + if rcvIdx, err := dataManager.GetIndexes( + utils.CacheRouteFilterIndexes, splProfile.Tenant, + utils.EmptyString, false, false); err != nil { t.Error(err) } else { if !reflect.DeepEqual(eIdxes, rcvIdx) { t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) } } - eMp := utils.StringMap{ - "SPL_Weight": true, - "SPL_Weight2": true, + eMp := map[string]utils.StringSet{ + "*none:*any:*any": { + "SPL_Weight": struct{}{}, + "SPL_Weight2": struct{}{}, + }, } - if rcvMp, err := dataManager.MatchFilterIndex(utils.CacheRouteFilterIndexes, - splProfile.Tenant, utils.META_NONE, utils.META_ANY, utils.META_ANY); err != nil { + if rcvMp, err := dataManager.GetIndexes( + utils.CacheRouteFilterIndexes, splProfile.Tenant, + utils.ConcatenatedKey(utils.META_NONE, utils.META_ANY, utils.META_ANY), + true, true); err != nil { t.Error(err) } else if !reflect.DeepEqual(eMp, rcvMp) { t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp) @@ -851,7 +850,6 @@ func testITTestIndexingThresholds(t *testing.T) { FilterIDs: []string{"*string:Account:1002", "*lt:Balance:1000"}, ActionIDs: []string{}, } - rfi := NewFilterIndexer(onStor, utils.ThresholdProfilePrefix, th.Tenant) if err := dataManager.SetThresholdProfile(th, true); err != nil { t.Error(err) } @@ -861,30 +859,34 @@ func testITTestIndexingThresholds(t *testing.T) { if err := dataManager.SetThresholdProfile(th3, true); err != nil { t.Error(err) } - eIdxes := map[string]utils.StringMap{ + eIdxes := map[string]utils.StringSet{ "*string:Account:1001": { - "TH1": true, - "TH2": true, + "TH1": struct{}{}, + "TH2": struct{}{}, }, "*string:Account:1002": { - "TH3": true, + "TH3": struct{}{}, }, } - if rcvIdx, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != nil { + if rcvIdx, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.EmptyString, false, false); err != nil { t.Error(err) } else { if !reflect.DeepEqual(eIdxes, rcvIdx) { t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) } } - eMp := utils.StringMap{ - "TH1": true, - "TH2": true, + eMp := map[string]utils.StringSet{ + "*string:Account:1001": { + "TH1": struct{}{}, + "TH2": struct{}{}, + }, } - if rcvMp, err := dataManager.MatchFilterIndex(utils.CacheThresholdFilterIndexes, th.Tenant, - utils.MetaString, utils.Account, "1001"); err != nil { + if rcvMp, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.ConcatenatedKey(utils.MetaString, utils.Account, "1001"), + true, true); err != nil { t.Error(err) } else if !reflect.DeepEqual(eMp, rcvMp) { t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp) @@ -910,7 +912,6 @@ func testITTestIndexingMetaNot(t *testing.T) { FilterIDs: []string{"*notstring:Account:1002", "*notstring:Balance:1000"}, ActionIDs: []string{}, } - rfi := NewFilterIndexer(onStor, utils.ThresholdProfilePrefix, th.Tenant) if err := dataManager.SetThresholdProfile(th, true); err != nil { t.Error(err) } @@ -920,28 +921,32 @@ func testITTestIndexingMetaNot(t *testing.T) { if err := dataManager.SetThresholdProfile(th3, true); err != nil { t.Error(err) } - eIdxes := map[string]utils.StringMap{ + eIdxes := map[string]utils.StringSet{ "*string:Account:1001": { - "TH1": true, + "TH1": struct{}{}, }, "*prefix:EventName:Name": { - "TH2": true, + "TH2": struct{}{}, }, } - if rcvIdx, err := dataManager.GetFilterIndexes( - utils.PrefixToIndexCache[rfi.itemType], rfi.dbKeySuffix, - utils.EmptyString, nil); err != nil { + if rcvIdx, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.EmptyString, false, false); err != nil { t.Error(err) } else { if !reflect.DeepEqual(eIdxes, rcvIdx) { t.Errorf("Expecting %+v, received: %+v", eIdxes, rcvIdx) } } - eMp := utils.StringMap{ - "TH1": true, + eMp := map[string]utils.StringSet{ + "*string:Account:1001": { + "TH1": struct{}{}, + }, } - if rcvMp, err := dataManager.MatchFilterIndex(utils.CacheThresholdFilterIndexes, th.Tenant, - utils.MetaString, utils.Account, "1001"); err != nil { + if rcvMp, err := dataManager.GetIndexes( + utils.CacheThresholdFilterIndexes, th.Tenant, + utils.ConcatenatedKey(utils.MetaString, utils.Account, "1001"), + true, true); err != nil { t.Error(err) } else if !reflect.DeepEqual(eMp, rcvMp) { t.Errorf("Expecting: %+v, received: %+v", eMp, rcvMp) diff --git a/engine/libcdre_test.go b/engine/libcdre_test.go index 86d30d978..c454a87a3 100644 --- a/engine/libcdre_test.go +++ b/engine/libcdre_test.go @@ -19,7 +19,6 @@ along with this program. If not, see package engine import ( - "fmt" "reflect" "testing" "time" @@ -119,7 +118,6 @@ func TestFileName(t *testing.T) { if rcv[:7] != "module|" { t.Errorf("Expecting: 'module|', received: %+v", rcv[:7]) } else if rcv[14:] != ".gob" { - fmt.Println(rcv) t.Errorf("Expecting: '.gob', received: %+v", rcv[14:]) } diff --git a/engine/libindex.go b/engine/libindex.go index c83aaab24..5ab230b5e 100644 --- a/engine/libindex.go +++ b/engine/libindex.go @@ -312,7 +312,7 @@ func splitFilterIndex(tntCtxIdxKey string) (tntCtx, idxKey string, err error) { // ComputeIndexes gets the indexes from tha DB and ensure that the items are indexed // getFilters returns a list of filters IDs for the given profile id func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string, - transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error)) (err error) { + transactionID string, getFilters func(tnt, id, ctx string) (*[]string, error)) (processed bool, err error) { var profilesIDs []string if IDs == nil { // get all items var ids []string @@ -348,6 +348,7 @@ func ComputeIndexes(dm *DataManager, tnt, ctx, idxItmType string, IDs *[]string, if err = dm.SetIndexes(idxItmType, tntCtx, index, cacheCommit(transactionID), transactionID); err != nil { return } + processed = true } return } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 305c54b85..67d0f9045 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -93,17 +93,10 @@ type DataDB interface { RemoveTimingDrv(string) error GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error) AddLoadHistory(*utils.LoadInstance, int, string) error - GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string, - fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) - SetFilterIndexesDrv(cacheID, itemIDPrefix string, - indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) - RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexes map[string]utils.StringSet, err error) SetIndexesDrv(idxItmType, tntCtx string, indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error) - MatchFilterIndexDrv(cacheID, itemIDPrefix, - filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) RemStatQueueProfileDrv(tenant, id string) (err error) diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 7865b886d..59c4d95ba 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -1097,109 +1097,6 @@ func (iDB *InternalDB) AddLoadHistory(*utils.LoadInstance, int, string) error { return nil } -func (iDB *InternalDB) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string, - fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) { - dbKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix - x, ok := iDB.db.Get(cacheID, dbKey) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - if len(fldNameVal) != 0 { - rcvidx := x.(map[string]utils.StringMap) - indexes = make(map[string]utils.StringMap) - for fldName, fldVal := range fldNameVal { - if _, has := indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)]; !has { - indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)] = make(utils.StringMap) - } - if len(rcvidx[utils.ConcatenatedKey(filterType, fldName, fldVal)]) != 0 { - for key := range rcvidx[utils.ConcatenatedKey(filterType, fldName, fldVal)] { - indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)][key] = true - } - } - } - return - } else { - indexes = x.(map[string]utils.StringMap) - if len(indexes) == 0 { - return nil, utils.ErrNotFound - } - } - return -} - -func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, itemIDPrefix string, - indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { - originKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix - dbKey := originKey - if transactionID != "" { - dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID) - } - if commit && transactionID != "" { - x, _ := iDB.db.Get(cacheID, dbKey) - iDB.db.Remove(cacheID, dbKey, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - iDB.db.Set(cacheID, originKey, x, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - return - } - var toBeDeleted []string - toBeAdded := make(map[string]utils.StringMap) - for key, strMp := range indexes { - if len(strMp) == 0 { // remove with no more elements inside - toBeDeleted = append(toBeDeleted, key) - delete(indexes, key) - continue - } - toBeAdded[key] = make(utils.StringMap) - toBeAdded[key] = strMp - } - - x, ok := iDB.db.Get(cacheID, dbKey) - if !ok || x == nil { - iDB.db.Set(cacheID, dbKey, toBeAdded, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - return err - } - - mp := x.(map[string]utils.StringMap) - for _, key := range toBeDeleted { - delete(mp, key) - } - for key, strMp := range toBeAdded { - if _, has := mp[key]; !has { - mp[key] = make(utils.StringMap) - } - mp[key] = strMp - } - iDB.db.Set(cacheID, dbKey, mp, nil, - cacheCommit(transactionID), transactionID) - return nil -} -func (iDB *InternalDB) RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) { - iDB.db.Remove(cacheID, utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - return -} - -func (iDB *InternalDB) MatchFilterIndexDrv(cacheID, itemIDPrefix, - filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) { - - x, ok := iDB.db.Get(cacheID, utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - - indexes := x.(map[string]utils.StringMap) - - if _, hasIt := indexes[utils.ConcatenatedKey(filterType, fieldName, fieldVal)]; hasIt { - itemIDs = indexes[utils.ConcatenatedKey(filterType, fieldName, fieldVal)] - } - if len(itemIDs) == 0 { - return nil, utils.ErrNotFound - } - return -} - func (iDB *InternalDB) GetStatQueueProfileDrv(tenant string, id string) (sq *StatQueueProfile, err error) { x, ok := iDB.db.Get(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(tenant, id)) if !ok || x == nil { @@ -1486,7 +1383,7 @@ func (iDB *InternalDB) SetIndexesDrv(idxItmType, tntCtx string, } iDB.db.Remove(idxItmType, dbKey, cacheCommit(utils.NonTransactional), utils.NonTransactional) - key := strings.TrimSuffix(strings.TrimPrefix(dbKey, "tmp_"), transactionID) + key := strings.TrimSuffix(strings.TrimPrefix(dbKey, "tmp_"), utils.CONCATENATED_KEY_SEP+transactionID) iDB.db.Set(idxItmType, key, x, []string{tntCtx}, cacheCommit(utils.NonTransactional), utils.NonTransactional) } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 6fa983962..742387e23 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -60,7 +60,6 @@ const ( ColLht = "load_history" ColVer = "versions" ColRsP = "resource_profiles" - ColRFI = "request_filter_indexes" ColIndx = "indexes" ColTmg = "timings" ColRes = "resources" @@ -284,7 +283,7 @@ func (ms *MongoStorage) ensureIndexesForCol(col string) (err error) { // exporte } err = nil switch col { - case ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColRFI: + case ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColIndx: if err = ms.enusureIndex(col, true, "key"); err != nil { return } @@ -351,7 +350,7 @@ func (ms *MongoStorage) EnsureIndexes(cols ...string) (err error) { } if ms.storageType == utils.DataDB { for _, col := range []string{ColAct, ColApl, ColAAp, ColAtr, - ColRpl, ColDst, ColRds, ColLht, ColRFI, ColRsP, ColRes, ColSqs, ColSqp, + ColRpl, ColDst, ColRds, ColLht, ColIndx, ColRsP, ColRes, ColSqs, ColSqp, ColTps, ColThs, ColRts, ColAttr, ColFlt, ColCpp, ColDpp, ColRpp, ColRpf, ColShg, ColAcc} { if err = ms.ensureIndexesForCol(col); err != nil { @@ -688,19 +687,19 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er case utils.DispatcherHostPrefix: result, err = ms.getField2(sctx, ColDph, utils.DispatcherHostPrefix, subject, tntID) case utils.AttributeFilterIndexes: - result, err = ms.getField3(sctx, ColRFI, utils.AttributeFilterIndexes, "key") + result, err = ms.getField3(sctx, ColIndx, utils.AttributeFilterIndexes, "key") case utils.ResourceFilterIndexes: - result, err = ms.getField3(sctx, ColRFI, utils.ResourceFilterIndexes, "key") + result, err = ms.getField3(sctx, ColIndx, utils.ResourceFilterIndexes, "key") case utils.StatFilterIndexes: - result, err = ms.getField3(sctx, ColRFI, utils.StatFilterIndexes, "key") + result, err = ms.getField3(sctx, ColIndx, utils.StatFilterIndexes, "key") case utils.ThresholdFilterIndexes: - result, err = ms.getField3(sctx, ColRFI, utils.ThresholdFilterIndexes, "key") + result, err = ms.getField3(sctx, ColIndx, utils.ThresholdFilterIndexes, "key") case utils.RouteFilterIndexes: - result, err = ms.getField3(sctx, ColRFI, utils.RouteFilterIndexes, "key") + result, err = ms.getField3(sctx, ColIndx, utils.RouteFilterIndexes, "key") case utils.ChargerFilterIndexes: - result, err = ms.getField3(sctx, ColRFI, utils.ChargerFilterIndexes, "key") + result, err = ms.getField3(sctx, ColIndx, utils.ChargerFilterIndexes, "key") case utils.DispatcherFilterIndexes: - result, err = ms.getField3(sctx, ColRFI, utils.DispatcherFilterIndexes, "key") + result, err = ms.getField3(sctx, ColIndx, utils.DispatcherFilterIndexes, "key") default: err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix) } @@ -1742,182 +1741,6 @@ func (ms *MongoStorage) RemoveTimingDrv(id string) (err error) { }) } -// GetFilterIndexesDrv retrieves Indexes from dataDB -//filterType is used togheter with fieldName:Val -func (ms *MongoStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string, - fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) { - type result struct { - Key string - Value []string - } - var results []result - dbKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix - if len(fldNameVal) != 0 { - for fldName, fldValue := range fldNameVal { - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(ColRFI).Find(sctx, bson.M{"key": utils.ConcatenatedKey(dbKey, filterType, fldName, fldValue)}) - if err != nil { - return err - } - for cur.Next(sctx) { - var elem result - if err := cur.Decode(&elem); err != nil { - return err - } - results = append(results, elem) - } - return cur.Close(sctx) - }); err != nil { - return nil, err - } - if len(results) == 0 { - return nil, utils.ErrNotFound - } - } - } else { - for _, character := range []string{".", "*"} { - dbKey = strings.Replace(dbKey, character, `\`+character, strings.Count(dbKey, character)) - } - //inside bson.RegEx add carrot to match the prefix (optimization) - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(ColRFI).Find(sctx, bson.M{"key": bsonx.Regex("^"+dbKey, "")}) - if err != nil { - return err - } - for cur.Next(sctx) { - var elem result - if err := cur.Decode(&elem); err != nil { - return err - } - results = append(results, elem) - } - return cur.Close(sctx) - }); err != nil { - return nil, err - } - if len(results) == 0 { - return nil, utils.ErrNotFound - } - } - indexes = make(map[string]utils.StringMap) - for _, res := range results { - if len(res.Value) == 0 { - continue - } - keys := strings.Split(res.Key, ":") // "cgrates.org:*sesions:*string:Subject:dan" - indexKey := utils.ConcatenatedKey(keys[1], keys[2], keys[3]) - //check here if itemIDPrefix has context - if len(strings.Split(itemIDPrefix, ":")) == 2 { - indexKey = utils.ConcatenatedKey(keys[2], keys[3], keys[4]) - } - indexes[indexKey] = utils.StringMapFromSlice(res.Value) - } - if len(indexes) == 0 { - return nil, utils.ErrNotFound - } - return indexes, nil -} - -// SetFilterIndexesDrv stores Indexes into DataDB -func (ms *MongoStorage) SetFilterIndexesDrv(cacheID, itemIDPrefix string, - indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { - originKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix - dbKey := originKey - if transactionID != "" { - dbKey = "tmp_" + utils.ConcatenatedKey(originKey, transactionID) - } - if commit && transactionID != "" { - regexKey := originKey - for _, character := range []string{".", "*"} { - regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character)) - } - //inside bson.RegEx add carrot to match the prefix (optimization) - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRFI).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+regexKey, "")}) - return err - }); err != nil { - return err - } - var lastErr error - for key, itmMp := range indexes { - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRFI).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(originKey, key)}, - bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(originKey, key), "value": itmMp.Slice()}}, - options.Update().SetUpsert(true), - ) - return err - }); err != nil { - lastErr = err - } - } - if lastErr != nil { - return lastErr - } - oldKey := "tmp_" + utils.ConcatenatedKey(originKey, transactionID) - for _, character := range []string{".", "*"} { - oldKey = strings.Replace(oldKey, character, `\`+character, strings.Count(oldKey, character)) - } - //inside bson.RegEx add carrot to match the prefix (optimization) - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRFI).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+oldKey, "")}) - return err - }) - } else { - var lastErr error - for key, itmMp := range indexes { - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - var action bson.M - if len(itmMp) == 0 { - action = bson.M{"$unset": bson.M{"value": 1}} - } else { - action = bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(dbKey, key), "value": itmMp.Slice()}} - } - _, err = ms.getCol(ColRFI).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(dbKey, key)}, - action, options.Update().SetUpsert(true), - ) - return err - }); err != nil { - lastErr = err - } - } - return lastErr - } -} - -func (ms *MongoStorage) RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) { - regexKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix - for _, character := range []string{".", "*"} { - regexKey = strings.Replace(regexKey, character, `\`+character, strings.Count(regexKey, character)) - } - //inside bson.RegEx add carrot to match the prefix (optimization) - return ms.query(func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(ColRFI).DeleteMany(sctx, bson.M{"key": bsonx.Regex("^"+regexKey, "")}) - return err - }) -} - -func (ms *MongoStorage) MatchFilterIndexDrv(cacheID, itemIDPrefix, - filterType, fldName, fldVal string) (itemIDs utils.StringMap, err error) { - var result struct { - Key string - Value []string - } - dbKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix - if err = ms.query(func(sctx mongo.SessionContext) (err error) { - cur := ms.getCol(ColRFI).FindOne(sctx, bson.M{"key": utils.ConcatenatedKey(dbKey, filterType, fldName, fldVal)}) - if err := cur.Decode(&result); err != nil { - if err == mongo.ErrNoDocuments { - return utils.ErrNotFound - } - return err - } - return nil - }); err != nil { - return nil, err - } - return utils.StringMapFromSlice(result.Value), nil -} - // GetStatQueueProfileDrv retrieves a StatQueueProfile from dataDB func (ms *MongoStorage) GetStatQueueProfileDrv(tenant string, id string) (sq *StatQueueProfile, err error) { sq = new(StatQueueProfile) @@ -2387,7 +2210,6 @@ func (ms *MongoStorage) GetIndexesDrv(idxItmType, tntCtx, idxKey string) (indexe //inside bson.RegEx add carrot to match the prefix (optimization) q = bson.M{"key": bsonx.Regex("^"+dbKey, utils.EmptyString)} } - indexes = make(map[string]utils.StringSet) if err = ms.query(func(sctx mongo.SessionContext) (err error) { cur, err := ms.getCol(ColIndx).Find(sctx, q) @@ -2439,9 +2261,12 @@ func (ms *MongoStorage) SetIndexesDrv(idxItmType, tntCtx string, result, err = ms.getField3(sctx, ColIndx, regexKey, "key") for _, key := range result { idxKey := strings.TrimPrefix(key, dbKey) + if _, err = ms.getCol(ColIndx).DeleteOne(sctx, + bson.M{"key": originKey + idxKey}); err != nil { //ensure we do not have dup + return err + } if _, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": key}, bson.M{"$set": bson.M{"key": originKey + idxKey}}, // only update the key - options.Update().SetUpsert(true), ); err != nil { return err } @@ -2454,15 +2279,16 @@ func (ms *MongoStorage) SetIndexesDrv(idxItmType, tntCtx string, var lastErr error for idxKey, itmMp := range indexes { if err = ms.query(func(sctx mongo.SessionContext) (err error) { - var action bson.M - if len(itmMp) == 0 { - action = bson.M{"$unset": bson.M{"value": 1}} + idxDbkey := utils.ConcatenatedKey(dbKey, idxKey) + if len(itmMp) == 0 { // remove from DB if we set it with empty indexes + _, err = ms.getCol(ColIndx).DeleteOne(sctx, + bson.M{"key": idxDbkey}) } else { - action = bson.M{"$set": bson.M{"key": utils.ConcatenatedKey(dbKey, idxKey), "value": itmMp.AsSlice()}} + _, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": idxDbkey}, + bson.M{"$set": bson.M{"key": idxDbkey, "value": itmMp.AsSlice()}}, + options.Update().SetUpsert(true), + ) } - _, err = ms.getCol(ColIndx).UpdateOne(sctx, bson.M{"key": utils.ConcatenatedKey(dbKey, idxKey)}, - action, options.Update().SetUpsert(true), - ) return err }); err != nil { lastErr = err diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 284a7f075..4a82dbdd9 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1212,99 +1212,6 @@ func (rs *RedisStorage) RemoveTimingDrv(id string) (err error) { return } -//GetFilterIndexesDrv retrieves Indexes from dataDB -//filterType is used togheter with fieldName:Val -func (rs *RedisStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string, - fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) { - mp := make(map[string]string) - dbKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix - if len(fldNameVal) == 0 { - mp, err = rs.Cmd(redis_HGETALL, dbKey).Map() - if err != nil { - return - } else if len(mp) == 0 { - return nil, utils.ErrNotFound - } - } else { - var itmMpStrLst []string - for fldName, fldVal := range fldNameVal { - concatTypeNameVal := utils.ConcatenatedKey(filterType, fldName, fldVal) - itmMpStrLst, err = rs.Cmd(redis_HMGET, dbKey, concatTypeNameVal).List() - if err != nil { - return - } else if itmMpStrLst[0] == "" { - return nil, utils.ErrNotFound - } - mp[concatTypeNameVal] = itmMpStrLst[0] - } - } - indexes = make(map[string]utils.StringMap) - for k, v := range mp { - var sm utils.StringMap - if err = rs.ms.Unmarshal([]byte(v), &sm); err != nil { - return - } - indexes[k] = sm - } - return -} - -//SetFilterIndexesDrv stores Indexes into DataDB -func (rs *RedisStorage) SetFilterIndexesDrv(cacheID, itemIDPrefix string, - indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { - originKey := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix - dbKey := originKey - if transactionID != "" { - dbKey = "tmp_" + utils.ConcatenatedKey(dbKey, transactionID) - } - if commit && transactionID != "" { - return rs.Cmd(redis_RENAME, dbKey, originKey).Err - } else { - mp := make(map[string]string) - nameValSls := []interface{}{dbKey} - for key, strMp := range indexes { - if len(strMp) == 0 { // remove with no more elements inside - nameValSls = append(nameValSls, key) - continue - } - if encodedMp, err := rs.ms.Marshal(strMp); err != nil { - return err - } else { - mp[key] = string(encodedMp) - } - } - if len(nameValSls) != 1 { - if err = rs.Cmd(redis_HDEL, nameValSls...).Err; err != nil { - return err - } - } - if len(mp) != 0 { - return rs.Cmd(redis_HMSET, dbKey, mp).Err - } - return - } -} - -func (rs *RedisStorage) RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) { - return rs.Cmd(redis_DEL, utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix).Err -} - -func (rs *RedisStorage) MatchFilterIndexDrv(cacheID, itemIDPrefix, - filterType, fldName, fldVal string) (itemIDs utils.StringMap, err error) { - fieldValKey := utils.ConcatenatedKey(filterType, fldName, fldVal) - fldValBytes, err := rs.Cmd(redis_HGET, - utils.CacheInstanceToPrefix[cacheID]+itemIDPrefix, fieldValKey).Bytes() - if err != nil { - if err == redis.ErrRespNil { // did not find the destination - err = utils.ErrNotFound - } - return nil, err - } else if err = rs.ms.Unmarshal(fldValBytes, &itemIDs); err != nil { - return - } - return -} - func (rs *RedisStorage) GetVersions(itm string) (vrs Versions, err error) { if itm != "" { fldVal, err := rs.Cmd(redis_HGET, utils.TBLVersions, itm).Str() diff --git a/ers/partial_csv_it_test.go b/ers/partial_csv_it_test.go index f3f0e0ad9..444bc9930 100644 --- a/ers/partial_csv_it_test.go +++ b/ers/partial_csv_it_test.go @@ -21,7 +21,6 @@ along with this program. If not, see package ers import ( - "fmt" "io/ioutil" "net/rpc" "os" @@ -94,7 +93,6 @@ func testPartITInitConfig(t *testing.T) { var err error partCfgPath = path.Join(*dataDir, "conf", "samples", partCfgDIR) if partCfg, err = config.NewCGRConfigFromPath(partCfgPath); err != nil { - fmt.Println(err) t.Fatal("Got config error: ", err.Error()) } } diff --git a/migrator/alias_it_test.go b/migrator/alias_it_test.go index e9fd07795..a533c7c63 100644 --- a/migrator/alias_it_test.go +++ b/migrator/alias_it_test.go @@ -206,16 +206,17 @@ func testAlsITMigrateAndMove(t *testing.T) { t.Error("Error should be not found : ", err) } - expAlsIdx := map[string]utils.StringMap{ - "*string:~*req.Account:1001": utils.StringMap{ - "*out:*any:*any:1001:call_1001:*rated": true, + expAlsIdx := map[string]utils.StringSet{ + "*string:~*req.Account:1001": { + "*out:*any:*any:1001:call_1001:*rated": struct{}{}, }, - "*string:~*req.Subject:call_1001": utils.StringMap{ - "*out:*any:*any:1001:call_1001:*rated": true, + "*string:~*req.Subject:call_1001": { + "*out:*any:*any:1001:call_1001:*rated": struct{}{}, }, } - if alsidx, err := alsMigrator.dmOut.DataManager().GetFilterIndexes(utils.PrefixToIndexCache[utils.AttributeProfilePrefix], - utils.ConcatenatedKey("cgrates.org", utils.META_ANY), utils.MetaString, nil); err != nil { + if alsidx, err := alsMigrator.dmOut.DataManager().GetIndexes( + utils.CacheAttributeFilterIndexes, utils.ConcatenatedKey("cgrates.org", utils.META_ANY), + "", false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(expAlsIdx, alsidx) { t.Errorf("Expected %v, recived: %v", utils.ToJSON(expAlsIdx), utils.ToJSON(alsidx)) diff --git a/migrator/derived_chargers_it_test.go b/migrator/derived_chargers_it_test.go index 631ceaae2..1280d2604 100644 --- a/migrator/derived_chargers_it_test.go +++ b/migrator/derived_chargers_it_test.go @@ -227,25 +227,28 @@ func testDCITMigrateAndMove(t *testing.T) { if _, err = dcMigrator.dmIN.getV1DerivedChargers(); err != utils.ErrNoMoreData { t.Error("Error should be not found : ", err) } - expDcIdx := map[string]utils.StringMap{ - "*string:~*req.Account:1003": utils.StringMap{ - "*out:cgrates.org:*any:1003:*any_0": true, + expDcIdx := map[string]utils.StringSet{ + "*string:~*req.Account:1003": { + "*out:cgrates.org:*any:1003:*any_0": struct{}{}, }, } - if dcidx, err := dcMigrator.dmOut.DataManager().GetFilterIndexes(utils.PrefixToIndexCache[utils.AttributeProfilePrefix], - utils.ConcatenatedKey("cgrates.org", utils.MetaChargers), utils.MetaString, nil); err != nil { + if dcidx, err := dcMigrator.dmOut.DataManager().GetIndexes( + utils.PrefixToIndexCache[utils.AttributeProfilePrefix], + utils.ConcatenatedKey("cgrates.org", utils.MetaChargers), + "", true, true); err != nil { t.Error(err) } else if !reflect.DeepEqual(expDcIdx, dcidx) { t.Errorf("Expected %v, recived: %v", utils.ToJSON(expDcIdx), utils.ToJSON(dcidx)) } - expDcIdx = map[string]utils.StringMap{ - "*string:~*req.Account:1003": utils.StringMap{ - "*out:cgrates.org:*any:1003:*any_0": true, + expDcIdx = map[string]utils.StringSet{ + "*string:~*req.Account:1003": { + "*out:cgrates.org:*any:1003:*any_0": struct{}{}, }, } - if dcidx, err := dcMigrator.dmOut.DataManager().GetFilterIndexes(utils.PrefixToIndexCache[utils.ChargerProfilePrefix], + if dcidx, err := dcMigrator.dmOut.DataManager().GetIndexes( + utils.PrefixToIndexCache[utils.ChargerProfilePrefix], utils.ConcatenatedKey("cgrates.org", utils.MetaChargers), - utils.MetaString, nil); err == nil || err.Error() != utils.ErrNotFound.Error() { + "", true, true); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Errorf("Expected error %v, recived: %v with reply: %v", utils.ErrNotFound, err, utils.ToJSON(dcidx)) } diff --git a/migrator/filters_it_test.go b/migrator/filters_it_test.go index 1fddf729a..1d0cf3877 100644 --- a/migrator/filters_it_test.go +++ b/migrator/filters_it_test.go @@ -232,11 +232,14 @@ func testFltrITMigrateAndMove(t *testing.T) { if !reflect.DeepEqual(*expAttrProf, *resultattr) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(expAttrProf), utils.ToJSON(resultattr)) } - expFltrIdx := map[string]utils.StringMap{ - "*prefix:~*req.Account:1001": {"ATTR_1": true}, - "*string:~*req.Account:1001": {"ATTR_1": true}} + expFltrIdx := map[string]utils.StringSet{ + "*prefix:~*req.Account:1001": {"ATTR_1": struct{}{}}, + "*string:~*req.Account:1001": {"ATTR_1": struct{}{}}} - if fltridx, err := fltrMigrator.dmOut.DataManager().GetFilterIndexes(utils.PrefixToIndexCache[utils.AttributeProfilePrefix], utils.ConcatenatedKey(attrProf.Tenant, utils.META_ANY), utils.MetaString, nil); err != nil { + if fltridx, err := fltrMigrator.dmOut.DataManager().GetIndexes( + utils.CacheAttributeFilterIndexes, + utils.ConcatenatedKey(attrProf.Tenant, utils.META_ANY), + "", false, false); err != nil { t.Error(err) } else if !reflect.DeepEqual(expFltrIdx, fltridx) { t.Errorf("Expected %v, recived: %v", utils.ToJSON(expFltrIdx), utils.ToJSON(fltridx)) @@ -398,12 +401,15 @@ func testFltrITMigratev2(t *testing.T) { if !reflect.DeepEqual(*expAttrProf, *resultAttr) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(expAttrProf), utils.ToJSON(resultAttr)) } - expFltrIdx := map[string]utils.StringMap{ - "*string:~*req.Account:1001": utils.StringMap{"ATTR_1": true}, - "*string:~*req.Subject:1001": utils.StringMap{"ATTR_1": true}, + expFltrIdx := map[string]utils.StringSet{ + "*string:~*req.Account:1001": {"ATTR_1": struct{}{}}, + "*string:~*req.Subject:1001": {"ATTR_1": struct{}{}}, } - if fltridx, err := fltrMigrator.dmOut.DataManager().GetFilterIndexes(utils.PrefixToIndexCache[utils.AttributeProfilePrefix], utils.ConcatenatedKey(attrProf.Tenant, utils.META_ANY), utils.MetaString, nil); err != nil { + if fltridx, err := fltrMigrator.dmOut.DataManager().GetIndexes( + utils.CacheAttributeFilterIndexes, + utils.ConcatenatedKey(attrProf.Tenant, utils.META_ANY), + "", false, true); err != nil { t.Error(err) } else if !reflect.DeepEqual(expFltrIdx, fltridx) { t.Errorf("Expected %v, recived: %v", utils.ToJSON(expFltrIdx), utils.ToJSON(fltridx)) diff --git a/migrator/user_it_test.go b/migrator/user_it_test.go index beacb6703..0e47d7771 100644 --- a/migrator/user_it_test.go +++ b/migrator/user_it_test.go @@ -193,13 +193,15 @@ func testUsrITMigrateAndMove(t *testing.T) { t.Error("Error should be not found : ", err) } - expUsrIdx := map[string]utils.StringMap{ - "*string:~Account:1002": utils.StringMap{ - "1001": true, + expUsrIdx := map[string]utils.StringSet{ + "*string:~Account:1002": { + "1001": struct{}{}, }, } - if usridx, err := usrMigrator.dmOut.DataManager().GetFilterIndexes(utils.PrefixToIndexCache[utils.AttributeProfilePrefix], - utils.ConcatenatedKey("cgrates.org", utils.META_ANY), utils.MetaString, nil); err != nil { + if usridx, err := usrMigrator.dmOut.DataManager().GetIndexes( + utils.PrefixToIndexCache[utils.AttributeProfilePrefix], + utils.ConcatenatedKey("cgrates.org", utils.META_ANY), + "", true, true); err != nil { t.Error(err) } else if !reflect.DeepEqual(expUsrIdx, usridx) { t.Errorf("Expected %v, recived: %v", utils.ToJSON(expUsrIdx), utils.ToJSON(usridx)) diff --git a/utils/consts.go b/utils/consts.go index 744ff3f05..4d195d80c 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1029,11 +1029,8 @@ const ( ReplicatorSv1GetRateProfile = "ReplicatorSv1.GetRateProfile" ReplicatorSv1GetDispatcherHost = "ReplicatorSv1.GetDispatcherHost" ReplicatorSv1GetItemLoadIDs = "ReplicatorSv1.GetItemLoadIDs" - ReplicatorSv1GetFilterIndexes = "ReplicatorSv1.GetFilterIndexes" - ReplicatorSv1MatchFilterIndex = "ReplicatorSv1.MatchFilterIndex" ReplicatorSv1SetThresholdProfile = "ReplicatorSv1.SetThresholdProfile" ReplicatorSv1SetThreshold = "ReplicatorSv1.SetThreshold" - ReplicatorSv1SetFilterIndexes = "ReplicatorSv1.SetFilterIndexes" ReplicatorSv1SetAccount = "ReplicatorSv1.SetAccount" ReplicatorSv1SetDestination = "ReplicatorSv1.SetDestination" ReplicatorSv1SetReverseDestination = "ReplicatorSv1.SetReverseDestination" diff --git a/utils/coreutils.go b/utils/coreutils.go index 47445d2a2..4bc274bad 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -860,45 +860,6 @@ func LongExecTimeDetector(logID string, maxDur time.Duration) (endchan chan stru return } -type GetFilterIndexesArg struct { - CacheID string - ItemIDPrefix string - FilterType string - FldNameVal map[string]string -} - -type MatchFilterIndexArg struct { - CacheID string - ItemIDPrefix string - FilterType string - FieldName string - FieldVal string -} - -type SetFilterIndexesArg struct { - CacheID string - ItemIDPrefix string - Indexes map[string]StringMap -} - -type GetFilterIndexesArgWithArgDispatcher struct { - *GetFilterIndexesArg - TenantArg - *ArgDispatcher -} - -type MatchFilterIndexArgWithArgDispatcher struct { - *MatchFilterIndexArg - TenantArg - *ArgDispatcher -} - -type SetFilterIndexesArgWithArgDispatcher struct { - *SetFilterIndexesArg - TenantArg - *ArgDispatcher -} - type StringWithApiKey struct { *ArgDispatcher TenantArg