mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
repaired cost detail and fixed so small typos
This commit is contained in:
@@ -85,31 +85,24 @@ if migrate != nil && *migrate != "" { // Run migrator
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Print("#1 The redis db loaded")
|
||||
oldDataDB, err := engine.ConfigureDataStorage(*oldDataDBType, *oldDataDBHost, *oldDataDBPort, *oldDataDBName, *oldDataDBUser, *oldDataDBPass, *oldDBDataEncoding, config.CgrConfig().CacheConfig, *oldLoadHistorySize)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Print("#2 Old redis db loaded")
|
||||
storDB, err := engine.ConfigureStorStorage(*storDBType, *storDBHost, *storDBPort, *storDBName, *storDBUser, *storDBPass, *dbDataEncoding,
|
||||
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Print("#3 Old mysql db loaded")
|
||||
|
||||
oldstorDB, err := engine.ConfigureStorStorage(*oldStorDBType, *oldStorDBHost, *oldStorDBPort, *oldStorDBName, *oldStorDBUser, *oldStorDBPass, *oldDBDataEncoding,
|
||||
config.CgrConfig().StorDBMaxOpenConns, config.CgrConfig().StorDBMaxIdleConns, config.CgrConfig().StorDBConnMaxLifetime, config.CgrConfig().StorDBCDRSIndexes)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Print("#4 Old mysql db loaded")
|
||||
|
||||
m,err := migrator.NewMigrator(dataDB, *dataDBType, *dbDataEncoding, storDB, *storDBType,oldDataDB,*oldDataDBType,*oldDBDataEncoding,oldstorDB,*oldStorDBType)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Print("#5 Migrator started")
|
||||
err = m.Migrate(*migrate);
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
|
||||
@@ -9,69 +9,49 @@ You can increase or lower the value of step in the line after BEGIN below.
|
||||
You have to use 'CALL cgrates.migration();' to execute the script. If named other then default use that database name.
|
||||
*/
|
||||
|
||||
|
||||
DELIMITER //
|
||||
|
||||
CREATE PROCEDURE `migration`()
|
||||
BEGIN
|
||||
/* DECLARE variables */
|
||||
DECLARE max_cdrs bigint;
|
||||
DECLARE start_id bigint;
|
||||
DECLARE end_id bigint;
|
||||
DECLARE step bigint;
|
||||
/* Optimize table for performance */
|
||||
ALTER TABLE cdrs DISABLE KEYS;
|
||||
SET autocommit=0;
|
||||
SET unique_checks=0;
|
||||
SET foreign_key_checks=0;
|
||||
/* You must change the step var to commit every step rows inserted */
|
||||
SET step := 10000;
|
||||
SET start_id := 0;
|
||||
SET end_id := start_id + step;
|
||||
SET max_cdrs = (select max(id) from rated_cdrs);
|
||||
WHILE (start_id <= max_cdrs) DO
|
||||
INSERT INTO
|
||||
cdrs(cgrid,run_id,origin_host,source,origin_id,tor,request_type,direction,tenant,category,account,subject,destination,setup_time,answer_time,`usage`,supplier,extra_fields,cost,extra_info, created_at, updated_at, deleted_at)
|
||||
SELECT
|
||||
cdrs_primary.cgrid,
|
||||
rated_cdrs.runid as run_id,
|
||||
cdrs_primary.cdrhost as origin_host,
|
||||
cdrs_primary.cdrsource as source,
|
||||
cdrs_primary.accid as origin_id,
|
||||
cdrs_primary.tor,
|
||||
rated_cdrs.reqtype as request_type,
|
||||
rated_cdrs.direction,
|
||||
rated_cdrs.tenant,rated_cdrs.category,
|
||||
rated_cdrs.account,
|
||||
rated_cdrs.subject,
|
||||
rated_cdrs.destination,
|
||||
rated_cdrs.setup_time,
|
||||
rated_cdrs.answer_time,
|
||||
rated_cdrs.`usage`,
|
||||
rated_cdrs.supplier,
|
||||
cdrs_extra.extra_fields,
|
||||
rated_cdrs.cost,
|
||||
rated_cdrs.extra_info,
|
||||
rated_cdrs.created_at,
|
||||
rated_cdrs.updated_at,
|
||||
rated_cdrs.deleted_at
|
||||
FROM rated_cdrs
|
||||
INNER JOIN cdrs_primary ON rated_cdrs.cgrid = cdrs_primary.cgrid
|
||||
LEFT JOIN cdrs_extra ON rated_cdrs.cgrid = cdrs_extra.cgrid
|
||||
INNER JOIN cost_details ON rated_cdrs.cgrid = cost_details.cgrid
|
||||
WHERE cdrs_primary.`usage` > '0'
|
||||
AND not exists (select 1 from cdrs c where c.cgrid = cdrs_primary.cgrid)
|
||||
AND rated_cdrs.id >= start_id
|
||||
AND rated_cdrs.id < end_id
|
||||
GROUP BY cgrid, run_id, origin_id;
|
||||
SET start_id = start_id + step;
|
||||
SET end_id = end_id + step;
|
||||
END WHILE;
|
||||
/* SET Table for live usage */
|
||||
SET autocommit=1;
|
||||
SET unique_checks=1;
|
||||
SET foreign_key_checks=1;
|
||||
ALTER TABLE cdrs ENABLE KEYS;
|
||||
OPTIMIZE TABLE cdrs;
|
||||
/* DECLARE variables */
|
||||
DECLARE max_cdrs bigint;
|
||||
DECLARE start_id bigint;
|
||||
DECLARE end_id bigint;
|
||||
DECLARE step bigint;
|
||||
/* Optimize table for performance */
|
||||
ALTER TABLE cdrs DISABLE KEYS;
|
||||
SET autocommit=0;
|
||||
SET unique_checks=0;
|
||||
SET foreign_key_checks=0;
|
||||
/* You must change the step var to commit every step rows inserted */
|
||||
SET step := 10000;
|
||||
SET start_id := 0;
|
||||
SET end_id := start_id + step;
|
||||
SET max_cdrs = (select max(id) from rated_cdrs);
|
||||
WHILE (start_id <= max_cdrs) DO
|
||||
INSERT INTO
|
||||
cdrs(cgrid,run_id,origin_host,source,origin_id,tor,request_type,direction,tenant,category,account,subject,destination,setup_time,pdd,answer_time,`usage`,supplier,disconnect_cause,extra_fields,cost_source,cost,cost_details,extra_info, created_at, updated_at, deleted_at)
|
||||
SELECT cdrs_primary.cgrid,rated_cdrs.runid as run_id,cdrs_primary.cdrhost as origin_host,cdrs_primary.cdrsource as source,cdrs_primary.accid as origin_id, cdrs_primary.tor,rated_cdrs.reqtype as request_type,rated_cdrs.direction, rated_cdrs.tenant,rated_cdrs.category, rated_cdrs.account, rated_cdrs.subject, rated_cdrs.destination,rated_cdrs.setup_time,rated_cdrs.pdd,rated_cdrs.answer_time,rated_cdrs.`usage`,rated_cdrs.supplier,rated_cdrs.disconnect_cause,cdrs_extra.extra_fields,cost_details.cost_source,rated_cdrs.cost,cost_details.timespans as cost_details,rated_cdrs.extra_info,rated_cdrs.created_at,rated_cdrs.updated_at, rated_cdrs.deleted_at
|
||||
FROM rated_cdrs
|
||||
INNER JOIN cdrs_primary ON rated_cdrs.cgrid = cdrs_primary.cgrid
|
||||
INNER JOIN cdrs_extra ON rated_cdrs.cgrid = cdrs_extra.cgrid
|
||||
INNER JOIN cost_details ON rated_cdrs.cgrid = cost_details.cgrid
|
||||
WHERE cdrs_primary.`usage` > '0'
|
||||
AND not exists (select 1 from cdrs where cdrs.cgrid = cdrs_primary.cgrid AND cdrs.run_id=rated_cdrs.runid)
|
||||
AND rated_cdrs.id >= start_id
|
||||
AND rated_cdrs.id < end_id
|
||||
GROUP BY cgrid, run_id, origin_id;
|
||||
SET start_id = start_id + step;
|
||||
SET end_id = end_id + step;
|
||||
END WHILE;
|
||||
/* SET Table for live usage */
|
||||
SET autocommit=1;
|
||||
SET unique_checks=1;
|
||||
SET foreign_key_checks=1;
|
||||
ALTER TABLE cdrs ENABLE KEYS;
|
||||
OPTIMIZE TABLE cdrs;
|
||||
END //
|
||||
|
||||
DELIMITER ;
|
||||
|
||||
|
||||
@@ -36,21 +36,21 @@ const (
|
||||
func (m *Migrator) migrateAccounts() (err error) {
|
||||
switch m.dataDBType {
|
||||
case utils.REDIS:
|
||||
log.Print("#9 Starts migrateAccounts")
|
||||
log.Print("#9 Starts migrateAccounts")
|
||||
var acntV1Keys []string
|
||||
acntV1Keys, err = m.oldDataDB.GetKeysForPrefix(v1AccountDBPrefix)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
log.Print("#10 it doesn't get to here",acntV1Keys)
|
||||
|
||||
log.Print("#10 it doesn't get to here", acntV1Keys)
|
||||
|
||||
for _, acntV1Key := range acntV1Keys {
|
||||
log.Print("#11 acc key:",acntV1Key)
|
||||
log.Print("#11 acc key:", acntV1Key)
|
||||
v1Acnt, err := m.getV1AccountFromDB(acntV1Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Print("#8 it doesn't get to here")
|
||||
log.Print("#8 it doesn't get to here")
|
||||
if v1Acnt != nil {
|
||||
acnt := v1Acnt.AsAccount()
|
||||
if err = m.dataDB.SetAccount(acnt); err != nil {
|
||||
@@ -58,7 +58,7 @@ func (m *Migrator) migrateAccounts() (err error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Print("#8 it doesn't get to here")
|
||||
log.Print("#8 it doesn't get to here")
|
||||
// All done, update version wtih current one
|
||||
vrs := engine.Versions{utils.Accounts: engine.CurrentStorDBVersions()[utils.Accounts]}
|
||||
if err = m.dataDB.SetVersions(vrs, false); err != nil {
|
||||
@@ -67,7 +67,7 @@ func (m *Migrator) migrateAccounts() (err error) {
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when updating Accounts version into StorDB", err.Error()))
|
||||
}
|
||||
log.Print("#8 it doesn't get to here")
|
||||
log.Print("#8 it doesn't get to here")
|
||||
return
|
||||
case utils.MONGO:
|
||||
dataDB := m.dataDB.(*engine.MongoStorage)
|
||||
@@ -102,19 +102,19 @@ func (m *Migrator) migrateAccounts() (err error) {
|
||||
func (m *Migrator) getV1AccountFromDB(key string) (*v1Account, error) {
|
||||
switch m.oldDataDBType {
|
||||
case utils.REDIS:
|
||||
log.Print("#12 start get ")
|
||||
log.Print("#12 start get ")
|
||||
dataDB := m.oldDataDB.(*engine.RedisStorage)
|
||||
log.Print("#12 start get")
|
||||
log.Print("#12 start get")
|
||||
if strVal, err := dataDB.Cmd("GET", key).Bytes(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
log.Print("#12 start get")
|
||||
log.Print("#12 start get")
|
||||
v1Acnt := &v1Account{Id: key}
|
||||
log.Print("#12 start get")
|
||||
log.Print("#12 start get")
|
||||
if err := m.mrshlr.Unmarshal(strVal, v1Acnt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Print("#12 start get")
|
||||
log.Print("#12 start get")
|
||||
return v1Acnt, nil
|
||||
}
|
||||
case utils.MONGO:
|
||||
@@ -195,7 +195,7 @@ func (v1Acc v1Account) AsAccount() (ac *engine.Account) {
|
||||
for oldBalKey, oldBalChain := range v1Acc.BalanceMap {
|
||||
keyElements := strings.Split(oldBalKey, "*")
|
||||
newBalKey := "*" + keyElements[1]
|
||||
newBalDirection := idElements[0]
|
||||
newBalDirection := idElements[0]
|
||||
ac.BalanceMap[newBalKey] = make(engine.Balances, len(oldBalChain))
|
||||
for index, oldBal := range oldBalChain {
|
||||
// check default to set new id
|
||||
|
||||
@@ -37,7 +37,7 @@ func TestV1AccountAsAccount(t *testing.T) {
|
||||
newAcc := v1Acc.AsAccount()
|
||||
if !reflect.DeepEqual(testAccount.BalanceMap["*monetary"][0], newAcc.BalanceMap["*monetary"][0]) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", testAccount.BalanceMap["*monetary"][0], newAcc.BalanceMap["*monetary"][0])
|
||||
}else if !reflect.DeepEqual(testAccount.BalanceMap["*voice"][0], newAcc.BalanceMap["*voice"][0]) {
|
||||
} else if !reflect.DeepEqual(testAccount.BalanceMap["*voice"][0], newAcc.BalanceMap["*voice"][0]) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", testAccount.BalanceMap["*voice"][0], newAcc.BalanceMap["*voice"][0])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
@@ -47,12 +48,13 @@ func (m *Migrator) migrateCostDetails() (err error) {
|
||||
"version number is not defined for CostDetails model")
|
||||
}
|
||||
if vrs[utils.COST_DETAILS] != 1 { // Right now we only support migrating from version 1
|
||||
log.Print("Wrong version")
|
||||
return
|
||||
}
|
||||
var storSQL *sql.DB
|
||||
switch m.storDBType {
|
||||
case utils.MYSQL:
|
||||
storSQL = m.storDB.(*engine.MySQLStorage).Db
|
||||
storSQL = m.storDB.(*engine.SQLStorage).Db
|
||||
case utils.POSTGRES:
|
||||
storSQL = m.storDB.(*engine.PostgresStorage).Db
|
||||
default:
|
||||
@@ -61,19 +63,22 @@ func (m *Migrator) migrateCostDetails() (err error) {
|
||||
utils.UnsupportedDB,
|
||||
fmt.Sprintf("unsupported database type: <%s>", m.storDBType))
|
||||
}
|
||||
rows, err := storSQL.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs WHERE run_id!= '*raw' and cost_details IS NOT NULL AND deleted_at IS NULL")
|
||||
rows, err := storSQL.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs")
|
||||
if err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
err.Error(),
|
||||
fmt.Sprintf("error: <%s> when querying storDB for cdrs", err.Error()))
|
||||
}
|
||||
|
||||
defer rows.Close()
|
||||
|
||||
for cnt := 0; rows.Next(); cnt++ {
|
||||
var id int64
|
||||
var ccDirection, ccCategory, ccTenant, ccSubject, ccAccount, ccDestination, ccTor sql.NullString
|
||||
var ccCost sql.NullFloat64
|
||||
var tts []byte
|
||||
|
||||
if err := rows.Scan(&id, &ccTor, &ccDirection, &ccTenant, &ccCategory, &ccAccount, &ccSubject, &ccDestination, &ccCost, &tts); err != nil {
|
||||
return utils.NewCGRError(utils.Migrator,
|
||||
utils.ServerErrorCaps,
|
||||
@@ -89,6 +94,7 @@ func (m *Migrator) migrateCostDetails() (err error) {
|
||||
v1CC := &v1CallCost{Direction: ccDirection.String, Category: ccCategory.String, Tenant: ccTenant.String,
|
||||
Subject: ccSubject.String, Account: ccAccount.String, Destination: ccDestination.String, TOR: ccTor.String,
|
||||
Cost: ccCost.Float64, Timespans: v1tmsps}
|
||||
|
||||
cc := v1CC.AsCallCost()
|
||||
if cc == nil {
|
||||
utils.Logger.Warning(
|
||||
|
||||
@@ -19,43 +19,42 @@ package migrator
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string,oldDataDB engine.DataDB,oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string) (m *Migrator,err error) {
|
||||
func NewMigrator(dataDB engine.DataDB, dataDBType, dataDBEncoding string, storDB engine.Storage, storDBType string, oldDataDB engine.DataDB, oldDataDBType, oldDataDBEncoding string, oldStorDB engine.Storage, oldStorDBType string) (m *Migrator, err error) {
|
||||
var mrshlr engine.Marshaler
|
||||
var oldmrshlr engine.Marshaler
|
||||
if dataDBEncoding == utils.MSGPACK {
|
||||
mrshlr = engine.NewCodecMsgpackMarshaler()
|
||||
} else if dataDBEncoding == utils.JSON {
|
||||
mrshlr = new(engine.JSONMarshaler)
|
||||
}else if oldDataDBEncoding == utils.MSGPACK {
|
||||
} else if oldDataDBEncoding == utils.MSGPACK {
|
||||
oldmrshlr = engine.NewCodecMsgpackMarshaler()
|
||||
}else if oldDataDBEncoding == utils.JSON {
|
||||
} else if oldDataDBEncoding == utils.JSON {
|
||||
oldmrshlr = new(engine.JSONMarshaler)
|
||||
}
|
||||
m = &Migrator{
|
||||
dataDB: dataDB, dataDBType: dataDBType,
|
||||
storDB: storDB, storDBType: storDBType, mrshlr: mrshlr,
|
||||
oldDataDB: oldDataDB, oldDataDBType: oldDataDBType,
|
||||
oldStorDB: oldStorDB, oldStorDBType: oldStorDBType, oldmrshlr:oldmrshlr,
|
||||
oldDataDB: oldDataDB, oldDataDBType: oldDataDBType,
|
||||
oldStorDB: oldStorDB, oldStorDBType: oldStorDBType, oldmrshlr: oldmrshlr,
|
||||
}
|
||||
return m,err
|
||||
return m, err
|
||||
}
|
||||
|
||||
type Migrator struct {
|
||||
dataDB engine.DataDB
|
||||
dataDBType string
|
||||
storDB engine.Storage
|
||||
storDBType string
|
||||
mrshlr engine.Marshaler
|
||||
dataDB engine.DataDB
|
||||
dataDBType string
|
||||
storDB engine.Storage
|
||||
storDBType string
|
||||
mrshlr engine.Marshaler
|
||||
oldDataDB engine.DataDB
|
||||
oldDataDBType string
|
||||
oldStorDB engine.Storage
|
||||
oldStorDBType string
|
||||
oldmrshlr engine.Marshaler
|
||||
oldmrshlr engine.Marshaler
|
||||
}
|
||||
|
||||
// Migrate implements the tasks to migrate, used as a dispatcher to the individual methods
|
||||
@@ -76,17 +75,15 @@ func (m *Migrator) Migrate(taskID string) (err error) {
|
||||
case utils.MetaCostDetails:
|
||||
err = m.migrateCostDetails()
|
||||
case utils.MetaAccounts:
|
||||
log.Print("#7 function is about to start")
|
||||
err = m.migrateAccounts()
|
||||
case "migrateActionPlans":
|
||||
case utils.MetaActionPlans:
|
||||
err = m.migrateActionPlans()
|
||||
case "migrateActionTriggers":
|
||||
case utils.MetaActionTriggers:
|
||||
err = m.migrateActionTriggers()
|
||||
case "migrateActions":
|
||||
case utils.MetaActions:
|
||||
err = m.migrateActions()
|
||||
case "migrateSharedGroups":
|
||||
case utils.MetaSharedGroups:
|
||||
err = m.migrateSharedGroups()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -380,6 +380,10 @@ const (
|
||||
MetaScheduler = "*scheduler"
|
||||
MetaCostDetails = "*cost_details"
|
||||
MetaAccounts = "*accounts"
|
||||
MetaActionPlans = "*action_plans"
|
||||
MetaActionTriggers = "*action_triggers"
|
||||
MetaActions = "*actions"
|
||||
MetaSharedGroups = "*shared_groups"
|
||||
Migrator = "migrator"
|
||||
UnsupportedMigrationTask = "unsupported migration task"
|
||||
NoStorDBConnection = "not connected to StorDB"
|
||||
|
||||
Reference in New Issue
Block a user