Added migration from action triggers to thresholds and cdrstats to stats

This commit is contained in:
edwardro22
2017-11-07 12:24:33 +00:00
committed by Dan Christian Bogos
parent ddefb45e69
commit 4fb9d9ea63
13 changed files with 1240 additions and 60 deletions

View File

@@ -68,12 +68,4 @@ CREATE TABLE versions (
`version` int(11) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `item` (`item`)
);
ALTER TABLE cdrs CHANGE COLUMN `usage` `usage_old` DECIMAL(30,9);
ALTER TABLE cdrs ADD `usage` DECIMAL(30);
UPDATE cdrs SET `usage` = `usage_old` * 1000000000 WHERE usage_old IS NOT NULL;
ALTER TABLE cdrs DROP COLUMN usage_old;
);

View File

@@ -0,0 +1,4 @@
ALTER TABLE cdrs CHANGE COLUMN `usage` `usage_old` DECIMAL(30,9);
ALTER TABLE cdrs ADD `usage` BIGINT;
UPDATE cdrs SET `usage` = `usage_old` * 1000000000 WHERE usage_old IS NOT NULL;
ALTER TABLE cdrs DROP COLUMN usage_old;

View File

@@ -105,8 +105,8 @@ func (vers Versions) Compare(curent Versions, storType string) string {
}
func CurrentDBVersions(storType string) Versions {
dataDbVersions := Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2}
storDbVersions := Versions{utils.COST_DETAILS: 2}
dataDbVersions := CurrentDataDBVersions()
storDbVersions := CurrentStorDBVersions()
allVersions := make(Versions)
for k, v := range dataDbVersions {
@@ -127,13 +127,13 @@ func CurrentDBVersions(storType string) Versions {
return nil
}
func CurrentDataDBVersions() Versions {
return Versions{utils.StatS: 2, utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2, utils.Thresholds: 2}
}
func CurrentStorDBVersions() Versions {
return Versions{utils.COST_DETAILS: 2}
}
func CurrentDataDBVersions() Versions {
return Versions{utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2}
}
// Versions will keep trac of various item versions
type Versions map[string]int64 // map[item]versionNr

View File

@@ -105,6 +105,10 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) {
err = m.migrateActions()
case utils.MetaSharedGroups:
err = m.migrateSharedGroups()
case utils.MetaStats:
err = m.migrateStats()
case utils.MetaThresholds:
err = m.migrateStats()
}
}
for k, v := range m.stats {

View File

@@ -44,6 +44,7 @@ var sTestsITMigrator = []func(t *testing.T){
testMigratorActionTriggers,
testMigratorActions,
testMigratorSharedGroups,
testMigratorStats,
testFlush,
}
@@ -71,7 +72,7 @@ func TestMigratorITPostgresConnect(t *testing.T) {
if err != nil {
log.Fatal(err)
}
mig, err = NewMigrator(dataDB, postgresITCfg.DataDbType, postgresITCfg.DBDataEncoding, storDB, postgresITCfg.StorDBType, oldDataDB, postgresITCfg.DataDbType, postgresITCfg.DBDataEncoding, oldstorDB, postgresITCfg.StorDBType)
mig, err = NewMigrator(dataDB, postgresITCfg.DataDbType, postgresITCfg.DBDataEncoding, storDB, postgresITCfg.StorDBType, oldDataDB, postgresITCfg.DataDbType, postgresITCfg.DBDataEncoding, oldstorDB, postgresITCfg.StorDBType, false)
if err != nil {
log.Fatal(err)
}
@@ -79,6 +80,7 @@ func TestMigratorITPostgresConnect(t *testing.T) {
func TestMigratorITPostgres(t *testing.T) {
dbtype = utils.REDIS
log.Print("REDIS+POSTGRES")
for _, stest := range sTestsITMigrator {
t.Run("TestITMigratorOnPostgres", stest)
}
@@ -108,7 +110,7 @@ func TestMigratorITRedisConnect(t *testing.T) {
if err != nil {
log.Fatal(err)
}
mig, err = NewMigrator(dataDB, mysqlITCfg.DataDbType, mysqlITCfg.DBDataEncoding, storDB, mysqlITCfg.StorDBType, oldDataDB, mysqlITCfg.DataDbType, mysqlITCfg.DBDataEncoding, oldstorDB, mysqlITCfg.StorDBType)
mig, err = NewMigrator(dataDB, mysqlITCfg.DataDbType, mysqlITCfg.DBDataEncoding, storDB, mysqlITCfg.StorDBType, oldDataDB, mysqlITCfg.DataDbType, mysqlITCfg.DBDataEncoding, oldstorDB, mysqlITCfg.StorDBType, false)
if err != nil {
log.Fatal(err)
}
@@ -116,6 +118,7 @@ func TestMigratorITRedisConnect(t *testing.T) {
func TestMigratorITRedis(t *testing.T) {
dbtype = utils.REDIS
log.Print("REDIS+MYSQL")
for _, stest := range sTestsITMigrator {
t.Run("TestITMigratorOnRedis", stest)
}
@@ -145,7 +148,7 @@ func TestMigratorITMongoConnect(t *testing.T) {
if err != nil {
log.Fatal(err)
}
mig, err = NewMigrator(dataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, storDB, mgoITCfg.StorDBType, oldDataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, oldstorDB, mgoITCfg.StorDBType)
mig, err = NewMigrator(dataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, storDB, mgoITCfg.StorDBType, oldDataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, oldstorDB, mgoITCfg.StorDBType, false)
if err != nil {
log.Fatal(err)
}
@@ -153,27 +156,16 @@ func TestMigratorITMongoConnect(t *testing.T) {
func TestMigratorITMongo(t *testing.T) {
dbtype = utils.MONGO
log.Print("MONGO")
for _, stest := range sTestsITMigrator {
t.Run("TestITMigratorOnMongo", stest)
}
}
func testFlush(t *testing.T) {
switch {
case dbtype == utils.REDIS:
dataDB := mig.dataDB.(*engine.RedisStorage)
err := dataDB.Cmd("FLUSHALL").Err
if err != nil {
t.Error("Error when flushing Redis ", err.Error())
}
case dbtype == utils.MONGO:
err := mig.dataDB.Flush("")
if err != nil {
t.Error("Error when flushing Mongo ", err.Error())
}
}
if err = SetDBVersions(mig.dataDB); err != nil {
return err
mig.dm.DataDB().Flush("")
if err := engine.SetDBVersions(mig.dm.DataDB()); err != nil {
t.Error("Error ", err.Error())
}
}
@@ -192,11 +184,11 @@ func testMigratorAccounts(t *testing.T) {
if err != nil {
t.Error("Error when setting v1 acc ", err.Error())
}
err = mig.Migrate(utils.MetaAccounts)
err, _ = mig.Migrate([]string{utils.MetaAccounts})
if err != nil {
t.Error("Error when migrating accounts ", err.Error())
}
result, err := mig.dataDB.GetAccount(testAccount.ID)
result, err := mig.dm.DataDB().GetAccount(testAccount.ID)
if err != nil {
t.Error("Error when getting account ", err.Error())
}
@@ -210,11 +202,11 @@ func testMigratorAccounts(t *testing.T) {
if err != nil {
t.Error("Error when marshaling ", err.Error())
}
err = mig.Migrate(utils.MetaAccounts)
err, _ = mig.Migrate([]string{utils.MetaAccounts})
if err != nil {
t.Error("Error when migrating accounts ", err.Error())
}
result, err := mig.dataDB.GetAccount(testAccount.ID)
result, err := mig.dm.DataDB().GetAccount(testAccount.ID)
if err != nil {
t.Error("Error when getting account ", err.Error())
}
@@ -233,11 +225,11 @@ func testMigratorActionPlans(t *testing.T) {
if err != nil {
t.Error("Error when setting v1 ActionPlan ", err.Error())
}
err = mig.Migrate(utils.MetaActionPlans)
err, _ = mig.Migrate([]string{utils.MetaActionPlans})
if err != nil {
t.Error("Error when migrating ActionPlans ", err.Error())
}
result, err := mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
result, err := mig.dm.DataDB().GetActionPlan(ap.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting ActionPlan ", err.Error())
}
@@ -253,11 +245,11 @@ func testMigratorActionPlans(t *testing.T) {
if err != nil {
t.Error("Error when setting v1 ActionPlans ", err.Error())
}
err = mig.Migrate(utils.MetaActionPlans)
err, _ = mig.Migrate([]string{utils.MetaActionPlans})
if err != nil {
t.Error("Error when migrating ActionPlans ", err.Error())
}
result, err := mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
result, err := mig.dm.DataDB().GetActionPlan(ap.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting ActionPlan ", err.Error())
}
@@ -309,11 +301,11 @@ func testMigratorActionTriggers(t *testing.T) {
if err != nil {
t.Error("Error when setting v1 ActionTriggers ", err.Error())
}
err = mig.Migrate(utils.MetaActionTriggers)
err, _ = mig.Migrate([]string{utils.MetaActionTriggers})
if err != nil {
t.Error("Error when migrating ActionTriggers ", err.Error())
}
result, err := mig.dataDB.GetActionTriggers((*v1atrs)[0].Id, true, utils.NonTransactional)
result, err := mig.dm.GetActionTriggers((*v1atrs)[0].Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting ActionTriggers ", err.Error())
}
@@ -381,7 +373,7 @@ func testMigratorActionTriggers(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance.Blocker, result[0].Balance.Blocker)
}
case dbtype == utils.MONGO:
err := mig.Migrate(utils.MetaActionTriggers)
err, _ := mig.Migrate([]string{utils.MetaActionTriggers})
if err != nil && err != utils.ErrNotImplemented {
t.Error("Error when migrating ActionTriggers ", err.Error())
}
@@ -398,11 +390,11 @@ func testMigratorActions(t *testing.T) {
if err != nil {
t.Error("Error when setting v1 Actions ", err.Error())
}
err = mig.Migrate(utils.MetaActions)
err, _ = mig.Migrate([]string{utils.MetaActions})
if err != nil {
t.Error("Error when migrating Actions ", err.Error())
}
result, err := mig.dataDB.GetActions((*v1act)[0].Id, true, utils.NonTransactional)
result, err := mig.dm.GetActions((*v1act)[0].Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting Actions ", err.Error())
}
@@ -415,11 +407,11 @@ func testMigratorActions(t *testing.T) {
if err != nil {
t.Error("Error when setting v1 Actions ", err.Error())
}
err = mig.Migrate(utils.MetaActions)
err, _ = mig.Migrate([]string{utils.MetaActions})
if err != nil {
t.Error("Error when migrating Actions ", err.Error())
}
result, err := mig.dataDB.GetActions((*v1act)[0].Id, true, utils.NonTransactional)
result, err := mig.dm.GetActions((*v1act)[0].Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting Actions ", err.Error())
}
@@ -430,14 +422,14 @@ func testMigratorActions(t *testing.T) {
}
func testMigratorSharedGroups(t *testing.T) {
v1sg := &v1SharedGroup{
v1sqp := &v1SharedGroup{
Id: "Test",
AccountParameters: map[string]*engine.SharingParameters{
"test": &engine.SharingParameters{Strategy: "*highest"},
},
MemberIds: []string{"1", "2", "3"},
}
sg := &engine.SharedGroup{
sqp := &engine.SharedGroup{
Id: "Test",
AccountParameters: map[string]*engine.SharingParameters{
"test": &engine.SharingParameters{Strategy: "*highest"},
@@ -446,37 +438,252 @@ func testMigratorSharedGroups(t *testing.T) {
}
switch {
case dbtype == utils.REDIS:
err := mig.oldDataDB.setV1SharedGroup(v1sg)
err := mig.oldDataDB.setV1SharedGroup(v1sqp)
if err != nil {
t.Error("Error when setting v1 SharedGroup ", err.Error())
}
err = mig.Migrate(utils.MetaSharedGroups)
err, _ = mig.Migrate([]string{utils.MetaSharedGroups})
if err != nil {
t.Error("Error when migrating SharedGroup ", err.Error())
}
result, err := mig.dataDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional)
result, err := mig.dm.GetSharedGroup(v1sqp.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting SharedGroup ", err.Error())
}
if !reflect.DeepEqual(sg, result) {
t.Errorf("Expecting: %+v, received: %+v", sg, result)
if !reflect.DeepEqual(sqp, result) {
t.Errorf("Expecting: %+v, received: %+v", sqp, result)
}
case dbtype == utils.MONGO:
err := mig.oldDataDB.setV1SharedGroup(v1sg)
err := mig.oldDataDB.setV1SharedGroup(v1sqp)
if err != nil {
t.Error("Error when setting v1 SharedGroup ", err.Error())
}
err = mig.Migrate(utils.MetaSharedGroups)
err, _ = mig.Migrate([]string{utils.MetaSharedGroups})
if err != nil {
t.Error("Error when migrating SharedGroup ", err.Error())
}
result, err := mig.dataDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional)
result, err := mig.dm.GetSharedGroup(v1sqp.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting SharedGroup ", err.Error())
}
if !reflect.DeepEqual(sg, result) {
t.Errorf("Expecting: %+v, received: %+v", sg, result)
if !reflect.DeepEqual(sqp, result) {
t.Errorf("Expecting: %+v, received: %+v", sqp, result)
}
}
}
func testMigratorStats(t *testing.T) {
tim := time.Date(2012, time.February, 27, 23, 59, 59, 0, time.UTC).Local()
var filters []*engine.RequestFilter
v1Sts := &v1Stat{
Id: "test", // Config id, unique per config instance
QueueLength: 10, // Number of items in the stats buffer
TimeWindow: time.Duration(1) * time.Second, // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
SaveInterval: time.Duration(1) * time.Second,
Metrics: []string{"ASR", "ACD", "ACC"},
SetupInterval: []time.Time{time.Now()},
TOR: []string{},
CdrHost: []string{},
CdrSource: []string{},
ReqType: []string{},
Direction: []string{},
Tenant: []string{},
Category: []string{},
Account: []string{},
Subject: []string{},
DestinationIds: []string{},
UsageInterval: []time.Duration{1 * time.Second},
PddInterval: []time.Duration{1 * time.Second},
Supplier: []string{},
DisconnectCause: []string{},
MediationRunIds: []string{},
RatedAccount: []string{},
RatedSubject: []string{},
CostInterval: []float64{},
Triggers: engine.ActionTriggers{
&engine.ActionTrigger{
ID: "Test",
Balance: &engine.BalanceFilter{
ID: utils.StringPointer("TESTB"),
Timings: []*engine.RITiming{},
ExpirationDate: utils.TimePointer(tim),
Type: utils.StringPointer(utils.MONETARY),
Directions: utils.StringMapPointer(utils.NewStringMap(utils.OUT)),
},
ExpirationDate: tim,
LastExecutionTime: tim,
ActivationDate: tim,
ThresholdType: utils.TRIGGER_MAX_BALANCE,
ThresholdValue: 2,
ActionsID: "TEST_ACTIONS",
Executed: true,
},
},
}
x, _ := engine.NewRequestFilter(engine.MetaGreaterOrEqual, "SetupInterval", []string{v1Sts.SetupInterval[0].String()})
filters = append(filters, x)
x, _ = engine.NewRequestFilter(engine.MetaGreaterOrEqual, "UsageInterval", []string{v1Sts.UsageInterval[0].String()})
filters = append(filters, x)
x, _ = engine.NewRequestFilter(engine.MetaGreaterOrEqual, "PddInterval", []string{v1Sts.PddInterval[0].String()})
filters = append(filters, x)
filter := &engine.Filter{Tenant: config.CgrConfig().DefaultTenant, ID: v1Sts.Id, RequestFilters: filters}
sqp := &engine.StatQueueProfile{
Tenant: "cgrates.org",
ID: "test",
FilterIDs: []string{v1Sts.Id},
QueueLength: 10,
TTL: time.Duration(0) * time.Second,
Metrics: []string{"*asr", "*acd", "*acc"},
Thresholds: []string{"Test"},
Blocker: false,
Stored: true,
Weight: float64(0),
MinItems: 0,
}
sq := &engine.StatQueue{Tenant: config.CgrConfig().DefaultTenant,
ID: v1Sts.Id,
SQMetrics: make(map[string]engine.StatMetric),
}
for _, metricID := range sqp.Metrics {
if metric, err := engine.NewStatMetric(metricID, 0); err != nil {
t.Error("Error when creating newstatMETRIc ", err.Error())
} else {
sq.SQMetrics[metricID] = metric
}
}
switch {
case dbtype == utils.REDIS:
err := mig.oldDataDB.setV1Stats(v1Sts)
if err != nil {
t.Error("Error when setting v1Stat ", err.Error())
}
currentVersion := engine.Versions{utils.StatS: 1, utils.Thresholds: 1, utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2}
err = mig.dm.DataDB().SetVersions(currentVersion, false)
if err != nil {
t.Error("Error when setting version for stats ", err.Error())
}
err, _ = mig.Migrate([]string{utils.MetaStats})
if err != nil {
t.Error("Error when migrating Stats ", err.Error())
}
result, err := mig.dm.GetStatQueueProfile("cgrates.org", v1Sts.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting Stats ", err.Error())
}
if !reflect.DeepEqual(sqp.Tenant, result.Tenant) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Tenant, result.Tenant)
}
if !reflect.DeepEqual(sqp.ID, result.ID) {
t.Errorf("Expecting: %+v, received: %+v", sqp.ID, result.ID)
}
if !reflect.DeepEqual(sqp.FilterIDs, result.FilterIDs) {
t.Errorf("Expecting: %+v, received: %+v", sqp.FilterIDs, result.FilterIDs)
}
if !reflect.DeepEqual(sqp.QueueLength, result.QueueLength) {
t.Errorf("Expecting: %+v, received: %+v", sqp.QueueLength, result.QueueLength)
}
if !reflect.DeepEqual(sqp.TTL, result.TTL) {
t.Errorf("Expecting: %+v, received: %+v", sqp.TTL, result.TTL)
}
if !reflect.DeepEqual(sqp.Metrics, result.Metrics) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Metrics, result.Metrics)
}
if !reflect.DeepEqual(sqp.Thresholds, result.Thresholds) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Thresholds, result.Thresholds)
}
if !reflect.DeepEqual(sqp.Blocker, result.Blocker) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Blocker, result.Blocker)
}
if !reflect.DeepEqual(sqp.Stored, result.Stored) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Stored, result.Stored)
}
if !reflect.DeepEqual(sqp.Weight, result.Weight) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Weight, result.Weight)
}
if !reflect.DeepEqual(sqp, result) {
t.Errorf("Expecting: %+v, received: %+v", sqp, result)
}
result1, err := mig.dm.GetFilter("cgrates.org", v1Sts.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting Stats ", err.Error())
}
if !reflect.DeepEqual(filter, result1) {
t.Errorf("Expecting: %+v, received: %+v", filter, result1)
}
case dbtype == utils.MONGO:
err := mig.oldDataDB.setV1Stats(v1Sts)
if err != nil {
t.Error("Error when setting v1Stat ", err.Error())
}
currentVersion := engine.Versions{utils.StatS: 1, utils.Accounts: 2, utils.Actions: 2, utils.ActionTriggers: 2, utils.ActionPlans: 2, utils.SharedGroups: 2}
err = mig.dm.DataDB().SetVersions(currentVersion, false)
if err != nil {
t.Error("Error when setting version for stats ", err.Error())
}
err, _ = mig.Migrate([]string{utils.MetaStats})
if err != nil {
t.Error("Error when migrating Stats ", err.Error())
}
result, err := mig.dm.GetStatQueueProfile("cgrates.org", v1Sts.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting Stats ", err.Error())
}
if !reflect.DeepEqual(sqp.Tenant, result.Tenant) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Tenant, result.Tenant)
}
if !reflect.DeepEqual(sqp.ID, result.ID) {
t.Errorf("Expecting: %+v, received: %+v", sqp.ID, result.ID)
}
if !reflect.DeepEqual(sqp.FilterIDs, result.FilterIDs) {
t.Errorf("Expecting: %+v, received: %+v", sqp.FilterIDs, result.FilterIDs)
}
if !reflect.DeepEqual(sqp.QueueLength, result.QueueLength) {
t.Errorf("Expecting: %+v, received: %+v", sqp.QueueLength, result.QueueLength)
}
if !reflect.DeepEqual(sqp.TTL, result.TTL) {
t.Errorf("Expecting: %+v, received: %+v", sqp.TTL, result.TTL)
}
if !reflect.DeepEqual(sqp.Metrics, result.Metrics) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Metrics, result.Metrics)
}
if !reflect.DeepEqual(sqp.Thresholds, result.Thresholds) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Thresholds, result.Thresholds)
}
if !reflect.DeepEqual(sqp.Blocker, result.Blocker) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Blocker, result.Blocker)
}
if !reflect.DeepEqual(sqp.Stored, result.Stored) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Stored, result.Stored)
}
if !reflect.DeepEqual(sqp.Weight, result.Weight) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Weight, result.Weight)
}
if !reflect.DeepEqual(sqp, result) {
t.Errorf("Expecting: %+v, received: %+v", sqp, result)
}
result1, err := mig.dm.GetFilter("cgrates.org", v1Sts.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting Stats ", err.Error())
}
if !reflect.DeepEqual(filter.ActivationInterval, result1.ActivationInterval) {
t.Errorf("Expecting: %+v, received: %+v", filter.ActivationInterval, result1.ActivationInterval)
}
if !reflect.DeepEqual(filter.Tenant, result1.Tenant) {
t.Errorf("Expecting: %+v, received: %+v", filter.Tenant, result1.Tenant)
}
}
result1, err := mig.dm.GetStatQueue("cgrates.org", v1Sts.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting Stats ", err.Error())
}
log.Print("Wrong version", result1)
}

