Added dryRun , stats and multiple variable migration

This commit is contained in:
edwardro22
2017-09-17 23:44:12 +00:00
parent 1e53d05a0a
commit a22f1b797f
10 changed files with 204 additions and 118 deletions

View File

@@ -22,6 +22,7 @@ import (
"flag"
"fmt"
"log"
"strings"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -35,7 +36,7 @@ var (
oStorDBType string
odataDBType string
oDBDataEncoding string
migrate = flag.String("migrate", "", "Fire up automatic migration <*set_versions|*cost_details|*accounts|*actions|*action_triggers|*action_plans|*shared_groups>")
migrate = flag.String("migrate", "", "Fire up automatic migration *to use multiple values use ',' as separator \n <*set_versions|*cost_details|*accounts|*actions|*action_triggers|*action_plans|*shared_groups> ")
version = flag.Bool("version", false, "Prints the application version.")
dataDBType = flag.String("datadb_type", config.CgrConfig().DataDbType, "The type of the DataDb database <redis>")
@@ -71,12 +72,9 @@ var (
dbDataEncoding = flag.String("dbdata_encoding", config.CgrConfig().DBDataEncoding, "The encoding used to store object data in strings")
oldDBDataEncoding = flag.String("old_dbdata_encoding", "", "The encoding used to store object data in strings")
//TO DO:
//dryRun = flag.Bool("dry_run", false, "When true will not save loaded data to dataDb but just parse it for consistency and errors.")
//verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output")
//slice mapstring int cate acc [0]am citit si [1]cate acc am scris
//stats = flag.Bool("stats", false, "Generates statsistics about given data.")
dryRun = flag.Bool("dry_run", false, "When true will not save loaded data to dataDb but just parse it for consistency and errors.")
verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output")
stats = flag.Bool("stats", false, "Generates statsistics about given data.")
)
func main() {
@@ -86,6 +84,10 @@ func main() {
return
}
if migrate != nil && *migrate != "" { // Run migrator
if *verbose {
log.Print("Initializing dataDB:", *dataDBType)
log.Print("Initializing storDB:", *storDBType)
}
dataDB, err := engine.ConfigureDataStorage(*dataDBType, *dataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding, config.CgrConfig().CacheConfig, *loadHistorySize)
if err != nil {
log.Fatal(err)
@@ -95,7 +97,6 @@ func main() {
if err != nil {
log.Fatal(err)
}
if *oldDataDBType == "" {
*oldDataDBType = *dataDBType
*oldDataDBHost = *dataDBHost
@@ -103,13 +104,23 @@ func main() {
*oldDataDBName = *dataDBName
*oldDataDBUser = *dataDBUser
*oldDataDBPass = *dataDBPass
}
if *verbose {
log.Print("Initializing oldDataDB:", *oldDataDBType)
}
oldDataDB, err := migrator.ConfigureV1DataStorage(*oldDataDBType, *oldDataDBHost, *dataDBPort, *dataDBName, *dataDBUser, *dataDBPass, *dbDataEncoding)
if err != nil {
log.Fatal(err)
}
oldstorDB = storDB
if *verbose {
if *oldStorDBType != "" {
log.Print("Initializing oldstorDB:", *oldStorDBType)
} else {
log.Print("Initializing oldstorDB:", *storDBType)
}
}
if *oldStorDBType != "" {
oldstorDB, err = engine.ConfigureStorStorage(oStorDBType, *oldStorDBHost, *oldStorDBPort, *oldStorDBName, *oldStorDBUser, *oldStorDBPass, *oldDBDataEncoding,
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
@@ -117,16 +128,30 @@ func main() {
log.Fatal(err)
}
}
m, err := migrator.NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType, oldDataDB, *oldDataDBType, *oldDBDataEncoding, oldstorDB, *oldStorDBType)
if *verbose {
log.Print("Migrating: ", *migrate)
}
m, err := migrator.NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType, oldDataDB, *oldDataDBType, *oldDBDataEncoding, oldstorDB, *oldStorDBType, *dryRun)
if err != nil {
log.Fatal(err)
}
err = m.Migrate(*migrate)
migrstats := make(map[string]int)
mig := strings.Split(*migrate, ",")
log.Print("migrating", mig)
err, migrstats = m.Migrate(mig)
if err != nil {
log.Fatal(err)
}
if *stats != false {
for k, v := range migrstats {
log.Print(" ", k, " : ", v)
}
}
if *verbose {
log.Print("Done migrating!")
}
log.Print("Done migrating!")
return
}
}

View File

@@ -121,23 +121,5 @@ echo 'Setting version for CostDetails'
;;
esac
echo 'Executing command cgr-migrator -migrate="*cost_details"'
cgr-migrator -datadb_host=$cgr_from_host -datadb_name=$cgr_to_db -datadb_passwd=$cgr_from_pass -datadb_port=$cgr_from_port -datadb_type=$datadb -stordb_host=$host -stordb_name=$user -stordb_passwd=$PGPASSWORD -stordb_port=$port -stordb_type=$stordb -stordb_user=$user -migrate="*cost_details"
echo 'Executing command cgr-migrator -migrate="*accounts"'
cgr-migrator -datadb_host=$cgr_from_host -datadb_name=$cgr_to_db -datadb_passwd=$cgr_from_pass -datadb_port=$cgr_from_port -datadb_type=$datadb -stordb_host=$host -stordb_name=$user -stordb_passwd=$PGPASSWORD -stordb_port=$port -stordb_type=$stordb -stordb_user=$user -migrate="*accounts"
echo 'Executing command cgr-migrator -migrate="*actions"'
cgr-migrator -datadb_host=$cgr_from_host -datadb_name=$cgr_to_db -datadb_passwd=$cgr_from_pass -datadb_port=$cgr_from_port -datadb_type=$datadb -stordb_host=$host -stordb_name=$user -stordb_passwd=$PGPASSWORD -stordb_port=$port -stordb_type=$stordb -stordb_user=$user -migrate="*actions"
echo 'Executing command cgr-migrator -migrate="*action_triggers"'
cgr-migrator -datadb_host=$cgr_from_host -datadb_name=$cgr_to_db -datadb_passwd=$cgr_from_pass -datadb_port=$cgr_from_port -datadb_type=$datadb -stordb_host=$host -stordb_name=$user -stordb_passwd=$PGPASSWORD -stordb_port=$port -stordb_type=$stordb -stordb_user=$user -migrate="*action_triggers"
echo 'Executing command cgr-migrator -migrate="*action_plans"'
cgr-migrator -datadb_host=$cgr_from_host -datadb_name=$cgr_to_db -datadb_passwd=$cgr_from_pass -datadb_port=$cgr_from_port -datadb_type=$datadb -stordb_host=$host -stordb_name=$user -stordb_passwd=$PGPASSWORD -stordb_port=$port -stordb_type=$stordb -stordb_user=$user -migrate="*action_plans"
echo 'Executing command cgr-migrator -migrate="*shared_groups"'
cgr-migrator -datadb_host=$cgr_from_host -datadb_name=$cgr_to_db -datadb_passwd=$cgr_from_pass -datadb_port=$cgr_from_port -datadb_type=$datadb -stordb_host=$host -stordb_name=$user -stordb_passwd=$PGPASSWORD -stordb_port=$port -stordb_type=$stordb -stordb_user=$user -migrate="*shared_groups"
echo 'Executing command cgr-migrator -migrate="*set_versions"'
cgr-migrator -datadb_host=$cgr_from_host -datadb_name=$cgr_to_db -datadb_passwd=$cgr_from_pass -datadb_port=$cgr_from_port -datadb_type=$datadb -stordb_host=$host -stordb_name=$user -stordb_passwd=$PGPASSWORD -stordb_port=$port -stordb_type=$stordb -stordb_user=$user -migrate="*set_versions"
echo 'Executing command cgr-migrator -migrate="*cost_details,*accounts,*actions,*action_triggers,*action_plans,*shared_groups,*set_versions"'
cgr-migrator -datadb_host=$cgr_from_host -datadb_name=$cgr_to_db -datadb_passwd=$cgr_from_pass -datadb_port=$cgr_from_port -datadb_type=$datadb -stordb_host=$host -stordb_name=$user -stordb_passwd=$PGPASSWORD -stordb_port=$port -stordb_type=$stordb -stordb_user=$user -verbose=true -stats=true -migrate="*cost_details,*accounts,*actions,*action_triggers,*action_plans,*shared_groups,*set_versions"

View File

@@ -509,8 +509,8 @@ Command line migration tool.
cgrates@OCS:~$ cgr-migrator --help
Usage of cgr-migrator:
-datadb_host string
The DataDb host to connect to. (default "127.0.0.1")
-datadb_host string
The DataDb host to connect to. (default "192.168.100.40")
-datadb_name string
The name/number of the DataDb to connect to. (default "10")
-datadb_passwd string
@@ -523,12 +523,15 @@ Command line migration tool.
The DataDb user to sign in as. (default "cgrates")
-dbdata_encoding string
The encoding used to store object data in strings (default "msgpack")
-dry_run
When true will not save loaded data to dataDb but just parse it for consistency and errors.(default "false")
-load_history_size int
Limit the number of records in the load history (default 10)
-migrate string
Fire up automatic migration <*set_versions|*cost_details|*accounts|*actions|*action_triggers|*action_plans|*shared_groups>
Fire up automatic migration *to use multiple values use ',' as separator
<*set_versions|*cost_details|*accounts|*actions|*action_triggers|*action_plans|*shared_groups>
-old_datadb_host string
The DataDb host to connect to. (default "127.0.0.1")
The DataDb host to connect to. (default "192.168.100.40")
-old_datadb_name string
The name/number of the DataDb to connect to. (default "10")
-old_datadb_passwd string
@@ -544,7 +547,7 @@ Command line migration tool.
-old_load_history_size int
Limit the number of records in the load history
-old_stordb_host string
The storDb host to connect to. (default "127.0.0.1")
The storDb host to connect to. (default "192.168.100.40")
-old_stordb_name string
The name/number of the storDb to connect to. (default "cgrates")
-old_stordb_passwd string
@@ -555,8 +558,10 @@ Command line migration tool.
The type of the storDb database <mysql|postgres>
-old_stordb_user string
The storDb user to sign in as. (default "cgrates")
-stats
Generates statsistics about given data.(default "false")
-stordb_host string
The storDb host to connect to. (default "127.0.0.1")
The storDb host to connect to. (default "192.168.100.40")
-stordb_name string
The name/number of the storDb to connect to. (default "cgrates")
-stordb_passwd string
@@ -567,5 +572,7 @@ Command line migration tool.
The type of the storDb database <mysql|postgres> (default "mysql")
-stordb_user string
The storDb user to sign in as. (default "cgrates")
-verbose
Enable detailed verbose logging output.(default "false")
-version
Prints the application version.

View File

@@ -45,18 +45,24 @@ func (m *Migrator) migrateAccounts() (err error) {
}
if v1Acnt != nil {
acnt := v1Acnt.AsAccount()
if err = m.dataDB.SetAccount(acnt); err != nil {
return err
if m.dryRun != true {
if err = m.dataDB.SetAccount(acnt); err != nil {
return err
}
m.stats[utils.Accounts] += 1
}
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
}
}
return
}

View File

@@ -55,19 +55,23 @@ func (m *Migrator) migrateActions() (err error) {
acts = append(acts, act)
}
if err := m.dataDB.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
return err
if m.dryRun != true {
if err := m.dataDB.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.Actions] += 1
}
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.Actions: engine.CurrentStorDBVersions()[utils.Actions]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Actions version into dataDB", err.Error()))
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.Actions: engine.CurrentStorDBVersions()[utils.Actions]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating Actions version into dataDB", err.Error()))
}
}
return
}

