From 218c88b617646d81efaf8d3d26fa0e55bba19ec8 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 26 Aug 2018 18:50:09 +0200 Subject: [PATCH] CdrsV2.RateCDRs for rating/re-rating CDRs --- apier/v1/apier.go | 11 +-- apier/v1/apier_it_test.go | 2 +- apier/v2/cdrs.go | 5 ++ apier/v2/cdrs_it_test.go | 70 ++++++++++++++++ console/ratingprofile_set.go | 8 +- data/tariffplans/testit/Attributes.csv | 2 +- engine/cdrs.go | 112 +++++++++++++++---------- utils/apitpdata.go | 9 ++ utils/consts.go | 1 + 9 files changed, 160 insertions(+), 60 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index b3e9e65f7..ea96341d3 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -403,17 +403,8 @@ func (self *ApierV1) ImportTariffPlanFromFolder(attrs utils.AttrImportTPFromFold return nil } -type AttrSetRatingProfile struct { - Tenant string // Tenant's Id - Category string // TypeOfRecord - Direction string // Traffic direction, OUT is the only one supported for now - Subject string // Rating subject, usually the same as account - Overwrite bool // Overwrite if exists - RatingPlanActivations []*utils.TPRatingActivation // Activate rating plans at specific time -} - // Sets a specific rating profile working with data directly in the DataDB without involving storDb -func (self *ApierV1) SetRatingProfile(attrs AttrSetRatingProfile, reply *string) (err error) { +func (self *ApierV1) SetRatingProfile(attrs utils.AttrSetRatingProfile, reply *string) (err error) { if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "TOR", "Direction", "Subject", "RatingPlanActivations"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } diff --git a/apier/v1/apier_it_test.go b/apier/v1/apier_it_test.go index b249bd00e..d14564ebd 100644 --- a/apier/v1/apier_it_test.go +++ b/apier/v1/apier_it_test.go @@ -718,7 +718,7 @@ func TestApierReloadScheduler(t *testing.T) { func TestApierSetRatingProfile(t *testing.T) { reply := "" rpa := &utils.TPRatingActivation{ActivationTime: "2012-01-01T00:00:00Z", RatingPlanId: "RETAIL1", FallbackSubjects: "dan2"} - rpf := &AttrSetRatingProfile{Tenant: "cgrates.org", Category: "call", Direction: "*out", Subject: "dan", RatingPlanActivations: []*utils.TPRatingActivation{rpa}} + rpf := &utils.AttrSetRatingProfile{Tenant: "cgrates.org", Category: "call", Direction: "*out", Subject: "dan", RatingPlanActivations: []*utils.TPRatingActivation{rpa}} if err := rater.Call("ApierV1.SetRatingProfile", rpf, &reply); err != nil { t.Error("Got error on ApierV1.SetRatingProfile: ", err.Error()) } else if reply != "OK" { diff --git a/apier/v2/cdrs.go b/apier/v2/cdrs.go index d72988edb..56a0755f5 100644 --- a/apier/v2/cdrs.go +++ b/apier/v2/cdrs.go @@ -75,3 +75,8 @@ func (self *CdrsV2) StoreSMCost(args engine.ArgsV2CDRSStoreSMCost, reply *string func (self *CdrsV2) ProcessCDR(cgrEv *utils.CGREvent, reply *string) error { return self.CdrSrv.V2ProcessCDR(cgrEv, reply) } + +// RateCDRs will rate/re-rate CDRs using ChargerS +func (self *CdrsV2) RateCDRs(args *utils.RPCCDRsFilter, reply *string) error { + return self.CdrSrv.V2RateCDRs(args, reply) +} diff --git a/apier/v2/cdrs_it_test.go b/apier/v2/cdrs_it_test.go index 37ab1a4f8..8a32c251f 100644 --- a/apier/v2/cdrs_it_test.go +++ b/apier/v2/cdrs_it_test.go @@ -46,6 +46,8 @@ var sTestsCDRsIT = []func(t *testing.T){ testV2CDRsLoadTariffPlanFromFolder, testV2CDRsProcessCDR, testV2CDRsGetCdrs, + testV2CDRsRateCDRs, + testV2CDRsGetCdrs2, testV2CDRsKillEngine, } @@ -183,6 +185,74 @@ func testV2CDRsGetCdrs(t *testing.T) { } } +// Should re-rate the supplier1 cost with RP_ANY2CNT +func testV2CDRsRateCDRs(t *testing.T) { + rpf := &utils.AttrSetRatingProfile{ + Tenant: "cgrates.org", + Category: "call", + Direction: "*out", + Subject: "supplier1", + RatingPlanActivations: []*utils.TPRatingActivation{ + &utils.TPRatingActivation{ + ActivationTime: "2018-01-01T00:00:00Z", + RatingPlanId: "RP_ANY2CNT"}}, + Overwrite: true} + var reply string + if err := cdrsRpc.Call("ApierV1.SetRatingProfile", rpf, &reply); err != nil { + t.Error("Got error on ApierV1.SetRatingProfile: ", err.Error()) + } else if reply != "OK" { + t.Error("Calling ApierV1.SetRatingProfile got reply: ", reply) + } + if err := cdrsRpc.Call(utils.CdrsV2RateCDRs, + &utils.RPCCDRsFilter{NotRunIDs: []string{utils.MetaRaw}}, &reply); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if reply != utils.OK { + t.Error("Unexpected reply received: ", reply) + } + time.Sleep(time.Duration(100) * time.Millisecond) // Give time for CDR to be rated +} + +func testV2CDRsGetCdrs2(t *testing.T) { + var cdrCnt int64 + req := utils.AttrGetCdrs{} + if err := cdrsRpc.Call("ApierV2.CountCdrs", req, &cdrCnt); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if cdrCnt != 3 { + t.Error("Unexpected number of CDRs returned: ", cdrCnt) + } + var cdrs []*engine.ExternalCDR + args := utils.RPCCDRsFilter{RunIDs: []string{utils.MetaRaw}} + if err := cdrsRpc.Call("ApierV2.GetCdrs", args, &cdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(cdrs) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(cdrs)) + } else { + if cdrs[0].Cost != -1.0 { + t.Errorf("Unexpected cost for CDR: %f", cdrs[0].Cost) + } + } + args = utils.RPCCDRsFilter{RunIDs: []string{"CustomerCharges"}} + if err := cdrsRpc.Call("ApierV2.GetCdrs", args, &cdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(cdrs) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(cdrs)) + } else { + if cdrs[0].Cost != 0.0198 { + t.Errorf("Unexpected cost for CDR: %f", cdrs[0].Cost) + } + } + args = utils.RPCCDRsFilter{RunIDs: []string{"SupplierCharges"}} + if err := cdrsRpc.Call("ApierV2.GetCdrs", args, &cdrs); err != nil { + t.Error("Unexpected error: ", err.Error()) + } else if len(cdrs) != 1 { + t.Error("Unexpected number of CDRs returned: ", len(cdrs)) + } else { + if cdrs[0].Cost != 0.0198 { + t.Errorf("Unexpected cost for CDR: %f", cdrs[0].Cost) + } + } +} + func testV2CDRsKillEngine(t *testing.T) { if err := engine.KillEngine(100); err != nil { t.Error(err) diff --git a/console/ratingprofile_set.go b/console/ratingprofile_set.go index 18c347ae3..f92defddc 100644 --- a/console/ratingprofile_set.go +++ b/console/ratingprofile_set.go @@ -18,7 +18,9 @@ along with this program. If not, see package console -import "github.com/cgrates/cgrates/apier/v1" +import ( + "github.com/cgrates/cgrates/utils" +) func init() { c := &CmdSetRatingProfile{ @@ -33,7 +35,7 @@ func init() { type CmdSetRatingProfile struct { name string rpcMethod string - rpcParams *v1.AttrSetRatingProfile + rpcParams *utils.AttrSetRatingProfile rpcResult string *CommandExecuter } @@ -48,7 +50,7 @@ func (self *CmdSetRatingProfile) RpcMethod() string { func (self *CmdSetRatingProfile) RpcParams(reset bool) interface{} { if reset || self.rpcParams == nil { - self.rpcParams = &v1.AttrSetRatingProfile{} + self.rpcParams = new(utils.AttrSetRatingProfile) } return self.rpcParams } diff --git a/data/tariffplans/testit/Attributes.csv b/data/tariffplans/testit/Attributes.csv index 21db47b7a..c19507c6e 100644 --- a/data/tariffplans/testit/Attributes.csv +++ b/data/tariffplans/testit/Attributes.csv @@ -1,4 +1,4 @@ #Tenant,ID,Context,FilterIDs,ActivationInterval,FieldName,Initial,Substitute,Append,Blocker,Weight cgrates.org,ATTR_ACNT_1001,*sessions,FLTR_ACCOUNT_1001,,OfficeGroup,*any,Marketing,true,false,10 -cgrates.org,ATTR_SUPPLIER1,*chargers,*string:Category:customers,,Subject,*any,supplier1,false,false,10 +cgrates.org,ATTR_SUPPLIER1,*chargers,*string:Category:customers,,Subject,*any,supplier1,true,false,10 cgrates.org,ATTR_SUPPLIER1,,,,Category,*any,call,false,false,10 diff --git a/engine/cdrs.go b/engine/cdrs.go index 0d12d51be..e2e09064e 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -660,9 +660,6 @@ func (cdrsrv *CdrServer) Call(serviceMethod string, args interface{}, reply inte // thdSProcessEvent will send the event to ThresholdS if the connection is configured func (cdrS *CdrServer) thdSProcessEvent(cgrEv *utils.CGREvent) { - if cdrS.thdS == nil { - return - } var tIDs []string if err := cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent, &ArgsProcessEvent{CGREvent: *cgrEv}, &tIDs); err != nil && @@ -676,9 +673,6 @@ func (cdrS *CdrServer) thdSProcessEvent(cgrEv *utils.CGREvent) { // statSProcessEvent will send the event to StatS if the connection is configured func (cdrS *CdrServer) statSProcessEvent(cgrEv *utils.CGREvent) { - if cdrS.stats == nil { - return - } var reply []string if err := cdrS.stats.Call(utils.StatSv1ProcessEvent, cgrEv, &reply); err != nil && err.Error() != utils.ErrNotFound.Error() { @@ -689,11 +683,41 @@ func (cdrS *CdrServer) statSProcessEvent(cgrEv *utils.CGREvent) { } } -// chrgrSProcessEvent will process the CGREvent with ChargerS subsystem -func (cdrS *CdrServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) { - if cdrS.chargerS == nil { +// rarethsta will RAte/STOtore/REplicate/THresholds/STAts the CDR received +// used by both chargerS as well as re-/rating +func (cdrS *CdrServer) rastorethstaCDR(cdr *CDR) { + ratedCDRs, err := cdrS.rateCDR(cdr) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s rating CDR %+v.", + utils.CDRs, err.Error(), cdr)) return } + for _, rtCDR := range ratedCDRs { + if cdrS.cgrCfg.CDRSStoreCdrs { // Store CDR + go func(rtCDR *CDR) { + if err := cdrS.cdrDb.SetCDR(rtCDR, true); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s storing CDR %+v.", + utils.CDRs, err.Error(), rtCDR)) + } + }(rtCDR) + } + if len(cdrS.cgrCfg.CDRSOnlineCDRExports) != 0 { + go cdrS.replicateCDRs([]*CDR{rtCDR}) + } + cgrEv := rtCDR.AsCGREvent() + if cdrS.thdS != nil { + go cdrS.thdSProcessEvent(cgrEv) + } + if cdrS.stats != nil { + go cdrS.statSProcessEvent(cgrEv) + } + } +} + +// chrgrSProcessEvent will process the CGREvent with ChargerS subsystem +func (cdrS *CdrServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) { var chrgrs []*ChrgSProcessEventReply if err := cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent, cgrEv, &chrgrs); err != nil && err.Error() != utils.ErrNotFound.Error() { @@ -702,7 +726,6 @@ func (cdrS *CdrServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) { utils.CDRs, err.Error(), cgrEv, utils.ChargerS)) return } - var processedCDRs []*CDR for _, chrgr := range chrgrs { cdr, err := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg, cdrS.Timezone()) if err != nil { @@ -711,35 +734,8 @@ func (cdrS *CdrServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) { utils.CDRs, err.Error(), cgrEv, utils.ChargerS)) continue } - if cdr.RunID == utils.MetaRaw { // do not calculate *raw, just save it back to DB, case of aliasing *raw data - processedCDRs = append(processedCDRs, cdr) - continue - } - ratedCDRs, err := cdrS.rateCDR(cdr) - if err != nil { - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s rating CDR %+v.", - utils.CDRs, err.Error(), cdr)) - continue - } - } - processedCDRs = append(processedCDRs, ratedCDRs...) - } - for _, cdr := range processedCDRs { - if cdrS.cgrCfg.CDRSStoreCdrs { // Store CDR - go func(cdr *CDR) { - if err := cdrS.cdrDb.SetCDR(cdr, true); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s storing CDR %+v.", - utils.CDRs, err.Error(), cdr)) - } - }(cdr) - } - go cdrS.replicateCDRs([]*CDR{cdr}) // Replicate CDR - cgrEv := cdr.AsCGREvent() - go cdrS.thdSProcessEvent(cgrEv) - go cdrS.statSProcessEvent(cgrEv) + cdrS.rastorethstaCDR(cdr) + } } @@ -757,13 +753,39 @@ func (cdrS *CdrServer) V2ProcessCDR(cgrEv *utils.CGREvent, reply *string) (err e return utils.NewErrServerError(err) // Cannot store CDR } } - cdrS.replicateCDRs([]*CDR{rawCDR}) // Replicate raw CDR - - go cdrS.thdSProcessEvent(cgrEv) - go cdrS.statSProcessEvent(cgrEv) - - go cdrS.chrgrSProcessEvent(cgrEv) + if len(cdrS.cgrCfg.CDRSOnlineCDRExports) != 0 { + cdrS.replicateCDRs([]*CDR{rawCDR}) // Replicate raw CDR + } + if cdrS.thdS != nil { + go cdrS.thdSProcessEvent(cgrEv) + } + if cdrS.stats != nil { + go cdrS.statSProcessEvent(cgrEv) + } + if cdrS.chargerS != nil { + go cdrS.chrgrSProcessEvent(cgrEv) + } *reply = utils.OK return nil } + +// Called by rate/re-rate API, RPC method +func (cdrS *CdrServer) V2RateCDRs(attrs *utils.RPCCDRsFilter, reply *string) error { + if cdrS.chargerS == nil { + return utils.NewErrNotConnected(utils.ChargerS) + } + cdrFltr, err := attrs.AsCDRsFilter(cdrS.cgrCfg.DefaultTimezone) + if err != nil { + return utils.NewErrServerError(err) + } + cdrs, _, err := cdrS.cdrDb.GetCDRs(cdrFltr, false) + if err != nil { + return err + } + for _, cdr := range cdrs { + go cdrS.rastorethstaCDR(cdr) + } + *reply = utils.OK + return nil +} diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 16f36d64e..2c496e299 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -254,6 +254,15 @@ func (rpf *TPRatingProfile) SetRatingProfilesId(id string) error { return nil } +type AttrSetRatingProfile struct { + Tenant string // Tenant's Id + Category string // TypeOfRecord + Direction string // Traffic direction, OUT is the only one supported for now + Subject string // Rating subject, usually the same as account + Overwrite bool // Overwrite if exists + RatingPlanActivations []*TPRatingActivation // Activate rating plans at specific time +} + type TPRatingActivation struct { ActivationTime string // Time when this profile will become active, defined as unix epoch time RatingPlanId string // Id of RatingPlan profile diff --git a/utils/consts.go b/utils/consts.go index 378898c3e..226a91f33 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -767,6 +767,7 @@ const ( // CdrsV2 APIs const ( CdrsV2ProcessCDR = "CdrsV2.ProcessCDR" + CdrsV2RateCDRs = "CdrsV2.RateCDRs" ) // Scheduler