performance enhancements

This commit is contained in:
Radu Ioan Fericean
2013-12-14 12:44:42 +02:00
parent 59543849d4
commit 237508e590
14 changed files with 102 additions and 42 deletions

View File

@@ -311,7 +311,7 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error {
}
func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) error {
var dstKeys, rpKeys []string
var dstKeys, rpKeys, rpfKeys []string
if len(attrs.DestinationIds) > 0 {
dstKeys = make([]string, len(attrs.DestinationIds))
for idx, dId := range attrs.DestinationIds {
@@ -324,7 +324,13 @@ func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) erro
rpKeys[idx] = engine.RATING_PLAN_PREFIX + rpId
}
}
if err := self.DataDb.PreCache(dstKeys, rpKeys); err != nil {
if len(attrs.RatingProfileIds) > 0 {
rpfKeys = make([]string, len(attrs.RatingProfileIds))
for idx, rpfId := range attrs.RatingProfileIds {
rpfKeys[idx] = engine.RATING_PROFILE_PREFIX + rpfId
}
}
if err := self.DataDb.PreCache(dstKeys, rpKeys, rpfKeys); err != nil {
return err
}
*reply = "OK"

View File

@@ -328,7 +328,7 @@ func main() {
cfg.SchedulerEnabled = *schedEnabled
}
if cfg.RaterEnabled {
if err := dataDb.PreCache(nil, nil); err != nil {
if err := dataDb.PreCache(nil, nil, nil); err != nil {
engine.Logger.Crit(fmt.Sprintf("Pre-caching error: %v", err))
return
}

View File

@@ -22,14 +22,15 @@ import (
"encoding/gob"
"flag"
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/utils"
"log"
"net/rpc"
"net/rpc/jsonrpc"
"path"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/history"
"github.com/cgrates/cgrates/utils"
)
var (
@@ -165,11 +166,12 @@ func main() {
reply := ""
dstIds, _ := loader.GetLoadedIds(engine.DESTINATION_PREFIX)
rplIds, _ := loader.GetLoadedIds(engine.RATING_PLAN_PREFIX)
rpfIds, _ := loader.GetLoadedIds(engine.RATING_PROFILE_PREFIX)
// Reload cache first since actions could be calling info from within
if *verbose {
log.Print("Reloading cache")
}
if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds}, &reply); err != nil {
if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds, rpfIds}, &reply); err != nil {
log.Fatalf("Got error on cache reload: %s", err.Error())
}
actIds, _ := loader.GetLoadedIds(engine.ACTION_TIMING_PREFIX)

View File

@@ -2,7 +2,7 @@
# A simple JSONRPC client library, created to work with Go servers
# Written by Stephen Day
# Modified by Bruce Eckel to work with both Python 2 & 3
import json, socket, itertools
import json, socket, itertools, time
from datetime import datetime
class JSONClient(object):
@@ -50,7 +50,7 @@ cd = {"Direction":"*out",
"Destination": "+49",
"TimeStart": "2013-08-07T17:30:00Z",
"TimeEnd": "2013-08-07T18:30:00Z",
"CallDuration": "60000000000",
"CallDuration": 60000000000,
}
# alternative to the above
@@ -58,8 +58,12 @@ cd = {"Direction":"*out",
#s.sendall(json.dumps({"id": 1, "method": "Responder.GetCost", "params": [cd]}))
#print(s.recv(4096))
start_time = time.time()
i = 0
runs = 1e5
result = ""
for i in range(int(1e5) + 1):
for i in range(int(runs) + 1):
result = rpc.call("Responder.GetCost", cd)
print(i, result)
duration = time.time() - start_time
print("Elapsed: %ds resulted: %d req/s." % (duration, runs/duration))

View File

@@ -74,6 +74,10 @@ func main() {
defer getter.Close()
engine.SetDataStorage(getter)
if err := getter.PreCache(nil, nil, nil); err != nil {
log.Printf("Pre-caching error: %v", err)
return
}
log.Printf("Runnning %d cycles...", *runs)
var result *engine.CallCost

View File

@@ -18,6 +18,7 @@ Data:
type ApiReloadCache struct {
DestinationIds []string
RatingPlanIds []string
RatingProfileIds []string
}
Mandatory parameters: none

View File

@@ -48,7 +48,7 @@ func init() {
const (
RECURSION_MAX_DEPTH = 3
FALLBACK_SUBJECT = "*any"
FALLBACK_SUBJECT = utils.ANY
)
var (
@@ -165,11 +165,11 @@ func (cd *CallDescriptor) getRatingPlansForPrefix(key string, recursionDepth int
err = errors.New("Max fallback recursion depth reached!" + key)
return
}
rp, err := storageGetter.GetRatingProfile(key)
if err != nil || rp == nil {
rpf, err := storageGetter.GetRatingProfile(key, false)
if err != nil || rpf == nil {
return err
}
if err = rp.GetRatingPlansForPrefix(cd); err != nil || !cd.continousRatingInfos() {
if err = rpf.GetRatingPlansForPrefix(cd); err != nil || !cd.continousRatingInfos() {
// try rating profile fallback
recursionDepth++
for index := 0; index < len(cd.RatingInfos); index++ {
@@ -578,7 +578,7 @@ func (cd *CallDescriptor) AddRecievedCallSeconds() (err error) {
func (cd *CallDescriptor) FlushCache() (err error) {
cache2go.XFlush()
cache2go.Flush()
storageGetter.PreCache(nil, nil)
storageGetter.PreCache(nil, nil, nil)
return nil
}

View File

@@ -342,7 +342,7 @@ func BenchmarkStorageGetting(b *testing.B) {
cd := &CallDescriptor{Direction: "*out", TOR: "0", Tenant: "vdf", Subject: "rif", Destination: "0256", TimeStart: t1, TimeEnd: t2}
b.StartTimer()
for i := 0; i < b.N; i++ {
storageGetter.GetRatingProfile(cd.GetKey(cd.Subject))
storageGetter.GetRatingProfile(cd.GetKey(cd.Subject), false)
}
}

View File

@@ -19,10 +19,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"github.com/cgrates/cgrates/utils"
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
)
var (
@@ -140,7 +141,7 @@ func init() {
csvr.LoadActionTriggers()
csvr.LoadAccountActions()
csvr.WriteToDatabase(false, false)
storageGetter.PreCache(nil, nil)
storageGetter.PreCache(nil, nil, nil)
}
func TestLoadDestinations(t *testing.T) {

View File

@@ -68,11 +68,11 @@ Interface for storage providers.
*/
type DataStorage interface {
Storage
PreCache([]string, []string) error
PreCache([]string, []string, []string) error
ExistsData(string, string) (bool, error)
GetRatingPlan(string, bool) (*RatingPlan, error)
SetRatingPlan(*RatingPlan) error
GetRatingProfile(string) (*RatingProfile, error)
GetRatingProfile(string, bool) (*RatingProfile, error)
SetRatingProfile(*RatingProfile) error
GetDestination(string, bool) (*Destination, error)
// DestinationContainsPrefix(string, string) (int, error)

View File

@@ -45,7 +45,7 @@ func (ms *MapStorage) Flush() error {
return nil
}
func (ms *MapStorage) PreCache(dKeys, rppKeys []string) error {
func (ms *MapStorage) PreCache(dKeys, rpKeys, rpfKeys []string) error {
for k, _ := range ms.dict {
if strings.HasPrefix(k, DESTINATION_PREFIX) {
cache2go.RemKey(k)
@@ -59,6 +59,12 @@ func (ms *MapStorage) PreCache(dKeys, rppKeys []string) error {
return err
}
}
if strings.HasPrefix(k, RATING_PROFILE_PREFIX) {
cache2go.RemKey(k)
if _, err := ms.GetRatingProfile(k[len(RATING_PROFILE_PREFIX):], true); err != nil {
return err
}
}
}
return nil
}
@@ -104,22 +110,31 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) {
return
}
func (ms *MapStorage) GetRatingProfile(key string) (rp *RatingProfile, err error) {
if values, ok := ms.dict[RATING_PROFILE_PREFIX+key]; ok {
rp = new(RatingProfile)
func (ms *MapStorage) GetRatingProfile(key string, checkDb bool) (rpf *RatingProfile, err error) {
key = RATING_PROFILE_PREFIX + key
if x, err := cache2go.GetCached(key); err == nil {
return x.(*RatingProfile), nil
}
if !checkDb {
return nil, errors.New(utils.ERR_NOT_FOUND)
}
if values, ok := ms.dict[key]; ok {
rpf = new(RatingProfile)
err = ms.ms.Unmarshal(values, rp)
err = ms.ms.Unmarshal(values, rpf)
cache2go.Cache(key, rpf)
} else {
return nil, errors.New("not found")
}
return
}
func (ms *MapStorage) SetRatingProfile(rp *RatingProfile) (err error) {
result, err := ms.ms.Marshal(rp)
ms.dict[RATING_PROFILE_PREFIX+rp.Id] = result
func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
result, err := ms.ms.Marshal(rpf)
ms.dict[RATING_PROFILE_PREFIX+rpf.Id] = result
response := 0
go historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rp.Id, rp}, &response)
go historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rpf.Id, rpf}, &response)
cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf)
return
}

View File

@@ -68,7 +68,7 @@ func (rs *RedisStorage) Flush() (err error) {
return
}
func (rs *RedisStorage) PreCache(dKeys, rpKeys []string) (err error) {
func (rs *RedisStorage) PreCache(dKeys, rpKeys, rpfKeys []string) (err error) {
if dKeys == nil {
Logger.Info("Caching all destinations")
if dKeys, err = rs.db.Keys(DESTINATION_PREFIX + "*"); err != nil {
@@ -103,6 +103,23 @@ func (rs *RedisStorage) PreCache(dKeys, rpKeys []string) (err error) {
if len(rpKeys) != 0 {
Logger.Info("Finished rating plans caching.")
}
if rpfKeys == nil {
Logger.Info("Caching all rating profiles")
if rpfKeys, err = rs.db.Keys(RATING_PROFILE_PREFIX + "*"); err != nil {
return
}
} else if len(rpfKeys) != 0 {
Logger.Info(fmt.Sprintf("Caching rating profile: %v", rpfKeys))
}
for _, key := range rpfKeys {
cache2go.RemKey(key)
if _, err = rs.GetRatingProfile(key[len(RATING_PROFILE_PREFIX):], true); err != nil {
return err
}
}
if len(rpfKeys) != 0 {
Logger.Info("Finished rating profiles caching.")
}
return
}
@@ -157,22 +174,31 @@ func (rs *RedisStorage) SetRatingPlan(rp *RatingPlan) (err error) {
return
}
func (rs *RedisStorage) GetRatingProfile(key string) (rp *RatingProfile, err error) {
func (rs *RedisStorage) GetRatingProfile(key string, checkDb bool) (rpf *RatingProfile, err error) {
key = RATING_PROFILE_PREFIX + key
if x, err := cache2go.GetCached(key); err == nil {
return x.(*RatingProfile), nil
}
if !checkDb {
return nil, errors.New(utils.ERR_NOT_FOUND)
}
var values []byte
if values, err = rs.db.Get(RATING_PROFILE_PREFIX + key); err == nil {
rp = new(RatingProfile)
err = rs.ms.Unmarshal(values, rp)
if values, err = rs.db.Get(key); err == nil {
rpf = new(RatingProfile)
err = rs.ms.Unmarshal(values, rpf)
cache2go.Cache(key, rpf)
}
return
}
func (rs *RedisStorage) SetRatingProfile(rp *RatingProfile) (err error) {
result, err := rs.ms.Marshal(rp)
err = rs.db.Set(RATING_PROFILE_PREFIX+rp.Id, result)
func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
result, err := rs.ms.Marshal(rpf)
err = rs.db.Set(RATING_PROFILE_PREFIX+rpf.Id, result)
if err == nil && historyScribe != nil {
response := 0
go historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rp.Id, rp}, &response)
go historyScribe.Record(&history.Record{RATING_PROFILE_PREFIX + rpf.Id, rpf}, &response)
}
cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf)
return
}

View File

@@ -99,7 +99,7 @@ func TestPreCacheRefresh(t *testing.T) {
storageGetter.SetDestination(&Destination{"T11", []string{"0"}})
storageGetter.GetDestination("T11", false)
storageGetter.SetDestination(&Destination{"T11", []string{"1"}})
storageGetter.PreCache(nil, nil)
storageGetter.PreCache(nil, nil, nil)
if d, err := storageGetter.GetDestination("T11", false); err != nil || d.Prefixes[0] != "1" {
t.Error("Error refreshing cache:", d)
}

View File

@@ -268,6 +268,7 @@ func (self *TPAccountActions) KeyId() string {
// Data used to do remote cache reloads via api
type ApiReloadCache struct {
DestinationIds []string
RatingPlanIds []string
DestinationIds []string
RatingPlanIds []string
RatingProfileIds []string
}