queryable versions from both cgr-migrator and cgr-console

This commit is contained in:
edwardro22
2017-11-25 16:21:21 +00:00
committed by Dan Christian Bogos
parent 0ad4595921
commit 6b84c2adb7
6 changed files with 318 additions and 77 deletions

View File

@@ -1785,7 +1785,24 @@ func TestApierReplayFailedPosts(t *testing.T) {
t.Errorf("Error %s removing folder: %s", err, dir)
}
}
}
func TestApierGetDataDBVesions(t *testing.T) {
var reply *engine.Versions
if err := rater.Call("ApierV1.GetDataDBVersions", "", &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(engine.CurrentDataDBVersions(), *reply) {
t.Errorf("Expecting : %+v, received: %+v", engine.CurrentDataDBVersions(), *reply)
}
}
func TestApierGetStorDBVesions(t *testing.T) {
var reply *engine.Versions
if err := rater.Call("ApierV1.GetStorDBVersions", "", &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(engine.CurrentStorDBVersions(), *reply) {
t.Errorf("Expecting : %+v, received: %+v", engine.CurrentStorDBVersions(), *reply)
}
}
// Simply kill the engine after we are done with tests within this file

48
apier/v1/versions.go Normal file
View File

@@ -0,0 +1,48 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// Queries all versions from stordb
func (self *ApierV1) GetStorDBVersions(arg string, reply *engine.Versions) error {
if vrs, err := self.StorDb.GetVersions(utils.TBLVersions); err != nil {
return utils.NewErrServerError(err)
} else if len(vrs) == 0 {
return utils.ErrNotFound
} else {
*reply = vrs
}
return nil
}
// Queries all versions from stordb
func (self *ApierV1) GetDataDBVersions(arg string, reply *engine.Versions) error {
if vrs, err := self.DataManager.DataDB().GetVersions(utils.TBLVersions); err != nil {
return utils.NewErrServerError(err)
} else if len(vrs) == 0 {
return utils.ErrNotFound
} else {
*reply = vrs
}
return nil
}

View File

@@ -32,7 +32,7 @@ import (
var (
sameDBname = true
inDataDB migrator.MigratorDataDB
outDataDB migrator.MigratorDataDB
storDB engine.Storage
oDBDataEncoding string
migrate = flag.String("migrate", "", "Fire up automatic migration "+
@@ -46,12 +46,12 @@ var (
outDataDBUser = flag.String("out_datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.")
outDataDBPass = flag.String("out_datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.")
outStorDBType = flag.String("out_stordb_type", "", "The type of the storDb Database <mysql|postgres>")
outStorDBHost = flag.String("out_stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.")
outStorDBPort = flag.String("out_stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.")
outStorDBName = flag.String("out_stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.")
outStorDBUser = flag.String("out_stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.")
outStorDBPass = flag.String("out_stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.")
outStorDBType = flag.String("out_stordb_type", "", "The type of the StorDB Database <mysql|postgres>")
outStorDBHost = flag.String("out_stordb_host", config.CgrConfig().StorDBHost, "The StorDB host to connect to.")
outStorDBPort = flag.String("out_stordb_port", config.CgrConfig().StorDBPort, "The StorDB port to bind to.")
outStorDBName = flag.String("out_stordb_name", config.CgrConfig().StorDBName, "The name/number of the StorDB to connect to.")
outStorDBUser = flag.String("out_stordb_user", config.CgrConfig().StorDBUser, "The StorDB user to sign in as.")
outStorDBPass = flag.String("out_stordb_passwd", config.CgrConfig().StorDBPass, "The StorDB user's password.")
inDataDBType = flag.String("datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb Database <redis>")
inDataDBHost = flag.String("datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.")
@@ -60,15 +60,18 @@ var (
inDataDBUser = flag.String("datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.")
inDataDBPass = flag.String("datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.")
inStorDBType = flag.String("stordb_type", config.CgrConfig().StorDBType, "The type of the storDb Database <mysql|postgres>")
inStorDBHost = flag.String("stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.")
inStorDBPort = flag.String("stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.")
inStorDBName = flag.String("stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.")
inStorDBUser = flag.String("stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.")
inStorDBPass = flag.String("stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.")
inStorDBType = flag.String("stordb_type", config.CgrConfig().StorDBType, "The type of the StorDB Database <mysql|postgres>")
inStorDBHost = flag.String("stordb_host", config.CgrConfig().StorDBHost, "The StorDB host to connect to.")
inStorDBPort = flag.String("stordb_port", config.CgrConfig().StorDBPort, "The StorDB port to bind to.")
inStorDBName = flag.String("stordb_name", config.CgrConfig().StorDBName, "The name/number of the StorDB to connect to.")
inStorDBUser = flag.String("stordb_user", config.CgrConfig().StorDBUser, "The StorDB user to sign in as.")
inStorDBPass = flag.String("stordb_passwd", config.CgrConfig().StorDBPass, "The StorDB user's password.")
loadHistorySize = flag.Int("load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history")
datadb_versions = flag.Bool("datadb_versions", false, "Print DataDB versions")
stordb_versions = flag.Bool("stordb_versions", false, "Print StorDB versions")
dbDataEncoding = flag.String("dbData_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object Data in strings")
inDBDataEncoding = flag.String("in_dbData_encoding", "", "The encoding used to store object Data in strings")
dryRun = flag.Bool("dry_run", false, "When true will not save loaded Data to DataDb but just parse it for consistency and errors.")
@@ -82,61 +85,80 @@ func main() {
fmt.Println(utils.GetCGRVersion())
return
}
if migrate != nil && *migrate != "" { // Run migrator
if *verbose {
log.Print("Initializing DataDB:", *inDataDBType)
log.Print("Initializing storDB:", *inStorDBType)
if *verbose {
log.Print("Initializing DataDB:", *inDataDBType)
log.Print("Initializing storDB:", *inStorDBType)
}
var dmIN *engine.DataManager
dmIN, _ = engine.ConfigureDataStorage(*inDataDBType, *inDataDBHost, *inDataDBPort,
*inDataDBName, *inDataDBUser, *inDataDBPass, *dbDataEncoding, config.CgrConfig().CacheCfg(), *loadHistorySize)
instorDB, err := engine.ConfigureStorStorage(*inStorDBType, *inStorDBHost, *inStorDBPort, *inStorDBName, *inStorDBUser, *inStorDBPass, *inDBDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
if *outDataDBType == "" {
*outDataDBType = *inDataDBType
*outDataDBHost = *inDataDBHost
*outDataDBPort = *inDataDBPort
*outDataDBName = *inDataDBName
*outDataDBUser = *inDataDBUser
*outDataDBPass = *inDataDBPass
}
if *verbose {
log.Print("Initializing outDataDB:", *outDataDBType)
}
var dmOUT *engine.DataManager
dmOUT, _ = engine.ConfigureDataStorage(*outDataDBType, *outDataDBHost, *outDataDBPort,
*outDataDBName, *outDataDBUser, *outDataDBPass, *dbDataEncoding, config.CgrConfig().CacheCfg(), *loadHistorySize)
outDataDB, err := migrator.ConfigureV1DataStorage(*outDataDBType, *outDataDBHost, *outDataDBPort, *outDataDBName, *outDataDBUser, *outDataDBPass, *dbDataEncoding)
if err != nil {
log.Fatal(err)
}
storDB = instorDB
if *verbose {
if *outStorDBType != "" {
log.Print("Initializing outStorDB:", *outStorDBType)
} else {
log.Print("Initializing outStorDB:", *inStorDBType)
}
var dmIN *engine.DataManager
dmIN, _ = engine.ConfigureDataStorage(*inDataDBType, *inDataDBHost, *inDataDBPort, *inDataDBName, *inDataDBUser, *inDataDBPass, *dbDataEncoding, config.CgrConfig().CacheCfg(), *loadHistorySize)
instorDB, err := engine.ConfigureStorStorage(*inStorDBType, *inStorDBHost, *inStorDBPort, *inStorDBName, *inStorDBUser, *inStorDBPass, *inDBDataEncoding,
}
if *outStorDBType != "" {
storDB, err = engine.ConfigureStorStorage(*outStorDBType, *outStorDBHost, *outStorDBPort, *outStorDBName, *outStorDBUser, *outStorDBPass, *dbDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
if *outDataDBType == "" {
*outDataDBType = *inDataDBType
*outDataDBHost = *inDataDBHost
*outDataDBPort = *inDataDBPort
*outDataDBName = *inDataDBName
*outDataDBUser = *inDataDBUser
*outDataDBPass = *inDataDBPass
}
if *verbose {
log.Print("Migrating: ", *migrate)
}
if inDataDBName != outDataDBName || inStorDBName != outStorDBName {
sameDBname = false
}
m, err := migrator.NewMigrator(dmIN, dmOUT, *inDataDBType, *dbDataEncoding, storDB, *inStorDBType, outDataDB,
*outDataDBType, *inDBDataEncoding, instorDB, *outStorDBType, *dryRun, sameDBname, *datadb_versions, *stordb_versions)
if err != nil {
log.Fatal(err)
}
if *datadb_versions {
vrs, _ := dmOUT.DataDB().GetVersions(utils.TBLVersions)
if len(vrs) != 0 {
log.Print("DataDB versions :", vrs)
} else {
log.Print("DataDB versions not_found")
}
if *verbose {
log.Print("Initializing outDataDB:", *outDataDBType)
}
var dmOUT *engine.DataManager
dmOUT, _ = engine.ConfigureDataStorage(*outDataDBType, *outDataDBHost, *outDataDBPort, *outDataDBName, *outDataDBUser, *outDataDBPass, *dbDataEncoding, config.CgrConfig().CacheCfg(), *loadHistorySize)
inDataDB, err := migrator.ConfigureV1DataStorage(*outDataDBType, *outDataDBHost, *outDataDBPort, *outDataDBName, *outDataDBUser, *outDataDBPass, *dbDataEncoding)
if err != nil {
log.Fatal(err)
}
storDB = instorDB
if *verbose {
if *outStorDBType != "" {
log.Print("Initializing outStorDB:", *outStorDBType)
} else {
log.Print("Initializing outStorDB:", *inStorDBType)
}
}
if *outStorDBType != "" {
storDB, err = engine.ConfigureStorStorage(*outStorDBType, *outStorDBHost, *outStorDBPort, *outStorDBName, *outStorDBUser, *outStorDBPass, *dbDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
}
if *verbose {
log.Print("Migrating: ", *migrate)
}
if inDataDBName != outDataDBName || inStorDBName != outStorDBName {
sameDBname = false
}
m, err := migrator.NewMigrator(dmIN, dmOUT, *inDataDBType, *dbDataEncoding, storDB, *inStorDBType, inDataDB, *outDataDBType, *inDBDataEncoding, instorDB, *outStorDBType, *dryRun, sameDBname)
if err != nil {
log.Fatal(err)
}
if *stordb_versions {
vrs, _ := storDB.GetVersions(utils.TBLVersions)
if len(vrs) != 0 {
log.Print("StorDB versions :", vrs)
} else {
log.Print("StorDB versions not_found")
}
}
if migrate != nil && *migrate != "" { // Run migrator
migrstats := make(map[string]int)
mig := strings.Split(*migrate, ",")
log.Print("migrating", mig)

View File

@@ -0,0 +1,68 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package console
import (
"github.com/cgrates/cgrates/engine"
)
func init() {
c := &CmdGetDataDBVersions{
name: "datadb_versions",
rpcMethod: "ApierV1.GetDataDBVersions",
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
}
// Commander implementation
type CmdGetDataDBVersions struct {
name string
rpcMethod string
rpcParams *EmptyWrapper
*CommandExecuter
}
func (self *CmdGetDataDBVersions) Name() string {
return self.name
}
func (self *CmdGetDataDBVersions) RpcMethod() string {
return self.rpcMethod
}
func (self *CmdGetDataDBVersions) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &EmptyWrapper{}
}
return self.rpcParams
}
func (self *CmdGetDataDBVersions) PostprocessRpcParams() error {
return nil
}
func (self *CmdGetDataDBVersions) RpcResult() interface{} {
s := engine.Versions{}
return &s
}
func (self *CmdGetDataDBVersions) ClientArgs() (args []string) {
return
}

View File

@@ -0,0 +1,68 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package console
import (
"github.com/cgrates/cgrates/engine"
)
func init() {
c := &CmdGetStorDBVersions{
name: "stordb_versions",
rpcMethod: "ApierV1.GetStorDBVersions",
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
}
// Commander implementation
type CmdGetStorDBVersions struct {
name string
rpcMethod string
rpcParams *EmptyWrapper
*CommandExecuter
}
func (self *CmdGetStorDBVersions) Name() string {
return self.name
}
func (self *CmdGetStorDBVersions) RpcMethod() string {
return self.rpcMethod
}
func (self *CmdGetStorDBVersions) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &EmptyWrapper{}
}
return self.rpcParams
}
func (self *CmdGetStorDBVersions) PostprocessRpcParams() error {
return nil
}
func (self *CmdGetStorDBVersions) RpcResult() interface{} {
s := engine.Versions{}
return &s
}
func (self *CmdGetStorDBVersions) ClientArgs() (args []string) {
return
}

View File

@@ -26,7 +26,9 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewMigrator(dmIN *engine.DataManager, dmOut *engine.DataManager, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string, oldDataDB MigratorDataDB, oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string, dryRun bool, sameDBname bool) (m *Migrator, err error) {
func NewMigrator(dmIN *engine.DataManager, dmOut *engine.DataManager, dataDBType, dataDBEncoding string,
storDB engine.Storage, storDBType string, oldDataDB MigratorDataDB, oldDataDBType, oldDataDBEncoding string,
oldStorDB engine.Storage, oldStorDBType string, dryRun bool, sameDBname bool, datadb_versions bool, stordb_versions bool) (m *Migrator, err error) {
var mrshlr engine.Marshaler
var oldmrshlr engine.Marshaler
if dataDBEncoding == utils.MSGPACK {
@@ -46,26 +48,28 @@ func NewMigrator(dmIN *engine.DataManager, dmOut *engine.DataManager, dataDBType
mrshlr: mrshlr, dmIN: dmIN,
oldDataDB: oldDataDB, oldDataDBType: oldDataDBType,
oldStorDB: oldStorDB, oldStorDBType: oldStorDBType,
oldmrshlr: oldmrshlr, dryRun: dryRun, sameDBname: sameDBname, stats: stats,
oldmrshlr: oldmrshlr, dryRun: dryRun, sameDBname: sameDBname, datadb_versions: datadb_versions, stordb_versions: stordb_versions, stats: stats,
}
return m, err
}
type Migrator struct {
dmIN *engine.DataManager //oldatadb
dmOut *engine.DataManager
dataDBType string
storDB engine.Storage
storDBType string
mrshlr engine.Marshaler
oldDataDB MigratorDataDB
oldDataDBType string
oldStorDB engine.Storage
oldStorDBType string
oldmrshlr engine.Marshaler
dryRun bool
sameDBname bool
stats map[string]int
dmIN *engine.DataManager //oldatadb
dmOut *engine.DataManager
dataDBType string
storDB engine.Storage
storDBType string
mrshlr engine.Marshaler
oldDataDB MigratorDataDB
oldDataDBType string
oldStorDB engine.Storage
oldStorDBType string
oldmrshlr engine.Marshaler
dryRun bool
sameDBname bool
datadb_versions bool
stordb_versions bool
stats map[string]int
}
// Migrate implements the tasks to migrate, used as a dispatcher to the individual methods
@@ -93,6 +97,20 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) {
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error())), nil
}
if m.datadb_versions {
vrs, err := m.dmOut.DataDB().GetVersions(utils.TBLVersions)
if err != nil {
return err, nil
}
log.Print("After migrate, DataDB versions :", vrs)
}
if m.stordb_versions {
vrs, err := m.storDB.GetVersions(utils.TBLVersions)
if err != nil {
return err, nil
}
log.Print("After migrate, StorDB versions :", vrs)
}
} else {
log.Print("Cannot dryRun SetVersions!")
}