Merge branch 'master' of github.com:cgrates/cgrates

This commit is contained in:
DanB
2017-09-04 16:29:12 +02:00
32 changed files with 1449 additions and 927 deletions

View File

@@ -44,9 +44,10 @@ func (self *ApierV1) GetTPResource(attr AttrGetTPResource, reply *utils.TPResour
return utils.NewErrMandatoryIeMissing(missing...)
}
if rls, err := self.StorDb.GetTPResources(attr.TPid, attr.ID); err != nil {
return utils.NewErrServerError(err)
} else if len(rls) == 0 {
return utils.ErrNotFound
if err.Error() != utils.ErrNotFound.Error() {
err = utils.NewErrServerError(err)
}
return err
} else {
*reply = *rls[0]
}

View File

@@ -0,0 +1,217 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"net/rpc"
"net/rpc/jsonrpc"
"path"
"reflect"
"testing"
)
// Shared state for the TPResource integration tests below; populated by
// testTPResInitCfg and reused by every sub-test in sTestsTPResources.
var (
	tpResCfgPath string             // path to the config-sample folder under test
	tpResCfg     *config.CGRConfig  // engine configuration loaded from tpResCfgPath
	tpResRPC     *rpc.Client        // JSON-RPC connection to the running engine
	tpResDataDir = "/usr/share/cgrates" // root of shared CGRateS data (conf samples)
	tpRes        *utils.TPResource  // resource profile exercised by the set/get/update/remove cycle
	tpResDelay   int                // milliseconds to wait for engine start / db reset
	tpResConfigDIR string //run tests for specific configuration
)
// sTestsTPResources is the ordered list of sub-tests executed for each
// configuration (mysql, mongo, postgres): engine/db setup, a full CRUD
// cycle on one TPResource profile, then engine teardown. Order matters —
// later steps depend on state created by earlier ones.
var sTestsTPResources = []func(t *testing.T){
	testTPResInitCfg,
	testTPResResetStorDb,
	testTPResStartEngine,
	testTPResRpcConn,
	testTPResGetTPResourceBeforeSet,
	testTPResSetTPResource,
	testTPResGetTPResourceAfterSet,
	testTPResUpdateTPResource,
	testTPResGetTPResourceAfterUpdate,
	testTPResRemTPResource,
	testTPResGetTPResourceAfterRemove,
	testTPResKillEngine,
}
//Test start here
// TestTPResITMySql runs the TPResource integration suite against the
// MySQL tutorial configuration.
func TestTPResITMySql(t *testing.T) {
	tpResConfigDIR = "tutmysql"
	for i := range sTestsTPResources {
		t.Run(tpResConfigDIR, sTestsTPResources[i])
	}
}
// TestTPResITMongo runs the TPResource integration suite against the
// MongoDB tutorial configuration.
func TestTPResITMongo(t *testing.T) {
	tpResConfigDIR = "tutmongo"
	for i := range sTestsTPResources {
		t.Run(tpResConfigDIR, sTestsTPResources[i])
	}
}
// TestTPResITPG runs the TPResource integration suite against the
// PostgreSQL tutorial configuration.
func TestTPResITPG(t *testing.T) {
	tpResConfigDIR = "tutpostgres"
	for i := range sTestsTPResources {
		t.Run(tpResConfigDIR, sTestsTPResources[i])
	}
}
// testTPResInitCfg loads the configuration selected by tpResConfigDIR and
// derives the per-backend wait delay used by later steps.
func testTPResInitCfg(t *testing.T) {
	tpResCfgPath = path.Join(tpResDataDir, "conf", "samples", tpResConfigDIR)
	cfg, err := config.NewCGRConfigFromFolder(tpResCfgPath)
	if err != nil {
		t.Error(err)
	}
	tpResCfg = cfg
	tpResCfg.DataFolderPath = tpResDataDir // Share DataFolderPath through config towards StoreDb for Flush()
	config.SetCgrConfig(tpResCfg)
	if tpResConfigDIR == "tutmongo" { // Mongo needs more time to reset db, need to investigate
		tpResDelay = 4000
	} else {
		tpResDelay = 1000
	}
}
// Wipe out the cdr database
// testTPResResetStorDb wipes the cdr/stor database so the run starts clean.
func testTPResResetStorDb(t *testing.T) {
	err := engine.InitStorDb(tpResCfg)
	if err != nil {
		t.Fatal(err)
	}
}
// Start CGR Engine
// testTPResStartEngine (re)starts cgr-engine with the loaded configuration.
func testTPResStartEngine(t *testing.T) {
	_, err := engine.StopStartEngine(tpResCfgPath, tpResDelay)
	if err != nil {
		t.Fatal(err)
	}
}
// Connect rpc client to rater
// testTPResRpcConn opens the JSON-RPC connection used by all later steps.
func testTPResRpcConn(t *testing.T) {
	conn, err := jsonrpc.Dial("tcp", tpResCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
	if err != nil {
		t.Fatal(err)
	}
	tpResRPC = conn
}
// testTPResGetTPResourceBeforeSet expects NOT_FOUND before any profile is set.
func testTPResGetTPResourceBeforeSet(t *testing.T) {
	var rcv *utils.TPResource
	err := tpResRPC.Call("ApierV1.GetTPResource", AttrGetTPResource{TPid: "TPR1", ID: "Res"}, &rcv)
	if err == nil || err.Error() != utils.ErrNotFound.Error() {
		t.Error(err)
	}
}
// testTPResSetTPResource stores the reference TPResource profile via
// ApierV1.SetTPResource; the profile is kept in the package-level tpRes so
// later get/update/remove steps can reuse the same TPid/ID.
func testTPResSetTPResource(t *testing.T) {
	tpRes = &utils.TPResource{
		TPid: "TPR1",
		ID:   "Res",
		// single string filter on Account; extended by the update step
		Filters: []*utils.TPRequestFilter{
			&utils.TPRequestFilter{
				Type:      "*string",
				FieldName: "Account",
				Values:    []string{"1001", "1002"},
			},
		},
		ActivationInterval: &utils.TPActivationInterval{
			ActivationTime: "2014-07-29T15:00:00Z",
			ExpiryTime:     "", // no expiry
		},
		UsageTTL:          "1",
		Limit:             "1",
		AllocationMessage: "Message",
		Blocker:           false,
		Stored:            false,
		Weight:            20,
		Thresholds:        []string{"ValOne", "ValTwo"},
	}
	var result string
	if err := tpResRPC.Call("ApierV1.SetTPResource", tpRes, &result); err != nil {
		t.Error(err)
	} else if result != utils.OK {
		t.Error("Unexpected reply returned", result)
	}
}
// testTPResGetTPResourceAfterSet reads the profile back and checks it
// round-trips identical to what testTPResSetTPResource stored.
func testTPResGetTPResourceAfterSet(t *testing.T) {
	var rcv *utils.TPResource
	err := tpResRPC.Call("ApierV1.GetTPResource", &AttrGetTPResource{TPid: tpRes.TPid, ID: tpRes.ID}, &rcv)
	switch {
	case err != nil:
		t.Error(err)
	case !reflect.DeepEqual(tpRes, rcv):
		t.Errorf("Expecting : %+v, received: %+v", tpRes, rcv)
	}
}
// testTPResUpdateTPResource mutates the shared tpRes profile (adds a second
// filter) and re-sets it; SetTPResource acts as an upsert here.
func testTPResUpdateTPResource(t *testing.T) {
	var result string
	tpRes.Filters = []*utils.TPRequestFilter{
		&utils.TPRequestFilter{
			Type:      "*string",
			FieldName: "Account",
			Values:    []string{"1001", "1002"},
		},
		// new prefix filter added by the update
		&utils.TPRequestFilter{
			Type:      "*string_prefix",
			FieldName: "Destination",
			Values:    []string{"10", "20"},
		},
	}
	if err := tpResRPC.Call("ApierV1.SetTPResource", tpRes, &result); err != nil {
		t.Error(err)
	} else if result != utils.OK {
		t.Error("Unexpected reply returned", result)
	}
}
// testTPResGetTPResourceAfterUpdate re-reads the profile and checks the
// update (second filter) is reflected in storage.
func testTPResGetTPResourceAfterUpdate(t *testing.T) {
	var rcv *utils.TPResource
	err := tpResRPC.Call("ApierV1.GetTPResource", &AttrGetTPResource{TPid: tpRes.TPid, ID: tpRes.ID}, &rcv)
	switch {
	case err != nil:
		t.Error(err)
	case !reflect.DeepEqual(tpRes, rcv):
		t.Errorf("Expecting: %+v, received: %+v", tpRes, rcv)
	}
}
// testTPResRemTPResource removes the stored profile and expects OK.
func testTPResRemTPResource(t *testing.T) {
	var reply string
	err := tpResRPC.Call("ApierV1.RemTPResource", &AttrGetTPResource{TPid: tpRes.TPid, ID: tpRes.ID}, &reply)
	switch {
	case err != nil:
		t.Error(err)
	case reply != utils.OK:
		t.Error("Unexpected reply returned", reply)
	}
}
// testTPResGetTPResourceAfterRemove verifies the profile removed by
// testTPResRemTPResource is really gone: querying it must return
// utils.ErrNotFound.
func testTPResGetTPResourceAfterRemove(t *testing.T) {
	var respond *utils.TPResource
	// Query the same TPid/ID that was set and removed. The original code
	// mistakenly queried AttrGetTPStat{TPid: "TPS1", ID: "Stat1"} — an
	// identifier from the TPStats suite — so it reported NOT_FOUND
	// regardless of whether the remove actually worked.
	if err := tpResRPC.Call("ApierV1.GetTPResource", &AttrGetTPResource{TPid: tpRes.TPid, ID: tpRes.ID}, &respond); err == nil || err.Error() != utils.ErrNotFound.Error() {
		t.Error(err)
	}
}
// testTPResKillEngine stops cgr-engine at the end of the suite.
func testTPResKillEngine(t *testing.T) {
	err := engine.KillEngine(tpResDelay)
	if err != nil {
		t.Error(err)
	}
}

View File

@@ -30,99 +30,140 @@ import (
"testing"
)
var tpCfgPath string
var tpCfg *config.CGRConfig
var tpRPC *rpc.Client
var tpDataDir = "/usr/share/cgrates"
var (
tpStatCfgPath string
tpStatCfg *config.CGRConfig
tpStatRPC *rpc.Client
tpStatDataDir = "/usr/share/cgrates"
tpStat *utils.TPStats
tpStatDelay int
tpStatConfigDIR string //run tests for specific configuration
)
func TestTPStatInitCfg(t *testing.T) {
var sTestsTPStats = []func(t *testing.T){
testTPStatsInitCfg,
testTPStatsResetStorDb,
testTPStatsStartEngine,
testTPStatsRpcConn,
testTPStatsGetTPStatBeforeSet,
testTPStatsSetTPStat,
testTPStatsGetTPStatAfterSet,
testTPStatsUpdateTPStat,
testTPStatsGetTPStatAfterUpdate,
testTPStatsRemTPStat,
testTPStatsGetTPStatAfterRemove,
testTPStatsKillEngine,
}
//Test start here
func TestTPStatITMySql(t *testing.T) {
tpStatConfigDIR = "tutmysql"
for _, stest := range sTestsTPStats {
t.Run(tpStatConfigDIR, stest)
}
}
func TestTPStatITMongo(t *testing.T) {
tpStatConfigDIR = "tutmongo"
for _, stest := range sTestsTPStats {
t.Run(tpStatConfigDIR, stest)
}
}
func TestTPStatITPG(t *testing.T) {
tpStatConfigDIR = "tutpostgres"
for _, stest := range sTestsTPStats {
t.Run(tpStatConfigDIR, stest)
}
}
func testTPStatsInitCfg(t *testing.T) {
var err error
tpCfgPath = path.Join(tpDataDir, "conf", "samples", "tutmysql")
tpCfg, err = config.NewCGRConfigFromFolder(tpCfgPath)
tpStatCfgPath = path.Join(tpStatDataDir, "conf", "samples", tpStatConfigDIR)
tpStatCfg, err = config.NewCGRConfigFromFolder(tpStatCfgPath)
if err != nil {
t.Error(err)
}
tpCfg.DataFolderPath = tpDataDir // Share DataFolderPath through config towards StoreDb for Flush()
config.SetCgrConfig(tpCfg)
tpStatCfg.DataFolderPath = tpStatDataDir // Share DataFolderPath through config towards StoreDb for Flush()
config.SetCgrConfig(tpStatCfg)
switch tpStatConfigDIR {
case "tutmongo": // Mongo needs more time to reset db, need to investigate
tpStatDelay = 4000
default:
tpStatDelay = 1000
}
}
// Wipe out the cdr database
func TestTPStatResetStorDb(t *testing.T) {
if err := engine.InitStorDb(tpCfg); err != nil {
func testTPStatsResetStorDb(t *testing.T) {
if err := engine.InitStorDb(tpStatCfg); err != nil {
t.Fatal(err)
}
}
// Start CGR Engine
func TestTPStatStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(tpCfgPath, 1000); err != nil {
func testTPStatsStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(tpStatCfgPath, tpStatDelay); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func TestTPStatRpcConn(t *testing.T) {
func testTPStatsRpcConn(t *testing.T) {
var err error
tpRPC, err = jsonrpc.Dial("tcp", tpCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
tpStatRPC, err = jsonrpc.Dial("tcp", tpStatCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
var tpStat = &utils.TPStats{
TPid: "TPS1",
ID: "Stat1",
Filters: []*utils.TPRequestFilter{
&utils.TPRequestFilter{
Type: "*string",
FieldName: "Account",
Values: []string{"1001", "1002"},
},
&utils.TPRequestFilter{
Type: "*string_prefix",
FieldName: "Destination",
Values: []string{"10", "20"},
},
},
ActivationInterval: &utils.TPActivationInterval{
ActivationTime: "2014-07-29T15:00:00Z",
ExpiryTime: "",
},
TTL: "1",
Metrics: []string{"MetricValue", "MetricValueTwo"},
Blocker: true,
Stored: true,
Weight: 20,
Thresholds: nil,
}
func TestTPStatGetTPStatIDs(t *testing.T) {
var reply []string
if err := tpRPC.Call("ApierV1.GetTPStatIDs", AttrGetTPStatIds{TPid: "TPS1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
func testTPStatsGetTPStatBeforeSet(t *testing.T) {
var reply *utils.TPStats
if err := tpStatRPC.Call("ApierV1.GetTPStat", AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
}
func TestTPStatSetTPStat(t *testing.T) {
func testTPStatsSetTPStat(t *testing.T) {
tpStat = &utils.TPStats{
TPid: "TPS1",
ID: "Stat1",
Filters: []*utils.TPRequestFilter{
&utils.TPRequestFilter{
Type: "*string",
FieldName: "Account",
Values: []string{"1001", "1002"},
},
},
ActivationInterval: &utils.TPActivationInterval{
ActivationTime: "2014-07-29T15:00:00Z",
ExpiryTime: "",
},
TTL: "1",
Metrics: []string{"MetricValue", "MetricValueTwo"},
Blocker: false,
Stored: false,
Weight: 20,
Thresholds: []string{"ThreshValue", "ThreshValueTwo"},
}
var result string
if err := tpRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil {
if err := tpStatRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
}
func TestTPStatGetTPStat(t *testing.T) {
func testTPStatsGetTPStatAfterSet(t *testing.T) {
var respond *utils.TPStats
if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &respond); err != nil {
if err := tpStatRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &respond); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(tpStat, respond) {
t.Errorf("Expecting: %+v, received: %+v", tpStat.TPid, respond.TPid)
t.Errorf("Expecting: %+v, received: %+v", tpStat, respond)
}
}
func TestTPStatUpdateTPStat(t *testing.T) {
func testTPStatsUpdateTPStat(t *testing.T) {
var result string
tpStat.Weight = 21
tpStat.Filters = []*utils.TPRequestFilter{
@@ -136,43 +177,41 @@ func TestTPStatUpdateTPStat(t *testing.T) {
FieldName: "Destination",
Values: []string{"10", "20"},
},
&utils.TPRequestFilter{
Type: "*rsr_fields",
FieldName: "",
Values: []string{"Subject(~^1.*1$)", "Destination(1002)"},
},
}
if err := tpRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil {
if err := tpStatRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
}
func testTPStatsGetTPStatAfterUpdate(t *testing.T) {
var expectedTPS *utils.TPStats
if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &expectedTPS); err != nil {
if err := tpStatRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &expectedTPS); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(tpStat, expectedTPS) {
t.Errorf("Expecting: %+v, received: %+v", tpStat, expectedTPS)
}
}
func TestTPStatRemTPStat(t *testing.T) {
func testTPStatsRemTPStat(t *testing.T) {
var resp string
if err := tpRPC.Call("ApierV1.RemTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &resp); err != nil {
if err := tpStatRPC.Call("ApierV1.RemTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &resp); err != nil {
t.Error(err)
} else if resp != utils.OK {
t.Error("Unexpected reply returned", resp)
}
}
func TestTPStatCheckDelete(t *testing.T) {
func testTPStatsGetTPStatAfterRemove(t *testing.T) {
var respond *utils.TPStats
if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &respond); err == nil || err.Error() != utils.ErrNotFound.Error() {
if err := tpStatRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &respond); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
}
func TestTPStatKillEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
func testTPStatsKillEngine(t *testing.T) {
if err := engine.KillEngine(tpStatDelay); err != nil {
t.Error(err)
}
}

View File

@@ -22,38 +22,31 @@ import (
"fmt"
"log"
"path"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/migrator"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
var (
//separator = flag.String("separator", ",", "Default field separator")
cgrConfig, _ = config.NewDefaultCGRConfig()
migrateRC8 = flag.String("migrate_rc8", "", "Migrate Accounts, Actions, ActionTriggers, DerivedChargers, ActionPlans and SharedGroups to RC8 structures, possible values: *all,*enforce,acc,atr,act,dcs,apl,shg")
migrate = flag.String("migrate", "", "Fire up automatic migration <*cost_details|*set_versions>")
datadb_type = flag.String("datadb_type", cgrConfig.DataDbType, "The type of the DataDb database <redis>")
datadb_host = flag.String("datadb_host", cgrConfig.DataDbHost, "The DataDb host to connect to.")
datadb_port = flag.String("datadb_port", cgrConfig.DataDbPort, "The DataDb port to bind to.")
datadb_name = flag.String("datadb_name", cgrConfig.DataDbName, "The name/number of the DataDb to connect to.")
datadb_user = flag.String("datadb_user", cgrConfig.DataDbUser, "The DataDb user to sign in as.")
datadb_pass = flag.String("datadb_passwd", cgrConfig.DataDbPass, "The DataDb user's password.")
datadb_type = flag.String("datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database <redis>")
datadb_host = flag.String("datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.")
datadb_port = flag.String("datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.")
datadb_name = flag.String("datadb_name", config.CgrConfig().DataDbName, "The name/number of the DataDb to connect to.")
datadb_user = flag.String("datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.")
datadb_pass = flag.String("datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.")
stor_db_type = flag.String("stordb_type", cgrConfig.StorDBType, "The type of the storDb database <mysql>")
stor_db_host = flag.String("stordb_host", cgrConfig.StorDBHost, "The storDb host to connect to.")
stor_db_port = flag.String("stordb_port", cgrConfig.StorDBPort, "The storDb port to bind to.")
stor_db_name = flag.String("stordb_name", cgrConfig.StorDBName, "The name/number of the storDb to connect to.")
stor_db_user = flag.String("stordb_user", cgrConfig.StorDBUser, "The storDb user to sign in as.")
stor_db_pass = flag.String("stordb_passwd", cgrConfig.StorDBPass, "The storDb user's password.")
stor_db_type = flag.String("stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database <mysql>")
stor_db_host = flag.String("stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.")
stor_db_port = flag.String("stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.")
stor_db_name = flag.String("stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.")
stor_db_user = flag.String("stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.")
stor_db_pass = flag.String("stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.")
dbdata_encoding = flag.String("dbdata_encoding", cgrConfig.DBDataEncoding, "The encoding used to store object data in strings")
dbdata_encoding = flag.String("dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
flush = flag.Bool("flushdb", false, "Flush the database before importing")
tpid = flag.String("tpid", "", "The tariff plan id from the database")
@@ -66,13 +59,13 @@ var (
fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb")
toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb")
rpcEncoding = flag.String("rpc_encoding", "json", "RPC encoding used <gob|json>")
historyServer = flag.String("historys", cgrConfig.RPCJSONListen, "The history server address:port, empty to disable automatic history archiving")
ralsAddress = flag.String("rals", cgrConfig.RPCJSONListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads")
cdrstatsAddress = flag.String("cdrstats", cgrConfig.RPCJSONListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads")
usersAddress = flag.String("users", cgrConfig.RPCJSONListen, "Users service to contact for data reloads, empty to disable automatic data reloads")
historyServer = flag.String("historys", config.CgrConfig().RPCJSONListen, "The history server address:port, empty to disable automatic history archiving")
ralsAddress = flag.String("rals", config.CgrConfig().RPCJSONListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads")
cdrstatsAddress = flag.String("cdrstats", config.CgrConfig().RPCJSONListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads")
usersAddress = flag.String("users", config.CgrConfig().RPCJSONListen, "Users service to contact for data reloads, empty to disable automatic data reloads")
runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields")
loadHistorySize = flag.Int("load_history_size", cgrConfig.LoadHistorySize, "Limit the number of records in the load history")
timezone = flag.String("timezone", cgrConfig.DefaultTimezone, `Timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>`)
loadHistorySize = flag.Int("load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history")
timezone = flag.String("timezone", config.CgrConfig().DefaultTimezone, `Timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>`)
disable_reverse = flag.Bool("disable_reverse_mappings", false, "Will disable reverse mappings rebuilding")
)
@@ -87,128 +80,18 @@ func main() {
var storDb engine.LoadStorage
var rater, cdrstats, users rpcclient.RpcClientConnection
var loader engine.LoadReader
if *migrateRC8 != "" {
if *datadb_type == "redis" {
var db_nb int
db_nb, err = strconv.Atoi(*datadb_name)
if err != nil {
log.Print("Redis db name must be an integer!")
return
}
host := *datadb_host
if *datadb_port != "" {
host += ":" + *datadb_port
}
migratorRC8dat, err := NewMigratorRC8(host, db_nb, *datadb_pass, *dbdata_encoding)
if err != nil {
log.Print(err.Error())
return
}
if strings.Contains(*migrateRC8, "acc") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8dat.migrateAccounts(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "atr") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8dat.migrateActionTriggers(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "act") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8dat.migrateActions(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "dcs") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8dat.migrateDerivedChargers(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "apl") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8dat.migrateActionPlans(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "shg") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8dat.migrateSharedGroups(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "int") {
if err := migratorRC8dat.migrateAccountsInt(); err != nil {
log.Print(err.Error())
}
if err := migratorRC8dat.migrateActionTriggersInt(); err != nil {
log.Print(err.Error())
}
if err := migratorRC8dat.migrateActionsInt(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "vf") {
if err := migratorRC8dat.migrateActionsInt2(); err != nil {
log.Print(err.Error())
}
if err := migratorRC8dat.writeVersion(); err != nil {
log.Print(err.Error())
}
}
if *migrateRC8 == "*enforce" { // Ignore previous data, enforce to latest version information
if err := migratorRC8dat.writeVersion(); err != nil {
log.Print(err.Error())
}
}
} else if *datadb_type == "mongo" {
mongoMigratorDat, err := NewMongoMigrator(*datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass)
if err != nil {
log.Print(err.Error())
return
}
if strings.Contains(*migrateRC8, "vf") {
if err := mongoMigratorDat.migrateActions(); err != nil {
log.Print(err.Error())
}
if err := mongoMigratorDat.writeVersion(); err != nil {
log.Print(err.Error())
}
}
if *migrateRC8 == "*enforce" {
if err := mongoMigratorDat.writeVersion(); err != nil {
log.Print(err.Error())
}
}
}
log.Print("Done!")
return
}
if migrate != nil && *migrate != "" { // Run migrator
dataDB, err := engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize)
if err != nil {
log.Fatal(err)
}
storDB, err := engine.ConfigureStorStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding,
cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBConnMaxLifetime, cgrConfig.StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
if err := migrator.NewMigrator(dataDB, *datadb_type, *dbdata_encoding, storDB, *stor_db_type).Migrate(*migrate); err != nil {
log.Fatal(err)
}
log.Print("Done migrating!")
return
}
// Init necessary db connections, only if not already
if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb
if *fromStorDb {
dataDB, errDataDB = engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize)
dataDB, errDataDB = engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, config.CgrConfig().CacheConfig, *loadHistorySize)
storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding,
cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBConnMaxLifetime, cgrConfig.StorDBCDRSIndexes)
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
} else if *toStorDb { // Import from csv files to storDb
storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding,
cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBConnMaxLifetime, cgrConfig.StorDBCDRSIndexes)
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
} else { // Default load from csv files to dataDb
dataDB, errDataDB = engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize)
dataDB, errDataDB = engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, config.CgrConfig().CacheConfig, *loadHistorySize)
}
// Defer databases opened to be closed when we are done
for _, db := range []engine.Storage{dataDB, storDb} {

View File

@@ -20,12 +20,57 @@ package main
import (
"flag"
"fmt"
"log"
"github.com/cgrates/cgrates/migrator"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
migrate = flag.String("migrate", "", "Fire up automatic migration <*cost_details|*set_versions>")
version = flag.Bool("version", false, "Prints the application version.")
dataDBType = flag.String("datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database <redis>")
dataDBHost = flag.String("datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.")
dataDBPort = flag.String("datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.")
dataDBName = flag.String("datadb_name", config.CgrConfig().DataDbName, "The name/number of the DataDb to connect to.")
dataDBUser = flag.String("datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.")
dataDBPass = flag.String("datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.")
storDBType = flag.String("stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database <mysql>")
storDBHost = flag.String("stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.")
storDBPort = flag.String("stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.")
storDBName = flag.String("stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.")
storDBUser = flag.String("stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.")
storDBPass = flag.String("stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.")
oldDataDBType = flag.String("old_datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database <redis>")
oldDataDBHost = flag.String("old_datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.")
oldDataDBPort = flag.String("old_datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.")
oldDataDBName = flag.String("old_datadb_name", "11", "The name/number of the DataDb to connect to.")
oldDataDBUser = flag.String("old_datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.")
oldDataDBPass = flag.String("old_datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.")
oldStorDBType = flag.String("old_stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database <mysql>")
oldStorDBHost = flag.String("old_stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.")
oldStorDBPort = flag.String("old_stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.")
oldStorDBName = flag.String("old_stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.")
oldStorDBUser = flag.String("old_stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.")
oldStorDBPass = flag.String("old_stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.")
loadHistorySize = flag.Int("load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history")
oldLoadHistorySize = flag.Int("old_load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history")
dbDataEncoding = flag.String("dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
oldDBDataEncoding = flag.String("old_dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
// do not save, only read from the old DB
//dryRun = flag.Bool("dry_run", false, "When true will not save loaded data to dataDb but just parse it for consistency and errors.")
//verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output")
//slice map[string]int: how many accounts were [0] read and [1] written
//stats = flag.Bool("stats", false, "Generates statistics about given data.")
)
func main() {
@@ -34,4 +79,36 @@ func main() {
fmt.Println(utils.GetCGRVersion())
return
}
if migrate != nil && *migrate != "" { // Run migrator
dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize)
if err != nil {
log.Fatal(err)
}
oldDataDB, err := migrator.ConfigureV1DataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding)
if err != nil {
log.Fatal(err)
}
storDB, err := engine.ConfigureStorStorage(*storDBType, *storDBHost, *storDBPort, *storDBName, *storDBUser, *storDBPass, *dbDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
oldstorDB, err := engine.ConfigureStorStorage(*oldStorDBType, *oldStorDBHost, *oldStorDBPort, *oldStorDBName, *oldStorDBUser, *oldStorDBPass, *oldDBDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
m,err := migrator.NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType,oldDataDB,*oldDataDBType,*oldDBDataEncoding,oldstorDB,*oldStorDBType)
if err != nil {
log.Fatal(err)
}
err = m.Migrate(*migrate);
if err != nil {
log.Fatal(err)
}
log.Print("Done migrating!")
return
}
}

View File

@@ -15,12 +15,15 @@
"data_db": {
"db_type": "mongo",
"db_host": "127.0.0.1",
"db_port": 27017,
},
"stor_db": {
"db_type": "mongo",
"db_host": "127.0.0.1",
"db_port": 27017,
"db_password":"",
},

View File

@@ -12,12 +12,31 @@
"rpc_gob": ":2013",
"http": ":2080",
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type: <redis|mongo>
"db_host": "192.168.100.40", // data_db host address
"db_port": 6379, // data_db port to reach the database
"db_name": "10", // data_db database name to connect to
"db_user": "cgrates", // username to use when connecting to data_db
"db_password": "", // password to use when connecting to data_db
"load_history_size": 10, // Number of records in the load history
},
"stor_db": { // database used to store offline tariff plans and CDRs
"db_password": "CGRateS.org", // password to use when connecting to stordb
"db_type": "mysql", // stor database type to use: <mongo|mysql|postgres>
"db_host": "192.168.100.40", // the host to connect to
"db_port": 3306, // the port to reach the stordb
"db_name": "cgrates", // stor database name
"db_user": "cgrates", // username to use when connecting to stordb
"db_password": "CGRateS.org", // password to use when connecting to stordb
"max_open_conns": 100, // maximum database connections opened, not applying for mongo
"max_idle_conns": 10, // maximum database connections idle, not applying for mongo
"conn_max_lifetime": 0, // maximum amount of time in seconds a connection may be reused (0 for unlimited), not applying for mongo
"cdrs_indexes": [], // indexes on cdrs table to speed up queries, used only in case of mongo
},
"cache":{
"destinations": {"limit": 10000, "ttl":"0s", "precache": true},
"reverse_destinations": {"limit": 10000, "ttl":"0s", "precache": true},

View File

@@ -1,4 +1,4 @@
{
{
// CGRateS Configuration file
//
// Used for cgradmin

View File

@@ -9,69 +9,49 @@ You can increase or lower the value of step in the line after BEGIN below.
You have to use 'CALL cgrates.migration();' to execute the script. If named other then default use that database name.
*/
DELIMITER //
CREATE PROCEDURE `migration`()
BEGIN
/* DECLARE variables */
DECLARE max_cdrs bigint;
DECLARE start_id bigint;
DECLARE end_id bigint;
DECLARE step bigint;
/* Optimize table for performance */
ALTER TABLE cdrs DISABLE KEYS;
SET autocommit=0;
SET unique_checks=0;
SET foreign_key_checks=0;
/* You must change the step var to commit every step rows inserted */
SET step := 10000;
SET start_id := 0;
SET end_id := start_id + step;
SET max_cdrs = (select max(id) from rated_cdrs);
WHILE (start_id <= max_cdrs) DO
INSERT INTO
cdrs(cgrid,run_id,origin_host,source,origin_id,tor,request_type,direction,tenant,category,account,subject,destination,setup_time,answer_time,`usage`,supplier,extra_fields,cost,extra_info, created_at, updated_at, deleted_at)
SELECT
cdrs_primary.cgrid,
rated_cdrs.runid as run_id,
cdrs_primary.cdrhost as origin_host,
cdrs_primary.cdrsource as source,
cdrs_primary.accid as origin_id,
cdrs_primary.tor,
rated_cdrs.reqtype as request_type,
rated_cdrs.direction,
rated_cdrs.tenant,rated_cdrs.category,
rated_cdrs.account,
rated_cdrs.subject,
rated_cdrs.destination,
rated_cdrs.setup_time,
rated_cdrs.answer_time,
rated_cdrs.`usage`,
rated_cdrs.supplier,
cdrs_extra.extra_fields,
rated_cdrs.cost,
rated_cdrs.extra_info,
rated_cdrs.created_at,
rated_cdrs.updated_at,
rated_cdrs.deleted_at
FROM rated_cdrs
INNER JOIN cdrs_primary ON rated_cdrs.cgrid = cdrs_primary.cgrid
LEFT JOIN cdrs_extra ON rated_cdrs.cgrid = cdrs_extra.cgrid
INNER JOIN cost_details ON rated_cdrs.cgrid = cost_details.cgrid
WHERE cdrs_primary.`usage` > '0'
AND not exists (select 1 from cdrs c where c.cgrid = cdrs_primary.cgrid)
AND rated_cdrs.id >= start_id
AND rated_cdrs.id < end_id
GROUP BY cgrid, run_id, origin_id;
SET start_id = start_id + step;
SET end_id = end_id + step;
END WHILE;
/* SET Table for live usage */
SET autocommit=1;
SET unique_checks=1;
SET foreign_key_checks=1;
ALTER TABLE cdrs ENABLE KEYS;
OPTIMIZE TABLE cdrs;
/* DECLARE variables */
DECLARE max_cdrs bigint;
DECLARE start_id bigint;
DECLARE end_id bigint;
DECLARE step bigint;
/* Optimize table for performance */
ALTER TABLE cdrs DISABLE KEYS;
SET autocommit=0;
SET unique_checks=0;
SET foreign_key_checks=0;
/* You must change the step var to commit every step rows inserted */
SET step := 10000;
SET start_id := 0;
SET end_id := start_id + step;
SET max_cdrs = (select max(id) from rated_cdrs);
WHILE (start_id <= max_cdrs) DO
INSERT INTO
cdrs(cgrid,run_id,origin_host,source,origin_id,tor,request_type,direction,tenant,category,account,subject,destination,setup_time,pdd,answer_time,`usage`,supplier,disconnect_cause,extra_fields,cost_source,cost,cost_details,extra_info, created_at, updated_at, deleted_at)
SELECT cdrs_primary.cgrid,rated_cdrs.runid as run_id,cdrs_primary.cdrhost as origin_host,cdrs_primary.cdrsource as source,cdrs_primary.accid as origin_id, cdrs_primary.tor,rated_cdrs.reqtype as request_type,rated_cdrs.direction, rated_cdrs.tenant,rated_cdrs.category, rated_cdrs.account, rated_cdrs.subject, rated_cdrs.destination,rated_cdrs.setup_time,rated_cdrs.pdd,rated_cdrs.answer_time,rated_cdrs.`usage`,rated_cdrs.supplier,rated_cdrs.disconnect_cause,cdrs_extra.extra_fields,cost_details.cost_source,rated_cdrs.cost,cost_details.timespans as cost_details,rated_cdrs.extra_info,rated_cdrs.created_at,rated_cdrs.updated_at, rated_cdrs.deleted_at
FROM rated_cdrs
INNER JOIN cdrs_primary ON rated_cdrs.cgrid = cdrs_primary.cgrid
INNER JOIN cdrs_extra ON rated_cdrs.cgrid = cdrs_extra.cgrid
INNER JOIN cost_details ON rated_cdrs.cgrid = cost_details.cgrid
WHERE cdrs_primary.`usage` > '0'
AND not exists (select 1 from cdrs where cdrs.cgrid = cdrs_primary.cgrid AND cdrs.run_id=rated_cdrs.runid)
AND rated_cdrs.id >= start_id
AND rated_cdrs.id < end_id
GROUP BY cgrid, run_id, origin_id;
SET start_id = start_id + step;
SET end_id = end_id + step;
END WHILE;
/* SET Table for live usage */
SET autocommit=1;
SET unique_checks=1;
SET foreign_key_checks=1;
ALTER TABLE cdrs ENABLE KEYS;
OPTIMIZE TABLE cdrs;
END //
DELIMITER ;

View File

@@ -1837,6 +1837,8 @@ func (tps TpResources) AsTPResources() (result []*utils.TPResource) {
if tp.AllocationMessage != "" {
rl.AllocationMessage = tp.AllocationMessage
}
rl.Blocker = tp.Blocker
rl.Stored = tp.Stored
if len(tp.ActivationInterval) != 0 {
rl.ActivationInterval = new(utils.TPActivationInterval)
aiSplt := strings.Split(tp.ActivationInterval, utils.INFIELD_SEP)
@@ -1884,6 +1886,8 @@ func APItoModelResource(rl *utils.TPResource) (mdls TpResources) {
mdl.Weight = rl.Weight
mdl.Limit = rl.Limit
mdl.AllocationMessage = rl.AllocationMessage
mdl.Blocker = rl.Blocker
mdl.Stored = rl.Stored
if rl.ActivationInterval != nil {
if rl.ActivationInterval.ActivationTime != "" {
mdl.ActivationInterval = rl.ActivationInterval.ActivationTime
@@ -1894,20 +1898,20 @@ func APItoModelResource(rl *utils.TPResource) (mdls TpResources) {
}
for i, val := range rl.Thresholds {
if i != 0 {
mdl.Thresholds = mdl.Thresholds + utils.INFIELD_SEP + val
} else {
mdl.Thresholds = val
mdl.Thresholds += utils.INFIELD_SEP
}
mdl.Thresholds += val
}
}
mdl.FilterType = fltr.Type
mdl.FilterFieldName = fltr.FieldName
for i, val := range fltr.Values {
if i != 0 {
mdl.FilterFieldValues = mdl.FilterFieldValues + utils.INFIELD_SEP + val
} else {
mdl.FilterFieldValues = val
mdl.FilterFieldValues += utils.INFIELD_SEP
}
mdl.FilterFieldValues += val
}
mdls = append(mdls, mdl)
}
@@ -1961,6 +1965,13 @@ func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) {
Stored: tp.Stored,
}
}
if tp.Blocker == false || tp.Blocker == true {
st.Blocker = tp.Blocker
}
if tp.Stored == false || tp.Stored == true {
st.Stored = tp.Stored
}
if tp.QueueLength != 0 {
st.QueueLength = tp.QueueLength
}
@@ -2030,8 +2041,11 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) {
}
mdl.Metrics += val
}
for _, val := range st.Thresholds {
mdl.Thresholds = mdl.Thresholds + utils.INFIELD_SEP + val
for i, val := range st.Thresholds {
if i != 0 {
mdl.Thresholds += utils.INFIELD_SEP
}
mdl.Thresholds += val
}
if st.ActivationInterval != nil {
if st.ActivationInterval.ActivationTime != "" {
@@ -2046,10 +2060,9 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) {
mdl.FilterFieldName = fltr.FieldName
for i, val := range fltr.Values {
if i != 0 {
mdl.FilterFieldValues = mdl.FilterFieldValues + utils.INFIELD_SEP + val
} else {
mdl.FilterFieldValues = val
mdl.FilterFieldValues += utils.INFIELD_SEP
}
mdl.FilterFieldValues += val
}
mdls = append(mdls, mdl)
}

View File

@@ -1,4 +1,4 @@
/*
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH

View File

@@ -1,4 +1,4 @@
/*
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH

View File

@@ -1440,6 +1440,8 @@ func testStorDBitCRUDTpResources(t *testing.T) {
Values: []string{"test1", "test2"},
},
},
Blocker: true,
Stored: true,
},
&utils.TPResource{
TPid: "testTPid",
@@ -1455,6 +1457,8 @@ func testStorDBitCRUDTpResources(t *testing.T) {
Values: []string{"test1", "test2"},
},
},
Blocker: true,
Stored: false,
},
}
if err := storDB.SetTPResources(snd); err != nil {

103
migrator/accounts.go Normal file → Executable file
View File

@@ -25,7 +25,6 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"gopkg.in/mgo.v2/bson"
)
const (
@@ -34,92 +33,31 @@ const (
)
func (m *Migrator) migrateAccounts() (err error) {
switch m.dataDBType {
case utils.REDIS:
var acntV1Keys []string
acntV1Keys, err = m.dataDB.GetKeysForPrefix(v1AccountDBPrefix)
if err != nil {
return
var v1Acnt *v1Account
for {
v1Acnt, err = m.oldDataDB.getv1Account()
if err != nil && err != utils.ErrNoMoreData {
return err
}
for _, acntV1Key := range acntV1Keys {
v1Acnt, err := m.getV1AccountFromDB(acntV1Key)
if err != nil {
if err == utils.ErrNoMoreData {
break
}
if v1Acnt != nil {
acnt := v1Acnt.AsAccount()
if err = m.dataDB.SetAccount(acnt); err != nil {
return err
}
if v1Acnt != nil {
acnt := v1Acnt.AsAccount()
if err = m.dataDB.SetAccount(acnt); err != nil {
return err
}
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
return
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
var accn v1Account
iter := mgoDB.C(v1AccountDBPrefix).Find(nil).Iter()
for iter.Next(&accn) {
if acnt := accn.AsAccount(); acnt != nil {
if err = m.dataDB.SetAccount(acnt); err != nil {
return err
}
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
return
default:
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for migrateAccounts method", m.dataDBType))
}
}
func (m *Migrator) getV1AccountFromDB(key string) (*v1Account, error) {
switch m.dataDBType {
case utils.REDIS:
dataDB := m.dataDB.(*engine.RedisStorage)
if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
return nil, err
} else {
v1Acnt := &v1Account{Id: key}
if err := m.mrshlr.Unmarshal(strVal, v1Acnt); err != nil {
return nil, err
}
return v1Acnt, nil
}
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
v1Acnt := new(v1Account)
if err := mgoDB.C(v1AccountTBL).Find(bson.M{"id": key}).One(v1Acnt); err != nil {
return nil, err
}
return v1Acnt, nil
default:
return nil, utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for getV1AccountFromDB method", m.dataDBType))
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
return
}
type v1Account struct {
@@ -183,7 +121,7 @@ func (v1Acc v1Account) AsAccount() (ac *engine.Account) {
for oldBalKey, oldBalChain := range v1Acc.BalanceMap {
keyElements := strings.Split(oldBalKey, "*")
newBalKey := "*" + keyElements[1]
newBalDirection := "*" + idElements[0]
newBalDirection := idElements[0]
ac.BalanceMap[newBalKey] = make(engine.Balances, len(oldBalChain))
for index, oldBal := range oldBalChain {
// check default to set new id
@@ -296,9 +234,6 @@ func (v1Acc v1Account) AsAccount() (ac *engine.Account) {
if oldAtr.BalanceWeight != 0 {
bf.Weight = utils.Float64Pointer(oldAtr.BalanceWeight)
}
if oldAtr.BalanceDisabled != false {
bf.Disabled = utils.BoolPointer(oldAtr.BalanceDisabled)
}
if !oldAtr.BalanceExpirationDate.IsZero() {
bf.ExpirationDate = utils.TimePointer(oldAtr.BalanceExpirationDate)
}

8
migrator/accounts_test.go Normal file → Executable file
View File

@@ -27,7 +27,7 @@ import (
func TestV1AccountAsAccount(t *testing.T) {
v1b := &v1Balance{Value: 10, Weight: 10, DestinationIds: "NAT", Timings: []*engine.RITiming{&engine.RITiming{StartTime: "00:00:00"}}}
v1Acc := &v1Account{Id: "OUT:CUSTOMER_1:rif", BalanceMap: map[string]v1BalanceChain{utils.VOICE: v1BalanceChain{v1b}, utils.MONETARY: v1BalanceChain{&v1Balance{Value: 21, Timings: []*engine.RITiming{&engine.RITiming{StartTime: "00:00:00"}}}}}}
v1Acc := &v1Account{Id: "*OUT:CUSTOMER_1:rif", BalanceMap: map[string]v1BalanceChain{utils.VOICE: v1BalanceChain{v1b}, utils.MONETARY: v1BalanceChain{&v1Balance{Value: 21, Timings: []*engine.RITiming{&engine.RITiming{StartTime: "00:00:00"}}}}}}
v2 := &engine.Balance{Uuid: "", ID: "", Value: 10, Directions: utils.StringMap{"*OUT": true}, Weight: 10, DestinationIDs: utils.StringMap{"NAT": true}, RatingSubject: "", Categories: utils.NewStringMap(""), SharedGroups: utils.NewStringMap(""), Timings: []*engine.RITiming{&engine.RITiming{StartTime: "00:00:00"}}, TimingIDs: utils.NewStringMap(""), Factor: engine.ValueFactor{}}
m2 := &engine.Balance{Uuid: "", ID: "", Value: 21, Directions: utils.StringMap{"*OUT": true}, DestinationIDs: utils.NewStringMap(""), RatingSubject: "", Categories: utils.NewStringMap(""), SharedGroups: utils.NewStringMap(""), Timings: []*engine.RITiming{&engine.RITiming{StartTime: "00:00:00"}}, TimingIDs: utils.NewStringMap(""), Factor: engine.ValueFactor{}}
testAccount := &engine.Account{ID: "CUSTOMER_1:rif", BalanceMap: map[string]engine.Balances{utils.VOICE: engine.Balances{v2}, utils.MONETARY: engine.Balances{m2}}, UnitCounters: engine.UnitCounters{}, ActionTriggers: engine.ActionTriggers{}}
@@ -35,7 +35,9 @@ func TestV1AccountAsAccount(t *testing.T) {
t.Errorf("Expecting: false, received: true")
}
newAcc := v1Acc.AsAccount()
if !reflect.DeepEqual(testAccount, newAcc) {
t.Errorf("Expecting: %+v, received: %+v", testAccount, newAcc)
if !reflect.DeepEqual(testAccount.BalanceMap["*monetary"][0], newAcc.BalanceMap["*monetary"][0]) {
t.Errorf("Expecting: %+v, received: %+v", testAccount.BalanceMap["*monetary"][0], newAcc.BalanceMap["*monetary"][0])
} else if !reflect.DeepEqual(testAccount.BalanceMap["*voice"][0], newAcc.BalanceMap["*voice"][0]) {
t.Errorf("Expecting: %+v, received: %+v", testAccount.BalanceMap["*voice"][0], newAcc.BalanceMap["*voice"][0])
}
}

View File

@@ -19,10 +19,11 @@ package migrator
import (
"fmt"
// /"log"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"gopkg.in/mgo.v2/bson"
//"gopkg.in/mgo.v2/bson"
)
type v1Action struct {
@@ -39,96 +40,37 @@ type v1Action struct {
type v1Actions []*v1Action
func (m *Migrator) migrateActions() (err error) {
switch m.dataDBType {
case utils.REDIS:
var acts engine.Actions
var actv1keys []string
actv1keys, err = m.dataDB.GetKeysForPrefix(utils.ACTION_PREFIX)
if err != nil {
return
var v1ACs *v1Actions
var acts engine.Actions
for {
v1ACs, err = m.oldDataDB.getV1Actions()
if err != nil && err != utils.ErrNoMoreData {
return err
}
for _, actv1key := range actv1keys {
v1act, err := m.getV1ActionFromDB(actv1key)
if err != nil {
if err == utils.ErrNoMoreData {
break
}
if *v1ACs != nil {
for _, v1ac := range *v1ACs {
act := v1ac.AsAction()
acts = append(acts, act)
}
if err := m.dataDB.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
return err
}
act := v1act.AsAction()
acts = append(acts, act)
}
if err := m.dataDB.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
return err
}
// All done, update version wtih current one
vrs := engine.Versions{utils.ACTION_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_PREFIX]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
return
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
var acts engine.Actions
var v1act v1Action
iter := mgoDB.C(utils.ACTION_PREFIX).Find(nil).Iter()
for iter.Next(&v1act) {
act := v1act.AsAction()
acts = append(acts, act)
}
if err := m.dataDB.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
return err
}
// All done, update version wtih current one
vrs := engine.Versions{utils.ACTION_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_PREFIX]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
return
default:
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.ACTION_PREFIX]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for migrateActions method", m.dataDBType))
}
}
func (m *Migrator) getV1ActionFromDB(key string) (v1act *v1Action, err error) {
switch m.dataDBType {
case utils.REDIS:
dataDB := m.dataDB.(*engine.RedisStorage)
if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
return nil, err
} else {
v1act := &v1Action{Id: key}
if err := m.mrshlr.Unmarshal(strVal, v1act); err != nil {
return nil, err
}
return v1act, nil
}
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
v1act := new(v1Action)
if err := mgoDB.C(utils.ACTION_PREFIX).Find(bson.M{"id": key}).One(v1act); err != nil {
return nil, err
}
return v1act, nil
default:
return nil, utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for getV1ActionPlansFromDB method", m.dataDBType))
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
return
}
func (v1Act v1Action) AsAction() (act *engine.Action) {

View File

@@ -22,8 +22,6 @@ import (
"strings"
"time"
"gopkg.in/mgo.v2/bson"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -53,89 +51,33 @@ func (at *v1ActionPlan) IsASAP() bool {
}
func (m *Migrator) migrateActionPlans() (err error) {
switch m.dataDBType {
case utils.REDIS:
var apsv1keys []string
apsv1keys, err = m.dataDB.GetKeysForPrefix(utils.ACTION_PLAN_PREFIX)
if err != nil {
return
var v1APs *v1ActionPlans
for {
v1APs, err = m.oldDataDB.getV1ActionPlans()
if err != nil && err != utils.ErrNoMoreData {
return err
}
for _, apsv1key := range apsv1keys {
v1aps, err := m.getV1ActionPlansFromDB(apsv1key)
if err != nil {
return err
}
aps := v1aps.AsActionPlan()
if err = m.dataDB.SetActionPlan(aps.Id, aps, true, utils.NonTransactional); err != nil {
return err
if err == utils.ErrNoMoreData {
break
}
if *v1APs != nil {
for _, v1ap := range *v1APs {
ap := v1ap.AsActionPlan()
if err = m.dataDB.SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil {
return err
}
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.ACTION_PLAN_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_PLAN_PREFIX]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating ActionPlans version into StorDB", err.Error()))
}
return
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
var acp v1ActionPlan
iter := mgoDB.C(utils.ACTION_PLAN_PREFIX).Find(nil).Iter()
for iter.Next(&acp) {
aps := acp.AsActionPlan()
if err = m.dataDB.SetActionPlan(aps.Id, aps, true, utils.NonTransactional); err != nil {
return err
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
return
default:
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.ACTION_PLAN_PREFIX]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for migrateActionPlans method", m.dataDBType))
}
}
func (m *Migrator) getV1ActionPlansFromDB(key string) (v1aps *v1ActionPlan, err error) {
switch m.dataDBType {
case utils.REDIS:
dataDB := m.dataDB.(*engine.RedisStorage)
if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
return nil, err
} else {
v1aps := &v1ActionPlan{Id: key}
if err := m.mrshlr.Unmarshal(strVal, v1aps); err != nil {
return nil, err
}
return v1aps, nil
}
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
v1aps := new(v1ActionPlan)
if err := mgoDB.C(utils.ACTION_PLAN_PREFIX).Find(bson.M{"id": key}).One(v1aps); err != nil {
return nil, err
}
return v1aps, nil
default:
return nil, utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for getV1ActionPlansFromDB method", m.dataDBType))
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
return
}
func (v1AP v1ActionPlan) AsActionPlan() (ap *engine.ActionPlan) {

View File

@@ -5,127 +5,69 @@ import (
"strings"
"time"
"gopkg.in/mgo.v2/bson"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
type v1ActionTrigger struct {
Id string
ThresholdType string
Id string // for visual identification
ThresholdType string //*min_counter, *max_counter, *min_balance, *max_balance
// stats: *min_asr, *max_asr, *min_acd, *max_acd, *min_tcd, *max_tcd, *min_acc, *max_acc, *min_tcc, *max_tcc
ThresholdValue float64
Recurrent bool
MinSleep time.Duration
Recurrent bool // reset eexcuted flag each run
MinSleep time.Duration // Minimum duration between two executions in case of recurrent triggers
BalanceId string
BalanceType string
BalanceDirection string
BalanceDestinationIds string
BalanceWeight float64
BalanceExpirationDate time.Time
BalanceTimingTags string
BalanceRatingSubject string
BalanceCategory string
BalanceSharedGroup string
BalanceDisabled bool
BalanceDestinationIds string // filter for balance
BalanceWeight float64 // filter for balance
BalanceExpirationDate time.Time // filter for balance
BalanceTimingTags string // filter for balance
BalanceRatingSubject string // filter for balance
BalanceCategory string // filter for balance
BalanceSharedGroup string // filter for balance
Weight float64
ActionsId string
MinQueuedItems int
MinQueuedItems int // Trigger actions only if this number is hit (stats only)
Executed bool
lastExecutionTime time.Time
}
type v1ActionTriggers []*v1ActionTrigger
func (m *Migrator) migrateActionTriggers() (err error) {
switch m.dataDBType {
case utils.REDIS:
var atrrs engine.ActionTriggers
var v1atrskeys []string
v1atrskeys, err = m.dataDB.GetKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
if err != nil {
return
var v1ACTs *v1ActionTriggers
var acts engine.ActionTriggers
for {
v1ACTs, err = m.oldDataDB.getV1ActionTriggers()
if err != nil && err != utils.ErrNoMoreData {
return err
}
for _, v1atrskey := range v1atrskeys {
v1atrs, err := m.getV1ActionTriggerFromDB(v1atrskey)
if err != nil {
if err == utils.ErrNoMoreData {
break
}
if *v1ACTs != nil {
for _, v1ac := range *v1ACTs {
act := v1ac.AsActionTrigger()
acts = append(acts, act)
}
if err := m.dataDB.SetActionTriggers(acts[0].ID, acts, utils.NonTransactional); err != nil {
return err
}
v1atr := v1atrs
if v1atrs != nil {
atr := v1atr.AsActionTrigger()
atrrs = append(atrrs, atr)
}
}
if err := m.dataDB.SetActionTriggers(atrrs[0].ID, atrrs, utils.NonTransactional); err != nil {
return err
}
// All done, update version wtih current one
vrs := engine.Versions{utils.ACTION_TRIGGER_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_TRIGGER_PREFIX]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating ActionTrigger version into StorDB", err.Error()))
}
return
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
var atrrs engine.ActionTriggers
var v1atr v1ActionTrigger
iter := mgoDB.C(utils.ACTION_TRIGGER_PREFIX).Find(nil).Iter()
for iter.Next(&v1atr) {
atr := v1atr.AsActionTrigger()
atrrs = append(atrrs, atr)
}
if err := m.dataDB.SetActionTriggers(atrrs[0].ID, atrrs, utils.NonTransactional); err != nil {
return err
}
// All done, update version wtih current one
vrs := engine.Versions{utils.ACTION_TRIGGER_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_TRIGGER_PREFIX]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating ActionTrigger version into StorDB", err.Error()))
}
return
default:
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.ACTION_TRIGGER_PREFIX]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for migrateActionTriggers method", m.dataDBType))
}
}
func (m *Migrator) getV1ActionTriggerFromDB(key string) (v1Atr *v1ActionTrigger, err error) {
switch m.dataDBType {
case utils.REDIS:
dataDB := m.dataDB.(*engine.RedisStorage)
if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
return nil, err
} else {
v1Atr := &v1ActionTrigger{Id: key}
if err := m.mrshlr.Unmarshal(strVal, &v1Atr); err != nil {
return nil, err
}
return v1Atr, nil
}
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
v1Atr := new(v1ActionTrigger)
if err := mgoDB.C(utils.ACTION_TRIGGER_PREFIX).Find(bson.M{"id": key}).One(v1Atr); err != nil {
return nil, err
}
return v1Atr, nil
default:
return nil, utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for getV1ActionTriggerFromDB method", m.dataDBType))
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
return
}
func (v1Act v1ActionTrigger) AsActionTrigger() (at *engine.ActionTrigger) {
@@ -169,9 +111,6 @@ func (v1Act v1ActionTrigger) AsActionTrigger() (at *engine.ActionTrigger) {
if v1Act.BalanceWeight != 0 {
bf.Weight = utils.Float64Pointer(v1Act.BalanceWeight)
}
if v1Act.BalanceDisabled != false {
bf.Disabled = utils.BoolPointer(v1Act.BalanceDisabled)
}
if !v1Act.BalanceExpirationDate.IsZero() {
bf.ExpirationDate = utils.TimePointer(v1Act.BalanceExpirationDate)
at.ExpirationDate = v1Act.BalanceExpirationDate

View File

@@ -21,6 +21,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"log"
"time"
"github.com/cgrates/cgrates/engine"
@@ -47,12 +48,13 @@ func (m *Migrator) migrateCostDetails() (err error) {
"version number is not defined for CostDetails model")
}
if vrs[utils.COST_DETAILS] != 1 { // Right now we only support migrating from version 1
log.Print("Wrong version")
return
}
var storSQL *sql.DB
switch m.storDBType {
case utils.MYSQL:
storSQL = m.storDB.(*engine.MySQLStorage).Db
storSQL = m.storDB.(*engine.SQLStorage).Db
case utils.POSTGRES:
storSQL = m.storDB.(*engine.PostgresStorage).Db
default:
@@ -61,19 +63,22 @@ func (m *Migrator) migrateCostDetails() (err error) {
utils.UnsupportedDB,
fmt.Sprintf("unsupported database type: <%s>", m.storDBType))
}
rows, err := storSQL.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs WHERE run_id!= '*raw' and cost_details IS NOT NULL AND deleted_at IS NULL")
rows, err := storSQL.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when querying storDB for cdrs", err.Error()))
}
defer rows.Close()
for cnt := 0; rows.Next(); cnt++ {
var id int64
var ccDirection, ccCategory, ccTenant, ccSubject, ccAccount, ccDestination, ccTor sql.NullString
var ccCost sql.NullFloat64
var tts []byte
if err := rows.Scan(&id, &ccTor, &ccDirection, &ccTenant, &ccCategory, &ccAccount, &ccSubject, &ccDestination, &ccCost, &tts); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -89,6 +94,7 @@ func (m *Migrator) migrateCostDetails() (err error) {
v1CC := &v1CallCost{Direction: ccDirection.String, Category: ccCategory.String, Tenant: ccTenant.String,
Subject: ccSubject.String, Account: ccAccount.String, Destination: ccDestination.String, TOR: ccTor.String,
Cost: ccCost.Float64, Timespans: v1tmsps}
cc := v1CC.AsCallCost()
if cc == nil {
utils.Logger.Warning(

41
migrator/migrator.go Normal file → Executable file
View File

@@ -19,28 +19,42 @@ package migrator
import (
"fmt"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string) *Migrator {
func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string, oldDataDB v1DataDB, oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string) (m *Migrator, err error) {
var mrshlr engine.Marshaler
var oldmrshlr engine.Marshaler
if dataDBEncoding == utils.MSGPACK {
mrshlr = engine.NewCodecMsgpackMarshaler()
} else if dataDBEncoding == utils.JSON {
mrshlr = new(engine.JSONMarshaler)
} else if oldDataDBEncoding == utils.MSGPACK {
oldmrshlr = engine.NewCodecMsgpackMarshaler()
} else if oldDataDBEncoding == utils.JSON {
oldmrshlr = new(engine.JSONMarshaler)
}
return &Migrator{dataDB: dataDB, dataDBType: dataDBType,
storDB: storDB, storDBType: storDBType, mrshlr: mrshlr}
m = &Migrator{
dataDB: dataDB, dataDBType: dataDBType,
storDB: storDB, storDBType: storDBType, mrshlr: mrshlr,
oldDataDB: oldDataDB, oldDataDBType: oldDataDBType,
oldStorDB: oldStorDB, oldStorDBType: oldStorDBType, oldmrshlr: oldmrshlr,
}
return m, err
}
type Migrator struct {
dataDB engine.DataDB
dataDBType string
storDB engine.Storage
storDBType string
mrshlr engine.Marshaler
dataDB engine.DataDB
dataDBType string
storDB engine.Storage
storDBType string
mrshlr engine.Marshaler
oldDataDB v1DataDB
oldDataDBType string
oldStorDB engine.Storage
oldStorDBType string
oldmrshlr engine.Marshaler
}
// Migrate implements the tasks to migrate, used as a dispatcher to the individual methods
@@ -62,15 +76,14 @@ func (m *Migrator) Migrate(taskID string) (err error) {
err = m.migrateCostDetails()
case utils.MetaAccounts:
err = m.migrateAccounts()
case "migrateActionPlans":
case utils.MetaActionPlans:
err = m.migrateActionPlans()
case "migrateActionTriggers":
case utils.MetaActionTriggers:
err = m.migrateActionTriggers()
case "migrateActions":
case utils.MetaActions:
err = m.migrateActions()
case "migrateSharedGroups":
case utils.MetaSharedGroups:
err = m.migrateSharedGroups()
}
return
}

View File

@@ -18,15 +18,15 @@ package migrator
import (
"flag"
"fmt"
// "fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"log"
"path"
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
@@ -38,7 +38,40 @@ var (
dbtype string
mig *Migrator
dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here")
db_passwd = ""
dataDBType = flag.String("datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database <redis>")
dataDBHost = flag.String("datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.")
dataDBPort = flag.String("datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.")
dataDBName = flag.String("datadb_name", config.CgrConfig().DataDbName, "The name/number of the DataDb to connect to.")
dataDBUser = flag.String("datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.")
dataDBPass = flag.String("datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.")
storDBType = flag.String("stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database <mysql>")
storDBHost = flag.String("stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.")
storDBPort = flag.String("stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.")
storDBName = flag.String("stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.")
storDBUser = flag.String("stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.")
storDBPass = flag.String("stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.")
oldDataDBType = flag.String("old_datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database <redis>")
oldDataDBHost = flag.String("old_datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.")
oldDataDBPort = flag.String("old_datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.")
oldDataDBName = flag.String("old_datadb_name", "11", "The name/number of the DataDb to connect to.")
oldDataDBUser = flag.String("old_datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.")
oldDataDBPass = flag.String("old_datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.")
oldStorDBType = flag.String("old_stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database <mysql>")
oldStorDBHost = flag.String("old_stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.")
oldStorDBPort = flag.String("old_stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.")
oldStorDBName = flag.String("old_stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.")
oldStorDBUser = flag.String("old_stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.")
oldStorDBPass = flag.String("old_stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.")
loadHistorySize = flag.Int("load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history")
oldLoadHistorySize = flag.Int("old_load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history")
dbDataEncoding = flag.String("dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
oldDBDataEncoding = flag.String("old_dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
)
// subtests to be executed for each migrator
@@ -49,44 +82,73 @@ var sTestsITMigrator = []func(t *testing.T){
testMigratorActionTriggers,
testMigratorActions,
testMigratorSharedGroups,
testOnStorITFlush,
}
func TestOnStorITRedisConnect(t *testing.T) {
cfg, _ := config.NewDefaultCGRConfig()
rdsITdb, err := engine.NewRedisStorage(fmt.Sprintf("%s:%s", cfg.TpDbHost, cfg.TpDbPort), 4, cfg.TpDbPass, cfg.DBDataEncoding, utils.REDIS_MAX_CONNS, nil, 1)
dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
log.Fatal(err)
}
oldDataDB, err := ConfigureV1DataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding)
if err != nil {
log.Fatal(err)
}
storDB, err := engine.ConfigureStorStorage(*storDBType, *storDBHost, *storDBPort, *storDBName, *storDBUser, *storDBPass, *dbDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
oldstorDB, err := engine.ConfigureStorStorage(*oldStorDBType, *oldStorDBHost, *oldStorDBPort, *oldStorDBName, *oldStorDBUser, *oldStorDBPass, *oldDBDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
mig, err = NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType, oldDataDB, *oldDataDBType, *oldDBDataEncoding, oldstorDB, *oldStorDBType)
if err != nil {
log.Fatal(err)
}
onStorCfg = cfg.DataDbName
mig = NewMigrator(rdsITdb, rdsITdb, utils.REDIS, utils.JSON, rdsITdb, utils.REDIS)
}
func TestOnStorITRedis(t *testing.T) {
dbtype = utils.REDIS
onStor = rdsITdb
for _, stest := range sTestsITMigrator {
t.Run("TestITMigratorOnRedis", stest)
}
}
func TestOnStorITMongoConnect(t *testing.T) {
cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "cdrsv2mongo")
cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "tutmongo")
mgoITCfg, err := config.NewCGRConfigFromFolder(cdrsMongoCfgPath)
if err != nil {
t.Fatal(err)
}
if mgoITdb, err = engine.NewMongoStorage(mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, db_passwd,
utils.StorDB, nil, mgoITCfg.CacheConfig, mgoITCfg.LoadHistorySize); err != nil {
t.Fatal(err)
dataDB, err := engine.ConfigureDataStorage(mgoITCfg.DataDbType, mgoITCfg.DataDbHost, mgoITCfg.DataDbPort, mgoITCfg.DataDbName, mgoITCfg.DataDbUser, mgoITCfg.DataDbPass, mgoITCfg.DBDataEncoding, mgoITCfg.CacheConfig, *loadHistorySize)
if err != nil {
log.Fatal(err)
}
oldDataDB, err := ConfigureV1DataStorage(mgoITCfg.DataDbType, mgoITCfg.DataDbHost, mgoITCfg.DataDbPort, mgoITCfg.DataDbName, mgoITCfg.DataDbUser, mgoITCfg.DataDbPass, mgoITCfg.DBDataEncoding)
if err != nil {
log.Fatal(err)
}
storDB, err := engine.ConfigureStorStorage(mgoITCfg.StorDBType, mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, mgoITCfg.DBDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
oldstorDB, err := engine.ConfigureStorStorage(mgoITCfg.StorDBType, mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, mgoITCfg.DBDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
mig, err = NewMigrator(dataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, storDB, mgoITCfg.StorDBType, oldDataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, oldstorDB, mgoITCfg.StorDBType)
if err != nil {
log.Fatal(err)
}
mongo = mgoITCfg
onStorCfg = mgoITCfg.StorDBName
mig = NewMigrator(mgoITdb, mgoITdb, utils.MONGO, utils.JSON, mgoITdb, utils.MONGO)
}
func TestOnStorITMongo(t *testing.T) {
dbtype = utils.MONGO
onStor = mgoITdb
for _, stest := range sTestsITMigrator {
t.Run("TestITMigratorOnMongo", stest)
}
@@ -101,7 +163,7 @@ func testOnStorITFlush(t *testing.T) {
t.Error("Error when flushing Redis ", err.Error())
}
case dbtype == utils.MONGO:
err := engine.InitDataDb(mongo)
err := mig.dataDB.Flush("")
if err != nil {
t.Error("Error when flushing Mongo ", err.Error())
}
@@ -110,19 +172,15 @@ func testOnStorITFlush(t *testing.T) {
func testMigratorAccounts(t *testing.T) {
v1b := &v1Balance{Value: 10, Weight: 10, DestinationIds: "NAT", ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}
v1Acc := &v1Account{Id: "OUT:CUSTOMER_1:rif", BalanceMap: map[string]v1BalanceChain{utils.VOICE: v1BalanceChain{v1b}, utils.MONETARY: v1BalanceChain{&v1Balance{Value: 21, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}}
v2 := &engine.Balance{Uuid: "", ID: "", Value: 10, Directions: utils.StringMap{"*OUT": true}, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Weight: 10, DestinationIDs: utils.StringMap{"NAT": true},
v1Acc := &v1Account{Id: "*OUT:CUSTOMER_1:rif", BalanceMap: map[string]v1BalanceChain{utils.VOICE: v1BalanceChain{v1b}, utils.MONETARY: v1BalanceChain{&v1Balance{Value: 21, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}}
v2b := &engine.Balance{Uuid: "", ID: "", Value: 10, Directions: utils.StringMap{"*OUT": true}, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Weight: 10, DestinationIDs: utils.StringMap{"NAT": true},
RatingSubject: "", Categories: utils.NewStringMap(), SharedGroups: utils.NewStringMap(), Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}, TimingIDs: utils.NewStringMap(""), Factor: engine.ValueFactor{}}
m2 := &engine.Balance{Uuid: "", ID: "", Value: 21, Directions: utils.StringMap{"*OUT": true}, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), DestinationIDs: utils.NewStringMap(""), RatingSubject: "",
Categories: utils.NewStringMap(), SharedGroups: utils.NewStringMap(), Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}, TimingIDs: utils.NewStringMap(""), Factor: engine.ValueFactor{}}
testAccount := &engine.Account{ID: "CUSTOMER_1:rif", BalanceMap: map[string]engine.Balances{utils.VOICE: engine.Balances{v2}, utils.MONETARY: engine.Balances{m2}}, UnitCounters: engine.UnitCounters{}, ActionTriggers: engine.ActionTriggers{}}
testAccount := &engine.Account{ID: "CUSTOMER_1:rif", BalanceMap: map[string]engine.Balances{utils.VOICE: engine.Balances{v2b}, utils.MONETARY: engine.Balances{m2}}, UnitCounters: engine.UnitCounters{}, ActionTriggers: engine.ActionTriggers{}}
switch {
case dbtype == utils.REDIS:
bit, err := mig.mrshlr.Marshal(v1Acc)
if err != nil {
t.Error("Error when marshaling ", err.Error())
}
err = mig.SetV1onRedis(v1AccountDBPrefix+v1Acc.Id, bit)
err := mig.oldDataDB.setV1Account(v1Acc)
if err != nil {
t.Error("Error when setting v1 acc ", err.Error())
}
@@ -134,11 +192,13 @@ func testMigratorAccounts(t *testing.T) {
if err != nil {
t.Error("Error when getting account ", err.Error())
}
if !reflect.DeepEqual(testAccount, result) {
if !reflect.DeepEqual(testAccount.BalanceMap["*voice"][0], result.BalanceMap["*voice"][0]) {
t.Errorf("Expecting: %+v, received: %+v", testAccount.BalanceMap["*voice"][0], result.BalanceMap["*voice"][0])
} else if !reflect.DeepEqual(testAccount, result) {
t.Errorf("Expecting: %+v, received: %+v", testAccount, result)
}
case dbtype == utils.MONGO:
err := mig.SetV1onMongoAccount(v1AccountDBPrefix, v1Acc)
err := mig.oldDataDB.setV1Account(v1Acc)
if err != nil {
t.Error("Error when marshaling ", err.Error())
}
@@ -157,24 +217,19 @@ func testMigratorAccounts(t *testing.T) {
}
func testMigratorActionPlans(t *testing.T) {
v1ap := &v1ActionPlan{Id: "test", AccountIds: []string{"one"}, Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}
v1ap := &v1ActionPlans{&v1ActionPlan{Id: "test", AccountIds: []string{"one"}, Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}
ap := &engine.ActionPlan{Id: "test", AccountIDs: utils.StringMap{"one": true}, ActionTimings: []*engine.ActionTiming{&engine.ActionTiming{Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}
switch {
case dbtype == utils.REDIS:
bit, err := mig.mrshlr.Marshal(v1ap)
if err != nil {
t.Error("Error when marshaling ", err.Error())
}
setv1id := utils.ACTION_PLAN_PREFIX + v1ap.Id
err = mig.SetV1onRedis(setv1id, bit)
err := mig.oldDataDB.setV1ActionPlans(v1ap)
if err != nil {
t.Error("Error when setting v1 ActionPlan ", err.Error())
}
err = mig.Migrate("migrateActionPlans")
err = mig.Migrate(utils.MetaActionPlans)
if err != nil {
t.Error("Error when migrating ActionPlans ", err.Error())
}
result, err := mig.tpDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
result, err := mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting ActionPlan ", err.Error())
}
@@ -186,15 +241,15 @@ func testMigratorActionPlans(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight)
}
case dbtype == utils.MONGO:
err := mig.SetV1onMongoActionPlan(utils.ACTION_PLAN_PREFIX, v1ap)
err := mig.oldDataDB.setV1ActionPlans(v1ap)
if err != nil {
t.Error("Error when setting v1 ActionPlans ", err.Error())
}
err = mig.Migrate("migrateActionPlans")
err = mig.Migrate(utils.MetaActionPlans)
if err != nil {
t.Error("Error when migrating ActionPlans ", err.Error())
}
result, err := mig.tpDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
result, err := mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting ActionPlan ", err.Error())
}
@@ -210,16 +265,17 @@ func testMigratorActionPlans(t *testing.T) {
func testMigratorActionTriggers(t *testing.T) {
tim := time.Date(2012, time.February, 27, 23, 59, 59, 0, time.UTC).Local()
var v1Atr v1ActionTrigger
v1atrs := &v1ActionTrigger{
Id: "Test",
BalanceType: "*monetary",
BalanceDirection: "*out",
ThresholdType: "*max_balance",
ThresholdValue: 2,
ActionsId: "TEST_ACTIONS",
Executed: true,
BalanceExpirationDate: tim,
v1atrs := &v1ActionTriggers{
&v1ActionTrigger{
Id: "Test",
BalanceType: "*monetary",
BalanceDirection: "*out",
ThresholdType: "*max_balance",
ThresholdValue: 2,
ActionsId: "TEST_ACTIONS",
Executed: true,
BalanceExpirationDate: tim,
},
}
atrs := engine.ActionTriggers{
&engine.ActionTrigger{
@@ -241,23 +297,15 @@ func testMigratorActionTriggers(t *testing.T) {
}
switch {
case dbtype == utils.REDIS:
bit, err := mig.mrshlr.Marshal(v1atrs)
if err != nil {
t.Error("Error when marshaling ", err.Error())
}
if err := mig.mrshlr.Unmarshal(bit, &v1Atr); err != nil {
t.Error("Error when setting v1 ActionTriggers ", err.Error())
}
setv1id := utils.ACTION_TRIGGER_PREFIX + v1atrs.Id
err = mig.SetV1onRedis(setv1id, bit)
err := mig.oldDataDB.setV1ActionTriggers(v1atrs)
if err != nil {
t.Error("Error when setting v1 ActionTriggers ", err.Error())
}
err = mig.Migrate("migrateActionTriggers")
err = mig.Migrate(utils.MetaActionTriggers)
if err != nil {
t.Error("Error when migrating ActionTriggers ", err.Error())
}
result, err := mig.tpDB.GetActionTriggers(v1atrs.Id, true, utils.NonTransactional)
result, err := mig.dataDB.GetActionTriggers((*v1atrs)[0].Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting ActionTriggers ", err.Error())
}
@@ -277,8 +325,8 @@ func testMigratorActionTriggers(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", atrs[0].ExpirationDate, result[0].ExpirationDate)
} else if !reflect.DeepEqual(atrs[0].ActivationDate, result[0].ActivationDate) {
t.Errorf("Expecting: %+v, received: %+v", atrs[0].ActivationDate, result[0].ActivationDate)
} else if !reflect.DeepEqual(atrs[0].Balance, result[0].Balance) {
// t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance, result[0].Balance)
} else if !reflect.DeepEqual(atrs[0].Balance.Type, result[0].Balance.Type) {
t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance.Type, result[0].Balance.Type)
} else if !reflect.DeepEqual(atrs[0].Weight, result[0].Weight) {
t.Errorf("Expecting: %+v, received: %+v", atrs[0].Weight, result[0].Weight)
} else if !reflect.DeepEqual(atrs[0].ActionsID, result[0].ActionsID) {
@@ -324,74 +372,51 @@ func testMigratorActionTriggers(t *testing.T) {
} else if !reflect.DeepEqual(atrs[0].Balance.Blocker, result[0].Balance.Blocker) {
t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance.Blocker, result[0].Balance.Blocker)
}
case dbtype == utils.MONGO:
err := mig.SetV1onMongoActionTrigger(utils.ACTION_TRIGGER_PREFIX, v1atrs)
if err != nil {
t.Error("Error when setting v1 ActionTriggers ", err.Error())
}
err = mig.Migrate("migrateActionTriggers")
if err != nil {
err := mig.Migrate(utils.MetaActionTriggers)
if err != nil && err != utils.ErrNotImplemented {
t.Error("Error when migrating ActionTriggers ", err.Error())
}
result, err := mig.tpDB.GetActionTriggers(v1atrs.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting ActionTriggers ", err.Error())
}
if !reflect.DeepEqual(atrs[0], result[0]) {
t.Errorf("Expecting: %+v, received: %+v", atrs[0], result[0])
}
err = mig.DropV1Colection(utils.ACTION_TRIGGER_PREFIX)
if err != nil {
t.Error("Error when flushing v1 ActionTriggers ", err.Error())
}
}
}
func testMigratorActions(t *testing.T) {
v1act := &v1Action{Id: "test", ActionType: "", BalanceType: "", Direction: "INBOUND", ExtraParameters: "", ExpirationString: "", Balance: &v1Balance{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}
act := engine.Actions{&engine.Action{Id: "test", ActionType: "", ExtraParameters: "", ExpirationString: "", Weight: 0.00, Balance: &engine.BalanceFilter{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}
v1act := &v1Actions{&v1Action{Id: "test", ActionType: "", BalanceType: "", Direction: "INBOUND", ExtraParameters: "", ExpirationString: "", Balance: &v1Balance{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}
act := &engine.Actions{&engine.Action{Id: "test", ActionType: "", ExtraParameters: "", ExpirationString: "", Weight: 0.00, Balance: &engine.BalanceFilter{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}
switch {
case dbtype == utils.REDIS:
bit, err := mig.mrshlr.Marshal(v1act)
if err != nil {
t.Error("Error when marshaling ", err.Error())
}
setv1id := utils.ACTION_PREFIX + v1act.Id
err = mig.SetV1onRedis(setv1id, bit)
err := mig.oldDataDB.setV1Actions(v1act)
if err != nil {
t.Error("Error when setting v1 Actions ", err.Error())
}
err = mig.Migrate("migrateActions")
err = mig.Migrate(utils.MetaActions)
if err != nil {
t.Error("Error when migrating Actions ", err.Error())
}
result, err := mig.tpDB.GetActions(v1act.Id, true, utils.NonTransactional)
result, err := mig.dataDB.GetActions((*v1act)[0].Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting Actions ", err.Error())
}
if !reflect.DeepEqual(act, result) {
t.Errorf("Expecting: %+v, received: %+v", act, result)
if !reflect.DeepEqual(*act, result) {
t.Errorf("Expecting: %+v, received: %+v", *act, result)
}
case dbtype == utils.MONGO:
err := mig.SetV1onMongoAction(utils.ACTION_PREFIX, v1act)
err := mig.oldDataDB.setV1Actions(v1act)
if err != nil {
t.Error("Error when setting v1 Actions ", err.Error())
}
err = mig.Migrate("migrateActions")
err = mig.Migrate(utils.MetaActions)
if err != nil {
t.Error("Error when migrating Actions ", err.Error())
}
result, err := mig.tpDB.GetActions(v1act.Id, true, utils.NonTransactional)
result, err := mig.dataDB.GetActions((*v1act)[0].Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting Actions ", err.Error())
}
if !reflect.DeepEqual(act[0].Balance.Timings, result[0].Balance.Timings) {
t.Errorf("Expecting: %+v, received: %+v", act[0].Balance.Timings, result[0].Balance.Timings)
}
err = mig.DropV1Colection(utils.ACTION_PREFIX)
if err != nil {
t.Error("Error when flushing v1 Actions ", err.Error())
if !reflect.DeepEqual(*act, result) {
t.Errorf("Expecting: %+v, received: %+v", *act, result)
}
}
}
@@ -413,20 +438,15 @@ func testMigratorSharedGroups(t *testing.T) {
}
switch {
case dbtype == utils.REDIS:
bit, err := mig.mrshlr.Marshal(v1sg)
if err != nil {
t.Error("Error when marshaling ", err.Error())
}
setv1id := utils.SHARED_GROUP_PREFIX + v1sg.Id
err = mig.SetV1onRedis(setv1id, bit)
err := mig.oldDataDB.setV1SharedGroup(v1sg)
if err != nil {
t.Error("Error when setting v1 SharedGroup ", err.Error())
}
err = mig.Migrate("migrateSharedGroups")
err = mig.Migrate(utils.MetaSharedGroups)
if err != nil {
t.Error("Error when migrating SharedGroup ", err.Error())
}
result, err := mig.tpDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional)
result, err := mig.dataDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting SharedGroup ", err.Error())
}
@@ -434,20 +454,21 @@ func testMigratorSharedGroups(t *testing.T) {
t.Errorf("Expecting: %+v, received: %+v", sg, result)
}
case dbtype == utils.MONGO:
err := mig.SetV1onMongoSharedGroup(utils.SHARED_GROUP_PREFIX, v1sg)
err := mig.oldDataDB.setV1SharedGroup(v1sg)
if err != nil {
t.Error("Error when setting v1 SharedGroup ", err.Error())
}
err = mig.Migrate("migrateSharedGroups")
err = mig.Migrate(utils.MetaSharedGroups)
if err != nil {
t.Error("Error when migrating SharedGroup ", err.Error())
}
result, err := mig.tpDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional)
result, err := mig.dataDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional)
if err != nil {
t.Error("Error when getting SharedGroup ", err.Error())
}
if !reflect.DeepEqual(sg, result) {
t.Errorf("Expecting: %+v, received: %+v", sg, result)
}
}
}

View File

@@ -1,87 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import "github.com/cgrates/cgrates/engine"
func (m *Migrator) SetV1onRedis(key string, bl []byte) (err error) {
dataDB := m.dataDB.(*engine.RedisStorage)
if err = dataDB.Cmd("SET", key, bl).Err; err != nil {
return err
}
return
}
func (m *Migrator) SetV1onMongoAccount(pref string, x *v1Account) (err error) {
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
if err := mgoDB.C(pref).Insert(x); err != nil {
return err
}
return
}
func (m *Migrator) SetV1onMongoAction(pref string, x *v1Action) (err error) {
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
if err := mgoDB.C(pref).Insert(x); err != nil {
return err
}
return
}
func (m *Migrator) SetV1onMongoActionPlan(pref string, x *v1ActionPlan) (err error) {
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
if err := mgoDB.C(pref).Insert(x); err != nil {
return err
}
return
}
func (m *Migrator) SetV1onMongoActionTrigger(pref string, x *v1ActionTrigger) (err error) {
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
if err := mgoDB.C(pref).Insert(x); err != nil {
return err
}
return
}
func (m *Migrator) SetV1onMongoSharedGroup(pref string, x *v1SharedGroup) (err error) {
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
if err := mgoDB.C(pref).Insert(x); err != nil {
return err
}
return
}
func (m *Migrator) DropV1Colection(pref string) (err error) {
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
if err := mgoDB.C(pref).DropCollection(); err != nil {
return err
}
return
}

View File

@@ -22,7 +22,6 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"gopkg.in/mgo.v2/bson"
)
type v1SharedGroup struct {
@@ -32,89 +31,31 @@ type v1SharedGroup struct {
}
func (m *Migrator) migrateSharedGroups() (err error) {
switch m.dataDBType {
case utils.REDIS:
var sgv1keys []string
sgv1keys, err = m.dataDB.GetKeysForPrefix(utils.SHARED_GROUP_PREFIX)
if err != nil {
return
var v1SG *v1SharedGroup
for {
v1SG, err = m.oldDataDB.getV1SharedGroup()
if err != nil && err != utils.ErrNoMoreData {
return err
}
for _, sgv1key := range sgv1keys {
v1sg, err := m.getv1SharedGroupFromDB(sgv1key)
if err != nil {
return err
}
sg := v1sg.AsSharedGroup()
if err = m.dataDB.SetSharedGroup(sg, utils.NonTransactional); err != nil {
if err == utils.ErrNoMoreData {
break
}
if v1SG != nil {
acnt := v1SG.AsSharedGroup()
if err = m.dataDB.SetSharedGroup(acnt, utils.NonTransactional); err != nil {
return err
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.SHARED_GROUP_PREFIX: engine.CurrentStorDBVersions()[utils.SHARED_GROUP_PREFIX]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating SharedGroup version into dataDB", err.Error()))
}
return
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
var v1sg v1SharedGroup
iter := mgoDB.C(utils.SHARED_GROUP_PREFIX).Find(nil).Iter()
for iter.Next(&v1sg) {
sg := v1sg.AsSharedGroup()
if err = m.dataDB.SetSharedGroup(sg, utils.NonTransactional); err != nil {
return err
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.SHARED_GROUP_PREFIX: engine.CurrentStorDBVersions()[utils.SHARED_GROUP_PREFIX]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating SharedGroup version into dataDB", err.Error()))
}
return
default:
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for migrateSharedGroups method", m.dataDBType))
}
}
func (m *Migrator) getv1SharedGroupFromDB(key string) (*v1SharedGroup, error) {
switch m.dataDBType {
case utils.REDIS:
dataDB := m.dataDB.(*engine.RedisStorage)
if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
return nil, err
} else {
v1SG := &v1SharedGroup{Id: key}
if err := m.mrshlr.Unmarshal(strVal, v1SG); err != nil {
return nil, err
}
return v1SG, nil
}
case utils.MONGO:
dataDB := m.dataDB.(*engine.MongoStorage)
mgoDB := dataDB.DB()
defer mgoDB.Session.Close()
v1SG := new(v1SharedGroup)
if err := mgoDB.C(utils.SHARED_GROUP_PREFIX).Find(bson.M{"id": key}).One(v1SG); err != nil {
return nil, err
}
return v1SG, nil
default:
return nil, utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
utils.UnsupportedDB,
fmt.Sprintf("error: unsupported: <%s> for getv1SharedGroupFromDB method", m.dataDBType))
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
return
}
func (v1SG v1SharedGroup) AsSharedGroup() (sg *engine.SharedGroup) {

32
migrator/v1DataDB.go Normal file
View File

@@ -0,0 +1,32 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
type v1DataDB interface {
getKeysForPrefix(prefix string) ([]string, error)
getv1Account() (v1Acnt *v1Account, err error)
setV1Account(x *v1Account) (err error)
getV1ActionPlans() (v1aps *v1ActionPlans, err error)
setV1ActionPlans(x *v1ActionPlans) (err error)
getV1Actions() (v1acs *v1Actions, err error)
setV1Actions(x *v1Actions) (err error)
getV1ActionTriggers() (v1acts *v1ActionTriggers, err error)
setV1ActionTriggers(x *v1ActionTriggers) (err error)
getV1SharedGroup() (v1acts *v1SharedGroup, err error)
setV1SharedGroup(x *v1SharedGroup) (err error)
}

View File

@@ -0,0 +1,53 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"errors"
"fmt"
"strconv"
"github.com/cgrates/cgrates/utils"
)
func ConfigureV1DataStorage(db_type, host, port, name, user, pass, marshaler string) (db v1DataDB, err error) {
var d v1DataDB
switch db_type {
case utils.REDIS:
var db_nb int
db_nb, err = strconv.Atoi(name)
if err != nil {
utils.Logger.Crit("Redis db name must be an integer!")
return nil, err
}
if port != "" {
host += ":" + port
}
d, err = newv1RedisStorage(host, db_nb, pass, marshaler)
case utils.MONGO:
d, err = NewMongoStorage(host, port, name, user, pass, utils.DataDB, nil)
db = d.(v1DataDB)
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'",
db_type, utils.REDIS, utils.MONGO))
}
if err != nil {
return nil, err
}
return d, nil
}

177
migrator/v1MongoData.go Normal file
View File

@@ -0,0 +1,177 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"fmt"
// "log"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/engine"
"gopkg.in/mgo.v2"
)
// v1Mongo gives the migrator read/write access to a MongoDB instance
// holding legacy (v1) CGRateS data.
type v1Mongo struct {
	session *mgo.Session     // live mgo session to the server
	db      string           // name of the database operations run against
	v1ms    engine.Marshaler // msgpack marshaler, kept for symmetry with the other v1 storages
	qryIter *mgo.Iter        // cursor carried across successive getV1*/getv1* calls; nil restarts the scan
}
// AcKeyValue mirrors the Mongo document layout for v1 actions: the
// redis-style key (utils.ACTION_PREFIX + id) paired with the action list
// stored under it.
type AcKeyValue struct {
	Key   string
	Value v1Actions
}
// AtKeyValue mirrors the Mongo document layout for v1 action plans
// (stored in the "actiontimings" collection): the redis-style key
// (utils.ACTION_PLAN_PREFIX + id) paired with the plan list.
type AtKeyValue struct {
	Key   string
	Value v1ActionPlans
}
// NewMongoStorage dials the MongoDB instance described by the arguments
// and returns a v1Mongo handle operating on database db. Credentials are
// embedded in the dial URL only when both user and pass are non-empty;
// an empty port or db simply omits that URL segment.
// cdrsIndexes and storageType are accepted for signature parity with the
// engine storage constructors and are not used here.
func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string) (v1ms *v1Mongo, err error) {
	dialURL := host
	if port != "" {
		dialURL = dialURL + ":" + port
	}
	if user != "" && pass != "" {
		dialURL = fmt.Sprintf("%s:%s@%s", user, pass, dialURL)
	}
	if db != "" {
		dialURL = dialURL + "/" + db
	}
	session, errDial := mgo.Dial(dialURL)
	if errDial != nil {
		return nil, errDial
	}
	// Strong mode: reads/writes go to the primary for consistency.
	session.SetMode(mgo.Strong, true)
	return &v1Mongo{
		db:      db,
		session: session,
		v1ms:    engine.NewCodecMsgpackMarshaler(),
	}, nil
}
// getKeysForPrefix is a no-op for the Mongo backend: v1 Mongo data is
// iterated collection by collection via cursors, so prefix scans are not
// needed. NOTE(review): it deliberately returns (nil, nil); callers must
// tolerate a nil slice with no error.
func (v1ms *v1Mongo) getKeysForPrefix(prefix string) ([]string, error) {
	return nil, nil
}
//Account methods
//get
// getv1Account returns the next v1 account from the accounts collection.
// The first call opens the cursor; once the collection is exhausted the
// cursor is dropped and utils.ErrNoMoreData is returned.
func (v1ms *v1Mongo) getv1Account() (v1Acnt *v1Account, err error) {
	if v1ms.qryIter == nil { // fresh scan: open the cursor
		v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Find(nil).Iter()
	}
	var acnt *v1Account
	v1ms.qryIter.Next(&acnt)
	if acnt == nil { // cursor drained: reset so a later call restarts
		v1ms.qryIter = nil
		return nil, utils.ErrNoMoreData
	}
	return acnt, nil
}
//set
// setV1Account inserts a v1 account document into the accounts collection.
func (v1ms *v1Mongo) setV1Account(x *v1Account) (err error) {
	return v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Insert(x)
}
//ActionPlans methods
//get
// getV1ActionPlans returns the next v1 action-plan list from the
// "actiontimings" collection; utils.ErrNoMoreData signals exhaustion and
// resets the cursor.
func (v1ms *v1Mongo) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
	if v1ms.qryIter == nil { // fresh scan: open the cursor
		v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actiontimings").Find(nil).Iter()
	}
	var doc *AtKeyValue
	v1ms.qryIter.Next(&doc)
	if doc == nil { // cursor drained
		v1ms.qryIter = nil
		return nil, utils.ErrNoMoreData
	}
	return &doc.Value, nil
}
//set
// setV1ActionPlans stores a v1 action-plan list in the "actiontimings"
// collection, keyed by the redis-style key of its first plan.
func (v1ms *v1Mongo) setV1ActionPlans(x *v1ActionPlans) (err error) {
	record := &AtKeyValue{
		Key:   utils.ACTION_PLAN_PREFIX + (*x)[0].Id,
		Value: *x,
	}
	return v1ms.session.DB(v1ms.db).C("actiontimings").Insert(record)
}
//Actions methods
//get
// getV1Actions returns the next v1 action list from the "actions"
// collection; utils.ErrNoMoreData signals exhaustion and resets the cursor.
func (v1ms *v1Mongo) getV1Actions() (v1acs *v1Actions, err error) {
	if v1ms.qryIter == nil { // fresh scan: open the cursor
		v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actions").Find(nil).Iter()
	}
	var doc *AcKeyValue
	v1ms.qryIter.Next(&doc)
	if doc == nil { // cursor drained
		v1ms.qryIter = nil
		return nil, utils.ErrNoMoreData
	}
	return &doc.Value, nil
}
//set
// setV1Actions stores a v1 action list in the "actions" collection, keyed
// by the redis-style key of its first action.
func (v1ms *v1Mongo) setV1Actions(x *v1Actions) (err error) {
	record := &AcKeyValue{
		Key:   utils.ACTION_PREFIX + (*x)[0].Id,
		Value: *x,
	}
	return v1ms.session.DB(v1ms.db).C("actions").Insert(record)
}
//ActionTriggers methods
//get
// getV1ActionTriggers is not supported by the Mongo v1 backend; it always
// reports utils.ErrNotImplemented.
func (v1ms *v1Mongo) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {
	return nil, utils.ErrNotImplemented
}
//set
// setV1ActionTriggers is not supported by the Mongo v1 backend; it always
// reports utils.ErrNotImplemented.
func (v1ms *v1Mongo) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
	return utils.ErrNotImplemented
}
//SharedGroup methods
//get
// getV1SharedGroup returns the next v1 shared group from its collection;
// utils.ErrNoMoreData signals exhaustion and resets the cursor.
func (v1ms *v1Mongo) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
	if v1ms.qryIter == nil { // fresh scan: open the cursor
		v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.SHARED_GROUP_PREFIX).Find(nil).Iter()
	}
	var sg *v1SharedGroup
	v1ms.qryIter.Next(&sg)
	if sg == nil { // cursor drained
		v1ms.qryIter = nil
		return nil, utils.ErrNoMoreData
	}
	return sg, nil
}
//set
// setV1SharedGroup inserts a v1 shared-group document into its collection.
func (v1ms *v1Mongo) setV1SharedGroup(x *v1SharedGroup) (err error) {
	return v1ms.session.DB(v1ms.db).C(utils.SHARED_GROUP_PREFIX).Insert(x)
}

18
migrator/v1MongoStor.go Normal file
View File

@@ -0,0 +1,18 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator

307
migrator/v1Redis.go Normal file
View File

@@ -0,0 +1,307 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"fmt"
//"log"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/mediocregopher/radix.v2/pool"
"github.com/mediocregopher/radix.v2/redis"
)
// v1Redis gives the migrator access to a Redis instance holding legacy
// (v1) CGRateS data.
type v1Redis struct {
	dbPool   *pool.Pool       // connection pool used by cmd()
	ms       engine.Marshaler // value (un)marshaler: msgpack or JSON
	dataKeys []string         // keys cached for the prefix scan in progress
	qryIdx   *int             // position within dataKeys; nil means "no scan in progress"
}
// newv1RedisStorage opens a single-connection pool to the Redis server at
// address, authenticating with pass (when non-empty) and selecting db
// (when non-zero) on every fresh connection. mrshlerStr chooses the value
// marshaler (utils.MSGPACK or utils.JSON); any other value is an error.
func newv1RedisStorage(address string, db int, pass, mrshlerStr string) (*v1Redis, error) {
	// dialFunc prepares each new pool connection: AUTH then SELECT.
	dialFunc := func(network, addr string) (*redis.Client, error) {
		client, errDial := redis.Dial(network, addr)
		if errDial != nil {
			return nil, errDial
		}
		if pass != "" {
			if errAuth := client.Cmd("AUTH", pass).Err; errAuth != nil {
				client.Close()
				return nil, errAuth
			}
		}
		if db != 0 {
			if errSel := client.Cmd("SELECT", db).Err; errSel != nil {
				client.Close()
				return nil, errSel
			}
		}
		return client, nil
	}
	connPool, err := pool.NewCustom("tcp", address, 1, dialFunc)
	if err != nil {
		return nil, err
	}
	var mrshler engine.Marshaler
	switch mrshlerStr {
	case utils.MSGPACK:
		mrshler = engine.NewCodecMsgpackMarshaler()
	case utils.JSON:
		mrshler = new(engine.JSONMarshaler)
	default:
		return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr)
	}
	return &v1Redis{dbPool: connPool, ms: mrshler}, nil
}
// cmd executes a single Redis command through the connection pool and
// returns its response. On an I/O error the command is retried once on a
// freshly acquired connection (simple failover for transient network
// disconnects). Connections that hit an I/O error are not returned to the
// pool, so broken sockets are dropped rather than reused.
func (v1rs *v1Redis) cmd(cmd string, args ...interface{}) *redis.Resp {
	c1, err := v1rs.dbPool.Get()
	if err != nil {
		return redis.NewResp(err) // could not obtain a connection at all
	}
	result := c1.Cmd(cmd, args...)
	if result.IsType(redis.IOErr) { // failover mechanism: retry once on a new connection
		utils.Logger.Warning(fmt.Sprintf("<RedisStorage> error <%s>, attempting failover.", result.Err.Error()))
		c2, err := v1rs.dbPool.Get()
		if err == nil {
			if result2 := c2.Cmd(cmd, args...); !result2.IsType(redis.IOErr) {
				v1rs.dbPool.Put(c2)
				return result2
			}
		}
		// NOTE(review): when the retry also fails, neither c1 nor c2 is
		// put back and the FIRST attempt's error response is returned.
	} else {
		v1rs.dbPool.Put(c1) // healthy connection goes back to the pool
	}
	return result
}
// getKeysForPrefix lists every Redis key starting with prefix via KEYS.
func (v1rs *v1Redis) getKeysForPrefix(prefix string) ([]string, error) {
	resp := v1rs.cmd("KEYS", prefix+"*")
	if resp.Err != nil {
		return nil, resp.Err
	}
	return resp.List()
}
//Account methods
//get
// getv1Account iterates the v1 account keys one call at a time: the first
// call caches every key under the v1 account prefix (utils.ErrNotFound if
// there are none), then each call GETs and unmarshals the next account.
// utils.ErrNoMoreData signals exhaustion and resets the scan.
func (v1rs *v1Redis) getv1Account() (v1Acnt *v1Account, err error) {
	if v1rs.qryIdx == nil { // fresh scan: cache keys and start at index 0
		if v1rs.dataKeys, err = v1rs.getKeysForPrefix(v1AccountDBPrefix); err != nil {
			return
		}
		if len(v1rs.dataKeys) == 0 {
			return nil, utils.ErrNotFound
		}
		v1rs.qryIdx = utils.IntPointer(0)
	}
	if *v1rs.qryIdx > len(v1rs.dataKeys)-1 { // exhausted: reset for a future scan
		v1rs.qryIdx = nil
		return nil, utils.ErrNoMoreData
	}
	key := v1rs.dataKeys[*v1rs.qryIdx]
	rawVal, err := v1rs.cmd("GET", key).Bytes()
	if err != nil {
		return nil, err
	}
	acnt := &v1Account{Id: key} // the redis key doubles as the account id
	if err = v1rs.ms.Unmarshal(rawVal, acnt); err != nil {
		return nil, err
	}
	*v1rs.qryIdx++
	return acnt, nil
}
//set
// setV1Account marshals and stores a v1 account under its prefixed key.
func (v1rs *v1Redis) setV1Account(x *v1Account) (err error) {
	data, err := v1rs.ms.Marshal(x)
	if err != nil {
		return err
	}
	return v1rs.cmd("SET", v1AccountDBPrefix+x.Id, data).Err
}
//ActionPlans methods
//get
// getV1ActionPlans iterates the v1 action-plan keys one call at a time:
// the first call caches every key under utils.ACTION_PLAN_PREFIX
// (utils.ErrNotFound if there are none), then each call GETs and
// unmarshals the next plan list. utils.ErrNoMoreData resets the scan.
func (v1rs *v1Redis) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
	if v1rs.qryIdx == nil { // fresh scan: cache keys and start at index 0
		if v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PLAN_PREFIX); err != nil {
			return
		}
		if len(v1rs.dataKeys) == 0 {
			return nil, utils.ErrNotFound
		}
		v1rs.qryIdx = utils.IntPointer(0)
	}
	if *v1rs.qryIdx > len(v1rs.dataKeys)-1 { // exhausted: reset for a future scan
		v1rs.qryIdx = nil
		return nil, utils.ErrNoMoreData
	}
	rawVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
	if err != nil {
		return nil, err
	}
	if err = v1rs.ms.Unmarshal(rawVal, &v1aps); err != nil {
		return nil, err
	}
	*v1rs.qryIdx++
	return v1aps, nil
}
//set
// setV1ActionPlans marshals and stores a v1 action-plan list under the
// prefixed id of its first plan.
func (v1rs *v1Redis) setV1ActionPlans(x *v1ActionPlans) (err error) {
	data, err := v1rs.ms.Marshal(x)
	if err != nil {
		return err
	}
	return v1rs.cmd("SET", utils.ACTION_PLAN_PREFIX+(*x)[0].Id, data).Err
}
//Actions methods
//get
// getV1Actions iterates the v1 action keys one call at a time: the first
// call caches every key under utils.ACTION_PREFIX (utils.ErrNotFound if
// there are none), then each call GETs and unmarshals the next action
// list. utils.ErrNoMoreData resets the scan.
func (v1rs *v1Redis) getV1Actions() (v1acs *v1Actions, err error) {
	if v1rs.qryIdx == nil { // fresh scan: cache keys and start at index 0
		if v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PREFIX); err != nil {
			return
		}
		if len(v1rs.dataKeys) == 0 {
			return nil, utils.ErrNotFound
		}
		v1rs.qryIdx = utils.IntPointer(0)
	}
	if *v1rs.qryIdx > len(v1rs.dataKeys)-1 { // exhausted: reset for a future scan
		v1rs.qryIdx = nil
		return nil, utils.ErrNoMoreData
	}
	rawVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
	if err != nil {
		return nil, err
	}
	if err = v1rs.ms.Unmarshal(rawVal, &v1acs); err != nil {
		return nil, err
	}
	*v1rs.qryIdx++
	return v1acs, nil
}
//set
// setV1Actions marshals and stores a v1 action list under the prefixed id
// of its first action.
func (v1rs *v1Redis) setV1Actions(x *v1Actions) (err error) {
	data, err := v1rs.ms.Marshal(x)
	if err != nil {
		return err
	}
	return v1rs.cmd("SET", utils.ACTION_PREFIX+(*x)[0].Id, data).Err
}
//ActionTriggers methods
//get
// getV1ActionTriggers iterates the v1 action-trigger keys one call at a
// time: the first call caches every key under utils.ACTION_TRIGGER_PREFIX
// (utils.ErrNotFound if there are none), then each call GETs and
// unmarshals the next trigger list. utils.ErrNoMoreData resets the scan.
func (v1rs *v1Redis) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {
	if v1rs.qryIdx == nil { // fresh scan: cache keys and start at index 0
		if v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_TRIGGER_PREFIX); err != nil {
			return
		}
		if len(v1rs.dataKeys) == 0 {
			return nil, utils.ErrNotFound
		}
		v1rs.qryIdx = utils.IntPointer(0)
	}
	if *v1rs.qryIdx > len(v1rs.dataKeys)-1 { // exhausted: reset for a future scan
		v1rs.qryIdx = nil
		return nil, utils.ErrNoMoreData
	}
	rawVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
	if err != nil {
		return nil, err
	}
	if err = v1rs.ms.Unmarshal(rawVal, &v1acts); err != nil {
		return nil, err
	}
	*v1rs.qryIdx++
	return v1acts, nil
}
//set
// setV1ActionTriggers marshals and stores a v1 action-trigger list under
// the prefixed id of its first trigger.
func (v1rs *v1Redis) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
	data, err := v1rs.ms.Marshal(x)
	if err != nil {
		return err
	}
	return v1rs.cmd("SET", utils.ACTION_TRIGGER_PREFIX+(*x)[0].Id, data).Err
}
//SharedGroup methods
//get
// getV1SharedGroup iterates the v1 shared-group keys one call at a time:
// the first call caches every key under utils.SHARED_GROUP_PREFIX
// (utils.ErrNotFound if there are none), then each call GETs and
// unmarshals the next group. utils.ErrNoMoreData resets the scan.
func (v1rs *v1Redis) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
	if v1rs.qryIdx == nil { // fresh scan: cache keys and start at index 0
		if v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.SHARED_GROUP_PREFIX); err != nil {
			return
		}
		if len(v1rs.dataKeys) == 0 {
			return nil, utils.ErrNotFound
		}
		v1rs.qryIdx = utils.IntPointer(0)
	}
	if *v1rs.qryIdx > len(v1rs.dataKeys)-1 { // exhausted: reset for a future scan
		v1rs.qryIdx = nil
		return nil, utils.ErrNoMoreData
	}
	rawVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes()
	if err != nil {
		return nil, err
	}
	if err = v1rs.ms.Unmarshal(rawVal, &v1sg); err != nil {
		return nil, err
	}
	*v1rs.qryIdx++
	return v1sg, nil
}
//set
// setV1SharedGroup marshals and stores a v1 shared group under its
// prefixed id.
func (v1rs *v1Redis) setV1SharedGroup(x *v1SharedGroup) (err error) {
	data, err := v1rs.ms.Marshal(x)
	if err != nil {
		return err
	}
	return v1rs.cmd("SET", utils.SHARED_GROUP_PREFIX+x.Id, data).Err
}

18
migrator/v1SqlStor.go Normal file
View File

@@ -0,0 +1,18 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator

22
migrator/v1StorDB.go Normal file
View File

@@ -0,0 +1,22 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
// type v1StorDB interface {
// }

View File

@@ -382,6 +382,10 @@ const (
MetaScheduler = "*scheduler"
MetaCostDetails = "*cost_details"
MetaAccounts = "*accounts"
MetaActionPlans = "*action_plans"
MetaActionTriggers = "*action_triggers"
MetaActions = "*actions"
MetaSharedGroups = "*shared_groups"
Migrator = "migrator"
UnsupportedMigrationTask = "unsupported migration task"
NoStorDBConnection = "not connected to StorDB"

View File

@@ -23,6 +23,7 @@ import (
)
var (
ErrNoMoreData = errors.New("NO_MORE_DATA")
ErrNotImplemented = errors.New("NOT_IMPLEMENTED")
ErrNotFound = errors.New("NOT_FOUND")
ErrTimedOut = errors.New("TIMED_OUT")