diff --git a/cmd/cgr-migrator/cgr-migrator.go b/cmd/cgr-migrator/cgr-migrator.go index 16e61765d..d6efc57a8 100755 --- a/cmd/cgr-migrator/cgr-migrator.go +++ b/cmd/cgr-migrator/cgr-migrator.go @@ -85,31 +85,24 @@ if migrate != nil && *migrate != "" { // Run migrator if err != nil { log.Fatal(err) } - log.Print("#1 The redis db loaded") oldDataDB, err := engine.ConfigureDataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding, config.CgrConfig().CacheConfig, *oldLoadHistorySize) if err != nil { log.Fatal(err) } - log.Print("#2 Old redis db loaded") storDB, err := engine.ConfigureStorStorage(*storDBType, *storDBHost, *storDBPort, *storDBName, *storDBUser, *storDBPass, *dbDataEncoding, config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) if err != nil { log.Fatal(err) } - log.Print("#3 Old mysql db loaded") - oldstorDB, err := engine.ConfigureStorStorage(*oldStorDBType, *oldStorDBHost, *oldStorDBPort, *oldStorDBName, *oldStorDBUser, *oldStorDBPass, *oldDBDataEncoding, config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes) if err != nil { log.Fatal(err) } - log.Print("#4 Old mysql db loaded") - m,err := migrator.NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType,oldDataDB,*oldDataDBType,*oldDBDataEncoding,oldstorDB,*oldStorDBType) if err != nil { log.Fatal(err) } - log.Print("#5 Migrator started") err = m.Migrate(*migrate); if err != nil { log.Fatal(err) diff --git a/data/storage/mysql/mysql_cdr_migration.sql b/data/storage/mysql/mysql_cdr_migration.sql index 833bad2f1..8a3c39a5a 100644 --- a/data/storage/mysql/mysql_cdr_migration.sql +++ b/data/storage/mysql/mysql_cdr_migration.sql @@ -9,69 +9,49 @@ You can increase or lower the value of step in the line after BEGIN below. You have to use 'CALL cgrates.migration();' to execute the script. If named other then default use that database name. */ + DELIMITER // CREATE PROCEDURE `migration`() BEGIN - /* DECLARE variables */ - DECLARE max_cdrs bigint; - DECLARE start_id bigint; - DECLARE end_id bigint; - DECLARE step bigint; - /* Optimize table for performance */ - ALTER TABLE cdrs DISABLE KEYS; - SET autocommit=0; - SET unique_checks=0; - SET foreign_key_checks=0; - /* You must change the step var to commit every step rows inserted */ - SET step := 10000; - SET start_id := 0; - SET end_id := start_id + step; - SET max_cdrs = (select max(id) from rated_cdrs); - WHILE (start_id <= max_cdrs) DO - INSERT INTO - cdrs(cgrid,run_id,origin_host,source,origin_id,tor,request_type,direction,tenant,category,account,subject,destination,setup_time,answer_time,`usage`,supplier,extra_fields,cost,extra_info, created_at, updated_at, deleted_at) - SELECT - cdrs_primary.cgrid, - rated_cdrs.runid as run_id, - cdrs_primary.cdrhost as origin_host, - cdrs_primary.cdrsource as source, - cdrs_primary.accid as origin_id, - cdrs_primary.tor, - rated_cdrs.reqtype as request_type, - rated_cdrs.direction, - rated_cdrs.tenant,rated_cdrs.category, - rated_cdrs.account, - rated_cdrs.subject, - rated_cdrs.destination, - rated_cdrs.setup_time, - rated_cdrs.answer_time, - rated_cdrs.`usage`, - rated_cdrs.supplier, - cdrs_extra.extra_fields, - rated_cdrs.cost, - rated_cdrs.extra_info, - rated_cdrs.created_at, - rated_cdrs.updated_at, - rated_cdrs.deleted_at - FROM rated_cdrs - INNER JOIN cdrs_primary ON rated_cdrs.cgrid = cdrs_primary.cgrid - LEFT JOIN cdrs_extra ON rated_cdrs.cgrid = cdrs_extra.cgrid - INNER JOIN cost_details ON rated_cdrs.cgrid = cost_details.cgrid - WHERE cdrs_primary.`usage` > '0' - AND not exists (select 1 from cdrs c where c.cgrid = cdrs_primary.cgrid) - AND rated_cdrs.id >= start_id - AND rated_cdrs.id < end_id - GROUP BY cgrid, run_id, origin_id; - SET start_id = start_id + step; - SET end_id = end_id + step; - END WHILE; - /* SET Table for live usage */ - SET autocommit=1; - SET unique_checks=1; - SET foreign_key_checks=1; - ALTER TABLE cdrs ENABLE KEYS; - OPTIMIZE TABLE cdrs; + /* DECLARE variables */ + DECLARE max_cdrs bigint; + DECLARE start_id bigint; + DECLARE end_id bigint; + DECLARE step bigint; + /* Optimize table for performance */ + ALTER TABLE cdrs DISABLE KEYS; + SET autocommit=0; + SET unique_checks=0; + SET foreign_key_checks=0; + /* You must change the step var to commit every step rows inserted */ + SET step := 10000; + SET start_id := 0; + SET end_id := start_id + step; + SET max_cdrs = (select max(id) from rated_cdrs); + WHILE (start_id <= max_cdrs) DO + INSERT INTO + cdrs(cgrid,run_id,origin_host,source,origin_id,tor,request_type,direction,tenant,category,account,subject,destination,setup_time,pdd,answer_time,`usage`,supplier,disconnect_cause,extra_fields,cost_source,cost,cost_details,extra_info, created_at, updated_at, deleted_at) + SELECT cdrs_primary.cgrid,rated_cdrs.runid as run_id,cdrs_primary.cdrhost as origin_host,cdrs_primary.cdrsource as source,cdrs_primary.accid as origin_id, cdrs_primary.tor,rated_cdrs.reqtype as request_type,rated_cdrs.direction, rated_cdrs.tenant,rated_cdrs.category, rated_cdrs.account, rated_cdrs.subject, rated_cdrs.destination,rated_cdrs.setup_time,rated_cdrs.pdd,rated_cdrs.answer_time,rated_cdrs.`usage`,rated_cdrs.supplier,rated_cdrs.disconnect_cause,cdrs_extra.extra_fields,cost_details.cost_source,rated_cdrs.cost,cost_details.timespans as cost_details,rated_cdrs.extra_info,rated_cdrs.created_at,rated_cdrs.updated_at, rated_cdrs.deleted_at + FROM rated_cdrs + INNER JOIN cdrs_primary ON rated_cdrs.cgrid = cdrs_primary.cgrid + INNER JOIN cdrs_extra ON rated_cdrs.cgrid = cdrs_extra.cgrid + INNER JOIN cost_details ON rated_cdrs.cgrid = cost_details.cgrid + WHERE cdrs_primary.`usage` > '0' + AND not exists (select 1 from cdrs where cdrs.cgrid = cdrs_primary.cgrid AND cdrs.run_id=rated_cdrs.runid) + AND rated_cdrs.id >= start_id + AND rated_cdrs.id < end_id + GROUP BY cgrid, run_id, origin_id; + SET start_id = start_id + step; + SET end_id = end_id + step; + END WHILE; + /* SET Table for live usage */ + SET autocommit=1; + SET unique_checks=1; + SET foreign_key_checks=1; + ALTER TABLE cdrs ENABLE KEYS; + OPTIMIZE TABLE cdrs; END // DELIMITER ; + diff --git a/migrator/accounts.go b/migrator/accounts.go index 5cece15bd..adbcb58e7 100755 --- a/migrator/accounts.go +++ b/migrator/accounts.go @@ -36,21 +36,21 @@ const ( func (m *Migrator) migrateAccounts() (err error) { switch m.dataDBType { case utils.REDIS: - log.Print("#9 Starts migrateAccounts") + log.Print("#9 Starts migrateAccounts") var acntV1Keys []string acntV1Keys, err = m.oldDataDB.GetKeysForPrefix(v1AccountDBPrefix) if err != nil { return } - log.Print("#10 it doesn't get to here",acntV1Keys) - + log.Print("#10 it doesn't get to here", acntV1Keys) + for _, acntV1Key := range acntV1Keys { - log.Print("#11 acc key:",acntV1Key) + log.Print("#11 acc key:", acntV1Key) v1Acnt, err := m.getV1AccountFromDB(acntV1Key) if err != nil { return err } - log.Print("#8 it doesn't get to here") + log.Print("#8 it doesn't get to here") if v1Acnt != nil { acnt := v1Acnt.AsAccount() if err = m.dataDB.SetAccount(acnt); err != nil { @@ -58,7 +58,7 @@ func (m *Migrator) migrateAccounts() (err error) { } } } - log.Print("#8 it doesn't get to here") + log.Print("#8 it doesn't get to here") // All done, update version wtih current one vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]} if err = m.dataDB.SetVersions(vrs, false); err != nil { @@ -67,7 +67,7 @@ func (m *Migrator) migrateAccounts() (err error) { err.Error(), fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error())) } - log.Print("#8 it doesn't get to here") + log.Print("#8 it doesn't get to here") return case utils.MONGO: dataDB := m.dataDB.(*engine.MongoStorage) @@ -102,19 +102,19 @@ func (m *Migrator) migrateAccounts() (err error) { func (m *Migrator) getV1AccountFromDB(key string) (*v1Account, error) { switch m.oldDataDBType { case utils.REDIS: - log.Print("#12 start get ") + log.Print("#12 start get ") dataDB := m.oldDataDB.(*engine.RedisStorage) - log.Print("#12 start get") + log.Print("#12 start get") if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil { return nil, err } else { - log.Print("#12 start get") + log.Print("#12 start get") v1Acnt := &v1Account{Id: key} - log.Print("#12 start get") + log.Print("#12 start get") if err := m.mrshlr.Unmarshal(strVal, v1Acnt); err != nil { return nil, err } - log.Print("#12 start get") + log.Print("#12 start get") return v1Acnt, nil } case utils.MONGO: @@ -195,7 +195,7 @@ func (v1Acc v1Account) AsAccount() (ac *engine.Account) { for oldBalKey, oldBalChain := range v1Acc.BalanceMap { keyElements := strings.Split(oldBalKey, "*") newBalKey := "*" + keyElements[1] - newBalDirection := idElements[0] + newBalDirection := idElements[0] ac.BalanceMap[newBalKey] = make(engine.Balances, len(oldBalChain)) for index, oldBal := range oldBalChain { // check default to set new id diff --git a/migrator/accounts_test.go b/migrator/accounts_test.go index ad742ee60..f613e2bee 100755 --- a/migrator/accounts_test.go +++ b/migrator/accounts_test.go @@ -37,7 +37,7 @@ func TestV1AccountAsAccount(t *testing.T) { newAcc := v1Acc.AsAccount() if !reflect.DeepEqual(testAccount.BalanceMap["*monetary"][0], newAcc.BalanceMap["*monetary"][0]) { t.Errorf("Expecting: %+v, received: %+v", testAccount.BalanceMap["*monetary"][0], newAcc.BalanceMap["*monetary"][0]) - }else if !reflect.DeepEqual(testAccount.BalanceMap["*voice"][0], newAcc.BalanceMap["*voice"][0]) { + } else if !reflect.DeepEqual(testAccount.BalanceMap["*voice"][0], newAcc.BalanceMap["*voice"][0]) { t.Errorf("Expecting: %+v, received: %+v", testAccount.BalanceMap["*voice"][0], newAcc.BalanceMap["*voice"][0]) } } diff --git a/migrator/costdetails.go b/migrator/costdetails.go index 378ce3afd..411e37021 100644 --- a/migrator/costdetails.go +++ b/migrator/costdetails.go @@ -21,6 +21,7 @@ import ( "database/sql" "encoding/json" "fmt" + "log" "time" "github.com/cgrates/cgrates/engine" @@ -47,12 +48,13 @@ func (m *Migrator) migrateCostDetails() (err error) { "version number is not defined for CostDetails model") } if vrs[utils.COST_DETAILS] != 1 { // Right now we only support migrating from version 1 + log.Print("Wrong version") return } var storSQL *sql.DB switch m.storDBType { case utils.MYSQL: - storSQL = m.storDB.(*engine.MySQLStorage).Db + storSQL = m.storDB.(*engine.SQLStorage).Db case utils.POSTGRES: storSQL = m.storDB.(*engine.PostgresStorage).Db default: @@ -61,19 +63,22 @@ func (m *Migrator) migrateCostDetails() (err error) { utils.UnsupportedDB, fmt.Sprintf("unsupported database type: <%s>", m.storDBType)) } - rows, err := storSQL.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs WHERE run_id!= '*raw' and cost_details IS NOT NULL AND deleted_at IS NULL") + rows, err := storSQL.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), fmt.Sprintf("error: <%s> when querying storDB for cdrs", err.Error())) } + defer rows.Close() + for cnt := 0; rows.Next(); cnt++ { var id int64 var ccDirection, ccCategory, ccTenant, ccSubject, ccAccount, ccDestination, ccTor sql.NullString var ccCost sql.NullFloat64 var tts []byte + if err := rows.Scan(&id, &ccTor, &ccDirection, &ccTenant, &ccCategory, &ccAccount, &ccSubject, &ccDestination, &ccCost, &tts); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, @@ -89,6 +94,7 @@ func (m *Migrator) migrateCostDetails() (err error) { v1CC := &v1CallCost{Direction: ccDirection.String, Category: ccCategory.String, Tenant: ccTenant.String, Subject: ccSubject.String, Account: ccAccount.String, Destination: ccDestination.String, TOR: ccTor.String, Cost: ccCost.Float64, Timespans: v1tmsps} + cc := v1CC.AsCallCost() if cc == nil { utils.Logger.Warning( diff --git a/migrator/migrator.go b/migrator/migrator.go index 0bd0a72c1..eeefbe8ab 100755 --- a/migrator/migrator.go +++ b/migrator/migrator.go @@ -19,43 +19,42 @@ package migrator import ( "fmt" - "log" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string,oldDataDB engine.DataDB,oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string) (m *Migrator,err error) { +func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string, oldDataDB engine.DataDB, oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string) (m *Migrator, err error) { var mrshlr engine.Marshaler var oldmrshlr engine.Marshaler if dataDBEncoding == utils.MSGPACK { mrshlr = engine.NewCodecMsgpackMarshaler() } else if dataDBEncoding == utils.JSON { mrshlr = new(engine.JSONMarshaler) - }else if oldDataDBEncoding == utils.MSGPACK { + } else if oldDataDBEncoding == utils.MSGPACK { oldmrshlr = engine.NewCodecMsgpackMarshaler() - }else if oldDataDBEncoding == utils.JSON { + } else if oldDataDBEncoding == utils.JSON { oldmrshlr = new(engine.JSONMarshaler) } m = &Migrator{ dataDB: dataDB, dataDBType: dataDBType, storDB: storDB, storDBType: storDBType, mrshlr: mrshlr, - oldDataDB: oldDataDB, oldDataDBType: oldDataDBType, - oldStorDB: oldStorDB, oldStorDBType: oldStorDBType, oldmrshlr:oldmrshlr, + oldDataDB: oldDataDB, oldDataDBType: oldDataDBType, + oldStorDB: oldStorDB, oldStorDBType: oldStorDBType, oldmrshlr: oldmrshlr, } - return m,err + return m, err } type Migrator struct { - dataDB engine.DataDB - dataDBType string - storDB engine.Storage - storDBType string - mrshlr engine.Marshaler + dataDB engine.DataDB + dataDBType string + storDB engine.Storage + storDBType string + mrshlr engine.Marshaler oldDataDB engine.DataDB oldDataDBType string oldStorDB engine.Storage oldStorDBType string - oldmrshlr engine.Marshaler + oldmrshlr engine.Marshaler } // Migrate implements the tasks to migrate, used as a dispatcher to the individual methods @@ -76,17 +75,15 @@ func (m *Migrator) Migrate(taskID string) (err error) { case utils.MetaCostDetails: err = m.migrateCostDetails() case utils.MetaAccounts: - log.Print("#7 function is about to start") err = m.migrateAccounts() - case "migrateActionPlans": + case utils.MetaActionPlans: err = m.migrateActionPlans() - case "migrateActionTriggers": + case utils.MetaActionTriggers: err = m.migrateActionTriggers() - case "migrateActions": + case utils.MetaActions: err = m.migrateActions() - case "migrateSharedGroups": + case utils.MetaSharedGroups: err = m.migrateSharedGroups() } - return } diff --git a/utils/consts.go b/utils/consts.go index 451bdbf25..d4bdd7ccf 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -380,6 +380,10 @@ const ( MetaScheduler = "*scheduler" MetaCostDetails = "*cost_details" MetaAccounts = "*accounts" + MetaActionPlans = "*action_plans" + MetaActionTriggers = "*action_triggers" + MetaActions = "*actions" + MetaSharedGroups = "*shared_groups" Migrator = "migrator" UnsupportedMigrationTask = "unsupported migration task" NoStorDBConnection = "not connected to StorDB"