ExistsData method on dataDb, ApierV1.ReloadCache method, cgr-loader calling ReloadCache if configured

This commit is contained in:
DanB
2013-11-13 19:27:44 +01:00
parent 4c19a2078b
commit ef9f5fe612
8 changed files with 112 additions and 23 deletions

View File

@@ -316,3 +316,12 @@ func (self *ApierV1) ReloadScheduler(input string, reply *string) error {
*reply = utils.ERR_NOT_FOUND
return errors.New(utils.ERR_NOT_FOUND)
}
func (self *ApierV1) ReloadCache(attrs utils.ApiReloadCache, reply *string) error {
if err := self.DataDb.PreCache(attrs.DestinationIds, attrs.RatingPlanIds); err!= nil {
return err
}
*reply = "OK"
return nil
}

View File

@@ -28,6 +28,8 @@ import (
"github.com/cgrates/cgrates/utils"
"log"
"path"
"net/rpc"
"net/rpc/jsonrpc"
)
var (
@@ -56,7 +58,8 @@ var (
verbose = flag.Bool("verbose", false, "Enable detailed verbose logging output")
fromStorDb = flag.Bool("from_stordb", false, "Load the tariff plan from storDb to dataDb")
toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb")
historyServer = flag.String("history_server", "", "The history server address:port")
historyServer = flag.String("history_server", cgrConfig.HistoryServer, "The history server address:port, empty to disable automaticautomatic history archiving")
raterAddress = flag.String("rater_address", cgrConfig.RaterListen, "Rater service to contact for cache reloads, empty to disable automatic cache reloads")
rpcEncoding = flag.String("rpc_encoding", "json", "The history server rpc encoding json|gob")
runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields")
)
@@ -91,10 +94,36 @@ func main() {
log.Fatalf("Could not open database connection: %v", err)
}
}
var rater *rpc.Client
if !*toStorDb { // Connections to history and rater
if *historyServer != "" { // Init scribeAgent
if scribeAgent, err := history.NewProxyScribe(*historyServer, *rpcEncoding); err != nil {
log.Fatalf("Could not connect to history server: %s. Make sure you have properly configured it via -history_server flag." + err.Error())
return
} else {
engine.SetHistoryScribe(scribeAgent)
gob.Register(&engine.Destination{})
defer scribeAgent.Client.Close()
}
} else {
log.Print("WARNING: Rates history archiving is disabled!")
}
if *raterAddress != "" {
if *rpcEncoding == "json" {
rater, err = jsonrpc.Dial("tcp", *raterAddress)
} else {
rater, err = rpc.Dial("tcp", *raterAddress)
}
if err != nil {
log.Fatalf("Could not connect to rater: %s", err.Error())
return
}
} else {
log.Print("WARNING: Rates automatic cache reloading is disabled!")
}
}
var loader engine.TPLoader
if *fromStorDb { // Load Tariff Plan from storDb into dataDb
loader = engine.NewDbReader(storDb, dataDb, *tpid)
} else if *toStorDb { // Import files from a directory into storDb
if *toStorDb { // Import files from a directory into storDb
if *tpid == "" {
log.Fatal("TPid required, please define it via *-tpid* command argument.")
}
@@ -103,6 +132,10 @@ func main() {
log.Fatal(errImport)
}
return
}
if *fromStorDb { // Load Tariff Plan from storDb into dataDb
loader = engine.NewDbReader(storDb, dataDb, *tpid)
} else { // Default load from csv files to dataDb
for fn, v := range engine.FileValidators {
err := engine.ValidateCSVData(path.Join(*dataPath, fn), v.Rule)
@@ -113,16 +146,6 @@ func main() {
loader = engine.NewFileCSVReader(dataDb, ',', utils.DESTINATIONS_CSV, utils.TIMINGS_CSV, utils.RATES_CSV, utils.DESTINATION_RATES_CSV, utils.RATING_PLANS_CSV, utils.RATING_PROFILES_CSV, utils.ACTIONS_CSV, utils.ACTION_TIMINGS_CSV, utils.ACTION_TRIGGERS_CSV, utils.ACCOUNT_ACTIONS_CSV)
}
if *historyServer != "" {
if scribeAgent, err := history.NewProxyScribe(*historyServer, *rpcEncoding); err != nil {
log.Fatal("Could not connect to history server:" + err.Error())
return
} else {
engine.SetHistoryScribe(scribeAgent)
gob.Register(&engine.Destination{})
defer scribeAgent.Client.Close()
}
}
err = loader.LoadDestinations()
if err != nil {
log.Fatal(err)
@@ -168,4 +191,13 @@ func main() {
if err := loader.WriteToDatabase(*flush, *verbose); err != nil {
log.Fatal("Could not write to database: ", err)
}
// Reload cache
if rater != nil {
//ToDo: only reload for destinations and rating plans we have loaded
// For this will need to export Destinations and RatingPlans loaded or a method providing their keys
reply := ""
if err = rater.Call("ApierV1.ReloadCache", utils.ApiReloadCache{}, &reply); err!=nil {
log.Fatalf("Got error on cache reload: %s", err.Error())
}
}
}

View File

@@ -277,7 +277,7 @@ func (csvr *CSVReader) LoadDestinationRates() (err error) {
}
}
if !destinationExists {
if dbExists, err := csvr.storage.ExistsDestination(record[1]); err != nil {
if dbExists, err := csvr.storage.ExistsData(DESTINATION, record[1]); err != nil {
return err
} else if !dbExists {
return fmt.Errorf("Could not get destination for tag %v", record[1])
@@ -351,7 +351,7 @@ func (csvr *CSVReader) LoadRatingProfiles() (err error) {
}
_, exists := csvr.ratingPlans[record[5]]
if !exists {
if dbExists, err := csvr.storage.ExistsRatingPlan(record[5]); err != nil {
if dbExists, err := csvr.storage.ExistsData(RATING_PLAN, record[5]); err != nil {
return err
} else if !dbExists {
return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", record[5]))

View File

@@ -167,9 +167,9 @@ func (dbr *DbReader) LoadDestinationRates() (err error) {
}
}
if !destinationExists {
if dest, err := dbr.dataDb.GetDestination(dr.DestinationsTag); err != nil {
if dbExists, err := dbr.dataDb.ExistsData(DESTINATION, dr.DestinationsTag); err != nil {
return err
} else if dest == nil {
} else if !dbExists {
return errors.New(fmt.Sprintf("Could not get destination for tag %v", dr.DestinationsTag))
}
}
@@ -218,9 +218,9 @@ func (dbr *DbReader) LoadRatingProfiles() error {
}
_, exists := dbr.ratingPlans[rp.DestRatesTimingTag]
if !exists {
if rpl, err := dbr.dataDb.GetRatingPlan(rp.DestRatesTimingTag); err != nil {
if dbExists, err := dbr.dataDb.ExistsData(RATING_PLAN, rp.DestRatesTimingTag); err != nil {
return err
} else if rpl == nil {
} else if !dbExists {
return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", rp.DestRatesTimingTag))
}
}
@@ -265,12 +265,15 @@ func (dbr *DbReader) LoadRatingPlanByTag(tag string) error {
ratingPlan.AddRateInterval(drate.DestinationsTag, rp.GetRateInterval(drate))
dms, err := dbr.storDb.GetTpDestinations(dbr.tpid, drate.DestinationsTag)
if err != nil || len(dms) == 0 {
if dest, err := dbr.dataDb.GetDestination(drate.DestinationsTag); err != nil {
if err != nil {
return err
} else if len(dms) == 0 {
if dbExists, err := dbr.dataDb.ExistsData(DESTINATION, drate.DestinationsTag); err != nil {
return err
} else if dest == nil {
} else if !dbExists {
return fmt.Errorf("Could not get destination for tag %v", drate.DestinationsTag)
}
continue
}
Logger.Debug(fmt.Sprintf("Tag: %s Destinations: %v", drate.DestinationsTag, dms))
for _, destination := range dms {
@@ -295,6 +298,15 @@ func (dbr *DbReader) LoadRatingProfileByTag(tag string) error {
if err != nil {
return fmt.Errorf("Cannot parse activation time from %v", ratingProfile.ActivationTime)
}
// Check if referenced RatingPlan exists
_, exists := dbr.ratingPlans[ratingProfile.DestRatesTimingTag]
if !exists {
if dbExists, err := dbr.dataDb.ExistsData(RATING_PLAN, ratingProfile.DestRatesTimingTag); err != nil {
return err
} else if !dbExists {
return errors.New(fmt.Sprintf("Could not load rating plans for tag: %v", ratingProfile.DestRatesTimingTag))
}
}
resultRatingProfile.RatingPlanActivations = append(resultRatingProfile.RatingPlanActivations, &RatingPlanActivation{at, ratingProfile.DestRatesTimingTag, ratingProfile.FallbackKeys})
}
return dbr.dataDb.SetRatingProfile(resultRatingProfile)

View File

@@ -50,6 +50,9 @@ const (
MEDIATOR_SOURCE = "MED"
SCHED_SOURCE = "SCH"
RATER_SOURCE = "RAT"
DESTINATION = "destination"
RATING_PLAN = "rating_plan"
)
type Storage interface {
@@ -63,6 +66,7 @@ Interface for storage providers.
type DataStorage interface {
Storage
PreCache([]string, []string) error
ExistsData(string, string)(bool, error)
GetRatingPlan(string) (*RatingPlan, error)
SetRatingPlan(*RatingPlan) error
GetRatingProfile(string) (*RatingProfile, error)

View File

@@ -60,6 +60,19 @@ func (ms *MapStorage) PreCache(dKeys, rppKeys []string) error {
return nil
}
// Used to check if specific subject is stored using prefix key attached to entity
func (ms *MapStorage) ExistsData(categ, subject string) (bool, error) {
switch categ {
case DESTINATION:
_, exists := ms.dict[DESTINATION_PREFIX+subject]
return exists, nil
case RATING_PLAN:
_, exists := ms.dict[RATING_PLAN_PREFIX+subject]
return exists, nil
}
return false, errors.New("Unsupported category")
}
func (ms *MapStorage) GetRatingPlan(key string) (rp *RatingPlan, err error) {
if x, err := cache2go.GetCached(key); err == nil {
return x.(*RatingPlan), nil

View File

@@ -30,6 +30,7 @@ import (
"strconv"
"strings"
"time"
"errors"
)
type RedisStorage struct {
@@ -104,6 +105,18 @@ func (rs *RedisStorage) PreCache(dKeys, rpKeys []string) (err error) {
return
}
// Used to check if specific subject is stored using prefix key attached to entity
func (rs *RedisStorage) ExistsData(entity, subject string) (bool, error) {
switch entity {
case DESTINATION:
return rs.db.Exists(DESTINATION_PREFIX+subject)
case RATING_PLAN:
return rs.db.Exists(RATING_PLAN_PREFIX+subject)
}
return false, errors.New("Unsupported entity in ExistsData")
}
func (rs *RedisStorage) GetRatingPlan(key string) (rp *RatingPlan, err error) {
if x, err := cache2go.GetCached(key); err == nil {
return x.(*RatingPlan), nil

View File

@@ -140,3 +140,9 @@ type ApiTPAccountActions struct {
ActionTimingsId string // Id of ActionTimings profile to use
ActionTriggersId string // Id of ActionTriggers profile to use
}
type ApiReloadCache struct {
DestinationIds []string
RatingPlanIds []string
}