diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index f21fe5d6d..8c978cd26 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -210,12 +210,21 @@ func main() { return } if migrate != nil && *migrate != "" { // Run migrator + ratingDb, err := engine.ConfigureRatingStorage(*tpdb_type, *tpdb_host, *tpdb_port, *tpdb_name, + *tpdb_user, *tpdb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize) + if err != nil { + log.Fatal(err) + } + accountDb, err := engine.ConfigureAccountingStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize) + if err != nil { + log.Fatal(err) + } storDB, err := engine.ConfigureStorStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding, cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBCDRSIndexes) if err != nil { log.Fatal(err) } - if err := migrator.NewMigrator(storDB, *stor_db_type).Migrate(*migrate); err != nil { + if err := migrator.NewMigrator(ratingDb, accountDb, *datadb_type, storDB, *stor_db_type).Migrate(*migrate); err != nil { log.Fatal(err) } log.Print("Done migrating!") diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 4aab3a610..efa952488 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -42,6 +42,7 @@ type Storage interface { // Interface for storage providers. type RatingStorage interface { Storage + Marshaler() Marshaler HasData(string, string) (bool, error) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error GetRatingPlan(string, bool, string) (*RatingPlan, error) @@ -83,6 +84,7 @@ type RatingStorage interface { type AccountingStorage interface { Storage + Marshaler() Marshaler LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) error GetAccount(string) (*Account, error) SetAccount(*Account) error @@ -117,6 +119,7 @@ type AccountingStorage interface { // OnlineStorage contains methods to use for administering online data type DataDB interface { Storage + Marshaler() Marshaler HasData(string, string) (bool, error) LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error GetRatingPlan(string, bool, string) (*RatingPlan, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 722572cf7..5a8cb3769 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -86,6 +86,10 @@ func (ms *MapStorage) Flush(ignore string) error { return nil } +func (ms *MapStorage) Marshaler() Marshaler { + return ms.ms +} + func (ms *MapStorage) SelectDatabase(dbName string) (err error) { return } diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index fcbe00219..efc0567b6 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -322,6 +322,15 @@ func (ms *MongoStorage) Flush(ignore string) (err error) { return dbSession.DB(ms.db).DropDatabase() } +func (ms *MongoStorage) Marshaler() Marshaler { + return ms.ms +} + +// CloneSession returns a clone of the existing session so we can perform queries from outside of engine package +func (ms *MongoStorage) CloneSession() *mgo.Session { + return ms.session.Copy() +} + func (ms *MongoStorage) SelectDatabase(dbName string) (err error) { ms.db = dbName return diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 2dd9188cc..1e956606d 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -109,6 +109,10 @@ func (rs *RedisStorage) Flush(ignore string) error { return rs.Cmd("FLUSHDB").Err } +func (rs *RedisStorage) Marshaler() Marshaler { + return rs.ms +} + func (rs *RedisStorage) SelectDatabase(dbName string) (err error) { return rs.Cmd("SELECT", dbName).Err } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 2c4f86d1e..cf63150fd 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -61,7 +61,7 @@ func (self *SQLStorage) GetKeysForPrefix(prefix string) ([]string, error) { return nil, utils.ErrNotImplemented } -func (ms *SQLStorage) RebuildReverseForPrefix(prefix string) error { +func (self *SQLStorage) RebuildReverseForPrefix(prefix string) error { return utils.ErrNotImplemented } diff --git a/migrator/accounts.go b/migrator/accounts.go new file mode 100644 index 000000000..c4c6c6de1 --- /dev/null +++ b/migrator/accounts.go @@ -0,0 +1,97 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package migrator + +import ( + "time" + + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func (m *Migrator) migrateAccounts() (err error) { + return +} + +type v1Account struct { + Id string + BalanceMap map[string]v1BalanceChain + UnitCounters []*v1UnitsCounter + ActionTriggers v1ActionTriggers + AllowNegative bool + Disabled bool +} + +type v1BalanceChain []*v1Balance + +type v1Balance struct { + Uuid string //system wide unique + Id string // account wide unique + Value float64 + ExpirationDate time.Time + Weight float64 + DestinationIds string + RatingSubject string + Category string + SharedGroup string + Timings []*engine.RITiming + TimingIDs string + Disabled bool +} + +func (b *v1Balance) IsDefault() bool { + return (b.DestinationIds == "" || b.DestinationIds == utils.ANY) && + b.RatingSubject == "" && + b.Category == "" && + b.ExpirationDate.IsZero() && + b.SharedGroup == "" && + b.Weight == 0 && + b.Disabled == false +} + +type v1UnitsCounter struct { + Direction string + BalanceType string + // Units float64 + Balances v1BalanceChain // first balance is the general one (no destination) +} + +type v1ActionTriggers []*v1ActionTrigger + +type v1ActionTrigger struct { + Id string + ThresholdType string + ThresholdValue float64 + Recurrent bool + MinSleep time.Duration + BalanceId string + BalanceType string + BalanceDirection string + BalanceDestinationIds string + BalanceWeight float64 + BalanceExpirationDate time.Time + BalanceTimingTags string + BalanceRatingSubject string + BalanceCategory string + BalanceSharedGroup string + BalanceDisabled bool + Weight float64 + ActionsId string + MinQueuedItems int + Executed bool +} diff --git a/migrator/costdetails.go b/migrator/costdetails.go index d0b37d770..7d2755b61 100644 --- a/migrator/costdetails.go +++ b/migrator/costdetails.go @@ -18,11 +18,100 @@ along with this program. If not, see package migrator import ( + "database/sql" + "encoding/json" + "fmt" "time" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) +func (m *Migrator) migrateCostDetails() (err error) { + if m.storDB == nil { + return utils.NewCGRError(utils.Migrator, + utils.MandatoryIEMissingCaps, + utils.NoStorDBConnection, + "no connection to StorDB") + } + vrs, err := m.storDB.GetVersions(utils.COST_DETAILS) + if err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when querying storDB for versions", err.Error())) + } else if len(vrs) == 0 { + return utils.NewCGRError(utils.Migrator, + utils.MandatoryIEMissingCaps, + utils.UndefinedVersion, + "version number is not defined for CostDetails model") + } + if vrs[utils.COST_DETAILS] != 1 { // Right now we only support migrating from version 1 + return + } + var storSQL *sql.DB + switch m.storDBType { + case utils.MYSQL: + storSQL = m.storDB.(*engine.MySQLStorage).Db + case utils.POSTGRES: + storSQL = m.storDB.(*engine.PostgresStorage).Db + default: + return utils.NewCGRError(utils.Migrator, + utils.MandatoryIEMissingCaps, + utils.UnsupportedDB, + fmt.Sprintf("unsupported database type: <%s>", m.storDBType)) + } + rows, err := storSQL.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs WHERE run_id!= '*raw' and cost_details IS NOT NULL AND deleted_at IS NULL") + if err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when querying storDB for cdrs", err.Error())) + } + defer rows.Close() + for cnt := 0; rows.Next(); cnt++ { + var id int64 + var ccDirection, ccCategory, ccTenant, ccSubject, ccAccount, ccDestination, ccTor sql.NullString + var ccCost sql.NullFloat64 + var tts []byte + if err := rows.Scan(&id, &ccTor, &ccDirection, &ccTenant, &ccCategory, &ccAccount, &ccSubject, &ccDestination, &ccCost, &tts); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when scanning at count: <%d>", err.Error(), cnt)) + } + var v1tmsps v1TimeSpans + if err := json.Unmarshal(tts, &v1tmsps); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" Unmarshalling timespans at CDR with id: <%d>, error: <%s>", id, err.Error())) + continue + } + v1CC := &v1CallCost{Direction: ccDirection.String, Category: ccCategory.String, Tenant: ccTenant.String, + Subject: ccSubject.String, Account: ccAccount.String, Destination: ccDestination.String, TOR: ccTor.String, + Cost: ccCost.Float64, Timespans: v1tmsps} + cc, err := v1CC.AsCallCost() + if err != nil { + utils.Logger.Warning( + fmt.Sprintf(" Error: <%s> when converting into CallCost CDR with id: <%d>", err.Error(), id)) + continue + } + if _, err := storSQL.Exec(fmt.Sprintf("UPDATE cdrs SET cost_details='%s' WHERE id=%d", cc.AsJSON(), id)); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" Error: <%s> updating CDR with id <%d> into StorDB", err.Error(), id)) + continue + } + } + // All done, update version wtih current one + vrs = engine.Versions{utils.COST_DETAILS: engine.CurrentStorDBVersions()[utils.COST_DETAILS]} + if err := m.storDB.SetVersions(vrs); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error())) + } + return +} + type v1CallCost struct { Direction, Category, Tenant, Subject, Account, Destination, TOR string Cost float64 diff --git a/migrator/migrator.go b/migrator/migrator.go index fe4c7e242..2b9cc4bfe 100644 --- a/migrator/migrator.go +++ b/migrator/migrator.go @@ -18,23 +18,25 @@ along with this program. If not, see package migrator import ( - "database/sql" - "encoding/json" "fmt" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -func NewMigrator(storDB engine.Storage, storDBType string) *Migrator { - return &Migrator{storDB: storDB, storDBType: storDBType} +func NewMigrator(tpDB engine.RatingStorage, dataDB engine.AccountingStorage, dataDBType string, storDB engine.Storage, storDBType string) *Migrator { + return &Migrator{tpDB: tpDB, dataDB: dataDB, dataDBType: dataDBType, storDB: storDB, storDBType: storDBType} } type Migrator struct { + tpDB engine.RatingStorage // ToDo: unify the databases when ready + dataDB engine.AccountingStorage + dataDBType string storDB engine.Storage storDBType string } +// Migrate implements the tasks to migrate, used as a dispatcher to the individual methods func (m *Migrator) Migrate(taskID string) (err error) { switch taskID { default: // unsupported taskID @@ -51,91 +53,8 @@ func (m *Migrator) Migrate(taskID string) (err error) { } case utils.MetaCostDetails: err = m.migrateCostDetails() - } - return -} - -func (m *Migrator) migrateCostDetails() (err error) { - if m.storDB == nil { - return utils.NewCGRError(utils.Migrator, - utils.MandatoryIEMissingCaps, - utils.NoStorDBConnection, - "no connection to StorDB") - } - vrs, err := m.storDB.GetVersions(utils.COST_DETAILS) - if err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when querying storDB for versions", err.Error())) - } else if len(vrs) == 0 { - return utils.NewCGRError(utils.Migrator, - utils.MandatoryIEMissingCaps, - utils.UndefinedVersion, - "version number is not defined for CostDetails model") - } - if vrs[utils.COST_DETAILS] != 1 { // Right now we only support migrating from version 1 - return - } - var storSQL *sql.DB - switch m.storDBType { - case utils.MYSQL: - storSQL = m.storDB.(*engine.MySQLStorage).Db - case utils.POSTGRES: - storSQL = m.storDB.(*engine.PostgresStorage).Db - default: - return utils.NewCGRError(utils.Migrator, - utils.MandatoryIEMissingCaps, - utils.UnsupportedDB, - fmt.Sprintf("unsupported database type: <%s>", m.storDBType)) - } - rows, err := storSQL.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs WHERE run_id!= '*raw' and cost_details IS NOT NULL AND deleted_at IS NULL") - if err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when querying storDB for cdrs", err.Error())) - } - defer rows.Close() - for cnt := 0; rows.Next(); cnt++ { - var id int64 - var ccDirection, ccCategory, ccTenant, ccSubject, ccAccount, ccDestination, ccTor sql.NullString - var ccCost sql.NullFloat64 - var tts []byte - if err := rows.Scan(&id, &ccTor, &ccDirection, &ccTenant, &ccCategory, &ccAccount, &ccSubject, &ccDestination, &ccCost, &tts); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when scanning at count: <%d>", err.Error(), cnt)) - } - var v1tmsps v1TimeSpans - if err := json.Unmarshal(tts, &v1tmsps); err != nil { - utils.Logger.Warning( - fmt.Sprintf(" Unmarshalling timespans at CDR with id: <%d>, error: <%s>", id, err.Error())) - continue - } - v1CC := &v1CallCost{Direction: ccDirection.String, Category: ccCategory.String, Tenant: ccTenant.String, - Subject: ccSubject.String, Account: ccAccount.String, Destination: ccDestination.String, TOR: ccTor.String, - Cost: ccCost.Float64, Timespans: v1tmsps} - cc, err := v1CC.AsCallCost() - if err != nil { - utils.Logger.Warning( - fmt.Sprintf(" Error: <%s> when converting into CallCost CDR with id: <%d>", err.Error(), id)) - continue - } - if _, err := storSQL.Exec(fmt.Sprintf("UPDATE cdrs SET cost_details='%s' WHERE id=%d", cc.AsJSON(), id)); err != nil { - utils.Logger.Warning( - fmt.Sprintf(" Error: <%s> updating CDR with id <%d> into StorDB", err.Error(), id)) - continue - } - } - // All done, update version wtih current one - vrs = engine.Versions{utils.COST_DETAILS: engine.CurrentStorDBVersions()[utils.COST_DETAILS]} - if err := m.storDB.SetVersions(vrs); err != nil { - return utils.NewCGRError(utils.Migrator, - utils.ServerErrorCaps, - err.Error(), - fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error())) + case utils.MetaAccounts: + err = m.migrateAccounts() } return } diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 2e05f523f..0bc255a3e 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -473,9 +473,6 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool ssMux.RLock() ss := ssMp[cgrID] ssMux.RUnlock() - if len(ss) == 0 { - return - } var wg sync.WaitGroup for _, rplConn := range smgReplConns { if rplConn.Synchronous { diff --git a/utils/consts.go b/utils/consts.go index 90a98fabb..037110ce9 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -347,6 +347,7 @@ const ( SchedulerNotRunningCaps = "SCHEDULLER_NOT_RUNNING" MetaScheduler = "*scheduler" MetaCostDetails = "*cost_details" + MetaAccounts = "*accounts" Migrator = "migrator" UnsupportedMigrationTask = "unsupported migration task" NoStorDBConnection = "not connected to StorDB"