SMGeneric saveOperations with async support

This commit is contained in:
DanB
2016-09-19 15:02:57 +02:00
parent 4a010e4a9f
commit 3c84afc162
3 changed files with 23 additions and 25 deletions

View File

@@ -235,15 +235,16 @@ func (self *SMGSession) disconnectSession(reason string) error {
// Merge the sum of costs and sends it to CDRS for storage
// originID could have been changed from original event, hence passing as argument here
func (self *SMGSession) saveOperations(originID string) error {
// pass cc as the clone of original to avoid concurrency issues
func (self *SMGSession) saveOperations(originID string, cc *engine.CallCost) error {
if len(self.callCosts) == 0 {
return nil // There are no costs to save, ignore the operation
}
firstCC := self.callCosts[0] // was merged in close method
firstCC.Round()
roundIncrements := firstCC.GetRoundIncrements()
//firstCC := self.callCosts[0] // was merged in close method
cc.Round()
roundIncrements := cc.GetRoundIncrements()
if len(roundIncrements) != 0 {
cd := firstCC.CreateCallDescriptor()
cd := cc.CreateCallDescriptor()
cd.CgrID = self.cd.CgrID
cd.RunID = self.cd.RunID
cd.Increments = roundIncrements
@@ -252,11 +253,11 @@ func (self *SMGSession) saveOperations(originID string) error {
return err
}
}
//
if len(firstCC.Timespans) > 50 { // Merge since we will get a callCost too big
firstCC.Timespans.Decompress()
firstCC.Timespans.Merge() // Here we could wait a while depending on the size of the timespans
firstCC.Timespans.Compress()
if len(cc.Timespans) > 50 { // Merge since we will get a callCost too big
cc.Timespans.Decompress()
cc.Timespans.Merge() // Here we could wait a while depending on the size of the timespans
cc.Timespans.Compress()
}
smCost := &engine.SMCost{
@@ -266,7 +267,7 @@ func (self *SMGSession) saveOperations(originID string) error {
OriginHost: self.eventStart.GetOriginatorIP(utils.META_DEFAULT),
OriginID: originID,
Usage: self.TotalUsage().Seconds(),
CostDetails: firstCC,
CostDetails: cc,
}
var reply string
if err := self.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, CheckDuplicate: true}, &reply); err != nil {

View File

@@ -305,11 +305,18 @@ func (self *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error {
if err := s.close(aTime.Add(usage)); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not close session: %s, runId: %s, error: %s", sessionId, s.runId, err.Error()))
}
//go func() { // Call it in goroutine since it could take a while to compress timespans and save them
if err := s.saveOperations(sessionId); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not save session: %s, runId: %s, error: %s", sessionId, s.runId, err.Error()))
if len(s.callCosts) != 0 { // Save cost to sm_cost table
var cc engine.CallCost
if err := utils.Clone(*s.callCosts[0], &cc); err != nil { // Avoid concurrency on CC
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not clone callcost for sessionID: %s, runId: %s, error: %s", sessionId, s.runId, err.Error()))
continue
}
go func(sessionID string, cc *engine.CallCost) { // Call it in goroutine since it could take a while to compress timespans and save them
if err := s.saveOperations(sessionId, cc); err != nil {
utils.Logger.Err(fmt.Sprintf("<SMGeneric> Could not save session: %s, runId: %s, error: %s", sessionId, s.runId, err.Error()))
}
}(sessionId, &cc)
}
//}()
}
return nil, nil
}, time.Duration(2)*time.Second, sessionId)

View File

@@ -38,14 +38,6 @@ type Paginator struct {
SearchTerm string // Global matching pattern in items returned, partially used in some APIs
}
/*func (pag *Paginator) GetLimits() (low, high int) {
if pag.ItemsPerPage == 0 {
return 0, math.MaxInt32
}
return pag.Page * pag.ItemsPerPage, pag.ItemsPerPage
}
*/
type TPDestination struct {
TPid string // Tariff plan id
DestinationId string // Destination id
@@ -182,7 +174,6 @@ func (self *TPRatingPlanBinding) Timing() *TPTiming {
func NewTPRatingProfileFromKeyId(tpid, loadId, keyId string) (*TPRatingProfile, error) {
// *out:cgrates.org:call:*any
s := strings.Split(keyId, ":")
// [*out cgrates.org call *any]
if len(s) != 4 {
return nil, fmt.Errorf("Cannot parse key %s into RatingProfile", keyId)
}
@@ -504,7 +495,6 @@ type TPActionTrigger struct {
func NewTPAccountActionsFromKeyId(tpid, loadId, keyId string) (*TPAccountActions, error) {
// *out:cgrates.org:1001
s := strings.Split(keyId, ":")
// [*out cgrates.org 1001]
if len(s) != 2 {
return nil, fmt.Errorf("Cannot parse key %s into AccountActions", keyId)
}