Transport the CostDetails over string so we can properly unmarshall them in CDRs

This commit is contained in:
DanB
2019-03-24 14:26:23 +01:00
parent a8d2506c9d
commit 4a1ceb0a3d
8 changed files with 63 additions and 25 deletions

View File

@@ -289,10 +289,10 @@ func TestRAitAcctStop(t *testing.T) {
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
} else {
if cdrs[0].Usage != "4s" {
t.Errorf("Unexpected CDR Usage received, cdr: %v ", cdrs[0].Usage)
t.Errorf("Unexpected CDR Usage received, cdr: %v ", cdrs[0])
}
if cdrs[0].CostSource != utils.MetaSessionS {
t.Errorf("Unexpected CDR CostSource received for CDR: %v", cdrs[0].CostSource)
t.Errorf("Unexpected CDR CostSource received for CDR: %v", cdrs[0])
}
if cdrs[0].Cost != 0.01 {
t.Errorf("Unexpected CDR Cost received for CDR: %v", cdrs[0].Cost)

View File

@@ -131,6 +131,7 @@
"value": "~*req.Ascend-User-Acct-Time", "mandatory": true},
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed",
"value": "~*req.Ascend-User-Acct-Time", "mandatory": true},
{"tag": "RemoteAddr" , "field_id": "RemoteAddr", "type": "*remote_host"},
],
"reply_fields":[],
},

View File

