diff --git a/apier/v1/cdrs_it_test.go b/apier/v1/cdrs_it_test.go index 293371563..a07f81809 100644 --- a/apier/v1/cdrs_it_test.go +++ b/apier/v1/cdrs_it_test.go @@ -135,21 +135,7 @@ func testV1CDRsProcessEventWithRefund(t *testing.T) { if err := cdrsRpc.Call(utils.ApierV1SetBalance, attrSetBalance, &reply); err != nil { t.Error(err) } else if reply != utils.OK { - t.Errorf("received: %s", reply) - } - args := &engine.ArgV1ProcessEvent{ - CGREvent: utils.CGREvent{ - Tenant: "cgrates.org", - Event: map[string]interface{}{ - utils.OriginID: "testV1CDRsProcessEventWithRefund", - utils.RequestType: utils.META_PSEUDOPREPAID, - utils.Account: "testV1CDRsProcessEventWithRefund", - utils.Destination: "+4986517174963", - utils.AnswerTime: time.Date(2019, 11, 27, 12, 21, 26, 0, time.UTC), - utils.Usage: time.Duration(3) * time.Minute, - utils.Subject: "ANY2CNT", - }, - }, + t.Errorf("received: <%s>", reply) } expectedVoice := 300000000000.0 if err := cdrsRpc.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil { @@ -157,26 +143,80 @@ func testV1CDRsProcessEventWithRefund(t *testing.T) { } else if rply := acnt.BalanceMap[utils.VOICE].GetTotalValue(); rply != expectedVoice { t.Errorf("Expecting: %v, received: %v", expectedVoice, rply) } - if err := cdrsRpc.Call(utils.CDRsV1ProcessEvent, args, &reply); err != nil { + argsEv := &engine.ArgV1ProcessEvent{ + Flags: []string{utils.MetaRALs}, + CGREvent: utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.RunID: "testv1", + utils.OriginID: "testV1CDRsProcessEventWithRefund", + utils.RequestType: utils.META_PSEUDOPREPAID, + utils.Account: "testV1CDRsProcessEventWithRefund", + utils.Destination: "+4986517174963", + utils.AnswerTime: time.Date(2019, 11, 27, 12, 21, 26, 0, time.UTC), + utils.Usage: time.Duration(3) * time.Minute, + }, + }, + } + if err := cdrsRpc.Call(utils.CDRsV1ProcessEvent, argsEv, &reply); err != nil { t.Error(err) } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) } - time.Sleep(150 * time.Millisecond) // Give time for CDR to be rated - var cdrs []*engine.ExternalCDR if err := cdrsRpc.Call(utils.ApierV1GetCDRs, utils.AttrGetCdrs{}, &cdrs); err != nil { t.Error("Unexpected error: ", err.Error()) } else if len(cdrs) != 1 { t.Error("Unexpected number of CDRs returned: ", len(cdrs)) } else { - if cdrs[0].Cost != -1.0 { + if cdrs[0].Cost != 0 { t.Errorf("Unexpected cost for CDR: %f", cdrs[0].Cost) } - if cdrs[0].ExtraFields["PayPalAccount"] != "paypal@cgrates.org" { - t.Errorf("PayPalAccount should be added by AttributeS, have: %s", - cdrs[0].ExtraFields["PayPalAccount"]) - } + } + if err := cdrsRpc.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil { + t.Error(err) + } else if blc1 := acnt.GetBalanceWithID(utils.VOICE, "BALANCE1"); blc1.Value != 0 { + t.Errorf("Balance1 is: %s", utils.ToIJSON(blc1)) + } else if blc2 := acnt.GetBalanceWithID(utils.VOICE, "BALANCE2"); blc2.Value != 120000000000 { + t.Errorf("Balance2 is: %s", utils.ToIJSON(blc2)) + } + // without re-rate we should be denied + if err := cdrsRpc.Call(utils.CDRsV1ProcessEvent, argsEv, &reply); err == nil { + t.Error("should receive error here") + } + if err := cdrsRpc.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil { + t.Error(err) + } else if blc1 := acnt.GetBalanceWithID(utils.VOICE, "BALANCE1"); blc1.Value != 0 { + t.Errorf("Balance1 is: %s", utils.ToIJSON(blc1)) + } else if blc2 := acnt.GetBalanceWithID(utils.VOICE, "BALANCE2"); blc2.Value != 120000000000 { + t.Errorf("Balance2 is: %s", utils.ToIJSON(blc2)) + } + argsEv = &engine.ArgV1ProcessEvent{ + Flags: []string{utils.MetaRALs, utils.MetaRerate}, + CGREvent: utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.RunID: "testv1", + utils.OriginID: "testV1CDRsProcessEventWithRefund", + utils.RequestType: utils.META_PSEUDOPREPAID, + utils.Account: "testV1CDRsProcessEventWithRefund", + utils.Destination: "+4986517174963", + utils.AnswerTime: time.Date(2019, 11, 27, 12, 21, 26, 0, time.UTC), + utils.Usage: time.Duration(1) * time.Minute, + }, + }, + } + if err := cdrsRpc.Call(utils.CDRsV1ProcessEvent, argsEv, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Error("Unexpected reply received: ", reply) + } + if err := cdrsRpc.Call(utils.ApierV2GetAccount, acntAttrs, &acnt); err != nil { + t.Error(err) + } else if blc1 := acnt.GetBalanceWithID(utils.VOICE, "BALANCE1"); blc1.Value != 120000000000 { // refund is done after debit + t.Errorf("Balance1 is: %s", utils.ToIJSON(blc1)) + } else if blc2 := acnt.GetBalanceWithID(utils.VOICE, "BALANCE2"); blc2.Value != 120000000000 { + t.Errorf("Balance2 is: %s", utils.ToIJSON(blc2)) } return } diff --git a/data/conf/samples/cdrsv1internal/cgrates.json b/data/conf/samples/cdrsv1internal/cgrates.json index fa1049a7c..608cf3c09 100644 --- a/data/conf/samples/cdrsv1internal/cgrates.json +++ b/data/conf/samples/cdrsv1internal/cgrates.json @@ -24,30 +24,11 @@ }, -"chargers": { - "enabled": true, - "attributes_conns": [ - {"address": "*internal"}, - ], -}, - - -"attributes": { - "enabled": true, -}, - - "cdrs": { "enabled": true, "rals_conns": [ {"address": "127.0.0.1:2012", "transport":"*json"}, ], - "attributes_conns":[ - {"address": "127.0.0.1:2012", "transport":"*json"} - ], - "chargers_conns":[ - {"address": "127.0.0.1:2012", "transport":"*json"}, - ] }, diff --git a/engine/account.go b/engine/account.go index 5f7b0eba4..a1e46d8b2 100644 --- a/engine/account.go +++ b/engine/account.go @@ -1181,3 +1181,13 @@ func (as *AccountSummary) Clone() (cln *AccountSummary) { } return } + +// GetBalanceWithID returns a Balance given balance type and balance ID +func (acnt *Account) GetBalanceWithID(blcType, blcID string) (blc *Balance) { + for _, blc = range acnt.BalanceMap[blcType] { + if blc.ID == blcID { + return + } + } + return nil +} diff --git a/engine/cdrs.go b/engine/cdrs.go index d1ac1d4ba..f7d41b517 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -507,27 +507,44 @@ func (cdrS *CDRServer) processEvent(ev *utils.CGREventWithArgDispatcher, } } if store { + refundCDRCosts := func() { // will be used to refund all CDRs on errors + for _, cdr := range cdrs { // refund what we have charged since duplicates are not allowed + if errRfd := cdrS.refundEventCost(cdr.CostDetails, + cdr.RequestType, cdr.ToR); errRfd != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> refunding CDR %+v", + utils.CDRs, errRfd.Error(), cdr)) + } + } + } for _, cdr := range cdrs { if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil { - if err.Error() == "duplicate" && reRate { // fix error name here - if err = cdrS.refundEventCost(cdr.CostDetails, - cdr.RequestType, cdr.ToR); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: <%s> refunding CDR %+v", - utils.CDRs, err.Error(), cdr)) - err = utils.ErrPartiallyExecuted - return - } - // after refund we can force update - if err = cdrS.cdrDb.SetCDR(cdr, true); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: <%s> updating CDR %+v", - utils.CDRs, err.Error(), cdr)) - err = utils.ErrPartiallyExecuted - return - } + if err != utils.ErrExists || !reRate { + refundCDRCosts() + return + } + // CDR was found in StorDB + // reRate is allowed, refund the previous CDR + var prevCDRs []*CDR // only one should be returned + if prevCDRs, _, err = cdrS.cdrDb.GetCDRs( + &utils.CDRsFilter{CGRIDs: []string{cdr.CGRID}, + RunIDs: []string{cdr.RunID}}, false); err != nil { + refundCDRCosts() + return + } + if err = cdrS.refundEventCost(prevCDRs[0].CostDetails, + cdr.RequestType, cdr.ToR); err != nil { + refundCDRCosts() + return + } + // after refund we can force update + if err = cdrS.cdrDb.SetCDR(cdr, true); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> updating CDR %+v", + utils.CDRs, err.Error(), cdr)) + err = utils.ErrPartiallyExecuted + return } - return } } } @@ -682,7 +699,7 @@ type ArgV1ProcessEvent struct { *utils.ArgDispatcher } -// V2ProcessCDR will process the CDR out of CGREvent +// V1ProcessCDR will process the CDR out of CGREvent func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (err error) { if arg.CGREvent.ID == "" { arg.CGREvent.ID = utils.GenUUID() @@ -728,18 +745,26 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er if flgs.HasKey(utils.MetaThresholds) { thdS = flgs.GetBool(utils.MetaThresholds) } - statS := cdrS.statS != nil + stS := cdrS.statS != nil if flgs.HasKey(utils.MetaStats) { - statS = flgs.GetBool(utils.MetaStats) + stS = flgs.GetBool(utils.MetaStats) } chrgS := cdrS.chargerS != nil // activate charging for the Event if flgs.HasKey(utils.MetaChargers) { chrgS = flgs.GetBool(utils.MetaChargers) } var ralS bool // activate single rating for the CDR + fmt.Printf("flags: %s HasKeyRALs: %v\n", utils.ToIJSON(flgs), flgs.GetBool(utils.MetaRALs)) if flgs.HasKey(utils.MetaRALs) { ralS = flgs.GetBool(utils.MetaRALs) } + var reRate bool + if flgs.HasKey(utils.MetaRerate) { + reRate = flgs.GetBool(utils.MetaRerate) + if reRate { + ralS = true + } + } // end of processing options cgrEv := &utils.CGREventWithArgDispatcher{ @@ -748,47 +773,9 @@ func (cdrS *CDRServer) V1ProcessEvent(arg *ArgV1ProcessEvent, reply *string) (er if arg.ArgDispatcher != nil { cgrEv.ArgDispatcher = arg.ArgDispatcher } - - if !ralS { - if err = cdrS.attrStoExpThdStat(cgrEv, - attrS, store, false, export, thdS, statS); err != nil { - err = utils.NewErrServerError(err) - return - } - } else { // we want rating for this CDR - var partExec bool - cdr, errProc := NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, - cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s converting event %+v to CDR", - utils.CDRs, errProc.Error(), cgrEv)) - err = utils.ErrPartiallyExecuted - return - } - for _, rtCDR := range cdrS.rateCDRWithErr(&CDRWithArgDispatcher{CDR: cdr, - ArgDispatcher: arg.ArgDispatcher}) { - cgrEv := &utils.CGREventWithArgDispatcher{ - CGREvent: rtCDR.AsCGREvent(), - ArgDispatcher: arg.ArgDispatcher, - } - if errProc := cdrS.attrStoExpThdStat(cgrEv, - attrS, store, false, export, thdS, statS); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing event %+v ", - utils.CDRs, errProc.Error(), cgrEv)) - partExec = true - continue - } - } - if partExec { - err = utils.ErrPartiallyExecuted - return - } - } - if chrgS { - go cdrS.chrgProcessEvent(cgrEv, - attrS, store, false, export, thdS, statS) + if err = cdrS.processEvent(cgrEv, + chrgS, attrS, ralS, store, reRate, export, thdS, stS); err != nil { + return } *reply = utils.OK return nil diff --git a/engine/storage_internal_stordb.go b/engine/storage_internal_stordb.go index 9f1a93056..6fd590450 100644 --- a/engine/storage_internal_stordb.go +++ b/engine/storage_internal_stordb.go @@ -842,9 +842,9 @@ func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) { if cdr.OrderID == 0 { cdr.OrderID = iDB.cnter.Next() } + cdrKey := utils.ConcatenatedKey(cdr.CGRID, cdr.RunID, cdr.OriginID) if !allowUpdate { - x, ok := iDB.db.Get(utils.CDRsTBL, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID, cdr.OriginID)) - if ok && x != nil { + if _, has := iDB.db.Get(utils.CDRsTBL, cdrKey); has { return utils.ErrExists } } @@ -879,7 +879,7 @@ func (iDB *InternalDB) SetCDR(cdr *CDR, allowUpdate bool) (err error) { } } - iDB.db.Set(utils.CDRsTBL, utils.ConcatenatedKey(cdr.CGRID, cdr.RunID, cdr.OriginID), cdr, idxs.AsSlice(), + iDB.db.Set(utils.CDRsTBL, cdrKey, cdr, idxs.AsSlice(), cacheCommit(utils.NonTransactional), utils.NonTransactional) return diff --git a/utils/consts.go b/utils/consts.go index 067db33c4..0b8d8b876 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -341,6 +341,7 @@ const ( MetaScheduler = "*scheduler" MetaSessionsCosts = "*sessions_costs" MetaRALs = "*rals" + MetaRerate = "*rerate" MetaStats = "*stats" MetaResponder = "*responder" MetaCore = "*core" diff --git a/utils/map.go b/utils/map.go index 226ac626e..826e0d697 100644 --- a/utils/map.go +++ b/utils/map.go @@ -282,7 +282,7 @@ func (fWp FlagsWithParams) GetBool(key string) (b bool) { return // not present means false } if v == nil || len(v) == 0 { - return false // empty slice + return true // empty slice } return v[0] == "true" // check only the first element } diff --git a/utils/map_test.go b/utils/map_test.go index 32e07ea3f..b23f8c4ff 100644 --- a/utils/map_test.go +++ b/utils/map_test.go @@ -225,7 +225,7 @@ func TestFlagsWithParamsGetBool(t *testing.T) { t.Errorf("Expecting: false, received: %+v", ToJSON(rcv)) } key = "empty" - if rcv := flagsWithParams.GetBool(key); rcv != false { + if rcv := flagsWithParams.GetBool(key); rcv != true { t.Errorf("Expecting: false, received: %+v", ToJSON(rcv)) } key = "test"