From 3c84afc162b710d745117a26c6402da608112679 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 19 Sep 2016 15:02:57 +0200 Subject: [PATCH] SMGeneric saveOperations with async support --- sessionmanager/smg_session.go | 23 ++++++++++++----------- sessionmanager/smgeneric.go | 15 +++++++++++---- utils/apitpdata.go | 10 ---------- 3 files changed, 23 insertions(+), 25 deletions(-) diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index 44a573936..3b1b5c184 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -235,15 +235,16 @@ func (self *SMGSession) disconnectSession(reason string) error { // Merge the sum of costs and sends it to CDRS for storage // originID could have been changed from original event, hence passing as argument here -func (self *SMGSession) saveOperations(originID string) error { +// pass cc as the clone of original to avoid concurrency issues +func (self *SMGSession) saveOperations(originID string, cc *engine.CallCost) error { if len(self.callCosts) == 0 { return nil // There are no costs to save, ignore the operation } - firstCC := self.callCosts[0] // was merged in close method - firstCC.Round() - roundIncrements := firstCC.GetRoundIncrements() + //firstCC := self.callCosts[0] // was merged in close method + cc.Round() + roundIncrements := cc.GetRoundIncrements() if len(roundIncrements) != 0 { - cd := firstCC.CreateCallDescriptor() + cd := cc.CreateCallDescriptor() cd.CgrID = self.cd.CgrID cd.RunID = self.cd.RunID cd.Increments = roundIncrements @@ -252,11 +253,11 @@ func (self *SMGSession) saveOperations(originID string) error { return err } } - // - if len(firstCC.Timespans) > 50 { // Merge since we will get a callCost too big - firstCC.Timespans.Decompress() - firstCC.Timespans.Merge() // Here we could wait a while depending on the size of the timespans - firstCC.Timespans.Compress() + + if len(cc.Timespans) > 50 { // Merge since we will get a callCost too big + cc.Timespans.Decompress() + cc.Timespans.Merge() // Here we could wait a while depending on the size of the timespans + cc.Timespans.Compress() } smCost := &engine.SMCost{ @@ -266,7 +267,7 @@ func (self *SMGSession) saveOperations(originID string) error { OriginHost: self.eventStart.GetOriginatorIP(utils.META_DEFAULT), OriginID: originID, Usage: self.TotalUsage().Seconds(), - CostDetails: firstCC, + CostDetails: cc, } var reply string if err := self.cdrsrv.Call("CdrsV1.StoreSMCost", engine.AttrCDRSStoreSMCost{Cost: smCost, CheckDuplicate: true}, &reply); err != nil { diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index 0b4b77987..1372cc9dc 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -305,11 +305,18 @@ func (self *SMGeneric) sessionEnd(sessionId string, usage time.Duration) error { if err := s.close(aTime.Add(usage)); err != nil { utils.Logger.Err(fmt.Sprintf(" Could not close session: %s, runId: %s, error: %s", sessionId, s.runId, err.Error())) } - //go func() { // Call it in goroutine since it could take a while to compress timespans and save them - if err := s.saveOperations(sessionId); err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not save session: %s, runId: %s, error: %s", sessionId, s.runId, err.Error())) + if len(s.callCosts) != 0 { // Save cost to sm_cost table + var cc engine.CallCost + if err := utils.Clone(*s.callCosts[0], &cc); err != nil { // Avoid concurrency on CC + utils.Logger.Err(fmt.Sprintf(" Could not clone callcost for sessionID: %s, runId: %s, error: %s", sessionId, s.runId, err.Error())) + continue + } + go func(sessionID string, cc *engine.CallCost) { // Call it in goroutine since it could take a while to compress timespans and save them + if err := s.saveOperations(sessionId, cc); err != nil { + utils.Logger.Err(fmt.Sprintf(" Could not save session: %s, runId: %s, error: %s", sessionId, s.runId, err.Error())) + } + }(sessionId, &cc) } - //}() } return nil, nil }, time.Duration(2)*time.Second, sessionId) diff --git a/utils/apitpdata.go b/utils/apitpdata.go index cdd91889e..a065dfa3b 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -38,14 +38,6 @@ type Paginator struct { SearchTerm string // Global matching pattern in items returned, partially used in some APIs } -/*func (pag *Paginator) GetLimits() (low, high int) { - if pag.ItemsPerPage == 0 { - return 0, math.MaxInt32 - } - return pag.Page * pag.ItemsPerPage, pag.ItemsPerPage -} -*/ - type TPDestination struct { TPid string // Tariff plan id DestinationId string // Destination id @@ -182,7 +174,6 @@ func (self *TPRatingPlanBinding) Timing() *TPTiming { func NewTPRatingProfileFromKeyId(tpid, loadId, keyId string) (*TPRatingProfile, error) { // *out:cgrates.org:call:*any s := strings.Split(keyId, ":") - // [*out cgrates.org call *any] if len(s) != 4 { return nil, fmt.Errorf("Cannot parse key %s into RatingProfile", keyId) } @@ -504,7 +495,6 @@ type TPActionTrigger struct { func NewTPAccountActionsFromKeyId(tpid, loadId, keyId string) (*TPAccountActions, error) { // *out:cgrates.org:1001 s := strings.Split(keyId, ":") - // [*out cgrates.org 1001] if len(s) != 2 { return nil, fmt.Errorf("Cannot parse key %s into AccountActions", keyId) }