CDRsV1.ProcessEvent with refund for AccountableRequestTypes

This commit is contained in:
DanB
2019-12-08 19:44:36 +01:00
parent 6747596327
commit 5fc2dc9092
8 changed files with 128 additions and 109 deletions

View File

@@ -135,21 +135,7 @@ func testV1CDRsProcessEventWithRefund(t *testing.T) {
if err := cdrsRpc.Call(utils.ApierV1SetBalance, attrSetBalance, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("received: %s", reply)
}
args := &engine.ArgV1ProcessEvent{
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.OriginID: "testV1CDRsProcessEventWithRefund",
utils.RequestType: utils.META_PSEUDOPREPAID,
utils.Account: "testV1CDRsProcessEventWithRefund",
utils.Destination: "+4986517174963",
utils.AnswerTime: time.Date(2019, 11, 27, 12, 21, 26, 0, time.UTC),
utils.Usage: time.Duration(3) * time.Minute,
utils.Subject: "ANY2CNT",
},
},
t.Errorf("received: <%s>", reply)
}
expectedVoice := 300000000000.0
if err := cdrsRpc.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil {
@@ -157,26 +143,80 @@ func testV1CDRsProcessEventWithRefund(t *testing.T) {
} else if rply := acnt.BalanceMap[utils.VOICE].GetTotalValue(); rply != expectedVoice {
t.Errorf("Expecting: %v, received: %v", expectedVoice, rply)
}
if err := cdrsRpc.Call(utils.CDRsV1ProcessEvent, args, &reply); err != nil {
argsEv := &engine.ArgV1ProcessEvent{
Flags: []string{utils.MetaRALs},
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.RunID: "testv1",
utils.OriginID: "testV1CDRsProcessEventWithRefund",
utils.RequestType: utils.META_PSEUDOPREPAID,
utils.Account: "testV1CDRsProcessEventWithRefund",
utils.Destination: "+4986517174963",
utils.AnswerTime: time.Date(2019, 11, 27, 12, 21, 26, 0, time.UTC),
utils.Usage: time.Duration(3) * time.Minute,
},
},
}
if err := cdrsRpc.Call(utils.CDRsV1ProcessEvent, argsEv, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(150 * time.Millisecond) // Give time for CDR to be rated
var cdrs []*engine.ExternalCDR
if err := cdrsRpc.Call(utils.ApierV1GetCDRs, utils.AttrGetCdrs{}, &cdrs); err != nil {
t.Error("Unexpected error: ", err.Error())
} else if len(cdrs) != 1 {
t.Error("Unexpected number of CDRs returned: ", len(cdrs))
} else {
if cdrs[0].Cost != -1.0 {
if cdrs[0].Cost != 0 {
t.Errorf("Unexpected cost for CDR: %f", cdrs[0].Cost)
}
if cdrs[0].ExtraFields["PayPalAccount"] != "paypal@cgrates.org" {
t.Errorf("PayPalAccount should be added by AttributeS, have: %s",
cdrs[0].ExtraFields["PayPalAccount"])
}
}
if err := cdrsRpc.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil {
t.Error(err)
} else if blc1 := acnt.GetBalanceWithID(utils.VOICE, "BALANCE1"); blc1.Value != 0 {
t.Errorf("Balance1 is: %s", utils.ToIJSON(blc1))
} else if blc2 := acnt.GetBalanceWithID(utils.VOICE, "BALANCE2"); blc2.Value != 120000000000 {
t.Errorf("Balance2 is: %s", utils.ToIJSON(blc2))
}
// without re-rate we should be denied
if err := cdrsRpc.Call(utils.CDRsV1ProcessEvent, argsEv, &reply); err == nil {
t.Error("should receive error here")
}
if err := cdrsRpc.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil {
t.Error(err)
} else if blc1 := acnt.GetBalanceWithID(utils.VOICE, "BALANCE1"); blc1.Value != 0 {
t.Errorf("Balance1 is: %s", utils.ToIJSON(blc1))
} else if blc2 := acnt.GetBalanceWithID(utils.VOICE, "BALANCE2"); blc2.Value != 120000000000 {
t.Errorf("Balance2 is: %s", utils.ToIJSON(blc2))
}
argsEv = &engine.ArgV1ProcessEvent{
Flags: []string{utils.MetaRALs, utils.MetaRerate},
CGREvent: utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.RunID: "testv1",
utils.OriginID: "testV1CDRsProcessEventWithRefund",
utils.RequestType: utils.META_PSEUDOPREPAID,
utils.Account: "testV1CDRsProcessEventWithRefund",
utils.Destination: "+4986517174963",
utils.AnswerTime: time.Date(2019, 11, 27, 12, 21, 26, 0, time.UTC),
utils.Usage: time.Duration(1) * time.Minute,
},
},
}
if err := cdrsRpc.Call(utils.CDRsV1ProcessEvent, argsEv, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
if err := cdrsRpc.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil {
t.Error(err)
} else if blc1 := acnt.GetBalanceWithID(utils.VOICE, "BALANCE1"); blc1.Value != 120000000000 { // refund is done after debit
t.Errorf("Balance1 is: %s", utils.ToIJSON(blc1))
} else if blc2 := acnt.GetBalanceWithID(utils.VOICE, "BALANCE2"); blc2.Value != 120000000000 {
t.Errorf("Balance2 is: %s", utils.ToIJSON(blc2))
}
return
}