313
migrator/stats.go Normal file
View File

@@ -0,0 +1,313 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"fmt"
"log"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
type v1Stat struct {
Id string // Config id, unique per config instance
QueueLength int // Number of items in the stats buffer
TimeWindow time.Duration // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
SaveInterval time.Duration
Metrics []string // ASR, ACD, ACC
SetupInterval []time.Time // CDRFieldFilter on SetupInterval, 2 or less items (>= start interval,< stop_interval)
TOR []string // CDRFieldFilter on TORs
CdrHost []string // CDRFieldFilter on CdrHosts
CdrSource []string // CDRFieldFilter on CdrSources
ReqType []string // CDRFieldFilter on RequestTypes
Direction []string // CDRFieldFilter on Directions
Tenant []string // CDRFieldFilter on Tenants
Category []string // CDRFieldFilter on Categories
Account []string // CDRFieldFilter on Accounts
Subject []string // CDRFieldFilter on Subjects
DestinationIds []string // CDRFieldFilter on DestinationPrefixes
UsageInterval []time.Duration // CDRFieldFilter on UsageInterval, 2 or less items (>= Usage, <Usage)
PddInterval []time.Duration // CDRFieldFilter on PddInterval, 2 or less items (>= Pdd, <Pdd)
Supplier []string // CDRFieldFilter on Suppliers
DisconnectCause []string // Filter on DisconnectCause
MediationRunIds []string // CDRFieldFilter on MediationRunIds
RatedAccount []string // CDRFieldFilter on RatedAccounts
RatedSubject []string // CDRFieldFilter on RatedSubjects
CostInterval []float64 // CDRFieldFilter on CostInterval, 2 or less items, (>=Cost, <Cost)
Triggers engine.ActionTriggers
}
type v1Stats []*v1Stat
func (m *Migrator) migrateStats() (err error) {
var vrs engine.Versions
if m.dm.DataDB() == nil {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.NoStorDBConnection,
"no connection to datadb")
}
vrs, err = m.dm.DataDB().GetVersions(utils.TBLVersions)
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when querying oldDataDB for versions", err.Error()))
} else if len(vrs) == 0 {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UndefinedVersion,
"version number is not defined for Stats model")
}
if vrs[utils.StatS] != 1 { // Right now we only support migrating from version 1
log.Print("Wrong version")
return
}
var v1Sts *v1Stat
for {
v1Sts, err = m.oldDataDB.getV1Stats()
if err != nil && err != utils.ErrNoMoreData {
return err
}
if err == utils.ErrNoMoreData {
break
}
if v1Sts.Id != "" {
if len(v1Sts.Triggers) != 0 {
for _, Trigger := range v1Sts.Triggers {
if err := m.SasThreshold(Trigger); err != nil {
return err
}
}
}
filter, sq, sts, err := v1Sts.AsStatQP()
if err != nil {
return err
}
if m.dryRun != true {
if err := m.dm.SetFilter(filter); err != nil {
return err
}
if err := m.dm.SetStatQueue(sq); err != nil {
return err
}
if err := m.dm.SetStatQueueProfile(sts); err != nil {
return err
}
m.stats[utils.StatS] += 1
}
}
}
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.StatS: engine.CurrentStorDBVersions()[utils.StatS]}
if err = m.dm.DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Stats version into dataDB", err.Error()))
}
}
return
}
func (v1Sts v1Stat) AsStatQP() (filter *engine.Filter, sq *engine.StatQueue, stq *engine.StatQueueProfile, err error) {
var filters []*engine.RequestFilter
if len(v1Sts.SetupInterval) == 1 {
x, err := engine.NewRequestFilter(engine.MetaGreaterOrEqual, "SetupInterval", []string{v1Sts.SetupInterval[0].String()})
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
} else if len(v1Sts.SetupInterval) == 2 {
x, err := engine.NewRequestFilter(engine.MetaLessThan, "SetupInterval", []string{v1Sts.SetupInterval[1].String()})
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.TOR) != 0 {
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "TOR", v1Sts.TOR)
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.CdrHost) != 0 {
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "CdrHost", v1Sts.CdrHost)
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.ReqType) != 0 {
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "ReqType", v1Sts.ReqType)
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.Direction) != 0 {
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Direction", v1Sts.Direction)
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.Category) != 0 {
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Category", v1Sts.Category)
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.Account) != 0 {
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Account", v1Sts.Account)
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.Subject) != 0 {
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Subject", v1Sts.Subject)
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.Supplier) != 0 {
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Supplier", v1Sts.Supplier)
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.UsageInterval) == 1 {
x, err := engine.NewRequestFilter(engine.MetaGreaterOrEqual, "UsageInterval", []string{v1Sts.UsageInterval[0].String()})
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
} else if len(v1Sts.UsageInterval) == 2 {
x, err := engine.NewRequestFilter(engine.MetaLessThan, "UsageInterval", []string{v1Sts.UsageInterval[1].String()})
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.PddInterval) == 1 {
x, err := engine.NewRequestFilter(engine.MetaGreaterOrEqual, "PddInterval", []string{v1Sts.PddInterval[0].String()})
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
} else if len(v1Sts.PddInterval) == 2 {
x, err := engine.NewRequestFilter(engine.MetaLessThan, "PddInterval", []string{v1Sts.PddInterval[1].String()})
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.Supplier) != 0 {
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Supplier", v1Sts.Supplier)
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.DisconnectCause) != 0 {
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "DisconnectCause", v1Sts.DisconnectCause)
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.MediationRunIds) != 0 {
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "MediationRunIds", v1Sts.MediationRunIds)
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.RatedSubject) != 0 {
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "RatedSubject", v1Sts.RatedSubject)
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if len(v1Sts.CostInterval) == 1 {
x, err := engine.NewRequestFilter(engine.MetaGreaterOrEqual, "CostInterval", []string{strconv.FormatFloat(v1Sts.CostInterval[0], 'f', 6, 64)})
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
} else if len(v1Sts.CostInterval) == 2 {
x, err := engine.NewRequestFilter(engine.MetaLessThan, "CostInterval", []string{strconv.FormatFloat(v1Sts.CostInterval[1], 'f', 6, 64)})
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
filter = &engine.Filter{Tenant: config.CgrConfig().DefaultTenant, ID: v1Sts.Id, RequestFilters: filters}
stq = &engine.StatQueueProfile{
ID: v1Sts.Id,
QueueLength: v1Sts.QueueLength,
Metrics: []string{},
Tenant: config.CgrConfig().DefaultTenant,
Blocker: false,
Stored: false,
Thresholds: []string{},
FilterIDs: []string{v1Sts.Id},
}
if v1Sts.SaveInterval != 0 {
stq.Stored = true
}
if len(v1Sts.Triggers) != 0 {
for i, _ := range v1Sts.Triggers {
stq.Thresholds = append(stq.Thresholds, v1Sts.Triggers[i].ID)
}
}
sq = &engine.StatQueue{Tenant: config.CgrConfig().DefaultTenant,
ID: v1Sts.Id,
SQMetrics: make(map[string]engine.StatMetric),
}
if len(v1Sts.Metrics) != 0 {
for i, _ := range v1Sts.Metrics {
if !strings.HasPrefix(v1Sts.Metrics[i], "*") {
v1Sts.Metrics[i] = "*" + v1Sts.Metrics[i]
}
v1Sts.Metrics[i] = strings.ToLower(v1Sts.Metrics[i])
stq.Metrics = append(stq.Metrics, v1Sts.Metrics[i])
if metric, err := engine.NewStatMetric(stq.Metrics[i], 0); err != nil {
return nil, nil, nil, err
} else {
sq.SQMetrics[stq.Metrics[i]] = metric
}
}
}
return filter, sq, stq, nil
}

