From 074313b0f818384df727cf23f20a39ec314ae4a3 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 26 Feb 2014 18:29:49 +0200 Subject: [PATCH] first aliases implementation tests pending --- apier/apier.go | 17 +++++++++++--- cmd/cgr-engine/cgr-engine.go | 10 ++++---- cmd/cgr-loader/cgr-loader.go | 3 ++- cmd/cgr-tester/cgr-tester.go | 2 +- engine/calldesc.go | 18 ++++++++++++++- engine/loader_csv.go | 40 +++++++++++++++++++++++++++++++- engine/loader_csv_test.go | 2 +- engine/loader_db.go | 32 +++++++++++++++++++++++++- engine/storage_interface.go | 5 +++- engine/storage_map.go | 44 +++++++++++++++++++++++++++++++----- engine/storage_redis.go | 42 +++++++++++++++++++++++++++++++++- engine/storage_test.go | 2 +- utils/apitpdata.go | 1 + 13 files changed, 195 insertions(+), 23 deletions(-) diff --git a/apier/apier.go b/apier/apier.go index 6f4278408..dafe360b3 100644 --- a/apier/apier.go +++ b/apier/apier.go @@ -457,7 +457,7 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error { } func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) error { - var dstKeys, rpKeys, rpfKeys, actKeys, shgKeys []string + var dstKeys, rpKeys, rpfKeys, actKeys, shgKeys, alsKeys []string if len(attrs.DestinationIds) > 0 { dstKeys = make([]string, len(attrs.DestinationIds)) for idx, dId := range attrs.DestinationIds { @@ -488,7 +488,13 @@ func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) erro shgKeys[idx] = engine.SHARED_GROUP_PREFIX + shgId } } - if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys); err != nil { + if len(attrs.Aliases) > 0 { + alsKeys = make([]string, len(attrs.Aliases)) + for idx, alias := range attrs.Aliases { + alsKeys[idx] = engine.ALIAS_PREFIX + alias + } + } + if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys, alsKeys); err != nil { return err } if err := self.AccountDb.CacheAccounting(actKeys, shgKeys); err != nil { @@ -586,7 +592,12 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder, for idx, shgId := range shgIds { shgKeys[idx] = engine.SHARED_GROUP_PREFIX + shgId } - if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys); err != nil { + aliases, _ := loader.GetLoadedIds(engine.ALIAS_PREFIX) + alsKeys := make([]string, len(aliases)) + for idx, alias := range aliases { + alsKeys[idx] = engine.ALIAS_PREFIX + alias + } + if err := self.RatingDb.CacheRating(dstKeys, rpKeys, rpfKeys, alsKeys); err != nil { return err } if err := self.AccountDb.CacheAccounting(actKeys, shgKeys); err != nil { diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index f2cfde492..e368d000c 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -74,7 +74,7 @@ var ( ) func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, doneChan chan struct{}) { - if err := ratingDb.CacheRating(nil, nil, nil); err != nil { + if err := ratingDb.CacheRating(nil, nil, nil, nil); err != nil { engine.Logger.Crit(fmt.Sprintf("Cache rating error: %s", err.Error())) exitChan <- true return @@ -119,7 +119,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD } engine.Logger.Info("Registering Mediator RPC service.") server.RpcRegister(&mediator.MediatorV1{Medi: medi}) - + close(chanDone) } @@ -204,12 +204,12 @@ func startHistoryAgent(scribeServer history.Scribe, chanServerStarted chan struc if cfg.HistoryServer == utils.INTERNAL { // For internal requests, wait for server to come online before connecting engine.Logger.Crit(fmt.Sprintf(" Connecting internally to HistoryServer")) select { - case <-time.After(1 * time.Minute): + case <-time.After(1 * time.Minute): engine.Logger.Crit(fmt.Sprintf(" Timeout waiting for server to start.")) exitChan <- true return - case <-chanServerStarted: - } + case <-chanServerStarted: + } //<-chanServerStarted // If server is not enabled, will have deadlock here } else { // Connect in iteration since there are chances of concurrency here for i := 0; i < 3; i++ { //ToDo: Make it globally configurable diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index ca6f8c790..4b01cdb93 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -188,11 +188,12 @@ func main() { rpfIds, _ := loader.GetLoadedIds(engine.RATING_PROFILE_PREFIX) actIds, _ := loader.GetLoadedIds(engine.ACTION_PREFIX) shgIds, _ := loader.GetLoadedIds(engine.SHARED_GROUP_PREFIX) + aliases, _ := loader.GetLoadedIds(engine.ALIAS_PREFIX) // Reload cache first since actions could be calling info from within if *verbose { log.Print("Reloading cache") } - if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds, rpfIds, actIds, shgIds}, &reply); err != nil { + if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{dstIds, rplIds, rpfIds, actIds, shgIds, aliases}, &reply); err != nil { log.Fatalf("Got error on cache reload: %s", err.Error()) } actTmgIds, _ := loader.GetLoadedIds(engine.ACTION_TIMING_PREFIX) diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 8eade503f..2c032f907 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -73,7 +73,7 @@ func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) { } defer accountDb.Close() engine.SetAccountingStorage(accountDb) - if err := ratingDb.CacheRating(nil, nil, nil); err != nil { + if err := ratingDb.CacheRating(nil, nil, nil, nil); err != nil { return nilDuration, fmt.Errorf("Cache rating error: %s", err.Error()) } log.Printf("Runnning %d cycles...", *runs) diff --git a/engine/calldesc.go b/engine/calldesc.go index 3f8c79bba..5beefeda7 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -140,6 +140,16 @@ func (cd *CallDescriptor) GetAccountKey() string { if cd.Account != "" { subj = cd.Account } + // check if subject is alias + if rs, err := cache2go.GetCached(ALIAS_PREFIX + RATING_PROFILE_PREFIX + subj); err == nil { + realSubject := rs.(string) + subj = realSubject + if cd.Account != "" { + cd.Account = realSubject + } else { + cd.Subject = realSubject + } + } return fmt.Sprintf("%s:%s:%s", cd.Direction, cd.Tenant, subj) } @@ -282,6 +292,12 @@ func (cd *CallDescriptor) addRatingInfos(ris RatingInfos) bool { // Constructs the key for the storage lookup. // The prefixLen is limiting the length of the destination prefix. func (cd *CallDescriptor) GetKey(subject string) string { + // check if subject is alias + if rs, err := cache2go.GetCached(ALIAS_PREFIX + RATING_PROFILE_PREFIX + subject); err == nil { + realSubject := rs.(string) + subject = realSubject + cd.Subject = realSubject + } return fmt.Sprintf("%s:%s:%s:%s", cd.Direction, cd.Tenant, cd.TOR, subject) } @@ -602,7 +618,7 @@ func (cd *CallDescriptor) AddRecievedCallSeconds() (err error) { func (cd *CallDescriptor) FlushCache() (err error) { cache2go.XFlush() cache2go.Flush() - dataStorage.CacheRating(nil, nil, nil) + dataStorage.CacheRating(nil, nil, nil, nil) accountingStorage.CacheAccounting(nil, nil) return nil diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 09f488006..b34a9aafa 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -38,6 +38,7 @@ type CSVReader struct { actions map[string][]*Action actionsTimings map[string][]*ActionTiming actionsTriggers map[string][]*ActionTrigger + aliases map[string]string accountActions []*Account destinations []*Destination timings map[string]*utils.TPTiming @@ -244,6 +245,18 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) { log.Println(ub.Id) } } + if verbose { + log.Print("Aliases") + } + for key, alias := range csvr.aliases { + err = dataStorage.SetAlias(key, alias) + if err != nil { + return err + } + if verbose { + log.Print(key) + } + } return } @@ -429,6 +442,14 @@ func (csvr *CSVReader) LoadRatingProfiles() (err error) { if err != nil { return errors.New(fmt.Sprintf("Cannot parse activation time from %v", record[4])) } + // extract aliases from subject + aliases := strings.Split(subject, ";") + if len(aliases) > 1 { + subject = aliases[0] + for _, alias := range aliases[1:] { + csvr.aliases[RATING_PROFILE_PREFIX+alias] = subject + } + } key := fmt.Sprintf("%s:%s:%s:%s", direction, tenant, tor, subject) rp, ok := csvr.ratingProfiles[key] if !ok { @@ -638,7 +659,16 @@ func (csvr *CSVReader) LoadAccountActions() (err error) { defer fp.Close() } for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() { - tag := fmt.Sprintf("%s:%s:%s", record[2], record[0], record[1]) + tenant, account, direction := record[0], record[1], record[2] + // extract aliases from subject + aliases := strings.Split(account, ";") + if len(aliases) > 1 { + account = aliases[0] + for _, alias := range aliases[1:] { + csvr.aliases[ACCOUNT_PREFIX+alias] = account + } + } + tag := fmt.Sprintf("%s:%s:%s", direction, tenant, account) aTriggers, exists := csvr.actionsTriggers[record[4]] if record[4] != "" && !exists { // only return error if there was something ther for the tag @@ -738,6 +768,14 @@ func (csvr *CSVReader) GetLoadedIds(categ string) ([]string, error) { i++ } return keys, nil + case ALIAS_PREFIX: // aliases + keys := make([]string, len(csvr.aliases)) + i := 0 + for k := range csvr.aliases { + keys[i] = k + i++ + } + return keys, nil } return nil, errors.New("Unsupported category") } diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index c39536cd5..ca746dc5a 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -149,7 +149,7 @@ func init() { csvr.LoadActionTriggers() csvr.LoadAccountActions() csvr.WriteToDatabase(false, false) - dataStorage.CacheRating(nil, nil, nil) + dataStorage.CacheRating(nil, nil, nil, nil) accountingStorage.CacheAccounting(nil, nil) } diff --git a/engine/loader_db.go b/engine/loader_db.go index 78d41894f..fd6e17fdd 100644 --- a/engine/loader_db.go +++ b/engine/loader_db.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "log" + "strings" "github.com/cgrates/cgrates/utils" ) @@ -36,6 +37,7 @@ type DbReader struct { actionsTriggers map[string][]*ActionTrigger accountActions []*Account destinations []*Destination + aliases map[string]string timings map[string]*utils.TPTiming rates map[string]*utils.TPRate destinationRates map[string]*utils.TPDestinationRate @@ -188,6 +190,18 @@ func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) { log.Println(ub.Id) } } + if verbose { + log.Print("Aliases") + } + for key, alias := range dbr.aliases { + err = storage.SetAlias(key, alias) + if err != nil { + return err + } + if verbose { + log.Println(key) + } + } return } @@ -272,6 +286,14 @@ func (dbr *DbReader) LoadRatingProfiles() error { return err } for _, tpRpf := range mpTpRpfs { + // extract aliases from subject + aliases := strings.Split(tpRpf.Subject, ";") + if len(aliases) > 1 { + tpRpf.Subject = aliases[0] + for _, alias := range aliases[1:] { + dbr.aliases[RATING_PLAN_PREFIX+alias] = tpRpf.Subject + } + } rpf := &RatingProfile{Id: tpRpf.KeyId()} for _, tpRa := range tpRpf.RatingPlanActivations { at, err := utils.ParseDate(tpRa.ActivationTime) @@ -431,7 +453,7 @@ func (dbr *DbReader) LoadActionTimings() (err error) { } for atId, ats := range atsMap { for _, at := range ats { - + _, exists := dbr.actions[at.ActionsId] if !exists { return errors.New(fmt.Sprintf("ActionTiming: Could not load the action for tag: %v", at.ActionsId)) @@ -490,6 +512,14 @@ func (dbr *DbReader) LoadAccountActions() (err error) { return err } for _, aa := range acs { + // extract aliases from subject + aliases := strings.Split(aa.Account, ";") + if len(aliases) > 1 { + aa.Account = aliases[0] + for _, alias := range aliases[1:] { + dbr.aliases[ACCOUNT_PREFIX+alias] = aa.Account + } + } aTriggers, exists := dbr.actionsTriggers[aa.ActionTriggersId] if !exists { return errors.New(fmt.Sprintf("Could not get action triggers for tag %v", aa.ActionTriggersId)) diff --git a/engine/storage_interface.go b/engine/storage_interface.go index d2057d5cb..5af838c2b 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -35,6 +35,7 @@ const ( ACTION_TIMING_PREFIX = "apl_" RATING_PLAN_PREFIX = "rpl_" RATING_PROFILE_PREFIX = "rpf_" + ALIAS_PREFIX = "als_" ACTION_PREFIX = "act_" SHARED_GROUP_PREFIX = "shg_" ACCOUNT_PREFIX = "ubl_" @@ -69,12 +70,14 @@ Interface for storage providers. */ type RatingStorage interface { Storage - CacheRating([]string, []string, []string) error + CacheRating([]string, []string, []string, []string) error HasData(string, string) (bool, error) GetRatingPlan(string, bool) (*RatingPlan, error) SetRatingPlan(*RatingPlan) error GetRatingProfile(string, bool) (*RatingProfile, error) SetRatingProfile(*RatingProfile) error + GetAlias(string, bool) (string, error) + SetAlias(string, string) error GetDestination(string) (*Destination, error) SetDestination(*Destination) error } diff --git a/engine/storage_map.go b/engine/storage_map.go index 435d79763..a40161021 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -45,7 +45,7 @@ func (ms *MapStorage) Flush() error { return nil } -func (ms *MapStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) error { +func (ms *MapStorage) CacheRating(dKeys, rpKeys, rpfKeys, alsKeys []string) error { if dKeys == nil { cache2go.RemPrefixKey(DESTINATION_PREFIX) } @@ -55,6 +55,9 @@ func (ms *MapStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) error { if rpfKeys == nil { cache2go.RemPrefixKey(RATING_PROFILE_PREFIX) } + if alsKeys == nil { + cache2go.RemPrefixKey(ALIAS_PREFIX) + } for k, _ := range ms.dict { if strings.HasPrefix(k, DESTINATION_PREFIX) { if _, err := ms.GetDestination(k[len(DESTINATION_PREFIX):]); err != nil { @@ -73,6 +76,12 @@ func (ms *MapStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) error { return err } } + if strings.HasPrefix(k, ALIAS_PREFIX) { + cache2go.RemKey(k) + if _, err := ms.GetAlias(k[len(ALIAS_PREFIX):], true); err != nil { + return err + } + } } return nil } @@ -141,7 +150,7 @@ func (ms *MapStorage) SetRatingPlan(rp *RatingPlan) (err error) { response := 0 go historyScribe.Record(rp.GetHistoryRecord(), &response) - cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp) + //cache2go.Cache(RATING_PLAN_PREFIX+rp.Id, rp) return } @@ -169,7 +178,30 @@ func (ms *MapStorage) SetRatingProfile(rpf *RatingProfile) (err error) { ms.dict[RATING_PROFILE_PREFIX+rpf.Id] = result response := 0 go historyScribe.Record(rpf.GetHistoryRecord(), &response) - cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) + //cache2go.Cache(RATING_PROFILE_PREFIX+rpf.Id, rpf) + return +} + +func (ms *MapStorage) GetAlias(key string, checkDb bool) (alias string, err error) { + key = ALIAS_PREFIX + key + if x, err := cache2go.GetCached(key); err == nil { + return x.(string), nil + } + if !checkDb { + return "", errors.New(utils.ERR_NOT_FOUND) + } + if values, ok := ms.dict[key]; ok { + alias = string(values) + cache2go.Cache(key, alias) + } else { + return "", errors.New("not found") + } + return +} + +func (ms *MapStorage) SetAlias(key, alias string) (err error) { + ms.dict[ALIAS_PREFIX+key] = []byte(alias) + //cache2go.Cache(ALIAS_PREFIX+key, alias) return } @@ -198,7 +230,7 @@ func (ms *MapStorage) SetDestination(dest *Destination) (err error) { ms.dict[DESTINATION_PREFIX+dest.Id] = result response := 0 go historyScribe.Record(dest.GetHistoryRecord(), &response) - cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest) + //cache2go.Cache(DESTINATION_PREFIX+dest.Id, dest) return } @@ -222,7 +254,7 @@ func (ms *MapStorage) GetActions(key string, checkDb bool) (as Actions, err erro func (ms *MapStorage) SetActions(key string, as Actions) (err error) { result, err := ms.ms.Marshal(&as) ms.dict[ACTION_PREFIX+key] = result - cache2go.Cache(ACTION_PREFIX+key, as) + //cache2go.Cache(ACTION_PREFIX+key, as) return } @@ -246,7 +278,7 @@ func (ms *MapStorage) GetSharedGroup(key string, checkDb bool) (sg *SharedGroup, func (ms *MapStorage) SetSharedGroup(key string, sg *SharedGroup) (err error) { result, err := ms.ms.Marshal(sg) ms.dict[SHARED_GROUP_PREFIX+key] = result - cache2go.Cache(ACTION_PREFIX+key, sg) + //cache2go.Cache(ACTION_PREFIX+key, sg) return } diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 6db56f9ae..ce4c0814d 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -67,7 +67,7 @@ func (rs *RedisStorage) Flush() (err error) { return } -func (rs *RedisStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) (err error) { +func (rs *RedisStorage) CacheRating(dKeys, rpKeys, rpfKeys, alsKeys []string) (err error) { if dKeys == nil { Logger.Info("Caching all destinations") if dKeys, err = rs.db.Keys(DESTINATION_PREFIX + "*"); err != nil { @@ -121,6 +121,24 @@ func (rs *RedisStorage) CacheRating(dKeys, rpKeys, rpfKeys []string) (err error) if len(rpfKeys) != 0 { Logger.Info("Finished rating profile caching.") } + if alsKeys == nil { + Logger.Info("Caching all aliases") + if alsKeys, err = rs.db.Keys(ALIAS_PREFIX + "*"); err != nil { + return + } + cache2go.RemPrefixKey(ALIAS_PREFIX) + } else if len(alsKeys) != 0 { + Logger.Info(fmt.Sprintf("Caching aliases: %v", alsKeys)) + } + for _, key := range alsKeys { + cache2go.RemKey(key) + if _, err = rs.GetAlias(key[len(ALIAS_PREFIX):], true); err != nil { + return err + } + } + if len(alsKeys) != 0 { + Logger.Info("Finished aliases caching.") + } return } @@ -247,6 +265,28 @@ func (rs *RedisStorage) SetRatingProfile(rpf *RatingProfile) (err error) { return } +func (rs *RedisStorage) GetAlias(key string, checkDb bool) (alias string, err error) { + key = ALIAS_PREFIX + key + if x, err := cache2go.GetCached(key); err == nil { + return x.(string), nil + } + if !checkDb { + return "", errors.New(utils.ERR_NOT_FOUND) + } + var values []byte + if values, err = rs.db.Get(key); err == nil { + alias = string(values) + cache2go.Cache(key, alias) + } + return +} + +func (rs *RedisStorage) SetAlias(key, alias string) (err error) { + err = rs.db.Set(ALIAS_PREFIX+key, []byte(alias)) + //cache2go.Cache(ALIAS_PREFIX+key, alias) + return +} + func (rs *RedisStorage) GetDestination(key string) (dest *Destination, err error) { key = DESTINATION_PREFIX + key var values []byte diff --git a/engine/storage_test.go b/engine/storage_test.go index 1ed0cd72d..026ebbe3f 100644 --- a/engine/storage_test.go +++ b/engine/storage_test.go @@ -99,7 +99,7 @@ func TestCacheRefresh(t *testing.T) { dataStorage.SetDestination(&Destination{"T11", []string{"0"}}) dataStorage.GetDestination("T11") dataStorage.SetDestination(&Destination{"T11", []string{"1"}}) - dataStorage.CacheRating(nil, nil, nil) + dataStorage.CacheRating(nil, nil, nil, nil) d, err := dataStorage.GetDestination("T11") p := d.containsPrefix("1") if err != nil || p == 0 { diff --git a/utils/apitpdata.go b/utils/apitpdata.go index af59989a1..7b74d6cee 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -283,6 +283,7 @@ type ApiReloadCache struct { RatingProfileIds []string ActionIds []string SharedGroupIds []string + Aliases []string } type AttrCacheStats struct { // Add in the future filters here maybe so we avoid counting complete cache