diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 88424e3dd..05d69b158 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -67,6 +67,7 @@ const ( colFlt = "filters" colSpp = "supplier_profiles" colAttr = "attribute_profiles" + ColCDRs = "cdrs" ) var ( diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index ad7a7f915..669bfa24f 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -947,7 +947,7 @@ func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) (err error) { if cdr.OrderID == 0 { cdr.OrderID = ms.cnter.Next() } - session, col := ms.conn(utils.CDRsTBL) + session, col := ms.conn(ColCDRs) defer session.Close() if allowUpdate { _, err = col.Upsert(bson.M{CGRIDLow: cdr.CGRID, RunIDLow: cdr.RunID}, cdr) @@ -989,7 +989,7 @@ func (ms *MongoStorage) cleanEmptyFilters(filters bson.M) { } } -// _, err := col(utils.CDRsTBL).UpdateAll(bson.M{CGRIDLow: bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}}) +// _, err := col(ColCDRs).UpdateAll(bson.M{CGRIDLow: bson.M{"$in": cgrIds}}, bson.M{"$set": bson.M{"deleted_at": time.Now()}}) func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, int64, error) { var minUsage, maxUsage *time.Duration if len(qryFltr.MinUsage) != 0 { @@ -1103,7 +1103,7 @@ func (ms *MongoStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR, } //file.WriteString(fmt.Sprintf("AFTER: %v\n", utils.ToIJSON(filters))) //file.Close() - session, col := ms.conn(utils.CDRsTBL) + session, col := ms.conn(ColCDRs) defer session.Close() if remove { if chgd, err := col.RemoveAll(filters); err != nil { diff --git a/migrator/cdrs.go b/migrator/cdrs.go index 736a9894a..8c64aee1e 100755 --- a/migrator/cdrs.go +++ b/migrator/cdrs.go @@ -20,6 +20,7 @@ package migrator import ( "fmt" + "time" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -33,11 +34,13 @@ func (m *Migrator) migrateCurrentCDRs() (err error) { if err != nil { return err } - for _, cdr := range cdrs { - if err := m.oldStorDB.SetCDR(cdr, true); err != nil { - return err + /* + for _, cdr := range cdrs { + if err := m.oldStorDB.SetCDR(cdr, true); err != nil { + return err + } } - } + */ return } @@ -62,3 +65,97 @@ func (m *Migrator) migrateCDRs() (err error) { } return } + +func (m *Migrator) migrateV1CDRs() (err error) { + var v1CDR *v1Cdrs + for { + v1CDR, err = m.oldStorDB.getV1CDR() + if err != nil && err != utils.ErrNoMoreData { + return err + } + if err == utils.ErrNoMoreData { + break + } + if v1CDR != nil { + cdr := v1CDR.V1toV2Cdr() + if m.dryRun != true { + if err = m.storDB.SetCDR(cdr, true); err != nil { + return err + } + m.stats[utils.CDRs] += 1 + } + } + } + if m.dryRun != true { + // All done, update version wtih current one + vrs := engine.Versions{utils.CDRs: engine.CurrentStorDBVersions()[utils.CDRs]} + if err = m.storDB.SetVersions(vrs, false); err != nil { + return utils.NewCGRError(utils.Migrator, + utils.ServerErrorCaps, + err.Error(), + fmt.Sprintf("error: <%s> when updating CDRs version into StorDB", err.Error())) + } + } + return +} + +// +type v1Cdrs struct { + CGRID string + RunID string + OrderID int64 // Stor order id used as export order id + OriginHost string // represents the IP address of the host generating the CDR (automatically populated by the server) + Source string // formally identifies the source of the CDR (free form field) + OriginID string // represents the unique accounting id given by the telecom switch generating the CDR + ToR string // type of record, meta-field, should map to one of the TORs hardcoded inside the server <*voice|*data|*sms|*generic> + RequestType string // matching the supported request types by the **CGRateS**, accepted values are hardcoded in the server . + Tenant string // tenant whom this record belongs + Category string // free-form filter for this record, matching the category defined in rating profiles. + Account string // account id (accounting subsystem) the record should be attached to + Subject string // rating subject (rating subsystem) this record should be attached to + Destination string // destination to be charged + SetupTime time.Time // set-up time of the event. Supported formats: datetime RFC3339 compatible, SQL datetime (eg: MySQL), unix timestamp. + AnswerTime time.Time // answer time of the event. Supported formats: datetime RFC3339 compatible, SQL datetime (eg: MySQL), unix timestamp. + Usage time.Duration // event usage information (eg: in case of tor=*voice this will represent the total duration of a call) + ExtraFields map[string]string // Extra fields to be stored in CDR + ExtraInfo string // Container for extra information related to this CDR, eg: populated with error reason in case of error on calculation + Partial bool // Used for partial record processing by CDRC + Rated bool // Mark the CDR as rated so we do not process it during rating + CostSource string // The source of this cost + Cost float64 + CostDetails *engine.CallCost // Attach the cost details to CDR when possible +} + +func (v1Cdr v1Cdrs) V1toV2Cdr() (cdr *engine.CDR) { + cdr = &engine.CDR{ + CGRID: v1Cdr.CGRID, + RunID: v1Cdr.RunID, + OrderID: v1Cdr.OrderID, + OriginHost: v1Cdr.OriginHost, + Source: v1Cdr.Source, + OriginID: v1Cdr.OriginID, + ToR: v1Cdr.ToR, + RequestType: v1Cdr.RequestType, + Tenant: v1Cdr.Tenant, + Category: v1Cdr.Category, + Account: v1Cdr.Account, + Subject: v1Cdr.Subject, + Destination: v1Cdr.Destination, + SetupTime: v1Cdr.SetupTime, + AnswerTime: v1Cdr.AnswerTime, + Usage: v1Cdr.Usage, + ExtraFields: make(map[string]string), + ExtraInfo: v1Cdr.ExtraInfo, + Partial: v1Cdr.Partial, + Rated: v1Cdr.Rated, + CostSource: v1Cdr.CostSource, + Cost: v1Cdr.Cost, + CostDetails: engine.NewEventCostFromCallCost(v1Cdr.CostDetails, v1Cdr.CGRID, v1Cdr.RunID), + } + if v1Cdr.ExtraFields != nil { + for key, value := range v1Cdr.ExtraFields { + cdr.ExtraFields[key] = value + } + } + return +} diff --git a/migrator/cdrs_test.go b/migrator/cdrs_test.go new file mode 100755 index 000000000..15ff4f5c0 --- /dev/null +++ b/migrator/cdrs_test.go @@ -0,0 +1,98 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package migrator + +/* +import ( + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + + +func TestV1CDRsAsCDR(t *testing.T) { + cc := &engine.CallCost{ + Category: "call", + Account: "1001", + Subject: "1001", + Tenant: "cgrates.org", + Direction: utils.OUT, + Destination: "1003", + Timespans: []*engine.TimeSpan{ + &engine.TimeSpan{ + TimeStart: time.Date(2016, 4, 6, 13, 30, 0, 0, time.UTC), + TimeEnd: time.Date(2016, 4, 6, 13, 31, 30, 0, time.UTC), + DurationIndex: 0, + RateInterval: &engine.RateInterval{ + Rating: &engine.RIRate{ + Rates: engine.RateGroups{ + &engine.Rate{ + GroupIntervalStart: 0, + Value: 0.01, + RateIncrement: 10 * time.Second, + RateUnit: time.Second}}}}, + }, + }, + TOR: utils.VOICE} + + v1Cdr := &v1Cdrs{CGRID: utils.Sha1("testprepaid1", time.Date(2016, 4, 6, 13, 29, 24, 0, time.UTC).String()), + ToR: utils.VOICE, OriginID: "testprepaid1", OriginHost: "192.168.1.1", + Source: "TEST_PREPAID_CDR_SMCOST1", RequestType: utils.META_PREPAID, Tenant: "cgrates.org", + RunID: utils.META_DEFAULT, + Category: "call", Account: "1001", Subject: "1001", Destination: "1003", + SetupTime: time.Date(2016, 4, 6, 13, 29, 24, 0, time.UTC), + AnswerTime: time.Date(2016, 4, 6, 13, 30, 0, 0, time.UTC), + Usage: time.Duration(90) * time.Second, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + CostDetails: cc} + + cdr := v1Cdr.V1toV2Cdr() + // Create manually EventCost here + ev := &enbine.EventCost{ + CGRID: v1Cdr.CGRID, + RunID: v1Cdr.RunID, + StartTime: v1Cdr.AnswerTime, + Charges: []*engine.ChargingInterval{}, + Rating: map[string]*RatingUnit{}, + Accounting: map[string]*BalanceCharge{}, + RatingFilters: map[string]RatingMatchedFilters{}, + Rates: :map[string]RateGroups{}, + Timings: :map[string]*ChargedTiming{}, + } + + eCDR := &engine.CDR{CGRID: utils.Sha1("testprepaid1", time.Date(2016, 4, 6, 13, 29, 24, 0, time.UTC).String()), + ToR: utils.VOICE, OriginID: "testprepaid1", OriginHost: "192.168.1.1", + Source: "TEST_PREPAID_CDR_SMCOST1", RequestType: utils.META_PREPAID, Tenant: "cgrates.org", + RunID: utils.META_DEFAULT, + Category: "call", Account: "1001", Subject: "1001", Destination: "1003", + SetupTime: time.Date(2016, 4, 6, 13, 29, 24, 0, time.UTC), + AnswerTime: time.Date(2016, 4, 6, 13, 30, 0, 0, time.UTC), + Usage: time.Duration(90) * time.Second, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + CostDetails: engine.NewEventCostFromCallCost(v1Cdr.CostDetails, v1Cdr.CGRID, v1Cdr.RunID)} + + if !reflect.DeepEqual(cdr, eCDR) { + t.Errorf("Expecting: %+v, received: %+v", cdr, eCDR) + } + +} +*/ diff --git a/migrator/migrator.go b/migrator/migrator.go index 9bbbdfea9..3a8c3793f 100755 --- a/migrator/migrator.go +++ b/migrator/migrator.go @@ -28,7 +28,7 @@ import ( func NewMigrator(dmIN *engine.DataManager, dmOut *engine.DataManager, dataDBType, dataDBEncoding string, storDB engine.StorDB, storDBType string, oldDataDB MigratorDataDB, oldDataDBType, oldDataDBEncoding string, - oldStorDB engine.StorDB, oldStorDBType string, dryRun bool, sameDataDB bool, sameStorDB bool, + oldStorDB MigratorStorDB, oldStorDBType string, dryRun bool, sameDataDB bool, sameStorDB bool, datadb_versions bool, stordb_versions bool) (m *Migrator, err error) { var mrshlr engine.Marshaler var oldmrshlr engine.Marshaler @@ -64,7 +64,7 @@ type Migrator struct { mrshlr engine.Marshaler oldDataDB MigratorDataDB oldDataDBType string - oldStorDB engine.StorDB + oldStorDB MigratorStorDB oldStorDBType string oldmrshlr engine.Marshaler dryRun bool diff --git a/migrator/migratorStorDB.go b/migrator/migratorStorDB.go new file mode 100755 index 000000000..7ddac97b4 --- /dev/null +++ b/migrator/migratorStorDB.go @@ -0,0 +1,24 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package migrator + +type MigratorStorDB interface { + getV1CDR() (v1Cdr *v1Cdrs, err error) + setV1CDR(v1Cdr *v1Cdrs) (err error) +} diff --git a/migrator/v1mongo_data.go b/migrator/v1mongo_data.go index 095e14a06..bc4e2f5ab 100644 --- a/migrator/v1mongo_data.go +++ b/migrator/v1mongo_data.go @@ -20,6 +20,7 @@ package migrator import ( "fmt" + "strings" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -56,15 +57,17 @@ func newv1MongoStorage(host, port, db, user, pass, storageType string, cdrsIndex if user != "" && pass != "" { url = fmt.Sprintf("%s:%s@%s", user, pass, url) } + var dbName string if db != "" { url += "/" + db + dbName = strings.Split(db, "?")[0] // remove extra info after ? } session, err := mgo.Dial(url) if err != nil { return nil, err } session.SetMode(mgo.Strong, true) - v1ms = &v1Mongo{db: db, session: session, v1ms: engine.NewCodecMsgpackMarshaler()} + v1ms = &v1Mongo{db: dbName, session: session, v1ms: engine.NewCodecMsgpackMarshaler()} return } func (v1ms *v1Mongo) Close() {} diff --git a/migrator/v1mongo_stor.go b/migrator/v1mongo_stor.go new file mode 100755 index 000000000..ac8808ddb --- /dev/null +++ b/migrator/v1mongo_stor.go @@ -0,0 +1,48 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package migrator + +import ( + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +//CDR methods +//get +func (v1ms *v1Mongo) getv1CDR() (v1Cdr *v1Cdrs, err error) { + if v1ms.qryIter == nil { + v1ms.qryIter = v1ms.session.DB(v1ms.db).C(engine.ColCDRs).Find(nil).Iter() + } + v1ms.qryIter.Next(&v1Cdr) + + if v1Cdr == nil { + v1ms.qryIter = nil + return nil, utils.ErrNoMoreData + + } + return v1Cdr, nil +} + +//set +func (v1ms *v1Mongo) setV1CDR(v1Cdr *v1Cdrs) (err error) { + if err := v1ms.session.DB(v1ms.db).C(engine.ColCDRs).Insert(v1Cdr); err != nil { + return err + } + return +}