From 94da5d9e740b09f582654095d84e949f51fffffe Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 18 Apr 2014 19:59:11 +0200 Subject: [PATCH] Adding caching for DerivedCharging settings --- apier/apier.go | 20 ++++- apier/apier_local_test.go | 22 +++++ cmd/cgr-engine/cgr-engine.go | 2 +- cmd/cgr-loader/cgr-loader.go | 3 +- config/config.go | 3 +- config/config_test.go | 1 + .../prepaid1centpsec/DerivedChargers.csv | 2 +- engine/calldesc.go | 2 +- engine/loader_csv_test.go | 2 +- engine/storage_interface.go | 2 +- engine/storage_map.go | 13 ++- engine/storage_redis.go | 23 ++++- engine/storage_redis_local_test.go | 84 +++++++++++++++++++ general_tests/ddazmbl1_test.go | 2 +- general_tests/ddazmbl2_test.go | 2 +- general_tests/ddazmbl3_test.go | 2 +- utils/apitpdata.go | 31 +++---- 17 files changed, 184 insertions(+), 32 deletions(-) create mode 100644 engine/storage_redis_local_test.go diff --git a/apier/apier.go b/apier/apier.go index 91a48de27..d1195b40a 100644 --- a/apier/apier.go +++ b/apier/apier.go @@ -499,7 +499,7 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str } // ToDo: Get the action keys loaded by dbReader so we reload only these in cache // Need to do it before scheduler otherwise actions to run will be unknown - if err := self.AccountDb.CacheAccounting(nil, nil, nil); err != nil { + if err := self.AccountDb.CacheAccounting(nil, nil, nil, []string{}); err != nil { return err } if self.Sched != nil { @@ -522,7 +522,7 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error { } func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) error { - var dstKeys, rpKeys, rpfKeys, actKeys, shgKeys, rpAlsKeys, accAlsKeys []string + var dstKeys, rpKeys, rpfKeys, actKeys, shgKeys, rpAlsKeys, accAlsKeys, dcsKeys []string if len(attrs.DestinationIds) > 0 { dstKeys = make([]string, len(attrs.DestinationIds)) for idx, dId := range attrs.DestinationIds { @@ -565,10 +565,16 @@ func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) erro accAlsKeys[idx] = engine.ACC_ALIAS_PREFIX + alias } } + if len(attrs.DerivedChargers) > 0 { + dcsKeys = make([]string, len(attrs.DerivedChargers)) + for idx, dc := range attrs.DerivedChargers { + dcsKeys[idx] = engine.DERIVEDCHARGERS_PREFIX + dc + } + } if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys, rpAlsKeys); err != nil { return err } - if err := self.AccountDb.CacheAccounting(actKeys, shgKeys, accAlsKeys); err != nil { + if err := self.AccountDb.CacheAccounting(actKeys, shgKeys, accAlsKeys, dcsKeys); err != nil { return err } *reply = "OK" @@ -584,6 +590,7 @@ func (self *ApierV1) GetCacheStats(attrs utils.AttrCacheStats, reply *utils.Cach cs.SharedGroups = cache2go.CountEntries(engine.SHARED_GROUP_PREFIX) cs.RatingAliases = cache2go.CountEntries(engine.RP_ALIAS_PREFIX) cs.AccountAliases = cache2go.CountEntries(engine.ACC_ALIAS_PREFIX) + cs.DerivedChargers = cache2go.CountEntries(engine.DERIVEDCHARGERS_PREFIX) *reply = *cs return nil } @@ -683,10 +690,15 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, for idx, alias := range accAliases { accAlsKeys[idx] = engine.ACC_ALIAS_PREFIX + alias } + dcs, _ := loader.GetLoadedIds(engine.DERIVEDCHARGERS_PREFIX) + dcsKeys := make([]string, len(dcs)) + for idx, dc := range dcs { + dcsKeys[idx] = engine.DERIVEDCHARGERS_PREFIX + dc + } if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys, rpAlsKeys); err != nil { return err } - if err := self.AccountDb.CacheAccounting(actKeys, shgKeys, accAlsKeys); err != nil { + if err := self.AccountDb.CacheAccounting(actKeys, shgKeys, accAlsKeys, dcsKeys); err != nil { return err } if self.Sched != nil { diff --git a/apier/apier_local_test.go b/apier/apier_local_test.go index 8bc4d6ad4..f5f64b581 100644 --- a/apier/apier_local_test.go +++ b/apier/apier_local_test.go @@ -1255,6 +1255,28 @@ func TestApierLoadTariffPlanFromFolder(t *testing.T) { time.Sleep(100 * time.Millisecond) // Give time for scheduler to execute topups } +func TestResetDataAfterLoadFromFolder(t *testing.T) { + if !*testLocal { + return + } + reply := "" + arc := new(utils.ApiReloadCache) + // Simple test that command is executed without errors + if err := rater.Call("ApierV1.ReloadCache", arc, &reply); err != nil { + t.Error("Got error on ApierV1.ReloadCache: ", err.Error()) + } else if reply != "OK" { + t.Error("Calling ApierV1.ReloadCache got reply: ", reply) + } + var rcvStats *utils.CacheStats + expectedStats := &utils.CacheStats{Destinations: 4, RatingPlans: 1, RatingProfiles: 1, Actions: 2, DerivedChargers: 2} + var args utils.AttrCacheStats + if err := rater.Call("ApierV1.GetCacheStats", args, &rcvStats); err != nil { + t.Error("Got error on ApierV1.GetCacheStats: ", err.Error()) + } else if !reflect.DeepEqual(rcvStats, expectedStats) { + t.Errorf("Calling ApierV1.GetCacheStats received: %v, expected: %v", rcvStats, expectedStats) + } +} + // Make sure balance was topped-up // Bug reported by DigiDaz over IRC func TestApierGetAccountAfterLoad(t *testing.T) { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index ef2269ff4..206970546 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -79,7 +79,7 @@ func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage exitChan <- true return } - if err := accountDb.CacheAccounting(nil, nil, nil); err != nil { + if err := accountDb.CacheAccounting(nil, nil, nil, nil); err != nil { engine.Logger.Crit(fmt.Sprintf("Cache accounting error: %s", err.Error())) exitChan <- true return diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 01c6aed1b..57186df90 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -191,11 +191,12 @@ func main() { shgIds, _ := loader.GetLoadedIds(engine.SHARED_GROUP_PREFIX) rpAliases, _ := loader.GetLoadedIds(engine.RP_ALIAS_PREFIX) accAliases, _ := loader.GetLoadedIds(engine.ACC_ALIAS_PREFIX) + dcs, _ := loader.GetLoadedIds(engine.DERIVEDCHARGERS_PREFIX) // Reload cache first since actions could be calling info from within if *verbose { log.Print("Reloading cache") } - if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds, rpfIds, actIds, shgIds, rpAliases, accAliases}, &reply); err != nil { + if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds, rpfIds, actIds, shgIds, rpAliases, accAliases, dcs}, &reply); err != nil { log.Fatalf("Got error on cache reload: %s", err.Error()) } actTmgIds, _ := loader.GetLoadedIds(engine.ACTION_TIMING_PREFIX) diff --git a/config/config.go b/config/config.go index ab676f60b..cc033d119 100644 --- a/config/config.go +++ b/config/config.go @@ -148,7 +148,7 @@ type CGRConfig struct { MediatorSetupTimeFields []string // Name of setup_time fields to be used during mediation. Use index numbers in case of .csv cdrs. MediatorAnswerTimeFields []string // Name of answer_time fields to be used during mediation. Use index numbers in case of .csv cdrs. MediatorDurationFields []string // Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs. - PseudoSessions DerivedChargers // System wide pseudosessions which will be executed in case of no particular ones defined per account + DerivedChargers DerivedChargers // System wide pseudosessions which will be executed in case of no particular ones defined per account FreeswitchServer string // freeswitch address host:port FreeswitchPass string // FS socket password FreeswitchReconnects int // number of times to attempt reconnect after connect fails @@ -239,6 +239,7 @@ func (self *CGRConfig) setDefaults() error { self.MediatorSetupTimeFields = []string{} self.MediatorAnswerTimeFields = []string{} self.MediatorDurationFields = []string{} + self.DerivedChargers = make(DerivedChargers, 0) self.SMEnabled = false self.SMSwitchType = FS self.SMRater = "internal" diff --git a/config/config_test.go b/config/config_test.go index d9c0a2437..c5123c8c3 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -120,6 +120,7 @@ func TestDefaults(t *testing.T) { eCfg.MediatorDestFields = []string{} eCfg.MediatorSetupTimeFields = []string{} eCfg.MediatorAnswerTimeFields = []string{} + eCfg.DerivedChargers = make(DerivedChargers, 0) eCfg.MediatorDurationFields = []string{} eCfg.SMEnabled = false eCfg.SMSwitchType = FS diff --git a/data/tariffplans/prepaid1centpsec/DerivedChargers.csv b/data/tariffplans/prepaid1centpsec/DerivedChargers.csv index 138a9f42b..ede973623 100644 --- a/data/tariffplans/prepaid1centpsec/DerivedChargers.csv +++ b/data/tariffplans/prepaid1centpsec/DerivedChargers.csv @@ -1,4 +1,4 @@ -Tenant,Tor,Direction,Account,Subject,RunId,ReqTypeField,DirectionField,TenantField,TorField,AccountField,SubjectField,DestinationField,SetupTimeField,AnswerTimeField,DurationField +#Tenant,Tor,Direction,Account,Subject,RunId,ReqTypeField,DirectionField,TenantField,TorField,AccountField,SubjectField,DestinationField,SetupTimeField,AnswerTimeField,DurationField cgrates.org,call,*out,dan,dan,extra1,^prepaid,,,,rif,rif,,,,cgr_duration cgrates.org,call,*out,dan,dan,extra2,,,,,ivo,ivo,,,, cgrates.org,call,*out,dan,*any,extra1,,,,,rif2,rif2,,,, diff --git a/engine/calldesc.go b/engine/calldesc.go index a6d1385eb..33954a40d 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -616,7 +616,7 @@ func (cd *CallDescriptor) FlushCache() (err error) { cache2go.XFlush() cache2go.Flush() dataStorage.CacheRating(nil, nil, nil, nil) - accountingStorage.CacheAccounting(nil, nil, nil) + accountingStorage.CacheAccounting(nil, nil, nil, nil) return nil } diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index ded15dc1e..2cfa863cc 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -195,7 +195,7 @@ func init() { csvr.LoadDerivedChargers() csvr.WriteToDatabase(false, false) dataStorage.CacheRating(nil, nil, nil, nil) - accountingStorage.CacheAccounting(nil, nil, nil) + accountingStorage.CacheAccounting(nil, nil, nil, nil) } func TestLoadDestinations(t *testing.T) { diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 1fb8e197c..d62d25979 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -89,7 +89,7 @@ type RatingStorage interface { type AccountingStorage interface { Storage HasData(string, string) (bool, error) - CacheAccounting([]string, []string, []string) error + CacheAccounting([]string, []string, []string, []string) error GetActions(string, bool) (Actions, error) SetActions(string, Actions) error GetSharedGroup(string, bool) (*SharedGroup, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index a0a333f01..7589a5f6f 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -96,7 +96,7 @@ func (ms *MapStorage) CacheRating(dKeys, rpKeys, rpfKeys, alsKeys []string) erro return nil } -func (ms *MapStorage) CacheAccounting(actKeys, shgKeys, alsKeys []string) error { +func (ms *MapStorage) CacheAccounting(actKeys, shgKeys, alsKeys, dcsKeys []string) error { if actKeys == nil { cache2go.RemPrefixKey(ACTION_PREFIX) // Forced until we can fine tune it } @@ -106,6 +106,9 @@ func (ms *MapStorage) CacheAccounting(actKeys, shgKeys, alsKeys []string) error if alsKeys == nil { cache2go.RemPrefixKey(ACC_ALIAS_PREFIX) } + if dcsKeys == nil { + cache2go.RemPrefixKey(DERIVEDCHARGERS_PREFIX) + } for k, _ := range ms.dict { if strings.HasPrefix(k, ACTION_PREFIX) { cache2go.RemKey(k) @@ -125,6 +128,12 @@ func (ms *MapStorage) CacheAccounting(actKeys, shgKeys, alsKeys []string) error return err } } + if strings.HasPrefix(k, DERIVEDCHARGERS_PREFIX) { + cache2go.RemKey(k) + if _, err := ms.GetDerivedChargers(k[len(DERIVEDCHARGERS_PREFIX):], true); err != nil { + return err + } + } } return nil } @@ -438,7 +447,7 @@ func (ms *MapStorage) GetDerivedChargers(key string, checkDb bool) (dcs config.D return nil, errors.New(utils.ERR_NOT_FOUND) } if values, ok := ms.dict[key]; ok { - err = ms.ms.Unmarshal(values, dcs) + err = ms.ms.Unmarshal(values, &dcs) cache2go.Cache(key, dcs) } else { return nil, errors.New("not found") diff --git a/engine/storage_redis.go b/engine/storage_redis.go index a38b8a20b..991b96bdb 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -144,7 +144,7 @@ func (rs *RedisStorage) CacheRating(dKeys, rpKeys, rpfKeys, alsKeys []string) (e return } -func (rs *RedisStorage) CacheAccounting(actKeys, shgKeys, alsKeys []string) (err error) { +func (rs *RedisStorage) CacheAccounting(actKeys, shgKeys, alsKeys, dcsKeys []string) (err error) { if actKeys == nil { cache2go.RemPrefixKey(ACTION_PREFIX) } @@ -203,6 +203,25 @@ func (rs *RedisStorage) CacheAccounting(actKeys, shgKeys, alsKeys []string) (err if len(alsKeys) != 0 { Logger.Info("Finished account aliases caching.") } + // DerivedChargers caching + if dcsKeys == nil { + Logger.Info("Caching all derived chargers") + if dcsKeys, err = rs.db.Keys(DERIVEDCHARGERS_PREFIX + "*"); err != nil { + return + } + cache2go.RemPrefixKey(DERIVEDCHARGERS_PREFIX) + } else if len(dcsKeys) != 0 { + Logger.Info(fmt.Sprintf("Caching derived chargers: %v", dcsKeys)) + } + for _, key := range dcsKeys { + cache2go.RemKey(key) + if _, err = rs.GetDerivedChargers(key[len(DERIVEDCHARGERS_PREFIX):], true); err != nil { + return err + } + } + if len(dcsKeys) != 0 { + Logger.Info("Finished derived chargers caching.") + } return nil } @@ -535,7 +554,7 @@ func (rs *RedisStorage) GetDerivedChargers(key string, checkDb bool) (dcs config } var values []byte if values, err = rs.db.Get(key); err == nil { - err = rs.ms.Unmarshal(values, dcs) + err = rs.ms.Unmarshal(values, &dcs) cache2go.Cache(key, dcs) } return dcs, err diff --git a/engine/storage_redis_local_test.go b/engine/storage_redis_local_test.go new file mode 100644 index 000000000..cea287ccd --- /dev/null +++ b/engine/storage_redis_local_test.go @@ -0,0 +1,84 @@ +/* +Real-Time Charging System for Telecom Environments +Copyright (C) 2012-2014 ITsysCOM GmbH + +This program is free software: you can Storagetribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITH*out ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "fmt" + "reflect" + "testing" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +var rds *RedisStorage +var err error + +func TestConnectRedis(t *testing.T) { + if !*testLocal { + return + } + cfg, _ = config.NewDefaultCGRConfig() + rds, err = NewRedisStorage(fmt.Sprintf("%s:%s", cfg.RatingDBHost, cfg.RatingDBPort), 4, cfg.RatingDBPass, cfg.DBDataEncoding) + if err != nil { + t.Fatal("Could not connect to Redis", err.Error()) + } +} + +func TestFlush(t *testing.T) { + if !*testLocal { + return + } + if err := rds.Flush(); err != nil { + t.Error("Failed to Flush redis database", err.Error()) + } + rds.CacheAccounting(nil, nil, nil, nil) +} + +func TestSetGetDerivedCharges(t *testing.T) { + if !*testLocal { + return + } + keyCharger1 := utils.ConcatenatedKey("cgrates.org", "call", "*out", "dan", "dan") + charger1 := config.DerivedChargers{ + &config.DerivedCharger{RunId: "extra1", ReqTypeField: "^prepaid", DirectionField: "*default", TenantField: "*default", TorField: "*default", + AccountField: "rif", SubjectField: "rif", DestinationField: "*default", SetupTimeField: "*default", AnswerTimeField: "*default", DurationField: "*default"}, + &config.DerivedCharger{RunId: "extra2", ReqTypeField: "*default", DirectionField: "*default", TenantField: "*default", TorField: "*default", + AccountField: "ivo", SubjectField: "ivo", DestinationField: "*default", SetupTimeField: "*default", AnswerTimeField: "*default", DurationField: "*default"}, + } + if err := rds.SetDerivedChargers(keyCharger1, charger1); err != nil { + t.Error("Error on setting DerivedChargers", err.Error()) + } + // Try retrieving from cache, should not be in yet + if _, err := rds.GetDerivedChargers(keyCharger1, false); err == nil { + t.Error("DerivedCharger should not be in the cache") + } + // Retrieve from db + if rcvCharger, err := rds.GetDerivedChargers(keyCharger1, true); err != nil { + t.Error("Error when retrieving DerivedCHarger", err.Error()) + } else if !reflect.DeepEqual(rcvCharger, charger1) { + t.Errorf("Expecting %v, received: %v", charger1, rcvCharger) + } + // Retrieve from cache + if rcvCharger, err := rds.GetDerivedChargers(keyCharger1, false); err != nil { + t.Error("Error when retrieving DerivedCHarger", err.Error()) + } else if !reflect.DeepEqual(rcvCharger, charger1) { + t.Errorf("Expecting %v, received: %v", charger1, rcvCharger) + } +} diff --git a/general_tests/ddazmbl1_test.go b/general_tests/ddazmbl1_test.go index 3694c52c5..b9fb9dd1f 100644 --- a/general_tests/ddazmbl1_test.go +++ b/general_tests/ddazmbl1_test.go @@ -103,7 +103,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` t.Error("No account saved") } ratingDb.CacheRating(nil, nil, nil, nil) - acntDb.CacheAccounting(nil, nil, nil) + acntDb.CacheAccounting(nil, nil, nil, nil) if cachedDests := cache2go.CountEntries(engine.DESTINATION_PREFIX); cachedDests != 2 { t.Error("Wrong number of cached destinations found", cachedDests) } diff --git a/general_tests/ddazmbl2_test.go b/general_tests/ddazmbl2_test.go index 0837c2e59..8d5ea7ac0 100644 --- a/general_tests/ddazmbl2_test.go +++ b/general_tests/ddazmbl2_test.go @@ -103,7 +103,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10` t.Error("No account saved") } ratingDb2.CacheRating(nil, nil, nil, nil) - acntDb2.CacheAccounting(nil, nil, nil) + acntDb2.CacheAccounting(nil, nil, nil, nil) if cachedDests := cache2go.CountEntries(engine.DESTINATION_PREFIX); cachedDests != 2 { t.Error("Wrong number of cached destinations found", cachedDests) } diff --git a/general_tests/ddazmbl3_test.go b/general_tests/ddazmbl3_test.go index 92f92d9eb..98144f3a5 100644 --- a/general_tests/ddazmbl3_test.go +++ b/general_tests/ddazmbl3_test.go @@ -101,7 +101,7 @@ cgrates.org,call,*out,discounted_minutes,2013-01-06T00:00:00Z,RP_UK_Mobile_BIG5_ t.Error("No account saved") } ratingDb3.CacheRating(nil, nil, nil, nil) - acntDb3.CacheAccounting(nil, nil, nil) + acntDb3.CacheAccounting(nil, nil, nil, nil) if cachedDests := cache2go.CountEntries(engine.DESTINATION_PREFIX); cachedDests != 2 { t.Error("Wrong number of cached destinations found", cachedDests) } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index ef2a64cef..e9b9801d5 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -285,19 +285,21 @@ type ApiReloadCache struct { SharedGroupIds []string RpAliases []string AccAliases []string + DerivedChargers []string } type AttrCacheStats struct { // Add in the future filters here maybe so we avoid counting complete cache } type CacheStats struct { - Destinations int - RatingPlans int - RatingProfiles int - Actions int - SharedGroups int - RatingAliases int - AccountAliases int + Destinations int + RatingPlans int + RatingProfiles int + Actions int + SharedGroups int + RatingAliases int + AccountAliases int + DerivedChargers int } type AttrCachedItemAge struct { @@ -306,13 +308,14 @@ type AttrCachedItemAge struct { } type CachedItemAge struct { - Destination time.Duration - RatingPlan time.Duration - RatingProfile time.Duration - Action time.Duration - SharedGroup time.Duration - RatingAlias time.Duration - AccountAlias time.Duration + Destination time.Duration + RatingPlan time.Duration + RatingProfile time.Duration + Action time.Duration + SharedGroup time.Duration + RatingAlias time.Duration + AccountAlias time.Duration + DerivedChargers time.Duration } type AttrExpFileCdrs struct {