CDRsV2.ProcessCDR API making use of ChargerS

This commit is contained in:
DanB
2018-08-24 14:06:55 +02:00
parent dcd18fef98
commit 9ee04fa97b
8 changed files with 158 additions and 24 deletions

View File

@@ -110,6 +110,6 @@ func (cSv1 *ChargerSv1) GetChargersForEvent(cgrEv *utils.CGREvent,
// ProcessEvent
func (cSv1 *ChargerSv1) ProcessEvent(args *utils.CGREvent,
reply *[]*engine.AttrSProcessEventReply) error {
reply *[]*engine.ChrgSProcessEventReply) error {
return cSv1.cS.V1ProcessEvent(args, reply)
}

View File

@@ -174,10 +174,11 @@ func testChargerSGetChargersForEvent(t *testing.T) {
}
func testChargerSProcessEvent(t *testing.T) {
processedEv := &[]*engine.AttrSProcessEventReply{
&engine.AttrSProcessEventReply{
MatchedProfiles: []string{"ATTR_1001_SIMPLEAUTH"},
AlteredFields: []string{"Password"},
processedEv := &[]*engine.ChrgSProcessEventReply{
&engine.ChrgSProcessEventReply{
ChargerSProfile: "Charger1",
AttributeSProfiles: []string{"ATTR_1001_SIMPLEAUTH"},
AlteredFields: []string{"Password"},
CGREvent: &utils.CGREvent{ // matching Charger1
Tenant: "cgrates.org",
ID: "event1",
@@ -190,7 +191,7 @@ func testChargerSProcessEvent(t *testing.T) {
},
},
}
var result *[]*engine.AttrSProcessEventReply
var result *[]*engine.ChrgSProcessEventReply
if err := chargerRPC.Call(utils.ChargerSv1ProcessEvent, chargerEvent[1], &result); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err)

View File

@@ -71,3 +71,7 @@ type CdrsV2 struct {
func (self *CdrsV2) StoreSMCost(args engine.ArgsV2CDRSStoreSMCost, reply *string) error {
return self.CdrSrv.V2StoreSMCost(args, reply)
}
func (self *CdrsV2) ProcessCDR(cgrEv *utils.CGREvent, reply *string) error {
return self.CdrSrv.V2ProcessCDR(cgrEv, reply)
}

View File

@@ -100,13 +100,32 @@ type CDR struct {
CostDetails *EventCost // Attach the cost details to CDR when possible
}
// AddDefaults will add missing information based on other fields
func (cdr *CDR) AddDefaults(cfg *config.CGRConfig) {
if cdr.CGRID == "" {
cdr.ComputeCGRID()
}
if cdr.ToR == "" {
cdr.ToR = utils.VOICE
}
if cdr.RequestType == "" {
cdr.RequestType = cfg.DefaultReqType
}
if cdr.Tenant == "" {
cdr.Tenant = cfg.DefaultTenant
}
if cdr.Category == "" {
cdr.Category = cfg.DefaultCategory
}
}
func (cdr *CDR) CostDetailsJson() string {
mrshled, _ := json.Marshal(cdr.CostDetails)
return string(mrshled)
}
func (cdr *CDR) ComputeCGRID() {
cdr.CGRID = utils.Sha1(cdr.OriginID, cdr.SetupTime.UTC().String())
cdr.CGRID = utils.Sha1(cdr.OriginID, cdr.OriginHost)
}
// Used to multiply usage on export

View File

@@ -198,16 +198,8 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
return err // Error is propagated back and we don't continue processing the CDR if we cannot store it
}
}
if self.thdS != nil {
var tIDs []string
thEv := &ArgsProcessEvent{
CGREvent: *cdr.AsCGREvent()}
if err := self.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<CDRS> error: %s processing CDR event %+v with thdS.", err.Error(), thEv))
}
}
// process CDR with thresholdS
self.thdSProcessEvent(cdr.AsCGREvent())
// Attach raw CDR to stats
if self.cdrstats != nil { // Send raw CDR to stats
var out int
@@ -664,3 +656,105 @@ func (cdrsrv *CdrServer) Call(serviceMethod string, args interface{}, reply inte
}
return err
}
// thdSProcessEvent will send the event to ThresholdS if the connection is configured
func (cdrS *CdrServer) thdSProcessEvent(cgrEv *utils.CGREvent) {
if cdrS.thdS == nil {
return
}
var tIDs []string
if err := cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent,
&ArgsProcessEvent{CGREvent: *cgrEv}, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing CDR event %+v with thdS.",
utils.CDRs, err.Error(), cgrEv))
return
}
}
// statSProcessEvent will send the event to StatS if the connection is configured
func (cdrS *CdrServer) statSProcessEvent(cgrEv *utils.CGREvent) {
if cdrS.stats == nil {
return
}
var reply []string
if err := cdrS.stats.Call(utils.StatSv1ProcessEvent, cgrEv, &reply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s.",
utils.CDRs, err.Error(), cgrEv, utils.StatS))
return
}
}
// chrgrSProcessEvent will process the CGREvent with ChargerS subsystem
func (cdrS *CdrServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) {
if cdrS.chargerS == nil {
return
}
var chrgrs []*ChrgSProcessEventReply
if err := cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent, cgrEv, &chrgrs); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.",
utils.CDRs, err.Error(), cgrEv, utils.ChargerS))
return
}
var processedCDRs []*CDR
for _, chrgr := range chrgrs {
cdr, err := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg, cdrS.Timezone())
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s converting CDR event %+v with %s.",
utils.CDRs, err.Error(), cgrEv, utils.ChargerS))
continue
}
ratedCDRs, err := cdrS.rateCDR(cdr)
if err != nil {
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s rating CDR %+v.",
utils.CDRs, err.Error(), cdr))
continue
}
}
processedCDRs = append(processedCDRs, ratedCDRs...)
}
for _, cdr := range processedCDRs {
if cdrS.cgrCfg.CDRSStoreCdrs { // Store CDR
go func() {
if err := cdrS.cdrDb.SetCDR(cdr, true); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s storing CDR %+v.",
utils.CDRs, err.Error(), cdr))
}
}()
}
go cdrS.replicateCDRs([]*CDR{cdr}) // Replicate CDR
cgrEv := cdr.AsCGREvent()
go cdrS.thdSProcessEvent(cgrEv)
go cdrS.statSProcessEvent(cgrEv)
}
}
// V2ProcessCDR will process the CDR out of CGREvent
func (cdrS *CdrServer) V2ProcessCDR(cgrEv *utils.CGREvent, reply *string) (err error) {
rawCDR, err := NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, cdrS.Timezone())
if err != nil {
return utils.NewErrServerError(err)
}
if cdrS.cgrCfg.CDRSStoreCdrs { // Store *raw CDR
if err = cdrS.cdrDb.SetCDR(rawCDR, false); err != nil {
return utils.NewErrServerError(err) // Cannot store CDR
}
}
cdrS.replicateCDRs([]*CDR{rawCDR}) // Replicate raw CDR
go cdrS.thdSProcessEvent(cgrEv)
go cdrS.statSProcessEvent(cgrEv)
go cdrS.chrgrSProcessEvent(cgrEv)
*reply = utils.OK
return nil
}

