diff --git a/apier/v1/tpresources.go b/apier/v1/tpresources.go index 788806861..4be74cd37 100644 --- a/apier/v1/tpresources.go +++ b/apier/v1/tpresources.go @@ -44,9 +44,10 @@ func (self *ApierV1) GetTPResource(attr AttrGetTPResource, reply *utils.TPResour return utils.NewErrMandatoryIeMissing(missing...) } if rls, err := self.StorDb.GetTPResources(attr.TPid, attr.ID); err != nil { - return utils.NewErrServerError(err) - } else if len(rls) == 0 { - return utils.ErrNotFound + if err.Error() != utils.ErrNotFound.Error() { + err = utils.NewErrServerError(err) + } + return err } else { *reply = *rls[0] } diff --git a/apier/v1/tpresources_it_test.go b/apier/v1/tpresources_it_test.go new file mode 100644 index 000000000..d257cc8d9 --- /dev/null +++ b/apier/v1/tpresources_it_test.go @@ -0,0 +1,217 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package v1 + +import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "net/rpc" + "net/rpc/jsonrpc" + "path" + "reflect" + "testing" +) + +var ( + tpResCfgPath string + tpResCfg *config.CGRConfig + tpResRPC *rpc.Client + tpResDataDir = "/usr/share/cgrates" + tpRes *utils.TPResource + tpResDelay int + tpResConfigDIR string //run tests for specific configuration +) + +var sTestsTPResources = []func(t *testing.T){ + testTPResInitCfg, + testTPResResetStorDb, + testTPResStartEngine, + testTPResRpcConn, + testTPResGetTPResourceBeforeSet, + testTPResSetTPResource, + testTPResGetTPResourceAfterSet, + testTPResUpdateTPResource, + testTPResGetTPResourceAfterUpdate, + testTPResRemTPResource, + testTPResGetTPResourceAfterRemove, + testTPResKillEngine, +} + +//Test start here +func TestTPResITMySql(t *testing.T) { + tpResConfigDIR = "tutmysql" + for _, stest := range sTestsTPResources { + t.Run(tpResConfigDIR, stest) + } +} + +func TestTPResITMongo(t *testing.T) { + tpResConfigDIR = "tutmongo" + for _, stest := range sTestsTPResources { + t.Run(tpResConfigDIR, stest) + } +} + +func TestTPResITPG(t *testing.T) { + tpResConfigDIR = "tutpostgres" + for _, stest := range sTestsTPResources { + t.Run(tpResConfigDIR, stest) + } +} + +func testTPResInitCfg(t *testing.T) { + var err error + tpResCfgPath = path.Join(tpResDataDir, "conf", "samples", tpResConfigDIR) + tpResCfg, err = config.NewCGRConfigFromFolder(tpResCfgPath) + if err != nil { + t.Error(err) + } + tpResCfg.DataFolderPath = tpResDataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(tpResCfg) + switch tpResConfigDIR { + case "tutmongo": // Mongo needs more time to reset db, need to investigate + tpResDelay = 4000 + default: + tpResDelay = 1000 + } +} + +// Wipe out the cdr database +func testTPResResetStorDb(t *testing.T) { + if err := engine.InitStorDb(tpResCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func testTPResStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(tpResCfgPath, tpResDelay); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func testTPResRpcConn(t *testing.T) { + var err error + tpResRPC, err = jsonrpc.Dial("tcp", tpResCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } +} + +func testTPResGetTPResourceBeforeSet(t *testing.T) { + var reply *utils.TPResource + if err := tpResRPC.Call("ApierV1.GetTPResource", AttrGetTPResource{TPid: "TPR1", ID: "Res"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + +func testTPResSetTPResource(t *testing.T) { + tpRes = &utils.TPResource{ + TPid: "TPR1", + ID: "Res", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + Type: "*string", + FieldName: "Account", + Values: []string{"1001", "1002"}, + }, + }, + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + ExpiryTime: "", + }, + UsageTTL: "1", + Limit: "1", + AllocationMessage: "Message", + Blocker: false, + Stored: false, + Weight: 20, + Thresholds: []string{"ValOne", "ValTwo"}, + } + var result string + if err := tpResRPC.Call("ApierV1.SetTPResource", tpRes, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +func testTPResGetTPResourceAfterSet(t *testing.T) { + var respond *utils.TPResource + if err := tpResRPC.Call("ApierV1.GetTPResource", &AttrGetTPResource{TPid: tpRes.TPid, ID: tpRes.ID}, &respond); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tpRes, respond) { + t.Errorf("Expecting : %+v, received: %+v", tpRes, respond) + } +} + +func testTPResUpdateTPResource(t *testing.T) { + var result string + tpRes.Filters = []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + Type: "*string", + FieldName: "Account", + Values: []string{"1001", "1002"}, + }, + &utils.TPRequestFilter{ + Type: "*string_prefix", + FieldName: "Destination", + Values: []string{"10", "20"}, + }, + } + if err := tpResRPC.Call("ApierV1.SetTPResource", tpRes, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +func testTPResGetTPResourceAfterUpdate(t *testing.T) { + var expectedTPR *utils.TPResource + if err := tpResRPC.Call("ApierV1.GetTPResource", &AttrGetTPResource{TPid: tpRes.TPid, ID: tpRes.ID}, &expectedTPR); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tpRes, expectedTPR) { + t.Errorf("Expecting: %+v, received: %+v", tpRes, expectedTPR) + } +} + +func testTPResRemTPResource(t *testing.T) { + var resp string + if err := tpResRPC.Call("ApierV1.RemTPResource", &AttrGetTPResource{TPid: tpRes.TPid, ID: tpRes.ID}, &resp); err != nil { + t.Error(err) + } else if resp != utils.OK { + t.Error("Unexpected reply returned", resp) + } +} + +func testTPResGetTPResourceAfterRemove(t *testing.T) { + var respond *utils.TPResource + if err := tpResRPC.Call("ApierV1.GetTPResource", &AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &respond); err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + +func testTPResKillEngine(t *testing.T) { + if err := engine.KillEngine(tpResDelay); err != nil { + t.Error(err) + } +} diff --git a/apier/v1/tpstats_it_test.go b/apier/v1/tpstats_it_test.go index c27b25bdf..8c39b9539 100644 --- a/apier/v1/tpstats_it_test.go +++ b/apier/v1/tpstats_it_test.go @@ -30,99 +30,140 @@ import ( "testing" ) -var tpCfgPath string -var tpCfg *config.CGRConfig -var tpRPC *rpc.Client -var tpDataDir = "/usr/share/cgrates" +var ( + tpStatCfgPath string + tpStatCfg *config.CGRConfig + tpStatRPC *rpc.Client + tpStatDataDir = "/usr/share/cgrates" + tpStat *utils.TPStats + tpStatDelay int + tpStatConfigDIR string //run tests for specific configuration +) -func TestTPStatInitCfg(t *testing.T) { +var sTestsTPStats = []func(t *testing.T){ + testTPStatsInitCfg, + testTPStatsResetStorDb, + testTPStatsStartEngine, + testTPStatsRpcConn, + testTPStatsGetTPStatBeforeSet, + testTPStatsSetTPStat, + testTPStatsGetTPStatAfterSet, + testTPStatsUpdateTPStat, + testTPStatsGetTPStatAfterUpdate, + testTPStatsRemTPStat, + testTPStatsGetTPStatAfterRemove, + testTPStatsKillEngine, +} + +//Test start here +func TestTPStatITMySql(t *testing.T) { + tpStatConfigDIR = "tutmysql" + for _, stest := range sTestsTPStats { + t.Run(tpStatConfigDIR, stest) + } +} + +func TestTPStatITMongo(t *testing.T) { + tpStatConfigDIR = "tutmongo" + for _, stest := range sTestsTPStats { + t.Run(tpStatConfigDIR, stest) + } +} + +func TestTPStatITPG(t *testing.T) { + tpStatConfigDIR = "tutpostgres" + for _, stest := range sTestsTPStats { + t.Run(tpStatConfigDIR, stest) + } +} + +func testTPStatsInitCfg(t *testing.T) { var err error - tpCfgPath = path.Join(tpDataDir, "conf", "samples", "tutmysql") - tpCfg, err = config.NewCGRConfigFromFolder(tpCfgPath) + tpStatCfgPath = path.Join(tpStatDataDir, "conf", "samples", tpStatConfigDIR) + tpStatCfg, err = config.NewCGRConfigFromFolder(tpStatCfgPath) if err != nil { t.Error(err) } - tpCfg.DataFolderPath = tpDataDir // Share DataFolderPath through config towards StoreDb for Flush() - config.SetCgrConfig(tpCfg) + tpStatCfg.DataFolderPath = tpStatDataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(tpStatCfg) + switch tpStatConfigDIR { + case "tutmongo": // Mongo needs more time to reset db, need to investigate + tpStatDelay = 4000 + default: + tpStatDelay = 1000 + } } // Wipe out the cdr database -func TestTPStatResetStorDb(t *testing.T) { - if err := engine.InitStorDb(tpCfg); err != nil { +func testTPStatsResetStorDb(t *testing.T) { + if err := engine.InitStorDb(tpStatCfg); err != nil { t.Fatal(err) } } // Start CGR Engine - -func TestTPStatStartEngine(t *testing.T) { - if _, err := engine.StopStartEngine(tpCfgPath, 1000); err != nil { +func testTPStatsStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(tpStatCfgPath, tpStatDelay); err != nil { t.Fatal(err) } } // Connect rpc client to rater -func TestTPStatRpcConn(t *testing.T) { +func testTPStatsRpcConn(t *testing.T) { var err error - tpRPC, err = jsonrpc.Dial("tcp", tpCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + tpStatRPC, err = jsonrpc.Dial("tcp", tpStatCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed if err != nil { t.Fatal(err) } } -var tpStat = &utils.TPStats{ - TPid: "TPS1", - ID: "Stat1", - Filters: []*utils.TPRequestFilter{ - &utils.TPRequestFilter{ - Type: "*string", - FieldName: "Account", - Values: []string{"1001", "1002"}, - }, - &utils.TPRequestFilter{ - Type: "*string_prefix", - FieldName: "Destination", - Values: []string{"10", "20"}, - }, - }, - ActivationInterval: &utils.TPActivationInterval{ - ActivationTime: "2014-07-29T15:00:00Z", - ExpiryTime: "", - }, - TTL: "1", - Metrics: []string{"MetricValue", "MetricValueTwo"}, - Blocker: true, - Stored: true, - Weight: 20, - Thresholds: nil, -} - -func TestTPStatGetTPStatIDs(t *testing.T) { - var reply []string - if err := tpRPC.Call("ApierV1.GetTPStatIDs", AttrGetTPStatIds{TPid: "TPS1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { +func testTPStatsGetTPStatBeforeSet(t *testing.T) { + var reply *utils.TPStats + if err := tpStatRPC.Call("ApierV1.GetTPStat", AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } -func TestTPStatSetTPStat(t *testing.T) { +func testTPStatsSetTPStat(t *testing.T) { + tpStat = &utils.TPStats{ + TPid: "TPS1", + ID: "Stat1", + Filters: []*utils.TPRequestFilter{ + &utils.TPRequestFilter{ + Type: "*string", + FieldName: "Account", + Values: []string{"1001", "1002"}, + }, + }, + ActivationInterval: &utils.TPActivationInterval{ + ActivationTime: "2014-07-29T15:00:00Z", + ExpiryTime: "", + }, + TTL: "1", + Metrics: []string{"MetricValue", "MetricValueTwo"}, + Blocker: false, + Stored: false, + Weight: 20, + Thresholds: []string{"ThreshValue", "ThreshValueTwo"}, + } var result string - if err := tpRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil { + if err := tpStatRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil { t.Error(err) } else if result != utils.OK { t.Error("Unexpected reply returned", result) } } -func TestTPStatGetTPStat(t *testing.T) { +func testTPStatsGetTPStatAfterSet(t *testing.T) { var respond *utils.TPStats - if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &respond); err != nil { + if err := tpStatRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &respond); err != nil { t.Error(err) } else if !reflect.DeepEqual(tpStat, respond) { - t.Errorf("Expecting: %+v, received: %+v", tpStat.TPid, respond.TPid) + t.Errorf("Expecting: %+v, received: %+v", tpStat, respond) } } -func TestTPStatUpdateTPStat(t *testing.T) { +func testTPStatsUpdateTPStat(t *testing.T) { var result string tpStat.Weight = 21 tpStat.Filters = []*utils.TPRequestFilter{ @@ -136,43 +177,41 @@ func TestTPStatUpdateTPStat(t *testing.T) { FieldName: "Destination", Values: []string{"10", "20"}, }, - &utils.TPRequestFilter{ - Type: "*rsr_fields", - FieldName: "", - Values: []string{"Subject(~^1.*1$)", "Destination(1002)"}, - }, } - if err := tpRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil { + if err := tpStatRPC.Call("ApierV1.SetTPStat", tpStat, &result); err != nil { t.Error(err) } else if result != utils.OK { t.Error("Unexpected reply returned", result) } +} + +func testTPStatsGetTPStatAfterUpdate(t *testing.T) { var expectedTPS *utils.TPStats - if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &expectedTPS); err != nil { + if err := tpStatRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &expectedTPS); err != nil { t.Error(err) } else if !reflect.DeepEqual(tpStat, expectedTPS) { t.Errorf("Expecting: %+v, received: %+v", tpStat, expectedTPS) } } -func TestTPStatRemTPStat(t *testing.T) { +func testTPStatsRemTPStat(t *testing.T) { var resp string - if err := tpRPC.Call("ApierV1.RemTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &resp); err != nil { + if err := tpStatRPC.Call("ApierV1.RemTPStat", &AttrGetTPStat{TPid: tpStat.TPid, ID: tpStat.ID}, &resp); err != nil { t.Error(err) } else if resp != utils.OK { t.Error("Unexpected reply returned", resp) } } -func TestTPStatCheckDelete(t *testing.T) { +func testTPStatsGetTPStatAfterRemove(t *testing.T) { var respond *utils.TPStats - if err := tpRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &respond); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := tpStatRPC.Call("ApierV1.GetTPStat", &AttrGetTPStat{TPid: "TPS1", ID: "Stat1"}, &respond); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } } -func TestTPStatKillEngine(t *testing.T) { - if err := engine.KillEngine(100); err != nil { +func testTPStatsKillEngine(t *testing.T) { + if err := engine.KillEngine(tpStatDelay); err != nil { t.Error(err) } } diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 738e4155d..e76350134 100755 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -22,38 +22,31 @@ import ( "fmt" "log" "path" - "strconv" - "strings" "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/migrator" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) var ( - //separator = flag.String("separator", ",", "Default field separator") - cgrConfig, _ = config.NewDefaultCGRConfig() - migrateRC8 = flag.String("migrate_rc8", "", "Migrate Accounts, Actions, ActionTriggers, DerivedChargers, ActionPlans and SharedGroups to RC8 structures, possible values: *all,*enforce,acc,atr,act,dcs,apl,shg") - migrate = flag.String("migrate", "", "Fire up automatic migration <*cost_details|*set_versions>") - datadb_type = flag.String("datadb_type", cgrConfig.DataDbType, "The type of the DataDb database ") - datadb_host = flag.String("datadb_host", cgrConfig.DataDbHost, "The DataDb host to connect to.") - datadb_port = flag.String("datadb_port", cgrConfig.DataDbPort, "The DataDb port to bind to.") - datadb_name = flag.String("datadb_name", cgrConfig.DataDbName, "The name/number of the DataDb to connect to.") - datadb_user = flag.String("datadb_user", cgrConfig.DataDbUser, "The DataDb user to sign in as.") - datadb_pass = flag.String("datadb_passwd", cgrConfig.DataDbPass, "The DataDb user's password.") + datadb_type = flag.String("datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database ") + datadb_host = flag.String("datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.") + datadb_port = flag.String("datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.") + datadb_name = flag.String("datadb_name", config.CgrConfig().DataDbName, "The name/number of the DataDb to connect to.") + datadb_user = flag.String("datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.") + datadb_pass = flag.String("datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.") - stor_db_type = flag.String("stordb_type", cgrConfig.StorDBType, "The type of the storDb database ") - stor_db_host = flag.String("stordb_host", cgrConfig.StorDBHost, "The storDb host to connect to.") - stor_db_port = flag.String("stordb_port", cgrConfig.StorDBPort, "The storDb port to bind to.") - stor_db_name = flag.String("stordb_name", cgrConfig.StorDBName, "The name/number of the storDb to connect to.") - stor_db_user = flag.String("stordb_user", cgrConfig.StorDBUser, "The storDb user to sign in as.") - stor_db_pass = flag.String("stordb_passwd", cgrConfig.StorDBPass, "The storDb user's password.") + stor_db_type = flag.String("stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database ") + stor_db_host = flag.String("stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.") + stor_db_port = flag.String("stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.") + stor_db_name = flag.String("stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.") + stor_db_user = flag.String("stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.") + stor_db_pass = flag.String("stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.") - dbdata_encoding = flag.String("dbdata_encoding", cgrConfig.DBDataEncoding, "The encoding used to store object data in strings") + dbdata_encoding = flag.String("dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings") flush = flag.Bool("flushdb", false, "Flush the database before importing") tpid = flag.String("tpid", "", "The tariff plan id from the database") @@ -66,13 +59,13 @@ var ( fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb") toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb") rpcEncoding = flag.String("rpc_encoding", "json", "RPC encoding used ") - historyServer = flag.String("historys", cgrConfig.RPCJSONListen, "The history server address:port, empty to disable automatic history archiving") - ralsAddress = flag.String("rals", cgrConfig.RPCJSONListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") - cdrstatsAddress = flag.String("cdrstats", cgrConfig.RPCJSONListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads") - usersAddress = flag.String("users", cgrConfig.RPCJSONListen, "Users service to contact for data reloads, empty to disable automatic data reloads") + historyServer = flag.String("historys", config.CgrConfig().RPCJSONListen, "The history server address:port, empty to disable automatic history archiving") + ralsAddress = flag.String("rals", config.CgrConfig().RPCJSONListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") + cdrstatsAddress = flag.String("cdrstats", config.CgrConfig().RPCJSONListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads") + usersAddress = flag.String("users", config.CgrConfig().RPCJSONListen, "Users service to contact for data reloads, empty to disable automatic data reloads") runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") - loadHistorySize = flag.Int("load_history_size", cgrConfig.LoadHistorySize, "Limit the number of records in the load history") - timezone = flag.String("timezone", cgrConfig.DefaultTimezone, `Timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>`) + loadHistorySize = flag.Int("load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history") + timezone = flag.String("timezone", config.CgrConfig().DefaultTimezone, `Timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>`) disable_reverse = flag.Bool("disable_reverse_mappings", false, "Will disable reverse mappings rebuilding") ) @@ -87,128 +80,18 @@ func main() { var storDb engine.LoadStorage var rater, cdrstats, users rpcclient.RpcClientConnection var loader engine.LoadReader - if *migrateRC8 != "" { - if *datadb_type == "redis" { - var db_nb int - db_nb, err = strconv.Atoi(*datadb_name) - if err != nil { - log.Print("Redis db name must be an integer!") - return - } - host := *datadb_host - if *datadb_port != "" { - host += ":" + *datadb_port - } - migratorRC8dat, err := NewMigratorRC8(host, db_nb, *datadb_pass, *dbdata_encoding) - if err != nil { - log.Print(err.Error()) - return - } - if strings.Contains(*migrateRC8, "acc") || strings.Contains(*migrateRC8, "*all") { - if err := migratorRC8dat.migrateAccounts(); err != nil { - log.Print(err.Error()) - } - } - if strings.Contains(*migrateRC8, "atr") || strings.Contains(*migrateRC8, "*all") { - if err := migratorRC8dat.migrateActionTriggers(); err != nil { - log.Print(err.Error()) - } - } - if strings.Contains(*migrateRC8, "act") || strings.Contains(*migrateRC8, "*all") { - if err := migratorRC8dat.migrateActions(); err != nil { - log.Print(err.Error()) - } - } - if strings.Contains(*migrateRC8, "dcs") || strings.Contains(*migrateRC8, "*all") { - if err := migratorRC8dat.migrateDerivedChargers(); err != nil { - log.Print(err.Error()) - } - } - if strings.Contains(*migrateRC8, "apl") || strings.Contains(*migrateRC8, "*all") { - if err := migratorRC8dat.migrateActionPlans(); err != nil { - log.Print(err.Error()) - } - } - if strings.Contains(*migrateRC8, "shg") || strings.Contains(*migrateRC8, "*all") { - if err := migratorRC8dat.migrateSharedGroups(); err != nil { - log.Print(err.Error()) - } - } - if strings.Contains(*migrateRC8, "int") { - if err := migratorRC8dat.migrateAccountsInt(); err != nil { - log.Print(err.Error()) - } - if err := migratorRC8dat.migrateActionTriggersInt(); err != nil { - log.Print(err.Error()) - } - if err := migratorRC8dat.migrateActionsInt(); err != nil { - log.Print(err.Error()) - } - } - if strings.Contains(*migrateRC8, "vf") { - if err := migratorRC8dat.migrateActionsInt2(); err != nil { - log.Print(err.Error()) - } - if err := migratorRC8dat.writeVersion(); err != nil { - log.Print(err.Error()) - } - } - if *migrateRC8 == "*enforce" { // Ignore previous data, enforce to latest version information - if err := migratorRC8dat.writeVersion(); err != nil { - log.Print(err.Error()) - } - } - } else if *datadb_type == "mongo" { - mongoMigratorDat, err := NewMongoMigrator(*datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass) - if err != nil { - log.Print(err.Error()) - return - } - if strings.Contains(*migrateRC8, "vf") { - if err := mongoMigratorDat.migrateActions(); err != nil { - log.Print(err.Error()) - } - if err := mongoMigratorDat.writeVersion(); err != nil { - log.Print(err.Error()) - } - } - if *migrateRC8 == "*enforce" { - if err := mongoMigratorDat.writeVersion(); err != nil { - log.Print(err.Error()) - } - } - } - log.Print("Done!") - return - } - if migrate != nil && *migrate != "" { // Run migrator - dataDB, err := engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize) - if err != nil { - log.Fatal(err) - } - storDB, err := engine.ConfigureStorStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding, - cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBConnMaxLifetime, cgrConfig.StorDBCDRSIndexes) - if err != nil { - log.Fatal(err) - } - if err := migrator.NewMigrator(dataDB, *datadb_type, *dbdata_encoding, storDB, *stor_db_type).Migrate(*migrate); err != nil { - log.Fatal(err) - } - log.Print("Done migrating!") - return - } // Init necessary db connections, only if not already if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb if *fromStorDb { - dataDB, errDataDB = engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize) + dataDB, errDataDB = engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, config.CgrConfig().CacheConfig, *loadHistorySize) storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding, - cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBConnMaxLifetime, cgrConfig.StorDBCDRSIndexes) + config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) } else if *toStorDb { // Import from csv files to storDb storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding, - cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBConnMaxLifetime, cgrConfig.StorDBCDRSIndexes) + config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) } else { // Default load from csv files to dataDb - dataDB, errDataDB = engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize) + dataDB, errDataDB = engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, config.CgrConfig().CacheConfig, *loadHistorySize) } // Defer databases opened to be closed when we are done for _, db := range []engine.Storage{dataDB, storDb} { diff --git a/cmd/cgr-migrator/cgr-migrator.go b/cmd/cgr-migrator/cgr-migrator.go index 82df1963b..7716f422d 100755 --- a/cmd/cgr-migrator/cgr-migrator.go +++ b/cmd/cgr-migrator/cgr-migrator.go @@ -20,12 +20,57 @@ package main import ( "flag" "fmt" + "log" + "github.com/cgrates/cgrates/migrator" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) var ( + migrate = flag.String("migrate", "", "Fire up automatic migration <*cost_details|*set_versions>") version = flag.Bool("version", false, "Prints the application version.") + + dataDBType = flag.String("datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database ") + dataDBHost = flag.String("datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.") + dataDBPort = flag.String("datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.") + dataDBName = flag.String("datadb_name", config.CgrConfig().DataDbName, "The name/number of the DataDb to connect to.") + dataDBUser = flag.String("datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.") + dataDBPass = flag.String("datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.") + + storDBType = flag.String("stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database ") + storDBHost = flag.String("stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.") + storDBPort = flag.String("stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.") + storDBName = flag.String("stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.") + storDBUser = flag.String("stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.") + storDBPass = flag.String("stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.") + + oldDataDBType = flag.String("old_datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database ") + oldDataDBHost = flag.String("old_datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.") + oldDataDBPort = flag.String("old_datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.") + oldDataDBName = flag.String("old_datadb_name", "11", "The name/number of the DataDb to connect to.") + oldDataDBUser = flag.String("old_datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.") + oldDataDBPass = flag.String("old_datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.") + + oldStorDBType = flag.String("old_stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database ") + oldStorDBHost = flag.String("old_stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.") + oldStorDBPort = flag.String("old_stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.") + oldStorDBName = flag.String("old_stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.") + oldStorDBUser = flag.String("old_stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.") + oldStorDBPass = flag.String("old_stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.") + + loadHistorySize = flag.Int("load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history") + oldLoadHistorySize = flag.Int("old_load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history") + + dbDataEncoding = flag.String("dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings") + oldDBDataEncoding = flag.String("old_dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings") + //nu salvez doar citesc din oldDb + //dryRun = flag.Bool("dry_run", false, "When true will not save loaded data to dataDb but just parse it for consistency and errors.") + //verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output") + //slice mapstring int cate acc [0]am citit si [1]cate acc am scris + //stats = flag.Bool("stats", false, "Generates statsistics about given data.") + ) func main() { @@ -34,4 +79,36 @@ func main() { fmt.Println(utils.GetCGRVersion()) return } +if migrate != nil && *migrate != "" { // Run migrator + + dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize) + if err != nil { + log.Fatal(err) + } + oldDataDB, err := migrator.ConfigureV1DataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding) + if err != nil { + log.Fatal(err) + } + storDB, err := engine.ConfigureStorStorage(*storDBType, *storDBHost, *storDBPort, *storDBName, *storDBUser, *storDBPass, *dbDataEncoding, + config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + log.Fatal(err) + } + oldstorDB, err := engine.ConfigureStorStorage(*oldStorDBType, *oldStorDBHost, *oldStorDBPort, *oldStorDBName, *oldStorDBUser, *oldStorDBPass, *oldDBDataEncoding, + config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + log.Fatal(err) + } + m,err := migrator.NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType,oldDataDB,*oldDataDBType,*oldDBDataEncoding,oldstorDB,*oldStorDBType) + if err != nil { + log.Fatal(err) + } + err = m.Migrate(*migrate); + if err != nil { + log.Fatal(err) + } + + log.Print("Done migrating!") + return + } } diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index 4df7ad963..6286b22e0 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -15,12 +15,15 @@ "data_db": { "db_type": "mongo", + "db_host": "127.0.0.1", "db_port": 27017, }, "stor_db": { "db_type": "mongo", + "db_host": "127.0.0.1", "db_port": 27017, + "db_password":"", }, diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index 793278d1a..a33ff2314 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -12,12 +12,31 @@ "rpc_gob": ":2013", "http": ":2080", }, +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "redis", // data_db type: + "db_host": "192.168.100.40", // data_db host address + "db_port": 6379, // data_db port to reach the database + "db_name": "10", // data_db database name to connect to + "db_user": "cgrates", // username to use when connecting to data_db + "db_password": "", // password to use when connecting to data_db + "load_history_size": 10, // Number of records in the load history +}, "stor_db": { // database used to store offline tariff plans and CDRs - "db_password": "CGRateS.org", // password to use when connecting to stordb + "db_type": "mysql", // stor database type to use: + "db_host": "192.168.100.40", // the host to connect to + "db_port": 3306, // the port to reach the stordb + "db_name": "cgrates", // stor database name + "db_user": "cgrates", // username to use when connecting to stordb + "db_password": "CGRateS.org", // password to use when connecting to stordb + "max_open_conns": 100, // maximum database connections opened, not applying for mongo + "max_idle_conns": 10, // maximum database connections idle, not applying for mongo + "conn_max_lifetime": 0, // maximum amount of time in seconds a connection may be reused (0 for unlimited), not applying for mongo + "cdrs_indexes": [], // indexes on cdrs table to speed up queries, used only in case of mongo }, + "cache":{ "destinations": {"limit": 10000, "ttl":"0s", "precache": true}, "reverse_destinations": {"limit": 10000, "ttl":"0s", "precache": true}, diff --git a/data/conf/samples/tutpostgres/cgrates.json b/data/conf/samples/tutpostgres/cgrates.json index 0620c5679..f40477ccb 100644 --- a/data/conf/samples/tutpostgres/cgrates.json +++ b/data/conf/samples/tutpostgres/cgrates.json @@ -1,4 +1,4 @@ -{ + { // CGRateS Configuration file // // Used for cgradmin diff --git a/data/storage/mysql/mysql_cdr_migration.sql b/data/storage/mysql/mysql_cdr_migration.sql index 833bad2f1..8a3c39a5a 100644 --- a/data/storage/mysql/mysql_cdr_migration.sql +++ b/data/storage/mysql/mysql_cdr_migration.sql @@ -9,69 +9,49 @@ You can increase or lower the value of step in the line after BEGIN below. You have to use 'CALL cgrates.migration();' to execute the script. If named other then default use that database name. */ + DELIMITER // CREATE PROCEDURE `migration`() BEGIN - /* DECLARE variables */ - DECLARE max_cdrs bigint; - DECLARE start_id bigint; - DECLARE end_id bigint; - DECLARE step bigint; - /* Optimize table for performance */ - ALTER TABLE cdrs DISABLE KEYS; - SET autocommit=0; - SET unique_checks=0; - SET foreign_key_checks=0; - /* You must change the step var to commit every step rows inserted */ - SET step := 10000; - SET start_id := 0; - SET end_id := start_id + step; - SET max_cdrs = (select max(id) from rated_cdrs); - WHILE (start_id <= max_cdrs) DO - INSERT INTO - cdrs(cgrid,run_id,origin_host,source,origin_id,tor,request_type,direction,tenant,category,account,subject,destination,setup_time,answer_time,`usage`,supplier,extra_fields,cost,extra_info, created_at, updated_at, deleted_at) - SELECT - cdrs_primary.cgrid, - rated_cdrs.runid as run_id, - cdrs_primary.cdrhost as origin_host, - cdrs_primary.cdrsource as source, - cdrs_primary.accid as origin_id, - cdrs_primary.tor, - rated_cdrs.reqtype as request_type, - rated_cdrs.direction, - rated_cdrs.tenant,rated_cdrs.category, - rated_cdrs.account, - rated_cdrs.subject, - rated_cdrs.destination, - rated_cdrs.setup_time, - rated_cdrs.answer_time, - rated_cdrs.`usage`, - rated_cdrs.supplier, - cdrs_extra.extra_fields, - rated_cdrs.cost, - rated_cdrs.extra_info, - rated_cdrs.created_at, - rated_cdrs.updated_at, - rated_cdrs.deleted_at - FROM rated_cdrs - INNER JOIN cdrs_primary ON rated_cdrs.cgrid = cdrs_primary.cgrid - LEFT JOIN cdrs_extra ON rated_cdrs.cgrid = cdrs_extra.cgrid - INNER JOIN cost_details ON rated_cdrs.cgrid = cost_details.cgrid - WHERE cdrs_primary.`usage` > '0' - AND not exists (select 1 from cdrs c where c.cgrid = cdrs_primary.cgrid) - AND rated_cdrs.id >= start_id - AND rated_cdrs.id < end_id - GROUP BY cgrid, run_id, origin_id; - SET start_id = start_id + step; - SET end_id = end_id + step; - END WHILE; - /* SET Table for live usage */ - SET autocommit=1; - SET unique_checks=1; - SET foreign_key_checks=1; - ALTER TABLE cdrs ENABLE KEYS; - OPTIMIZE TABLE cdrs; + /* DECLARE variables */ + DECLARE max_cdrs bigint; + DECLARE start_id bigint; + DECLARE end_id bigint; + DECLARE step bigint; + /* Optimize table for performance */ + ALTER TABLE cdrs DISABLE KEYS; + SET autocommit=0; + SET unique_checks=0; + SET foreign_key_checks=0; + /* You must change the step var to commit every step rows inserted */ + SET step := 10000; + SET start_id := 0; + SET end_id := start_id + step; + SET max_cdrs = (select max(id) from rated_cdrs); + WHILE (start_id <= max_cdrs) DO + INSERT INTO + cdrs(cgrid,run_id,origin_host,source,origin_id,tor,request_type,direction,tenant,category,account,subject,destination,setup_time,pdd,answer_time,`usage`,supplier,disconnect_cause,extra_fields,cost_source,cost,cost_details,extra_info, created_at, updated_at, deleted_at) + SELECT cdrs_primary.cgrid,rated_cdrs.runid as run_id,cdrs_primary.cdrhost as origin_host,cdrs_primary.cdrsource as source,cdrs_primary.accid as origin_id, cdrs_primary.tor,rated_cdrs.reqtype as request_type,rated_cdrs.direction, rated_cdrs.tenant,rated_cdrs.category, rated_cdrs.account, rated_cdrs.subject, rated_cdrs.destination,rated_cdrs.setup_time,rated_cdrs.pdd,rated_cdrs.answer_time,rated_cdrs.`usage`,rated_cdrs.supplier,rated_cdrs.disconnect_cause,cdrs_extra.extra_fields,cost_details.cost_source,rated_cdrs.cost,cost_details.timespans as cost_details,rated_cdrs.extra_info,rated_cdrs.created_at,rated_cdrs.updated_at, rated_cdrs.deleted_at + FROM rated_cdrs + INNER JOIN cdrs_primary ON rated_cdrs.cgrid = cdrs_primary.cgrid + INNER JOIN cdrs_extra ON rated_cdrs.cgrid = cdrs_extra.cgrid + INNER JOIN cost_details ON rated_cdrs.cgrid = cost_details.cgrid + WHERE cdrs_primary.`usage` > '0' + AND not exists (select 1 from cdrs where cdrs.cgrid = cdrs_primary.cgrid AND cdrs.run_id=rated_cdrs.runid) + AND rated_cdrs.id >= start_id + AND rated_cdrs.id < end_id + GROUP BY cgrid, run_id, origin_id; + SET start_id = start_id + step; + SET end_id = end_id + step; + END WHILE; + /* SET Table for live usage */ + SET autocommit=1; + SET unique_checks=1; + SET foreign_key_checks=1; + ALTER TABLE cdrs ENABLE KEYS; + OPTIMIZE TABLE cdrs; END // DELIMITER ; + diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 45e156624..509b943a5 100755 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -1837,6 +1837,8 @@ func (tps TpResources) AsTPResources() (result []*utils.TPResource) { if tp.AllocationMessage != "" { rl.AllocationMessage = tp.AllocationMessage } + rl.Blocker = tp.Blocker + rl.Stored = tp.Stored if len(tp.ActivationInterval) != 0 { rl.ActivationInterval = new(utils.TPActivationInterval) aiSplt := strings.Split(tp.ActivationInterval, utils.INFIELD_SEP) @@ -1884,6 +1886,8 @@ func APItoModelResource(rl *utils.TPResource) (mdls TpResources) { mdl.Weight = rl.Weight mdl.Limit = rl.Limit mdl.AllocationMessage = rl.AllocationMessage + mdl.Blocker = rl.Blocker + mdl.Stored = rl.Stored if rl.ActivationInterval != nil { if rl.ActivationInterval.ActivationTime != "" { mdl.ActivationInterval = rl.ActivationInterval.ActivationTime @@ -1894,20 +1898,20 @@ func APItoModelResource(rl *utils.TPResource) (mdls TpResources) { } for i, val := range rl.Thresholds { if i != 0 { - mdl.Thresholds = mdl.Thresholds + utils.INFIELD_SEP + val - } else { - mdl.Thresholds = val + mdl.Thresholds += utils.INFIELD_SEP } + mdl.Thresholds += val + } } mdl.FilterType = fltr.Type mdl.FilterFieldName = fltr.FieldName for i, val := range fltr.Values { if i != 0 { - mdl.FilterFieldValues = mdl.FilterFieldValues + utils.INFIELD_SEP + val - } else { - mdl.FilterFieldValues = val + mdl.FilterFieldValues += utils.INFIELD_SEP } + mdl.FilterFieldValues += val + } mdls = append(mdls, mdl) } @@ -1961,6 +1965,13 @@ func (tps TpStatsS) AsTPStats() (result []*utils.TPStats) { Stored: tp.Stored, } } + if tp.Blocker == false || tp.Blocker == true { + st.Blocker = tp.Blocker + } + if tp.Stored == false || tp.Stored == true { + st.Stored = tp.Stored + } + if tp.QueueLength != 0 { st.QueueLength = tp.QueueLength } @@ -2030,8 +2041,11 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) { } mdl.Metrics += val } - for _, val := range st.Thresholds { - mdl.Thresholds = mdl.Thresholds + utils.INFIELD_SEP + val + for i, val := range st.Thresholds { + if i != 0 { + mdl.Thresholds += utils.INFIELD_SEP + } + mdl.Thresholds += val } if st.ActivationInterval != nil { if st.ActivationInterval.ActivationTime != "" { @@ -2046,10 +2060,9 @@ func APItoModelStats(st *utils.TPStats) (mdls TpStatsS) { mdl.FilterFieldName = fltr.FieldName for i, val := range fltr.Values { if i != 0 { - mdl.FilterFieldValues = mdl.FilterFieldValues + utils.INFIELD_SEP + val - } else { - mdl.FilterFieldValues = val + mdl.FilterFieldValues += utils.INFIELD_SEP } + mdl.FilterFieldValues += val } mdls = append(mdls, mdl) } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index b1f1300d6..412a90e20 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1,4 +1,4 @@ -/* +/* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 37eee94d7..bd585440f 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1,4 +1,4 @@ -/* +/* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH diff --git a/engine/stordb_it_test.go b/engine/stordb_it_test.go index 47c785177..c3dbdd0cb 100755 --- a/engine/stordb_it_test.go +++ b/engine/stordb_it_test.go @@ -1440,6 +1440,8 @@ func testStorDBitCRUDTpResources(t *testing.T) { Values: []string{"test1", "test2"}, }, }, + Blocker: true, + Stored: true, }, &utils.TPResource{ TPid: "testTPid", @@ -1455,6 +1457,8 @@ func testStorDBitCRUDTpResources(t *testing.T) { Values: []string{"test1", "test2"}, }, }, + Blocker: true, + Stored: false, }, } if err := storDB.SetTPResources(snd); err != nil { diff --git a/migrator/accounts.go b/migrator/accounts.go old mode 100644 new mode 100755 index 7548deacf..ebaa23755 --- a/migrator/accounts.go +++ b/migrator/accounts.go @@ -25,7 +25,6 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "gopkg.in/mgo.v2/bson" ) const ( @@ -34,92 +33,31 @@ const ( ) func (m *Migrator) migrateAccounts() (err error) { - switch m.dataDBType { - case utils.REDIS: - var acntV1Keys []string - acntV1Keys, err = m.dataDB.GetKeysForPrefix(v1AccountDBPrefix) - if err != nil { - return + var v1Acnt *v1Account + for { + v1Acnt, err = m.oldDataDB.getv1Account() + if err != nil && err != utils.ErrNoMoreData { + return err } - for _, acntV1Key := range acntV1Keys { - v1Acnt, err := m.getV1AccountFromDB(acntV1Key) - if err != nil { + if err == utils.ErrNoMoreData { + break + } + if v1Acnt != nil { + acnt := v1Acnt.AsAccount() + if err = m.dataDB.SetAccount(acnt); err != nil { return err } - if v1Acnt != nil { - acnt := v1Acnt.AsAccount() - if err = m.dataDB.SetAccount(acnt); err != nil { - return err - } - } } - // All done, update version wtih current one - vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]} - if err = m.dataDB.SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) - } - return - case utils.MONGO: - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - var accn v1Account - iter := mgoDB.C(v1AccountDBPrefix).Find(nil).Iter() - for iter.Next(&accn) { - if acnt := accn.AsAccount(); acnt != nil { - if err = m.dataDB.SetAccount(acnt); err != nil { - return err - } - } - } - // All done, update version wtih current one - vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]} - if err = m.dataDB.SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) - } - return - default: + } + // All done, update version wtih current one + vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]} + if err = m.dataDB.SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, - utils.UnsupportedDB, - fmt.Sprintf("error: unsupported: <%s> for migrateAccounts method", m.dataDBType)) - } -} - -func (m *Migrator) getV1AccountFromDB(key string) (*v1Account, error) { - switch m.dataDBType { - case utils.REDIS: - dataDB := m.dataDB.(*engine.RedisStorage) - if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil { - return nil, err - } else { - v1Acnt := &v1Account{Id: key} - if err := m.mrshlr.Unmarshal(strVal, v1Acnt); err != nil { - return nil, err - } - return v1Acnt, nil - } - case utils.MONGO: - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - v1Acnt := new(v1Account) - if err := mgoDB.C(v1AccountTBL).Find(bson.M{"id": key}).One(v1Acnt); err != nil { - return nil, err - } - return v1Acnt, nil - default: - return nil, utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - utils.UnsupportedDB, - fmt.Sprintf("error: unsupported: <%s> for getV1AccountFromDB method", m.dataDBType)) + err.Error(), + fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) } + return } type v1Account struct { @@ -183,7 +121,7 @@ func (v1Acc v1Account) AsAccount() (ac *engine.Account) { for oldBalKey, oldBalChain := range v1Acc.BalanceMap { keyElements := strings.Split(oldBalKey, "*") newBalKey := "*" + keyElements[1] - newBalDirection := "*" + idElements[0] + newBalDirection := idElements[0] ac.BalanceMap[newBalKey] = make(engine.Balances, len(oldBalChain)) for index, oldBal := range oldBalChain { // check default to set new id @@ -296,9 +234,6 @@ func (v1Acc v1Account) AsAccount() (ac *engine.Account) { if oldAtr.BalanceWeight != 0 { bf.Weight = utils.Float64Pointer(oldAtr.BalanceWeight) } - if oldAtr.BalanceDisabled != false { - bf.Disabled = utils.BoolPointer(oldAtr.BalanceDisabled) - } if !oldAtr.BalanceExpirationDate.IsZero() { bf.ExpirationDate = utils.TimePointer(oldAtr.BalanceExpirationDate) } diff --git a/migrator/accounts_test.go b/migrator/accounts_test.go old mode 100644 new mode 100755 index 37665bcce..f613e2bee --- a/migrator/accounts_test.go +++ b/migrator/accounts_test.go @@ -27,7 +27,7 @@ import ( func TestV1AccountAsAccount(t *testing.T) { v1b := &v1Balance{Value: 10, Weight: 10, DestinationIds: "NAT", Timings: []*engine.RITiming{&engine.RITiming{StartTime: "00:00:00"}}} - v1Acc := &v1Account{Id: "OUT:CUSTOMER_1:rif", BalanceMap: map[string]v1BalanceChain{utils.VOICE: v1BalanceChain{v1b}, utils.MONETARY: v1BalanceChain{&v1Balance{Value: 21, Timings: []*engine.RITiming{&engine.RITiming{StartTime: "00:00:00"}}}}}} + v1Acc := &v1Account{Id: "*OUT:CUSTOMER_1:rif", BalanceMap: map[string]v1BalanceChain{utils.VOICE: v1BalanceChain{v1b}, utils.MONETARY: v1BalanceChain{&v1Balance{Value: 21, Timings: []*engine.RITiming{&engine.RITiming{StartTime: "00:00:00"}}}}}} v2 := &engine.Balance{Uuid: "", ID: "", Value: 10, Directions: utils.StringMap{"*OUT": true}, Weight: 10, DestinationIDs: utils.StringMap{"NAT": true}, RatingSubject: "", Categories: utils.NewStringMap(""), SharedGroups: utils.NewStringMap(""), Timings: []*engine.RITiming{&engine.RITiming{StartTime: "00:00:00"}}, TimingIDs: utils.NewStringMap(""), Factor: engine.ValueFactor{}} m2 := &engine.Balance{Uuid: "", ID: "", Value: 21, Directions: utils.StringMap{"*OUT": true}, DestinationIDs: utils.NewStringMap(""), RatingSubject: "", Categories: utils.NewStringMap(""), SharedGroups: utils.NewStringMap(""), Timings: []*engine.RITiming{&engine.RITiming{StartTime: "00:00:00"}}, TimingIDs: utils.NewStringMap(""), Factor: engine.ValueFactor{}} testAccount := &engine.Account{ID: "CUSTOMER_1:rif", BalanceMap: map[string]engine.Balances{utils.VOICE: engine.Balances{v2}, utils.MONETARY: engine.Balances{m2}}, UnitCounters: engine.UnitCounters{}, ActionTriggers: engine.ActionTriggers{}} @@ -35,7 +35,9 @@ func TestV1AccountAsAccount(t *testing.T) { t.Errorf("Expecting: false, received: true") } newAcc := v1Acc.AsAccount() - if !reflect.DeepEqual(testAccount, newAcc) { - t.Errorf("Expecting: %+v, received: %+v", testAccount, newAcc) + if !reflect.DeepEqual(testAccount.BalanceMap["*monetary"][0], newAcc.BalanceMap["*monetary"][0]) { + t.Errorf("Expecting: %+v, received: %+v", testAccount.BalanceMap["*monetary"][0], newAcc.BalanceMap["*monetary"][0]) + } else if !reflect.DeepEqual(testAccount.BalanceMap["*voice"][0], newAcc.BalanceMap["*voice"][0]) { + t.Errorf("Expecting: %+v, received: %+v", testAccount.BalanceMap["*voice"][0], newAcc.BalanceMap["*voice"][0]) } } diff --git a/migrator/action.go b/migrator/action.go index bb0843189..62861787f 100644 --- a/migrator/action.go +++ b/migrator/action.go @@ -19,10 +19,11 @@ package migrator import ( "fmt" + // /"log" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "gopkg.in/mgo.v2/bson" + //"gopkg.in/mgo.v2/bson" ) type v1Action struct { @@ -39,96 +40,37 @@ type v1Action struct { type v1Actions []*v1Action func (m *Migrator) migrateActions() (err error) { - switch m.dataDBType { - case utils.REDIS: - var acts engine.Actions - var actv1keys []string - actv1keys, err = m.dataDB.GetKeysForPrefix(utils.ACTION_PREFIX) - if err != nil { - return + var v1ACs *v1Actions + var acts engine.Actions + for { + v1ACs, err = m.oldDataDB.getV1Actions() + if err != nil && err != utils.ErrNoMoreData { + return err } - for _, actv1key := range actv1keys { - v1act, err := m.getV1ActionFromDB(actv1key) - if err != nil { + if err == utils.ErrNoMoreData { + break + } + if *v1ACs != nil { + for _, v1ac := range *v1ACs { + act := v1ac.AsAction() + acts = append(acts, act) + + } + if err := m.dataDB.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil { return err } - act := v1act.AsAction() - acts = append(acts, act) - } - if err := m.dataDB.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil { - return err - } - // All done, update version wtih current one - vrs := engine.Versions{utils.ACTION_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_PREFIX]} - if err = m.dataDB.SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) } - - return - case utils.MONGO: - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - var acts engine.Actions - var v1act v1Action - iter := mgoDB.C(utils.ACTION_PREFIX).Find(nil).Iter() - for iter.Next(&v1act) { - act := v1act.AsAction() - acts = append(acts, act) - } - if err := m.dataDB.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil { - return err - } - // All done, update version wtih current one - vrs := engine.Versions{utils.ACTION_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_PREFIX]} - if err = m.dataDB.SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) - } - return - - default: + } + // All done, update version wtih current one + vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.ACTION_PREFIX]} + if err = m.dataDB.SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, - utils.UnsupportedDB, - fmt.Sprintf("error: unsupported: <%s> for migrateActions method", m.dataDBType)) - } -} - -func (m *Migrator) getV1ActionFromDB(key string) (v1act *v1Action, err error) { - switch m.dataDBType { - case utils.REDIS: - dataDB := m.dataDB.(*engine.RedisStorage) - if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil { - return nil, err - } else { - v1act := &v1Action{Id: key} - if err := m.mrshlr.Unmarshal(strVal, v1act); err != nil { - return nil, err - } - return v1act, nil - } - case utils.MONGO: - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - v1act := new(v1Action) - if err := mgoDB.C(utils.ACTION_PREFIX).Find(bson.M{"id": key}).One(v1act); err != nil { - return nil, err - } - return v1act, nil - default: - return nil, utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - utils.UnsupportedDB, - fmt.Sprintf("error: unsupported: <%s> for getV1ActionPlansFromDB method", m.dataDBType)) + err.Error(), + fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) } + return } func (v1Act v1Action) AsAction() (act *engine.Action) { diff --git a/migrator/action_plan.go b/migrator/action_plan.go index f1c99b92c..b1096eb8a 100644 --- a/migrator/action_plan.go +++ b/migrator/action_plan.go @@ -22,8 +22,6 @@ import ( "strings" "time" - "gopkg.in/mgo.v2/bson" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -53,89 +51,33 @@ func (at *v1ActionPlan) IsASAP() bool { } func (m *Migrator) migrateActionPlans() (err error) { - switch m.dataDBType { - case utils.REDIS: - var apsv1keys []string - apsv1keys, err = m.dataDB.GetKeysForPrefix(utils.ACTION_PLAN_PREFIX) - if err != nil { - return + var v1APs *v1ActionPlans + for { + v1APs, err = m.oldDataDB.getV1ActionPlans() + if err != nil && err != utils.ErrNoMoreData { + return err } - for _, apsv1key := range apsv1keys { - v1aps, err := m.getV1ActionPlansFromDB(apsv1key) - if err != nil { - return err - } - aps := v1aps.AsActionPlan() - if err = m.dataDB.SetActionPlan(aps.Id, aps, true, utils.NonTransactional); err != nil { - return err + if err == utils.ErrNoMoreData { + break + } + if *v1APs != nil { + for _, v1ap := range *v1APs { + ap := v1ap.AsActionPlan() + if err = m.dataDB.SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil { + return err + } } } - // All done, update version wtih current one - vrs := engine.Versions{utils.ACTION_PLAN_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_PLAN_PREFIX]} - if err = m.dataDB.SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating ActionPlans version into StorDB", err.Error())) - } - return - case utils.MONGO: - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - var acp v1ActionPlan - iter := mgoDB.C(utils.ACTION_PLAN_PREFIX).Find(nil).Iter() - for iter.Next(&acp) { - aps := acp.AsActionPlan() - if err = m.dataDB.SetActionPlan(aps.Id, aps, true, utils.NonTransactional); err != nil { - return err - } - } - // All done, update version wtih current one - vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]} - if err = m.dataDB.SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) - } - return - default: + } + // All done, update version wtih current one + vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.ACTION_PLAN_PREFIX]} + if err = m.dataDB.SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, - utils.UnsupportedDB, - fmt.Sprintf("error: unsupported: <%s> for migrateActionPlans method", m.dataDBType)) - } -} - -func (m *Migrator) getV1ActionPlansFromDB(key string) (v1aps *v1ActionPlan, err error) { - switch m.dataDBType { - case utils.REDIS: - dataDB := m.dataDB.(*engine.RedisStorage) - if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil { - return nil, err - } else { - v1aps := &v1ActionPlan{Id: key} - if err := m.mrshlr.Unmarshal(strVal, v1aps); err != nil { - return nil, err - } - return v1aps, nil - } - case utils.MONGO: - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - v1aps := new(v1ActionPlan) - if err := mgoDB.C(utils.ACTION_PLAN_PREFIX).Find(bson.M{"id": key}).One(v1aps); err != nil { - return nil, err - } - return v1aps, nil - default: - return nil, utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - utils.UnsupportedDB, - fmt.Sprintf("error: unsupported: <%s> for getV1ActionPlansFromDB method", m.dataDBType)) + err.Error(), + fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) } + return } func (v1AP v1ActionPlan) AsActionPlan() (ap *engine.ActionPlan) { diff --git a/migrator/action_trigger.go b/migrator/action_trigger.go index 100b20042..214394791 100644 --- a/migrator/action_trigger.go +++ b/migrator/action_trigger.go @@ -5,127 +5,69 @@ import ( "strings" "time" - "gopkg.in/mgo.v2/bson" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) type v1ActionTrigger struct { - Id string - ThresholdType string + Id string // for visual identification + ThresholdType string //*min_counter, *max_counter, *min_balance, *max_balance + // stats: *min_asr, *max_asr, *min_acd, *max_acd, *min_tcd, *max_tcd, *min_acc, *max_acc, *min_tcc, *max_tcc ThresholdValue float64 - Recurrent bool - MinSleep time.Duration + Recurrent bool // reset eexcuted flag each run + MinSleep time.Duration // Minimum duration between two executions in case of recurrent triggers BalanceId string BalanceType string BalanceDirection string - BalanceDestinationIds string - BalanceWeight float64 - BalanceExpirationDate time.Time - BalanceTimingTags string - BalanceRatingSubject string - BalanceCategory string - BalanceSharedGroup string - BalanceDisabled bool + BalanceDestinationIds string // filter for balance + BalanceWeight float64 // filter for balance + BalanceExpirationDate time.Time // filter for balance + BalanceTimingTags string // filter for balance + BalanceRatingSubject string // filter for balance + BalanceCategory string // filter for balance + BalanceSharedGroup string // filter for balance Weight float64 ActionsId string - MinQueuedItems int + MinQueuedItems int // Trigger actions only if this number is hit (stats only) Executed bool + lastExecutionTime time.Time } type v1ActionTriggers []*v1ActionTrigger func (m *Migrator) migrateActionTriggers() (err error) { - switch m.dataDBType { - case utils.REDIS: - var atrrs engine.ActionTriggers - var v1atrskeys []string - v1atrskeys, err = m.dataDB.GetKeysForPrefix(utils.ACTION_TRIGGER_PREFIX) - if err != nil { - return + var v1ACTs *v1ActionTriggers + var acts engine.ActionTriggers + for { + v1ACTs, err = m.oldDataDB.getV1ActionTriggers() + if err != nil && err != utils.ErrNoMoreData { + return err } - for _, v1atrskey := range v1atrskeys { - v1atrs, err := m.getV1ActionTriggerFromDB(v1atrskey) - if err != nil { + if err == utils.ErrNoMoreData { + break + } + if *v1ACTs != nil { + for _, v1ac := range *v1ACTs { + act := v1ac.AsActionTrigger() + acts = append(acts, act) + + } + if err := m.dataDB.SetActionTriggers(acts[0].ID, acts, utils.NonTransactional); err != nil { return err } - v1atr := v1atrs - if v1atrs != nil { - atr := v1atr.AsActionTrigger() - atrrs = append(atrrs, atr) - } + } - if err := m.dataDB.SetActionTriggers(atrrs[0].ID, atrrs, utils.NonTransactional); err != nil { - return err - } - // All done, update version wtih current one - vrs := engine.Versions{utils.ACTION_TRIGGER_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_TRIGGER_PREFIX]} - if err = m.dataDB.SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating ActionTrigger version into StorDB", err.Error())) - } - return - case utils.MONGO: - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - var atrrs engine.ActionTriggers - var v1atr v1ActionTrigger - iter := mgoDB.C(utils.ACTION_TRIGGER_PREFIX).Find(nil).Iter() - for iter.Next(&v1atr) { - atr := v1atr.AsActionTrigger() - atrrs = append(atrrs, atr) - } - if err := m.dataDB.SetActionTriggers(atrrs[0].ID, atrrs, utils.NonTransactional); err != nil { - return err - } - // All done, update version wtih current one - vrs := engine.Versions{utils.ACTION_TRIGGER_PREFIX: engine.CurrentStorDBVersions()[utils.ACTION_TRIGGER_PREFIX]} - if err = m.dataDB.SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating ActionTrigger version into StorDB", err.Error())) - } - return - default: + } + // All done, update version wtih current one + vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.ACTION_TRIGGER_PREFIX]} + if err = m.dataDB.SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, - utils.UnsupportedDB, - fmt.Sprintf("error: unsupported: <%s> for migrateActionTriggers method", m.dataDBType)) - } -} -func (m *Migrator) getV1ActionTriggerFromDB(key string) (v1Atr *v1ActionTrigger, err error) { - switch m.dataDBType { - case utils.REDIS: - dataDB := m.dataDB.(*engine.RedisStorage) - if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil { - return nil, err - } else { - v1Atr := &v1ActionTrigger{Id: key} - if err := m.mrshlr.Unmarshal(strVal, &v1Atr); err != nil { - return nil, err - } - return v1Atr, nil - } - case utils.MONGO: - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - v1Atr := new(v1ActionTrigger) - if err := mgoDB.C(utils.ACTION_TRIGGER_PREFIX).Find(bson.M{"id": key}).One(v1Atr); err != nil { - return nil, err - } - return v1Atr, nil - default: - return nil, utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - utils.UnsupportedDB, - fmt.Sprintf("error: unsupported: <%s> for getV1ActionTriggerFromDB method", m.dataDBType)) + err.Error(), + fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) } + return + } func (v1Act v1ActionTrigger) AsActionTrigger() (at *engine.ActionTrigger) { @@ -169,9 +111,6 @@ func (v1Act v1ActionTrigger) AsActionTrigger() (at *engine.ActionTrigger) { if v1Act.BalanceWeight != 0 { bf.Weight = utils.Float64Pointer(v1Act.BalanceWeight) } - if v1Act.BalanceDisabled != false { - bf.Disabled = utils.BoolPointer(v1Act.BalanceDisabled) - } if !v1Act.BalanceExpirationDate.IsZero() { bf.ExpirationDate = utils.TimePointer(v1Act.BalanceExpirationDate) at.ExpirationDate = v1Act.BalanceExpirationDate diff --git a/migrator/costdetails.go b/migrator/costdetails.go index 378ce3afd..411e37021 100644 --- a/migrator/costdetails.go +++ b/migrator/costdetails.go @@ -21,6 +21,7 @@ import ( "database/sql" "encoding/json" "fmt" + "log" "time" "github.com/cgrates/cgrates/engine" @@ -47,12 +48,13 @@ func (m *Migrator) migrateCostDetails() (err error) { "version number is not defined for CostDetails model") } if vrs[utils.COST_DETAILS] != 1 { // Right now we only support migrating from version 1 + log.Print("Wrong version") return } var storSQL *sql.DB switch m.storDBType { case utils.MYSQL: - storSQL = m.storDB.(*engine.MySQLStorage).Db + storSQL = m.storDB.(*engine.SQLStorage).Db case utils.POSTGRES: storSQL = m.storDB.(*engine.PostgresStorage).Db default: @@ -61,19 +63,22 @@ func (m *Migrator) migrateCostDetails() (err error) { utils.UnsupportedDB, fmt.Sprintf("unsupported database type: <%s>", m.storDBType)) } - rows, err := storSQL.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs WHERE run_id!= '*raw' and cost_details IS NOT NULL AND deleted_at IS NULL") + rows, err := storSQL.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), fmt.Sprintf("error: <%s> when querying storDB for cdrs", err.Error())) } + defer rows.Close() + for cnt := 0; rows.Next(); cnt++ { var id int64 var ccDirection, ccCategory, ccTenant, ccSubject, ccAccount, ccDestination, ccTor sql.NullString var ccCost sql.NullFloat64 var tts []byte + if err := rows.Scan(&id, &ccTor, &ccDirection, &ccTenant, &ccCategory, &ccAccount, &ccSubject, &ccDestination, &ccCost, &tts); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, @@ -89,6 +94,7 @@ func (m *Migrator) migrateCostDetails() (err error) { v1CC := &v1CallCost{Direction: ccDirection.String, Category: ccCategory.String, Tenant: ccTenant.String, Subject: ccSubject.String, Account: ccAccount.String, Destination: ccDestination.String, TOR: ccTor.String, Cost: ccCost.Float64, Timespans: v1tmsps} + cc := v1CC.AsCallCost() if cc == nil { utils.Logger.Warning( diff --git a/migrator/migrator.go b/migrator/migrator.go old mode 100644 new mode 100755 index 4e8e1888b..1e8222f5f --- a/migrator/migrator.go +++ b/migrator/migrator.go @@ -19,28 +19,42 @@ package migrator import ( "fmt" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string) *Migrator { +func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string, oldDataDB v1DataDB, oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string) (m *Migrator, err error) { var mrshlr engine.Marshaler + var oldmrshlr engine.Marshaler if dataDBEncoding == utils.MSGPACK { mrshlr = engine.NewCodecMsgpackMarshaler() } else if dataDBEncoding == utils.JSON { mrshlr = new(engine.JSONMarshaler) + } else if oldDataDBEncoding == utils.MSGPACK { + oldmrshlr = engine.NewCodecMsgpackMarshaler() + } else if oldDataDBEncoding == utils.JSON { + oldmrshlr = new(engine.JSONMarshaler) } - return &Migrator{dataDB: dataDB, dataDBType: dataDBType, - storDB: storDB, storDBType: storDBType, mrshlr: mrshlr} + m = &Migrator{ + dataDB: dataDB, dataDBType: dataDBType, + storDB: storDB, storDBType: storDBType, mrshlr: mrshlr, + oldDataDB: oldDataDB, oldDataDBType: oldDataDBType, + oldStorDB: oldStorDB, oldStorDBType: oldStorDBType, oldmrshlr: oldmrshlr, + } + return m, err } type Migrator struct { - dataDB engine.DataDB - dataDBType string - storDB engine.Storage - storDBType string - mrshlr engine.Marshaler + dataDB engine.DataDB + dataDBType string + storDB engine.Storage + storDBType string + mrshlr engine.Marshaler + oldDataDB v1DataDB + oldDataDBType string + oldStorDB engine.Storage + oldStorDBType string + oldmrshlr engine.Marshaler } // Migrate implements the tasks to migrate, used as a dispatcher to the individual methods @@ -62,15 +76,14 @@ func (m *Migrator) Migrate(taskID string) (err error) { err = m.migrateCostDetails() case utils.MetaAccounts: err = m.migrateAccounts() - case "migrateActionPlans": + case utils.MetaActionPlans: err = m.migrateActionPlans() - case "migrateActionTriggers": + case utils.MetaActionTriggers: err = m.migrateActionTriggers() - case "migrateActions": + case utils.MetaActions: err = m.migrateActions() - case "migrateSharedGroups": + case utils.MetaSharedGroups: err = m.migrateSharedGroups() } - return } diff --git a/migrator/migrator_it_test.go b/migrator/migrator_it_test.go index f3c485a3e..5bf01cc11 100644 --- a/migrator/migrator_it_test.go +++ b/migrator/migrator_it_test.go @@ -18,15 +18,15 @@ package migrator import ( "flag" - "fmt" + // "fmt" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "log" "path" "reflect" "testing" "time" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" ) var ( @@ -38,7 +38,40 @@ var ( dbtype string mig *Migrator dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") - db_passwd = "" + + dataDBType = flag.String("datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database ") + dataDBHost = flag.String("datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.") + dataDBPort = flag.String("datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.") + dataDBName = flag.String("datadb_name", config.CgrConfig().DataDbName, "The name/number of the DataDb to connect to.") + dataDBUser = flag.String("datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.") + dataDBPass = flag.String("datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.") + + storDBType = flag.String("stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database ") + storDBHost = flag.String("stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.") + storDBPort = flag.String("stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.") + storDBName = flag.String("stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.") + storDBUser = flag.String("stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.") + storDBPass = flag.String("stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.") + + oldDataDBType = flag.String("old_datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database ") + oldDataDBHost = flag.String("old_datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.") + oldDataDBPort = flag.String("old_datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.") + oldDataDBName = flag.String("old_datadb_name", "11", "The name/number of the DataDb to connect to.") + oldDataDBUser = flag.String("old_datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.") + oldDataDBPass = flag.String("old_datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.") + + oldStorDBType = flag.String("old_stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database ") + oldStorDBHost = flag.String("old_stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.") + oldStorDBPort = flag.String("old_stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.") + oldStorDBName = flag.String("old_stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.") + oldStorDBUser = flag.String("old_stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.") + oldStorDBPass = flag.String("old_stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.") + + loadHistorySize = flag.Int("load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history") + oldLoadHistorySize = flag.Int("old_load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history") + + dbDataEncoding = flag.String("dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings") + oldDBDataEncoding = flag.String("old_dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings") ) // subtests to be executed for each migrator @@ -49,44 +82,73 @@ var sTestsITMigrator = []func(t *testing.T){ testMigratorActionTriggers, testMigratorActions, testMigratorSharedGroups, + testOnStorITFlush, } func TestOnStorITRedisConnect(t *testing.T) { - cfg, _ := config.NewDefaultCGRConfig() - rdsITdb, err := engine.NewRedisStorage(fmt.Sprintf("%s:%s", cfg.TpDbHost, cfg.TpDbPort), 4, cfg.TpDbPass, cfg.DBDataEncoding, utils.REDIS_MAX_CONNS, nil, 1) + dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize) if err != nil { - t.Fatal("Could not connect to Redis", err.Error()) + log.Fatal(err) + } + oldDataDB, err := ConfigureV1DataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding) + if err != nil { + log.Fatal(err) + } + storDB, err := engine.ConfigureStorStorage(*storDBType, *storDBHost, *storDBPort, *storDBName, *storDBUser, *storDBPass, *dbDataEncoding, + config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + log.Fatal(err) + } + oldstorDB, err := engine.ConfigureStorStorage(*oldStorDBType, *oldStorDBHost, *oldStorDBPort, *oldStorDBName, *oldStorDBUser, *oldStorDBPass, *oldDBDataEncoding, + config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + log.Fatal(err) + } + mig, err = NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType, oldDataDB, *oldDataDBType, *oldDBDataEncoding, oldstorDB, *oldStorDBType) + if err != nil { + log.Fatal(err) } - onStorCfg = cfg.DataDbName - mig = NewMigrator(rdsITdb, rdsITdb, utils.REDIS, utils.JSON, rdsITdb, utils.REDIS) } func TestOnStorITRedis(t *testing.T) { dbtype = utils.REDIS - onStor = rdsITdb for _, stest := range sTestsITMigrator { t.Run("TestITMigratorOnRedis", stest) } } func TestOnStorITMongoConnect(t *testing.T) { - cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "cdrsv2mongo") + cdrsMongoCfgPath := path.Join(*dataDir, "conf", "samples", "tutmongo") mgoITCfg, err := config.NewCGRConfigFromFolder(cdrsMongoCfgPath) if err != nil { t.Fatal(err) } - if mgoITdb, err = engine.NewMongoStorage(mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, db_passwd, - utils.StorDB, nil, mgoITCfg.CacheConfig, mgoITCfg.LoadHistorySize); err != nil { - t.Fatal(err) + dataDB, err := engine.ConfigureDataStorage(mgoITCfg.DataDbType, mgoITCfg.DataDbHost, mgoITCfg.DataDbPort, mgoITCfg.DataDbName, mgoITCfg.DataDbUser, mgoITCfg.DataDbPass, mgoITCfg.DBDataEncoding, mgoITCfg.CacheConfig, *loadHistorySize) + if err != nil { + log.Fatal(err) + } + oldDataDB, err := ConfigureV1DataStorage(mgoITCfg.DataDbType, mgoITCfg.DataDbHost, mgoITCfg.DataDbPort, mgoITCfg.DataDbName, mgoITCfg.DataDbUser, mgoITCfg.DataDbPass, mgoITCfg.DBDataEncoding) + if err != nil { + log.Fatal(err) + } + storDB, err := engine.ConfigureStorStorage(mgoITCfg.StorDBType, mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, mgoITCfg.DBDataEncoding, + config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + log.Fatal(err) + } + oldstorDB, err := engine.ConfigureStorStorage(mgoITCfg.StorDBType, mgoITCfg.StorDBHost, mgoITCfg.StorDBPort, mgoITCfg.StorDBName, mgoITCfg.StorDBUser, mgoITCfg.StorDBPass, mgoITCfg.DBDataEncoding, + config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) + if err != nil { + log.Fatal(err) + } + mig, err = NewMigrator(dataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, storDB, mgoITCfg.StorDBType, oldDataDB, mgoITCfg.DataDbType, mgoITCfg.DBDataEncoding, oldstorDB, mgoITCfg.StorDBType) + if err != nil { + log.Fatal(err) } - mongo = mgoITCfg - onStorCfg = mgoITCfg.StorDBName - mig = NewMigrator(mgoITdb, mgoITdb, utils.MONGO, utils.JSON, mgoITdb, utils.MONGO) } func TestOnStorITMongo(t *testing.T) { dbtype = utils.MONGO - onStor = mgoITdb for _, stest := range sTestsITMigrator { t.Run("TestITMigratorOnMongo", stest) } @@ -101,7 +163,7 @@ func testOnStorITFlush(t *testing.T) { t.Error("Error when flushing Redis ", err.Error()) } case dbtype == utils.MONGO: - err := engine.InitDataDb(mongo) + err := mig.dataDB.Flush("") if err != nil { t.Error("Error when flushing Mongo ", err.Error()) } @@ -110,19 +172,15 @@ func testOnStorITFlush(t *testing.T) { func testMigratorAccounts(t *testing.T) { v1b := &v1Balance{Value: 10, Weight: 10, DestinationIds: "NAT", ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}} - v1Acc := &v1Account{Id: "OUT:CUSTOMER_1:rif", BalanceMap: map[string]v1BalanceChain{utils.VOICE: v1BalanceChain{v1b}, utils.MONETARY: v1BalanceChain{&v1Balance{Value: 21, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}} - v2 := &engine.Balance{Uuid: "", ID: "", Value: 10, Directions: utils.StringMap{"*OUT": true}, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Weight: 10, DestinationIDs: utils.StringMap{"NAT": true}, + v1Acc := &v1Account{Id: "*OUT:CUSTOMER_1:rif", BalanceMap: map[string]v1BalanceChain{utils.VOICE: v1BalanceChain{v1b}, utils.MONETARY: v1BalanceChain{&v1Balance{Value: 21, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}}} + v2b := &engine.Balance{Uuid: "", ID: "", Value: 10, Directions: utils.StringMap{"*OUT": true}, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), Weight: 10, DestinationIDs: utils.StringMap{"NAT": true}, RatingSubject: "", Categories: utils.NewStringMap(), SharedGroups: utils.NewStringMap(), Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}, TimingIDs: utils.NewStringMap(""), Factor: engine.ValueFactor{}} m2 := &engine.Balance{Uuid: "", ID: "", Value: 21, Directions: utils.StringMap{"*OUT": true}, ExpirationDate: time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Local(), DestinationIDs: utils.NewStringMap(""), RatingSubject: "", Categories: utils.NewStringMap(), SharedGroups: utils.NewStringMap(), Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}, TimingIDs: utils.NewStringMap(""), Factor: engine.ValueFactor{}} - testAccount := &engine.Account{ID: "CUSTOMER_1:rif", BalanceMap: map[string]engine.Balances{utils.VOICE: engine.Balances{v2}, utils.MONETARY: engine.Balances{m2}}, UnitCounters: engine.UnitCounters{}, ActionTriggers: engine.ActionTriggers{}} + testAccount := &engine.Account{ID: "CUSTOMER_1:rif", BalanceMap: map[string]engine.Balances{utils.VOICE: engine.Balances{v2b}, utils.MONETARY: engine.Balances{m2}}, UnitCounters: engine.UnitCounters{}, ActionTriggers: engine.ActionTriggers{}} switch { case dbtype == utils.REDIS: - bit, err := mig.mrshlr.Marshal(v1Acc) - if err != nil { - t.Error("Error when marshaling ", err.Error()) - } - err = mig.SetV1onRedis(v1AccountDBPrefix+v1Acc.Id, bit) + err := mig.oldDataDB.setV1Account(v1Acc) if err != nil { t.Error("Error when setting v1 acc ", err.Error()) } @@ -134,11 +192,13 @@ func testMigratorAccounts(t *testing.T) { if err != nil { t.Error("Error when getting account ", err.Error()) } - if !reflect.DeepEqual(testAccount, result) { + if !reflect.DeepEqual(testAccount.BalanceMap["*voice"][0], result.BalanceMap["*voice"][0]) { + t.Errorf("Expecting: %+v, received: %+v", testAccount.BalanceMap["*voice"][0], result.BalanceMap["*voice"][0]) + } else if !reflect.DeepEqual(testAccount, result) { t.Errorf("Expecting: %+v, received: %+v", testAccount, result) } case dbtype == utils.MONGO: - err := mig.SetV1onMongoAccount(v1AccountDBPrefix, v1Acc) + err := mig.oldDataDB.setV1Account(v1Acc) if err != nil { t.Error("Error when marshaling ", err.Error()) } @@ -157,24 +217,19 @@ func testMigratorAccounts(t *testing.T) { } func testMigratorActionPlans(t *testing.T) { - v1ap := &v1ActionPlan{Id: "test", AccountIds: []string{"one"}, Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}} + v1ap := &v1ActionPlans{&v1ActionPlan{Id: "test", AccountIds: []string{"one"}, Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}} ap := &engine.ActionPlan{Id: "test", AccountIDs: utils.StringMap{"one": true}, ActionTimings: []*engine.ActionTiming{&engine.ActionTiming{Timing: &engine.RateInterval{Timing: &engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}} switch { case dbtype == utils.REDIS: - bit, err := mig.mrshlr.Marshal(v1ap) - if err != nil { - t.Error("Error when marshaling ", err.Error()) - } - setv1id := utils.ACTION_PLAN_PREFIX + v1ap.Id - err = mig.SetV1onRedis(setv1id, bit) + err := mig.oldDataDB.setV1ActionPlans(v1ap) if err != nil { t.Error("Error when setting v1 ActionPlan ", err.Error()) } - err = mig.Migrate("migrateActionPlans") + err = mig.Migrate(utils.MetaActionPlans) if err != nil { t.Error("Error when migrating ActionPlans ", err.Error()) } - result, err := mig.tpDB.GetActionPlan(ap.Id, true, utils.NonTransactional) + result, err := mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting ActionPlan ", err.Error()) } @@ -186,15 +241,15 @@ func testMigratorActionPlans(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", ap.ActionTimings[0].Weight, result.ActionTimings[0].Weight) } case dbtype == utils.MONGO: - err := mig.SetV1onMongoActionPlan(utils.ACTION_PLAN_PREFIX, v1ap) + err := mig.oldDataDB.setV1ActionPlans(v1ap) if err != nil { t.Error("Error when setting v1 ActionPlans ", err.Error()) } - err = mig.Migrate("migrateActionPlans") + err = mig.Migrate(utils.MetaActionPlans) if err != nil { t.Error("Error when migrating ActionPlans ", err.Error()) } - result, err := mig.tpDB.GetActionPlan(ap.Id, true, utils.NonTransactional) + result, err := mig.dataDB.GetActionPlan(ap.Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting ActionPlan ", err.Error()) } @@ -210,16 +265,17 @@ func testMigratorActionPlans(t *testing.T) { func testMigratorActionTriggers(t *testing.T) { tim := time.Date(2012, time.February, 27, 23, 59, 59, 0, time.UTC).Local() - var v1Atr v1ActionTrigger - v1atrs := &v1ActionTrigger{ - Id: "Test", - BalanceType: "*monetary", - BalanceDirection: "*out", - ThresholdType: "*max_balance", - ThresholdValue: 2, - ActionsId: "TEST_ACTIONS", - Executed: true, - BalanceExpirationDate: tim, + v1atrs := &v1ActionTriggers{ + &v1ActionTrigger{ + Id: "Test", + BalanceType: "*monetary", + BalanceDirection: "*out", + ThresholdType: "*max_balance", + ThresholdValue: 2, + ActionsId: "TEST_ACTIONS", + Executed: true, + BalanceExpirationDate: tim, + }, } atrs := engine.ActionTriggers{ &engine.ActionTrigger{ @@ -241,23 +297,15 @@ func testMigratorActionTriggers(t *testing.T) { } switch { case dbtype == utils.REDIS: - bit, err := mig.mrshlr.Marshal(v1atrs) - if err != nil { - t.Error("Error when marshaling ", err.Error()) - } - if err := mig.mrshlr.Unmarshal(bit, &v1Atr); err != nil { - t.Error("Error when setting v1 ActionTriggers ", err.Error()) - } - setv1id := utils.ACTION_TRIGGER_PREFIX + v1atrs.Id - err = mig.SetV1onRedis(setv1id, bit) + err := mig.oldDataDB.setV1ActionTriggers(v1atrs) if err != nil { t.Error("Error when setting v1 ActionTriggers ", err.Error()) } - err = mig.Migrate("migrateActionTriggers") + err = mig.Migrate(utils.MetaActionTriggers) if err != nil { t.Error("Error when migrating ActionTriggers ", err.Error()) } - result, err := mig.tpDB.GetActionTriggers(v1atrs.Id, true, utils.NonTransactional) + result, err := mig.dataDB.GetActionTriggers((*v1atrs)[0].Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting ActionTriggers ", err.Error()) } @@ -277,8 +325,8 @@ func testMigratorActionTriggers(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", atrs[0].ExpirationDate, result[0].ExpirationDate) } else if !reflect.DeepEqual(atrs[0].ActivationDate, result[0].ActivationDate) { t.Errorf("Expecting: %+v, received: %+v", atrs[0].ActivationDate, result[0].ActivationDate) - } else if !reflect.DeepEqual(atrs[0].Balance, result[0].Balance) { - // t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance, result[0].Balance) + } else if !reflect.DeepEqual(atrs[0].Balance.Type, result[0].Balance.Type) { + t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance.Type, result[0].Balance.Type) } else if !reflect.DeepEqual(atrs[0].Weight, result[0].Weight) { t.Errorf("Expecting: %+v, received: %+v", atrs[0].Weight, result[0].Weight) } else if !reflect.DeepEqual(atrs[0].ActionsID, result[0].ActionsID) { @@ -324,74 +372,51 @@ func testMigratorActionTriggers(t *testing.T) { } else if !reflect.DeepEqual(atrs[0].Balance.Blocker, result[0].Balance.Blocker) { t.Errorf("Expecting: %+v, received: %+v", atrs[0].Balance.Blocker, result[0].Balance.Blocker) } - case dbtype == utils.MONGO: - err := mig.SetV1onMongoActionTrigger(utils.ACTION_TRIGGER_PREFIX, v1atrs) - if err != nil { - t.Error("Error when setting v1 ActionTriggers ", err.Error()) - } - err = mig.Migrate("migrateActionTriggers") - if err != nil { + err := mig.Migrate(utils.MetaActionTriggers) + if err != nil && err != utils.ErrNotImplemented { t.Error("Error when migrating ActionTriggers ", err.Error()) } - result, err := mig.tpDB.GetActionTriggers(v1atrs.Id, true, utils.NonTransactional) - if err != nil { - t.Error("Error when getting ActionTriggers ", err.Error()) - } - if !reflect.DeepEqual(atrs[0], result[0]) { - t.Errorf("Expecting: %+v, received: %+v", atrs[0], result[0]) - } - err = mig.DropV1Colection(utils.ACTION_TRIGGER_PREFIX) - if err != nil { - t.Error("Error when flushing v1 ActionTriggers ", err.Error()) - } + } } func testMigratorActions(t *testing.T) { - v1act := &v1Action{Id: "test", ActionType: "", BalanceType: "", Direction: "INBOUND", ExtraParameters: "", ExpirationString: "", Balance: &v1Balance{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}} - act := engine.Actions{&engine.Action{Id: "test", ActionType: "", ExtraParameters: "", ExpirationString: "", Weight: 0.00, Balance: &engine.BalanceFilter{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}} + v1act := &v1Actions{&v1Action{Id: "test", ActionType: "", BalanceType: "", Direction: "INBOUND", ExtraParameters: "", ExpirationString: "", Balance: &v1Balance{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}} + act := &engine.Actions{&engine.Action{Id: "test", ActionType: "", ExtraParameters: "", ExpirationString: "", Weight: 0.00, Balance: &engine.BalanceFilter{Timings: []*engine.RITiming{&engine.RITiming{Years: utils.Years{}, Months: utils.Months{}, MonthDays: utils.MonthDays{}, WeekDays: utils.WeekDays{}}}}}} switch { case dbtype == utils.REDIS: - bit, err := mig.mrshlr.Marshal(v1act) - if err != nil { - t.Error("Error when marshaling ", err.Error()) - } - setv1id := utils.ACTION_PREFIX + v1act.Id - err = mig.SetV1onRedis(setv1id, bit) + err := mig.oldDataDB.setV1Actions(v1act) if err != nil { t.Error("Error when setting v1 Actions ", err.Error()) } - err = mig.Migrate("migrateActions") + err = mig.Migrate(utils.MetaActions) if err != nil { t.Error("Error when migrating Actions ", err.Error()) } - result, err := mig.tpDB.GetActions(v1act.Id, true, utils.NonTransactional) + result, err := mig.dataDB.GetActions((*v1act)[0].Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting Actions ", err.Error()) } - if !reflect.DeepEqual(act, result) { - t.Errorf("Expecting: %+v, received: %+v", act, result) + if !reflect.DeepEqual(*act, result) { + t.Errorf("Expecting: %+v, received: %+v", *act, result) } + case dbtype == utils.MONGO: - err := mig.SetV1onMongoAction(utils.ACTION_PREFIX, v1act) + err := mig.oldDataDB.setV1Actions(v1act) if err != nil { t.Error("Error when setting v1 Actions ", err.Error()) } - err = mig.Migrate("migrateActions") + err = mig.Migrate(utils.MetaActions) if err != nil { t.Error("Error when migrating Actions ", err.Error()) } - result, err := mig.tpDB.GetActions(v1act.Id, true, utils.NonTransactional) + result, err := mig.dataDB.GetActions((*v1act)[0].Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting Actions ", err.Error()) } - if !reflect.DeepEqual(act[0].Balance.Timings, result[0].Balance.Timings) { - t.Errorf("Expecting: %+v, received: %+v", act[0].Balance.Timings, result[0].Balance.Timings) - } - err = mig.DropV1Colection(utils.ACTION_PREFIX) - if err != nil { - t.Error("Error when flushing v1 Actions ", err.Error()) + if !reflect.DeepEqual(*act, result) { + t.Errorf("Expecting: %+v, received: %+v", *act, result) } } } @@ -413,20 +438,15 @@ func testMigratorSharedGroups(t *testing.T) { } switch { case dbtype == utils.REDIS: - bit, err := mig.mrshlr.Marshal(v1sg) - if err != nil { - t.Error("Error when marshaling ", err.Error()) - } - setv1id := utils.SHARED_GROUP_PREFIX + v1sg.Id - err = mig.SetV1onRedis(setv1id, bit) + err := mig.oldDataDB.setV1SharedGroup(v1sg) if err != nil { t.Error("Error when setting v1 SharedGroup ", err.Error()) } - err = mig.Migrate("migrateSharedGroups") + err = mig.Migrate(utils.MetaSharedGroups) if err != nil { t.Error("Error when migrating SharedGroup ", err.Error()) } - result, err := mig.tpDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional) + result, err := mig.dataDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting SharedGroup ", err.Error()) } @@ -434,20 +454,21 @@ func testMigratorSharedGroups(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", sg, result) } case dbtype == utils.MONGO: - err := mig.SetV1onMongoSharedGroup(utils.SHARED_GROUP_PREFIX, v1sg) + err := mig.oldDataDB.setV1SharedGroup(v1sg) if err != nil { t.Error("Error when setting v1 SharedGroup ", err.Error()) } - err = mig.Migrate("migrateSharedGroups") + err = mig.Migrate(utils.MetaSharedGroups) if err != nil { t.Error("Error when migrating SharedGroup ", err.Error()) } - result, err := mig.tpDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional) + result, err := mig.dataDB.GetSharedGroup(v1sg.Id, true, utils.NonTransactional) if err != nil { t.Error("Error when getting SharedGroup ", err.Error()) } if !reflect.DeepEqual(sg, result) { t.Errorf("Expecting: %+v, received: %+v", sg, result) } + } } diff --git a/migrator/setv1.go b/migrator/setv1.go deleted file mode 100644 index 333c04524..000000000 --- a/migrator/setv1.go +++ /dev/null @@ -1,87 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package migrator - -import "github.com/cgrates/cgrates/engine" - -func (m *Migrator) SetV1onRedis(key string, bl []byte) (err error) { - dataDB := m.dataDB.(*engine.RedisStorage) - if err = dataDB.Cmd("SET", key, bl).Err; err != nil { - return err - } - return -} - -func (m *Migrator) SetV1onMongoAccount(pref string, x *v1Account) (err error) { - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - if err := mgoDB.C(pref).Insert(x); err != nil { - return err - } - return -} - -func (m *Migrator) SetV1onMongoAction(pref string, x *v1Action) (err error) { - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - if err := mgoDB.C(pref).Insert(x); err != nil { - return err - } - return -} - -func (m *Migrator) SetV1onMongoActionPlan(pref string, x *v1ActionPlan) (err error) { - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - if err := mgoDB.C(pref).Insert(x); err != nil { - return err - } - return -} - -func (m *Migrator) SetV1onMongoActionTrigger(pref string, x *v1ActionTrigger) (err error) { - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - if err := mgoDB.C(pref).Insert(x); err != nil { - return err - } - return -} - -func (m *Migrator) SetV1onMongoSharedGroup(pref string, x *v1SharedGroup) (err error) { - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - if err := mgoDB.C(pref).Insert(x); err != nil { - return err - } - return -} -func (m *Migrator) DropV1Colection(pref string) (err error) { - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - if err := mgoDB.C(pref).DropCollection(); err != nil { - return err - } - return -} diff --git a/migrator/sharedgroup.go b/migrator/sharedgroup.go index 7e89f3f1b..1586c5425 100644 --- a/migrator/sharedgroup.go +++ b/migrator/sharedgroup.go @@ -22,7 +22,6 @@ import ( "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "gopkg.in/mgo.v2/bson" ) type v1SharedGroup struct { @@ -32,89 +31,31 @@ type v1SharedGroup struct { } func (m *Migrator) migrateSharedGroups() (err error) { - switch m.dataDBType { - case utils.REDIS: - var sgv1keys []string - sgv1keys, err = m.dataDB.GetKeysForPrefix(utils.SHARED_GROUP_PREFIX) - if err != nil { - return + var v1SG *v1SharedGroup + for { + v1SG, err = m.oldDataDB.getV1SharedGroup() + if err != nil && err != utils.ErrNoMoreData { + return err } - for _, sgv1key := range sgv1keys { - v1sg, err := m.getv1SharedGroupFromDB(sgv1key) - if err != nil { - return err - } - sg := v1sg.AsSharedGroup() - if err = m.dataDB.SetSharedGroup(sg, utils.NonTransactional); err != nil { + if err == utils.ErrNoMoreData { + break + } + if v1SG != nil { + acnt := v1SG.AsSharedGroup() + if err = m.dataDB.SetSharedGroup(acnt, utils.NonTransactional); err != nil { return err } } - // All done, update version wtih current one - vrs := engine.Versions{utils.SHARED_GROUP_PREFIX: engine.CurrentStorDBVersions()[utils.SHARED_GROUP_PREFIX]} - if err = m.dataDB.SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating SharedGroup version into dataDB", err.Error())) - } - return - case utils.MONGO: - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - var v1sg v1SharedGroup - iter := mgoDB.C(utils.SHARED_GROUP_PREFIX).Find(nil).Iter() - for iter.Next(&v1sg) { - sg := v1sg.AsSharedGroup() - if err = m.dataDB.SetSharedGroup(sg, utils.NonTransactional); err != nil { - return err - } - } - // All done, update version wtih current one - vrs := engine.Versions{utils.SHARED_GROUP_PREFIX: engine.CurrentStorDBVersions()[utils.SHARED_GROUP_PREFIX]} - if err = m.dataDB.SetVersions(vrs, false); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating SharedGroup version into dataDB", err.Error())) - } - return - default: + } + // All done, update version wtih current one + vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]} + if err = m.dataDB.SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, - utils.UnsupportedDB, - fmt.Sprintf("error: unsupported: <%s> for migrateSharedGroups method", m.dataDBType)) - } -} - -func (m *Migrator) getv1SharedGroupFromDB(key string) (*v1SharedGroup, error) { - switch m.dataDBType { - case utils.REDIS: - dataDB := m.dataDB.(*engine.RedisStorage) - if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil { - return nil, err - } else { - v1SG := &v1SharedGroup{Id: key} - if err := m.mrshlr.Unmarshal(strVal, v1SG); err != nil { - return nil, err - } - return v1SG, nil - } - case utils.MONGO: - dataDB := m.dataDB.(*engine.MongoStorage) - mgoDB := dataDB.DB() - defer mgoDB.Session.Close() - v1SG := new(v1SharedGroup) - if err := mgoDB.C(utils.SHARED_GROUP_PREFIX).Find(bson.M{"id": key}).One(v1SG); err != nil { - return nil, err - } - return v1SG, nil - default: - return nil, utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - utils.UnsupportedDB, - fmt.Sprintf("error: unsupported: <%s> for getv1SharedGroupFromDB method", m.dataDBType)) + err.Error(), + fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) } + return } func (v1SG v1SharedGroup) AsSharedGroup() (sg *engine.SharedGroup) { diff --git a/migrator/v1DataDB.go b/migrator/v1DataDB.go new file mode 100644 index 000000000..27b4c8f6e --- /dev/null +++ b/migrator/v1DataDB.go @@ -0,0 +1,32 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator + +type v1DataDB interface { + getKeysForPrefix(prefix string) ([]string, error) + getv1Account() (v1Acnt *v1Account, err error) + setV1Account(x *v1Account) (err error) + getV1ActionPlans() (v1aps *v1ActionPlans, err error) + setV1ActionPlans(x *v1ActionPlans) (err error) + getV1Actions() (v1acs *v1Actions, err error) + setV1Actions(x *v1Actions) (err error) + getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) + setV1ActionTriggers(x *v1ActionTriggers) (err error) + getV1SharedGroup() (v1acts *v1SharedGroup, err error) + setV1SharedGroup(x *v1SharedGroup) (err error) +} diff --git a/migrator/v1Migrator_Utils.go b/migrator/v1Migrator_Utils.go new file mode 100644 index 000000000..88a786898 --- /dev/null +++ b/migrator/v1Migrator_Utils.go @@ -0,0 +1,53 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator + +import ( + "errors" + "fmt" + "strconv" + + "github.com/cgrates/cgrates/utils" +) + +func ConfigureV1DataStorage(db_type, host, port, name, user, pass, marshaler string) (db v1DataDB, err error) { + var d v1DataDB + switch db_type { + case utils.REDIS: + var db_nb int + db_nb, err = strconv.Atoi(name) + if err != nil { + utils.Logger.Crit("Redis db name must be an integer!") + return nil, err + } + if port != "" { + host += ":" + port + } + d, err = newv1RedisStorage(host, db_nb, pass, marshaler) + case utils.MONGO: + d, err = NewMongoStorage(host, port, name, user, pass, utils.DataDB, nil) + db = d.(v1DataDB) + default: + err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'", + db_type, utils.REDIS, utils.MONGO)) + } + if err != nil { + return nil, err + } + return d, nil +} diff --git a/migrator/v1MongoData.go b/migrator/v1MongoData.go new file mode 100644 index 000000000..f332cb956 --- /dev/null +++ b/migrator/v1MongoData.go @@ -0,0 +1,177 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator + +import ( + "fmt" + // "log" + + "github.com/cgrates/cgrates/utils" + + "github.com/cgrates/cgrates/engine" + "gopkg.in/mgo.v2" +) + +type v1Mongo struct { + session *mgo.Session + db string + v1ms engine.Marshaler + qryIter *mgo.Iter +} + +type AcKeyValue struct { + Key string + Value v1Actions +} +type AtKeyValue struct { + Key string + Value v1ActionPlans +} + +func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string) (v1ms *v1Mongo, err error) { + url := host + if port != "" { + url += ":" + port + } + if user != "" && pass != "" { + url = fmt.Sprintf("%s:%s@%s", user, pass, url) + } + if db != "" { + url += "/" + db + } + session, err := mgo.Dial(url) + if err != nil { + return nil, err + } + session.SetMode(mgo.Strong, true) + v1ms = &v1Mongo{db: db, session: session, v1ms: engine.NewCodecMsgpackMarshaler()} + return +} + +func (v1ms *v1Mongo) getKeysForPrefix(prefix string) ([]string, error) { + return nil, nil +} + +//Account methods +//get +func (v1ms *v1Mongo) getv1Account() (v1Acnt *v1Account, err error) { + if v1ms.qryIter == nil { + v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Find(nil).Iter() + } + v1ms.qryIter.Next(&v1Acnt) + + if v1Acnt == nil { + v1ms.qryIter = nil + return nil, utils.ErrNoMoreData + + } + return v1Acnt, nil +} + +//set +func (v1ms *v1Mongo) setV1Account(x *v1Account) (err error) { + if err := v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Insert(x); err != nil { + return err + } + return +} + +//Action methods +//get +func (v1ms *v1Mongo) getV1ActionPlans() (v1aps *v1ActionPlans, err error) { + var strct *AtKeyValue + if v1ms.qryIter == nil { + v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actiontimings").Find(nil).Iter() + } + v1ms.qryIter.Next(&strct) + if strct == nil { + v1ms.qryIter = nil + return nil, utils.ErrNoMoreData + } + v1aps = &strct.Value + return v1aps, nil +} + +//set +func (v1ms *v1Mongo) setV1ActionPlans(x *v1ActionPlans) (err error) { + key := utils.ACTION_PLAN_PREFIX + (*x)[0].Id + if err := v1ms.session.DB(v1ms.db).C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil { + return err + } + return +} + +//Actions methods +//get +func (v1ms *v1Mongo) getV1Actions() (v1acs *v1Actions, err error) { + var strct *AcKeyValue + if v1ms.qryIter == nil { + v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actions").Find(nil).Iter() + } + v1ms.qryIter.Next(&strct) + if strct == nil { + v1ms.qryIter = nil + return nil, utils.ErrNoMoreData + } + + v1acs = &strct.Value + return v1acs, nil +} + +//set +func (v1ms *v1Mongo) setV1Actions(x *v1Actions) (err error) { + key := utils.ACTION_PREFIX + (*x)[0].Id + if err := v1ms.session.DB(v1ms.db).C("actions").Insert(&AcKeyValue{key, *x}); err != nil { + return err + } + return +} + +//ActionTriggers methods +//get +func (v1ms *v1Mongo) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) { + return nil, utils.ErrNotImplemented +} + +//set +func (v1ms *v1Mongo) setV1ActionTriggers(x *v1ActionTriggers) (err error) { + return utils.ErrNotImplemented +} + +//Actions methods +//get +func (v1ms *v1Mongo) getV1SharedGroup() (v1sg *v1SharedGroup, err error) { + if v1ms.qryIter == nil { + v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.SHARED_GROUP_PREFIX).Find(nil).Iter() + } + v1ms.qryIter.Next(&v1sg) + if v1sg == nil { + v1ms.qryIter = nil + return nil, utils.ErrNoMoreData + + } + return v1sg, nil +} + +//set +func (v1ms *v1Mongo) setV1SharedGroup(x *v1SharedGroup) (err error) { + if err := v1ms.session.DB(v1ms.db).C(utils.SHARED_GROUP_PREFIX).Insert(x); err != nil { + return err + } + return +} diff --git a/migrator/v1MongoStor.go b/migrator/v1MongoStor.go new file mode 100644 index 000000000..12fa903fd --- /dev/null +++ b/migrator/v1MongoStor.go @@ -0,0 +1,18 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator diff --git a/migrator/v1Redis.go b/migrator/v1Redis.go new file mode 100644 index 000000000..19eb7deeb --- /dev/null +++ b/migrator/v1Redis.go @@ -0,0 +1,307 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator + +import ( + "fmt" + //"log" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + + "github.com/mediocregopher/radix.v2/pool" + "github.com/mediocregopher/radix.v2/redis" +) + +type v1Redis struct { + dbPool *pool.Pool + ms engine.Marshaler + dataKeys []string + qryIdx *int +} + +func newv1RedisStorage(address string, db int, pass, mrshlerStr string) (*v1Redis, error) { + df := func(network, addr string) (*redis.Client, error) { + client, err := redis.Dial(network, addr) + if err != nil { + return nil, err + } + if len(pass) != 0 { + if err = client.Cmd("AUTH", pass).Err; err != nil { + client.Close() + return nil, err + } + } + if db != 0 { + if err = client.Cmd("SELECT", db).Err; err != nil { + client.Close() + return nil, err + } + } + return client, nil + } + p, err := pool.NewCustom("tcp", address, 1, df) + if err != nil { + return nil, err + } + var mrshler engine.Marshaler + if mrshlerStr == utils.MSGPACK { + mrshler = engine.NewCodecMsgpackMarshaler() + } else if mrshlerStr == utils.JSON { + mrshler = new(engine.JSONMarshaler) + } else { + return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr) + } + return &v1Redis{dbPool: p, ms: mrshler}, nil +} + +// This CMD function get a connection from the pool. +// Handles automatic failover in case of network disconnects +func (v1rs *v1Redis) cmd(cmd string, args ...interface{}) *redis.Resp { + c1, err := v1rs.dbPool.Get() + if err != nil { + return redis.NewResp(err) + } + result := c1.Cmd(cmd, args...) + if result.IsType(redis.IOErr) { // Failover mecanism + utils.Logger.Warning(fmt.Sprintf(" error <%s>, attempting failover.", result.Err.Error())) + c2, err := v1rs.dbPool.Get() + if err == nil { + if result2 := c2.Cmd(cmd, args...); !result2.IsType(redis.IOErr) { + v1rs.dbPool.Put(c2) + return result2 + } + } + } else { + v1rs.dbPool.Put(c1) + } + return result +} + +func (v1rs *v1Redis) getKeysForPrefix(prefix string) ([]string, error) { + r := v1rs.cmd("KEYS", prefix+"*") + if r.Err != nil { + return nil, r.Err + } + return r.List() +} + +//Account methods +//get +func (v1rs *v1Redis) getv1Account() (v1Acnt *v1Account, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.getKeysForPrefix(v1AccountDBPrefix) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNotFound + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + v1Acnt = &v1Account{Id: v1rs.dataKeys[*v1rs.qryIdx]} + if err := v1rs.ms.Unmarshal(strVal, v1Acnt); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return v1Acnt, nil +} + +//set +func (v1rs *v1Redis) setV1Account(x *v1Account) (err error) { + key := v1AccountDBPrefix + x.Id + bit, err := v1rs.ms.Marshal(x) + if err != nil { + return err + } + if err = v1rs.cmd("SET", key, bit).Err; err != nil { + return err + } + return +} + +//ActionPlans methods +//get +func (v1rs *v1Redis) getV1ActionPlans() (v1aps *v1ActionPlans, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PLAN_PREFIX) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNotFound + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.ms.Unmarshal(strVal, &v1aps); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return v1aps, nil +} + +//set +func (v1rs *v1Redis) setV1ActionPlans(x *v1ActionPlans) (err error) { + key := utils.ACTION_PLAN_PREFIX + (*x)[0].Id + bit, err := v1rs.ms.Marshal(x) + if err != nil { + return err + } + if err = v1rs.cmd("SET", key, bit).Err; err != nil { + return err + } + return +} + +//Actions methods +//get +func (v1rs *v1Redis) getV1Actions() (v1acs *v1Actions, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PREFIX) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNotFound + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.ms.Unmarshal(strVal, &v1acs); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return v1acs, nil +} + +//set +func (v1rs *v1Redis) setV1Actions(x *v1Actions) (err error) { + key := utils.ACTION_PREFIX + (*x)[0].Id + bit, err := v1rs.ms.Marshal(x) + if err != nil { + return err + } + if err = v1rs.cmd("SET", key, bit).Err; err != nil { + return err + } + return +} + +//ActionTriggers methods +//get +func (v1rs *v1Redis) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_TRIGGER_PREFIX) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNotFound + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.ms.Unmarshal(strVal, &v1acts); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return v1acts, nil +} + +//set +func (v1rs *v1Redis) setV1ActionTriggers(x *v1ActionTriggers) (err error) { + key := utils.ACTION_TRIGGER_PREFIX + (*x)[0].Id + bit, err := v1rs.ms.Marshal(x) + if err != nil { + return err + } + if err = v1rs.cmd("SET", key, bit).Err; err != nil { + return err + } + return +} + +//SharedGroup methods +//get +func (v1rs *v1Redis) getV1SharedGroup() (v1sg *v1SharedGroup, err error) { + if v1rs.qryIdx == nil { + v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.SHARED_GROUP_PREFIX) + if err != nil { + return + } else if len(v1rs.dataKeys) == 0 { + return nil, utils.ErrNotFound + } + v1rs.qryIdx = utils.IntPointer(0) + } + if *v1rs.qryIdx <= len(v1rs.dataKeys)-1 { + strVal, err := v1rs.cmd("GET", v1rs.dataKeys[*v1rs.qryIdx]).Bytes() + if err != nil { + return nil, err + } + if err := v1rs.ms.Unmarshal(strVal, &v1sg); err != nil { + return nil, err + } + *v1rs.qryIdx = *v1rs.qryIdx + 1 + } else { + v1rs.qryIdx = nil + return nil, utils.ErrNoMoreData + } + return v1sg, nil +} + +//set +func (v1rs *v1Redis) setV1SharedGroup(x *v1SharedGroup) (err error) { + key := utils.SHARED_GROUP_PREFIX + x.Id + bit, err := v1rs.ms.Marshal(x) + if err != nil { + return err + } + if err = v1rs.cmd("SET", key, bit).Err; err != nil { + return err + } + return +} diff --git a/migrator/v1SqlStor.go b/migrator/v1SqlStor.go new file mode 100644 index 000000000..12fa903fd --- /dev/null +++ b/migrator/v1SqlStor.go @@ -0,0 +1,18 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator diff --git a/migrator/v1StorDB.go b/migrator/v1StorDB.go new file mode 100644 index 000000000..fe59bde8f --- /dev/null +++ b/migrator/v1StorDB.go @@ -0,0 +1,22 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator + +// type v1StorDB interface { + +// } \ No newline at end of file diff --git a/utils/consts.go b/utils/consts.go index e51e1e69a..3c3d63be2 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -382,6 +382,10 @@ const ( MetaScheduler = "*scheduler" MetaCostDetails = "*cost_details" MetaAccounts = "*accounts" + MetaActionPlans = "*action_plans" + MetaActionTriggers = "*action_triggers" + MetaActions = "*actions" + MetaSharedGroups = "*shared_groups" Migrator = "migrator" UnsupportedMigrationTask = "unsupported migration task" NoStorDBConnection = "not connected to StorDB" diff --git a/utils/errors.go b/utils/errors.go index 2e9a215b8..5e079f55a 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -23,6 +23,7 @@ import ( ) var ( + ErrNoMoreData = errors.New("NO_MORE_DATA") ErrNotImplemented = errors.New("NOT_IMPLEMENTED") ErrNotFound = errors.New("NOT_FOUND") ErrTimedOut = errors.New("TIMED_OUT")