New automated Migrator with partial *cost_details implementation

This commit is contained in:
DanB
2016-12-08 12:03:25 +01:00
parent 295508ab37
commit 6bee1d387b
6 changed files with 249 additions and 1 deletions

View File

@@ -37,6 +37,7 @@ var (
//separator = flag.String("separator", ",", "Default field separator")
cgrConfig, _ = config.NewDefaultCGRConfig()
migrateRC8 = flag.String("migrate_rc8", "", "Migrate Accounts, Actions, ActionTriggers, DerivedChargers, ActionPlans and SharedGroups to RC8 structures, possible values: *all,*enforce,acc,atr,act,dcs,apl,shg")
migrate = flag.String("migrate", "", "Fire up automatic migration <*all|*cost_details>")
tpdb_type = flag.String("tpdb_type", cgrConfig.TpDbType, "The type of the TariffPlan database <redis>")
tpdb_host = flag.String("tpdb_host", cgrConfig.TpDbHost, "The TariffPlan host to connect to.")
tpdb_port = flag.String("tpdb_port", cgrConfig.TpDbPort, "The TariffPlan port to bind to.")

View File

@@ -240,7 +240,7 @@ func (sv *StructVersion) CompareAndMigrate(dbVer *StructVersion) []*MigrationInf
}
func CurrentStorDBVersions() Versions {
return Versions{utils.COST_DETAILS: 1}
return Versions{utils.COST_DETAILS: 2}
}
// Versions will keep trac of various item versions

73
migrator/costdetails.go Normal file
View File

@@ -0,0 +1,73 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"time"
"github.com/cgrates/cgrates/engine"
)
type CallCostMigrator interface {
AsCallCost() (*engine.CallCost, error)
}
type v1CallCost struct {
Direction, Category, Tenant, Subject, Account, Destination, TOR string
Cost float64
Timespans v1TimeSpans
}
type v1TimeSpans []*v1TimeSpan
type v1TimeSpan struct {
TimeStart, TimeEnd time.Time
Cost float64
RateInterval *engine.RateInterval
DurationIndex time.Duration
Increments v1Increments
MatchedSubject, MatchedPrefix, MatchedDestId, RatingPlanId string
}
type v1Increments []*v1Increment
type v1Increment struct {
Duration time.Duration
Cost float64
BalanceRateInterval *engine.RateInterval
BalanceInfo *v1BalanceInfo
UnitInfo *v1UnitInfo
CompressFactor int
}
type v1BalanceInfo struct {
UnitBalanceUuid string
MoneyBalanceUuid string
AccountId string // used when debited from shared balance
}
type v1UnitInfo struct {
DestinationId string
Quantity float64
TOR string
}
func (v1cc *v1CallCost) AsCallCost() (cc *engine.CallCost, err error) {
cc = new(engine.CallCost)
return
}

View File

@@ -0,0 +1,43 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"encoding/json"
"fmt"
"testing"
)
var v1TmspsStr1 = `[{"TimeStart":"2016-07-28T02:18:49+02:00","TimeEnd":"2016-07-28T02:19:28+02:00","Cost":0.0117,"RateInterval":{"Timing":{"Years":[],"Months":[],"MonthDays":[],"WeekDays":[],"StartTime":"00:00:00","EndTime":""},"Rating":{"ConnectFee":0.0564,"RoundingMethod":"*middle","RoundingDecimals":4,"MaxCost":0,"MaxCostStrategy":"","Rates":[{"GroupIntervalStart":0,"Value":0.0198,"RateIncrement":1000000000,"RateUnit":60000000000}]},"Weight":10},"DurationIndex":39000000000,"Increments":[{"Duration":1000000000,"Cost":0.0003,"BalanceInfo":{"UnitBalanceUuid":"","MoneyBalanceUuid":"c50c201c405defc3807347444efc62da","AccountId":"cgrates.org:dan"},"BalanceRateInterval":null,"UnitInfo":null,"CompressFactor":39}],"MatchedSubject":"*out:cgrates.org:call:dan","MatchedPrefix":"+311","MatchedDestId":"CST_491_DE001","RatingPlanId":"V_RET_1490_01_V"}]`
var v1TmspsStr2 = `[{"TimeStart":"2016-07-28T01:12:19+02:00","TimeEnd":"2016-07-28T01:12:27+02:00","Cost":0.00046875,"RateInterval":{"Timing":{"Years":[],"Months":[],"MonthDays":[],"WeekDays":[],"StartTime":"00:00:00","EndTime":""},"Rating":{"ConnectFee":0,"RoundingMethod":"*middle","RoundingDecimals":4,"MaxCost":0,"MaxCostStrategy":"","Rates":[{"GroupIntervalStart":0,"Value":0.06,"RateIncrement":1000000000,"RateUnit":1024000000000}]},"Weight":10},"DurationIndex":8000000000,"Increments":null,"MatchedSubject":"*out:cgrates.org:data:danb","MatchedPrefix":"+4900","MatchedDestId":"CST_data_DAT01","RatingPlanId":"M_RET_1409_01_D"}]`
func TestV1CostDetailsAsCostDetails1(t *testing.T) {
var v1tmsps v1TimeSpans
if err := json.Unmarshal([]byte(v1TmspsStr1), &v1tmsps); err != nil {
t.Error(err)
}
fmt.Printf("Timespans: %+v\n", v1tmsps[0])
}
func TestV1CostDetailsAsCostDetails2(t *testing.T) {
var v1tmsps v1TimeSpans
if err := json.Unmarshal([]byte(v1TmspsStr2), &v1tmsps); err != nil {
t.Error(err)
}
fmt.Printf("Timespans: %+v\n", v1tmsps[0])
}

