Updated cgr-loader to accept URLs as path to data

This commit is contained in:
Trial97
2020-03-20 17:04:43 +02:00
committed by Dan Christian Bogos
parent d4fc68c86b
commit 57cfcbf5fc
7 changed files with 316 additions and 168 deletions

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package main
import (
"errors"
"flag"
"fmt"
"log"
@@ -32,10 +33,8 @@ import (
)
var (
err error
dm *engine.DataManager
storDb engine.LoadStorage
loader engine.LoadReader
dataDB engine.DataDB
storDB engine.LoadStorage
cgrLoaderFlags = flag.NewFlagSet("cgr-loader", flag.ContinueOnError)
dfltCfg = config.CgrConfig()
@@ -106,8 +105,9 @@ var (
func loadConfig() (ldrCfg *config.CGRConfig) {
ldrCfg = config.CgrConfig()
if *cfgPath != "" {
var err error
if ldrCfg, err = config.NewCGRConfigFromPath(*cfgPath); err != nil {
log.Fatalf("Error loading config file %s", err.Error())
log.Fatalf("Error loading config file %s", err)
}
config.SetCgrConfig(ldrCfg)
}
@@ -226,41 +226,80 @@ func loadConfig() (ldrCfg *config.CGRConfig) {
if *disableReverse != dfltCfg.LoaderCgrCfg().DisableReverse {
ldrCfg.LoaderCgrCfg().DisableReverse = *disableReverse
}
if *cachingArg != utils.EmptyString {
ldrCfg.GeneralCfg().DefaultCaching = *cachingArg
}
return
}
func importData(cfg *config.CGRConfig) (err error) {
if cfg.LoaderCgrCfg().TpID == "" {
return errors.New("TPid required")
}
if *flushStorDB {
if err = storDB.RemTpData("", cfg.LoaderCgrCfg().TpID, map[string]string{}); err != nil {
return err
}
}
csvImporter := engine.TPCSVImporter{
TPid: cfg.LoaderCgrCfg().TpID,
StorDb: storDB,
DirPath: *dataPath,
Sep: cfg.LoaderCgrCfg().FieldSeparator,
Verbose: *verbose,
ImportId: *importID,
}
return csvImporter.Run()
}
func getLoader(cfg *config.CGRConfig) (loader engine.LoadReader, err error) {
if *fromStorDB { // Load Tariff Plan from storDb into dataDb
loader = storDB
return
}
if gprefix := utils.MetaGoogleAPI + utils.CONCATENATED_KEY_SEP; strings.HasPrefix(*dataPath, gprefix) { // Default load from csv files to dataDb
return engine.NewGoogleCSVStorage(cfg.LoaderCgrCfg().FieldSeparator, strings.TrimPrefix(*dataPath, gprefix), *cfgPath)
}
if !utils.IsURL(*dataPath) {
loader = engine.NewFileCSVStorage(cfg.LoaderCgrCfg().FieldSeparator, *dataPath)
return
}
loader = engine.NewURLCSVStorage(cfg.LoaderCgrCfg().FieldSeparator, *dataPath)
return
}
func main() {
if err := cgrLoaderFlags.Parse(os.Args[1:]); err != nil {
return
var err error
if err = cgrLoaderFlags.Parse(os.Args[1:]); err != nil {
log.Fatal(err)
}
if *version {
if rcv, err := utils.GetCGRVersion(); err != nil {
fmt.Println(err)
} else {
fmt.Println(rcv)
var version string
if version, err = utils.GetCGRVersion(); err != nil {
log.Fatal(err)
}
fmt.Println(version)
return
}
ldrCfg := loadConfig()
// we initialize connManager here with nil for InternalChannels
cM := engine.NewConnManager(ldrCfg, nil)
engine.NewConnManager(ldrCfg, nil)
if !*toStorDB {
d, err := engine.NewDataDBConn(ldrCfg.DataDbCfg().DataDbType,
if dataDB, err = engine.NewDataDBConn(ldrCfg.DataDbCfg().DataDbType,
ldrCfg.DataDbCfg().DataDbHost, ldrCfg.DataDbCfg().DataDbPort,
ldrCfg.DataDbCfg().DataDbName, ldrCfg.DataDbCfg().DataDbUser,
ldrCfg.DataDbCfg().DataDbPass, ldrCfg.GeneralCfg().DBDataEncoding,
ldrCfg.DataDbCfg().DataDbSentinelName, ldrCfg.DataDbCfg().Items)
if err != nil {
ldrCfg.DataDbCfg().DataDbSentinelName, ldrCfg.DataDbCfg().Items); err != nil {
log.Fatalf("Coud not open dataDB connection: %s", err.Error())
}
dm = engine.NewDataManager(d, config.CgrConfig().CacheCfg(), cM)
defer dm.DataDB().Close()
defer dataDB.Close()
}
if *fromStorDB || *toStorDB {
if storDb, err = engine.NewStorDBConn(ldrCfg.StorDbCfg().Type,
if storDB, err = engine.NewStorDBConn(ldrCfg.StorDbCfg().Type,
ldrCfg.StorDbCfg().Host, ldrCfg.StorDbCfg().Port,
ldrCfg.StorDbCfg().Name, ldrCfg.StorDbCfg().User,
ldrCfg.StorDbCfg().Password, ldrCfg.GeneralCfg().DBDataEncoding, ldrCfg.StorDbCfg().SSLMode,
@@ -269,46 +308,24 @@ func main() {
ldrCfg.StorDbCfg().PrefixIndexedFields, ldrCfg.StorDbCfg().Items); err != nil {
log.Fatalf("Coud not open storDB connection: %s", err.Error())
}
defer storDb.Close()
defer storDB.Close()
}
if !*dryRun && *toStorDB { // Import files from a directory into storDb
if ldrCfg.LoaderCgrCfg().TpID == "" {
log.Fatal("TPid required.")
}
if *flushStorDB {
if err = storDb.RemTpData("", ldrCfg.LoaderCgrCfg().TpID, map[string]string{}); err != nil {
log.Fatal(err)
}
}
csvImporter := engine.TPCSVImporter{
TPid: ldrCfg.LoaderCgrCfg().TpID,
StorDb: storDb,
DirPath: *dataPath,
Sep: ldrCfg.LoaderCgrCfg().FieldSeparator,
Verbose: *verbose,
ImportId: *importID,
}
if errImport := csvImporter.Run(); errImport != nil {
log.Fatal(errImport)
if err = importData(ldrCfg); err != nil {
log.Fatal(err)
}
return
}
if *fromStorDB { // Load Tariff Plan from storDb into dataDb
loader = storDb
} else if gprefix := utils.MetaGoogleAPI + utils.CONCATENATED_KEY_SEP; strings.HasPrefix(*dataPath, gprefix) { // Default load from csv files to dataDb
loader, err = engine.NewGoogleCSVStorage(ldrCfg.LoaderCgrCfg().FieldSeparator, strings.TrimPrefix(*dataPath, gprefix), *cfgPath)
if err != nil {
log.Fatal(err)
}
} else {
loader = engine.NewFileCSVStorage(ldrCfg.LoaderCgrCfg().FieldSeparator, *dataPath)
var loader engine.LoadReader
if loader, err = getLoader(ldrCfg); err != nil {
log.Fatal(err)
}
tpReader, err := engine.NewTpReader(dm.DataDB(), loader, ldrCfg.LoaderCgrCfg().TpID,
ldrCfg.GeneralCfg().DefaultTimezone, ldrCfg.LoaderCgrCfg().CachesConns, ldrCfg.LoaderCgrCfg().SchedulerConns)
if err != nil {
var tpReader *engine.TpReader
if tpReader, err = engine.NewTpReader(dataDB, loader,
ldrCfg.LoaderCgrCfg().TpID, ldrCfg.GeneralCfg().DefaultTimezone,
ldrCfg.LoaderCgrCfg().CachesConns,
ldrCfg.LoaderCgrCfg().SchedulerConns); err != nil {
log.Fatal(err)
}
if err = tpReader.LoadAll(); err != nil {
@@ -320,29 +337,25 @@ func main() {
}
if *remove {
if err := tpReader.RemoveFromDatabase(*verbose, *disableReverse); err != nil {
if err = tpReader.RemoveFromDatabase(*verbose, *disableReverse); err != nil {
log.Fatal("Could not delete from database: ", err)
}
return
}
// write maps to database
if err := tpReader.WriteToDatabase(*verbose, *disableReverse); err != nil {
if err = tpReader.WriteToDatabase(*verbose, *disableReverse); err != nil {
log.Fatal("Could not write to database: ", err)
}
caching := config.CgrConfig().GeneralCfg().DefaultCaching
if cachingArg != nil && *cachingArg != utils.EmptyString {
caching = *cachingArg
}
// reload cache
if err := tpReader.ReloadCache(caching, *verbose, &utils.ArgDispatcher{
if err = tpReader.ReloadCache(ldrCfg.GeneralCfg().DefaultCaching, *verbose, &utils.ArgDispatcher{
APIKey: apiKey,
RouteID: routeID,
}); err != nil {
log.Fatal("Could not reload cache: ", err)
}
if len(ldrCfg.LoaderCgrCfg().SchedulerConns) != 0 {
if err := tpReader.ReloadScheduler(*verbose); err != nil {
if err = tpReader.ReloadScheduler(*verbose); err != nil {
log.Fatal("Could not reload scheduler: ", err)
}
}