diff --git a/sessions/sessions.go b/sessions/sessions.go index 20cdfbf7d..802a44951 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -2696,57 +2696,7 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.ClientConnector, cgrEvWithArgDisp.Event[utils.Source] = utils.MetaSessionS } - ev := engine.MapEvent(cgrEvWithArgDisp.Event) - cgrID := GetSetCGRID(ev) - s := sS.getRelocateSession(cgrID, - ev.GetStringIgnoreErrors(utils.InitialOriginID), - ev.GetStringIgnoreErrors(utils.OriginID), - ev.GetStringIgnoreErrors(utils.OriginHost)) - if s != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>", - utils.SessionS, cgrID)) - } else if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has { - // found in cache - s = sIface.(*Session) - } else { // no cached session, CDR will be handled by CDRs - return sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent, - &engine.ArgV1ProcessEvent{ - Flags: []string{utils.MetaRALs}, - CGREvent: *cgrEvWithArgDisp.CGREvent, - ArgDispatcher: cgrEvWithArgDisp.ArgDispatcher}, rply) - } - - // Use previously stored Session to generate CDRs - s.updateSRuns(ev, sS.cgrCfg.SessionSCfg().AlterableFields) - // create one CGREvent for each session run - var cgrEvs []*utils.CGREvent - if cgrEvs, err = s.asCGREvents(); err != nil { - return utils.NewErrServerError(err) - } - var withErrors bool - for _, cgrEv := range cgrEvs { - argsProc := &engine.ArgV1ProcessEvent{ - Flags: []string{fmt.Sprintf("%s:false", utils.MetaChargers), - fmt.Sprintf("%s:false", utils.MetaAttributes)}, - CGREvent: *cgrEv, - ArgDispatcher: cgrEvWithArgDisp.ArgDispatcher, - } - if mp := engine.MapEvent(cgrEv.Event); unratedReqs.HasField(mp.GetStringIgnoreErrors(utils.RequestType)) { // order additional rating for unrated request types - argsProc.Flags = append(argsProc.Flags, fmt.Sprintf("%s:true", utils.MetaRALs)) - } - if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent, - argsProc, rply); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error <%s> posting CDR with CGRID: <%s>", - utils.SessionS, err.Error(), cgrID)) - withErrors = true - } - } - if withErrors { - err = utils.ErrPartiallyExecuted - } - return + return sS.processCDR(cgrEvWithArgDisp.CGREvent, cgrEvWithArgDisp.ArgDispatcher, []string{utils.MetaRALs}, rply) } // NewV1ProcessMessageArgs is a constructor for MessageArgs used by ProcessMessage @@ -3509,16 +3459,17 @@ func (sS *SessionS) BiRPCv1ProcessEvent(clnt rpcclient.ClientConnector, } if argsFlagsWithParams.GetBool(utils.MetaCDRs) { - var cdrRply string + if len(sS.cgrCfg.SessionSCfg().CDRsConns) == 0 { + return utils.NewErrNotConnected(utils.CDRs) + } flgs := argsFlagsWithParams[utils.MetaCDRs].SliceFlags() + var cdrRply string for _, cgrEv := range getDerivedEvents(events, argsFlagsWithParams[utils.MetaCDRs].Has(utils.MetaDerivedReply)) { - if err := sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent, - &engine.ArgV1ProcessEvent{ - Flags: flgs, - CGREvent: *cgrEv.CGREvent, - ArgDispatcher: cgrEv.ArgDispatcher, - }, &cdrRply); err != nil { - return err + if err := sS.processCDR(cgrEv.CGREvent, cgrEv.ArgDispatcher, flgs, &cdrRply); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> processing event %+v with CDRs.", + utils.SessionS, err.Error(), cgrEv.CGREvent)) + withErrors = true } } } @@ -3730,6 +3681,62 @@ func (sS *SessionS) BiRPCv1DeactivateSessions(clnt rpcclient.ClientConnector, return } +func (sS *SessionS) processCDR(cgrEv *utils.CGREvent, argDisp *utils.ArgDispatcher, flags []string, rply *string) (err error) { + + ev := engine.MapEvent(cgrEv.Event) + cgrID := GetSetCGRID(ev) + s := sS.getRelocateSession(cgrID, + ev.GetStringIgnoreErrors(utils.InitialOriginID), + ev.GetStringIgnoreErrors(utils.OriginID), + ev.GetStringIgnoreErrors(utils.OriginHost)) + if s != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>", + utils.SessionS, cgrID)) + } else if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has { + // found in cache + s = sIface.(*Session) + } else { // no cached session, CDR will be handled by CDRs + return sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent, + &engine.ArgV1ProcessEvent{ + Flags: flags, + CGREvent: *cgrEv, + ArgDispatcher: argDisp}, rply) + } + + // Use previously stored Session to generate CDRs + s.updateSRuns(ev, sS.cgrCfg.SessionSCfg().AlterableFields) + // create one CGREvent for each session run + var cgrEvs []*utils.CGREvent + if cgrEvs, err = s.asCGREvents(); err != nil { + return utils.NewErrServerError(err) + } + var withErrors bool + for _, cgrEv := range cgrEvs { + argsProc := &engine.ArgV1ProcessEvent{ + Flags: []string{fmt.Sprintf("%s:false", utils.MetaChargers), + fmt.Sprintf("%s:false", utils.MetaAttributes)}, + CGREvent: *cgrEv, + ArgDispatcher: argDisp, + } + if mp := engine.MapEvent(cgrEv.Event); unratedReqs.HasField(mp.GetStringIgnoreErrors(utils.RequestType)) { // order additional rating for unrated request types + argsProc.Flags = append(argsProc.Flags, fmt.Sprintf("%s:true", utils.MetaRALs)) + } + if err = sS.connMgr.Call(sS.cgrCfg.SessionSCfg().CDRsConns, nil, utils.CDRsV1ProcessEvent, + argsProc, rply); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error <%s> posting CDR with CGRID: <%s>", + utils.SessionS, err.Error(), cgrID)) + withErrors = true + } + } + if withErrors { + err = utils.ErrPartiallyExecuted + } + return + +} + // processThreshold will receive the event and send it to ThresholdS to be processed func (sS *SessionS) processThreshold(cgrEv *utils.CGREvent, argDisp *utils.ArgDispatcher, thIDs []string) (tIDs []string, err error) { if len(sS.cgrCfg.SessionSCfg().ThreshSConns) == 0 {