From 46fc12e0cdf942ff5a8d5fcc03cf327e9eeddd3e Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Thu, 4 Sep 2014 17:35:28 +0300 Subject: [PATCH 1/3] fixes for loading apier methods --- apier/apier.go | 43 +++++++++++++++++++++++++++++++++++++---- engine/ratingprofile.go | 2 +- engine/storage_map.go | 22 ++++++++++----------- 3 files changed, 51 insertions(+), 16 deletions(-) diff --git a/apier/apier.go b/apier/apier.go index 0b7a3666e..b345d1e96 100644 --- a/apier/apier.go +++ b/apier/apier.go @@ -183,11 +183,14 @@ func (self *ApierV1) LoadRatingPlan(attrs AttrLoadRatingPlan, reply *string) err if missing := utils.MissingStructFields(&attrs, []string{"TPid", "RatingPlanId"}); len(missing) != 0 { return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } + if attrs.RatingPlanId == utils.ANY { + attrs.RatingPlanId = "" + } dbReader := engine.NewDbReader(self.StorDb, self.RatingDb, self.AccountDb, attrs.TPid) if loaded, err := dbReader.LoadRatingPlanByTag(attrs.RatingPlanId); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } else if !loaded { - return errors.New("NOT_FOUND") + return errors.New(utils.ERR_NOT_FOUND) } //Automatic cache of the newly inserted rating plan didNotChange := []string{} @@ -200,16 +203,37 @@ func (self *ApierV1) LoadRatingPlan(attrs AttrLoadRatingPlan, reply *string) err // Process dependencies and load a specific rating profile from storDb into dataDb. func (self *ApierV1) LoadRatingProfile(attrs utils.TPRatingProfile, reply *string) error { - if missing := utils.MissingStructFields(&attrs, []string{"TPid", "LoadId", "Tenant", "TOR", "Direction", "Subject"}); len(missing) != 0 { + if missing := utils.MissingStructFields(&attrs, []string{"TPid", "LoadId", "Tenant", "Category", "Direction", "Subject"}); len(missing) != 0 { return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } + + if attrs.LoadId == utils.ANY { + attrs.LoadId = "" + } + if attrs.Tenant == utils.ANY { + attrs.Tenant = "" + } + if attrs.Category == utils.ANY { + attrs.Category = "" + } + if attrs.Direction == utils.ANY { + attrs.Direction = "" + } + if attrs.Subject == utils.ANY { + attrs.Subject = "" + } + dbReader := engine.NewDbReader(self.StorDb, self.RatingDb, self.AccountDb, attrs.TPid) if err := dbReader.LoadRatingProfileFiltered(&attrs); err != nil { return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) } //Automatic cache of the newly inserted rating profile didNotChange := []string{} - if err := self.RatingDb.CacheRating(didNotChange, didNotChange, []string{engine.RATING_PROFILE_PREFIX + attrs.KeyId()}, didNotChange, didNotChange); err != nil { + var ratingProfile []string + if attrs.KeyId() != ":::" { // if has some filters + ratingProfile = []string{engine.RATING_PROFILE_PREFIX + attrs.KeyId()} + } + if err := self.RatingDb.CacheRating(didNotChange, didNotChange, ratingProfile, didNotChange, didNotChange); err != nil { return err } *reply = OK @@ -556,7 +580,18 @@ func (self *ApierV1) LoadAccountActions(attrs utils.TPAccountActions, reply *str return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) } dbReader := engine.NewDbReader(self.StorDb, self.RatingDb, self.AccountDb, attrs.TPid) - + if attrs.LoadId == utils.ANY { + attrs.LoadId = "" + } + if attrs.Tenant == utils.ANY { + attrs.Tenant = "" + } + if attrs.Account == utils.ANY { + attrs.Account = "" + } + if attrs.Direction == utils.ANY { + attrs.Direction = "" + } if _, err := engine.AccLock.Guard(attrs.KeyId(), func() (float64, error) { if err := dbReader.LoadAccountActionsFiltered(&attrs); err != nil { return 0, err diff --git a/engine/ratingprofile.go b/engine/ratingprofile.go index 3ec16692d..85a877823 100644 --- a/engine/ratingprofile.go +++ b/engine/ratingprofile.go @@ -166,7 +166,7 @@ func (rp *RatingProfile) GetRatingPlansForPrefix(cd *CallDescriptor) (err error) return } - return errors.New("not found") + return errors.New(utils.ERR_NOT_FOUND) } // history record method diff --git a/engine/storage_map.go b/engine/storage_map.go index 9782bb60d..c9296d62e 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -196,7 +196,7 @@ func (ms *MapStorage) GetRatingPlan(key string, skipCache bool) (rp *RatingPlan, err = ms.ms.Unmarshal(out, rp) cache2go.Cache(key, rp) } else { - return nil, errors.New("not found") + return nil, errors.New(utils.ERR_NOT_FOUND) } return } @@ -231,7 +231,7 @@ func (ms *MapStorage) GetRatingProfile(key string, skipCache bool) (rpf *RatingP err = ms.ms.Unmarshal(values, rpf) cache2go.Cache(key, rpf) } else { - return nil, errors.New("not found") + return nil, errors.New(utils.ERR_NOT_FOUND) } return } @@ -260,7 +260,7 @@ func (ms *MapStorage) GetLCR(key string, skipCache bool) (lcr *LCR, err error) { err = ms.ms.Unmarshal(values, &lcr) cache2go.Cache(key, lcr) } else { - return nil, errors.New("not found") + return nil, errors.New(utils.ERR_NOT_FOUND) } return } @@ -285,7 +285,7 @@ func (ms *MapStorage) GetRpAlias(key string, skipCache bool) (alias string, err alias = string(values) cache2go.Cache(key, alias) } else { - return "", errors.New("not found") + return "", errors.New(utils.ERR_NOT_FOUND) } return } @@ -352,7 +352,7 @@ func (ms *MapStorage) GetAccAlias(key string, skipCache bool) (alias string, err alias = string(values) cache2go.Cache(key, alias) } else { - return "", errors.New("not found") + return "", errors.New(utils.ERR_NOT_FOUND) } return } @@ -405,7 +405,7 @@ func (ms *MapStorage) GetDestination(key string) (dest *Destination, err error) cache2go.CachePush(DESTINATION_PREFIX+p, dest.Id) } } else { - return nil, errors.New("not found") + return nil, errors.New(utils.ERR_NOT_FOUND) } return } @@ -438,7 +438,7 @@ func (ms *MapStorage) GetActions(key string, skipCache bool) (as Actions, err er err = ms.ms.Unmarshal(values, &as) cache2go.Cache(key, as) } else { - return nil, errors.New("not found") + return nil, errors.New(utils.ERR_NOT_FOUND) } return } @@ -463,7 +463,7 @@ func (ms *MapStorage) GetSharedGroup(key string, skipCache bool) (sg *SharedGrou err = ms.ms.Unmarshal(values, &sg) cache2go.Cache(key, sg) } else { - return nil, errors.New("not found") + return nil, errors.New(utils.ERR_NOT_FOUND) } return } @@ -480,7 +480,7 @@ func (ms *MapStorage) GetAccount(key string) (ub *Account, err error) { ub = &Account{Id: key} err = ms.ms.Unmarshal(values, ub) } else { - return nil, errors.New("not found") + return nil, errors.New(utils.ERR_NOT_FOUND) } return } @@ -502,7 +502,7 @@ func (ms *MapStorage) GetActionTimings(key string) (ats ActionPlan, err error) { if values, ok := ms.dict[ACTION_TIMING_PREFIX+key]; ok { err = ms.ms.Unmarshal(values, &ats) } else { - return nil, errors.New("not found") + return nil, errors.New(utils.ERR_NOT_FOUND) } return } @@ -593,7 +593,7 @@ func (ms *MapStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCost, if values, ok := ms.dict[LOG_CALL_COST_PREFIX+source+runid+"_"+cgrid]; ok { err = ms.ms.Unmarshal(values, &cc) } else { - return nil, errors.New("not found") + return nil, errors.New(utils.ERR_NOT_FOUND) } return } From 013ff6c6f247470918dfa5a306fdc36733657479 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Fri, 5 Sep 2014 22:25:20 +0300 Subject: [PATCH 2/3] fixes + easy switch between cache stores --- cache2go/cache.go | 50 +++++------ cache2go/store.go | 189 +++++++++++++++++++++++++++++++++++++--- engine/storage_redis.go | 2 +- engine/storage_test.go | 4 +- 4 files changed, 205 insertions(+), 40 deletions(-) diff --git a/cache2go/cache.go b/cache2go/cache.go index 46ceac596..711b02125 100644 --- a/cache2go/cache.go +++ b/cache2go/cache.go @@ -2,19 +2,17 @@ package cache2go import ( - "errors" "sync" "time" - - "github.com/cgrates/cgrates/utils" ) const ( - PREFIX_LEN = 4 - KIND_ADD = "ADD" - KIND_ADP = "ADP" - KIND_REM = "REM" - KIND_PRF = "PRF" + PREFIX_LEN = 4 + KIND_ADD = "ADD" + KIND_ADP = "ADP" + KIND_REM = "REM" + KIND_PRF = "PRF" + DOUBLE_CACHE = true ) type timestampedValue struct { @@ -32,10 +30,17 @@ type transactionItem struct { kind string } -var ( - cache = make(cacheStore) - mux sync.RWMutex +func init() { + if DOUBLE_CACHE { + cache = newDoubleStore() + } else { + cache = newSimpleStore() + } +} +var ( + mux sync.RWMutex + cache cacheStore // transaction stuff transactionBuffer []transactionItem transactionMux sync.Mutex @@ -146,34 +151,27 @@ func RemPrefixKey(prefix string) { func Flush() { mux.Lock() defer mux.Unlock() - cache = make(cacheStore) + if DOUBLE_CACHE { + cache = newDoubleStore() + } else { + cache = newSimpleStore() + } } func CountEntries(prefix string) (result int64) { mux.RLock() defer mux.RUnlock() - if _, ok := cache[prefix]; ok { - return int64(len(cache[prefix])) - } - return 0 + return cache.CountEntriesForPrefix(prefix) } func GetAllEntries(prefix string) (map[string]timestampedValue, error) { mux.RLock() defer mux.RUnlock() - if keyMap, ok := cache[prefix]; ok { - return keyMap, nil - } - return nil, errors.New(utils.ERR_NOT_FOUND) + return cache.GetAllForPrefix(prefix) } func GetEntriesKeys(prefix string) (keys []string) { mux.RLock() defer mux.RUnlock() - if keyMap, ok := cache[prefix]; ok { - for key := range keyMap { - keys = append(keys, key) - } - } - return + return cache.GetKeysForPrefix(prefix) } diff --git a/cache2go/store.go b/cache2go/store.go index b2436e527..a2148ad33 100644 --- a/cache2go/store.go +++ b/cache2go/store.go @@ -3,14 +3,32 @@ package cache2go import ( "errors" + "strings" "time" "github.com/cgrates/cgrates/utils" ) -type cacheStore map[string]map[string]timestampedValue +type cacheStore interface { + Put(string, interface{}) + Append(string, interface{}) + Get(string) (interface{}, error) + GetAge(string) (time.Duration, error) + Delete(string) + DeletePrefix(string) + CountEntriesForPrefix(string) int64 + GetAllForPrefix(string) (map[string]timestampedValue, error) + GetKeysForPrefix(string) []string +} -func (cs cacheStore) Put(key string, value interface{}) { +// easy to be counted exported by prefix +type cacheDoubleStore map[string]map[string]timestampedValue + +func newDoubleStore() cacheDoubleStore { + return make(cacheDoubleStore) +} + +func (cs cacheDoubleStore) Put(key string, value interface{}) { prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:] if _, ok := cs[prefix]; !ok { cs[prefix] = make(map[string]timestampedValue) @@ -18,7 +36,7 @@ func (cs cacheStore) Put(key string, value interface{}) { cs[prefix][key] = timestampedValue{time.Now(), value} } -func (cs cacheStore) Append(key string, value interface{}) { +func (cs cacheDoubleStore) Append(key string, value interface{}) { var elements []interface{} v, err := cs.Get(key) if err == nil { @@ -38,7 +56,7 @@ func (cs cacheStore) Append(key string, value interface{}) { cache.Put(key, elements) } -func (cs cacheStore) Get(key string) (interface{}, error) { +func (cs cacheDoubleStore) Get(key string) (interface{}, error) { prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:] if keyMap, ok := cs[prefix]; ok { if ti, exists := keyMap[key]; exists { @@ -48,7 +66,7 @@ func (cs cacheStore) Get(key string) (interface{}, error) { return nil, errors.New(utils.ERR_NOT_FOUND) } -func (cs cacheStore) GetAge(key string) (time.Duration, error) { +func (cs cacheDoubleStore) GetAge(key string) (time.Duration, error) { prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:] if keyMap, ok := cs[prefix]; ok { if ti, exists := keyMap[key]; exists { @@ -58,17 +76,166 @@ func (cs cacheStore) GetAge(key string) (time.Duration, error) { return -1, errors.New(utils.ERR_NOT_FOUND) } -func (cs cacheStore) Delete(key string) { +func (cs cacheDoubleStore) Delete(key string) { prefix, key := key[:PREFIX_LEN], key[PREFIX_LEN:] if keyMap, ok := cs[prefix]; ok { - if _, exists := keyMap[key]; exists { - delete(keyMap, key) + delete(keyMap, key) + } +} + +func (cs cacheDoubleStore) DeletePrefix(prefix string) { + delete(cs, prefix) +} + +func (cs cacheDoubleStore) CountEntriesForPrefix(prefix string) int64 { + if _, ok := cs[prefix]; ok { + return int64(len(cs[prefix])) + } + return 0 +} + +func (cs cacheDoubleStore) GetAllForPrefix(prefix string) (map[string]timestampedValue, error) { + if keyMap, ok := cs[prefix]; ok { + return keyMap, nil + } + return nil, errors.New(utils.ERR_NOT_FOUND) +} + +func (cs cacheDoubleStore) GetKeysForPrefix(prefix string) (keys []string) { + prefix, key := prefix[:PREFIX_LEN], prefix[PREFIX_LEN:] + if keyMap, ok := cs[prefix]; ok { + for iterKey := range keyMap { + if len(key) > 0 && strings.HasPrefix(iterKey, key) { + keys = append(keys, prefix+iterKey) + } + } + } + return +} + +// faster to access +type cacheSimpleStore struct { + cache map[string]timestampedValue + counters map[string]int64 +} + +func newSimpleStore() cacheSimpleStore { + return cacheSimpleStore{ + cache: make(map[string]timestampedValue), + counters: make(map[string]int64), + } +} + +func (cs cacheSimpleStore) Put(key string, value interface{}) { + if _, ok := cs.cache[key]; !ok { + // only count if the key is not already there + cs.count(key) + } + cs.cache[key] = timestampedValue{time.Now(), value} +} + +func (cs cacheSimpleStore) Append(key string, value interface{}) { + var elements []interface{} + if ti, exists := cs.cache[key]; exists { + elements = ti.value.([]interface{}) + } + // check if the val is already present + found := false + for _, v := range elements { + if value == v { + found = true + break + } + } + if !found { + elements = append(elements, value) + } + cs.Put(key, elements) +} + +func (cs cacheSimpleStore) Get(key string) (interface{}, error) { + if ti, exists := cs.cache[key]; exists { + return ti.value, nil + } + return nil, errors.New(utils.ERR_NOT_FOUND) +} + +func (cs cacheSimpleStore) GetAge(key string) (time.Duration, error) { + if ti, exists := cs.cache[key]; exists { + return time.Since(ti.timestamp), nil + } + + return -1, errors.New(utils.ERR_NOT_FOUND) +} + +func (cs cacheSimpleStore) Delete(key string) { + if _, ok := cs.cache[key]; ok { + delete(cs.cache, key) + cs.descount(key) + } +} + +func (cs cacheSimpleStore) DeletePrefix(prefix string) { + for key, _ := range cs.cache { + if strings.HasPrefix(key, prefix) { + delete(cs.cache, key) + cs.descount(key) } } } -func (cs cacheStore) DeletePrefix(prefix string) { - if _, ok := cs[prefix]; ok { - delete(cs, prefix) +// increments the counter for the specified key prefix +func (cs cacheSimpleStore) count(key string) { + if len(key) < PREFIX_LEN { + return + } + prefix := key[:PREFIX_LEN] + if _, ok := cs.counters[prefix]; ok { + // increase the value + cs.counters[prefix] += 1 + } else { + cs.counters[prefix] = 1 } } + +// decrements the counter for the specified key prefix +func (cs cacheSimpleStore) descount(key string) { + if len(key) < PREFIX_LEN { + return + } + prefix := key[:PREFIX_LEN] + if value, ok := cs.counters[prefix]; ok && value > 0 { + cs.counters[prefix] -= 1 + } +} + +func (cs cacheSimpleStore) CountEntriesForPrefix(prefix string) int64 { + if _, ok := cs.counters[prefix]; ok { + return cs.counters[prefix] + } + return 0 +} + +func (cs cacheSimpleStore) GetAllForPrefix(prefix string) (map[string]timestampedValue, error) { + result := make(map[string]timestampedValue) + found := false + for key, ti := range cs.cache { + if strings.HasPrefix(key, prefix) { + result[key[PREFIX_LEN:]] = ti + found = true + } + } + if !found { + return nil, errors.New(utils.ERR_NOT_FOUND) + } + return result, nil +} + +func (cs cacheSimpleStore) GetKeysForPrefix(prefix string) (keys []string) { + for key, _ := range cs.cache { + if strings.HasPrefix(key, prefix) { + keys = append(keys, key) + } + } + return +} diff --git a/engine/storage_redis.go b/engine/storage_redis.go index f702194b8..566db1ec1 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -485,6 +485,7 @@ func (rs *RedisStorage) RemoveAccAliases(tenantAccounts []*TenantAccount) (err e if tntAcnt.Account != alias { continue } + cache2go.RemKey(key) if _, err = rs.db.Del(key); err != nil { return err } @@ -506,7 +507,6 @@ func (rs *RedisStorage) GetAccountAliases(tenant, account string, skipCache bool } } for _, key := range alsKeys { - tenantPrfx := ACC_ALIAS_PREFIX + tenant + utils.CONCATENATED_KEY_SEP if alsAcnt, err := rs.GetAccAlias(key[len(ACC_ALIAS_PREFIX):], skipCache); err != nil { return nil, err } else if alsAcnt == account { diff --git a/engine/storage_test.go b/engine/storage_test.go index f2bc6c413..6e1fafec5 100644 --- a/engine/storage_test.go +++ b/engine/storage_test.go @@ -165,12 +165,12 @@ func TestRemRSubjAliases(t *testing.T) { if err := dataStorage.RemoveRpAliases([]*TenantRatingSubject{&TenantRatingSubject{Tenant: "cgrates.org", Subject: "1001"}}); err != nil { t.Error(err) } - if cgrAliases, err := dataStorage.GetRPAliases("cgrates.org", "1001", false); err != nil { + if cgrAliases, err := dataStorage.GetRPAliases("cgrates.org", "1001", true); err != nil { t.Error(err) } else if len(cgrAliases) != 0 { t.Error("Subject aliases not removed: ", cgrAliases) } - if iscAliases, err := dataStorage.GetRPAliases("itsyscom.com", "1001", false); err != nil { // Make sure the aliases were removed at tenant level + if iscAliases, err := dataStorage.GetRPAliases("itsyscom.com", "1001", true); err != nil { // Make sure the aliases were removed at tenant level t.Error(err) } else if !reflect.DeepEqual(iscAliases, []string{"2003"}) { t.Errorf("Unexpected aliases: %v", iscAliases) From 2f739a3897371601eb603d2b9147e41762bae24d Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 9 Sep 2014 14:30:09 +0300 Subject: [PATCH 3/3] added load destinations to apier and stordb --- apier/apier.go | 28 ++++++++++++++++++++++++++++ engine/loader_db.go | 8 ++++++++ 2 files changed, 36 insertions(+) diff --git a/apier/apier.go b/apier/apier.go index b345d1e96..ae51fdd3a 100644 --- a/apier/apier.go +++ b/apier/apier.go @@ -173,6 +173,34 @@ func (self *ApierV1) ExecuteAction(attr *AttrExecuteAction, reply *string) error return nil } +type AttrLoadDestination struct { + TPid string + DestinationId string +} + +// Load destinations from storDb into dataDb. +func (self *ApierV1) LoadDestination(attrs AttrLoadDestination, reply *string) error { + if missing := utils.MissingStructFields(&attrs, []string{"TPid", "DestinationId"}); len(missing) != 0 { + return fmt.Errorf("%s:%v", utils.ERR_MANDATORY_IE_MISSING, missing) + } + if attrs.DestinationId == utils.ANY { + attrs.DestinationId = "" + } + dbReader := engine.NewDbReader(self.StorDb, self.RatingDb, self.AccountDb, attrs.TPid) + if loaded, err := dbReader.LoadDestinationByTag(attrs.DestinationId); err != nil { + return fmt.Errorf("%s:%s", utils.ERR_SERVER_ERROR, err.Error()) + } else if !loaded { + return errors.New(utils.ERR_NOT_FOUND) + } + //Automatic cache of the newly inserted rating plan + didNotChange := []string{} + if err := self.RatingDb.CacheRating(nil, didNotChange, didNotChange, didNotChange, didNotChange); err != nil { + return err + } + *reply = OK + return nil +} + type AttrLoadRatingPlan struct { TPid string RatingPlanId string diff --git a/engine/loader_db.go b/engine/loader_db.go index d4b96dbfd..c810ef9a6 100644 --- a/engine/loader_db.go +++ b/engine/loader_db.go @@ -293,6 +293,14 @@ func (dbr *DbReader) LoadDestinations() (err error) { return } +func (dbr *DbReader) LoadDestinationByTag(tag string) (bool, error) { + destinations, err := dbr.storDb.GetTpDestinations(dbr.tpid, tag) + for _, destination := range destinations { + dbr.dataDb.SetDestination(destination) + } + return len(destinations) > 0, err +} + func (dbr *DbReader) LoadTimings() (err error) { dbr.timings, err = dbr.storDb.GetTpTimings(dbr.tpid, "") return err