Uniformize the handlins of CDR between SessionSv1.ProcessCDR and SessionSv1.ProcessEvent

This commit is contained in:
TeoV
2020-07-22 17:32:24 +03:00
committed by Dan Christian Bogos
parent 68b9f5e720
commit a2e531c58d

View File

@@ -2696,57 +2696,7 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.ClientConnector,
cgrEvWithArgDisp.Event[utils.Source] = utils.MetaSessionS
}
ev := engine.MapEvent(cgrEvWithArgDisp.Event)
cgrID := GetSetCGRID(ev)
s := sS.getRelocateSession(cgrID,
ev.GetStringIgnoreErrors(utils.InitialOriginID),
ev.GetStringIgnoreErrors(utils.OriginID),
ev.GetStringIgnoreErrors(utils.OriginHost))
if s != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>",
utils.SessionS, cgrID))
} else if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has {
// found in cache
s = sIface.(*Session)
} else { // no cached session, CDR will be handled by CDRs
return sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent,
&engine.ArgV1ProcessEvent{
Flags: []string{utils.MetaRALs},
CGREvent: *cgrEvWithArgDisp.CGREvent,
ArgDispatcher: cgrEvWithArgDisp.ArgDispatcher}, rply)
}
// Use previously stored Session to generate CDRs
s.updateSRuns(ev, sS.cgrCfg.SessionSCfg().AlterableFields)
// create one CGREvent for each session run
var cgrEvs []*utils.CGREvent
if cgrEvs, err = s.asCGREvents(); err != nil {
return utils.NewErrServerError(err)
}
var withErrors bool
for _, cgrEv := range cgrEvs {
argsProc := &engine.ArgV1ProcessEvent{
Flags: []string{fmt.Sprintf("%s:false", utils.MetaChargers),
fmt.Sprintf("%s:false", utils.MetaAttributes)},
CGREvent: *cgrEv,
ArgDispatcher: cgrEvWithArgDisp.ArgDispatcher,
}
if mp := engine.MapEvent(cgrEv.Event); unratedReqs.HasField(mp.GetStringIgnoreErrors(utils.RequestType)) { // order additional rating for unrated request types
argsProc.Flags = append(argsProc.Flags, fmt.Sprintf("%s:true", utils.MetaRALs))
}
if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent,
argsProc, rply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error <%s> posting CDR with CGRID: <%s>",
utils.SessionS, err.Error(), cgrID))
withErrors = true
}
}
if withErrors {
err = utils.ErrPartiallyExecuted
}
return
return sS.processCDR(cgrEvWithArgDisp.CGREvent, cgrEvWithArgDisp.ArgDispatcher, []string{utils.MetaRALs}, rply)
}
// NewV1ProcessMessageArgs is a constructor for MessageArgs used by ProcessMessage
@@ -3509,16 +3459,17 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector,
}
if argsFlagsWithParams.GetBool(utils.MetaCDRs) {
var cdrRply string
if len(sS.cgrCfg.SessionSCfg().CDRsConns) == 0 {
return utils.NewErrNotConnected(utils.CDRs)
}
flgs := argsFlagsWithParams[utils.MetaCDRs].SliceFlags()
var cdrRply string
for _, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaCDRs].Has(utils.MetaDerivedReply)) {
if err := sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent,
&engine.ArgV1ProcessEvent{
Flags: flgs,
CGREvent: *cgrEv.CGREvent,
ArgDispatcher: cgrEv.ArgDispatcher,
}, &cdrRply); err != nil {
return err
if err := sS.processCDR(cgrEv.CGREvent, cgrEv.ArgDispatcher, flgs, &cdrRply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with CDRs.",
utils.SessionS, err.Error(), cgrEv.CGREvent))
withErrors = true
}
}
}
@@ -3730,6 +3681,62 @@ func (sS *SessionS) BiRPCv1DeactivateSessions(clnt rpcclient.ClientConnector,
return
}
func (sS *SessionS) processCDR(cgrEv *utils.CGREvent, argDisp *utils.ArgDispatcher, flags []string, rply *string) (err error) {
ev := engine.MapEvent(cgrEv.Event)
cgrID := GetSetCGRID(ev)
s := sS.getRelocateSession(cgrID,
ev.GetStringIgnoreErrors(utils.InitialOriginID),
ev.GetStringIgnoreErrors(utils.OriginID),
ev.GetStringIgnoreErrors(utils.OriginHost))
if s != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>",
utils.SessionS, cgrID))
} else if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has {
// found in cache
s = sIface.(*Session)
} else { // no cached session, CDR will be handled by CDRs
return sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent,
&engine.ArgV1ProcessEvent{
Flags: flags,
CGREvent: *cgrEv,
ArgDispatcher: argDisp}, rply)
}
// Use previously stored Session to generate CDRs
s.updateSRuns(ev, sS.cgrCfg.SessionSCfg().AlterableFields)
// create one CGREvent for each session run
var cgrEvs []*utils.CGREvent
if cgrEvs, err = s.asCGREvents(); err != nil {
return utils.NewErrServerError(err)
}
var withErrors bool
for _, cgrEv := range cgrEvs {
argsProc := &engine.ArgV1ProcessEvent{
Flags: []string{fmt.Sprintf("%s:false", utils.MetaChargers),
fmt.Sprintf("%s:false", utils.MetaAttributes)},
CGREvent: *cgrEv,
ArgDispatcher: argDisp,
}
if mp := engine.MapEvent(cgrEv.Event); unratedReqs.HasField(mp.GetStringIgnoreErrors(utils.RequestType)) { // order additional rating for unrated request types
argsProc.Flags = append(argsProc.Flags, fmt.Sprintf("%s:true", utils.MetaRALs))
}
if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent,
argsProc, rply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error <%s> posting CDR with CGRID: <%s>",
utils.SessionS, err.Error(), cgrID))
withErrors = true
}
}
if withErrors {
err = utils.ErrPartiallyExecuted
}
return
}
// processThreshold will receive the event and send it to ThresholdS to be processed
func (sS *SessionS) processThreshold(cgrEv *utils.CGREvent, argDisp *utils.ArgDispatcher, thIDs []string) (tIDs []string, err error) {
if len(sS.cgrCfg.SessionSCfg().ThreshSConns) == 0 {