populate cdr Usage

This commit is contained in:
Radu Ioan Fericean
2016-03-24 17:39:47 +02:00
parent 73286e3c71
commit 4ff66e0883
14 changed files with 63 additions and 51 deletions

View File

@@ -26,19 +26,19 @@ import (
)
// Retrieves the callCost out of CGR logDb
func (apier *ApierV1) GetCallCostLog(attrs utils.AttrGetCallCost, reply *engine.CallCost) error {
func (apier *ApierV1) GetCallCostLog(attrs utils.AttrGetCallCost, reply *engine.SMCost) error {
if attrs.CgrId == "" {
return utils.NewErrMandatoryIeMissing("CgrId")
}
if attrs.RunId == "" {
attrs.RunId = utils.META_DEFAULT
}
if cc, err := apier.CdrDb.GetCallCostLog(attrs.CgrId, attrs.RunId); err != nil {
if smc, err := apier.CdrDb.GetCallCostLog(attrs.CgrId, attrs.RunId); err != nil {
return utils.NewErrServerError(err)
} else if cc == nil {
} else if smc == nil {
return utils.ErrNotFound
} else {
*reply = *cc
*reply = *smc
}
return nil
}

View File

@@ -42,6 +42,7 @@ CREATE TABLE sm_costs (
cgrid char(40) NOT NULL,
run_id varchar(64) NOT NULL,
cost_source varchar(64) NOT NULL,
`usage` DECIMAL(30,9) NOT NULL,
cost_details text,
created_at TIMESTAMP,
deleted_at TIMESTAMP,

View File

@@ -45,6 +45,7 @@ CREATE TABLE sm_costs (
cgrid CHAR(40) NOT NULL,
run_id VARCHAR(64) NOT NULL,
cost_source VARCHAR(64) NOT NULL,
usage NUMERIC(30,9) NOT NULL,
cost_details jsonb,
created_at TIMESTAMP,
deleted_at TIMESTAMP,

View File

@@ -38,6 +38,7 @@ type CallCostLog struct {
CgrId string
Source string
RunId string
Usage float64 // real usage (not increment rounded)
CallCost *CallCost
CheckDuplicate bool
}
@@ -119,11 +120,11 @@ func (self *CdrServer) LogCallCost(ccl *CallCostLog) error {
if cc != nil {
return nil, utils.ErrExists
}
return nil, self.cdrDb.LogCallCost(ccl.CgrId, ccl.RunId, ccl.Source, ccl.CallCost)
return nil, self.cdrDb.LogCallCost(&SMCost{CGRID: ccl.CgrId, RunID: ccl.RunId, CostSource: ccl.Source, Usage: ccl.Usage, CostDetails: ccl.CallCost})
}, 0, ccl.CgrId)
return err
}
return self.cdrDb.LogCallCost(ccl.CgrId, ccl.RunId, ccl.Source, ccl.CallCost)
return self.cdrDb.LogCallCost(&SMCost{CGRID: ccl.CgrId, RunID: ccl.RunId, CostSource: ccl.Source, Usage: ccl.Usage, CostDetails: ccl.CallCost})
}
// Called by rate/re-rate API
@@ -332,12 +333,16 @@ func (self *CdrServer) rateCDR(cdr *CDR) error {
if cdr.RequestType == utils.META_NONE {
return nil
}
if utils.IsSliceMember([]string{utils.META_PREPAID, utils.PREPAID}, cdr.RequestType) && cdr.Usage != 0 { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards
_, hasLastUsed := cdr.ExtraFields["LastUsed"]
if utils.IsSliceMember([]string{utils.META_PREPAID, utils.PREPAID}, cdr.RequestType) && (cdr.Usage != 0 || hasLastUsed) { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards
// Should be previously calculated and stored in DB
delay := utils.Fib()
var usage float64
for i := 0; i < 4; i++ {
qryCC, err = self.cdrDb.GetCallCostLog(cdr.CGRID, cdr.RunID)
qrySMC, err := self.cdrDb.GetCallCostLog(cdr.CGRID, cdr.RunID)
if err == nil {
qryCC = qrySMC.CostDetails
usage = qrySMC.Usage
break
}
time.Sleep(delay())
@@ -346,6 +351,9 @@ func (self *CdrServer) rateCDR(cdr *CDR) error {
utils.Logger.Warning(fmt.Sprintf("<Cdrs> WARNING: Could not find CallCostLog for cgrid: %s, source: %s, runid: %s, will recalculate", cdr.CGRID, utils.SESSION_MANAGER_SOURCE, cdr.RunID))
qryCC, err = self.getCostFromRater(cdr)
}
if cdr.Usage == 0 {
cdr.Usage = time.Duration(usage)
}
} else {
qryCC, err = self.getCostFromRater(cdr)

View File

@@ -19,31 +19,33 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"log"
"testing"
"time"
)
func ATestAccountLock(t *testing.T) {
func BenchmarkGuard(b *testing.B) {
for i := 0; i < 100; i++ {
go Guardian.Guard(func() (interface{}, error) {
log.Print("first 1")
time.Sleep(1 * time.Millisecond)
log.Print("end first 1")
return 0, nil
}, 0, "1")
go Guardian.Guard(func() (interface{}, error) {
log.Print("first 2")
time.Sleep(1 * time.Millisecond)
log.Print("end first 2")
return 0, nil
}, 0, "2")
go Guardian.Guard(func() (interface{}, error) {
log.Print("second 1")
time.Sleep(1 * time.Millisecond)
log.Print("end second 1")
return 0, nil
}, 0, "1")
}
time.Sleep(10 * time.Second)
}
func BenchmarkGuardian(b *testing.B) {
for i := 0; i < 100; i++ {
go Guardian.Guard(func() (interface{}, error) {
time.Sleep(1 * time.Millisecond)
return 0, nil
}, 0, "1")
}
}

View File

@@ -452,6 +452,7 @@ type TBLSMCosts struct {
Cgrid string
RunID string
CostSource string
Usage float64
CostDetails string
CreatedAt time.Time
DeletedAt time.Time

View File

@@ -139,13 +139,13 @@ func (ps *PubSub) Publish(evt CgrEvent, reply *string) error {
}
transport := split[0]
address := split[1]
ttlVerify := ps.ttlVerify
switch transport {
case utils.META_HTTP_POST:
go func() {
delay := utils.Fib()
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
if _, err := ps.pubFunc(address, ps.ttlVerify, evt); err == nil {
if _, err := ps.pubFunc(address, ttlVerify, evt); err == nil {
break // Success, no need to reinterate
} else if i == 4 { // Last iteration, syslog the warning
utils.Logger.Warning(fmt.Sprintf("<PubSub> Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), evt["EventName"]))

View File

@@ -222,13 +222,13 @@ func testSMCosts(cfg *config.CGRConfig) error {
},
TOR: utils.VOICE,
}
if err := cdrStorage.LogCallCost("164b0422fdc6a5117031b427439482c6a4f90e41", utils.META_DEFAULT, utils.UNIT_TEST, cc); err != nil {
if err := cdrStorage.LogCallCost(&SMCost{CGRID: "164b0422fdc6a5117031b427439482c6a4f90e41", RunID: utils.META_DEFAULT, CostSource: utils.UNIT_TEST, CostDetails: cc}); err != nil {
return err
}
if rcvCC, err := cdrStorage.GetCallCostLog("164b0422fdc6a5117031b427439482c6a4f90e41", utils.META_DEFAULT); err != nil {
if rcvSMC, err := cdrStorage.GetCallCostLog("164b0422fdc6a5117031b427439482c6a4f90e41", utils.META_DEFAULT); err != nil {
return err
} else if len(cc.Timespans) != len(rcvCC.Timespans) { // cc.Timespans[0].RateInterval.Rating.Rates[0], rcvCC.Timespans[0].RateInterval.Rating.Rates[0])
return fmt.Errorf("Expecting: %+v, received: %+v", cc, rcvCC)
} else if len(cc.Timespans) != len(rcvSMC.CostDetails.Timespans) { // cc.Timespans[0].RateInterval.Rating.Rates[0], rcvCC.Timespans[0].RateInterval.Rating.Rates[0])
return fmt.Errorf("Expecting: %+v, received: %+s", cc, utils.ToIJSON(rcvSMC))
}
return nil
}

View File

@@ -97,8 +97,8 @@ type AccountingStorage interface {
type CdrStorage interface {
Storage
SetCDR(*CDR, bool) error
LogCallCost(cgrid, runid, source string, cc *CallCost) error
GetCallCostLog(cgrid, runid string) (*CallCost, error)
LogCallCost(smc *SMCost) error
GetCallCostLog(cgrid, runid string) (*SMCost, error)
GetCDRs(*utils.CDRsFilter, bool) ([]*CDR, int64, error)
}

View File

@@ -698,16 +698,16 @@ func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as Acti
}{at, as, time.Now(), source})
}
func (ms *MongoStorage) LogCallCost(cgrid, runid, source string, cc *CallCost) error {
return ms.db.C(utils.TBLSMCosts).Insert(&SMCost{CGRID: cgrid, RunID: runid, CostSource: source, CostDetails: cc})
func (ms *MongoStorage) LogCallCost(smc *SMCost) error {
return ms.db.C(utils.TBLSMCosts).Insert(smc)
}
func (ms *MongoStorage) GetCallCostLog(cgrid, runid string) (cc *CallCost, err error) {
func (ms *MongoStorage) GetCallCostLog(cgrid, runid string) (smc *SMCost, err error) {
var result SMCost
if err = ms.db.C(utils.TBLSMCosts).Find(bson.M{CGRIDLow: cgrid, RunIDLow: runid}).One(&result); err != nil {
return nil, err
}
return result.CostDetails, nil
return &result, nil
}
func (ms *MongoStorage) SetCDR(cdr *CDR, allowUpdate bool) (err error) {

View File

@@ -569,21 +569,22 @@ func (self *SQLStorage) SetTpAccountActions(aas []TpAccountAction) error {
return nil
}
func (self *SQLStorage) LogCallCost(cgrid, runid, source string, cc *CallCost) error {
if cc == nil {
func (self *SQLStorage) LogCallCost(smc *SMCost) error {
if smc.CostDetails == nil {
return nil
}
tss, err := json.Marshal(cc)
tss, err := json.Marshal(smc.CostDetails)
if err != nil {
utils.Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err))
return err
}
tx := self.db.Begin()
cd := &TBLSMCosts{
Cgrid: cgrid,
RunID: runid,
CostSource: source,
Cgrid: smc.CGRID,
RunID: smc.RunID,
CostSource: smc.CostSource,
CostDetails: string(tss),
Usage: smc.Usage,
CreatedAt: time.Now(),
}
if tx.Save(cd).Error != nil { // Check further since error does not properly reflect duplicates here (sql: no rows in result set)
@@ -594,7 +595,7 @@ func (self *SQLStorage) LogCallCost(cgrid, runid, source string, cc *CallCost) e
return nil
}
func (self *SQLStorage) GetCallCostLog(cgrid, runid string) (*CallCost, error) {
func (self *SQLStorage) GetCallCostLog(cgrid, runid string) (*SMCost, error) {
var tpCostDetail TBLSMCosts
if err := self.db.Where(&TBLSMCosts{Cgrid: cgrid, RunID: runid}).First(&tpCostDetail).Error; err != nil {
return nil, err
@@ -602,11 +603,17 @@ func (self *SQLStorage) GetCallCostLog(cgrid, runid string) (*CallCost, error) {
if len(tpCostDetail.CostDetails) == 0 {
return nil, nil // No costs returned
}
var cc CallCost
if err := json.Unmarshal([]byte(tpCostDetail.CostDetails), &cc); err != nil {
smc := &SMCost{
CGRID: tpCostDetail.Cgrid,
RunID: tpCostDetail.RunID,
CostSource: tpCostDetail.CostSource,
Usage: tpCostDetail.Usage,
CostDetails: &CallCost{},
}
if err := json.Unmarshal([]byte(tpCostDetail.CostDetails), smc.CostDetails); err != nil {
return nil, err
}
return &cc, nil
return smc, nil
}
func (self *SQLStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) {

View File

@@ -157,5 +157,6 @@ type SMCost struct {
CGRID string
RunID string
CostSource string
Usage float64
CostDetails *CallCost
}

View File

@@ -225,7 +225,6 @@ func (self *SMGSession) saveOperations() error {
}
firstCC := self.callCosts[0] // was merged in close method
firstCC.Round()
self.totalUsage = time.Duration(firstCC.RatedUsage) // save final usage
//utils.Logger.Debug("Saved CC: " + utils.ToJSON(firstCC))
roundIncrements := firstCC.GetRoundIncrements()
if len(roundIncrements) != 0 {
@@ -242,6 +241,7 @@ func (self *SMGSession) saveOperations() error {
CgrId: self.eventStart.GetCgrId(self.timezone),
Source: utils.SESSION_MANAGER_SOURCE,
RunId: self.runId,
Usage: float64(self.totalUsage),
CallCost: firstCC,
CheckDuplicate: true,
}, &reply)

View File

@@ -117,6 +117,7 @@ func (self *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
return nil, nil // Did not find the session so no need to close it anymore
}
for idx, s := range ss {
s.totalUsage = usage // save final usage as totalUsage
//utils.Logger.Info(fmt.Sprintf("<SMGeneric> Ending session: %s, runId: %s", sessionId, s.runId))
if idx == 0 && s.stopDebit != nil {
close(s.stopDebit) // Stop automatic debits
@@ -333,18 +334,8 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu
}
func (self *SMGeneric) ProcessCdr(gev SMGenericEvent) error {
cdr := gev.AsStoredCdr(self.cgrCfg, self.timezone)
if cdr.Usage == 0 {
var s *SMGSession
for _, s = range self.getSession(gev.GetUUID()) {
break
}
if s != nil {
cdr.Usage = s.TotalUsage()
}
}
var reply string
if err := self.cdrsrv.ProcessCdr(cdr, &reply); err != nil {
if err := self.cdrsrv.ProcessCdr(gev.AsStoredCdr(self.cgrCfg, self.timezone), &reply); err != nil {
return err
}
return nil