SessionS with forced init inside update in case of session was not previously initiated, fixes #1320

This commit is contained in:
DanB
2018-12-10 17:10:55 +01:00
parent b1a6bb119d
commit bd39ac940e
6 changed files with 196 additions and 165 deletions

View File

@@ -110,6 +110,15 @@ func (se *SafEvent) GetDuration(fldName string) (d time.Duration, err error) {
return
}
// GetDurationPointer returns pointer towards duration, useful to detect presence of duration
func (se *SafEvent) GetDurationOrDefault(fldName string, dflt time.Duration) (d time.Duration, err error) {
_, has := se.Get(fldName)
if !has {
return dflt, nil
}
return se.GetDuration(fldName)
}
// GetDuration returns a field as Duration, ignoring errors
func (se *SafEvent) GetDurationIgnoreErrors(fldName string) (d time.Duration) {
d, _ = se.GetDuration(fldName)

View File

@@ -638,7 +638,7 @@ func TestSMGDataMultipleDataNoUsage(t *testing.T) {
if maxUsage != 1024 {
t.Error("Bad max usage: ", maxUsage)
}
eAcntVal = 100352.000000 // 1054720
eAcntVal = 100352.000000
if err := smgRPC.Call("ApierV2.GetAccount", acntAttrs, &acnt); err != nil {
t.Error(err)
} else if dataVal := acnt.BalanceMap[utils.DATA].GetTotalValue(); dataVal != eAcntVal {
@@ -675,7 +675,7 @@ func TestSMGDataMultipleDataNoUsage(t *testing.T) {
if maxUsage != 0 {
t.Error("Bad max usage: ", maxUsage)
}
eAcntVal = 100352.000000 // 1054720
eAcntVal = 100352.000000
if err := smgRPC.Call("ApierV2.GetAccount", acntAttrs, &acnt); err != nil {
t.Error(err)
} else if dataVal := acnt.BalanceMap[utils.DATA].GetTotalValue(); dataVal != eAcntVal {

View File

@@ -26,7 +26,8 @@ import (
"github.com/cgrates/cgrates/utils"
)
// getSessionTTL retrieves SessionTTL setting out of S
// getSessionTTL retrieves SessionTTL setting out of ev
// if SessionTTLMaxDelay is present in ev, the return is randomized
func getSessionTTL(ev *engine.SafEvent, cfgSessionTTL time.Duration,
cfgSessionTTLMaxDelay *time.Duration) (ttl time.Duration, err error) {
if ttl, err = ev.GetDuration(utils.SessionTTL); err != nil {

View File

@@ -32,12 +32,12 @@ import (
// One session handled by SM
type SMGSession struct {
mux sync.RWMutex // protects the SMGSession in places where is concurrently accessed
stopDebit chan struct{} // Channel to communicate with debit loops when closing the session
clntConn rpcclient.RpcClientConnection // Reference towards client connection on SMG side so we can disconnect.
rals rpcclient.RpcClientConnection // Connector to rals service
cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service
clientProto float64
sync.RWMutex // protects the SMGSession in places where is concurrently accessed
stopDebit chan struct{} // Channel to communicate with debit loops when closing the session
clntConn rpcclient.RpcClientConnection // Reference towards client connection on SMG side so we can disconnect.
rals rpcclient.RpcClientConnection // Connector to rals service
cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service
clientProto float64
Tenant string // store original Tenant so we can use it in API calls
CGRID string // Unique identifier for this session
@@ -49,11 +49,10 @@ type SMGSession struct {
CD *engine.CallDescriptor // initial CD used for debits, updated on each debit
EventCost *engine.EventCost
ExtraDuration time.Duration // keeps the current duration debited on top of what heas been asked
ExtraDuration time.Duration // keeps the current duration debited on top of what has been asked
LastUsage time.Duration // last requested Duration
LastDebit time.Duration // last real debited duration
TotalUsage time.Duration // sum of lastUsage
}
// Clone returns the cloned version of SMGSession
@@ -111,8 +110,8 @@ func (self *SMGSession) debitLoop(debitInterval time.Duration) {
// Attempts to debit a duration, returns maximum duration which can be debitted or error
func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time.Duration, error) {
self.mux.Lock()
defer self.mux.Unlock()
self.Lock()
defer self.Unlock()
requestedDuration := dur
if lastUsed != nil {
self.ExtraDuration = self.LastDebit - *lastUsed
@@ -128,9 +127,8 @@ func (self *SMGSession) debit(dur time.Duration, lastUsed *time.Duration) (time.
} else {
self.LastUsage = requestedDuration
self.TotalUsage += self.LastUsage
ccDuration := self.ExtraDuration // fake ccDuration
self.ExtraDuration -= dur
return ccDuration, nil
return requestedDuration, nil
}
initialExtraDuration := self.ExtraDuration
self.ExtraDuration = 0
@@ -199,8 +197,8 @@ func (self *SMGSession) disconnectSession(reason string) error {
// Session has ended, check debits and refund the extra charged duration
func (self *SMGSession) close(usage time.Duration) (err error) {
self.mux.Lock()
defer self.mux.Unlock()
self.Lock()
defer self.Unlock()
if self.EventCost == nil {
return
}
@@ -273,8 +271,8 @@ func (self *SMGSession) storeSMCost() error {
if self.EventCost == nil {
return nil // There are no costs to save, ignore the operation
}
self.mux.Lock()
self.mux.Unlock()
self.Lock()
self.Unlock()
smCost := &engine.V2SMCost{
CGRID: self.CGRID,
CostSource: utils.MetaSessionS,
@@ -298,8 +296,8 @@ func (self *SMGSession) storeSMCost() error {
}
func (self *SMGSession) AsActiveSession(timezone string) *ActiveSession {
self.mux.RLock()
defer self.mux.RUnlock()
self.RLock()
defer self.RUnlock()
aSession := &ActiveSession{
CGRID: self.CGRID,
TOR: self.EventStart.GetStringIgnoreErrors(utils.ToR),

View File

@@ -347,8 +347,8 @@ func (smg *SMGeneric) indexSession(s *SMGSession, passiveSessions bool) {
}
idxMux.Lock()
defer idxMux.Unlock()
s.mux.RLock()
defer s.mux.RUnlock()
s.RLock()
defer s.RUnlock()
for fieldName := range smg.ssIdxCfg {
fieldVal, err := s.EventStart.GetString(fieldName)
if err != nil {
@@ -591,14 +591,10 @@ func (smg *SMGeneric) v2ForkSessions(tnt string, evStart *engine.SafEvent,
}
// sessionStart will handle a new session, pass the connectionId so we can communicate on disconnect request
func (smg *SMGeneric) sessionStart(tnt string, evStart *engine.SafEvent,
func (smg *SMGeneric) sessionStart(tnt, cgrID string, evStart *engine.SafEvent,
clntConn rpcclient.RpcClientConnection, resourceID string,
debitInterval time.Duration) (err error) {
cgrID := evStart.GetStringIgnoreErrors(utils.CGRID)
_, err = guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level
if pSS := smg.passiveToActive(cgrID); len(pSS) != 0 {
return nil, nil // ToDo: handle here also debits
}
dbtItval time.Duration) (err error) {
guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on CGRID level
var ss []*SMGSession
if smg.chargerS == nil { // old way of session forking
ss, err = smg.v1ForkSessions(tnt, evStart, clntConn, cgrID, resourceID, false)
@@ -612,9 +608,9 @@ func (smg *SMGeneric) sessionStart(tnt string, evStart *engine.SafEvent,
for _, s := range ss {
smg.recordASession(s)
if s.RunID != utils.META_NONE &&
debitInterval != 0 {
dbtItval != 0 {
s.stopDebit = stopDebitChan
go s.debitLoop(debitInterval)
go s.debitLoop(dbtItval)
}
}
return nil, nil
@@ -622,9 +618,90 @@ func (smg *SMGeneric) sessionStart(tnt string, evStart *engine.SafEvent,
return
}
// sessionUpdate will reset terminator, perform debits and replicate sessions
func (smg *SMGeneric) sessionUpdate(tnt, cgrID string, ev *engine.SafEvent,
clnt rpcclient.RpcClientConnection, resourceID string,
dbtItval time.Duration) (maxUsage time.Duration, err error) {
guardian.Guardian.Guard(func() (iface interface{}, errGuard error) { // Lock it on CGRID level
// make sure the session exists, otherwise create
aSessions := smg.getSessions(cgrID, false)
if len(aSessions) == 0 {
if aSessions = smg.passiveToActive(cgrID); len(aSessions) == 0 {
if ev.HasField(utils.InitialOriginID) {
initialCGRID := utils.Sha1(
ev.GetStringIgnoreErrors(utils.InitialOriginID),
ev.GetStringIgnoreErrors(utils.OriginHost))
err = smg.sessionRelocate(initialCGRID,
cgrID, ev.GetStringIgnoreErrors(utils.OriginID))
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(tnt, cgrID, ev, clnt, resourceID, dbtItval)
}
if err != nil {
return
}
smg.replicateSessionsWithID(initialCGRID, false, smg.smgReplConns) // report changes
aSessions = smg.getSessions(cgrID, false) // try again to populate after starting above
if len(aSessions) == 0 {
utils.Logger.Err(
fmt.Sprintf("<%s> no active sessions for event: <%s>",
utils.SessionS, cgrID))
err = rpcclient.ErrSessionNotFound
return
}
}
}
}
defer smg.replicateSessionsWithID(cgrID, false, smg.smgReplConns)
var sesTTL, evLastUsed time.Duration
if sesTTL, err = getSessionTTL(ev, smg.cgrCfg.SessionSCfg().SessionTTL,
smg.cgrCfg.SessionSCfg().SessionTTLMaxDelay); err != nil {
return
}
var ttlLastUsed, ttlUsage, lastUsed *time.Duration
if ttlLastUsed, err = ev.GetDurationPtrOrDefault(utils.SessionTTLLastUsed,
smg.cgrCfg.SessionSCfg().SessionTTLLastUsed); err != nil {
return
}
if ttlUsage, err = ev.GetDurationPtrOrDefault(utils.SessionTTLUsage,
smg.cgrCfg.SessionSCfg().SessionTTLUsage); err != nil {
return
}
smg.resetTerminatorTimer(cgrID, sesTTL, ttlLastUsed, ttlUsage)
if evLastUsed, err = ev.GetDuration(utils.LastUsed); err == nil {
lastUsed = &evLastUsed
} else if err != utils.ErrNotFound {
return
}
if maxUsage, err = ev.GetDuration(utils.Usage); err != nil {
if err != utils.ErrNotFound {
return
}
maxUsage = smg.cgrCfg.SessionSCfg().MaxCallDuration
err = nil
}
for _, s := range aSessions[cgrID] {
var maxDur time.Duration
var maxUsageSet bool
if s.RunID == utils.META_NONE {
maxDur = time.Duration(-1)
} else if maxDur, err = s.debit(maxUsage, lastUsed); err != nil {
return
}
if maxDur == time.Duration(-1) && !maxUsageSet {
maxUsage = maxDur
} else if maxDur < maxUsage {
maxUsage = maxDur
}
}
return
}, smg.cgrCfg.GeneralCfg().LockingTimeout, cgrID)
return
}
// sessionEnd will end a session from outside
func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error {
_, err := guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level
func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) (err error) {
guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on UUID level
ss := smg.getSessions(cgrID, false)
if len(ss) == 0 {
if ss = smg.passiveToActive(cgrID); len(ss) == 0 {
@@ -657,12 +734,12 @@ func (smg *SMGeneric) sessionEnd(cgrID string, usage time.Duration) error {
}
return nil, nil
}, smg.cgrCfg.GeneralCfg().LockingTimeout, cgrID)
return err
return
}
// sessionRelocate is used when an update will relocate an initial session (eg multiple data streams)
func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) error {
_, err := guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on initialID level
func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) (err error) {
guardian.Guardian.Guard(func() (interface{}, error) { // Lock it on initialID level
if utils.IsSliceMember([]string{initialID, cgrID, newOriginID}, "") { // Not allowed empty params here
return nil, utils.ErrMandatoryIeMissing
}
@@ -680,11 +757,11 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro
}
}
for i, s := range ss[initialID] {
s.mux.Lock()
s.Lock()
s.CGRID = cgrID // Overwrite initial CGRID with new one
s.EventStart.Set(utils.CGRID, cgrID) // Overwrite CGRID for final CDR
s.EventStart.Set(utils.OriginID, newOriginID) // Overwrite OriginID for session indexing
s.mux.Unlock()
s.Unlock()
smg.recordASession(s)
if i == 0 {
smg.unrecordASession(initialID)
@@ -692,7 +769,7 @@ func (smg *SMGeneric) sessionRelocate(initialID, cgrID, newOriginID string) erro
}
return nil, nil
}, smg.cgrCfg.GeneralCfg().LockingTimeout, initialID)
return err
return
}
// replicateSessions will replicate session based on configuration
@@ -710,7 +787,7 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool
ssMux.RLock()
ss := ssMp[cgrID]
if len(ss) != 0 {
ss[0].mux.RLock() // lock session so we can clone it after releasing the map lock
ss[0].RLock() // lock session so we can clone it after releasing the map lock
}
ssMux.RUnlock()
ssCln := make([]*SMGSession, len(ss))
@@ -718,7 +795,7 @@ func (smg *SMGeneric) replicateSessionsWithID(cgrID string, passiveSessions bool
ssCln[i] = s.Clone()
}
if len(ss) != 0 {
ss[0].mux.RUnlock()
ss[0].RUnlock()
}
var wg sync.WaitGroup
for _, rplConn := range smgReplConns {
@@ -880,23 +957,23 @@ func (smg *SMGeneric) asActiveSessions(fltrs map[string]string, count, passiveSe
// Methods to apply on sessions, mostly exported through RPC/Bi-RPC
// MaxUsage calculates maximum usage allowed for given gevent
func (smg *SMGeneric) GetMaxUsage(tnt string, gev *engine.SafEvent) (maxUsage time.Duration, err error) {
cgrID := GetSetCGRID(gev)
// MaxUsage calculates maximum usage allowed for given event
func (smg *SMGeneric) GetMaxUsage(tnt string, ev *engine.SafEvent) (maxUsage time.Duration, err error) {
cgrID := GetSetCGRID(ev)
cacheKey := "MaxUsage" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return (item.Value.(time.Duration)), item.Err
}
defer smg.responseCache.Cache(cacheKey, &utils.ResponseCacheItem{Value: maxUsage, Err: err})
if has := gev.HasField(utils.Usage); !has { // make sure we have a minimum duration configured
gev.Set(utils.Usage, smg.cgrCfg.SessionSCfg().MaxCallDuration)
if has := ev.HasField(utils.Usage); !has { // make sure we have a minimum duration configured
ev.Set(utils.Usage, smg.cgrCfg.SessionSCfg().MaxCallDuration)
}
// fork sessions
var ss []*SMGSession
if smg.chargerS == nil { // old way of session forking
ss, err = smg.v1ForkSessions(tnt, gev, nil, cgrID, "", true)
ss, err = smg.v1ForkSessions(tnt, ev, nil, cgrID, "", true)
} else {
ss, err = smg.v2ForkSessions(tnt, gev, nil, cgrID, "", true)
ss, err = smg.v2ForkSessions(tnt, ev, nil, cgrID, "", true)
}
if err != nil {
return
@@ -924,9 +1001,10 @@ func (smg *SMGeneric) GetMaxUsage(tnt string, gev *engine.SafEvent) (maxUsage ti
}
// Called on session start
func (smg *SMGeneric) InitiateSession(tnt string, gev *engine.SafEvent,
clnt rpcclient.RpcClientConnection, resourceID string) (maxUsage time.Duration, err error) {
cgrID := GetSetCGRID(gev)
func (smg *SMGeneric) InitiateSession(tnt string, ev *engine.SafEvent,
clnt rpcclient.RpcClientConnection, resourceID string,
dbtItval time.Duration) (maxUsage time.Duration, err error) {
cgrID := GetSetCGRID(ev)
cacheKey := "InitiateSession" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Value.(time.Duration), item.Err
@@ -934,15 +1012,15 @@ func (smg *SMGeneric) InitiateSession(tnt string, gev *engine.SafEvent,
defer smg.responseCache.Cache(cacheKey,
&utils.ResponseCacheItem{Value: maxUsage, Err: err}) // schedule response caching
smg.deletePassiveSessions(cgrID)
if err = smg.sessionStart(tnt, gev, clnt, resourceID, smg.cgrCfg.SessionSCfg().DebitInterval); err != nil {
if err = smg.sessionStart(tnt, cgrID, ev, clnt, resourceID, dbtItval); err != nil {
smg.sessionEnd(cgrID, 0)
return
}
if smg.cgrCfg.SessionSCfg().DebitInterval != 0 { // Session handled by debit loop
if dbtItval != 0 { // Session handled by debit loop
maxUsage = time.Duration(-1)
return
}
maxUsage, err = smg.UpdateSession(tnt, gev, clnt, resourceID)
maxUsage, err = smg.sessionUpdate(tnt, cgrID, ev, clnt, resourceID, dbtItval)
if err != nil || maxUsage == 0 {
smg.sessionEnd(cgrID, 0)
}
@@ -950,105 +1028,41 @@ func (smg *SMGeneric) InitiateSession(tnt string, gev *engine.SafEvent,
}
// Execute debits for usage/maxUsage
func (smg *SMGeneric) UpdateSession(tnt string, gev *engine.SafEvent,
clnt rpcclient.RpcClientConnection, resourceID string) (maxUsage time.Duration, err error) {
cgrID := GetSetCGRID(gev)
func (smg *SMGeneric) UpdateSession(tnt string, ev *engine.SafEvent,
clnt rpcclient.RpcClientConnection, resourceID string,
dbtItval time.Duration) (maxUsage time.Duration, err error) {
cgrID := GetSetCGRID(ev)
cacheKey := "UpdateSession" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Value.(time.Duration), item.Err
}
defer smg.responseCache.Cache(cacheKey,
&utils.ResponseCacheItem{Value: maxUsage, Err: err})
if smg.cgrCfg.SessionSCfg().DebitInterval != 0 { // Not possible to update a session with debit loop active
err = errors.New("ACTIVE_DEBIT_LOOP")
return
}
if gev.HasField(utils.InitialOriginID) {
initialCGRID := utils.Sha1(gev.GetStringIgnoreErrors(utils.InitialOriginID),
gev.GetStringIgnoreErrors(utils.OriginHost))
err = smg.sessionRelocate(initialCGRID,
cgrID, gev.GetStringIgnoreErrors(utils.OriginID))
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(tnt, gev, clnt, resourceID, smg.cgrCfg.SessionSCfg().DebitInterval)
}
if err != nil {
return
}
smg.replicateSessionsWithID(initialCGRID, false, smg.smgReplConns)
}
sesTTL, err := getSessionTTL(gev, smg.cgrCfg.SessionSCfg().SessionTTL,
smg.cgrCfg.SessionSCfg().SessionTTLMaxDelay)
maxUsage, err = smg.sessionUpdate(tnt, cgrID, ev, clnt, resourceID, dbtItval)
if err != nil {
return maxUsage, err
}
ttlLastUsed, err := gev.GetDurationPtrOrDefault(utils.SessionTTLLastUsed,
smg.cgrCfg.SessionSCfg().SessionTTLLastUsed)
if err != nil {
return maxUsage, err
}
ttlUsage, err := gev.GetDurationPtrOrDefault(utils.SessionTTLUsage,
smg.cgrCfg.SessionSCfg().SessionTTLUsage)
if err != nil {
return maxUsage, err
}
smg.resetTerminatorTimer(cgrID, sesTTL, ttlLastUsed, ttlUsage)
var lastUsed *time.Duration
var evLastUsed time.Duration
if evLastUsed, err = gev.GetDuration(utils.LastUsed); err == nil {
lastUsed = &evLastUsed
} else if err != utils.ErrNotFound {
return
}
if maxUsage, err = gev.GetDuration(utils.Usage); err != nil {
if err != utils.ErrNotFound {
return
}
err = nil
maxUsage = smg.cgrCfg.SessionSCfg().MaxCallDuration
return
}
aSessions := smg.getSessions(cgrID, false)
if len(aSessions) == 0 {
if aSessions = smg.passiveToActive(cgrID); len(aSessions) == 0 {
utils.Logger.Err(
fmt.Sprintf("<%s> SessionUpdate with no active sessions for event: <%s>",
utils.SessionS, cgrID))
err = rpcclient.ErrSessionNotFound
return
}
}
defer smg.replicateSessionsWithID(cgrID, false, smg.smgReplConns)
for _, s := range aSessions[cgrID] {
if s.RunID == utils.META_NONE {
maxUsage = time.Duration(-1)
continue
}
var maxDur time.Duration
if maxDur, err = s.debit(maxUsage, lastUsed); err != nil {
return
} else if maxDur < maxUsage {
maxUsage = maxDur
}
smg.sessionEnd(cgrID, 0)
}
return
}
// Called on session end, should stop debit loop
func (smg *SMGeneric) TerminateSession(tnt string, gev *engine.SafEvent,
clnt rpcclient.RpcClientConnection, resourceID string) (err error) {
cgrID := GetSetCGRID(gev)
func (smg *SMGeneric) TerminateSession(tnt string, ev *engine.SafEvent,
clnt rpcclient.RpcClientConnection, resourceID string,
dbtItvl time.Duration) (err error) {
cgrID := GetSetCGRID(ev)
cacheKey := "TerminateSession" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Err
}
defer smg.responseCache.Cache(cacheKey, &utils.ResponseCacheItem{Err: err})
if gev.HasField(utils.InitialOriginID) {
initialCGRID := utils.Sha1(gev.GetStringIgnoreErrors(utils.InitialOriginID),
gev.GetStringIgnoreErrors(utils.OriginHost))
if ev.HasField(utils.InitialOriginID) {
initialCGRID := utils.Sha1(
ev.GetStringIgnoreErrors(utils.InitialOriginID),
ev.GetStringIgnoreErrors(utils.OriginHost))
err = smg.sessionRelocate(initialCGRID, cgrID,
gev.GetStringIgnoreErrors(utils.OriginID))
ev.GetStringIgnoreErrors(utils.OriginID))
if err == utils.ErrNotFound { // Session was already relocated, create a new session with this update
err = smg.sessionStart(tnt, gev, clnt, resourceID, smg.cgrCfg.SessionSCfg().DebitInterval)
err = smg.sessionStart(tnt, cgrID, ev, clnt, resourceID, dbtItvl)
}
if err != nil && err != utils.ErrMandatoryIeMissing {
return
@@ -1056,8 +1070,8 @@ func (smg *SMGeneric) TerminateSession(tnt string, gev *engine.SafEvent,
smg.replicateSessionsWithID(initialCGRID, false, smg.smgReplConns)
}
sessionIDs := []string{cgrID}
if gev.HasField(utils.OriginIDPrefix) { // OriginIDPrefix is present, OriginID will not be anymore considered
sessionIDPrefix := gev.GetStringIgnoreErrors(utils.OriginIDPrefix)
if ev.HasField(utils.OriginIDPrefix) { // OriginIDPrefix is present, OriginID will not be anymore considered
sessionIDPrefix := ev.GetStringIgnoreErrors(utils.OriginIDPrefix)
if sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix, false); len(sessionIDs) == 0 {
sessionIDs = smg.getSessionIDsForPrefix(sessionIDPrefix, true)
for _, sessionID := range sessionIDs { // activate sessions for prefix
@@ -1065,14 +1079,14 @@ func (smg *SMGeneric) TerminateSession(tnt string, gev *engine.SafEvent,
}
}
}
usage, errUsage := gev.GetDuration(utils.Usage)
usage, errUsage := ev.GetDuration(utils.Usage)
var lastUsed time.Duration
if errUsage != nil {
if errUsage != utils.ErrNotFound {
err = errUsage
return
}
lastUsed, err = gev.GetDuration(utils.LastUsed)
lastUsed, err = ev.GetDuration(utils.LastUsed)
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrMandatoryIeMissing
@@ -1085,7 +1099,7 @@ func (smg *SMGeneric) TerminateSession(tnt string, gev *engine.SafEvent,
aSessions := smg.getSessions(sessionID, false)
if len(aSessions) == 0 {
if aSessions = smg.passiveToActive(cgrID); len(aSessions) == 0 {
utils.Logger.Err(fmt.Sprintf("<%s> SessionTerminate with no active sessions for cgrID: <%s>", utils.SessionS, cgrID))
utils.Logger.Err(fmt.Sprintf("<%s> terminate with no active sessions for cgrID: <%s>", utils.SessionS, cgrID))
continue
}
}
@@ -1107,8 +1121,8 @@ func (smg *SMGeneric) TerminateSession(tnt string, gev *engine.SafEvent,
}
// Processes one time events (eg: SMS)
func (smg *SMGeneric) ChargeEvent(tnt string, gev *engine.SafEvent) (maxUsage time.Duration, err error) {
cgrID := GetSetCGRID(gev)
func (smg *SMGeneric) ChargeEvent(tnt string, ev *engine.SafEvent) (maxUsage time.Duration, err error) {
cgrID := GetSetCGRID(ev)
cacheKey := "ChargeEvent" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Value.(time.Duration), item.Err
@@ -1117,9 +1131,9 @@ func (smg *SMGeneric) ChargeEvent(tnt string, gev *engine.SafEvent) (maxUsage ti
// fork sessions
var ss []*SMGSession
if smg.chargerS == nil { // old way of session forking
ss, err = smg.v1ForkSessions(tnt, gev, nil, cgrID, "", false)
ss, err = smg.v1ForkSessions(tnt, ev, nil, cgrID, "", false)
} else {
ss, err = smg.v2ForkSessions(tnt, gev, nil, cgrID, "", false)
ss, err = smg.v2ForkSessions(tnt, ev, nil, cgrID, "", false)
}
if err != nil {
return
@@ -1167,8 +1181,8 @@ func (smg *SMGeneric) ChargeEvent(tnt string, gev *engine.SafEvent) (maxUsage ti
return
}
func (smg *SMGeneric) ProcessCDR(tnt string, gev *engine.SafEvent) (err error) {
cgrID := GetSetCGRID(gev)
func (smg *SMGeneric) ProcessCDR(tnt string, ev *engine.SafEvent) (err error) {
cgrID := GetSetCGRID(ev)
cacheKey := "ProcessCDR" + cgrID
if item, err := smg.responseCache.Get(cacheKey); err == nil && item != nil {
return item.Err
@@ -1177,7 +1191,7 @@ func (smg *SMGeneric) ProcessCDR(tnt string, gev *engine.SafEvent) (err error) {
cgrEv := &utils.CGREvent{
Tenant: tnt,
ID: utils.UUIDSha1Prefix(),
Event: gev.AsMapInterface(),
Event: ev.AsMapInterface(),
}
var reply string
if err = smg.cdrsrv.Call(utils.CdrsV2ProcessCDR, cgrEv, &reply); err != nil {
@@ -1287,7 +1301,8 @@ func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection,
tnt := utils.FirstNonEmpty(ev.GetStringIgnoreErrors(utils.Tenant),
smg.cgrCfg.GeneralCfg().DefaultTenant)
if minMaxUsage, err = smg.InitiateSession(tnt,
engine.NewSafEvent(ev), clnt, ""); err != nil {
engine.NewSafEvent(ev), clnt, "",
smg.cgrCfg.SessionSCfg().DebitInterval); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1315,7 +1330,8 @@ func (smg *SMGeneric) BiRPCV2InitiateSession(clnt rpcclient.RpcClientConnection,
if minMaxUsage, err = smg.InitiateSession(
utils.FirstNonEmpty(ev.GetStringIgnoreErrors(utils.Tenant),
smg.cgrCfg.GeneralCfg().DefaultTenant),
engine.NewSafEvent(ev), clnt, ""); err != nil {
engine.NewSafEvent(ev), clnt, "",
smg.cgrCfg.SessionSCfg().DebitInterval); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1333,7 +1349,8 @@ func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection,
if minMaxUsage, err = smg.UpdateSession(
utils.FirstNonEmpty(ev.GetStringIgnoreErrors(utils.Tenant),
smg.cgrCfg.GeneralCfg().DefaultTenant),
engine.NewSafEvent(ev), clnt, ""); err != nil {
engine.NewSafEvent(ev), clnt, "",
smg.cgrCfg.SessionSCfg().DebitInterval); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1354,7 +1371,8 @@ func (smg *SMGeneric) BiRPCV2UpdateSession(clnt rpcclient.RpcClientConnection,
if minMaxUsage, err = smg.UpdateSession(
utils.FirstNonEmpty(ev.GetStringIgnoreErrors(utils.Tenant),
smg.cgrCfg.GeneralCfg().DefaultTenant),
engine.NewSafEvent(ev), clnt, ""); err != nil {
engine.NewSafEvent(ev), clnt, "",
smg.cgrCfg.SessionSCfg().DebitInterval); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1370,7 +1388,8 @@ func (smg *SMGeneric) BiRPCV1TerminateSession(clnt rpcclient.RpcClientConnection
if err = smg.TerminateSession(
utils.FirstNonEmpty(ev.GetStringIgnoreErrors(utils.Tenant),
smg.cgrCfg.GeneralCfg().DefaultTenant),
engine.NewSafEvent(ev), clnt, ""); err != nil {
engine.NewSafEvent(ev), clnt, "",
smg.cgrCfg.SessionSCfg().DebitInterval); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
@@ -1911,7 +1930,8 @@ func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
}
if maxUsage, err := smg.InitiateSession(
args.CGREvent.Tenant,
engine.NewSafEvent(args.CGREvent.Event), clnt, originID); err != nil {
engine.NewSafEvent(args.CGREvent.Event), clnt, originID,
smg.cgrCfg.SessionSCfg().DebitInterval); err != nil {
return utils.NewErrRALs(err)
} else {
rply.MaxUsage = &maxUsage
@@ -2070,7 +2090,8 @@ func (smg *SMGeneric) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection,
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
if maxUsage, err := smg.UpdateSession(args.CGREvent.Tenant,
engine.NewSafEvent(args.CGREvent.Event), clnt, originID); err != nil {
engine.NewSafEvent(args.CGREvent.Event), clnt, originID,
smg.cgrCfg.SessionSCfg().DebitInterval); err != nil {
return utils.NewErrRALs(err)
} else {
rply.MaxUsage = &maxUsage
@@ -2118,7 +2139,8 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
if err = smg.TerminateSession(args.CGREvent.Tenant,
engine.NewSafEvent(args.CGREvent.Event), clnt, originID); err != nil {
engine.NewSafEvent(args.CGREvent.Event), clnt, originID,
smg.cgrCfg.SessionSCfg().DebitInterval); err != nil {
return utils.NewErrRALs(err)
}
}

View File

@@ -111,14 +111,9 @@ func TestSMGRplcInitiate(t *testing.T) {
utils.Usage: "1m30s",
}
var maxUsage time.Duration
if err := smgRplcMstrRPC.Call(utils.SMGenericV2UpdateSession,
smgEv, &maxUsage); err == nil &&
err.Error() != rpcclient.ErrSessionNotFound.Error() { // Update should return rpcclient.ErrSessionNotFound
t.Error(err)
}
var reply string
if err := smgRplcMstrRPC.Call("SMGenericV1.TerminateSession",
smgEv, &reply); err == nil &&
smgEv, &reply); err == nil ||
err.Error() != rpcclient.ErrSessionNotFound.Error() { // Update should return rpcclient.ErrSessionNotFound
t.Error(err)
}
@@ -212,17 +207,23 @@ func TestSMGRplcTerminate(t *testing.T) {
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Wait for the sessions to be populated
var aSessions []*ActiveSession
if err := smgRplcMstrRPC.Call("SMGenericV1.GetActiveSessions", map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
if err := smgRplcMstrRPC.Call("SMGenericV1.GetActiveSessions",
map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err, aSessions)
}
if err := smgRplcSlvRPC.Call("SMGenericV1.GetActiveSessions", map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
if err := smgRplcSlvRPC.Call("SMGenericV1.GetActiveSessions",
map[string]string{utils.OriginID: "123451"}, &aSessions); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err, aSessions)
}
var pSessions map[string][]*SMGSession
if err := smgRplcMstrRPC.Call("SMGenericV1.GetPassiveSessions", nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
if err := smgRplcMstrRPC.Call("SMGenericV1.GetPassiveSessions",
nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions", nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
if err := smgRplcSlvRPC.Call("SMGenericV1.GetPassiveSessions",
nil, &pSessions); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
}