Add new infrastructure for migrator

This commit is contained in:
TeoV
2018-05-09 07:51:39 -04:00
committed by Dan Christian Bogos
parent 6dd58efd29
commit 3bc1cd511a
34 changed files with 291 additions and 410 deletions

View File

@@ -98,7 +98,8 @@ var (
CostLow = strings.ToLower(utils.COST)
)
func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string, cacheCfg config.CacheConfig, loadHistorySize int) (ms *MongoStorage, err error) {
func NewMongoStorage(host, port, db, user, pass, storageType string,
cdrsIndexes []string, cacheCfg config.CacheConfig, loadHistorySize int) (ms *MongoStorage, err error) {
url := host
if port != "" {
url += ":" + port

View File

@@ -31,7 +31,8 @@ type MySQLStorage struct {
SQLStorage
}
func NewMySQLStorage(host, port, name, user, password string, maxConn, maxIdleConn, connMaxLifetime int) (*SQLStorage, error) {
func NewMySQLStorage(host, port, name, user, password string,
maxConn, maxIdleConn, connMaxLifetime int) (*SQLStorage, error) {
connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER'", user, password, host, port, name)
db, err := gorm.Open("mysql", connectString)
if err != nil {

View File

@@ -30,7 +30,8 @@ import (
// Various helpers to deal with database
func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler string, cacheCfg config.CacheConfig, loadHistorySize int) (dm *DataManager, err error) {
func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler string,
cacheCfg config.CacheConfig, loadHistorySize int) (dm *DataManager, err error) {
var d DataDB
switch db_type {
case utils.REDIS:
@@ -58,7 +59,8 @@ func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler strin
return dm, nil
}
func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler string, maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db Storage, err error) {
func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler string,
maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db Storage, err error) {
var d Storage
switch db_type {
case utils.MONGO:
@@ -77,7 +79,8 @@ func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler strin
return d, nil
}
func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler string, maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db LoadStorage, err error) {
func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler string,
maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db LoadStorage, err error) {
var d LoadStorage
switch db_type {
case utils.POSTGRES:
@@ -96,7 +99,8 @@ func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler strin
return d, nil
}
func ConfigureCdrStorage(db_type, host, port, name, user, pass string, maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db CdrStorage, err error) {
func ConfigureCdrStorage(db_type, host, port, name, user, pass string,
maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db CdrStorage, err error) {
var d CdrStorage
switch db_type {
case utils.POSTGRES:
@@ -115,7 +119,8 @@ func ConfigureCdrStorage(db_type, host, port, name, user, pass string, maxConn,
return d, nil
}
func ConfigureStorDB(db_type, host, port, name, user, pass string, maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db StorDB, err error) {
func ConfigureStorDB(db_type, host, port, name, user, pass string,
maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db StorDB, err error) {
var d StorDB
switch db_type {
case utils.POSTGRES:

View File

@@ -36,22 +36,22 @@ const (
func (m *Migrator) migrateCurrentAccounts() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ACCOUNT_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ACCOUNT_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.ACCOUNT_PREFIX)
acc, err := m.dmIN.DataDB().GetAccount(idg)
acc, err := m.dmIN.DataManager().DataDB().GetAccount(idg)
if err != nil {
return err
}
if acc != nil {
if m.dryRun != true {
if err := m.dmOut.DataDB().SetAccount(acc); err != nil {
if err := m.dmOut.DataManager().DataDB().SetAccount(acc); err != nil {
return err
}
if err := m.dmIN.DataDB().RemoveAccount(idg); err != nil {
if err := m.dmIN.DataManager().DataDB().RemoveAccount(idg); err != nil {
return err
}
m.stats[utils.Accounts] += 1
@@ -64,7 +64,7 @@ func (m *Migrator) migrateCurrentAccounts() (err error) {
func (m *Migrator) migrateV1Accounts() (err error) {
var v1Acnt *v1Account
for {
v1Acnt, err = m.oldDataDB.getv1Account()
v1Acnt, err = m.dmIN.getv1Account()
if err != nil && err != utils.ErrNoMoreData {
return err
}
@@ -74,7 +74,7 @@ func (m *Migrator) migrateV1Accounts() (err error) {
if v1Acnt != nil {
acnt := v1Acnt.V1toV3Account()
if m.dryRun != true {
if err = m.dmOut.DataDB().SetAccount(acnt); err != nil {
if err = m.dmOut.DataManager().DataDB().SetAccount(acnt); err != nil {
return err
}
m.stats[utils.Accounts] += 1
@@ -84,7 +84,7 @@ func (m *Migrator) migrateV1Accounts() (err error) {
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentDataDBVersions()[utils.Accounts]}
if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil {
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
@@ -97,7 +97,7 @@ func (m *Migrator) migrateV1Accounts() (err error) {
func (m *Migrator) migrateV2Accounts() (err error) {
var v2Acnt *v2Account
for {
v2Acnt, err = m.oldDataDB.getv2Account()
v2Acnt, err = m.dmIN.getv2Account()
if err != nil && err != utils.ErrNoMoreData {
return err
}
@@ -107,7 +107,7 @@ func (m *Migrator) migrateV2Accounts() (err error) {
if v2Acnt != nil {
acnt := v2Acnt.V2toV3Account()
if m.dryRun != true {
if err = m.dmOut.DataDB().SetAccount(acnt); err != nil {
if err = m.dmOut.DataManager().DataDB().SetAccount(acnt); err != nil {
return err
}
m.stats[utils.Accounts] += 1
@@ -117,7 +117,7 @@ func (m *Migrator) migrateV2Accounts() (err error) {
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.Accounts: engine.CurrentDataDBVersions()[utils.Accounts]}
if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil {
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
@@ -129,7 +129,7 @@ func (m *Migrator) migrateV2Accounts() (err error) {
func (m *Migrator) migrateAccounts() (err error) {
var vrs engine.Versions
vrs, err = m.dmIN.DataDB().GetVersions("")
vrs, err = m.dmIN.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -41,19 +41,19 @@ type v1Actions []*v1Action
func (m *Migrator) migrateCurrentActions() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ACTION_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ACTION_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.ACTION_PREFIX)
acts, err := m.dmIN.GetActions(idg, true, utils.NonTransactional)
acts, err := m.dmIN.DataManager().GetActions(idg, true, utils.NonTransactional)
if err != nil {
return err
}
if acts != nil {
if m.dryRun != true {
if err := m.dmOut.SetActions(idg, acts, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().SetActions(idg, acts, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.Actions] += 1
@@ -67,7 +67,7 @@ func (m *Migrator) migrateV1Actions() (err error) {
var v1ACs *v1Actions
var acts engine.Actions
for {
v1ACs, err = m.oldDataDB.getV1Actions()
v1ACs, err = m.dmIN.getV1Actions()
if err != nil && err != utils.ErrNoMoreData {
return err
}
@@ -81,7 +81,7 @@ func (m *Migrator) migrateV1Actions() (err error) {
}
if !m.dryRun {
if err := m.dmOut.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.Actions] += 1
@@ -91,7 +91,7 @@ func (m *Migrator) migrateV1Actions() (err error) {
if !m.dryRun {
// All done, update version wtih current one
vrs := engine.Versions{utils.Actions: engine.CurrentStorDBVersions()[utils.Actions]}
if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil {
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
@@ -104,7 +104,7 @@ func (m *Migrator) migrateV1Actions() (err error) {
func (m *Migrator) migrateActions() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -49,19 +49,19 @@ func (at *v1ActionPlan) IsASAP() bool {
func (m *Migrator) migrateCurrentActionPlans() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ACTION_PLAN_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ACTION_PLAN_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.ACTION_PLAN_PREFIX)
acts, err := m.dmIN.DataDB().GetActionPlan(idg, true, utils.NonTransactional)
acts, err := m.dmIN.DataManager().DataDB().GetActionPlan(idg, true, utils.NonTransactional)
if err != nil {
return err
}
if acts != nil {
if m.dryRun != true {
if err := m.dmOut.DataDB().SetActionPlan(idg, acts, true, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().DataDB().SetActionPlan(idg, acts, true, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.ActionPlans] += 1
@@ -74,7 +74,7 @@ func (m *Migrator) migrateCurrentActionPlans() (err error) {
func (m *Migrator) migrateV1ActionPlans() (err error) {
var v1APs *v1ActionPlans
for {
v1APs, err = m.oldDataDB.getV1ActionPlans()
v1APs, err = m.dmIN.getV1ActionPlans()
if err != nil && err != utils.ErrNoMoreData {
return err
}
@@ -85,7 +85,7 @@ func (m *Migrator) migrateV1ActionPlans() (err error) {
for _, v1ap := range *v1APs {
ap := v1ap.AsActionPlan()
if m.dryRun != true {
if err = m.dmOut.DataDB().SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil {
if err = m.dmOut.DataManager().DataDB().SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.ActionPlans] += 1
@@ -96,7 +96,7 @@ func (m *Migrator) migrateV1ActionPlans() (err error) {
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.ActionPlans: engine.CurrentDataDBVersions()[utils.ActionPlans]}
if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil {
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
@@ -109,7 +109,7 @@ func (m *Migrator) migrateV1ActionPlans() (err error) {
func (m *Migrator) migrateActionPlans() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -54,19 +54,19 @@ type v1ActionTriggers []*v1ActionTrigger
func (m *Migrator) migrateCurrentActionTrigger() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.ACTION_TRIGGER_PREFIX)
acts, err := m.dmIN.GetActionTriggers(idg, true, utils.NonTransactional)
acts, err := m.dmIN.DataManager().GetActionTriggers(idg, true, utils.NonTransactional)
if err != nil {
return err
}
if acts != nil {
if m.dryRun != true {
if err := m.dmOut.SetActionTriggers(idg, acts, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().SetActionTriggers(idg, acts, utils.NonTransactional); err != nil {
return err
}
}
@@ -79,7 +79,7 @@ func (m *Migrator) migrateV1ActionTrigger() (err error) {
var v1ACTs *v1ActionTriggers
var acts engine.ActionTriggers
for {
v1ACTs, err = m.oldDataDB.getV1ActionTriggers()
v1ACTs, err = m.dmIN.getV1ActionTriggers()
if err != nil && err != utils.ErrNoMoreData {
return err
}
@@ -93,7 +93,7 @@ func (m *Migrator) migrateV1ActionTrigger() (err error) {
}
if !m.dryRun {
if err := m.dmOut.SetActionTriggers(acts[0].ID, acts, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().SetActionTriggers(acts[0].ID, acts, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.ActionTriggers] += 1
@@ -103,7 +103,7 @@ func (m *Migrator) migrateV1ActionTrigger() (err error) {
if !m.dryRun {
// All done, update version wtih current one
vrs := engine.Versions{utils.ActionTriggers: engine.CurrentDataDBVersions()[utils.ActionTriggers]}
if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil {
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
@@ -116,7 +116,7 @@ func (m *Migrator) migrateV1ActionTrigger() (err error) {
func (m *Migrator) migrateActionTriggers() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -29,19 +29,19 @@ import (
func (m *Migrator) migrateCurrentAlias() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ALIASES_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ALIASES_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.ALIASES_PREFIX)
usr, err := m.dmIN.DataDB().GetAlias(idg, true, utils.NonTransactional)
usr, err := m.dmIN.DataManager().DataDB().GetAlias(idg, true, utils.NonTransactional)
if err != nil {
return err
}
if usr != nil {
if m.dryRun != true {
if err := m.dmOut.DataDB().SetAlias(usr, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().DataDB().SetAlias(usr, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.Alias] += 1
@@ -92,7 +92,7 @@ func (m *Migrator) migrateCurrentAlias() (err error) {
func (m *Migrator) migrateAlias() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -48,19 +48,19 @@ type v1AttributeProfile struct {
func (m *Migrator) migrateCurrentAttributeProfile() (err error) {
var ids []string
tenant := config.CgrConfig().DefaultTenant
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.AttributeProfilePrefix)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.AttributeProfilePrefix)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.AttributeProfilePrefix+tenant+":")
attrPrf, err := m.dmIN.GetAttributeProfile(tenant, idg, true, utils.NonTransactional)
attrPrf, err := m.dmIN.DataManager().GetAttributeProfile(tenant, idg, true, utils.NonTransactional)
if err != nil {
return err
}
if attrPrf != nil {
if m.dryRun != true {
if err := m.dmOut.SetAttributeProfile(attrPrf, true); err != nil {
if err := m.dmOut.DataManager().SetAttributeProfile(attrPrf, true); err != nil {
return err
}
}
@@ -72,7 +72,7 @@ func (m *Migrator) migrateCurrentAttributeProfile() (err error) {
func (m *Migrator) migrateV1Attributes() (err error) {
var v1Attr *v1AttributeProfile
for {
v1Attr, err = m.oldDataDB.getV1AttributeProfile()
v1Attr, err = m.dmIN.getV1AttributeProfile()
if err != nil && err != utils.ErrNoMoreData {
return err
}
@@ -85,10 +85,10 @@ func (m *Migrator) migrateV1Attributes() (err error) {
return err
}
if m.dryRun != true {
if err := m.dmOut.DataDB().SetAttributeProfileDrv(attrPrf); err != nil {
if err := m.dmOut.DataManager().DataDB().SetAttributeProfileDrv(attrPrf); err != nil {
return err
}
if err := m.dmOut.SetAttributeProfile(attrPrf, true); err != nil {
if err := m.dmOut.DataManager().SetAttributeProfile(attrPrf, true); err != nil {
return err
}
m.stats[utils.Attributes] += 1
@@ -98,7 +98,7 @@ func (m *Migrator) migrateV1Attributes() (err error) {
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.Attributes: engine.CurrentStorDBVersions()[utils.Attributes]}
if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil {
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
@@ -111,7 +111,7 @@ func (m *Migrator) migrateV1Attributes() (err error) {
func (m *Migrator) migrateAttributeProfile() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -31,12 +31,12 @@ func (m *Migrator) migrateCurrentCDRs() (err error) {
if m.sameStorDB { // no move
return
}
cdrs, _, err := m.storDBIn.GetCDRs(new(utils.CDRsFilter), false)
cdrs, _, err := m.storDBIn.StorDB().GetCDRs(new(utils.CDRsFilter), false)
if err != nil {
return err
}
for _, cdr := range cdrs {
if err := m.storDBOut.SetCDR(cdr, true); err != nil {
if err := m.storDBOut.StorDB().SetCDR(cdr, true); err != nil {
return err
}
}
@@ -46,7 +46,7 @@ func (m *Migrator) migrateCurrentCDRs() (err error) {
func (m *Migrator) migrateCDRs() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -74,7 +74,7 @@ func (m *Migrator) migrateCDRs() (err error) {
func (m *Migrator) migrateV1CDRs() (err error) {
var v1CDR *v1Cdrs
for {
v1CDR, err = m.oldStorDB.getV1CDR()
v1CDR, err = m.storDBIn.getV1CDR()
if err != nil && err != utils.ErrNoMoreData {
return err
}
@@ -84,7 +84,7 @@ func (m *Migrator) migrateV1CDRs() (err error) {
if v1CDR != nil {
cdr := v1CDR.V1toV2Cdr()
if m.dryRun != true {
if err = m.storDBOut.SetCDR(cdr, true); err != nil {
if err = m.storDBOut.StorDB().SetCDR(cdr, true); err != nil {
return err
}
m.stats[utils.CDRs] += 1
@@ -94,7 +94,7 @@ func (m *Migrator) migrateV1CDRs() (err error) {
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.CDRs: engine.CurrentStorDBVersions()[utils.CDRs]}
if err = m.storDBOut.SetVersions(vrs, false); err != nil {
if err = m.storDBOut.StorDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),

View File

@@ -26,14 +26,14 @@ import (
)
func (m *Migrator) migrateCurrentCdrStats() (err error) {
cdrsts, err := m.dmIN.GetAllCdrStats()
cdrsts, err := m.dmIN.DataManager().GetAllCdrStats()
if err != nil {
return err
}
for _, cdrst := range cdrsts {
if cdrst != nil {
if m.dryRun != true {
if err := m.dmOut.SetCdrStats(cdrst); err != nil {
if err := m.dmOut.DataManager().SetCdrStats(cdrst); err != nil {
return err
}
m.stats[utils.CdrStats] += 1
@@ -46,7 +46,7 @@ func (m *Migrator) migrateCurrentCdrStats() (err error) {
func (m *Migrator) migrateCdrStats() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -18,6 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
/*
import (
"database/sql"
"encoding/json"
@@ -36,7 +37,7 @@ func (m *Migrator) migrateCostDetails() (err error) {
utils.NoStorDBConnection,
"no connection to StorDB")
}
vrs, err := m.storDBOut.GetVersions(utils.COST_DETAILS)
vrs, err := m.storDBOut.StorDB().GetVersions(utils.COST_DETAILS)
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -212,3 +213,4 @@ func (v1cc *v1CallCost) AsCallCost() (cc *engine.CallCost) {
}
return
}
*/

View File

@@ -28,19 +28,19 @@ import (
func (m *Migrator) migrateCurrentDerivedChargers() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.DERIVEDCHARGERS_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.DERIVEDCHARGERS_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.DERIVEDCHARGERS_PREFIX)
drc, err := m.dmIN.GetDerivedChargers(idg, true, utils.NonTransactional)
drc, err := m.dmIN.DataManager().GetDerivedChargers(idg, true, utils.NonTransactional)
if err != nil {
return err
}
if drc != nil {
if m.dryRun != true {
if err := m.dmOut.DataDB().SetDerivedChargers(idg, drc, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().DataDB().SetDerivedChargers(idg, drc, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.DerivedChargersV] += 1
@@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentDerivedChargers() (err error) {
func (m *Migrator) migrateDerivedChargers() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -28,19 +28,19 @@ import (
func (m *Migrator) migrateCurrentDestinations() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.DESTINATION_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.DESTINATION_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.DESTINATION_PREFIX)
dst, err := m.dmIN.DataDB().GetDestination(idg, true, utils.NonTransactional)
dst, err := m.dmIN.DataManager().DataDB().GetDestination(idg, true, utils.NonTransactional)
if err != nil {
return err
}
if dst != nil {
if m.dryRun != true {
if err := m.dmOut.DataDB().SetDestination(dst, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().DataDB().SetDestination(dst, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.Destinations] += 1
@@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentDestinations() (err error) {
func (m *Migrator) migrateDestinations() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -80,28 +80,28 @@ func (m *Migrator) migrateDestinations() (err error) {
func (m *Migrator) migrateCurrentReverseDestinations() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.REVERSE_DESTINATION_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.REVERSE_DESTINATION_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
id := strings.TrimPrefix(id, utils.REVERSE_DESTINATION_PREFIX)
rdst, err := m.dmIN.DataDB().GetReverseDestination(id, true, utils.NonTransactional)
rdst, err := m.dmIN.DataManager().DataDB().GetReverseDestination(id, true, utils.NonTransactional)
if err != nil {
return err
}
if rdst != nil {
for _, rdid := range rdst {
rdstn, err := m.dmIN.DataDB().GetDestination(rdid, true, utils.NonTransactional)
rdstn, err := m.dmIN.DataManager().DataDB().GetDestination(rdid, true, utils.NonTransactional)
if err != nil {
return err
}
if rdstn != nil {
if m.dryRun != true {
if err := m.dmOut.DataDB().SetDestination(rdstn, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().DataDB().SetDestination(rdstn, utils.NonTransactional); err != nil {
return err
}
if err := m.dmOut.DataDB().SetReverseDestination(rdstn, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().DataDB().SetReverseDestination(rdstn, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.ReverseDestinations] += 1
@@ -116,7 +116,7 @@ func (m *Migrator) migrateCurrentReverseDestinations() (err error) {
func (m *Migrator) migrateReverseDestinations() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -30,19 +30,19 @@ import (
func (m *Migrator) migrateCurrentRequestFilter() (err error) {
var ids []string
tenant := config.CgrConfig().DefaultTenant
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.FilterPrefix)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.FilterPrefix)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.FilterPrefix+tenant+":")
fl, err := m.dmIN.GetFilter(tenant, idg, true, utils.NonTransactional)
fl, err := m.dmIN.DataManager().GetFilter(tenant, idg, true, utils.NonTransactional)
if err != nil {
return err
}
if fl != nil {
if m.dryRun != true {
if err := m.dmOut.SetFilter(fl); err != nil {
if err := m.dmOut.DataManager().SetFilter(fl); err != nil {
return err
}
m.stats[utils.RQF] += 1
@@ -55,7 +55,7 @@ func (m *Migrator) migrateCurrentRequestFilter() (err error) {
func (m *Migrator) migrateRequestFilter() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -28,19 +28,19 @@ import (
func (m *Migrator) migrateCurrentLCR() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.LCR_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.LCR_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.LCR_PREFIX)
lcr, err := m.dmIN.GetLCR(idg, true, utils.NonTransactional)
lcr, err := m.dmIN.DataManager().GetLCR(idg, true, utils.NonTransactional)
if err != nil {
return err
}
if lcr != nil {
if m.dryRun != true {
if err := m.dmOut.SetLCR(lcr, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().SetLCR(lcr, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.LCR] += 1
@@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentLCR() (err error) {
func (m *Migrator) migrateLCR() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -34,26 +34,13 @@ func NewMigrator(
dryRun bool,
sameDataDB bool,
sameStorDB bool) (m *Migrator, err error) {
var mrshlr engine.Marshaler
var oldmrshlr engine.Marshaler
if dataDBEncoding == utils.MSGPACK {
mrshlr = engine.NewCodecMsgpackMarshaler()
} else if dataDBEncoding == utils.JSON {
mrshlr = new(engine.JSONMarshaler)
} else if oldDataDBEncoding == utils.MSGPACK {
oldmrshlr = engine.NewCodecMsgpackMarshaler()
} else if oldDataDBEncoding == utils.JSON {
oldmrshlr = new(engine.JSONMarshaler)
}
stats := make(map[string]int)
m = &Migrator{
dmOut: dmOut,
dmIN: dmIN,
storDBIn: storDBIn,
storDBOut: storDBOut,
storDBType: storDBType,
dryRun: dryRun, sameDataDB: sameDataDB, sameStorDB: sameStorDB,
dmOut: dmOut,
dmIN: dmIN,
storDBIn: storDBIn,
storDBOut: storDBOut,
dryRun: dryRun, sameDataDB: sameDataDB, sameStorDB: sameStorDB,
stats: stats,
}
return m, err
@@ -82,13 +69,15 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) {
fmt.Sprintf("task <%s> is not a supported migration task", taskID))
case utils.MetaSetVersions:
if m.dryRun != true {
if err := m.dmOut.DataDB().SetVersions(engine.CurrentDBVersions(m.dataDBType), true); err != nil {
if err := m.dmOut.DataManager().DataDB().SetVersions(
engine.CurrentDBVersions(m.dmOut.DataManager().DataDB().GetStorageType()), true); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error())), nil
}
if err := m.storDBOut.SetVersions(engine.CurrentDBVersions(m.storDBType), true); err != nil {
if err := m.storDBOut.StorDB().SetVersions(
engine.CurrentDBVersions(m.storDBOut.StorDB().GetStorageType()), true); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
@@ -102,8 +91,8 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) {
err = m.migrateCDRs()
case utils.MetaSessionsCosts:
err = m.migrateSessionSCosts()
case utils.MetaCostDetails:
err = m.migrateCostDetails()
// case utils.MetaCostDetails:
// err = m.migrateCostDetails()
case utils.MetaAccounts:
err = m.migrateAccounts()
case utils.MetaActionPlans:

View File

@@ -18,6 +18,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
import (
"github.com/cgrates/cgrates/engine"
)
type MigratorDataDB interface {
getKeysForPrefix(prefix string) ([]string, error)
getv1Account() (v1Acnt *v1Account, err error)

View File

@@ -18,6 +18,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
import (
"github.com/cgrates/cgrates/engine"
)
type MigratorStorDB interface {
getV1CDR() (v1Cdr *v1Cdrs, err error)
setV1CDR(v1Cdr *v1Cdrs) (err error)

View File

@@ -28,19 +28,19 @@ import (
func (m *Migrator) migrateCurrentRatingPlans() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.RATING_PLAN_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.RATING_PLAN_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.RATING_PLAN_PREFIX)
rp, err := m.dmIN.GetRatingPlan(idg, true, utils.NonTransactional)
rp, err := m.dmIN.DataManager().GetRatingPlan(idg, true, utils.NonTransactional)
if err != nil {
return err
}
if rp != nil {
if m.dryRun != true {
if err := m.dmOut.SetRatingPlan(rp, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().SetRatingPlan(rp, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.RatingPlan] += 1
@@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentRatingPlans() (err error) {
func (m *Migrator) migrateRatingPlans() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -28,19 +28,19 @@ import (
func (m *Migrator) migrateCurrentRatingProfiles() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.RATING_PROFILE_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.RATING_PROFILE_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.RATING_PROFILE_PREFIX)
rp, err := m.dmIN.GetRatingProfile(idg, true, utils.NonTransactional)
rp, err := m.dmIN.DataManager().GetRatingProfile(idg, true, utils.NonTransactional)
if err != nil {
return err
}
if rp != nil {
if m.dryRun != true {
if err := m.dmOut.SetRatingProfile(rp, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().SetRatingProfile(rp, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.RatingProfile] += 1
@@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentRatingProfiles() (err error) {
func (m *Migrator) migrateRatingProfiles() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -30,19 +30,19 @@ import (
func (m *Migrator) migrateCurrentResource() (err error) {
var ids []string
tenant := config.CgrConfig().DefaultTenant
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ResourceProfilesPrefix)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ResourceProfilesPrefix)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.ResourceProfilesPrefix+tenant+":")
res, err := m.dmIN.GetResourceProfile(tenant, idg, true, utils.NonTransactional)
res, err := m.dmIN.DataManager().GetResourceProfile(tenant, idg, true, utils.NonTransactional)
if err != nil {
return err
}
if res != nil {
if m.dryRun != true {
if err := m.dmOut.SetResourceProfile(res, true); err != nil {
if err := m.dmOut.DataManager().SetResourceProfile(res, true); err != nil {
return err
}
m.stats[utils.Resource] += 1
@@ -55,7 +55,7 @@ func (m *Migrator) migrateCurrentResource() (err error) {
func (m *Migrator) migrateResources() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -19,7 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
import (
"database/sql"
//"database/sql"
"encoding/json"
"fmt"
"time"
@@ -32,15 +32,15 @@ func (m *Migrator) migrateCurrentSessionSCost() (err error) {
if m.sameStorDB { // no move
return
}
smCosts, err := m.storDBIn.GetSMCosts("", "", "", "")
smCosts, err := m.storDBIn.StorDB().GetSMCosts("", "", "", "")
if err != nil {
return err
}
for _, smCost := range smCosts {
if err := m.storDBOut.SetSMCost(smCost); err != nil {
if err := m.storDBOut.StorDB().SetSMCost(smCost); err != nil {
return err
}
if err := m.storDBIn.RemoveSMCost(smCost); err != nil {
if err := m.storDBIn.StorDB().RemoveSMCost(smCost); err != nil {
return err
}
}
@@ -50,7 +50,7 @@ func (m *Migrator) migrateCurrentSessionSCost() (err error) {
func (m *Migrator) migrateSessionSCosts() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
vrs, err = m.storDBOut.GetVersions("")
vrs, err = m.storDBOut.StorDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -63,30 +63,30 @@ func (m *Migrator) migrateSessionSCosts() (err error) {
"version number is not defined for SessionsCosts model")
}
switch vrs[utils.SessionSCosts] {
case 0, 1:
var isPostGres bool
var storSQL *sql.DB
switch m.storDBType {
case utils.MYSQL:
isPostGres = false
storSQL = m.storDBOut.(*engine.SQLStorage).Db
case utils.POSTGRES:
isPostGres = true
storSQL = m.storDBOut.(*engine.SQLStorage).Db
default:
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UnsupportedDB,
fmt.Sprintf("unsupported database type: <%s>", m.storDBType))
}
qry := "RENAME TABLE sm_costs TO sessions_costs;"
if isPostGres {
qry = "ALTER TABLE sm_costs RENAME TO sessions_costs"
}
if _, err := storSQL.Exec(qry); err != nil {
return err
}
fallthrough // incremental updates
// case 0, 1:
// var isPostGres bool
// var storSQL *sql.DB
// switch m.storDBType {
// case utils.MYSQL:
// isPostGres = false
// storSQL = m.storDBOut.(*engine.SQLStorage).Db
// case utils.POSTGRES:
// isPostGres = true
// storSQL = m.storDBOut.(*engine.SQLStorage).Db
// default:
// return utils.NewCGRError(utils.Migrator,
// utils.MandatoryIEMissingCaps,
// utils.UnsupportedDB,
// fmt.Sprintf("unsupported database type: <%s>", m.storDBType))
// }
// qry := "RENAME TABLE sm_costs TO sessions_costs;"
// if isPostGres {
// qry = "ALTER TABLE sm_costs RENAME TO sessions_costs"
// }
// if _, err := storSQL.Exec(qry); err != nil {
// return err
// }
// fallthrough // incremental updates
case 2:
if err := m.migrateV2SessionSCosts(); err != nil {
return err
@@ -102,7 +102,7 @@ func (m *Migrator) migrateSessionSCosts() (err error) {
func (m *Migrator) migrateV2SessionSCosts() (err error) {
var v2Cost *v2SessionsCost
for {
v2Cost, err = m.oldStorDB.getSMCost()
v2Cost, err = m.storDBIn.getV2SMCost()
if err != nil && err != utils.ErrNoMoreData {
return err
}
@@ -112,10 +112,10 @@ func (m *Migrator) migrateV2SessionSCosts() (err error) {
if v2Cost != nil {
smCost := v2Cost.V2toV3Cost()
if m.dryRun != true {
if err = m.storDBOut.SetSMCost(smCost); err != nil {
if err = m.storDBOut.StorDB().SetSMCost(smCost); err != nil {
return err
}
if err = m.oldStorDB.remSMCost(v2Cost); err != nil {
if err = m.storDBIn.remV2SMCost(v2Cost); err != nil {
return err
}
m.stats[utils.SessionSCosts] += 1
@@ -125,7 +125,7 @@ func (m *Migrator) migrateV2SessionSCosts() (err error) {
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.SessionSCosts: engine.CurrentStorDBVersions()[utils.SessionSCosts]}
if err = m.storDBOut.SetVersions(vrs, false); err != nil {
if err = m.storDBOut.StorDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),

View File

@@ -34,19 +34,19 @@ type v1SharedGroup struct {
func (m *Migrator) migrateCurrentSharedGroups() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.SHARED_GROUP_PREFIX)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.SHARED_GROUP_PREFIX)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.SHARED_GROUP_PREFIX)
sgs, err := m.dmIN.GetSharedGroup(idg, true, utils.NonTransactional)
sgs, err := m.dmIN.DataManager().GetSharedGroup(idg, true, utils.NonTransactional)
if err != nil {
return err
}
if sgs != nil {
if m.dryRun != true {
if err := m.dmOut.SetSharedGroup(sgs, utils.NonTransactional); err != nil {
if err := m.dmOut.DataManager().SetSharedGroup(sgs, utils.NonTransactional); err != nil {
return err
}
}
@@ -58,7 +58,7 @@ func (m *Migrator) migrateCurrentSharedGroups() (err error) {
func (m *Migrator) migrateV1SharedGroups() (err error) {
var v1SG *v1SharedGroup
for {
v1SG, err = m.oldDataDB.getV1SharedGroup()
v1SG, err = m.dmIN.getV1SharedGroup()
if err != nil && err != utils.ErrNoMoreData {
return err
}
@@ -68,7 +68,7 @@ func (m *Migrator) migrateV1SharedGroups() (err error) {
if v1SG != nil {
acnt := v1SG.AsSharedGroup()
if m.dryRun != true {
if err = m.dmOut.SetSharedGroup(acnt, utils.NonTransactional); err != nil {
if err = m.dmOut.DataManager().SetSharedGroup(acnt, utils.NonTransactional); err != nil {
return err
}
m.stats[utils.SharedGroups] += 1
@@ -77,7 +77,7 @@ func (m *Migrator) migrateV1SharedGroups() (err error) {
}
// All done, update version wtih current one
vrs := engine.Versions{utils.SharedGroups: engine.CurrentStorDBVersions()[utils.SharedGroups]}
if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil {
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
@@ -89,7 +89,7 @@ func (m *Migrator) migrateV1SharedGroups() (err error) {
func (m *Migrator) migrateSharedGroups() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -63,19 +63,19 @@ func (m *Migrator) migrateCurrentStats() (err error) {
var ids []string
tenant := config.CgrConfig().DefaultTenant
//StatQueue
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.StatQueuePrefix)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.StatQueuePrefix+tenant+":")
sgs, err := m.dmIN.GetStatQueue(tenant, idg, true, utils.NonTransactional)
sgs, err := m.dmIN.DataManager().GetStatQueue(tenant, idg, true, utils.NonTransactional)
if err != nil {
return err
}
if sgs != nil {
if m.dryRun != true {
if err := m.dmOut.SetStatQueue(sgs); err != nil {
if err := m.dmOut.DataManager().SetStatQueue(sgs); err != nil {
return err
}
m.stats[utils.StatS] += 1
@@ -83,19 +83,19 @@ func (m *Migrator) migrateCurrentStats() (err error) {
}
}
//StatQueueProfile
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.StatQueueProfilePrefix+tenant+":")
sgs, err := m.dmIN.GetStatQueueProfile(tenant, idg, true, utils.NonTransactional)
sgs, err := m.dmIN.DataManager().GetStatQueueProfile(tenant, idg, true, utils.NonTransactional)
if err != nil {
return err
}
if sgs != nil {
if m.dryRun != true {
if err := m.dmOut.SetStatQueueProfile(sgs, true); err != nil {
if err := m.dmOut.DataManager().SetStatQueueProfile(sgs, true); err != nil {
return err
}
}
@@ -108,7 +108,7 @@ func (m *Migrator) migrateCurrentStats() (err error) {
func (m *Migrator) migrateV1CDRSTATS() (err error) {
var v1Sts *v1Stat
for {
v1Sts, err = m.oldDataDB.getV1Stats()
v1Sts, err = m.dmIN.getV1Stats()
if err != nil && err != utils.ErrNoMoreData {
return err
}
@@ -129,13 +129,13 @@ func (m *Migrator) migrateV1CDRSTATS() (err error) {
return err
}
if !m.dryRun {
if err := m.dmOut.SetFilter(filter); err != nil {
if err := m.dmOut.DataManager().SetFilter(filter); err != nil {
return err
}
if err := m.dmOut.SetStatQueue(sq); err != nil {
if err := m.dmOut.DataManager().SetStatQueue(sq); err != nil {
return err
}
if err := m.dmOut.SetStatQueueProfile(sts, true); err != nil {
if err := m.dmOut.DataManager().SetStatQueueProfile(sts, true); err != nil {
return err
}
m.stats[utils.StatS] += 1
@@ -145,7 +145,7 @@ func (m *Migrator) migrateV1CDRSTATS() (err error) {
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]}
if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil {
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
@@ -158,7 +158,7 @@ func (m *Migrator) migrateV1CDRSTATS() (err error) {
func (m *Migrator) migrateStats() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -26,14 +26,14 @@ import (
)
func (m *Migrator) migrateCurrentSubscribers() (err error) {
subs, err := m.dmIN.GetSubscribers()
subs, err := m.dmIN.DataManager().GetSubscribers()
if err != nil {
return err
}
for id, sub := range subs {
if sub != nil {
if m.dryRun != true {
if err := m.dmOut.SetSubscriber(id, sub); err != nil {
if err := m.dmOut.DataManager().SetSubscriber(id, sub); err != nil {
return err
}
m.stats[utils.Subscribers] += 1
@@ -46,7 +46,7 @@ func (m *Migrator) migrateCurrentSubscribers() (err error) {
func (m *Migrator) migrateSubscribers() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -30,19 +30,19 @@ import (
func (m *Migrator) migrateCurrentSupplierProfile() (err error) {
var ids []string
tenant := config.CgrConfig().DefaultTenant
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.SupplierProfilePrefix)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.SupplierProfilePrefix)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.SupplierProfilePrefix)
splp, err := m.dmIN.GetSupplierProfile(tenant, idg, true, utils.NonTransactional)
splp, err := m.dmIN.DataManager().GetSupplierProfile(tenant, idg, true, utils.NonTransactional)
if err != nil {
return err
}
if splp != nil {
if m.dryRun != true {
if err := m.dmOut.SetSupplierProfile(splp, true); err != nil {
if err := m.dmOut.DataManager().SetSupplierProfile(splp, true); err != nil {
return err
}
m.stats[utils.Suppliers] += 1
@@ -55,7 +55,7 @@ func (m *Migrator) migrateCurrentSupplierProfile() (err error) {
func (m *Migrator) migrateSupplierProfiles() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -51,19 +51,19 @@ func (m *Migrator) migrateCurrentThresholds() (err error) {
var ids []string
tenant := config.CgrConfig().DefaultTenant
//Thresholds
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ThresholdPrefix)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ThresholdPrefix)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.ThresholdPrefix+tenant+":")
ths, err := m.dmIN.GetThreshold(tenant, idg, true, utils.NonTransactional)
ths, err := m.dmIN.DataManager().GetThreshold(tenant, idg, true, utils.NonTransactional)
if err != nil {
return err
}
if ths != nil {
if m.dryRun != true {
if err := m.dmOut.SetThreshold(ths); err != nil {
if err := m.dmOut.DataManager().SetThreshold(ths); err != nil {
return err
}
m.stats[utils.Thresholds] += 1
@@ -71,19 +71,19 @@ func (m *Migrator) migrateCurrentThresholds() (err error) {
}
}
//ThresholdProfiles
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ThresholdProfilePrefix)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ThresholdProfilePrefix)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.ThresholdProfilePrefix+tenant+":")
ths, err := m.dmIN.GetThresholdProfile(tenant, idg, true, utils.NonTransactional)
ths, err := m.dmIN.DataManager().GetThresholdProfile(tenant, idg, true, utils.NonTransactional)
if err != nil {
return err
}
if ths != nil {
if m.dryRun != true {
if err := m.dmOut.SetThresholdProfile(ths, true); err != nil {
if err := m.dmOut.DataManager().SetThresholdProfile(ths, true); err != nil {
return err
}
}
@@ -95,7 +95,7 @@ func (m *Migrator) migrateCurrentThresholds() (err error) {
func (m *Migrator) migrateV2ActionTriggers() (err error) {
var v2ACT *v2ActionTrigger
for {
v2ACT, err = m.oldDataDB.getV2ActionTrigger()
v2ACT, err = m.dmIN.getV2ActionTrigger()
if err != nil && err != utils.ErrNoMoreData {
return err
}
@@ -108,13 +108,13 @@ func (m *Migrator) migrateV2ActionTriggers() (err error) {
return err
}
if m.dryRun != true {
if err := m.dmOut.SetFilter(filter); err != nil {
if err := m.dmOut.DataManager().SetFilter(filter); err != nil {
return err
}
if err := m.dmOut.SetThreshold(th); err != nil {
if err := m.dmOut.DataManager().SetThreshold(th); err != nil {
return err
}
if err := m.dmOut.SetThresholdProfile(thp, true); err != nil {
if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil {
return err
}
m.stats[utils.Thresholds] += 1
@@ -124,7 +124,7 @@ func (m *Migrator) migrateV2ActionTriggers() (err error) {
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.Thresholds: engine.CurrentStorDBVersions()[utils.Thresholds]}
if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil {
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
@@ -137,7 +137,7 @@ func (m *Migrator) migrateV2ActionTriggers() (err error) {
func (m *Migrator) migrateThresholds() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
@@ -250,7 +250,7 @@ func (v2ATR v2ActionTrigger) AsThreshold() (thp *engine.ThresholdProfile, th *en
func (m *Migrator) SasThreshold(v2ATR *engine.ActionTrigger) (err error) {
var vrs engine.Versions
if m.dmOut.DataDB() == nil {
if m.dmOut.DataManager().DataDB() == nil {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.NoStorDBConnection,
@@ -262,21 +262,21 @@ func (m *Migrator) SasThreshold(v2ATR *engine.ActionTrigger) (err error) {
return err
}
if filter != nil {
if err := m.dmOut.SetFilter(filter); err != nil {
if err := m.dmOut.DataManager().SetFilter(filter); err != nil {
return err
}
}
if err := m.dmOut.SetThreshold(th); err != nil {
if err := m.dmOut.DataManager().SetThreshold(th); err != nil {
return err
}
if err := m.dmOut.SetThresholdProfile(thp, true); err != nil {
if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil {
return err
}
m.stats[utils.Thresholds] += 1
}
// All done, update version wtih current one
vrs = engine.Versions{utils.Thresholds: engine.CurrentStorDBVersions()[utils.Thresholds]}
if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil {
if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),

View File

@@ -28,19 +28,19 @@ import (
func (m *Migrator) migrateCurrentTiming() (err error) {
var ids []string
ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.TimingsPrefix)
ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.TimingsPrefix)
if err != nil {
return err
}
for _, id := range ids {
idg := strings.TrimPrefix(id, utils.TimingsPrefix)
tm, err := m.dmIN.GetTiming(idg, true, utils.NonTransactional)
tm, err := m.dmIN.DataManager().GetTiming(idg, true, utils.NonTransactional)
if err != nil {
return err
}
if tm != nil {
if m.dryRun != true {
if err := m.dmOut.SetTiming(tm); err != nil {
if err := m.dmOut.DataManager().SetTiming(tm); err != nil {
return err
}
m.stats[utils.Timing] += 1
@@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentTiming() (err error) {
func (m *Migrator) migrateTimings() (err error) {
var vrs engine.Versions
current := engine.CurrentDataDBVersions()
vrs, err = m.dmOut.DataDB().GetVersions("")
vrs, err = m.dmOut.DataManager().DataDB().GetVersions("")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,

View File

@@ -20,43 +20,57 @@ package migrator
import (
"errors"
"fmt"
"strconv"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
func NewMigratorDataDB(db_type, host, port, name, user, pass, marshaler string) {
}
func ConfigureV1DataStorage() (db MigratorDataDB, err error) {
var d MigratorDataDB
d.DataManger().(engine.RedisStoarage)
func NewMigratorDataDB(db_type, host, port, name, user, pass, marshaler string,
cacheCfg config.CacheConfig, loadHistorySize int) (db MigratorDataDB, err error) {
dm, err := engine.ConfigureDataStorage(db_type,
host, port, name, user, pass, marshaler,
cacheCfg, loadHistorySize)
if err != nil {
return nil, err
}
switch db_type {
case utils.REDIS:
var db_nb int
db_nb, err = strconv.Atoi(name)
if err != nil {
utils.Logger.Crit("Redis db name must be an integer!")
return nil, err
}
if port != "" {
host += ":" + port
}
d, err = newv1RedisStorage(host, db_nb, pass, marshaler)
d := newRedisMigrator(dm)
db = d.(MigratorDataDB)
case utils.MONGO:
d, err = newv1MongoStorage(host, port, name, user, pass, utils.DataDB, nil)
d := newMongoMigrator(dm)
db = d.(MigratorDataDB)
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'",
db_type, utils.REDIS, utils.MONGO))
}
}
/*
func NewMigratorStorDB(db_type, host, port, name, user, pass, marshaler string,
cacheCfg config.CacheConfig, loadHistorySize int) (db MigratorDataDB, err error) {
dm, err := engine.ConfigureStorStorage(db_type,
host, port, name, user, pass, marshaler,
cacheCfg, loadHistorySize)
if err != nil {
return nil, err
}
return d, nil
switch db_type {
case utils.MONGO:
d := newRedisMigrator(dm)
db = d.(MigratorDataDB)
case utils.MYSQL:
d := newMongoMigrator(dm)
db = d.(MigratorDataDB)
default:
err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'",
db_type, utils.REDIS, utils.MONGO))
}
}
*/
/*
func ConfigureV1StorDB(db_type, host, port, name, user, pass string) (db MigratorStorDB, err error) {
var d MigratorStorDB
@@ -76,3 +90,4 @@ func ConfigureV1StorDB(db_type, host, port, name, user, pass string) (db Migrato
}
return d, nil
}
*/

View File

@@ -19,9 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
import (
"fmt"
"strings"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/mgo"
@@ -48,40 +45,15 @@ type AtKeyValue struct {
Value v1ActionPlans
}
func newmongoMigrator(dm *engine.DataManager) {
}
func newv1MongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string) (v1ms *v1Mongo, err error) {
url := host
if port != "" {
url += ":" + port
}
if user != "" && pass != "" {
url = fmt.Sprintf("%s:%s@%s", user, pass, url)
}
var dbName string
if db != "" {
url += "/" + db
dbName = strings.Split(db, "?")[0] // remove extra info after ?
}
session, err := mgo.Dial(url)
if err != nil {
return nil, err
}
session.SetMode(mgo.Strong, true)
v1ms = &v1Mongo{db: dbName, session: session, v1ms: engine.NewCodecMsgpackMarshaler()}
return
}
func (v1ms *v1Mongo) Close() {}
func (v1ms *v1Mongo) getKeysForPrefix(prefix string) ([]string, error) {
return nil, nil
func newMongoMigrator(dm *engine.DataManager) (mgoMig *mongoMigrator) {
mgoMig.dm = dm
mgoMig.mgoDB = dm.DataDB().(*engine.MongoStorage)
}
//Account methods
//V1
//get
func (v1ms *v1Mongo) getv1Account() (v1Acnt *v1Account, err error) {
func (v1ms *mongoMigrator) getv1Account() (v1Acnt *v1Account, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Find(nil).Iter()
}
@@ -96,7 +68,7 @@ func (v1ms *v1Mongo) getv1Account() (v1Acnt *v1Account, err error) {
}
//set
func (v1ms *v1Mongo) setV1Account(x *v1Account) (err error) {
func (v1ms *mongoMigrator) setV1Account(x *v1Account) (err error) {
if err := v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Insert(x); err != nil {
return err
}
@@ -105,7 +77,7 @@ func (v1ms *v1Mongo) setV1Account(x *v1Account) (err error) {
//V2
//get
func (v1ms *v1Mongo) getv2Account() (v2Acnt *v2Account, err error) {
func (v1ms *mongoMigrator) getv2Account() (v2Acnt *v2Account, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v2AccountsCol).Find(nil).Iter()
}
@@ -120,7 +92,7 @@ func (v1ms *v1Mongo) getv2Account() (v2Acnt *v2Account, err error) {
}
//set
func (v1ms *v1Mongo) setV2Account(x *v2Account) (err error) {
func (v1ms *mongoMigrator) setV2Account(x *v2Account) (err error) {
if err := v1ms.session.DB(v1ms.db).C(v2AccountsCol).Insert(x); err != nil {
return err
}
@@ -129,7 +101,7 @@ func (v1ms *v1Mongo) setV2Account(x *v2Account) (err error) {
//Action methods
//get
func (v1ms *v1Mongo) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
func (v1ms *mongoMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
var strct *AtKeyValue
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actiontimings").Find(nil).Iter()
@@ -144,7 +116,7 @@ func (v1ms *v1Mongo) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
}
//set
func (v1ms *v1Mongo) setV1ActionPlans(x *v1ActionPlans) (err error) {
func (v1ms *mongoMigrator) setV1ActionPlans(x *v1ActionPlans) (err error) {
key := utils.ACTION_PLAN_PREFIX + (*x)[0].Id
if err := v1ms.session.DB(v1ms.db).C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil {
return err
@@ -154,7 +126,7 @@ func (v1ms *v1Mongo) setV1ActionPlans(x *v1ActionPlans) (err error) {
//Actions methods
//get
func (v1ms *v1Mongo) getV1Actions() (v1acs *v1Actions, err error) {
func (v1ms *mongoMigrator) getV1Actions() (v1acs *v1Actions, err error) {
var strct *AcKeyValue
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actions").Find(nil).Iter()
@@ -170,7 +142,7 @@ func (v1ms *v1Mongo) getV1Actions() (v1acs *v1Actions, err error) {
}
//set
func (v1ms *v1Mongo) setV1Actions(x *v1Actions) (err error) {
func (v1ms *mongoMigrator) setV1Actions(x *v1Actions) (err error) {
key := utils.ACTION_PREFIX + (*x)[0].Id
if err := v1ms.session.DB(v1ms.db).C("actions").Insert(&AcKeyValue{key, *x}); err != nil {
return err
@@ -180,18 +152,18 @@ func (v1ms *v1Mongo) setV1Actions(x *v1Actions) (err error) {
//ActionTriggers methods
//get
func (v1ms *v1Mongo) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {
func (v1ms *mongoMigrator) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {
return nil, utils.ErrNotImplemented
}
//set
func (v1ms *v1Mongo) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
func (v1ms *mongoMigrator) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
return utils.ErrNotImplemented
}
//Actions methods
//get
func (v1ms *v1Mongo) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
func (v1ms *mongoMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.SHARED_GROUP_PREFIX).Find(nil).Iter()
}
@@ -205,7 +177,7 @@ func (v1ms *v1Mongo) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
}
//set
func (v1ms *v1Mongo) setV1SharedGroup(x *v1SharedGroup) (err error) {
func (v1ms *mongoMigrator) setV1SharedGroup(x *v1SharedGroup) (err error) {
if err := v1ms.session.DB(v1ms.db).C(utils.SHARED_GROUP_PREFIX).Insert(x); err != nil {
return err
}
@@ -214,7 +186,7 @@ func (v1ms *v1Mongo) setV1SharedGroup(x *v1SharedGroup) (err error) {
//Stats methods
//get
func (v1ms *v1Mongo) getV1Stats() (v1st *v1Stat, err error) {
func (v1ms *mongoMigrator) getV1Stats() (v1st *v1Stat, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.CDR_STATS_PREFIX).Find(nil).Iter()
}
@@ -228,7 +200,7 @@ func (v1ms *v1Mongo) getV1Stats() (v1st *v1Stat, err error) {
}
//set
func (v1ms *v1Mongo) setV1Stats(x *v1Stat) (err error) {
func (v1ms *mongoMigrator) setV1Stats(x *v1Stat) (err error) {
if err := v1ms.session.DB(v1ms.db).C(utils.CDR_STATS_PREFIX).Insert(x); err != nil {
return err
}
@@ -237,7 +209,7 @@ func (v1ms *v1Mongo) setV1Stats(x *v1Stat) (err error) {
//Stats methods
//get
func (v1ms *v1Mongo) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
func (v1ms *mongoMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1ActionTriggersCol).Find(nil).Iter()
}
@@ -251,7 +223,7 @@ func (v1ms *v1Mongo) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
}
//set
func (v1ms *v1Mongo) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
func (v1ms *mongoMigrator) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
if err := v1ms.session.DB(v1ms.db).C(v1ActionTriggersCol).Insert(x); err != nil {
return err
}
@@ -260,7 +232,7 @@ func (v1ms *v1Mongo) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
//AttributeProfile methods
//get
func (v1ms *v1Mongo) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) {
func (v1ms *mongoMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AttributeProfilesCol).Find(nil).Iter()
}
@@ -274,7 +246,7 @@ func (v1ms *v1Mongo) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err
}
//set
func (v1ms *v1Mongo) setV1AttributeProfile(x *v1AttributeProfile) (err error) {
func (v1ms *mongoMigrator) setV1AttributeProfile(x *v1AttributeProfile) (err error) {
if err := v1ms.session.DB(v1ms.db).C(v1AttributeProfilesCol).Insert(x); err != nil {
return err
}

View File

@@ -25,7 +25,7 @@ import (
//CDR methods
//get
func (v1ms *v1Mongo) getV1CDR() (v1Cdr *v1Cdrs, err error) {
func (v1ms *mongoMigrator) getV1CDR() (v1Cdr *v1Cdrs, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(engine.ColCDRs).Find(nil).Iter()
}
@@ -40,7 +40,7 @@ func (v1ms *v1Mongo) getV1CDR() (v1Cdr *v1Cdrs, err error) {
}
//set
func (v1ms *v1Mongo) setV1CDR(v1Cdr *v1Cdrs) (err error) {
func (v1ms *mongoMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) {
if err = v1ms.session.DB(v1ms.db).C(engine.ColCDRs).Insert(v1Cdr); err != nil {
return err
}
@@ -49,7 +49,7 @@ func (v1ms *v1Mongo) setV1CDR(v1Cdr *v1Cdrs) (err error) {
//SMCost methods
//get
func (v1ms *v1Mongo) getSMCost() (v2Cost *v2SessionsCost, err error) {
func (v1ms *mongoMigrator) getSMCost() (v2Cost *v2SessionsCost, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.SessionsCostsTBL).Find(nil).Iter()
}
@@ -64,7 +64,7 @@ func (v1ms *v1Mongo) getSMCost() (v2Cost *v2SessionsCost, err error) {
}
//set
func (v1ms *v1Mongo) setSMCost(v2Cost *v2SessionsCost) (err error) {
func (v1ms *mongoMigrator) setSMCost(v2Cost *v2SessionsCost) (err error) {
if err = v1ms.session.DB(v1ms.db).C(utils.SessionsCostsTBL).Insert(v2Cost); err != nil {
return err
}
@@ -72,7 +72,7 @@ func (v1ms *v1Mongo) setSMCost(v2Cost *v2SessionsCost) (err error) {
}
//remove
func (v1ms *v1Mongo) remSMCost(v2Cost *v2SessionsCost) (err error) {
func (v1ms *mongoMigrator) remSMCost(v2Cost *v2SessionsCost) (err error) {
if err = v1ms.session.DB(v1ms.db).C(utils.SessionsCostsTBL).Remove(nil); err != nil {
return err
}

View File

@@ -19,124 +19,26 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
import (
"fmt"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/mediocregopher/radix.v2/pool"
"github.com/mediocregopher/radix.v2/redis"
)
type redisMigrator struct {
dm *engine.DataManager
rds *engine.RedisStorage
rds *engine.RedisStorage
dataKeys []string
qryIdx *int
}
func newredisMigrator(dm *engine.Datamnager) ( rM *redisMigrator, error) {
func newRedisMigrator(dm *engine.DataManager) (rM *redisMigrator) {
rM.dm = dm
rM.rds = dm.DataDB().(*engine.RedisStorage)
df := func(network, addr string) (*redis.Client, error) {
client, err := redis.Dial(network, addr)
if err != nil {
return nil, err
}
if len(pass) != 0 {
if err = client.Cmd("AUTH", pass).Err; err != nil {
client.Close()
return nil, err
}
}
if db != 0 {
if err = client.Cmd("SELECT", db).Err; err != nil {
client.Close()
return nil, err
}
}
return client, nil
}
p, err := pool.NewCustom("tcp", address, 1, df)
if err != nil {
return nil, err
}
var mrshler engine.Marshaler
if mrshlerStr == utils.MSGPACK {
mrshler = engine.NewCodecMsgpackMarshaler()
} else if mrshlerStr == utils.JSON {
mrshler = new(engine.JSONMarshaler)
} else {
return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr)
}
return &v1Redis{dbPool: p, ms: mrshler}, nil
}
// This CMD function get a connection from the pool.
// Handles automatic failover in case of network disconnects
func (v1rs *v1Redis) cmd(cmd string, args ...interface{}) *redis.Resp {
c1, err := v1rs.dbPool.Get()
if err != nil {
return redis.NewResp(err)
}
result := c1.Cmd(cmd, args...)
if result.IsType(redis.IOErr) { // Failover mecaneism
utils.Logger.Warning(fmt.Sprintf("<RedisStorage> error <%s>, attempting failover.", result.Err.Error()))
c2, err := v1rs.dbPool.Get()
if err == nil {
if result2 := c2.Cmd(cmd, args...); !result2.IsType(redis.IOErr) {
v1rs.dbPool.Put(c2)
return result2
}
}
} else {
v1rs.dbPool.Put(c1)
}
return result
}
func (v1rs *v1Redis) Close() {}
func (v1rs *v1Redis) getKeysForPrefix(prefix string) ([]string, error) {
r := v1rs.cmd("KEYS", prefix+"*")
if r.Err != nil {
return nil, r.Err
}
return r.List()
}
// Adds a single load instance to load history
func (v1rs *v1Redis) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize int, transactionID string) error {
if loadHistSize == 0 { // Load history disabled
return nil
}
marshaled, err := v1rs.ms.Marshal(&ldInst)
if err != nil {
return err
}
_, err = guardian.Guardian.Guard(func() (interface{}, error) { // Make sure we do it locked since other instance can modify history while we read it
histLen, err := v1rs.cmd("LLEN", utils.LOADINST_KEY).Int()
if err != nil {
return nil, err
}
if histLen >= loadHistSize { // Have hit maximum history allowed, remove oldest element in order to add new one
if err := v1rs.cmd("RPOP", utils.LOADINST_KEY).Err; err != nil {
return nil, err
}
}
err = v1rs.cmd("LPUSH", utils.LOADINST_KEY, marshaled).Err
return nil, err
}, 0, utils.LOADINST_KEY)
return err
}
//Account methods
//V1
//get
func (v1rs *v1Redis) getv1Account() (v1Acnt *v1Account, err error) {
func (v1rs *redisMigrator) getv1Account() (v1Acnt *v1Account, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(v1AccountDBPrefix)
if err != nil {
@@ -164,7 +66,7 @@ func (v1rs *v1Redis) getv1Account() (v1Acnt *v1Account, err error) {
}
//set
func (v1rs *v1Redis) setV1Account(x *v1Account) (err error) {
func (v1rs *redisMigrator) setV1Account(x *v1Account) (err error) {
key := v1AccountDBPrefix + x.Id
bit, err := v1rs.ms.Marshal(x)
if err != nil {
@@ -178,7 +80,7 @@ func (v1rs *v1Redis) setV1Account(x *v1Account) (err error) {
//V2
//get
func (v1rs *v1Redis) getv2Account() (v2Acnt *v2Account, err error) {
func (v1rs *redisMigrator) getv2Account() (v2Acnt *v2Account, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACCOUNT_PREFIX)
if err != nil {
@@ -206,7 +108,7 @@ func (v1rs *v1Redis) getv2Account() (v2Acnt *v2Account, err error) {
}
//set
func (v1rs *v1Redis) setV2Account(x *v2Account) (err error) {
func (v1rs *redisMigrator) setV2Account(x *v2Account) (err error) {
key := utils.ACCOUNT_PREFIX + x.ID
bit, err := v1rs.ms.Marshal(x)
if err != nil {
@@ -220,7 +122,7 @@ func (v1rs *v1Redis) setV2Account(x *v2Account) (err error) {
//ActionPlans methods
//get
func (v1rs *v1Redis) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
func (v1rs *redisMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PLAN_PREFIX)
if err != nil {
@@ -247,7 +149,7 @@ func (v1rs *v1Redis) getV1ActionPlans() (v1aps *v1ActionPlans, err error) {
}
//set
func (v1rs *v1Redis) setV1ActionPlans(x *v1ActionPlans) (err error) {
func (v1rs *redisMigrator) setV1ActionPlans(x *v1ActionPlans) (err error) {
key := utils.ACTION_PLAN_PREFIX + (*x)[0].Id
bit, err := v1rs.ms.Marshal(x)
if err != nil {
@@ -261,7 +163,7 @@ func (v1rs *v1Redis) setV1ActionPlans(x *v1ActionPlans) (err error) {
//Actions methods
//get
func (v1rs *v1Redis) getV1Actions() (v1acs *v1Actions, err error) {
func (v1rs *redisMigrator) getV1Actions() (v1acs *v1Actions, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PREFIX)
if err != nil {
@@ -288,7 +190,7 @@ func (v1rs *v1Redis) getV1Actions() (v1acs *v1Actions, err error) {
}
//set
func (v1rs *v1Redis) setV1Actions(x *v1Actions) (err error) {
func (v1rs *redisMigrator) setV1Actions(x *v1Actions) (err error) {
key := utils.ACTION_PREFIX + (*x)[0].Id
bit, err := v1rs.ms.Marshal(x)
if err != nil {
@@ -302,7 +204,7 @@ func (v1rs *v1Redis) setV1Actions(x *v1Actions) (err error) {
//ActionTriggers methods
//get
func (v1rs *v1Redis) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {
func (v1rs *redisMigrator) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
if err != nil {
@@ -329,7 +231,7 @@ func (v1rs *v1Redis) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error)
}
//set
func (v1rs *v1Redis) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
func (v1rs *redisMigrator) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
key := utils.ACTION_TRIGGER_PREFIX + (*x)[0].Id
bit, err := v1rs.ms.Marshal(x)
if err != nil {
@@ -343,7 +245,7 @@ func (v1rs *v1Redis) setV1ActionTriggers(x *v1ActionTriggers) (err error) {
//SharedGroup methods
//get
func (v1rs *v1Redis) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
func (v1rs *redisMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.SHARED_GROUP_PREFIX)
if err != nil {
@@ -370,7 +272,7 @@ func (v1rs *v1Redis) getV1SharedGroup() (v1sg *v1SharedGroup, err error) {
}
//set
func (v1rs *v1Redis) setV1SharedGroup(x *v1SharedGroup) (err error) {
func (v1rs *redisMigrator) setV1SharedGroup(x *v1SharedGroup) (err error) {
key := utils.SHARED_GROUP_PREFIX + x.Id
bit, err := v1rs.ms.Marshal(x)
if err != nil {
@@ -384,7 +286,7 @@ func (v1rs *v1Redis) setV1SharedGroup(x *v1SharedGroup) (err error) {
//Stats methods
//get
func (v1rs *v1Redis) getV1Stats() (v1st *v1Stat, err error) {
func (v1rs *redisMigrator) getV1Stats() (v1st *v1Stat, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.CDR_STATS_PREFIX)
if err != nil {
@@ -411,7 +313,7 @@ func (v1rs *v1Redis) getV1Stats() (v1st *v1Stat, err error) {
}
//set
func (v1rs *v1Redis) setV1Stats(x *v1Stat) (err error) {
func (v1rs *redisMigrator) setV1Stats(x *v1Stat) (err error) {
key := utils.CDR_STATS_PREFIX + x.Id
bit, err := v1rs.ms.Marshal(x)
if err != nil {
@@ -425,7 +327,7 @@ func (v1rs *v1Redis) setV1Stats(x *v1Stat) (err error) {
//Action methods
//get
func (v1rs *v1Redis) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
func (v1rs *redisMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_TRIGGER_PREFIX)
if err != nil {
@@ -452,7 +354,7 @@ func (v1rs *v1Redis) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) {
}
//set
func (v1rs *v1Redis) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
func (v1rs *redisMigrator) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
key := utils.ACTION_TRIGGER_PREFIX + x.ID
bit, err := v1rs.ms.Marshal(x)
if err != nil {
@@ -466,7 +368,7 @@ func (v1rs *v1Redis) setV2ActionTrigger(x *v2ActionTrigger) (err error) {
//AttributeProfile methods
//get
func (v1rs *v1Redis) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) {
func (v1rs *redisMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) {
var v1attr *v1AttributeProfile
if v1rs.qryIdx == nil {
v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.AttributeProfilePrefix)
@@ -494,7 +396,7 @@ func (v1rs *v1Redis) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err
}
//set
func (v1rs *v1Redis) setV1AttributeProfile(x *v1AttributeProfile) (err error) {
func (v1rs *redisMigrator) setV1AttributeProfile(x *v1AttributeProfile) (err error) {
key := utils.AttributeProfilePrefix + utils.ConcatenatedKey(x.Tenant, x.ID)
bit, err := v1rs.ms.Marshal(x)
if err != nil {

View File

@@ -20,13 +20,11 @@ package migrator
import (
"database/sql"
"fmt"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
_ "github.com/go-sql-driver/mysql"
"github.com/jinzhu/gorm"
)
type migratorSQL struct {
@@ -35,19 +33,7 @@ type migratorSQL struct {
rowIter *sql.Rows
}
func newSqlStorage(host, port, name, user, password string) (*sqlStorage, error) {
connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER'", user, password, host, port, name)
db, err := gorm.Open("mysql", connectString)
if err != nil {
return nil, err
}
if err = db.DB().Ping(); err != nil {
return nil, err
}
return &sqlStorage{Db: db.DB(), db: db}, nil
}
func (sqlStorage *sqlStorage) getV1CDR() (v1Cdr *v1Cdrs, err error) {
func (sqlStorage *migratorSQL) getV1CDR() (v1Cdr *v1Cdrs, err error) {
if sqlStorage.rowIter == nil {
sqlStorage.rowIter, err = sqlStorage.Db.Query("SELECT * FROM cdrs")
if err != nil {
@@ -66,7 +52,7 @@ func (sqlStorage *sqlStorage) getV1CDR() (v1Cdr *v1Cdrs, err error) {
return v1Cdr, nil
}
func (sqlStorage *sqlStorage) setV1CDR(v1Cdr *v1Cdrs) (err error) {
func (sqlStorage *migratorSQL) setV1CDR(v1Cdr *v1Cdrs) (err error) {
tx := sqlStorage.db.Begin()
cdrSql := v1Cdr.AsCDRsql()
cdrSql.CreatedAt = time.Now()
@@ -78,7 +64,7 @@ func (sqlStorage *sqlStorage) setV1CDR(v1Cdr *v1Cdrs) (err error) {
return nil
}
func (sqlStorage *sqlStorage) getSMCost() (v2Cost *v2SessionsCost, err error) {
func (sqlStorage *migratorSQL) getSMCost() (v2Cost *v2SessionsCost, err error) {
if sqlStorage.rowIter == nil {
sqlStorage.rowIter, err = sqlStorage.Db.Query("SELECT * FROM sessions_costs")
if err != nil {
@@ -97,7 +83,7 @@ func (sqlStorage *sqlStorage) getSMCost() (v2Cost *v2SessionsCost, err error) {
return v2Cost, nil
}
func (sqlStorage *sqlStorage) setSMCost(v2Cost *v2SessionsCost) (err error) {
func (sqlStorage *migratorSQL) setSMCost(v2Cost *v2SessionsCost) (err error) {
tx := sqlStorage.db.Begin()
smSql := v2Cost.AsSessionsCostSql()
smSql.CreatedAt = time.Now()
@@ -109,7 +95,7 @@ func (sqlStorage *sqlStorage) setSMCost(v2Cost *v2SessionsCost) (err error) {
return
}
func (sqlStorage *sqlStorage) remSMCost(v2Cost *v2SessionsCost) (err error) {
func (sqlStorage *migratorSQL) remSMCost(v2Cost *v2SessionsCost) (err error) {
tx := sqlStorage.db.Begin()
var rmParam *engine.SessionsCostsSQL
if v2Cost != nil {