first aliases implementation

tests pending
This commit is contained in:
Radu Ioan Fericean
2014-02-26 18:29:49 +02:00
parent 8d98436656
commit 074313b0f8
13 changed files with 195 additions and 23 deletions

View File

@@ -457,7 +457,7 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error {
}
func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) error {
var dstKeys, rpKeys, rpfKeys, actKeys, shgKeys []string
var dstKeys, rpKeys, rpfKeys, actKeys, shgKeys, alsKeys []string
if len(attrs.DestinationIds) > 0 {
dstKeys = make([]string, len(attrs.DestinationIds))
for idx, dId := range attrs.DestinationIds {
@@ -488,7 +488,13 @@ func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) erro
shgKeys[idx] = engine.SHARED_GROUP_PREFIX + shgId
}
}
if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys); err != nil {
if len(attrs.Aliases) > 0 {
alsKeys = make([]string, len(attrs.Aliases))
for idx, alias := range attrs.Aliases {
alsKeys[idx] = engine.ALIAS_PREFIX + alias
}
}
if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys, alsKeys); err != nil {
return err
}
if err := self.AccountDb.CacheAccounting(actKeys, shgKeys); err != nil {
@@ -586,7 +592,12 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
for idx, shgId := range shgIds {
shgKeys[idx] = engine.SHARED_GROUP_PREFIX + shgId
}
if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys); err != nil {
aliases, _ := loader.GetLoadedIds(engine.ALIAS_PREFIX)
alsKeys := make([]string, len(aliases))
for idx, alias := range aliases {
alsKeys[idx] = engine.ALIAS_PREFIX + alias
}
if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys, alsKeys); err != nil {
return err
}
if err := self.AccountDb.CacheAccounting(actKeys, shgKeys); err != nil {

View File

@@ -74,7 +74,7 @@ var (
)
func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, doneChan chan struct{}) {
if err := ratingDb.CacheRating(nil, nil, nil); err != nil {
if err := ratingDb.CacheRating(nil, nil, nil, nil); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error()))
exitChan <- true
return
@@ -119,7 +119,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
}
engine.Logger.Info("Registering Mediator RPC service.")
server.RpcRegister(&mediator.MediatorV1{Medi: medi})
close(chanDone)
}
@@ -204,12 +204,12 @@ func startHistoryAgent(scribeServer history.Scribe, chanServerStarted chan struc
if cfg.HistoryServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting
engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Connecting internally to HistoryServer"))
select {
case <-time.After(1 * time.Minute):
case <-time.After(1 * time.Minute):
engine.Logger.Crit(fmt.Sprintf("<HistoryAgent> Timeout waiting for server to start."))
exitChan <- true
return
case <-chanServerStarted:
}
case <-chanServerStarted:
}
//<-chanServerStarted // If server is not enabled, will have deadlock here
} else { // Connect in iteration since there are chances of concurrency here
for i := 0; i < 3; i++ { //ToDo: Make it globally configurable

View File

@@ -188,11 +188,12 @@ func main() {
rpfIds, _ := loader.GetLoadedIds(engine.RATING_PROFILE_PREFIX)
actIds, _ := loader.GetLoadedIds(engine.ACTION_PREFIX)
shgIds, _ := loader.GetLoadedIds(engine.SHARED_GROUP_PREFIX)
aliases, _ := loader.GetLoadedIds(engine.ALIAS_PREFIX)
// Reload cache first since actions could be calling info from within
if *verbose {
log.Print("Reloading cache")
}
if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds, rpfIds, actIds, shgIds}, &reply); err != nil {
if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds, rpfIds, actIds, shgIds, aliases}, &reply); err != nil {
log.Fatalf("Got error on cache reload: %s", err.Error())
}
actTmgIds, _ := loader.GetLoadedIds(engine.ACTION_TIMING_PREFIX)

View File

