mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-12 18:46:24 +05:00
CDRS forking rated CDRs depending on smg_costs, smg_costs indexing update to include origin_id also
This commit is contained in:
@@ -655,6 +655,26 @@ func TestDmtAgentSendCCRSimpaEvent(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDmtAgentCdrs(t *testing.T) {
|
||||
if !*testIntegration {
|
||||
return
|
||||
}
|
||||
var cdrs []*engine.ExternalCDR
|
||||
req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, ToRs: []string{utils.VOICE}}
|
||||
if err := apierRpc.Call("ApierV2.GetCdrs", req, &cdrs); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} else if len(cdrs) != 1 {
|
||||
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
|
||||
} else {
|
||||
if cdrs[0].Usage != "610" {
|
||||
t.Errorf("Unexpected CDR Usage received, cdr: %+v ", cdrs[0])
|
||||
}
|
||||
if cdrs[0].Cost != 0.7565 {
|
||||
t.Errorf("Unexpected CDR Cost received, cdr: %+v ", cdrs[0])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDmtAgentSendDataGrpInit(t *testing.T) {
|
||||
if !*testIntegration {
|
||||
return
|
||||
@@ -803,7 +823,7 @@ func TestDmtAgentSendDataGrpUpdate(t *testing.T) {
|
||||
if err := dmtClient.SendMessage(ccr); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
time.Sleep(time.Duration(500) * time.Millisecond)
|
||||
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
|
||||
msg := dmtClient.ReceivedMessage()
|
||||
if msg == nil {
|
||||
t.Fatal("No message returned")
|
||||
@@ -883,7 +903,7 @@ func TestDmtAgentSendDataGrpTerminate(t *testing.T) {
|
||||
if err := dmtClient.SendMessage(ccr); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
|
||||
time.Sleep(time.Duration(3000) * time.Millisecond)
|
||||
msg := dmtClient.ReceivedMessage()
|
||||
if msg == nil {
|
||||
t.Fatal("No message returned")
|
||||
@@ -897,23 +917,16 @@ func TestDmtAgentSendDataGrpTerminate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDmtAgentCdrs(t *testing.T) {
|
||||
func TestDmtAgentSendDataGrpCDRs(t *testing.T) {
|
||||
if !*testIntegration {
|
||||
return
|
||||
}
|
||||
var cdrs []*engine.ExternalCDR
|
||||
req := utils.RPCCDRsFilter{RunIDs: []string{utils.META_DEFAULT}, ToRs: []string{utils.VOICE}}
|
||||
req := utils.RPCCDRsFilter{CGRIDs: []string{utils.Sha1("testdatagrp")}}
|
||||
if err := apierRpc.Call("ApierV2.GetCdrs", req, &cdrs); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} else if len(cdrs) != 1 {
|
||||
} else if len(cdrs) != 3 {
|
||||
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
|
||||
} else {
|
||||
if cdrs[0].Usage != "610" {
|
||||
t.Errorf("Unexpected CDR Usage received, cdr: %+v ", cdrs[0])
|
||||
}
|
||||
if cdrs[0].Cost != 0.7565 {
|
||||
t.Errorf("Unexpected CDR Cost received, cdr: %+v ", cdrs[0])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@
|
||||
},
|
||||
{
|
||||
"id": "data_update_grp1", // formal identifier of this processor
|
||||
"dry_run": true, // do not send the events to SMG, just log them
|
||||
"dry_run": false, // do not send the events to SMG, just log them
|
||||
"request_filter": "Service-Context-Id(^gprs);CC-Request-Type(2);Multiple-Services-Credit-Control>Rating-Group(1)", // filter requests processed by this processor
|
||||
"continue_on_success": true, // continue to the next template if executed
|
||||
"ccr_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
@@ -44,7 +44,8 @@
|
||||
{"tag": "Destination", "field_id": "Destination", "type": "*constant", "value": "data"},
|
||||
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true},
|
||||
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true},
|
||||
{"tag": "LastUsed", "field_id": "Usage", "field_filter":"Multiple-Services-Credit-Control>Rating-Group(1)", "type": "*handler", "handler_id": "*sum",
|
||||
{"tag": "Usage", "field_id": "Usage", "type": "*constant", "value": "3"},
|
||||
{"tag": "LastUsed", "field_id": "LastUsed", "field_filter":"Multiple-Services-Credit-Control>Rating-Group(1)", "type": "*handler", "handler_id": "*sum",
|
||||
"value": "Multiple-Services-Credit-Control>Used-Service-Unit>CC-Input-Octets;^|;Multiple-Services-Credit-Control>Used-Service-Unit>CC-Output-Octets"},
|
||||
],
|
||||
"cca_fields": [
|
||||
@@ -54,7 +55,7 @@
|
||||
},
|
||||
{
|
||||
"id": "data_update_grp2", // formal identifier of this processor
|
||||
"dry_run": true, // do not send the events to SMG, just log them
|
||||
"dry_run": false, // do not send the events to SMG, just log them
|
||||
"request_filter": "Service-Context-Id(^gprs);CC-Request-Type(2);Multiple-Services-Credit-Control>Rating-Group(2)", // filter requests processed by this processor
|
||||
"continue_on_success": true, // continue to the next template if executed
|
||||
"ccr_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
@@ -70,7 +71,8 @@
|
||||
{"tag": "Destination", "field_id": "Destination", "type": "*constant", "value": "data"},
|
||||
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true},
|
||||
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true},
|
||||
{"tag": "LastUsed", "field_id": "Usage", "field_filter":"Multiple-Services-Credit-Control>Rating-Group(2)", "type": "*handler", "handler_id": "*sum",
|
||||
{"tag": "Usage", "field_id": "Usage", "type": "*constant", "value": "3"},
|
||||
{"tag": "LastUsed", "field_id": "LastUsed", "field_filter":"Multiple-Services-Credit-Control>Rating-Group(2)", "type": "*handler", "handler_id": "*sum",
|
||||
"value": "Multiple-Services-Credit-Control>Used-Service-Unit>CC-Input-Octets;^|;Multiple-Services-Credit-Control>Used-Service-Unit>CC-Output-Octets"},
|
||||
],
|
||||
"cca_fields": [
|
||||
@@ -80,11 +82,12 @@
|
||||
},
|
||||
{
|
||||
"id": "data_terminate", // formal identifier of this processor
|
||||
"dry_run": true, // do not send the events to SMG, just log them
|
||||
"dry_run": false, // do not send the events to SMG, just log them
|
||||
"request_filter": "Service-Context-Id(^gprs);CC-Request-Type(3)", // filter requests processed by this processor
|
||||
"continue_on_success": false, // continue to the next template if executed
|
||||
"ccr_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*data", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "Session-Id", "mandatory": true},
|
||||
{"tag": "OriginIDPrefix", "field_id": "OriginIDPrefix", "type": "*composed", "value": "Session-Id", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "^*prepaid", "mandatory": true},
|
||||
{"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true},
|
||||
@@ -94,12 +97,11 @@
|
||||
{"tag": "Destination", "field_id": "Destination", "type": "*constant", "value": "data"},
|
||||
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true},
|
||||
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "Event-Timestamp", "mandatory": true},
|
||||
{"tag": "LastUsed", "field_id": "Usage", "type": "*handler", "handler_id": "*sum",
|
||||
{"tag": "LastUsed", "field_id": "LastUsed", "type": "*handler", "handler_id": "*sum",
|
||||
"value": "Multiple-Services-Credit-Control>Used-Service-Unit>CC-Input-Octets;^|;Multiple-Services-Credit-Control>Used-Service-Unit>CC-Output-Octets"},
|
||||
],
|
||||
"cca_fields": [
|
||||
{"tag": "ResultCode", "field_id": "Result-Code", "type": "*constant", "value": "^2001"},
|
||||
{"tag": "ResultCode", "field_filter": "CGRMaxUsage(0)", "field_id": "Result-Code", "type": "*constant", "value": "4010"},
|
||||
{"tag": "ResultCode", "field_id": "Result-Code", "type": "*constant", "value": "^2001"}
|
||||
],
|
||||
},
|
||||
]
|
||||
|
||||
@@ -33,7 +33,7 @@ CREATE TABLE cdrs (
|
||||
updated_at TIMESTAMP,
|
||||
deleted_at TIMESTAMP,
|
||||
PRIMARY KEY (id),
|
||||
UNIQUE KEY cdrrun (cgrid, run_id)
|
||||
UNIQUE KEY cdrrun (cgrid, run_id, origin_id)
|
||||
);
|
||||
|
||||
DROP TABLE IF EXISTS sm_costs;
|
||||
|
||||
@@ -32,7 +32,7 @@ CREATE TABLE cdrs (
|
||||
created_at TIMESTAMP,
|
||||
updated_at TIMESTAMP,
|
||||
deleted_at TIMESTAMP,
|
||||
UNIQUE (cgrid, run_id)
|
||||
UNIQUE (cgrid, run_id, origin_id)
|
||||
);
|
||||
;
|
||||
DROP INDEX IF EXISTS deleted_at_cp_idx;
|
||||
|
||||
163
engine/cdrs.go
163
engine/cdrs.go
@@ -109,12 +109,9 @@ func (self *CdrServer) ProcessExternalCdr(eCDR *ExternalCDR) error {
|
||||
|
||||
// RPC method, used to log callcosts to db
|
||||
func (self *CdrServer) StoreSMCost(smCost *SMCost, checkDuplicate bool) error {
|
||||
smCost.CostDetails.UpdateCost() // make sure the total cost reflect the increments
|
||||
smCost.CostDetails.UpdateRatedUsage() // make sure rated usage is updated
|
||||
lockKey := smCost.CGRID + smCost.RunID // Will lock on this ID
|
||||
if smCost.CGRID == "" && smCost.OriginID != "" {
|
||||
lockKey = smCost.OriginHost + smCost.OriginID
|
||||
}
|
||||
smCost.CostDetails.UpdateCost() // make sure the total cost reflect the increments
|
||||
smCost.CostDetails.UpdateRatedUsage() // make sure rated usage is updated
|
||||
lockKey := utils.CDRS_SOURCE + smCost.CGRID + smCost.RunID + smCost.OriginID // Will lock on this ID
|
||||
if checkDuplicate {
|
||||
_, err := self.guard.Guard(func() (interface{}, error) {
|
||||
smCosts, err := self.cdrDb.GetSMCosts(smCost.CGRID, smCost.RunID, "", "")
|
||||
@@ -125,7 +122,7 @@ func (self *CdrServer) StoreSMCost(smCost *SMCost, checkDuplicate bool) error {
|
||||
return nil, utils.ErrExists
|
||||
}
|
||||
return nil, self.cdrDb.SetSMCost(smCost)
|
||||
}, 0, lockKey) // FixMe: Possible deadlock with Guard from SMG session close()
|
||||
}, time.Duration(2*time.Second), lockKey) // FixMe: Possible deadlock with Guard from SMG session close()
|
||||
return err
|
||||
}
|
||||
return self.cdrDb.SetSMCost(smCost)
|
||||
@@ -209,60 +206,6 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CdrServer) rateStoreStatsReplicate(cdr *CDR, sendToStats bool) error {
|
||||
if cdr.RunID == utils.MetaRaw { // Overwrite *raw with *default for rating
|
||||
cdr.RunID = utils.META_DEFAULT
|
||||
}
|
||||
if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := LoadAlias(&AttrMatchingAlias{
|
||||
Destination: cdr.Destination,
|
||||
Direction: cdr.Direction,
|
||||
Tenant: cdr.Tenant,
|
||||
Category: cdr.Category,
|
||||
Account: cdr.Account,
|
||||
Subject: cdr.Subject,
|
||||
Context: utils.ALIAS_CONTEXT_RATING,
|
||||
}, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
|
||||
// Rate CDR
|
||||
if self.rater != nil && !cdr.Rated {
|
||||
if err := self.rateCDR(cdr); err != nil {
|
||||
cdr.Cost = -1.0 // If there was an error, mark the CDR
|
||||
cdr.ExtraInfo = err.Error()
|
||||
}
|
||||
}
|
||||
if cdr.RunID == utils.META_SURETAX { // Request should be processed by SureTax
|
||||
if err := SureTaxProcessCdr(cdr); err != nil {
|
||||
cdr.Cost = -1.0
|
||||
cdr.ExtraInfo = err.Error() // Something failed, write the error in the ExtraInfo
|
||||
}
|
||||
}
|
||||
if self.cgrCfg.CDRSStoreCdrs { // Store CDRs
|
||||
// Store RatedCDR
|
||||
if cdr.CostDetails != nil {
|
||||
cdr.CostDetails.UpdateCost()
|
||||
cdr.CostDetails.UpdateRatedUsage()
|
||||
}
|
||||
if err := self.cdrDb.SetCDR(cdr, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Storing rated CDR %+v, got error: %s", cdr, err.Error()))
|
||||
}
|
||||
}
|
||||
// Attach CDR to stats
|
||||
if self.stats != nil && sendToStats { // Send CDR to stats
|
||||
if err := self.stats.AppendCDR(cdr, nil); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Could not append CDR to stats: %s", err.Error()))
|
||||
}
|
||||
}
|
||||
if len(self.cgrCfg.CDRSCdrReplication) != 0 {
|
||||
self.replicateCdr(cdr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) {
|
||||
cdrRuns := []*CDR{cdr}
|
||||
if cdr.RunID != utils.MetaRaw { // Only derive *raw CDRs
|
||||
@@ -331,44 +274,118 @@ func (self *CdrServer) deriveCdrs(cdr *CDR) ([]*CDR, error) {
|
||||
return cdrRuns, nil
|
||||
}
|
||||
|
||||
func (self *CdrServer) rateCDR(cdr *CDR) error {
|
||||
func (self *CdrServer) rateStoreStatsReplicate(cdr *CDR, sendToStats bool) error {
|
||||
if cdr.RunID == utils.MetaRaw { // Overwrite *raw with *default for rating
|
||||
cdr.RunID = utils.META_DEFAULT
|
||||
}
|
||||
if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := LoadAlias(&AttrMatchingAlias{
|
||||
Destination: cdr.Destination,
|
||||
Direction: cdr.Direction,
|
||||
Tenant: cdr.Tenant,
|
||||
Category: cdr.Category,
|
||||
Account: cdr.Account,
|
||||
Subject: cdr.Subject,
|
||||
Context: utils.ALIAS_CONTEXT_RATING,
|
||||
}, cdr, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
// Rate CDR, can receive multiple due to SMCosts for OriginIDPrefix
|
||||
var ratedCDRs []*CDR
|
||||
var err error
|
||||
if cdr.Rated {
|
||||
ratedCDRs = []*CDR{cdr}
|
||||
} else if self.rater != nil {
|
||||
if ratedCDRs, err = self.rateCDR(cdr); err != nil {
|
||||
cdr.Cost = -1.0 // If there was an error, mark the CDR
|
||||
cdr.ExtraInfo = err.Error()
|
||||
ratedCDRs = []*CDR{cdr}
|
||||
}
|
||||
}
|
||||
for _, ratedCDR := range ratedCDRs {
|
||||
if ratedCDR.RunID == utils.META_SURETAX { // Request should be processed by SureTax
|
||||
if err := SureTaxProcessCdr(ratedCDR); err != nil {
|
||||
ratedCDR.Cost = -1.0
|
||||
ratedCDR.ExtraInfo = err.Error() // Something failed, write the error in the ExtraInfo
|
||||
}
|
||||
}
|
||||
}
|
||||
if self.cgrCfg.CDRSStoreCdrs { // Store CDRs
|
||||
for _, ratedCDR := range ratedCDRs {
|
||||
// Store RatedCDR
|
||||
if ratedCDR.CostDetails != nil {
|
||||
ratedCDR.CostDetails.UpdateCost()
|
||||
ratedCDR.CostDetails.UpdateRatedUsage()
|
||||
}
|
||||
if err := self.cdrDb.SetCDR(ratedCDR, true); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Storing rated CDR %+v, got error: %s", ratedCDR, err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
// Attach CDR to stats
|
||||
if self.stats != nil && sendToStats { // Send CDR to stats
|
||||
for _, ratedCDR := range ratedCDRs {
|
||||
if err := self.stats.AppendCDR(ratedCDR, nil); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Could not append CDR to stats: %s", err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(self.cgrCfg.CDRSCdrReplication) != 0 {
|
||||
for _, ratedCDR := range ratedCDRs {
|
||||
self.replicateCdr(ratedCDR)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// rateCDR will populate cost field
|
||||
// Returns more than one rated CDR in case of SMCost retrieved based on prefix
|
||||
func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) {
|
||||
var qryCC *CallCost
|
||||
var err error
|
||||
if cdr.RequestType == utils.META_NONE {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
var cdrsRated []*CDR
|
||||
_, hasLastUsed := cdr.ExtraFields[utils.LastUsed]
|
||||
if utils.IsSliceMember([]string{utils.META_PREPAID, utils.PREPAID}, cdr.RequestType) && (cdr.Usage != 0 || hasLastUsed) { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards
|
||||
// Should be previously calculated and stored in DB
|
||||
delay := utils.Fib()
|
||||
var usage float64
|
||||
var smCosts []*SMCost
|
||||
for i := 0; i < 4; i++ {
|
||||
smCosts, err := self.cdrDb.GetSMCosts(cdr.CGRID, cdr.RunID, cdr.OriginHost, cdr.ExtraFields[utils.OriginIDPrefix])
|
||||
smCosts, err = self.cdrDb.GetSMCosts(cdr.CGRID, cdr.RunID, cdr.OriginHost, cdr.ExtraFields[utils.OriginIDPrefix])
|
||||
if err == nil && len(smCosts) != 0 {
|
||||
qryCC = smCosts[0].CostDetails
|
||||
usage = smCosts[0].Usage
|
||||
break
|
||||
}
|
||||
time.Sleep(delay())
|
||||
}
|
||||
if len(smCosts) != 0 { // Cost retrieved from SMCost table
|
||||
for _, smCost := range smCosts {
|
||||
cdrClone := cdr.Clone()
|
||||
cdrClone.OriginID = smCost.OriginID
|
||||
cdrClone.Usage = time.Duration(smCost.Usage * utils.NANO_MULTIPLIER) // Usage is float as seconds, convert back to duration
|
||||
cdrClone.Cost = smCost.CostDetails.Cost
|
||||
cdrClone.CostDetails = smCost.CostDetails
|
||||
cdrsRated = append(cdrsRated, cdrClone)
|
||||
}
|
||||
return cdrsRated, nil
|
||||
}
|
||||
if err != nil && (err == gorm.RecordNotFound || err == mgov2.ErrNotFound) { //calculate CDR as for pseudoprepaid
|
||||
utils.Logger.Warning(fmt.Sprintf("<Cdrs> WARNING: Could not find CallCostLog for cgrid: %s, source: %s, runid: %s, will recalculate", cdr.CGRID, utils.SESSION_MANAGER_SOURCE, cdr.RunID))
|
||||
qryCC, err = self.getCostFromRater(cdr)
|
||||
}
|
||||
if cdr.Usage == 0 {
|
||||
cdr.Usage = time.Duration(usage)
|
||||
}
|
||||
|
||||
} else {
|
||||
qryCC, err = self.getCostFromRater(cdr)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
} else if qryCC != nil {
|
||||
cdr.Cost = qryCC.Cost
|
||||
cdr.CostDetails = qryCC
|
||||
}
|
||||
return nil
|
||||
return []*CDR{cdr}, nil
|
||||
}
|
||||
|
||||
// Retrive the cost from engine
|
||||
|
||||
@@ -218,7 +218,7 @@ func NewMongoStorage(host, port, db, user, pass string, cdrsIndexes []string) (*
|
||||
}
|
||||
}
|
||||
index = mgo.Index{
|
||||
Key: []string{CGRIDLow, RunIDLow},
|
||||
Key: []string{CGRIDLow, RunIDLow, OriginIDLow},
|
||||
Unique: true,
|
||||
DropDups: false,
|
||||
Background: false,
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -615,6 +614,8 @@ func (self *SQLStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri
|
||||
smc := &SMCost{
|
||||
CGRID: result.Cgrid,
|
||||
RunID: result.RunID,
|
||||
OriginHost: result.OriginHost,
|
||||
OriginID: result.OriginID,
|
||||
CostSource: result.CostSource,
|
||||
Usage: result.Usage,
|
||||
CostDetails: &CallCost{},
|
||||
@@ -966,8 +967,8 @@ func (self *SQLStorage) GetCDRs(qryFltr *utils.CDRsFilter, remove bool) ([]*CDR,
|
||||
return nil, 0, fmt.Errorf("JSON unmarshal callcost error for cgrid: %s, runid: %v, error: %s", result.Cgrid, result.RunID, err.Error())
|
||||
}
|
||||
}
|
||||
usageDur, _ := time.ParseDuration(strconv.FormatFloat(result.Usage, 'f', -1, 64) + "s")
|
||||
pddDur, _ := time.ParseDuration(strconv.FormatFloat(result.Pdd, 'f', -1, 64) + "s")
|
||||
usageDur := time.Duration(result.Usage * utils.NANO_MULTIPLIER)
|
||||
pddDur := time.Duration(result.Pdd * utils.NANO_MULTIPLIER)
|
||||
storCdr := &CDR{
|
||||
CGRID: result.Cgrid,
|
||||
RunID: result.RunID,
|
||||
|
||||
@@ -80,7 +80,6 @@ func (self *SMGSession) debitLoop(debitInterval time.Duration) {
|
||||
func (self *SMGSession) debit(dur time.Duration, lastUsed time.Duration) (time.Duration, error) {
|
||||
requestedDuration := dur
|
||||
self.totalUsage += lastUsed // Should reflect the total usage so far
|
||||
|
||||
//utils.Logger.Debug(fmt.Sprintf("ExtraDuration: %d", self.extraDuration))
|
||||
if lastUsed > 0 {
|
||||
self.extraDuration = self.lastUsage - lastUsed
|
||||
@@ -221,7 +220,8 @@ func (self *SMGSession) disconnectSession(reason string) error {
|
||||
}
|
||||
|
||||
// Merge the sum of costs and sends it to CDRS for storage
|
||||
func (self *SMGSession) saveOperations() error {
|
||||
// originID could have been changed from original event, hence passing as argument here
|
||||
func (self *SMGSession) saveOperations(originID string) error {
|
||||
if len(self.callCosts) == 0 {
|
||||
return nil // There are no costs to save, ignore the operation
|
||||
}
|
||||
@@ -243,7 +243,8 @@ func (self *SMGSession) saveOperations() error {
|
||||
CostSource: utils.SESSION_MANAGER_SOURCE,
|
||||
RunID: self.runId,
|
||||
OriginHost: self.eventStart.GetOriginatorIP(utils.META_DEFAULT),
|
||||
OriginID: self.eventStart.GetUUID(),
|
||||
OriginID: originID,
|
||||
Usage: self.TotalUsage().Seconds(),
|
||||
CostDetails: firstCC,
|
||||
}
|
||||
if err := self.cdrsrv.StoreSMCost(engine.AttrCDRSStoreSMCost{SMCost: smCost, CheckDuplicate: true}, &reply); err != nil {
|
||||
|
||||
@@ -143,7 +143,7 @@ func (self *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
|
||||
if err := s.close(aTime.Add(usage)); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not close session: %s, runId: %s, error: %s", sessionId, s.runId, err.Error()))
|
||||
}
|
||||
if err := s.saveOperations(); err != nil {
|
||||
if err := s.saveOperations(sessionId); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not save session: %s, runId: %s, error: %s", sessionId, s.runId, err.Error()))
|
||||
}
|
||||
}
|
||||
@@ -228,8 +228,7 @@ func (self *SMGeneric) SessionUpdate(gev SMGenericEvent, clnt *rpc2.Client) (tim
|
||||
}
|
||||
return nilDuration, err
|
||||
}
|
||||
evUuid := gev.GetUUID()
|
||||
for _, s := range self.getSession(evUuid) {
|
||||
for _, s := range self.getSession(gev.GetUUID()) {
|
||||
if maxDur, err := s.debit(evMaxUsage, evLastUsed); err != nil {
|
||||
return nilDuration, err
|
||||
} else if maxDur < evMaxUsage {
|
||||
@@ -254,15 +253,18 @@ func (self *SMGeneric) SessionEnd(gev SMGenericEvent, clnt *rpc2.Client) error {
|
||||
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
|
||||
err = self.sessionStart(gev, getClientConnId(clnt))
|
||||
}
|
||||
if err != nil {
|
||||
if err != nil && err != utils.ErrMandatoryIeMissing {
|
||||
return err
|
||||
}
|
||||
}
|
||||
sessionIDs := []string{gev.GetUUID()}
|
||||
if sessionIDPrefix, err := gev.GetFieldAsString(utils.OriginIDPrefix); err == nil { // OriginIDPrefix is present, OriginID will not be anymore considered
|
||||
sessionIDs = self.getSessionIDsForPrefix(sessionIDPrefix)
|
||||
}
|
||||
usage, err := gev.GetUsage(utils.META_DEFAULT)
|
||||
if err != nil {
|
||||
if err != utils.ErrNotFound {
|
||||
return err
|
||||
|
||||
}
|
||||
lastUsed, err := gev.GetLastUsed(utils.META_DEFAULT)
|
||||
if err != nil {
|
||||
@@ -272,18 +274,19 @@ func (self *SMGeneric) SessionEnd(gev SMGenericEvent, clnt *rpc2.Client) error {
|
||||
return err
|
||||
}
|
||||
var s *SMGSession
|
||||
for _, s = range self.getSession(gev.GetUUID()) {
|
||||
break
|
||||
for _, sID := range sessionIDs {
|
||||
for _, s = range self.getSession(sID) {
|
||||
break
|
||||
}
|
||||
if s != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
usage = s.TotalUsage() + lastUsed
|
||||
}
|
||||
sessionIDs := []string{gev.GetUUID()}
|
||||
if sessionIDPrefix, err := gev.GetFieldAsString(utils.OriginIDPrefix); err == nil { // OriginIDPrefix is present, OriginID will not be anymore considered
|
||||
sessionIDs = self.getSessionIDsForPrefix(sessionIDPrefix)
|
||||
}
|
||||
var interimError error
|
||||
for _, sessionID := range sessionIDs {
|
||||
if err := self.sessionEnd(sessionID, usage); err != nil {
|
||||
@@ -377,7 +380,6 @@ func (self *SMGeneric) ChargeEvent(gev SMGenericEvent, clnt *rpc2.Client) (maxDu
|
||||
utils.Logger.Err(fmt.Sprintf("<SM> ERROR failed to refund rounding: %v", err))
|
||||
}
|
||||
}
|
||||
|
||||
var reply string
|
||||
smCost := &engine.SMCost{
|
||||
CGRID: gev.GetCgrId(self.timezone),
|
||||
|
||||
Reference in New Issue
Block a user