Adding caching for DerivedCharging settings

This commit is contained in:
DanB
2014-04-18 19:59:11 +02:00
parent 05e4772f6d
commit 94da5d9e74
17 changed files with 184 additions and 32 deletions

View File

@@ -499,7 +499,7 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str
}
// ToDo: Get the action keys loaded by dbReader so we reload only these in cache
// Need to do it before scheduler otherwise actions to run will be unknown
if err := self.AccountDb.CacheAccounting(nil, nil, nil); err != nil {
if err := self.AccountDb.CacheAccounting(nil, nil, nil, []string{}); err != nil {
return err
}
if self.Sched != nil {
@@ -522,7 +522,7 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error {
}
func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) error {
var dstKeys, rpKeys, rpfKeys, actKeys, shgKeys, rpAlsKeys, accAlsKeys []string
var dstKeys, rpKeys, rpfKeys, actKeys, shgKeys, rpAlsKeys, accAlsKeys, dcsKeys []string
if len(attrs.DestinationIds) > 0 {
dstKeys = make([]string, len(attrs.DestinationIds))
for idx, dId := range attrs.DestinationIds {
@@ -565,10 +565,16 @@ func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) erro
accAlsKeys[idx] = engine.ACC_ALIAS_PREFIX + alias
}
}
if len(attrs.DerivedChargers) > 0 {
dcsKeys = make([]string, len(attrs.DerivedChargers))
for idx, dc := range attrs.DerivedChargers {
dcsKeys[idx] = engine.DERIVEDCHARGERS_PREFIX + dc
}
}
if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys, rpAlsKeys); err != nil {
return err
}
if err := self.AccountDb.CacheAccounting(actKeys, shgKeys, accAlsKeys); err != nil {
if err := self.AccountDb.CacheAccounting(actKeys, shgKeys, accAlsKeys, dcsKeys); err != nil {
return err
}
*reply = "OK"
@@ -584,6 +590,7 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach
cs.SharedGroups = cache2go.CountEntries(engine.SHARED_GROUP_PREFIX)
cs.RatingAliases = cache2go.CountEntries(engine.RP_ALIAS_PREFIX)
cs.AccountAliases = cache2go.CountEntries(engine.ACC_ALIAS_PREFIX)
cs.DerivedChargers = cache2go.CountEntries(engine.DERIVEDCHARGERS_PREFIX)
*reply = *cs
return nil
}
@@ -683,10 +690,15 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
for idx, alias := range accAliases {
accAlsKeys[idx] = engine.ACC_ALIAS_PREFIX + alias
}
dcs, _ := loader.GetLoadedIds(engine.DERIVEDCHARGERS_PREFIX)
dcsKeys := make([]string, len(dcs))
for idx, dc := range dcs {
dcsKeys[idx] = engine.DERIVEDCHARGERS_PREFIX + dc
}
if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys, rpAlsKeys); err != nil {
return err
}
if err := self.AccountDb.CacheAccounting(actKeys, shgKeys, accAlsKeys); err != nil {
if err := self.AccountDb.CacheAccounting(actKeys, shgKeys, accAlsKeys, dcsKeys); err != nil {
return err
}
if self.Sched != nil {

View File

@@ -1255,6 +1255,28 @@ func TestApierLoadTariffPlanFromFolder(t *testing.T) {
time.Sleep(100 * time.Millisecond) // Give time for scheduler to execute topups
}
func TestResetDataAfterLoadFromFolder(t *testing.T) {
if !*testLocal {
return
}
reply := ""
arc := new(utils.ApiReloadCache)
// Simple test that command is executed without errors
if err := rater.Call("ApierV1.ReloadCache", arc, &reply); err != nil {
t.Error("Got error on ApierV1.ReloadCache: ", err.Error())
} else if reply != "OK" {
t.Error("Calling ApierV1.ReloadCache got reply: ", reply)
}
var rcvStats *utils.CacheStats
expectedStats := &utils.CacheStats{Destinations: 4, RatingPlans: 1, RatingProfiles: 1, Actions: 2, DerivedChargers: 2}
var args utils.AttrCacheStats
if err := rater.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil {
t.Error("Got error on ApierV1.GetCacheStats: ", err.Error())
} else if !reflect.DeepEqual(rcvStats, expectedStats) {
t.Errorf("Calling ApierV1.GetCacheStats received: %v, expected: %v", rcvStats, expectedStats)
}
}
// Make sure balance was topped-up
// Bug reported by DigiDaz over IRC
func TestApierGetAccountAfterLoad(t *testing.T) {

View File

@@ -79,7 +79,7 @@ func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage
exitChan <- true
return
}
if err := accountDb.CacheAccounting(nil, nil, nil); err != nil {
if err := accountDb.CacheAccounting(nil, nil, nil, nil); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error()))
exitChan <- true
return

View File

@@ -191,11 +191,12 @@ func main() {
shgIds, _ := loader.GetLoadedIds(engine.SHARED_GROUP_PREFIX)
rpAliases, _ := loader.GetLoadedIds(engine.RP_ALIAS_PREFIX)
accAliases, _ := loader.GetLoadedIds(engine.ACC_ALIAS_PREFIX)
dcs, _ := loader.GetLoadedIds(engine.DERIVEDCHARGERS_PREFIX)
// Reload cache first since actions could be calling info from within
if *verbose {
log.Print("Reloading cache")
}
if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds, rpfIds, actIds, shgIds, rpAliases, accAliases}, &reply); err != nil {
if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds, rpfIds, actIds, shgIds, rpAliases, accAliases, dcs}, &reply); err != nil {
log.Fatalf("Got error on cache reload: %s", err.Error())
}
actTmgIds, _ := loader.GetLoadedIds(engine.ACTION_TIMING_PREFIX)

