AccountS - debit on multiple accounts with dynamic weights

This commit is contained in:
DanB
2021-02-13 17:28:55 +01:00
parent 099ee3a454
commit efb9e463b8
7 changed files with 235 additions and 133 deletions

View File

@@ -42,7 +42,7 @@ type abstractBalance struct {
}
// debitUsage implements the balanceOperator interface
func (aB *abstractBalance) debitUsage(usage *utils.Decimal,
func (aB *abstractBalance) debitUsage(usage *decimal.Big,
cgrEv *utils.CGREvent) (ec *utils.EventCharges, err error) {
evNm := utils.MapStorage{
@@ -75,7 +75,7 @@ func (aB *abstractBalance) debitUsage(usage *utils.Decimal,
}
var hasUF bool
if uF != nil && uF.Factor.Cmp(decimal.New(1, 0)) != 0 {
usage.Big = utils.MultiplyBig(usage.Big, uF.Factor.Big)
usage = utils.MultiplyBig(usage, uF.Factor.Big)
hasUF = true
}
@@ -87,16 +87,16 @@ func (aB *abstractBalance) debitUsage(usage *utils.Decimal,
}
// balance smaller than usage, correct usage if the balance has limit
if aB.blnCfg.Units.Compare(usage) == -1 && blncLmt != nil {
if aB.blnCfg.Units.Big.Cmp(usage) == -1 && blncLmt != nil {
// decrease the usage to match the maximum increments
// will use special rounding to 0 since otherwise we go negative (ie: 0.05 as increment)
usage.Big = roundedUsageWithIncrements(aB.blnCfg.Units.Big, costIcrm.Increment.Big)
usage = roundedUsageWithIncrements(aB.blnCfg.Units.Big, costIcrm.Increment.Big)
}
if costIcrm.RecurrentFee.Cmp(decimal.New(0, 0)) == 0 &&
(costIcrm.FixedFee == nil ||
costIcrm.FixedFee.Cmp(decimal.New(0, 0)) == 0) {
// cost 0, no need of concrete
ec = &utils.EventCharges{Usage: usage}
ec = &utils.EventCharges{Usage: &utils.Decimal{usage}}
} else {
// attempt to debit usage with cost
if ec, err = maxDebitUsageFromConcretes(aB.cncrtBlncs, usage,
@@ -115,7 +115,7 @@ func (aB *abstractBalance) debitUsage(usage *utils.Decimal,
aB.blnCfg.Units.Big = utils.SumBig(aB.blnCfg.Units.Big, blncLmt.Big)
}
if hasUF {
usage.Big = utils.DivideBig(usage.Big, uF.Factor.Big)
usage = utils.DivideBig(usage, uF.Factor.Big)
}
return
}

View File

@@ -59,7 +59,7 @@ func TestABDebitUsageFromConcretes(t *testing.T) {
}}
// consume only from first balance
if err := debitUsageFromConcretes(aB.cncrtBlncs,
utils.NewDecimal(int64(time.Duration(5*time.Minute)), 0),
decimal.New(int64(time.Duration(5*time.Minute)), 0),
&utils.CostIncrement{
Increment: utils.NewDecimal(int64(time.Duration(time.Minute)), 0),
RecurrentFee: utils.NewDecimal(1, 0)},
@@ -76,7 +76,7 @@ func TestABDebitUsageFromConcretes(t *testing.T) {
aB.cncrtBlncs[1].blnCfg.Units = utils.NewDecimal(125, 2)
if err := debitUsageFromConcretes(aB.cncrtBlncs,
utils.NewDecimal(int64(time.Duration(9*time.Minute)), 0),
decimal.New(int64(time.Duration(9*time.Minute)), 0),
&utils.CostIncrement{
Increment: utils.NewDecimal(int64(time.Duration(time.Minute)), 0),
RecurrentFee: utils.NewDecimal(1, 0)},
@@ -93,7 +93,7 @@ func TestABDebitUsageFromConcretes(t *testing.T) {
aB.cncrtBlncs[1].blnCfg.Units = utils.NewDecimal(125, 2)
if err := debitUsageFromConcretes(aB.cncrtBlncs,
utils.NewDecimal(int64(time.Duration(10*time.Minute)), 0),
decimal.New(int64(time.Duration(10*time.Minute)), 0),
&utils.CostIncrement{
Increment: utils.NewDecimal(int64(time.Duration(time.Minute)), 0),
RecurrentFee: utils.NewDecimal(1, 0)},
@@ -135,7 +135,7 @@ func TestABDebitUsage(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(30*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(30*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(30*time.Second), 0)) != 0 {
@@ -150,7 +150,7 @@ func TestABDebitUsage(t *testing.T) {
aB.blnCfg.Units = utils.NewDecimal(int64(time.Duration(60*time.Second)), 0)
aB.cncrtBlncs[0].blnCfg.Units = utils.NewDecimal(29, 0) // not enough concrete
if ec, err := aB.debitUsage(utils.NewDecimal(int64(30*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(30*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(29*time.Second), 0)) != 0 {
@@ -164,7 +164,7 @@ func TestABDebitUsage(t *testing.T) {
// limited by concrete
aB.cncrtBlncs[0].blnCfg.Units = utils.NewDecimal(0, 0) // not enough concrete
if ec, err := aB.debitUsage(utils.NewDecimal(int64(30*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(30*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(0, 0)) != 0 {
@@ -179,7 +179,7 @@ func TestABDebitUsage(t *testing.T) {
aB.blnCfg.Units = utils.NewDecimal(int64(time.Duration(29*time.Second)), 0) // not enough abstract
aB.cncrtBlncs[0].blnCfg.Units = utils.NewDecimal(60, 0)
if ec, err := aB.debitUsage(utils.NewDecimal(int64(30*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(30*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(29*time.Second), 0)) != 0 {
@@ -218,7 +218,7 @@ func TestABCost0WithConcrete(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(30*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(30*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(30*time.Second), 0)) != 0 {
@@ -247,7 +247,7 @@ func TestABCost0WithoutConcrete(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(30*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(30*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(30*time.Second), 0)) != 0 {
@@ -283,7 +283,7 @@ func TestABCost0Exceed(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(70*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(70*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(60*time.Second), 0)) != 0 {
@@ -312,7 +312,7 @@ func TestABCost0ExceedWithoutConcrete(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(70*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(70*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(60*time.Second), 0)) != 0 {
@@ -342,7 +342,7 @@ func TestABCost0WithUnlimited(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(80*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(80*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(80*time.Second), 0)) != 0 {
@@ -381,7 +381,7 @@ func TestABCost0WithUnlimitedWithConcrete(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(80*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(80*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(80*time.Second), 0)) != 0 {
@@ -413,7 +413,7 @@ func TestABCost0WithLimit(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(30*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(30*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(30*time.Second), 0)) != 0 {
@@ -452,7 +452,7 @@ func TestABCost0WithLimitWithConcrete(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(30*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(30*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(30*time.Second), 0)) != 0 {
@@ -484,7 +484,7 @@ func TestABCost0WithLimitExceed(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(50*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(50*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(30*time.Second), 0)) != 0 {
@@ -523,7 +523,7 @@ func TestABCost0WithLimitExceedWithConcrete(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(50*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(50*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(30*time.Second), 0)) != 0 {
@@ -560,14 +560,14 @@ func TestDebitUsageFiltersError(t *testing.T) {
utils.Usage: "10s",
},
}
_, err := aB.debitUsage(utils.NewDecimal(int64(40*time.Second), 0),
_, err := aB.debitUsage(decimal.New(int64(40*time.Second), 0),
cgrEv)
if err == nil || err != utils.ErrFilterNotPassingNoCaps {
t.Errorf("Expected %+v, received %+v", utils.ErrFilterNotPassingNoCaps, err)
}
aB.blnCfg.FilterIDs = []string{"invalid_filter_format"}
_, err = aB.debitUsage(utils.NewDecimal(int64(40*time.Second), 0),
_, err = aB.debitUsage(decimal.New(int64(40*time.Second), 0),
cgrEv)
if err == nil || err != utils.ErrNoDatabaseConn {
t.Errorf("Expected %+v, received %+v", utils.ErrNoDatabaseConn, err)
@@ -601,14 +601,14 @@ func TestDebitUsageBalanceLimitErrors(t *testing.T) {
}
expectedErr := "unsupported *balanceLimit format"
_, err := aB.debitUsage(utils.NewDecimal(int64(40*time.Second), 0),
_, err := aB.debitUsage(decimal.New(int64(40*time.Second), 0),
cgrEv)
if err == nil || err.Error() != expectedErr {
t.Errorf("Expected %+v, received %+v", expectedErr, err)
}
aB.blnCfg.Opts[utils.MetaBalanceLimit] = float64(16 * time.Second)
if _, err = aB.debitUsage(utils.NewDecimal(int64(40*time.Second), 0),
if _, err = aB.debitUsage(decimal.New(int64(40*time.Second), 0),
cgrEv); err != nil {
t.Error(err)
}
@@ -648,12 +648,13 @@ func TestDebitUsageUnitFactorsErrors(t *testing.T) {
},
}
if _, err := aB.debitUsage(utils.NewDecimal(int64(20*time.Second), 0), cgrEv); err == nil || err != utils.ErrNoDatabaseConn {
if _, err := aB.debitUsage(decimal.New(int64(20*time.Second), 0), cgrEv); err == nil ||
err != utils.ErrNoDatabaseConn {
t.Errorf("Expected %+v, received %+v", utils.ErrNoDatabaseConn, err)
}
aB.blnCfg.UnitFactors[0].FilterIDs = []string{"*string:*~req.Usage:10s"}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(20*time.Second), 0), cgrEv); err != nil {
if ec, err := aB.debitUsage(decimal.New(int64(20*time.Second), 0), cgrEv); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(0, 0)) != 0 {
t.Error(err)
@@ -685,7 +686,8 @@ func TestDebitUsageCostIncrementError(t *testing.T) {
},
}
if _, err := aB.debitUsage(utils.NewDecimal(int64(20*time.Second), 0), cgrEv); err == nil || err != utils.ErrNoDatabaseConn {
if _, err := aB.debitUsage(decimal.New(int64(20*time.Second), 0), cgrEv); err == nil ||
err != utils.ErrNoDatabaseConn {
t.Errorf("Expected %+v, received %+v", utils.ErrNoDatabaseConn, err)
}
@@ -694,7 +696,8 @@ func TestDebitUsageCostIncrementError(t *testing.T) {
aB.blnCfg.CostIncrements = nil
aB.blnCfg.AttributeIDs = []string{"attr11"}
expected := "NOT_CONNECTED: AttributeS"
if _, err := aB.debitUsage(utils.NewDecimal(int64(20*time.Second), 0), cgrEv); err == nil || err.Error() != expected {
if _, err := aB.debitUsage(decimal.New(int64(20*time.Second), 0), cgrEv); err == nil ||
err.Error() != expected {
t.Errorf("Expected %+v, received %+v", expected, err)
}
}
@@ -725,7 +728,7 @@ func TestABCost(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(10*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(10*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(10*time.Second), 0)) != 0 {
@@ -774,7 +777,7 @@ func TestABCostWithFiltersNotMatch(t *testing.T) {
"CustomField2": "CustomValue2",
},
}
if _, err := aB.debitUsage(utils.NewDecimal(int64(10*time.Second), 0),
if _, err := aB.debitUsage(decimal.New(int64(10*time.Second), 0),
cgrEv); err == nil || err.Error() != "RATES_ERROR:NOT_CONNECTED: RateS" {
t.Error(err)
}
@@ -818,7 +821,7 @@ func TestABCostWithFilters(t *testing.T) {
},
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(10*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(10*time.Second), 0),
cgrEv); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(10*time.Second), 0)) != 0 {
@@ -856,7 +859,7 @@ func TestABCostExceed(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(70*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(70*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(60*time.Second), 0)) != 0 {
@@ -897,7 +900,7 @@ func TestABCostUnlimitedExceed(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(70*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(70*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(70*time.Second), 0)) != 0 {
@@ -938,7 +941,7 @@ func TestABCostLimit(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(30*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(30*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(30*time.Second), 0)) != 0 {
@@ -979,7 +982,7 @@ func TestABCostLimitExceed(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(70*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(70*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(30*time.Second), 0)) != 0 {
@@ -1017,7 +1020,7 @@ func TestABCostNotEnoughConcrete(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(55*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(55*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(50*time.Second), 0)) != 0 {
@@ -1062,7 +1065,7 @@ func TestABCostMultipleConcrete(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(55*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(55*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(55*time.Second), 0)) != 0 {
@@ -1112,7 +1115,7 @@ func TestABCostMultipleConcreteUnlimited(t *testing.T) {
fltrS: new(engine.FilterS),
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(70*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(70*time.Second), 0),
new(utils.CGREvent)); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(70*time.Second), 0)) != 0 {
@@ -1169,7 +1172,7 @@ func TestAMCostWithUnitFactor(t *testing.T) {
},
}
if ec, err := aB.debitUsage(utils.NewDecimal(int64(10*time.Second), 0),
if ec, err := aB.debitUsage(decimal.New(int64(10*time.Second), 0),
cgrEv); err != nil {
t.Error(err)
} else if ec.Usage.Cmp(decimal.New(int64(20*time.Second), 0)) != 0 {

View File

@@ -65,11 +65,11 @@ func (aS *AccountS) Call(serviceMethod string, args interface{}, reply interface
return utils.RPCCall(aS, serviceMethod, args, reply)
}
// matchingAccountForEvent returns the matched Account for the given event
// if lked option is passed, the AccountProfile will be also locked
// matchingAccountsForEvent returns the matched Accounts for the given event
// if lked option is passed, each AccountProfile will be also locked
// so it becomes responsibility of upper layers to release the lock
func (aS *AccountS) matchingAccountForEvent(tnt string, cgrEv *utils.CGREvent,
acntIDs []string, lked bool) (acnt *utils.AccountProfile, lkID string, err error) {
func (aS *AccountS) matchingAccountsForEvent(tnt string, cgrEv *utils.CGREvent,
acntIDs []string, lked bool) (acnts utils.AccountProfilesWithWeight, err error) {
evNm := utils.MapStorage{
utils.MetaReq: cgrEv.Event,
utils.MetaOpts: cgrEv.Opts,
@@ -101,9 +101,7 @@ func (aS *AccountS) matchingAccountForEvent(tnt string, cgrEv *utils.CGREvent,
var qAcnt *utils.AccountProfile
if qAcnt, err = aS.dm.GetAccountProfile(tnt, acntID,
true, true, utils.NonTransactional); err != nil {
if lked {
guardian.Guardian.UnguardIDs(refID)
}
guardian.Guardian.UnguardIDs(refID)
if err == utils.ErrNotFound {
err = nil
continue
@@ -113,44 +111,37 @@ func (aS *AccountS) matchingAccountForEvent(tnt string, cgrEv *utils.CGREvent,
if _, isDisabled := qAcnt.Opts[utils.Disabled]; isDisabled ||
(qAcnt.ActivationInterval != nil && cgrEv.Time != nil &&
!qAcnt.ActivationInterval.IsActiveAtTime(*cgrEv.Time)) { // not active
if lked {
guardian.Guardian.UnguardIDs(refID)
}
guardian.Guardian.UnguardIDs(refID)
continue
}
var pass bool
if pass, err = aS.fltrS.Pass(tnt, qAcnt.FilterIDs, evNm); err != nil {
if lked {
guardian.Guardian.UnguardIDs(refID)
}
guardian.Guardian.UnguardIDs(refID)
return
} else if !pass {
if lked {
guardian.Guardian.UnguardIDs(refID)
}
guardian.Guardian.UnguardIDs(refID)
continue
}
if acnt == nil {
acnt = qAcnt
if lked {
if lkID != utils.EmptyString {
guardian.Guardian.UnguardIDs(lkID)
}
lkID = refID
var weight float64
/*
if weight, err = engine.WeightFromDynamics(acnt.DynamicWeights,
aS.fltrS, cgrEv.Tenant, ev); err != nil {
return
}
} else if lked {
guardian.Guardian.UnguardIDs(refID)
}
*/
acnts = append(acnts, &utils.AccountProfileWithWeight{qAcnt, weight, refID})
}
if acnt == nil {
return nil, "", utils.ErrNotFound
if len(acnts) == 0 {
return nil, utils.ErrNotFound
}
acnts.Sort()
return
}
// accountProcessEvent implements event processing by an Account
func (aS *AccountS) accountDebitUsage(acnt *utils.AccountProfile,
// accountDebitUsage will debit the usage out of an Account
func (aS *AccountS) accountDebitUsage(acnt *utils.AccountProfile, usage *decimal.Big,
cgrEv *utils.CGREvent) (ec *utils.EventCharges, err error) {
// Find balances matching event
//ev := cgrEv.AsDataProvider()
blcsWithWeight := make(utils.BalancesWithWeight, 0, len(acnt.Balances))
for _, blnCfg := range acnt.Balances {
@@ -163,12 +154,38 @@ func (aS *AccountS) accountDebitUsage(acnt *utils.AccountProfile,
*/
blcsWithWeight = append(blcsWithWeight, &utils.BalanceWithWeight{blnCfg, weight})
}
blcsWithWeight.Sort()
var blncOpers []balanceOperator
if blncOpers, err = newBalanceOperators(blcsWithWeight.Balances(), aS.fltrS, aS.connMgr,
aS.cfg.AccountSCfg().AttributeSConns, aS.cfg.AccountSCfg().RateSConns); err != nil {
return
}
usage := utils.NewDecimal(int64(72*time.Hour), 0)
for i, blncOper := range blncOpers {
if i == 0 {
ec = utils.NewEventCharges()
}
if usage.Cmp(decimal.New(0, 0)) == 0 {
return // no more debit
}
var ecDbt *utils.EventCharges
if ecDbt, err = blncOper.debitUsage(new(decimal.Big).Copy(usage), cgrEv); err != nil {
if err == utils.ErrFilterNotPassingNoCaps {
err = nil
continue
}
return
}
usage = utils.SubstractBig(usage, ecDbt.Usage.Big)
ec.Merge(ecDbt)
}
return
}
// accountsDebitUsage will debit an usage out of multiple accounts
func (aS *AccountS) accountsDebitUsage(acnts []*utils.AccountProfileWithWeight,
cgrEv *utils.CGREvent, store bool) (ec *utils.EventCharges, err error) {
usage := decimal.New(int64(72*time.Hour), 0)
var usgEv time.Duration
if usgEv, err = cgrEv.FieldAsDuration(utils.Usage); err != nil {
if err != utils.ErrNotFound {
@@ -181,27 +198,33 @@ func (aS *AccountS) accountDebitUsage(acnt *utils.AccountProfile,
}
err = nil
} else { // found, overwrite usage
usage.Big = decimal.New(int64(usgEv), 0)
usage = decimal.New(int64(usgEv), 0)
}
} else {
usage.Big = decimal.New(int64(usgEv), 0)
usage = decimal.New(int64(usgEv), 0)
}
for i, blncOper := range blncOpers {
acntBkps := make([]utils.AccountBalancesBackup, len(acnts))
for i, acnt := range acnts {
if i == 0 {
ec = utils.NewEventCharges()
}
if usage.Big.Cmp(decimal.New(0, 0)) == 0 {
return // no more debit
if usage.Cmp(decimal.New(0, 0)) == 0 {
return // no more debits
}
acntBkps[i] = acnt.AccountProfile.AccountBalancesBackup()
var ecDbt *utils.EventCharges
if ecDbt, err = blncOper.debitUsage(usage.Clone(), cgrEv); err != nil {
if err == utils.ErrFilterNotPassingNoCaps {
err = nil
continue
if ecDbt, err = aS.accountDebitUsage(acnt.AccountProfile,
new(decimal.Big).Copy(usage), cgrEv); err != nil {
if store {
restoreAccounts(aS.dm, acnts, acntBkps)
}
return
}
usage.Big = utils.SubstractBig(usage.Big, ecDbt.Usage.Big)
if err = aS.dm.SetAccountProfile(acnt.AccountProfile, false); err != nil {
restoreAccounts(aS.dm, acnts, acntBkps)
return
}
usage = utils.SubstractBig(usage, ecDbt.Usage.Big)
ec.Merge(ecDbt)
}
return
@@ -212,35 +235,37 @@ func (aS *AccountS) accountDebitCost(acnt *utils.AccountProfile,
return
}
// V1AccountProfileForEvent returns the matching AccountProfile for Event
func (aS *AccountS) V1AccountProfileForEvent(args *utils.ArgsAccountForEvent, ap *utils.AccountProfile) (err error) {
var acnt *utils.AccountProfile
if acnt, _, err = aS.matchingAccountForEvent(args.CGREvent.Tenant,
// V1AccountProfilesForEvent returns the matching AccountProfiles for Event
func (aS *AccountS) V1AccountProfilesForEvent(args *utils.ArgsAccountsForEvent, aps *[]*utils.AccountProfile) (err error) {
var acnts utils.AccountProfilesWithWeight
if acnts, err = aS.matchingAccountsForEvent(args.CGREvent.Tenant,
args.CGREvent, args.AccountIDs, false); err != nil {
if err != utils.ErrNotFound {
err = utils.NewErrServerError(err)
}
return
}
*ap = *acnt // ToDo: make sure we clone in RPC
*aps = acnts.AccountProfiles()
return
}
// V1MaxUsage returns the maximum usage for the event, based on matching Account
func (aS *AccountS) V1MaxUsage(args *utils.ArgsAccountForEvent, eEc *utils.ExtEventCharges) (err error) {
var acnt *utils.AccountProfile
var lkID string
if acnt, lkID, err = aS.matchingAccountForEvent(args.CGREvent.Tenant,
// V1MaxUsage returns the maximum usage for the event, based on matching Accounts
func (aS *AccountS) V1MaxUsage(args *utils.ArgsAccountsForEvent, eEc *utils.ExtEventCharges) (err error) {
var acnts utils.AccountProfilesWithWeight
if acnts, err = aS.matchingAccountsForEvent(args.CGREvent.Tenant,
args.CGREvent, args.AccountIDs, true); err != nil {
if err != utils.ErrNotFound {
err = utils.NewErrServerError(err)
}
return
}
defer guardian.Guardian.UnguardIDs(lkID)
defer func() {
for _, lkID := range acnts.LockIDs() {
guardian.Guardian.UnguardIDs(lkID)
}
}()
var procEC *utils.EventCharges
if procEC, err = aS.accountDebitUsage(acnt, args.CGREvent); err != nil {
if procEC, err = aS.accountsDebitUsage(acnts, args.CGREvent, false); err != nil {
return
}
var rcvEec *utils.ExtEventCharges
@@ -252,20 +277,23 @@ func (aS *AccountS) V1MaxUsage(args *utils.ArgsAccountForEvent, eEc *utils.ExtEv
}
// V1DebitUsage performs debit for the provided event
func (aS *AccountS) V1DebitUsage(args *utils.ArgsAccountForEvent, eEc *utils.ExtEventCharges) (err error) {
var acnt *utils.AccountProfile
var lkID string
if acnt, lkID, err = aS.matchingAccountForEvent(args.CGREvent.Tenant,
func (aS *AccountS) V1DebitUsage(args *utils.ArgsAccountsForEvent, eEc *utils.ExtEventCharges) (err error) {
var acnts utils.AccountProfilesWithWeight
if acnts, err = aS.matchingAccountsForEvent(args.CGREvent.Tenant,
args.CGREvent, args.AccountIDs, true); err != nil {
if err != utils.ErrNotFound {
err = utils.NewErrServerError(err)
}
return
}
defer guardian.Guardian.UnguardIDs(lkID)
defer func() {
for _, lkID := range acnts.LockIDs() {
guardian.Guardian.UnguardIDs(lkID)
}
}()
var procEC *utils.EventCharges
if procEC, err = aS.accountDebitUsage(acnt, args.CGREvent); err != nil {
if procEC, err = aS.accountsDebitUsage(acnts, args.CGREvent, true); err != nil {
return
}
@@ -274,10 +302,6 @@ func (aS *AccountS) V1DebitUsage(args *utils.ArgsAccountForEvent, eEc *utils.Ext
return
}
if err = aS.dm.SetAccountProfile(acnt, false); err != nil {
return // no need of revert since we did not save
}
*eEc = *rcvEec
return
}

View File

@@ -110,7 +110,7 @@ func (cB *concreteBalance) debitUnits(dUnts *utils.Decimal, tnt string,
}
// debit implements the balanceOperator interface
func (cB *concreteBalance) debitUsage(usage *utils.Decimal,
func (cB *concreteBalance) debitUsage(usage *decimal.Big,
cgrEv *utils.CGREvent) (ec *utils.EventCharges, err error) {
evNm := utils.MapStorage{
utils.MetaOpts: cgrEv.Opts,

View File

@@ -75,7 +75,7 @@ func newBalanceOperator(blncCfg *utils.Balance, cncrtBlncs []*concreteBalance,
// balanceOperator is the implementation of a balance type
type balanceOperator interface {
debitUsage(usage *utils.Decimal, cgrEv *utils.CGREvent) (ec *utils.EventCharges, err error)
debitUsage(usage *decimal.Big, cgrEv *utils.CGREvent) (ec *utils.EventCharges, err error)
}
// roundUsageWithIncrements rounds the usage based on increments
@@ -189,7 +189,7 @@ func balanceLimit(optsCfg map[string]interface{}) (bL *utils.Decimal, err error)
// debitUsageFromConcrete attempts to debit the usage out of concrete balances
// returns utils.ErrInsufficientCredit if complete usage cannot be debitted
func debitUsageFromConcretes(cncrtBlncs []*concreteBalance, usage *utils.Decimal,
func debitUsageFromConcretes(cncrtBlncs []*concreteBalance, usage *decimal.Big,
costIcrm *utils.CostIncrement, cgrEv *utils.CGREvent,
connMgr *engine.ConnManager, rateSConns, rpIDs []string) (err error) {
if costIcrm.RecurrentFee.Cmp(decimal.New(-1, 0)) == 0 &&
@@ -209,7 +209,7 @@ func debitUsageFromConcretes(cncrtBlncs []*concreteBalance, usage *utils.Decimal
// RecurrentFee is configured, used it with increments
if costIcrm.RecurrentFee.Big.Cmp(decimal.New(-1, 0)) != 0 {
rcrntCost := utils.MultiplyBig(
utils.DivideBig(usage.Big, costIcrm.Increment.Big),
utils.DivideBig(usage, costIcrm.Increment.Big),
costIcrm.RecurrentFee.Big)
if tCost == nil {
tCost = rcrntCost
@@ -239,7 +239,7 @@ func debitUsageFromConcretes(cncrtBlncs []*concreteBalance, usage *utils.Decimal
}
// maxDebitUsageFromConcretes will debit the maximum possible usage out of concretes
func maxDebitUsageFromConcretes(cncrtBlncs []*concreteBalance, usage *utils.Decimal,
func maxDebitUsageFromConcretes(cncrtBlncs []*concreteBalance, usage *decimal.Big,
connMgr *engine.ConnManager, cgrEv *utils.CGREvent,
attrSConns, attributeIDs, rateSConns, rpIDs []string,
costIcrm *utils.CostIncrement) (ec *utils.EventCharges, err error) {
@@ -269,7 +269,7 @@ func maxDebitUsageFromConcretes(cncrtBlncs []*concreteBalance, usage *utils.Deci
if i == maxItr {
return nil, utils.ErrMaxIncrementsExceeded
}
qriedUsage := usage.Big // so we can detect loops
qriedUsage := usage // so we can detect loops
if err = debitUsageFromConcretes(cncrtBlncs, usage, costIcrm, cgrEv,
connMgr, rateSConns, rpIDs); err != nil {
if err != utils.ErrInsufficientCredit {
@@ -277,35 +277,35 @@ func maxDebitUsageFromConcretes(cncrtBlncs []*concreteBalance, usage *utils.Deci
}
err = nil
// ErrInsufficientCredit
usageDenied = new(decimal.Big).Copy(usage.Big)
usageDenied = new(decimal.Big).Copy(usage)
if usagePaid == nil { // going backwards
usage.Big = utils.DivideBig( // divide by 2
usage.Big, decimal.New(2, 0))
usage.Big = roundedUsageWithIncrements(usage.Big, costIcrm.Increment.Big) // make sure usage is multiple of increments
if usage.Big.Cmp(usageDenied) >= 0 ||
usage.Big.Cmp(decimal.New(0, 0)) == 0 ||
usage.Big.Cmp(qriedUsage) == 0 { // loop
usage = utils.DivideBig( // divide by 2
usage, decimal.New(2, 0))
usage = roundedUsageWithIncrements(usage, costIcrm.Increment.Big) // make sure usage is multiple of increments
if usage.Cmp(usageDenied) >= 0 ||
usage.Cmp(decimal.New(0, 0)) == 0 ||
usage.Cmp(qriedUsage) == 0 { // loop
break
}
continue
}
} else {
usagePaid = new(decimal.Big).Copy(usage.Big)
usagePaid = new(decimal.Big).Copy(usage)
paidConcrtUnts = cloneUnitsFromConcretes(cncrtBlncs)
if i == 0 { // no estimation done, covering full
break
}
}
// going upwards
usage.Big = utils.SumBig(usagePaid,
usage = utils.SumBig(usagePaid,
utils.DivideBig(usagePaid, decimal.New(2, 0)).RoundToInt())
if usage.Big.Cmp(usageDenied) >= 0 {
usage.Big = utils.SumBig(usagePaid, costIcrm.Increment.Big)
if usage.Cmp(usageDenied) >= 0 {
usage = utils.SumBig(usagePaid, costIcrm.Increment.Big)
}
usage.Big = roundedUsageWithIncrements(usage.Big, costIcrm.Increment.Big)
if usage.Big.Cmp(usagePaid) <= 0 ||
usage.Big.Cmp(usageDenied) >= 0 ||
usage.Big.Cmp(qriedUsage) == 0 { // loop
usage = roundedUsageWithIncrements(usage, costIcrm.Increment.Big)
if usage.Cmp(usagePaid) <= 0 ||
usage.Cmp(usageDenied) >= 0 ||
usage.Cmp(qriedUsage) == 0 { // loop
break
}
}
@@ -317,3 +317,19 @@ func maxDebitUsageFromConcretes(cncrtBlncs []*concreteBalance, usage *utils.Deci
restoreUnitsFromClones(cncrtBlncs, paidConcrtUnts)
return &utils.EventCharges{Usage: &utils.Decimal{usagePaid}}, nil
}
// restoreAccounts will restore the accounts in DataDB out of their backups if present
func restoreAccounts(dm *engine.DataManager,
acnts []*utils.AccountProfileWithWeight, bkps []utils.AccountBalancesBackup) {
for i, bkp := range bkps {
if bkp == nil ||
!acnts[i].AccountProfile.BalancesAltered(bkp) {
continue
}
acnts[i].AccountProfile.RestoreFromBackup(bkp)
if err := dm.SetAccountProfile(acnts[i].AccountProfile, false); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> error <%s> restoring account <%s>",
utils.AccountS, err, acnts[i].AccountProfile.TenantID()))
}
}
}

View File

@@ -159,19 +159,19 @@ func (aSv1 *AccountSv1) Ping(ign *utils.CGREvent, reply *string) error {
}
// AccountProfileForEvent returns the matching AccountProfile for Event
func (aSv1 *AccountSv1) AccountProfileForEvent(args *utils.ArgsAccountForEvent,
ap *utils.AccountProfile) (err error) {
return aSv1.aS.V1AccountProfileForEvent(args, ap)
func (aSv1 *AccountSv1) AccountProfilesForEvent(args *utils.ArgsAccountsForEvent,
aps *[]*utils.AccountProfile) (err error) {
return aSv1.aS.V1AccountProfilesForEvent(args, aps)
}
// MaxUsage returns the maximum usage for the event, based on matching Account
func (aSv1 *AccountSv1) MaxUsage(args *utils.ArgsAccountForEvent,
func (aSv1 *AccountSv1) MaxUsage(args *utils.ArgsAccountsForEvent,
eEc *utils.ExtEventCharges) (err error) {
return aSv1.aS.V1MaxUsage(args, eEc)
}
// DebitUsage performs debit for the provided event
func (aSv1 *AccountSv1) DebitUsage(args *utils.ArgsAccountForEvent,
func (aSv1 *AccountSv1) DebitUsage(args *utils.ArgsAccountsForEvent,
eEc *utils.ExtEventCharges) (err error) {
return aSv1.aS.V1DebitUsage(args, eEc)
}

View File

@@ -20,6 +20,8 @@ package utils
import (
"time"
"github.com/ericlagergren/decimal"
)
// AccountProfile represents one Account on a Tenant
@@ -34,6 +36,41 @@ type AccountProfile struct {
ThresholdIDs []string
}
// BalancesAltered detects altering of the Balances by comparing the Balance values with the ones from backup
func (ap *AccountProfile) BalancesAltered(abb AccountBalancesBackup) (altred bool) {
if len(ap.Balances) != len(abb) {
return true
}
for blncID, blnc := range ap.Balances {
if bkpVal, has := abb[blncID]; !has {
return true
} else if blnc.Units.Big.Cmp(bkpVal) != 0 {
return true
}
}
return
}
func (ap *AccountProfile) RestoreFromBackup(abb AccountBalancesBackup) {
for blncID, val := range abb {
ap.Balances[blncID].Units.Big = val
}
}
// AccountBalanceBackups is used to create balance snapshots as backups
type AccountBalancesBackup map[string]*decimal.Big
// AccountBalanceBackup returns a backup of all balance values
func (ap *AccountProfile) AccountBalancesBackup() (abb AccountBalancesBackup) {
if ap.Balances != nil {
abb = make(AccountBalancesBackup)
for blncID, blnc := range ap.Balances {
abb[blncID] = new(decimal.Big).Copy(blnc.Units.Big)
}
}
return
}
// Balance represents one Balance inside an Account
type Balance struct {
ID string // Balance identificator, unique within an Account
@@ -203,6 +240,7 @@ func (bL *Balance) Clone() (blnc *Balance) {
type AccountProfileWithWeight struct {
*AccountProfile
Weight float64
LockID string
}
// AccountProfilesWithWeight is a sortable list of AccountProfileWithWeight
@@ -227,6 +265,27 @@ func (apWws AccountProfilesWithWeight) AccountProfiles() (aps []*AccountProfile)
return
}
// LockIDs returns the list of LockIDs
func (apWws AccountProfilesWithWeight) LockIDs() (lkIDs []string) {
if apWws != nil {
lkIDs = make([]string, len(apWws))
for i, apWw := range apWws {
lkIDs[i] = apWw.LockID
}
}
return
}
func (apWws AccountProfilesWithWeight) TenantIDs() (tntIDs []string) {
if apWws != nil {
tntIDs = make([]string, len(apWws))
for i, apWw := range apWws {
tntIDs[i] = apWw.AccountProfile.TenantID()
}
}
return
}
// BalanceWithWeight attaches static Weight to Balance
type BalanceWithWeight struct {
*Balance
@@ -268,7 +327,7 @@ type AccountProfileWithOpts struct {
}
// ArgsAccountForEvent arguments used for process event
type ArgsAccountForEvent struct {
type ArgsAccountsForEvent struct {
*CGREvent
AccountIDs []string
}