View File

@@ -24,30 +24,11 @@
},
"chargers": {
"enabled": true,
"attributes_conns": [
{"address": "*internal"},
],
},
"attributes": {
"enabled": true,
},
"cdrs": {
"enabled": true,
"rals_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
],
"attributes_conns":[
{"address": "127.0.0.1:2012", "transport":"*json"}
],
"chargers_conns":[
{"address": "127.0.0.1:2012", "transport":"*json"},
]
},

View File

@@ -1181,3 +1181,13 @@ func (as *AccountSummary) Clone() (cln *AccountSummary) {
}
return
}
// GetBalanceWithID returns a Balance given balance type and balance ID
func (acnt *Account) GetBalanceWithID(blcType, blcID string) (blc *Balance) {
for _, blc = range acnt.BalanceMap[blcType] {
if blc.ID == blcID {
return
}
}
return nil
}

View File

@@ -507,27 +507,44 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithArgDispatcher,
}
}
if store {
refundCDRCosts := func() { // will be used to refund all CDRs on errors
for _, cdr := range cdrs { // refund what we have charged since duplicates are not allowed
if errRfd := cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); errRfd != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> refunding CDR %+v",
utils.CDRs, errRfd.Error(), cdr))
}
}
}
for _, cdr := range cdrs {
if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil {
if err.Error() == "duplicate" && reRate { // fix error name here
if err = cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> refunding CDR %+v",
utils.CDRs, err.Error(), cdr))
err = utils.ErrPartiallyExecuted
return
}
// after refund we can force update
if err = cdrS.cdrDb.SetCDR(cdr, true); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> updating CDR %+v",
utils.CDRs, err.Error(), cdr))
err = utils.ErrPartiallyExecuted
return
}
if err != utils.ErrExists || !reRate {
refundCDRCosts()
return
}
// CDR was found in StorDB
// reRate is allowed, refund the previous CDR
var prevCDRs []*CDR // only one should be returned
if prevCDRs, _, err = cdrS.cdrDb.GetCDRs(
&utils.CDRsFilter{CGRIDs: []string{cdr.CGRID},
RunIDs: []string{cdr.RunID}}, false); err != nil {
refundCDRCosts()
return
}
if err = cdrS.refundEventCost(prevCDRs[0].CostDetails,
cdr.RequestType, cdr.ToR); err != nil {
refundCDRCosts()
return
}
// after refund we can force update
if err = cdrS.cdrDb.SetCDR(cdr, true); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> updating CDR %+v",
utils.CDRs, err.Error(), cdr))
err = utils.ErrPartiallyExecuted
return
}
return
}
}
}
@@ -682,7 +699,7 @@ type ArgV1ProcessEvent struct {
*utils.ArgDispatcher
}
// V2ProcessCDR will process the CDR out of CGREvent
// V1ProcessCDR will process the CDR out of CGREvent
func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (err error) {
if arg.CGREvent.ID == "" {
arg.CGREvent.ID = utils.GenUUID()
@@ -728,18 +745,26 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er
if flgs.HasKey(utils.MetaThresholds) {
thdS = flgs.GetBool(utils.MetaThresholds)
}
statS := cdrS.statS != nil
stS := cdrS.statS != nil
if flgs.HasKey(utils.MetaStats) {
statS = flgs.GetBool(utils.MetaStats)
stS = flgs.GetBool(utils.MetaStats)
}
chrgS := cdrS.chargerS != nil // activate charging for the Event
if flgs.HasKey(utils.MetaChargers) {
chrgS = flgs.GetBool(utils.MetaChargers)
}
var ralS bool // activate single rating for the CDR
fmt.Printf("flags: %s HasKeyRALs: %v\n", utils.ToIJSON(flgs), flgs.GetBool(utils.MetaRALs))
if flgs.HasKey(utils.MetaRALs) {
ralS = flgs.GetBool(utils.MetaRALs)
}
var reRate bool
if flgs.HasKey(utils.MetaRerate) {
reRate = flgs.GetBool(utils.MetaRerate)
if reRate {
ralS = true
}
}
// end of processing options
cgrEv := &utils.CGREventWithArgDispatcher{
@@ -748,47 +773,9 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er
if arg.ArgDispatcher != nil {
cgrEv.ArgDispatcher = arg.ArgDispatcher
}
if !ralS {
if err = cdrS.attrStoExpThdStat(cgrEv,
attrS, store, false, export, thdS, statS); err != nil {
err = utils.NewErrServerError(err)
return
}
} else { // we want rating for this CDR
var partExec bool
cdr, errProc := NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s converting event %+v to CDR",
utils.CDRs, errProc.Error(), cgrEv))
err = utils.ErrPartiallyExecuted
return
}
for _, rtCDR := range cdrS.rateCDRWithErr(&CDRWithArgDispatcher{CDR: cdr,
ArgDispatcher: arg.ArgDispatcher}) {
cgrEv := &utils.CGREventWithArgDispatcher{
CGREvent: rtCDR.AsCGREvent(),
ArgDispatcher: arg.ArgDispatcher,
}
if errProc := cdrS.attrStoExpThdStat(cgrEv,
attrS, store, false, export, thdS, statS); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v ",
utils.CDRs, errProc.Error(), cgrEv))
partExec = true
continue
}
}
if partExec {
err = utils.ErrPartiallyExecuted
return
}
}
if chrgS {
go cdrS.chrgProcessEvent(cgrEv,
attrS, store, false, export, thdS, statS)
if err = cdrS.processEvent(cgrEv,
chrgS, attrS, ralS, store, reRate, export, thdS, stS); err != nil {
return
}
*reply = utils.OK
return nil

View File

@@ -842,9 +842,9 @@ func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
if cdr.OrderID == 0 {
cdr.OrderID = iDB.cnter.Next()
}
cdrKey := utils.ConcatenatedKey(cdr.CGRID, cdr.RunID, cdr.OriginID)
if !allowUpdate {
x, ok := iDB.db.Get(utils.CDRsTBL, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID, cdr.OriginID))
if ok && x != nil {
if _, has := iDB.db.Get(utils.CDRsTBL, cdrKey); has {
return utils.ErrExists
}
}
@@ -879,7 +879,7 @@ func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) {
}
}
iDB.db.Set(utils.CDRsTBL, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID, cdr.OriginID), cdr, idxs.AsSlice(),
iDB.db.Set(utils.CDRsTBL, cdrKey, cdr, idxs.AsSlice(),
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return

View File

@@ -341,6 +341,7 @@ const (
MetaScheduler = "*scheduler"
MetaSessionsCosts = "*sessions_costs"
MetaRALs = "*rals"
MetaRerate = "*rerate"
MetaStats = "*stats"
MetaResponder = "*responder"
MetaCore = "*core"

View File

@@ -282,7 +282,7 @@ func (fWp FlagsWithParams) GetBool(key string) (b bool) {
return // not present means false
}
if v == nil || len(v) == 0 {
return false // empty slice
return true // empty slice
}
return v[0] == "true" // check only the first element
}

View File

@@ -225,7 +225,7 @@ func TestFlagsWithParamsGetBool(t *testing.T) {
t.Errorf("Expecting: false, received: %+v", ToJSON(rcv))
}
key = "empty"
if rcv := flagsWithParams.GetBool(key); rcv != false {
if rcv := flagsWithParams.GetBool(key); rcv != true {
t.Errorf("Expecting: false, received: %+v", ToJSON(rcv))
}
key = "test"