Removed CDRs from migrator

This commit is contained in:
andronache98
2022-01-10 13:03:38 +02:00
committed by Dan Christian Bogos
parent df6fd49b95
commit 451d58843f
8 changed files with 1 additions and 744 deletions

View File

@@ -1,238 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"encoding/json"
"fmt"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func (m *Migrator) migrateCurrentCDRs() (err error) {
if m.sameStorDB { // no move
return
}
cdrs, _, err := m.storDBIn.StorDB().GetCDRs(new(utils.CDRsFilter), false)
if err != nil {
return err
}
for _, cdr := range cdrs {
if err := m.storDBOut.StorDB().SetCDR(cdr, true); err != nil {
return err
}
m.stats[utils.CDRs]++
}
return
}
func (m *Migrator) migrateCDRs() (err error) {
var vrs engine.Versions
current := engine.CurrentStorDBVersions()
if vrs, err = m.getVersions(utils.CDRs); err != nil {
return
}
migrated := true
var v2 *engine.CDR
for {
version := vrs[utils.CDRs]
for {
switch version {
default:
return fmt.Errorf("Unsupported version %v", version)
case current[utils.CDRs]:
migrated = false
if m.sameStorDB {
break
}
if err = m.migrateCurrentCDRs(); err != nil {
return
}
case 1:
if v2, err = m.migrateV1CDRs(); err != nil && err != utils.ErrNoMoreData {
return
}
version = 2
}
if version == current[utils.CDRs] || err == utils.ErrNoMoreData {
break
}
}
if err == utils.ErrNoMoreData || !migrated {
break
}
if !m.dryRun {
//set action plan
if err = m.storDBOut.StorDB().SetCDR(v2, true); err != nil {
return
}
}
m.stats[utils.CDRs]++
}
// All done, update version wtih current one
if err = m.setVersions(utils.CDRs); err != nil {
return
}
return m.ensureIndexesStorDB(engine.ColCDRs)
}
func (m *Migrator) removeV1CDRs() (err error) {
var v1CDR *v1Cdrs
if v1CDR, err = m.storDBIn.getV1CDR(); err != nil {
return err
}
if v1CDR == nil {
return
}
if err = m.storDBIn.remV1CDRs(v1CDR); err != nil {
return
}
return
}
func (m *Migrator) migrateV1CDRs() (cdr *engine.CDR, err error) {
var v1CDR *v1Cdrs
if v1CDR, err = m.storDBIn.getV1CDR(); err != nil {
return nil, err
}
if v1CDR == nil {
return
}
cdr = v1CDR.V1toV2Cdr()
return
}
type v1Cdrs struct {
CGRID string
RunID string
OrderID int64 // Stor order id used as export order id
OriginHost string // represents the IP address of the host generating the CDR (automatically populated by the server)
Source string // formally identifies the source of the CDR (free form field)
OriginID string // represents the unique accounting id given by the telecom switch generating the CDR
ToR string // type of record, meta-field, should map to one of the TORs hardcoded inside the server <*voice|*data|*sms|*generic>
RequestType string // matching the supported request types by the **CGRateS**, accepted values are hardcoded in the server <prepaid|postpaid|pseudoprepaid|rated>.
Tenant string // tenant whom this record belongs
Category string // free-form filter for this record, matching the category defined in rating profiles.
Account string // account id (accounting subsystem) the record should be attached to
Subject string // rating subject (rating subsystem) this record should be attached to
Destination string // destination to be charged
SetupTime time.Time // set-up time of the event. Supported formats: datetime RFC3339 compatible, SQL datetime (eg: MySQL), unix timestamp.
AnswerTime time.Time // answer time of the event. Supported formats: datetime RFC3339 compatible, SQL datetime (eg: MySQL), unix timestamp.
Usage time.Duration // event usage information (eg: in case of tor=*voice this will represent the total duration of a call)
ExtraFields map[string]string // Extra fields to be stored in CDR
ExtraInfo string // Container for extra information related to this CDR, eg: populated with error reason in case of error on calculation
Partial bool // Used for partial record processing by ERs
Rated bool // Mark the CDR as rated so we do not process it during rating
CostSource string // The source of this cost
Cost float64
}
func (v1Cdr *v1Cdrs) V1toV2Cdr() (cdr *engine.CDR) {
cdr = &engine.CDR{
CGRID: v1Cdr.CGRID,
RunID: v1Cdr.RunID,
OrderID: v1Cdr.OrderID,
OriginHost: v1Cdr.OriginHost,
Source: v1Cdr.Source,
OriginID: v1Cdr.OriginID,
ToR: v1Cdr.ToR,
RequestType: v1Cdr.RequestType,
Tenant: v1Cdr.Tenant,
Category: v1Cdr.Category,
Account: v1Cdr.Account,
Subject: v1Cdr.Subject,
Destination: v1Cdr.Destination,
SetupTime: v1Cdr.SetupTime,
AnswerTime: v1Cdr.AnswerTime,
Usage: v1Cdr.Usage,
ExtraFields: make(map[string]string),
ExtraInfo: v1Cdr.ExtraInfo,
Partial: v1Cdr.Partial,
PreRated: v1Cdr.Rated,
CostSource: v1Cdr.CostSource,
Cost: v1Cdr.Cost,
}
if v1Cdr.ExtraFields != nil {
for key, value := range v1Cdr.ExtraFields {
cdr.ExtraFields[key] = value
}
}
return
}
func NewV1CDRFromCDRSql(cdrSql *engine.CDRsql) (cdr *v1Cdrs, err error) {
cdr = new(v1Cdrs)
cdr.CGRID = cdrSql.Cgrid
cdr.RunID = cdrSql.RunID
cdr.OriginHost = cdrSql.OriginHost
cdr.Source = cdrSql.Source
cdr.OriginID = cdrSql.OriginID
cdr.OrderID = cdrSql.ID
cdr.ToR = cdrSql.TOR
cdr.RequestType = cdrSql.RequestType
cdr.Tenant = cdrSql.Tenant
cdr.Category = cdrSql.Category
cdr.Account = cdrSql.Account
cdr.Subject = cdrSql.Subject
cdr.Destination = cdrSql.Destination
cdr.SetupTime = cdrSql.SetupTime
if cdrSql.AnswerTime != nil {
cdr.AnswerTime = *cdrSql.AnswerTime
}
cdr.Usage = time.Duration(cdrSql.Usage)
cdr.CostSource = cdrSql.CostSource
cdr.Cost = cdrSql.Cost
cdr.ExtraInfo = cdrSql.ExtraInfo
if cdrSql.ExtraFields != "" {
if err = json.Unmarshal([]byte(cdrSql.ExtraFields), &cdr.ExtraFields); err != nil {
return nil, err
}
}
return
}
func (cdr *v1Cdrs) AsCDRsql() (cdrSql *engine.CDRsql) {
cdrSql = new(engine.CDRsql)
cdrSql.Cgrid = cdr.CGRID
cdrSql.RunID = cdr.RunID
cdrSql.OriginHost = cdr.OriginHost
cdrSql.Source = cdr.Source
cdrSql.OriginID = cdr.OriginID
cdrSql.TOR = cdr.ToR
cdrSql.RequestType = cdr.RequestType
cdrSql.Tenant = cdr.Tenant
cdrSql.Category = cdr.Category
cdrSql.Account = cdr.Account
cdrSql.Subject = cdr.Subject
cdrSql.Destination = cdr.Destination
cdrSql.SetupTime = cdr.SetupTime
if !cdr.AnswerTime.IsZero() {
cdrSql.AnswerTime = utils.TimePointer(cdr.AnswerTime)
}
cdrSql.Usage = cdr.Usage.Nanoseconds()
cdrSql.ExtraFields = utils.ToJSON(cdr.ExtraFields)
cdrSql.CostSource = cdr.CostSource
cdrSql.Cost = cdr.Cost
cdrSql.ExtraInfo = cdr.ExtraInfo
cdrSql.CreatedAt = time.Now()
return
}

