diff --git a/apier/v1/sessionsv1_it_test.go b/apier/v1/sessionsv1_it_test.go index e7709dd4c..4f3f22bce 100644 --- a/apier/v1/sessionsv1_it_test.go +++ b/apier/v1/sessionsv1_it_test.go @@ -365,7 +365,7 @@ func testSSv1ItInitiateSession(t *testing.T) { t.Errorf("Unexpected ResourceAllocation: %s", *rply.ResourceAllocation) } eAttrs := &engine.AttrSProcessEventReply{ - Opts: map[string]interface{}{utils.Subsys: utils.MetaChargers}, + Opts: map[string]interface{}{utils.Subsys: utils.MetaSessionS}, MatchedProfiles: []string{"ATTR_ACNT_1001"}, AlteredFields: []string{"*req.OfficeGroup"}, CGREvent: &utils.CGREvent{ @@ -777,7 +777,7 @@ func testSSv1ItForceUpdateSession(t *testing.T) { t.Fatal(err) } eAttrs := &engine.AttrSProcessEventReply{ - Opts: map[string]interface{}{utils.Subsys: utils.MetaChargers}, + Opts: map[string]interface{}{utils.Subsys: utils.MetaSessionS}, MatchedProfiles: []string{"ATTR_ACNT_1001"}, AlteredFields: []string{"*req.OfficeGroup"}, CGREvent: &utils.CGREvent{ @@ -909,7 +909,6 @@ func testSSv1ItDynamicDebit(t *testing.T) { GetAttributes: true, Opts: map[string]interface{}{ utils.OptsDebitInterval: 30 * time.Millisecond, - utils.Subsys: utils.MetaChargers, }, CGREvent: &utils.CGREvent{ Tenant: "cgrates.org", diff --git a/engine/cdrs.go b/engine/cdrs.go index a39f8c8b7..6ac72c965 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -323,7 +323,7 @@ func (cdrS *CDRServer) refundEventCost(ec *EventCost, reqType, tor string) (rfnd } // chrgrSProcessEvent forks CGREventWithArgDispatcher into multiples based on matching ChargerS profiles -func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithOpts) (cgrEvs []*utils.CGREventWithArgDispatcher, err error) { +func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithOpts) (cgrEvs []*utils.CGREventWithOpts, err error) { var chrgrs []*ChrgSProcessEventReply if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().ChargerSConns, nil, utils.ChargerSv1ProcessEvent, @@ -333,11 +333,12 @@ func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREventWithOpts) (cgrEvs if len(chrgrs) == 0 { return } - cgrEvs = make([]*utils.CGREventWithArgDispatcher, len(chrgrs)) + cgrEvs = make([]*utils.CGREventWithOpts, len(chrgrs)) for i, cgrPrfl := range chrgrs { - cgrEvs[i] = &utils.CGREventWithArgDispatcher{ + cgrEvs[i] = &utils.CGREventWithOpts{ CGREvent: cgrPrfl.CGREvent, ArgDispatcher: cgrEv.ArgDispatcher, + Opts: cgrPrfl.Opts, } } return @@ -378,14 +379,14 @@ func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREventWithOpts) (err err } // thdSProcessEvent will send the event to ThresholdS -func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) { +func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) { var tIDs []string // we clone the CGREvent so we can add EventType without being propagated - thArgs := &ArgsProcessEvent{CGREvent: cgrEv.CGREvent.Clone()} - thArgs.CGREvent.Event[utils.EventType] = utils.CDR - if cgrEv.ArgDispatcher != nil { - thArgs.ArgDispatcher = cgrEv.ArgDispatcher + thArgs := &ArgsProcessEvent{ + CGREvent: cgrEv.CGREvent.Clone(), + ArgDispatcher: cgrEv.ArgDispatcher, } + thArgs.CGREvent.Event[utils.EventType] = utils.CDR if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().ThresholdSConns, nil, utils.ThresholdSv1ProcessEvent, thArgs, &tIDs); err != nil && @@ -396,11 +397,11 @@ func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) } // statSProcessEvent will send the event to StatS -func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREventWithArgDispatcher) (err error) { +func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREventWithOpts) (err error) { var reply []string - statArgs := &StatsArgsProcessEvent{CGREvent: cgrEv.CGREvent} - if cgrEv.ArgDispatcher != nil { - statArgs.ArgDispatcher = cgrEv.ArgDispatcher + statArgs := &StatsArgsProcessEvent{ + CGREvent: cgrEv.CGREvent.Clone(), + ArgDispatcher: cgrEv.ArgDispatcher, } if err = cdrS.connMgr.Call(cdrS.cgrCfg.CdrsCfg().StatSConns, nil, utils.StatSv1ProcessEvent, @@ -436,7 +437,7 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts, return } } - var cgrEvs []*utils.CGREventWithArgDispatcher + var cgrEvs []*utils.CGREventWithOpts if chrgS { if cgrEvs, err = cdrS.chrgrSProcessEvent(ev); err != nil { utils.Logger.Warning( @@ -446,10 +447,7 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts, return } } else { // ChargerS not requested, charge the original event - cgrEvs = []*utils.CGREventWithArgDispatcher{{ - CGREvent: ev.CGREvent, - ArgDispatcher: ev.ArgDispatcher, - }} + cgrEvs = []*utils.CGREventWithOpts{ev} } // Check if the unique ID was not already processed if !refund { @@ -502,7 +500,6 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts, utils.Logger.Warning( fmt.Sprintf("<%s> error: <%s> refunding CDR %+v", utils.CDRs, errRfd.Error(), cdr)) - } else if rfnd { procFlgs[i].Add(utils.MetaRefund) } @@ -513,9 +510,10 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts, for j, rtCDR := range cdrS.rateCDRWithErr( &CDRWithArgDispatcher{CDR: cdr, ArgDispatcher: ev.ArgDispatcher}) { - cgrEv := &utils.CGREventWithArgDispatcher{ + cgrEv := &utils.CGREventWithOpts{ CGREvent: rtCDR.AsCGREvent(), ArgDispatcher: ev.ArgDispatcher, + Opts: cgrEvs[i].Opts, } if j == 0 { // the first CDR will replace the events we got already as a small optimization cdrs[i] = rtCDR @@ -580,6 +578,7 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithOpts, CGREventWithOpts: &utils.CGREventWithOpts{ CGREvent: cgrEv.CGREvent, ArgDispatcher: cgrEv.ArgDispatcher, + Opts: cgrEv.Opts, }, IDs: cdrS.cgrCfg.CdrsCfg().OnlineCDRExports, } diff --git a/engine/chargers.go b/engine/chargers.go index 4ed81a4ff..67f6c79e7 100644 --- a/engine/chargers.go +++ b/engine/chargers.go @@ -114,6 +114,7 @@ type ChrgSProcessEventReply struct { func (cS *ChargerService) processEvent(cgrEv *utils.CGREventWithOpts) (rply []*ChrgSProcessEventReply, err error) { var cPs ChargerProfiles + cgrEv.Opts = MapEvent(cgrEv.Opts).Clone() if cgrEv.Opts == nil { cgrEv.Opts = make(map[string]interface{}) } diff --git a/engine/chargers_test.go b/engine/chargers_test.go index c071bd9b5..81691b70a 100755 --- a/engine/chargers_test.go +++ b/engine/chargers_test.go @@ -225,7 +225,7 @@ func TestChargerProcessEvent(t *testing.T) { ArgDispatcher: chargerEvents[0].ArgDispatcher, }) if err != nil { - t.Errorf("Error: %+v", err) + t.Fatalf("Error: %+v", err) } if !reflect.DeepEqual(rpl[0], rcv[0]) { t.Errorf("Expecting: %+v, received: %+v ", utils.ToJSON(rpl[0]), utils.ToJSON(rcv[0])) diff --git a/sessions/sessions.go b/sessions/sessions.go index 53b8ce518..0fb17add6 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -1103,7 +1103,7 @@ func (sS *SessionS) newSession(cgrEv *utils.CGREventWithOpts, resID, clntConnID Tenant: cgrEv.Tenant, ResourceID: resID, EventStart: cgrEv.Event, - OptsStart: cgrEv.Opts, + OptsStart: engine.MapEvent(cgrEv.Opts).Clone(), ClientConnID: clntConnID, DebitInterval: dbtItval, ArgDispatcher: cgrEv.ArgDispatcher, @@ -1912,7 +1912,7 @@ func (sS *SessionS) BiRPCv1AuthorizeEvent(clnt rpcclient.ClientConnector, var sRunsUsage map[string]time.Duration if sRunsUsage, err = sS.authEvent(&utils.CGREventWithOpts{ CGREvent: args.CGREvent, - Opts: engine.MapEvent(args.Opts).Clone(), + Opts: args.Opts, ArgDispatcher: args.ArgDispatcher, }, args.ForceDuration); err != nil { return err