added support for account migration

This commit is contained in:
edwardro22
2017-08-17 17:45:22 +03:00
parent 1acb51769a
commit 11eed08cb1
5 changed files with 125 additions and 151 deletions

View File

@@ -22,38 +22,31 @@ import (
"fmt"
"log"
"path"
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/migrator"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
var (
//separator = flag.String("separator", ",", "Default field separator")
cgrConfig, _ = config.NewDefaultCGRConfig()
migrateRC8 = flag.String("migrate_rc8", "", "Migrate Accounts, Actions, ActionTriggers, DerivedChargers, ActionPlans and SharedGroups to RC8 structures, possible values: *all,*enforce,acc,atr,act,dcs,apl,shg")
migrate = flag.String("migrate", "", "Fire up automatic migration <*cost_details|*set_versions>")
datadb_type = flag.String("datadb_type", cgrConfig.DataDbType, "The type of the DataDb database <redis>")
datadb_host = flag.String("datadb_host", cgrConfig.DataDbHost, "The DataDb host to connect to.")
datadb_port = flag.String("datadb_port", cgrConfig.DataDbPort, "The DataDb port to bind to.")
datadb_name = flag.String("datadb_name", cgrConfig.DataDbName, "The name/number of the DataDb to connect to.")
datadb_user = flag.String("datadb_user", cgrConfig.DataDbUser, "The DataDb user to sign in as.")
datadb_pass = flag.String("datadb_passwd", cgrConfig.DataDbPass, "The DataDb user's password.")
datadb_type = flag.String("datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database <redis>")
datadb_host = flag.String("datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.")
datadb_port = flag.String("datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.")
datadb_name = flag.String("datadb_name", config.CgrConfig().DataDbName, "The name/number of the DataDb to connect to.")
datadb_user = flag.String("datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.")
datadb_pass = flag.String("datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.")
stor_db_type = flag.String("stordb_type", cgrConfig.StorDBType, "The type of the storDb database <mysql>")
stor_db_host = flag.String("stordb_host", cgrConfig.StorDBHost, "The storDb host to connect to.")
stor_db_port = flag.String("stordb_port", cgrConfig.StorDBPort, "The storDb port to bind to.")
stor_db_name = flag.String("stordb_name", cgrConfig.StorDBName, "The name/number of the storDb to connect to.")
stor_db_user = flag.String("stordb_user", cgrConfig.StorDBUser, "The storDb user to sign in as.")
stor_db_pass = flag.String("stordb_passwd", cgrConfig.StorDBPass, "The storDb user's password.")
stor_db_type = flag.String("stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database <mysql>")
stor_db_host = flag.String("stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.")
stor_db_port = flag.String("stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.")
stor_db_name = flag.String("stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.")
stor_db_user = flag.String("stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.")
stor_db_pass = flag.String("stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.")
dbdata_encoding = flag.String("dbdata_encoding", cgrConfig.DBDataEncoding, "The encoding used to store object data in strings")
dbdata_encoding = flag.String("dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
flush = flag.Bool("flushdb", false, "Flush the database before importing")
tpid = flag.String("tpid", "", "The tariff plan id from the database")
@@ -66,13 +59,13 @@ var (
fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb")
toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb")
rpcEncoding = flag.String("rpc_encoding", "json", "RPC encoding used <gob|json>")
historyServer = flag.String("historys", cgrConfig.RPCJSONListen, "The history server address:port, empty to disable automatic history archiving")
ralsAddress = flag.String("rals", cgrConfig.RPCJSONListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads")
cdrstatsAddress = flag.String("cdrstats", cgrConfig.RPCJSONListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads")
usersAddress = flag.String("users", cgrConfig.RPCJSONListen, "Users service to contact for data reloads, empty to disable automatic data reloads")
historyServer = flag.String("historys", config.CgrConfig().RPCJSONListen, "The history server address:port, empty to disable automatic history archiving")
ralsAddress = flag.String("rals", config.CgrConfig().RPCJSONListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads")
cdrstatsAddress = flag.String("cdrstats", config.CgrConfig().RPCJSONListen, "CDRStats service to contact for data reloads, empty to disable automatic data reloads")
usersAddress = flag.String("users", config.CgrConfig().RPCJSONListen, "Users service to contact for data reloads, empty to disable automatic data reloads")
runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields")
loadHistorySize = flag.Int("load_history_size", cgrConfig.LoadHistorySize, "Limit the number of records in the load history")
timezone = flag.String("timezone", cgrConfig.DefaultTimezone, `Timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>`)
loadHistorySize = flag.Int("load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history")
timezone = flag.String("timezone", config.CgrConfig().DefaultTimezone, `Timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>`)
disable_reverse = flag.Bool("disable_reverse_mappings", false, "Will disable reverse mappings rebuilding")
)
@@ -87,128 +80,18 @@ func main() {
var storDb engine.LoadStorage
var rater, cdrstats, users rpcclient.RpcClientConnection
var loader engine.LoadReader
if *migrateRC8 != "" {
if *datadb_type == "redis" {
var db_nb int
db_nb, err = strconv.Atoi(*datadb_name)
if err != nil {
log.Print("Redis db name must be an integer!")
return
}
host := *datadb_host
if *datadb_port != "" {
host += ":" + *datadb_port
}
migratorRC8dat, err := NewMigratorRC8(host, db_nb, *datadb_pass, *dbdata_encoding)
if err != nil {
log.Print(err.Error())
return
}
if strings.Contains(*migrateRC8, "acc") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8dat.migrateAccounts(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "atr") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8dat.migrateActionTriggers(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "act") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8dat.migrateActions(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "dcs") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8dat.migrateDerivedChargers(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "apl") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8dat.migrateActionPlans(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "shg") || strings.Contains(*migrateRC8, "*all") {
if err := migratorRC8dat.migrateSharedGroups(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "int") {
if err := migratorRC8dat.migrateAccountsInt(); err != nil {
log.Print(err.Error())
}
if err := migratorRC8dat.migrateActionTriggersInt(); err != nil {
log.Print(err.Error())
}
if err := migratorRC8dat.migrateActionsInt(); err != nil {
log.Print(err.Error())
}
}
if strings.Contains(*migrateRC8, "vf") {
if err := migratorRC8dat.migrateActionsInt2(); err != nil {
log.Print(err.Error())
}
if err := migratorRC8dat.writeVersion(); err != nil {
log.Print(err.Error())
}
}
if *migrateRC8 == "*enforce" { // Ignore previous data, enforce to latest version information
if err := migratorRC8dat.writeVersion(); err != nil {
log.Print(err.Error())
}
}
} else if *datadb_type == "mongo" {
mongoMigratorDat, err := NewMongoMigrator(*datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass)
if err != nil {
log.Print(err.Error())
return
}
if strings.Contains(*migrateRC8, "vf") {
if err := mongoMigratorDat.migrateActions(); err != nil {
log.Print(err.Error())
}
if err := mongoMigratorDat.writeVersion(); err != nil {
log.Print(err.Error())
}
}
if *migrateRC8 == "*enforce" {
if err := mongoMigratorDat.writeVersion(); err != nil {
log.Print(err.Error())
}
}
}
log.Print("Done!")
return
}
if migrate != nil && *migrate != "" { // Run migrator
dataDB, err := engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize)
if err != nil {
log.Fatal(err)
}
storDB, err := engine.ConfigureStorStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding,
cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBConnMaxLifetime, cgrConfig.StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
if err := migrator.NewMigrator(dataDB, *datadb_type, *dbdata_encoding, storDB, *stor_db_type).Migrate(*migrate); err != nil {
log.Fatal(err)
}
log.Print("Done migrating!")
return
}
// Init necessary db connections, only if not already
if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb
if *fromStorDb {
dataDB, errDataDB = engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize)
dataDB, errDataDB = engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, config.CgrConfig().CacheConfig, *loadHistorySize)
storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding,
cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBConnMaxLifetime, cgrConfig.StorDBCDRSIndexes)
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
} else if *toStorDb { // Import from csv files to storDb
storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding,
cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBConnMaxLifetime, cgrConfig.StorDBCDRSIndexes)
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
} else { // Default load from csv files to dataDb
dataDB, errDataDB = engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize)
dataDB, errDataDB = engine.ConfigureDataStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, config.CgrConfig().CacheConfig, *loadHistorySize)
}
// Defer databases opened to be closed when we are done
for _, db := range []engine.Storage{dataDB, storDb} {

View File

@@ -20,12 +20,57 @@ package main
import (
"flag"
"fmt"
"log"
"github.com/cgrates/cgrates/migrator"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
migrate = flag.String("migrate", "", "Fire up automatic migration <*cost_details|*set_versions>")
version = flag.Bool("version", false, "Prints the application version.")
dataDBType = flag.String("datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database <redis>")
dataDBHost = flag.String("datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.")
dataDBPort = flag.String("datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.")
dataDBName = flag.String("datadb_name", config.CgrConfig().DataDbName, "The name/number of the DataDb to connect to.")
dataDBUser = flag.String("datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.")
dataDBPass = flag.String("datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.")
storDBType = flag.String("stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database <mysql>")
storDBHost = flag.String("stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.")
storDBPort = flag.String("stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.")
storDBName = flag.String("stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.")
storDBUser = flag.String("stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.")
storDBPass = flag.String("stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.")
oldDataDBType = flag.String("old_datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database <redis>")
oldDataDBHost = flag.String("old_datadb_host", config.CgrConfig().DataDbHost, "The DataDb host to connect to.")
oldDataDBPort = flag.String("old_datadb_port", config.CgrConfig().DataDbPort, "The DataDb port to bind to.")
oldDataDBName = flag.String("old_datadb_name", "11", "The name/number of the DataDb to connect to.")
oldDataDBUser = flag.String("old_datadb_user", config.CgrConfig().DataDbUser, "The DataDb user to sign in as.")
oldDataDBPass = flag.String("old_datadb_passwd", config.CgrConfig().DataDbPass, "The DataDb user's password.")
oldStorDBType = flag.String("old_stordb_type", config.CgrConfig().StorDBType, "The type of the storDb database <mysql>")
oldStorDBHost = flag.String("old_stordb_host", config.CgrConfig().StorDBHost, "The storDb host to connect to.")
oldStorDBPort = flag.String("old_stordb_port", config.CgrConfig().StorDBPort, "The storDb port to bind to.")
oldStorDBName = flag.String("old_stordb_name", config.CgrConfig().StorDBName, "The name/number of the storDb to connect to.")
oldStorDBUser = flag.String("old_stordb_user", config.CgrConfig().StorDBUser, "The storDb user to sign in as.")
oldStorDBPass = flag.String("old_stordb_passwd", config.CgrConfig().StorDBPass, "The storDb user's password.")
loadHistorySize = flag.Int("load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history")
oldLoadHistorySize = flag.Int("old_load_history_size", config.CgrConfig().LoadHistorySize, "Limit the number of records in the load history")
dbDataEncoding = flag.String("dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
oldDBDataEncoding = flag.String("old_dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
//nu salvez doar citesc din oldDb
//dryRun = flag.Bool("dry_run", false, "When true will not save loaded data to dataDb but just parse it for consistency and errors.")
//verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output")
//slice mapstring int cate acc [0]am citit si [1]cate acc am scris
//stats = flag.Bool("stats", false, "Generates statsistics about given data.")
)
func main() {
@@ -34,4 +79,33 @@ func main() {
fmt.Println(utils.GetCGRVersion())
return
}
}
if migrate != nil && *migrate != "" { // Run migrator
dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize)
if err != nil {
log.Fatal(err)
}
oldDataDB, err := engine.ConfigureDataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding, config.CgrConfig().CacheConfig, *oldLoadHistorySize)
if err != nil {
log.Fatal(err)
}
storDB, err := engine.ConfigureStorStorage(*storDBType, *storDBHost, *storDBPort, *storDBName, *storDBUser, *storDBPass, *dbDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
oldstorDB, err := engine.ConfigureStorStorage(*oldStorDBType, *oldStorDBHost, *oldStorDBPort, *oldStorDBName, *oldStorDBUser, *oldStorDBPass, *oldDBDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
m,err := migrator.NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType,oldDataDB,*oldDataDBType,*oldDBDataEncoding,oldstorDB,*oldStorDBType)
if err != nil {
log.Fatal(err)
}
if err:=m.Migrate(*migrate); err != nil {
log.Fatal(err)
}
log.Print("Done migrating!")
return
}
}