View File

@@ -1,191 +0,0 @@
//go:build integration
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"path"
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
cdrPathIn string
cdrPathOut string
cdrCfgIn *config.CGRConfig
cdrCfgOut *config.CGRConfig
cdrMigrator *Migrator
cdrAction string
)
var sTestsCdrIT = []func(t *testing.T){
testCdrITConnect,
testCdrITFlush,
testCdrITMigrateAndMove,
}
func TestCdrITMongo(t *testing.T) {
var err error
cdrPathIn = path.Join(*dataDir, "conf", "samples", "tutmongo")
cdrCfgIn, err = config.NewCGRConfigFromPath(context.Background(), cdrPathIn)
if err != nil {
t.Error(err)
}
for _, stest := range sTestsCdrIT {
t.Run("TestCdrITMigrateMongo", stest)
}
cdrMigrator.Close()
}
func TestCdrITMySql(t *testing.T) {
var err error
cdrPathIn = path.Join(*dataDir, "conf", "samples", "tutmysql")
cdrCfgIn, err = config.NewCGRConfigFromPath(context.Background(), cdrPathIn)
if err != nil {
t.Error(err)
}
for _, stest := range sTestsCdrIT {
t.Run("TestCdrITMigrateMySql", stest)
}
cdrMigrator.Close()
}
func testCdrITConnect(t *testing.T) {
storDBIn, err := NewMigratorStorDB(cdrCfgIn.StorDbCfg().Type,
cdrCfgIn.StorDbCfg().Host, cdrCfgIn.StorDbCfg().Port,
cdrCfgIn.StorDbCfg().Name, cdrCfgIn.StorDbCfg().User,
cdrCfgIn.StorDbCfg().Password, cdrCfgIn.GeneralCfg().DBDataEncoding,
cdrCfgIn.StorDbCfg().StringIndexedFields, cdrCfgIn.StorDbCfg().PrefixIndexedFields,
cdrCfgIn.StorDbCfg().Opts, cdrCfgIn.StorDbCfg().Items)
if err != nil {
t.Error(err)
}
storDBOut, err := NewMigratorStorDB(cdrCfgIn.StorDbCfg().Type,
cdrCfgIn.StorDbCfg().Host, cdrCfgIn.StorDbCfg().Port,
cdrCfgIn.StorDbCfg().Name, cdrCfgIn.StorDbCfg().User,
cdrCfgIn.StorDbCfg().Password, cdrCfgIn.GeneralCfg().DBDataEncoding,
cdrCfgIn.StorDbCfg().StringIndexedFields, cdrCfgIn.StorDbCfg().PrefixIndexedFields,
cdrCfgIn.StorDbCfg().Opts, cdrCfgIn.StorDbCfg().Items)
if err != nil {
t.Error(err)
}
cdrMigrator, err = NewMigrator(nil, nil,
storDBIn, storDBOut,
false, true, false, false)
if err != nil {
t.Error(err)
}
}
func testCdrITFlush(t *testing.T) {
if err := cdrMigrator.storDBOut.StorDB().Flush(
path.Join(cdrCfgIn.DataFolderPath, "storage", cdrCfgIn.StorDbCfg().Type)); err != nil {
t.Error(err)
}
}
func testCdrITMigrateAndMove(t *testing.T) {
// cc := &engine.CallCost{
// Destination: "0723045326",
// Timespans: []*engine.TimeSpan{
// {
// TimeStart: time.Date(2013, 9, 24, 10, 48, 0, 0, time.UTC),
// TimeEnd: time.Date(2013, 9, 24, 10, 48, 10, 0, time.UTC),
// DurationIndex: 0,
// RateInterval: &engine.RateInterval{
// Rating: &engine.RIRate{
// Rates: engine.RateGroups{
// &engine.RGRate{
// GroupIntervalStart: 0,
// Value: 100,
// RateIncrement: 10 * time.Second,
// RateUnit: time.Second,
// },
// },
// },
// },
// },
// },
// ToR: utils.MetaVoice,
// }
v1Cdr := &v1Cdrs{
CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()),
OrderID: 123,
ToR: utils.MetaVoice,
OriginID: "dsafdsaf",
OriginHost: "192.168.1.1",
Source: utils.UnitTest,
RequestType: utils.MetaRated,
Tenant: "cgrates.org",
Category: "call",
Account: "1001",
Subject: "1001",
Destination: "1002",
SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC),
AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
RunID: utils.MetaDefault,
Usage: 10,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
Cost: 1.01,
Rated: true,
// CostDetails: cc,
}
var err error
if err = cdrMigrator.storDBIn.setV1CDR(v1Cdr); err != nil {
t.Error(err)
}
currentVersion := engine.Versions{
utils.CostDetails: 2,
utils.CDRs: 1,
}
err = cdrMigrator.storDBIn.StorDB().SetVersions(currentVersion, false)
if err != nil {
t.Error("Error when setting version for CDRs ", err.Error())
}
if vrs, err := cdrMigrator.storDBIn.StorDB().GetVersions(""); err != nil {
t.Error(err)
} else if vrs[utils.CDRs] != 1 {
t.Errorf("Unexpected version returned: %d", vrs[utils.CDRs])
}
err, _ = cdrMigrator.Migrate([]string{utils.MetaCDRs})
if err != nil {
t.Error("Error when migrating CDRs ", err.Error())
}
if rcvCDRs, _, err := cdrMigrator.storDBOut.StorDB().GetCDRs(new(utils.CDRsFilter), false); err != nil {
t.Error(err)
} else if len(rcvCDRs) != 1 {
t.Errorf("Unexpected number of CDRs returned: %d", len(rcvCDRs))
}
if vrs, err := cdrMigrator.storDBOut.StorDB().GetVersions(""); err != nil {
t.Error(err)
} else if vrs[utils.CDRs] != 2 {
t.Errorf("Unexpected version returned: %d", vrs[utils.CDRs])
}
// else if cdrMigrator.stats[utils.CDRs] != 1 {
// t.Errorf("Expected 1, received: %v", cdrMigrator.stats[utils.CDRs])
// }
}

