diff --git a/engine/account.go b/engine/account.go index 1a6b20a8b..fe0ab90b5 100644 --- a/engine/account.go +++ b/engine/account.go @@ -791,7 +791,7 @@ func (acc *Account) GetSharedGroups() (groups []string) { return } -func (account *Account) GetUniqueSharedGroupMembers(cd *CallDescriptor) (utils.StringMap, error) { +func (account *Account) GetUniqueSharedGroupMembers(cd *CallDescriptor) (utils.StringMap, error) { // ToDo: make sure we return accountIDs var balances []*Balance balances = append(balances, account.getBalancesForPrefix(cd.Destination, cd.Category, cd.Direction, utils.MONETARY, "")...) balances = append(balances, account.getBalancesForPrefix(cd.Destination, cd.Category, cd.Direction, cd.TOR, "")...) diff --git a/engine/calldesc.go b/engine/calldesc.go index c1c314cf0..ac0cd54f7 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -637,21 +637,28 @@ func (origCD *CallDescriptor) getMaxSessionDuration(origAcc *Account) (time.Dura func (cd *CallDescriptor) GetMaxSessionDuration() (duration time.Duration, err error) { cd.account = nil // make sure it's not cached - if account, err := cd.getAccount(); err != nil { - return 0, err - } else { - if memberIds, err := account.GetUniqueSharedGroupMembers(cd); err == nil { - if _, err := guardian.Guardian.Guard(func() (interface{}, error) { - duration, err = cd.getMaxSessionDuration(account) - return 0, err - }, 0, memberIds.Slice()...); err != nil { - return 0, err - } - } else { + _, err = guardian.Guardian.Guard(func() (iface interface{}, err error) { + account, err := cd.getAccount() + if err != nil { return 0, err } - return duration, err - } + acntIDs, err := account.GetUniqueSharedGroupMembers(cd) + if err != nil { + return nil, err + } + var lkIDs []string + for acntID := range acntIDs { + if acntID != cd.GetAccountKey() { + lkIDs = append(lkIDs, utils.ACCOUNT_PREFIX+acntID) + } + } + _, err = guardian.Guardian.Guard(func() (iface interface{}, err error) { + duration, err = cd.getMaxSessionDuration(account) + return + }, 0, lkIDs...) + return + }, 0, utils.ACCOUNT_PREFIX+cd.GetAccountKey()) + return } // Interface method used to add/substract an amount of cents or bonus seconds (as returned by GetCost method) @@ -693,7 +700,7 @@ func (cd *CallDescriptor) debit(account *Account, dryRun bool, goNegative bool) if len(roundIncrements) != 0 { rcd := cc.CreateCallDescriptor() rcd.Increments = roundIncrements - rcd.RefundRounding() + rcd.refundRounding() } } //log.Printf("OUT CC: ", cc) @@ -702,20 +709,29 @@ func (cd *CallDescriptor) debit(account *Account, dryRun bool, goNegative bool) func (cd *CallDescriptor) Debit() (cc *CallCost, err error) { cd.account = nil // make sure it's not cached - // lock all group members - if account, err := cd.getAccount(); err != nil { - return nil, err - } else { - if memberIds, sgerr := account.GetUniqueSharedGroupMembers(cd); sgerr == nil { - _, err = guardian.Guardian.Guard(func() (interface{}, error) { - cc, err = cd.debit(account, cd.DryRun, !cd.DenyNegativeAccount) - return 0, err - }, 0, memberIds.Slice()...) - } else { + _, err = guardian.Guardian.Guard(func() (iface interface{}, err error) { + // lock all group members + account, err := cd.getAccount() + if err != nil { + return nil, err + } + acntIDs, sgerr := account.GetUniqueSharedGroupMembers(cd) + if sgerr != nil { return nil, sgerr } - return cc, err - } + var lkIDs []string + for acntID := range acntIDs { + if acntID != cd.GetAccountKey() { + lkIDs = append(lkIDs, utils.ACCOUNT_PREFIX+acntID) + } + } + _, err = guardian.Guardian.Guard(func() (iface interface{}, err error) { + cc, err = cd.debit(account, cd.DryRun, !cd.DenyNegativeAccount) + return + }, 0, lkIDs...) + return + }, 0, utils.ACCOUNT_PREFIX+cd.GetAccountKey()) + return } // Interface method used to add/substract an amount of cents or bonus seconds (as returned by GetCost method) @@ -724,148 +740,164 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) { // by the GetMaxSessionDuration method. The amount filed has to be filled in call descriptor. func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) { cd.account = nil // make sure it's not cached - if account, err := cd.getAccount(); err != nil { - return nil, err - } else { - //log.Printf("ACC: %+v", account) - if memberIDs, err := account.GetUniqueSharedGroupMembers(cd); err == nil { - _, err = guardian.Guardian.Guard(func() (interface{}, error) { - remainingDuration, err := cd.getMaxSessionDuration(account) - if err != nil && cd.GetDuration() > 0 { - return 0, err - } - // check ForceDuartion - if cd.ForceDuration && !account.AllowNegative && remainingDuration < cd.GetDuration() { - return 0, utils.ErrInsufficientCredit - } - //log.Print("AFTER MAX SESSION: ", cd) - if err != nil || remainingDuration == 0 { - cc = cd.CreateCallCost() - if cd.GetDuration() == 0 { - // add RatingInfo - err := cd.LoadRatingPlans() - if err == nil && len(cd.RatingInfos) > 0 { - ts := &TimeSpan{ - TimeStart: cd.TimeStart, - TimeEnd: cd.TimeEnd, - } - ts.setRatingInfo(cd.RatingInfos[0]) - cc.Timespans = append(cc.Timespans, ts) - } - return cc, nil - } - return 0, err - } - //log.Print("Remaining: ", remainingDuration) - if remainingDuration > 0 { // for postpaying client returns -1 - initialDuration := cd.GetDuration() - cd.TimeEnd = cd.TimeStart.Add(remainingDuration) - cd.DurationIndex -= initialDuration - remainingDuration - } - //log.Print("Remaining duration: ", remainingDuration) - cc, err = cd.debit(account, cd.DryRun, !cd.DenyNegativeAccount) - //log.Print(balanceMap[0].Value, balanceMap[1].Value) - return 0, err - }, 0, memberIDs.Slice()...) - if err != nil { - return cc, err - } - } else { + _, err = guardian.Guardian.Guard(func() (iface interface{}, err error) { + account, err := cd.getAccount() + if err != nil { return nil, err } - } + //log.Printf("ACC: %+v", account) + acntIDs, err := account.GetUniqueSharedGroupMembers(cd) + if err != nil { + return nil, err + } + var lkIDs []string + for acntID := range acntIDs { + if acntID != cd.GetAccountKey() { + lkIDs = append(lkIDs, utils.ACCOUNT_PREFIX+acntID) + } + } + _, err = guardian.Guardian.Guard(func() (iface interface{}, err error) { + remainingDuration, err := cd.getMaxSessionDuration(account) + if err != nil && cd.GetDuration() > 0 { + return nil, err + } + // check ForceDuartion + if cd.ForceDuration && !account.AllowNegative && remainingDuration < cd.GetDuration() { + return nil, utils.ErrInsufficientCredit + } + //log.Print("AFTER MAX SESSION: ", cd) + if err != nil || remainingDuration == 0 { + cc = cd.CreateCallCost() + if cd.GetDuration() == 0 { + // add RatingInfo + err = cd.LoadRatingPlans() + if err == nil && len(cd.RatingInfos) > 0 { + ts := &TimeSpan{ + TimeStart: cd.TimeStart, + TimeEnd: cd.TimeEnd, + } + ts.setRatingInfo(cd.RatingInfos[0]) + cc.Timespans = append(cc.Timespans, ts) + } + return + } + return + } + //log.Print("Remaining: ", remainingDuration) + if remainingDuration > 0 { // for postpaying client returns -1 + initialDuration := cd.GetDuration() + cd.TimeEnd = cd.TimeStart.Add(remainingDuration) + cd.DurationIndex -= initialDuration - remainingDuration + } + //log.Print("Remaining duration: ", remainingDuration) + cc, err = cd.debit(account, cd.DryRun, !cd.DenyNegativeAccount) + //log.Print(balanceMap[0].Value, balanceMap[1].Value) + return + }, 0, lkIDs...) + return + }, 0, utils.ACCOUNT_PREFIX+cd.GetAccountKey()) return cc, err } -func (cd *CallDescriptor) RefundIncrements() error { - // get account list for locking - // all must be locked in order to use cache - accMap := make(utils.StringMap) - cd.Increments.Decompress() +// refundIncrements has no locks +func (cd *CallDescriptor) refundIncrements() (err error) { + accountsCache := make(map[string]*Account) for _, increment := range cd.Increments { - if increment.BalanceInfo.Monetary != nil || increment.BalanceInfo.Unit != nil { - accMap[increment.BalanceInfo.AccountID] = true + account, found := accountsCache[increment.BalanceInfo.AccountID] + if !found { + if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountID); err == nil && acc != nil { + account = acc + accountsCache[increment.BalanceInfo.AccountID] = account + // will save the account only once at the end of the function + defer accountingStorage.SetAccount(account) + } + } + if account == nil { + utils.Logger.Warning(fmt.Sprintf("Could not get the account to be refunded: %s", increment.BalanceInfo.AccountID)) + continue + } + //utils.Logger.Info(fmt.Sprintf("Refunding increment %+v", increment)) + var balance *Balance + unitType := cd.TOR + cc := cd.CreateCallCost() + if increment.BalanceInfo.Unit != nil && increment.BalanceInfo.Unit.UUID != "" { + if balance = account.BalanceMap[unitType].GetBalance(increment.BalanceInfo.Unit.UUID); balance == nil { + return + } + balance.AddValue(increment.Duration.Seconds()) + account.countUnits(-increment.Duration.Seconds(), unitType, cc, balance) + } + // check money too + if increment.BalanceInfo.Monetary != nil && increment.BalanceInfo.Monetary.UUID != "" { + if balance = account.BalanceMap[utils.MONETARY].GetBalance(increment.BalanceInfo.Monetary.UUID); balance == nil { + return + } + balance.AddValue(increment.Cost) + account.countUnits(-increment.Cost, utils.MONETARY, cc, balance) } } - // start increment refunding loop - _, err := guardian.Guardian.Guard(func() (interface{}, error) { - accountsCache := make(map[string]*Account) - for _, increment := range cd.Increments { - account, found := accountsCache[increment.BalanceInfo.AccountID] - if !found { - if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountID); err == nil && acc != nil { - account = acc - accountsCache[increment.BalanceInfo.AccountID] = account - // will save the account only once at the end of the function - defer accountingStorage.SetAccount(account) - } - } - if account == nil { - utils.Logger.Warning(fmt.Sprintf("Could not get the account to be refunded: %s", increment.BalanceInfo.AccountID)) - continue - } - //utils.Logger.Info(fmt.Sprintf("Refunding increment %+v", increment)) - var balance *Balance - unitType := cd.TOR - cc := cd.CreateCallCost() - if increment.BalanceInfo.Unit != nil && increment.BalanceInfo.Unit.UUID != "" { - if balance = account.BalanceMap[unitType].GetBalance(increment.BalanceInfo.Unit.UUID); balance == nil { - return 0, nil - } - balance.AddValue(increment.Duration.Seconds()) - account.countUnits(-increment.Duration.Seconds(), unitType, cc, balance) - } - // check money too - if increment.BalanceInfo.Monetary != nil && increment.BalanceInfo.Monetary.UUID != "" { - if balance = account.BalanceMap[utils.MONETARY].GetBalance(increment.BalanceInfo.Monetary.UUID); balance == nil { - return 0, nil - } - balance.AddValue(increment.Cost) - account.countUnits(-increment.Cost, utils.MONETARY, cc, balance) - } - } - return 0, nil - }, 0, accMap.Slice()...) - return err + return + } -func (cd *CallDescriptor) RefundRounding() error { +func (cd *CallDescriptor) RefundIncrements() (err error) { // get account list for locking // all must be locked in order to use cache + cd.Increments.Decompress() accMap := make(utils.StringMap) - for _, inc := range cd.Increments { - accMap[inc.BalanceInfo.AccountID] = true + for _, increment := range cd.Increments { + if increment.BalanceInfo.Monetary != nil || increment.BalanceInfo.Unit != nil { + accMap[utils.ACCOUNT_PREFIX+increment.BalanceInfo.AccountID] = true + } } - // start increment refunding loop - _, err := guardian.Guardian.Guard(func() (interface{}, error) { - accountsCache := make(map[string]*Account) - for _, increment := range cd.Increments { - account, found := accountsCache[increment.BalanceInfo.AccountID] - if !found { - if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountID); err == nil && acc != nil { - account = acc - accountsCache[increment.BalanceInfo.AccountID] = account - // will save the account only once at the end of the function - defer accountingStorage.SetAccount(account) - } - } - if account == nil { - utils.Logger.Warning(fmt.Sprintf("Could not get the account to be refunded: %s", increment.BalanceInfo.AccountID)) - continue - } - cc := cd.CreateCallCost() - if increment.BalanceInfo.Monetary != nil { - var balance *Balance - if balance = account.BalanceMap[utils.MONETARY].GetBalance(increment.BalanceInfo.Monetary.UUID); balance == nil { - return 0, nil - } - balance.AddValue(-increment.Cost) - account.countUnits(increment.Cost, utils.MONETARY, cc, balance) + _, err = guardian.Guardian.Guard(func() (iface interface{}, err error) { + err = cd.refundIncrements() + return + }, 0, accMap.Slice()...) + return +} + +func (cd *CallDescriptor) refundRounding() (err error) { + // get account list for locking + // all must be locked in order to use cache + accountsCache := make(map[string]*Account) + for _, increment := range cd.Increments { + account, found := accountsCache[increment.BalanceInfo.AccountID] + if !found { + if acc, err := accountingStorage.GetAccount(increment.BalanceInfo.AccountID); err == nil && acc != nil { + account = acc + accountsCache[increment.BalanceInfo.AccountID] = account + // will save the account only once at the end of the function + defer accountingStorage.SetAccount(account) } } - return 0, nil + if account == nil { + utils.Logger.Warning(fmt.Sprintf("Could not get the account to be refunded: %s", increment.BalanceInfo.AccountID)) + continue + } + cc := cd.CreateCallCost() + if increment.BalanceInfo.Monetary != nil { + var balance *Balance + if balance = account.BalanceMap[utils.MONETARY].GetBalance(increment.BalanceInfo.Monetary.UUID); balance == nil { + return + } + balance.AddValue(-increment.Cost) + account.countUnits(increment.Cost, utils.MONETARY, cc, balance) + } + } + return +} + +func (cd *CallDescriptor) RefundRounding() (err error) { + accMap := make(utils.StringMap) + for _, inc := range cd.Increments { + accMap[utils.ACCOUNT_PREFIX+inc.BalanceInfo.AccountID] = true + } + _, err = guardian.Guardian.Guard(func() (iface interface{}, err error) { + err = cd.refundRounding() + return }, 0, accMap.Slice()...) - return err + return } // Creates a CallCost structure copying related data from CallDescriptor diff --git a/engine/calldesc_test.go b/engine/calldesc_test.go index c6859de58..9865f91f3 100644 --- a/engine/calldesc_test.go +++ b/engine/calldesc_test.go @@ -116,73 +116,27 @@ func populateDB() { } func debitTest(t *testing.T, wg *sync.WaitGroup) { - defer wg.Done() t1 := time.Date(2017, time.February, 2, 17, 30, 0, 0, time.UTC) t2 := time.Date(2017, time.February, 2, 17, 30, 59, 0, time.UTC) cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Account: "moneyp", Subject: "nt", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0} - _, err := cd.Debit() - if err != nil { + if _, err := cd.Debit(); err != nil { t.Errorf("Error debiting balance: %s", err) } + wg.Done() } -/* -func TestParallelDebit(t *testing.T) { - var wg sync.WaitGroup - moneyConcurent := &Account{ - ID: "cgrates.org:moneyp", - BalanceMap: map[string]Balances{ - utils.MONETARY: Balances{ - &Balance{Value: 10000, Weight: 10}, - }}, - } - if accountingStorage != nil && ratingStorage != nil { - accountingStorage.SetAccount(moneyConcurent) - } else { - t.Log("Could not connect to db!") - return - } - debitsToDo := 50 - for i := 0; i < debitsToDo; i++ { - wg.Add(1) - go debitTest(t, &wg) - } - wg.Wait() - t1 := time.Date(2017, time.February, 2, 17, 30, 0, 0, time.UTC) - t2 := time.Date(2017, time.February, 2, 17, 30, 59, 0, time.UTC) - cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Account: "moneyp", Subject: "nt", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0} - - acc, err := cd.getAccount() - - if err != nil { - t.Errorf("Error debiting balance: %+v", err) - } - if acc.BalanceMap[utils.MONETARY][0].GetValue() != float64(10000-(debitsToDo*60)) { - t.Errorf("Balance does not match: %f, expected %f", acc.BalanceMap[utils.MONETARY][0].GetValue(), float64(10000-(debitsToDo*60))) - } - /* - out, err := json.Marshal(acc) - if err == nil { - t.Log("Account: %s", string(out)) - } - -} -*/ - func TestSerialDebit(t *testing.T) { var wg sync.WaitGroup + initialBalance := 10000.0 moneyConcurent := &Account{ ID: "cgrates.org:moneyp", BalanceMap: map[string]Balances{ utils.MONETARY: Balances{ - &Balance{Value: 10000, Weight: 10}, + &Balance{Value: initialBalance, Weight: 10}, }}, } - if accountingStorage != nil && ratingStorage != nil { - accountingStorage.SetAccount(moneyConcurent) - } else { - t.Log("Could not connect to db!") - return + if err := accountingStorage.SetAccount(moneyConcurent); err != nil { + t.Error(err) } debitsToDo := 50 for i := 0; i < debitsToDo; i++ { @@ -195,12 +149,53 @@ func TestSerialDebit(t *testing.T) { cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Account: "moneyp", Subject: "nt", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0} acc, err := cd.getAccount() - if err != nil { t.Errorf("Error debiting balance: %+v", err) } - if acc.BalanceMap[utils.MONETARY][0].GetValue() != float64(10000-(debitsToDo*60)) { - t.Errorf("Balance does not match: %f, expected %f", acc.BalanceMap[utils.MONETARY][0].GetValue(), float64(10000-(debitsToDo*60))) + expBalance := initialBalance - float64(debitsToDo*60) + if acc.BalanceMap[utils.MONETARY][0].GetValue() != expBalance { + t.Errorf("Balance does not match: %f, expected %f", acc.BalanceMap[utils.MONETARY][0].GetValue(), expBalance) + } + /* + out, err := json.Marshal(acc) + if err == nil { + t.Log("Account: %s", string(out)) + } + */ + +} + +func TestParallelDebit(t *testing.T) { + var wg sync.WaitGroup + initialBalance := 10000.0 + moneyConcurent := &Account{ + ID: "cgrates.org:moneyp", + BalanceMap: map[string]Balances{ + utils.MONETARY: Balances{ + &Balance{Value: initialBalance, Weight: 10}, + }}, + } + if err := accountingStorage.SetAccount(moneyConcurent); err != nil { + t.Error(err) + } + debitsToDo := 50 + for i := 0; i < debitsToDo; i++ { + wg.Add(1) + go debitTest(t, &wg) + } + wg.Wait() + time.Sleep(time.Duration(10 * time.Millisecond)) + t1 := time.Date(2017, time.February, 2, 17, 30, 0, 0, time.UTC) + t2 := time.Date(2017, time.February, 2, 17, 30, 59, 0, time.UTC) + cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Account: "moneyp", Subject: "nt", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0} + + acc, err := cd.getAccount() + if err != nil { + t.Errorf("Error debiting balance: %+v", err) + } + expBalance := initialBalance - float64(debitsToDo*60) + if acc.BalanceMap[utils.MONETARY][0].GetValue() != expBalance { + t.Errorf("Balance does not match: %f, expected %f", acc.BalanceMap[utils.MONETARY][0].GetValue(), expBalance) } /* out, err := json.Marshal(acc) @@ -392,8 +387,9 @@ func TestDebitPerformRounding(t *testing.T) { t1 := time.Date(2017, time.February, 2, 17, 30, 0, 0, time.UTC) t2 := time.Date(2017, time.February, 2, 17, 33, 0, 0, time.UTC) cd := &CallDescriptor{Direction: "*out", Category: "call", Tenant: "cgrates.org", Subject: "round", Destination: "49", TimeStart: t1, TimeEnd: t2, LoopIndex: 0, PerformRounding: true} - result, _ := cd.Debit() - if result.Cost != 0.3001 || result.GetConnectFee() != 0 { // should be 0.3 :( + if result, err := cd.Debit(); err != nil { + t.Error(err) + } else if result.Cost != 0.3001 || result.GetConnectFee() != 0 { // should be 0.3 :( t.Error("bad cost", utils.ToIJSON(result)) } } diff --git a/guardian/guardian.go b/guardian/guardian.go index 0ec9ead0b..053db5c02 100644 --- a/guardian/guardian.go +++ b/guardian/guardian.go @@ -110,7 +110,6 @@ func (guard *GuardianLock) Guard(handler func() (interface{}, error), timeout ti case reply = <-rplyChan: } } - guard.unlockItems(itmLocks) return }