diff --git a/engine/safevent.go b/engine/safevent.go index a9292d7db..678f26cae 100644 --- a/engine/safevent.go +++ b/engine/safevent.go @@ -110,6 +110,15 @@ func (se *SafEvent) GetDuration(fldName string) (d time.Duration, err error) { return } +// GetDurationPointer returns pointer towards duration, useful to detect presence of duration +func (se *SafEvent) GetDurationOrDefault(fldName string, dflt time.Duration) (d time.Duration, err error) { + _, has := se.Get(fldName) + if !has { + return dflt, nil + } + return se.GetDuration(fldName) +} + // GetDuration returns a field as Duration, ignoring errors func (se *SafEvent) GetDurationIgnoreErrors(fldName string) (d time.Duration) { d, _ = se.GetDuration(fldName) diff --git a/sessions/data_it_test.go b/sessions/data_it_test.go index ba7e1e8db..48cc27afc 100644 --- a/sessions/data_it_test.go +++ b/sessions/data_it_test.go @@ -638,7 +638,7 @@ func TestSMGDataMultipleDataNoUsage(t *testing.T) { if maxUsage != 1024 { t.Error("Bad max usage: ", maxUsage) } - eAcntVal = 100352.000000 // 1054720 + eAcntVal = 100352.000000 if err := smgRPC.Call("ApierV2.GetAccount", acntAttrs, &acnt); err != nil { t.Error(err) } else if dataVal := acnt.BalanceMap[utils.DATA].GetTotalValue(); dataVal != eAcntVal { @@ -675,7 +675,7 @@ func TestSMGDataMultipleDataNoUsage(t *testing.T) { if maxUsage != 0 { t.Error("Bad max usage: ", maxUsage) } - eAcntVal = 100352.000000 // 1054720 + eAcntVal = 100352.000000 if err := smgRPC.Call("ApierV2.GetAccount", acntAttrs, &acnt); err != nil { t.Error(err) } else if dataVal := acnt.BalanceMap[utils.DATA].GetTotalValue(); dataVal != eAcntVal { diff --git a/sessions/libsessions.go b/sessions/libsessions.go index 9d65597b9..4ba22c691 100644 --- a/sessions/libsessions.go +++ b/sessions/libsessions.go @@ -26,7 +26,8 @@ import ( "github.com/cgrates/cgrates/utils" ) -// getSessionTTL retrieves SessionTTL setting out of S +// getSessionTTL retrieves SessionTTL setting out of ev +// if SessionTTLMaxDelay is present in ev, the return is randomized func getSessionTTL(ev *engine.SafEvent, cfgSessionTTL time.Duration, cfgSessionTTLMaxDelay *time.Duration) (ttl time.Duration, err error) { if ttl, err = ev.GetDuration(utils.SessionTTL); err != nil { diff --git a/sessions/session.go b/sessions/session.go index 51ebad696..62774fabe 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -32,12 +32,12 @@ import ( // One session handled by SM type SMGSession struct { - mux sync.RWMutex // protects the SMGSession in places where is concurrently accessed - stopDebit chan struct{} // Channel to communicate with debit loops when closing the session - clntConn rpcclient.RpcClientConnection // Reference towards client connection on SMG side so we can disconnect. - rals rpcclient.RpcClientConnection // Connector to rals service - cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service - clientProto float64 + sync.RWMutex // protects the SMGSession in places where is concurrently accessed + stopDebit chan struct{} // Channel to communicate with debit loops when closing the session + clntConn rpcclient.RpcClientConnection // Reference towards client connection on SMG side so we can disconnect. + rals rpcclient.RpcClientConnection // Connector to rals service + cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service + clientProto float64 Tenant string // store original Tenant so we can use it in API calls CGRID string // Unique identifier for this session @@ -49,11 +49,10 @@ type SMGSession struct { CD *engine.CallDescriptor // initial CD used for debits, updated on each debit EventCost *engine.EventCost - ExtraDuration time.Duration // keeps the current duration debited on top of what heas been asked + ExtraDuration time.Duration // keeps the current duration debited on top of what has been asked LastUsage time.Duration // last requested Duration LastDebit time.Duration // last real debited duration TotalUsage time.Duration // sum of lastUsage - } // Clone returns the cloned version of SMGSession @@ -111,8 +110,8 @@ func (self *SMGSession) debitLoop(debitInterval time.Duration) { // Attempts to debit a duration, returns maximum duration which can be debitted or error func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time.Duration, error) { - self.mux.Lock() - defer self.mux.Unlock() + self.Lock() + defer self.Unlock() requestedDuration := dur if lastUsed != nil { self.ExtraDuration = self.LastDebit - *lastUsed @@ -128,9 +127,8 @@ func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time. } else { self.LastUsage = requestedDuration self.TotalUsage += self.LastUsage - ccDuration := self.ExtraDuration // fake ccDuration self.ExtraDuration -= dur - return ccDuration, nil + return requestedDuration, nil } initialExtraDuration := self.ExtraDuration self.ExtraDuration = 0 @@ -199,8 +197,8 @@ func (self *SMGSession) disconnectSession(reason string) error { // Session has ended, check debits and refund the extra charged duration func (self *SMGSession) close(usage time.Duration) (err error) { - self.mux.Lock() - defer self.mux.Unlock() + self.Lock() + defer self.Unlock() if self.EventCost == nil { return } @@ -273,8 +271,8 @@ func (self *SMGSession) storeSMCost() error { if self.EventCost == nil { return nil // There are no costs to save, ignore the operation } - self.mux.Lock() - self.mux.Unlock() + self.Lock() + self.Unlock() smCost := &engine.V2SMCost{ CGRID: self.CGRID, CostSource: utils.MetaSessionS, @@ -298,8 +296,8 @@ func (self *SMGSession) storeSMCost() error { } func (self *SMGSession) AsActiveSession(timezone string) *ActiveSession { - self.mux.RLock() - defer self.mux.RUnlock() + self.RLock() + defer self.RUnlock() aSession := &ActiveSession{ CGRID: self.CGRID, TOR: self.EventStart.GetStringIgnoreErrors(utils.ToR), diff --git a/sessions/sessions.go b/sessions/sessions.go index c914c24f5..a646d86a4 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -347,8 +347,8 @@ func (smg *SMGeneric) indexSession(s *SMGSession, passiveSessions bool) { } idxMux.Lock() defer idxMux.Unlock() - s.mux.RLock() - defer s.mux.RUnlock() + s.RLock() + defer s.RUnlock() for fieldName := range smg.ssIdxCfg { fieldVal, err := s.EventStart.GetString(fieldName) if err != nil { @@ -591,14 +591,10 @@ func (smg *SMGeneric) v2ForkSessions(tnt string, evStart *engine.SafEvent, } // sessionStart will handle a new session, pass the connectionId so we can communicate on disconnect request -func (smg *SMGeneric) sessionStart(tnt string, evStart *engine.SafEvent, +func (smg *SMGeneric) sessionStart(tnt, cgrID string, evStart *engine.SafEvent, clntConn rpcclient.RpcClientConnection, resourceID string, - debitInterval time.Duration) (err error) { - cgrID := evStart.GetStringIgnoreErrors(utils.CGRID) - _, err = guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level - if pSS := smg.passiveToActive(cgrID); len(pSS) != 0 { - return nil, nil // ToDo: handle here also debits - } + dbtItval time.Duration) (err error) { + guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level var ss []*SMGSession if smg.chargerS == nil { // old way of session forking ss, err = smg.v1ForkSessions(tnt, evStart, clntConn, cgrID, resourceID, false) @@ -612,9 +608,9 @@ func (smg *SMGeneric) sessionStart(tnt string, evStart *engine.SafEvent, for _, s := range ss { smg.recordASession(s) if s.RunID != utils.META_NONE && - debitInterval != 0 { + dbtItval != 0 { s.stopDebit = stopDebitChan - go s.debitLoop(debitInterval) + go s.debitLoop(dbtItval) } } return nil, nil @@ -622,9 +618,90 @@ func (smg *SMGeneric) sessionStart(tnt string, evStart *engine.SafEvent, return } +// sessionUpdate will reset terminator, perform debits and replicate sessions +func (smg *SMGeneric) sessionUpdate(tnt, cgrID string, ev *engine.SafEvent, + clnt rpcclient.RpcClientConnection, resourceID string, + dbtItval time.Duration) (maxUsage time.Duration, err error) { + guardian.Guardian.Guard(func() (iface interface{}, errGuard error) { // Lock it on CGRID level + // make sure the session exists, otherwise create + aSessions := smg.getSessions(cgrID, false) + if len(aSessions) == 0 { + if aSessions = smg.passiveToActive(cgrID); len(aSessions) == 0 { + if ev.HasField(utils.InitialOriginID) { + initialCGRID := utils.Sha1( + ev.GetStringIgnoreErrors(utils.InitialOriginID), + ev.GetStringIgnoreErrors(utils.OriginHost)) + err = smg.sessionRelocate(initialCGRID, + cgrID, ev.GetStringIgnoreErrors(utils.OriginID)) + if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update + err = smg.sessionStart(tnt, cgrID, ev, clnt, resourceID, dbtItval) + } + if err != nil { + return + } + smg.replicateSessionsWithID(initialCGRID, false, smg.smgReplConns) // report changes + aSessions = smg.getSessions(cgrID, false) // try again to populate after starting above + if len(aSessions) == 0 { + utils.Logger.Err( + fmt.Sprintf("<%s> no active sessions for event: <%s>", + utils.SessionS, cgrID)) + err = rpcclient.ErrSessionNotFound + return + } + } + } + } + defer smg.replicateSessionsWithID(cgrID, false, smg.smgReplConns) + + var sesTTL, evLastUsed time.Duration + if sesTTL, err = getSessionTTL(ev, smg.cgrCfg.SessionSCfg().SessionTTL, + smg.cgrCfg.SessionSCfg().SessionTTLMaxDelay); err != nil { + return + } + var ttlLastUsed, ttlUsage, lastUsed *time.Duration + if ttlLastUsed, err = ev.GetDurationPtrOrDefault(utils.SessionTTLLastUsed, + smg.cgrCfg.SessionSCfg().SessionTTLLastUsed); err != nil { + return + } + if ttlUsage, err = ev.GetDurationPtrOrDefault(utils.SessionTTLUsage, + smg.cgrCfg.SessionSCfg().SessionTTLUsage); err != nil { + return + } + smg.resetTerminatorTimer(cgrID, sesTTL, ttlLastUsed, ttlUsage) + if evLastUsed, err = ev.GetDuration(utils.LastUsed); err == nil { + lastUsed = &evLastUsed + } else if err != utils.ErrNotFound { + return + } + if maxUsage, err = ev.GetDuration(utils.Usage); err != nil { + if err != utils.ErrNotFound { + return + } + maxUsage = smg.cgrCfg.SessionSCfg().MaxCallDuration + err = nil + } + for _, s := range aSessions[cgrID] { + var maxDur time.Duration + var maxUsageSet bool + if s.RunID == utils.META_NONE { + maxDur = time.Duration(-1) + } else if maxDur, err = s.debit(maxUsage, lastUsed); err != nil { + return + } + if maxDur == time.Duration(-1) && !maxUsageSet { + maxUsage = maxDur + } else if maxDur < maxUsage { + maxUsage = maxDur + } + } + return + }, smg.cgrCfg.GeneralCfg().LockingTimeout, cgrID) + return +} + // sessionEnd will end a session from outside -func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error { - _, err := guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level +func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) (err error) { + guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level ss := smg.getSessions(cgrID, false) if len(ss) == 0 { if ss = smg.passiveToActive(cgrID); len(ss) == 0 { @@ -657,12 +734,12 @@ func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error { } return nil, nil }, smg.cgrCfg.GeneralCfg().LockingTimeout, cgrID) - return err + return } // sessionRelocate is used when an update will relocate an initial session (eg multiple data streams) -func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) error { - _, err := guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on initialID level +func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) (err error) { + guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on initialID level if utils.IsSliceMember([]string{initialID, cgrID, newOriginID}, "") { // Not allowed empty params here return nil, utils.ErrMandatoryIeMissing } @@ -680,11 +757,11 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro } } for i, s := range ss[initialID] { - s.mux.Lock() + s.Lock() s.CGRID = cgrID // Overwrite initial CGRID with new one s.EventStart.Set(utils.CGRID, cgrID) // Overwrite CGRID for final CDR s.EventStart.Set(utils.OriginID, newOriginID) // Overwrite OriginID for session indexing - s.mux.Unlock() + s.Unlock() smg.recordASession(s) if i == 0 { smg.unrecordASession(initialID) @@ -692,7 +769,7 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro } return nil, nil }, smg.cgrCfg.GeneralCfg().LockingTimeout, initialID) - return err + return } // replicateSessions will replicate session based on configuration @@ -710,7 +787,7 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool ssMux.RLock() ss := ssMp[cgrID] if len(ss) != 0 { - ss[0].mux.RLock() // lock session so we can clone it after releasing the map lock + ss[0].RLock() // lock session so we can clone it after releasing the map lock } ssMux.RUnlock() ssCln := make([]*SMGSession, len(ss)) @@ -718,7 +795,7 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool ssCln[i] = s.Clone() } if len(ss) != 0 { - ss[0].mux.RUnlock() + ss[0].RUnlock() } var wg sync.WaitGroup for _, rplConn := range smgReplConns { @@ -880,23 +957,23 @@ func (smg *SMGeneric) asActiveSessions(fltrs map[string]string, count, passiveSe // Methods to apply on sessions, mostly exported through RPC/Bi-RPC -// MaxUsage calculates maximum usage allowed for given gevent -func (smg *SMGeneric) GetMaxUsage(tnt string, gev *engine.SafEvent) (maxUsage time.Duration, err error) { - cgrID := GetSetCGRID(gev) +// MaxUsage calculates maximum usage allowed for given event +func (smg *SMGeneric) GetMaxUsage(tnt string, ev *engine.SafEvent) (maxUsage time.Duration, err error) { + cgrID := GetSetCGRID(ev) cacheKey := "MaxUsage" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { return (item.Value.(time.Duration)), item.Err } defer smg.responseCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: maxUsage, Err: err}) - if has := gev.HasField(utils.Usage); !has { // make sure we have a minimum duration configured - gev.Set(utils.Usage, smg.cgrCfg.SessionSCfg().MaxCallDuration) + if has := ev.HasField(utils.Usage); !has { // make sure we have a minimum duration configured + ev.Set(utils.Usage, smg.cgrCfg.SessionSCfg().MaxCallDuration) } // fork sessions var ss []*SMGSession if smg.chargerS == nil { // old way of session forking - ss, err = smg.v1ForkSessions(tnt, gev, nil, cgrID, "", true) + ss, err = smg.v1ForkSessions(tnt, ev, nil, cgrID, "", true) } else { - ss, err = smg.v2ForkSessions(tnt, gev, nil, cgrID, "", true) + ss, err = smg.v2ForkSessions(tnt, ev, nil, cgrID, "", true) } if err != nil { return @@ -924,9 +1001,10 @@ func (smg *SMGeneric) GetMaxUsage(tnt string, gev *engine.SafEvent) (maxUsage ti } // Called on session start -func (smg *SMGeneric) InitiateSession(tnt string, gev *engine.SafEvent, - clnt rpcclient.RpcClientConnection, resourceID string) (maxUsage time.Duration, err error) { - cgrID := GetSetCGRID(gev) +func (smg *SMGeneric) InitiateSession(tnt string, ev *engine.SafEvent, + clnt rpcclient.RpcClientConnection, resourceID string, + dbtItval time.Duration) (maxUsage time.Duration, err error) { + cgrID := GetSetCGRID(ev) cacheKey := "InitiateSession" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { return item.Value.(time.Duration), item.Err @@ -934,15 +1012,15 @@ func (smg *SMGeneric) InitiateSession(tnt string, gev *engine.SafEvent, defer smg.responseCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: maxUsage, Err: err}) // schedule response caching smg.deletePassiveSessions(cgrID) - if err = smg.sessionStart(tnt, gev, clnt, resourceID, smg.cgrCfg.SessionSCfg().DebitInterval); err != nil { + if err = smg.sessionStart(tnt, cgrID, ev, clnt, resourceID, dbtItval); err != nil { smg.sessionEnd(cgrID, 0) return } - if smg.cgrCfg.SessionSCfg().DebitInterval != 0 { // Session handled by debit loop + if dbtItval != 0 { // Session handled by debit loop maxUsage = time.Duration(-1) return } - maxUsage, err = smg.UpdateSession(tnt, gev, clnt, resourceID) + maxUsage, err = smg.sessionUpdate(tnt, cgrID, ev, clnt, resourceID, dbtItval) if err != nil || maxUsage == 0 { smg.sessionEnd(cgrID, 0) } @@ -950,105 +1028,41 @@ func (smg *SMGeneric) InitiateSession(tnt string, gev *engine.SafEvent, } // Execute debits for usage/maxUsage -func (smg *SMGeneric) UpdateSession(tnt string, gev *engine.SafEvent, - clnt rpcclient.RpcClientConnection, resourceID string) (maxUsage time.Duration, err error) { - cgrID := GetSetCGRID(gev) +func (smg *SMGeneric) UpdateSession(tnt string, ev *engine.SafEvent, + clnt rpcclient.RpcClientConnection, resourceID string, + dbtItval time.Duration) (maxUsage time.Duration, err error) { + cgrID := GetSetCGRID(ev) cacheKey := "UpdateSession" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { return item.Value.(time.Duration), item.Err } defer smg.responseCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: maxUsage, Err: err}) - if smg.cgrCfg.SessionSCfg().DebitInterval != 0 { // Not possible to update a session with debit loop active - err = errors.New("ACTIVE_DEBIT_LOOP") - return - } - if gev.HasField(utils.InitialOriginID) { - initialCGRID := utils.Sha1(gev.GetStringIgnoreErrors(utils.InitialOriginID), - gev.GetStringIgnoreErrors(utils.OriginHost)) - err = smg.sessionRelocate(initialCGRID, - cgrID, gev.GetStringIgnoreErrors(utils.OriginID)) - if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update - err = smg.sessionStart(tnt, gev, clnt, resourceID, smg.cgrCfg.SessionSCfg().DebitInterval) - } - if err != nil { - return - } - smg.replicateSessionsWithID(initialCGRID, false, smg.smgReplConns) - } - sesTTL, err := getSessionTTL(gev, smg.cgrCfg.SessionSCfg().SessionTTL, - smg.cgrCfg.SessionSCfg().SessionTTLMaxDelay) + maxUsage, err = smg.sessionUpdate(tnt, cgrID, ev, clnt, resourceID, dbtItval) if err != nil { - return maxUsage, err - } - ttlLastUsed, err := gev.GetDurationPtrOrDefault(utils.SessionTTLLastUsed, - smg.cgrCfg.SessionSCfg().SessionTTLLastUsed) - if err != nil { - return maxUsage, err - } - ttlUsage, err := gev.GetDurationPtrOrDefault(utils.SessionTTLUsage, - smg.cgrCfg.SessionSCfg().SessionTTLUsage) - if err != nil { - return maxUsage, err - } - smg.resetTerminatorTimer(cgrID, sesTTL, ttlLastUsed, ttlUsage) - var lastUsed *time.Duration - var evLastUsed time.Duration - if evLastUsed, err = gev.GetDuration(utils.LastUsed); err == nil { - lastUsed = &evLastUsed - } else if err != utils.ErrNotFound { - return - } - if maxUsage, err = gev.GetDuration(utils.Usage); err != nil { - if err != utils.ErrNotFound { - return - } - err = nil - maxUsage = smg.cgrCfg.SessionSCfg().MaxCallDuration - return - } - aSessions := smg.getSessions(cgrID, false) - if len(aSessions) == 0 { - if aSessions = smg.passiveToActive(cgrID); len(aSessions) == 0 { - utils.Logger.Err( - fmt.Sprintf("<%s> SessionUpdate with no active sessions for event: <%s>", - utils.SessionS, cgrID)) - err = rpcclient.ErrSessionNotFound - return - } - } - defer smg.replicateSessionsWithID(cgrID, false, smg.smgReplConns) - for _, s := range aSessions[cgrID] { - if s.RunID == utils.META_NONE { - maxUsage = time.Duration(-1) - continue - } - var maxDur time.Duration - if maxDur, err = s.debit(maxUsage, lastUsed); err != nil { - return - } else if maxDur < maxUsage { - maxUsage = maxDur - } + smg.sessionEnd(cgrID, 0) } return } // Called on session end, should stop debit loop -func (smg *SMGeneric) TerminateSession(tnt string, gev *engine.SafEvent, - clnt rpcclient.RpcClientConnection, resourceID string) (err error) { - cgrID := GetSetCGRID(gev) +func (smg *SMGeneric) TerminateSession(tnt string, ev *engine.SafEvent, + clnt rpcclient.RpcClientConnection, resourceID string, + dbtItvl time.Duration) (err error) { + cgrID := GetSetCGRID(ev) cacheKey := "TerminateSession" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { return item.Err } defer smg.responseCache.Cache(cacheKey, &utils.ResponseCacheItem{Err: err}) - if gev.HasField(utils.InitialOriginID) { - initialCGRID := utils.Sha1(gev.GetStringIgnoreErrors(utils.InitialOriginID), - gev.GetStringIgnoreErrors(utils.OriginHost)) + if ev.HasField(utils.InitialOriginID) { + initialCGRID := utils.Sha1( + ev.GetStringIgnoreErrors(utils.InitialOriginID), + ev.GetStringIgnoreErrors(utils.OriginHost)) err = smg.sessionRelocate(initialCGRID, cgrID, - gev.GetStringIgnoreErrors(utils.OriginID)) + ev.GetStringIgnoreErrors(utils.OriginID)) if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update - err = smg.sessionStart(tnt, gev, clnt, resourceID, smg.cgrCfg.SessionSCfg().DebitInterval) + err = smg.sessionStart(tnt, cgrID, ev, clnt, resourceID, dbtItvl) } if err != nil && err != utils.ErrMandatoryIeMissing { return @@ -1056,8 +1070,8 @@ func (smg *SMGeneric) TerminateSession(tnt string, gev *engine.SafEvent, smg.replicateSessionsWithID(initialCGRID, false, smg.smgReplConns) } sessionIDs := []string{cgrID} - if gev.HasField(utils.OriginIDPrefix) { // OriginIDPrefix is present, OriginID will not be anymore considered - sessionIDPrefix := gev.GetStringIgnoreErrors(utils.OriginIDPrefix) + if ev.HasField(utils.OriginIDPrefix) { // OriginIDPrefix is present, OriginID will not be anymore considered + sessionIDPrefix := ev.GetStringIgnoreErrors(utils.OriginIDPrefix) if sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix, false); len(sessionIDs) == 0 { sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix, true) for _, sessionID := range sessionIDs { // activate sessions for prefix @@ -1065,14 +1079,14 @@ func (smg *SMGeneric) TerminateSession(tnt string, gev *engine.SafEvent, } } } - usage, errUsage := gev.GetDuration(utils.Usage) + usage, errUsage := ev.GetDuration(utils.Usage) var lastUsed time.Duration if errUsage != nil { if errUsage != utils.ErrNotFound { err = errUsage return } - lastUsed, err = gev.GetDuration(utils.LastUsed) + lastUsed, err = ev.GetDuration(utils.LastUsed) if err != nil { if err == utils.ErrNotFound { err = utils.ErrMandatoryIeMissing @@ -1085,7 +1099,7 @@ func (smg *SMGeneric) TerminateSession(tnt string, gev *engine.SafEvent, aSessions := smg.getSessions(sessionID, false) if len(aSessions) == 0 { if aSessions = smg.passiveToActive(cgrID); len(aSessions) == 0 { - utils.Logger.Err(fmt.Sprintf("<%s> SessionTerminate with no active sessions for cgrID: <%s>", utils.SessionS, cgrID)) + utils.Logger.Err(fmt.Sprintf("<%s> terminate with no active sessions for cgrID: <%s>", utils.SessionS, cgrID)) continue } } @@ -1107,8 +1121,8 @@ func (smg *SMGeneric) TerminateSession(tnt string, gev *engine.SafEvent, } // Processes one time events (eg: SMS) -func (smg *SMGeneric) ChargeEvent(tnt string, gev *engine.SafEvent) (maxUsage time.Duration, err error) { - cgrID := GetSetCGRID(gev) +func (smg *SMGeneric) ChargeEvent(tnt string, ev *engine.SafEvent) (maxUsage time.Duration, err error) { + cgrID := GetSetCGRID(ev) cacheKey := "ChargeEvent" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { return item.Value.(time.Duration), item.Err @@ -1117,9 +1131,9 @@ func (smg *SMGeneric) ChargeEvent(tnt string, gev *engine.SafEvent) (maxUsage ti // fork sessions var ss []*SMGSession if smg.chargerS == nil { // old way of session forking - ss, err = smg.v1ForkSessions(tnt, gev, nil, cgrID, "", false) + ss, err = smg.v1ForkSessions(tnt, ev, nil, cgrID, "", false) } else { - ss, err = smg.v2ForkSessions(tnt, gev, nil, cgrID, "", false) + ss, err = smg.v2ForkSessions(tnt, ev, nil, cgrID, "", false) } if err != nil { return @@ -1167,8 +1181,8 @@ func (smg *SMGeneric) ChargeEvent(tnt string, gev *engine.SafEvent) (maxUsage ti return } -func (smg *SMGeneric) ProcessCDR(tnt string, gev *engine.SafEvent) (err error) { - cgrID := GetSetCGRID(gev) +func (smg *SMGeneric) ProcessCDR(tnt string, ev *engine.SafEvent) (err error) { + cgrID := GetSetCGRID(ev) cacheKey := "ProcessCDR" + cgrID if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil { return item.Err @@ -1177,7 +1191,7 @@ func (smg *SMGeneric) ProcessCDR(tnt string, gev *engine.SafEvent) (err error) { cgrEv := &utils.CGREvent{ Tenant: tnt, ID: utils.UUIDSha1Prefix(), - Event: gev.AsMapInterface(), + Event: ev.AsMapInterface(), } var reply string if err = smg.cdrsrv.Call(utils.CdrsV2ProcessCDR, cgrEv, &reply); err != nil { @@ -1287,7 +1301,8 @@ func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection, tnt := utils.FirstNonEmpty(ev.GetStringIgnoreErrors(utils.Tenant), smg.cgrCfg.GeneralCfg().DefaultTenant) if minMaxUsage, err = smg.InitiateSession(tnt, - engine.NewSafEvent(ev), clnt, ""); err != nil { + engine.NewSafEvent(ev), clnt, "", + smg.cgrCfg.SessionSCfg().DebitInterval); err != nil { if err != rpcclient.ErrSessionNotFound { err = utils.NewErrServerError(err) } @@ -1315,7 +1330,8 @@ func (smg *SMGeneric) BiRPCV2InitiateSession(clnt rpcclient.RpcClientConnection, if minMaxUsage, err = smg.InitiateSession( utils.FirstNonEmpty(ev.GetStringIgnoreErrors(utils.Tenant), smg.cgrCfg.GeneralCfg().DefaultTenant), - engine.NewSafEvent(ev), clnt, ""); err != nil { + engine.NewSafEvent(ev), clnt, "", + smg.cgrCfg.SessionSCfg().DebitInterval); err != nil { if err != rpcclient.ErrSessionNotFound { err = utils.NewErrServerError(err) } @@ -1333,7 +1349,8 @@ func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, if minMaxUsage, err = smg.UpdateSession( utils.FirstNonEmpty(ev.GetStringIgnoreErrors(utils.Tenant), smg.cgrCfg.GeneralCfg().DefaultTenant), - engine.NewSafEvent(ev), clnt, ""); err != nil { + engine.NewSafEvent(ev), clnt, "", + smg.cgrCfg.SessionSCfg().DebitInterval); err != nil { if err != rpcclient.ErrSessionNotFound { err = utils.NewErrServerError(err) } @@ -1354,7 +1371,8 @@ func (smg *SMGeneric) BiRPCV2UpdateSession(clnt rpcclient.RpcClientConnection, if minMaxUsage, err = smg.UpdateSession( utils.FirstNonEmpty(ev.GetStringIgnoreErrors(utils.Tenant), smg.cgrCfg.GeneralCfg().DefaultTenant), - engine.NewSafEvent(ev), clnt, ""); err != nil { + engine.NewSafEvent(ev), clnt, "", + smg.cgrCfg.SessionSCfg().DebitInterval); err != nil { if err != rpcclient.ErrSessionNotFound { err = utils.NewErrServerError(err) } @@ -1370,7 +1388,8 @@ func (smg *SMGeneric) BiRPCV1TerminateSession(clnt rpcclient.RpcClientConnection if err = smg.TerminateSession( utils.FirstNonEmpty(ev.GetStringIgnoreErrors(utils.Tenant), smg.cgrCfg.GeneralCfg().DefaultTenant), - engine.NewSafEvent(ev), clnt, ""); err != nil { + engine.NewSafEvent(ev), clnt, "", + smg.cgrCfg.SessionSCfg().DebitInterval); err != nil { if err != rpcclient.ErrSessionNotFound { err = utils.NewErrServerError(err) } @@ -1911,7 +1930,8 @@ func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, } if maxUsage, err := smg.InitiateSession( args.CGREvent.Tenant, - engine.NewSafEvent(args.CGREvent.Event), clnt, originID); err != nil { + engine.NewSafEvent(args.CGREvent.Event), clnt, originID, + smg.cgrCfg.SessionSCfg().DebitInterval); err != nil { return utils.NewErrRALs(err) } else { rply.MaxUsage = &maxUsage @@ -2070,7 +2090,8 @@ func (smg *SMGeneric) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrMandatoryIeMissing(utils.OriginID) } if maxUsage, err := smg.UpdateSession(args.CGREvent.Tenant, - engine.NewSafEvent(args.CGREvent.Event), clnt, originID); err != nil { + engine.NewSafEvent(args.CGREvent.Event), clnt, originID, + smg.cgrCfg.SessionSCfg().DebitInterval); err != nil { return utils.NewErrRALs(err) } else { rply.MaxUsage = &maxUsage @@ -2118,7 +2139,8 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection return utils.NewErrMandatoryIeMissing(utils.OriginID) } if err = smg.TerminateSession(args.CGREvent.Tenant, - engine.NewSafEvent(args.CGREvent.Event), clnt, originID); err != nil { + engine.NewSafEvent(args.CGREvent.Event), clnt, originID, + smg.cgrCfg.SessionSCfg().DebitInterval); err != nil { return utils.NewErrRALs(err) } } diff --git a/sessions/smgreplc_it_test.go b/sessions/smgreplc_it_test.go index 243278eab..9de54315c 100644 --- a/sessions/smgreplc_it_test.go +++ b/sessions/smgreplc_it_test.go @@ -111,14 +111,9 @@ func TestSMGRplcInitiate(t *testing.T) { utils.Usage: "1m30s", } var maxUsage time.Duration - if err := smgRplcMstrRPC.Call(utils.SMGenericV2UpdateSession, - smgEv, &maxUsage); err == nil && - err.Error() != rpcclient.ErrSessionNotFound.Error() { // Update should return rpcclient.ErrSessionNotFound - t.Error(err) - } var reply string if err := smgRplcMstrRPC.Call("SMGenericV1.TerminateSession", - smgEv, &reply); err == nil && + smgEv, &reply); err == nil || err.Error() != rpcclient.ErrSessionNotFound.Error() { // Update should return rpcclient.ErrSessionNotFound t.Error(err) } @@ -212,17 +207,23 @@ func TestSMGRplcTerminate(t *testing.T) { } time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated var aSessions []*ActiveSession - if err := smgRplcMstrRPC.Call("SMGenericV1.GetActiveSessions", map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRplcMstrRPC.Call("SMGenericV1.GetActiveSessions", + map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil || + err.Error() != utils.ErrNotFound.Error() { t.Error(err, aSessions) } - if err := smgRplcSlvRPC.Call("SMGenericV1.GetActiveSessions", map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRplcSlvRPC.Call("SMGenericV1.GetActiveSessions", + map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil || + err.Error() != utils.ErrNotFound.Error() { t.Error(err, aSessions) } var pSessions map[string][]*SMGSession - if err := smgRplcMstrRPC.Call("SMGenericV1.GetPassiveSessions", nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRplcMstrRPC.Call("SMGenericV1.GetPassiveSessions", + nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } - if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { + if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", + nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) } }