From 73d7ec2a77f38b30092da16dca26da67c010f37f Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 27 Jan 2019 18:40:56 +0100 Subject: [PATCH] SessionS package building --- sessions/session.go | 40 ++++ sessions/sessions.go | 468 +++++++++++++++++++++---------------------- utils/consts.go | 2 +- 3 files changed, 275 insertions(+), 235 deletions(-) diff --git a/sessions/session.go b/sessions/session.go index af56da1bf..92f475247 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -139,6 +140,45 @@ func (s *Session) AsActiveSessions(tmz, nodeID string) (aSs []*ActiveSession) { return } +// TotalUsage returns the first session run total usage +func (s *Session) TotalUsage() (tDur time.Duration) { + if len(s.SRuns) == 0 { + return + } + s.RLock() + for _, sr := range s.SRuns { + tDur = sr.TotalUsage + break // only first + } + s.RUnlock() + return +} + +// AsCGREvents is a thread safe method to return the Session as CGREvents +// there will be one CGREvent for each SRun +func (s *Session) AsCGREvents(cgrCfg *config.CGRConfig) (cgrEvs []*utils.CGREvent, err error) { + if len(s.SRuns) == 0 { + return + } + s.RLock() + cgrEvs = make([]*utils.CGREvent, len(s.SRuns)) // so we can gather all cdr info while under lock + for i, sr := range s.SRuns { + var cdr *engine.CDR + if cdr, err = sr.Event.AsCDR(cgrCfg, s.Tenant, + cgrCfg.GeneralCfg().DefaultTimezone); err != nil { + break // will return with error + } + cdr.Usage = sr.TotalUsage + cgrEvs[i] = &utils.CGREvent{ + Tenant: s.Tenant, + ID: utils.UUIDSha1Prefix(), + Event: cdr.AsMapStringIface(), + } + } + s.RUnlock() + return +} + // SRun is one billing run for the Session type SRun struct { Event engine.MapEvent // Event received from ChargerS diff --git a/sessions/sessions.go b/sessions/sessions.go index a392ec988..2b2a14540 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -112,10 +112,10 @@ func NewSessionS(cgrCfg *config.CGRConfig, ralS, resS, thdS, biJClnts: make(map[rpcclient.RpcClientConnection]string), biJIDs: make(map[string]*biJClient), aSessions: make(map[string]*Session), - aSessionsIdx: make(map[string]map[string]map[string]utils.StringMap), + aSessionsIdx: make(map[string]map[string]utils.StringMap), aSessionsRIdx: make(map[string][]*riFieldNameVal), pSessions: make(map[string]*Session), - pSessionsIdx: make(map[string]map[string]map[string]utils.StringMap), + pSessionsIdx: make(map[string]map[string]utils.StringMap), pSessionsRIdx: make(map[string][]*riFieldNameVal)} } @@ -148,16 +148,16 @@ type SessionS struct { aSsMux sync.RWMutex // protects aSessions aSessions map[string]*Session // group sessions per sessionId, multiple runs based on derived charging - aSIMux sync.RWMutex // protects aSessionsIdx - aSessionsIdx map[string]map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue][runID]utils.StringMap[cgrID] - aSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for active sessions, used on remove + aSIMux sync.RWMutex // protects aSessionsIdx + aSessionsIdx map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue][runID]utils.StringMap[cgrID] + aSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for active sessions, used on remove pSsMux sync.RWMutex // protects pSessions pSessions map[string]*Session // group passive sessions based on cgrID - pSIMux sync.RWMutex // protects pSessionsIdx - pSessionsIdx map[string]map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue][runID]utils.StringMap[cgrID] - pSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for passive sessions, used on remove + pSIMux sync.RWMutex // protects pSessionsIdx + pSessionsIdx map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue]utils.StringMap[cgrID] + pSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for passive sessions, used on remove } @@ -287,7 +287,6 @@ func (sS *SessionS) setSTerminator(s *Session) { return // nothing to set up } // random delay computation - var sTLMaxDelay int64 maxDelay, err := s.EventStart.GetDuration(utils.SessionTTLMaxDelay) switch err { case nil: // all good @@ -375,37 +374,28 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs if err = sS.endSession(s, nil); err != nil { utils.Logger.Warning( fmt.Sprintf( - "<%s> failed force terminating session with ID %s, err: %s", + "<%s> failed force terminating session with ID <%s>, err: <%s>", utils.SessionS, s.CGRid(), err.Error())) } // Generate CDRs for each session run - s.Lock() - for _, sr := range s.SRuns { - cdr, err := sr.Event.AsCDR(sS.cgrCfg, s.Tenant, - sS.cgrCfg.GeneralCfg().DefaultTimezone) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf( - "<%s> could not create CDR out of event %s, err: %s", - utils.SessionS, utils.ToJSON(sr.Event), err.Error())) - } - cdr.Usage = sr.TotalUsage + var cgrEvs []*utils.CGREvent + if cgrEvs, err = s.AsCGREvents(sS.cgrCfg); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> could not create CDRs for session: <%s>, err: <%s>", + utils.SessionS, s.CGRid(), err.Error())) + } + // post the CDRs + for _, cgrEv := range cgrEvs { var reply string - cgrEv := &utils.CGREvent{ - Tenant: s.Tenant, - ID: utils.UUIDSha1Prefix(), - Event: cdr.AsMapStringIface(), - } if err = sS.cdrS.Call(utils.CdrsV2ProcessCDR, cgrEv, &reply); err != nil { utils.Logger.Warning( fmt.Sprintf( - "<%s> could not create CDR out of event %s, err: %s", - utils.SessionS, utils.ToJSON(sr.Event), err.Error())) + "<%s> could not post CDR for event %s, err: %s", + utils.SessionS, utils.ToJSON(cgrEv), err.Error())) } } - resID := s.ResourceID // we are still under lock - clntConnID := s.ClientConnID - s.Unlock() + // release the resources for the session if sS.resS != nil && s.ResourceID != "" { var reply string argsRU := utils.ArgRSv1ResourceUsage{ @@ -413,7 +403,7 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs Tenant: s.Tenant, Event: s.EventStart.AsMapInterface(), }, - UsageID: resID, + UsageID: s.ResourceID, Units: 1, } if err := sS.resS.Call(utils.ResourceSv1ReleaseResources, @@ -437,7 +427,7 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs } } } - sS.replicateSessions(s.CGRID, false) + sS.replicateSessions(s.CGRID, false, sS.sReplConns) return } @@ -451,7 +441,7 @@ func (sS *SessionS) debitSession(s *Session, sRunIdx int, dur time.Duration, return } sr := s.SRuns[sRunIdx] - rDur := sr.debitReserve(dur, lastUsage) // debit out of reserve, rDur is still to be debited + rDur := sr.debitReserve(dur, lastUsed) // debit out of reserve, rDur is still to be debited if rDur == time.Duration(0) { s.Unlock() return dur, nil // complete debit out of reserve @@ -487,7 +477,8 @@ func (sS *SessionS) debitSession(s *Session, sRunIdx int, dur time.Duration, sr.CD.MaxCostSoFar += cc.Cost sr.CD.LoopIndex += 1 sr.TotalUsage += sr.LastUsage - ec := engine.NewEventCostFromCallCost(cc, s.CGRID, sr.RunID) + ec := engine.NewEventCostFromCallCost(cc, s.CGRID, + sr.Event.GetStringIgnoreErrors(utils.RunID)) if sr.EventCost == nil { sr.EventCost = ec } else { @@ -510,28 +501,28 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int, return } - var sleepDur time.Duration for { - if maxDebit, err := sS.debitSession(dbtIvl, nil); err != nil { + var maxDebit time.Duration + if maxDebit, err = sS.debitSession(s, sRunIdx, dbtIvl, nil); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> could not complete debit operation on session: <%s>, error: <%s>", - utils.SessionS, self.CGRID, err.Error())) + utils.SessionS, s.CGRid(), err.Error())) dscReason := utils.ErrServerError.Error() if err.Error() == utils.ErrUnauthorizedDestination.Error() { dscReason = err.Error() } - if err := sS.disconnectSession(dscReason); err != nil { + if err = sS.disconnectSession(s, dscReason); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> could not disconnect session: %s, error: %s", - utils.SessionS, self.CGRID, err.Error())) + utils.SessionS, s.CGRid(), err.Error())) } return } else if maxDebit < dbtIvl { time.Sleep(maxDebit) - if err := self.disconnectSession(utils.ErrInsufficientCredit.Error()); err != nil { + if err := sS.disconnectSession(s, utils.ErrInsufficientCredit.Error()); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> could not disconnect session: %s, error: %s", - utils.SessionS, self.CGRID, err.Error())) + utils.SessionS, s.CGRid(), err.Error())) } return } @@ -588,7 +579,7 @@ func (sS *SessionS) refundSession(s *Session, sRunIdx int, rUsage time.Duration) Increments: incrmts, } var acnt engine.Account - if err = self.rals.Call(utils.ResponderRefundIncrements, cd, &acnt); err != nil { + if err = sS.ralS.Call(utils.ResponderRefundIncrements, cd, &acnt); err != nil { return } if acnt.ID != "" { // Account info updated, update also cached AccountSummary @@ -623,12 +614,12 @@ func (sS *SessionS) storeSCost(s *Session, sRunIdx int) (err error) { if err == utils.ErrExists { utils.Logger.Warning( fmt.Sprintf("<%s> refunding session: <%s> error: <%s>", - utils.SessionS, cgrID, err.Error())) + utils.SessionS, s.CGRID, err.Error())) if err = sS.refundSession(s, sRunIdx, sr.CD.GetDuration()); err != nil { // refund entire duration utils.Logger.Warning( fmt.Sprintf( "<%s> failed refunding session: <%s>, srIdx: <%d>, error: <%s>", - utils.SessionS, cgrID, i, err.Error())) + utils.SessionS, s.CGRID, sRunIdx, err.Error())) } } else { return err @@ -643,8 +634,9 @@ func (sS *SessionS) disconnectSession(s *Session, rsn string) (err error) { if clnt == nil { return fmt.Errorf("calling %s requires bidirectional JSON connection", utils.SessionSv1DisconnectSession) } - s.RLock() // for reading the TotalUsage - s.EventStart.Set(utils.Usage, self.TotalUsage) // Set the usage to total one debitted + s.RLock() // for reading the TotalUsage + s.EventStart.Set(utils.Usage, s.TotalUsage()) // Set the usage to total one debitted + sEv := s.EventStart.AsMapInterface() s.RUnlock() servMethod := utils.SessionSv1DisconnectSession if clnt.proto == 0 { // compatibility with OpenSIPS 2.3 @@ -652,7 +644,7 @@ func (sS *SessionS) disconnectSession(s *Session, rsn string) (err error) { } var rply string if err := clnt.conn.Call(servMethod, - utils.AttrDisconnectSession{EventStart: self.EventStart.AsMapInterface(), + utils.AttrDisconnectSession{EventStart: sEv, Reason: rsn}, &rply); err != nil { if err != utils.ErrNotImplemented { return err @@ -677,13 +669,14 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplC wg.Add(1) } go func(conn rpcclient.RpcClientConnection, sync bool, ss []*Session) { - for _, s := range ssMp { + for _, s := range ss { + sCln := s.Clone() var rply string if err := conn.Call(utils.SessionSv1SetPassiveSession, s.Clone(), &rply); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> cannot replicate session with id <%s>, err: %s", - utils.SessionS, s.CGRID, err.Error())) + utils.SessionS, sCln.CGRID, err.Error())) } } if sync { @@ -708,7 +701,7 @@ func (sS *SessionS) registerSession(s *Session, passive bool) { sMp[s.CGRID] = s sS.indexSession(s, passive) if !passive { - sS.setSessionTerminator(s) + sS.setSTerminator(s) } sMux.Unlock() } @@ -723,18 +716,17 @@ func (sS *SessionS) unregisterSession(cgrID string, passive bool) bool { sMp = sS.pSessions } sMux.Lock() - if _, found := sMp[cgrID]; !found { + s, found := sMp[cgrID] + if !found { sMux.Unlock() return false } delete(sMp, cgrID) sS.unindexSession(cgrID, passive) if !passive { - sS.sTsMux.RLock() - if st, found := sS.sessionTerminators[cgrID]; found { - st.endChan <- true - } - sS.sTsMux.RUnlock() + s.RLock() + close(s.sTerminator.endChan) + s.RUnlock() } sMux.Unlock() return true @@ -743,19 +735,16 @@ func (sS *SessionS) unregisterSession(cgrID string, passive bool) bool { // indexSession will index an active or passive Session based on configuration func (sS *SessionS) indexSession(s *Session, pSessions bool) { idxMux := &sS.aSIMux // pointer to original mux since will have no effect if we copy it - ssIndx := sS.aSessionsIndex - ssRIdx := sS.aSessionsRIndex - if passiveSessions { + ssIndx := sS.aSessionsIdx + ssRIdx := sS.aSessionsRIdx + if pSessions { idxMux = &sS.pSIMux - ssIndx = sS.pSessionsIndex - ssRIdx = sS.pSessionsRIndex + ssIndx = sS.pSessionsIdx + ssRIdx = sS.pSessionsRIdx } idxMux.Lock() defer idxMux.Unlock() - s.RLock() - defer s.RUnlock() - idxCfg := sS.cgrCfg.SessionSCfg().SessionIndexes - for fieldName := range idxCfg { + for fieldName := range sS.cgrCfg.SessionSCfg().SessionIndexes { fieldVal, err := s.EventStart.GetString(fieldName) if err != nil { if err == utils.ErrNotFound { @@ -769,19 +758,16 @@ func (sS *SessionS) indexSession(s *Session, pSessions bool) { fieldVal = utils.MetaEmpty } if _, hasFieldName := ssIndx[fieldName]; !hasFieldName { // Init it here - ssIndx[fieldName] = make(map[string]map[string]utils.StringMap) + ssIndx[fieldName] = make(map[string]utils.StringMap) } if _, hasFieldVal := ssIndx[fieldName][fieldVal]; !hasFieldVal { - ssIndx[fieldName][fieldVal] = make(map[string]utils.StringMap) + ssIndx[fieldName][fieldVal] = make(utils.StringMap) } - if _, hasFieldVal := ssIndx[fieldName][fieldVal][s.RunID]; !hasFieldVal { - ssIndx[fieldName][fieldVal][s.RunID] = make(utils.StringMap) - } - ssIndx[fieldName][fieldVal][s.RunID][s.CGRID] = true + ssIndx[fieldName][fieldVal][s.CGRID] = true if _, hasIt := ssRIdx[s.CGRID]; !hasIt { ssRIdx[s.CGRID] = make([]*riFieldNameVal, 0) } - ssRIdx[s.CGRID] = append(ssRIdx[s.CGRID], &riFieldNameVal{runID: s.RunID, fieldName: fieldName, fieldValue: fieldVal}) + ssRIdx[s.CGRID] = append(ssRIdx[s.CGRID], &riFieldNameVal{fieldName: fieldName, fieldValue: fieldVal}) } return } @@ -803,10 +789,7 @@ func (sS *SessionS) unindexSession(cgrID string, pSessions bool) bool { return false } for _, riFNV := range ssRIdx[cgrID] { - delete(ssIndx[riFNV.fieldName][riFNV.fieldValue][riFNV.runID], cgrID) - if len(ssIndx[riFNV.fieldName][riFNV.fieldValue][riFNV.runID]) == 0 { - delete(ssIndx[riFNV.fieldName][riFNV.fieldValue], riFNV.runID) - } + delete(ssIndx[riFNV.fieldName][riFNV.fieldValue], cgrID) if len(ssIndx[riFNV.fieldName][riFNV.fieldValue]) == 0 { delete(ssIndx[riFNV.fieldName], riFNV.fieldValue) } @@ -832,7 +815,7 @@ func (sS *SessionS) getSessionIDsForPrefix(prefix string, pSessions bool) (cgrID if strings.HasPrefix(originID, prefix) { if _, hasDefaultRun := ssIndx[utils.OriginID][originID][utils.META_DEFAULT]; hasDefaultRun { cgrIDs = append(cgrIDs, - ssIndx[utils.OriginID][originID][utils.META_DEFAULT].Slice()...) + ssIndx[utils.OriginID][originID].Slice()...) } } } @@ -854,34 +837,16 @@ func (sS *SessionS) getSessionIDsMatchingIndexes(fltrs map[string]string, defer idxMux.RUnlock() matchedIndexes := make(map[string]string) matchingSessions := make(utils.StringMap) - runID := fltrs[utils.RunID] checkNr := 0 - var findFunc func(cgrID, fltrName, fltrVal string) bool - if runID == "" { - findFunc = func(cgrID, fltrName, fltrVal string) bool { - for runID := range ssIndx[fltrName][fltrVal] { - for cgrmID := range ssIndx[fltrName][fltrVal][runID] { - if cgrID == cgrmID { - return true - } - } + findFunc := func(cgrID, fltrName, fltrVal string) bool { + for cgrmID := range ssIndx[fltrName][fltrVal] { + if cgrID == cgrmID { + return true } - return false - } - } else { // We know the RunID - findFunc = func(cgrID, fltrName, fltrVal string) bool { - for cgrmID := range ssIndx[fltrName][fltrVal][runID] { - if cgrID == cgrmID { - return true - } - } - return false } + return false } for fltrName, fltrVal := range fltrs { - if fltrName == utils.RunID { - continue - } if _, hasFldName := ssIndx[fltrName]; !hasFldName { continue } @@ -892,13 +857,7 @@ func (sS *SessionS) getSessionIDsMatchingIndexes(fltrs map[string]string, } matchedIndexes[fltrName] = fltrVal if checkNr == 1 { // First run will init the MatchingSessions - if runID == "" { - for runID := range ssIndx[fltrName][fltrVal] { - matchingSessions.Join(ssIndx[fltrName][fltrVal][runID]) - } - } else { // We know the RunID - matchingSessions = ssIndx[fltrName][fltrVal][runID] - } + matchingSessions = ssIndx[fltrName][fltrVal] continue } // Higher run, takes out non matching indexes @@ -919,7 +878,7 @@ func (sS *SessionS) asActiveSessions(fltrs map[string]string, count, psv bool) (aSs []*ActiveSession, counter int, err error) { aSs = make([]*ActiveSession, 0) // Make sure we return at least empty list and not nil // Check first based on indexes so we can downsize the list of matching sessions - matchingSessionIDs, checkedFilters := sS.getSessionIDsMatchingIndexes(fltrs, pSessions) + matchingSessionIDs, checkedFilters := sS.getSessionIDsMatchingIndexes(fltrs, psv) if len(matchingSessionIDs) == 0 && len(checkedFilters) != 0 { return } @@ -931,7 +890,7 @@ func (sS *SessionS) asActiveSessions(fltrs map[string]string, } var remainingSessions []*Session // Survived index matching ss := sS.getSessions(fltrs[utils.CGRID], psv) - for cgrID, s := range ss { + for _, s := range ss { remainingSessions = append(remainingSessions, s) } if len(fltrs) != 0 { // Still have some filters to match @@ -974,15 +933,15 @@ func (sS *SessionS) forkSession(s *Session) (err error) { cgrEv := &utils.CGREvent{ Tenant: s.Tenant, ID: utils.UUIDSha1Prefix(), - Event: evStart.AsMapInterface(), + Event: s.EventStart.AsMapInterface(), } var chrgrs []*engine.ChrgSProcessEventReply - if err := sS.chargerS.Call(utils.ChargerSv1ProcessEvent, + if err = sS.chargerS.Call(utils.ChargerSv1ProcessEvent, cgrEv, &chrgrs); err != nil { if err.Error() == utils.ErrNotFound.Error() { - return nil, utils.ErrNoActiveSession + return utils.ErrNoActiveSession } - return nil, err + return } s.SRuns = make([]*SRun, len(chrgrs)) for i, chrgr := range chrgrs { @@ -990,7 +949,7 @@ func (sS *SessionS) forkSession(s *Session) (err error) { startTime := me.GetTimeIgnoreErrors(utils.AnswerTime, sS.cgrCfg.GeneralCfg().DefaultTimezone) if startTime.IsZero() { // AnswerTime not parsable, try SetupTime - startTime = evStart.GetTimeIgnoreErrors(utils.SetupTime, + startTime = s.EventStart.GetTimeIgnoreErrors(utils.SetupTime, sS.cgrCfg.GeneralCfg().DefaultTimezone) } s.SRuns[i] = &SRun{ @@ -1006,7 +965,7 @@ func (sS *SessionS) forkSession(s *Session) (err error) { Account: me.GetStringIgnoreErrors(utils.Account), Destination: me.GetStringIgnoreErrors(utils.Destination), TimeStart: startTime, - TimeEnd: startTime.Add(evStart.GetDurationIgnoreErrors(utils.Usage)), + TimeEnd: startTime.Add(s.EventStart.GetDurationIgnoreErrors(utils.Usage)), ExtraFields: me.AsMapStringIgnoreErrors(utils.NewStringMap(utils.PrimaryCdrFields...)), }, } @@ -1041,8 +1000,8 @@ func (sS *SessionS) getSessions(cgrID string, pSessions bool) (ss []*Session) { // transitSState will transit the sessions from one state (active/passive) to another (passive/active) func (sS *SessionS) transitSState(cgrID string, psv bool) (ss []*Session) { ss = sS.getSessions(cgrID, !psv) - for _, s := range pSessions { - sS.unregisterSession(s, !psv) + for _, s := range ss { + sS.unregisterSession(cgrID, !psv) sS.registerSession(s, psv) // ToDo: activate prepaid debits } @@ -1052,7 +1011,7 @@ func (sS *SessionS) transitSState(cgrID string, psv bool) (ss []*Session) { // getActivateSessions returns the sessions from active list or moves from passive func (sS *SessionS) getActivateSessions(cgrID string) (ss []*Session) { ss = sS.getSessions(cgrID, false) - if len(aSessions) == 0 { + if len(ss) == 0 { ss = sS.transitSState(cgrID, false) } return @@ -1064,17 +1023,18 @@ func (sS *SessionS) relocateSessions(initOriginID, originID, originHost string) return } initCGRID := utils.Sha1(initOriginID, originHost) - ss = sS.getActivateSessions(initCGRID, false) - for i, s := range ss { + newCGRID := utils.Sha1(originID, originHost) + ss = sS.getActivateSessions(initCGRID) + for _, s := range ss { sS.unregisterSession(s.CGRID, false) s.Lock() - s.CGRID = cgrID + s.CGRID = newCGRID // Overwrite initial CGRID with new one - s.EventStart.Set(utils.CGRID, utils.Sha1(originID, originHost)) // Overwrite CGRID for final CDR - s.EventStart.Set(utils.OriginID, originID) // Overwrite OriginID for session indexing + s.EventStart.Set(utils.CGRID, newCGRID) // Overwrite CGRID for final CDR + s.EventStart.Set(utils.OriginID, originID) // Overwrite OriginID for session indexing s.Unlock() - sS.registerSession(s.CGRID, false) - sS.replicateSessions(initCGRID, false, sS.smgReplConns) + sS.registerSession(s, false) + sS.replicateSessions(initCGRID, false, sS.sReplConns) } return } @@ -1094,7 +1054,7 @@ func (sS *SessionS) getRelocateSessions(cgrID string, initOriginID, func (sS *SessionS) syncSessions() { queriedCGRIDs := engine.NewSafEvent(nil) // need this to be var err error - for _, clnt := range sS.biJClnts() { + for _, clnt := range sS.biJClients() { errChan := make(chan error) go func() { var queriedSessionIDs []*SessionID @@ -1103,7 +1063,7 @@ func (sS *SessionS) syncSessions() { errChan <- err } for _, sessionID := range queriedSessionIDs { - queriedCGRIDs.Set(sessionID.CGRID()) = struct{}{} + queriedCGRIDs.Set(sessionID.CGRID(), struct{}{}) } errChan <- nil }() @@ -1113,7 +1073,7 @@ func (sS *SessionS) syncSessions() { utils.Logger.Warning( fmt.Sprintf("<%s> error quering session ids : %+v", utils.SessionS, err)) } - case <-time.After(smg.cgrCfg.GeneralCfg().ReplyTimeout): + case <-time.After(sS.cgrCfg.GeneralCfg().ReplyTimeout): utils.Logger.Warning( fmt.Sprintf("<%s> timeout quering session ids ", utils.SessionS)) } @@ -1126,10 +1086,10 @@ func (sS *SessionS) syncSessions() { toBeRemoved = append(toBeRemoved, cgrid) } } - smg.aSsMux.RUnlock() + sS.aSsMux.RUnlock() for _, cgrID := range toBeRemoved { ss := sS.getSessions(cgrID, false) - if len(aSessions) == 0 { + if len(ss) == 0 { continue } if err := sS.forceSTerminate(ss[0], 0, nil); err != nil { @@ -1140,7 +1100,15 @@ func (sS *SessionS) syncSessions() { // authSession calculates maximum usage allowed for given session func (sS *SessionS) authSession(tnt string, evStart *engine.SafEvent) (maxUsage time.Duration, err error) { - s = &Session{ + cgrID := GetSetCGRID(evStart) + if _, err = evStart.GetDuration(utils.Usage); err != nil { + if err != utils.ErrNotFound { + return + } + evStart.Set(utils.Usage, sS.cgrCfg.SessionSCfg().MaxCallDuration) // will be used in CD + err = nil + } + s := &Session{ CGRID: cgrID, Tenant: tnt, EventStart: evStart, @@ -1148,14 +1116,7 @@ func (sS *SessionS) authSession(tnt string, evStart *engine.SafEvent) (maxUsage if err = sS.forkSession(s); err != nil { return } - var reqMaxUsage time.Duration - if reqMaxUsage, err = s.EventStart.GetStringIgnoreErrors(utils.Usage); err != nil { - if err != utils.ErrNotFound { - return - } - reqMaxUsage = sS.cgrCfg.SessionSCfg().MaxCallDuration - err = nil - } + var maxUsageSet bool // so we know if we have set the 0 on purpose prepaidReqs := []string{utils.META_PREPAID, utils.META_PSEUDOPREPAID} for _, sr := range s.SRuns { @@ -1198,29 +1159,29 @@ func (sS *SessionS) initSession(tnt string, evStart *engine.SafEvent, clntConnID go sS.debitLoopSession(s, i, dbtItval) } } - sS.registerASession(s) // make the session available to the rest of the system + sS.registerSession(s, false) // make the session available to the rest of the system return } // updateSession will reset terminator, perform debits and replicate sessions -func (sS *SessionS) updateSession(s *Session) (maxUsage time.Duration, err error) { - defer sS.replicateSessions(s.CGRID) +func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage time.Duration, err error) { + defer sS.replicateSessions(s.CGRID, false, sS.sReplConns) // update fields from new event - protectedFlds := []string{utils.CGRID, utils.OriginHost, utils.OriginID, utils.Usage} // declare here so we don't sort more than once - for k, v := range ev.AsMapInterface() { - if utils.IsSliceMember(protectedFlds, k) { // reserved field, don't overwrite + protectedFlds := engine.MapEvent{ + utils.CGRID: struct{}{}, + utils.OriginHost: struct{}{}, + utils.OriginID: struct{}{}, + utils.Usage: struct{}{}, + } + for k, v := range updtEv { + if protectedFlds.HasField(k) { continue } s.EventStart.Set(k, v) // update previoius field with new one } - sS.setSTerminator(s) - if evLastUsed, err = ev.GetDuration(utils.LastUsed); err == nil { - lastUsed = &evLastUsed - } else if err != utils.ErrNotFound { - return - } + sS.setSTerminator(s) // reset the terminator var reqMaxUsage time.Duration - if reqMaxUsage, err = ev.GetDuration(utils.Usage); err != nil { + if reqMaxUsage, err = updtEv.GetDuration(utils.Usage); err != nil { if err != utils.ErrNotFound { return } @@ -1230,12 +1191,12 @@ func (sS *SessionS) updateSession(s *Session) (maxUsage time.Duration, err error s.RLock() var maxUsageSet bool // so we know if we have set the 0 on purpose prepaidReqs := []string{utils.META_PREPAID, utils.META_PSEUDOPREPAID} - for _, sr := range s.SRuns { + for i, sr := range s.SRuns { var rplyMaxUsage time.Duration if !utils.IsSliceMember(prepaidReqs, sr.Event.GetStringIgnoreErrors(utils.RequestType)) { rplyMaxUsage = time.Duration(-1) - } else if rplyMaxUsage, err = s.debit(reqMaxUsage, + } else if rplyMaxUsage, err = sS.debitSession(s, i, reqMaxUsage, sr.Event.GetDurationPtrIgnoreErrors(utils.LastUsed)); err != nil { return } @@ -1253,28 +1214,28 @@ func (sS *SessionS) updateSession(s *Session) (maxUsage time.Duration, err error // endSession will end a session from outside func (sS *SessionS) endSession(s *Session, tUsage *time.Duration) (err error) { s.Lock() // no need to release it untill end since the session should be anyway closed - sS.unregisterSession(cgrID, false) - for i, sr := range s.SRuns { - sUsage := s.TotalUsage + sS.unregisterSession(s.CGRID, false) + for sRunIdx, sr := range s.SRuns { + sUsage := sr.TotalUsage if tUsage != nil { sUsage = *tUsage - s.TotalUsage = *tUsage + sr.TotalUsage = *tUsage } - if s.stopDebit != nil { - close(s.stopDebit) // Stop automatic debits - s.stopDebit = nil + if s.debitStop != nil { + close(s.debitStop) // Stop automatic debits + s.debitStop = nil } if sr.EventCost != nil { if notCharged := sUsage - sr.EventCost.GetUsage(); notCharged > 0 { // we did not charge enough, make a manual debit here if sr.CD.LoopIndex > 0 { sr.CD.TimeStart = sr.CD.TimeEnd } - sr.CD.TimeEnd = self.CD.TimeStart.Add(notCharged) + sr.CD.TimeEnd = sr.CD.TimeStart.Add(notCharged) sr.CD.DurationIndex += notCharged cc := new(engine.CallCost) - if err = sS.ralS.Call(utils.ResponderDebit, self.CD, cc); err == nil { + if err = sS.ralS.Call(utils.ResponderDebit, sr.CD, cc); err == nil { sr.EventCost.Merge( - engine.NewEventCostFromCallCost(cc, sr.CGRID, + engine.NewEventCostFromCallCost(cc, s.CGRID, sr.Event.GetStringIgnoreErrors(utils.RunID))) } } else if notCharged < 0 { // charged too much, try refund @@ -1282,14 +1243,14 @@ func (sS *SessionS) endSession(s *Session, tUsage *time.Duration) (err error) { utils.Logger.Warning( fmt.Sprintf( "<%s> failed refunding session: <%s>, srIdx: <%d>, error: <%s>", - utils.SessionS, cgrID, i, err.Error())) + utils.SessionS, s.CGRID, sRunIdx, err.Error())) } } } - if err := sS.storeSCost(s, i); err != nil { + if err := sS.storeSCost(s, sRunIdx); err != nil { utils.Logger.Warning( - fmt.Sprintf("<%s> failed storing session cost for <%s>, error: <%s>", - utils.SessionS, cgrID, s.RunID, err.Error())) + fmt.Sprintf("<%s> failed storing session cost for <%s>, srIdx: <%d>, error: <%s>", + utils.SessionS, s.CGRID, sRunIdx, err.Error())) } } s.Unlock() @@ -1300,18 +1261,18 @@ func (sS *SessionS) endSession(s *Session, tUsage *time.Duration) (err error) { func (sS *SessionS) chargeEvent(tnt string, ev *engine.SafEvent) (maxUsage time.Duration, err error) { cgrID := GetSetCGRID(ev) var s *Session - if s, err = sS.initSession(tnt, cgrID, ev, "", "", 0); err != nil { + if s, err = sS.initSession(tnt, ev, "", "", 0); err != nil { return } - if maxUsage, err = sS.updateSession(s); err != nil { - if errEnd := sS.endSession(cgrID, 0); errEnd != nil { + if maxUsage, err = sS.updateSession(s, ev.AsMapInterface()); err != nil { + if errEnd := sS.endSession(s, utils.DurationPointer(time.Duration(0))); errEnd != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error when force-ending charged event: <%s>, err: <%s>", utils.SessionS, cgrID, err.Error())) } return } - if errEnd := sS.endSession(cgrID, maxUsage); errEnd != nil { + if errEnd := sS.endSession(s, utils.DurationPointer(maxUsage)); errEnd != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error when ending charged event: <%s>, err: <%s>", utils.SessionS, cgrID, err.Error())) @@ -1320,7 +1281,6 @@ func (sS *SessionS) chargeEvent(tnt string, ev *engine.SafEvent) (maxUsage time. } func (sS *SessionS) processCDR(tnt string, ev *engine.SafEvent) (err error) { - cgrID := GetSetCGRID(ev) cgrEv := &utils.CGREvent{ Tenant: tnt, ID: utils.UUIDSha1Prefix(), @@ -1345,7 +1305,7 @@ func (sS *SessionS) CallBiRPC(clnt rpcclient.RpcClientConnection, return rpcclient.ErrUnsupporteServiceMethod } // get method BiRPCV1.Method - method := reflect.ValueOf(smg).MethodByName( + method := reflect.ValueOf(sS).MethodByName( "BiRPC" + parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version V1 in the method name and add prefix if !method.IsValid() { return rpcclient.ErrUnsupporteServiceMethod @@ -1443,10 +1403,10 @@ func (sS *SessionS) BiRPCv1GetPassiveSessionsCount(clnt rpcclient.RpcClientConne } // BiRPCv1SetPassiveSessions used for replicating Sessions -func (sS *SessionS) BiRPCv1SetPassiveSessions(clnt rpcclient.RpcClientConnection, +func (sS *SessionS) BiRPCv1SetPassiveSession(clnt rpcclient.RpcClientConnection, s *Session, reply *string) (err error) { if s.CGRID == "" { - return utils.NewErrMandatoryIeMissing([]string{utils.CGRID}) + return utils.NewErrMandatoryIeMissing(utils.CGRID) } if s.EventStart == nil { // remove instead of if removed := sS.unregisterSession(s.CGRID, true); !removed { @@ -1470,21 +1430,24 @@ type ArgsReplicateSessions struct { func (sS *SessionS) BiRPCv1ReplicateSessions(clnt rpcclient.RpcClientConnection, args ArgsReplicateSessions, reply *string) (err error) { cacheKey := "BiRPCv1ReplicateSessions" + args.CGRID - if item, err := sS.responseCache.Get(cacheKey); err == nil && item != nil { - return (item.Value.(*string)), item.Err + if item, err := sS.respCache.Get(cacheKey); err == nil && item != nil { + if item.Err == nil { + *reply = *item.Value.(*string) + } + return item.Err } - defer sS.responseCache.Cache(cacheKey, + defer sS.respCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: reply, Err: err}) sSConns := sS.sReplConns if len(args.Connections) != 0 { if sSConns, err = NewSReplConns(args.Connections, - smg.cgrCfg.GeneralCfg().Reconnects, - smg.cgrCfg.GeneralCfg().ConnectTimeout, - smg.cgrCfg.GeneralCfg().ReplyTimeout); err != nil { + sS.cgrCfg.GeneralCfg().Reconnects, + sS.cgrCfg.GeneralCfg().ConnectTimeout, + sS.cgrCfg.GeneralCfg().ReplyTimeout); err != nil { return utils.NewErrServerError(err) } } - if err = sS.replicateSessions(cgrID, args.Passive, sSConns); err != nil { + if err = sS.replicateSessions(args.CGRID, args.Passive, sSConns); err != nil { return utils.NewErrServerError(err) } *reply = utils.OK @@ -1576,7 +1539,10 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, } cacheKey := "BiRPCv1AuthorizeEventWithDigest" + args.CGREvent.ID if item, err := sS.respCache.Get(cacheKey); err == nil && item != nil { - return (item.Value.(*V1AuthorizeReplyWithDigest)), item.Err + if item.Err == nil { + *authReply = *item.Value.(*V1AuthorizeReply) + } + return item.Err } defer sS.respCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: authReply, Err: err}) @@ -1586,14 +1552,14 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, return utils.NewErrMandatoryIeMissing("subsystems") } if args.CGREvent.Tenant == "" { - args.CGREvent.Tenant = smg.cgrCfg.GeneralCfg().DefaultTenant + args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant } if args.GetAttributes { if sS.attrS == nil { return utils.NewErrNotConnected(utils.AttributeS) } if args.CGREvent.Context == nil { // populate if not already in - args.CGREvent.Context = utils.StringPointer(utils.MetaSessions) + args.CGREvent.Context = utils.StringPointer(utils.MetaSessionS) } attrArgs := &engine.AttrArgsProcessEvent{ CGREvent: args.CGREvent, @@ -1679,7 +1645,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.StatService) } var statReply []string - if err := smg.statS.Call(utils.StatSv1ProcessEvent, + if err := sS.statS.Call(utils.StatSv1ProcessEvent, &engine.StatsArgsProcessEvent{CGREvent: args.CGREvent}, &statReply); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -1805,7 +1771,10 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, } cacheKey := "BiRPCv1InitiateSession" + args.CGREvent.ID if item, err := sS.respCache.Get(cacheKey); err == nil && item != nil { - return (item.Value.(*V1InitSessionReply)), item.Err + if item.Err == nil { + *rply = *item.Value.(*V1InitSessionReply) + } + return item.Err } defer sS.respCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: rply, Err: err}) @@ -1814,7 +1783,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrMandatoryIeMissing("subsystems") } if args.CGREvent.Tenant == "" { - args.CGREvent.Tenant = smg.cgrCfg.GeneralCfg().DefaultTenant + args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant } originID, _ := args.CGREvent.FieldAsString(utils.OriginID) if args.GetAttributes { @@ -1822,7 +1791,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.AttributeS) } if args.CGREvent.Context == nil { // populate if not already in - args.CGREvent.Context = utils.StringPointer(utils.MetaSessions) + args.CGREvent.Context = utils.StringPointer(utils.MetaSessionS) } attrArgs := &engine.AttrArgsProcessEvent{ CGREvent: args.CGREvent, @@ -1869,7 +1838,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, if err != nil { return utils.NewErrRALs(err) } - if maxUsage, err := sS.updateSession(s); err != nil { + if maxUsage, err := sS.updateSession(s, nil); err != nil { return utils.NewErrRALs(err) } else { rply.MaxUsage = &maxUsage @@ -1893,7 +1862,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, rply.ThresholdIDs = &tIDs } if args.ProcessStats { - if smg.statS == nil { + if sS.statS == nil { return utils.NewErrNotConnected(utils.StatService) } var statReply []string @@ -2001,7 +1970,10 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, } cacheKey := "BiRPCv1UpdateSession" + args.CGREvent.ID if item, err := sS.respCache.Get(cacheKey); err == nil && item != nil { - return (item.Value.(*V1UpdateSessionReply)), item.Err + if item.Err == nil { + *rply = *item.Value.(*V1UpdateSessionReply) + } + return item.Err } defer sS.respCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: rply, Err: err}) @@ -2010,14 +1982,14 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrMandatoryIeMissing("subsystems") } if args.CGREvent.Tenant == "" { - args.CGREvent.Tenant = smg.cgrCfg.GeneralCfg().DefaultTenant + args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant } if args.GetAttributes { if sS.attrS == nil { return utils.NewErrNotConnected(utils.AttributeS) } if args.CGREvent.Context == nil { // populate if not already in - args.CGREvent.Context = utils.StringPointer(utils.MetaSessions) + args.CGREvent.Context = utils.StringPointer(utils.MetaSessionS) } attrArgs := &engine.AttrArgsProcessEvent{ CGREvent: args.CGREvent, @@ -2041,7 +2013,7 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, } ev := engine.NewSafEvent(args.CGREvent.Event) cgrID := GetSetCGRID(ev) - ss = sS.getRelocateSessions(cgrID, + ss := sS.getRelocateSessions(cgrID, me.GetStringIgnoreErrors(utils.InitialOriginID), me.GetStringIgnoreErrors(utils.OriginID), me.GetStringIgnoreErrors(utils.OriginHost)) @@ -2055,7 +2027,7 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, } else { s = ss[0] } - if maxUsage, err := sS.updateSession(s); err != nil { + if maxUsage, err := sS.updateSession(s, ev.AsMapInterface()); err != nil { return utils.NewErrRALs(err) } else { rply.MaxUsage = &maxUsage @@ -2090,7 +2062,10 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, } cacheKey := "BiRPCv1TerminateSession" + args.CGREvent.ID if item, err := sS.respCache.Get(cacheKey); err == nil && item != nil { - return (item.Value.(*V1TerminateSessionArgs)), item.Err + if item.Err == nil { + *rply = *item.Value.(*string) + } + return item.Err } defer sS.respCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: rply, Err: err}) @@ -2099,22 +2074,23 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrMandatoryIeMissing("subsystems") } if args.CGREvent.Tenant == "" { - args.CGREvent.Tenant = smg.cgrCfg.GeneralCfg().DefaultTenant + args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant } + ev := engine.NewSafEvent(args.CGREvent.Event) me := engine.NewMapEvent(args.CGREvent.Event) // used for easy access to fields within the event + cgrID := GetSetCGRID(ev) originID := me.GetStringIgnoreErrors(utils.OriginID) if args.TerminateSession { if originID == "" { return utils.NewErrMandatoryIeMissing(utils.OriginID) } - ev := engine.NewSafEvent(args.CGREvent.Event) - dbtItvl := smg.cgrCfg.SessionSCfg().DebitInterval + dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil { return utils.NewErrRALs(err) } } - ss = sS.getRelocateSessions(cgrID, + ss := sS.getRelocateSessions(cgrID, me.GetStringIgnoreErrors(utils.InitialOriginID), me.GetStringIgnoreErrors(utils.OriginID), me.GetStringIgnoreErrors(utils.OriginHost)) @@ -2185,18 +2161,20 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, // BiRPCv1ProcessCDR sends the CDR to CDRs func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, - cgrEv *utils.CGREvent, rply *string) error { - if args.CGREvent.ID == "" { - args.CGREvent.ID = utils.GenUUID() + cgrEv *utils.CGREvent, rply *string) (err error) { + if cgrEv.ID == "" { + cgrEv.ID = utils.GenUUID() } - cacheKey := "BiRPCv1ProcessCDR" + args.CGREvent.ID + cacheKey := "BiRPCv1ProcessCDR" + cgrEv.ID if item, err := sS.respCache.Get(cacheKey); err == nil && item != nil { - return (item.Value.(*string)), item.Err + if item.Err == nil { + *rply = *item.Value.(*string) + } + return item.Err } defer sS.respCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: rply, Err: err}) - GetSetCGRID(ev) return sS.cdrS.Call(utils.CdrsV2ProcessCDR, cgrEv, rply) } @@ -2263,13 +2241,16 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, } cacheKey := "BiRPCv1ProcessEvent" + args.CGREvent.ID if item, err := sS.respCache.Get(cacheKey); err == nil && item != nil { - return (item.Value.(*V1ProcessEventReply)), item.Err + if item.Err == nil { + *rply = *item.Value.(*V1ProcessEventReply) + } + return item.Err } defer sS.respCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: rply, Err: err}) if args.CGREvent.Tenant == "" { - args.CGREvent.Tenant = smg.cgrCfg.GeneralCfg().DefaultTenant + args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant } me := engine.NewMapEvent(args.CGREvent.Event) originID := me.GetStringIgnoreErrors(utils.OriginID) @@ -2279,7 +2260,7 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, return utils.NewErrNotConnected(utils.AttributeS) } if args.CGREvent.Context == nil { // populate if not already in - args.CGREvent.Context = utils.StringPointer(utils.MetaSessions) + args.CGREvent.Context = utils.StringPointer(utils.MetaSessionS) } attrArgs := &engine.AttrArgsProcessEvent{ CGREvent: args.CGREvent, @@ -2375,27 +2356,46 @@ func (sS *SessionS) BiRPCV1ForceDisconnect(clnt rpcclient.RpcClientConnection, return utils.ErrNotFound } for _, as := range aSs { - sS.forceSTerminate(aSs.CGRID, 0, nil) + ss := sS.getSessions(as.CGRID, false) + if len(ss) == 0 { + continue + } + if errTerm := sS.forceSTerminate(ss[0], 0, nil); errTerm != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> failed force-terminating session with id: <%s>, err: <%s>", + utils.SessionS, ss[0].CGRid(), errTerm.Error())) + err = utils.ErrPartiallyExecuted + } + } + if err != nil { + *reply = utils.OK } - *reply = utils.OK return nil } // BiRPCV1GetMaxUsage returns the maximum usage as seconds, compatible with OpenSIPS 2.3 // DEPRECATED, it will be removed in future versions func (sS *SessionS) BiRPCV1GetMaxUsage(clnt rpcclient.RpcClientConnection, - ev engine.MapEvent, maxUsage *float64) error { - maxUsageDur, err := smg.GetMaxUsage( - utils.FirstNonEmpty(ev.GetStringIgnoreErrors(utils.Tenant), - smg.cgrCfg.GeneralCfg().DefaultTenant), - engine.NewSafEvent(ev)) - if err != nil { - return utils.NewErrServerError(err) + ev engine.MapEvent, maxUsage *float64) (err error) { + var rply *V1AuthorizeReply + if err = sS.BiRPCv1AuthorizeEvent( + clnt, + &V1AuthorizeArgs{ + GetMaxUsage: true, + CGREvent: utils.CGREvent{ + Tenant: utils.FirstNonEmpty( + ev.GetStringIgnoreErrors(utils.Tenant), + sS.cgrCfg.GeneralCfg().DefaultTenant), + ID: utils.UUIDSha1Prefix(), + Event: ev}}, + rply); err != nil { + return } - if maxUsageDur == time.Duration(-1) { + if *rply.MaxUsage == time.Duration(-1) { *maxUsage = -1.0 } else { - *maxUsage = maxUsageDur.Seconds() + *maxUsage = rply.MaxUsage.Seconds() } return nil } @@ -2410,10 +2410,10 @@ func (sS *SessionS) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection, clnt, &V1InitSessionArgs{ InitSession: true, - CGREvent: &utils.CGREvent{ + CGREvent: utils.CGREvent{ Tenant: utils.FirstNonEmpty( ev.GetStringIgnoreErrors(utils.Tenant), - smg.cgrCfg.GeneralCfg().DefaultTenant), + sS.cgrCfg.GeneralCfg().DefaultTenant), ID: utils.UUIDSha1Prefix(), Event: ev}}, rply); err != nil { @@ -2422,7 +2422,7 @@ func (sS *SessionS) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection, if *rply.MaxUsage == time.Duration(-1) { *maxUsage = -1.0 } else { - *maxUsage = *rply.MaxUsage.Seconds() + *maxUsage = rply.MaxUsage.Seconds() } return } @@ -2435,12 +2435,12 @@ func (sS *SessionS) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, var rply *V1UpdateSessionReply if err = sS.BiRPCv1UpdateSession( clnt, - &V1InitSessionArgs{ + &V1UpdateSessionArgs{ UpdateSession: true, - CGREvent: &utils.CGREvent{ + CGREvent: utils.CGREvent{ Tenant: utils.FirstNonEmpty( ev.GetStringIgnoreErrors(utils.Tenant), - smg.cgrCfg.GeneralCfg().DefaultTenant), + sS.cgrCfg.GeneralCfg().DefaultTenant), ID: utils.UUIDSha1Prefix(), Event: ev}}, rply); err != nil { @@ -2449,7 +2449,7 @@ func (sS *SessionS) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, if *rply.MaxUsage == time.Duration(-1) { *maxUsage = -1.0 } else { - *maxUsage = *rply.MaxUsage.Seconds() + *maxUsage = rply.MaxUsage.Seconds() } return } @@ -2463,10 +2463,10 @@ func (sS *SessionS) BiRPCV1TerminateSession(clnt rpcclient.RpcClientConnection, clnt, &V1TerminateSessionArgs{ TerminateSession: true, - CGREvent: &utils.CGREvent{ + CGREvent: utils.CGREvent{ Tenant: utils.FirstNonEmpty( ev.GetStringIgnoreErrors(utils.Tenant), - smg.cgrCfg.GeneralCfg().DefaultTenant), + sS.cgrCfg.GeneralCfg().DefaultTenant), ID: utils.UUIDSha1Prefix(), Event: ev}}, rply) @@ -2477,12 +2477,12 @@ func (sS *SessionS) BiRPCV1TerminateSession(clnt rpcclient.RpcClientConnection, // Kept for compatibility with OpenSIPS 2.3 func (sS *SessionS) BiRPCV1ProcessCDR(clnt rpcclient.RpcClientConnection, ev engine.MapEvent, rply *string) (err error) { - return sS.ProcessCDR( + return sS.BiRPCv1ProcessCDR( clnt, &utils.CGREvent{ Tenant: utils.FirstNonEmpty( ev.GetStringIgnoreErrors(utils.Tenant), - smg.cgrCfg.GeneralCfg().DefaultTenant), + sS.cgrCfg.GeneralCfg().DefaultTenant), ID: utils.UUIDSha1Prefix(), Event: ev}, rply) diff --git a/utils/consts.go b/utils/consts.go index e27636867..5d1b3724d 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -771,7 +771,7 @@ const ( SessionSv1GetActiveSessions = "SessionSv1.GetActiveSessions" SessionSv1ForceDisconnect = "SessionSv1.ForceDisconnect" SessionSv1GetPassiveSessions = "SessionSv1.GetPassiveSessions" - SessionSv1SetPassiveSessions = "SessionSV1.SetPassiveSessions" + SessionSv1SetPassiveSession = "SessionSV1.SetPassiveSession" SMGenericV1InitiateSession = "SMGenericV1.InitiateSession" SMGenericV2InitiateSession = "SMGenericV2.InitiateSession" SMGenericV2UpdateSession = "SMGenericV2.UpdateSession"