SessionS ProcessCDR with cached closed session

This commit is contained in:
DanB
2019-03-21 20:18:21 +01:00
parent b50e07e2a3
commit 94db8add70
3 changed files with 75 additions and 32 deletions

View File

@@ -26,6 +26,13 @@ import (
"github.com/cgrates/cgrates/utils"
)
var protectedSFlds = engine.MapEvent{
utils.CGRID: struct{}{},
utils.OriginHost: struct{}{},
utils.OriginID: struct{}{},
utils.Usage: struct{}{},
}
// SessionSClient is the interface implemented by Agents which are able to
// communicate bidirectionally with SessionS and remote Communication Switch
type SessionSClient interface {

View File

@@ -202,10 +202,12 @@ func (s *Session) TotalUsage() (tDur time.Duration) {
// AsCGREvents is not thread safe since it is supposed to run by the time Session is closed
func (s *Session) asCGREvents() (cgrEvs []*utils.CGREvent, err error) {
cgrEvs = make([]*utils.CGREvent, len(s.SRuns)+1) // so we can gather all cdr info while under lock
rawEv := s.EventStart.MapEvent()
rawEv[utils.RunID] = utils.MetaRaw
cgrEvs[0] = &utils.CGREvent{
Tenant: s.Tenant,
ID: utils.UUIDSha1Prefix(),
Event: s.EventStart.MapEvent(),
Event: rawEv,
}
for i, sr := range s.SRuns {
cgrEvs[i+1] = &utils.CGREvent{

View File

@@ -380,11 +380,6 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs
Tenant: s.Tenant,
Event: s.EventStart.AsMapInterface(),
}
for _, sr := range s.SRuns {
cgrEv.Event[utils.Usage] = sr.TotalUsage
break
}
// post the CDRs
if sS.cdrS != nil {
if cgrEvs, err := s.asCGREvents(); err != nil {
@@ -396,7 +391,9 @@ func (sS *SessionS) forceSTerminate(s *Session, extraDebit time.Duration, lastUs
var reply string
for _, cgrEv := range cgrEvs {
if err = sS.cdrS.Call(utils.CDRsV2ProcessCDR,
&engine.ArgV2ProcessCDR{CGREvent: *cgrEv}, &reply); err != nil {
&engine.ArgV2ProcessCDR{CGREvent: *cgrEv,
ChargerS: utils.BoolPointer(false),
AttributeS: utils.BoolPointer(false)}, &reply); err != nil {
utils.Logger.Warning(
fmt.Sprintf(
"<%s> could not post CDR for event %s, err: %s",
@@ -654,10 +651,8 @@ func (sS *SessionS) disconnectSession(s *Session, rsn string) (err error) {
if clnt == nil {
return fmt.Errorf("calling %s requires bidirectional JSON connection", utils.SessionSv1DisconnectSession)
}
s.RLock() // for reading the TotalUsage
s.EventStart.Set(utils.Usage, s.TotalUsage()) // Set the usage to total one debitted
sEv := s.EventStart.AsMapInterface()
s.RUnlock()
servMethod := utils.SessionSv1DisconnectSession
if clnt.proto == 0 { // compatibility with OpenSIPS 2.3
servMethod = "SMGClientV1.DisconnectSession"
@@ -1209,14 +1204,8 @@ func (sS *SessionS) initSession(tnt string, evStart *engine.SafEvent, clntConnID
func (sS *SessionS) updateSession(s *Session, updtEv engine.MapEvent) (maxUsage time.Duration, err error) {
defer sS.replicateSessions(s.CGRID, false, sS.sReplConns)
// update fields from new event
protectedFlds := engine.MapEvent{
utils.CGRID: struct{}{},
utils.OriginHost: struct{}{},
utils.OriginID: struct{}{},
utils.Usage: struct{}{},
}
for k, v := range updtEv {
if protectedFlds.HasField(k) {
if protectedSFlds.HasField(k) {
continue
}
s.EventStart.Set(k, v) // update previoius field with new one
@@ -1298,13 +1287,20 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration) (er
"<%s> failed refunding session: <%s>, srIdx: <%d>, error: <%s>",
utils.SessionS, s.CGRID, sRunIdx, err.Error()))
}
// FixMe: make sure refund is reflected inside EventCost
}
sr.Event[utils.Cost] = sr.EventCost.GetCost()
sr.Event[utils.Usage] = sr.EventCost.GetUsage()
}
if sS.cgrCfg.SessionSCfg().StoreSCosts {
if err := sS.storeSCost(s, sRunIdx); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed storing session cost for <%s>, srIdx: <%d>, error: <%s>",
utils.SessionS, s.CGRID, sRunIdx, err.Error()))
}
}
if err := sS.storeSCost(s, sRunIdx); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> failed storing session cost for <%s>, srIdx: <%d>, error: <%s>",
utils.SessionS, s.CGRID, sRunIdx, err.Error()))
}
engine.Cache.Set(utils.CacheClosedSessions, s.CGRID, s,
nil, true, utils.NonTransactional)
}
s.Unlock()
return
@@ -1333,16 +1329,6 @@ func (sS *SessionS) chargeEvent(tnt string, ev *engine.SafEvent) (maxUsage time.
return // returns here the maxUsage from update
}
func (sS *SessionS) processCDR(tnt string, ev *engine.SafEvent) (err error) {
cgrEv := &utils.CGREvent{
Tenant: tnt,
ID: utils.UUIDSha1Prefix(),
Event: ev.AsMapInterface(),
}
var reply string
return sS.cdrS.Call(utils.CDRsV2ProcessCDR, &engine.ArgV2ProcessCDR{CGREvent: *cgrEv}, &reply)
}
// APIs start here
// Call is part of RpcClientConnection interface
@@ -2296,7 +2282,55 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
}
// end of RPC caching
return sS.cdrS.Call(utils.CDRsV2ProcessCDR, &engine.ArgV2ProcessCDR{CGREvent: *cgrEv}, rply)
ev := engine.NewSafEvent(cgrEv.Event)
cgrID := GetSetCGRID(ev)
ss := sS.getRelocateSessions(cgrID,
ev.GetStringIgnoreErrors(utils.InitialOriginID),
ev.GetStringIgnoreErrors(utils.OriginID),
ev.GetStringIgnoreErrors(utils.OriginHost))
var s *Session
if len(ss) != 0 {
utils.Logger.Warning(
fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>",
utils.SessionS, cgrID))
s = ss[0]
} else {
if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has {
s = sIface.(*Session)
}
}
if s == nil { // no cached session, CDR will be handled by CDRs
return sS.cdrS.Call(utils.CDRsV2ProcessCDR, &engine.ArgV2ProcessCDR{CGREvent: *cgrEv}, rply)
}
// Use previously stored Session to generate CDRs
// update stored event with fields out of CDR
for k, v := range ev.Me {
if protectedSFlds.HasField(k) {
continue
}
s.EventStart.Set(k, v) // update previoius field with new one
}
var cgrEvs []*utils.CGREvent
if cgrEvs, err = s.asCGREvents(); err != nil {
return utils.NewErrServerError(err)
}
var withErrors bool
for _, cgrEv := range cgrEvs {
if err = sS.cdrS.Call(utils.CDRsV2ProcessCDR,
&engine.ArgV2ProcessCDR{CGREvent: *cgrEv,
ChargerS: utils.BoolPointer(false),
AttributeS: utils.BoolPointer(false)}, rply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error <%s> posting CDR with CGRID: <%s>",
utils.SessionS, err.Error(), cgrID))
withErrors = true
}
}
if withErrors {
err = utils.ErrPartiallyExecuted
}
return
}
// NewV1ProcessEventArgs is a constructor for EventArgs used by ProcessEvent