Account concurrency fixes for debit, maxDebit and GetMaxSessionDuration

This commit is contained in:
DanB
2017-03-20 15:37:14 +01:00
parent b9be89d8f6
commit 0563a4c6b2
4 changed files with 237 additions and 210 deletions

View File

@@ -791,7 +791,7 @@ func (acc *Account) GetSharedGroups() (groups []string) {
return
}
func (account *Account) GetUniqueSharedGroupMembers(cd *CallDescriptor) (utils.StringMap, error) {
func (account *Account) GetUniqueSharedGroupMembers(cd *CallDescriptor) (utils.StringMap, error) { // ToDo: make sure we return accountIDs
var balances []*Balance
balances = append(balances, account.getBalancesForPrefix(cd.Destination, cd.Category, cd.Direction, utils.MONETARY, "")...)
balances = append(balances, account.getBalancesForPrefix(cd.Destination, cd.Category, cd.Direction, cd.TOR, "")...)

View File

@@ -637,21 +637,28 @@ func (origCD *CallDescriptor) getMaxSessionDuration(origAcc *Account) (time.Dura
func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err error) {
cd.account = nil // make sure it's not cached
if account, err := cd.getAccount(); err != nil {
return 0, err
} else {
if memberIds, err := account.GetUniqueSharedGroupMembers(cd); err == nil {
if _, err := guardian.Guardian.Guard(func() (interface{}, error) {
duration, err = cd.getMaxSessionDuration(account)
return 0, err
}, 0, memberIds.Slice()...); err != nil {
return 0, err
}
} else {
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
account, err := cd.getAccount()
if err != nil {
return 0, err
}
return duration, err
}
acntIDs, err := account.GetUniqueSharedGroupMembers(cd)
if err != nil {
return nil, err
}
var lkIDs []string
for acntID := range acntIDs {
if acntID != cd.GetAccountKey() {
lkIDs = append(lkIDs, utils.ACCOUNT_PREFIX+acntID)
}
}
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
duration, err = cd.getMaxSessionDuration(account)
return
}, 0, lkIDs...)
return
}, 0, utils.ACCOUNT_PREFIX+cd.GetAccountKey())
return
}
// Interface method used to add/substract an amount of cents or bonus seconds (as returned by GetCost method)
@@ -693,7 +700,7 @@ func (cd *CallDescriptor) debit(account *Account, dryRun bool, goNegative bool)
if len(roundIncrements) != 0 {
rcd := cc.CreateCallDescriptor()
rcd.Increments = roundIncrements
rcd.RefundRounding()
rcd.refundRounding()
}
}
//log.Printf("OUT CC: ", cc)
@@ -702,20 +709,29 @@ func (cd *CallDescriptor) debit(account *Account, dryRun bool, goNegative bool)
func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
cd.account = nil // make sure it's not cached
// lock all group members
if account, err := cd.getAccount(); err != nil {
return nil, err
} else {
if memberIds, sgerr := account.GetUniqueSharedGroupMembers(cd); sgerr == nil {
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
cc, err = cd.debit(account, cd.DryRun, !cd.DenyNegativeAccount)
return 0, err
}, 0, memberIds.Slice()...)
} else {
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
// lock all group members
account, err := cd.getAccount()
if err != nil {
return nil, err
}
acntIDs, sgerr := account.GetUniqueSharedGroupMembers(cd)
if sgerr != nil {
return nil, sgerr
}
return cc, err
}
var lkIDs []string
for acntID := range acntIDs {
if acntID != cd.GetAccountKey() {
lkIDs = append(lkIDs, utils.ACCOUNT_PREFIX+acntID)
}
}
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
cc, err = cd.debit(account, cd.DryRun, !cd.DenyNegativeAccount)
return
}, 0, lkIDs...)
return
}, 0, utils.ACCOUNT_PREFIX+cd.GetAccountKey())
return
}
// Interface method used to add/substract an amount of cents or bonus seconds (as returned by GetCost method)
@@ -724,148 +740,164 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
// by the GetMaxSessionDuration method. The amount filed has to be filled in call descriptor.
func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
cd.account = nil // make sure it's not cached
if account, err := cd.getAccount(); err != nil {
return nil, err
} else {
//log.Printf("ACC: %+v", account)
if memberIDs, err := account.GetUniqueSharedGroupMembers(cd); err == nil {
_, err = guardian.Guardian.Guard(func() (interface{}, error) {
remainingDuration, err := cd.getMaxSessionDuration(account)
if err != nil && cd.GetDuration() > 0 {
return 0, err
}
// check ForceDuartion
if cd.ForceDuration && !account.AllowNegative && remainingDuration < cd.GetDuration() {
return 0, utils.ErrInsufficientCredit
}
//log.Print("AFTER MAX SESSION: ", cd)
if err != nil || remainingDuration == 0 {
cc = cd.CreateCallCost()
if cd.GetDuration() == 0 {
// add RatingInfo
err := cd.LoadRatingPlans()
if err == nil && len(cd.RatingInfos) > 0 {
ts := &TimeSpan{
TimeStart: cd.TimeStart,
TimeEnd: cd.TimeEnd,
}
ts.setRatingInfo(cd.RatingInfos[0])
cc.Timespans = append(cc.Timespans, ts)
}
return cc, nil
}
return 0, err
}
//log.Print("Remaining: ", remainingDuration)
if remainingDuration > 0 { // for postpaying client returns -1
initialDuration := cd.GetDuration()
cd.TimeEnd = cd.TimeStart.Add(remainingDuration)
cd.DurationIndex -= initialDuration - remainingDuration
}
//log.Print("Remaining duration: ", remainingDuration)
cc, err = cd.debit(account, cd.DryRun, !cd.DenyNegativeAccount)
//log.Print(balanceMap[0].Value, balanceMap[1].Value)
return 0, err
}, 0, memberIDs.Slice()...)
if err != nil {
return cc, err
}
} else {
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
account, err := cd.getAccount()
if err != nil {
return nil, err
}
}
//log.Printf("ACC: %+v", account)
acntIDs, err := account.GetUniqueSharedGroupMembers(cd)
if err != nil {
return nil, err
}
var lkIDs []string
for acntID := range acntIDs {
if acntID != cd.GetAccountKey() {
lkIDs = append(lkIDs, utils.ACCOUNT_PREFIX+acntID)
}
}
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
remainingDuration, err := cd.getMaxSessionDuration(account)
if err != nil && cd.GetDuration() > 0 {
return nil, err
}
// check ForceDuartion
if cd.ForceDuration && !account.AllowNegative && remainingDuration < cd.GetDuration() {
return nil, utils.ErrInsufficientCredit
}
//log.Print("AFTER MAX SESSION: ", cd)
if err != nil || remainingDuration == 0 {
cc = cd.CreateCallCost()
if cd.GetDuration() == 0 {
// add RatingInfo
err = cd.LoadRatingPlans()
if err == nil && len(cd.RatingInfos) > 0 {
ts := &TimeSpan{
TimeStart: cd.TimeStart,
TimeEnd: cd.TimeEnd,
}
ts.setRatingInfo(cd.RatingInfos[0])
cc.Timespans = append(cc.Timespans, ts)
}
return
}
return
}
//log.Print("Remaining: ", remainingDuration)
if remainingDuration > 0 { // for postpaying client returns -1
initialDuration := cd.GetDuration()
cd.TimeEnd = cd.TimeStart.Add(remainingDuration)
cd.DurationIndex -= initialDuration - remainingDuration
}
//log.Print("Remaining duration: ", remainingDuration)
cc, err = cd.debit(account, cd.DryRun, !cd.DenyNegativeAccount)
//log.Print(balanceMap[0].Value, balanceMap[1].Value)
return
}, 0, lkIDs...)
return
}, 0, utils.ACCOUNT_PREFIX+cd.GetAccountKey())
return cc, err
}
func (cd *CallDescriptor) RefundIncrements() error {
// get account list for locking
// all must be locked in order to use cache
accMap := make(utils.StringMap)
cd.Increments.Decompress()
// refundIncrements has no locks
func (cd *CallDescriptor) refundIncrements() (err error) {
accountsCache := make(map[string]*Account)
for _, increment := range cd.Increments {
if increment.BalanceInfo.Monetary != nil || increment.BalanceInfo.Unit != nil {
accMap[increment.BalanceInfo.AccountID] = true
account, found := accountsCache[increment.BalanceInfo.AccountID]
if !found {
if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountID); err == nil && acc != nil {
account = acc
accountsCache[increment.BalanceInfo.AccountID] = account
// will save the account only once at the end of the function
defer accountingStorage.SetAccount(account)
}
}
if account == nil {
utils.Logger.Warning(fmt.Sprintf("Could not get the account to be refunded: %s", increment.BalanceInfo.AccountID))
continue
}
//utils.Logger.Info(fmt.Sprintf("Refunding increment %+v", increment))
var balance *Balance
unitType := cd.TOR
cc := cd.CreateCallCost()
if increment.BalanceInfo.Unit != nil && increment.BalanceInfo.Unit.UUID != "" {
if balance = account.BalanceMap[unitType].GetBalance(increment.BalanceInfo.Unit.UUID); balance == nil {
return
}
balance.AddValue(increment.Duration.Seconds())
account.countUnits(-increment.Duration.Seconds(), unitType, cc, balance)
}
// check money too
if increment.BalanceInfo.Monetary != nil && increment.BalanceInfo.Monetary.UUID != "" {
if balance = account.BalanceMap[utils.MONETARY].GetBalance(increment.BalanceInfo.Monetary.UUID); balance == nil {
return
}
balance.AddValue(increment.Cost)
account.countUnits(-increment.Cost, utils.MONETARY, cc, balance)
}
}
// start increment refunding loop
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
accountsCache := make(map[string]*Account)
for _, increment := range cd.Increments {
account, found := accountsCache[increment.BalanceInfo.AccountID]
if !found {
if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountID); err == nil && acc != nil {
account = acc
accountsCache[increment.BalanceInfo.AccountID] = account
// will save the account only once at the end of the function
defer accountingStorage.SetAccount(account)
}
}
if account == nil {
utils.Logger.Warning(fmt.Sprintf("Could not get the account to be refunded: %s", increment.BalanceInfo.AccountID))
continue
}
//utils.Logger.Info(fmt.Sprintf("Refunding increment %+v", increment))
var balance *Balance
unitType := cd.TOR
cc := cd.CreateCallCost()
if increment.BalanceInfo.Unit != nil && increment.BalanceInfo.Unit.UUID != "" {
if balance = account.BalanceMap[unitType].GetBalance(increment.BalanceInfo.Unit.UUID); balance == nil {
return 0, nil
}
balance.AddValue(increment.Duration.Seconds())
account.countUnits(-increment.Duration.Seconds(), unitType, cc, balance)
}
// check money too
if increment.BalanceInfo.Monetary != nil && increment.BalanceInfo.Monetary.UUID != "" {
if balance = account.BalanceMap[utils.MONETARY].GetBalance(increment.BalanceInfo.Monetary.UUID); balance == nil {
return 0, nil
}
balance.AddValue(increment.Cost)
account.countUnits(-increment.Cost, utils.MONETARY, cc, balance)
}
}
return 0, nil
}, 0, accMap.Slice()...)
return err
return
}
func (cd *CallDescriptor) RefundRounding() error {
func (cd *CallDescriptor) RefundIncrements() (err error) {
// get account list for locking
// all must be locked in order to use cache
cd.Increments.Decompress()
accMap := make(utils.StringMap)
for _, inc := range cd.Increments {
accMap[inc.BalanceInfo.AccountID] = true
for _, increment := range cd.Increments {
if increment.BalanceInfo.Monetary != nil || increment.BalanceInfo.Unit != nil {
accMap[utils.ACCOUNT_PREFIX+increment.BalanceInfo.AccountID] = true
}
}
// start increment refunding loop
_, err := guardian.Guardian.Guard(func() (interface{}, error) {
accountsCache := make(map[string]*Account)
for _, increment := range cd.Increments {
account, found := accountsCache[increment.BalanceInfo.AccountID]
if !found {
if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountID); err == nil && acc != nil {
account = acc
accountsCache[increment.BalanceInfo.AccountID] = account
// will save the account only once at the end of the function
defer accountingStorage.SetAccount(account)
}
}
if account == nil {
utils.Logger.Warning(fmt.Sprintf("Could not get the account to be refunded: %s", increment.BalanceInfo.AccountID))
continue
}
cc := cd.CreateCallCost()
if increment.BalanceInfo.Monetary != nil {
var balance *Balance
if balance = account.BalanceMap[utils.MONETARY].GetBalance(increment.BalanceInfo.Monetary.UUID); balance == nil {
return 0, nil
}
balance.AddValue(-increment.Cost)
account.countUnits(increment.Cost, utils.MONETARY, cc, balance)
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
err = cd.refundIncrements()
return
}, 0, accMap.Slice()...)
return
}
func (cd *CallDescriptor) refundRounding() (err error) {
// get account list for locking
// all must be locked in order to use cache
accountsCache := make(map[string]*Account)
for _, increment := range cd.Increments {
account, found := accountsCache[increment.BalanceInfo.AccountID]
if !found {
if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountID); err == nil && acc != nil {
account = acc
accountsCache[increment.BalanceInfo.AccountID] = account
// will save the account only once at the end of the function
defer accountingStorage.SetAccount(account)
}
}
return 0, nil
if account == nil {
utils.Logger.Warning(fmt.Sprintf("Could not get the account to be refunded: %s", increment.BalanceInfo.AccountID))
continue
}
cc := cd.CreateCallCost()
if increment.BalanceInfo.Monetary != nil {
var balance *Balance
if balance = account.BalanceMap[utils.MONETARY].GetBalance(increment.BalanceInfo.Monetary.UUID); balance == nil {
return
}
balance.AddValue(-increment.Cost)
account.countUnits(increment.Cost, utils.MONETARY, cc, balance)
}
}
return
}
func (cd *CallDescriptor) RefundRounding() (err error) {
accMap := make(utils.StringMap)
for _, inc := range cd.Increments {
accMap[utils.ACCOUNT_PREFIX+inc.BalanceInfo.AccountID] = true
}
_, err = guardian.Guardian.Guard(func() (iface interface{}, err error) {
err = cd.refundRounding()
return
}, 0, accMap.Slice()...)
return err
return
}
// Creates a CallCost structure copying related data from CallDescriptor

View File

@@ -116,73 +116,27 @@ func populateDB() {
}
func debitTest(t *testing.T, wg *sync.WaitGroup) {
defer wg.Done()
t1 := time.Date(2017, time.February, 2, 17, 30, 0, 0, time.UTC)
t2 := time.Date(2017, time.February, 2, 17, 30, 59, 0, time.UTC)
cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Account: "moneyp", Subject: "nt", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0}
_, err := cd.Debit()
if err != nil {
if _, err := cd.Debit(); err != nil {
t.Errorf("Error debiting balance: %s", err)
}
wg.Done()
}
/*
func TestParallelDebit(t *testing.T) {
var wg sync.WaitGroup
moneyConcurent := &Account{
ID: "cgrates.org:moneyp",
BalanceMap: map[string]Balances{
utils.MONETARY: Balances{
&Balance{Value: 10000, Weight: 10},
}},
}
if accountingStorage != nil && ratingStorage != nil {
accountingStorage.SetAccount(moneyConcurent)
} else {
t.Log("Could not connect to db!")
return
}
debitsToDo := 50
for i := 0; i < debitsToDo; i++ {
wg.Add(1)
go debitTest(t, &wg)
}
wg.Wait()
t1 := time.Date(2017, time.February, 2, 17, 30, 0, 0, time.UTC)
t2 := time.Date(2017, time.February, 2, 17, 30, 59, 0, time.UTC)
cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Account: "moneyp", Subject: "nt", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0}
acc, err := cd.getAccount()
if err != nil {
t.Errorf("Error debiting balance: %+v", err)
}
if acc.BalanceMap[utils.MONETARY][0].GetValue() != float64(10000-(debitsToDo*60)) {
t.Errorf("Balance does not match: %f, expected %f", acc.BalanceMap[utils.MONETARY][0].GetValue(), float64(10000-(debitsToDo*60)))
}
/*
out, err := json.Marshal(acc)
if err == nil {
t.Log("Account: %s", string(out))
}
}
*/
func TestSerialDebit(t *testing.T) {
var wg sync.WaitGroup
initialBalance := 10000.0
moneyConcurent := &Account{
ID: "cgrates.org:moneyp",
BalanceMap: map[string]Balances{
utils.MONETARY: Balances{
&Balance{Value: 10000, Weight: 10},
&Balance{Value: initialBalance, Weight: 10},
}},
}
if accountingStorage != nil && ratingStorage != nil {
accountingStorage.SetAccount(moneyConcurent)
} else {
t.Log("Could not connect to db!")
return
if err := accountingStorage.SetAccount(moneyConcurent); err != nil {
t.Error(err)
}
debitsToDo := 50
for i := 0; i < debitsToDo; i++ {
@@ -195,12 +149,53 @@ func TestSerialDebit(t *testing.T) {
cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Account: "moneyp", Subject: "nt", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0}
acc, err := cd.getAccount()
if err != nil {
t.Errorf("Error debiting balance: %+v", err)
}
if acc.BalanceMap[utils.MONETARY][0].GetValue() != float64(10000-(debitsToDo*60)) {
t.Errorf("Balance does not match: %f, expected %f", acc.BalanceMap[utils.MONETARY][0].GetValue(), float64(10000-(debitsToDo*60)))
expBalance := initialBalance - float64(debitsToDo*60)
if acc.BalanceMap[utils.MONETARY][0].GetValue() != expBalance {
t.Errorf("Balance does not match: %f, expected %f", acc.BalanceMap[utils.MONETARY][0].GetValue(), expBalance)
}
/*
out, err := json.Marshal(acc)
if err == nil {
t.Log("Account: %s", string(out))
}
*/
}
func TestParallelDebit(t *testing.T) {
var wg sync.WaitGroup
initialBalance := 10000.0
moneyConcurent := &Account{
ID: "cgrates.org:moneyp",
BalanceMap: map[string]Balances{
utils.MONETARY: Balances{
&Balance{Value: initialBalance, Weight: 10},
}},
}
if err := accountingStorage.SetAccount(moneyConcurent); err != nil {
t.Error(err)
}
debitsToDo := 50
for i := 0; i < debitsToDo; i++ {
wg.Add(1)
go debitTest(t, &wg)
}
wg.Wait()
time.Sleep(time.Duration(10 * time.Millisecond))
t1 := time.Date(2017, time.February, 2, 17, 30, 0, 0, time.UTC)
t2 := time.Date(2017, time.February, 2, 17, 30, 59, 0, time.UTC)
cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Account: "moneyp", Subject: "nt", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0}
acc, err := cd.getAccount()
if err != nil {
t.Errorf("Error debiting balance: %+v", err)
}
expBalance := initialBalance - float64(debitsToDo*60)
if acc.BalanceMap[utils.MONETARY][0].GetValue() != expBalance {
t.Errorf("Balance does not match: %f, expected %f", acc.BalanceMap[utils.MONETARY][0].GetValue(), expBalance)
}
/*
out, err := json.Marshal(acc)
@@ -392,8 +387,9 @@ func TestDebitPerformRounding(t *testing.T) {
t1 := time.Date(2017, time.February, 2, 17, 30, 0, 0, time.UTC)
t2 := time.Date(2017, time.February, 2, 17, 33, 0, 0, time.UTC)
cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Subject: "round", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0, PerformRounding: true}
result, _ := cd.Debit()
if result.Cost != 0.3001 || result.GetConnectFee() != 0 { // should be 0.3 :(
if result, err := cd.Debit(); err != nil {
t.Error(err)
} else if result.Cost != 0.3001 || result.GetConnectFee() != 0 { // should be 0.3 :(
t.Error("bad cost", utils.ToIJSON(result))
}
}

View File

@@ -110,7 +110,6 @@ func (guard *GuardianLock) Guard(handler func() (interface{}, error), timeout ti
case reply = <-rplyChan:
}
}
guard.unlockItems(itmLocks)
return
}