Storage.DataDB exporting Marshaller to be later used in migrator, mongodb exporting CloneSession, migrator improvements

This commit is contained in:
DanB
2017-02-19 14:59:39 +01:00
parent 087aadb37a
commit f648fbdc51
11 changed files with 226 additions and 94 deletions

View File

@@ -210,12 +210,21 @@ func main() {
return
}
if migrate != nil && *migrate != "" { // Run migrator
ratingDb, err := engine.ConfigureRatingStorage(*tpdb_type, *tpdb_host, *tpdb_port, *tpdb_name,
*tpdb_user, *tpdb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize)
if err != nil {
log.Fatal(err)
}
accountDb, err := engine.ConfigureAccountingStorage(*datadb_type, *datadb_host, *datadb_port, *datadb_name, *datadb_user, *datadb_pass, *dbdata_encoding, cgrConfig.CacheConfig, *loadHistorySize)
if err != nil {
log.Fatal(err)
}
storDB, err := engine.ConfigureStorStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding,
cgrConfig.StorDBMaxOpenConns, cgrConfig.StorDBMaxIdleConns, cgrConfig.StorDBCDRSIndexes)
if err != nil {
log.Fatal(err)
}
if err := migrator.NewMigrator(storDB, *stor_db_type).Migrate(*migrate); err != nil {
if err := migrator.NewMigrator(ratingDb, accountDb, *datadb_type, storDB, *stor_db_type).Migrate(*migrate); err != nil {
log.Fatal(err)
}
log.Print("Done migrating!")

View File

@@ -42,6 +42,7 @@ type Storage interface {
// Interface for storage providers.
type RatingStorage interface {
Storage
Marshaler() Marshaler
HasData(string, string) (bool, error)
LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error
GetRatingPlan(string, bool, string) (*RatingPlan, error)
@@ -83,6 +84,7 @@ type RatingStorage interface {
type AccountingStorage interface {
Storage
Marshaler() Marshaler
LoadAccountingCache(alsIDs, rvAlsIDs, rlIDs []string) error
GetAccount(string) (*Account, error)
SetAccount(*Account) error
@@ -117,6 +119,7 @@ type AccountingStorage interface {
// OnlineStorage contains methods to use for administering online data
type DataDB interface {
Storage
Marshaler() Marshaler
HasData(string, string) (bool, error)
LoadRatingCache(dstIDs, rvDstIDs, rplIDs, rpfIDs, actIDs, aplIDs, aapIDs, atrgIDs, sgIDs, lcrIDs, dcIDs []string) error
GetRatingPlan(string, bool, string) (*RatingPlan, error)

View File

@@ -86,6 +86,10 @@ func (ms *MapStorage) Flush(ignore string) error {
return nil
}
func (ms *MapStorage) Marshaler() Marshaler {
return ms.ms
}
func (ms *MapStorage) SelectDatabase(dbName string) (err error) {
return
}

View File

@@ -322,6 +322,15 @@ func (ms *MongoStorage) Flush(ignore string) (err error) {
return dbSession.DB(ms.db).DropDatabase()
}
func (ms *MongoStorage) Marshaler() Marshaler {
return ms.ms
}
// CloneSession returns a clone of the existing session so we can perform queries from outside of engine package
func (ms *MongoStorage) CloneSession() *mgo.Session {
return ms.session.Copy()
}
func (ms *MongoStorage) SelectDatabase(dbName string) (err error) {
ms.db = dbName
return

View File

@@ -109,6 +109,10 @@ func (rs *RedisStorage) Flush(ignore string) error {
return rs.Cmd("FLUSHDB").Err
}
func (rs *RedisStorage) Marshaler() Marshaler {
return rs.ms
}
func (rs *RedisStorage) SelectDatabase(dbName string) (err error) {
return rs.Cmd("SELECT", dbName).Err
}

View File

@@ -61,7 +61,7 @@ func (self *SQLStorage) GetKeysForPrefix(prefix string) ([]string, error) {
return nil, utils.ErrNotImplemented
}
func (ms *SQLStorage) RebuildReverseForPrefix(prefix string) error {
func (self *SQLStorage) RebuildReverseForPrefix(prefix string) error {
return utils.ErrNotImplemented
}

97
migrator/accounts.go Normal file
View File

@@ -0,0 +1,97 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func (m *Migrator) migrateAccounts() (err error) {
return
}
type v1Account struct {
Id string
BalanceMap map[string]v1BalanceChain
UnitCounters []*v1UnitsCounter
ActionTriggers v1ActionTriggers
AllowNegative bool
Disabled bool
}
type v1BalanceChain []*v1Balance
type v1Balance struct {
Uuid string //system wide unique
Id string // account wide unique
Value float64
ExpirationDate time.Time
Weight float64
DestinationIds string
RatingSubject string
Category string
SharedGroup string
Timings []*engine.RITiming
TimingIDs string
Disabled bool
}
func (b *v1Balance) IsDefault() bool {
return (b.DestinationIds == "" || b.DestinationIds == utils.ANY) &&
b.RatingSubject == "" &&
b.Category == "" &&
b.ExpirationDate.IsZero() &&
b.SharedGroup == "" &&
b.Weight == 0 &&
b.Disabled == false
}
type v1UnitsCounter struct {
Direction string
BalanceType string
// Units float64
Balances v1BalanceChain // first balance is the general one (no destination)
}
type v1ActionTriggers []*v1ActionTrigger
type v1ActionTrigger struct {
Id string
ThresholdType string
ThresholdValue float64
Recurrent bool
MinSleep time.Duration
BalanceId string
BalanceType string
BalanceDirection string
BalanceDestinationIds string
BalanceWeight float64
BalanceExpirationDate time.Time
BalanceTimingTags string
BalanceRatingSubject string
BalanceCategory string
BalanceSharedGroup string
BalanceDisabled bool
Weight float64
ActionsId string
MinQueuedItems int
Executed bool
}

View File

@@ -18,11 +18,100 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
import (
"database/sql"
"encoding/json"
"fmt"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func (m *Migrator) migrateCostDetails() (err error) {
if m.storDB == nil {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.NoStorDBConnection,
"no connection to StorDB")
}
vrs, err := m.storDB.GetVersions(utils.COST_DETAILS)
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when querying storDB for versions", err.Error()))
} else if len(vrs) == 0 {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UndefinedVersion,
"version number is not defined for CostDetails model")
}
if vrs[utils.COST_DETAILS] != 1 { // Right now we only support migrating from version 1
return
}
var storSQL *sql.DB
switch m.storDBType {
case utils.MYSQL:
storSQL = m.storDB.(*engine.MySQLStorage).Db
case utils.POSTGRES:
storSQL = m.storDB.(*engine.PostgresStorage).Db
default:
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UnsupportedDB,
fmt.Sprintf("unsupported database type: <%s>", m.storDBType))
}
rows, err := storSQL.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs WHERE run_id!= '*raw' and cost_details IS NOT NULL AND deleted_at IS NULL")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when querying storDB for cdrs", err.Error()))
}
defer rows.Close()
for cnt := 0; rows.Next(); cnt++ {
var id int64
var ccDirection, ccCategory, ccTenant, ccSubject, ccAccount, ccDestination, ccTor sql.NullString
var ccCost sql.NullFloat64
var tts []byte
if err := rows.Scan(&id, &ccTor, &ccDirection, &ccTenant, &ccCategory, &ccAccount, &ccSubject, &ccDestination, &ccCost, &tts); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when scanning at count: <%d>", err.Error(), cnt))
}
var v1tmsps v1TimeSpans
if err := json.Unmarshal(tts, &v1tmsps); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<Migrator> Unmarshalling timespans at CDR with id: <%d>, error: <%s>", id, err.Error()))
continue
}
v1CC := &v1CallCost{Direction: ccDirection.String, Category: ccCategory.String, Tenant: ccTenant.String,
Subject: ccSubject.String, Account: ccAccount.String, Destination: ccDestination.String, TOR: ccTor.String,
Cost: ccCost.Float64, Timespans: v1tmsps}
cc, err := v1CC.AsCallCost()
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<Migrator> Error: <%s> when converting into CallCost CDR with id: <%d>", err.Error(), id))
continue
}
if _, err := storSQL.Exec(fmt.Sprintf("UPDATE cdrs SET cost_details='%s' WHERE id=%d", cc.AsJSON(), id)); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<Migrator> Error: <%s> updating CDR with id <%d> into StorDB", err.Error(), id))
continue
}
}
// All done, update version wtih current one
vrs = engine.Versions{utils.COST_DETAILS: engine.CurrentStorDBVersions()[utils.COST_DETAILS]}
if err := m.storDB.SetVersions(vrs); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error()))
}
return
}
type v1CallCost struct {
Direction, Category, Tenant, Subject, Account, Destination, TOR string
Cost float64

View File

@@ -18,23 +18,25 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package migrator
import (
"database/sql"
"encoding/json"
"fmt"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewMigrator(storDB engine.Storage, storDBType string) *Migrator {
return &Migrator{storDB: storDB, storDBType: storDBType}
func NewMigrator(tpDB engine.RatingStorage, dataDB engine.AccountingStorage, dataDBType string, storDB engine.Storage, storDBType string) *Migrator {
return &Migrator{tpDB: tpDB, dataDB: dataDB, dataDBType: dataDBType, storDB: storDB, storDBType: storDBType}
}
type Migrator struct {
tpDB engine.RatingStorage // ToDo: unify the databases when ready
dataDB engine.AccountingStorage
dataDBType string
storDB engine.Storage
storDBType string
}
// Migrate implements the tasks to migrate, used as a dispatcher to the individual methods
func (m *Migrator) Migrate(taskID string) (err error) {
switch taskID {
default: // unsupported taskID
@@ -51,91 +53,8 @@ func (m *Migrator) Migrate(taskID string) (err error) {
}
case utils.MetaCostDetails:
err = m.migrateCostDetails()
}
return
}
func (m *Migrator) migrateCostDetails() (err error) {
if m.storDB == nil {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.NoStorDBConnection,
"no connection to StorDB")
}
vrs, err := m.storDB.GetVersions(utils.COST_DETAILS)
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when querying storDB for versions", err.Error()))
} else if len(vrs) == 0 {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UndefinedVersion,
"version number is not defined for CostDetails model")
}
if vrs[utils.COST_DETAILS] != 1 { // Right now we only support migrating from version 1
return
}
var storSQL *sql.DB
switch m.storDBType {
case utils.MYSQL:
storSQL = m.storDB.(*engine.MySQLStorage).Db
case utils.POSTGRES:
storSQL = m.storDB.(*engine.PostgresStorage).Db
default:
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UnsupportedDB,
fmt.Sprintf("unsupported database type: <%s>", m.storDBType))
}
rows, err := storSQL.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs WHERE run_id!= '*raw' and cost_details IS NOT NULL AND deleted_at IS NULL")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when querying storDB for cdrs", err.Error()))
}
defer rows.Close()
for cnt := 0; rows.Next(); cnt++ {
var id int64
var ccDirection, ccCategory, ccTenant, ccSubject, ccAccount, ccDestination, ccTor sql.NullString
var ccCost sql.NullFloat64
var tts []byte
if err := rows.Scan(&id, &ccTor, &ccDirection, &ccTenant, &ccCategory, &ccAccount, &ccSubject, &ccDestination, &ccCost, &tts); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when scanning at count: <%d>", err.Error(), cnt))
}
var v1tmsps v1TimeSpans
if err := json.Unmarshal(tts, &v1tmsps); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<Migrator> Unmarshalling timespans at CDR with id: <%d>, error: <%s>", id, err.Error()))
continue
}
v1CC := &v1CallCost{Direction: ccDirection.String, Category: ccCategory.String, Tenant: ccTenant.String,
Subject: ccSubject.String, Account: ccAccount.String, Destination: ccDestination.String, TOR: ccTor.String,
Cost: ccCost.Float64, Timespans: v1tmsps}
cc, err := v1CC.AsCallCost()
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<Migrator> Error: <%s> when converting into CallCost CDR with id: <%d>", err.Error(), id))
continue
}
if _, err := storSQL.Exec(fmt.Sprintf("UPDATE cdrs SET cost_details='%s' WHERE id=%d", cc.AsJSON(), id)); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<Migrator> Error: <%s> updating CDR with id <%d> into StorDB", err.Error(), id))
continue
}
}
// All done, update version wtih current one
vrs = engine.Versions{utils.COST_DETAILS: engine.CurrentStorDBVersions()[utils.COST_DETAILS]}
if err := m.storDB.SetVersions(vrs); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error()))
case utils.MetaAccounts:
err = m.migrateAccounts()
}
return
}

View File

@@ -473,9 +473,6 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool
ssMux.RLock()
ss := ssMp[cgrID]
ssMux.RUnlock()
if len(ss) == 0 {
return
}
var wg sync.WaitGroup
for _, rplConn := range smgReplConns {
if rplConn.Synchronous {

View File

@@ -347,6 +347,7 @@ const (
SchedulerNotRunningCaps = "SCHEDULLER_NOT_RUNNING"
MetaScheduler = "*scheduler"
MetaCostDetails = "*cost_details"
MetaAccounts = "*accounts"
Migrator = "migrator"
UnsupportedMigrationTask = "unsupported migration task"
NoStorDBConnection = "not connected to StorDB"