diff --git a/data/storage/migrator/mysql_tables_update.sql b/data/storage/migrator/mysql_tables_update.sql index da020828f..25007ef41 100755 --- a/data/storage/migrator/mysql_tables_update.sql +++ b/data/storage/migrator/mysql_tables_update.sql @@ -68,12 +68,4 @@ CREATE TABLE versions ( `version` int(11) NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `item` (`item`) -); - - - - -ALTER TABLE cdrs CHANGE COLUMN `usage` `usage_old` DECIMAL(30,9); -ALTER TABLE cdrs ADD `usage` DECIMAL(30); -UPDATE cdrs SET `usage` = `usage_old` * 1000000000 WHERE usage_old IS NOT NULL; -ALTER TABLE cdrs DROP COLUMN usage_old; \ No newline at end of file +); \ No newline at end of file diff --git a/data/storage/migrator/usage_float_to_int.sql b/data/storage/migrator/usage_float_to_int.sql new file mode 100644 index 000000000..9061b2ceb --- /dev/null +++ b/data/storage/migrator/usage_float_to_int.sql @@ -0,0 +1,4 @@ +ALTER TABLE cdrs CHANGE COLUMN `usage` `usage_old` DECIMAL(30,9); +ALTER TABLE cdrs ADD `usage` BIGINT; +UPDATE cdrs SET `usage` = `usage_old` * 1000000000 WHERE usage_old IS NOT NULL; +ALTER TABLE cdrs DROP COLUMN usage_old; \ No newline at end of file diff --git a/engine/version.go b/engine/version.go index 18df01307..ec7d7ffb8 100644 --- a/engine/version.go +++ b/engine/version.go @@ -105,8 +105,8 @@ func (vers Versions) Compare(curent Versions, storType string) string { } func CurrentDBVersions(storType string) Versions { - dataDbVersions := Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2} - storDbVersions := Versions{utils.COST_DETAILS: 2} + dataDbVersions := CurrentDataDBVersions() + storDbVersions := CurrentStorDBVersions() allVersions := make(Versions) for k, v := range dataDbVersions { @@ -127,13 +127,13 @@ func CurrentDBVersions(storType string) Versions { return nil } +func CurrentDataDBVersions() Versions { + return Versions{utils.StatS: 2, utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.Thresholds: 2} +} + func CurrentStorDBVersions() Versions { return Versions{utils.COST_DETAILS: 2} } -func CurrentDataDBVersions() Versions { - return Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2} -} - // Versions will keep trac of various item versions type Versions map[string]int64 // map[item]versionNr diff --git a/migrator/migrator.go b/migrator/migrator.go index e712ba861..4f1647e71 100755 --- a/migrator/migrator.go +++ b/migrator/migrator.go @@ -105,6 +105,10 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) { err = m.migrateActions() case utils.MetaSharedGroups: err = m.migrateSharedGroups() + case utils.MetaStats: + err = m.migrateStats() + case utils.MetaThresholds: + err = m.migrateStats() } } for k, v := range m.stats { diff --git a/migrator/migrator_it_test.go b/migrator/migrator_it_test.go index 9f4a88445..f4a0aa13a 100644 --- a/migrator/migrator_it_test.go +++ b/migrator/migrator_it_test.go @@ -44,6 +44,7 @@ var sTestsITMigrator = []func(t *testing.T){ testMigratorActionTriggers, testMigratorActions, testMigratorSharedGroups, + testMigratorStats, testFlush, } @@ -71,7 +72,7 @@ func TestMigratorITPostgresConnect(t *testing.T) { if err != nil { log.Fatal(err) } - mig, err = NewMigrator(dataDB, postgresITCfg.DataDbType, postgresITCfg.DBDataEncoding, storDB, postgresITCfg.StorDBType, oldDataDB, postgresITCfg.DataDbType, postgresITCfg.DBDataEncoding, oldstorDB, postgresITCfg.StorDBType) + mig, err = NewMigrator(dataDB, postgresITCfg.DataDbType, postgresITCfg.DBDataEncoding, storDB, postgresITCfg.StorDBType, oldDataDB, postgresITCfg.DataDbType, postgresITCfg.DBDataEncoding, oldstorDB, postgresITCfg.StorDBType, false) if err != nil { log.Fatal(err) } @@ -79,6 +80,7 @@ func TestMigratorITPostgresConnect(t *testing.T) { func TestMigratorITPostgres(t *testing.T) { dbtype = utils.REDIS + log.Print("REDIS+POSTGRES") for _, stest := range sTestsITMigrator { t.Run("TestITMigratorOnPostgres", stest) } @@ -108,7 +110,7 @@ func TestMigratorITRedisConnect(t *testing.T) { if err != nil { log.Fatal(err) } - mig, err = NewMigrator(dataDB, mysqlITCfg.DataDbType, mysqlITCfg.DBDataEncoding, storDB, mysqlITCfg.StorDBType, oldDataDB, mysqlITCfg.DataDbType, mysqlITCfg.DBDataEncoding, oldstorDB, mysqlITCfg.StorDBType) + mig, err = NewMigrator(dataDB, mysqlITCfg.DataDbType, mysqlITCfg.DBDataEncoding, storDB, mysqlITCfg.StorDBType, oldDataDB, mysqlITCfg.DataDbType, mysqlITCfg.DBDataEncoding, oldstorDB, mysqlITCfg.StorDBType, false) if err != nil { log.Fatal(err) } @@ -116,6 +118,7 @@ func TestMigratorITRedisConnect(t *testing.T) { func TestMigratorITRedis(t *testing.T) { dbtype = utils.REDIS + log.Print("REDIS+MYSQL") for _, stest := range sTestsITMigrator { t.Run("TestITMigratorOnRedis", stest) } @@ -145,7 +148,7 @@ func TestMigratorITMongoConnect(t *testing.T) { if err != nil { log.Fatal(err) } - mig, err = NewMigrator(dataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, storDB, mgoITCfg.StorDBType, oldDataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, oldstorDB, mgoITCfg.StorDBType) + mig, err = NewMigrator(dataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, storDB, mgoITCfg.StorDBType, oldDataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, oldstorDB, mgoITCfg.StorDBType, false) if err != nil { log.Fatal(err) } @@ -153,27 +156,16 @@ func TestMigratorITMongoConnect(t *testing.T) { func TestMigratorITMongo(t *testing.T) { dbtype = utils.MONGO + log.Print("MONGO") for _, stest := range sTestsITMigrator { t.Run("TestITMigratorOnMongo", stest) } } func testFlush(t *testing.T) { - switch { - case dbtype == utils.REDIS: - dataDB := mig.dataDB.(*engine.RedisStorage) - err := dataDB.Cmd("FLUSHALL").Err - if err != nil { - t.Error("Error when flushing Redis ", err.Error()) - } - case dbtype == utils.MONGO: - err := mig.dataDB.Flush("") - if err != nil { - t.Error("Error when flushing Mongo ", err.Error()) - } - } - if err = SetDBVersions(mig.dataDB); err != nil { - return err + mig.dm.DataDB().Flush("") + if err := engine.SetDBVersions(mig.dm.DataDB()); err != nil { + t.Error("Error ", err.Error()) } } @@ -192,11 +184,11 @@ func testMigratorAccounts(t *testing.T) { if err != nil { t.Error("Error when setting v1 acc ", err.Error()) } - err = mig.Migrate(utils.MetaAccounts) + err, _ = mig.Migrate([]string{utils.MetaAccounts}) if err != nil { t.Error("Error when migrating accounts ", err.Error()) } - result, err := mig.dataDB.GetAccount(testAccount.ID) + result, err := mig.dm.DataDB().GetAccount(testAccount.ID) if err != nil { t.Error("Error when getting account ", err.Error()) } @@ -210,11 +202,11 @@ func testMigratorAccounts(t *testing.T) { if err != nil { t.Error("Error when marshaling ", err.Error()) } - err = mig.Migrate(utils.MetaAccounts) + err, _ = mig.Migrate([]string{utils.MetaAccounts}) if err != nil { t.Error("Error when migrating accounts ", err.Error()) } - result, err := mig.dataDB.GetAccount(testAccount.ID) + result, err := mig.dm.DataDB().GetAccount(testAccount.ID) if err != nil { t.Error("Error when getting account ", err.Error()) } @@ -233,11 +225,11 @@ func testMigratorActionPlans(t *testing.T) { if err != nil { t.Error("Error when setting v1 ActionPlan ", err.Error()) } - err = mig.Migrate(utils.MetaActionPlans) + err, _ = mig.Migrate([]string{utils.MetaActionPlans}) if err != nil { t.Error("Error when migrating ActionPlans ", err.Error()) } - result, err := mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional) + result, err := mig.dm.DataDB().GetActionPlan(ap.Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting ActionPlan ", err.Error()) } @@ -253,11 +245,11 @@ func testMigratorActionPlans(t *testing.T) { if err != nil { t.Error("Error when setting v1 ActionPlans ", err.Error()) } - err = mig.Migrate(utils.MetaActionPlans) + err, _ = mig.Migrate([]string{utils.MetaActionPlans}) if err != nil { t.Error("Error when migrating ActionPlans ", err.Error()) } - result, err := mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional) + result, err := mig.dm.DataDB().GetActionPlan(ap.Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting ActionPlan ", err.Error()) } @@ -309,11 +301,11 @@ func testMigratorActionTriggers(t *testing.T) { if err != nil { t.Error("Error when setting v1 ActionTriggers ", err.Error()) } - err = mig.Migrate(utils.MetaActionTriggers) + err, _ = mig.Migrate([]string{utils.MetaActionTriggers}) if err != nil { t.Error("Error when migrating ActionTriggers ", err.Error()) } - result, err := mig.dataDB.GetActionTriggers((*v1atrs)[0].Id, true, utils.NonTransactional) + result, err := mig.dm.GetActionTriggers((*v1atrs)[0].Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting ActionTriggers ", err.Error()) } @@ -381,7 +373,7 @@ func testMigratorActionTriggers(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance.Blocker, result[0].Balance.Blocker) } case dbtype == utils.MONGO: - err := mig.Migrate(utils.MetaActionTriggers) + err, _ := mig.Migrate([]string{utils.MetaActionTriggers}) if err != nil && err != utils.ErrNotImplemented { t.Error("Error when migrating ActionTriggers ", err.Error()) } @@ -398,11 +390,11 @@ func testMigratorActions(t *testing.T) { if err != nil { t.Error("Error when setting v1 Actions ", err.Error()) } - err = mig.Migrate(utils.MetaActions) + err, _ = mig.Migrate([]string{utils.MetaActions}) if err != nil { t.Error("Error when migrating Actions ", err.Error()) } - result, err := mig.dataDB.GetActions((*v1act)[0].Id, true, utils.NonTransactional) + result, err := mig.dm.GetActions((*v1act)[0].Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting Actions ", err.Error()) } @@ -415,11 +407,11 @@ func testMigratorActions(t *testing.T) { if err != nil { t.Error("Error when setting v1 Actions ", err.Error()) } - err = mig.Migrate(utils.MetaActions) + err, _ = mig.Migrate([]string{utils.MetaActions}) if err != nil { t.Error("Error when migrating Actions ", err.Error()) } - result, err := mig.dataDB.GetActions((*v1act)[0].Id, true, utils.NonTransactional) + result, err := mig.dm.GetActions((*v1act)[0].Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting Actions ", err.Error()) } @@ -430,14 +422,14 @@ func testMigratorActions(t *testing.T) { } func testMigratorSharedGroups(t *testing.T) { - v1sg := &v1SharedGroup{ + v1sqp := &v1SharedGroup{ Id: "Test", AccountParameters: map[string]*engine.SharingParameters{ "test": &engine.SharingParameters{Strategy: "*highest"}, }, MemberIds: []string{"1", "2", "3"}, } - sg := &engine.SharedGroup{ + sqp := &engine.SharedGroup{ Id: "Test", AccountParameters: map[string]*engine.SharingParameters{ "test": &engine.SharingParameters{Strategy: "*highest"}, @@ -446,37 +438,252 @@ func testMigratorSharedGroups(t *testing.T) { } switch { case dbtype == utils.REDIS: - err := mig.oldDataDB.setV1SharedGroup(v1sg) + err := mig.oldDataDB.setV1SharedGroup(v1sqp) if err != nil { t.Error("Error when setting v1 SharedGroup ", err.Error()) } - err = mig.Migrate(utils.MetaSharedGroups) + err, _ = mig.Migrate([]string{utils.MetaSharedGroups}) if err != nil { t.Error("Error when migrating SharedGroup ", err.Error()) } - result, err := mig.dataDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional) + result, err := mig.dm.GetSharedGroup(v1sqp.Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting SharedGroup ", err.Error()) } - if !reflect.DeepEqual(sg, result) { - t.Errorf("Expecting: %+v, received: %+v", sg, result) + if !reflect.DeepEqual(sqp, result) { + t.Errorf("Expecting: %+v, received: %+v", sqp, result) } case dbtype == utils.MONGO: - err := mig.oldDataDB.setV1SharedGroup(v1sg) + err := mig.oldDataDB.setV1SharedGroup(v1sqp) if err != nil { t.Error("Error when setting v1 SharedGroup ", err.Error()) } - err = mig.Migrate(utils.MetaSharedGroups) + err, _ = mig.Migrate([]string{utils.MetaSharedGroups}) if err != nil { t.Error("Error when migrating SharedGroup ", err.Error()) } - result, err := mig.dataDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional) + result, err := mig.dm.GetSharedGroup(v1sqp.Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting SharedGroup ", err.Error()) } - if !reflect.DeepEqual(sg, result) { - t.Errorf("Expecting: %+v, received: %+v", sg, result) + if !reflect.DeepEqual(sqp, result) { + t.Errorf("Expecting: %+v, received: %+v", sqp, result) } } } + +func testMigratorStats(t *testing.T) { + tim := time.Date(2012, time.February, 27, 23, 59, 59, 0, time.UTC).Local() + var filters []*engine.RequestFilter + v1Sts := &v1Stat{ + Id: "test", // Config id, unique per config instance + QueueLength: 10, // Number of items in the stats buffer + TimeWindow: time.Duration(1) * time.Second, // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow + SaveInterval: time.Duration(1) * time.Second, + Metrics: []string{"ASR", "ACD", "ACC"}, + SetupInterval: []time.Time{time.Now()}, + TOR: []string{}, + CdrHost: []string{}, + CdrSource: []string{}, + ReqType: []string{}, + Direction: []string{}, + Tenant: []string{}, + Category: []string{}, + Account: []string{}, + Subject: []string{}, + DestinationIds: []string{}, + UsageInterval: []time.Duration{1 * time.Second}, + PddInterval: []time.Duration{1 * time.Second}, + Supplier: []string{}, + DisconnectCause: []string{}, + MediationRunIds: []string{}, + RatedAccount: []string{}, + RatedSubject: []string{}, + CostInterval: []float64{}, + Triggers: engine.ActionTriggers{ + &engine.ActionTrigger{ + ID: "Test", + Balance: &engine.BalanceFilter{ + ID: utils.StringPointer("TESTB"), + Timings: []*engine.RITiming{}, + ExpirationDate: utils.TimePointer(tim), + Type: utils.StringPointer(utils.MONETARY), + Directions: utils.StringMapPointer(utils.NewStringMap(utils.OUT)), + }, + ExpirationDate: tim, + LastExecutionTime: tim, + ActivationDate: tim, + ThresholdType: utils.TRIGGER_MAX_BALANCE, + ThresholdValue: 2, + ActionsID: "TEST_ACTIONS", + Executed: true, + }, + }, + } + + x, _ := engine.NewRequestFilter(engine.MetaGreaterOrEqual, "SetupInterval", []string{v1Sts.SetupInterval[0].String()}) + filters = append(filters, x) + x, _ = engine.NewRequestFilter(engine.MetaGreaterOrEqual, "UsageInterval", []string{v1Sts.UsageInterval[0].String()}) + filters = append(filters, x) + x, _ = engine.NewRequestFilter(engine.MetaGreaterOrEqual, "PddInterval", []string{v1Sts.PddInterval[0].String()}) + filters = append(filters, x) + + filter := &engine.Filter{Tenant: config.CgrConfig().DefaultTenant, ID: v1Sts.Id, RequestFilters: filters} + + sqp := &engine.StatQueueProfile{ + Tenant: "cgrates.org", + ID: "test", + FilterIDs: []string{v1Sts.Id}, + QueueLength: 10, + TTL: time.Duration(0) * time.Second, + Metrics: []string{"*asr", "*acd", "*acc"}, + Thresholds: []string{"Test"}, + Blocker: false, + Stored: true, + Weight: float64(0), + MinItems: 0, + } + sq := &engine.StatQueue{Tenant: config.CgrConfig().DefaultTenant, + ID: v1Sts.Id, + SQMetrics: make(map[string]engine.StatMetric), + } + for _, metricID := range sqp.Metrics { + if metric, err := engine.NewStatMetric(metricID, 0); err != nil { + t.Error("Error when creating newstatMETRIc ", err.Error()) + } else { + sq.SQMetrics[metricID] = metric + } + } + switch { + case dbtype == utils.REDIS: + + err := mig.oldDataDB.setV1Stats(v1Sts) + if err != nil { + t.Error("Error when setting v1Stat ", err.Error()) + } + currentVersion := engine.Versions{utils.StatS: 1, utils.Thresholds: 1, utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2} + err = mig.dm.DataDB().SetVersions(currentVersion, false) + if err != nil { + t.Error("Error when setting version for stats ", err.Error()) + } + err, _ = mig.Migrate([]string{utils.MetaStats}) + if err != nil { + t.Error("Error when migrating Stats ", err.Error()) + } + + result, err := mig.dm.GetStatQueueProfile("cgrates.org", v1Sts.Id, true, utils.NonTransactional) + if err != nil { + t.Error("Error when getting Stats ", err.Error()) + } + + if !reflect.DeepEqual(sqp.Tenant, result.Tenant) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Tenant, result.Tenant) + } + if !reflect.DeepEqual(sqp.ID, result.ID) { + t.Errorf("Expecting: %+v, received: %+v", sqp.ID, result.ID) + } + if !reflect.DeepEqual(sqp.FilterIDs, result.FilterIDs) { + t.Errorf("Expecting: %+v, received: %+v", sqp.FilterIDs, result.FilterIDs) + } + if !reflect.DeepEqual(sqp.QueueLength, result.QueueLength) { + t.Errorf("Expecting: %+v, received: %+v", sqp.QueueLength, result.QueueLength) + } + if !reflect.DeepEqual(sqp.TTL, result.TTL) { + t.Errorf("Expecting: %+v, received: %+v", sqp.TTL, result.TTL) + } + if !reflect.DeepEqual(sqp.Metrics, result.Metrics) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Metrics, result.Metrics) + } + if !reflect.DeepEqual(sqp.Thresholds, result.Thresholds) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Thresholds, result.Thresholds) + } + if !reflect.DeepEqual(sqp.Blocker, result.Blocker) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Blocker, result.Blocker) + } + if !reflect.DeepEqual(sqp.Stored, result.Stored) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Stored, result.Stored) + } + if !reflect.DeepEqual(sqp.Weight, result.Weight) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Weight, result.Weight) + } + if !reflect.DeepEqual(sqp, result) { + t.Errorf("Expecting: %+v, received: %+v", sqp, result) + } + result1, err := mig.dm.GetFilter("cgrates.org", v1Sts.Id, true, utils.NonTransactional) + if err != nil { + t.Error("Error when getting Stats ", err.Error()) + } + if !reflect.DeepEqual(filter, result1) { + t.Errorf("Expecting: %+v, received: %+v", filter, result1) + } + + case dbtype == utils.MONGO: + err := mig.oldDataDB.setV1Stats(v1Sts) + if err != nil { + t.Error("Error when setting v1Stat ", err.Error()) + } + currentVersion := engine.Versions{utils.StatS: 1, utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2} + err = mig.dm.DataDB().SetVersions(currentVersion, false) + if err != nil { + t.Error("Error when setting version for stats ", err.Error()) + } + err, _ = mig.Migrate([]string{utils.MetaStats}) + if err != nil { + t.Error("Error when migrating Stats ", err.Error()) + } + result, err := mig.dm.GetStatQueueProfile("cgrates.org", v1Sts.Id, true, utils.NonTransactional) + if err != nil { + t.Error("Error when getting Stats ", err.Error()) + } + if !reflect.DeepEqual(sqp.Tenant, result.Tenant) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Tenant, result.Tenant) + } + if !reflect.DeepEqual(sqp.ID, result.ID) { + t.Errorf("Expecting: %+v, received: %+v", sqp.ID, result.ID) + } + if !reflect.DeepEqual(sqp.FilterIDs, result.FilterIDs) { + t.Errorf("Expecting: %+v, received: %+v", sqp.FilterIDs, result.FilterIDs) + } + if !reflect.DeepEqual(sqp.QueueLength, result.QueueLength) { + t.Errorf("Expecting: %+v, received: %+v", sqp.QueueLength, result.QueueLength) + } + if !reflect.DeepEqual(sqp.TTL, result.TTL) { + t.Errorf("Expecting: %+v, received: %+v", sqp.TTL, result.TTL) + } + if !reflect.DeepEqual(sqp.Metrics, result.Metrics) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Metrics, result.Metrics) + } + if !reflect.DeepEqual(sqp.Thresholds, result.Thresholds) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Thresholds, result.Thresholds) + } + if !reflect.DeepEqual(sqp.Blocker, result.Blocker) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Blocker, result.Blocker) + } + if !reflect.DeepEqual(sqp.Stored, result.Stored) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Stored, result.Stored) + } + if !reflect.DeepEqual(sqp.Weight, result.Weight) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Weight, result.Weight) + } + if !reflect.DeepEqual(sqp, result) { + t.Errorf("Expecting: %+v, received: %+v", sqp, result) + } + result1, err := mig.dm.GetFilter("cgrates.org", v1Sts.Id, true, utils.NonTransactional) + if err != nil { + t.Error("Error when getting Stats ", err.Error()) + } + if !reflect.DeepEqual(filter.ActivationInterval, result1.ActivationInterval) { + t.Errorf("Expecting: %+v, received: %+v", filter.ActivationInterval, result1.ActivationInterval) + } + if !reflect.DeepEqual(filter.Tenant, result1.Tenant) { + t.Errorf("Expecting: %+v, received: %+v", filter.Tenant, result1.Tenant) + } + } + result1, err := mig.dm.GetStatQueue("cgrates.org", v1Sts.Id, true, utils.NonTransactional) + if err != nil { + t.Error("Error when getting Stats ", err.Error()) + } + log.Print("Wrong version", result1) + +} diff --git a/migrator/stats.go b/migrator/stats.go new file mode 100644 index 000000000..aad3c4e86 --- /dev/null +++ b/migrator/stats.go @@ -0,0 +1,313 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package migrator + +import ( + "fmt" + "log" + "strconv" + "strings" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +type v1Stat struct { + Id string // Config id, unique per config instance + QueueLength int // Number of items in the stats buffer + TimeWindow time.Duration // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow + SaveInterval time.Duration + Metrics []string // ASR, ACD, ACC + SetupInterval []time.Time // CDRFieldFilter on SetupInterval, 2 or less items (>= start interval,< stop_interval) + TOR []string // CDRFieldFilter on TORs + CdrHost []string // CDRFieldFilter on CdrHosts + CdrSource []string // CDRFieldFilter on CdrSources + ReqType []string // CDRFieldFilter on RequestTypes + Direction []string // CDRFieldFilter on Directions + Tenant []string // CDRFieldFilter on Tenants + Category []string // CDRFieldFilter on Categories + Account []string // CDRFieldFilter on Accounts + Subject []string // CDRFieldFilter on Subjects + DestinationIds []string // CDRFieldFilter on DestinationPrefixes + UsageInterval []time.Duration // CDRFieldFilter on UsageInterval, 2 or less items (>= Usage, = Pdd, =Cost, when querying oldDataDB for versions", err.Error())) + } else if len(vrs) == 0 { + return utils.NewCGRError(utils.Migrator, + utils.MandatoryIEMissingCaps, + utils.UndefinedVersion, + "version number is not defined for Stats model") + } + if vrs[utils.StatS] != 1 { // Right now we only support migrating from version 1 + log.Print("Wrong version") + return + } + var v1Sts *v1Stat + for { + v1Sts, err = m.oldDataDB.getV1Stats() + if err != nil && err != utils.ErrNoMoreData { + return err + } + if err == utils.ErrNoMoreData { + break + } + if v1Sts.Id != "" { + if len(v1Sts.Triggers) != 0 { + for _, Trigger := range v1Sts.Triggers { + if err := m.SasThreshold(Trigger); err != nil { + return err + + } + } + } + filter, sq, sts, err := v1Sts.AsStatQP() + if err != nil { + return err + } + if m.dryRun != true { + if err := m.dm.SetFilter(filter); err != nil { + return err + } + if err := m.dm.SetStatQueue(sq); err != nil { + return err + } + if err := m.dm.SetStatQueueProfile(sts); err != nil { + return err + } + m.stats[utils.StatS] += 1 + } + } + } + if m.dryRun != true { + // All done, update version wtih current one + vrs := engine.Versions{utils.StatS: engine.CurrentStorDBVersions()[utils.StatS]} + if err = m.dm.DataDB().SetVersions(vrs, false); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error())) + } + } + return +} + +func (v1Sts v1Stat) AsStatQP() (filter *engine.Filter, sq *engine.StatQueue, stq *engine.StatQueueProfile, err error) { + var filters []*engine.RequestFilter + if len(v1Sts.SetupInterval) == 1 { + x, err := engine.NewRequestFilter(engine.MetaGreaterOrEqual, "SetupInterval", []string{v1Sts.SetupInterval[0].String()}) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } else if len(v1Sts.SetupInterval) == 2 { + x, err := engine.NewRequestFilter(engine.MetaLessThan, "SetupInterval", []string{v1Sts.SetupInterval[1].String()}) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + + if len(v1Sts.TOR) != 0 { + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "TOR", v1Sts.TOR) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.CdrHost) != 0 { + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "CdrHost", v1Sts.CdrHost) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.ReqType) != 0 { + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "ReqType", v1Sts.ReqType) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.Direction) != 0 { + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Direction", v1Sts.Direction) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.Category) != 0 { + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Category", v1Sts.Category) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.Account) != 0 { + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Account", v1Sts.Account) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.Subject) != 0 { + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Subject", v1Sts.Subject) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.Supplier) != 0 { + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Supplier", v1Sts.Supplier) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.UsageInterval) == 1 { + x, err := engine.NewRequestFilter(engine.MetaGreaterOrEqual, "UsageInterval", []string{v1Sts.UsageInterval[0].String()}) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } else if len(v1Sts.UsageInterval) == 2 { + x, err := engine.NewRequestFilter(engine.MetaLessThan, "UsageInterval", []string{v1Sts.UsageInterval[1].String()}) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.PddInterval) == 1 { + x, err := engine.NewRequestFilter(engine.MetaGreaterOrEqual, "PddInterval", []string{v1Sts.PddInterval[0].String()}) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } else if len(v1Sts.PddInterval) == 2 { + x, err := engine.NewRequestFilter(engine.MetaLessThan, "PddInterval", []string{v1Sts.PddInterval[1].String()}) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.Supplier) != 0 { + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Supplier", v1Sts.Supplier) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.DisconnectCause) != 0 { + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "DisconnectCause", v1Sts.DisconnectCause) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.MediationRunIds) != 0 { + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "MediationRunIds", v1Sts.MediationRunIds) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.RatedSubject) != 0 { + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "RatedSubject", v1Sts.RatedSubject) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if len(v1Sts.CostInterval) == 1 { + x, err := engine.NewRequestFilter(engine.MetaGreaterOrEqual, "CostInterval", []string{strconv.FormatFloat(v1Sts.CostInterval[0], 'f', 6, 64)}) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } else if len(v1Sts.CostInterval) == 2 { + x, err := engine.NewRequestFilter(engine.MetaLessThan, "CostInterval", []string{strconv.FormatFloat(v1Sts.CostInterval[1], 'f', 6, 64)}) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + filter = &engine.Filter{Tenant: config.CgrConfig().DefaultTenant, ID: v1Sts.Id, RequestFilters: filters} + stq = &engine.StatQueueProfile{ + ID: v1Sts.Id, + QueueLength: v1Sts.QueueLength, + Metrics: []string{}, + Tenant: config.CgrConfig().DefaultTenant, + Blocker: false, + Stored: false, + Thresholds: []string{}, + FilterIDs: []string{v1Sts.Id}, + } + if v1Sts.SaveInterval != 0 { + stq.Stored = true + } + if len(v1Sts.Triggers) != 0 { + for i, _ := range v1Sts.Triggers { + stq.Thresholds = append(stq.Thresholds, v1Sts.Triggers[i].ID) + } + } + sq = &engine.StatQueue{Tenant: config.CgrConfig().DefaultTenant, + ID: v1Sts.Id, + SQMetrics: make(map[string]engine.StatMetric), + } + if len(v1Sts.Metrics) != 0 { + for i, _ := range v1Sts.Metrics { + if !strings.HasPrefix(v1Sts.Metrics[i], "*") { + v1Sts.Metrics[i] = "*" + v1Sts.Metrics[i] + } + v1Sts.Metrics[i] = strings.ToLower(v1Sts.Metrics[i]) + stq.Metrics = append(stq.Metrics, v1Sts.Metrics[i]) + if metric, err := engine.NewStatMetric(stq.Metrics[i], 0); err != nil { + return nil, nil, nil, err + } else { + sq.SQMetrics[stq.Metrics[i]] = metric + } + } + } + return filter, sq, stq, nil +} diff --git a/migrator/stats_test.go b/migrator/stats_test.go new file mode 100644 index 000000000..44348841c --- /dev/null +++ b/migrator/stats_test.go @@ -0,0 +1,139 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator + +import ( + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestV1StatsAsStats(t *testing.T) { + tim := time.Date(0001, time.January, 1, 2, 0, 0, 0, time.UTC).Local() + var filters []*engine.RequestFilter + v1Sts := &v1Stat{ + Id: "test", // Config id, unique per config instance + QueueLength: 10, // Number of items in the stats buffer + TimeWindow: time.Duration(1) * time.Second, // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow + SaveInterval: time.Duration(1) * time.Second, + Metrics: []string{"ASR", "ACD", "ACC"}, + SetupInterval: []time.Time{time.Now()}, + TOR: []string{}, + CdrHost: []string{}, + CdrSource: []string{}, + ReqType: []string{}, + Direction: []string{}, + Tenant: []string{}, + Category: []string{}, + Account: []string{}, + Subject: []string{}, + DestinationIds: []string{}, + UsageInterval: []time.Duration{1 * time.Second}, + PddInterval: []time.Duration{1 * time.Second}, + Supplier: []string{}, + DisconnectCause: []string{}, + MediationRunIds: []string{}, + RatedAccount: []string{}, + RatedSubject: []string{}, + CostInterval: []float64{}, + Triggers: engine.ActionTriggers{&engine.ActionTrigger{ + ID: "TestB", + Balance: &engine.BalanceFilter{ + ID: utils.StringPointer("TESTB"), + Timings: []*engine.RITiming{}, + ExpirationDate: utils.TimePointer(tim), + Type: utils.StringPointer(utils.MONETARY), + Directions: utils.StringMapPointer(utils.NewStringMap(utils.OUT)), + }, + ExpirationDate: tim, + LastExecutionTime: tim, + ActivationDate: tim, + ThresholdType: utils.TRIGGER_MAX_BALANCE, + ThresholdValue: 2, + ActionsID: "TEST_ACTIONS", + Executed: true, + }}, + } + + x, _ := engine.NewRequestFilter(engine.MetaGreaterOrEqual, "SetupInterval", []string{v1Sts.SetupInterval[0].String()}) + filters = append(filters, x) + x, _ = engine.NewRequestFilter(engine.MetaGreaterOrEqual, "UsageInterval", []string{v1Sts.UsageInterval[0].String()}) + filters = append(filters, x) + x, _ = engine.NewRequestFilter(engine.MetaGreaterOrEqual, "PddInterval", []string{v1Sts.PddInterval[0].String()}) + filters = append(filters, x) + + filter := &engine.Filter{Tenant: config.CgrConfig().DefaultTenant, ID: v1Sts.Id, RequestFilters: filters} + + sqp := &engine.StatQueueProfile{ + Tenant: "cgrates.org", + ID: "test", + FilterIDs: []string{v1Sts.Id}, + QueueLength: 10, + TTL: time.Duration(0) * time.Second, + Metrics: []string{"*asr", "*acd", "*acc"}, + Blocker: false, + Thresholds: []string{"TestB"}, + Stored: true, + Weight: float64(0), + MinItems: 0, + } + fltr, _, newsqp, err := v1Sts.AsStatQP() + if err != nil { + t.Errorf("err") + } + if !reflect.DeepEqual(sqp.Tenant, newsqp.Tenant) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Tenant, newsqp.Tenant) + } + if !reflect.DeepEqual(sqp.ID, newsqp.ID) { + t.Errorf("Expecting: %+v, received: %+v", sqp.ID, newsqp.ID) + } + if !reflect.DeepEqual(sqp.FilterIDs, newsqp.FilterIDs) { + t.Errorf("Expecting: %+v, received: %+v", sqp.FilterIDs, newsqp.FilterIDs) + } + if !reflect.DeepEqual(sqp.QueueLength, newsqp.QueueLength) { + t.Errorf("Expecting: %+v, received: %+v", sqp.QueueLength, newsqp.QueueLength) + } + if !reflect.DeepEqual(sqp.TTL, newsqp.TTL) { + t.Errorf("Expecting: %+v, received: %+v", sqp.TTL, newsqp.TTL) + } + if !reflect.DeepEqual(sqp.Metrics, newsqp.Metrics) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Metrics, newsqp.Metrics) + } + if !reflect.DeepEqual(sqp.Thresholds, newsqp.Thresholds) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Thresholds, newsqp.Thresholds) + } + if !reflect.DeepEqual(sqp.Blocker, newsqp.Blocker) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Blocker, newsqp.Blocker) + } + if !reflect.DeepEqual(sqp.Stored, newsqp.Stored) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Stored, newsqp.Stored) + } + if !reflect.DeepEqual(sqp.Weight, newsqp.Weight) { + t.Errorf("Expecting: %+v, received: %+v", sqp.Weight, newsqp.Weight) + } + if !reflect.DeepEqual(sqp, newsqp) { + t.Errorf("Expecting: %+v, received: %+v", sqp, newsqp) + } + if !reflect.DeepEqual(filter, fltr) { + t.Errorf("Expecting: %+v, received: %+v", filter, fltr) + } +} diff --git a/migrator/thresholds.go b/migrator/thresholds.go new file mode 100644 index 000000000..d7be759c3 --- /dev/null +++ b/migrator/thresholds.go @@ -0,0 +1,296 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package migrator + +import ( + "fmt" + "log" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +type v2ActionTrigger struct { + ID string // original csv tag + UniqueID string // individual id + ThresholdType string //*min_event_counter, *max_event_counter, *min_balance_counter, *max_balance_counter, *min_balance, *max_balance, *balance_expired + // stats: *min_asr, *max_asr, *min_acd, *max_acd, *min_tcd, *max_tcd, *min_acc, *max_acc, *min_tcc, *max_tcc, *min_ddc, *max_ddc + ThresholdValue float64 + Recurrent bool // reset excuted flag each run + MinSleep time.Duration // Minimum duration between two executions in case of recurrent triggers + ExpirationDate time.Time + ActivationDate time.Time + //BalanceType string // *monetary/*voice etc + Balance *engine.BalanceFilter //filtru + Weight float64 + ActionsID string + MinQueuedItems int // Trigger actions only if this number is hit (stats only) MINHITS + Executed bool + LastExecutionTime time.Time +} + +type v2ActionTriggers []*v2ActionTrigger + +func (m *Migrator) migratev1ActionTriggers() (err error) { + var vrs engine.Versions + if m.dm.DataDB() == nil { + return utils.NewCGRError(utils.Migrator, + utils.MandatoryIEMissingCaps, + utils.NoStorDBConnection, + "no connection to datadb") + } + vrs, err = m.dm.DataDB().GetVersions(utils.TBLVersions) + if err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when querying oldDataDB for versions", err.Error())) + } else if len(vrs) == 0 { + return utils.NewCGRError(utils.Migrator, + utils.MandatoryIEMissingCaps, + utils.UndefinedVersion, + "version number is not defined for Stats model") + } + if vrs[utils.Thresholds] != 1 { // Right now we only support migrating from version 1 + log.Print("Wrong version") + return + } + var v2ACT *v2ActionTrigger + for { + v2ACT, err = m.oldDataDB.getV2ActionTrigger() + if err != nil && err != utils.ErrNoMoreData { + return err + } + if err == utils.ErrNoMoreData { + break + } + if v2ACT.ID != "" { + thp, th, filter, err := v2ACT.AsThreshold() + if err != nil { + return err + } + if m.dryRun != true { + if err := m.dm.SetFilter(filter); err != nil { + return err + } + if err := m.dm.SetThreshold(th); err != nil { + return err + } + if err := m.dm.SetThresholdProfile(thp); err != nil { + return err + } + m.stats[utils.Thresholds] += 1 + } + } + } + if m.dryRun != true { + // All done, update version wtih current one + vrs := engine.Versions{utils.Thresholds: engine.CurrentStorDBVersions()[utils.Thresholds]} + if err = m.dm.DataDB().SetVersions(vrs, false); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error())) + } + } + return +} +func (v2ATR v2ActionTrigger) AsThreshold() (thp *engine.ThresholdProfile, th *engine.Threshold, filter *engine.Filter, err error) { + var filters []*engine.RequestFilter + if *v2ATR.Balance.ID != "" { + if v2ATR.Balance.Directions != nil { + x, err := engine.NewRequestFilter(engine.MetaRSRFields, "Directions", v2ATR.Balance.Directions.Slice()) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + //TO DO: + // if v2ATR.Balance.ExpirationDate != nil { //MetaLess + // x, err := engine.NewRequestFilter(engine.MetaTimings, "ExpirationDate", v2ATR.Balance.ExpirationDate) + // if err != nil { + // return nil, nil, err + // } + // filters = append(filters, x) + // } + // if v2ATR.Balance.Weight != nil { //MetaLess /MetaRSRFields + // x, err := engine.NewRequestFilter(engine.MetaLessOrEqual, "Weight", []string{strconv.FormatFloat(*v2ATR.Balance.Weight, 'f', 6, 64)}) + // if err != nil { + // return nil, nil, err + // } + // filters = append(filters, x) + // } + if v2ATR.Balance.DestinationIDs != nil { //MetaLess /RSRfields + x, err := engine.NewRequestFilter(engine.MetaDestinations, "DestinationIDs", v2ATR.Balance.DestinationIDs.Slice()) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if v2ATR.Balance.RatingSubject != nil { //MetaLess /RSRfields + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "RatingSubject", []string{*v2ATR.Balance.RatingSubject}) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if v2ATR.Balance.Categories != nil { //MetaLess /RSRfields + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Categories", v2ATR.Balance.Categories.Slice()) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if v2ATR.Balance.SharedGroups != nil { //MetaLess /RSRfields + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "SharedGroups", v2ATR.Balance.SharedGroups.Slice()) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if v2ATR.Balance.TimingIDs != nil { //MetaLess /RSRfields + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "TimingIDs", v2ATR.Balance.TimingIDs.Slice()) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + } + filter = &engine.Filter{Tenant: config.CgrConfig().DefaultTenant, ID: *v2ATR.Balance.ID, RequestFilters: filters} + + th = &engine.Threshold{ + Tenant: config.CgrConfig().DefaultTenant, + ID: v2ATR.ID, + } + + thp = &engine.ThresholdProfile{ + ID: v2ATR.ID, + Tenant: config.CgrConfig().DefaultTenant, + Weight: v2ATR.Weight, + ActivationInterval: &utils.ActivationInterval{ActivationTime: v2ATR.ActivationDate, ExpiryTime: v2ATR.ExpirationDate}, + FilterIDs: []string{filter.ID}, + MinSleep: v2ATR.MinSleep, + } + return thp, th, filter, nil +} + +func (m *Migrator) SasThreshold(v2ATR *engine.ActionTrigger) (err error) { + var vrs engine.Versions + if m.dm.DataDB() == nil { + return utils.NewCGRError(utils.Migrator, + utils.MandatoryIEMissingCaps, + utils.NoStorDBConnection, + "no connection to datadb") + } + if v2ATR.ID != "" { + thp, th, filter, err := AsThreshold2(*v2ATR) + if err != nil { + return err + } + if filter != nil { + if err := m.dm.SetFilter(filter); err != nil { + return err + } + } + if err := m.dm.SetThreshold(th); err != nil { + return err + } + if err := m.dm.SetThresholdProfile(thp); err != nil { + return err + } + m.stats[utils.Thresholds] += 1 + } + // All done, update version wtih current one + vrs = engine.Versions{utils.Thresholds: engine.CurrentStorDBVersions()[utils.Thresholds]} + if err = m.dm.DataDB().SetVersions(vrs, false); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error())) + } + return +} + +func AsThreshold2(v2ATR engine.ActionTrigger) (thp *engine.ThresholdProfile, th *engine.Threshold, filter *engine.Filter, err error) { + var filterIDS []string + var filters []*engine.RequestFilter + if v2ATR.Balance.ID != nil && *v2ATR.Balance.ID != "" { + if v2ATR.Balance.Directions != nil { + x, err := engine.NewRequestFilter(engine.MetaRSRFields, "Directions", v2ATR.Balance.Directions.Slice()) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if v2ATR.Balance.DestinationIDs != nil { //MetaLess /RSRfields + x, err := engine.NewRequestFilter(engine.MetaDestinations, "DestinationIDs", v2ATR.Balance.DestinationIDs.Slice()) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if v2ATR.Balance.RatingSubject != nil { //MetaLess /RSRfields + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "RatingSubject", []string{*v2ATR.Balance.RatingSubject}) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if v2ATR.Balance.Categories != nil { //MetaLess /RSRfields + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Categories", v2ATR.Balance.Categories.Slice()) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if v2ATR.Balance.SharedGroups != nil { //MetaLess /RSRfields + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "SharedGroups", v2ATR.Balance.SharedGroups.Slice()) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + if v2ATR.Balance.TimingIDs != nil { //MetaLess /RSRfields + x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "TimingIDs", v2ATR.Balance.TimingIDs.Slice()) + if err != nil { + return nil, nil, nil, err + } + filters = append(filters, x) + } + filter = &engine.Filter{Tenant: config.CgrConfig().DefaultTenant, ID: *v2ATR.Balance.ID, RequestFilters: filters} + filterIDS = append(filterIDS, filter.ID) + } + th = &engine.Threshold{ + Tenant: config.CgrConfig().DefaultTenant, + ID: v2ATR.ID, + } + + thp = &engine.ThresholdProfile{ + ID: v2ATR.ID, + Tenant: config.CgrConfig().DefaultTenant, + Weight: v2ATR.Weight, + ActivationInterval: &utils.ActivationInterval{ActivationTime: v2ATR.ActivationDate, ExpiryTime: v2ATR.ExpirationDate}, + FilterIDs: filterIDS, + MinSleep: v2ATR.MinSleep, + } + + return thp, th, filter, nil +} diff --git a/migrator/thresholds_test.go b/migrator/thresholds_test.go new file mode 100644 index 000000000..1cfdb5d2d --- /dev/null +++ b/migrator/thresholds_test.go @@ -0,0 +1,89 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator + +import ( + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func Testv2ActionTriggerAsThreshold(t *testing.T) { + var filters []*engine.RequestFilter + v2ATR := &v2ActionTrigger{ + ID: "test2", // original csv tag + UniqueID: "testUUID", // individual id + ThresholdType: "*min_event_counter", //*min_event_counter, *max_event_counter, *min_balance_counter, *max_balance_counter, *min_balance, *max_balance, *balance_expired + ThresholdValue: 5.32, + Recurrent: false, // reset excuted flag each run + MinSleep: time.Duration(5) * time.Second, // Minimum duration between two executions in case of recurrent triggers + ExpirationDate: time.Now(), + ActivationDate: time.Now(), + Balance: new(engine.BalanceFilter), + Weight: 0, + ActionsID: "Action1", + MinQueuedItems: 10, // Trigger actions only if this number is hit (stats only) + Executed: false, + LastExecutionTime: time.Now(), + } + x, _ := engine.NewRequestFilter(engine.MetaRSRFields, "Directions", v2ATR.Balance.Directions.Slice()) + filters = append(filters, x) + x, _ = engine.NewRequestFilter(engine.MetaDestinations, "DestinationIDs", v2ATR.Balance.DestinationIDs.Slice()) + filters = append(filters, x) + x, _ = engine.NewRequestFilter(engine.MetaStringPrefix, "RatingSubject", []string{*v2ATR.Balance.RatingSubject}) + filters = append(filters, x) + x, _ = engine.NewRequestFilter(engine.MetaStringPrefix, "Categories", v2ATR.Balance.Categories.Slice()) + filters = append(filters, x) + x, _ = engine.NewRequestFilter(engine.MetaStringPrefix, "SharedGroups", v2ATR.Balance.SharedGroups.Slice()) + filters = append(filters, x) + x, _ = engine.NewRequestFilter(engine.MetaStringPrefix, "TimingIDs", v2ATR.Balance.TimingIDs.Slice()) + filters = append(filters, x) + + filter := &engine.Filter{Tenant: config.CgrConfig().DefaultTenant, ID: *v2ATR.Balance.ID, RequestFilters: filters} + + thp := &engine.ThresholdProfile{ + ID: v2ATR.ID, + Tenant: config.CgrConfig().DefaultTenant, + Blocker: false, + Weight: v2ATR.Weight, + ActivationInterval: &utils.ActivationInterval{v2ATR.ExpirationDate, v2ATR.ActivationDate}, + MinSleep: v2ATR.MinSleep, + } + th := &engine.Threshold{ + Tenant: config.CgrConfig().DefaultTenant, + ID: v2ATR.ID, + } + + newthp, newth, fltr, err := v2ATR.AsThreshold() + if err != nil { + t.Errorf("err") + } + if !reflect.DeepEqual(thp, newthp) { + t.Errorf("Expecting: %+v, received: %+v", thp, newthp) + } + if !reflect.DeepEqual(th, newth) { + t.Errorf("Expecting: %+v, received: %+v", th, newth) + } + if !reflect.DeepEqual(filter, fltr) { + t.Errorf("Expecting: %+v, received: %+v", filter, fltr) + } +} diff --git a/migrator/v1datadb.go b/migrator/v1datadb.go index 57f6eebd1..9719472fd 100644 --- a/migrator/v1datadb.go +++ b/migrator/v1datadb.go @@ -30,4 +30,8 @@ type V1DataDB interface { setV1ActionTriggers(x *v1ActionTriggers) (err error) getV1SharedGroup() (v1acts *v1SharedGroup, err error) setV1SharedGroup(x *v1SharedGroup) (err error) + getV1Stats() (v1st *v1Stat, err error) + setV1Stats(x *v1Stat) (err error) + getV2ActionTrigger() (v2at *v2ActionTrigger, err error) + setV2ActionTrigger(x *v2ActionTrigger) (err error) } diff --git a/migrator/v1mongo_data.go b/migrator/v1mongo_data.go index ccb6900d2..ba0d7b26e 100644 --- a/migrator/v1mongo_data.go +++ b/migrator/v1mongo_data.go @@ -174,3 +174,49 @@ func (v1ms *v1Mongo) setV1SharedGroup(x *v1SharedGroup) (err error) { } return } + +//Stats methods +//get +func (v1ms *v1Mongo) getV1Stats() (v1st *v1Stat, err error) { + if v1ms.qryIter == nil { + v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.CDR_STATS_PREFIX).Find(nil).Iter() + } + v1ms.qryIter.Next(&v1st) + if v1st == nil { + v1ms.qryIter = nil + return nil, utils.ErrNoMoreData + + } + return v1st, nil +} + +//set +func (v1ms *v1Mongo) setV1Stats(x *v1Stat) (err error) { + if err := v1ms.session.DB(v1ms.db).C(utils.CDR_STATS_PREFIX).Insert(x); err != nil { + return err + } + return +} + +//Stats methods +//get +func (v1ms *v1Mongo) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) { + if v1ms.qryIter == nil { + v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.ACTION_TRIGGER_PREFIX).Find(nil).Iter() + } + v1ms.qryIter.Next(&v2at) + if v2at == nil { + v1ms.qryIter = nil + return nil, utils.ErrNoMoreData + + } + return v2at, nil +} + +//set +func (v1ms *v1Mongo) setV2ActionTrigger(x *v2ActionTrigger) (err error) { + if err := v1ms.session.DB(v1ms.db).C(utils.ACTION_TRIGGER_PREFIX).Insert(x); err != nil { + return err + } + return +} diff --git a/migrator/v1redis.go b/migrator/v1redis.go index 5780193b5..d4764b226 100644 --- a/migrator/v1redis.go +++ b/migrator/v1redis.go @@ -20,6 +20,7 @@ package migrator import ( "fmt" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -305,3 +306,85 @@ func (v1rs *v1Redis) setV1SharedGroup(x *v1SharedGroup) (err error) { } return } + +//Stats methods +//get +func (v1rs *v1Redis) getV1Stats() (v1st *v1Stat, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.CDR_STATS_PREFIX) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNotFound + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.ms.Unmarshal(strVal, &v1st); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return v1st, nil +} + +//set +func (v1rs *v1Redis) setV1Stats(x *v1Stat) (err error) { + key := utils.CDR_STATS_PREFIX + x.Id + bit, err := v1rs.ms.Marshal(x) + if err != nil { + return err + } + if err = v1rs.cmd("SET", key, bit).Err; err != nil { + return err + } + return +} + +//Action methods +//get +func (v1rs *v1Redis) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_TRIGGER_PREFIX) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNotFound + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.ms.Unmarshal(strVal, &v2at); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return v2at, nil +} + +//set +func (v1rs *v1Redis) setV2ActionTrigger(x *v2ActionTrigger) (err error) { + key := utils.ACTION_TRIGGER_PREFIX + x.ID + bit, err := v1rs.ms.Marshal(x) + if err != nil { + return err + } + if err = v1rs.cmd("SET", key, bit).Err; err != nil { + return err + } + return +} diff --git a/utils/consts.go b/utils/consts.go index aa0c6871c..d4f29e410 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -397,6 +397,8 @@ const ( MetaActionTriggers = "*action_triggers" MetaActions = "*actions" MetaSharedGroups = "*shared_groups" + MetaStats = "*stats" + MetaThresholds = "*thresholds" Migrator = "migrator" UnsupportedMigrationTask = "unsupported migration task" NoStorDBConnection = "not connected to StorDB" @@ -450,6 +452,7 @@ const ( CacheResources = "resources" CacheResourceProfiles = "resource_profiles" CacheTimings = "timings" + Thresholds = "Thresholds" StatS = "stats" StatService = "StatS" RALService = "RALs"