mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
started accounting storage
This commit is contained in:
@@ -207,7 +207,7 @@ func (at *ActionTiming) SetActions(as Actions) {
|
||||
|
||||
func (at *ActionTiming) getActions() (as []*Action, err error) {
|
||||
if at.actions == nil {
|
||||
at.actions, err = storageGetter.GetActions(at.ActionsId, false)
|
||||
at.actions, err = accountingStorage.GetActions(at.ActionsId, false)
|
||||
}
|
||||
at.actions.Sort()
|
||||
return at.actions, err
|
||||
@@ -229,7 +229,7 @@ func (at *ActionTiming) Execute() (err error) {
|
||||
}
|
||||
for _, ubId := range at.UserBalanceIds {
|
||||
AccLock.Guard(ubId, func() (float64, error) {
|
||||
ub, err := storageGetter.GetUserBalance(ubId)
|
||||
ub, err := accountingStorage.GetUserBalance(ubId)
|
||||
if err != nil {
|
||||
Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", ubId))
|
||||
return 0, err
|
||||
@@ -237,7 +237,7 @@ func (at *ActionTiming) Execute() (err error) {
|
||||
|
||||
Logger.Info(fmt.Sprintf("Executing %v on %v", a.ActionType, ub.Id))
|
||||
err = actionFunction(ub, a)
|
||||
storageGetter.SetUserBalance(ub)
|
||||
accountingStorage.SetUserBalance(ub)
|
||||
return 0, nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ type ActionTrigger struct {
|
||||
func (at *ActionTrigger) Execute(ub *UserBalance) (err error) {
|
||||
// does NOT need to Lock() because it is triggered from a method that took the Lock
|
||||
var aac Actions
|
||||
aac, err = storageGetter.GetActions(at.ActionsId, false)
|
||||
aac, err = accountingStorage.GetActions(at.ActionsId, false)
|
||||
aac.Sort()
|
||||
if err != nil {
|
||||
Logger.Err(fmt.Sprintf("Failed to get actions: %v", err))
|
||||
@@ -62,7 +62,7 @@ func (at *ActionTrigger) Execute(ub *UserBalance) (err error) {
|
||||
}
|
||||
storageLogger.LogActionTrigger(ub.Id, RATER_SOURCE, at, aac)
|
||||
at.Executed = true
|
||||
storageGetter.SetUserBalance(ub)
|
||||
accountingStorage.SetUserBalance(ub)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -851,14 +851,14 @@ func TestActionTriggerLogging(t *testing.T) {
|
||||
Weight: 10.0,
|
||||
ActionsId: "TEST_ACTIONS",
|
||||
}
|
||||
as, err := storageGetter.GetActions(at.ActionsId, false)
|
||||
as, err := accountingStorage.GetActions(at.ActionsId, false)
|
||||
if err != nil {
|
||||
t.Error("Error getting actions for the action timing: ", as, err)
|
||||
}
|
||||
storageLogger.LogActionTrigger("rif", RATER_SOURCE, at, as)
|
||||
//expected := "rif*some_uuid;MONETARY;OUT;NAT;TEST_ACTIONS;100;10;false*|TOPUP|MONETARY|OUT|10|0"
|
||||
var key string
|
||||
atMap, _ := storageGetter.GetAllActionTimings()
|
||||
atMap, _ := accountingStorage.GetAllActionTimings()
|
||||
for k, v := range atMap {
|
||||
_ = k
|
||||
_ = v
|
||||
@@ -895,14 +895,14 @@ func TestActionTimingLogging(t *testing.T) {
|
||||
Weight: 10.0,
|
||||
ActionsId: "TEST_ACTIONS",
|
||||
}
|
||||
as, err := storageGetter.GetActions(at.ActionsId, false)
|
||||
as, err := accountingStorage.GetActions(at.ActionsId, false)
|
||||
if err != nil {
|
||||
t.Error("Error getting actions for the action trigger: ", err)
|
||||
}
|
||||
storageLogger.LogActionTiming(SCHED_SOURCE, at, as)
|
||||
//expected := "some uuid|test|one,two,three|;1,2,3,4,5,6,7,8,9,10,11,12;1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31;1,2,3,4,5;18:00:00;00:00:00;10;0;1;60;1|10|TEST_ACTIONS*|TOPUP|MONETARY|OUT|10|0"
|
||||
var key string
|
||||
atMap, _ := storageGetter.GetAllActionTimings()
|
||||
atMap, _ := accountingStorage.GetAllActionTimings()
|
||||
for k, v := range atMap {
|
||||
_ = k
|
||||
_ = v
|
||||
|
||||
@@ -39,9 +39,11 @@ func init() {
|
||||
DEBUG := true
|
||||
if DEBUG {
|
||||
storageGetter, _ = NewMapStorage()
|
||||
accountingStorage, _ = NewMapStorage()
|
||||
} else {
|
||||
//storageGetter, _ = NewMongoStorage(db_server, "27017", "cgrates_test", "", "")
|
||||
storageGetter, _ = NewRedisStorage("127.0.0.1:6379", 11, "", utils.MSGPACK)
|
||||
accountingStorage, _ = NewRedisStorage("127.0.0.1:6379", 12, "", utils.MSGPACK)
|
||||
}
|
||||
storageLogger = storageGetter.(LogStorage)
|
||||
}
|
||||
@@ -52,13 +54,14 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
Logger utils.LoggerInterface
|
||||
storageGetter DataStorage
|
||||
storageLogger LogStorage
|
||||
debitPeriod = 10 * time.Second
|
||||
roundingMethod = "*middle"
|
||||
roundingDecimals = 4
|
||||
historyScribe history.Scribe
|
||||
Logger utils.LoggerInterface
|
||||
storageGetter DataStorage
|
||||
accountingStorage AccountingStorage
|
||||
storageLogger LogStorage
|
||||
debitPeriod = 10 * time.Second
|
||||
roundingMethod = "*middle"
|
||||
roundingDecimals = 4
|
||||
historyScribe history.Scribe
|
||||
//historyScribe, _ = history.NewMockScribe()
|
||||
)
|
||||
|
||||
@@ -67,6 +70,10 @@ func SetDataStorage(sg DataStorage) {
|
||||
storageGetter = sg
|
||||
}
|
||||
|
||||
func SetAccountingStorage(ag AccountingStorage) {
|
||||
accountingStorage = ag
|
||||
}
|
||||
|
||||
// Sets the global rounding method and decimal precision for GetCost method
|
||||
func SetRoundingMethodAndDecimals(rm string, rd int) {
|
||||
roundingMethod = rm
|
||||
@@ -136,7 +143,7 @@ func (cd *CallDescriptor) GetUserBalanceKey() string {
|
||||
// Gets and caches the user balance information.
|
||||
func (cd *CallDescriptor) getUserBalance() (ub *UserBalance, err error) {
|
||||
if cd.userBalance == nil {
|
||||
cd.userBalance, err = storageGetter.GetUserBalance(cd.GetUserBalanceKey())
|
||||
cd.userBalance, err = accountingStorage.GetUserBalance(cd.GetUserBalanceKey())
|
||||
}
|
||||
return cd.userBalance, err
|
||||
}
|
||||
@@ -481,7 +488,7 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) {
|
||||
// Logger.Debug(fmt.Sprintf("<Rater> No user balance defined: %v", cd.GetUserBalanceKey()))
|
||||
} else {
|
||||
//Logger.Debug(fmt.Sprintf("<Rater> Attempting to debit from %v, value: %v", cd.GetUserBalanceKey(), cc.Cost+cc.ConnectFee))
|
||||
defer storageGetter.SetUserBalance(userBalance)
|
||||
defer accountingStorage.SetUserBalance(userBalance)
|
||||
//ub, _ := json.Marshal(userBalance)
|
||||
//Logger.Debug(fmt.Sprintf("UserBalance: %s", ub))
|
||||
//cCost, _ := json.Marshal(cc)
|
||||
@@ -510,7 +517,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) {
|
||||
|
||||
func (cd *CallDescriptor) RefundIncrements() (left float64, err error) {
|
||||
if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil {
|
||||
defer storageGetter.SetUserBalance(userBalance)
|
||||
defer accountingStorage.SetUserBalance(userBalance)
|
||||
userBalance.refundIncrements(cd.Increments, cd.Direction, true)
|
||||
}
|
||||
return 0.0, err
|
||||
@@ -522,7 +529,7 @@ The amount filed has to be filled in call descriptor.
|
||||
*/
|
||||
func (cd *CallDescriptor) DebitCents() (left float64, err error) {
|
||||
if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil {
|
||||
defer storageGetter.SetUserBalance(userBalance)
|
||||
defer accountingStorage.SetUserBalance(userBalance)
|
||||
return userBalance.debitGenericBalance(CREDIT, cd.Direction, cd.Amount, true), nil
|
||||
}
|
||||
return 0.0, err
|
||||
@@ -534,7 +541,7 @@ The amount filed has to be filled in call descriptor.
|
||||
*/
|
||||
func (cd *CallDescriptor) DebitSMS() (left float64, err error) {
|
||||
if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil {
|
||||
defer storageGetter.SetUserBalance(userBalance)
|
||||
defer accountingStorage.SetUserBalance(userBalance)
|
||||
return userBalance.debitGenericBalance(SMS, cd.Direction, cd.Amount, true), nil
|
||||
}
|
||||
return 0, err
|
||||
@@ -546,7 +553,7 @@ The amount filed has to be filled in call descriptor.
|
||||
*/
|
||||
func (cd *CallDescriptor) DebitSeconds() (err error) {
|
||||
if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil {
|
||||
defer storageGetter.SetUserBalance(userBalance)
|
||||
defer accountingStorage.SetUserBalance(userBalance)
|
||||
return userBalance.debitCreditBalance(cd.CreateCallCost(), true)
|
||||
}
|
||||
return err
|
||||
|
||||
@@ -55,10 +55,10 @@ func populateDB() {
|
||||
&Balance{Value: 100, DestinationId: "RET", Weight: 20},
|
||||
}},
|
||||
}
|
||||
if storageGetter != nil {
|
||||
storageGetter.(Storage).Flush()
|
||||
storageGetter.SetUserBalance(broker)
|
||||
storageGetter.SetUserBalance(minu)
|
||||
if accountingStorage != nil {
|
||||
accountingStorage.(Storage).Flush()
|
||||
accountingStorage.SetUserBalance(broker)
|
||||
accountingStorage.SetUserBalance(minu)
|
||||
} else {
|
||||
log.Fatal("Could not connect to db!")
|
||||
}
|
||||
|
||||
@@ -31,28 +31,30 @@ import (
|
||||
)
|
||||
|
||||
type CSVReader struct {
|
||||
sep rune
|
||||
storage DataStorage
|
||||
readerFunc func(string, rune, int) (*csv.Reader, *os.File, error)
|
||||
actions map[string][]*Action
|
||||
actionsTimings map[string][]*ActionTiming
|
||||
actionsTriggers map[string][]*ActionTrigger
|
||||
accountActions []*UserBalance
|
||||
destinations []*Destination
|
||||
timings map[string]*utils.TPTiming
|
||||
rates map[string]*utils.TPRate
|
||||
destinationRates map[string]*utils.TPDestinationRate
|
||||
ratingPlans map[string]*RatingPlan
|
||||
ratingProfiles map[string]*RatingProfile
|
||||
sep rune
|
||||
dataStorage DataStorage
|
||||
accountingStorage AccountingStorage
|
||||
readerFunc func(string, rune, int) (*csv.Reader, *os.File, error)
|
||||
actions map[string][]*Action
|
||||
actionsTimings map[string][]*ActionTiming
|
||||
actionsTriggers map[string][]*ActionTrigger
|
||||
accountActions []*UserBalance
|
||||
destinations []*Destination
|
||||
timings map[string]*utils.TPTiming
|
||||
rates map[string]*utils.TPRate
|
||||
destinationRates map[string]*utils.TPDestinationRate
|
||||
ratingPlans map[string]*RatingPlan
|
||||
ratingProfiles map[string]*RatingProfile
|
||||
// file names
|
||||
destinationsFn, ratesFn, destinationratesFn, timingsFn, destinationratetimingsFn, ratingprofilesFn,
|
||||
actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn string
|
||||
}
|
||||
|
||||
func NewFileCSVReader(storage DataStorage, sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn string) *CSVReader {
|
||||
func NewFileCSVReader(dataStorage DataStorage, accountingStorage AccountingStorage, sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn string) *CSVReader {
|
||||
c := new(CSVReader)
|
||||
c.sep = sep
|
||||
c.storage = storage
|
||||
c.dataStorage = dataStorage
|
||||
c.accountingStorage = accountingStorage
|
||||
c.actions = make(map[string][]*Action)
|
||||
c.actionsTimings = make(map[string][]*ActionTiming)
|
||||
c.actionsTriggers = make(map[string][]*ActionTrigger)
|
||||
@@ -68,8 +70,8 @@ func NewFileCSVReader(storage DataStorage, sep rune, destinationsFn, timingsFn,
|
||||
return c
|
||||
}
|
||||
|
||||
func NewStringCSVReader(storage DataStorage, sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn string) *CSVReader {
|
||||
c := NewFileCSVReader(storage, sep, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn)
|
||||
func NewStringCSVReader(dataStorage DataStorage, accountingStorage AccountingStorage, sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn string) *CSVReader {
|
||||
c := NewFileCSVReader(dataStorage, accountingStorage, sep, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn)
|
||||
c.readerFunc = openStringCSVReader
|
||||
return c
|
||||
}
|
||||
@@ -113,18 +115,19 @@ func (csvr *CSVReader) ShowStatistics() {
|
||||
}
|
||||
|
||||
func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
storage := csvr.storage
|
||||
if storage == nil {
|
||||
dataStorage := csvr.dataStorage
|
||||
accountingStorage := csvr.accountingStorage
|
||||
if dataStorage == nil {
|
||||
return errors.New("No database connection!")
|
||||
}
|
||||
if flush {
|
||||
storage.(Storage).Flush()
|
||||
dataStorage.(Storage).Flush()
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Destinations")
|
||||
}
|
||||
for _, d := range csvr.destinations {
|
||||
err = storage.SetDestination(d)
|
||||
err = dataStorage.SetDestination(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -136,7 +139,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
log.Print("Rating plans")
|
||||
}
|
||||
for _, rp := range csvr.ratingPlans {
|
||||
err = storage.SetRatingPlan(rp)
|
||||
err = dataStorage.SetRatingPlan(rp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -148,7 +151,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
log.Print("Rating profiles")
|
||||
}
|
||||
for _, rp := range csvr.ratingProfiles {
|
||||
err = storage.SetRatingProfile(rp)
|
||||
err = dataStorage.SetRatingProfile(rp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -160,7 +163,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
log.Print("Action timings")
|
||||
}
|
||||
for k, ats := range csvr.actionsTimings {
|
||||
err = storage.SetActionTimings(k, ats)
|
||||
err = accountingStorage.SetActionTimings(k, ats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -172,7 +175,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
log.Print("Actions")
|
||||
}
|
||||
for k, as := range csvr.actions {
|
||||
err = storage.SetActions(k, as)
|
||||
err = accountingStorage.SetActions(k, as)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -184,7 +187,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
log.Print("Account actions")
|
||||
}
|
||||
for _, ub := range csvr.accountActions {
|
||||
err = storage.SetUserBalance(ub)
|
||||
err = accountingStorage.SetUserBalance(ub)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -297,7 +300,7 @@ func (csvr *CSVReader) LoadDestinationRates() (err error) {
|
||||
}
|
||||
}
|
||||
if !destinationExists {
|
||||
if dbExists, err := csvr.storage.ExistsData(DESTINATION_PREFIX, record[1]); err != nil {
|
||||
if dbExists, err := csvr.dataStorage.ExistsData(DESTINATION_PREFIX, record[1]); err != nil {
|
||||
return err
|
||||
} else if !dbExists {
|
||||
return fmt.Errorf("Could not get destination for tag %v", record[1])
|
||||
@@ -380,7 +383,7 @@ func (csvr *CSVReader) LoadRatingProfiles() (err error) {
|
||||
}
|
||||
_, exists := csvr.ratingPlans[record[5]]
|
||||
if !exists {
|
||||
if dbExists, err := csvr.storage.ExistsData(RATING_PLAN_PREFIX, record[5]); err != nil {
|
||||
if dbExists, err := csvr.dataStorage.ExistsData(RATING_PLAN_PREFIX, record[5]); err != nil {
|
||||
return err
|
||||
} else if !dbExists {
|
||||
return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", record[5]))
|
||||
|
||||
@@ -129,7 +129,7 @@ vdf,minitsboy,*out,MORE_MINUTES,STANDARD_TRIGGER
|
||||
var csvr *CSVReader
|
||||
|
||||
func init() {
|
||||
csvr = NewStringCSVReader(storageGetter, ',', destinations, timings, rates, destinationRates, destinationRateTimings, ratingProfiles, actions, actionTimings, actionTriggers, accountActions)
|
||||
csvr = NewStringCSVReader(storageGetter, accountingStorage, ',', destinations, timings, rates, destinationRates, destinationRateTimings, ratingProfiles, actions, actionTimings, actionTriggers, accountActions)
|
||||
csvr.LoadDestinations()
|
||||
csvr.LoadTimings()
|
||||
csvr.LoadRates()
|
||||
|
||||
@@ -30,6 +30,7 @@ type DbReader struct {
|
||||
tpid string
|
||||
storDb LoadStorage
|
||||
dataDb DataStorage
|
||||
accountDb AccountingStorage
|
||||
actions map[string][]*Action
|
||||
actionsTimings map[string][]*ActionTiming
|
||||
actionsTriggers map[string][]*ActionTrigger
|
||||
@@ -42,10 +43,11 @@ type DbReader struct {
|
||||
ratingProfiles map[string]*RatingProfile
|
||||
}
|
||||
|
||||
func NewDbReader(storDB LoadStorage, storage DataStorage, tpid string) *DbReader {
|
||||
func NewDbReader(storDB LoadStorage, storage DataStorage, accountDb AccountingStorage, tpid string) *DbReader {
|
||||
c := new(DbReader)
|
||||
c.storDb = storDB
|
||||
c.dataDb = storage
|
||||
c.accountDb = accountDb
|
||||
c.tpid = tpid
|
||||
c.actions = make(map[string][]*Action)
|
||||
c.actionsTimings = make(map[string][]*ActionTiming)
|
||||
@@ -103,7 +105,7 @@ func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
log.Print("Action timings")
|
||||
}
|
||||
for k, ats := range dbr.actionsTimings {
|
||||
err = storage.SetActionTimings(k, ats)
|
||||
err = accountingStorage.SetActionTimings(k, ats)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -115,7 +117,7 @@ func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
log.Print("Actions")
|
||||
}
|
||||
for k, as := range dbr.actions {
|
||||
err = storage.SetActions(k, as)
|
||||
err = accountingStorage.SetActions(k, as)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -127,7 +129,7 @@ func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
log.Print("Account actions")
|
||||
}
|
||||
for _, ub := range dbr.accountActions {
|
||||
err = storage.SetUserBalance(ub)
|
||||
err = accountingStorage.SetUserBalance(ub)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -465,7 +467,7 @@ func (dbr *DbReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions)
|
||||
if accountAction.ActionTimingsId != "" {
|
||||
// get old userBalanceIds
|
||||
var exitingUserBalanceIds []string
|
||||
existingActionTimings, err := dbr.dataDb.GetActionTimings(accountAction.ActionTimingsId)
|
||||
existingActionTimings, err := dbr.accountDb.GetActionTimings(accountAction.ActionTimingsId)
|
||||
if err == nil && len(existingActionTimings) > 0 {
|
||||
// all action timings from a specific tag shuld have the same list of user balances from the first one
|
||||
exitingUserBalanceIds = existingActionTimings[0].UserBalanceIds
|
||||
@@ -525,7 +527,7 @@ func (dbr *DbReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions)
|
||||
}
|
||||
|
||||
// write action timings
|
||||
err = dbr.dataDb.SetActionTimings(accountAction.ActionTimingsId, actionTimings)
|
||||
err = dbr.accountDb.SetActionTimings(accountAction.ActionTimingsId, actionTimings)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -593,12 +595,12 @@ func (dbr *DbReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions)
|
||||
}
|
||||
// writee actions
|
||||
for k, as := range acts {
|
||||
err = dbr.dataDb.SetActions(k, as)
|
||||
err = dbr.accountDb.SetActions(k, as)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
ub, err := dbr.dataDb.GetUserBalance(id)
|
||||
ub, err := dbr.accountDb.GetUserBalance(id)
|
||||
if err != nil {
|
||||
ub = &UserBalance{
|
||||
Type: UB_TYPE_PREPAID,
|
||||
@@ -607,7 +609,7 @@ func (dbr *DbReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions)
|
||||
}
|
||||
ub.ActionTriggers = actionTriggers
|
||||
|
||||
if err := dbr.dataDb.SetUserBalance(ub); err != nil {
|
||||
if err := dbr.accountDb.SetUserBalance(ub); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,10 +20,11 @@ package engine
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
/*
|
||||
|
||||
@@ -220,7 +220,7 @@ func (rs *Responder) getBalance(arg *CallDescriptor, balanceId string, reply *Ca
|
||||
return errors.New("No balancer supported for this command right now")
|
||||
}
|
||||
ubKey := arg.Direction + ":" + arg.Tenant + ":" + arg.Account
|
||||
userBalance, err := storageGetter.GetUserBalance(ubKey)
|
||||
userBalance, err := accountingStorage.GetUserBalance(ubKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -75,8 +75,11 @@ type DataStorage interface {
|
||||
GetRatingProfile(string, bool) (*RatingProfile, error)
|
||||
SetRatingProfile(*RatingProfile) error
|
||||
GetDestination(string) (*Destination, error)
|
||||
// DestinationContainsPrefix(string, string) (int, error)
|
||||
SetDestination(*Destination) error
|
||||
}
|
||||
|
||||
type AccountingStorage interface {
|
||||
Storage
|
||||
GetActions(string, bool) (Actions, error)
|
||||
SetActions(string, Actions) error
|
||||
GetUserBalance(string) (*UserBalance, error)
|
||||
|
||||
@@ -34,7 +34,7 @@ type MapStorage struct {
|
||||
ms Marshaler
|
||||
}
|
||||
|
||||
func NewMapStorage() (DataStorage, error) {
|
||||
func NewMapStorage() (*MapStorage, error) {
|
||||
return &MapStorage{dict: make(map[string][]byte), ms: NewCodecMsgpackMarshaler()}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ type RedisStorage struct {
|
||||
ms Marshaler
|
||||
}
|
||||
|
||||
func NewRedisStorage(address string, db int, pass, mrshlerStr string) (DataStorage, error) {
|
||||
func NewRedisStorage(address string, db int, pass, mrshlerStr string) (*RedisStorage, error) {
|
||||
ndb := &redis.Client{Addr: address, Db: db}
|
||||
if pass != "" {
|
||||
if err := ndb.Auth(pass); err != nil {
|
||||
|
||||
@@ -34,7 +34,7 @@ type UnitsCounter struct {
|
||||
func (uc *UnitsCounter) initBalances(ats []*ActionTrigger) {
|
||||
uc.Balances = BalanceChain{&Balance{}} // general balance
|
||||
for _, at := range ats {
|
||||
acs, err := storageGetter.GetActions(at.ActionsId, false)
|
||||
acs, err := accountingStorage.GetActions(at.ActionsId, false)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -379,7 +379,7 @@ func (ub *UserBalance) countUnits(a *Action) {
|
||||
func (ub *UserBalance) initCounters() {
|
||||
ucTempMap := make(map[string]*UnitsCounter, 2)
|
||||
for _, at := range ub.ActionTriggers {
|
||||
acs, err := storageGetter.GetActions(at.ActionsId, false)
|
||||
acs, err := accountingStorage.GetActions(at.ActionsId, false)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -37,12 +37,12 @@ func populateTestActionsForTriggers() {
|
||||
&Action{ActionType: "*topup", BalanceId: CREDIT, Direction: OUTBOUND, Balance: &Balance{Value: 10}},
|
||||
&Action{ActionType: "*topup", BalanceId: MINUTES, Direction: OUTBOUND, Balance: &Balance{Weight: 20, Value: 10, DestinationId: "NAT"}},
|
||||
}
|
||||
storageGetter.SetActions("TEST_ACTIONS", ats)
|
||||
accountingStorage.SetActions("TEST_ACTIONS", ats)
|
||||
ats1 := []*Action{
|
||||
&Action{ActionType: "*topup", BalanceId: CREDIT, Direction: OUTBOUND, Balance: &Balance{Value: 10}, Weight: 20},
|
||||
&Action{ActionType: "*reset_prepaid", Weight: 10},
|
||||
}
|
||||
storageGetter.SetActions("TEST_ACTIONS_ORDER", ats1)
|
||||
accountingStorage.SetActions("TEST_ACTIONS_ORDER", ats1)
|
||||
}
|
||||
|
||||
func TestBalanceStoreRestore(t *testing.T) {
|
||||
@@ -99,8 +99,8 @@ func TestUserBalanceStorageStoreRestore(t *testing.T) {
|
||||
b1 := &Balance{Value: 10, Weight: 10, DestinationId: "NAT"}
|
||||
b2 := &Balance{Value: 100, Weight: 20, DestinationId: "RET"}
|
||||
rifsBalance := &UserBalance{Id: "other", BalanceMap: map[string]BalanceChain{MINUTES + OUTBOUND: BalanceChain{b1, b2}, CREDIT + OUTBOUND: BalanceChain{&Balance{Value: 21}}}}
|
||||
storageGetter.SetUserBalance(rifsBalance)
|
||||
ub1, err := storageGetter.GetUserBalance("other")
|
||||
accountingStorage.SetUserBalance(rifsBalance)
|
||||
ub1, err := accountingStorage.GetUserBalance("other")
|
||||
if err != nil || !ub1.BalanceMap[CREDIT+OUTBOUND].Equal(rifsBalance.BalanceMap[CREDIT+OUTBOUND]) {
|
||||
t.Log("UB: ", ub1)
|
||||
t.Errorf("Expected %v was %v", rifsBalance.BalanceMap[CREDIT+OUTBOUND], ub1.BalanceMap[CREDIT+OUTBOUND])
|
||||
@@ -161,8 +161,8 @@ func TestUserBalanceStorageStore(t *testing.T) {
|
||||
b1 := &Balance{Value: 10, Weight: 10, DestinationId: "NAT"}
|
||||
b2 := &Balance{Value: 100, Weight: 20, DestinationId: "RET"}
|
||||
rifsBalance := &UserBalance{Id: "other", BalanceMap: map[string]BalanceChain{MINUTES + OUTBOUND: BalanceChain{b1, b2}, CREDIT + OUTBOUND: BalanceChain{&Balance{Value: 21}}}}
|
||||
storageGetter.SetUserBalance(rifsBalance)
|
||||
result, err := storageGetter.GetUserBalance(rifsBalance.Id)
|
||||
accountingStorage.SetUserBalance(rifsBalance)
|
||||
result, err := accountingStorage.GetUserBalance(rifsBalance.Id)
|
||||
if err != nil || rifsBalance.Id != result.Id ||
|
||||
len(rifsBalance.BalanceMap[MINUTES+OUTBOUND]) < 2 || len(result.BalanceMap[MINUTES+OUTBOUND]) < 2 ||
|
||||
!(rifsBalance.BalanceMap[MINUTES+OUTBOUND][0].Equal(result.BalanceMap[MINUTES+OUTBOUND][0])) ||
|
||||
@@ -1063,8 +1063,8 @@ func BenchmarkUserBalanceStorageStoreRestore(b *testing.B) {
|
||||
b2 := &Balance{Value: 100, Weight: 20, DestinationId: "RET"}
|
||||
rifsBalance := &UserBalance{Id: "other", BalanceMap: map[string]BalanceChain{MINUTES + OUTBOUND: BalanceChain{b1, b2}, CREDIT + OUTBOUND: BalanceChain{&Balance{Value: 21}}}}
|
||||
for i := 0; i < b.N; i++ {
|
||||
storageGetter.SetUserBalance(rifsBalance)
|
||||
storageGetter.GetUserBalance(rifsBalance.Id)
|
||||
accountingStorage.SetUserBalance(rifsBalance)
|
||||
accountingStorage.GetUserBalance(rifsBalance.Id)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,9 +20,10 @@ package scheduler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
)
|
||||
|
||||
type Scheduler struct {
|
||||
@@ -63,7 +64,7 @@ func (s *Scheduler) Loop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) LoadActionTimings(storage engine.DataStorage) {
|
||||
func (s *Scheduler) LoadActionTimings(storage engine.AccountingStorage) {
|
||||
actionTimings, err := storage.GetAllActionTimings()
|
||||
if err != nil {
|
||||
engine.Logger.Warning(fmt.Sprintf("Cannot get action timings: %v", err))
|
||||
|
||||
Reference in New Issue
Block a user