From 4ff66e088378f762aefb199db7ff932e6e2bf7d9 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 24 Mar 2016 17:39:47 +0200 Subject: [PATCH] populate cdr Usage --- apier/v1/cdrs.go | 8 +++--- data/storage/mysql/create_cdrs_tables.sql | 1 + data/storage/postgres/create_cdrs_tables.sql | 1 + engine/cdrs.go | 16 +++++++++--- engine/guardian_test.go | 20 ++++++++------- engine/models.go | 1 + engine/pubsub.go | 4 +-- engine/storage_cdrs_it_test.go | 8 +++--- engine/storage_interface.go | 4 +-- engine/storage_mongo_stordb.go | 8 +++--- engine/storage_sql.go | 27 ++++++++++++-------- engine/storage_utils.go | 1 + sessionmanager/smg_session.go | 2 +- sessionmanager/smgeneric.go | 13 ++-------- 14 files changed, 63 insertions(+), 51 deletions(-) diff --git a/apier/v1/cdrs.go b/apier/v1/cdrs.go index fe52b49ab..39dcd5378 100644 --- a/apier/v1/cdrs.go +++ b/apier/v1/cdrs.go @@ -26,19 +26,19 @@ import ( ) // Retrieves the callCost out of CGR logDb -func (apier *ApierV1) GetCallCostLog(attrs utils.AttrGetCallCost, reply *engine.CallCost) error { +func (apier *ApierV1) GetCallCostLog(attrs utils.AttrGetCallCost, reply *engine.SMCost) error { if attrs.CgrId == "" { return utils.NewErrMandatoryIeMissing("CgrId") } if attrs.RunId == "" { attrs.RunId = utils.META_DEFAULT } - if cc, err := apier.CdrDb.GetCallCostLog(attrs.CgrId, attrs.RunId); err != nil { + if smc, err := apier.CdrDb.GetCallCostLog(attrs.CgrId, attrs.RunId); err != nil { return utils.NewErrServerError(err) - } else if cc == nil { + } else if smc == nil { return utils.ErrNotFound } else { - *reply = *cc + *reply = *smc } return nil } diff --git a/data/storage/mysql/create_cdrs_tables.sql b/data/storage/mysql/create_cdrs_tables.sql index 3f5e0269d..463ea922e 100644 --- a/data/storage/mysql/create_cdrs_tables.sql +++ b/data/storage/mysql/create_cdrs_tables.sql @@ -42,6 +42,7 @@ CREATE TABLE sm_costs ( cgrid char(40) NOT NULL, run_id varchar(64) NOT NULL, cost_source varchar(64) NOT NULL, + `usage` DECIMAL(30,9) NOT NULL, cost_details text, created_at TIMESTAMP, deleted_at TIMESTAMP, diff --git a/data/storage/postgres/create_cdrs_tables.sql b/data/storage/postgres/create_cdrs_tables.sql index d4425fb89..2c710dc4e 100644 --- a/data/storage/postgres/create_cdrs_tables.sql +++ b/data/storage/postgres/create_cdrs_tables.sql @@ -45,6 +45,7 @@ CREATE TABLE sm_costs ( cgrid CHAR(40) NOT NULL, run_id VARCHAR(64) NOT NULL, cost_source VARCHAR(64) NOT NULL, + usage NUMERIC(30,9) NOT NULL, cost_details jsonb, created_at TIMESTAMP, deleted_at TIMESTAMP, diff --git a/engine/cdrs.go b/engine/cdrs.go index 27cd49dc6..e8b7de17d 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -38,6 +38,7 @@ type CallCostLog struct { CgrId string Source string RunId string + Usage float64 // real usage (not increment rounded) CallCost *CallCost CheckDuplicate bool } @@ -119,11 +120,11 @@ func (self *CdrServer) LogCallCost(ccl *CallCostLog) error { if cc != nil { return nil, utils.ErrExists } - return nil, self.cdrDb.LogCallCost(ccl.CgrId, ccl.RunId, ccl.Source, ccl.CallCost) + return nil, self.cdrDb.LogCallCost(&SMCost{CGRID: ccl.CgrId, RunID: ccl.RunId, CostSource: ccl.Source, Usage: ccl.Usage, CostDetails: ccl.CallCost}) }, 0, ccl.CgrId) return err } - return self.cdrDb.LogCallCost(ccl.CgrId, ccl.RunId, ccl.Source, ccl.CallCost) + return self.cdrDb.LogCallCost(&SMCost{CGRID: ccl.CgrId, RunID: ccl.RunId, CostSource: ccl.Source, Usage: ccl.Usage, CostDetails: ccl.CallCost}) } // Called by rate/re-rate API @@ -332,12 +333,16 @@ func (self *CdrServer) rateCDR(cdr *CDR) error { if cdr.RequestType == utils.META_NONE { return nil } - if utils.IsSliceMember([]string{utils.META_PREPAID, utils.PREPAID}, cdr.RequestType) && cdr.Usage != 0 { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards + _, hasLastUsed := cdr.ExtraFields["LastUsed"] + if utils.IsSliceMember([]string{utils.META_PREPAID, utils.PREPAID}, cdr.RequestType) && (cdr.Usage != 0 || hasLastUsed) { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards // Should be previously calculated and stored in DB delay := utils.Fib() + var usage float64 for i := 0; i < 4; i++ { - qryCC, err = self.cdrDb.GetCallCostLog(cdr.CGRID, cdr.RunID) + qrySMC, err := self.cdrDb.GetCallCostLog(cdr.CGRID, cdr.RunID) if err == nil { + qryCC = qrySMC.CostDetails + usage = qrySMC.Usage break } time.Sleep(delay()) @@ -346,6 +351,9 @@ func (self *CdrServer) rateCDR(cdr *CDR) error { utils.Logger.Warning(fmt.Sprintf(" WARNING: Could not find CallCostLog for cgrid: %s, source: %s, runid: %s, will recalculate", cdr.CGRID, utils.SESSION_MANAGER_SOURCE, cdr.RunID)) qryCC, err = self.getCostFromRater(cdr) } + if cdr.Usage == 0 { + cdr.Usage = time.Duration(usage) + } } else { qryCC, err = self.getCostFromRater(cdr) diff --git a/engine/guardian_test.go b/engine/guardian_test.go index a8e3cc4f8..6e436e9d5 100644 --- a/engine/guardian_test.go +++ b/engine/guardian_test.go @@ -19,31 +19,33 @@ along with this program. If not, see package engine import ( - "log" "testing" "time" ) -func ATestAccountLock(t *testing.T) { +func BenchmarkGuard(b *testing.B) { for i := 0; i < 100; i++ { go Guardian.Guard(func() (interface{}, error) { - log.Print("first 1") time.Sleep(1 * time.Millisecond) - log.Print("end first 1") return 0, nil }, 0, "1") go Guardian.Guard(func() (interface{}, error) { - log.Print("first 2") time.Sleep(1 * time.Millisecond) - log.Print("end first 2") return 0, nil }, 0, "2") go Guardian.Guard(func() (interface{}, error) { - log.Print("second 1") time.Sleep(1 * time.Millisecond) - log.Print("end second 1") return 0, nil }, 0, "1") } - time.Sleep(10 * time.Second) + +} + +func BenchmarkGuardian(b *testing.B) { + for i := 0; i < 100; i++ { + go Guardian.Guard(func() (interface{}, error) { + time.Sleep(1 * time.Millisecond) + return 0, nil + }, 0, "1") + } } diff --git a/engine/models.go b/engine/models.go index 85ce94362..1da2db987 100644 --- a/engine/models.go +++ b/engine/models.go @@ -452,6 +452,7 @@ type TBLSMCosts struct { Cgrid string RunID string CostSource string + Usage float64 CostDetails string CreatedAt time.Time DeletedAt time.Time diff --git a/engine/pubsub.go b/engine/pubsub.go index 2ae544df0..0d4dd17c1 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -139,13 +139,13 @@ func (ps *PubSub) Publish(evt CgrEvent, reply *string) error { } transport := split[0] address := split[1] - + ttlVerify := ps.ttlVerify switch transport { case utils.META_HTTP_POST: go func() { delay := utils.Fib() for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort - if _, err := ps.pubFunc(address, ps.ttlVerify, evt); err == nil { + if _, err := ps.pubFunc(address, ttlVerify, evt); err == nil { break // Success, no need to reinterate } else if i == 4 { // Last iteration, syslog the warning utils.Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), evt["EventName"])) diff --git a/engine/storage_cdrs_it_test.go b/engine/storage_cdrs_it_test.go index f264ec237..4e934d4e4 100644 --- a/engine/storage_cdrs_it_test.go +++ b/engine/storage_cdrs_it_test.go @@ -222,13 +222,13 @@ func testSMCosts(cfg *config.CGRConfig) error { }, TOR: utils.VOICE, } - if err := cdrStorage.LogCallCost("164b0422fdc6a5117031b427439482c6a4f90e41", utils.META_DEFAULT, utils.UNIT_TEST, cc); err != nil { + if err := cdrStorage.LogCallCost(&SMCost{CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41", RunID: utils.META_DEFAULT, CostSource: utils.UNIT_TEST, CostDetails: cc}); err != nil { return err } - if rcvCC, err := cdrStorage.GetCallCostLog("164b0422fdc6a5117031b427439482c6a4f90e41", utils.META_DEFAULT); err != nil { + if rcvSMC, err := cdrStorage.GetCallCostLog("164b0422fdc6a5117031b427439482c6a4f90e41", utils.META_DEFAULT); err != nil { return err - } else if len(cc.Timespans) != len(rcvCC.Timespans) { // cc.Timespans[0].RateInterval.Rating.Rates[0], rcvCC.Timespans[0].RateInterval.Rating.Rates[0]) - return fmt.Errorf("Expecting: %+v, received: %+v", cc, rcvCC) + } else if len(cc.Timespans) != len(rcvSMC.CostDetails.Timespans) { // cc.Timespans[0].RateInterval.Rating.Rates[0], rcvCC.Timespans[0].RateInterval.Rating.Rates[0]) + return fmt.Errorf("Expecting: %+v, received: %+s", cc, utils.ToIJSON(rcvSMC)) } return nil } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 8d082ae14..95d2f8d0e 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -97,8 +97,8 @@ type AccountingStorage interface { type CdrStorage interface { Storage SetCDR(*CDR, bool) error - LogCallCost(cgrid, runid, source string, cc *CallCost) error - GetCallCostLog(cgrid, runid string) (*CallCost, error) + LogCallCost(smc *SMCost) error + GetCallCostLog(cgrid, runid string) (*SMCost, error) GetCDRs(*utils.CDRsFilter, bool) ([]*CDR, int64, error) } diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index 0e0090def..a0aaffe09 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -698,16 +698,16 @@ func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as Acti }{at, as, time.Now(), source}) } -func (ms *MongoStorage) LogCallCost(cgrid, runid, source string, cc *CallCost) error { - return ms.db.C(utils.TBLSMCosts).Insert(&SMCost{CGRID: cgrid, RunID: runid, CostSource: source, CostDetails: cc}) +func (ms *MongoStorage) LogCallCost(smc *SMCost) error { + return ms.db.C(utils.TBLSMCosts).Insert(smc) } -func (ms *MongoStorage) GetCallCostLog(cgrid, runid string) (cc *CallCost, err error) { +func (ms *MongoStorage) GetCallCostLog(cgrid, runid string) (smc *SMCost, err error) { var result SMCost if err = ms.db.C(utils.TBLSMCosts).Find(bson.M{CGRIDLow: cgrid, RunIDLow: runid}).One(&result); err != nil { return nil, err } - return result.CostDetails, nil + return &result, nil } func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) (err error) { diff --git a/engine/storage_sql.go b/engine/storage_sql.go index e7796e34f..cac8c86fc 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -569,21 +569,22 @@ func (self *SQLStorage) SetTpAccountActions(aas []TpAccountAction) error { return nil } -func (self *SQLStorage) LogCallCost(cgrid, runid, source string, cc *CallCost) error { - if cc == nil { +func (self *SQLStorage) LogCallCost(smc *SMCost) error { + if smc.CostDetails == nil { return nil } - tss, err := json.Marshal(cc) + tss, err := json.Marshal(smc.CostDetails) if err != nil { utils.Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) return err } tx := self.db.Begin() cd := &TBLSMCosts{ - Cgrid: cgrid, - RunID: runid, - CostSource: source, + Cgrid: smc.CGRID, + RunID: smc.RunID, + CostSource: smc.CostSource, CostDetails: string(tss), + Usage: smc.Usage, CreatedAt: time.Now(), } if tx.Save(cd).Error != nil { // Check further since error does not properly reflect duplicates here (sql: no rows in result set) @@ -594,7 +595,7 @@ func (self *SQLStorage) LogCallCost(cgrid, runid, source string, cc *CallCost) e return nil } -func (self *SQLStorage) GetCallCostLog(cgrid, runid string) (*CallCost, error) { +func (self *SQLStorage) GetCallCostLog(cgrid, runid string) (*SMCost, error) { var tpCostDetail TBLSMCosts if err := self.db.Where(&TBLSMCosts{Cgrid: cgrid, RunID: runid}).First(&tpCostDetail).Error; err != nil { return nil, err @@ -602,11 +603,17 @@ func (self *SQLStorage) GetCallCostLog(cgrid, runid string) (*CallCost, error) { if len(tpCostDetail.CostDetails) == 0 { return nil, nil // No costs returned } - var cc CallCost - if err := json.Unmarshal([]byte(tpCostDetail.CostDetails), &cc); err != nil { + smc := &SMCost{ + CGRID: tpCostDetail.Cgrid, + RunID: tpCostDetail.RunID, + CostSource: tpCostDetail.CostSource, + Usage: tpCostDetail.Usage, + CostDetails: &CallCost{}, + } + if err := json.Unmarshal([]byte(tpCostDetail.CostDetails), smc.CostDetails); err != nil { return nil, err } - return &cc, nil + return smc, nil } func (self *SQLStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 8994020fd..16012f2e1 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -157,5 +157,6 @@ type SMCost struct { CGRID string RunID string CostSource string + Usage float64 CostDetails *CallCost } diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index 53a655db6..46ade6019 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -225,7 +225,6 @@ func (self *SMGSession) saveOperations() error { } firstCC := self.callCosts[0] // was merged in close method firstCC.Round() - self.totalUsage = time.Duration(firstCC.RatedUsage) // save final usage //utils.Logger.Debug("Saved CC: " + utils.ToJSON(firstCC)) roundIncrements := firstCC.GetRoundIncrements() if len(roundIncrements) != 0 { @@ -242,6 +241,7 @@ func (self *SMGSession) saveOperations() error { CgrId: self.eventStart.GetCgrId(self.timezone), Source: utils.SESSION_MANAGER_SOURCE, RunId: self.runId, + Usage: float64(self.totalUsage), CallCost: firstCC, CheckDuplicate: true, }, &reply) diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index ff5931434..2f16b2f71 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -117,6 +117,7 @@ func (self *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error { return nil, nil // Did not find the session so no need to close it anymore } for idx, s := range ss { + s.totalUsage = usage // save final usage as totalUsage //utils.Logger.Info(fmt.Sprintf(" Ending session: %s, runId: %s", sessionId, s.runId)) if idx == 0 && s.stopDebit != nil { close(s.stopDebit) // Stop automatic debits @@ -333,18 +334,8 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu } func (self *SMGeneric) ProcessCdr(gev SMGenericEvent) error { - cdr := gev.AsStoredCdr(self.cgrCfg, self.timezone) - if cdr.Usage == 0 { - var s *SMGSession - for _, s = range self.getSession(gev.GetUUID()) { - break - } - if s != nil { - cdr.Usage = s.TotalUsage() - } - } var reply string - if err := self.cdrsrv.ProcessCdr(cdr, &reply); err != nil { + if err := self.cdrsrv.ProcessCdr(gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil { return err } return nil