From 237508e590f77df2421cb22b1e34ef8d5713467e Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sat, 14 Dec 2013 12:44:42 +0200 Subject: [PATCH] performance enhancements --- apier/v1/apier.go | 10 ++++- cmd/cgr-engine/cgr-engine.go | 2 +- cmd/cgr-loader/cgr-loader.go | 12 ++--- cmd/stress/cgr-raterstress/cgr-raterstress.py | 10 +++-- cmd/stress/cgr-spansstress/cgr-spansstress.go | 4 ++ docs/api_cache.rst | 1 + engine/calldesc.go | 10 ++--- engine/calldesc_test.go | 2 +- engine/loader_csv_test.go | 5 ++- engine/storage_interface.go | 4 +- engine/storage_map.go | 33 ++++++++++---- engine/storage_redis.go | 44 +++++++++++++++---- engine/storage_test.go | 2 +- utils/apitpdata.go | 5 ++- 14 files changed, 102 insertions(+), 42 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index f58c31c28..364c40278 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -311,7 +311,7 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error { } func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) error { - var dstKeys, rpKeys []string + var dstKeys, rpKeys, rpfKeys []string if len(attrs.DestinationIds) > 0 { dstKeys = make([]string, len(attrs.DestinationIds)) for idx, dId := range attrs.DestinationIds { @@ -324,7 +324,13 @@ func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) erro rpKeys[idx] = engine.RATING_PLAN_PREFIX + rpId } } - if err := self.DataDb.PreCache(dstKeys, rpKeys); err != nil { + if len(attrs.RatingProfileIds) > 0 { + rpfKeys = make([]string, len(attrs.RatingProfileIds)) + for idx, rpfId := range attrs.RatingProfileIds { + rpfKeys[idx] = engine.RATING_PROFILE_PREFIX + rpfId + } + } + if err := self.DataDb.PreCache(dstKeys, rpKeys, rpfKeys); err != nil { return err } *reply = "OK" diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 87ea07f5d..e83197c26 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -328,7 +328,7 @@ func main() { cfg.SchedulerEnabled = *schedEnabled } if cfg.RaterEnabled { - if err := dataDb.PreCache(nil, nil); err != nil { + if err := dataDb.PreCache(nil, nil, nil); err != nil { engine.Logger.Crit(fmt.Sprintf("Pre-caching error: %v", err)) return } diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 643aa6604..78d237902 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -22,14 +22,15 @@ import ( "encoding/gob" "flag" "fmt" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/history" - "github.com/cgrates/cgrates/utils" "log" "net/rpc" "net/rpc/jsonrpc" "path" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/history" + "github.com/cgrates/cgrates/utils" ) var ( @@ -165,11 +166,12 @@ func main() { reply := "" dstIds, _ := loader.GetLoadedIds(engine.DESTINATION_PREFIX) rplIds, _ := loader.GetLoadedIds(engine.RATING_PLAN_PREFIX) + rpfIds, _ := loader.GetLoadedIds(engine.RATING_PROFILE_PREFIX) // Reload cache first since actions could be calling info from within if *verbose { log.Print("Reloading cache") } - if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds}, &reply); err != nil { + if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds, rpfIds}, &reply); err != nil { log.Fatalf("Got error on cache reload: %s", err.Error()) } actIds, _ := loader.GetLoadedIds(engine.ACTION_TIMING_PREFIX) diff --git a/cmd/stress/cgr-raterstress/cgr-raterstress.py b/cmd/stress/cgr-raterstress/cgr-raterstress.py index 2e1c3d54e..8993fa4d9 100644 --- a/cmd/stress/cgr-raterstress/cgr-raterstress.py +++ b/cmd/stress/cgr-raterstress/cgr-raterstress.py @@ -2,7 +2,7 @@ # A simple JSONRPC client library, created to work with Go servers # Written by Stephen Day # Modified by Bruce Eckel to work with both Python 2 & 3 -import json, socket, itertools +import json, socket, itertools, time from datetime import datetime class JSONClient(object): @@ -50,7 +50,7 @@ cd = {"Direction":"*out", "Destination": "+49", "TimeStart": "2013-08-07T17:30:00Z", "TimeEnd": "2013-08-07T18:30:00Z", - "CallDuration": "60000000000", + "CallDuration": 60000000000, } # alternative to the above @@ -58,8 +58,12 @@ cd = {"Direction":"*out", #s.sendall(json.dumps({"id": 1, "method": "Responder.GetCost", "params": [cd]})) #print(s.recv(4096)) +start_time = time.time() i = 0 +runs = 1e5 result = "" -for i in range(int(1e5) + 1): +for i in range(int(runs) + 1): result = rpc.call("Responder.GetCost", cd) print(i, result) +duration = time.time() - start_time +print("Elapsed: %ds resulted: %d req/s." % (duration, runs/duration)) diff --git a/cmd/stress/cgr-spansstress/cgr-spansstress.go b/cmd/stress/cgr-spansstress/cgr-spansstress.go index 1e4936942..461c86c91 100644 --- a/cmd/stress/cgr-spansstress/cgr-spansstress.go +++ b/cmd/stress/cgr-spansstress/cgr-spansstress.go @@ -74,6 +74,10 @@ func main() { defer getter.Close() engine.SetDataStorage(getter) + if err := getter.PreCache(nil, nil, nil); err != nil { + log.Printf("Pre-caching error: %v", err) + return + } log.Printf("Runnning %d cycles...", *runs) var result *engine.CallCost diff --git a/docs/api_cache.rst b/docs/api_cache.rst index 795035f03..cce5ea337 100644 --- a/docs/api_cache.rst +++ b/docs/api_cache.rst @@ -18,6 +18,7 @@ Data: type ApiReloadCache struct { DestinationIds []string RatingPlanIds []string + RatingProfileIds []string } Mandatory parameters: none diff --git a/engine/calldesc.go b/engine/calldesc.go index b66131042..f17b4368c 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -48,7 +48,7 @@ func init() { const ( RECURSION_MAX_DEPTH = 3 - FALLBACK_SUBJECT = "*any" + FALLBACK_SUBJECT = utils.ANY ) var ( @@ -165,11 +165,11 @@ func (cd *CallDescriptor) getRatingPlansForPrefix(key string, recursionDepth int err = errors.New("Max fallback recursion depth reached!" + key) return } - rp, err := storageGetter.GetRatingProfile(key) - if err != nil || rp == nil { + rpf, err := storageGetter.GetRatingProfile(key, false) + if err != nil || rpf == nil { return err } - if err = rp.GetRatingPlansForPrefix(cd); err != nil || !cd.continousRatingInfos() { + if err = rpf.GetRatingPlansForPrefix(cd); err != nil || !cd.continousRatingInfos() { // try rating profile fallback recursionDepth++ for index := 0; index < len(cd.RatingInfos); index++ { @@ -578,7 +578,7 @@ func (cd *CallDescriptor) AddRecievedCallSeconds() (err error) { func (cd *CallDescriptor) FlushCache() (err error) { cache2go.XFlush() cache2go.Flush() - storageGetter.PreCache(nil, nil) + storageGetter.PreCache(nil, nil, nil) return nil } diff --git a/engine/calldesc_test.go b/engine/calldesc_test.go index 932d63346..a58ed76bc 100644 --- a/engine/calldesc_test.go +++ b/engine/calldesc_test.go @@ -342,7 +342,7 @@ func BenchmarkStorageGetting(b *testing.B) { cd := &CallDescriptor{Direction: "*out", TOR: "0", Tenant: "vdf", Subject: "rif", Destination: "0256", TimeStart: t1, TimeEnd: t2} b.StartTimer() for i := 0; i < b.N; i++ { - storageGetter.GetRatingProfile(cd.GetKey(cd.Subject)) + storageGetter.GetRatingProfile(cd.GetKey(cd.Subject), false) } } diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index e61320e56..ebc293926 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -19,10 +19,11 @@ along with this program. If not, see package engine import ( - "github.com/cgrates/cgrates/utils" "reflect" "testing" "time" + + "github.com/cgrates/cgrates/utils" ) var ( @@ -140,7 +141,7 @@ func init() { csvr.LoadActionTriggers() csvr.LoadAccountActions() csvr.WriteToDatabase(false, false) - storageGetter.PreCache(nil, nil) + storageGetter.PreCache(nil, nil, nil) } func TestLoadDestinations(t *testing.T) { diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 6c33c9957..730dd9fa7 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -68,11 +68,11 @@ Interface for storage providers. */ type DataStorage interface { Storage - PreCache([]string, []string) error + PreCache([]string, []string, []string) error ExistsData(string, string) (bool, error) GetRatingPlan(string, bool) (*RatingPlan, error) SetRatingPlan(*RatingPlan) error - GetRatingProfile(string) (*RatingProfile, error) + GetRatingProfile(string, bool) (*RatingProfile, error) SetRatingProfile(*RatingProfile) error GetDestination(string, bool) (*Destination, error) // DestinationContainsPrefix(string, string) (int, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 2f0439d7b..db9d7b856 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -45,7 +45,7 @@ func (ms *MapStorage) Flush() error { return nil } -func (ms *MapStorage) PreCache(dKeys, rppKeys []string) error { +func (ms *MapStorage) PreCache(dKeys, rpKeys, rpfKeys []string) error { for k, _ := range ms.dict { if strings.HasPrefix(k, DESTINATION_PREFIX) { cache2go.RemKey(k) @@ -59,6 +59,12 @@ func (ms *MapStorage) PreCache(dKeys, rppKeys []string) error { return err } } + if strings.HasPrefix(k, RATING_PROFILE_PREFIX) { + cache2go.RemKey(k) + if _, err := ms.GetRatingProfile(k[len(RATING_PROFILE_PREFIX):], true); err != nil { + return err + } + } } return nil } @@ -104,22 +110,31 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) { return } -func (ms *MapStorage) GetRatingProfile(key string) (rp *RatingProfile, err error) { - if values, ok := ms.dict[RATING_PROFILE_PREFIX+key]; ok { - rp = new(RatingProfile) +func (ms *MapStorage) GetRatingProfile(key string, checkDb bool) (rpf *RatingProfile, err error) { + key = RATING_PROFILE_PREFIX + key + if x, err := cache2go.GetCached(key); err == nil { + return x.(*RatingProfile), nil + } + if !checkDb { + return nil, errors.New(utils.ERR_NOT_FOUND) + } + if values, ok := ms.dict[key]; ok { + rpf = new(RatingProfile) - err = ms.ms.Unmarshal(values, rp) + err = ms.ms.Unmarshal(values, rpf) + cache2go.Cache(key, rpf) } else { return nil, errors.New("not found") } return } -func (ms *MapStorage) SetRatingProfile(rp *RatingProfile) (err error) { - result, err := ms.ms.Marshal(rp) - ms.dict[RATING_PROFILE_PREFIX+rp.Id] = result +func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) { + result, err := ms.ms.Marshal(rpf) + ms.dict[RATING_PROFILE_PREFIX+rpf.Id] = result response := 0 - go historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rp.Id, rp}, &response) + go historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rpf.Id, rpf}, &response) + cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) return } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index bfd0adc7b..73bb66aa3 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -68,7 +68,7 @@ func (rs *RedisStorage) Flush() (err error) { return } -func (rs *RedisStorage) PreCache(dKeys, rpKeys []string) (err error) { +func (rs *RedisStorage) PreCache(dKeys, rpKeys, rpfKeys []string) (err error) { if dKeys == nil { Logger.Info("Caching all destinations") if dKeys, err = rs.db.Keys(DESTINATION_PREFIX + "*"); err != nil { @@ -103,6 +103,23 @@ func (rs *RedisStorage) PreCache(dKeys, rpKeys []string) (err error) { if len(rpKeys) != 0 { Logger.Info("Finished rating plans caching.") } + if rpfKeys == nil { + Logger.Info("Caching all rating profiles") + if rpfKeys, err = rs.db.Keys(RATING_PROFILE_PREFIX + "*"); err != nil { + return + } + } else if len(rpfKeys) != 0 { + Logger.Info(fmt.Sprintf("Caching rating profile: %v", rpfKeys)) + } + for _, key := range rpfKeys { + cache2go.RemKey(key) + if _, err = rs.GetRatingProfile(key[len(RATING_PROFILE_PREFIX):], true); err != nil { + return err + } + } + if len(rpfKeys) != 0 { + Logger.Info("Finished rating profiles caching.") + } return } @@ -157,22 +174,31 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) { return } -func (rs *RedisStorage) GetRatingProfile(key string) (rp *RatingProfile, err error) { +func (rs *RedisStorage) GetRatingProfile(key string, checkDb bool) (rpf *RatingProfile, err error) { + key = RATING_PROFILE_PREFIX + key + if x, err := cache2go.GetCached(key); err == nil { + return x.(*RatingProfile), nil + } + if !checkDb { + return nil, errors.New(utils.ERR_NOT_FOUND) + } var values []byte - if values, err = rs.db.Get(RATING_PROFILE_PREFIX + key); err == nil { - rp = new(RatingProfile) - err = rs.ms.Unmarshal(values, rp) + if values, err = rs.db.Get(key); err == nil { + rpf = new(RatingProfile) + err = rs.ms.Unmarshal(values, rpf) + cache2go.Cache(key, rpf) } return } -func (rs *RedisStorage) SetRatingProfile(rp *RatingProfile) (err error) { - result, err := rs.ms.Marshal(rp) - err = rs.db.Set(RATING_PROFILE_PREFIX+rp.Id, result) +func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) { + result, err := rs.ms.Marshal(rpf) + err = rs.db.Set(RATING_PROFILE_PREFIX+rpf.Id, result) if err == nil && historyScribe != nil { response := 0 - go historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rp.Id, rp}, &response) + go historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rpf.Id, rpf}, &response) } + cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) return } diff --git a/engine/storage_test.go b/engine/storage_test.go index 08e078b8a..8b265e899 100644 --- a/engine/storage_test.go +++ b/engine/storage_test.go @@ -99,7 +99,7 @@ func TestPreCacheRefresh(t *testing.T) { storageGetter.SetDestination(&Destination{"T11", []string{"0"}}) storageGetter.GetDestination("T11", false) storageGetter.SetDestination(&Destination{"T11", []string{"1"}}) - storageGetter.PreCache(nil, nil) + storageGetter.PreCache(nil, nil, nil) if d, err := storageGetter.GetDestination("T11", false); err != nil || d.Prefixes[0] != "1" { t.Error("Error refreshing cache:", d) } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 6a34423fd..c306a4c5a 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -268,6 +268,7 @@ func (self *TPAccountActions) KeyId() string { // Data used to do remote cache reloads via api type ApiReloadCache struct { - DestinationIds []string - RatingPlanIds []string + DestinationIds []string + RatingPlanIds []string + RatingProfileIds []string }