@@ -73,7 +73,7 @@ func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) {
}
defer accountDb.Close()
engine.SetAccountingStorage(accountDb)
if err := ratingDb.CacheRating(nil, nil, nil); err != nil {
if err := ratingDb.CacheRating(nil, nil, nil, nil); err != nil {
return nilDuration, fmt.Errorf("Cache rating error: %s", err.Error())
}
log.Printf("Runnning %d cycles...", *runs)

View File

@@ -140,6 +140,16 @@ func (cd *CallDescriptor) GetAccountKey() string {
if cd.Account != "" {
subj = cd.Account
}
// check if subject is alias
if rs, err := cache2go.GetCached(ALIAS_PREFIX + RATING_PROFILE_PREFIX + subj); err == nil {
realSubject := rs.(string)
subj = realSubject
if cd.Account != "" {
cd.Account = realSubject
} else {
cd.Subject = realSubject
}
}
return fmt.Sprintf("%s:%s:%s", cd.Direction, cd.Tenant, subj)
}
@@ -282,6 +292,12 @@ func (cd *CallDescriptor) addRatingInfos(ris RatingInfos) bool {
// Constructs the key for the storage lookup.
// The prefixLen is limiting the length of the destination prefix.
func (cd *CallDescriptor) GetKey(subject string) string {
// check if subject is alias
if rs, err := cache2go.GetCached(ALIAS_PREFIX + RATING_PROFILE_PREFIX + subject); err == nil {
realSubject := rs.(string)
subject = realSubject
cd.Subject = realSubject
}
return fmt.Sprintf("%s:%s:%s:%s", cd.Direction, cd.Tenant, cd.TOR, subject)
}
@@ -602,7 +618,7 @@ func (cd *CallDescriptor) AddRecievedCallSeconds() (err error) {
func (cd *CallDescriptor) FlushCache() (err error) {
cache2go.XFlush()
cache2go.Flush()
dataStorage.CacheRating(nil, nil, nil)
dataStorage.CacheRating(nil, nil, nil, nil)
accountingStorage.CacheAccounting(nil, nil)
return nil

View File

@@ -38,6 +38,7 @@ type CSVReader struct {
actions map[string][]*Action
actionsTimings map[string][]*ActionTiming
actionsTriggers map[string][]*ActionTrigger
aliases map[string]string
accountActions []*Account
destinations []*Destination
timings map[string]*utils.TPTiming
@@ -244,6 +245,18 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
log.Println(ub.Id)
}
}
if verbose {
log.Print("Aliases")
}
for key, alias := range csvr.aliases {
err = dataStorage.SetAlias(key, alias)
if err != nil {
return err
}
if verbose {
log.Print(key)
}
}
return
}
@@ -429,6 +442,14 @@ func (csvr *CSVReader) LoadRatingProfiles() (err error) {
if err != nil {
return errors.New(fmt.Sprintf("Cannot parse activation time from %v", record[4]))
}
// extract aliases from subject
aliases := strings.Split(subject, ";")
if len(aliases) > 1 {
subject = aliases[0]
for _, alias := range aliases[1:] {
csvr.aliases[RATING_PROFILE_PREFIX+alias] = subject
}
}
key := fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, subject)
rp, ok := csvr.ratingProfiles[key]
if !ok {
@@ -638,7 +659,16 @@ func (csvr *CSVReader) LoadAccountActions() (err error) {
defer fp.Close()
}
for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() {
tag := fmt.Sprintf("%s:%s:%s", record[2], record[0], record[1])
tenant, account, direction := record[0], record[1], record[2]
// extract aliases from subject
aliases := strings.Split(account, ";")
if len(aliases) > 1 {
account = aliases[0]
for _, alias := range aliases[1:] {
csvr.aliases[ACCOUNT_PREFIX+alias] = account
}
}
tag := fmt.Sprintf("%s:%s:%s", direction, tenant, account)
aTriggers, exists := csvr.actionsTriggers[record[4]]
if record[4] != "" && !exists {
// only return error if there was something ther for the tag
@@ -738,6 +768,14 @@ func (csvr *CSVReader) GetLoadedIds(categ string) ([]string, error) {
i++
}
return keys, nil
case ALIAS_PREFIX: // aliases
keys := make([]string, len(csvr.aliases))
i := 0
for k := range csvr.aliases {
keys[i] = k
i++
}
return keys, nil
}
return nil, errors.New("Unsupported category")
}

View File

@@ -149,7 +149,7 @@ func init() {
csvr.LoadActionTriggers()
csvr.LoadAccountActions()
csvr.WriteToDatabase(false, false)
dataStorage.CacheRating(nil, nil, nil)
dataStorage.CacheRating(nil, nil, nil, nil)
accountingStorage.CacheAccounting(nil, nil)
}

View File

@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"log"
"strings"
"github.com/cgrates/cgrates/utils"
)
@@ -36,6 +37,7 @@ type DbReader struct {
actionsTriggers map[string][]*ActionTrigger
accountActions []*Account
destinations []*Destination
aliases map[string]string
timings map[string]*utils.TPTiming
rates map[string]*utils.TPRate
destinationRates map[string]*utils.TPDestinationRate
@@ -188,6 +190,18 @@ func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) {
log.Println(ub.Id)
}
}
if verbose {
log.Print("Aliases")
}
for key, alias := range dbr.aliases {
err = storage.SetAlias(key, alias)
if err != nil {
return err
}
if verbose {
log.Println(key)
}
}
return
}
@@ -272,6 +286,14 @@ func (dbr *DbReader) LoadRatingProfiles() error {
return err
}
for _, tpRpf := range mpTpRpfs {
// extract aliases from subject
aliases := strings.Split(tpRpf.Subject, ";")
if len(aliases) > 1 {
tpRpf.Subject = aliases[0]
for _, alias := range aliases[1:] {
dbr.aliases[RATING_PLAN_PREFIX+alias] = tpRpf.Subject
}
}
rpf := &RatingProfile{Id: tpRpf.KeyId()}
for _, tpRa := range tpRpf.RatingPlanActivations {
at, err := utils.ParseDate(tpRa.ActivationTime)
@@ -431,7 +453,7 @@ func (dbr *DbReader) LoadActionTimings() (err error) {
}
for atId, ats := range atsMap {
for _, at := range ats {
_, exists := dbr.actions[at.ActionsId]
if !exists {
return errors.New(fmt.Sprintf("ActionTiming: Could not load the action for tag: %v", at.ActionsId))
@@ -490,6 +512,14 @@ func (dbr *DbReader) LoadAccountActions() (err error) {
return err
}
for _, aa := range acs {
// extract aliases from subject
aliases := strings.Split(aa.Account, ";")
if len(aliases) > 1 {
aa.Account = aliases[0]
for _, alias := range aliases[1:] {
dbr.aliases[ACCOUNT_PREFIX+alias] = aa.Account
}
}
aTriggers, exists := dbr.actionsTriggers[aa.ActionTriggersId]
if !exists {
return errors.New(fmt.Sprintf("Could not get action triggers for tag %v", aa.ActionTriggersId))

View File

@@ -35,6 +35,7 @@ const (
ACTION_TIMING_PREFIX = "apl_"
RATING_PLAN_PREFIX = "rpl_"
RATING_PROFILE_PREFIX = "rpf_"
ALIAS_PREFIX = "als_"
ACTION_PREFIX = "act_"
SHARED_GROUP_PREFIX = "shg_"
ACCOUNT_PREFIX = "ubl_"
@@ -69,12 +70,14 @@ Interface for storage providers.
*/
type RatingStorage interface {
Storage
CacheRating([]string, []string, []string) error
CacheRating([]string, []string, []string, []string) error
HasData(string, string) (bool, error)
GetRatingPlan(string, bool) (*RatingPlan, error)
SetRatingPlan(*RatingPlan) error
GetRatingProfile(string, bool) (*RatingProfile, error)
SetRatingProfile(*RatingProfile) error
GetAlias(string, bool) (string, error)
SetAlias(string, string) error
GetDestination(string) (*Destination, error)
SetDestination(*Destination) error
}

View File

@@ -45,7 +45,7 @@ func (ms *MapStorage) Flush() error {
return nil
}
func (ms *MapStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) error {
func (ms *MapStorage) CacheRating(dKeys, rpKeys, rpfKeys, alsKeys []string) error {
if dKeys == nil {
cache2go.RemPrefixKey(DESTINATION_PREFIX)
}
@@ -55,6 +55,9 @@ func (ms *MapStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) error {
if rpfKeys == nil {
cache2go.RemPrefixKey(RATING_PROFILE_PREFIX)
}
if alsKeys == nil {
cache2go.RemPrefixKey(ALIAS_PREFIX)
}
for k, _ := range ms.dict {
if strings.HasPrefix(k, DESTINATION_PREFIX) {
if _, err := ms.GetDestination(k[len(DESTINATION_PREFIX):]); err != nil {
@@ -73,6 +76,12 @@ func (ms *MapStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) error {
return err
}
}
if strings.HasPrefix(k, ALIAS_PREFIX) {
cache2go.RemKey(k)
if _, err := ms.GetAlias(k[len(ALIAS_PREFIX):], true); err != nil {
return err
}
}
}
return nil
}
@@ -141,7 +150,7 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) {
response := 0
go historyScribe.Record(rp.GetHistoryRecord(), &response)
cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp)
//cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp)
return
}
@@ -169,7 +178,30 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
ms.dict[RATING_PROFILE_PREFIX+rpf.Id] = result
response := 0
go historyScribe.Record(rpf.GetHistoryRecord(), &response)
cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf)
//cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf)
return
}
func (ms *MapStorage) GetAlias(key string, checkDb bool) (alias string, err error) {
key = ALIAS_PREFIX + key
if x, err := cache2go.GetCached(key); err == nil {
return x.(string), nil
}
if !checkDb {
return "", errors.New(utils.ERR_NOT_FOUND)
}
if values, ok := ms.dict[key]; ok {
alias = string(values)
cache2go.Cache(key, alias)
} else {
return "", errors.New("not found")
}
return
}
func (ms *MapStorage) SetAlias(key, alias string) (err error) {
ms.dict[ALIAS_PREFIX+key] = []byte(alias)
//cache2go.Cache(ALIAS_PREFIX+key, alias)
return
}
@@ -198,7 +230,7 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) {
ms.dict[DESTINATION_PREFIX+dest.Id] = result
response := 0
go historyScribe.Record(dest.GetHistoryRecord(), &response)
cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest)
//cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest)
return
}
@@ -222,7 +254,7 @@ func (ms *MapStorage) GetActions(key string, checkDb bool) (as Actions, err erro
func (ms *MapStorage) SetActions(key string, as Actions) (err error) {
result, err := ms.ms.Marshal(&as)
ms.dict[ACTION_PREFIX+key] = result
cache2go.Cache(ACTION_PREFIX+key, as)
//cache2go.Cache(ACTION_PREFIX+key, as)
return
}
@@ -246,7 +278,7 @@ func (ms *MapStorage) GetSharedGroup(key string, checkDb bool) (sg *SharedGroup,
func (ms *MapStorage) SetSharedGroup(key string, sg *SharedGroup) (err error) {
result, err := ms.ms.Marshal(sg)
ms.dict[SHARED_GROUP_PREFIX+key] = result
cache2go.Cache(ACTION_PREFIX+key, sg)
//cache2go.Cache(ACTION_PREFIX+key, sg)
return
}

View File

@@ -67,7 +67,7 @@ func (rs *RedisStorage) Flush() (err error) {
return
}
func (rs *RedisStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) (err error) {
func (rs *RedisStorage) CacheRating(dKeys, rpKeys, rpfKeys, alsKeys []string) (err error) {
if dKeys == nil {
Logger.Info("Caching all destinations")
if dKeys, err = rs.db.Keys(DESTINATION_PREFIX + "*"); err != nil {
@@ -121,6 +121,24 @@ func (rs *RedisStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) (err error)
if len(rpfKeys) != 0 {
Logger.Info("Finished rating profile caching.")
}
if alsKeys == nil {
Logger.Info("Caching all aliases")
if alsKeys, err = rs.db.Keys(ALIAS_PREFIX + "*"); err != nil {
return
}
cache2go.RemPrefixKey(ALIAS_PREFIX)
} else if len(alsKeys) != 0 {
Logger.Info(fmt.Sprintf("Caching aliases: %v", alsKeys))
}
for _, key := range alsKeys {
cache2go.RemKey(key)
if _, err = rs.GetAlias(key[len(ALIAS_PREFIX):], true); err != nil {
return err
}
}
if len(alsKeys) != 0 {
Logger.Info("Finished aliases caching.")
}
return
}
@@ -247,6 +265,28 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) {
return
}
func (rs *RedisStorage) GetAlias(key string, checkDb bool) (alias string, err error) {
key = ALIAS_PREFIX + key
if x, err := cache2go.GetCached(key); err == nil {
return x.(string), nil
}
if !checkDb {
return "", errors.New(utils.ERR_NOT_FOUND)
}
var values []byte
if values, err = rs.db.Get(key); err == nil {
alias = string(values)
cache2go.Cache(key, alias)
}
return
}
func (rs *RedisStorage) SetAlias(key, alias string) (err error) {
err = rs.db.Set(ALIAS_PREFIX+key, []byte(alias))
//cache2go.Cache(ALIAS_PREFIX+key, alias)
return
}
func (rs *RedisStorage) GetDestination(key string) (dest *Destination, err error) {
key = DESTINATION_PREFIX + key
var values []byte

View File

@@ -99,7 +99,7 @@ func TestCacheRefresh(t *testing.T) {
dataStorage.SetDestination(&Destination{"T11", []string{"0"}})
dataStorage.GetDestination("T11")
dataStorage.SetDestination(&Destination{"T11", []string{"1"}})
dataStorage.CacheRating(nil, nil, nil)
dataStorage.CacheRating(nil, nil, nil, nil)
d, err := dataStorage.GetDestination("T11")
p := d.containsPrefix("1")
if err != nil || p == 0 {

View File

@@ -283,6 +283,7 @@ type ApiReloadCache struct {
RatingProfileIds []string
ActionIds []string
SharedGroupIds []string
Aliases []string
}
type AttrCacheStats struct { // Add in the future filters here maybe so we avoid counting complete cache