From 5ad2a1753d6d3607afb2737a67b34f2d1a2a87b0 Mon Sep 17 00:00:00 2001 From: edwardro22 Date: Wed, 3 Jan 2018 10:24:18 +0200 Subject: [PATCH] Added compile indexes for threshold with test --- apier/v1/attributes.go | 2 +- apier/v1/filter_indexes.go | 334 +++++++++++++++++++ apier/v1/filter_indexes_it_test.go | 510 +++++++++++++++++++++++++++++ apier/v1/resourcesv1.go | 2 +- apier/v1/stats.go | 2 +- apier/v1/suppliers.go | 2 +- cmd/cgr-migrator/cgr-migrator.go | 4 +- console/command_executer.go | 7 +- console/compute_filter_indexes.go | 64 ++++ engine/attributes_test.go | 2 +- engine/datamanager.go | 115 ++++++- engine/filterindexer.go | 5 + engine/onstor_it_test.go | 18 +- engine/storage_mongo_datadb.go | 49 ++- engine/storage_redis.go | 6 +- engine/suppliers_test.go | 13 +- engine/tp_reader.go | 8 +- migrator/migrator_it_test.go | 2 +- migrator/resource.go | 2 +- migrator/stats.go | 4 +- migrator/suppliers.go | 2 +- utils/apitpdata.go | 9 + utils/consts.go | 5 + 23 files changed, 1101 insertions(+), 66 deletions(-) create mode 100644 apier/v1/filter_indexes.go create mode 100644 apier/v1/filter_indexes_it_test.go create mode 100644 console/compute_filter_indexes.go diff --git a/apier/v1/attributes.go b/apier/v1/attributes.go index 79f768d46..3c90107d1 100644 --- a/apier/v1/attributes.go +++ b/apier/v1/attributes.go @@ -46,7 +46,7 @@ func (apierV1 *ApierV1) SetAttributeProfile(extAls *engine.ExternalAttributeProf return utils.NewErrMandatoryIeMissing(missing...) } alsPrf := extAls.AsAttributeProfile() - if err := apierV1.DataManager.SetAttributeProfile(alsPrf); err != nil { + if err := apierV1.DataManager.SetAttributeProfile(alsPrf, true); err != nil { return utils.APIErrorHandler(err) } cache.RemKey(utils.AttributeProfilePrefix+utils.ConcatenatedKey(extAls.Tenant, extAls.ID), true, "") // ToDo: Remove here with autoreload diff --git a/apier/v1/filter_indexes.go b/apier/v1/filter_indexes.go new file mode 100644 index 000000000..e1d5ac318 --- /dev/null +++ b/apier/v1/filter_indexes.go @@ -0,0 +1,334 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package v1 + +import ( + "fmt" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "strings" +) + +/* +type ArgsComputeFilterIndexes struct { + Tenant string + AttributeIDs *[]string + ResourceIDs *[]string + StatIDs *[]string + SupplierIDs *[]string + ThresholdIDs *[]string +} +*/ + +func (self *ApierV1) ComputeFilterIndexes(args utils.ArgsComputeFilterIndexes, reply *string) error { + //ThresholdProfile Indexes + var thresholdIDs []string + thdsIndexers := engine.NewReqFilterIndexer(self.DataManager, utils.ThresholdProfilePrefix, args.Tenant) + if args.ThresholdIDs == nil { + ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.ThresholdProfilePrefix) + if err != nil { + return utils.APIErrorHandler(err) + } + for _, id := range ids { + thresholdIDs = append(thresholdIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1]) + } + } else { + thresholdIDs = *args.ThresholdIDs + } + + for _, id := range thresholdIDs { + th, err := self.DataManager.GetThresholdProfile(args.Tenant, id, false, utils.NonTransactional) + if err != nil { + return utils.APIErrorHandler(err) + } + for _, fltrID := range th.FilterIDs { + fltr, err := self.DataManager.GetFilter(args.Tenant, fltrID, false, utils.NonTransactional) + if err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, th) + } + return utils.APIErrorHandler(err) + } else { + tpFltr := engine.FilterToTPFilter(fltr) + thdsIndexers.IndexTPFilter(tpFltr, th.ID) + } + } + } + if args.ThresholdIDs == nil { + if err := self.DataManager.RemoveFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, + args.Tenant, false)); err != nil { + if err != utils.ErrNotFound { + return utils.APIErrorHandler(err) + } + } + if err := self.DataManager.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, + args.Tenant, true), ""); err != nil { + if err != utils.ErrNotFound { + return utils.APIErrorHandler(err) + } + } + } else { + indexRemover := engine.NewReqFilterIndexer(self.DataManager, utils.ThresholdProfilePrefix, args.Tenant) + for _, id := range thresholdIDs { + if err := indexRemover.RemoveItemFromIndex(id); err != nil && + err.Error() != utils.ErrNotFound.Error() { + return utils.APIErrorHandler(err) + } + } + } + if err := thdsIndexers.StoreIndexes(); err != nil { + return utils.APIErrorHandler(err) + } + //StatQueueProfile Indexes + + var statIDs []string + sqpIndexers := engine.NewReqFilterIndexer(self.DataManager, utils.StatQueueProfilePrefix, args.Tenant) + if args.StatIDs == nil { + ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix) + if err != nil { + return utils.APIErrorHandler(err) + } + for _, id := range ids { + err = sqpIndexers.RemoveItemFromIndex(strings.Split(id, utils.CONCATENATED_KEY_SEP)[1]) + if err != nil && err.Error() != utils.ErrNotFound.Error() { + return utils.APIErrorHandler(err) + } + statIDs = append(statIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1]) + } + } else { + statIDs = *args.StatIDs + } + for _, id := range statIDs { + sqp, err := self.DataManager.GetStatQueueProfile(args.Tenant, id, false, utils.NonTransactional) + if err != nil { + return utils.APIErrorHandler(err) + } + for _, fltrID := range sqp.FilterIDs { + fltr, err := self.DataManager.GetFilter(args.Tenant, fltrID, false, utils.NonTransactional) + if err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for stats queue: %+v", fltrID, sqp) + } + return utils.APIErrorHandler(err) + } else { + tpFltr := engine.FilterToTPFilter(fltr) + sqpIndexers.IndexTPFilter(tpFltr, sqp.ID) + } + } + } + if args.StatIDs == nil { + if err := self.DataManager.RemoveFilterIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, + args.Tenant, false)); err != nil { + return utils.APIErrorHandler(err) + } + if err := self.DataManager.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix, + args.Tenant, true), ""); err != nil { + return utils.APIErrorHandler(err) + } + + } else { + indexRemover := engine.NewReqFilterIndexer(self.DataManager, utils.StatQueueProfilePrefix, args.Tenant) + for _, id := range statIDs { + if err := indexRemover.RemoveItemFromIndex(id); err != nil && + err.Error() != utils.ErrNotFound.Error() { + return utils.APIErrorHandler(err) + } + } + } + if err := sqpIndexers.StoreIndexes(); err != nil { + return err + } + //ResourceProfile Indexes + var resourceIDs []string + rpIndexers := engine.NewReqFilterIndexer(self.DataManager, utils.ResourceProfilesPrefix, args.Tenant) + if args.ResourceIDs == nil { + ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.ResourceProfilesPrefix) + if err != nil { + return utils.APIErrorHandler(err) + } + for _, id := range ids { + err = rpIndexers.RemoveItemFromIndex(strings.Split(id, utils.CONCATENATED_KEY_SEP)[1]) + if err != nil && err.Error() != utils.ErrNotFound.Error() { + return utils.APIErrorHandler(err) + } + resourceIDs = append(resourceIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1]) + } + } else { + resourceIDs = *args.ResourceIDs + } + for _, id := range resourceIDs { + rp, err := self.DataManager.GetResourceProfile(args.Tenant, id, false, utils.NonTransactional) + if err != nil { + return utils.APIErrorHandler(err) + } + for _, fltrID := range rp.FilterIDs { + fltr, err := self.DataManager.GetFilter(args.Tenant, fltrID, false, utils.NonTransactional) + if err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for stats queue: %+v", fltrID, rp) + } + return utils.APIErrorHandler(err) + } else { + tpFltr := engine.FilterToTPFilter(fltr) + rpIndexers.IndexTPFilter(tpFltr, rp.ID) + + } + } + } + if args.ResourceIDs == nil { + if err := self.DataManager.RemoveFilterIndexes(engine.GetDBIndexKey(utils.ResourceProfilesPrefix, + args.Tenant, false)); err != nil { + return utils.APIErrorHandler(err) + } + if err := self.DataManager.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.ResourceProfilesPrefix, + args.Tenant, true), ""); err != nil { + return utils.APIErrorHandler(err) + } + } else { + indexRemover := engine.NewReqFilterIndexer(self.DataManager, utils.ResourceProfilesPrefix, args.Tenant) + for _, id := range resourceIDs { + if err := indexRemover.RemoveItemFromIndex(id); err != nil && + err.Error() != utils.ErrNotFound.Error() { + return utils.APIErrorHandler(err) + } + } + } + if err := rpIndexers.StoreIndexes(); err != nil { + return err + } + //SupplierProfile Indexes + var supplierIDs []string + sppIndexers := engine.NewReqFilterIndexer(self.DataManager, utils.SupplierProfilePrefix, args.Tenant) + if args.SupplierIDs == nil { + ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.SupplierProfilePrefix) + if err != nil { + return utils.APIErrorHandler(err) + } + for _, id := range ids { + err = sppIndexers.RemoveItemFromIndex(strings.Split(id, utils.CONCATENATED_KEY_SEP)[1]) + if err != nil && err.Error() != utils.ErrNotFound.Error() { + return utils.APIErrorHandler(err) + } + supplierIDs = append(supplierIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1]) + } + } else { + supplierIDs = *args.SupplierIDs + } + for _, id := range supplierIDs { + spp, err := self.DataManager.GetSupplierProfile(args.Tenant, id, false, utils.NonTransactional) + if err != nil { + return utils.APIErrorHandler(err) + } + for _, fltrID := range spp.FilterIDs { + fltr, err := self.DataManager.GetFilter(args.Tenant, fltrID, false, utils.NonTransactional) + if err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for stats queue: %+v", fltrID, spp) + } + return utils.APIErrorHandler(err) + } else { + tpFltr := engine.FilterToTPFilter(fltr) + sppIndexers.IndexTPFilter(tpFltr, spp.ID) + } + } + } + if args.SupplierIDs == nil { + if err := self.DataManager.RemoveFilterIndexes(engine.GetDBIndexKey(utils.SupplierProfilePrefix, + args.Tenant, false)); err != nil { + return utils.APIErrorHandler(err) + } + if err := self.DataManager.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.SupplierProfilePrefix, + args.Tenant, true), ""); err != nil { + return utils.APIErrorHandler(err) + } + + } else { + indexRemover := engine.NewReqFilterIndexer(self.DataManager, utils.SupplierProfilePrefix, args.Tenant) + for _, id := range resourceIDs { + if err := indexRemover.RemoveItemFromIndex(id); err != nil && + err.Error() != utils.ErrNotFound.Error() { + return utils.APIErrorHandler(err) + } + } + } + if err := sppIndexers.StoreIndexes(); err != nil { + return err + } + //AttributeProfile Indexes + var attributeIDs []string + attrIndexers := engine.NewReqFilterIndexer(self.DataManager, utils.AttributeProfilePrefix, args.Tenant) + if args.AttributeIDs == nil { + ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.AttributeProfilePrefix) + if err != nil { + return utils.APIErrorHandler(err) + } + for _, id := range ids { + err = attrIndexers.RemoveItemFromIndex(strings.Split(id, utils.CONCATENATED_KEY_SEP)[1]) + if err != nil && err.Error() != utils.ErrNotFound.Error() { + return utils.APIErrorHandler(err) + } + attributeIDs = append(attributeIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1]) + } + } else { + attributeIDs = *args.AttributeIDs + } + for _, id := range attributeIDs { + ap, err := self.DataManager.GetAttributeProfile(args.Tenant, id, false, utils.NonTransactional) + if err != nil { + return utils.APIErrorHandler(err) + } + for _, fltrID := range ap.FilterIDs { + fltr, err := self.DataManager.GetFilter(args.Tenant, fltrID, false, utils.NonTransactional) + if err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for stats queue: %+v", fltrID, ap) + } + return utils.APIErrorHandler(err) + } else { + tpFltr := engine.FilterToTPFilter(fltr) + attrIndexers.IndexTPFilter(tpFltr, ap.ID) + + } + } + } + if args.AttributeIDs == nil { + if err := self.DataManager.RemoveFilterIndexes(engine.GetDBIndexKey(utils.AttributeProfilePrefix, + args.Tenant, false)); err != nil { + return utils.APIErrorHandler(err) + } + if err := self.DataManager.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.AttributeProfilePrefix, + args.Tenant, true), ""); err != nil { + return utils.APIErrorHandler(err) + } + + } else { + indexRemover := engine.NewReqFilterIndexer(self.DataManager, utils.AttributeProfilePrefix, args.Tenant) + for _, id := range attributeIDs { + if err := indexRemover.RemoveItemFromIndex(id); err != nil && + err.Error() != utils.ErrNotFound.Error() { + return utils.APIErrorHandler(err) + } + } + } + if err := attrIndexers.StoreIndexes(); err != nil { + return err + } + *reply = utils.OK + return nil +} diff --git a/apier/v1/filter_indexes_it_test.go b/apier/v1/filter_indexes_it_test.go new file mode 100644 index 000000000..6ecb7d2e1 --- /dev/null +++ b/apier/v1/filter_indexes_it_test.go @@ -0,0 +1,510 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FIdxTNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package v1 + +import ( + // "net/rpc" + "fmt" + "net/rpc/jsonrpc" + "path" + "reflect" + "testing" + "time" + // "log" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + rdsITdb *engine.RedisStorage + mgoITdb *engine.MongoStorage + onStor *engine.DataManager + err error + indexes map[string]utils.StringMap +) + +var sTestsFilterIndexesSV1 = []func(t *testing.T){ + testFlush, + testV1FIdxLoadConfig, + testV1FIdxdxInitDataDb, + testV1FIdxResetStorDb, + testV1FIdxStartEngine, + testV1FIdxRpcConn, + testV1FIdxSetThresholdProfile, + testV1FIdxComputeThresholdsIndexes, + testV1FIdxSetSecondThresholdProfile, + testV1FIdxSecondComputeThresholdsIndexes, + //to add testV1TSGetThresholdsAfterRestart, + // testV1FIdxSetThresholdProfile, + // testV1FIdxUpdateThresholdProfile, + // testV1FIdxRemoveThresholdProfile, + testV1FIdxStopEngine, +} + +func TestFIdxV1ITMySQLConnect(t *testing.T) { + cfg, _ := config.NewDefaultCGRConfig() + rdsITdb, err = engine.NewRedisStorage(fmt.Sprintf("%s:%s", cfg.DataDbHost, cfg.DataDbPort), 10, + cfg.DataDbPass, cfg.DBDataEncoding, utils.REDIS_MAX_CONNS, nil, 1) + + if err != nil { + t.Fatal("Could not connect to Redis", err.Error()) + } +} + +// Test start here +func TestFIdxV1ITMySQL(t *testing.T) { + onStor = engine.NewDataManager(rdsITdb) + tSv1ConfDIR = "tutmysql" + for _, stest := range sTestsFilterIndexesSV1 { + t.Run(tSv1ConfDIR, stest) + } +} + +func TestFIdxV1ITMongoConnect(t *testing.T) { + cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "tutmongo") + mgoITCfg, err := config.NewCGRConfigFromFolder(cdrsMongoCfgPath) + if err != nil { + t.Fatal(err) + } + if mgoITdb, err = engine.NewMongoStorage(mgoITCfg.DataDbHost, mgoITCfg.DataDbPort, + mgoITCfg.DataDbName, mgoITCfg.DataDbUser, mgoITCfg.DataDbPass, + utils.DataDB, nil, mgoITCfg.CacheCfg(), mgoITCfg.LoadHistorySize); err != nil { + t.Fatal(err) + } +} + +func TestFIdxV1ITMongo(t *testing.T) { + onStor = engine.NewDataManager(mgoITdb) + tSv1ConfDIR = "tutmongo" + time.Sleep(time.Duration(2 * time.Second)) // give time for engine to start + for _, stest := range sTestsFilterIndexesSV1 { + t.Run(tSv1ConfDIR, stest) + } +} + +func testV1FIdxLoadConfig(t *testing.T) { + var err error + tSv1CfgPath = path.Join(*dataDir, "conf", "samples", tSv1ConfDIR) + if tSv1Cfg, err = config.NewCGRConfigFromFolder(tSv1CfgPath); err != nil { + t.Error(err) + } + switch tSv1ConfDIR { + case "tutmongo": // Mongo needs more time to reset db, need to investigate + thdsDelay = 4000 + default: + thdsDelay = 1000 + } +} + +func testV1FIdxdxInitDataDb(t *testing.T) { + if err := engine.InitDataDb(tSv1Cfg); err != nil { + t.Fatal(err) + } +} + +// Wipe out the cdr database +func testV1FIdxResetStorDb(t *testing.T) { + if err := engine.InitStorDb(tSv1Cfg); err != nil { + t.Fatal(err) + } +} + +func testFlush(t *testing.T) { + onStor.DataDB().Flush("") + if err := engine.SetDBVersions(onStor.DataDB()); err != nil { + t.Error("Error ", err.Error()) + } +} + +func testV1FIdxStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(tSv1CfgPath, thdsDelay); err != nil { + t.Fatal(err) + } +} + +func testV1FIdxRpcConn(t *testing.T) { + var err error + tSv1Rpc, err = jsonrpc.Dial("tcp", tSv1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +func testV1FIdxSetThresholdProfile(t *testing.T) { + tenant := "cgrates.org" + var reply *engine.ThresholdProfile + filter = &engine.Filter{ + Tenant: tenant, + ID: "TestFilter", + RequestFilters: []*engine.RequestFilter{ + &engine.RequestFilter{ + FieldName: "Account", + Type: "*string", + Values: []string{"1001"}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + } + + var result string + if err := tSv1Rpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + tPrfl = &engine.ThresholdProfile{ + Tenant: tenant, + ID: "TEST_PROFILE1", + FilterIDs: []string{"TestFilter"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), + ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), + }, + Recurrent: true, + MinSleep: time.Duration(5 * time.Minute), + Blocker: false, + Weight: 20.0, + ActionIDs: []string{"ACT_1", "ACT_2"}, + Async: true, + } + if err := tSv1Rpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tPrfl, reply) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl, reply) + } + if err = onStor.RemoveFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, + tenant, false)); err != nil { + t.Error(err) + } + if err := onStor.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, + tenant, true), ""); err != nil { + t.Error(err) + } + if indexes, err = onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, tenant, false), + nil); err != utils.ErrNotFound { + t.Error(err) + } + +} + +func testV1FIdxComputeThresholdsIndexes(t *testing.T) { + tenant := "cgrates.org" + emptySlice := []string{} + var reply2 string + if err := tSv1Rpc.Call(utils.ComputeFilterIndexes, utils.ArgsComputeFilterIndexes{ + Tenant: "cgrates.org", + ThresholdIDs: nil, + AttributeIDs: &emptySlice, + ResourceIDs: &emptySlice, + StatIDs: &emptySlice, + SupplierIDs: &emptySlice, + }, &reply2); err != nil { + t.Error(err) + } + if reply2 != utils.OK { + t.Errorf("Error: %+v", reply2) + } + expectedIDX := map[string]utils.StringMap{"Account:1001": {"TEST_PROFILE1": true}} + indexes, err := onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, tenant, false), nil) + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(expectedIDX, indexes) { + t.Errorf("Expecting: %+v, received: %+v", expectedIDX, utils.ToJSON(indexes)) + } + expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1001": true}} + indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, tenant, true), nil) + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(expectedRevIDX, indexes) { + t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, utils.ToJSON(indexes)) + } +} + +func testV1FIdxSetSecondThresholdProfile(t *testing.T) { + tenant := "cgrates.org" + var reply *engine.ThresholdProfile + filter = &engine.Filter{ + Tenant: tenant, + ID: "TestFilter2", + RequestFilters: []*engine.RequestFilter{ + &engine.RequestFilter{ + FieldName: "Account", + Type: "*string", + Values: []string{"1001"}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + } + + var result string + if err := tSv1Rpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE2"}, &reply); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + tPrfl = &engine.ThresholdProfile{ + Tenant: tenant, + ID: "TEST_PROFILE2", + FilterIDs: []string{"TestFilter2"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), + ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), + }, + Recurrent: true, + MinSleep: time.Duration(5 * time.Minute), + Blocker: false, + Weight: 20.0, + ActionIDs: []string{"ACT_1", "ACT_2"}, + Async: true, + } + if err := tSv1Rpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE2"}, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tPrfl, reply) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl, reply) + } + if err = onStor.RemoveFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, + tenant, false)); err != nil { + t.Error(err) + } + if err := onStor.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, + tenant, true), ""); err != nil { + t.Error(err) + } + if indexes, err = onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, tenant, false), + nil); err != utils.ErrNotFound { + t.Error(err) + } +} + +func testV1FIdxSecondComputeThresholdsIndexes(t *testing.T) { + tenant := "cgrates.org" + thid := []string{"TEST_PROFILE2"} + emptySlice := []string{} + var reply2 string + if err := tSv1Rpc.Call(utils.ComputeFilterIndexes, utils.ArgsComputeFilterIndexes{ + Tenant: "cgrates.org", + ThresholdIDs: &thid, + AttributeIDs: &emptySlice, + ResourceIDs: &emptySlice, + StatIDs: &emptySlice, + SupplierIDs: &emptySlice, + }, &reply2); err != nil { + t.Error(err) + } + if reply2 != utils.OK { + t.Errorf("Error: %+v", reply2) + } + expectedIDX := map[string]utils.StringMap{"Account:1001": {"TEST_PROFILE2": true}} + indexes, err := onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, tenant, false), nil) + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(expectedIDX, indexes) { + t.Errorf("Expecting: %+v, received: %+v", expectedIDX, utils.ToJSON(indexes)) + } + expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE2": {"Account:1001": true}} + indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, tenant, true), nil) + if err != nil { + t.Error(err) + } + if !reflect.DeepEqual(expectedRevIDX, indexes) { + t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, utils.ToJSON(indexes)) + } +} + +// 1.set threshold in datadb fara sa faca indexuri +// 2.fac querri la index sa fiu sigur ca is 0 +// 3.compile indexes all +// 4.sa verific indexurile sa fie ok pt thresholdu setat de mine +// 5.set al doilea threshold +// 6.compute cu id +// 7.sa verific indexurile sa fie ok pt thresholdu setat de mine + +// func testV1FIdxGetThresholdsAfterRestart(t *testing.T) { +// time.Sleep(time.Second) +// if _, err := engine.StopStartEngine(tSv1CfgPath, thdsDelay); err != nil { +// t.Fatal(err) +// } +// var err error +// tSv1Rpc, err = jsonrpc.Dial("tcp", tSv1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed +// if err != nil { +// t.Fatal("Could not connect to rater: ", err.Error()) +// } +// var td engine.Threshold +// if err := tSv1Rpc.Call(utils.ThresholdSv1GetThreshold, +// &utils.TenantID{Tenant: "cgrates.org", ID: "THD_ACNT_BALANCE_1"}, &td); err != nil { +// t.Error(err) +// } else if td.Snooze.IsZero() { // make sure Snooze time was reset during execution +// t.Errorf("received: %+v", td) +// } +// time.Sleep(time.Duration(1 * time.Second)) +// } + +/* + testV1FIdxSetThresholdProfile(t *testing.T) { + var reply *engine.ThresholdProfile + filter = &engine.Filter{ + Tenant: "cgrates.org", + ID: "TestFilter", + RequestFilters: []*engine.RequestFilter{ + &engine.RequestFilter{ + FieldName: "*string", + Type: "Account", + Values: []string{"1001", "1002"}, + }, + }, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), + }, + } + + var result string + if err := tSv1Rpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } + tPrfl = &engine.ThresholdProfile{ + Tenant: "cgrates.org", + ID: "TEST_PROFILE1", + FilterIDs: []string{"TestFilter"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), + ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(), + }, + Recurrent: true, + MinSleep: time.Duration(5 * time.Minute), + Blocker: false, + Weight: 20.0, + ActionIDs: []string{"ACT_1", "ACT_2"}, + Async: true, + } + if err := tSv1Rpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tPrfl, reply) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl, reply) + } +} +*/ + +// func testV1FIdxUpdateThresholdProfile(t *testing.T) { +// var result string +// filter = &engine.Filter{ +// Tenant: "cgrates.org", +// ID: "TestFilter2", +// RequestFilters: []*engine.RequestFilter{ +// &engine.RequestFilter{ +// FieldName: "*string", +// Type: "Account", +// Values: []string{"10", "20"}, +// }, +// }, +// ActivationInterval: &utils.ActivationInterval{ +// ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), +// ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(), +// }, +// } + +// if err := tSv1Rpc.Call("ApierV1.SetFilter", filter, &result); err != nil { +// t.Error(err) +// } else if result != utils.OK { +// t.Error("Unexpected reply returned", result) +// } +// tPrfl.FilterIDs = []string{"TestFilter", "TestFilter2"} +// if err := tSv1Rpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { +// t.Error(err) +// } else if result != utils.OK { +// t.Error("Unexpected reply returned", result) +// } +// time.Sleep(time.Duration(100 * time.Millisecond)) // mongo is async +// var reply *engine.ThresholdProfile +// if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", +// &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil { +// t.Error(err) +// } else if !reflect.DeepEqual(tPrfl, reply) { +// t.Errorf("Expecting: %+v, received: %+v", tPrfl, reply) +// } +// } + +// func testV1FIdxRemoveThresholdProfile(t *testing.T) { +// var resp string +// if err := tSv1Rpc.Call("ApierV1.RemThresholdProfile", +// &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &resp); err != nil { +// t.Error(err) +// } else if resp != utils.OK { +// t.Error("Unexpected reply returned", resp) +// } +// var sqp *engine.ThresholdProfile +// if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile", +// &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &sqp); err == nil || +// err.Error() != utils.ErrNotFound.Error() { +// t.Error(err) +// } +// } + +func testV1FIdxStopEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} diff --git a/apier/v1/resourcesv1.go b/apier/v1/resourcesv1.go index 40f30637e..8d8520512 100644 --- a/apier/v1/resourcesv1.go +++ b/apier/v1/resourcesv1.go @@ -79,7 +79,7 @@ func (apierV1 *ApierV1) SetResourceProfile(res *engine.ResourceProfile, reply *s if missing := utils.MissingStructFields(res, []string{"Tenant", "ID"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } - if err := apierV1.DataManager.SetResourceProfile(res); err != nil { + if err := apierV1.DataManager.SetResourceProfile(res, true); err != nil { return utils.APIErrorHandler(err) } cache.RemKey(utils.ResourceProfilesPrefix+utils.ConcatenatedKey(res.Tenant, res.ID), true, "") // ToDo: Remove here with autoreload diff --git a/apier/v1/stats.go b/apier/v1/stats.go index 7610111ec..3f75cb16b 100644 --- a/apier/v1/stats.go +++ b/apier/v1/stats.go @@ -43,7 +43,7 @@ func (apierV1 *ApierV1) SetStatQueueProfile(sqp *engine.StatQueueProfile, reply if missing := utils.MissingStructFields(sqp, []string{"Tenant", "ID"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } - if err := apierV1.DataManager.SetStatQueueProfile(sqp); err != nil { + if err := apierV1.DataManager.SetStatQueueProfile(sqp, false); err != nil { return utils.APIErrorHandler(err) } cache.RemKey(utils.StatQueueProfilePrefix+utils.ConcatenatedKey(sqp.Tenant, sqp.ID), diff --git a/apier/v1/suppliers.go b/apier/v1/suppliers.go index 8726ec400..d0632b15b 100644 --- a/apier/v1/suppliers.go +++ b/apier/v1/suppliers.go @@ -45,7 +45,7 @@ func (apierV1 *ApierV1) SetSupplierProfile(spp *engine.SupplierProfile, reply *s if missing := utils.MissingStructFields(spp, []string{"Tenant", "ID"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } - if err := apierV1.DataManager.SetSupplierProfile(spp); err != nil { + if err := apierV1.DataManager.SetSupplierProfile(spp, true); err != nil { return utils.APIErrorHandler(err) } cache.RemKey(utils.SupplierProfilePrefix+utils.ConcatenatedKey(spp.Tenant, spp.ID), true, "") // ToDo: Remove here with autoreload diff --git a/cmd/cgr-migrator/cgr-migrator.go b/cmd/cgr-migrator/cgr-migrator.go index 6e92937e2..b6aac1f1f 100755 --- a/cmd/cgr-migrator/cgr-migrator.go +++ b/cmd/cgr-migrator/cgr-migrator.go @@ -40,14 +40,14 @@ var ( "\n <*set_versions|*cost_details|*accounts|*actions|*action_triggers|*action_plans|*shared_groups|*stordb|*datadb> ") version = flag.Bool("version", false, "Prints the application version.") - outDataDBType = flag.String("out_datadb_type", "", "The type of the DataDb Database ") + outDataDBType = flag.String("out_datadb_type", "", "The type of the DataDb Database ") outDataDBHost = flag.String("out_datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.") outDataDBPort = flag.String("out_datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.") outDataDBName = flag.String("out_datadb_name", config.CgrConfig().DataDbName, "The name/number of the DataDb to connect to.") outDataDBUser = flag.String("out_datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.") outDataDBPass = flag.String("out_datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.") - outStorDBType = flag.String("out_stordb_type", "", "The type of the StorDB Database ") + outStorDBType = flag.String("out_stordb_type", "", "The type of the StorDB Database ") outStorDBHost = flag.String("out_stordb_host", config.CgrConfig().StorDBHost, "The StorDB host to connect to.") outStorDBPort = flag.String("out_stordb_port", config.CgrConfig().StorDBPort, "The StorDB port to bind to.") outStorDBName = flag.String("out_stordb_name", config.CgrConfig().StorDBName, "The name/number of the StorDB to connect to.") diff --git a/console/command_executer.go b/console/command_executer.go index 80c275245..4c0a73ff2 100644 --- a/console/command_executer.go +++ b/console/command_executer.go @@ -59,7 +59,12 @@ func (ce *CommandExecuter) FromArgs(args string, verbose bool) error { } func (ce *CommandExecuter) clientArgs(iface interface{}) (args []string) { - _, ok := iface.(*map[string]interface{}) + _, ok := iface.(*utils.ArgsComputeFilterIndexes) + if ok { + args = append(args, "MapStringInterface") + return + } + _, ok = iface.(*map[string]interface{}) if ok { args = append(args, "MapStringInterface") return diff --git a/console/compute_filter_indexes.go b/console/compute_filter_indexes.go new file mode 100644 index 000000000..39bb35efd --- /dev/null +++ b/console/compute_filter_indexes.go @@ -0,0 +1,64 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import ( + + "github.com/cgrates/cgrates/utils" +) + +func init() { + c := &CmdComputeFilterIndexes{ + name: "compute_filter_indexes", + rpcMethod: "ApierV1.ComputeFilterIndexes", + } + commands[c.Name()] = c + c.CommandExecuter = &CommandExecuter{c} +} + +type CmdComputeFilterIndexes struct { + name string + rpcMethod string + rpcParams *utils.ArgsComputeFilterIndexes + *CommandExecuter +} + +func (self *CmdComputeFilterIndexes) Name() string { + return self.name +} + +func (self *CmdComputeFilterIndexes) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdComputeFilterIndexes) RpcParams(reset bool) interface{} { +if reset || self.rpcParams == nil { + self.rpcParams = &utils.ArgsComputeFilterIndexes{} + } + return self.rpcParams +} + +func (self *CmdComputeFilterIndexes) PostprocessRpcParams() error { + return nil +} + +func (self *CmdComputeFilterIndexes) RpcResult() interface{} { + var reply string + return &reply +} diff --git a/engine/attributes_test.go b/engine/attributes_test.go index 82678c44e..6aa0b8aa2 100644 --- a/engine/attributes_test.go +++ b/engine/attributes_test.go @@ -131,7 +131,7 @@ func testPopulateAttrService(t *testing.T) { } for _, atr := range atrPs { - dmAtr.SetAttributeProfile(atr) + dmAtr.SetAttributeProfile(atr, false) } prefix := utils.ConcatenatedKey(sev.Tenant, *sev.Context) ref := NewReqFilterIndexer(dmAtr, utils.AttributeProfilePrefix, prefix) diff --git a/engine/datamanager.go b/engine/datamanager.go index 568a013f6..e1e1acdf7 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -380,17 +380,14 @@ func (dm *DataManager) GetThresholdProfile(tenant, id string, skipCache bool, tr } func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) (err error) { - if err = dm.DataDB().SetThresholdProfileDrv(th); err != nil { - return - } if withIndex { - thdsIndexers := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant) + indexer := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant) //remove old ThresholdProfile indexes - if err = thdsIndexers.RemoveItemFromIndex(th.ID); err != nil && + if err = indexer.RemoveItemFromIndex(th.ID); err != nil && err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Debug(fmt.Sprintf("RemoveItemFromIndex(setTH) Error ")) return } - //Verify matching Filters for every FilterID from ThresholdProfile for _, fltrID := range th.FilterIDs { var fltr *Filter @@ -400,11 +397,13 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) } return } - thdsIndexers.IndexTPFilter(FilterToTPFilter(fltr), th.ID) + indexer.IndexTPFilter(FilterToTPFilter(fltr), th.ID) + } + if err = indexer.StoreIndexes(); err != nil { + return } - return thdsIndexers.StoreIndexes() } - return + return dm.DataDB().SetThresholdProfileDrv(th) } func (dm *DataManager) RemoveThresholdProfile(tenant, id, transactionID string) (err error) { @@ -438,7 +437,29 @@ func (dm *DataManager) GetStatQueueProfile(tenant, id string, skipCache bool, tr return } -func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile) (err error) { +func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool) (err error) { + if withIndex { + indexer := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, sqp.Tenant) + //remove old StatQueueProfile indexes + if err = indexer.RemoveItemFromIndex(sqp.ID); err != nil && + err.Error() != utils.ErrNotFound.Error() { + return + } + //Verify matching Filters for every FilterID from StatQueueProfile + for _, fltrID := range sqp.FilterIDs { + var fltr *Filter + if fltr, err = dm.GetFilter(sqp.Tenant, fltrID, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, sqp) + } + return + } + indexer.IndexTPFilter(FilterToTPFilter(fltr), sqp.ID) + } + if err = indexer.StoreIndexes(); err != nil { + return + } + } return dm.DataDB().SetStatQueueProfileDrv(sqp) } @@ -539,7 +560,29 @@ func (dm *DataManager) GetResourceProfile(tenant, id string, skipCache bool, tra return } -func (dm *DataManager) SetResourceProfile(rp *ResourceProfile) (err error) { +func (dm *DataManager) SetResourceProfile(rp *ResourceProfile, withIndex bool) (err error) { + if withIndex { + indexer := NewReqFilterIndexer(dm, utils.ResourceProfilesPrefix, rp.Tenant) + //remove old ResourceProfiles indexes + if err = indexer.RemoveItemFromIndex(rp.ID); err != nil && + err.Error() != utils.ErrNotFound.Error() { + return + } + //Verify matching Filters for every FilterID from ResourceProfiles + for _, fltrID := range rp.FilterIDs { + var fltr *Filter + if fltr, err = dm.GetFilter(rp.Tenant, fltrID, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, rp) + } + return + } + indexer.IndexTPFilter(FilterToTPFilter(fltr), rp.ID) + } + if err = indexer.StoreIndexes(); err != nil { + return + } + } return dm.DataDB().SetResourceProfileDrv(rp) } @@ -958,7 +1001,29 @@ func (dm *DataManager) GetSupplierProfile(tenant, id string, skipCache bool, tra return } -func (dm *DataManager) SetSupplierProfile(supp *SupplierProfile) (err error) { +func (dm *DataManager) SetSupplierProfile(supp *SupplierProfile, withIndex bool) (err error) { + if withIndex { + indexer := NewReqFilterIndexer(dm, utils.SupplierProfilePrefix, supp.Tenant) + //remove old SupplierProfile indexes + if err = indexer.RemoveItemFromIndex(supp.ID); err != nil && + err.Error() != utils.ErrNotFound.Error() { + return + } + //Verify matching Filters for every FilterID from SupplierProfile + for _, fltrID := range supp.FilterIDs { + var fltr *Filter + if fltr, err = dm.GetFilter(supp.Tenant, fltrID, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, supp) + } + return + } + indexer.IndexTPFilter(FilterToTPFilter(fltr), supp.ID) + } + if err = indexer.StoreIndexes(); err != nil { + return + } + } return dm.DataDB().SetSupplierProfileDrv(supp) } @@ -992,8 +1057,30 @@ func (dm *DataManager) GetAttributeProfile(tenant, id string, skipCache bool, tr return } -func (dm *DataManager) SetAttributeProfile(alsPrf *AttributeProfile) (err error) { - return dm.DataDB().SetAttributeProfileDrv(alsPrf) +func (dm *DataManager) SetAttributeProfile(ap *AttributeProfile, withIndex bool) (err error) { + if withIndex { + indexer := NewReqFilterIndexer(dm, utils.AttributeProfilePrefix, ap.Tenant) + //remove old AttributeProfile indexes + if err = indexer.RemoveItemFromIndex(ap.ID); err != nil && + err.Error() != utils.ErrNotFound.Error() { + return + } + //Verify matching Filters for every FilterID from AttributeProfile + for _, fltrID := range ap.FilterIDs { + var fltr *Filter + if fltr, err = dm.GetFilter(ap.Tenant, fltrID, false, utils.NonTransactional); err != nil { + if err == utils.ErrNotFound { + err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, ap) + } + return + } + indexer.IndexTPFilter(FilterToTPFilter(fltr), ap.ID) + } + if err = indexer.StoreIndexes(); err != nil { + return + } + } + return dm.DataDB().SetAttributeProfileDrv(ap) } func (dm *DataManager) RemoveAttributeProfile(tenant, id, transactionID string) (err error) { diff --git a/engine/filterindexer.go b/engine/filterindexer.go index 8cab463f0..bd75471db 100644 --- a/engine/filterindexer.go +++ b/engine/filterindexer.go @@ -89,6 +89,11 @@ func (rfi *ReqFilterIndexer) IndexTPFilter(tpFltr *utils.TPFilterProfile, itemID // StoreIndexes handles storing the indexes to dataDB func (rfi *ReqFilterIndexer) StoreIndexes() (err error) { + + for _, idx := range rfi.indexes { + } + for _, idx := range rfi.reveseIndex { + } if err = rfi.dm.SetFilterIndexes( GetDBIndexKey(rfi.itemType, rfi.dbKeySuffix, false), rfi.indexes); err != nil { diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 8feae3feb..4df09759e 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -863,7 +863,7 @@ func testOnStorITCacheResourceProfile(t *testing.T) { Thresholds: []string{"TEST_ACTIONS"}, UsageTTL: time.Duration(1 * time.Millisecond), } - if err := onStor.SetResourceProfile(rCfg); err != nil { + if err := onStor.SetResourceProfile(rCfg, false); err != nil { t.Error(err) } expectedR := []string{"rsp_cgrates.org:RL_TEST"} @@ -977,7 +977,7 @@ func testOnStorITCacheStatQueueProfile(t *testing.T) { Weight: 20, MinItems: 1, } - if err := onStor.SetStatQueueProfile(statProfile); err != nil { + if err := onStor.SetStatQueueProfile(statProfile, false); err != nil { t.Error(err) } expectedR := []string{"sqp_cgrates.org:Test_Stat_Cache"} @@ -1193,7 +1193,7 @@ func testOnStorITCacheSupplierProfile(t *testing.T) { }, Weight: 20, } - if err := onStor.SetSupplierProfile(splProfile); err != nil { + if err := onStor.SetSupplierProfile(splProfile, false); err != nil { t.Error(err) } expectedT := []string{"spp_cgrates.org:SPRF_1"} @@ -1236,7 +1236,7 @@ func testOnStorITCacheAttributeProfile(t *testing.T) { Attributes: mapSubstitutes, Weight: 20, } - if err := onStor.SetAttributeProfile(attrProfile); err != nil { + if err := onStor.SetAttributeProfile(attrProfile, false); err != nil { t.Error(err) } expectedT := []string{"alp_cgrates.org:ATTRPRF1"} @@ -2194,7 +2194,7 @@ func testOnStorITCRUDResourceProfile(t *testing.T) { if _, rcvErr := onStor.GetResourceProfile(rL.Tenant, rL.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.SetResourceProfile(rL); err != nil { + if err := onStor.SetResourceProfile(rL, false); err != nil { t.Error(err) } if rcv, err := onStor.GetResourceProfile(rL.Tenant, rL.ID, true, utils.NonTransactional); err != nil { @@ -2361,7 +2361,7 @@ func testOnStorITCRUDStatQueueProfile(t *testing.T) { if _, ok := cache.Get(utils.StatQueueProfilePrefix + sq.ID); ok != false { t.Error("Should not be in cache") } - if err := onStor.SetStatQueueProfile(sq); err != nil { + if err := onStor.SetStatQueueProfile(sq, false); err != nil { t.Error(err) } if _, ok := cache.Get(utils.StatQueueProfilePrefix + sq.ID); ok != false { @@ -2479,7 +2479,7 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) { t.Errorf("Expecting: %v, received: %v", th, rcv) } if err := onStor.RemoveThresholdProfile(th.Tenant, th.ID, utils.NonTransactional); err != nil { - t.Error(err) + t.Error(err) } if _, rcvErr := onStor.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound { t.Error(rcvErr) @@ -2585,7 +2585,7 @@ func testOnStorITCRUDSupplierProfile(t *testing.T) { if _, rcvErr := onStor.GetSupplierProfile("cgrates.org", "SPRF_1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.SetSupplierProfile(splProfile); err != nil { + if err := onStor.SetSupplierProfile(splProfile, false); err != nil { t.Error(err) } if rcv, err := onStor.GetSupplierProfile("cgrates.org", "SPRF_1", true, utils.NonTransactional); err != nil { @@ -2629,7 +2629,7 @@ func testOnStorITCRUDAttributeProfile(t *testing.T) { if _, rcvErr := onStor.GetAttributeProfile("cgrates.org", "AttrPrf1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.SetAttributeProfile(attrProfile); err != nil { + if err := onStor.SetAttributeProfile(attrProfile, false); err != nil { t.Error(err) } if rcv, err := onStor.GetAttributeProfile("cgrates.org", "AttrPrf1", true, utils.NonTransactional); err != nil { diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 4e4285d87..b2dfa6a39 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1981,20 +1981,23 @@ func (ms *MongoStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]uti pairs = append(pairs, bson.M{"$set": bson.M{"key": dbKey, param: itmMp.Slice()}}) } } - bulk := col.Bulk() - bulk.Unordered() - bulk.Upsert(pairs...) - _, err = bulk.Run() + if len(pairs) != 0 { + bulk := col.Bulk() + bulk.Unordered() + bulk.Upsert(pairs...) + _, err = bulk.Run() + } return } func (ms *MongoStorage) RemoveFilterIndexesDrv(id string) (err error) { session, col := ms.conn(colRFI) defer session.Close() - if err = col.Remove(bson.M{"key": id}); err != nil { - return + err = col.Remove(bson.M{"key": id}) + if err == mgo.ErrNotFound { + err = utils.ErrNotFound } - return nil + return } // GetFilterReverseIndexesDrv retrieves ReverseIndexes from dataDB @@ -2040,14 +2043,22 @@ func (ms *MongoStorage) GetFilterReverseIndexesDrv(dbKey string, func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[string]utils.StringMap) (err error) { session, col := ms.conn(colRFI) defer session.Close() - mp := make(map[string][]string) + pairs := []interface{}{} for key, itmMp := range revIdx { - mp[key] = itmMp.Slice() + param := fmt.Sprintf("value.%s", key) + pairs = append(pairs, bson.M{"key": dbKey}) + if len(itmMp) == 0 { + pairs = append(pairs, bson.M{"$unset": bson.M{param: 1}}) + } else { + pairs = append(pairs, bson.M{"$set": bson.M{"key": dbKey, param: itmMp.Slice()}}) + } + } + if len(pairs) != 0 { + bulk := col.Bulk() + bulk.Unordered() + bulk.Upsert(pairs...) + _, err = bulk.Run() } - _, err = col.Upsert(bson.M{"key": dbKey}, &struct { - Key string - Value map[string][]string - }{dbKey, mp}) return } @@ -2055,11 +2066,15 @@ func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[stri func (ms *MongoStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) { session, col := ms.conn(colRFI) defer session.Close() - findParam := fmt.Sprintf("value.%s", itemID) - if err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam: 1}}); err != nil { - return + if itemID != "" { + findParam := fmt.Sprintf("value.%s", itemID) + return col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam: 1}}) } - return nil + err = col.Remove(bson.M{"key": dbKey}) + if err == mgo.ErrNotFound { + err = utils.ErrNotFound + } + return } func (ms *MongoStorage) MatchFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index fdc442ca2..9bac915b3 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1505,10 +1505,10 @@ func (rs *RedisStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[stri //RemoveFilterReverseIndexesDrv removes ReverseIndexes for a specific itemID func (rs *RedisStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) { - if err = rs.Cmd("HDEL", dbKey, itemID).Err; err != nil { - return err + if itemID != "" { + return rs.Cmd("HDEL", dbKey, itemID).Err } - return + return rs.Cmd("DEL", dbKey).Err } func (rs *RedisStorage) MatchFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) { diff --git a/engine/suppliers_test.go b/engine/suppliers_test.go index 9bd489b13..9799479b4 100644 --- a/engine/suppliers_test.go +++ b/engine/suppliers_test.go @@ -269,19 +269,20 @@ func TestSuppliersPopulateSupplierService(t *testing.T) { Weight: 20.0, }, } + for _, spr := range sprsmatch { - dmspl.DataDB().SetSupplierProfileDrv(spr) + dmspl.SetSupplierProfile(spr, false) } ref := NewReqFilterIndexer(dmspl, utils.SupplierProfilePrefix, "cgrates.org") - ref.IndexTPFilter(FilterToTPFilter(filter3), "attributeprofile1") - ref.IndexTPFilter(FilterToTPFilter(filter4), "attributeprofile2") + ref.IndexTPFilter(FilterToTPFilter(filter3), "supplierprofile1") + ref.IndexTPFilter(FilterToTPFilter(filter4), "supplierprofile2") err = ref.StoreIndexes() if err != nil { t.Errorf("Error: %+v", err) } //test here GetReqFilterIndexes for StorageMap with a specific map expidx := map[string]utils.StringMap{ - "supplierprofile1:Supplier": utils.StringMap{ + "supplierprofile1:Supplier": { "supplierprofile1": true, }, } @@ -292,8 +293,8 @@ func TestSuppliersPopulateSupplierService(t *testing.T) { splPrf1); err != nil { t.Errorf("Error: %+v", err) } else { - if !reflect.DeepEqual(expidx, rcvidx) { - t.Errorf("Expected: %+v received: %+v", expidx, rcvidx) + if !reflect.DeepEqual(utils.ToJSON(expidx), utils.ToJSON(rcvidx)) { + t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(expidx), utils.ToJSON(rcvidx)) } } } diff --git a/engine/tp_reader.go b/engine/tp_reader.go index f8d711e80..de0a5c20e 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -2182,7 +2182,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - if err = tpr.dm.SetResourceProfile(rsp); err != nil { + if err = tpr.dm.SetResourceProfile(rsp, false); err != nil { return err } if verbose { @@ -2208,7 +2208,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - if err = tpr.dm.SetStatQueueProfile(st); err != nil { + if err = tpr.dm.SetStatQueueProfile(st, false); err != nil { return err } if verbose { @@ -2272,7 +2272,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - if err = tpr.dm.SetSupplierProfile(th); err != nil { + if err = tpr.dm.SetSupplierProfile(th, false); err != nil { return err } if verbose { @@ -2288,7 +2288,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - if err = tpr.dm.SetAttributeProfile(th); err != nil { + if err = tpr.dm.SetAttributeProfile(th, false); err != nil { return err } if verbose { diff --git a/migrator/migrator_it_test.go b/migrator/migrator_it_test.go index a33fbf207..b9fdb07a5 100644 --- a/migrator/migrator_it_test.go +++ b/migrator/migrator_it_test.go @@ -1085,7 +1085,7 @@ func testMigratorStats(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", sq.ID, result2.ID) } case action == Move: - if err := mig.dmIN.SetStatQueueProfile(sqp); err != nil { + if err := mig.dmIN.SetStatQueueProfile(sqp, true); err != nil { t.Error("Error when setting Stats ", err.Error()) } if err := mig.dmIN.SetStatQueue(sq); err != nil { diff --git a/migrator/resource.go b/migrator/resource.go index 2f559506a..b63b040b2 100644 --- a/migrator/resource.go +++ b/migrator/resource.go @@ -42,7 +42,7 @@ func (m *Migrator) migrateCurrentResource() (err error) { } if res != nil { if m.dryRun != true { - if err := m.dmOut.SetResourceProfile(res); err != nil { + if err := m.dmOut.SetResourceProfile(res, true); err != nil { return err } m.stats[utils.Resource] += 1 diff --git a/migrator/stats.go b/migrator/stats.go index a7423ad1d..d2b10173f 100644 --- a/migrator/stats.go +++ b/migrator/stats.go @@ -95,7 +95,7 @@ func (m *Migrator) migrateCurrentStats() (err error) { } if sgs != nil { if m.dryRun != true { - if err := m.dmOut.SetStatQueueProfile(sgs); err != nil { + if err := m.dmOut.SetStatQueueProfile(sgs, true); err != nil { return err } } @@ -135,7 +135,7 @@ func (m *Migrator) migrateV1CDRSTATS() (err error) { if err := m.dmOut.SetStatQueue(sq); err != nil { return err } - if err := m.dmOut.SetStatQueueProfile(sts); err != nil { + if err := m.dmOut.SetStatQueueProfile(sts, true); err != nil { return err } m.stats[utils.StatS] += 1 diff --git a/migrator/suppliers.go b/migrator/suppliers.go index c7b5a24f6..56508bddb 100644 --- a/migrator/suppliers.go +++ b/migrator/suppliers.go @@ -42,7 +42,7 @@ func (m *Migrator) migrateCurrentSupplierProfile() (err error) { } if splp != nil { if m.dryRun != true { - if err := m.dmOut.SetSupplierProfile(splp); err != nil { + if err := m.dmOut.SetSupplierProfile(splp, true); err != nil { return err } m.stats[utils.Suppliers] += 1 diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 19d18a012..e5d21c6ae 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1281,6 +1281,15 @@ type ArgRSv1ResourceUsage struct { Units float64 } +type ArgsComputeFilterIndexes struct { + Tenant string + AttributeIDs *[]string + ResourceIDs *[]string + StatIDs *[]string + SupplierIDs *[]string + ThresholdIDs *[]string +} + func (args *ArgRSv1ResourceUsage) TenantID() string { return ConcatenatedKey(args.CGREvent.Tenant, args.UsageID) } diff --git a/utils/consts.go b/utils/consts.go index 5177dc797..287492060 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -543,6 +543,11 @@ const ( MetaSetVersions = "*set_versions" ) +// MetaFilterIndexesAPIs +const ( + ComputeFilterIndexes = "ApierV1.ComputeFilterIndexes" +) + // MetaSupplierAPIs const ( SupplierSv1GetSuppliers = "SupplierSv1.GetSuppliers"