View File

@@ -96,16 +96,27 @@ func (cS *ChargerService) matchingChargerProfilesForEvent(cgrEv *utils.CGREvent)
return
}
func (cS *ChargerService) processEvent(cgrEv *utils.CGREvent) (rply []*AttrSProcessEventReply, err error) {
// ChrgSProcessEventReply is the reply to processEvent
type ChrgSProcessEventReply struct {
ChargerSProfile string
AttributeSProfiles []string
AlteredFields []string
CGREvent *utils.CGREvent
}
func (cS *ChargerService) processEvent(cgrEv *utils.CGREvent) (rply []*ChrgSProcessEventReply, err error) {
var cPs ChargerProfiles
if cPs, err = cS.matchingChargerProfilesForEvent(cgrEv); err != nil {
return nil, err
}
rply = make([]*AttrSProcessEventReply, len(cPs))
rply = make([]*ChrgSProcessEventReply, len(cPs))
for i, cP := range cPs {
clonedEv := cgrEv.Clone()
clonedEv.Event[utils.RunID] = cP.RunID
rply[i] = &ChrgSProcessEventReply{
ChargerSProfile: cP.ID,
CGREvent: clonedEv,
}
if len(cP.AttributeIDs) != 0 { // Attributes should process the event
if cS.attrS == nil {
return nil, errors.New("no connection to AttributeS")
@@ -119,7 +130,11 @@ func (cS *ChargerService) processEvent(cgrEv *utils.CGREvent) (rply []*AttrSProc
&evReply); err != nil {
return nil, err
}
rply[i] = &evReply
rply[i].AttributeSProfiles = evReply.MatchedProfiles
rply[i].AlteredFields = evReply.AlteredFields
if len(evReply.AlteredFields) != 0 {
rply[i].CGREvent = evReply.CGREvent
}
}
}
return
@@ -127,7 +142,7 @@ func (cS *ChargerService) processEvent(cgrEv *utils.CGREvent) (rply []*AttrSProc
// V1ProcessEvent will process the event received via API and return list of events forked
func (cS *ChargerService) V1ProcessEvent(args *utils.CGREvent,
reply *[]*AttrSProcessEventReply) (err error) {
reply *[]*ChrgSProcessEventReply) (err error) {
if args.Event == nil {
return utils.NewErrMandatoryIeMissing("Event")
}

View File

@@ -130,7 +130,7 @@ func (me MapEvent) AsMapStringIgnoreErrors(ignoredFlds utils.StringMap) (mp map[
// AsCDR exports the SafEvent as CDR
func (me MapEvent) AsCDR(cfg *config.CGRConfig, tmz string) (cdr *CDR, err error) {
cdr = NewCDRWithDefaults(cfg)
cdr = &CDR{Cost: -1.0, ExtraFields: make(map[string]string)}
for k, v := range me {
if !utils.IsSliceMember(utils.PrimaryCdrFields, k) { // not primary field, populate extra ones
if cdr.ExtraFields[k], err = utils.IfaceAsString(v); err != nil {
@@ -219,5 +219,6 @@ func (me MapEvent) AsCDR(cfg *config.CGRConfig, tmz string) (cdr *CDR, err error
}
}
}
cdr.AddDefaults(cfg)
return
}

View File

@@ -309,7 +309,7 @@ func (spS *SupplierService) populateSortingData(ev *utils.CGREvent, spl *Supplie
}
} else if len(costData) == 0 {
utils.Logger.Warning(
fmt.Sprintf("<%s> profile: %s ignoring supplier with ID: %s, missing cost information",
fmt.Sprintf("<%s> ignoring supplier with ID: %s, missing cost information",
utils.SupplierS, spl.ID))
} else {
if extraOpts.maxCost != 0 &&