CdrsV2.RateCDRs for rating/re-rating CDRs

This commit is contained in:
DanB
2018-08-26 18:50:09 +02:00
parent 7f4d0e42de
commit 218c88b617
9 changed files with 160 additions and 60 deletions

View File

@@ -403,17 +403,8 @@ func (self *ApierV1) ImportTariffPlanFromFolder(attrs utils.AttrImportTPFromFold
return nil
}
type AttrSetRatingProfile struct {
Tenant string // Tenant's Id
Category string // TypeOfRecord
Direction string // Traffic direction, OUT is the only one supported for now
Subject string // Rating subject, usually the same as account
Overwrite bool // Overwrite if exists
RatingPlanActivations []*utils.TPRatingActivation // Activate rating plans at specific time
}
// Sets a specific rating profile working with data directly in the DataDB without involving storDb
func (self *ApierV1) SetRatingProfile(attrs AttrSetRatingProfile, reply *string) (err error) {
func (self *ApierV1) SetRatingProfile(attrs utils.AttrSetRatingProfile, reply *string) (err error) {
if missing := utils.MissingStructFields(&attrs, []string{"Tenant", "TOR", "Direction", "Subject", "RatingPlanActivations"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}

View File

@@ -718,7 +718,7 @@ func TestApierReloadScheduler(t *testing.T) {
func TestApierSetRatingProfile(t *testing.T) {
reply := ""
rpa := &utils.TPRatingActivation{ActivationTime: "2012-01-01T00:00:00Z", RatingPlanId: "RETAIL1", FallbackSubjects: "dan2"}
rpf := &AttrSetRatingProfile{Tenant: "cgrates.org", Category: "call", Direction: "*out", Subject: "dan", RatingPlanActivations: []*utils.TPRatingActivation{rpa}}
rpf := &utils.AttrSetRatingProfile{Tenant: "cgrates.org", Category: "call", Direction: "*out", Subject: "dan", RatingPlanActivations: []*utils.TPRatingActivation{rpa}}
if err := rater.Call("ApierV1.SetRatingProfile", rpf, &reply); err != nil {
t.Error("Got error on ApierV1.SetRatingProfile: ", err.Error())
} else if reply != "OK" {

View File

@@ -75,3 +75,8 @@ func (self *CdrsV2) StoreSMCost(args engine.ArgsV2CDRSStoreSMCost, reply *string
func (self *CdrsV2) ProcessCDR(cgrEv *utils.CGREvent, reply *string) error {
return self.CdrSrv.V2ProcessCDR(cgrEv, reply)
}
// RateCDRs will rate/re-rate CDRs using ChargerS
func (self *CdrsV2) RateCDRs(args *utils.RPCCDRsFilter, reply *string) error {
return self.CdrSrv.V2RateCDRs(args, reply)
}

View File

@@ -46,6 +46,8 @@ var sTestsCDRsIT = []func(t *testing.T){
testV2CDRsLoadTariffPlanFromFolder,
testV2CDRsProcessCDR,
testV2CDRsGetCdrs,
testV2CDRsRateCDRs,
testV2CDRsGetCdrs2,
testV2CDRsKillEngine,
}
@@ -183,6 +185,74 @@ func testV2CDRsGetCdrs(t *testing.T) {
}
}
// Should re-rate the supplier1 cost with RP_ANY2CNT
func testV2CDRsRateCDRs(t *testing.T) {
rpf := &utils.AttrSetRatingProfile{
Tenant: "cgrates.org",
Category: "call",
Direction: "*out",
Subject: "supplier1",
RatingPlanActivations: []*utils.TPRatingActivation{
&utils.TPRatingActivation{
ActivationTime: "2018-01-01T00:00:00Z",
RatingPlanId: "RP_ANY2CNT"}},
Overwrite: true}
var reply string
if err := cdrsRpc.Call("ApierV1.SetRatingProfile", rpf, &reply); err != nil {
t.Error("Got error on ApierV1.SetRatingProfile: ", err.Error())
} else if reply != "OK" {
t.Error("Calling ApierV1.SetRatingProfile got reply: ", reply)
}
if err := cdrsRpc.Call(utils.CdrsV2RateCDRs,
&utils.RPCCDRsFilter{NotRunIDs: []string{utils.MetaRaw}}, &reply); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(100) * time.Millisecond) // Give time for CDR to be rated
}
func testV2CDRsGetCdrs2(t *testing.T) {
var cdrCnt int64
req := utils.AttrGetCdrs{}
if err := cdrsRpc.Call("ApierV2.CountCdrs", req, &cdrCnt); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if cdrCnt != 3 {
t.Error("Unexpected number of CDRs returned: ", cdrCnt)
}
var cdrs []*engine.ExternalCDR
args := utils.RPCCDRsFilter{RunIDs: []string{utils.MetaRaw}}
if err := cdrsRpc.Call("ApierV2.GetCdrs", args, &cdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(cdrs) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
} else {
if cdrs[0].Cost != -1.0 {
t.Errorf("Unexpected cost for CDR: %f", cdrs[0].Cost)
}
}
args = utils.RPCCDRsFilter{RunIDs: []string{"CustomerCharges"}}
if err := cdrsRpc.Call("ApierV2.GetCdrs", args, &cdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(cdrs) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
} else {
if cdrs[0].Cost != 0.0198 {
t.Errorf("Unexpected cost for CDR: %f", cdrs[0].Cost)
}
}
args = utils.RPCCDRsFilter{RunIDs: []string{"SupplierCharges"}}
if err := cdrsRpc.Call("ApierV2.GetCdrs", args, &cdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(cdrs) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
} else {
if cdrs[0].Cost != 0.0198 {
t.Errorf("Unexpected cost for CDR: %f", cdrs[0].Cost)
}
}
}
func testV2CDRsKillEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)

View File

@@ -18,7 +18,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package console
import "github.com/cgrates/cgrates/apier/v1"
import (
"github.com/cgrates/cgrates/utils"
)
func init() {
c := &CmdSetRatingProfile{
@@ -33,7 +35,7 @@ func init() {
type CmdSetRatingProfile struct {
name string
rpcMethod string
rpcParams *v1.AttrSetRatingProfile
rpcParams *utils.AttrSetRatingProfile
rpcResult string
*CommandExecuter
}
@@ -48,7 +50,7 @@ func (self *CmdSetRatingProfile) RpcMethod() string {
func (self *CmdSetRatingProfile) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &v1.AttrSetRatingProfile{}
self.rpcParams = new(utils.AttrSetRatingProfile)
}
return self.rpcParams
}

View File

@@ -1,4 +1,4 @@
#Tenant,ID,Context,FilterIDs,ActivationInterval,FieldName,Initial,Substitute,Append,Blocker,Weight
cgrates.org,ATTR_ACNT_1001,*sessions,FLTR_ACCOUNT_1001,,OfficeGroup,*any,Marketing,true,false,10
cgrates.org,ATTR_SUPPLIER1,*chargers,*string:Category:customers,,Subject,*any,supplier1,false,false,10
cgrates.org,ATTR_SUPPLIER1,*chargers,*string:Category:customers,,Subject,*any,supplier1,true,false,10
cgrates.org,ATTR_SUPPLIER1,,,,Category,*any,call,false,false,10
1 #Tenant ID Context FilterIDs ActivationInterval FieldName Initial Substitute Append Blocker Weight
2 cgrates.org ATTR_ACNT_1001 *sessions FLTR_ACCOUNT_1001 OfficeGroup *any Marketing true false 10
3 cgrates.org ATTR_SUPPLIER1 *chargers *string:Category:customers Subject *any supplier1 false true false 10
4 cgrates.org ATTR_SUPPLIER1 Category *any call false false 10

View File

@@ -660,9 +660,6 @@ func (cdrsrv *CdrServer) Call(serviceMethod string, args interface{}, reply inte
// thdSProcessEvent will send the event to ThresholdS if the connection is configured
func (cdrS *CdrServer) thdSProcessEvent(cgrEv *utils.CGREvent) {
if cdrS.thdS == nil {
return
}
var tIDs []string
if err := cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent,
&ArgsProcessEvent{CGREvent: *cgrEv}, &tIDs); err != nil &&
@@ -676,9 +673,6 @@ func (cdrS *CdrServer) thdSProcessEvent(cgrEv *utils.CGREvent) {
// statSProcessEvent will send the event to StatS if the connection is configured
func (cdrS *CdrServer) statSProcessEvent(cgrEv *utils.CGREvent) {
if cdrS.stats == nil {
return
}
var reply []string
if err := cdrS.stats.Call(utils.StatSv1ProcessEvent, cgrEv, &reply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
@@ -689,11 +683,41 @@ func (cdrS *CdrServer) statSProcessEvent(cgrEv *utils.CGREvent) {
}
}
// chrgrSProcessEvent will process the CGREvent with ChargerS subsystem
func (cdrS *CdrServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) {
if cdrS.chargerS == nil {
// rarethsta will RAte/STOtore/REplicate/THresholds/STAts the CDR received
// used by both chargerS as well as re-/rating
func (cdrS *CdrServer) rastorethstaCDR(cdr *CDR) {
ratedCDRs, err := cdrS.rateCDR(cdr)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s rating CDR %+v.",
utils.CDRs, err.Error(), cdr))
return
}
for _, rtCDR := range ratedCDRs {
if cdrS.cgrCfg.CDRSStoreCdrs { // Store CDR
go func(rtCDR *CDR) {
if err := cdrS.cdrDb.SetCDR(rtCDR, true); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s storing CDR %+v.",
utils.CDRs, err.Error(), rtCDR))
}
}(rtCDR)
}
if len(cdrS.cgrCfg.CDRSOnlineCDRExports) != 0 {
go cdrS.replicateCDRs([]*CDR{rtCDR})
}
cgrEv := rtCDR.AsCGREvent()
if cdrS.thdS != nil {
go cdrS.thdSProcessEvent(cgrEv)
}
if cdrS.stats != nil {
go cdrS.statSProcessEvent(cgrEv)
}
}
}
// chrgrSProcessEvent will process the CGREvent with ChargerS subsystem
func (cdrS *CdrServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) {
var chrgrs []*ChrgSProcessEventReply
if err := cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent, cgrEv, &chrgrs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
@@ -702,7 +726,6 @@ func (cdrS *CdrServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) {
utils.CDRs, err.Error(), cgrEv, utils.ChargerS))
return
}
var processedCDRs []*CDR
for _, chrgr := range chrgrs {
cdr, err := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg, cdrS.Timezone())
if err != nil {
@@ -711,35 +734,8 @@ func (cdrS *CdrServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) {
utils.CDRs, err.Error(), cgrEv, utils.ChargerS))
continue
}
if cdr.RunID == utils.MetaRaw { // do not calculate *raw, just save it back to DB, case of aliasing *raw data
processedCDRs = append(processedCDRs, cdr)
continue
}
ratedCDRs, err := cdrS.rateCDR(cdr)
if err != nil {
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s rating CDR %+v.",
utils.CDRs, err.Error(), cdr))
continue
}
}
processedCDRs = append(processedCDRs, ratedCDRs...)
}
for _, cdr := range processedCDRs {
if cdrS.cgrCfg.CDRSStoreCdrs { // Store CDR
go func(cdr *CDR) {
if err := cdrS.cdrDb.SetCDR(cdr, true); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s storing CDR %+v.",
utils.CDRs, err.Error(), cdr))
}
}(cdr)
}
go cdrS.replicateCDRs([]*CDR{cdr}) // Replicate CDR
cgrEv := cdr.AsCGREvent()
go cdrS.thdSProcessEvent(cgrEv)
go cdrS.statSProcessEvent(cgrEv)
cdrS.rastorethstaCDR(cdr)
}
}
@@ -757,13 +753,39 @@ func (cdrS *CdrServer) V2ProcessCDR(cgrEv *utils.CGREvent, reply *string) (err e
return utils.NewErrServerError(err) // Cannot store CDR
}
}
cdrS.replicateCDRs([]*CDR{rawCDR}) // Replicate raw CDR
go cdrS.thdSProcessEvent(cgrEv)
go cdrS.statSProcessEvent(cgrEv)
go cdrS.chrgrSProcessEvent(cgrEv)
if len(cdrS.cgrCfg.CDRSOnlineCDRExports) != 0 {
cdrS.replicateCDRs([]*CDR{rawCDR}) // Replicate raw CDR
}
if cdrS.thdS != nil {
go cdrS.thdSProcessEvent(cgrEv)
}
if cdrS.stats != nil {
go cdrS.statSProcessEvent(cgrEv)
}
if cdrS.chargerS != nil {
go cdrS.chrgrSProcessEvent(cgrEv)
}
*reply = utils.OK
return nil
}
// Called by rate/re-rate API, RPC method
func (cdrS *CdrServer) V2RateCDRs(attrs *utils.RPCCDRsFilter, reply *string) error {
if cdrS.chargerS == nil {
return utils.NewErrNotConnected(utils.ChargerS)
}
cdrFltr, err := attrs.AsCDRsFilter(cdrS.cgrCfg.DefaultTimezone)
if err != nil {
return utils.NewErrServerError(err)
}
cdrs, _, err := cdrS.cdrDb.GetCDRs(cdrFltr, false)
if err != nil {
return err
}
for _, cdr := range cdrs {
go cdrS.rastorethstaCDR(cdr)
}
*reply = utils.OK
return nil
}

View File

@@ -254,6 +254,15 @@ func (rpf *TPRatingProfile) SetRatingProfilesId(id string) error {
return nil
}
type AttrSetRatingProfile struct {
Tenant string // Tenant's Id
Category string // TypeOfRecord
Direction string // Traffic direction, OUT is the only one supported for now
Subject string // Rating subject, usually the same as account
Overwrite bool // Overwrite if exists
RatingPlanActivations []*TPRatingActivation // Activate rating plans at specific time
}
type TPRatingActivation struct {
ActivationTime string // Time when this profile will become active, defined as unix epoch time
RatingPlanId string // Id of RatingPlan profile

View File

@@ -767,6 +767,7 @@ const (
// CdrsV2 APIs
const (
CdrsV2ProcessCDR = "CdrsV2.ProcessCDR"
CdrsV2RateCDRs = "CdrsV2.RateCDRs"
)
// Scheduler