139
migrator/stats_test.go Normal file
View File

@@ -0,0 +1,139 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func TestV1StatsAsStats(t *testing.T) {
tim := time.Date(0001, time.January, 1, 2, 0, 0, 0, time.UTC).Local()
var filters []*engine.RequestFilter
v1Sts := &v1Stat{
Id: "test", // Config id, unique per config instance
QueueLength: 10, // Number of items in the stats buffer
TimeWindow: time.Duration(1) * time.Second, // Will only keep the CDRs who's call setup time is not older than time.Now()-TimeWindow
SaveInterval: time.Duration(1) * time.Second,
Metrics: []string{"ASR", "ACD", "ACC"},
SetupInterval: []time.Time{time.Now()},
TOR: []string{},
CdrHost: []string{},
CdrSource: []string{},
ReqType: []string{},
Direction: []string{},
Tenant: []string{},
Category: []string{},
Account: []string{},
Subject: []string{},
DestinationIds: []string{},
UsageInterval: []time.Duration{1 * time.Second},
PddInterval: []time.Duration{1 * time.Second},
Supplier: []string{},
DisconnectCause: []string{},
MediationRunIds: []string{},
RatedAccount: []string{},
RatedSubject: []string{},
CostInterval: []float64{},
Triggers: engine.ActionTriggers{&engine.ActionTrigger{
ID: "TestB",
Balance: &engine.BalanceFilter{
ID: utils.StringPointer("TESTB"),
Timings: []*engine.RITiming{},
ExpirationDate: utils.TimePointer(tim),
Type: utils.StringPointer(utils.MONETARY),
Directions: utils.StringMapPointer(utils.NewStringMap(utils.OUT)),
},
ExpirationDate: tim,
LastExecutionTime: tim,
ActivationDate: tim,
ThresholdType: utils.TRIGGER_MAX_BALANCE,
ThresholdValue: 2,
ActionsID: "TEST_ACTIONS",
Executed: true,
}},
}
x, _ := engine.NewRequestFilter(engine.MetaGreaterOrEqual, "SetupInterval", []string{v1Sts.SetupInterval[0].String()})
filters = append(filters, x)
x, _ = engine.NewRequestFilter(engine.MetaGreaterOrEqual, "UsageInterval", []string{v1Sts.UsageInterval[0].String()})
filters = append(filters, x)
x, _ = engine.NewRequestFilter(engine.MetaGreaterOrEqual, "PddInterval", []string{v1Sts.PddInterval[0].String()})
filters = append(filters, x)
filter := &engine.Filter{Tenant: config.CgrConfig().DefaultTenant, ID: v1Sts.Id, RequestFilters: filters}
sqp := &engine.StatQueueProfile{
Tenant: "cgrates.org",
ID: "test",
FilterIDs: []string{v1Sts.Id},
QueueLength: 10,
TTL: time.Duration(0) * time.Second,
Metrics: []string{"*asr", "*acd", "*acc"},
Blocker: false,
Thresholds: []string{"TestB"},
Stored: true,
Weight: float64(0),
MinItems: 0,
}
fltr, _, newsqp, err := v1Sts.AsStatQP()
if err != nil {
t.Errorf("err")
}
if !reflect.DeepEqual(sqp.Tenant, newsqp.Tenant) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Tenant, newsqp.Tenant)
}
if !reflect.DeepEqual(sqp.ID, newsqp.ID) {
t.Errorf("Expecting: %+v, received: %+v", sqp.ID, newsqp.ID)
}
if !reflect.DeepEqual(sqp.FilterIDs, newsqp.FilterIDs) {
t.Errorf("Expecting: %+v, received: %+v", sqp.FilterIDs, newsqp.FilterIDs)
}
if !reflect.DeepEqual(sqp.QueueLength, newsqp.QueueLength) {
t.Errorf("Expecting: %+v, received: %+v", sqp.QueueLength, newsqp.QueueLength)
}
if !reflect.DeepEqual(sqp.TTL, newsqp.TTL) {
t.Errorf("Expecting: %+v, received: %+v", sqp.TTL, newsqp.TTL)
}
if !reflect.DeepEqual(sqp.Metrics, newsqp.Metrics) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Metrics, newsqp.Metrics)
}
if !reflect.DeepEqual(sqp.Thresholds, newsqp.Thresholds) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Thresholds, newsqp.Thresholds)
}
if !reflect.DeepEqual(sqp.Blocker, newsqp.Blocker) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Blocker, newsqp.Blocker)
}
if !reflect.DeepEqual(sqp.Stored, newsqp.Stored) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Stored, newsqp.Stored)
}
if !reflect.DeepEqual(sqp.Weight, newsqp.Weight) {
t.Errorf("Expecting: %+v, received: %+v", sqp.Weight, newsqp.Weight)
}
if !reflect.DeepEqual(sqp, newsqp) {
t.Errorf("Expecting: %+v, received: %+v", sqp, newsqp)
}
if !reflect.DeepEqual(filter, fltr) {
t.Errorf("Expecting: %+v, received: %+v", filter, fltr)
}
}