View File

@@ -60,19 +60,24 @@ func (m *Migrator) migrateActionPlans() (err error) {
if *v1APs != nil {
for _, v1ap := range *v1APs {
ap := v1ap.AsActionPlan()
if err = m.dataDB.SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil {
return err
if m.dryRun != true {
if err = m.dataDB.SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.ActionPlans] += 1
}
}
}
}
// All done, update version wtih current one
vrs := engine.Versions{utils.ActionPlans: engine.CurrentDataDBVersions()[utils.ActionPlans]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating ActionPlans version into dataDB", err.Error()))
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.ActionPlans: engine.CurrentDataDBVersions()[utils.ActionPlans]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating ActionPlans version into dataDB", err.Error()))
}
}
return
}

View File

@@ -28,8 +28,8 @@ import (
)
type v1ActionTrigger struct {
Id string // for visual identification
ThresholdType string //*min_counter, *max_counter, *min_balance, *max_balance
Id string // for visual identification
ThresholdType string //*min_counter, *max_counter, *min_balance, *max_balance
ThresholdValue float64
Recurrent bool // reset eexcuted flag each run
MinSleep time.Duration // Minimum duration between two executions in case of recurrent triggers
@@ -69,10 +69,44 @@ func (m *Migrator) migrateActionTriggers() (err error) {
acts = append(acts, act)
}
if err := m.dataDB.SetActionTriggers(acts[0].ID, acts, utils.NonTransactional); err != nil {
return err
if m.dryRun != true {
if err := m.dataDB.SetActionTriggers(acts[0].ID, acts, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.ActionTriggers] += 1
}
}
}
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.ActionTriggers: engine.CurrentDataDBVersions()[utils.ActionTriggers]}
if err = m.dataDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating ActionTriggers version into DataDB", err.Error()))
}
}
return
}
func (m *Migrator) dryRunActionTriggers() (err error) {
var v1ACTs *v1ActionTriggers
var acts engine.ActionTriggers
for {
v1ACTs, err = m.oldDataDB.getV1ActionTriggers()
if err != nil && err != utils.ErrNoMoreData {
return err
}
if err == utils.ErrNoMoreData {
break
}
if *v1ACTs != nil {
for _, v1ac := range *v1ACTs {
act := v1ac.AsActionTrigger()
acts = append(acts, act)
}
}
}
// All done, update version wtih current one

View File

@@ -102,19 +102,22 @@ func (m *Migrator) migrateCostDetails() (err error) {
fmt.Sprintf("<Migrator> Error: <%s> when converting into CallCost CDR with id: <%d>", err.Error(), id))
continue
}
if _, err := storSQL.Exec(fmt.Sprintf("UPDATE cdrs SET cost_details='%s' WHERE id=%d", cc.AsJSON(), id)); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<Migrator> Error: <%s> updating CDR with id <%d> into StorDB", err.Error(), id))
continue
if m.dryRun != true {
if _, err := storSQL.Exec(fmt.Sprintf("UPDATE cdrs SET cost_details='%s' WHERE id=%d", cc.AsJSON(), id)); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<Migrator> Error: <%s> updating CDR with id <%d> into StorDB", err.Error(), id))
continue
}
}
m.stats[utils.COST_DETAILS] += 1
// All done, update version wtih current one
vrs = engine.Versions{utils.COST_DETAILS: engine.CurrentStorDBVersions()[utils.COST_DETAILS]}
if err := m.storDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error()))
}
}
// All done, update version wtih current one
vrs = engine.Versions{utils.COST_DETAILS: engine.CurrentStorDBVersions()[utils.COST_DETAILS]}
if err := m.storDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error()))
}
return
}

