mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
changed the way the destinations are cached
This commit is contained in:
@@ -42,7 +42,7 @@ type ApierV1 struct {
|
||||
}
|
||||
|
||||
func (self *ApierV1) GetDestination(dstId string, reply *engine.Destination) error {
|
||||
if dst, err := self.DataDb.GetDestination(dstId, false); err != nil {
|
||||
if dst, err := self.DataDb.GetDestination(dstId); err != nil {
|
||||
return errors.New(utils.ERR_NOT_FOUND)
|
||||
} else {
|
||||
*reply = *dst
|
||||
|
||||
@@ -135,6 +135,16 @@ func RemKey(key string) {
|
||||
delete(cache, key)
|
||||
}
|
||||
|
||||
func RemPrefixKey(prefix string) {
|
||||
mux.Lock()
|
||||
defer mux.Unlock()
|
||||
for key, _ := range cache {
|
||||
if strings.HasPrefix(key, prefix) {
|
||||
delete(cache, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func XRemKey(key string) {
|
||||
xMux.Lock()
|
||||
defer xMux.Unlock()
|
||||
@@ -145,6 +155,20 @@ func XRemKey(key string) {
|
||||
}
|
||||
delete(xcache, key)
|
||||
}
|
||||
func XRemPrefixKey(prefix string) {
|
||||
xMux.Lock()
|
||||
defer xMux.Unlock()
|
||||
for key, _ := range xcache {
|
||||
if strings.HasPrefix(key, prefix) {
|
||||
if r, ok := xcache[key]; ok {
|
||||
if r.timer() != nil {
|
||||
r.timer().Stop()
|
||||
}
|
||||
}
|
||||
delete(xcache, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Delete all keys from expiraton cache
|
||||
func XFlush() {
|
||||
|
||||
@@ -113,3 +113,27 @@ func TestXGetKeyAge(t *testing.T) {
|
||||
t.Error("Error getting cache key age: ", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemPrefixKey(t *testing.T) {
|
||||
Cache("x_t1", "test")
|
||||
Cache("y_t1", "test")
|
||||
RemPrefixKey("x_")
|
||||
_, errX := GetCached("x_t1")
|
||||
_, errY := GetCached("y_t1")
|
||||
if errX == nil || errY != nil {
|
||||
t.Error("Error removing prefix: ", errX, errY)
|
||||
}
|
||||
}
|
||||
|
||||
func TestXRemPrefixKey(t *testing.T) {
|
||||
a := &myStruct{data: "mama are mere"}
|
||||
a.XCache("x_t1", 10*time.Second, a)
|
||||
a.XCache("y_t1", 10*time.Second, a)
|
||||
|
||||
XRemPrefixKey("x_")
|
||||
_, errX := GetXCached("x_t1")
|
||||
_, errY := GetXCached("y_t1")
|
||||
if errX == nil || errY != nil {
|
||||
t.Error("Error removing prefix: ", errX, errY)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,9 +54,9 @@ func main() {
|
||||
CallDuration: 60 * time.Second,
|
||||
Direction: "*out",
|
||||
TOR: "call",
|
||||
Tenant: "cgrates.org",
|
||||
Subject: "1001",
|
||||
Destination: "+49676016500",
|
||||
Tenant: "185.25.80.254",
|
||||
Subject: "dan",
|
||||
Destination: "+4986517174963",
|
||||
}
|
||||
getter, err := engine.ConfigureDataStorage(utils.REDIS, "127.0.0.1", "6379", "10", "", "", utils.MSGPACK)
|
||||
if err != nil {
|
||||
@@ -75,7 +75,7 @@ func main() {
|
||||
j := 0
|
||||
start := time.Now()
|
||||
for i := 0; i < *runs; i++ {
|
||||
result, err = cd.Debit()
|
||||
result, err = cd.GetCost()
|
||||
if *memprofile != "" {
|
||||
runtime.MemProfileRate = 1
|
||||
runtime.GC()
|
||||
|
||||
@@ -18,23 +18,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
const (
|
||||
LONG_PREFIX_SLICE_LENGTH = 30
|
||||
)
|
||||
import "strings"
|
||||
|
||||
/*
|
||||
Structure that gathers multiple destination prefixes under a common id.
|
||||
*/
|
||||
type Destination struct {
|
||||
Id string
|
||||
Prefixes []string
|
||||
longPrefixesMap map[string]interface{}
|
||||
Id string
|
||||
Prefixes []string
|
||||
}
|
||||
|
||||
// returns prefix precision
|
||||
@@ -42,18 +33,9 @@ func (d *Destination) containsPrefix(prefix string) int {
|
||||
if d == nil {
|
||||
return 0
|
||||
}
|
||||
if d.Prefixes != nil {
|
||||
for _, p := range d.Prefixes {
|
||||
if strings.Index(prefix, p) == 0 {
|
||||
return len(p)
|
||||
}
|
||||
}
|
||||
}
|
||||
if d.longPrefixesMap != nil {
|
||||
for _, p := range utils.SplitPrefix(prefix) {
|
||||
if _, found := d.longPrefixesMap[p]; found {
|
||||
return len(p)
|
||||
}
|
||||
for _, p := range d.Prefixes {
|
||||
if strings.Index(prefix, p) == 0 {
|
||||
return len(p)
|
||||
}
|
||||
}
|
||||
return 0
|
||||
@@ -61,15 +43,8 @@ func (d *Destination) containsPrefix(prefix string) int {
|
||||
|
||||
func (d *Destination) String() (result string) {
|
||||
result = d.Id + ": "
|
||||
if d.Prefixes != nil {
|
||||
for _, k := range d.Prefixes {
|
||||
result += k + ", "
|
||||
}
|
||||
}
|
||||
if d.longPrefixesMap != nil {
|
||||
for k, _ := range d.longPrefixesMap {
|
||||
result += k + ", "
|
||||
}
|
||||
for _, k := range d.Prefixes {
|
||||
result += k + ", "
|
||||
}
|
||||
result = strings.TrimRight(result, ", ")
|
||||
return result
|
||||
@@ -78,13 +53,3 @@ func (d *Destination) String() (result string) {
|
||||
func (d *Destination) AddPrefix(pfx string) {
|
||||
d.Prefixes = append(d.Prefixes, pfx)
|
||||
}
|
||||
|
||||
func (d *Destination) OptimizePrefixes() {
|
||||
if len(d.Prefixes) > LONG_PREFIX_SLICE_LENGTH {
|
||||
d.longPrefixesMap = make(map[string]interface{})
|
||||
for _, p := range d.Prefixes {
|
||||
d.longPrefixesMap[p] = nil
|
||||
}
|
||||
d.Prefixes = nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ package engine
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
|
||||
@@ -44,7 +43,7 @@ func TestDestinationStorageStore(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error("Error storing destination: ", err)
|
||||
}
|
||||
result, err := storageGetter.GetDestination(nationale.Id, false)
|
||||
result, err := storageGetter.GetDestination(nationale.Id)
|
||||
if nationale.containsPrefix("0257") == 0 || nationale.containsPrefix("0256") == 0 || nationale.containsPrefix("0723") == 0 {
|
||||
t.Errorf("Expected %q was %q", nationale, result)
|
||||
}
|
||||
@@ -75,61 +74,39 @@ func TestDestinationContainsPrefixWrong(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDestinationGetExists(t *testing.T) {
|
||||
d, err := storageGetter.GetDestination("NAT", false)
|
||||
d, err := storageGetter.GetDestination("NAT")
|
||||
if err != nil || d == nil {
|
||||
t.Error("Could not get destination: ", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDestinationGetExistsCache(t *testing.T) {
|
||||
storageGetter.GetDestination("NAT", false)
|
||||
if _, err := cache2go.GetCached(DESTINATION_PREFIX + "NAT"); err != nil {
|
||||
storageGetter.GetDestination("NAT")
|
||||
if _, err := cache2go.GetCached(DESTINATION_PREFIX + "0256"); err != nil {
|
||||
t.Error("Destination not cached:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDestinationGetNotExists(t *testing.T) {
|
||||
d, err := storageGetter.GetDestination("not existing", false)
|
||||
d, err := storageGetter.GetDestination("not existing")
|
||||
if d != nil {
|
||||
t.Error("Got false destination: ", d, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDestinationGetNotExistsCache(t *testing.T) {
|
||||
storageGetter.GetDestination("not existing", false)
|
||||
storageGetter.GetDestination("not existing")
|
||||
if d, err := cache2go.GetCached("not existing"); err == nil {
|
||||
t.Error("Bad destination cached: ", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDestinationOptimzeShort(t *testing.T) {
|
||||
d := &Destination{}
|
||||
for i := 0; i < LONG_PREFIX_SLICE_LENGTH; i++ {
|
||||
d.AddPrefix(strconv.Itoa(i))
|
||||
}
|
||||
d.OptimizePrefixes()
|
||||
if d.Prefixes == nil || d.longPrefixesMap != nil {
|
||||
t.Logf("Error optimizing destinations %+v", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDestinationOptimzeLong(t *testing.T) {
|
||||
d := &Destination{}
|
||||
for i := 0; i < LONG_PREFIX_SLICE_LENGTH+1; i++ {
|
||||
d.AddPrefix(strconv.Itoa(i))
|
||||
}
|
||||
d.OptimizePrefixes()
|
||||
if d.Prefixes != nil || d.longPrefixesMap == nil {
|
||||
t.Logf("Error optimizing destinations %+v", d)
|
||||
}
|
||||
}
|
||||
|
||||
/********************************* Benchmarks **********************************/
|
||||
|
||||
func BenchmarkDestinationStorageStoreRestore(b *testing.B) {
|
||||
nationale := &Destination{Id: "nat", Prefixes: []string{"0257", "0256", "0723"}}
|
||||
for i := 0; i < b.N; i++ {
|
||||
storageGetter.SetDestination(nationale)
|
||||
storageGetter.GetDestination(nationale.Id, true)
|
||||
storageGetter.GetDestination(nationale.Id)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,7 +114,7 @@ func (rp *RatingProfile) GetRatingPlansForPrefix(cd *CallDescriptor) (err error)
|
||||
bestPrecision := 0
|
||||
var rps RateIntervalList
|
||||
for _, p := range utils.SplitPrefix(cd.Destination) {
|
||||
if x, err := cache2go.GetCached(p); err == nil {
|
||||
if x, err := cache2go.GetCached(DESTINATION_PREFIX + p); err == nil {
|
||||
destIds := x.([]string)
|
||||
for _, dId := range destIds {
|
||||
if _, ok := rpl.DestinationRates[dId]; ok {
|
||||
|
||||
@@ -74,7 +74,7 @@ type DataStorage interface {
|
||||
SetRatingPlan(*RatingPlan) error
|
||||
GetRatingProfile(string, bool) (*RatingProfile, error)
|
||||
SetRatingProfile(*RatingProfile) error
|
||||
GetDestination(string, bool) (*Destination, error)
|
||||
GetDestination(string) (*Destination, error)
|
||||
// DestinationContainsPrefix(string, string) (int, error)
|
||||
SetDestination(*Destination) error
|
||||
GetActions(string, bool) (Actions, error)
|
||||
|
||||
@@ -49,11 +49,10 @@ func (ms *MapStorage) PreCache(dKeys, rpKeys, rpfKeys, actKeys []string) error {
|
||||
if dKeys == nil && rpKeys == nil && rpfKeys == nil && actKeys == nil {
|
||||
cache2go.Flush()
|
||||
}
|
||||
cache2go.RemPrefixKey(DESTINATION_PREFIX)
|
||||
for k, _ := range ms.dict {
|
||||
if strings.HasPrefix(k, DESTINATION_PREFIX) {
|
||||
cache2go.RemKey(k)
|
||||
// TODO: here I must delete all optimized prefixes
|
||||
if _, err := ms.GetDestination(k[len(DESTINATION_PREFIX):], true); err != nil {
|
||||
if _, err := ms.GetDestination(k[len(DESTINATION_PREFIX):]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -148,28 +147,20 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetDestination(key string, checkDb bool) (dest *Destination, err error) {
|
||||
func (ms *MapStorage) GetDestination(key string) (dest *Destination, err error) {
|
||||
key = DESTINATION_PREFIX + key
|
||||
if x, err := cache2go.GetCached(key); err == nil {
|
||||
return x.(*Destination), nil
|
||||
}
|
||||
if !checkDb {
|
||||
return nil, errors.New(utils.ERR_NOT_FOUND)
|
||||
}
|
||||
if values, ok := ms.dict[key]; ok {
|
||||
dest = &Destination{Id: key}
|
||||
err = ms.ms.Unmarshal(values, dest)
|
||||
// create optimized structure
|
||||
for _, p := range dest.Prefixes {
|
||||
var ids []string
|
||||
if x, err := cache2go.GetCached(p); err == nil {
|
||||
if x, err := cache2go.GetCached(DESTINATION_PREFIX + p); err == nil {
|
||||
ids = x.([]string)
|
||||
}
|
||||
ids = append(ids, dest.Id)
|
||||
cache2go.Cache(p, ids)
|
||||
cache2go.Cache(DESTINATION_PREFIX+p, ids)
|
||||
}
|
||||
dest.OptimizePrefixes()
|
||||
cache2go.Cache(key, dest)
|
||||
} else {
|
||||
return nil, errors.New("not found")
|
||||
}
|
||||
|
||||
@@ -77,12 +77,12 @@ func (rs *RedisStorage) PreCache(dKeys, rpKeys, rpfKeys, actKeys []string) (err
|
||||
if dKeys, err = rs.db.Keys(DESTINATION_PREFIX + "*"); err != nil {
|
||||
return
|
||||
}
|
||||
} else if len(dKeys) != 0 {
|
||||
cache2go.RemPrefixKey(DESTINATION_PREFIX)
|
||||
} else if len(rpKeys) != 0 {
|
||||
Logger.Info(fmt.Sprintf("Caching destinations: %v", dKeys))
|
||||
}
|
||||
for _, key := range dKeys {
|
||||
cache2go.RemKey(key)
|
||||
if _, err = rs.GetDestination(key[len(DESTINATION_PREFIX):], true); err != nil {
|
||||
if _, err = rs.GetDestination(key[len(DESTINATION_PREFIX):]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -222,14 +222,8 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetDestination(key string, checkDb bool) (dest *Destination, err error) {
|
||||
func (rs *RedisStorage) GetDestination(key string) (dest *Destination, err error) {
|
||||
key = DESTINATION_PREFIX + key
|
||||
if x, err := cache2go.GetCached(key); err == nil {
|
||||
return x.(*Destination), nil
|
||||
}
|
||||
if !checkDb {
|
||||
return nil, errors.New(utils.ERR_NOT_FOUND)
|
||||
}
|
||||
var values []byte
|
||||
if values, err = rs.db.Get(key); len(values) > 0 && err == nil {
|
||||
b := bytes.NewBuffer(values)
|
||||
@@ -247,14 +241,12 @@ func (rs *RedisStorage) GetDestination(key string, checkDb bool) (dest *Destinat
|
||||
// create optimized structure
|
||||
for _, p := range dest.Prefixes {
|
||||
var ids []string
|
||||
if x, err := cache2go.GetCached(p); err == nil {
|
||||
if x, err := cache2go.GetCached(DESTINATION_PREFIX + p); err == nil {
|
||||
ids = x.([]string)
|
||||
}
|
||||
ids = append(ids, dest.Id)
|
||||
cache2go.Cache(p, ids)
|
||||
cache2go.Cache(DESTINATION_PREFIX+p, ids)
|
||||
}
|
||||
dest.OptimizePrefixes()
|
||||
cache2go.Cache(key, dest)
|
||||
} else {
|
||||
return nil, errors.New("not found")
|
||||
}
|
||||
|
||||
@@ -72,7 +72,7 @@ func TestMsgpackTime(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStorageDestinationContainsPrefixShort(t *testing.T) {
|
||||
dest, err := storageGetter.GetDestination("NAT", false)
|
||||
dest, err := storageGetter.GetDestination("NAT")
|
||||
precision := dest.containsPrefix("0723")
|
||||
if err != nil || precision != 4 {
|
||||
t.Error("Error finding prefix: ", err, precision)
|
||||
@@ -80,7 +80,7 @@ func TestStorageDestinationContainsPrefixShort(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStorageDestinationContainsPrefixLong(t *testing.T) {
|
||||
dest, err := storageGetter.GetDestination("NAT", false)
|
||||
dest, err := storageGetter.GetDestination("NAT")
|
||||
precision := dest.containsPrefix("0723045326")
|
||||
if err != nil || precision != 4 {
|
||||
t.Error("Error finding prefix: ", err, precision)
|
||||
@@ -88,7 +88,7 @@ func TestStorageDestinationContainsPrefixLong(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStorageDestinationContainsPrefixNotExisting(t *testing.T) {
|
||||
dest, err := storageGetter.GetDestination("NAT", false)
|
||||
dest, err := storageGetter.GetDestination("NAT")
|
||||
precision := dest.containsPrefix("072")
|
||||
if err != nil || precision != 0 {
|
||||
t.Error("Error finding prefix: ", err, precision)
|
||||
@@ -96,11 +96,11 @@ func TestStorageDestinationContainsPrefixNotExisting(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPreCacheRefresh(t *testing.T) {
|
||||
storageGetter.SetDestination(&Destination{"T11", []string{"0"}, nil})
|
||||
storageGetter.GetDestination("T11", false)
|
||||
storageGetter.SetDestination(&Destination{"T11", []string{"1"}, nil})
|
||||
storageGetter.SetDestination(&Destination{"T11", []string{"0"}})
|
||||
storageGetter.GetDestination("T11")
|
||||
storageGetter.SetDestination(&Destination{"T11", []string{"1"}})
|
||||
storageGetter.PreCache(nil, nil, nil, nil)
|
||||
d, err := storageGetter.GetDestination("T11", false)
|
||||
d, err := storageGetter.GetDestination("T11")
|
||||
p := d.containsPrefix("1")
|
||||
if err != nil || p == 0 {
|
||||
t.Error("Error refreshing cache:", d)
|
||||
|
||||
@@ -115,7 +115,7 @@ func (self *TPCSVImporter) importDestinations(fn string) error {
|
||||
}
|
||||
continue
|
||||
}
|
||||
dst := &Destination{record[0], []string{record[1]}, nil}
|
||||
dst := &Destination{record[0], []string{record[1]}}
|
||||
if err := self.StorDb.SetTPDestination(self.TPid, dst); err != nil {
|
||||
if self.Verbose {
|
||||
log.Printf("Ignoring line %d, storDb operational error: <%s> ", lineNr, err.Error())
|
||||
|
||||
@@ -18,6 +18,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// Amount of a trafic of a certain type
|
||||
type UnitsCounter struct {
|
||||
Direction string
|
||||
@@ -63,16 +68,20 @@ func (uc *UnitsCounter) addUnits(amount float64, prefix string) {
|
||||
if !mb.HasDestination() {
|
||||
continue
|
||||
}
|
||||
dest, err := storageGetter.GetDestination(mb.DestinationId, false)
|
||||
if err != nil {
|
||||
Logger.Err("Counter: unknown destination: " + mb.DestinationId)
|
||||
continue
|
||||
}
|
||||
precision := dest.containsPrefix(prefix)
|
||||
if precision > 0 {
|
||||
mb.Value += amount
|
||||
counted = true
|
||||
break
|
||||
for _, p := range utils.SplitPrefix(prefix) {
|
||||
if x, err := cache2go.GetCached(DESTINATION_PREFIX + p); err == nil {
|
||||
destIds := x.([]string)
|
||||
for _, dId := range destIds {
|
||||
if dId == mb.DestinationId {
|
||||
mb.Value += amount
|
||||
counted = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if counted {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/cache2go"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
|
||||
"strings"
|
||||
@@ -124,14 +125,20 @@ func (ub *UserBalance) getBalancesForPrefix(prefix string, balances BalanceChain
|
||||
continue
|
||||
}
|
||||
if b.DestinationId != "" && b.DestinationId != utils.ANY {
|
||||
dest, err := storageGetter.GetDestination(b.DestinationId, false)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
precision := dest.containsPrefix(prefix)
|
||||
if precision > 0 {
|
||||
b.precision = precision
|
||||
usefulBalances = append(usefulBalances, b)
|
||||
for _, p := range utils.SplitPrefix(prefix) {
|
||||
if x, err := cache2go.GetCached(DESTINATION_PREFIX + p); err == nil {
|
||||
destIds := x.([]string)
|
||||
for _, dId := range destIds {
|
||||
if dId == b.DestinationId {
|
||||
b.precision = len(p)
|
||||
usefulBalances = append(usefulBalances, b)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if b.precision > 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
usefulBalances = append(usefulBalances, b)
|
||||
|
||||
Reference in New Issue
Block a user