Updated CDRServer.chrgrSProcessEvent to return also the opts

This commit is contained in:
Trial97
2020-07-27 13:34:08 +03:00
committed by Dan Christian Bogos
parent 240ff4b671
commit 43f36fdea6
5 changed files with 24 additions and 25 deletions

View File

@@ -365,7 +365,7 @@ func testSSv1ItInitiateSession(t *testing.T) {
t.Errorf("Unexpected ResourceAllocation: %s", *rply.ResourceAllocation)
}
eAttrs := &engine.AttrSProcessEventReply{
Opts: map[string]interface{}{utils.Subsys: utils.MetaChargers},
Opts: map[string]interface{}{utils.Subsys: utils.MetaSessionS},
MatchedProfiles: []string{"ATTR_ACNT_1001"},
AlteredFields: []string{"*req.OfficeGroup"},
CGREvent: &utils.CGREvent{
@@ -777,7 +777,7 @@ func testSSv1ItForceUpdateSession(t *testing.T) {
t.Fatal(err)
}
eAttrs := &engine.AttrSProcessEventReply{
Opts: map[string]interface{}{utils.Subsys: utils.MetaChargers},
Opts: map[string]interface{}{utils.Subsys: utils.MetaSessionS},
MatchedProfiles: []string{"ATTR_ACNT_1001"},
AlteredFields: []string{"*req.OfficeGroup"},
CGREvent: &utils.CGREvent{
@@ -909,7 +909,6 @@ func testSSv1ItDynamicDebit(t *testing.T) {
GetAttributes: true,
Opts: map[string]interface{}{
utils.OptsDebitInterval: 30 * time.Millisecond,
utils.Subsys: utils.MetaChargers,
},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",

View File

@@ -323,7 +323,7 @@ func (cdrS *CDRServer) refundEventCost(ec *EventCost, reqType, tor string) (rfnd
}
// chrgrSProcessEvent forks CGREventWithArgDispatcher into multiples based on matching ChargerS profiles
func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithOpts) (cgrEvs []*utils.CGREventWithArgDispatcher, err error) {
func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithOpts) (cgrEvs []*utils.CGREventWithOpts, err error) {
var chrgrs []*ChrgSProcessEventReply
if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().ChargerSConns, nil,
utils.ChargerSv1ProcessEvent,
@@ -333,11 +333,12 @@ func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithOpts) (cgrEvs
if len(chrgrs) == 0 {
return
}
cgrEvs = make([]*utils.CGREventWithArgDispatcher, len(chrgrs))
cgrEvs = make([]*utils.CGREventWithOpts, len(chrgrs))
for i, cgrPrfl := range chrgrs {
cgrEvs[i] = &utils.CGREventWithArgDispatcher{
cgrEvs[i] = &utils.CGREventWithOpts{
CGREvent: cgrPrfl.CGREvent,
ArgDispatcher: cgrEv.ArgDispatcher,
Opts: cgrPrfl.Opts,
}
}
return
@@ -378,14 +379,14 @@ func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithOpts) (err err
}
// thdSProcessEvent will send the event to ThresholdS
func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) {
func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) {
var tIDs []string
// we clone the CGREvent so we can add EventType without being propagated
thArgs := &ArgsProcessEvent{CGREvent: cgrEv.CGREvent.Clone()}
thArgs.CGREvent.Event[utils.EventType] = utils.CDR
if cgrEv.ArgDispatcher != nil {
thArgs.ArgDispatcher = cgrEv.ArgDispatcher
thArgs := &ArgsProcessEvent{
CGREvent: cgrEv.CGREvent.Clone(),
ArgDispatcher: cgrEv.ArgDispatcher,
}
thArgs.CGREvent.Event[utils.EventType] = utils.CDR
if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().ThresholdSConns, nil,
utils.ThresholdSv1ProcessEvent,
thArgs, &tIDs); err != nil &&
@@ -396,11 +397,11 @@ func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher)
}
// statSProcessEvent will send the event to StatS
func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) {
func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) {
var reply []string
statArgs := &StatsArgsProcessEvent{CGREvent: cgrEv.CGREvent}
if cgrEv.ArgDispatcher != nil {
statArgs.ArgDispatcher = cgrEv.ArgDispatcher
statArgs := &StatsArgsProcessEvent{
CGREvent: cgrEv.CGREvent.Clone(),
ArgDispatcher: cgrEv.ArgDispatcher,
}
if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().StatSConns, nil,
utils.StatSv1ProcessEvent,
@@ -436,7 +437,7 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts,
return
}
}
var cgrEvs []*utils.CGREventWithArgDispatcher
var cgrEvs []*utils.CGREventWithOpts
if chrgS {
if cgrEvs, err = cdrS.chrgrSProcessEvent(ev); err != nil {
utils.Logger.Warning(
@@ -446,10 +447,7 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts,
return
}
} else { // ChargerS not requested, charge the original event
cgrEvs = []*utils.CGREventWithArgDispatcher{{
CGREvent: ev.CGREvent,
ArgDispatcher: ev.ArgDispatcher,
}}
cgrEvs = []*utils.CGREventWithOpts{ev}
}
// Check if the unique ID was not already processed
if !refund {
@@ -502,7 +500,6 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts,
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> refunding CDR %+v",
utils.CDRs, errRfd.Error(), cdr))
} else if rfnd {
procFlgs[i].Add(utils.MetaRefund)
}
@@ -513,9 +510,10 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts,
for j, rtCDR := range cdrS.rateCDRWithErr(
&CDRWithArgDispatcher{CDR: cdr,
ArgDispatcher: ev.ArgDispatcher}) {
cgrEv := &utils.CGREventWithArgDispatcher{
cgrEv := &utils.CGREventWithOpts{
CGREvent: rtCDR.AsCGREvent(),
ArgDispatcher: ev.ArgDispatcher,
Opts: cgrEvs[i].Opts,
}
if j == 0 { // the first CDR will replace the events we got already as a small optimization
cdrs[i] = rtCDR
@@ -580,6 +578,7 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts,
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: cgrEv.CGREvent,
ArgDispatcher: cgrEv.ArgDispatcher,
Opts: cgrEv.Opts,
},
IDs: cdrS.cgrCfg.CdrsCfg().OnlineCDRExports,
}

View File

@@ -114,6 +114,7 @@ type ChrgSProcessEventReply struct {
func (cS *ChargerService) processEvent(cgrEv *utils.CGREventWithOpts) (rply []*ChrgSProcessEventReply, err error) {
var cPs ChargerProfiles
cgrEv.Opts = MapEvent(cgrEv.Opts).Clone()
if cgrEv.Opts == nil {
cgrEv.Opts = make(map[string]interface{})
}

View File

@@ -225,7 +225,7 @@ func TestChargerProcessEvent(t *testing.T) {
ArgDispatcher: chargerEvents[0].ArgDispatcher,
})
if err != nil {
t.Errorf("Error: %+v", err)
t.Fatalf("Error: %+v", err)
}
if !reflect.DeepEqual(rpl[0], rcv[0]) {
t.Errorf("Expecting: %+v, received: %+v ", utils.ToJSON(rpl[0]), utils.ToJSON(rcv[0]))

View File

@@ -1103,7 +1103,7 @@ func (sS *SessionS) newSession(cgrEv *utils.CGREventWithOpts, resID, clntConnID
Tenant: cgrEv.Tenant,
ResourceID: resID,
EventStart: cgrEv.Event,
OptsStart: cgrEv.Opts,
OptsStart: engine.MapEvent(cgrEv.Opts).Clone(),
ClientConnID: clntConnID,
DebitInterval: dbtItval,
ArgDispatcher: cgrEv.ArgDispatcher,
@@ -1912,7 +1912,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.ClientConnector,
var sRunsUsage map[string]time.Duration
if sRunsUsage, err = sS.authEvent(&utils.CGREventWithOpts{
CGREvent: args.CGREvent,
Opts: engine.MapEvent(args.Opts).Clone(),
Opts: args.Opts,
ArgDispatcher: args.ArgDispatcher,
}, args.ForceDuration); err != nil {
return err