Add Caching option for LoaderS

This commit is contained in:
TeoV
2019-08-22 11:36:30 +03:00
committed by Dan Christian Bogos
parent b33d623e35
commit ddb242191c
19 changed files with 112 additions and 62 deletions

View File

@@ -269,10 +269,10 @@ func (self *ApierV1) LoadSharedGroup(attrs AttrLoadSharedGroup, reply *string) e
type AttrLoadTpFromStorDb struct {
TPid string
FlushDb bool // Flush dataDB before loading
DryRun bool // Only simulate, no write
Validate bool // Run structural checks
ArgDispatcher *utils.ArgDispatcher
Caching *string // Caching strategy
}
// Loads complete data in a TP from storDb
@@ -296,12 +296,17 @@ func (self *ApierV1) LoadTariffPlanFromStorDb(attrs AttrLoadTpFromStorDb, reply
*reply = utils.OK
return nil // Mission complete, no errors
}
if err := dbReader.WriteToDatabase(attrs.FlushDb, false, false); err != nil {
if err := dbReader.WriteToDatabase(false, false); err != nil {
return utils.NewErrServerError(err)
}
//verify If Caching is present in arguments
caching := config.CgrConfig().GeneralCfg().DefaultCaching
if attrs.Caching != nil {
caching = *attrs.Caching
}
// reload cache
utils.Logger.Info("ApierV1.LoadTariffPlanFromStorDb, reloading cache.")
if err := dbReader.ReloadCache(attrs.FlushDb, true, attrs.ArgDispatcher); err != nil {
if err := dbReader.ReloadCache(caching, true, attrs.ArgDispatcher); err != nil {
return utils.NewErrServerError(err)
}
@@ -809,12 +814,17 @@ func (self *ApierV1) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
}
// write data intro Database
if err := loader.WriteToDatabase(attrs.FlushDb, false, false); err != nil {
if err := loader.WriteToDatabase(false, false); err != nil {
return utils.NewErrServerError(err)
}
//verify If Caching is present in arguments
caching := config.CgrConfig().GeneralCfg().DefaultCaching
if attrs.Caching != nil {
caching = *attrs.Caching
}
// reload cache
utils.Logger.Info("ApierV1.LoadTariffPlanFromFolder, reloading cache.")
if err := loader.ReloadCache(attrs.FlushDb, true, attrs.ArgDispatcher); err != nil {
if err := loader.ReloadCache(caching, true, attrs.ArgDispatcher); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
@@ -861,9 +871,14 @@ func (self *ApierV1) RemoveTPFromFolder(attrs utils.AttrLoadTpFromFolder, reply
if err := loader.RemoveFromDatabase(false, false); err != nil {
return utils.NewErrServerError(err)
}
//verify If Caching is present in arguments
caching := config.CgrConfig().GeneralCfg().DefaultCaching
if attrs.Caching != nil {
caching = *attrs.Caching
}
// reload cache
utils.Logger.Info("ApierV1.RemoveTPFromFolder, reloading cache.")
if err := loader.ReloadCache(attrs.FlushDb, true, attrs.ArgDispatcher); err != nil {
if err := loader.ReloadCache(caching, true, attrs.ArgDispatcher); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
@@ -896,9 +911,14 @@ func (self *ApierV1) RemoveTPFromStorDB(attrs AttrLoadTpFromStorDb, reply *strin
if err := dbReader.RemoveFromDatabase(false, false); err != nil {
return utils.NewErrServerError(err)
}
//verify If Caching is present in arguments
caching := config.CgrConfig().GeneralCfg().DefaultCaching
if attrs.Caching != nil {
caching = *attrs.Caching
}
// reload cache
utils.Logger.Info("ApierV1.RemoveTPFromStorDB, reloading cache.")
if err := dbReader.ReloadCache(attrs.FlushDb, true, attrs.ArgDispatcher); err != nil {
if err := dbReader.ReloadCache(caching, true, attrs.ArgDispatcher); err != nil {
return utils.NewErrServerError(err)
}

View File

@@ -122,12 +122,17 @@ func (self *ApierV2) LoadTariffPlanFromFolder(attrs utils.AttrLoadTpFromFolder,
}
}
if err := loader.WriteToDatabase(attrs.FlushDb, false, false); err != nil {
if err := loader.WriteToDatabase(false, false); err != nil {
return utils.NewErrServerError(err)
}
utils.Logger.Info("ApierV2.LoadTariffPlanFromFolder, reloading cache.")
if err := loader.ReloadCache(attrs.FlushDb, true, attrs.ArgDispatcher); err != nil {
//verify If Caching is present in arguments
caching := config.CgrConfig().GeneralCfg().DefaultCaching
if attrs.Caching != nil {
caching = *attrs.Caching
}
if err := loader.ReloadCache(caching, true, attrs.ArgDispatcher); err != nil {
return utils.NewErrServerError(err)
}
loadHistList, err := self.DataManager.DataDB().GetLoadHistory(1, true, utils.NonTransactional)

View File

@@ -68,8 +68,8 @@ var (
storDBPasswd = cgrLoaderFlags.String("stordb_passwd", dfltCfg.StorDbCfg().StorDBPass,
"The storDb user's password.")
flush = cgrLoaderFlags.Bool("flushdb", false,
"Flush the database before importing")
cachingArg = cgrLoaderFlags.String("caching", "",
"Cache option to do when load tp")
tpid = cgrLoaderFlags.String("tpid", dfltCfg.LoaderCgrCfg().TpID,
"The tariff plan ID from the database")
dataPath = cgrLoaderFlags.String("path", dfltCfg.LoaderCgrCfg().DataPath,
@@ -333,11 +333,15 @@ func main() {
if !*remove {
// write maps to database
if err := tpReader.WriteToDatabase(*flush, *verbose, *disableReverse); err != nil {
if err := tpReader.WriteToDatabase(*verbose, *disableReverse); err != nil {
log.Fatal("Could not write to database: ", err)
}
caching := config.CgrConfig().GeneralCfg().DefaultCaching
if cachingArg != nil && *cachingArg != utils.EmptyString {
caching = *cachingArg
}
// reload cache
if err := tpReader.ReloadCache(*flush, *verbose, &utils.ArgDispatcher{
if err := tpReader.ReloadCache(caching, *verbose, &utils.ArgDispatcher{
APIKey: apiKey,
RouteID: routeID,
}); err != nil {

View File

@@ -378,7 +378,7 @@ func LoadTariffPlanFromFolder(tpPath, timezone string, dm *DataManager, disable_
if err := loader.LoadAll(); err != nil {
return utils.NewErrServerError(err)
}
if err := loader.WriteToDatabase(false, false, disable_reverse); err != nil {
if err := loader.WriteToDatabase(false, disable_reverse); err != nil {
return utils.NewErrServerError(err)
}
return nil

View File

@@ -100,7 +100,7 @@ func init() {
if err := csvr.LoadDispatcherHosts(); err != nil {
log.Print("error in LoadDispatcherHosts:", err)
}
csvr.WriteToDatabase(false, false, false)
csvr.WriteToDatabase(false, false)
Cache.Clear(nil)
//dm.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
}

View File

@@ -154,7 +154,7 @@ func TestLoaderITRemoveLoad(t *testing.T) {
if err = loader.LoadDispatcherHosts(); err != nil {
t.Error("Failed loading Dispatcher hosts: ", err.Error())
}
if err := loader.WriteToDatabase(true, false, false); err != nil {
if err := loader.WriteToDatabase(false, false); err != nil {
t.Error("Could not write data into dataDb: ", err.Error())
}
if err := loader.RemoveFromDatabase(false, true); err != nil {
@@ -229,7 +229,7 @@ func TestLoaderITLoadFromCSV(t *testing.T) {
if err = loader.LoadDispatcherHosts(); err != nil {
t.Error("Failed loading Dispatcher hosts: ", err.Error())
}
if err := loader.WriteToDatabase(true, false, false); err != nil {
if err := loader.WriteToDatabase(false, false); err != nil {
t.Error("Could not write data into dataDb: ", err.Error())
}
}

View File

@@ -26,7 +26,6 @@ import (
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/structmatcher"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
@@ -1361,18 +1360,10 @@ func (tpr *TpReader) IsValid() bool {
return valid
}
func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err error) {
func (tpr *TpReader) WriteToDatabase(verbose, disable_reverse bool) (err error) {
if tpr.dm.dataDB == nil {
return errors.New("no database connection")
}
if flush {
// if verbose {
// log.Print("Flushing database")
// }
// if err = tpr.dm.DataDB().Flush(""); err != nil {
// return
// }
}
//generate a loadID
loadID := time.Now().UnixNano()
loadIDs := make(map[string]int64)
@@ -2365,7 +2356,7 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disable_reverse bool) (err erro
return
}
func (tpr *TpReader) ReloadCache(flush, verbose bool, argDispatcher *utils.ArgDispatcher) (err error) {
func (tpr *TpReader) ReloadCache(caching string, verbose bool, argDispatcher *utils.ArgDispatcher) (err error) {
if tpr.cacheS == nil {
log.Print("Disabled automatic reload")
return
@@ -2420,7 +2411,6 @@ func (tpr *TpReader) ReloadCache(flush, verbose bool, argDispatcher *utils.ArgDi
DispatcherProfileIDs: &dppIDs,
DispatcherHostIDs: &dphIDs,
},
FlushAll: flush,
},
}
@@ -2428,7 +2418,7 @@ func (tpr *TpReader) ReloadCache(flush, verbose bool, argDispatcher *utils.ArgDi
log.Print("Reloading cache")
}
var reply string
switch config.CgrConfig().GeneralCfg().DefaultCaching {
switch caching {
case utils.META_NONE:
return
case utils.MetaReload:
@@ -2440,8 +2430,11 @@ func (tpr *TpReader) ReloadCache(flush, verbose bool, argDispatcher *utils.ArgDi
return
}
case utils.MetaRemove:
//
if err = tpr.cacheS.Call(utils.CacheSv1FlushCache, cacheArgs, &reply); err != nil {
return
}
case utils.MetaClear:
cacheArgs.FlushAll = true
if err = tpr.cacheS.Call(utils.CacheSv1FlushCache, cacheArgs, &reply); err != nil {
return
}

View File

@@ -61,7 +61,7 @@ ENABLE_ACNT,*enable_account,,,,,,,,,,,,,false,false,10`
if err := csvr.LoadAll(); err != nil {
t.Fatal(err)
}
csvr.WriteToDatabase(false, false, false)
csvr.WriteToDatabase(false, false)
engine.Cache.Clear(nil)
dbAcntActs.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil,

View File

@@ -68,7 +68,7 @@ cgrates.org,call,*any,2013-01-06T00:00:00Z,RP_ANY,`
if err := csvr.LoadAll(); err != nil {
t.Fatal(err)
}
csvr.WriteToDatabase(false, false, false)
csvr.WriteToDatabase(false, false)
if acnt, err := dbAuth.DataDB().GetAccount("cgrates.org:testauthpostpaid1"); err != nil {
t.Error(err)
} else if acnt == nil {

View File

@@ -74,7 +74,7 @@ cgrates.org,sms,*any,2012-01-01T00:00:00Z,RP_SMS1,`
if err := csvr.LoadRatingProfiles(); err != nil {
t.Fatal(err)
}
csvr.WriteToDatabase(false, false, false)
csvr.WriteToDatabase(false, false)
engine.Cache.Clear(nil)
dataDB.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)

View File

@@ -58,7 +58,7 @@ RP_DATA1,DR_DATA_2,TM2,10`
if err := csvr.LoadRatingProfiles(); err != nil {
t.Fatal(err)
}
csvr.WriteToDatabase(false, false, false)
csvr.WriteToDatabase(false, false)
engine.Cache.Clear(nil)
dataDB.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)

View File

@@ -100,7 +100,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
if err := csvr.LoadAccountActions(); err != nil {
t.Fatal(err)
}
csvr.WriteToDatabase(false, false, false)
csvr.WriteToDatabase(false, false)
if acnt, err := dataDB.DataDB().GetAccount("cgrates.org:12344"); err != nil {
t.Error(err)
} else if acnt == nil {

View File

@@ -98,7 +98,7 @@ TOPUP10_AT,TOPUP10_AC1,ASAP,10`
if err := csvr.LoadAccountActions(); err != nil {
t.Fatal(err)
}
csvr.WriteToDatabase(false, false, false)
csvr.WriteToDatabase(false, false)
if acnt, err := dataDB2.DataDB().GetAccount("cgrates.org:12345"); err != nil {
t.Error(err)
} else if acnt == nil {

View File

@@ -97,7 +97,7 @@ cgrates.org,call,discounted_minutes,2013-01-06T00:00:00Z,RP_UK_Mobile_BIG5_PKG,`
t.Fatal(err)
}
csvr.WriteToDatabase(false, false, false)
csvr.WriteToDatabase(false, false)
if acnt, err := dataDB3.DataDB().GetAccount("cgrates.org:12346"); err != nil {
t.Error(err)
} else if acnt == nil {

View File

@@ -56,7 +56,7 @@ func TestSMSLoadCsvTpSmsChrg1(t *testing.T) {
if err := csvr.LoadRatingProfiles(); err != nil {
t.Fatal(err)
}
csvr.WriteToDatabase(false, false, false)
csvr.WriteToDatabase(false, false)
engine.Cache.Clear(nil)
dataDB.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil,
nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)

View File

@@ -126,13 +126,13 @@ func (ldr *Loader) ListenAndServe(exitChan chan struct{}) (err error) {
}
// ProcessFolder will process the content in the folder with locking
func (ldr *Loader) ProcessFolder() (err error) {
func (ldr *Loader) ProcessFolder(caching string) (err error) {
if err = ldr.lockFolder(); err != nil {
return
}
defer ldr.unlockFolder()
for ldrType := range ldr.rdrs {
if err = ldr.processFiles(ldrType); err != nil {
if err = ldr.processFiles(ldrType, caching); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, ldr.ldrID, ldrType, err.Error()))
continue
@@ -187,7 +187,7 @@ func (ldr *Loader) moveFiles() (err error) {
return
}
func (ldr *Loader) processFiles(loaderType string) (err error) {
func (ldr *Loader) processFiles(loaderType, caching string) (err error) {
for fName := range ldr.rdrs[loaderType] {
var rdr *os.File
if rdr, err = os.Open(path.Join(ldr.tpInDir, fName)); err != nil {
@@ -198,14 +198,14 @@ func (ldr *Loader) processFiles(loaderType string) (err error) {
ldr.rdrs[loaderType][fName] = &openedCSVFile{
fileName: fName, rdr: rdr, csvRdr: csvReader}
defer ldr.unreferenceFile(loaderType, fName)
if err = ldr.processContent(loaderType); err != nil {
if err = ldr.processContent(loaderType, caching); err != nil {
return
}
}
return
}
func (ldr *Loader) processContent(loaderType string) (err error) {
func (ldr *Loader) processContent(loaderType, caching string) (err error) {
// start processing lines
keepLooping := true // controls looping
lineNr := 0
@@ -250,7 +250,7 @@ func (ldr *Loader) processContent(loaderType string) (err error) {
break // have stolen the existing key in buffer
}
if err = ldr.storeLoadedData(loaderType,
map[string][]LoaderData{prevTntID: ldr.bufLoaderData[prevTntID]}); err != nil {
map[string][]LoaderData{prevTntID: ldr.bufLoaderData[prevTntID]}, caching); err != nil {
return
}
delete(ldr.bufLoaderData, prevTntID)
@@ -263,7 +263,7 @@ func (ldr *Loader) processContent(loaderType string) (err error) {
break // get the first tenantID
}
if err = ldr.storeLoadedData(loaderType,
map[string][]LoaderData{tntID: ldr.bufLoaderData[tntID]}); err != nil {
map[string][]LoaderData{tntID: ldr.bufLoaderData[tntID]}, caching); err != nil {
return
}
delete(ldr.bufLoaderData, tntID)
@@ -271,7 +271,7 @@ func (ldr *Loader) processContent(loaderType string) (err error) {
}
func (ldr *Loader) storeLoadedData(loaderType string,
lds map[string][]LoaderData) (err error) {
lds map[string][]LoaderData, caching string) (err error) {
var ids []string
cacheArgs := utils.InitAttrReloadCache()
switch loaderType {
@@ -527,12 +527,34 @@ func (ldr *Loader) storeLoadedData(loaderType string,
}
if ldr.cacheS != nil {
var reply string
if err = ldr.cacheS.Call(utils.CacheSv1ReloadCache,
utils.AttrReloadCacheWithArgDispatcher{
switch caching {
case utils.META_NONE:
return
case utils.MetaReload:
if err = ldr.cacheS.Call(utils.CacheSv1ReloadCache, utils.AttrReloadCacheWithArgDispatcher{
AttrReloadCache: cacheArgs}, &reply); err != nil {
return err
return
}
case utils.MetaLoad:
if err = ldr.cacheS.Call(utils.CacheSv1LoadCache, utils.AttrReloadCacheWithArgDispatcher{
AttrReloadCache: cacheArgs}, &reply); err != nil {
return
}
case utils.MetaRemove:
if err = ldr.cacheS.Call(utils.CacheSv1FlushCache, utils.AttrReloadCacheWithArgDispatcher{
AttrReloadCache: cacheArgs}, &reply); err != nil {
return
}
case utils.MetaClear:
cacheArgs.FlushAll = true
if err = ldr.cacheS.Call(utils.CacheSv1FlushCache, utils.AttrReloadCacheWithArgDispatcher{
AttrReloadCache: cacheArgs}, &reply); err != nil {
return
}
}
}
return
}

View File

@@ -98,7 +98,7 @@ func TestLoaderProcessContentSingleFile(t *testing.T) {
"Attributes.csv": &openedCSVFile{fileName: "Attributes.csv",
rdr: rdr, csvRdr: csvRdr}},
}
if err := ldr.processContent(utils.MetaAttributes); err != nil {
if err := ldr.processContent(utils.MetaAttributes, utils.EmptyString); err != nil {
t.Error(err)
}
eAP := &engine.AttributeProfile{
@@ -189,7 +189,7 @@ func TestLoaderProcessContentMultiFiles(t *testing.T) {
"File2.csv": &openedCSVFile{fileName: "File2.csv",
rdr: rdr2, csvRdr: csvRdr2}},
}
if err := ldr.processContent(utils.MetaAttributes); err != nil {
if err := ldr.processContent(utils.MetaAttributes, utils.EmptyString); err != nil {
t.Error(err)
}
eAP := &engine.AttributeProfile{
@@ -282,7 +282,7 @@ func TestLoaderProcessResource(t *testing.T) {
"Resources.csv": &openedCSVFile{fileName: "Resources.csv",
rdr: rdr, csvRdr: csvRdr}},
}
if err := ldr.processContent(utils.MetaResources); err != nil {
if err := ldr.processContent(utils.MetaResources, utils.EmptyString); err != nil {
t.Error(err)
}
eResPrf1 := &engine.ResourceProfile{
@@ -380,7 +380,7 @@ func TestLoaderProcessFilters(t *testing.T) {
"Filters.csv": &openedCSVFile{fileName: "Filters.csv",
rdr: rdr, csvRdr: csvRdr}},
}
if err := ldr.processContent(utils.MetaFilters); err != nil {
if err := ldr.processContent(utils.MetaFilters, utils.EmptyString); err != nil {
t.Error(err)
}
eFltr1 := &engine.Filter{
@@ -511,7 +511,7 @@ func TestLoaderProcessThresholds(t *testing.T) {
"Thresholds.csv": &openedCSVFile{fileName: "Thresholds.csv",
rdr: rdr, csvRdr: csvRdr}},
}
if err := ldr.processContent(utils.MetaThresholds); err != nil {
if err := ldr.processContent(utils.MetaThresholds, utils.EmptyString); err != nil {
t.Error(err)
}
if len(ldr.bufLoaderData) != 0 {
@@ -618,7 +618,7 @@ func TestLoaderProcessStats(t *testing.T) {
"Stats.csv": &openedCSVFile{fileName: "Stats.csv",
rdr: rdr, csvRdr: csvRdr}},
}
if err := ldr.processContent(utils.MetaStats); err != nil {
if err := ldr.processContent(utils.MetaStats, utils.EmptyString); err != nil {
t.Error(err)
}
if len(ldr.bufLoaderData) != 0 {
@@ -751,7 +751,7 @@ func TestLoaderProcessSuppliers(t *testing.T) {
"Suppliers.csv": &openedCSVFile{fileName: "Suppliers.csv",
rdr: rdr, csvRdr: csvRdr}},
}
if err := ldr.processContent(utils.MetaSuppliers); err != nil {
if err := ldr.processContent(utils.MetaSuppliers, utils.EmptyString); err != nil {
t.Error(err)
}
if len(ldr.bufLoaderData) != 0 {
@@ -842,7 +842,7 @@ func TestLoaderProcessChargers(t *testing.T) {
utils.ChargersCsv: &openedCSVFile{fileName: utils.ChargersCsv,
rdr: rdr, csvRdr: csvRdr}},
}
if err := ldr.processContent(utils.MetaChargers); err != nil {
if err := ldr.processContent(utils.MetaChargers, utils.EmptyString); err != nil {
t.Error(err)
}
if len(ldr.bufLoaderData) != 0 {
@@ -974,7 +974,7 @@ func TestLoaderProcessDispatches(t *testing.T) {
},
},
}
if err := ldr.processContent(utils.MetaDispatchers); err != nil {
if err := ldr.processContent(utils.MetaDispatchers, utils.EmptyString); err != nil {
t.Error(err)
}
if len(ldr.bufLoaderData) != 0 {
@@ -1078,7 +1078,7 @@ func TestLoaderProcessDispatcheHosts(t *testing.T) {
},
},
}
if err := ldr.processContent(utils.MetaDispatcherHosts); err != nil {
if err := ldr.processContent(utils.MetaDispatcherHosts, utils.EmptyString); err != nil {
t.Error(err)
}
if len(ldr.bufLoaderData) != 0 {

View File

@@ -74,6 +74,7 @@ func (ldrS *LoaderService) ListenAndServe(exitChan chan bool) (err error) {
type ArgsProcessFolder struct {
LoaderID string
ForceLock bool
Caching *string
}
func (ldrS *LoaderService) V1Load(args *ArgsProcessFolder,
@@ -95,7 +96,12 @@ func (ldrS *LoaderService) V1Load(args *ArgsProcessFolder,
}
return errors.New("ANOTHER_LOADER_RUNNING")
}
if err := ldr.ProcessFolder(); err != nil {
//verify If Caching is present in arguments
caching := config.CgrConfig().GeneralCfg().DefaultCaching
if args.Caching != nil {
caching = *args.Caching
}
if err := ldr.ProcessFolder(caching); err != nil {
return utils.NewErrServerError(err)
}
*rply = utils.OK

View File

@@ -676,10 +676,10 @@ type AttrRemCdrs struct {
type AttrLoadTpFromFolder struct {
FolderPath string // Take files from folder absolute path
DryRun bool // Do not write to database but parse only
FlushDb bool // Flush previous data before loading new one
Validate bool // Run structural checks on data
Recursive bool // load data recursive
ArgDispatcher *ArgDispatcher
Caching *string
}
type AttrImportTPFromFolder struct {