126
migrator/migrator.go Normal file
View File

@@ -0,0 +1,126 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"database/sql"
"encoding/json"
"fmt"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewMigrator(storDB engine.StorDB, storDBType string) *Migrator {
return &Migrator{storDB: storDB, storDBType: storDBType}
}
type Migrator struct {
storDB engine.StorDB
storDBType string // Useful to convert back to real
}
func (m *Migrator) Migrate(taskID string) (err error) {
switch taskID {
default: // unsupported taskID
err = utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UnsupportedMigrationTask,
fmt.Sprintf("task <%s> is not a supported migration task", taskID))
case utils.MetaCostDetails:
err = m.migrateCostDetails()
}
return
}
func (m *Migrator) migrateCostDetails() (err error) {
if m.storDB == nil {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.NoStorDBConnection,
"no connection to StorDB")
}
if !utils.IsSliceMember([]string{utils.MYSQL, utils.POSTGRES}, m.storDBType) {
return // CostDetails are migrated only for MySQL and Postgres
}
vrs, err := m.storDB.GetVersions(utils.COST_DETAILS)
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when querying storDB for versions", err.Error()))
} else if len(vrs) == 0 {
return utils.NewCGRError(utils.Migrator,
utils.MandatoryIEMissingCaps,
utils.UndefinedVersion,
"version number is not defined for CostDetails model")
}
if vrs[utils.COST_DETAILS] != 1 {
return
}
storSQL := m.storDB.(*engine.SQLStorage)
rows, err := storSQL.Db.Query("SELECT id, tor, direction, tenant, category, account, subject, destination, cost, cost_details FROM cdrs WHERE run_id!= '*raw' and cost_details NOT NULL")
if err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when querying storDB for cdrs", err.Error()))
}
defer rows.Close()
for cnt := 0; rows.Next(); cnt++ {
var id int64
var ccDirection, ccCategory, ccTenant, ccSubject, ccAccount, ccDestination, ccTor sql.NullString
var ccCost sql.NullFloat64
var tts []byte
if err := rows.Scan(&id, &ccTor, &ccDirection, &ccTenant, &ccCategory, &ccAccount, &ccSubject, &ccDestination, &ccCost, &tts); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when scanning at count: <%d>", err.Error(), cnt))
}
var v1tmsps v1TimeSpans
if err := json.Unmarshal(tts, &v1tmsps); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<Migrator> Unmarshalling timespans at CDR with id: <%d>, error: <%s>", id, err.Error()))
continue
}
v1CC := &v1CallCost{Direction: ccDirection.String, Category: ccCategory.String, Tenant: ccTenant.String,
Subject: ccSubject.String, Account: ccAccount.String, Destination: ccDestination.String, TOR: ccTor.String,
Cost: ccCost.Float64, Timespans: v1tmsps}
cc, err := v1CC.AsCallCost()
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<Migrator> Error: <%s> when converting into CallCost CDR with id: <%d>", err.Error(), id))
continue
}
if _, err := storSQL.Db.Exec(fmt.Sprintf("UPDATE cdrs SET cost_details='%s' WHERE id=%d", cc.AsJSON(), id)); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<Migrator> Error: <%s> updating CDR with id <%d> into StorDB", err.Error(), id))
continue
}
}
// All done, update version wtih current one
vrs = engine.Versions{utils.COST_DETAILS: engine.CurrentStorDBVersions()[utils.COST_DETAILS]}
if err := m.storDB.SetVersions(vrs); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CostDetails version into StorDB", err.Error()))
}
return
}

View File

@@ -322,4 +322,9 @@ const (
StoppedCaps = "STOPPED"
SchedulerNotRunningCaps = "SCHEDULLER_NOT_RUNNING"
MetaScheduler = "*scheduler"
MetaCostDetails = "*cost_details"
Migrator = "migrator"
UnsupportedMigrationTask = "unsupported migration task"
NoStorDBConnection = "not connected to StorDB"
UndefinedVersion = "undefined version"
)