diff --git a/engine/mapevent.go b/engine/mapevent.go index 09990a306..28137e130 100644 --- a/engine/mapevent.go +++ b/engine/mapevent.go @@ -128,6 +128,15 @@ func (me MapEvent) GetDurationPtrIgnoreErrors(fldName string) (d *time.Duration) return } +// GetDurationPtrOrDefault returns pointer or default if fldName is missing +func (me MapEvent) GetDurationPtrOrDefault(fldName string, dflt *time.Duration) (d *time.Duration, err error) { + if d, err = me.GetDurationPtr(fldName); err == utils.ErrNotFound { + d = dflt + err = nil + } + return +} + // GetTime returns a field as Time func (me MapEvent) GetTime(fldName string, timezone string) (t time.Time, err error) { fldIface, has := me[fldName] diff --git a/engine/mapevent_test.go b/engine/mapevent_test.go index 3ff6f8fac..1e5e22ca2 100644 --- a/engine/mapevent_test.go +++ b/engine/mapevent_test.go @@ -699,3 +699,15 @@ func TestMapEventGetDurationPtrIgnoreErrors(t *testing.T) { t.Errorf("Expected: %+v, received: %+v", expected, rply) } } + +func TestMapEventGetDurationPtrOrDefault(t *testing.T) { + dflt := &time.Duration(1) + if rcv, err := mapEv.GetDurationPtrOrDefault("test7", &dflt); !reflect.DeepEqual(rcv, dflt) { + t.Error("received: ", ptr) + } + newVal := time.Duration(2) + mapEv["test7"] = newVal + if ptr, err := mapEv.GetDurationPtrOrDefault("test7", &dflt); !reflect.DeepEqual(rcv, newVal) { + t.Error("received: ", ptr) + } +} diff --git a/sessions/libsessions.go b/sessions/libsessions.go index 39778d2fe..0a7ddb830 100644 --- a/sessions/libsessions.go +++ b/sessions/libsessions.go @@ -39,6 +39,11 @@ var unratedReqs = engine.MapEvent{ utils.META_RATED: struct{}{}, } +var authReqs = engine.MapEvent{ + utils.META_PREPAID: struct{}{}, + utils.META_PSEUDOPREPAID: struct{}{}, +} + // SessionSClient is the interface implemented by Agents which are able to // communicate bidirectionally with SessionS and remote Communication Switch type SessionSClient interface { @@ -81,12 +86,12 @@ func getSessionTTL(ev *engine.SafEvent, cfgSessionTTL time.Duration, return } -func GetSetCGRID(ev *engine.SafEvent) (cgrID string) { +func GetSetCGRID(ev engine.MapEvent) (cgrID string) { cgrID = ev.GetStringIgnoreErrors(utils.CGRID) if cgrID == "" { cgrID = utils.Sha1(ev.GetStringIgnoreErrors(utils.OriginID), ev.GetStringIgnoreErrors(utils.OriginHost)) - ev.Set(utils.CGRID, cgrID) + ev[utils.CGRID] = cgrID } return } diff --git a/sessions/session.go b/sessions/session.go index c36c11105..df5d02d9f 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -68,10 +68,10 @@ type Session struct { CGRID string Tenant string ResourceID string - ClientConnID string // connection ID towards the client so we can recover from passive - EventStart *engine.SafEvent // Event which started the session - DebitInterval time.Duration // execute debits for *prepaid runs - SRuns []*SRun // forked based on ChargerS + ClientConnID string // connection ID towards the client so we can recover from passive + EventStart engine.MapEvent // Event which started the session + DebitInterval time.Duration // execute debits for *prepaid runs + SRuns []*SRun // forked based on ChargerS debitStop chan struct{} sTerminator *sTerminator // automatic timeout for the session @@ -201,7 +201,7 @@ func (s *Session) TotalUsage() (tDur time.Duration) { // AsCGREvents is not thread safe since it is supposed to run by the time Session is closed func (s *Session) asCGREvents() (cgrEvs []*utils.CGREvent, err error) { cgrEvs = make([]*utils.CGREvent, len(s.SRuns)+1) // so we can gather all cdr info while under lock - rawEv := s.EventStart.MapEvent() + rawEv := s.EventStart.Clone() rawEv[utils.RunID] = utils.MetaRaw cgrEvs[0] = &utils.CGREvent{ Tenant: s.Tenant, diff --git a/sessions/sessions.go b/sessions/sessions.go index 39ce54a2e..93af963f2 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -52,8 +52,9 @@ func NewSReplConns(conns []*config.RemoteHost, reconnects int, connTimeout, replyTimeout time.Duration) (sReplConns []*SReplConn, err error) { sReplConns = make([]*SReplConn, len(conns)) for i, replConnCfg := range conns { - if replCon, err := rpcclient.NewRpcClient("tcp", replConnCfg.Address, replConnCfg.TLS, "", "", "", 0, reconnects, - connTimeout, replyTimeout, replConnCfg.Transport[1:], nil, true); err != nil { + if replCon, err := rpcclient.NewRpcClient(utils.TCP, replConnCfg.Address, + replConnCfg.TLS, "", "", "", 0, reconnects, connTimeout, + replyTimeout, replConnCfg.Transport[1:], nil, true); err != nil { return nil, err } else { sReplConns[i] = &SReplConn{Connection: replCon, Synchronous: replConnCfg.Synchronous} @@ -99,6 +100,7 @@ func NewSessionS(cgrCfg *config.CGRConfig, ralS, resS, thdS, } return &SessionS{ cgrCfg: cgrCfg, + dm: dm, chargerS: chargerS, ralS: ralS, resS: resS, @@ -116,7 +118,6 @@ func NewSessionS(cgrCfg *config.CGRConfig, ralS, resS, thdS, pSessions: make(map[string]*Session), pSessionsIdx: make(map[string]map[string]map[string]utils.StringMap), pSessionsRIdx: make(map[string][]*riFieldNameVal), - dm: dm, } } @@ -129,6 +130,7 @@ type biJClient struct { // SessionS represents the session service type SessionS struct { cgrCfg *config.CGRConfig // Separate from smCfg since there can be multiple + dm *engine.DataManager chargerS rpcclient.RpcClientConnection ralS rpcclient.RpcClientConnection // RALs connections @@ -139,27 +141,26 @@ type SessionS struct { attrS rpcclient.RpcClientConnection // AttributeS connections cdrS rpcclient.RpcClientConnection // CDR server connections - sReplConns []*SReplConn // list of connections where we will replicate our session data - biJMux sync.RWMutex // mux protecting BI-JSON connections biJClnts map[rpcclient.RpcClientConnection]string // index BiJSONConnection so we can sync them later biJIDs map[string]*biJClient // identifiers of bidirectional JSON conns, used to call RPC based on connIDs aSsMux sync.RWMutex // protects aSessions - aSessions map[string]*Session // group sessions per sessionId, multiple runs based on derived charging + aSessions map[string]*Session // group sessions per sessionId aSIMux sync.RWMutex // protects aSessionsIdx - aSessionsIdx map[string]map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue][cgrID]utils.StringMap[runID] + aSessionsIdx map[string]map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue][cgrID]utils.StringMap[runID]sID aSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for active sessions, used on remove pSsMux sync.RWMutex // protects pSessions pSessions map[string]*Session // group passive sessions based on cgrID pSIMux sync.RWMutex // protects pSessionsIdx - pSessionsIdx map[string]map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue][cgrID]utils.StringMap[runID] + pSessionsIdx map[string]map[string]map[string]utils.StringMap // map[fieldName]map[fieldValue][cgrID]utils.StringMap[runID]sID pSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for passive sessions, used on remove - dm *engine.DataManager + sReplConns []*SReplConn // list of connections where we will replicate our session data + } // ListenAndServe starts the service and binds it to the listen loop @@ -167,7 +168,7 @@ func (sS *SessionS) ListenAndServe(exitChan chan bool) (err error) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.SessionS)) if sS.cgrCfg.SessionSCfg().ChannelSyncInterval != 0 { go func() { - for { // Schedule sync channels to run repetately + for { // Schedule sync channels to run repeately select { case e := <-exitChan: exitChan <- e @@ -197,7 +198,8 @@ func (sS *SessionS) OnBiJSONConnect(c *rpc2.Client) { sS.biJMux.Lock() nodeID := utils.UUIDSha1Prefix() // connection identifier, should be later updated as login procedure sS.biJClnts[c] = nodeID - sS.biJIDs[nodeID] = &biJClient{conn: c, + sS.biJIDs[nodeID] = &biJClient{ + conn: c, proto: sS.cgrCfg.SessionSCfg().ClientProtocol} sS.biJMux.Unlock() } @@ -212,12 +214,13 @@ func (sS *SessionS) OnBiJSONDisconnect(c *rpc2.Client) { sS.biJMux.Unlock() } -// RegisterIntBiJConn is called on each internal BiJ connection towards SessionS +// RegisterIntBiJConn is called on internal BiJ connection towards SessionS func (sS *SessionS) RegisterIntBiJConn(c rpcclient.RpcClientConnection) { sS.biJMux.Lock() nodeID := sS.cgrCfg.GeneralCfg().NodeID sS.biJClnts[c] = nodeID - sS.biJIDs[nodeID] = &biJClient{conn: c, + sS.biJIDs[nodeID] = &biJClient{ + conn: c, proto: sS.cgrCfg.SessionSCfg().ClientProtocol} sS.biJMux.Unlock() } @@ -225,7 +228,7 @@ func (sS *SessionS) RegisterIntBiJConn(c rpcclient.RpcClientConnection) { // biJClnt returns a bidirectional JSON client based on connection ID func (sS *SessionS) biJClnt(connID string) (clnt *biJClient) { if connID == "" { - return nil + return } sS.biJMux.RLock() clnt = sS.biJIDs[connID] @@ -272,97 +275,98 @@ type sTerminator struct { } // setSTerminator installs a new terminator for a session -// assumes the Session is locked in this stage +// setSTerminator is not thread safe, only the goroutine forked from within func (sS *SessionS) setSTerminator(s *Session) { + // TTL ttl, err := s.EventStart.GetDuration(utils.SessionTTL) - switch err { - case nil: // all good - case utils.ErrNotFound: + if err != nil { + if err != utils.ErrNotFound { + utils.Logger.Warning( + fmt.Sprintf("<%s>, cannot extract <%s> from event: <%s>, err: <%s>", + utils.SessionS, utils.SessionTTL, s.EventStart.String(), err.Error())) + return + } ttl = sS.cgrCfg.SessionSCfg().SessionTTL - default: // not nil - utils.Logger.Warning( - fmt.Sprintf("<%s>, cannot extract %s from event: %s, err: %s", - utils.SessionS, utils.SessionTTL, s.EventStart.String(), err.Error())) } - s.Lock() - defer s.Unlock() - if ttl == 0 && s.sTerminator == nil { + if ttl == 0 { return // nothing to set up } // random delay computation maxDelay, err := s.EventStart.GetDuration(utils.SessionTTLMaxDelay) - switch err { - case nil: // all good - case utils.ErrNotFound: - if sS.cgrCfg.SessionSCfg().SessionTTLMaxDelay != nil { - maxDelay = *sS.cgrCfg.SessionSCfg().SessionTTLMaxDelay + if err != nil { + if err != utils.ErrNotFound { + utils.Logger.Warning( + fmt.Sprintf("<%s>, cannot extract <%s> from event: <%s>, err: <%s>", + utils.SessionS, utils.SessionTTLMaxDelay, s.EventStart.String(), err.Error())) + return } - default: // not nil - utils.Logger.Warning( - fmt.Sprintf("<%s>, cannot extract %s from event %s, err: %s", - utils.SessionS, utils.SessionTTLMaxDelay, s.EventStart.String(), err.Error())) - return + maxDelay = *sS.cgrCfg.SessionSCfg().SessionTTLMaxDelay } - sTTLMaxDelay := maxDelay.Nanoseconds() / time.Millisecond.Nanoseconds() // Milliseconds precision for randomness - if sTTLMaxDelay != 0 { + if maxDelay != 0 { rand.Seed(time.Now().Unix()) - ttl += time.Duration(rand.Int63n(sTTLMaxDelay) * time.Millisecond.Nanoseconds()) + ttl += time.Duration( + rand.Int63n(maxDelay.Nanoseconds()/time.Millisecond.Nanoseconds()) * time.Millisecond.Nanoseconds()) } - ttlLastUsed, err := s.EventStart.GetDurationPtrOrDefault(utils.SessionTTLLastUsed, sS.cgrCfg.SessionSCfg().SessionTTLLastUsed) + // LastUsed + ttlLastUsed, err := s.EventStart.GetDurationPtrOrDefault( + utils.SessionTTLLastUsed, sS.cgrCfg.SessionSCfg().SessionTTLLastUsed) if err != nil { utils.Logger.Warning( - fmt.Sprintf("<%s>, cannot extract %s from event, disabling session timeout for event: <%s>", - utils.SessionS, utils.SessionTTLLastUsed, s.EventStart.String())) + fmt.Sprintf("<%s>, cannot extract <%s> from event: <%s>, err: <%s>", + utils.SessionS, utils.SessionTTLLastUsed, s.EventStart.String(), err.Error())) return } - ttlUsage, err := s.EventStart.GetDurationPtrOrDefault(utils.SessionTTLUsage, sS.cgrCfg.SessionSCfg().SessionTTLUsage) + // TTLUsage + ttlUsage, err := s.EventStart.GetDurationPtrOrDefault( + utils.SessionTTLUsage, sS.cgrCfg.SessionSCfg().SessionTTLUsage) if err != nil { utils.Logger.Warning( - fmt.Sprintf("<%s>, cannot extract %s from event, disabling session timeout for event: <%s>", - utils.SessionS, utils.SessionTTLUsage, s.EventStart.String())) + fmt.Sprintf("<%s>, cannot extract <%s> from event: <%s>, err: <%s>", + utils.SessionS, utils.SessionTTLUsage, s.EventStart.String(), err.Error())) return } + // previously defined, reset if s.sTerminator != nil { - if ttl != 0 { // only change if different than 0 - s.sTerminator.ttl = ttl - if ttlLastUsed != nil { - s.sTerminator.ttlLastUsed = ttlLastUsed - } - if ttlUsage != nil { - s.sTerminator.ttlUsage = ttlUsage - } - s.sTerminator.timer.Reset(s.sTerminator.ttl) + s.sTerminator.ttl = ttl + if ttlLastUsed != nil { + s.sTerminator.ttlLastUsed = ttlLastUsed } + if ttlUsage != nil { + s.sTerminator.ttlUsage = ttlUsage + } + s.sTerminator.timer.Reset(s.sTerminator.ttl) return } - timer := time.NewTimer(ttl) endChan := make(chan struct{}) + // new set s.sTerminator = &sTerminator{ - timer: timer, + timer: time.NewTimer(ttl), endChan: endChan, ttl: ttl, ttlLastUsed: ttlLastUsed, ttlUsage: ttlUsage, } - go func() { // schedule automatic termination + + // schedule automatic termination + go func() { select { - case <-timer.C: - eUsage := s.sTerminator.ttl + case <-s.sTerminator.timer.C: + s.Lock() + endUsage := s.sTerminator.ttl if s.sTerminator.ttlUsage != nil { - eUsage = *s.sTerminator.ttlUsage + endUsage = *s.sTerminator.ttlUsage } - sS.forceSTerminate(s, eUsage, + sS.forceSTerminate(s, endUsage, s.sTerminator.ttlLastUsed) + s.Unlock() case <-endChan: - timer.Stop() + s.sTerminator.timer.Stop() } - s.Lock() - s.sTerminator = nil - s.Unlock() }() } // forceSTerminate is called when a session times-out or it is forced from CGRateS side +// not thread safe func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUsed *time.Duration) (err error) { if extraDebit != 0 { for i := range s.SRuns { @@ -398,7 +402,7 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs ArgDispatcher: s.ArgDispatcher, } if unratedReqs.HasField( // order additional rating for unrated request types - engine.NewMapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) { + engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) { argsProc.RALs = utils.BoolPointer(true) } if err = sS.cdrS.Call(utils.CDRsV1ProcessEvent, argsProc, &reply); err != nil { @@ -417,13 +421,14 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs CGREvent: &utils.CGREvent{ Tenant: s.Tenant, ID: utils.GenUUID(), - Event: s.EventStart.AsMapInterface(), + Event: s.EventStart, }, UsageID: s.ResourceID, Units: 1, ArgDispatcher: s.ArgDispatcher, } - if err := sS.resS.Call(utils.ResourceSv1ReleaseResources, + if err := sS.resS.Call( + utils.ResourceSv1ReleaseResources, argsRU, &reply); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s could not release resource with resourceID: %s", @@ -434,9 +439,10 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs if clntConn := sS.biJClnt(s.ClientConnID); clntConn != nil { go func() { var rply string - if err := clntConn.conn.Call(utils.SessionSv1DisconnectSession, + if err := clntConn.conn.Call( + utils.SessionSv1DisconnectSession, utils.AttrDisconnectSession{ - EventStart: s.EventStart.AsMapInterface(), + EventStart: s.EventStart, Reason: ErrForcedDisconnect.Error()}, &rply); err != nil { if err != utils.ErrNotImplemented { @@ -453,19 +459,13 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs // debitSession performs debit for a session run func (sS *SessionS) debitSession(s *Session, sRunIdx int, dur time.Duration, lastUsed *time.Duration) (maxDur time.Duration, err error) { - - s.Lock() if sRunIdx >= len(s.SRuns) { err = errors.New("sRunIdx out of range") - s.Unlock() return } - sr := s.SRuns[sRunIdx] - rDur := sr.debitReserve(dur, lastUsed) // debit out of reserve, rDur is still to be debited if rDur == time.Duration(0) { - s.Unlock() return dur, nil // complete debit out of reserve } dbtRsrv := dur - rDur // the amount debited from reserve @@ -476,17 +476,15 @@ func (sS *SessionS) debitSession(s *Session, sRunIdx int, dur time.Duration, sr.CD.DurationIndex += rDur cd := sr.CD.Clone() argDsp := s.ArgDispatcher - s.Unlock() cc := new(engine.CallCost) - if err := sS.ralS.Call(utils.ResponderMaxDebit, - &engine.CallDescriptorWithArgDispatcher{CallDescriptor: cd, - ArgDispatcher: argDsp}, cc); err != nil { - s.Lock() + if err := sS.ralS.Call( + utils.ResponderMaxDebit, + &engine.CallDescriptorWithArgDispatcher{ + CallDescriptor: cd, + ArgDispatcher: argDsp}, cc); err != nil { sr.ExtraDuration += dbtRsrv - s.Unlock() return 0, err } - s.Lock() sr.CD.TimeEnd = cc.GetEndTime() // set debited timeEnd ccDuration := cc.GetDuration() if ccDuration > rDur { @@ -513,23 +511,20 @@ func (sS *SessionS) debitSession(s *Session, sRunIdx int, dur time.Duration, sr.EventCost.Merge(ec) } maxDur = sr.LastUsage - s.Unlock() return } // debitLoopSession will periodically debit sessions, ie: automatic prepaid +// threadSafe since it will run into it's own goroutine func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int, dbtIvl time.Duration) (maxDur time.Duration, err error) { - - s.RLock() lenSRuns := len(s.SRuns) - s.RUnlock() if sRunIdx >= lenSRuns { err = errors.New("sRunIdx out of range") return } - for { + s.Lock() var maxDebit time.Duration if maxDebit, err = sS.debitSession(s, sRunIdx, dbtIvl, nil); err != nil { utils.Logger.Warning( @@ -542,6 +537,7 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int, // try to disconect the session n times before we force terminate it on our side for i := 0; i < sS.cgrCfg.SessionSCfg().TerminateAttempts; i++ { if err = sS.disconnectSession(s, dscReason); err == nil { + s.Unlock() return } utils.Logger.Warning( @@ -551,30 +547,36 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int, if err = sS.forceSTerminate(s, 0, nil); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%s>", utils.SessionS, s.CGRid(), err)) } + s.Unlock() return - } else if maxDebit < dbtIvl { - go func(s *Session) { // schedule sending disconnect command - select { - case <-s.debitStop: // call was disconnected already - return - case <-time.After(maxDebit): - // try to disconect the session n times before we force terminate it on our side - for i := 0; i < sS.cgrCfg.SessionSCfg().TerminateAttempts; i++ { - if err = sS.disconnectSession(s, utils.ErrInsufficientCredit.Error()); err == nil { - return - } - utils.Logger.Warning( - fmt.Sprintf("<%s> could not command disconnect session: %s, error: %s", - utils.SessionS, s.CGRid(), err.Error())) - } - if err = sS.forceSTerminate(s, 0, nil); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%s>", utils.SessionS, s.CGRid(), err)) + } + debitStop := s.debitStop // avoid concurrency with endSession + s.Unlock() + if maxDebit < dbtIvl { // disconnect faster + select { + case <-debitStop: // call was disconnected already + return + case <-time.After(maxDebit): + s.Lock() + defer s.Unlock() + // try to disconect the session n times before we force terminate it on our side + for i := 0; i < sS.cgrCfg.SessionSCfg().TerminateAttempts; i++ { + if err = sS.disconnectSession(s, utils.ErrInsufficientCredit.Error()); err == nil { + return } } - }(s) + utils.Logger.Warning( + fmt.Sprintf("<%s> could not disconnect session: <%s>, error: <%s>", + utils.SessionS, s.CGRid(), err.Error())) + if err = sS.forceSTerminate(s, 0, nil); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%s>", + utils.SessionS, s.CGRid(), err)) + } + } + return } select { - case <-s.debitStop: + case <-debitStop: return case <-time.After(dbtIvl): continue @@ -686,21 +688,23 @@ func (sS *SessionS) storeSCost(s *Session, sRunIdx int) (err error) { } // disconnectSession will send disconnect from SessionS to clients +// not thread safe, it considers that the session is already stopped by this time func (sS *SessionS) disconnectSession(s *Session, rsn string) (err error) { clnt := sS.biJClnt(s.ClientConnID) if clnt == nil { - return fmt.Errorf("calling %s requires bidirectional JSON connection", utils.SessionSv1DisconnectSession) + return fmt.Errorf("calling %s requires bidirectional JSON connection, connID: <%s>", + utils.SessionSv1DisconnectSession, s.ClientConnID) } - s.EventStart.Set(utils.Usage, s.TotalUsage()) // Set the usage to total one debitted - sEv := s.EventStart.AsMapInterface() + s.EventStart[utils.Usage] = s.TotalUsage() // Set the usage to total one debitted servMethod := utils.SessionSv1DisconnectSession if clnt.proto == 0 { // compatibility with OpenSIPS 2.3 servMethod = "SMGClientV1.DisconnectSession" } var rply string if err = clnt.conn.Call(servMethod, - utils.AttrDisconnectSession{EventStart: sEv, - Reason: rsn}, &rply); err != nil { + utils.AttrDisconnectSession{ + EventStart: s.EventStart, + Reason: rsn}, &rply); err != nil { if err != utils.ErrNotImplemented { return err } @@ -717,7 +721,9 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplC ss := sS.getSessions(cgrID, psv) if len(ss) == 0 { // session scheduled to be removed from remote (initiate also the EventStart to avoid the panic) - ss = []*Session{&Session{CGRID: cgrID, EventStart: engine.NewSafEvent(nil)}} + ss = []*Session{ + &Session{CGRID: cgrID, + EventStart: make(engine.MapEvent)}} } var wg sync.WaitGroup for _, rplConn := range rplConns { @@ -728,7 +734,8 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplC for _, s := range ss { sCln := s.Clone() var rply string - if err := conn.Call(utils.SessionSv1SetPassiveSession, + if err := conn.Call( + utils.SessionSv1SetPassiveSession, sCln, &rply); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> cannot replicate session with id <%s>, err: %s", @@ -746,6 +753,7 @@ func (sS *SessionS) replicateSessions(cgrID string, psv bool, rplConns []*SReplC // registerSession will register an active or passive Session // called on init or relocate +// not thread safe for the Session func (sS *SessionS) registerSession(s *Session, passive bool) { sMux := &sS.aSsMux sMp := sS.aSessions @@ -755,11 +763,12 @@ func (sS *SessionS) registerSession(s *Session, passive bool) { } sMux.Lock() sMp[s.CGRID] = s + sMux.Unlock() sS.indexSession(s, passive) if !passive { sS.setSTerminator(s) } - sMux.Unlock() + } // uregisterSession will unregister an active or passive session based on it's CGRID @@ -778,16 +787,15 @@ func (sS *SessionS) unregisterSession(cgrID string, passive bool) bool { return false } delete(sMp, cgrID) + sMux.Unlock() sS.unindexSession(cgrID, passive) if !passive { if s.sTerminator != nil && s.sTerminator.endChan != nil { close(s.sTerminator.endChan) s.sTerminator.endChan = nil - time.Sleep(1) // ensure context switching so that the goroutine can remove old terminator } } - sMux.Unlock() return true } @@ -810,7 +818,9 @@ func (sS *SessionS) indexSession(s *Session, pSessions bool) { if err == utils.ErrNotFound { fieldVal = utils.NOT_AVAILABLE } else { - utils.Logger.Err(fmt.Sprintf("<%s> retrieving field: %s from event: %+v, err: <%s>", utils.SessionS, fieldName, s.EventStart, err)) + utils.Logger.Err( + fmt.Sprintf("<%s> retrieving field: %s from event: %+v, err: <%s>", + utils.SessionS, fieldName, s.EventStart, err)) continue } } @@ -867,7 +877,8 @@ func (sS *SessionS) unindexSession(cgrID string, pSessions bool) bool { return true } -func (sS *SessionS) getIndexedFilters(tenant string, fltrs []string) (indexedFltr map[string][]string, unindexedFltr []*engine.FilterRule) { +func (sS *SessionS) getIndexedFilters(tenant string, fltrs []string) ( + indexedFltr map[string][]string, unindexedFltr []*engine.FilterRule) { indexedFltr = make(map[string][]string) for _, fltrID := range fltrs { f, err := sS.dm.GetFilter(tenant, fltrID, @@ -884,7 +895,8 @@ func (sS *SessionS) getIndexedFilters(tenant string, fltrs []string) (indexedFlt } for _, fltr := range f.Rules { if fltr.Type != utils.MetaString || - !sS.cgrCfg.SessionSCfg().SessionIndexes.HasKey(strings.TrimPrefix(fltr.FieldName, utils.DynamicDataPrefix)) { + !sS.cgrCfg.SessionSCfg().SessionIndexes.HasKey( + strings.TrimPrefix(fltr.FieldName, utils.DynamicDataPrefix)) { unindexedFltr = append(unindexedFltr, fltr) continue } @@ -894,7 +906,7 @@ func (sS *SessionS) getIndexedFilters(tenant string, fltrs []string) (indexedFlt return } -// matchedIndexes returns map[matchedFieldName]possibleMatchedFieldVal so we optimize further to avoid checking them +// getSessionIDsMatchingIndexes returns map[matchedFieldName]possibleMatchedFieldVal so we optimize further to avoid checking them func (sS *SessionS) getSessionIDsMatchingIndexes(fltrs map[string][]string, pSessions bool) ([]string, map[string]utils.StringMap) { idxMux := &sS.aSIMux @@ -907,7 +919,6 @@ func (sS *SessionS) getSessionIDsMatchingIndexes(fltrs map[string][]string, defer idxMux.RUnlock() matchingSessions := make(map[string]utils.StringMap) checkNr := 0 - getMatchingIndexes := func(fltrName string, values []string) (matchingSessionsbyValue map[string]utils.StringMap) { matchingSessionsbyValue = make(map[string]utils.StringMap) fltrName = strings.TrimPrefix(fltrName, utils.DynamicDataPrefix) @@ -961,6 +972,8 @@ func (sS *SessionS) getSessionIDsMatchingIndexes(fltrs map[string][]string, return cgrIDs, matchingSessions } +// filterSessions will return a list of sessions in external format based on filters passed +// is thread safe for the Sessions func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*ExternalSession) { if len(sf.Filters) == 0 { ss := sS.getSessions(utils.EmptyString, psv) @@ -1021,14 +1034,13 @@ func (sS *SessionS) filterSessions(sf *utils.SessionFilter, psv bool) (aSs []*Ex return } +// filterSessionsCount re func (sS *SessionS) filterSessionsCount(sf *utils.SessionFilter, psv bool) (count int) { count = 0 if len(sf.Filters) == 0 { ss := sS.getSessions(utils.EmptyString, psv) for _, s := range ss { - s.RLock() count += len(s.SRuns) - s.RUnlock() } return } @@ -1086,7 +1098,7 @@ func (sS *SessionS) forkSession(s *Session) (err error) { CGREvent: &utils.CGREvent{ Tenant: s.Tenant, ID: utils.UUIDSha1Prefix(), - Event: s.EventStart.AsMapInterface(), + Event: s.EventStart, }, ArgDispatcher: s.ArgDispatcher, } @@ -1100,7 +1112,7 @@ func (sS *SessionS) forkSession(s *Session) (err error) { } s.SRuns = make([]*SRun, len(chrgrs)) for i, chrgr := range chrgrs { - me := engine.NewMapEvent(chrgr.CGREvent.Event) + me := engine.MapEvent(chrgr.CGREvent.Event).Clone() startTime := me.GetTimeIgnoreErrors(utils.AnswerTime, sS.cgrCfg.GeneralCfg().DefaultTimezone) if startTime.IsZero() { // AnswerTime not parsable, try SetupTime @@ -1189,61 +1201,62 @@ func (sS *SessionS) getSessionsFromCGRIDs(pSessions bool, cgrIDs ...string) (ss } // transitSState will transit the sessions from one state (active/passive) to another (passive/active) -func (sS *SessionS) transitSState(cgrID string, psv bool) (ss []*Session) { - ss = sS.getSessions(cgrID, !psv) - for _, s := range ss { - s.RLock() // protect the sTerminator - sS.unregisterSession(cgrID, !psv) - s.RUnlock() - sS.registerSession(s, psv) - // ToDo: activate prepaid debits +func (sS *SessionS) transitSState(cgrID string, psv bool) (s *Session) { + ss := sS.getSessions(cgrID, !psv) + if len(ss) == 0 { + return } + s = ss[0] + sS.unregisterSession(cgrID, !psv) + sS.registerSession(s, psv) + // ToDo: activate prepaid debits return } -// getActivateSessions returns the sessions from active list or moves from passive -func (sS *SessionS) getActivateSessions(cgrID string) (ss []*Session) { - ss = sS.getSessions(cgrID, false) - if len(ss) == 0 { - ss = sS.transitSState(cgrID, false) +// getActivateSession returns the session from active list or moves from passive +func (sS *SessionS) getActivateSession(cgrID string) (s *Session) { + ss := sS.getSessions(cgrID, false) + if len(ss) != 0 { + return ss[0] } - return + return sS.transitSState(cgrID, false) } // relocateSession will change the CGRID of a session (ie: prefix based session group) -func (sS *SessionS) relocateSessions(initOriginID, originID, originHost string) (ss []*Session) { +func (sS *SessionS) relocateSession(initOriginID, originID, originHost string) (s *Session) { if initOriginID == "" { return } initCGRID := utils.Sha1(initOriginID, originHost) newCGRID := utils.Sha1(originID, originHost) - ss = sS.getActivateSessions(initCGRID) - for _, s := range ss { - s.Lock() - sS.unregisterSession(s.CGRID, false) - s.CGRID = newCGRID - // Overwrite initial CGRID with new one - s.EventStart.Set(utils.CGRID, newCGRID) // Overwrite CGRID for final CDR - s.EventStart.Set(utils.OriginID, originID) // Overwrite OriginID for session indexing - for _, sRun := range s.SRuns { - sRun.Event[utils.CGRID] = newCGRID // needed for CDR generation - sRun.Event[utils.OriginID] = originID - } - s.Unlock() - sS.registerSession(s, false) - sS.replicateSessions(initCGRID, false, sS.sReplConns) + s = sS.getActivateSession(initCGRID) + if s == nil { + return } + sS.unregisterSession(s.CGRID, false) + s.Lock() + s.CGRID = newCGRID + // Overwrite initial CGRID with new one + s.EventStart[utils.CGRID] = newCGRID // Overwrite CGRID for final CDR + s.EventStart[utils.OriginID] = originID // Overwrite OriginID for session indexing + for _, sRun := range s.SRuns { + sRun.Event[utils.CGRID] = newCGRID // needed for CDR generation + sRun.Event[utils.OriginID] = originID + } + s.Unlock() + sS.registerSession(s, false) + sS.replicateSessions(initCGRID, false, sS.sReplConns) return } -// getRelocateSessions will relocate a session if it cannot find cgrID and initialOriginID is present -func (sS *SessionS) getRelocateSessions(cgrID string, initOriginID, - originID, originHost string) (ss []*Session) { - if ss = sS.getActivateSessions(cgrID); len(ss) != 0 || +// getRelocateSession will relocate a session if it cannot find cgrID and initialOriginID is present +func (sS *SessionS) getRelocateSession(cgrID string, initOriginID, + originID, originHost string) (s *Session) { + if s = sS.getActivateSession(cgrID); s == nil || initOriginID == "" { return } - return sS.relocateSessions(initOriginID, originID, originHost) + return sS.relocateSession(initOriginID, originID, originHost) } // syncSessions synchronizes the active sessions with the one in the clients @@ -1268,7 +1281,7 @@ func (sS *SessionS) syncSessions() { case err = <-errChan: if err != nil { utils.Logger.Warning( - fmt.Sprintf("<%s> error quering session ids : %+v", utils.SessionS, err)) + fmt.Sprintf("<%s> error <%s> quering session ids", utils.SessionS, err.Error())) } case <-time.After(sS.cgrCfg.GeneralCfg().ReplyTimeout): utils.Logger.Warning( @@ -1290,7 +1303,9 @@ func (sS *SessionS) syncSessions() { continue } if err := sS.forceSTerminate(ss[0], 0, nil); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%s>", utils.SessionS, cgrID, err)) + utils.Logger.Warning( + fmt.Sprintf("<%s> failed force-terminating session: <%s>, err: <%s>", + utils.SessionS, cgrID, err.Error())) } } } @@ -1311,18 +1326,17 @@ func (sS *SessionS) initSessionDebitLoops(s *Session) { } } -// authSession calculates maximum usage allowed for given session -func (sS *SessionS) authSession(tnt string, evStart *engine.SafEvent) (maxUsage time.Duration, err error) { +// authEvent calculates maximum usage allowed for the given event +func (sS *SessionS) authEvent(tnt string, evStart engine.MapEvent) (maxUsage time.Duration, err error) { cgrID := GetSetCGRID(evStart) var eventUsage time.Duration - if eventUsage, err = evStart.GetDuration(utils.Usage); err != nil { if err != utils.ErrNotFound { return } - evStart.Set(utils.Usage, sS.cgrCfg.SessionSCfg().MaxCallDuration) // will be used in CD - eventUsage = sS.cgrCfg.SessionSCfg().MaxCallDuration err = nil + evStart[utils.Usage] = sS.cgrCfg.SessionSCfg().MaxCallDuration // will be used in CD + eventUsage = sS.cgrCfg.SessionSCfg().MaxCallDuration } s := &Session{ CGRID: cgrID, @@ -1330,32 +1344,29 @@ func (sS *SessionS) authSession(tnt string, evStart *engine.SafEvent) (maxUsage EventStart: evStart, } //check if we have APIKey in event and in case it has add it in ArgDispatcher - apiKeyIface, errApiKey := evStart.FieldAsString([]string{utils.MetaApiKey}) - if errApiKey == nil { + apiKey, errAPIKey := evStart.GetString(utils.MetaApiKey) + if errAPIKey == nil { s.ArgDispatcher = &utils.ArgDispatcher{ - APIKey: utils.StringPointer(apiKeyIface), + APIKey: utils.StringPointer(apiKey), } } //check if we have RouteID in event and in case it has add it in ArgDispatcher - routeIDIface, errRouteID := evStart.FieldAsString([]string{utils.MetaRouteID}) - if errRouteID == nil { - if errApiKey.Error() == utils.ErrNotFound.Error() { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + if routeID, err := evStart.GetString(utils.MetaRouteID); err == nil { + if errAPIKey == utils.ErrNotFound { //in case we don't have APIKey, but we have RouteID we need to initialize the struct s.ArgDispatcher = &utils.ArgDispatcher{ - RouteID: utils.StringPointer(routeIDIface), + RouteID: utils.StringPointer(routeID), } } else { - s.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface) + s.ArgDispatcher.RouteID = utils.StringPointer(routeID) } } if err = sS.forkSession(s); err != nil { return } - var maxUsageSet bool // so we know if we have set the 0 on purpose - prepaidReqs := []string{utils.META_PREPAID, utils.META_PSEUDOPREPAID} for _, sr := range s.SRuns { var rplyMaxUsage time.Duration - if !utils.IsSliceMember(prepaidReqs, + if !authReqs.HasField( sr.Event.GetStringIgnoreErrors(utils.RequestType)) { rplyMaxUsage = eventUsage } else if err = sS.ralS.Call(utils.ResponderGetMaxSessionTime, @@ -1374,7 +1385,8 @@ func (sS *SessionS) authSession(tnt string, evStart *engine.SafEvent) (maxUsage } // initSession handles a new session -func (sS *SessionS) initSession(tnt string, evStart *engine.SafEvent, clntConnID string, +// not thread-safe for Session since it is constructed here +func (sS *SessionS) initSession(tnt string, evStart engine.MapEvent, clntConnID string, resID string, dbtItval time.Duration, argDisp *utils.ArgDispatcher) (s *Session, err error) { cgrID := GetSetCGRID(evStart) s = &Session{ @@ -1397,25 +1409,27 @@ func (sS *SessionS) initSession(tnt string, evStart *engine.SafEvent, clntConnID // updateSession will reset terminator, perform debits and replicate sessions func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage time.Duration, err error) { defer sS.replicateSessions(s.CGRID, false, sS.sReplConns) + s.Lock() + defer s.Unlock() // update fields from new event for k, v := range updtEv { if protectedSFlds.HasField(k) { continue } - s.EventStart.Set(k, v) // update previoius field with new one + s.EventStart[k] = v // update previoius field with new one } sS.setSTerminator(s) // reset the terminator //init has no updtEv if updtEv == nil { - updtEv = engine.NewMapEvent(s.EventStart.AsMapInterface()) + updtEv = engine.NewMapEvent(s.EventStart.Clone()) } var reqMaxUsage time.Duration if reqMaxUsage, err = updtEv.GetDuration(utils.Usage); err != nil { if err != utils.ErrNotFound { return } - reqMaxUsage = sS.cgrCfg.SessionSCfg().MaxCallDuration err = nil + reqMaxUsage = sS.cgrCfg.SessionSCfg().MaxCallDuration } var maxUsageSet bool // so we know if we have set the 0 on purpose prepaidReqs := []string{utils.META_PREPAID, utils.META_PSEUDOPREPAID} @@ -1435,7 +1449,6 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage maxUsageSet = true } } - return } @@ -1443,7 +1456,6 @@ func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration, aTime *time.Time) (err error) { //check if we have replicate connection and close the session there defer sS.replicateSessions(s.CGRID, true, sS.sReplConns) - s.Lock() // no need to release it untill end since the session should be anyway closed sS.unregisterSession(s.CGRID, false) for sRunIdx, sr := range s.SRuns { @@ -1492,7 +1504,7 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration, aTi } // Set Usage field if sRunIdx == 0 { - s.EventStart.Set(utils.Usage, sr.TotalUsage) + s.EventStart[utils.Usage] = sr.TotalUsage } sr.Event[utils.Usage] = sr.TotalUsage if aTime != nil { @@ -1514,13 +1526,14 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration, aTi } // chargeEvent will charge a single event (ie: SMS) -func (sS *SessionS) chargeEvent(tnt string, ev *engine.SafEvent, argDisp *utils.ArgDispatcher) (maxUsage time.Duration, err error) { +func (sS *SessionS) chargeEvent(tnt string, ev engine.MapEvent, + argDisp *utils.ArgDispatcher) (maxUsage time.Duration, err error) { cgrID := GetSetCGRID(ev) var s *Session if s, err = sS.initSession(tnt, ev, "", "", 0, argDisp); err != nil { return } - if maxUsage, err = sS.updateSession(s, ev.AsMapInterface()); err != nil { + if maxUsage, err = sS.updateSession(s, ev.Clone()); err != nil { if errEnd := sS.endSession(s, utils.DurationPointer(time.Duration(0)), nil, nil); errEnd != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error when force-ending charged event: <%s>, err: <%s>", @@ -1660,16 +1673,15 @@ func (sS *SessionS) BiRPCv1SetPassiveSession(clnt rpcclient.RpcClientConnection, return } } else { + s.Lock() //if we have an active session with the same CGRID //we unregister it first then regiser the new one - s.Lock() if len(sS.getSessions(s.CGRID, false)) != 0 { sS.unregisterSession(s.CGRID, false) } - sS.initSessionDebitLoops(s) - s.Unlock() sS.registerSession(s, true) + s.Unlock() } *reply = utils.OK return @@ -1886,8 +1898,8 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection, } } if args.GetMaxUsage { - maxUsage, err := sS.authSession(args.CGREvent.Tenant, - engine.NewSafEvent(args.CGREvent.Event)) + maxUsage, err := sS.authEvent(args.CGREvent.Tenant, + args.CGREvent.Event) if err != nil { return utils.NewErrRALs(err) } @@ -2173,7 +2185,7 @@ func (sS *SessionS) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection, } if args.InitSession { var err error - ev := engine.NewSafEvent(args.CGREvent.Event) + ev := engine.MapEvent(args.CGREvent.Event) dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil { @@ -2374,23 +2386,21 @@ func (sS *SessionS) BiRPCv1UpdateSession(clnt rpcclient.RpcClientConnection, return utils.NewErrRALs(err) } } - ev := engine.NewSafEvent(args.CGREvent.Event) + ev := engine.MapEvent(args.CGREvent.Event) cgrID := GetSetCGRID(ev) - ss := sS.getRelocateSessions(cgrID, + s := sS.getRelocateSession(cgrID, me.GetStringIgnoreErrors(utils.InitialOriginID), me.GetStringIgnoreErrors(utils.OriginID), me.GetStringIgnoreErrors(utils.OriginHost)) - var s *Session - if len(ss) == 0 { + if s == nil { if s, err = sS.initSession(args.CGREvent.Tenant, ev, sS.biJClntID(clnt), - me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil { + me.GetStringIgnoreErrors(utils.OriginID), + dbtItvl, args.ArgDispatcher); err != nil { return utils.NewErrRALs(err) } - } else { - s = ss[0] } - if maxUsage, err := sS.updateSession(s, ev.AsMapInterface()); err != nil { + if maxUsage, err := sS.updateSession(s, ev.Clone()); err != nil { return utils.NewErrRALs(err) } else { rply.MaxUsage = &maxUsage @@ -2487,10 +2497,9 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, if args.CGREvent.Tenant == "" { args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant } - ev := engine.NewSafEvent(args.CGREvent.Event) - me := engine.NewMapEvent(args.CGREvent.Event) // used for easy access to fields within the event + ev := engine.MapEvent(args.CGREvent.Event) cgrID := GetSetCGRID(ev) - originID := me.GetStringIgnoreErrors(utils.OriginID) + originID := ev.GetStringIgnoreErrors(utils.OriginID) if args.TerminateSession { if originID == "" { return utils.NewErrMandatoryIeMissing(utils.OriginID) @@ -2504,12 +2513,10 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, var s *Session fib := utils.Fib() for i := 0; i < sS.cgrCfg.SessionSCfg().TerminateAttempts; i++ { - ss := sS.getRelocateSessions(cgrID, - me.GetStringIgnoreErrors(utils.InitialOriginID), - me.GetStringIgnoreErrors(utils.OriginID), - me.GetStringIgnoreErrors(utils.OriginHost)) - if len(ss) != 0 { - s = ss[0] + if s = sS.getRelocateSession(cgrID, + ev.GetStringIgnoreErrors(utils.InitialOriginID), + ev.GetStringIgnoreErrors(utils.OriginID), + ev.GetStringIgnoreErrors(utils.OriginHost)); s != nil { break } if i+1 < sS.cgrCfg.SessionSCfg().TerminateAttempts { // not last iteration @@ -2518,14 +2525,16 @@ func (sS *SessionS) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection, } if s, err = sS.initSession(args.CGREvent.Tenant, ev, sS.biJClntID(clnt), - me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil { + ev.GetStringIgnoreErrors(utils.OriginID), dbtItvl, + args.ArgDispatcher); err != nil { return utils.NewErrRALs(err) } } if err = sS.endSession(s, - me.GetDurationPtrIgnoreErrors(utils.Usage), - me.GetDurationPtrIgnoreErrors(utils.LastUsed), - utils.TimePointer(me.GetTimeIgnoreErrors(utils.AnswerTime, utils.EmptyString))); err != nil { + ev.GetDurationPtrIgnoreErrors(utils.Usage), + ev.GetDurationPtrIgnoreErrors(utils.LastUsed), + utils.TimePointer(ev.GetTimeIgnoreErrors(utils.AnswerTime, + utils.EmptyString))); err != nil { return utils.NewErrRALs(err) } } @@ -2608,24 +2617,20 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, cgrEvWithArgDisp.Event[utils.Source] = utils.MetaSessionS } - ev := engine.NewSafEvent(cgrEvWithArgDisp.Event) + ev := engine.MapEvent(cgrEvWithArgDisp.Event) cgrID := GetSetCGRID(ev) - ss := sS.getRelocateSessions(cgrID, + s := sS.getRelocateSession(cgrID, ev.GetStringIgnoreErrors(utils.InitialOriginID), ev.GetStringIgnoreErrors(utils.OriginID), ev.GetStringIgnoreErrors(utils.OriginHost)) - var s *Session - if len(ss) != 0 { + if s != nil { utils.Logger.Warning( fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>", utils.SessionS, cgrID)) - s = ss[0] - } else { // try retrieving from closed_sessions within cache - if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has { - s = sIface.(*Session) - } - } - if s == nil { // no cached session, CDR will be handled by CDRs + } else if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has { + // found in cache + s = sIface.(*Session) + } else { // no cached session, CDR will be handled by CDRs return sS.cdrS.Call(utils.CDRsV1ProcessEvent, &engine.ArgV1ProcessEvent{ CGREvent: *cgrEvWithArgDisp.CGREvent, @@ -2633,13 +2638,12 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, } // Use previously stored Session to generate CDRs - // update stored event with fields out of CDR - for k, v := range ev.Me { + for k, v := range ev { if protectedSFlds.HasField(k) { continue } - s.EventStart.Set(k, v) // update previoius field with new one + s.EventStart[k] = v // update previoius field with new one } // create one CGREvent for each session run plus *raw one var cgrEvs []*utils.CGREvent @@ -2880,7 +2884,7 @@ func (sS *SessionS) BiRPCv1ProcessMessage(clnt rpcclient.RpcClientConnection, } if args.Debit { if maxUsage, err := sS.chargeEvent(args.CGREvent.Tenant, - engine.NewSafEvent(args.CGREvent.Event), args.ArgDispatcher); err != nil { + engine.MapEvent(args.CGREvent.Event), args.ArgDispatcher); err != nil { return utils.NewErrRALs(err) } else { rply.MaxUsage = &maxUsage @@ -2998,9 +3002,8 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, if args.CGREvent.Tenant == "" { args.CGREvent.Tenant = sS.cgrCfg.GeneralCfg().DefaultTenant } - me := engine.NewMapEvent(args.CGREvent.Event) - ev := engine.NewSafEvent(args.CGREvent.Event) - originID := me.GetStringIgnoreErrors(utils.OriginID) + ev := engine.MapEvent(args.CGREvent.Event) + originID := ev.GetStringIgnoreErrors(utils.OriginID) dbtItvl := sS.cgrCfg.SessionSCfg().DebitInterval //convert from Flags []string to utils.FlagsWithParams @@ -3078,8 +3081,8 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, switch { //check for auth session case ralsFlagsWithParams.HasKey(utils.MetaAuth): - maxUsage, err := sS.authSession(args.CGREvent.Tenant, - engine.NewSafEvent(args.CGREvent.Event)) + maxUsage, err := sS.authEvent(args.CGREvent.Tenant, + engine.MapEvent(args.CGREvent.Event)) if err != nil { return utils.NewErrRALs(err) } @@ -3107,28 +3110,25 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, } //check for update session case ralsFlagsWithParams.HasKey(utils.MetaUpdate): - if me.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval - if dbtItvl, err = me.GetDuration(utils.CGRDebitInterval); err != nil { + if ev.HasField(utils.CGRDebitInterval) { // dynamic DebitInterval via CGRDebitInterval + if dbtItvl, err = ev.GetDuration(utils.CGRDebitInterval); err != nil { return utils.NewErrRALs(err) } } - ev := engine.NewSafEvent(args.CGREvent.Event) + ev := engine.MapEvent(args.CGREvent.Event) cgrID := GetSetCGRID(ev) - ss := sS.getRelocateSessions(cgrID, - me.GetStringIgnoreErrors(utils.InitialOriginID), - me.GetStringIgnoreErrors(utils.OriginID), - me.GetStringIgnoreErrors(utils.OriginHost)) - var s *Session - if len(ss) == 0 { + s := sS.getRelocateSession(cgrID, + ev.GetStringIgnoreErrors(utils.InitialOriginID), + ev.GetStringIgnoreErrors(utils.OriginID), + ev.GetStringIgnoreErrors(utils.OriginHost)) + if s == nil { if s, err = sS.initSession(args.CGREvent.Tenant, ev, sS.biJClntID(clnt), - me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil { + ev.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil { return utils.NewErrRALs(err) } - } else { - s = ss[0] } - if maxUsage, err := sS.updateSession(s, ev.AsMapInterface()); err != nil { + if maxUsage, err := sS.updateSession(s, ev); err != nil { return utils.NewErrRALs(err) } else { rply.MaxUsage = &maxUsage @@ -3141,24 +3141,23 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.RpcClientConnection, } } cgrID := GetSetCGRID(ev) - ss := sS.getRelocateSessions(cgrID, - me.GetStringIgnoreErrors(utils.InitialOriginID), - me.GetStringIgnoreErrors(utils.OriginID), - me.GetStringIgnoreErrors(utils.OriginHost)) - var s *Session - if len(ss) == 0 { + s := sS.getRelocateSession(cgrID, + ev.GetStringIgnoreErrors(utils.InitialOriginID), + ev.GetStringIgnoreErrors(utils.OriginID), + ev.GetStringIgnoreErrors(utils.OriginHost)) + if s == nil { if s, err = sS.initSession(args.CGREvent.Tenant, ev, sS.biJClntID(clnt), - me.GetStringIgnoreErrors(utils.OriginID), dbtItvl, args.ArgDispatcher); err != nil { + ev.GetStringIgnoreErrors(utils.OriginID), dbtItvl, + args.ArgDispatcher); err != nil { return utils.NewErrRALs(err) } - } else { - s = ss[0] } if err = sS.endSession(s, - me.GetDurationPtrIgnoreErrors(utils.Usage), - me.GetDurationPtrIgnoreErrors(utils.LastUsed), - utils.TimePointer(me.GetTimeIgnoreErrors(utils.AnswerTime, utils.EmptyString))); err != nil { + ev.GetDurationPtrIgnoreErrors(utils.Usage), + ev.GetDurationPtrIgnoreErrors(utils.LastUsed), + utils.TimePointer( + ev.GetTimeIgnoreErrors(utils.AnswerTime, utils.EmptyString))); err != nil { return utils.NewErrRALs(err) } }