296
migrator/thresholds.go Normal file
View File

@@ -0,0 +1,296 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"fmt"
"log"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
type v2ActionTrigger struct {
ID string // original csv tag
UniqueID string // individual id
ThresholdType string //*min_event_counter, *max_event_counter, *min_balance_counter, *max_balance_counter, *min_balance, *max_balance, *balance_expired
// stats: *min_asr, *max_asr, *min_acd, *max_acd, *min_tcd, *max_tcd, *min_acc, *max_acc, *min_tcc, *max_tcc, *min_ddc, *max_ddc
ThresholdValue float64
Recurrent bool // reset excuted flag each run
MinSleep time.Duration // Minimum duration between two executions in case of recurrent triggers
ExpirationDate time.Time
ActivationDate time.Time
//BalanceType string // *monetary/*voice etc
Balance *engine.BalanceFilter //filtru
Weight float64
ActionsID string
MinQueuedItems int // Trigger actions only if this number is hit (stats only) MINHITS
Executed bool
LastExecutionTime time.Time
}
type v2ActionTriggers []*v2ActionTrigger
func (m *Migrator) migratev1ActionTriggers() (err error) {
var vrs engine.Versions
if m.dm.DataDB() == nil {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.NoStorDBConnection,
"no connection to datadb")
}
vrs, err = m.dm.DataDB().GetVersions(utils.TBLVersions)
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when querying oldDataDB for versions", err.Error()))
} else if len(vrs) == 0 {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UndefinedVersion,
"version number is not defined for Stats model")
}
if vrs[utils.Thresholds] != 1 { // Right now we only support migrating from version 1
log.Print("Wrong version")
return
}
var v2ACT *v2ActionTrigger
for {
v2ACT, err = m.oldDataDB.getV2ActionTrigger()
if err != nil && err != utils.ErrNoMoreData {
return err
}
if err == utils.ErrNoMoreData {
break
}
if v2ACT.ID != "" {
thp, th, filter, err := v2ACT.AsThreshold()
if err != nil {
return err
}
if m.dryRun != true {
if err := m.dm.SetFilter(filter); err != nil {
return err
}
if err := m.dm.SetThreshold(th); err != nil {
return err
}
if err := m.dm.SetThresholdProfile(thp); err != nil {
return err
}
m.stats[utils.Thresholds] += 1
}
}
}
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.Thresholds: engine.CurrentStorDBVersions()[utils.Thresholds]}
if err = m.dm.DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error()))
}
}
return
}
func (v2ATR v2ActionTrigger) AsThreshold() (thp *engine.ThresholdProfile, th *engine.Threshold, filter *engine.Filter, err error) {
var filters []*engine.RequestFilter
if *v2ATR.Balance.ID != "" {
if v2ATR.Balance.Directions != nil {
x, err := engine.NewRequestFilter(engine.MetaRSRFields, "Directions", v2ATR.Balance.Directions.Slice())
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
//TO DO:
// if v2ATR.Balance.ExpirationDate != nil { //MetaLess
// x, err := engine.NewRequestFilter(engine.MetaTimings, "ExpirationDate", v2ATR.Balance.ExpirationDate)
// if err != nil {
// return nil, nil, err
// }
// filters = append(filters, x)
// }
// if v2ATR.Balance.Weight != nil { //MetaLess /MetaRSRFields
// x, err := engine.NewRequestFilter(engine.MetaLessOrEqual, "Weight", []string{strconv.FormatFloat(*v2ATR.Balance.Weight, 'f', 6, 64)})
// if err != nil {
// return nil, nil, err
// }
// filters = append(filters, x)
// }
if v2ATR.Balance.DestinationIDs != nil { //MetaLess /RSRfields
x, err := engine.NewRequestFilter(engine.MetaDestinations, "DestinationIDs", v2ATR.Balance.DestinationIDs.Slice())
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if v2ATR.Balance.RatingSubject != nil { //MetaLess /RSRfields
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "RatingSubject", []string{*v2ATR.Balance.RatingSubject})
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if v2ATR.Balance.Categories != nil { //MetaLess /RSRfields
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Categories", v2ATR.Balance.Categories.Slice())
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if v2ATR.Balance.SharedGroups != nil { //MetaLess /RSRfields
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "SharedGroups", v2ATR.Balance.SharedGroups.Slice())
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if v2ATR.Balance.TimingIDs != nil { //MetaLess /RSRfields
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "TimingIDs", v2ATR.Balance.TimingIDs.Slice())
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
}
filter = &engine.Filter{Tenant: config.CgrConfig().DefaultTenant, ID: *v2ATR.Balance.ID, RequestFilters: filters}
th = &engine.Threshold{
Tenant: config.CgrConfig().DefaultTenant,
ID: v2ATR.ID,
}
thp = &engine.ThresholdProfile{
ID: v2ATR.ID,
Tenant: config.CgrConfig().DefaultTenant,
Weight: v2ATR.Weight,
ActivationInterval: &utils.ActivationInterval{ActivationTime: v2ATR.ActivationDate, ExpiryTime: v2ATR.ExpirationDate},
FilterIDs: []string{filter.ID},
MinSleep: v2ATR.MinSleep,
}
return thp, th, filter, nil
}
func (m *Migrator) SasThreshold(v2ATR *engine.ActionTrigger) (err error) {
var vrs engine.Versions
if m.dm.DataDB() == nil {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.NoStorDBConnection,
"no connection to datadb")
}
if v2ATR.ID != "" {
thp, th, filter, err := AsThreshold2(*v2ATR)
if err != nil {
return err
}
if filter != nil {
if err := m.dm.SetFilter(filter); err != nil {
return err
}
}
if err := m.dm.SetThreshold(th); err != nil {
return err
}
if err := m.dm.SetThresholdProfile(thp); err != nil {
return err
}
m.stats[utils.Thresholds] += 1
}
// All done, update version wtih current one
vrs = engine.Versions{utils.Thresholds: engine.CurrentStorDBVersions()[utils.Thresholds]}
if err = m.dm.DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Thresholds version into dataDB", err.Error()))
}
return
}
func AsThreshold2(v2ATR engine.ActionTrigger) (thp *engine.ThresholdProfile, th *engine.Threshold, filter *engine.Filter, err error) {
var filterIDS []string
var filters []*engine.RequestFilter
if v2ATR.Balance.ID != nil && *v2ATR.Balance.ID != "" {
if v2ATR.Balance.Directions != nil {
x, err := engine.NewRequestFilter(engine.MetaRSRFields, "Directions", v2ATR.Balance.Directions.Slice())
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if v2ATR.Balance.DestinationIDs != nil { //MetaLess /RSRfields
x, err := engine.NewRequestFilter(engine.MetaDestinations, "DestinationIDs", v2ATR.Balance.DestinationIDs.Slice())
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if v2ATR.Balance.RatingSubject != nil { //MetaLess /RSRfields
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "RatingSubject", []string{*v2ATR.Balance.RatingSubject})
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if v2ATR.Balance.Categories != nil { //MetaLess /RSRfields
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "Categories", v2ATR.Balance.Categories.Slice())
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if v2ATR.Balance.SharedGroups != nil { //MetaLess /RSRfields
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "SharedGroups", v2ATR.Balance.SharedGroups.Slice())
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
if v2ATR.Balance.TimingIDs != nil { //MetaLess /RSRfields
x, err := engine.NewRequestFilter(engine.MetaStringPrefix, "TimingIDs", v2ATR.Balance.TimingIDs.Slice())
if err != nil {
return nil, nil, nil, err
}
filters = append(filters, x)
}
filter = &engine.Filter{Tenant: config.CgrConfig().DefaultTenant, ID: *v2ATR.Balance.ID, RequestFilters: filters}
filterIDS = append(filterIDS, filter.ID)
}
th = &engine.Threshold{
Tenant: config.CgrConfig().DefaultTenant,
ID: v2ATR.ID,
}
thp = &engine.ThresholdProfile{
ID: v2ATR.ID,
Tenant: config.CgrConfig().DefaultTenant,
Weight: v2ATR.Weight,
ActivationInterval: &utils.ActivationInterval{ActivationTime: v2ATR.ActivationDate, ExpiryTime: v2ATR.ExpirationDate},
FilterIDs: filterIDS,
MinSleep: v2ATR.MinSleep,
}
return thp, th, filter, nil
}

View File

@@ -0,0 +1,89 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func Testv2ActionTriggerAsThreshold(t *testing.T) {
var filters []*engine.RequestFilter
v2ATR := &v2ActionTrigger{
ID: "test2", // original csv tag
UniqueID: "testUUID", // individual id
ThresholdType: "*min_event_counter", //*min_event_counter, *max_event_counter, *min_balance_counter, *max_balance_counter, *min_balance, *max_balance, *balance_expired
ThresholdValue: 5.32,
Recurrent: false, // reset excuted flag each run
MinSleep: time.Duration(5) * time.Second, // Minimum duration between two executions in case of recurrent triggers
ExpirationDate: time.Now(),
ActivationDate: time.Now(),
Balance: new(engine.BalanceFilter),
Weight: 0,
ActionsID: "Action1",
MinQueuedItems: 10, // Trigger actions only if this number is hit (stats only)
Executed: false,
LastExecutionTime: time.Now(),
}
x, _ := engine.NewRequestFilter(engine.MetaRSRFields, "Directions", v2ATR.Balance.Directions.Slice())
filters = append(filters, x)
x, _ = engine.NewRequestFilter(engine.MetaDestinations, "DestinationIDs", v2ATR.Balance.DestinationIDs.Slice())
filters = append(filters, x)
x, _ = engine.NewRequestFilter(engine.MetaStringPrefix, "RatingSubject", []string{*v2ATR.Balance.RatingSubject})
filters = append(filters, x)
x, _ = engine.NewRequestFilter(engine.MetaStringPrefix, "Categories", v2ATR.Balance.Categories.Slice())
filters = append(filters, x)
x, _ = engine.NewRequestFilter(engine.MetaStringPrefix, "SharedGroups", v2ATR.Balance.SharedGroups.Slice())
filters = append(filters, x)
x, _ = engine.NewRequestFilter(engine.MetaStringPrefix, "TimingIDs", v2ATR.Balance.TimingIDs.Slice())
filters = append(filters, x)
filter := &engine.Filter{Tenant: config.CgrConfig().DefaultTenant, ID: *v2ATR.Balance.ID, RequestFilters: filters}
thp := &engine.ThresholdProfile{
ID: v2ATR.ID,
Tenant: config.CgrConfig().DefaultTenant,
Blocker: false,
Weight: v2ATR.Weight,
ActivationInterval: &utils.ActivationInterval{v2ATR.ExpirationDate, v2ATR.ActivationDate},
MinSleep: v2ATR.MinSleep,
}
th := &engine.Threshold{
Tenant: config.CgrConfig().DefaultTenant,
ID: v2ATR.ID,
}
newthp, newth, fltr, err := v2ATR.AsThreshold()
if err != nil {
t.Errorf("err")
}
if !reflect.DeepEqual(thp, newthp) {
t.Errorf("Expecting: %+v, received: %+v", thp, newthp)
}
if !reflect.DeepEqual(th, newth) {
t.Errorf("Expecting: %+v, received: %+v", th, newth)
}
if !reflect.DeepEqual(filter, fltr) {
t.Errorf("Expecting: %+v, received: %+v", filter, fltr)
}
}

View File

@@ -30,4 +30,8 @@ type V1DataDB interface {
setV1ActionTriggers(x *v1ActionTriggers) (err error)
getV1SharedGroup() (v1acts *v1SharedGroup, err error)
setV1SharedGroup(x *v1SharedGroup) (err error)
getV1Stats() (v1st *v1Stat, err error)
setV1Stats(x *v1Stat) (err error)
getV2ActionTrigger() (v2at *v2ActionTrigger, err error)
setV2ActionTrigger(x *v2ActionTrigger) (err error)
}

View File

@@ -174,3 +174,49 @@ func (v1ms *v1Mongo) setV1SharedGroup(x *v1SharedGroup) (err error) {
}
return
}
//Stats methods
//get
func (v1ms *v1Mongo) getV1Stats() (v1st *v1Stat, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.CDR_STATS_PREFIX).Find(nil).Iter()
}
v1ms.qryIter.Next(&v1st)
if v1st == nil {
v1ms.qryIter = nil
return nil, utils.ErrNoMoreData
}
return v1st, nil
}
//set
func (v1ms *v1Mongo) setV1Stats(x *v1Stat) (err error) {
if err := v1ms.session.DB(v1ms.db).C(utils.CDR_STATS_PREFIX).Insert(x); err != nil {
return err
}
return
}
//Stats methods
//get
func (v1ms *v1Mongo) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.ACTION_TRIGGER_PREFIX).Find(nil).Iter()
}
v1ms.qryIter.Next(&v2at)
if v2at == nil {
v1ms.qryIter = nil
return nil, utils.ErrNoMoreData
}
return v2at, nil
}
//set
func (v1ms *v1Mongo) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
if err := v1ms.session.DB(v1ms.db).C(utils.ACTION_TRIGGER_PREFIX).Insert(x); err != nil {
return err
}
return
}

