diff --git a/apier/v1/chargers.go b/apier/v1/chargers.go index 002663cc1..d53a88f45 100644 --- a/apier/v1/chargers.go +++ b/apier/v1/chargers.go @@ -110,6 +110,6 @@ func (cSv1 *ChargerSv1) GetChargersForEvent(cgrEv *utils.CGREvent, // ProcessEvent func (cSv1 *ChargerSv1) ProcessEvent(args *utils.CGREvent, - reply *[]*engine.AttrSProcessEventReply) error { + reply *[]*engine.ChrgSProcessEventReply) error { return cSv1.cS.V1ProcessEvent(args, reply) } diff --git a/apier/v1/chargers_it_test.go b/apier/v1/chargers_it_test.go index bacb26e4b..86fe3d31d 100755 --- a/apier/v1/chargers_it_test.go +++ b/apier/v1/chargers_it_test.go @@ -174,10 +174,11 @@ func testChargerSGetChargersForEvent(t *testing.T) { } func testChargerSProcessEvent(t *testing.T) { - processedEv := &[]*engine.AttrSProcessEventReply{ - &engine.AttrSProcessEventReply{ - MatchedProfiles: []string{"ATTR_1001_SIMPLEAUTH"}, - AlteredFields: []string{"Password"}, + processedEv := &[]*engine.ChrgSProcessEventReply{ + &engine.ChrgSProcessEventReply{ + ChargerSProfile: "Charger1", + AttributeSProfiles: []string{"ATTR_1001_SIMPLEAUTH"}, + AlteredFields: []string{"Password"}, CGREvent: &utils.CGREvent{ // matching Charger1 Tenant: "cgrates.org", ID: "event1", @@ -190,7 +191,7 @@ func testChargerSProcessEvent(t *testing.T) { }, }, } - var result *[]*engine.AttrSProcessEventReply + var result *[]*engine.ChrgSProcessEventReply if err := chargerRPC.Call(utils.ChargerSv1ProcessEvent, chargerEvent[1], &result); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) diff --git a/apier/v2/cdrs.go b/apier/v2/cdrs.go index 9923483f5..d72988edb 100644 --- a/apier/v2/cdrs.go +++ b/apier/v2/cdrs.go @@ -71,3 +71,7 @@ type CdrsV2 struct { func (self *CdrsV2) StoreSMCost(args engine.ArgsV2CDRSStoreSMCost, reply *string) error { return self.CdrSrv.V2StoreSMCost(args, reply) } + +func (self *CdrsV2) ProcessCDR(cgrEv *utils.CGREvent, reply *string) error { + return self.CdrSrv.V2ProcessCDR(cgrEv, reply) +} diff --git a/engine/cdr.go b/engine/cdr.go index 4a06713b6..85ec3dd60 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -100,13 +100,32 @@ type CDR struct { CostDetails *EventCost // Attach the cost details to CDR when possible } +// AddDefaults will add missing information based on other fields +func (cdr *CDR) AddDefaults(cfg *config.CGRConfig) { + if cdr.CGRID == "" { + cdr.ComputeCGRID() + } + if cdr.ToR == "" { + cdr.ToR = utils.VOICE + } + if cdr.RequestType == "" { + cdr.RequestType = cfg.DefaultReqType + } + if cdr.Tenant == "" { + cdr.Tenant = cfg.DefaultTenant + } + if cdr.Category == "" { + cdr.Category = cfg.DefaultCategory + } +} + func (cdr *CDR) CostDetailsJson() string { mrshled, _ := json.Marshal(cdr.CostDetails) return string(mrshled) } func (cdr *CDR) ComputeCGRID() { - cdr.CGRID = utils.Sha1(cdr.OriginID, cdr.SetupTime.UTC().String()) + cdr.CGRID = utils.Sha1(cdr.OriginID, cdr.OriginHost) } // Used to multiply usage on export diff --git a/engine/cdrs.go b/engine/cdrs.go index 10bd71db3..9c1b2ad68 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -198,16 +198,8 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) { return err // Error is propagated back and we don't continue processing the CDR if we cannot store it } } - if self.thdS != nil { - var tIDs []string - thEv := &ArgsProcessEvent{ - CGREvent: *cdr.AsCGREvent()} - if err := self.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf(" error: %s processing CDR event %+v with thdS.", err.Error(), thEv)) - } - } + // process CDR with thresholdS + self.thdSProcessEvent(cdr.AsCGREvent()) // Attach raw CDR to stats if self.cdrstats != nil { // Send raw CDR to stats var out int @@ -664,3 +656,105 @@ func (cdrsrv *CdrServer) Call(serviceMethod string, args interface{}, reply inte } return err } + +// thdSProcessEvent will send the event to ThresholdS if the connection is configured +func (cdrS *CdrServer) thdSProcessEvent(cgrEv *utils.CGREvent) { + if cdrS.thdS == nil { + return + } + var tIDs []string + if err := cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent, + &ArgsProcessEvent{CGREvent: *cgrEv}, &tIDs); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing CDR event %+v with thdS.", + utils.CDRs, err.Error(), cgrEv)) + return + } +} + +// statSProcessEvent will send the event to StatS if the connection is configured +func (cdrS *CdrServer) statSProcessEvent(cgrEv *utils.CGREvent) { + if cdrS.stats == nil { + return + } + var reply []string + if err := cdrS.stats.Call(utils.StatSv1ProcessEvent, cgrEv, &reply); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s.", + utils.CDRs, err.Error(), cgrEv, utils.StatS)) + return + } +} + +// chrgrSProcessEvent will process the CGREvent with ChargerS subsystem +func (cdrS *CdrServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) { + if cdrS.chargerS == nil { + return + } + var chrgrs []*ChrgSProcessEventReply + if err := cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent, cgrEv, &chrgrs); err == nil || + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.", + utils.CDRs, err.Error(), cgrEv, utils.ChargerS)) + return + } + var processedCDRs []*CDR + for _, chrgr := range chrgrs { + cdr, err := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg, cdrS.Timezone()) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s converting CDR event %+v with %s.", + utils.CDRs, err.Error(), cgrEv, utils.ChargerS)) + continue + } + ratedCDRs, err := cdrS.rateCDR(cdr) + if err != nil { + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s rating CDR %+v.", + utils.CDRs, err.Error(), cdr)) + continue + } + } + processedCDRs = append(processedCDRs, ratedCDRs...) + } + for _, cdr := range processedCDRs { + if cdrS.cgrCfg.CDRSStoreCdrs { // Store CDR + go func() { + if err := cdrS.cdrDb.SetCDR(cdr, true); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s storing CDR %+v.", + utils.CDRs, err.Error(), cdr)) + } + }() + } + go cdrS.replicateCDRs([]*CDR{cdr}) // Replicate CDR + cgrEv := cdr.AsCGREvent() + go cdrS.thdSProcessEvent(cgrEv) + go cdrS.statSProcessEvent(cgrEv) + } +} + +// V2ProcessCDR will process the CDR out of CGREvent +func (cdrS *CdrServer) V2ProcessCDR(cgrEv *utils.CGREvent, reply *string) (err error) { + rawCDR, err := NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, cdrS.Timezone()) + if err != nil { + return utils.NewErrServerError(err) + } + if cdrS.cgrCfg.CDRSStoreCdrs { // Store *raw CDR + if err = cdrS.cdrDb.SetCDR(rawCDR, false); err != nil { + return utils.NewErrServerError(err) // Cannot store CDR + } + } + cdrS.replicateCDRs([]*CDR{rawCDR}) // Replicate raw CDR + + go cdrS.thdSProcessEvent(cgrEv) + go cdrS.statSProcessEvent(cgrEv) + go cdrS.chrgrSProcessEvent(cgrEv) + + *reply = utils.OK + return nil +} diff --git a/engine/chargers.go b/engine/chargers.go index 95556571b..eafaab077 100644 --- a/engine/chargers.go +++ b/engine/chargers.go @@ -96,16 +96,27 @@ func (cS *ChargerService) matchingChargerProfilesForEvent(cgrEv *utils.CGREvent) return } -func (cS *ChargerService) processEvent(cgrEv *utils.CGREvent) (rply []*AttrSProcessEventReply, err error) { +// ChrgSProcessEventReply is the reply to processEvent +type ChrgSProcessEventReply struct { + ChargerSProfile string + AttributeSProfiles []string + AlteredFields []string + CGREvent *utils.CGREvent +} + +func (cS *ChargerService) processEvent(cgrEv *utils.CGREvent) (rply []*ChrgSProcessEventReply, err error) { var cPs ChargerProfiles if cPs, err = cS.matchingChargerProfilesForEvent(cgrEv); err != nil { return nil, err } - rply = make([]*AttrSProcessEventReply, len(cPs)) - + rply = make([]*ChrgSProcessEventReply, len(cPs)) for i, cP := range cPs { clonedEv := cgrEv.Clone() clonedEv.Event[utils.RunID] = cP.RunID + rply[i] = &ChrgSProcessEventReply{ + ChargerSProfile: cP.ID, + CGREvent: clonedEv, + } if len(cP.AttributeIDs) != 0 { // Attributes should process the event if cS.attrS == nil { return nil, errors.New("no connection to AttributeS") @@ -119,7 +130,11 @@ func (cS *ChargerService) processEvent(cgrEv *utils.CGREvent) (rply []*AttrSProc &evReply); err != nil { return nil, err } - rply[i] = &evReply + rply[i].AttributeSProfiles = evReply.MatchedProfiles + rply[i].AlteredFields = evReply.AlteredFields + if len(evReply.AlteredFields) != 0 { + rply[i].CGREvent = evReply.CGREvent + } } } return @@ -127,7 +142,7 @@ func (cS *ChargerService) processEvent(cgrEv *utils.CGREvent) (rply []*AttrSProc // V1ProcessEvent will process the event received via API and return list of events forked func (cS *ChargerService) V1ProcessEvent(args *utils.CGREvent, - reply *[]*AttrSProcessEventReply) (err error) { + reply *[]*ChrgSProcessEventReply) (err error) { if args.Event == nil { return utils.NewErrMandatoryIeMissing("Event") } diff --git a/engine/mapevent.go b/engine/mapevent.go index f48405078..1d5e3824d 100644 --- a/engine/mapevent.go +++ b/engine/mapevent.go @@ -130,7 +130,7 @@ func (me MapEvent) AsMapStringIgnoreErrors(ignoredFlds utils.StringMap) (mp map[ // AsCDR exports the SafEvent as CDR func (me MapEvent) AsCDR(cfg *config.CGRConfig, tmz string) (cdr *CDR, err error) { - cdr = NewCDRWithDefaults(cfg) + cdr = &CDR{Cost: -1.0, ExtraFields: make(map[string]string)} for k, v := range me { if !utils.IsSliceMember(utils.PrimaryCdrFields, k) { // not primary field, populate extra ones if cdr.ExtraFields[k], err = utils.IfaceAsString(v); err != nil { @@ -219,5 +219,6 @@ func (me MapEvent) AsCDR(cfg *config.CGRConfig, tmz string) (cdr *CDR, err error } } } + cdr.AddDefaults(cfg) return } diff --git a/engine/suppliers.go b/engine/suppliers.go index 4fbd500fe..ed0fc11f1 100644 --- a/engine/suppliers.go +++ b/engine/suppliers.go @@ -309,7 +309,7 @@ func (spS *SupplierService) populateSortingData(ev *utils.CGREvent, spl *Supplie } } else if len(costData) == 0 { utils.Logger.Warning( - fmt.Sprintf("<%s> profile: %s ignoring supplier with ID: %s, missing cost information", + fmt.Sprintf("<%s> ignoring supplier with ID: %s, missing cost information", utils.SupplierS, spl.ID)) } else { if extraOpts.maxCost != 0 &&