diff --git a/sessions/libsessions.go b/sessions/libsessions.go index 9e87315c2..4362fcdab 100644 --- a/sessions/libsessions.go +++ b/sessions/libsessions.go @@ -26,6 +26,13 @@ import ( "github.com/cgrates/cgrates/utils" ) +var protectedSFlds = engine.MapEvent{ + utils.CGRID: struct{}{}, + utils.OriginHost: struct{}{}, + utils.OriginID: struct{}{}, + utils.Usage: struct{}{}, +} + // SessionSClient is the interface implemented by Agents which are able to // communicate bidirectionally with SessionS and remote Communication Switch type SessionSClient interface { diff --git a/sessions/session.go b/sessions/session.go index f866c4a75..bfb09fdfb 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -202,10 +202,12 @@ func (s *Session) TotalUsage() (tDur time.Duration) { // AsCGREvents is not thread safe since it is supposed to run by the time Session is closed func (s *Session) asCGREvents() (cgrEvs []*utils.CGREvent, err error) { cgrEvs = make([]*utils.CGREvent, len(s.SRuns)+1) // so we can gather all cdr info while under lock + rawEv := s.EventStart.MapEvent() + rawEv[utils.RunID] = utils.MetaRaw cgrEvs[0] = &utils.CGREvent{ Tenant: s.Tenant, ID: utils.UUIDSha1Prefix(), - Event: s.EventStart.MapEvent(), + Event: rawEv, } for i, sr := range s.SRuns { cgrEvs[i+1] = &utils.CGREvent{ diff --git a/sessions/sessions.go b/sessions/sessions.go index 42e2130ec..b34fa0d9f 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -380,11 +380,6 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs Tenant: s.Tenant, Event: s.EventStart.AsMapInterface(), } - for _, sr := range s.SRuns { - cgrEv.Event[utils.Usage] = sr.TotalUsage - break - } - // post the CDRs if sS.cdrS != nil { if cgrEvs, err := s.asCGREvents(); err != nil { @@ -396,7 +391,9 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs var reply string for _, cgrEv := range cgrEvs { if err = sS.cdrS.Call(utils.CDRsV2ProcessCDR, - &engine.ArgV2ProcessCDR{CGREvent: *cgrEv}, &reply); err != nil { + &engine.ArgV2ProcessCDR{CGREvent: *cgrEv, + ChargerS: utils.BoolPointer(false), + AttributeS: utils.BoolPointer(false)}, &reply); err != nil { utils.Logger.Warning( fmt.Sprintf( "<%s> could not post CDR for event %s, err: %s", @@ -654,10 +651,8 @@ func (sS *SessionS) disconnectSession(s *Session, rsn string) (err error) { if clnt == nil { return fmt.Errorf("calling %s requires bidirectional JSON connection", utils.SessionSv1DisconnectSession) } - s.RLock() // for reading the TotalUsage s.EventStart.Set(utils.Usage, s.TotalUsage()) // Set the usage to total one debitted sEv := s.EventStart.AsMapInterface() - s.RUnlock() servMethod := utils.SessionSv1DisconnectSession if clnt.proto == 0 { // compatibility with OpenSIPS 2.3 servMethod = "SMGClientV1.DisconnectSession" @@ -1209,14 +1204,8 @@ func (sS *SessionS) initSession(tnt string, evStart *engine.SafEvent, clntConnID func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage time.Duration, err error) { defer sS.replicateSessions(s.CGRID, false, sS.sReplConns) // update fields from new event - protectedFlds := engine.MapEvent{ - utils.CGRID: struct{}{}, - utils.OriginHost: struct{}{}, - utils.OriginID: struct{}{}, - utils.Usage: struct{}{}, - } for k, v := range updtEv { - if protectedFlds.HasField(k) { + if protectedSFlds.HasField(k) { continue } s.EventStart.Set(k, v) // update previoius field with new one @@ -1298,13 +1287,20 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration) (er "<%s> failed refunding session: <%s>, srIdx: <%d>, error: <%s>", utils.SessionS, s.CGRID, sRunIdx, err.Error())) } + // FixMe: make sure refund is reflected inside EventCost + } + sr.Event[utils.Cost] = sr.EventCost.GetCost() + sr.Event[utils.Usage] = sr.EventCost.GetUsage() + } + if sS.cgrCfg.SessionSCfg().StoreSCosts { + if err := sS.storeSCost(s, sRunIdx); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> failed storing session cost for <%s>, srIdx: <%d>, error: <%s>", + utils.SessionS, s.CGRID, sRunIdx, err.Error())) } } - if err := sS.storeSCost(s, sRunIdx); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> failed storing session cost for <%s>, srIdx: <%d>, error: <%s>", - utils.SessionS, s.CGRID, sRunIdx, err.Error())) - } + engine.Cache.Set(utils.CacheClosedSessions, s.CGRID, s, + nil, true, utils.NonTransactional) } s.Unlock() return @@ -1333,16 +1329,6 @@ func (sS *SessionS) chargeEvent(tnt string, ev *engine.SafEvent) (maxUsage time. return // returns here the maxUsage from update } -func (sS *SessionS) processCDR(tnt string, ev *engine.SafEvent) (err error) { - cgrEv := &utils.CGREvent{ - Tenant: tnt, - ID: utils.UUIDSha1Prefix(), - Event: ev.AsMapInterface(), - } - var reply string - return sS.cdrS.Call(utils.CDRsV2ProcessCDR, &engine.ArgV2ProcessCDR{CGREvent: *cgrEv}, &reply) -} - // APIs start here // Call is part of RpcClientConnection interface @@ -2296,7 +2282,55 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, } // end of RPC caching - return sS.cdrS.Call(utils.CDRsV2ProcessCDR, &engine.ArgV2ProcessCDR{CGREvent: *cgrEv}, rply) + ev := engine.NewSafEvent(cgrEv.Event) + cgrID := GetSetCGRID(ev) + ss := sS.getRelocateSessions(cgrID, + ev.GetStringIgnoreErrors(utils.InitialOriginID), + ev.GetStringIgnoreErrors(utils.OriginID), + ev.GetStringIgnoreErrors(utils.OriginHost)) + var s *Session + if len(ss) != 0 { + utils.Logger.Warning( + fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>", + utils.SessionS, cgrID)) + s = ss[0] + } else { + if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has { + s = sIface.(*Session) + } + } + if s == nil { // no cached session, CDR will be handled by CDRs + return sS.cdrS.Call(utils.CDRsV2ProcessCDR, &engine.ArgV2ProcessCDR{CGREvent: *cgrEv}, rply) + } + // Use previously stored Session to generate CDRs + + // update stored event with fields out of CDR + for k, v := range ev.Me { + if protectedSFlds.HasField(k) { + continue + } + s.EventStart.Set(k, v) // update previoius field with new one + } + var cgrEvs []*utils.CGREvent + if cgrEvs, err = s.asCGREvents(); err != nil { + return utils.NewErrServerError(err) + } + var withErrors bool + for _, cgrEv := range cgrEvs { + if err = sS.cdrS.Call(utils.CDRsV2ProcessCDR, + &engine.ArgV2ProcessCDR{CGREvent: *cgrEv, + ChargerS: utils.BoolPointer(false), + AttributeS: utils.BoolPointer(false)}, rply); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error <%s> posting CDR with CGRID: <%s>", + utils.SessionS, err.Error(), cgrID)) + withErrors = true + } + } + if withErrors { + err = utils.ErrPartiallyExecuted + } + return } // NewV1ProcessEventArgs is a constructor for EventArgs used by ProcessEvent