View File

@@ -20,11 +20,13 @@ package migrator
import (
"fmt"
"log"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string, oldDataDB V1DataDB, oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string) (m *Migrator, err error) {
func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string, oldDataDB V1DataDB, oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string, dryRun bool) (m *Migrator, err error) {
var mrshlr engine.Marshaler
var oldmrshlr engine.Marshaler
if dataDBEncoding == utils.MSGPACK {
@@ -36,11 +38,14 @@ func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB
} else if oldDataDBEncoding == utils.JSON {
oldmrshlr = new(engine.JSONMarshaler)
}
stats := make(map[string]int)
m = &Migrator{
dataDB: dataDB, dataDBType: dataDBType,
storDB: storDB, storDBType: storDBType, mrshlr: mrshlr,
oldDataDB: oldDataDB, oldDataDBType: oldDataDBType,
oldStorDB: oldStorDB, oldStorDBType: oldStorDBType, oldmrshlr: oldmrshlr,
oldStorDB: oldStorDB, oldStorDBType: oldStorDBType,
oldmrshlr: oldmrshlr, dryRun: dryRun, stats: stats,
}
return m, err
}
@@ -56,42 +61,54 @@ type Migrator struct {
oldStorDB engine.Storage
oldStorDBType string
oldmrshlr engine.Marshaler
dryRun bool
stats map[string]int
}
// Migrate implements the tasks to migrate, used as a dispatcher to the individual methods
func (m *Migrator) Migrate(taskID string) (err error) {
switch taskID {
default: // unsupported taskID
err = utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UnsupportedMigrationTask,
fmt.Sprintf("task <%s> is not a supported migration task", taskID))
case utils.MetaSetVersions:
if err := m.storDB.SetVersions(engine.CurrentDBVersions(m.storDBType), true); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error()))
func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) {
stats = make(map[string]int)
for _, taskID := range taskIDs {
log.Print("migrating", taskID)
switch taskID {
default: // unsupported taskID
err = utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UnsupportedMigrationTask,
fmt.Sprintf("task <%s> is not a supported migration task", taskID))
case utils.MetaSetVersions:
if m.dryRun != true {
if err := m.storDB.SetVersions(engine.CurrentDBVersions(m.storDBType), true); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error())), nil
}
if err := m.dataDB.SetVersions(engine.CurrentDBVersions(m.dataDBType), true); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error())), nil
}
} else {
log.Print("Cannot dryRun SetVersions!")
}
case utils.MetaCostDetails:
err = m.migrateCostDetails()
case utils.MetaAccounts:
err = m.migrateAccounts()
case utils.MetaActionPlans:
err = m.migrateActionPlans()
case utils.MetaActionTriggers:
err = m.migrateActionTriggers()
case utils.MetaActions:
err = m.migrateActions()
case utils.MetaSharedGroups:
err = m.migrateSharedGroups()
}
if err := m.dataDB.SetVersions(engine.CurrentDBVersions(m.dataDBType), true); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error()))
}
case utils.MetaCostDetails:
err = m.migrateCostDetails()
case utils.MetaAccounts:
err = m.migrateAccounts()
case utils.MetaActionPlans:
err = m.migrateActionPlans()
case utils.MetaActionTriggers:
err = m.migrateActionTriggers()
case utils.MetaActions:
err = m.migrateActions()
case utils.MetaSharedGroups:
err = m.migrateSharedGroups()
}
for k, v := range m.stats {
stats[k] = v
}
return
}

View File

@@ -43,8 +43,11 @@ func (m *Migrator) migrateSharedGroups() (err error) {
}
if v1SG != nil {
acnt := v1SG.AsSharedGroup()
if err = m.dataDB.SetSharedGroup(acnt, utils.NonTransactional); err != nil {
return err
if m.dryRun != true {
if err = m.dataDB.SetSharedGroup(acnt, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.SharedGroups] += 1
}
}
}