@@ -233,8 +233,8 @@ func (cdrS *CDRServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
return cc, nil
}
// processEvent will process a CGREvent with the configured subsystems
func (cdrS *CDRServer) processEvent(cgrEv *utils.CGREvent,
// attrStoExpThdStat will process a CGREvent with the configured subsystems
func (cdrS *CDRServer) attrStoExpThdStat(cgrEv *utils.CGREvent,
attrS, store, export, thdS, statS bool) (err error) {
if attrS {
if err = cdrS.attrSProcessEvent(cgrEv); err != nil {
@@ -298,7 +298,7 @@ func (cdrS *CDRServer) chrgProcessEvent(cgrEv *utils.CGREvent,
continue
}
for _, rtCDR := range cdrS.rateCDRWithErr(cdr) {
if errProc := cdrS.processEvent(rtCDR.AsCGREvent(),
if errProc := cdrS.attrStoExpThdStat(rtCDR.AsCGREvent(),
attrS, store, export, thdS, statS); errProc != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s",
@@ -546,8 +546,18 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er
if arg.ChargerS != nil {
chrgS = *arg.ChargerS
}
var ralS bool // by default we don't extra charge the received CDR
if arg.RALs != nil {
ralS = *arg.RALs
}
cgrEv := &arg.CGREvent
if arg.RALs != nil && *arg.RALs { // need to rate the event
if !ralS {
if err = cdrS.attrStoExpThdStat(cgrEv,
attrS, store, export, thdS, statS); err != nil {
err = utils.NewErrServerError(err)
return
}
} else { // we want rating for this CDR
var partExec bool
cdr, errProc := NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
@@ -560,7 +570,7 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er
}
for _, rtCDR := range cdrS.rateCDRWithErr(cdr) {
cgrEv := rtCDR.AsCGREvent()
if errProc := cdrS.processEvent(cgrEv,
if errProc := cdrS.attrStoExpThdStat(cgrEv,
attrS, store, export, thdS, statS); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v ",
@@ -573,12 +583,6 @@ func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err er
err = utils.ErrPartiallyExecuted
return
}
} else {
if err = cdrS.processEvent(cgrEv,
attrS, store, export, thdS, statS); err != nil {
err = utils.NewErrServerError(err)
return
}
}
if chrgS {
go cdrS.chrgProcessEvent(cgrEv,

View File

@@ -19,6 +19,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/cgrates/cgrates/utils"
@@ -386,3 +389,25 @@ func (cbs Accounting) Clone() (cln Accounting) {
}
return
}
// IfaceAsEventCost converts an interface to EventCost
func IfaceAsEventCost(itm interface{}) (ec *EventCost, err error) {
switch itm.(type) {
case nil:
case *EventCost:
ec = itm.(*EventCost)
case string:
ecStr, canCast := itm.(string)
if !canCast {
return nil, errors.New("cannot cast to string")
}
var rawEC EventCost
if errUnmarshal := json.Unmarshal([]byte(ecStr), &rawEC); errUnmarshal != nil {
return nil, fmt.Errorf("JSON cannot unmarshal to *EventCost, err: %s", errUnmarshal.Error())
}
ec = &rawEC
default:
err = utils.ErrNotConvertibleNoCaps
}
return
}

View File

@@ -264,9 +264,8 @@ func (me MapEvent) AsCDR(cfg *config.CGRConfig, tnt, tmz string) (cdr *CDR, err
return nil, err
}
case utils.CostDetails:
var canCast bool
if cdr.CostDetails, canCast = v.(*EventCost); !canCast {
return nil, fmt.Errorf("cannot cast field: %+v to *EventCost", v)
if cdr.CostDetails, err = IfaceAsEventCost(v); err != nil {
return nil, err
}
case utils.ExtraInfo, utils.OrderID:

View File

@@ -585,7 +585,7 @@ func TestMapEventAsCDR(t *testing.T) {
"Usage": "42s",
"PreRated": "True",
"Cost": "42.3",
"CostDetails": ec1,
"CostDetails": utils.ToJSON(ec1),
}
expected = &CDR{
CGRID: "da39a3ee5e6b4b0d3255bfef95601890afd80709",

View File

@@ -33,6 +33,12 @@ var protectedSFlds = engine.MapEvent{
utils.Usage: struct{}{},
}
var unratedReqs = engine.MapEvent{
utils.META_POSTPAID: struct{}{},
utils.META_PSEUDOPREPAID: struct{}{},
utils.META_RATED: struct{}{},
}
// SessionSClient is the interface implemented by Agents which are able to
// communicate bidirectionally with SessionS and remote Communication Switch
type SessionSClient interface {

View File

@@ -1289,12 +1289,16 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration) (er
}
// FixMe: make sure refund is reflected inside EventCost
}
// set cost fields
sr.Event[utils.Cost] = sr.EventCost.GetCost()
sr.Event[utils.Usage] = sr.TotalUsage
sr.Event[utils.CostDetails] = utils.ToJSON(sr.EventCost) // avoid map[string]interface{} when decoding
sr.Event[utils.CostSource] = utils.MetaSessionS
}
// Set Usage field
if sRunIdx == 0 {
s.EventStart.Set(utils.Usage, sr.TotalUsage)
}
sr.Event[utils.Usage] = sr.TotalUsage
if sS.cgrCfg.SessionSCfg().StoreSCosts {
if err := sS.storeSCost(s, sRunIdx); err != nil {
utils.Logger.Warning(
@@ -2298,7 +2302,7 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
fmt.Sprintf("<%s> ProcessCDR called for active session with CGRID: <%s>",
utils.SessionS, cgrID))
s = ss[0]
} else {
} else { // try retrieving from closed_sessions within cache
if sIface, has := engine.Cache.Get(utils.CacheClosedSessions, cgrID); has {
s = sIface.(*Session)
}
@@ -2306,6 +2310,7 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
if s == nil { // no cached session, CDR will be handled by CDRs
return sS.cdrS.Call(utils.CDRsV2ProcessCDR, &engine.ArgV2ProcessCDR{CGREvent: *cgrEv}, rply)
}
// Use previously stored Session to generate CDRs
// update stored event with fields out of CDR
@@ -2315,21 +2320,19 @@ func (sS *SessionS) BiRPCv1ProcessCDR(clnt rpcclient.RpcClientConnection,
}
s.EventStart.Set(k, v) // update previoius field with new one
}
// create one CGREvent for each session run plus *raw one
var cgrEvs []*utils.CGREvent
if cgrEvs, err = s.asCGREvents(); err != nil {
return utils.NewErrServerError(err)
}
toRateReqs := engine.MapEvent{
utils.META_POSTPAID: struct{}{},
utils.META_PSEUDOPREPAID: struct{}{},
utils.META_RATED: struct{}{},
}
var withErrors bool
for _, cgrEv := range cgrEvs {
argsProc := &engine.ArgV2ProcessCDR{CGREvent: *cgrEv,
ChargerS: utils.BoolPointer(false),
AttributeS: utils.BoolPointer(false)}
if toRateReqs.HasField(engine.NewMapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) {
if unratedReqs.HasField( // order additional rating for unrated request types
engine.NewMapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RequestType)) {
argsProc.RALs = utils.BoolPointer(true)
}
if err = sS.cdrS.Call(utils.CDRsV2ProcessCDR,