View File

@@ -148,7 +148,7 @@ type CGRConfig struct {
MediatorSetupTimeFields []string // Name of setup_time fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorAnswerTimeFields []string // Name of answer_time fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDurationFields []string // Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
PseudoSessions DerivedChargers // System wide pseudosessions which will be executed in case of no particular ones defined per account
DerivedChargers DerivedChargers // System wide pseudosessions which will be executed in case of no particular ones defined per account
FreeswitchServer string // freeswitch address host:port
FreeswitchPass string // FS socket password
FreeswitchReconnects int // number of times to attempt reconnect after connect fails
@@ -239,6 +239,7 @@ func (self *CGRConfig) setDefaults() error {
self.MediatorSetupTimeFields = []string{}
self.MediatorAnswerTimeFields = []string{}
self.MediatorDurationFields = []string{}
self.DerivedChargers = make(DerivedChargers, 0)
self.SMEnabled = false
self.SMSwitchType = FS
self.SMRater = "internal"

View File

@@ -120,6 +120,7 @@ func TestDefaults(t *testing.T) {
eCfg.MediatorDestFields = []string{}
eCfg.MediatorSetupTimeFields = []string{}
eCfg.MediatorAnswerTimeFields = []string{}
eCfg.DerivedChargers = make(DerivedChargers, 0)
eCfg.MediatorDurationFields = []string{}
eCfg.SMEnabled = false
eCfg.SMSwitchType = FS

View File

@@ -1,4 +1,4 @@
Tenant,Tor,Direction,Account,Subject,RunId,ReqTypeField,DirectionField,TenantField,TorField,AccountField,SubjectField,DestinationField,SetupTimeField,AnswerTimeField,DurationField
#Tenant,Tor,Direction,Account,Subject,RunId,ReqTypeField,DirectionField,TenantField,TorField,AccountField,SubjectField,DestinationField,SetupTimeField,AnswerTimeField,DurationField
cgrates.org,call,*out,dan,dan,extra1,^prepaid,,,,rif,rif,,,,cgr_duration
cgrates.org,call,*out,dan,dan,extra2,,,,,ivo,ivo,,,,
cgrates.org,call,*out,dan,*any,extra1,,,,,rif2,rif2,,,,
1 Tenant #Tenant Tor Direction Account Subject RunId ReqTypeField DirectionField TenantField TorField AccountField SubjectField DestinationField SetupTimeField AnswerTimeField DurationField
2 cgrates.org cgrates.org call *out dan dan extra1 ^prepaid rif rif cgr_duration
3 cgrates.org cgrates.org call *out dan dan extra2 ivo ivo
4 cgrates.org cgrates.org call *out dan *any extra1 rif2 rif2

View File

@@ -616,7 +616,7 @@ func (cd *CallDescriptor) FlushCache() (err error) {
cache2go.XFlush()
cache2go.Flush()
dataStorage.CacheRating(nil, nil, nil, nil)
accountingStorage.CacheAccounting(nil, nil, nil)
accountingStorage.CacheAccounting(nil, nil, nil, nil)
return nil
}

View File

@@ -195,7 +195,7 @@ func init() {
csvr.LoadDerivedChargers()
csvr.WriteToDatabase(false, false)
dataStorage.CacheRating(nil, nil, nil, nil)
accountingStorage.CacheAccounting(nil, nil, nil)
accountingStorage.CacheAccounting(nil, nil, nil, nil)
}
func TestLoadDestinations(t *testing.T) {

View File

@@ -89,7 +89,7 @@ type RatingStorage interface {
type AccountingStorage interface {
Storage
HasData(string, string) (bool, error)
CacheAccounting([]string, []string, []string) error
CacheAccounting([]string, []string, []string, []string) error
GetActions(string, bool) (Actions, error)
SetActions(string, Actions) error
GetSharedGroup(string, bool) (*SharedGroup, error)

View File

@@ -96,7 +96,7 @@ func (ms *MapStorage) CacheRating(dKeys, rpKeys, rpfKeys, alsKeys []string) erro
return nil
}
func (ms *MapStorage) CacheAccounting(actKeys, shgKeys, alsKeys []string) error {
func (ms *MapStorage) CacheAccounting(actKeys, shgKeys, alsKeys, dcsKeys []string) error {
if actKeys == nil {
cache2go.RemPrefixKey(ACTION_PREFIX) // Forced until we can fine tune it
}
@@ -106,6 +106,9 @@ func (ms *MapStorage) CacheAccounting(actKeys, shgKeys, alsKeys []string) error
if alsKeys == nil {
cache2go.RemPrefixKey(ACC_ALIAS_PREFIX)
}
if dcsKeys == nil {
cache2go.RemPrefixKey(DERIVEDCHARGERS_PREFIX)
}
for k, _ := range ms.dict {
if strings.HasPrefix(k, ACTION_PREFIX) {
cache2go.RemKey(k)
@@ -125,6 +128,12 @@ func (ms *MapStorage) CacheAccounting(actKeys, shgKeys, alsKeys []string) error
return err
}
}
if strings.HasPrefix(k, DERIVEDCHARGERS_PREFIX) {
cache2go.RemKey(k)
if _, err := ms.GetDerivedChargers(k[len(DERIVEDCHARGERS_PREFIX):], true); err != nil {
return err
}
}
}
return nil
}
@@ -438,7 +447,7 @@ func (ms *MapStorage) GetDerivedChargers(key string, checkDb bool) (dcs config.D
return nil, errors.New(utils.ERR_NOT_FOUND)
}
if values, ok := ms.dict[key]; ok {
err = ms.ms.Unmarshal(values, dcs)
err = ms.ms.Unmarshal(values, &dcs)
cache2go.Cache(key, dcs)
} else {
return nil, errors.New("not found")

View File

@@ -144,7 +144,7 @@ func (rs *RedisStorage) CacheRating(dKeys, rpKeys, rpfKeys, alsKeys []string) (e
return
}
func (rs *RedisStorage) CacheAccounting(actKeys, shgKeys, alsKeys []string) (err error) {
func (rs *RedisStorage) CacheAccounting(actKeys, shgKeys, alsKeys, dcsKeys []string) (err error) {
if actKeys == nil {
cache2go.RemPrefixKey(ACTION_PREFIX)
}
@@ -203,6 +203,25 @@ func (rs *RedisStorage) CacheAccounting(actKeys, shgKeys, alsKeys []string) (err
if len(alsKeys) != 0 {
Logger.Info("Finished account aliases caching.")
}
// DerivedChargers caching
if dcsKeys == nil {
Logger.Info("Caching all derived chargers")
if dcsKeys, err = rs.db.Keys(DERIVEDCHARGERS_PREFIX + "*"); err != nil {
return
}
cache2go.RemPrefixKey(DERIVEDCHARGERS_PREFIX)
} else if len(dcsKeys) != 0 {
Logger.Info(fmt.Sprintf("Caching derived chargers: %v", dcsKeys))
}
for _, key := range dcsKeys {
cache2go.RemKey(key)
if _, err = rs.GetDerivedChargers(key[len(DERIVEDCHARGERS_PREFIX):], true); err != nil {
return err
}
}
if len(dcsKeys) != 0 {
Logger.Info("Finished derived chargers caching.")
}
return nil
}
@@ -535,7 +554,7 @@ func (rs *RedisStorage) GetDerivedChargers(key string, checkDb bool) (dcs config
}
var values []byte
if values, err = rs.db.Get(key); err == nil {
err = rs.ms.Unmarshal(values, dcs)
err = rs.ms.Unmarshal(values, &dcs)
cache2go.Cache(key, dcs)
}
return dcs, err

View File

@@ -0,0 +1,84 @@
/*
Real-Time Charging System for Telecom Environments
Copyright (C) 2012-2014 ITsysCOM GmbH
This program is free software: you can Storagetribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITH*out ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"reflect"
"testing"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
var rds *RedisStorage
var err error
func TestConnectRedis(t *testing.T) {
if !*testLocal {
return
}
cfg, _ = config.NewDefaultCGRConfig()
rds, err = NewRedisStorage(fmt.Sprintf("%s:%s", cfg.RatingDBHost, cfg.RatingDBPort), 4, cfg.RatingDBPass, cfg.DBDataEncoding)
if err != nil {
t.Fatal("Could not connect to Redis", err.Error())
}
}
func TestFlush(t *testing.T) {
if !*testLocal {
return
}
if err := rds.Flush(); err != nil {
t.Error("Failed to Flush redis database", err.Error())
}
rds.CacheAccounting(nil, nil, nil, nil)
}
func TestSetGetDerivedCharges(t *testing.T) {
if !*testLocal {
return
}
keyCharger1 := utils.ConcatenatedKey("cgrates.org", "call", "*out", "dan", "dan")
charger1 := config.DerivedChargers{
&config.DerivedCharger{RunId: "extra1", ReqTypeField: "^prepaid", DirectionField: "*default", TenantField: "*default", TorField: "*default",
AccountField: "rif", SubjectField: "rif", DestinationField: "*default", SetupTimeField: "*default", AnswerTimeField: "*default", DurationField: "*default"},
&config.DerivedCharger{RunId: "extra2", ReqTypeField: "*default", DirectionField: "*default", TenantField: "*default", TorField: "*default",
AccountField: "ivo", SubjectField: "ivo", DestinationField: "*default", SetupTimeField: "*default", AnswerTimeField: "*default", DurationField: "*default"},
}
if err := rds.SetDerivedChargers(keyCharger1, charger1); err != nil {
t.Error("Error on setting DerivedChargers", err.Error())
}
// Try retrieving from cache, should not be in yet
if _, err := rds.GetDerivedChargers(keyCharger1, false); err == nil {
t.Error("DerivedCharger should not be in the cache")
}
// Retrieve from db
if rcvCharger, err := rds.GetDerivedChargers(keyCharger1, true); err != nil {
t.Error("Error when retrieving DerivedCHarger", err.Error())
} else if !reflect.DeepEqual(rcvCharger, charger1) {
t.Errorf("Expecting %v, received: %v", charger1, rcvCharger)
}
// Retrieve from cache
if rcvCharger, err := rds.GetDerivedChargers(keyCharger1, false); err != nil {
t.Error("Error when retrieving DerivedCHarger", err.Error())
} else if !reflect.DeepEqual(rcvCharger, charger1) {
t.Errorf("Expecting %v, received: %v", charger1, rcvCharger)
}
}

View File

@@ -103,7 +103,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
t.Error("No account saved")
}
ratingDb.CacheRating(nil, nil, nil, nil)
acntDb.CacheAccounting(nil, nil, nil)
acntDb.CacheAccounting(nil, nil, nil, nil)
if cachedDests := cache2go.CountEntries(engine.DESTINATION_PREFIX); cachedDests != 2 {
t.Error("Wrong number of cached destinations found", cachedDests)
}

View File

@@ -103,7 +103,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
t.Error("No account saved")
}
ratingDb2.CacheRating(nil, nil, nil, nil)
acntDb2.CacheAccounting(nil, nil, nil)
acntDb2.CacheAccounting(nil, nil, nil, nil)
if cachedDests := cache2go.CountEntries(engine.DESTINATION_PREFIX); cachedDests != 2 {
t.Error("Wrong number of cached destinations found", cachedDests)
}

View File

@@ -101,7 +101,7 @@ cgrates.org,call,*out,discounted_minutes,2013-01-06T00:00:00Z,RP_UK_Mobile_BIG5_
t.Error("No account saved")
}
ratingDb3.CacheRating(nil, nil, nil, nil)
acntDb3.CacheAccounting(nil, nil, nil)
acntDb3.CacheAccounting(nil, nil, nil, nil)
if cachedDests := cache2go.CountEntries(engine.DESTINATION_PREFIX); cachedDests != 2 {
t.Error("Wrong number of cached destinations found", cachedDests)
}

View File

@@ -285,19 +285,21 @@ type ApiReloadCache struct {
SharedGroupIds []string
RpAliases []string
AccAliases []string
DerivedChargers []string
}
type AttrCacheStats struct { // Add in the future filters here maybe so we avoid counting complete cache
}
type CacheStats struct {
Destinations int
RatingPlans int
RatingProfiles int
Actions int
SharedGroups int
RatingAliases int
AccountAliases int
Destinations int
RatingPlans int
RatingProfiles int
Actions int
SharedGroups int
RatingAliases int
AccountAliases int
DerivedChargers int
}
type AttrCachedItemAge struct {
@@ -306,13 +308,14 @@ type AttrCachedItemAge struct {
}
type CachedItemAge struct {
Destination time.Duration
RatingPlan time.Duration
RatingProfile time.Duration
Action time.Duration
SharedGroup time.Duration
RatingAlias time.Duration
AccountAlias time.Duration
Destination time.Duration
RatingPlan time.Duration
RatingProfile time.Duration
Action time.Duration
SharedGroup time.Duration
RatingAlias time.Duration
AccountAlias time.Duration
DerivedChargers time.Duration
}
type AttrExpFileCdrs struct {