diff --git a/apier/v1/attributes.go b/apier/v1/attributes.go
index 79f768d46..3c90107d1 100644
--- a/apier/v1/attributes.go
+++ b/apier/v1/attributes.go
@@ -46,7 +46,7 @@ func (apierV1 *ApierV1) SetAttributeProfile(extAls *engine.ExternalAttributeProf
return utils.NewErrMandatoryIeMissing(missing...)
}
alsPrf := extAls.AsAttributeProfile()
- if err := apierV1.DataManager.SetAttributeProfile(alsPrf); err != nil {
+ if err := apierV1.DataManager.SetAttributeProfile(alsPrf, true); err != nil {
return utils.APIErrorHandler(err)
}
cache.RemKey(utils.AttributeProfilePrefix+utils.ConcatenatedKey(extAls.Tenant, extAls.ID), true, "") // ToDo: Remove here with autoreload
diff --git a/apier/v1/filter_indexes.go b/apier/v1/filter_indexes.go
new file mode 100644
index 000000000..e1d5ac318
--- /dev/null
+++ b/apier/v1/filter_indexes.go
@@ -0,0 +1,334 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package v1
+
+import (
+ "fmt"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+ "strings"
+)
+
+/*
+type ArgsComputeFilterIndexes struct {
+ Tenant string
+ AttributeIDs *[]string
+ ResourceIDs *[]string
+ StatIDs *[]string
+ SupplierIDs *[]string
+ ThresholdIDs *[]string
+}
+*/
+
+func (self *ApierV1) ComputeFilterIndexes(args utils.ArgsComputeFilterIndexes, reply *string) error {
+ //ThresholdProfile Indexes
+ var thresholdIDs []string
+ thdsIndexers := engine.NewReqFilterIndexer(self.DataManager, utils.ThresholdProfilePrefix, args.Tenant)
+ if args.ThresholdIDs == nil {
+ ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.ThresholdProfilePrefix)
+ if err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ for _, id := range ids {
+ thresholdIDs = append(thresholdIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
+ }
+ } else {
+ thresholdIDs = *args.ThresholdIDs
+ }
+
+ for _, id := range thresholdIDs {
+ th, err := self.DataManager.GetThresholdProfile(args.Tenant, id, false, utils.NonTransactional)
+ if err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ for _, fltrID := range th.FilterIDs {
+ fltr, err := self.DataManager.GetFilter(args.Tenant, fltrID, false, utils.NonTransactional)
+ if err != nil {
+ if err == utils.ErrNotFound {
+ err = fmt.Errorf("broken reference to filter: %+v for threshold: %+v", fltrID, th)
+ }
+ return utils.APIErrorHandler(err)
+ } else {
+ tpFltr := engine.FilterToTPFilter(fltr)
+ thdsIndexers.IndexTPFilter(tpFltr, th.ID)
+ }
+ }
+ }
+ if args.ThresholdIDs == nil {
+ if err := self.DataManager.RemoveFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix,
+ args.Tenant, false)); err != nil {
+ if err != utils.ErrNotFound {
+ return utils.APIErrorHandler(err)
+ }
+ }
+ if err := self.DataManager.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix,
+ args.Tenant, true), ""); err != nil {
+ if err != utils.ErrNotFound {
+ return utils.APIErrorHandler(err)
+ }
+ }
+ } else {
+ indexRemover := engine.NewReqFilterIndexer(self.DataManager, utils.ThresholdProfilePrefix, args.Tenant)
+ for _, id := range thresholdIDs {
+ if err := indexRemover.RemoveItemFromIndex(id); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ return utils.APIErrorHandler(err)
+ }
+ }
+ }
+ if err := thdsIndexers.StoreIndexes(); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ //StatQueueProfile Indexes
+
+ var statIDs []string
+ sqpIndexers := engine.NewReqFilterIndexer(self.DataManager, utils.StatQueueProfilePrefix, args.Tenant)
+ if args.StatIDs == nil {
+ ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix)
+ if err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ for _, id := range ids {
+ err = sqpIndexers.RemoveItemFromIndex(strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
+ if err != nil && err.Error() != utils.ErrNotFound.Error() {
+ return utils.APIErrorHandler(err)
+ }
+ statIDs = append(statIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
+ }
+ } else {
+ statIDs = *args.StatIDs
+ }
+ for _, id := range statIDs {
+ sqp, err := self.DataManager.GetStatQueueProfile(args.Tenant, id, false, utils.NonTransactional)
+ if err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ for _, fltrID := range sqp.FilterIDs {
+ fltr, err := self.DataManager.GetFilter(args.Tenant, fltrID, false, utils.NonTransactional)
+ if err != nil {
+ if err == utils.ErrNotFound {
+ err = fmt.Errorf("broken reference to filter: %+v for stats queue: %+v", fltrID, sqp)
+ }
+ return utils.APIErrorHandler(err)
+ } else {
+ tpFltr := engine.FilterToTPFilter(fltr)
+ sqpIndexers.IndexTPFilter(tpFltr, sqp.ID)
+ }
+ }
+ }
+ if args.StatIDs == nil {
+ if err := self.DataManager.RemoveFilterIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix,
+ args.Tenant, false)); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ if err := self.DataManager.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.StatQueueProfilePrefix,
+ args.Tenant, true), ""); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+
+ } else {
+ indexRemover := engine.NewReqFilterIndexer(self.DataManager, utils.StatQueueProfilePrefix, args.Tenant)
+ for _, id := range statIDs {
+ if err := indexRemover.RemoveItemFromIndex(id); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ return utils.APIErrorHandler(err)
+ }
+ }
+ }
+ if err := sqpIndexers.StoreIndexes(); err != nil {
+		return utils.APIErrorHandler(err)
+ }
+ //ResourceProfile Indexes
+ var resourceIDs []string
+ rpIndexers := engine.NewReqFilterIndexer(self.DataManager, utils.ResourceProfilesPrefix, args.Tenant)
+ if args.ResourceIDs == nil {
+ ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.ResourceProfilesPrefix)
+ if err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ for _, id := range ids {
+ err = rpIndexers.RemoveItemFromIndex(strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
+ if err != nil && err.Error() != utils.ErrNotFound.Error() {
+ return utils.APIErrorHandler(err)
+ }
+ resourceIDs = append(resourceIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
+ }
+ } else {
+ resourceIDs = *args.ResourceIDs
+ }
+ for _, id := range resourceIDs {
+ rp, err := self.DataManager.GetResourceProfile(args.Tenant, id, false, utils.NonTransactional)
+ if err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ for _, fltrID := range rp.FilterIDs {
+ fltr, err := self.DataManager.GetFilter(args.Tenant, fltrID, false, utils.NonTransactional)
+ if err != nil {
+ if err == utils.ErrNotFound {
+					err = fmt.Errorf("broken reference to filter: %+v for resource profile: %+v", fltrID, rp)
+ }
+ return utils.APIErrorHandler(err)
+ } else {
+ tpFltr := engine.FilterToTPFilter(fltr)
+ rpIndexers.IndexTPFilter(tpFltr, rp.ID)
+
+ }
+ }
+ }
+ if args.ResourceIDs == nil {
+ if err := self.DataManager.RemoveFilterIndexes(engine.GetDBIndexKey(utils.ResourceProfilesPrefix,
+ args.Tenant, false)); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ if err := self.DataManager.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.ResourceProfilesPrefix,
+ args.Tenant, true), ""); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ } else {
+ indexRemover := engine.NewReqFilterIndexer(self.DataManager, utils.ResourceProfilesPrefix, args.Tenant)
+ for _, id := range resourceIDs {
+ if err := indexRemover.RemoveItemFromIndex(id); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ return utils.APIErrorHandler(err)
+ }
+ }
+ }
+ if err := rpIndexers.StoreIndexes(); err != nil {
+		return utils.APIErrorHandler(err)
+ }
+ //SupplierProfile Indexes
+ var supplierIDs []string
+ sppIndexers := engine.NewReqFilterIndexer(self.DataManager, utils.SupplierProfilePrefix, args.Tenant)
+ if args.SupplierIDs == nil {
+ ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.SupplierProfilePrefix)
+ if err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ for _, id := range ids {
+ err = sppIndexers.RemoveItemFromIndex(strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
+ if err != nil && err.Error() != utils.ErrNotFound.Error() {
+ return utils.APIErrorHandler(err)
+ }
+ supplierIDs = append(supplierIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
+ }
+ } else {
+ supplierIDs = *args.SupplierIDs
+ }
+ for _, id := range supplierIDs {
+ spp, err := self.DataManager.GetSupplierProfile(args.Tenant, id, false, utils.NonTransactional)
+ if err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ for _, fltrID := range spp.FilterIDs {
+ fltr, err := self.DataManager.GetFilter(args.Tenant, fltrID, false, utils.NonTransactional)
+ if err != nil {
+ if err == utils.ErrNotFound {
+					err = fmt.Errorf("broken reference to filter: %+v for supplier profile: %+v", fltrID, spp)
+ }
+ return utils.APIErrorHandler(err)
+ } else {
+ tpFltr := engine.FilterToTPFilter(fltr)
+ sppIndexers.IndexTPFilter(tpFltr, spp.ID)
+ }
+ }
+ }
+ if args.SupplierIDs == nil {
+ if err := self.DataManager.RemoveFilterIndexes(engine.GetDBIndexKey(utils.SupplierProfilePrefix,
+ args.Tenant, false)); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ if err := self.DataManager.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.SupplierProfilePrefix,
+ args.Tenant, true), ""); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+
+ } else {
+ indexRemover := engine.NewReqFilterIndexer(self.DataManager, utils.SupplierProfilePrefix, args.Tenant)
+		for _, id := range supplierIDs {
+ if err := indexRemover.RemoveItemFromIndex(id); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ return utils.APIErrorHandler(err)
+ }
+ }
+ }
+ if err := sppIndexers.StoreIndexes(); err != nil {
+		return utils.APIErrorHandler(err)
+ }
+ //AttributeProfile Indexes
+ var attributeIDs []string
+ attrIndexers := engine.NewReqFilterIndexer(self.DataManager, utils.AttributeProfilePrefix, args.Tenant)
+ if args.AttributeIDs == nil {
+ ids, err := self.DataManager.DataDB().GetKeysForPrefix(utils.AttributeProfilePrefix)
+ if err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ for _, id := range ids {
+ err = attrIndexers.RemoveItemFromIndex(strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
+ if err != nil && err.Error() != utils.ErrNotFound.Error() {
+ return utils.APIErrorHandler(err)
+ }
+ attributeIDs = append(attributeIDs, strings.Split(id, utils.CONCATENATED_KEY_SEP)[1])
+ }
+ } else {
+ attributeIDs = *args.AttributeIDs
+ }
+ for _, id := range attributeIDs {
+ ap, err := self.DataManager.GetAttributeProfile(args.Tenant, id, false, utils.NonTransactional)
+ if err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ for _, fltrID := range ap.FilterIDs {
+ fltr, err := self.DataManager.GetFilter(args.Tenant, fltrID, false, utils.NonTransactional)
+ if err != nil {
+ if err == utils.ErrNotFound {
+					err = fmt.Errorf("broken reference to filter: %+v for attribute profile: %+v", fltrID, ap)
+ }
+ return utils.APIErrorHandler(err)
+ } else {
+ tpFltr := engine.FilterToTPFilter(fltr)
+ attrIndexers.IndexTPFilter(tpFltr, ap.ID)
+
+ }
+ }
+ }
+ if args.AttributeIDs == nil {
+ if err := self.DataManager.RemoveFilterIndexes(engine.GetDBIndexKey(utils.AttributeProfilePrefix,
+ args.Tenant, false)); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ if err := self.DataManager.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.AttributeProfilePrefix,
+ args.Tenant, true), ""); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+
+ } else {
+ indexRemover := engine.NewReqFilterIndexer(self.DataManager, utils.AttributeProfilePrefix, args.Tenant)
+ for _, id := range attributeIDs {
+ if err := indexRemover.RemoveItemFromIndex(id); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ return utils.APIErrorHandler(err)
+ }
+ }
+ }
+ if err := attrIndexers.StoreIndexes(); err != nil {
+		return utils.APIErrorHandler(err)
+ }
+ *reply = utils.OK
+ return nil
+}
diff --git a/apier/v1/filter_indexes_it_test.go b/apier/v1/filter_indexes_it_test.go
new file mode 100644
index 000000000..6ecb7d2e1
--- /dev/null
+++ b/apier/v1/filter_indexes_it_test.go
@@ -0,0 +1,510 @@
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package v1
+
+import (
+ // "net/rpc"
+ "fmt"
+ "net/rpc/jsonrpc"
+ "path"
+ "reflect"
+ "testing"
+ "time"
+ // "log"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+var (
+ rdsITdb *engine.RedisStorage
+ mgoITdb *engine.MongoStorage
+ onStor *engine.DataManager
+ err error
+ indexes map[string]utils.StringMap
+)
+
+var sTestsFilterIndexesSV1 = []func(t *testing.T){
+ testFlush,
+ testV1FIdxLoadConfig,
+ testV1FIdxdxInitDataDb,
+ testV1FIdxResetStorDb,
+ testV1FIdxStartEngine,
+ testV1FIdxRpcConn,
+ testV1FIdxSetThresholdProfile,
+ testV1FIdxComputeThresholdsIndexes,
+ testV1FIdxSetSecondThresholdProfile,
+ testV1FIdxSecondComputeThresholdsIndexes,
+ //to add testV1TSGetThresholdsAfterRestart,
+ // testV1FIdxSetThresholdProfile,
+ // testV1FIdxUpdateThresholdProfile,
+ // testV1FIdxRemoveThresholdProfile,
+ testV1FIdxStopEngine,
+}
+
+func TestFIdxV1ITMySQLConnect(t *testing.T) {
+ cfg, _ := config.NewDefaultCGRConfig()
+ rdsITdb, err = engine.NewRedisStorage(fmt.Sprintf("%s:%s", cfg.DataDbHost, cfg.DataDbPort), 10,
+ cfg.DataDbPass, cfg.DBDataEncoding, utils.REDIS_MAX_CONNS, nil, 1)
+
+ if err != nil {
+ t.Fatal("Could not connect to Redis", err.Error())
+ }
+}
+
+// Test start here
+func TestFIdxV1ITMySQL(t *testing.T) {
+ onStor = engine.NewDataManager(rdsITdb)
+ tSv1ConfDIR = "tutmysql"
+ for _, stest := range sTestsFilterIndexesSV1 {
+ t.Run(tSv1ConfDIR, stest)
+ }
+}
+
+func TestFIdxV1ITMongoConnect(t *testing.T) {
+ cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "tutmongo")
+ mgoITCfg, err := config.NewCGRConfigFromFolder(cdrsMongoCfgPath)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if mgoITdb, err = engine.NewMongoStorage(mgoITCfg.DataDbHost, mgoITCfg.DataDbPort,
+ mgoITCfg.DataDbName, mgoITCfg.DataDbUser, mgoITCfg.DataDbPass,
+ utils.DataDB, nil, mgoITCfg.CacheCfg(), mgoITCfg.LoadHistorySize); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestFIdxV1ITMongo(t *testing.T) {
+ onStor = engine.NewDataManager(mgoITdb)
+ tSv1ConfDIR = "tutmongo"
+ time.Sleep(time.Duration(2 * time.Second)) // give time for engine to start
+ for _, stest := range sTestsFilterIndexesSV1 {
+ t.Run(tSv1ConfDIR, stest)
+ }
+}
+
+func testV1FIdxLoadConfig(t *testing.T) {
+ var err error
+ tSv1CfgPath = path.Join(*dataDir, "conf", "samples", tSv1ConfDIR)
+ if tSv1Cfg, err = config.NewCGRConfigFromFolder(tSv1CfgPath); err != nil {
+ t.Error(err)
+ }
+ switch tSv1ConfDIR {
+ case "tutmongo": // Mongo needs more time to reset db, need to investigate
+ thdsDelay = 4000
+ default:
+ thdsDelay = 1000
+ }
+}
+
+func testV1FIdxdxInitDataDb(t *testing.T) {
+ if err := engine.InitDataDb(tSv1Cfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+// Wipe out the cdr database
+func testV1FIdxResetStorDb(t *testing.T) {
+ if err := engine.InitStorDb(tSv1Cfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testFlush(t *testing.T) {
+ onStor.DataDB().Flush("")
+ if err := engine.SetDBVersions(onStor.DataDB()); err != nil {
+ t.Error("Error ", err.Error())
+ }
+}
+
+func testV1FIdxStartEngine(t *testing.T) {
+ if _, err := engine.StopStartEngine(tSv1CfgPath, thdsDelay); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testV1FIdxRpcConn(t *testing.T) {
+ var err error
+ tSv1Rpc, err = jsonrpc.Dial("tcp", tSv1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
+ if err != nil {
+ t.Fatal("Could not connect to rater: ", err.Error())
+ }
+}
+
+func testV1FIdxSetThresholdProfile(t *testing.T) {
+ tenant := "cgrates.org"
+ var reply *engine.ThresholdProfile
+ filter = &engine.Filter{
+ Tenant: tenant,
+ ID: "TestFilter",
+ RequestFilters: []*engine.RequestFilter{
+ &engine.RequestFilter{
+ FieldName: "Account",
+ Type: "*string",
+ Values: []string{"1001"},
+ },
+ },
+ ActivationInterval: &utils.ActivationInterval{
+ ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
+ ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
+ },
+ }
+
+ var result string
+ if err := tSv1Rpc.Call("ApierV1.SetFilter", filter, &result); err != nil {
+ t.Error(err)
+ } else if result != utils.OK {
+ t.Error("Unexpected reply returned", result)
+ }
+ if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile",
+ &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err == nil ||
+ err.Error() != utils.ErrNotFound.Error() {
+ t.Error(err)
+ }
+ tPrfl = &engine.ThresholdProfile{
+ Tenant: tenant,
+ ID: "TEST_PROFILE1",
+ FilterIDs: []string{"TestFilter"},
+ ActivationInterval: &utils.ActivationInterval{
+ ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(),
+ ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(),
+ },
+ Recurrent: true,
+ MinSleep: time.Duration(5 * time.Minute),
+ Blocker: false,
+ Weight: 20.0,
+ ActionIDs: []string{"ACT_1", "ACT_2"},
+ Async: true,
+ }
+ if err := tSv1Rpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil {
+ t.Error(err)
+ } else if result != utils.OK {
+ t.Error("Unexpected reply returned", result)
+ }
+ if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile",
+ &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(tPrfl, reply) {
+ t.Errorf("Expecting: %+v, received: %+v", tPrfl, reply)
+ }
+ if err = onStor.RemoveFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix,
+ tenant, false)); err != nil {
+ t.Error(err)
+ }
+ if err := onStor.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix,
+ tenant, true), ""); err != nil {
+ t.Error(err)
+ }
+ if indexes, err = onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, tenant, false),
+ nil); err != utils.ErrNotFound {
+ t.Error(err)
+ }
+
+}
+
+func testV1FIdxComputeThresholdsIndexes(t *testing.T) {
+ tenant := "cgrates.org"
+ emptySlice := []string{}
+ var reply2 string
+ if err := tSv1Rpc.Call(utils.ComputeFilterIndexes, utils.ArgsComputeFilterIndexes{
+ Tenant: "cgrates.org",
+ ThresholdIDs: nil,
+ AttributeIDs: &emptySlice,
+ ResourceIDs: &emptySlice,
+ StatIDs: &emptySlice,
+ SupplierIDs: &emptySlice,
+ }, &reply2); err != nil {
+ t.Error(err)
+ }
+ if reply2 != utils.OK {
+ t.Errorf("Error: %+v", reply2)
+ }
+ expectedIDX := map[string]utils.StringMap{"Account:1001": {"TEST_PROFILE1": true}}
+ indexes, err := onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, tenant, false), nil)
+ if err != nil {
+ t.Error(err)
+ }
+ if !reflect.DeepEqual(expectedIDX, indexes) {
+ t.Errorf("Expecting: %+v, received: %+v", expectedIDX, utils.ToJSON(indexes))
+ }
+ expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE1": {"Account:1001": true}}
+ indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, tenant, true), nil)
+ if err != nil {
+ t.Error(err)
+ }
+ if !reflect.DeepEqual(expectedRevIDX, indexes) {
+ t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, utils.ToJSON(indexes))
+ }
+}
+
+func testV1FIdxSetSecondThresholdProfile(t *testing.T) {
+ tenant := "cgrates.org"
+ var reply *engine.ThresholdProfile
+ filter = &engine.Filter{
+ Tenant: tenant,
+ ID: "TestFilter2",
+ RequestFilters: []*engine.RequestFilter{
+ &engine.RequestFilter{
+ FieldName: "Account",
+ Type: "*string",
+ Values: []string{"1001"},
+ },
+ },
+ ActivationInterval: &utils.ActivationInterval{
+ ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
+ ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
+ },
+ }
+
+ var result string
+ if err := tSv1Rpc.Call("ApierV1.SetFilter", filter, &result); err != nil {
+ t.Error(err)
+ } else if result != utils.OK {
+ t.Error("Unexpected reply returned", result)
+ }
+ if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile",
+ &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE2"}, &reply); err == nil ||
+ err.Error() != utils.ErrNotFound.Error() {
+ t.Error(err)
+ }
+ tPrfl = &engine.ThresholdProfile{
+ Tenant: tenant,
+ ID: "TEST_PROFILE2",
+ FilterIDs: []string{"TestFilter2"},
+ ActivationInterval: &utils.ActivationInterval{
+ ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(),
+ ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(),
+ },
+ Recurrent: true,
+ MinSleep: time.Duration(5 * time.Minute),
+ Blocker: false,
+ Weight: 20.0,
+ ActionIDs: []string{"ACT_1", "ACT_2"},
+ Async: true,
+ }
+ if err := tSv1Rpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil {
+ t.Error(err)
+ } else if result != utils.OK {
+ t.Error("Unexpected reply returned", result)
+ }
+ if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile",
+ &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE2"}, &reply); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(tPrfl, reply) {
+ t.Errorf("Expecting: %+v, received: %+v", tPrfl, reply)
+ }
+ if err = onStor.RemoveFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix,
+ tenant, false)); err != nil {
+ t.Error(err)
+ }
+ if err := onStor.RemoveFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix,
+ tenant, true), ""); err != nil {
+ t.Error(err)
+ }
+ if indexes, err = onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, tenant, false),
+ nil); err != utils.ErrNotFound {
+ t.Error(err)
+ }
+}
+
+func testV1FIdxSecondComputeThresholdsIndexes(t *testing.T) {
+ tenant := "cgrates.org"
+ thid := []string{"TEST_PROFILE2"}
+ emptySlice := []string{}
+ var reply2 string
+ if err := tSv1Rpc.Call(utils.ComputeFilterIndexes, utils.ArgsComputeFilterIndexes{
+ Tenant: "cgrates.org",
+ ThresholdIDs: &thid,
+ AttributeIDs: &emptySlice,
+ ResourceIDs: &emptySlice,
+ StatIDs: &emptySlice,
+ SupplierIDs: &emptySlice,
+ }, &reply2); err != nil {
+ t.Error(err)
+ }
+ if reply2 != utils.OK {
+ t.Errorf("Error: %+v", reply2)
+ }
+ expectedIDX := map[string]utils.StringMap{"Account:1001": {"TEST_PROFILE2": true}}
+ indexes, err := onStor.GetFilterIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, tenant, false), nil)
+ if err != nil {
+ t.Error(err)
+ }
+ if !reflect.DeepEqual(expectedIDX, indexes) {
+ t.Errorf("Expecting: %+v, received: %+v", expectedIDX, utils.ToJSON(indexes))
+ }
+ expectedRevIDX := map[string]utils.StringMap{"TEST_PROFILE2": {"Account:1001": true}}
+ indexes, err = onStor.GetFilterReverseIndexes(engine.GetDBIndexKey(utils.ThresholdProfilePrefix, tenant, true), nil)
+ if err != nil {
+ t.Error(err)
+ }
+ if !reflect.DeepEqual(expectedRevIDX, indexes) {
+ t.Errorf("Expecting: %+v, received: %+v", expectedRevIDX, utils.ToJSON(indexes))
+ }
+}
+
+// 1. set a threshold in dataDB without building indexes
+// 2. query the indexes to make sure none exist
+// 3. compute indexes for all profiles
+// 4. verify the indexes are correct for the threshold that was set
+// 5. set a second threshold
+// 6. compute indexes by ID
+// 7. verify the indexes are correct for the threshold that was set
+
+// func testV1FIdxGetThresholdsAfterRestart(t *testing.T) {
+// time.Sleep(time.Second)
+// if _, err := engine.StopStartEngine(tSv1CfgPath, thdsDelay); err != nil {
+// t.Fatal(err)
+// }
+// var err error
+// tSv1Rpc, err = jsonrpc.Dial("tcp", tSv1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
+// if err != nil {
+// t.Fatal("Could not connect to rater: ", err.Error())
+// }
+// var td engine.Threshold
+// if err := tSv1Rpc.Call(utils.ThresholdSv1GetThreshold,
+// &utils.TenantID{Tenant: "cgrates.org", ID: "THD_ACNT_BALANCE_1"}, &td); err != nil {
+// t.Error(err)
+// } else if td.Snooze.IsZero() { // make sure Snooze time was reset during execution
+// t.Errorf("received: %+v", td)
+// }
+// time.Sleep(time.Duration(1 * time.Second))
+// }
+
+/*
+ testV1FIdxSetThresholdProfile(t *testing.T) {
+ var reply *engine.ThresholdProfile
+ filter = &engine.Filter{
+ Tenant: "cgrates.org",
+ ID: "TestFilter",
+ RequestFilters: []*engine.RequestFilter{
+ &engine.RequestFilter{
+ FieldName: "*string",
+ Type: "Account",
+ Values: []string{"1001", "1002"},
+ },
+ },
+ ActivationInterval: &utils.ActivationInterval{
+ ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
+ ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
+ },
+ }
+
+ var result string
+ if err := tSv1Rpc.Call("ApierV1.SetFilter", filter, &result); err != nil {
+ t.Error(err)
+ } else if result != utils.OK {
+ t.Error("Unexpected reply returned", result)
+ }
+ if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile",
+ &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err == nil ||
+ err.Error() != utils.ErrNotFound.Error() {
+ t.Error(err)
+ }
+ tPrfl = &engine.ThresholdProfile{
+ Tenant: "cgrates.org",
+ ID: "TEST_PROFILE1",
+ FilterIDs: []string{"TestFilter"},
+ ActivationInterval: &utils.ActivationInterval{
+ ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(),
+ ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC).Local(),
+ },
+ Recurrent: true,
+ MinSleep: time.Duration(5 * time.Minute),
+ Blocker: false,
+ Weight: 20.0,
+ ActionIDs: []string{"ACT_1", "ACT_2"},
+ Async: true,
+ }
+ if err := tSv1Rpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil {
+ t.Error(err)
+ } else if result != utils.OK {
+ t.Error("Unexpected reply returned", result)
+ }
+ if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile",
+ &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil {
+ t.Error(err)
+ } else if !reflect.DeepEqual(tPrfl, reply) {
+ t.Errorf("Expecting: %+v, received: %+v", tPrfl, reply)
+ }
+}
+*/
+
+// func testV1FIdxUpdateThresholdProfile(t *testing.T) {
+// var result string
+// filter = &engine.Filter{
+// Tenant: "cgrates.org",
+// ID: "TestFilter2",
+// RequestFilters: []*engine.RequestFilter{
+// &engine.RequestFilter{
+// FieldName: "*string",
+// Type: "Account",
+// Values: []string{"10", "20"},
+// },
+// },
+// ActivationInterval: &utils.ActivationInterval{
+// ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
+// ExpiryTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC).Local(),
+// },
+// }
+
+// if err := tSv1Rpc.Call("ApierV1.SetFilter", filter, &result); err != nil {
+// t.Error(err)
+// } else if result != utils.OK {
+// t.Error("Unexpected reply returned", result)
+// }
+// tPrfl.FilterIDs = []string{"TestFilter", "TestFilter2"}
+// if err := tSv1Rpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil {
+// t.Error(err)
+// } else if result != utils.OK {
+// t.Error("Unexpected reply returned", result)
+// }
+// time.Sleep(time.Duration(100 * time.Millisecond)) // mongo is async
+// var reply *engine.ThresholdProfile
+// if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile",
+// &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &reply); err != nil {
+// t.Error(err)
+// } else if !reflect.DeepEqual(tPrfl, reply) {
+// t.Errorf("Expecting: %+v, received: %+v", tPrfl, reply)
+// }
+// }
+
+// func testV1FIdxRemoveThresholdProfile(t *testing.T) {
+// var resp string
+// if err := tSv1Rpc.Call("ApierV1.RemThresholdProfile",
+// &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &resp); err != nil {
+// t.Error(err)
+// } else if resp != utils.OK {
+// t.Error("Unexpected reply returned", resp)
+// }
+// var sqp *engine.ThresholdProfile
+// if err := tSv1Rpc.Call("ApierV1.GetThresholdProfile",
+// &utils.TenantID{Tenant: "cgrates.org", ID: "TEST_PROFILE1"}, &sqp); err == nil ||
+// err.Error() != utils.ErrNotFound.Error() {
+// t.Error(err)
+// }
+// }
+
+func testV1FIdxStopEngine(t *testing.T) {
+ if err := engine.KillEngine(100); err != nil {
+ t.Error(err)
+ }
+}
diff --git a/apier/v1/resourcesv1.go b/apier/v1/resourcesv1.go
index 40f30637e..8d8520512 100644
--- a/apier/v1/resourcesv1.go
+++ b/apier/v1/resourcesv1.go
@@ -79,7 +79,7 @@ func (apierV1 *ApierV1) SetResourceProfile(res *engine.ResourceProfile, reply *s
if missing := utils.MissingStructFields(res, []string{"Tenant", "ID"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
- if err := apierV1.DataManager.SetResourceProfile(res); err != nil {
+ if err := apierV1.DataManager.SetResourceProfile(res, true); err != nil {
return utils.APIErrorHandler(err)
}
cache.RemKey(utils.ResourceProfilesPrefix+utils.ConcatenatedKey(res.Tenant, res.ID), true, "") // ToDo: Remove here with autoreload
diff --git a/apier/v1/stats.go b/apier/v1/stats.go
index 7610111ec..3f75cb16b 100644
--- a/apier/v1/stats.go
+++ b/apier/v1/stats.go
@@ -43,7 +43,7 @@ func (apierV1 *ApierV1) SetStatQueueProfile(sqp *engine.StatQueueProfile, reply
if missing := utils.MissingStructFields(sqp, []string{"Tenant", "ID"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
- if err := apierV1.DataManager.SetStatQueueProfile(sqp); err != nil {
+ if err := apierV1.DataManager.SetStatQueueProfile(sqp, false); err != nil {
return utils.APIErrorHandler(err)
}
cache.RemKey(utils.StatQueueProfilePrefix+utils.ConcatenatedKey(sqp.Tenant, sqp.ID),
diff --git a/apier/v1/suppliers.go b/apier/v1/suppliers.go
index 8726ec400..d0632b15b 100644
--- a/apier/v1/suppliers.go
+++ b/apier/v1/suppliers.go
@@ -45,7 +45,7 @@ func (apierV1 *ApierV1) SetSupplierProfile(spp *engine.SupplierProfile, reply *s
if missing := utils.MissingStructFields(spp, []string{"Tenant", "ID"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
- if err := apierV1.DataManager.SetSupplierProfile(spp); err != nil {
+ if err := apierV1.DataManager.SetSupplierProfile(spp, true); err != nil {
return utils.APIErrorHandler(err)
}
cache.RemKey(utils.SupplierProfilePrefix+utils.ConcatenatedKey(spp.Tenant, spp.ID), true, "") // ToDo: Remove here with autoreload
diff --git a/cmd/cgr-migrator/cgr-migrator.go b/cmd/cgr-migrator/cgr-migrator.go
index 6e92937e2..b6aac1f1f 100755
--- a/cmd/cgr-migrator/cgr-migrator.go
+++ b/cmd/cgr-migrator/cgr-migrator.go
@@ -40,14 +40,14 @@ var (
"\n <*set_versions|*cost_details|*accounts|*actions|*action_triggers|*action_plans|*shared_groups|*stordb|*datadb> ")
version = flag.Bool("version", false, "Prints the application version.")
- outDataDBType = flag.String("out_datadb_type", "", "The type of the DataDb Database ")
+ outDataDBType = flag.String("out_datadb_type", "", "The type of the DataDb Database ")
outDataDBHost = flag.String("out_datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.")
outDataDBPort = flag.String("out_datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.")
outDataDBName = flag.String("out_datadb_name", config.CgrConfig().DataDbName, "The name/number of the DataDb to connect to.")
outDataDBUser = flag.String("out_datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.")
outDataDBPass = flag.String("out_datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.")
- outStorDBType = flag.String("out_stordb_type", "", "The type of the StorDB Database ")
+ outStorDBType = flag.String("out_stordb_type", "", "The type of the StorDB Database ")
outStorDBHost = flag.String("out_stordb_host", config.CgrConfig().StorDBHost, "The StorDB host to connect to.")
outStorDBPort = flag.String("out_stordb_port", config.CgrConfig().StorDBPort, "The StorDB port to bind to.")
outStorDBName = flag.String("out_stordb_name", config.CgrConfig().StorDBName, "The name/number of the StorDB to connect to.")
diff --git a/console/command_executer.go b/console/command_executer.go
index 80c275245..4c0a73ff2 100644
--- a/console/command_executer.go
+++ b/console/command_executer.go
@@ -59,7 +59,12 @@ func (ce *CommandExecuter) FromArgs(args string, verbose bool) error {
}
func (ce *CommandExecuter) clientArgs(iface interface{}) (args []string) {
- _, ok := iface.(*map[string]interface{})
+ _, ok := iface.(*utils.ArgsComputeFilterIndexes)
+ if ok {
+ args = append(args, "MapStringInterface")
+ return
+ }
+ _, ok = iface.(*map[string]interface{})
if ok {
args = append(args, "MapStringInterface")
return
diff --git a/console/compute_filter_indexes.go b/console/compute_filter_indexes.go
new file mode 100644
index 000000000..39bb35efd
--- /dev/null
+++ b/console/compute_filter_indexes.go
@@ -0,0 +1,64 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package console
+
+import (
+
+ "github.com/cgrates/cgrates/utils"
+)
+
+func init() {
+ c := &CmdComputeFilterIndexes{
+ name: "compute_filter_indexes",
+ rpcMethod: "ApierV1.ComputeFilterIndexes",
+ }
+ commands[c.Name()] = c
+ c.CommandExecuter = &CommandExecuter{c}
+}
+
+type CmdComputeFilterIndexes struct {
+ name string
+ rpcMethod string
+ rpcParams *utils.ArgsComputeFilterIndexes
+ *CommandExecuter
+}
+
+func (self *CmdComputeFilterIndexes) Name() string {
+ return self.name
+}
+
+func (self *CmdComputeFilterIndexes) RpcMethod() string {
+ return self.rpcMethod
+}
+
+func (self *CmdComputeFilterIndexes) RpcParams(reset bool) interface{} {
+ if reset || self.rpcParams == nil {
+ self.rpcParams = &utils.ArgsComputeFilterIndexes{}
+ }
+ return self.rpcParams
+}
+
+func (self *CmdComputeFilterIndexes) PostprocessRpcParams() error {
+ return nil
+}
+
+func (self *CmdComputeFilterIndexes) RpcResult() interface{} {
+ var reply string
+ return &reply
+}
diff --git a/engine/attributes_test.go b/engine/attributes_test.go
index 82678c44e..6aa0b8aa2 100644
--- a/engine/attributes_test.go
+++ b/engine/attributes_test.go
@@ -131,7 +131,7 @@ func testPopulateAttrService(t *testing.T) {
}
for _, atr := range atrPs {
- dmAtr.SetAttributeProfile(atr)
+ dmAtr.SetAttributeProfile(atr, false)
}
prefix := utils.ConcatenatedKey(sev.Tenant, *sev.Context)
ref := NewReqFilterIndexer(dmAtr, utils.AttributeProfilePrefix, prefix)
diff --git a/engine/datamanager.go b/engine/datamanager.go
index 568a013f6..e1e1acdf7 100644
--- a/engine/datamanager.go
+++ b/engine/datamanager.go
@@ -380,17 +380,14 @@ func (dm *DataManager) GetThresholdProfile(tenant, id string, skipCache bool, tr
}
func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) (err error) {
- if err = dm.DataDB().SetThresholdProfileDrv(th); err != nil {
- return
- }
if withIndex {
- thdsIndexers := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant)
+ indexer := NewReqFilterIndexer(dm, utils.ThresholdProfilePrefix, th.Tenant)
//remove old ThresholdProfile indexes
- if err = thdsIndexers.RemoveItemFromIndex(th.ID); err != nil &&
+ if err = indexer.RemoveItemFromIndex(th.ID); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
+ utils.Logger.Debug(fmt.Sprintf("RemoveItemFromIndex(setTH) error: %s", err.Error()))
return
}
-
//Verify matching Filters for every FilterID from ThresholdProfile
for _, fltrID := range th.FilterIDs {
var fltr *Filter
@@ -400,11 +397,13 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool)
}
return
}
- thdsIndexers.IndexTPFilter(FilterToTPFilter(fltr), th.ID)
+ indexer.IndexTPFilter(FilterToTPFilter(fltr), th.ID)
+ }
+ if err = indexer.StoreIndexes(); err != nil {
+ return
}
- return thdsIndexers.StoreIndexes()
}
- return
+ return dm.DataDB().SetThresholdProfileDrv(th)
}
func (dm *DataManager) RemoveThresholdProfile(tenant, id, transactionID string) (err error) {
@@ -438,7 +437,29 @@ func (dm *DataManager) GetStatQueueProfile(tenant, id string, skipCache bool, tr
return
}
-func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile) (err error) {
+func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool) (err error) {
+ if withIndex {
+ indexer := NewReqFilterIndexer(dm, utils.StatQueueProfilePrefix, sqp.Tenant)
+ //remove old StatQueueProfile indexes
+ if err = indexer.RemoveItemFromIndex(sqp.ID); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ return
+ }
+ //Verify matching Filters for every FilterID from StatQueueProfile
+ for _, fltrID := range sqp.FilterIDs {
+ var fltr *Filter
+ if fltr, err = dm.GetFilter(sqp.Tenant, fltrID, false, utils.NonTransactional); err != nil {
+ if err == utils.ErrNotFound {
+ err = fmt.Errorf("broken reference to filter: %+v for statqueue: %+v", fltrID, sqp)
+ }
+ return
+ }
+ indexer.IndexTPFilter(FilterToTPFilter(fltr), sqp.ID)
+ }
+ if err = indexer.StoreIndexes(); err != nil {
+ return
+ }
+ }
return dm.DataDB().SetStatQueueProfileDrv(sqp)
}
@@ -539,7 +560,29 @@ func (dm *DataManager) GetResourceProfile(tenant, id string, skipCache bool, tra
return
}
-func (dm *DataManager) SetResourceProfile(rp *ResourceProfile) (err error) {
+func (dm *DataManager) SetResourceProfile(rp *ResourceProfile, withIndex bool) (err error) {
+ if withIndex {
+ indexer := NewReqFilterIndexer(dm, utils.ResourceProfilesPrefix, rp.Tenant)
+ //remove old ResourceProfiles indexes
+ if err = indexer.RemoveItemFromIndex(rp.ID); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ return
+ }
+ //Verify matching Filters for every FilterID from ResourceProfiles
+ for _, fltrID := range rp.FilterIDs {
+ var fltr *Filter
+ if fltr, err = dm.GetFilter(rp.Tenant, fltrID, false, utils.NonTransactional); err != nil {
+ if err == utils.ErrNotFound {
+ err = fmt.Errorf("broken reference to filter: %+v for resource: %+v", fltrID, rp)
+ }
+ return
+ }
+ indexer.IndexTPFilter(FilterToTPFilter(fltr), rp.ID)
+ }
+ if err = indexer.StoreIndexes(); err != nil {
+ return
+ }
+ }
return dm.DataDB().SetResourceProfileDrv(rp)
}
@@ -958,7 +1001,29 @@ func (dm *DataManager) GetSupplierProfile(tenant, id string, skipCache bool, tra
return
}
-func (dm *DataManager) SetSupplierProfile(supp *SupplierProfile) (err error) {
+func (dm *DataManager) SetSupplierProfile(supp *SupplierProfile, withIndex bool) (err error) {
+ if withIndex {
+ indexer := NewReqFilterIndexer(dm, utils.SupplierProfilePrefix, supp.Tenant)
+ //remove old SupplierProfile indexes
+ if err = indexer.RemoveItemFromIndex(supp.ID); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ return
+ }
+ //Verify matching Filters for every FilterID from SupplierProfile
+ for _, fltrID := range supp.FilterIDs {
+ var fltr *Filter
+ if fltr, err = dm.GetFilter(supp.Tenant, fltrID, false, utils.NonTransactional); err != nil {
+ if err == utils.ErrNotFound {
+ err = fmt.Errorf("broken reference to filter: %+v for supplier profile: %+v", fltrID, supp)
+ }
+ return
+ }
+ indexer.IndexTPFilter(FilterToTPFilter(fltr), supp.ID)
+ }
+ if err = indexer.StoreIndexes(); err != nil {
+ return
+ }
+ }
return dm.DataDB().SetSupplierProfileDrv(supp)
}
@@ -992,8 +1057,30 @@ func (dm *DataManager) GetAttributeProfile(tenant, id string, skipCache bool, tr
return
}
-func (dm *DataManager) SetAttributeProfile(alsPrf *AttributeProfile) (err error) {
- return dm.DataDB().SetAttributeProfileDrv(alsPrf)
+func (dm *DataManager) SetAttributeProfile(ap *AttributeProfile, withIndex bool) (err error) {
+ if withIndex {
+ indexer := NewReqFilterIndexer(dm, utils.AttributeProfilePrefix, ap.Tenant)
+ //remove old AttributeProfile indexes
+ if err = indexer.RemoveItemFromIndex(ap.ID); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ return
+ }
+ //Verify matching Filters for every FilterID from AttributeProfile
+ for _, fltrID := range ap.FilterIDs {
+ var fltr *Filter
+ if fltr, err = dm.GetFilter(ap.Tenant, fltrID, false, utils.NonTransactional); err != nil {
+ if err == utils.ErrNotFound {
+ err = fmt.Errorf("broken reference to filter: %+v for attribute profile: %+v", fltrID, ap)
+ }
+ return
+ }
+ indexer.IndexTPFilter(FilterToTPFilter(fltr), ap.ID)
+ }
+ if err = indexer.StoreIndexes(); err != nil {
+ return
+ }
+ }
+ return dm.DataDB().SetAttributeProfileDrv(ap)
}
func (dm *DataManager) RemoveAttributeProfile(tenant, id, transactionID string) (err error) {
diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go
index 8feae3feb..4df09759e 100644
--- a/engine/onstor_it_test.go
+++ b/engine/onstor_it_test.go
@@ -863,7 +863,7 @@ func testOnStorITCacheResourceProfile(t *testing.T) {
Thresholds: []string{"TEST_ACTIONS"},
UsageTTL: time.Duration(1 * time.Millisecond),
}
- if err := onStor.SetResourceProfile(rCfg); err != nil {
+ if err := onStor.SetResourceProfile(rCfg, false); err != nil {
t.Error(err)
}
expectedR := []string{"rsp_cgrates.org:RL_TEST"}
@@ -977,7 +977,7 @@ func testOnStorITCacheStatQueueProfile(t *testing.T) {
Weight: 20,
MinItems: 1,
}
- if err := onStor.SetStatQueueProfile(statProfile); err != nil {
+ if err := onStor.SetStatQueueProfile(statProfile, false); err != nil {
t.Error(err)
}
expectedR := []string{"sqp_cgrates.org:Test_Stat_Cache"}
@@ -1193,7 +1193,7 @@ func testOnStorITCacheSupplierProfile(t *testing.T) {
},
Weight: 20,
}
- if err := onStor.SetSupplierProfile(splProfile); err != nil {
+ if err := onStor.SetSupplierProfile(splProfile, false); err != nil {
t.Error(err)
}
expectedT := []string{"spp_cgrates.org:SPRF_1"}
@@ -1236,7 +1236,7 @@ func testOnStorITCacheAttributeProfile(t *testing.T) {
Attributes: mapSubstitutes,
Weight: 20,
}
- if err := onStor.SetAttributeProfile(attrProfile); err != nil {
+ if err := onStor.SetAttributeProfile(attrProfile, false); err != nil {
t.Error(err)
}
expectedT := []string{"alp_cgrates.org:ATTRPRF1"}
@@ -2194,7 +2194,7 @@ func testOnStorITCRUDResourceProfile(t *testing.T) {
if _, rcvErr := onStor.GetResourceProfile(rL.Tenant, rL.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
- if err := onStor.SetResourceProfile(rL); err != nil {
+ if err := onStor.SetResourceProfile(rL, false); err != nil {
t.Error(err)
}
if rcv, err := onStor.GetResourceProfile(rL.Tenant, rL.ID, true, utils.NonTransactional); err != nil {
@@ -2361,7 +2361,7 @@ func testOnStorITCRUDStatQueueProfile(t *testing.T) {
if _, ok := cache.Get(utils.StatQueueProfilePrefix + sq.ID); ok != false {
t.Error("Should not be in cache")
}
- if err := onStor.SetStatQueueProfile(sq); err != nil {
+ if err := onStor.SetStatQueueProfile(sq, false); err != nil {
t.Error(err)
}
if _, ok := cache.Get(utils.StatQueueProfilePrefix + sq.ID); ok != false {
@@ -2479,7 +2479,7 @@ func testOnStorITCRUDThresholdProfile(t *testing.T) {
t.Errorf("Expecting: %v, received: %v", th, rcv)
}
if err := onStor.RemoveThresholdProfile(th.Tenant, th.ID, utils.NonTransactional); err != nil {
- t.Error(err)
+ t.Error(err)
}
if _, rcvErr := onStor.GetThresholdProfile(th.Tenant, th.ID, true, utils.NonTransactional); rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
@@ -2585,7 +2585,7 @@ func testOnStorITCRUDSupplierProfile(t *testing.T) {
if _, rcvErr := onStor.GetSupplierProfile("cgrates.org", "SPRF_1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
- if err := onStor.SetSupplierProfile(splProfile); err != nil {
+ if err := onStor.SetSupplierProfile(splProfile, false); err != nil {
t.Error(err)
}
if rcv, err := onStor.GetSupplierProfile("cgrates.org", "SPRF_1", true, utils.NonTransactional); err != nil {
@@ -2629,7 +2629,7 @@ func testOnStorITCRUDAttributeProfile(t *testing.T) {
if _, rcvErr := onStor.GetAttributeProfile("cgrates.org", "AttrPrf1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound {
t.Error(rcvErr)
}
- if err := onStor.SetAttributeProfile(attrProfile); err != nil {
+ if err := onStor.SetAttributeProfile(attrProfile, false); err != nil {
t.Error(err)
}
if rcv, err := onStor.GetAttributeProfile("cgrates.org", "AttrPrf1", true, utils.NonTransactional); err != nil {
diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go
index 4e4285d87..b2dfa6a39 100755
--- a/engine/storage_mongo_datadb.go
+++ b/engine/storage_mongo_datadb.go
@@ -1981,20 +1981,23 @@ func (ms *MongoStorage) SetFilterIndexesDrv(dbKey string, indexes map[string]uti
pairs = append(pairs, bson.M{"$set": bson.M{"key": dbKey, param: itmMp.Slice()}})
}
}
- bulk := col.Bulk()
- bulk.Unordered()
- bulk.Upsert(pairs...)
- _, err = bulk.Run()
+ if len(pairs) != 0 {
+ bulk := col.Bulk()
+ bulk.Unordered()
+ bulk.Upsert(pairs...)
+ _, err = bulk.Run()
+ }
return
}
func (ms *MongoStorage) RemoveFilterIndexesDrv(id string) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
- if err = col.Remove(bson.M{"key": id}); err != nil {
- return
+ err = col.Remove(bson.M{"key": id})
+ if err == mgo.ErrNotFound {
+ err = utils.ErrNotFound
}
- return nil
+ return
}
// GetFilterReverseIndexesDrv retrieves ReverseIndexes from dataDB
@@ -2040,14 +2043,22 @@ func (ms *MongoStorage) GetFilterReverseIndexesDrv(dbKey string,
func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[string]utils.StringMap) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
- mp := make(map[string][]string)
+ pairs := []interface{}{}
for key, itmMp := range revIdx {
- mp[key] = itmMp.Slice()
+ param := fmt.Sprintf("value.%s", key)
+ pairs = append(pairs, bson.M{"key": dbKey})
+ if len(itmMp) == 0 {
+ pairs = append(pairs, bson.M{"$unset": bson.M{param: 1}})
+ } else {
+ pairs = append(pairs, bson.M{"$set": bson.M{"key": dbKey, param: itmMp.Slice()}})
+ }
+ }
+ if len(pairs) != 0 {
+ bulk := col.Bulk()
+ bulk.Unordered()
+ bulk.Upsert(pairs...)
+ _, err = bulk.Run()
}
- _, err = col.Upsert(bson.M{"key": dbKey}, &struct {
- Key string
- Value map[string][]string
- }{dbKey, mp})
return
}
@@ -2055,11 +2066,15 @@ func (ms *MongoStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[stri
func (ms *MongoStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) {
session, col := ms.conn(colRFI)
defer session.Close()
- findParam := fmt.Sprintf("value.%s", itemID)
- if err = col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam: 1}}); err != nil {
- return
+ if itemID != "" {
+ findParam := fmt.Sprintf("value.%s", itemID)
+ return col.Update(bson.M{"key": dbKey}, bson.M{"$unset": bson.M{findParam: 1}})
}
- return nil
+ err = col.Remove(bson.M{"key": dbKey})
+ if err == mgo.ErrNotFound {
+ err = utils.ErrNotFound
+ }
+ return
}
func (ms *MongoStorage) MatchFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) {
diff --git a/engine/storage_redis.go b/engine/storage_redis.go
index fdc442ca2..9bac915b3 100755
--- a/engine/storage_redis.go
+++ b/engine/storage_redis.go
@@ -1505,10 +1505,10 @@ func (rs *RedisStorage) SetFilterReverseIndexesDrv(dbKey string, revIdx map[stri
//RemoveFilterReverseIndexesDrv removes ReverseIndexes for a specific itemID
func (rs *RedisStorage) RemoveFilterReverseIndexesDrv(dbKey, itemID string) (err error) {
- if err = rs.Cmd("HDEL", dbKey, itemID).Err; err != nil {
- return err
+ if itemID != "" {
+ return rs.Cmd("HDEL", dbKey, itemID).Err
}
- return
+ return rs.Cmd("DEL", dbKey).Err
}
func (rs *RedisStorage) MatchFilterIndexDrv(dbKey, fldName, fldVal string) (itemIDs utils.StringMap, err error) {
diff --git a/engine/suppliers_test.go b/engine/suppliers_test.go
index 9bd489b13..9799479b4 100644
--- a/engine/suppliers_test.go
+++ b/engine/suppliers_test.go
@@ -269,19 +269,20 @@ func TestSuppliersPopulateSupplierService(t *testing.T) {
Weight: 20.0,
},
}
+
for _, spr := range sprsmatch {
- dmspl.DataDB().SetSupplierProfileDrv(spr)
+ dmspl.SetSupplierProfile(spr, false)
}
ref := NewReqFilterIndexer(dmspl, utils.SupplierProfilePrefix, "cgrates.org")
- ref.IndexTPFilter(FilterToTPFilter(filter3), "attributeprofile1")
- ref.IndexTPFilter(FilterToTPFilter(filter4), "attributeprofile2")
+ ref.IndexTPFilter(FilterToTPFilter(filter3), "supplierprofile1")
+ ref.IndexTPFilter(FilterToTPFilter(filter4), "supplierprofile2")
err = ref.StoreIndexes()
if err != nil {
t.Errorf("Error: %+v", err)
}
//test here GetReqFilterIndexes for StorageMap with a specific map
expidx := map[string]utils.StringMap{
- "supplierprofile1:Supplier": utils.StringMap{
+ "supplierprofile1:Supplier": {
"supplierprofile1": true,
},
}
@@ -292,8 +293,8 @@ func TestSuppliersPopulateSupplierService(t *testing.T) {
splPrf1); err != nil {
t.Errorf("Error: %+v", err)
} else {
- if !reflect.DeepEqual(expidx, rcvidx) {
- t.Errorf("Expected: %+v received: %+v", expidx, rcvidx)
+ if !reflect.DeepEqual(utils.ToJSON(expidx), utils.ToJSON(rcvidx)) {
+ t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(expidx), utils.ToJSON(rcvidx))
}
}
}
diff --git a/engine/tp_reader.go b/engine/tp_reader.go
index f8d711e80..de0a5c20e 100755
--- a/engine/tp_reader.go
+++ b/engine/tp_reader.go
@@ -2182,7 +2182,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
if err != nil {
return err
}
- if err = tpr.dm.SetResourceProfile(rsp); err != nil {
+ if err = tpr.dm.SetResourceProfile(rsp, false); err != nil {
return err
}
if verbose {
@@ -2208,7 +2208,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
if err != nil {
return err
}
- if err = tpr.dm.SetStatQueueProfile(st); err != nil {
+ if err = tpr.dm.SetStatQueueProfile(st, false); err != nil {
return err
}
if verbose {
@@ -2272,7 +2272,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
if err != nil {
return err
}
- if err = tpr.dm.SetSupplierProfile(th); err != nil {
+ if err = tpr.dm.SetSupplierProfile(th, false); err != nil {
return err
}
if verbose {
@@ -2288,7 +2288,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err
if err != nil {
return err
}
- if err = tpr.dm.SetAttributeProfile(th); err != nil {
+ if err = tpr.dm.SetAttributeProfile(th, false); err != nil {
return err
}
if verbose {
diff --git a/migrator/migrator_it_test.go b/migrator/migrator_it_test.go
index a33fbf207..b9fdb07a5 100644
--- a/migrator/migrator_it_test.go
+++ b/migrator/migrator_it_test.go
@@ -1085,7 +1085,7 @@ func testMigratorStats(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", sq.ID, result2.ID)
}
case action == Move:
- if err := mig.dmIN.SetStatQueueProfile(sqp); err != nil {
+ if err := mig.dmIN.SetStatQueueProfile(sqp, true); err != nil {
t.Error("Error when setting Stats ", err.Error())
}
if err := mig.dmIN.SetStatQueue(sq); err != nil {
diff --git a/migrator/resource.go b/migrator/resource.go
index 2f559506a..b63b040b2 100644
--- a/migrator/resource.go
+++ b/migrator/resource.go
@@ -42,7 +42,7 @@ func (m *Migrator) migrateCurrentResource() (err error) {
}
if res != nil {
if m.dryRun != true {
- if err := m.dmOut.SetResourceProfile(res); err != nil {
+ if err := m.dmOut.SetResourceProfile(res, true); err != nil {
return err
}
m.stats[utils.Resource] += 1
diff --git a/migrator/stats.go b/migrator/stats.go
index a7423ad1d..d2b10173f 100644
--- a/migrator/stats.go
+++ b/migrator/stats.go
@@ -95,7 +95,7 @@ func (m *Migrator) migrateCurrentStats() (err error) {
}
if sgs != nil {
if m.dryRun != true {
- if err := m.dmOut.SetStatQueueProfile(sgs); err != nil {
+ if err := m.dmOut.SetStatQueueProfile(sgs, true); err != nil {
return err
}
}
@@ -135,7 +135,7 @@ func (m *Migrator) migrateV1CDRSTATS() (err error) {
if err := m.dmOut.SetStatQueue(sq); err != nil {
return err
}
- if err := m.dmOut.SetStatQueueProfile(sts); err != nil {
+ if err := m.dmOut.SetStatQueueProfile(sts, true); err != nil {
return err
}
m.stats[utils.StatS] += 1
diff --git a/migrator/suppliers.go b/migrator/suppliers.go
index c7b5a24f6..56508bddb 100644
--- a/migrator/suppliers.go
+++ b/migrator/suppliers.go
@@ -42,7 +42,7 @@ func (m *Migrator) migrateCurrentSupplierProfile() (err error) {
}
if splp != nil {
if m.dryRun != true {
- if err := m.dmOut.SetSupplierProfile(splp); err != nil {
+ if err := m.dmOut.SetSupplierProfile(splp, true); err != nil {
return err
}
m.stats[utils.Suppliers] += 1
diff --git a/utils/apitpdata.go b/utils/apitpdata.go
index 19d18a012..e5d21c6ae 100755
--- a/utils/apitpdata.go
+++ b/utils/apitpdata.go
@@ -1281,6 +1281,15 @@ type ArgRSv1ResourceUsage struct {
Units float64
}
+type ArgsComputeFilterIndexes struct {
+ Tenant string
+ AttributeIDs *[]string
+ ResourceIDs *[]string
+ StatIDs *[]string
+ SupplierIDs *[]string
+ ThresholdIDs *[]string
+}
+
func (args *ArgRSv1ResourceUsage) TenantID() string {
return ConcatenatedKey(args.CGREvent.Tenant, args.UsageID)
}
diff --git a/utils/consts.go b/utils/consts.go
index 5177dc797..287492060 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -543,6 +543,11 @@ const (
MetaSetVersions = "*set_versions"
)
+// MetaFilterIndexesAPIs
+const (
+ ComputeFilterIndexes = "ApierV1.ComputeFilterIndexes"
+)
+
// MetaSupplierAPIs
const (
SupplierSv1GetSuppliers = "SupplierSv1.GetSuppliers"