Add infrastructure for migrator to work with storDB

This commit is contained in:
TeoV
2018-05-02 05:50:18 -04:00
committed by Dan Christian Bogos
parent 4d3ba1c7eb
commit 25620b5295
8 changed files with 281 additions and 10 deletions

View File

@@ -67,6 +67,7 @@ const (
colFlt = "filters"
colSpp = "supplier_profiles"
colAttr = "attribute_profiles"
ColCDRs = "cdrs"
)
var (

View File

@@ -947,7 +947,7 @@ func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
if cdr.OrderID == 0 {
cdr.OrderID = ms.cnter.Next()
}
session, col := ms.conn(utils.CDRsTBL)
session, col := ms.conn(ColCDRs)
defer session.Close()
if allowUpdate {
_, err = col.Upsert(bson.M{CGRIDLow: cdr.CGRID, RunIDLow: cdr.RunID}, cdr)
@@ -989,7 +989,7 @@ func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) {
}
}
// _, err := col(utils.CDRsTBL).UpdateAll(bson.M{CGRIDLow: bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}})
// _, err := col(ColCDRs).UpdateAll(bson.M{CGRIDLow: bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}})
func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, int64, error) {
var minUsage, maxUsage *time.Duration
if len(qryFltr.MinUsage) != 0 {
@@ -1103,7 +1103,7 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
}
//file.WriteString(fmt.Sprintf("AFTER: %v\n", utils.ToIJSON(filters)))
//file.Close()
session, col := ms.conn(utils.CDRsTBL)
session, col := ms.conn(ColCDRs)
defer session.Close()
if remove {
if chgd, err := col.RemoveAll(filters); err != nil {

View File

@@ -20,6 +20,7 @@ package migrator
import (
"fmt"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -33,11 +34,13 @@ func (m *Migrator) migrateCurrentCDRs() (err error) {
if err != nil {
return err
}
for _, cdr := range cdrs {
if err := m.oldStorDB.SetCDR(cdr, true); err != nil {
return err
/*
for _, cdr := range cdrs {
if err := m.oldStorDB.SetCDR(cdr, true); err != nil {
return err
}
}
}
*/
return
}
@@ -62,3 +65,97 @@ func (m *Migrator) migrateCDRs() (err error) {
}
return
}
func (m *Migrator) migrateV1CDRs() (err error) {
var v1CDR *v1Cdrs
for {
v1CDR, err = m.oldStorDB.getV1CDR()
if err != nil && err != utils.ErrNoMoreData {
return err
}
if err == utils.ErrNoMoreData {
break
}
if v1CDR != nil {
cdr := v1CDR.V1toV2Cdr()
if m.dryRun != true {
if err = m.storDB.SetCDR(cdr, true); err != nil {
return err
}
m.stats[utils.CDRs] += 1
}
}
}
if m.dryRun != true {
// All done, update version wtih current one
vrs := engine.Versions{utils.CDRs: engine.CurrentStorDBVersions()[utils.CDRs]}
if err = m.storDB.SetVersions(vrs, false); err != nil {
return utils.NewCGRError(utils.Migrator,
utils.ServerErrorCaps,
err.Error(),
fmt.Sprintf("error: <%s> when updating CDRs version into StorDB", err.Error()))
}
}
return
}
//
type v1Cdrs struct {
CGRID string
RunID string
OrderID int64 // Stor order id used as export order id
OriginHost string // represents the IP address of the host generating the CDR (automatically populated by the server)
Source string // formally identifies the source of the CDR (free form field)
OriginID string // represents the unique accounting id given by the telecom switch generating the CDR
ToR string // type of record, meta-field, should map to one of the TORs hardcoded inside the server <*voice|*data|*sms|*generic>
RequestType string // matching the supported request types by the **CGRateS**, accepted values are hardcoded in the server <prepaid|postpaid|pseudoprepaid|rated>.
Tenant string // tenant whom this record belongs
Category string // free-form filter for this record, matching the category defined in rating profiles.
Account string // account id (accounting subsystem) the record should be attached to
Subject string // rating subject (rating subsystem) this record should be attached to
Destination string // destination to be charged
SetupTime time.Time // set-up time of the event. Supported formats: datetime RFC3339 compatible, SQL datetime (eg: MySQL), unix timestamp.
AnswerTime time.Time // answer time of the event. Supported formats: datetime RFC3339 compatible, SQL datetime (eg: MySQL), unix timestamp.
Usage time.Duration // event usage information (eg: in case of tor=*voice this will represent the total duration of a call)
ExtraFields map[string]string // Extra fields to be stored in CDR
ExtraInfo string // Container for extra information related to this CDR, eg: populated with error reason in case of error on calculation
Partial bool // Used for partial record processing by CDRC
Rated bool // Mark the CDR as rated so we do not process it during rating
CostSource string // The source of this cost
Cost float64
CostDetails *engine.CallCost // Attach the cost details to CDR when possible
}
func (v1Cdr v1Cdrs) V1toV2Cdr() (cdr *engine.CDR) {
cdr = &engine.CDR{
CGRID: v1Cdr.CGRID,
RunID: v1Cdr.RunID,
OrderID: v1Cdr.OrderID,
OriginHost: v1Cdr.OriginHost,
Source: v1Cdr.Source,
OriginID: v1Cdr.OriginID,
ToR: v1Cdr.ToR,
RequestType: v1Cdr.RequestType,
Tenant: v1Cdr.Tenant,
Category: v1Cdr.Category,
Account: v1Cdr.Account,
Subject: v1Cdr.Subject,
Destination: v1Cdr.Destination,
SetupTime: v1Cdr.SetupTime,
AnswerTime: v1Cdr.AnswerTime,
Usage: v1Cdr.Usage,
ExtraFields: make(map[string]string),
ExtraInfo: v1Cdr.ExtraInfo,
Partial: v1Cdr.Partial,
Rated: v1Cdr.Rated,
CostSource: v1Cdr.CostSource,
Cost: v1Cdr.Cost,
CostDetails: engine.NewEventCostFromCallCost(v1Cdr.CostDetails, v1Cdr.CGRID, v1Cdr.RunID),
}
if v1Cdr.ExtraFields != nil {
for key, value := range v1Cdr.ExtraFields {
cdr.ExtraFields[key] = value
}
}
return
}

98
migrator/cdrs_test.go Executable file
View File

@@ -0,0 +1,98 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
/*
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func TestV1CDRsAsCDR(t *testing.T) {
cc := &engine.CallCost{
Category: "call",
Account: "1001",
Subject: "1001",
Tenant: "cgrates.org",
Direction: utils.OUT,
Destination: "1003",
Timespans: []*engine.TimeSpan{
&engine.TimeSpan{
TimeStart: time.Date(2016, 4, 6, 13, 30, 0, 0, time.UTC),
TimeEnd: time.Date(2016, 4, 6, 13, 31, 30, 0, time.UTC),
DurationIndex: 0,
RateInterval: &engine.RateInterval{
Rating: &engine.RIRate{
Rates: engine.RateGroups{
&engine.Rate{
GroupIntervalStart: 0,
Value: 0.01,
RateIncrement: 10 * time.Second,
RateUnit: time.Second}}}},
},
},
TOR: utils.VOICE}
v1Cdr := &v1Cdrs{CGRID: utils.Sha1("testprepaid1", time.Date(2016, 4, 6, 13, 29, 24, 0, time.UTC).String()),
ToR: utils.VOICE, OriginID: "testprepaid1", OriginHost: "192.168.1.1",
Source: "TEST_PREPAID_CDR_SMCOST1", RequestType: utils.META_PREPAID, Tenant: "cgrates.org",
RunID: utils.META_DEFAULT,
Category: "call", Account: "1001", Subject: "1001", Destination: "1003",
SetupTime: time.Date(2016, 4, 6, 13, 29, 24, 0, time.UTC),
AnswerTime: time.Date(2016, 4, 6, 13, 30, 0, 0, time.UTC),
Usage: time.Duration(90) * time.Second,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
CostDetails: cc}
cdr := v1Cdr.V1toV2Cdr()
// Create manually EventCost here
ev := &enbine.EventCost{
CGRID: v1Cdr.CGRID,
RunID: v1Cdr.RunID,
StartTime: v1Cdr.AnswerTime,
Charges: []*engine.ChargingInterval{},
Rating: map[string]*RatingUnit{},
Accounting: map[string]*BalanceCharge{},
RatingFilters: map[string]RatingMatchedFilters{},
Rates: :map[string]RateGroups{},
Timings: :map[string]*ChargedTiming{},
}
eCDR := &engine.CDR{CGRID: utils.Sha1("testprepaid1", time.Date(2016, 4, 6, 13, 29, 24, 0, time.UTC).String()),
ToR: utils.VOICE, OriginID: "testprepaid1", OriginHost: "192.168.1.1",
Source: "TEST_PREPAID_CDR_SMCOST1", RequestType: utils.META_PREPAID, Tenant: "cgrates.org",
RunID: utils.META_DEFAULT,
Category: "call", Account: "1001", Subject: "1001", Destination: "1003",
SetupTime: time.Date(2016, 4, 6, 13, 29, 24, 0, time.UTC),
AnswerTime: time.Date(2016, 4, 6, 13, 30, 0, 0, time.UTC),
Usage: time.Duration(90) * time.Second,
ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
CostDetails: engine.NewEventCostFromCallCost(v1Cdr.CostDetails, v1Cdr.CGRID, v1Cdr.RunID)}
if !reflect.DeepEqual(cdr, eCDR) {
t.Errorf("Expecting: %+v, received: %+v", cdr, eCDR)
}
}
*/

View File

@@ -28,7 +28,7 @@ import (
func NewMigrator(dmIN *engine.DataManager, dmOut *engine.DataManager, dataDBType, dataDBEncoding string,
storDB engine.StorDB, storDBType string, oldDataDB MigratorDataDB, oldDataDBType, oldDataDBEncoding string,
oldStorDB engine.StorDB, oldStorDBType string, dryRun bool, sameDataDB bool, sameStorDB bool,
oldStorDB MigratorStorDB, oldStorDBType string, dryRun bool, sameDataDB bool, sameStorDB bool,
datadb_versions bool, stordb_versions bool) (m *Migrator, err error) {
var mrshlr engine.Marshaler
var oldmrshlr engine.Marshaler
@@ -64,7 +64,7 @@ type Migrator struct {
mrshlr engine.Marshaler
oldDataDB MigratorDataDB
oldDataDBType string
oldStorDB engine.StorDB
oldStorDB MigratorStorDB
oldStorDBType string
oldmrshlr engine.Marshaler
dryRun bool

24
migrator/migratorStorDB.go Executable file
View File

@@ -0,0 +1,24 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
type MigratorStorDB interface {
getV1CDR() (v1Cdr *v1Cdrs, err error)
setV1CDR(v1Cdr *v1Cdrs) (err error)
}

View File

@@ -20,6 +20,7 @@ package migrator
import (
"fmt"
"strings"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
@@ -56,15 +57,17 @@ func newv1MongoStorage(host, port, db, user, pass, storageType string, cdrsIndex
if user != "" && pass != "" {
url = fmt.Sprintf("%s:%s@%s", user, pass, url)
}
var dbName string
if db != "" {
url += "/" + db
dbName = strings.Split(db, "?")[0] // remove extra info after ?
}
session, err := mgo.Dial(url)
if err != nil {
return nil, err
}
session.SetMode(mgo.Strong, true)
v1ms = &v1Mongo{db: db, session: session, v1ms: engine.NewCodecMsgpackMarshaler()}
v1ms = &v1Mongo{db: dbName, session: session, v1ms: engine.NewCodecMsgpackMarshaler()}
return
}
func (v1ms *v1Mongo) Close() {}

48
migrator/v1mongo_stor.go Executable file
View File

@@ -0,0 +1,48 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
//CDR methods
//get
func (v1ms *v1Mongo) getv1CDR() (v1Cdr *v1Cdrs, err error) {
if v1ms.qryIter == nil {
v1ms.qryIter = v1ms.session.DB(v1ms.db).C(engine.ColCDRs).Find(nil).Iter()
}
v1ms.qryIter.Next(&v1Cdr)
if v1Cdr == nil {
v1ms.qryIter = nil
return nil, utils.ErrNoMoreData
}
return v1Cdr, nil
}
//set
func (v1ms *v1Mongo) setV1CDR(v1Cdr *v1Cdrs) (err error) {
if err := v1ms.session.DB(v1ms.db).C(engine.ColCDRs).Insert(v1Cdr); err != nil {
return err
}
return
}