From 4a1ceb0a3d4351939cafa81ec2cb4eb1cd87b4b3 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 24 Mar 2019 14:26:23 +0100 Subject: [PATCH] Transport the CostDetails over string so we can properly unmarshall them in CDRs --- agents/radagent_it_test.go | 4 ++-- data/conf/samples/radagent/cgrates.json | 1 + engine/cdrs.go | 26 ++++++++++++++----------- engine/libeventcost.go | 25 ++++++++++++++++++++++++ engine/mapevent.go | 5 ++--- engine/mapevent_test.go | 2 +- sessions/libsessions.go | 6 ++++++ sessions/sessions.go | 19 ++++++++++-------- 8 files changed, 63 insertions(+), 25 deletions(-) diff --git a/agents/radagent_it_test.go b/agents/radagent_it_test.go index 9579fda08..3d8fb31ca 100644 --- a/agents/radagent_it_test.go +++ b/agents/radagent_it_test.go @@ -289,10 +289,10 @@ func TestRAitAcctStop(t *testing.T) { t.Error("Unexpected number of CDRs returned: ", len(cdrs)) } else { if cdrs[0].Usage != "4s" { - t.Errorf("Unexpected CDR Usage received, cdr: %v ", cdrs[0].Usage) + t.Errorf("Unexpected CDR Usage received, cdr: %v ", cdrs[0]) } if cdrs[0].CostSource != utils.MetaSessionS { - t.Errorf("Unexpected CDR CostSource received for CDR: %v", cdrs[0].CostSource) + t.Errorf("Unexpected CDR CostSource received for CDR: %v", cdrs[0]) } if cdrs[0].Cost != 0.01 { t.Errorf("Unexpected CDR Cost received for CDR: %v", cdrs[0].Cost) diff --git a/data/conf/samples/radagent/cgrates.json b/data/conf/samples/radagent/cgrates.json index 6beb16b7c..1b3851570 100644 --- a/data/conf/samples/radagent/cgrates.json +++ b/data/conf/samples/radagent/cgrates.json @@ -131,6 +131,7 @@ "value": "~*req.Ascend-User-Acct-Time", "mandatory": true}, {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "~*req.Ascend-User-Acct-Time", "mandatory": true}, + {"tag": "RemoteAddr" , "field_id": "RemoteAddr", "type": "*remote_host"}, ], "reply_fields":[], }, diff --git a/engine/cdrs.go b/engine/cdrs.go index 0794761e0..fa12c38d4 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -233,8 +233,8 @@ func (cdrS *CDRServer) getCostFromRater(cdr *CDR) (*CallCost, error) { return cc, nil } -// processEvent will process a CGREvent with the configured subsystems -func (cdrS *CDRServer) processEvent(cgrEv *utils.CGREvent, +// attrStoExpThdStat will process a CGREvent with the configured subsystems +func (cdrS *CDRServer) attrStoExpThdStat(cgrEv *utils.CGREvent, attrS, store, export, thdS, statS bool) (err error) { if attrS { if err = cdrS.attrSProcessEvent(cgrEv); err != nil { @@ -298,7 +298,7 @@ func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREvent, continue } for _, rtCDR := range cdrS.rateCDRWithErr(cdr) { - if errProc := cdrS.processEvent(rtCDR.AsCGREvent(), + if errProc := cdrS.attrStoExpThdStat(rtCDR.AsCGREvent(), attrS, store, export, thdS, statS); errProc != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s", @@ -546,8 +546,18 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er if arg.ChargerS != nil { chrgS = *arg.ChargerS } + var ralS bool // by default we don't extra charge the received CDR + if arg.RALs != nil { + ralS = *arg.RALs + } cgrEv := &arg.CGREvent - if arg.RALs != nil && *arg.RALs { // need to rate the event + if !ralS { + if err = cdrS.attrStoExpThdStat(cgrEv, + attrS, store, export, thdS, statS); err != nil { + err = utils.NewErrServerError(err) + return + } + } else { // we want rating for this CDR var partExec bool cdr, errProc := NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone) @@ -560,7 +570,7 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er } for _, rtCDR := range cdrS.rateCDRWithErr(cdr) { cgrEv := rtCDR.AsCGREvent() - if errProc := cdrS.processEvent(cgrEv, + if errProc := cdrS.attrStoExpThdStat(cgrEv, attrS, store, export, thdS, statS); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing event %+v ", @@ -573,12 +583,6 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er err = utils.ErrPartiallyExecuted return } - } else { - if err = cdrS.processEvent(cgrEv, - attrS, store, export, thdS, statS); err != nil { - err = utils.NewErrServerError(err) - return - } } if chrgS { go cdrS.chrgProcessEvent(cgrEv, diff --git a/engine/libeventcost.go b/engine/libeventcost.go index 3f2f2cfbb..69b3ddc2c 100644 --- a/engine/libeventcost.go +++ b/engine/libeventcost.go @@ -19,6 +19,9 @@ along with this program. If not, see package engine import ( + "encoding/json" + "errors" + "fmt" "time" "github.com/cgrates/cgrates/utils" @@ -386,3 +389,25 @@ func (cbs Accounting) Clone() (cln Accounting) { } return } + +// IfaceAsEventCost converts an interface to EventCost +func IfaceAsEventCost(itm interface{}) (ec *EventCost, err error) { + switch itm.(type) { + case nil: + case *EventCost: + ec = itm.(*EventCost) + case string: + ecStr, canCast := itm.(string) + if !canCast { + return nil, errors.New("cannot cast to string") + } + var rawEC EventCost + if errUnmarshal := json.Unmarshal([]byte(ecStr), &rawEC); errUnmarshal != nil { + return nil, fmt.Errorf("JSON cannot unmarshal to *EventCost, err: %s", errUnmarshal.Error()) + } + ec = &rawEC + default: + err = utils.ErrNotConvertibleNoCaps + } + return +} diff --git a/engine/mapevent.go b/engine/mapevent.go index 881ea7d45..904a2c8bd 100644 --- a/engine/mapevent.go +++ b/engine/mapevent.go @@ -264,9 +264,8 @@ func (me MapEvent) AsCDR(cfg *config.CGRConfig, tnt, tmz string) (cdr *CDR, err return nil, err } case utils.CostDetails: - var canCast bool - if cdr.CostDetails, canCast = v.(*EventCost); !canCast { - return nil, fmt.Errorf("cannot cast field: %+v to *EventCost", v) + if cdr.CostDetails, err = IfaceAsEventCost(v); err != nil { + return nil, err } case utils.ExtraInfo, utils.OrderID: diff --git a/engine/mapevent_test.go b/engine/mapevent_test.go index 15bdc7bd4..56fe1a5c7 100644 --- a/engine/mapevent_test.go +++ b/engine/mapevent_test.go @@ -585,7 +585,7 @@ func TestMapEventAsCDR(t *testing.T) { "Usage": "42s", "PreRated": "True", "Cost": "42.3", - "CostDetails": ec1, + "CostDetails": utils.ToJSON(ec1), } expected = &CDR{ CGRID: "da39a3ee5e6b4b0d3255bfef95601890afd80709", diff --git a/sessions/libsessions.go b/sessions/libsessions.go index 4362fcdab..39778d2fe 100644 --- a/sessions/libsessions.go +++ b/sessions/libsessions.go @@ -33,6 +33,12 @@ var protectedSFlds = engine.MapEvent{ utils.Usage: struct{}{}, } +var unratedReqs = engine.MapEvent{ + utils.META_POSTPAID: struct{}{}, + utils.META_PSEUDOPREPAID: struct{}{}, + utils.META_RATED: struct{}{}, +} + // SessionSClient is the interface implemented by Agents which are able to // communicate bidirectionally with SessionS and remote Communication Switch type SessionSClient interface { diff --git a/sessions/sessions.go b/sessions/sessions.go index 5ef3ab681..98d5d4709 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -1289,12 +1289,16 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration) (er } // FixMe: make sure refund is reflected inside EventCost } + // set cost fields sr.Event[utils.Cost] = sr.EventCost.GetCost() - sr.Event[utils.Usage] = sr.TotalUsage + sr.Event[utils.CostDetails] = utils.ToJSON(sr.EventCost) // avoid map[string]interface{} when decoding + sr.Event[utils.CostSource] = utils.MetaSessionS } + // Set Usage field if sRunIdx == 0 { s.EventStart.Set(utils.Usage, sr.TotalUsage) } + sr.Event[utils.Usage] = sr.TotalUsage if sS.cgrCfg.SessionSCfg().StoreSCosts { if err := sS.storeSCost(s, sRunIdx); err != nil { utils.Logger.Warning( @@ -2298,7 +2302,7 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>", utils.SessionS, cgrID)) s = ss[0] - } else { + } else { // try retrieving from closed_sessions within cache if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has { s = sIface.(*Session) } @@ -2306,6 +2310,7 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, if s == nil { // no cached session, CDR will be handled by CDRs return sS.cdrS.Call(utils.CDRsV2ProcessCDR, &engine.ArgV2ProcessCDR{CGREvent: *cgrEv}, rply) } + // Use previously stored Session to generate CDRs // update stored event with fields out of CDR @@ -2315,21 +2320,19 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection, } s.EventStart.Set(k, v) // update previoius field with new one } + // create one CGREvent for each session run plus *raw one var cgrEvs []*utils.CGREvent if cgrEvs, err = s.asCGREvents(); err != nil { return utils.NewErrServerError(err) } - toRateReqs := engine.MapEvent{ - utils.META_POSTPAID: struct{}{}, - utils.META_PSEUDOPREPAID: struct{}{}, - utils.META_RATED: struct{}{}, - } + var withErrors bool for _, cgrEv := range cgrEvs { argsProc := &engine.ArgV2ProcessCDR{CGREvent: *cgrEv, ChargerS: utils.BoolPointer(false), AttributeS: utils.BoolPointer(false)} - if toRateReqs.HasField(engine.NewMapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) { + if unratedReqs.HasField( // order additional rating for unrated request types + engine.NewMapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) { argsProc.RALs = utils.BoolPointer(true) } if err = sS.cdrS.Call(utils.CDRsV2ProcessCDR,