View File

@@ -20,6 +20,7 @@ package migrator
import (
"fmt"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -305,3 +306,85 @@ func (v1rs *v1Redis) setV1SharedGroup(x *v1SharedGroup) (err error) {
}
return
}
//Stats methods
//get
func (v1rs *v1Redis) getV1Stats() (v1st *v1Stat, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.CDR_STATS_PREFIX)
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
return nil, utils.ErrNotFound
}
v1rs.qryIdx = utils.IntPointer(0)
}
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil, err
}
if err := v1rs.ms.Unmarshal(strVal, &v1st); err != nil {
return nil, err
}
*v1rs.qryIdx = *v1rs.qryIdx + 1
} else {
v1rs.qryIdx = nil
return nil, utils.ErrNoMoreData
}
return v1st, nil
}
//set
func (v1rs *v1Redis) setV1Stats(x *v1Stat) (err error) {
key := utils.CDR_STATS_PREFIX + x.Id
bit, err := v1rs.ms.Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
return err
}
return
}
//Action methods
//get
func (v1rs *v1Redis) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
if err != nil {
return
} else if len(v1rs.dataKeys) == 0 {
return nil, utils.ErrNotFound
}
v1rs.qryIdx = utils.IntPointer(0)
}
if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 {
strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
if err != nil {
return nil, err
}
if err := v1rs.ms.Unmarshal(strVal, &v2at); err != nil {
return nil, err
}
*v1rs.qryIdx = *v1rs.qryIdx + 1
} else {
v1rs.qryIdx = nil
return nil, utils.ErrNoMoreData
}
return v2at, nil
}
//set
func (v1rs *v1Redis) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
key := utils.ACTION_TRIGGER_PREFIX + x.ID
bit, err := v1rs.ms.Marshal(x)
if err != nil {
return err
}
if err = v1rs.cmd("SET", key, bit).Err; err != nil {
return err
}
return
}

View File

@@ -397,6 +397,8 @@ const (
MetaActionTriggers = "*action_triggers"
MetaActions = "*actions"
MetaSharedGroups = "*shared_groups"
MetaStats = "*stats"
MetaThresholds = "*thresholds"
Migrator = "migrator"
UnsupportedMigrationTask = "unsupported migration task"
NoStorDBConnection = "not connected to StorDB"
@@ -450,6 +452,7 @@ const (
CacheResources = "resources"
CacheResourceProfiles = "resource_profiles"
CacheTimings = "timings"
Thresholds = "Thresholds"
StatS = "stats"
StatService = "StatS"
RALService = "RALs"