View File

@@ -1,210 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func TestCdrsNewV1CDRFromCDRSql(t *testing.T) {
testCdrSql := &engine.CDRsql{
ID: 1,
Cgrid: "testID",
RunID: "testRunID",
OriginHost: "testOriginHost",
Source: "testSource",
TOR: "testTOR",
RequestType: "testRequestType",
Tenant: "cgrates.org",
Category: "testCategory",
Account: "testAccount",
Subject: "testSubject",
Destination: "testDestination",
SetupTime: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
AnswerTime: utils.TimePointer(time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC)),
Usage: 1,
CostSource: "testSource",
Cost: 2,
ExtraInfo: "testExtraInfo",
CreatedAt: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
UpdatedAt: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
DeletedAt: utils.TimePointer(time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC)),
}
expected := &v1Cdrs{
CGRID: "testID",
RunID: "testRunID",
OriginHost: "testOriginHost",
OrderID: 1,
Source: "testSource",
ToR: "testTOR",
RequestType: "testRequestType",
Tenant: "cgrates.org",
Category: "testCategory",
Account: "testAccount",
Subject: "testSubject",
Destination: "testDestination",
SetupTime: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
AnswerTime: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
Usage: time.Nanosecond,
ExtraInfo: "testExtraInfo",
Partial: false,
Rated: false,
CostSource: "testSource",
Cost: 2,
}
result, err := NewV1CDRFromCDRSql(testCdrSql)
if err != nil {
t.Error(err)
}
if !reflect.DeepEqual(expected, result) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}
func TestCdrsAsCDRsql(t *testing.T) {
testV1Cdrs := &v1Cdrs{
CGRID: "testID",
RunID: "testRunID",
OriginHost: "testOriginHost",
OrderID: 1,
Source: "testSource",
ToR: "testTOR",
RequestType: "testRequestType",
Tenant: "cgrates.org",
Category: "testCategory",
Account: "testAccount",
Subject: "testSubject",
Destination: "testDestination",
SetupTime: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
AnswerTime: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
Usage: time.Nanosecond,
ExtraInfo: "testExtraInfo",
Partial: false,
Rated: false,
CostSource: "testSource",
Cost: 2,
}
expected := &engine.CDRsql{
ID: 0,
Cgrid: "testID",
RunID: "testRunID",
OriginHost: "testOriginHost",
Source: "testSource",
TOR: "testTOR",
RequestType: "testRequestType",
Tenant: "cgrates.org",
Category: "testCategory",
Account: "testAccount",
Subject: "testSubject",
Destination: "testDestination",
SetupTime: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
AnswerTime: utils.TimePointer(time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC)),
Usage: 1,
CostSource: "testSource",
Cost: 2,
ExtraInfo: "testExtraInfo",
CreatedAt: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
ExtraFields: "",
CostDetails: "",
}
result := testV1Cdrs.AsCDRsql()
result.CreatedAt = time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC)
result.ExtraFields = ""
result.CostDetails = ""
if !reflect.DeepEqual(expected, result) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ToJSON(expected), utils.ToJSON(result))
}
}
func TestCdrsAsCDRsqlAnswertimeEmpty(t *testing.T) {
var answTime time.Time
testV1Cdrs := &v1Cdrs{
AnswerTime: answTime,
}
expected := &engine.CDRsql{
AnswerTime: nil,
CreatedAt: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
}
result := testV1Cdrs.AsCDRsql()
result.CreatedAt = time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC)
result.ExtraFields = ""
result.CostDetails = ""
if !reflect.DeepEqual(expected, result) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", utils.ToJSON(expected), utils.ToJSON(result))
}
}
func TestCdrsNewV1CDRFromCDRSqlAnswerTimeNil(t *testing.T) {
testCdrSql := &engine.CDRsql{
ID: 1,
Cgrid: "testID",
RunID: "testRunID",
OriginHost: "testOriginHost",
Source: "testSource",
TOR: "testTOR",
RequestType: "testRequestType",
Tenant: "cgrates.org",
Category: "testCategory",
Account: "testAccount",
Subject: "testSubject",
Destination: "testDestination",
SetupTime: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
AnswerTime: nil,
Usage: 1,
CostSource: "testSource",
Cost: 2,
ExtraInfo: "testExtraInfo",
CreatedAt: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
UpdatedAt: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
DeletedAt: utils.TimePointer(time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC)),
}
var answTime time.Time
expected := &v1Cdrs{
CGRID: "testID",
RunID: "testRunID",
OriginHost: "testOriginHost",
OrderID: 1,
Source: "testSource",
ToR: "testTOR",
RequestType: "testRequestType",
Tenant: "cgrates.org",
Category: "testCategory",
Account: "testAccount",
Subject: "testSubject",
Destination: "testDestination",
SetupTime: time.Date(2021, 3, 3, 3, 3, 3, 3, time.UTC),
AnswerTime: answTime,
Usage: time.Nanosecond,
ExtraInfo: "testExtraInfo",
Partial: false,
Rated: false,
CostSource: "testSource",
Cost: 2,
}
result, err := NewV1CDRFromCDRSql(testCdrSql)
if err != nil {
t.Error(err)
}
if !reflect.DeepEqual(expected, result) {
t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, result)
}
}

