From ef9f5fe612fb5e7575afe890c7d2c55b97498acd Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 13 Nov 2013 19:27:44 +0100 Subject: [PATCH] ExistsData method on dataDb, ApierV1.ReloadCache method, cgr-loader calling ReloadCache if configured --- apier/v1/apier.go | 9 ++++++ cmd/cgr-loader/cgr-loader.go | 60 +++++++++++++++++++++++++++--------- engine/loader_csv.go | 4 +-- engine/loader_db.go | 26 +++++++++++----- engine/storage_interface.go | 4 +++ engine/storage_map.go | 13 ++++++++ engine/storage_redis.go | 13 ++++++++ utils/apitpdata.go | 6 ++++ 8 files changed, 112 insertions(+), 23 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 5b9ecd54e..f84bd611d 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -316,3 +316,12 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error { *reply = utils.ERR_NOT_FOUND return errors.New(utils.ERR_NOT_FOUND) } + +func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) error { + if err := self.DataDb.PreCache(attrs.DestinationIds, attrs.RatingPlanIds); err!= nil { + return err + } + *reply = "OK" + return nil +} + diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index be457a0a6..2f2138bd7 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -28,6 +28,8 @@ import ( "github.com/cgrates/cgrates/utils" "log" "path" + "net/rpc" + "net/rpc/jsonrpc" ) var ( @@ -56,7 +58,8 @@ var ( verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output") fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb") toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb") - historyServer = flag.String("history_server", "", "The history server address:port") + historyServer = flag.String("history_server", cgrConfig.HistoryServer, "The history server address:port, empty to disable automaticautomatic history archiving") + raterAddress = flag.String("rater_address", cgrConfig.RaterListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads") rpcEncoding = flag.String("rpc_encoding", "json", "The history server rpc encoding json|gob") runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields") ) @@ -91,10 +94,36 @@ func main() { log.Fatalf("Could not open database connection: %v", err) } } + var rater *rpc.Client + if !*toStorDb { // Connections to history and rater + if *historyServer != "" { // Init scribeAgent + if scribeAgent, err := history.NewProxyScribe(*historyServer, *rpcEncoding); err != nil { + log.Fatalf("Could not connect to history server: %s. Make sure you have properly configured it via -history_server flag." + err.Error()) + return + } else { + engine.SetHistoryScribe(scribeAgent) + gob.Register(&engine.Destination{}) + defer scribeAgent.Client.Close() + } + } else { + log.Print("WARNING: Rates history archiving is disabled!") + } + if *raterAddress != "" { + if *rpcEncoding == "json" { + rater, err = jsonrpc.Dial("tcp", *raterAddress) + } else { + rater, err = rpc.Dial("tcp", *raterAddress) + } + if err != nil { + log.Fatalf("Could not connect to rater: %s", err.Error()) + return + } + } else { + log.Print("WARNING: Rates automatic cache reloading is disabled!") + } + } var loader engine.TPLoader - if *fromStorDb { // Load Tariff Plan from storDb into dataDb - loader = engine.NewDbReader(storDb, dataDb, *tpid) - } else if *toStorDb { // Import files from a directory into storDb + if *toStorDb { // Import files from a directory into storDb if *tpid == "" { log.Fatal("TPid required, please define it via *-tpid* command argument.") } @@ -103,6 +132,10 @@ func main() { log.Fatal(errImport) } return + } + + if *fromStorDb { // Load Tariff Plan from storDb into dataDb + loader = engine.NewDbReader(storDb, dataDb, *tpid) } else { // Default load from csv files to dataDb for fn, v := range engine.FileValidators { err := engine.ValidateCSVData(path.Join(*dataPath, fn), v.Rule) @@ -113,16 +146,6 @@ func main() { loader = engine.NewFileCSVReader(dataDb, ',', utils.DESTINATIONS_CSV, utils.TIMINGS_CSV, utils.RATES_CSV, utils.DESTINATION_RATES_CSV, utils.RATING_PLANS_CSV, utils.RATING_PROFILES_CSV, utils.ACTIONS_CSV, utils.ACTION_TIMINGS_CSV, utils.ACTION_TRIGGERS_CSV, utils.ACCOUNT_ACTIONS_CSV) } - if *historyServer != "" { - if scribeAgent, err := history.NewProxyScribe(*historyServer, *rpcEncoding); err != nil { - log.Fatal("Could not connect to history server:" + err.Error()) - return - } else { - engine.SetHistoryScribe(scribeAgent) - gob.Register(&engine.Destination{}) - defer scribeAgent.Client.Close() - } - } err = loader.LoadDestinations() if err != nil { log.Fatal(err) @@ -168,4 +191,13 @@ func main() { if err := loader.WriteToDatabase(*flush, *verbose); err != nil { log.Fatal("Could not write to database: ", err) } + // Reload cache + if rater != nil { + //ToDo: only reload for destinations and rating plans we have loaded + // For this will need to export Destinations and RatingPlans loaded or a method providing their keys + reply := "" + if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{}, &reply); err!=nil { + log.Fatalf("Got error on cache reload: %s", err.Error()) + } + } } diff --git a/engine/loader_csv.go b/engine/loader_csv.go index 5f696c60d..e3447ed66 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -277,7 +277,7 @@ func (csvr *CSVReader) LoadDestinationRates() (err error) { } } if !destinationExists { - if dbExists, err := csvr.storage.ExistsDestination(record[1]); err != nil { + if dbExists, err := csvr.storage.ExistsData(DESTINATION, record[1]); err != nil { return err } else if !dbExists { return fmt.Errorf("Could not get destination for tag %v", record[1]) @@ -351,7 +351,7 @@ func (csvr *CSVReader) LoadRatingProfiles() (err error) { } _, exists := csvr.ratingPlans[record[5]] if !exists { - if dbExists, err := csvr.storage.ExistsRatingPlan(record[5]); err != nil { + if dbExists, err := csvr.storage.ExistsData(RATING_PLAN, record[5]); err != nil { return err } else if !dbExists { return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", record[5])) diff --git a/engine/loader_db.go b/engine/loader_db.go index ce1e5bb2f..00c054646 100644 --- a/engine/loader_db.go +++ b/engine/loader_db.go @@ -167,9 +167,9 @@ func (dbr *DbReader) LoadDestinationRates() (err error) { } } if !destinationExists { - if dest, err := dbr.dataDb.GetDestination(dr.DestinationsTag); err != nil { + if dbExists, err := dbr.dataDb.ExistsData(DESTINATION, dr.DestinationsTag); err != nil { return err - } else if dest == nil { + } else if !dbExists { return errors.New(fmt.Sprintf("Could not get destination for tag %v", dr.DestinationsTag)) } } @@ -218,9 +218,9 @@ func (dbr *DbReader) LoadRatingProfiles() error { } _, exists := dbr.ratingPlans[rp.DestRatesTimingTag] if !exists { - if rpl, err := dbr.dataDb.GetRatingPlan(rp.DestRatesTimingTag); err != nil { + if dbExists, err := dbr.dataDb.ExistsData(RATING_PLAN, rp.DestRatesTimingTag); err != nil { return err - } else if rpl == nil { + } else if !dbExists { return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", rp.DestRatesTimingTag)) } } @@ -265,12 +265,15 @@ func (dbr *DbReader) LoadRatingPlanByTag(tag string) error { ratingPlan.AddRateInterval(drate.DestinationsTag, rp.GetRateInterval(drate)) dms, err := dbr.storDb.GetTpDestinations(dbr.tpid, drate.DestinationsTag) - if err != nil || len(dms) == 0 { - if dest, err := dbr.dataDb.GetDestination(drate.DestinationsTag); err != nil { + if err != nil { + return err + } else if len(dms) == 0 { + if dbExists, err := dbr.dataDb.ExistsData(DESTINATION, drate.DestinationsTag); err != nil { return err - } else if dest == nil { + } else if !dbExists { return fmt.Errorf("Could not get destination for tag %v", drate.DestinationsTag) } + continue } Logger.Debug(fmt.Sprintf("Tag: %s Destinations: %v", drate.DestinationsTag, dms)) for _, destination := range dms { @@ -295,6 +298,15 @@ func (dbr *DbReader) LoadRatingProfileByTag(tag string) error { if err != nil { return fmt.Errorf("Cannot parse activation time from %v", ratingProfile.ActivationTime) } + // Check if referenced RatingPlan exists + _, exists := dbr.ratingPlans[ratingProfile.DestRatesTimingTag] + if !exists { + if dbExists, err := dbr.dataDb.ExistsData(RATING_PLAN, ratingProfile.DestRatesTimingTag); err != nil { + return err + } else if !dbExists { + return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", ratingProfile.DestRatesTimingTag)) + } + } resultRatingProfile.RatingPlanActivations = append(resultRatingProfile.RatingPlanActivations, &RatingPlanActivation{at, ratingProfile.DestRatesTimingTag, ratingProfile.FallbackKeys}) } return dbr.dataDb.SetRatingProfile(resultRatingProfile) diff --git a/engine/storage_interface.go b/engine/storage_interface.go index c2260c773..25470c52c 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -50,6 +50,9 @@ const ( MEDIATOR_SOURCE = "MED" SCHED_SOURCE = "SCH" RATER_SOURCE = "RAT" + DESTINATION = "destination" + RATING_PLAN = "rating_plan" + ) type Storage interface { @@ -63,6 +66,7 @@ Interface for storage providers. type DataStorage interface { Storage PreCache([]string, []string) error + ExistsData(string, string)(bool, error) GetRatingPlan(string) (*RatingPlan, error) SetRatingPlan(*RatingPlan) error GetRatingProfile(string) (*RatingProfile, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 891bab511..cf5ece059 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -60,6 +60,19 @@ func (ms *MapStorage) PreCache(dKeys, rppKeys []string) error { return nil } +// Used to check if specific subject is stored using prefix key attached to entity +func (ms *MapStorage) ExistsData(categ, subject string) (bool, error) { + switch categ { + case DESTINATION: + _, exists := ms.dict[DESTINATION_PREFIX+subject] + return exists, nil + case RATING_PLAN: + _, exists := ms.dict[RATING_PLAN_PREFIX+subject] + return exists, nil + } + return false, errors.New("Unsupported category") +} + func (ms *MapStorage) GetRatingPlan(key string) (rp *RatingPlan, err error) { if x, err := cache2go.GetCached(key); err == nil { return x.(*RatingPlan), nil diff --git a/engine/storage_redis.go b/engine/storage_redis.go index d7dfdb897..a85f4e79d 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -30,6 +30,7 @@ import ( "strconv" "strings" "time" + "errors" ) type RedisStorage struct { @@ -104,6 +105,18 @@ func (rs *RedisStorage) PreCache(dKeys, rpKeys []string) (err error) { return } +// Used to check if specific subject is stored using prefix key attached to entity +func (rs *RedisStorage) ExistsData(entity, subject string) (bool, error) { + switch entity { + case DESTINATION: + return rs.db.Exists(DESTINATION_PREFIX+subject) + case RATING_PLAN: + return rs.db.Exists(RATING_PLAN_PREFIX+subject) + } + return false, errors.New("Unsupported entity in ExistsData") +} + + func (rs *RedisStorage) GetRatingPlan(key string) (rp *RatingPlan, err error) { if x, err := cache2go.GetCached(key); err == nil { return x.(*RatingPlan), nil diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 64b72f7f4..c39fd103f 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -140,3 +140,9 @@ type ApiTPAccountActions struct { ActionTimingsId string // Id of ActionTimings profile to use ActionTriggersId string // Id of ActionTriggers profile to use } + +type ApiReloadCache struct { + DestinationIds []string + RatingPlanIds []string +} +