From e03bbc9b966a8febbdb4ebf5eecd5e29f9f49188 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 17 Dec 2013 21:19:38 +0200 Subject: [PATCH] started accounting storage --- engine/action_timing.go | 6 ++-- engine/action_trigger.go | 4 +-- engine/actions_test.go | 8 ++--- engine/calldesc.go | 33 +++++++++++++-------- engine/calldesc_test.go | 8 ++--- engine/loader_csv.go | 59 +++++++++++++++++++------------------ engine/loader_csv_test.go | 2 +- engine/loader_db.go | 20 +++++++------ engine/loader_local_test.go | 5 ++-- engine/responder.go | 2 +- engine/storage_interface.go | 5 +++- engine/storage_map.go | 2 +- engine/storage_redis.go | 2 +- engine/units_counter.go | 2 +- engine/userbalance.go | 2 +- engine/userbalance_test.go | 16 +++++----- scheduler/scheduler.go | 5 ++-- 17 files changed, 99 insertions(+), 82 deletions(-) diff --git a/engine/action_timing.go b/engine/action_timing.go index 35374998e..e1b55e3a3 100644 --- a/engine/action_timing.go +++ b/engine/action_timing.go @@ -207,7 +207,7 @@ func (at *ActionTiming) SetActions(as Actions) { func (at *ActionTiming) getActions() (as []*Action, err error) { if at.actions == nil { - at.actions, err = storageGetter.GetActions(at.ActionsId, false) + at.actions, err = accountingStorage.GetActions(at.ActionsId, false) } at.actions.Sort() return at.actions, err @@ -229,7 +229,7 @@ func (at *ActionTiming) Execute() (err error) { } for _, ubId := range at.UserBalanceIds { AccLock.Guard(ubId, func() (float64, error) { - ub, err := storageGetter.GetUserBalance(ubId) + ub, err := accountingStorage.GetUserBalance(ubId) if err != nil { Logger.Warning(fmt.Sprintf("Could not get user balances for this id: %s. Skipping!", ubId)) return 0, err @@ -237,7 +237,7 @@ func (at *ActionTiming) Execute() (err error) { Logger.Info(fmt.Sprintf("Executing %v on %v", a.ActionType, ub.Id)) err = actionFunction(ub, a) - storageGetter.SetUserBalance(ub) + accountingStorage.SetUserBalance(ub) return 0, nil }) } diff --git a/engine/action_trigger.go b/engine/action_trigger.go index 0a6107897..ec51114d9 100644 --- a/engine/action_trigger.go +++ b/engine/action_trigger.go @@ -41,7 +41,7 @@ type ActionTrigger struct { func (at *ActionTrigger) Execute(ub *UserBalance) (err error) { // does NOT need to Lock() because it is triggered from a method that took the Lock var aac Actions - aac, err = storageGetter.GetActions(at.ActionsId, false) + aac, err = accountingStorage.GetActions(at.ActionsId, false) aac.Sort() if err != nil { Logger.Err(fmt.Sprintf("Failed to get actions: %v", err)) @@ -62,7 +62,7 @@ func (at *ActionTrigger) Execute(ub *UserBalance) (err error) { } storageLogger.LogActionTrigger(ub.Id, RATER_SOURCE, at, aac) at.Executed = true - storageGetter.SetUserBalance(ub) + accountingStorage.SetUserBalance(ub) return } diff --git a/engine/actions_test.go b/engine/actions_test.go index f62d1017e..4b7733c56 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -851,14 +851,14 @@ func TestActionTriggerLogging(t *testing.T) { Weight: 10.0, ActionsId: "TEST_ACTIONS", } - as, err := storageGetter.GetActions(at.ActionsId, false) + as, err := accountingStorage.GetActions(at.ActionsId, false) if err != nil { t.Error("Error getting actions for the action timing: ", as, err) } storageLogger.LogActionTrigger("rif", RATER_SOURCE, at, as) //expected := "rif*some_uuid;MONETARY;OUT;NAT;TEST_ACTIONS;100;10;false*|TOPUP|MONETARY|OUT|10|0" var key string - atMap, _ := storageGetter.GetAllActionTimings() + atMap, _ := accountingStorage.GetAllActionTimings() for k, v := range atMap { _ = k _ = v @@ -895,14 +895,14 @@ func TestActionTimingLogging(t *testing.T) { Weight: 10.0, ActionsId: "TEST_ACTIONS", } - as, err := storageGetter.GetActions(at.ActionsId, false) + as, err := accountingStorage.GetActions(at.ActionsId, false) if err != nil { t.Error("Error getting actions for the action trigger: ", err) } storageLogger.LogActionTiming(SCHED_SOURCE, at, as) //expected := "some uuid|test|one,two,three|;1,2,3,4,5,6,7,8,9,10,11,12;1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31;1,2,3,4,5;18:00:00;00:00:00;10;0;1;60;1|10|TEST_ACTIONS*|TOPUP|MONETARY|OUT|10|0" var key string - atMap, _ := storageGetter.GetAllActionTimings() + atMap, _ := accountingStorage.GetAllActionTimings() for k, v := range atMap { _ = k _ = v diff --git a/engine/calldesc.go b/engine/calldesc.go index 5ef5000f6..ab3adfa84 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -39,9 +39,11 @@ func init() { DEBUG := true if DEBUG { storageGetter, _ = NewMapStorage() + accountingStorage, _ = NewMapStorage() } else { //storageGetter, _ = NewMongoStorage(db_server, "27017", "cgrates_test", "", "") storageGetter, _ = NewRedisStorage("127.0.0.1:6379", 11, "", utils.MSGPACK) + accountingStorage, _ = NewRedisStorage("127.0.0.1:6379", 12, "", utils.MSGPACK) } storageLogger = storageGetter.(LogStorage) } @@ -52,13 +54,14 @@ const ( ) var ( - Logger utils.LoggerInterface - storageGetter DataStorage - storageLogger LogStorage - debitPeriod = 10 * time.Second - roundingMethod = "*middle" - roundingDecimals = 4 - historyScribe history.Scribe + Logger utils.LoggerInterface + storageGetter DataStorage + accountingStorage AccountingStorage + storageLogger LogStorage + debitPeriod = 10 * time.Second + roundingMethod = "*middle" + roundingDecimals = 4 + historyScribe history.Scribe //historyScribe, _ = history.NewMockScribe() ) @@ -67,6 +70,10 @@ func SetDataStorage(sg DataStorage) { storageGetter = sg } +func SetAccountingStorage(ag AccountingStorage) { + accountingStorage = ag +} + // Sets the global rounding method and decimal precision for GetCost method func SetRoundingMethodAndDecimals(rm string, rd int) { roundingMethod = rm @@ -136,7 +143,7 @@ func (cd *CallDescriptor) GetUserBalanceKey() string { // Gets and caches the user balance information. func (cd *CallDescriptor) getUserBalance() (ub *UserBalance, err error) { if cd.userBalance == nil { - cd.userBalance, err = storageGetter.GetUserBalance(cd.GetUserBalanceKey()) + cd.userBalance, err = accountingStorage.GetUserBalance(cd.GetUserBalanceKey()) } return cd.userBalance, err } @@ -481,7 +488,7 @@ func (cd *CallDescriptor) Debit() (cc *CallCost, err error) { // Logger.Debug(fmt.Sprintf(" No user balance defined: %v", cd.GetUserBalanceKey())) } else { //Logger.Debug(fmt.Sprintf(" Attempting to debit from %v, value: %v", cd.GetUserBalanceKey(), cc.Cost+cc.ConnectFee)) - defer storageGetter.SetUserBalance(userBalance) + defer accountingStorage.SetUserBalance(userBalance) //ub, _ := json.Marshal(userBalance) //Logger.Debug(fmt.Sprintf("UserBalance: %s", ub)) //cCost, _ := json.Marshal(cc) @@ -510,7 +517,7 @@ func (cd *CallDescriptor) MaxDebit() (cc *CallCost, err error) { func (cd *CallDescriptor) RefundIncrements() (left float64, err error) { if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil { - defer storageGetter.SetUserBalance(userBalance) + defer accountingStorage.SetUserBalance(userBalance) userBalance.refundIncrements(cd.Increments, cd.Direction, true) } return 0.0, err @@ -522,7 +529,7 @@ The amount filed has to be filled in call descriptor. */ func (cd *CallDescriptor) DebitCents() (left float64, err error) { if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil { - defer storageGetter.SetUserBalance(userBalance) + defer accountingStorage.SetUserBalance(userBalance) return userBalance.debitGenericBalance(CREDIT, cd.Direction, cd.Amount, true), nil } return 0.0, err @@ -534,7 +541,7 @@ The amount filed has to be filled in call descriptor. */ func (cd *CallDescriptor) DebitSMS() (left float64, err error) { if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil { - defer storageGetter.SetUserBalance(userBalance) + defer accountingStorage.SetUserBalance(userBalance) return userBalance.debitGenericBalance(SMS, cd.Direction, cd.Amount, true), nil } return 0, err @@ -546,7 +553,7 @@ The amount filed has to be filled in call descriptor. */ func (cd *CallDescriptor) DebitSeconds() (err error) { if userBalance, err := cd.getUserBalance(); err == nil && userBalance != nil { - defer storageGetter.SetUserBalance(userBalance) + defer accountingStorage.SetUserBalance(userBalance) return userBalance.debitCreditBalance(cd.CreateCallCost(), true) } return err diff --git a/engine/calldesc_test.go b/engine/calldesc_test.go index a58ed76bc..67ac4d216 100644 --- a/engine/calldesc_test.go +++ b/engine/calldesc_test.go @@ -55,10 +55,10 @@ func populateDB() { &Balance{Value: 100, DestinationId: "RET", Weight: 20}, }}, } - if storageGetter != nil { - storageGetter.(Storage).Flush() - storageGetter.SetUserBalance(broker) - storageGetter.SetUserBalance(minu) + if accountingStorage != nil { + accountingStorage.(Storage).Flush() + accountingStorage.SetUserBalance(broker) + accountingStorage.SetUserBalance(minu) } else { log.Fatal("Could not connect to db!") } diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 004ed7906..3adb8057e 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -31,28 +31,30 @@ import ( ) type CSVReader struct { - sep rune - storage DataStorage - readerFunc func(string, rune, int) (*csv.Reader, *os.File, error) - actions map[string][]*Action - actionsTimings map[string][]*ActionTiming - actionsTriggers map[string][]*ActionTrigger - accountActions []*UserBalance - destinations []*Destination - timings map[string]*utils.TPTiming - rates map[string]*utils.TPRate - destinationRates map[string]*utils.TPDestinationRate - ratingPlans map[string]*RatingPlan - ratingProfiles map[string]*RatingProfile + sep rune + dataStorage DataStorage + accountingStorage AccountingStorage + readerFunc func(string, rune, int) (*csv.Reader, *os.File, error) + actions map[string][]*Action + actionsTimings map[string][]*ActionTiming + actionsTriggers map[string][]*ActionTrigger + accountActions []*UserBalance + destinations []*Destination + timings map[string]*utils.TPTiming + rates map[string]*utils.TPRate + destinationRates map[string]*utils.TPDestinationRate + ratingPlans map[string]*RatingPlan + ratingProfiles map[string]*RatingProfile // file names destinationsFn, ratesFn, destinationratesFn, timingsFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn string } -func NewFileCSVReader(storage DataStorage, sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn string) *CSVReader { +func NewFileCSVReader(dataStorage DataStorage, accountingStorage AccountingStorage, sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn string) *CSVReader { c := new(CSVReader) c.sep = sep - c.storage = storage + c.dataStorage = dataStorage + c.accountingStorage = accountingStorage c.actions = make(map[string][]*Action) c.actionsTimings = make(map[string][]*ActionTiming) c.actionsTriggers = make(map[string][]*ActionTrigger) @@ -68,8 +70,8 @@ func NewFileCSVReader(storage DataStorage, sep rune, destinationsFn, timingsFn, return c } -func NewStringCSVReader(storage DataStorage, sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn string) *CSVReader { - c := NewFileCSVReader(storage, sep, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn) +func NewStringCSVReader(dataStorage DataStorage, accountingStorage AccountingStorage, sep rune, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn string) *CSVReader { + c := NewFileCSVReader(dataStorage, accountingStorage, sep, destinationsFn, timingsFn, ratesFn, destinationratesFn, destinationratetimingsFn, ratingprofilesFn, actionsFn, actiontimingsFn, actiontriggersFn, accountactionsFn) c.readerFunc = openStringCSVReader return c } @@ -113,18 +115,19 @@ func (csvr *CSVReader) ShowStatistics() { } func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) { - storage := csvr.storage - if storage == nil { + dataStorage := csvr.dataStorage + accountingStorage := csvr.accountingStorage + if dataStorage == nil { return errors.New("No database connection!") } if flush { - storage.(Storage).Flush() + dataStorage.(Storage).Flush() } if verbose { log.Print("Destinations") } for _, d := range csvr.destinations { - err = storage.SetDestination(d) + err = dataStorage.SetDestination(d) if err != nil { return err } @@ -136,7 +139,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("Rating plans") } for _, rp := range csvr.ratingPlans { - err = storage.SetRatingPlan(rp) + err = dataStorage.SetRatingPlan(rp) if err != nil { return err } @@ -148,7 +151,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("Rating profiles") } for _, rp := range csvr.ratingProfiles { - err = storage.SetRatingProfile(rp) + err = dataStorage.SetRatingProfile(rp) if err != nil { return err } @@ -160,7 +163,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("Action timings") } for k, ats := range csvr.actionsTimings { - err = storage.SetActionTimings(k, ats) + err = accountingStorage.SetActionTimings(k, ats) if err != nil { return err } @@ -172,7 +175,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("Actions") } for k, as := range csvr.actions { - err = storage.SetActions(k, as) + err = accountingStorage.SetActions(k, as) if err != nil { return err } @@ -184,7 +187,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("Account actions") } for _, ub := range csvr.accountActions { - err = storage.SetUserBalance(ub) + err = accountingStorage.SetUserBalance(ub) if err != nil { return err } @@ -297,7 +300,7 @@ func (csvr *CSVReader) LoadDestinationRates() (err error) { } } if !destinationExists { - if dbExists, err := csvr.storage.ExistsData(DESTINATION_PREFIX, record[1]); err != nil { + if dbExists, err := csvr.dataStorage.ExistsData(DESTINATION_PREFIX, record[1]); err != nil { return err } else if !dbExists { return fmt.Errorf("Could not get destination for tag %v", record[1]) @@ -380,7 +383,7 @@ func (csvr *CSVReader) LoadRatingProfiles() (err error) { } _, exists := csvr.ratingPlans[record[5]] if !exists { - if dbExists, err := csvr.storage.ExistsData(RATING_PLAN_PREFIX, record[5]); err != nil { + if dbExists, err := csvr.dataStorage.ExistsData(RATING_PLAN_PREFIX, record[5]); err != nil { return err } else if !dbExists { return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", record[5])) diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index 196e1c744..9d13893a4 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -129,7 +129,7 @@ vdf,minitsboy,*out,MORE_MINUTES,STANDARD_TRIGGER var csvr *CSVReader func init() { - csvr = NewStringCSVReader(storageGetter, ',', destinations, timings, rates, destinationRates, destinationRateTimings, ratingProfiles, actions, actionTimings, actionTriggers, accountActions) + csvr = NewStringCSVReader(storageGetter, accountingStorage, ',', destinations, timings, rates, destinationRates, destinationRateTimings, ratingProfiles, actions, actionTimings, actionTriggers, accountActions) csvr.LoadDestinations() csvr.LoadTimings() csvr.LoadRates() diff --git a/engine/loader_db.go b/engine/loader_db.go index 23dbbdb54..6c93375e2 100644 --- a/engine/loader_db.go +++ b/engine/loader_db.go @@ -30,6 +30,7 @@ type DbReader struct { tpid string storDb LoadStorage dataDb DataStorage + accountDb AccountingStorage actions map[string][]*Action actionsTimings map[string][]*ActionTiming actionsTriggers map[string][]*ActionTrigger @@ -42,10 +43,11 @@ type DbReader struct { ratingProfiles map[string]*RatingProfile } -func NewDbReader(storDB LoadStorage, storage DataStorage, tpid string) *DbReader { +func NewDbReader(storDB LoadStorage, storage DataStorage, accountDb AccountingStorage, tpid string) *DbReader { c := new(DbReader) c.storDb = storDB c.dataDb = storage + c.accountDb = accountDb c.tpid = tpid c.actions = make(map[string][]*Action) c.actionsTimings = make(map[string][]*ActionTiming) @@ -103,7 +105,7 @@ func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("Action timings") } for k, ats := range dbr.actionsTimings { - err = storage.SetActionTimings(k, ats) + err = accountingStorage.SetActionTimings(k, ats) if err != nil { return err } @@ -115,7 +117,7 @@ func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("Actions") } for k, as := range dbr.actions { - err = storage.SetActions(k, as) + err = accountingStorage.SetActions(k, as) if err != nil { return err } @@ -127,7 +129,7 @@ func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) { log.Print("Account actions") } for _, ub := range dbr.accountActions { - err = storage.SetUserBalance(ub) + err = accountingStorage.SetUserBalance(ub) if err != nil { return err } @@ -465,7 +467,7 @@ func (dbr *DbReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions) if accountAction.ActionTimingsId != "" { // get old userBalanceIds var exitingUserBalanceIds []string - existingActionTimings, err := dbr.dataDb.GetActionTimings(accountAction.ActionTimingsId) + existingActionTimings, err := dbr.accountDb.GetActionTimings(accountAction.ActionTimingsId) if err == nil && len(existingActionTimings) > 0 { // all action timings from a specific tag shuld have the same list of user balances from the first one exitingUserBalanceIds = existingActionTimings[0].UserBalanceIds @@ -525,7 +527,7 @@ func (dbr *DbReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions) } // write action timings - err = dbr.dataDb.SetActionTimings(accountAction.ActionTimingsId, actionTimings) + err = dbr.accountDb.SetActionTimings(accountAction.ActionTimingsId, actionTimings) if err != nil { return err } @@ -593,12 +595,12 @@ func (dbr *DbReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions) } // writee actions for k, as := range acts { - err = dbr.dataDb.SetActions(k, as) + err = dbr.accountDb.SetActions(k, as) if err != nil { return err } } - ub, err := dbr.dataDb.GetUserBalance(id) + ub, err := dbr.accountDb.GetUserBalance(id) if err != nil { ub = &UserBalance{ Type: UB_TYPE_PREPAID, @@ -607,7 +609,7 @@ func (dbr *DbReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions) } ub.ActionTriggers = actionTriggers - if err := dbr.dataDb.SetUserBalance(ub); err != nil { + if err := dbr.accountDb.SetUserBalance(ub); err != nil { return err } } diff --git a/engine/loader_local_test.go b/engine/loader_local_test.go index cfee8632a..294de1d9d 100644 --- a/engine/loader_local_test.go +++ b/engine/loader_local_test.go @@ -20,10 +20,11 @@ package engine import ( "flag" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/utils" "path" "testing" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" ) /* diff --git a/engine/responder.go b/engine/responder.go index 3701aac99..d3cf90b9b 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -220,7 +220,7 @@ func (rs *Responder) getBalance(arg *CallDescriptor, balanceId string, reply *Ca return errors.New("No balancer supported for this command right now") } ubKey := arg.Direction + ":" + arg.Tenant + ":" + arg.Account - userBalance, err := storageGetter.GetUserBalance(ubKey) + userBalance, err := accountingStorage.GetUserBalance(ubKey) if err != nil { return err } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index e59baa6d6..1e8fa9242 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -75,8 +75,11 @@ type DataStorage interface { GetRatingProfile(string, bool) (*RatingProfile, error) SetRatingProfile(*RatingProfile) error GetDestination(string) (*Destination, error) - // DestinationContainsPrefix(string, string) (int, error) SetDestination(*Destination) error +} + +type AccountingStorage interface { + Storage GetActions(string, bool) (Actions, error) SetActions(string, Actions) error GetUserBalance(string) (*UserBalance, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 96051378f..38d444107 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -34,7 +34,7 @@ type MapStorage struct { ms Marshaler } -func NewMapStorage() (DataStorage, error) { +func NewMapStorage() (*MapStorage, error) { return &MapStorage{dict: make(map[string][]byte), ms: NewCodecMsgpackMarshaler()}, nil } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index c0ddefe08..15deaecec 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -39,7 +39,7 @@ type RedisStorage struct { ms Marshaler } -func NewRedisStorage(address string, db int, pass, mrshlerStr string) (DataStorage, error) { +func NewRedisStorage(address string, db int, pass, mrshlerStr string) (*RedisStorage, error) { ndb := &redis.Client{Addr: address, Db: db} if pass != "" { if err := ndb.Auth(pass); err != nil { diff --git a/engine/units_counter.go b/engine/units_counter.go index bbc82c6fb..3adc1c605 100644 --- a/engine/units_counter.go +++ b/engine/units_counter.go @@ -34,7 +34,7 @@ type UnitsCounter struct { func (uc *UnitsCounter) initBalances(ats []*ActionTrigger) { uc.Balances = BalanceChain{&Balance{}} // general balance for _, at := range ats { - acs, err := storageGetter.GetActions(at.ActionsId, false) + acs, err := accountingStorage.GetActions(at.ActionsId, false) if err != nil { continue } diff --git a/engine/userbalance.go b/engine/userbalance.go index bf21d5bb2..3dc6433db 100644 --- a/engine/userbalance.go +++ b/engine/userbalance.go @@ -379,7 +379,7 @@ func (ub *UserBalance) countUnits(a *Action) { func (ub *UserBalance) initCounters() { ucTempMap := make(map[string]*UnitsCounter, 2) for _, at := range ub.ActionTriggers { - acs, err := storageGetter.GetActions(at.ActionsId, false) + acs, err := accountingStorage.GetActions(at.ActionsId, false) if err != nil { continue } diff --git a/engine/userbalance_test.go b/engine/userbalance_test.go index 766820dbe..38636e2a0 100644 --- a/engine/userbalance_test.go +++ b/engine/userbalance_test.go @@ -37,12 +37,12 @@ func populateTestActionsForTriggers() { &Action{ActionType: "*topup", BalanceId: CREDIT, Direction: OUTBOUND, Balance: &Balance{Value: 10}}, &Action{ActionType: "*topup", BalanceId: MINUTES, Direction: OUTBOUND, Balance: &Balance{Weight: 20, Value: 10, DestinationId: "NAT"}}, } - storageGetter.SetActions("TEST_ACTIONS", ats) + accountingStorage.SetActions("TEST_ACTIONS", ats) ats1 := []*Action{ &Action{ActionType: "*topup", BalanceId: CREDIT, Direction: OUTBOUND, Balance: &Balance{Value: 10}, Weight: 20}, &Action{ActionType: "*reset_prepaid", Weight: 10}, } - storageGetter.SetActions("TEST_ACTIONS_ORDER", ats1) + accountingStorage.SetActions("TEST_ACTIONS_ORDER", ats1) } func TestBalanceStoreRestore(t *testing.T) { @@ -99,8 +99,8 @@ func TestUserBalanceStorageStoreRestore(t *testing.T) { b1 := &Balance{Value: 10, Weight: 10, DestinationId: "NAT"} b2 := &Balance{Value: 100, Weight: 20, DestinationId: "RET"} rifsBalance := &UserBalance{Id: "other", BalanceMap: map[string]BalanceChain{MINUTES + OUTBOUND: BalanceChain{b1, b2}, CREDIT + OUTBOUND: BalanceChain{&Balance{Value: 21}}}} - storageGetter.SetUserBalance(rifsBalance) - ub1, err := storageGetter.GetUserBalance("other") + accountingStorage.SetUserBalance(rifsBalance) + ub1, err := accountingStorage.GetUserBalance("other") if err != nil || !ub1.BalanceMap[CREDIT+OUTBOUND].Equal(rifsBalance.BalanceMap[CREDIT+OUTBOUND]) { t.Log("UB: ", ub1) t.Errorf("Expected %v was %v", rifsBalance.BalanceMap[CREDIT+OUTBOUND], ub1.BalanceMap[CREDIT+OUTBOUND]) @@ -161,8 +161,8 @@ func TestUserBalanceStorageStore(t *testing.T) { b1 := &Balance{Value: 10, Weight: 10, DestinationId: "NAT"} b2 := &Balance{Value: 100, Weight: 20, DestinationId: "RET"} rifsBalance := &UserBalance{Id: "other", BalanceMap: map[string]BalanceChain{MINUTES + OUTBOUND: BalanceChain{b1, b2}, CREDIT + OUTBOUND: BalanceChain{&Balance{Value: 21}}}} - storageGetter.SetUserBalance(rifsBalance) - result, err := storageGetter.GetUserBalance(rifsBalance.Id) + accountingStorage.SetUserBalance(rifsBalance) + result, err := accountingStorage.GetUserBalance(rifsBalance.Id) if err != nil || rifsBalance.Id != result.Id || len(rifsBalance.BalanceMap[MINUTES+OUTBOUND]) < 2 || len(result.BalanceMap[MINUTES+OUTBOUND]) < 2 || !(rifsBalance.BalanceMap[MINUTES+OUTBOUND][0].Equal(result.BalanceMap[MINUTES+OUTBOUND][0])) || @@ -1063,8 +1063,8 @@ func BenchmarkUserBalanceStorageStoreRestore(b *testing.B) { b2 := &Balance{Value: 100, Weight: 20, DestinationId: "RET"} rifsBalance := &UserBalance{Id: "other", BalanceMap: map[string]BalanceChain{MINUTES + OUTBOUND: BalanceChain{b1, b2}, CREDIT + OUTBOUND: BalanceChain{&Balance{Value: 21}}}} for i := 0; i < b.N; i++ { - storageGetter.SetUserBalance(rifsBalance) - storageGetter.GetUserBalance(rifsBalance.Id) + accountingStorage.SetUserBalance(rifsBalance) + accountingStorage.GetUserBalance(rifsBalance.Id) } } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index bde77a620..47942de48 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -20,9 +20,10 @@ package scheduler import ( "fmt" - "github.com/cgrates/cgrates/engine" "sort" "time" + + "github.com/cgrates/cgrates/engine" ) type Scheduler struct { @@ -63,7 +64,7 @@ func (s *Scheduler) Loop() { } } -func (s *Scheduler) LoadActionTimings(storage engine.DataStorage) { +func (s *Scheduler) LoadActionTimings(storage engine.AccountingStorage) { actionTimings, err := storage.GetAllActionTimings() if err != nil { engine.Logger.Warning(fmt.Sprintf("Cannot get action timings: %v", err))