View File

@@ -103,8 +103,7 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) {
} else {
log.Printf("The DataDB type has to be %s .\n ", utils.Mongo)
}
case utils.MetaCDRs:
err = m.migrateCDRs()
case utils.MetaStats:
err = m.migrateStats()
case utils.MetaThresholds:
@@ -204,9 +203,6 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) {
if err := m.migrateTPDispatchers(); err != nil {
log.Print("ERROR: ", utils.MetaTpDispatchers, " ", err)
}
if err := m.migrateCDRs(); err != nil {
log.Print("ERROR: ", utils.MetaCDRs, " ", err)
}
err = nil
}
}

View File

@@ -23,9 +23,6 @@ import (
)
type MigratorStorDB interface {
getV1CDR() (v1Cdr *v1Cdrs, err error)
setV1CDR(v1Cdr *v1Cdrs) (err error)
remV1CDRs(v1Cdr *v1Cdrs) (err error)
createV1SMCosts() (err error)
renameV1SMCosts() (err error)
StorDB() engine.StorDB

View File

@@ -43,22 +43,6 @@ func (iDBMig *internalStorDBMigrator) StorDB() engine.StorDB {
return *iDBMig.storDB
}
//CDR methods
//get
func (iDBMig *internalStorDBMigrator) getV1CDR() (v1Cdr *v1Cdrs, err error) {
return nil, utils.ErrNotImplemented
}
//set
func (iDBMig *internalStorDBMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) {
return utils.ErrNotImplemented
}
//rem
func (iDBMig *internalStorDBMigrator) remV1CDRs(v1Cdr *v1Cdrs) (err error) {
return utils.ErrNotImplemented
}
//SMCost methods
//rename
func (iDBMig *internalStorDBMigrator) renameV1SMCosts() (err error) {

View File

@@ -47,39 +47,6 @@ func (mgoMig *mongoStorDBMigrator) StorDB() engine.StorDB {
return *mgoMig.storDB
}
//CDR methods
//get
func (v1ms *mongoStorDBMigrator) getV1CDR() (v1Cdr *v1Cdrs, err error) {
if v1ms.cursor == nil {
v1ms.cursor, err = v1ms.mgoDB.DB().Collection(engine.ColCDRs).Find(v1ms.mgoDB.GetContext(), bson.D{})
if err != nil {
return nil, err
}
}
if !(*v1ms.cursor).Next(v1ms.mgoDB.GetContext()) {
(*v1ms.cursor).Close(v1ms.mgoDB.GetContext())
v1ms.cursor = nil
return nil, utils.ErrNoMoreData
}
v1Cdr = new(v1Cdrs)
if err := (*v1ms.cursor).Decode(v1Cdr); err != nil {
return nil, err
}
return v1Cdr, nil
}
//set
func (v1ms *mongoStorDBMigrator) setV1CDR(v1Cdr *v1Cdrs) (err error) {
_, err = v1ms.mgoDB.DB().Collection(engine.ColCDRs).InsertOne(v1ms.mgoDB.GetContext(), v1Cdr)
return
}
//rem
func (v1ms *mongoStorDBMigrator) remV1CDRs(v1Cdr *v1Cdrs) (err error) {
_, err = v1ms.mgoDB.DB().Collection(engine.ColCDRs).DeleteOne(v1ms.mgoDB.GetContext(), v1Cdr)
return
}
//SMCost methods
//rename
func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) {

View File

@@ -21,7 +21,6 @@ package migrator
import (
"database/sql"
"fmt"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -49,53 +48,6 @@ func (sqlMig *migratorSQL) StorDB() engine.StorDB {
return *sqlMig.storDB
}
func (mgSQL *migratorSQL) getV1CDR() (v1Cdr *v1Cdrs, err error) {
if mgSQL.rowIter == nil {
mgSQL.rowIter, err = mgSQL.sqlStorage.DB.Query("SELECT * FROM cdrs")
if err != nil {
return nil, err
}
}
cdrSql := new(engine.CDRsql)
mgSQL.rowIter.Scan(&cdrSql)
v1Cdr, err = NewV1CDRFromCDRSql(cdrSql)
if mgSQL.rowIter.Next() {
v1Cdr = nil
mgSQL.rowIter = nil
return nil, utils.ErrNoMoreData
}
return v1Cdr, nil
}
func (mgSQL *migratorSQL) setV1CDR(v1Cdr *v1Cdrs) (err error) {
tx := mgSQL.sqlStorage.ExportGormDB().Begin()
cdrSql := v1Cdr.AsCDRsql()
cdrSql.CreatedAt = time.Now()
saved := tx.Save(cdrSql)
if saved.Error != nil {
return saved.Error
}
tx.Commit()
return nil
}
//rem
func (mgSQL *migratorSQL) remV1CDRs(v1Cdr *v1Cdrs) (err error) {
tx := mgSQL.sqlStorage.ExportGormDB().Begin()
var rmParam *v1Cdrs
if v1Cdr != nil {
rmParam = &v1Cdrs{CGRID: v1Cdr.CGRID,
RunID: v1Cdr.RunID}
}
if err := tx.Where(rmParam).Delete(v1Cdrs{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return nil
}
func (mgSQL *migratorSQL) renameV1SMCosts() (err error) {
qry := "RENAME TABLE sm_costs TO session_costs;"
if mgSQL.StorDB().GetStorageType() == utils.Postgres {