From 3bc1cd511a8f64b6df3db86677d75caee78f42de Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 9 May 2018 07:51:39 -0400 Subject: [PATCH] Add new infrastructure for migrator --- engine/storage_mongo_datadb.go | 3 +- engine/storage_mysql.go | 3 +- engine/storage_utils.go | 15 ++-- migrator/accounts.go | 22 +++--- migrator/action.go | 14 ++-- migrator/action_plan.go | 14 ++-- migrator/action_trigger.go | 14 ++-- migrator/alias.go | 8 +- migrator/attributes.go | 16 ++-- migrator/cdrs.go | 12 +-- migrator/cdrstats.go | 6 +- migrator/costdetails.go | 4 +- migrator/derived_chargers.go | 8 +- migrator/destinations.go | 20 ++--- migrator/filters.go | 8 +- migrator/lcr.go | 8 +- migrator/migrator.go | 33 +++----- migrator/migratorDataDB.go | 4 + migrator/migratorStorDB.go | 4 + migrator/rating_plan.go | 8 +- migrator/rating_profile.go | 8 +- migrator/resource.go | 8 +- migrator/sessions_costs.go | 66 ++++++++-------- migrator/sharedgroup.go | 14 ++-- migrator/stats.go | 24 +++--- migrator/subscribers.go | 6 +- migrator/suppliers.go | 8 +- migrator/thresholds.go | 34 ++++---- migrator/timings.go | 8 +- migrator/v1migrator_utils.go | 57 +++++++++----- migrator/v1mongo_data.go | 70 +++++------------ migrator/v1mongo_stor.go | 10 +-- migrator/v1redis.go | 140 +++++---------------------------- migrator/v1sql.go | 24 ++---- 34 files changed, 291 insertions(+), 410 deletions(-) diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 05d69b158..275b9f958 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -98,7 +98,8 @@ var ( CostLow = strings.ToLower(utils.COST) ) -func NewMongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string, cacheCfg config.CacheConfig, loadHistorySize int) (ms *MongoStorage, err error) { +func NewMongoStorage(host, port, db, user, pass, storageType string, + cdrsIndexes []string, cacheCfg config.CacheConfig, loadHistorySize int) (ms *MongoStorage, err error) { url := host if port != "" { url += ":" + port diff --git a/engine/storage_mysql.go b/engine/storage_mysql.go index 2f8e5939d..7d8018d20 100644 --- a/engine/storage_mysql.go +++ b/engine/storage_mysql.go @@ -31,7 +31,8 @@ type MySQLStorage struct { SQLStorage } -func NewMySQLStorage(host, port, name, user, password string, maxConn, maxIdleConn, connMaxLifetime int) (*SQLStorage, error) { +func NewMySQLStorage(host, port, name, user, password string, + maxConn, maxIdleConn, connMaxLifetime int) (*SQLStorage, error) { connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER'", user, password, host, port, name) db, err := gorm.Open("mysql", connectString) if err != nil { diff --git a/engine/storage_utils.go b/engine/storage_utils.go index b8e112e25..ce38c88ba 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -30,7 +30,8 @@ import ( // Various helpers to deal with database -func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler string, cacheCfg config.CacheConfig, loadHistorySize int) (dm *DataManager, err error) { +func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler string, + cacheCfg config.CacheConfig, loadHistorySize int) (dm *DataManager, err error) { var d DataDB switch db_type { case utils.REDIS: @@ -58,7 +59,8 @@ func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler strin return dm, nil } -func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler string, maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db Storage, err error) { +func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler string, + maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db Storage, err error) { var d Storage switch db_type { case utils.MONGO: @@ -77,7 +79,8 @@ func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler strin return d, nil } -func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler string, maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db LoadStorage, err error) { +func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler string, + maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db LoadStorage, err error) { var d LoadStorage switch db_type { case utils.POSTGRES: @@ -96,7 +99,8 @@ func ConfigureLoadStorage(db_type, host, port, name, user, pass, marshaler strin return d, nil } -func ConfigureCdrStorage(db_type, host, port, name, user, pass string, maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db CdrStorage, err error) { +func ConfigureCdrStorage(db_type, host, port, name, user, pass string, + maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db CdrStorage, err error) { var d CdrStorage switch db_type { case utils.POSTGRES: @@ -115,7 +119,8 @@ func ConfigureCdrStorage(db_type, host, port, name, user, pass string, maxConn, return d, nil } -func ConfigureStorDB(db_type, host, port, name, user, pass string, maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db StorDB, err error) { +func ConfigureStorDB(db_type, host, port, name, user, pass string, + maxConn, maxIdleConn, connMaxLifetime int, cdrsIndexes []string) (db StorDB, err error) { var d StorDB switch db_type { case utils.POSTGRES: diff --git a/migrator/accounts.go b/migrator/accounts.go index 9db3c20cc..1e4d7b5db 100755 --- a/migrator/accounts.go +++ b/migrator/accounts.go @@ -36,22 +36,22 @@ const ( func (m *Migrator) migrateCurrentAccounts() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ACCOUNT_PREFIX) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ACCOUNT_PREFIX) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.ACCOUNT_PREFIX) - acc, err := m.dmIN.DataDB().GetAccount(idg) + acc, err := m.dmIN.DataManager().DataDB().GetAccount(idg) if err != nil { return err } if acc != nil { if m.dryRun != true { - if err := m.dmOut.DataDB().SetAccount(acc); err != nil { + if err := m.dmOut.DataManager().DataDB().SetAccount(acc); err != nil { return err } - if err := m.dmIN.DataDB().RemoveAccount(idg); err != nil { + if err := m.dmIN.DataManager().DataDB().RemoveAccount(idg); err != nil { return err } m.stats[utils.Accounts] += 1 @@ -64,7 +64,7 @@ func (m *Migrator) migrateCurrentAccounts() (err error) { func (m *Migrator) migrateV1Accounts() (err error) { var v1Acnt *v1Account for { - v1Acnt, err = m.oldDataDB.getv1Account() + v1Acnt, err = m.dmIN.getv1Account() if err != nil && err != utils.ErrNoMoreData { return err } @@ -74,7 +74,7 @@ func (m *Migrator) migrateV1Accounts() (err error) { if v1Acnt != nil { acnt := v1Acnt.V1toV3Account() if m.dryRun != true { - if err = m.dmOut.DataDB().SetAccount(acnt); err != nil { + if err = m.dmOut.DataManager().DataDB().SetAccount(acnt); err != nil { return err } m.stats[utils.Accounts] += 1 @@ -84,7 +84,7 @@ func (m *Migrator) migrateV1Accounts() (err error) { if m.dryRun != true { // All done, update version wtih current one vrs := engine.Versions{utils.Accounts: engine.CurrentDataDBVersions()[utils.Accounts]} - if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil { + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), @@ -97,7 +97,7 @@ func (m *Migrator) migrateV1Accounts() (err error) { func (m *Migrator) migrateV2Accounts() (err error) { var v2Acnt *v2Account for { - v2Acnt, err = m.oldDataDB.getv2Account() + v2Acnt, err = m.dmIN.getv2Account() if err != nil && err != utils.ErrNoMoreData { return err } @@ -107,7 +107,7 @@ func (m *Migrator) migrateV2Accounts() (err error) { if v2Acnt != nil { acnt := v2Acnt.V2toV3Account() if m.dryRun != true { - if err = m.dmOut.DataDB().SetAccount(acnt); err != nil { + if err = m.dmOut.DataManager().DataDB().SetAccount(acnt); err != nil { return err } m.stats[utils.Accounts] += 1 @@ -117,7 +117,7 @@ func (m *Migrator) migrateV2Accounts() (err error) { if m.dryRun != true { // All done, update version wtih current one vrs := engine.Versions{utils.Accounts: engine.CurrentDataDBVersions()[utils.Accounts]} - if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil { + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), @@ -129,7 +129,7 @@ func (m *Migrator) migrateV2Accounts() (err error) { func (m *Migrator) migrateAccounts() (err error) { var vrs engine.Versions - vrs, err = m.dmIN.DataDB().GetVersions("") + vrs, err = m.dmIN.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/action.go b/migrator/action.go index dc550c8a3..5cd9f5fbb 100644 --- a/migrator/action.go +++ b/migrator/action.go @@ -41,19 +41,19 @@ type v1Actions []*v1Action func (m *Migrator) migrateCurrentActions() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ACTION_PREFIX) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ACTION_PREFIX) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.ACTION_PREFIX) - acts, err := m.dmIN.GetActions(idg, true, utils.NonTransactional) + acts, err := m.dmIN.DataManager().GetActions(idg, true, utils.NonTransactional) if err != nil { return err } if acts != nil { if m.dryRun != true { - if err := m.dmOut.SetActions(idg, acts, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().SetActions(idg, acts, utils.NonTransactional); err != nil { return err } m.stats[utils.Actions] += 1 @@ -67,7 +67,7 @@ func (m *Migrator) migrateV1Actions() (err error) { var v1ACs *v1Actions var acts engine.Actions for { - v1ACs, err = m.oldDataDB.getV1Actions() + v1ACs, err = m.dmIN.getV1Actions() if err != nil && err != utils.ErrNoMoreData { return err } @@ -81,7 +81,7 @@ func (m *Migrator) migrateV1Actions() (err error) { } if !m.dryRun { - if err := m.dmOut.SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().SetActions(acts[0].Id, acts, utils.NonTransactional); err != nil { return err } m.stats[utils.Actions] += 1 @@ -91,7 +91,7 @@ func (m *Migrator) migrateV1Actions() (err error) { if !m.dryRun { // All done, update version wtih current one vrs := engine.Versions{utils.Actions: engine.CurrentStorDBVersions()[utils.Actions]} - if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil { + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), @@ -104,7 +104,7 @@ func (m *Migrator) migrateV1Actions() (err error) { func (m *Migrator) migrateActions() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/action_plan.go b/migrator/action_plan.go index 5a454b6bf..b4d770892 100644 --- a/migrator/action_plan.go +++ b/migrator/action_plan.go @@ -49,19 +49,19 @@ func (at *v1ActionPlan) IsASAP() bool { func (m *Migrator) migrateCurrentActionPlans() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ACTION_PLAN_PREFIX) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ACTION_PLAN_PREFIX) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.ACTION_PLAN_PREFIX) - acts, err := m.dmIN.DataDB().GetActionPlan(idg, true, utils.NonTransactional) + acts, err := m.dmIN.DataManager().DataDB().GetActionPlan(idg, true, utils.NonTransactional) if err != nil { return err } if acts != nil { if m.dryRun != true { - if err := m.dmOut.DataDB().SetActionPlan(idg, acts, true, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().DataDB().SetActionPlan(idg, acts, true, utils.NonTransactional); err != nil { return err } m.stats[utils.ActionPlans] += 1 @@ -74,7 +74,7 @@ func (m *Migrator) migrateCurrentActionPlans() (err error) { func (m *Migrator) migrateV1ActionPlans() (err error) { var v1APs *v1ActionPlans for { - v1APs, err = m.oldDataDB.getV1ActionPlans() + v1APs, err = m.dmIN.getV1ActionPlans() if err != nil && err != utils.ErrNoMoreData { return err } @@ -85,7 +85,7 @@ func (m *Migrator) migrateV1ActionPlans() (err error) { for _, v1ap := range *v1APs { ap := v1ap.AsActionPlan() if m.dryRun != true { - if err = m.dmOut.DataDB().SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil { + if err = m.dmOut.DataManager().DataDB().SetActionPlan(ap.Id, ap, true, utils.NonTransactional); err != nil { return err } m.stats[utils.ActionPlans] += 1 @@ -96,7 +96,7 @@ func (m *Migrator) migrateV1ActionPlans() (err error) { if m.dryRun != true { // All done, update version wtih current one vrs := engine.Versions{utils.ActionPlans: engine.CurrentDataDBVersions()[utils.ActionPlans]} - if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil { + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), @@ -109,7 +109,7 @@ func (m *Migrator) migrateV1ActionPlans() (err error) { func (m *Migrator) migrateActionPlans() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/action_trigger.go b/migrator/action_trigger.go index 6fe294540..fb7f5b313 100644 --- a/migrator/action_trigger.go +++ b/migrator/action_trigger.go @@ -54,19 +54,19 @@ type v1ActionTriggers []*v1ActionTrigger func (m *Migrator) migrateCurrentActionTrigger() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ACTION_TRIGGER_PREFIX) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ACTION_TRIGGER_PREFIX) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.ACTION_TRIGGER_PREFIX) - acts, err := m.dmIN.GetActionTriggers(idg, true, utils.NonTransactional) + acts, err := m.dmIN.DataManager().GetActionTriggers(idg, true, utils.NonTransactional) if err != nil { return err } if acts != nil { if m.dryRun != true { - if err := m.dmOut.SetActionTriggers(idg, acts, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().SetActionTriggers(idg, acts, utils.NonTransactional); err != nil { return err } } @@ -79,7 +79,7 @@ func (m *Migrator) migrateV1ActionTrigger() (err error) { var v1ACTs *v1ActionTriggers var acts engine.ActionTriggers for { - v1ACTs, err = m.oldDataDB.getV1ActionTriggers() + v1ACTs, err = m.dmIN.getV1ActionTriggers() if err != nil && err != utils.ErrNoMoreData { return err } @@ -93,7 +93,7 @@ func (m *Migrator) migrateV1ActionTrigger() (err error) { } if !m.dryRun { - if err := m.dmOut.SetActionTriggers(acts[0].ID, acts, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().SetActionTriggers(acts[0].ID, acts, utils.NonTransactional); err != nil { return err } m.stats[utils.ActionTriggers] += 1 @@ -103,7 +103,7 @@ func (m *Migrator) migrateV1ActionTrigger() (err error) { if !m.dryRun { // All done, update version wtih current one vrs := engine.Versions{utils.ActionTriggers: engine.CurrentDataDBVersions()[utils.ActionTriggers]} - if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil { + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), @@ -116,7 +116,7 @@ func (m *Migrator) migrateV1ActionTrigger() (err error) { func (m *Migrator) migrateActionTriggers() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/alias.go b/migrator/alias.go index 4f31f9ec7..8af7f3eea 100644 --- a/migrator/alias.go +++ b/migrator/alias.go @@ -29,19 +29,19 @@ import ( func (m *Migrator) migrateCurrentAlias() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ALIASES_PREFIX) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ALIASES_PREFIX) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.ALIASES_PREFIX) - usr, err := m.dmIN.DataDB().GetAlias(idg, true, utils.NonTransactional) + usr, err := m.dmIN.DataManager().DataDB().GetAlias(idg, true, utils.NonTransactional) if err != nil { return err } if usr != nil { if m.dryRun != true { - if err := m.dmOut.DataDB().SetAlias(usr, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().DataDB().SetAlias(usr, utils.NonTransactional); err != nil { return err } m.stats[utils.Alias] += 1 @@ -92,7 +92,7 @@ func (m *Migrator) migrateCurrentAlias() (err error) { func (m *Migrator) migrateAlias() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/attributes.go b/migrator/attributes.go index 001ee7942..c9e5e46cd 100644 --- a/migrator/attributes.go +++ b/migrator/attributes.go @@ -48,19 +48,19 @@ type v1AttributeProfile struct { func (m *Migrator) migrateCurrentAttributeProfile() (err error) { var ids []string tenant := config.CgrConfig().DefaultTenant - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.AttributeProfilePrefix) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.AttributeProfilePrefix) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.AttributeProfilePrefix+tenant+":") - attrPrf, err := m.dmIN.GetAttributeProfile(tenant, idg, true, utils.NonTransactional) + attrPrf, err := m.dmIN.DataManager().GetAttributeProfile(tenant, idg, true, utils.NonTransactional) if err != nil { return err } if attrPrf != nil { if m.dryRun != true { - if err := m.dmOut.SetAttributeProfile(attrPrf, true); err != nil { + if err := m.dmOut.DataManager().SetAttributeProfile(attrPrf, true); err != nil { return err } } @@ -72,7 +72,7 @@ func (m *Migrator) migrateCurrentAttributeProfile() (err error) { func (m *Migrator) migrateV1Attributes() (err error) { var v1Attr *v1AttributeProfile for { - v1Attr, err = m.oldDataDB.getV1AttributeProfile() + v1Attr, err = m.dmIN.getV1AttributeProfile() if err != nil && err != utils.ErrNoMoreData { return err } @@ -85,10 +85,10 @@ func (m *Migrator) migrateV1Attributes() (err error) { return err } if m.dryRun != true { - if err := m.dmOut.DataDB().SetAttributeProfileDrv(attrPrf); err != nil { + if err := m.dmOut.DataManager().DataDB().SetAttributeProfileDrv(attrPrf); err != nil { return err } - if err := m.dmOut.SetAttributeProfile(attrPrf, true); err != nil { + if err := m.dmOut.DataManager().SetAttributeProfile(attrPrf, true); err != nil { return err } m.stats[utils.Attributes] += 1 @@ -98,7 +98,7 @@ func (m *Migrator) migrateV1Attributes() (err error) { if m.dryRun != true { // All done, update version wtih current one vrs := engine.Versions{utils.Attributes: engine.CurrentStorDBVersions()[utils.Attributes]} - if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil { + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), @@ -111,7 +111,7 @@ func (m *Migrator) migrateV1Attributes() (err error) { func (m *Migrator) migrateAttributeProfile() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/cdrs.go b/migrator/cdrs.go index a23bd5778..aa8683876 100755 --- a/migrator/cdrs.go +++ b/migrator/cdrs.go @@ -31,12 +31,12 @@ func (m *Migrator) migrateCurrentCDRs() (err error) { if m.sameStorDB { // no move return } - cdrs, _, err := m.storDBIn.GetCDRs(new(utils.CDRsFilter), false) + cdrs, _, err := m.storDBIn.StorDB().GetCDRs(new(utils.CDRsFilter), false) if err != nil { return err } for _, cdr := range cdrs { - if err := m.storDBOut.SetCDR(cdr, true); err != nil { + if err := m.storDBOut.StorDB().SetCDR(cdr, true); err != nil { return err } } @@ -46,7 +46,7 @@ func (m *Migrator) migrateCurrentCDRs() (err error) { func (m *Migrator) migrateCDRs() (err error) { var vrs engine.Versions current := engine.CurrentStorDBVersions() - vrs, err = m.storDBOut.GetVersions("") + vrs, err = m.storDBOut.StorDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, @@ -74,7 +74,7 @@ func (m *Migrator) migrateCDRs() (err error) { func (m *Migrator) migrateV1CDRs() (err error) { var v1CDR *v1Cdrs for { - v1CDR, err = m.oldStorDB.getV1CDR() + v1CDR, err = m.storDBIn.getV1CDR() if err != nil && err != utils.ErrNoMoreData { return err } @@ -84,7 +84,7 @@ func (m *Migrator) migrateV1CDRs() (err error) { if v1CDR != nil { cdr := v1CDR.V1toV2Cdr() if m.dryRun != true { - if err = m.storDBOut.SetCDR(cdr, true); err != nil { + if err = m.storDBOut.StorDB().SetCDR(cdr, true); err != nil { return err } m.stats[utils.CDRs] += 1 @@ -94,7 +94,7 @@ func (m *Migrator) migrateV1CDRs() (err error) { if m.dryRun != true { // All done, update version wtih current one vrs := engine.Versions{utils.CDRs: engine.CurrentStorDBVersions()[utils.CDRs]} - if err = m.storDBOut.SetVersions(vrs, false); err != nil { + if err = m.storDBOut.StorDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), diff --git a/migrator/cdrstats.go b/migrator/cdrstats.go index 2907b456a..1f2c26411 100644 --- a/migrator/cdrstats.go +++ b/migrator/cdrstats.go @@ -26,14 +26,14 @@ import ( ) func (m *Migrator) migrateCurrentCdrStats() (err error) { - cdrsts, err := m.dmIN.GetAllCdrStats() + cdrsts, err := m.dmIN.DataManager().GetAllCdrStats() if err != nil { return err } for _, cdrst := range cdrsts { if cdrst != nil { if m.dryRun != true { - if err := m.dmOut.SetCdrStats(cdrst); err != nil { + if err := m.dmOut.DataManager().SetCdrStats(cdrst); err != nil { return err } m.stats[utils.CdrStats] += 1 @@ -46,7 +46,7 @@ func (m *Migrator) migrateCurrentCdrStats() (err error) { func (m *Migrator) migrateCdrStats() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/costdetails.go b/migrator/costdetails.go index 6add923ec..c157f9e78 100644 --- a/migrator/costdetails.go +++ b/migrator/costdetails.go @@ -18,6 +18,7 @@ along with this program. If not, see package migrator +/* import ( "database/sql" "encoding/json" @@ -36,7 +37,7 @@ func (m *Migrator) migrateCostDetails() (err error) { utils.NoStorDBConnection, "no connection to StorDB") } - vrs, err := m.storDBOut.GetVersions(utils.COST_DETAILS) + vrs, err := m.storDBOut.StorDB().GetVersions(utils.COST_DETAILS) if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, @@ -212,3 +213,4 @@ func (v1cc *v1CallCost) AsCallCost() (cc *engine.CallCost) { } return } +*/ diff --git a/migrator/derived_chargers.go b/migrator/derived_chargers.go index 7f4bc8201..b72bf8389 100644 --- a/migrator/derived_chargers.go +++ b/migrator/derived_chargers.go @@ -28,19 +28,19 @@ import ( func (m *Migrator) migrateCurrentDerivedChargers() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.DERIVEDCHARGERS_PREFIX) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.DERIVEDCHARGERS_PREFIX) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.DERIVEDCHARGERS_PREFIX) - drc, err := m.dmIN.GetDerivedChargers(idg, true, utils.NonTransactional) + drc, err := m.dmIN.DataManager().GetDerivedChargers(idg, true, utils.NonTransactional) if err != nil { return err } if drc != nil { if m.dryRun != true { - if err := m.dmOut.DataDB().SetDerivedChargers(idg, drc, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().DataDB().SetDerivedChargers(idg, drc, utils.NonTransactional); err != nil { return err } m.stats[utils.DerivedChargersV] += 1 @@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentDerivedChargers() (err error) { func (m *Migrator) migrateDerivedChargers() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/destinations.go b/migrator/destinations.go index 33f45c3a5..93eeb9826 100644 --- a/migrator/destinations.go +++ b/migrator/destinations.go @@ -28,19 +28,19 @@ import ( func (m *Migrator) migrateCurrentDestinations() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.DESTINATION_PREFIX) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.DESTINATION_PREFIX) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.DESTINATION_PREFIX) - dst, err := m.dmIN.DataDB().GetDestination(idg, true, utils.NonTransactional) + dst, err := m.dmIN.DataManager().DataDB().GetDestination(idg, true, utils.NonTransactional) if err != nil { return err } if dst != nil { if m.dryRun != true { - if err := m.dmOut.DataDB().SetDestination(dst, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().DataDB().SetDestination(dst, utils.NonTransactional); err != nil { return err } m.stats[utils.Destinations] += 1 @@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentDestinations() (err error) { func (m *Migrator) migrateDestinations() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, @@ -80,28 +80,28 @@ func (m *Migrator) migrateDestinations() (err error) { func (m *Migrator) migrateCurrentReverseDestinations() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.REVERSE_DESTINATION_PREFIX) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.REVERSE_DESTINATION_PREFIX) if err != nil { return err } for _, id := range ids { id := strings.TrimPrefix(id, utils.REVERSE_DESTINATION_PREFIX) - rdst, err := m.dmIN.DataDB().GetReverseDestination(id, true, utils.NonTransactional) + rdst, err := m.dmIN.DataManager().DataDB().GetReverseDestination(id, true, utils.NonTransactional) if err != nil { return err } if rdst != nil { for _, rdid := range rdst { - rdstn, err := m.dmIN.DataDB().GetDestination(rdid, true, utils.NonTransactional) + rdstn, err := m.dmIN.DataManager().DataDB().GetDestination(rdid, true, utils.NonTransactional) if err != nil { return err } if rdstn != nil { if m.dryRun != true { - if err := m.dmOut.DataDB().SetDestination(rdstn, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().DataDB().SetDestination(rdstn, utils.NonTransactional); err != nil { return err } - if err := m.dmOut.DataDB().SetReverseDestination(rdstn, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().DataDB().SetReverseDestination(rdstn, utils.NonTransactional); err != nil { return err } m.stats[utils.ReverseDestinations] += 1 @@ -116,7 +116,7 @@ func (m *Migrator) migrateCurrentReverseDestinations() (err error) { func (m *Migrator) migrateReverseDestinations() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/filters.go b/migrator/filters.go index d20b556f9..9806e44f6 100644 --- a/migrator/filters.go +++ b/migrator/filters.go @@ -30,19 +30,19 @@ import ( func (m *Migrator) migrateCurrentRequestFilter() (err error) { var ids []string tenant := config.CgrConfig().DefaultTenant - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.FilterPrefix) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.FilterPrefix) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.FilterPrefix+tenant+":") - fl, err := m.dmIN.GetFilter(tenant, idg, true, utils.NonTransactional) + fl, err := m.dmIN.DataManager().GetFilter(tenant, idg, true, utils.NonTransactional) if err != nil { return err } if fl != nil { if m.dryRun != true { - if err := m.dmOut.SetFilter(fl); err != nil { + if err := m.dmOut.DataManager().SetFilter(fl); err != nil { return err } m.stats[utils.RQF] += 1 @@ -55,7 +55,7 @@ func (m *Migrator) migrateCurrentRequestFilter() (err error) { func (m *Migrator) migrateRequestFilter() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/lcr.go b/migrator/lcr.go index 123c7a9ef..fc7357117 100644 --- a/migrator/lcr.go +++ b/migrator/lcr.go @@ -28,19 +28,19 @@ import ( func (m *Migrator) migrateCurrentLCR() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.LCR_PREFIX) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.LCR_PREFIX) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.LCR_PREFIX) - lcr, err := m.dmIN.GetLCR(idg, true, utils.NonTransactional) + lcr, err := m.dmIN.DataManager().GetLCR(idg, true, utils.NonTransactional) if err != nil { return err } if lcr != nil { if m.dryRun != true { - if err := m.dmOut.SetLCR(lcr, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().SetLCR(lcr, utils.NonTransactional); err != nil { return err } m.stats[utils.LCR] += 1 @@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentLCR() (err error) { func (m *Migrator) migrateLCR() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/migrator.go b/migrator/migrator.go index da2768d12..18addea64 100755 --- a/migrator/migrator.go +++ b/migrator/migrator.go @@ -34,26 +34,13 @@ func NewMigrator( dryRun bool, sameDataDB bool, sameStorDB bool) (m *Migrator, err error) { - var mrshlr engine.Marshaler - var oldmrshlr engine.Marshaler - if dataDBEncoding == utils.MSGPACK { - mrshlr = engine.NewCodecMsgpackMarshaler() - } else if dataDBEncoding == utils.JSON { - mrshlr = new(engine.JSONMarshaler) - } else if oldDataDBEncoding == utils.MSGPACK { - oldmrshlr = engine.NewCodecMsgpackMarshaler() - } else if oldDataDBEncoding == utils.JSON { - oldmrshlr = new(engine.JSONMarshaler) - } stats := make(map[string]int) - m = &Migrator{ - dmOut: dmOut, - dmIN: dmIN, - storDBIn: storDBIn, - storDBOut: storDBOut, - storDBType: storDBType, - dryRun: dryRun, sameDataDB: sameDataDB, sameStorDB: sameStorDB, + dmOut: dmOut, + dmIN: dmIN, + storDBIn: storDBIn, + storDBOut: storDBOut, + dryRun: dryRun, sameDataDB: sameDataDB, sameStorDB: sameStorDB, stats: stats, } return m, err @@ -82,13 +69,15 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) { fmt.Sprintf("task <%s> is not a supported migration task", taskID)) case utils.MetaSetVersions: if m.dryRun != true { - if err := m.dmOut.DataDB().SetVersions(engine.CurrentDBVersions(m.dataDBType), true); err != nil { + if err := m.dmOut.DataManager().DataDB().SetVersions( + engine.CurrentDBVersions(m.dmOut.DataManager().DataDB().GetStorageType()), true); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error())), nil } - if err := m.storDBOut.SetVersions(engine.CurrentDBVersions(m.storDBType), true); err != nil { + if err := m.storDBOut.StorDB().SetVersions( + engine.CurrentDBVersions(m.storDBOut.StorDB().GetStorageType()), true); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), @@ -102,8 +91,8 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) { err = m.migrateCDRs() case utils.MetaSessionsCosts: err = m.migrateSessionSCosts() - case utils.MetaCostDetails: - err = m.migrateCostDetails() + // case utils.MetaCostDetails: + // err = m.migrateCostDetails() case utils.MetaAccounts: err = m.migrateAccounts() case utils.MetaActionPlans: diff --git a/migrator/migratorDataDB.go b/migrator/migratorDataDB.go index d54b0b60e..eaed5c8a5 100644 --- a/migrator/migratorDataDB.go +++ b/migrator/migratorDataDB.go @@ -18,6 +18,10 @@ along with this program. If not, see package migrator +import ( + "github.com/cgrates/cgrates/engine" +) + type MigratorDataDB interface { getKeysForPrefix(prefix string) ([]string, error) getv1Account() (v1Acnt *v1Account, err error) diff --git a/migrator/migratorStorDB.go b/migrator/migratorStorDB.go index 8c6bcd7f6..8754dc779 100755 --- a/migrator/migratorStorDB.go +++ b/migrator/migratorStorDB.go @@ -18,6 +18,10 @@ along with this program. If not, see package migrator +import ( + "github.com/cgrates/cgrates/engine" +) + type MigratorStorDB interface { getV1CDR() (v1Cdr *v1Cdrs, err error) setV1CDR(v1Cdr *v1Cdrs) (err error) diff --git a/migrator/rating_plan.go b/migrator/rating_plan.go index eeb98791f..05b43b84b 100644 --- a/migrator/rating_plan.go +++ b/migrator/rating_plan.go @@ -28,19 +28,19 @@ import ( func (m *Migrator) migrateCurrentRatingPlans() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.RATING_PLAN_PREFIX) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.RATING_PLAN_PREFIX) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.RATING_PLAN_PREFIX) - rp, err := m.dmIN.GetRatingPlan(idg, true, utils.NonTransactional) + rp, err := m.dmIN.DataManager().GetRatingPlan(idg, true, utils.NonTransactional) if err != nil { return err } if rp != nil { if m.dryRun != true { - if err := m.dmOut.SetRatingPlan(rp, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().SetRatingPlan(rp, utils.NonTransactional); err != nil { return err } m.stats[utils.RatingPlan] += 1 @@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentRatingPlans() (err error) { func (m *Migrator) migrateRatingPlans() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/rating_profile.go b/migrator/rating_profile.go index c673b7ee0..35d376d6d 100644 --- a/migrator/rating_profile.go +++ b/migrator/rating_profile.go @@ -28,19 +28,19 @@ import ( func (m *Migrator) migrateCurrentRatingProfiles() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.RATING_PROFILE_PREFIX) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.RATING_PROFILE_PREFIX) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.RATING_PROFILE_PREFIX) - rp, err := m.dmIN.GetRatingProfile(idg, true, utils.NonTransactional) + rp, err := m.dmIN.DataManager().GetRatingProfile(idg, true, utils.NonTransactional) if err != nil { return err } if rp != nil { if m.dryRun != true { - if err := m.dmOut.SetRatingProfile(rp, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().SetRatingProfile(rp, utils.NonTransactional); err != nil { return err } m.stats[utils.RatingProfile] += 1 @@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentRatingProfiles() (err error) { func (m *Migrator) migrateRatingProfiles() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/resource.go b/migrator/resource.go index dcebd013c..6b913ed59 100644 --- a/migrator/resource.go +++ b/migrator/resource.go @@ -30,19 +30,19 @@ import ( func (m *Migrator) migrateCurrentResource() (err error) { var ids []string tenant := config.CgrConfig().DefaultTenant - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ResourceProfilesPrefix) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ResourceProfilesPrefix) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.ResourceProfilesPrefix+tenant+":") - res, err := m.dmIN.GetResourceProfile(tenant, idg, true, utils.NonTransactional) + res, err := m.dmIN.DataManager().GetResourceProfile(tenant, idg, true, utils.NonTransactional) if err != nil { return err } if res != nil { if m.dryRun != true { - if err := m.dmOut.SetResourceProfile(res, true); err != nil { + if err := m.dmOut.DataManager().SetResourceProfile(res, true); err != nil { return err } m.stats[utils.Resource] += 1 @@ -55,7 +55,7 @@ func (m *Migrator) migrateCurrentResource() (err error) { func (m *Migrator) migrateResources() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/sessions_costs.go b/migrator/sessions_costs.go index b09850b80..cebfb2565 100644 --- a/migrator/sessions_costs.go +++ b/migrator/sessions_costs.go @@ -19,7 +19,7 @@ along with this program. If not, see package migrator import ( - "database/sql" + //"database/sql" "encoding/json" "fmt" "time" @@ -32,15 +32,15 @@ func (m *Migrator) migrateCurrentSessionSCost() (err error) { if m.sameStorDB { // no move return } - smCosts, err := m.storDBIn.GetSMCosts("", "", "", "") + smCosts, err := m.storDBIn.StorDB().GetSMCosts("", "", "", "") if err != nil { return err } for _, smCost := range smCosts { - if err := m.storDBOut.SetSMCost(smCost); err != nil { + if err := m.storDBOut.StorDB().SetSMCost(smCost); err != nil { return err } - if err := m.storDBIn.RemoveSMCost(smCost); err != nil { + if err := m.storDBIn.StorDB().RemoveSMCost(smCost); err != nil { return err } } @@ -50,7 +50,7 @@ func (m *Migrator) migrateCurrentSessionSCost() (err error) { func (m *Migrator) migrateSessionSCosts() (err error) { var vrs engine.Versions current := engine.CurrentStorDBVersions() - vrs, err = m.storDBOut.GetVersions("") + vrs, err = m.storDBOut.StorDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, @@ -63,30 +63,30 @@ func (m *Migrator) migrateSessionSCosts() (err error) { "version number is not defined for SessionsCosts model") } switch vrs[utils.SessionSCosts] { - case 0, 1: - var isPostGres bool - var storSQL *sql.DB - switch m.storDBType { - case utils.MYSQL: - isPostGres = false - storSQL = m.storDBOut.(*engine.SQLStorage).Db - case utils.POSTGRES: - isPostGres = true - storSQL = m.storDBOut.(*engine.SQLStorage).Db - default: - return utils.NewCGRError(utils.Migrator, - utils.MandatoryIEMissingCaps, - utils.UnsupportedDB, - fmt.Sprintf("unsupported database type: <%s>", m.storDBType)) - } - qry := "RENAME TABLE sm_costs TO sessions_costs;" - if isPostGres { - qry = "ALTER TABLE sm_costs RENAME TO sessions_costs" - } - if _, err := storSQL.Exec(qry); err != nil { - return err - } - fallthrough // incremental updates + // case 0, 1: + // var isPostGres bool + // var storSQL *sql.DB + // switch m.storDBType { + // case utils.MYSQL: + // isPostGres = false + // storSQL = m.storDBOut.(*engine.SQLStorage).Db + // case utils.POSTGRES: + // isPostGres = true + // storSQL = m.storDBOut.(*engine.SQLStorage).Db + // default: + // return utils.NewCGRError(utils.Migrator, + // utils.MandatoryIEMissingCaps, + // utils.UnsupportedDB, + // fmt.Sprintf("unsupported database type: <%s>", m.storDBType)) + // } + // qry := "RENAME TABLE sm_costs TO sessions_costs;" + // if isPostGres { + // qry = "ALTER TABLE sm_costs RENAME TO sessions_costs" + // } + // if _, err := storSQL.Exec(qry); err != nil { + // return err + // } + // fallthrough // incremental updates case 2: if err := m.migrateV2SessionSCosts(); err != nil { return err @@ -102,7 +102,7 @@ func (m *Migrator) migrateSessionSCosts() (err error) { func (m *Migrator) migrateV2SessionSCosts() (err error) { var v2Cost *v2SessionsCost for { - v2Cost, err = m.oldStorDB.getSMCost() + v2Cost, err = m.storDBIn.getV2SMCost() if err != nil && err != utils.ErrNoMoreData { return err } @@ -112,10 +112,10 @@ func (m *Migrator) migrateV2SessionSCosts() (err error) { if v2Cost != nil { smCost := v2Cost.V2toV3Cost() if m.dryRun != true { - if err = m.storDBOut.SetSMCost(smCost); err != nil { + if err = m.storDBOut.StorDB().SetSMCost(smCost); err != nil { return err } - if err = m.oldStorDB.remSMCost(v2Cost); err != nil { + if err = m.storDBIn.remV2SMCost(v2Cost); err != nil { return err } m.stats[utils.SessionSCosts] += 1 @@ -125,7 +125,7 @@ func (m *Migrator) migrateV2SessionSCosts() (err error) { if m.dryRun != true { // All done, update version wtih current one vrs := engine.Versions{utils.SessionSCosts: engine.CurrentStorDBVersions()[utils.SessionSCosts]} - if err = m.storDBOut.SetVersions(vrs, false); err != nil { + if err = m.storDBOut.StorDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), diff --git a/migrator/sharedgroup.go b/migrator/sharedgroup.go index 320e0a7c9..361c6a13c 100644 --- a/migrator/sharedgroup.go +++ b/migrator/sharedgroup.go @@ -34,19 +34,19 @@ type v1SharedGroup struct { func (m *Migrator) migrateCurrentSharedGroups() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.SHARED_GROUP_PREFIX) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.SHARED_GROUP_PREFIX) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.SHARED_GROUP_PREFIX) - sgs, err := m.dmIN.GetSharedGroup(idg, true, utils.NonTransactional) + sgs, err := m.dmIN.DataManager().GetSharedGroup(idg, true, utils.NonTransactional) if err != nil { return err } if sgs != nil { if m.dryRun != true { - if err := m.dmOut.SetSharedGroup(sgs, utils.NonTransactional); err != nil { + if err := m.dmOut.DataManager().SetSharedGroup(sgs, utils.NonTransactional); err != nil { return err } } @@ -58,7 +58,7 @@ func (m *Migrator) migrateCurrentSharedGroups() (err error) { func (m *Migrator) migrateV1SharedGroups() (err error) { var v1SG *v1SharedGroup for { - v1SG, err = m.oldDataDB.getV1SharedGroup() + v1SG, err = m.dmIN.getV1SharedGroup() if err != nil && err != utils.ErrNoMoreData { return err } @@ -68,7 +68,7 @@ func (m *Migrator) migrateV1SharedGroups() (err error) { if v1SG != nil { acnt := v1SG.AsSharedGroup() if m.dryRun != true { - if err = m.dmOut.SetSharedGroup(acnt, utils.NonTransactional); err != nil { + if err = m.dmOut.DataManager().SetSharedGroup(acnt, utils.NonTransactional); err != nil { return err } m.stats[utils.SharedGroups] += 1 @@ -77,7 +77,7 @@ func (m *Migrator) migrateV1SharedGroups() (err error) { } // All done, update version wtih current one vrs := engine.Versions{utils.SharedGroups: engine.CurrentStorDBVersions()[utils.SharedGroups]} - if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil { + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), @@ -89,7 +89,7 @@ func (m *Migrator) migrateV1SharedGroups() (err error) { func (m *Migrator) migrateSharedGroups() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/stats.go b/migrator/stats.go index 57235ed60..8bb05242d 100644 --- a/migrator/stats.go +++ b/migrator/stats.go @@ -63,19 +63,19 @@ func (m *Migrator) migrateCurrentStats() (err error) { var ids []string tenant := config.CgrConfig().DefaultTenant //StatQueue - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.StatQueuePrefix) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueuePrefix) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.StatQueuePrefix+tenant+":") - sgs, err := m.dmIN.GetStatQueue(tenant, idg, true, utils.NonTransactional) + sgs, err := m.dmIN.DataManager().GetStatQueue(tenant, idg, true, utils.NonTransactional) if err != nil { return err } if sgs != nil { if m.dryRun != true { - if err := m.dmOut.SetStatQueue(sgs); err != nil { + if err := m.dmOut.DataManager().SetStatQueue(sgs); err != nil { return err } m.stats[utils.StatS] += 1 @@ -83,19 +83,19 @@ func (m *Migrator) migrateCurrentStats() (err error) { } } //StatQueueProfile - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.StatQueueProfilePrefix) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.StatQueueProfilePrefix+tenant+":") - sgs, err := m.dmIN.GetStatQueueProfile(tenant, idg, true, utils.NonTransactional) + sgs, err := m.dmIN.DataManager().GetStatQueueProfile(tenant, idg, true, utils.NonTransactional) if err != nil { return err } if sgs != nil { if m.dryRun != true { - if err := m.dmOut.SetStatQueueProfile(sgs, true); err != nil { + if err := m.dmOut.DataManager().SetStatQueueProfile(sgs, true); err != nil { return err } } @@ -108,7 +108,7 @@ func (m *Migrator) migrateCurrentStats() (err error) { func (m *Migrator) migrateV1CDRSTATS() (err error) { var v1Sts *v1Stat for { - v1Sts, err = m.oldDataDB.getV1Stats() + v1Sts, err = m.dmIN.getV1Stats() if err != nil && err != utils.ErrNoMoreData { return err } @@ -129,13 +129,13 @@ func (m *Migrator) migrateV1CDRSTATS() (err error) { return err } if !m.dryRun { - if err := m.dmOut.SetFilter(filter); err != nil { + if err := m.dmOut.DataManager().SetFilter(filter); err != nil { return err } - if err := m.dmOut.SetStatQueue(sq); err != nil { + if err := m.dmOut.DataManager().SetStatQueue(sq); err != nil { return err } - if err := m.dmOut.SetStatQueueProfile(sts, true); err != nil { + if err := m.dmOut.DataManager().SetStatQueueProfile(sts, true); err != nil { return err } m.stats[utils.StatS] += 1 @@ -145,7 +145,7 @@ func (m *Migrator) migrateV1CDRSTATS() (err error) { if m.dryRun != true { // All done, update version wtih current one vrs := engine.Versions{utils.StatS: engine.CurrentDataDBVersions()[utils.StatS]} - if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil { + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), @@ -158,7 +158,7 @@ func (m *Migrator) migrateV1CDRSTATS() (err error) { func (m *Migrator) migrateStats() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/subscribers.go b/migrator/subscribers.go index 60c57b2b2..1888cd07a 100644 --- a/migrator/subscribers.go +++ b/migrator/subscribers.go @@ -26,14 +26,14 @@ import ( ) func (m *Migrator) migrateCurrentSubscribers() (err error) { - subs, err := m.dmIN.GetSubscribers() + subs, err := m.dmIN.DataManager().GetSubscribers() if err != nil { return err } for id, sub := range subs { if sub != nil { if m.dryRun != true { - if err := m.dmOut.SetSubscriber(id, sub); err != nil { + if err := m.dmOut.DataManager().SetSubscriber(id, sub); err != nil { return err } m.stats[utils.Subscribers] += 1 @@ -46,7 +46,7 @@ func (m *Migrator) migrateCurrentSubscribers() (err error) { func (m *Migrator) migrateSubscribers() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/suppliers.go b/migrator/suppliers.go index 24b1835c5..426c3cd8d 100644 --- a/migrator/suppliers.go +++ b/migrator/suppliers.go @@ -30,19 +30,19 @@ import ( func (m *Migrator) migrateCurrentSupplierProfile() (err error) { var ids []string tenant := config.CgrConfig().DefaultTenant - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.SupplierProfilePrefix) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.SupplierProfilePrefix) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.SupplierProfilePrefix) - splp, err := m.dmIN.GetSupplierProfile(tenant, idg, true, utils.NonTransactional) + splp, err := m.dmIN.DataManager().GetSupplierProfile(tenant, idg, true, utils.NonTransactional) if err != nil { return err } if splp != nil { if m.dryRun != true { - if err := m.dmOut.SetSupplierProfile(splp, true); err != nil { + if err := m.dmOut.DataManager().SetSupplierProfile(splp, true); err != nil { return err } m.stats[utils.Suppliers] += 1 @@ -55,7 +55,7 @@ func (m *Migrator) migrateCurrentSupplierProfile() (err error) { func (m *Migrator) migrateSupplierProfiles() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/thresholds.go b/migrator/thresholds.go index 08e541485..0f632a3b9 100644 --- a/migrator/thresholds.go +++ b/migrator/thresholds.go @@ -51,19 +51,19 @@ func (m *Migrator) migrateCurrentThresholds() (err error) { var ids []string tenant := config.CgrConfig().DefaultTenant //Thresholds - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ThresholdPrefix) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ThresholdPrefix) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.ThresholdPrefix+tenant+":") - ths, err := m.dmIN.GetThreshold(tenant, idg, true, utils.NonTransactional) + ths, err := m.dmIN.DataManager().GetThreshold(tenant, idg, true, utils.NonTransactional) if err != nil { return err } if ths != nil { if m.dryRun != true { - if err := m.dmOut.SetThreshold(ths); err != nil { + if err := m.dmOut.DataManager().SetThreshold(ths); err != nil { return err } m.stats[utils.Thresholds] += 1 @@ -71,19 +71,19 @@ func (m *Migrator) migrateCurrentThresholds() (err error) { } } //ThresholdProfiles - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.ThresholdProfilePrefix) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.ThresholdProfilePrefix) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.ThresholdProfilePrefix+tenant+":") - ths, err := m.dmIN.GetThresholdProfile(tenant, idg, true, utils.NonTransactional) + ths, err := m.dmIN.DataManager().GetThresholdProfile(tenant, idg, true, utils.NonTransactional) if err != nil { return err } if ths != nil { if m.dryRun != true { - if err := m.dmOut.SetThresholdProfile(ths, true); err != nil { + if err := m.dmOut.DataManager().SetThresholdProfile(ths, true); err != nil { return err } } @@ -95,7 +95,7 @@ func (m *Migrator) migrateCurrentThresholds() (err error) { func (m *Migrator) migrateV2ActionTriggers() (err error) { var v2ACT *v2ActionTrigger for { - v2ACT, err = m.oldDataDB.getV2ActionTrigger() + v2ACT, err = m.dmIN.getV2ActionTrigger() if err != nil && err != utils.ErrNoMoreData { return err } @@ -108,13 +108,13 @@ func (m *Migrator) migrateV2ActionTriggers() (err error) { return err } if m.dryRun != true { - if err := m.dmOut.SetFilter(filter); err != nil { + if err := m.dmOut.DataManager().SetFilter(filter); err != nil { return err } - if err := m.dmOut.SetThreshold(th); err != nil { + if err := m.dmOut.DataManager().SetThreshold(th); err != nil { return err } - if err := m.dmOut.SetThresholdProfile(thp, true); err != nil { + if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil { return err } m.stats[utils.Thresholds] += 1 @@ -124,7 +124,7 @@ func (m *Migrator) migrateV2ActionTriggers() (err error) { if m.dryRun != true { // All done, update version wtih current one vrs := engine.Versions{utils.Thresholds: engine.CurrentStorDBVersions()[utils.Thresholds]} - if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil { + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), @@ -137,7 +137,7 @@ func (m *Migrator) migrateV2ActionTriggers() (err error) { func (m *Migrator) migrateThresholds() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, @@ -250,7 +250,7 @@ func (v2ATR v2ActionTrigger) AsThreshold() (thp *engine.ThresholdProfile, th *en func (m *Migrator) SasThreshold(v2ATR *engine.ActionTrigger) (err error) { var vrs engine.Versions - if m.dmOut.DataDB() == nil { + if m.dmOut.DataManager().DataDB() == nil { return utils.NewCGRError(utils.Migrator, utils.MandatoryIEMissingCaps, utils.NoStorDBConnection, @@ -262,21 +262,21 @@ func (m *Migrator) SasThreshold(v2ATR *engine.ActionTrigger) (err error) { return err } if filter != nil { - if err := m.dmOut.SetFilter(filter); err != nil { + if err := m.dmOut.DataManager().SetFilter(filter); err != nil { return err } } - if err := m.dmOut.SetThreshold(th); err != nil { + if err := m.dmOut.DataManager().SetThreshold(th); err != nil { return err } - if err := m.dmOut.SetThresholdProfile(thp, true); err != nil { + if err := m.dmOut.DataManager().SetThresholdProfile(thp, true); err != nil { return err } m.stats[utils.Thresholds] += 1 } // All done, update version wtih current one vrs = engine.Versions{utils.Thresholds: engine.CurrentStorDBVersions()[utils.Thresholds]} - if err = m.dmOut.DataDB().SetVersions(vrs, false); err != nil { + if err = m.dmOut.DataManager().DataDB().SetVersions(vrs, false); err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), diff --git a/migrator/timings.go b/migrator/timings.go index 889cf3d5b..34a59397c 100644 --- a/migrator/timings.go +++ b/migrator/timings.go @@ -28,19 +28,19 @@ import ( func (m *Migrator) migrateCurrentTiming() (err error) { var ids []string - ids, err = m.dmIN.DataDB().GetKeysForPrefix(utils.TimingsPrefix) + ids, err = m.dmIN.DataManager().DataDB().GetKeysForPrefix(utils.TimingsPrefix) if err != nil { return err } for _, id := range ids { idg := strings.TrimPrefix(id, utils.TimingsPrefix) - tm, err := m.dmIN.GetTiming(idg, true, utils.NonTransactional) + tm, err := m.dmIN.DataManager().GetTiming(idg, true, utils.NonTransactional) if err != nil { return err } if tm != nil { if m.dryRun != true { - if err := m.dmOut.SetTiming(tm); err != nil { + if err := m.dmOut.DataManager().SetTiming(tm); err != nil { return err } m.stats[utils.Timing] += 1 @@ -53,7 +53,7 @@ func (m *Migrator) migrateCurrentTiming() (err error) { func (m *Migrator) migrateTimings() (err error) { var vrs engine.Versions current := engine.CurrentDataDBVersions() - vrs, err = m.dmOut.DataDB().GetVersions("") + vrs, err = m.dmOut.DataManager().DataDB().GetVersions("") if err != nil { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, diff --git a/migrator/v1migrator_utils.go b/migrator/v1migrator_utils.go index 24a821d3f..d480fbee0 100644 --- a/migrator/v1migrator_utils.go +++ b/migrator/v1migrator_utils.go @@ -20,43 +20,57 @@ package migrator import ( "errors" - "fmt" - "strconv" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) -func NewMigratorDataDB(db_type, host, port, name, user, pass, marshaler string) { - -} - -func ConfigureV1DataStorage() (db MigratorDataDB, err error) { - var d MigratorDataDB - d.DataManger().(engine.RedisStoarage) +func NewMigratorDataDB(db_type, host, port, name, user, pass, marshaler string, + cacheCfg config.CacheConfig, loadHistorySize int) (db MigratorDataDB, err error) { + dm, err := engine.ConfigureDataStorage(db_type, + host, port, name, user, pass, marshaler, + cacheCfg, loadHistorySize) + if err != nil { + return nil, err + } switch db_type { case utils.REDIS: - var db_nb int - db_nb, err = strconv.Atoi(name) - if err != nil { - utils.Logger.Crit("Redis db name must be an integer!") - return nil, err - } - if port != "" { - host += ":" + port - } - d, err = newv1RedisStorage(host, db_nb, pass, marshaler) + d := newRedisMigrator(dm) + db = d.(MigratorDataDB) case utils.MONGO: - d, err = newv1MongoStorage(host, port, name, user, pass, utils.DataDB, nil) + d := newMongoMigrator(dm) db = d.(MigratorDataDB) default: err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'", db_type, utils.REDIS, utils.MONGO)) } +} + +/* + +func NewMigratorStorDB(db_type, host, port, name, user, pass, marshaler string, + cacheCfg config.CacheConfig, loadHistorySize int) (db MigratorDataDB, err error) { + dm, err := engine.ConfigureStorStorage(db_type, + host, port, name, user, pass, marshaler, + cacheCfg, loadHistorySize) if err != nil { return nil, err } - return d, nil + switch db_type { + case utils.MONGO: + d := newRedisMigrator(dm) + db = d.(MigratorDataDB) + case utils.MYSQL: + d := newMongoMigrator(dm) + db = d.(MigratorDataDB) + default: + err = errors.New(fmt.Sprintf("Unknown db '%s' valid options are '%s' or '%s'", + db_type, utils.REDIS, utils.MONGO)) + } } +*/ + +/* func ConfigureV1StorDB(db_type, host, port, name, user, pass string) (db MigratorStorDB, err error) { var d MigratorStorDB @@ -76,3 +90,4 @@ func ConfigureV1StorDB(db_type, host, port, name, user, pass string) (db Migrato } return d, nil } +*/ diff --git a/migrator/v1mongo_data.go b/migrator/v1mongo_data.go index e01169ad6..fdb5d84dc 100644 --- a/migrator/v1mongo_data.go +++ b/migrator/v1mongo_data.go @@ -19,9 +19,6 @@ along with this program. If not, see package migrator import ( - "fmt" - "strings" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/mgo" @@ -48,40 +45,15 @@ type AtKeyValue struct { Value v1ActionPlans } -func newmongoMigrator(dm *engine.DataManager) { - -} - -func newv1MongoStorage(host, port, db, user, pass, storageType string, cdrsIndexes []string) (v1ms *v1Mongo, err error) { - url := host - if port != "" { - url += ":" + port - } - if user != "" && pass != "" { - url = fmt.Sprintf("%s:%s@%s", user, pass, url) - } - var dbName string - if db != "" { - url += "/" + db - dbName = strings.Split(db, "?")[0] // remove extra info after ? - } - session, err := mgo.Dial(url) - if err != nil { - return nil, err - } - session.SetMode(mgo.Strong, true) - v1ms = &v1Mongo{db: dbName, session: session, v1ms: engine.NewCodecMsgpackMarshaler()} - return -} -func (v1ms *v1Mongo) Close() {} -func (v1ms *v1Mongo) getKeysForPrefix(prefix string) ([]string, error) { - return nil, nil +func newMongoMigrator(dm *engine.DataManager) (mgoMig *mongoMigrator) { + mgoMig.dm = dm + mgoMig.mgoDB = dm.DataDB().(*engine.MongoStorage) } //Account methods //V1 //get -func (v1ms *v1Mongo) getv1Account() (v1Acnt *v1Account, err error) { +func (v1ms *mongoMigrator) getv1Account() (v1Acnt *v1Account, err error) { if v1ms.qryIter == nil { v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Find(nil).Iter() } @@ -96,7 +68,7 @@ func (v1ms *v1Mongo) getv1Account() (v1Acnt *v1Account, err error) { } //set -func (v1ms *v1Mongo) setV1Account(x *v1Account) (err error) { +func (v1ms *mongoMigrator) setV1Account(x *v1Account) (err error) { if err := v1ms.session.DB(v1ms.db).C(v1AccountDBPrefix).Insert(x); err != nil { return err } @@ -105,7 +77,7 @@ func (v1ms *v1Mongo) setV1Account(x *v1Account) (err error) { //V2 //get -func (v1ms *v1Mongo) getv2Account() (v2Acnt *v2Account, err error) { +func (v1ms *mongoMigrator) getv2Account() (v2Acnt *v2Account, err error) { if v1ms.qryIter == nil { v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v2AccountsCol).Find(nil).Iter() } @@ -120,7 +92,7 @@ func (v1ms *v1Mongo) getv2Account() (v2Acnt *v2Account, err error) { } //set -func (v1ms *v1Mongo) setV2Account(x *v2Account) (err error) { +func (v1ms *mongoMigrator) setV2Account(x *v2Account) (err error) { if err := v1ms.session.DB(v1ms.db).C(v2AccountsCol).Insert(x); err != nil { return err } @@ -129,7 +101,7 @@ func (v1ms *v1Mongo) setV2Account(x *v2Account) (err error) { //Action methods //get -func (v1ms *v1Mongo) getV1ActionPlans() (v1aps *v1ActionPlans, err error) { +func (v1ms *mongoMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error) { var strct *AtKeyValue if v1ms.qryIter == nil { v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actiontimings").Find(nil).Iter() @@ -144,7 +116,7 @@ func (v1ms *v1Mongo) getV1ActionPlans() (v1aps *v1ActionPlans, err error) { } //set -func (v1ms *v1Mongo) setV1ActionPlans(x *v1ActionPlans) (err error) { +func (v1ms *mongoMigrator) setV1ActionPlans(x *v1ActionPlans) (err error) { key := utils.ACTION_PLAN_PREFIX + (*x)[0].Id if err := v1ms.session.DB(v1ms.db).C("actiontimings").Insert(&AtKeyValue{key, *x}); err != nil { return err @@ -154,7 +126,7 @@ func (v1ms *v1Mongo) setV1ActionPlans(x *v1ActionPlans) (err error) { //Actions methods //get -func (v1ms *v1Mongo) getV1Actions() (v1acs *v1Actions, err error) { +func (v1ms *mongoMigrator) getV1Actions() (v1acs *v1Actions, err error) { var strct *AcKeyValue if v1ms.qryIter == nil { v1ms.qryIter = v1ms.session.DB(v1ms.db).C("actions").Find(nil).Iter() @@ -170,7 +142,7 @@ func (v1ms *v1Mongo) getV1Actions() (v1acs *v1Actions, err error) { } //set -func (v1ms *v1Mongo) setV1Actions(x *v1Actions) (err error) { +func (v1ms *mongoMigrator) setV1Actions(x *v1Actions) (err error) { key := utils.ACTION_PREFIX + (*x)[0].Id if err := v1ms.session.DB(v1ms.db).C("actions").Insert(&AcKeyValue{key, *x}); err != nil { return err @@ -180,18 +152,18 @@ func (v1ms *v1Mongo) setV1Actions(x *v1Actions) (err error) { //ActionTriggers methods //get -func (v1ms *v1Mongo) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) { +func (v1ms *mongoMigrator) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) { return nil, utils.ErrNotImplemented } //set -func (v1ms *v1Mongo) setV1ActionTriggers(x *v1ActionTriggers) (err error) { +func (v1ms *mongoMigrator) setV1ActionTriggers(x *v1ActionTriggers) (err error) { return utils.ErrNotImplemented } //Actions methods //get -func (v1ms *v1Mongo) getV1SharedGroup() (v1sg *v1SharedGroup, err error) { +func (v1ms *mongoMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) { if v1ms.qryIter == nil { v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.SHARED_GROUP_PREFIX).Find(nil).Iter() } @@ -205,7 +177,7 @@ func (v1ms *v1Mongo) getV1SharedGroup() (v1sg *v1SharedGroup, err error) { } //set -func (v1ms *v1Mongo) setV1SharedGroup(x *v1SharedGroup) (err error) { +func (v1ms *mongoMigrator) setV1SharedGroup(x *v1SharedGroup) (err error) { if err := v1ms.session.DB(v1ms.db).C(utils.SHARED_GROUP_PREFIX).Insert(x); err != nil { return err } @@ -214,7 +186,7 @@ func (v1ms *v1Mongo) setV1SharedGroup(x *v1SharedGroup) (err error) { //Stats methods //get -func (v1ms *v1Mongo) getV1Stats() (v1st *v1Stat, err error) { +func (v1ms *mongoMigrator) getV1Stats() (v1st *v1Stat, err error) { if v1ms.qryIter == nil { v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.CDR_STATS_PREFIX).Find(nil).Iter() } @@ -228,7 +200,7 @@ func (v1ms *v1Mongo) getV1Stats() (v1st *v1Stat, err error) { } //set -func (v1ms *v1Mongo) setV1Stats(x *v1Stat) (err error) { +func (v1ms *mongoMigrator) setV1Stats(x *v1Stat) (err error) { if err := v1ms.session.DB(v1ms.db).C(utils.CDR_STATS_PREFIX).Insert(x); err != nil { return err } @@ -237,7 +209,7 @@ func (v1ms *v1Mongo) setV1Stats(x *v1Stat) (err error) { //Stats methods //get -func (v1ms *v1Mongo) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) { +func (v1ms *mongoMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) { if v1ms.qryIter == nil { v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1ActionTriggersCol).Find(nil).Iter() } @@ -251,7 +223,7 @@ func (v1ms *v1Mongo) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) { } //set -func (v1ms *v1Mongo) setV2ActionTrigger(x *v2ActionTrigger) (err error) { +func (v1ms *mongoMigrator) setV2ActionTrigger(x *v2ActionTrigger) (err error) { if err := v1ms.session.DB(v1ms.db).C(v1ActionTriggersCol).Insert(x); err != nil { return err } @@ -260,7 +232,7 @@ func (v1ms *v1Mongo) setV2ActionTrigger(x *v2ActionTrigger) (err error) { //AttributeProfile methods //get -func (v1ms *v1Mongo) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) { +func (v1ms *mongoMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) { if v1ms.qryIter == nil { v1ms.qryIter = v1ms.session.DB(v1ms.db).C(v1AttributeProfilesCol).Find(nil).Iter() } @@ -274,7 +246,7 @@ func (v1ms *v1Mongo) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err } //set -func (v1ms *v1Mongo) setV1AttributeProfile(x *v1AttributeProfile) (err error) { +func (v1ms *mongoMigrator) setV1AttributeProfile(x *v1AttributeProfile) (err error) { if err := v1ms.session.DB(v1ms.db).C(v1AttributeProfilesCol).Insert(x); err != nil { return err } diff --git a/migrator/v1mongo_stor.go b/migrator/v1mongo_stor.go index f98ad6bb0..49f22b9e5 100755 --- a/migrator/v1mongo_stor.go +++ b/migrator/v1mongo_stor.go @@ -25,7 +25,7 @@ import ( //CDR methods //get -func (v1ms *v1Mongo) getV1CDR() (v1Cdr *v1Cdrs, err error) { +func (v1ms *mongoMigrator) getV1CDR() (v1Cdr *v1Cdrs, err error) { if v1ms.qryIter == nil { v1ms.qryIter = v1ms.session.DB(v1ms.db).C(engine.ColCDRs).Find(nil).Iter() } @@ -40,7 +40,7 @@ func (v1ms *v1Mongo) getV1CDR() (v1Cdr *v1Cdrs, err error) { } //set -func (v1ms *v1Mongo) setV1CDR(v1Cdr *v1Cdrs) (err error) { +func (v1ms *mongoMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) { if err = v1ms.session.DB(v1ms.db).C(engine.ColCDRs).Insert(v1Cdr); err != nil { return err } @@ -49,7 +49,7 @@ func (v1ms *v1Mongo) setV1CDR(v1Cdr *v1Cdrs) (err error) { //SMCost methods //get -func (v1ms *v1Mongo) getSMCost() (v2Cost *v2SessionsCost, err error) { +func (v1ms *mongoMigrator) getSMCost() (v2Cost *v2SessionsCost, err error) { if v1ms.qryIter == nil { v1ms.qryIter = v1ms.session.DB(v1ms.db).C(utils.SessionsCostsTBL).Find(nil).Iter() } @@ -64,7 +64,7 @@ func (v1ms *v1Mongo) getSMCost() (v2Cost *v2SessionsCost, err error) { } //set -func (v1ms *v1Mongo) setSMCost(v2Cost *v2SessionsCost) (err error) { +func (v1ms *mongoMigrator) setSMCost(v2Cost *v2SessionsCost) (err error) { if err = v1ms.session.DB(v1ms.db).C(utils.SessionsCostsTBL).Insert(v2Cost); err != nil { return err } @@ -72,7 +72,7 @@ func (v1ms *v1Mongo) setSMCost(v2Cost *v2SessionsCost) (err error) { } //remove -func (v1ms *v1Mongo) remSMCost(v2Cost *v2SessionsCost) (err error) { +func (v1ms *mongoMigrator) remSMCost(v2Cost *v2SessionsCost) (err error) { if err = v1ms.session.DB(v1ms.db).C(utils.SessionsCostsTBL).Remove(nil); err != nil { return err } diff --git a/migrator/v1redis.go b/migrator/v1redis.go index 0e0f1dc0b..0127c7a64 100644 --- a/migrator/v1redis.go +++ b/migrator/v1redis.go @@ -19,124 +19,26 @@ along with this program. If not, see package migrator import ( - "fmt" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" - - "github.com/mediocregopher/radix.v2/pool" - "github.com/mediocregopher/radix.v2/redis" ) type redisMigrator struct { dm *engine.DataManager - rds *engine.RedisStorage + rds *engine.RedisStorage dataKeys []string qryIdx *int } -func newredisMigrator(dm *engine.Datamnager) ( rM *redisMigrator, error) { +func newRedisMigrator(dm *engine.DataManager) (rM *redisMigrator) { + rM.dm = dm rM.rds = dm.DataDB().(*engine.RedisStorage) - - - - df := func(network, addr string) (*redis.Client, error) { - client, err := redis.Dial(network, addr) - if err != nil { - return nil, err - } - if len(pass) != 0 { - if err = client.Cmd("AUTH", pass).Err; err != nil { - client.Close() - return nil, err - } - } - if db != 0 { - if err = client.Cmd("SELECT", db).Err; err != nil { - client.Close() - return nil, err - } - } - return client, nil - } - p, err := pool.NewCustom("tcp", address, 1, df) - if err != nil { - return nil, err - } - var mrshler engine.Marshaler - if mrshlerStr == utils.MSGPACK { - mrshler = engine.NewCodecMsgpackMarshaler() - } else if mrshlerStr == utils.JSON { - mrshler = new(engine.JSONMarshaler) - } else { - return nil, fmt.Errorf("Unsupported marshaler: %v", mrshlerStr) - } - return &v1Redis{dbPool: p, ms: mrshler}, nil -} - -// This CMD function get a connection from the pool. -// Handles automatic failover in case of network disconnects -func (v1rs *v1Redis) cmd(cmd string, args ...interface{}) *redis.Resp { - c1, err := v1rs.dbPool.Get() - if err != nil { - return redis.NewResp(err) - } - result := c1.Cmd(cmd, args...) - if result.IsType(redis.IOErr) { // Failover mecaneism - utils.Logger.Warning(fmt.Sprintf(" error <%s>, attempting failover.", result.Err.Error())) - c2, err := v1rs.dbPool.Get() - if err == nil { - if result2 := c2.Cmd(cmd, args...); !result2.IsType(redis.IOErr) { - v1rs.dbPool.Put(c2) - return result2 - } - } - } else { - v1rs.dbPool.Put(c1) - } - return result -} -func (v1rs *v1Redis) Close() {} - -func (v1rs *v1Redis) getKeysForPrefix(prefix string) ([]string, error) { - r := v1rs.cmd("KEYS", prefix+"*") - if r.Err != nil { - return nil, r.Err - } - return r.List() -} - -// Adds a single load instance to load history -func (v1rs *v1Redis) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize int, transactionID string) error { - if loadHistSize == 0 { // Load history disabled - return nil - } - marshaled, err := v1rs.ms.Marshal(&ldInst) - if err != nil { - return err - } - _, err = guardian.Guardian.Guard(func() (interface{}, error) { // Make sure we do it locked since other instance can modify history while we read it - histLen, err := v1rs.cmd("LLEN", utils.LOADINST_KEY).Int() - if err != nil { - return nil, err - } - if histLen >= loadHistSize { // Have hit maximum history allowed, remove oldest element in order to add new one - if err := v1rs.cmd("RPOP", utils.LOADINST_KEY).Err; err != nil { - return nil, err - } - } - err = v1rs.cmd("LPUSH", utils.LOADINST_KEY, marshaled).Err - return nil, err - }, 0, utils.LOADINST_KEY) - - return err } //Account methods //V1 //get -func (v1rs *v1Redis) getv1Account() (v1Acnt *v1Account, err error) { +func (v1rs *redisMigrator) getv1Account() (v1Acnt *v1Account, err error) { if v1rs.qryIdx == nil { v1rs.dataKeys, err = v1rs.getKeysForPrefix(v1AccountDBPrefix) if err != nil { @@ -164,7 +66,7 @@ func (v1rs *v1Redis) getv1Account() (v1Acnt *v1Account, err error) { } //set -func (v1rs *v1Redis) setV1Account(x *v1Account) (err error) { +func (v1rs *redisMigrator) setV1Account(x *v1Account) (err error) { key := v1AccountDBPrefix + x.Id bit, err := v1rs.ms.Marshal(x) if err != nil { @@ -178,7 +80,7 @@ func (v1rs *v1Redis) setV1Account(x *v1Account) (err error) { //V2 //get -func (v1rs *v1Redis) getv2Account() (v2Acnt *v2Account, err error) { +func (v1rs *redisMigrator) getv2Account() (v2Acnt *v2Account, err error) { if v1rs.qryIdx == nil { v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACCOUNT_PREFIX) if err != nil { @@ -206,7 +108,7 @@ func (v1rs *v1Redis) getv2Account() (v2Acnt *v2Account, err error) { } //set -func (v1rs *v1Redis) setV2Account(x *v2Account) (err error) { +func (v1rs *redisMigrator) setV2Account(x *v2Account) (err error) { key := utils.ACCOUNT_PREFIX + x.ID bit, err := v1rs.ms.Marshal(x) if err != nil { @@ -220,7 +122,7 @@ func (v1rs *v1Redis) setV2Account(x *v2Account) (err error) { //ActionPlans methods //get -func (v1rs *v1Redis) getV1ActionPlans() (v1aps *v1ActionPlans, err error) { +func (v1rs *redisMigrator) getV1ActionPlans() (v1aps *v1ActionPlans, err error) { if v1rs.qryIdx == nil { v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PLAN_PREFIX) if err != nil { @@ -247,7 +149,7 @@ func (v1rs *v1Redis) getV1ActionPlans() (v1aps *v1ActionPlans, err error) { } //set -func (v1rs *v1Redis) setV1ActionPlans(x *v1ActionPlans) (err error) { +func (v1rs *redisMigrator) setV1ActionPlans(x *v1ActionPlans) (err error) { key := utils.ACTION_PLAN_PREFIX + (*x)[0].Id bit, err := v1rs.ms.Marshal(x) if err != nil { @@ -261,7 +163,7 @@ func (v1rs *v1Redis) setV1ActionPlans(x *v1ActionPlans) (err error) { //Actions methods //get -func (v1rs *v1Redis) getV1Actions() (v1acs *v1Actions, err error) { +func (v1rs *redisMigrator) getV1Actions() (v1acs *v1Actions, err error) { if v1rs.qryIdx == nil { v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_PREFIX) if err != nil { @@ -288,7 +190,7 @@ func (v1rs *v1Redis) getV1Actions() (v1acs *v1Actions, err error) { } //set -func (v1rs *v1Redis) setV1Actions(x *v1Actions) (err error) { +func (v1rs *redisMigrator) setV1Actions(x *v1Actions) (err error) { key := utils.ACTION_PREFIX + (*x)[0].Id bit, err := v1rs.ms.Marshal(x) if err != nil { @@ -302,7 +204,7 @@ func (v1rs *v1Redis) setV1Actions(x *v1Actions) (err error) { //ActionTriggers methods //get -func (v1rs *v1Redis) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) { +func (v1rs *redisMigrator) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) { if v1rs.qryIdx == nil { v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_TRIGGER_PREFIX) if err != nil { @@ -329,7 +231,7 @@ func (v1rs *v1Redis) getV1ActionTriggers() (v1acts *v1ActionTriggers, err error) } //set -func (v1rs *v1Redis) setV1ActionTriggers(x *v1ActionTriggers) (err error) { +func (v1rs *redisMigrator) setV1ActionTriggers(x *v1ActionTriggers) (err error) { key := utils.ACTION_TRIGGER_PREFIX + (*x)[0].Id bit, err := v1rs.ms.Marshal(x) if err != nil { @@ -343,7 +245,7 @@ func (v1rs *v1Redis) setV1ActionTriggers(x *v1ActionTriggers) (err error) { //SharedGroup methods //get -func (v1rs *v1Redis) getV1SharedGroup() (v1sg *v1SharedGroup, err error) { +func (v1rs *redisMigrator) getV1SharedGroup() (v1sg *v1SharedGroup, err error) { if v1rs.qryIdx == nil { v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.SHARED_GROUP_PREFIX) if err != nil { @@ -370,7 +272,7 @@ func (v1rs *v1Redis) getV1SharedGroup() (v1sg *v1SharedGroup, err error) { } //set -func (v1rs *v1Redis) setV1SharedGroup(x *v1SharedGroup) (err error) { +func (v1rs *redisMigrator) setV1SharedGroup(x *v1SharedGroup) (err error) { key := utils.SHARED_GROUP_PREFIX + x.Id bit, err := v1rs.ms.Marshal(x) if err != nil { @@ -384,7 +286,7 @@ func (v1rs *v1Redis) setV1SharedGroup(x *v1SharedGroup) (err error) { //Stats methods //get -func (v1rs *v1Redis) getV1Stats() (v1st *v1Stat, err error) { +func (v1rs *redisMigrator) getV1Stats() (v1st *v1Stat, err error) { if v1rs.qryIdx == nil { v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.CDR_STATS_PREFIX) if err != nil { @@ -411,7 +313,7 @@ func (v1rs *v1Redis) getV1Stats() (v1st *v1Stat, err error) { } //set -func (v1rs *v1Redis) setV1Stats(x *v1Stat) (err error) { +func (v1rs *redisMigrator) setV1Stats(x *v1Stat) (err error) { key := utils.CDR_STATS_PREFIX + x.Id bit, err := v1rs.ms.Marshal(x) if err != nil { @@ -425,7 +327,7 @@ func (v1rs *v1Redis) setV1Stats(x *v1Stat) (err error) { //Action methods //get -func (v1rs *v1Redis) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) { +func (v1rs *redisMigrator) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) { if v1rs.qryIdx == nil { v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.ACTION_TRIGGER_PREFIX) if err != nil { @@ -452,7 +354,7 @@ func (v1rs *v1Redis) getV2ActionTrigger() (v2at *v2ActionTrigger, err error) { } //set -func (v1rs *v1Redis) setV2ActionTrigger(x *v2ActionTrigger) (err error) { +func (v1rs *redisMigrator) setV2ActionTrigger(x *v2ActionTrigger) (err error) { key := utils.ACTION_TRIGGER_PREFIX + x.ID bit, err := v1rs.ms.Marshal(x) if err != nil { @@ -466,7 +368,7 @@ func (v1rs *v1Redis) setV2ActionTrigger(x *v2ActionTrigger) (err error) { //AttributeProfile methods //get -func (v1rs *v1Redis) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) { +func (v1rs *redisMigrator) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err error) { var v1attr *v1AttributeProfile if v1rs.qryIdx == nil { v1rs.dataKeys, err = v1rs.getKeysForPrefix(utils.AttributeProfilePrefix) @@ -494,7 +396,7 @@ func (v1rs *v1Redis) getV1AttributeProfile() (v1attrPrf *v1AttributeProfile, err } //set -func (v1rs *v1Redis) setV1AttributeProfile(x *v1AttributeProfile) (err error) { +func (v1rs *redisMigrator) setV1AttributeProfile(x *v1AttributeProfile) (err error) { key := utils.AttributeProfilePrefix + utils.ConcatenatedKey(x.Tenant, x.ID) bit, err := v1rs.ms.Marshal(x) if err != nil { diff --git a/migrator/v1sql.go b/migrator/v1sql.go index 4b214ebe4..4fda7ed8d 100755 --- a/migrator/v1sql.go +++ b/migrator/v1sql.go @@ -20,13 +20,11 @@ package migrator import ( "database/sql" - "fmt" "time" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" _ "github.com/go-sql-driver/mysql" - "github.com/jinzhu/gorm" ) type migratorSQL struct { @@ -35,19 +33,7 @@ type migratorSQL struct { rowIter *sql.Rows } -func newSqlStorage(host, port, name, user, password string) (*sqlStorage, error) { - connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER'", user, password, host, port, name) - db, err := gorm.Open("mysql", connectString) - if err != nil { - return nil, err - } - if err = db.DB().Ping(); err != nil { - return nil, err - } - return &sqlStorage{Db: db.DB(), db: db}, nil -} - -func (sqlStorage *sqlStorage) getV1CDR() (v1Cdr *v1Cdrs, err error) { +func (sqlStorage *migratorSQL) getV1CDR() (v1Cdr *v1Cdrs, err error) { if sqlStorage.rowIter == nil { sqlStorage.rowIter, err = sqlStorage.Db.Query("SELECT * FROM cdrs") if err != nil { @@ -66,7 +52,7 @@ func (sqlStorage *sqlStorage) getV1CDR() (v1Cdr *v1Cdrs, err error) { return v1Cdr, nil } -func (sqlStorage *sqlStorage) setV1CDR(v1Cdr *v1Cdrs) (err error) { +func (sqlStorage *migratorSQL) setV1CDR(v1Cdr *v1Cdrs) (err error) { tx := sqlStorage.db.Begin() cdrSql := v1Cdr.AsCDRsql() cdrSql.CreatedAt = time.Now() @@ -78,7 +64,7 @@ func (sqlStorage *sqlStorage) setV1CDR(v1Cdr *v1Cdrs) (err error) { return nil } -func (sqlStorage *sqlStorage) getSMCost() (v2Cost *v2SessionsCost, err error) { +func (sqlStorage *migratorSQL) getSMCost() (v2Cost *v2SessionsCost, err error) { if sqlStorage.rowIter == nil { sqlStorage.rowIter, err = sqlStorage.Db.Query("SELECT * FROM sessions_costs") if err != nil { @@ -97,7 +83,7 @@ func (sqlStorage *sqlStorage) getSMCost() (v2Cost *v2SessionsCost, err error) { return v2Cost, nil } -func (sqlStorage *sqlStorage) setSMCost(v2Cost *v2SessionsCost) (err error) { +func (sqlStorage *migratorSQL) setSMCost(v2Cost *v2SessionsCost) (err error) { tx := sqlStorage.db.Begin() smSql := v2Cost.AsSessionsCostSql() smSql.CreatedAt = time.Now() @@ -109,7 +95,7 @@ func (sqlStorage *sqlStorage) setSMCost(v2Cost *v2SessionsCost) (err error) { return } -func (sqlStorage *sqlStorage) remSMCost(v2Cost *v2SessionsCost) (err error) { +func (sqlStorage *migratorSQL) remSMCost(v2Cost *v2SessionsCost) (err error) { tx := sqlStorage.db.Begin() var rmParam *engine.SessionsCostsSQL if v2Cost != nil {