Loader does not longer needs db connections on -dry_run

This commit is contained in:
DanB
2013-11-19 20:02:45 +01:00
parent 30d91c9747
commit a20d6258e2

View File

@@ -74,67 +74,41 @@ func main() {
var errDataDb, errStorDb, err error
var dataDb engine.DataStorage
var storDb engine.LoadStorage
// Init necessary db connections
if *fromStorDb {
dataDb, errDataDb = engine.ConfigureDataStorage(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass, *dbdata_encoding)
storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding)
} else if *toStorDb { // Import from csv files to storDb
storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding)
} else { // Default load from csv files to dataDb
dataDb, errDataDb = engine.ConfigureDataStorage(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass, *dbdata_encoding)
}
// Defer databases opened to be closed when we are done
for _, db := range []engine.Storage{dataDb, storDb} {
if db != nil {
defer db.Close()
}
}
// Stop on db errors
for _, err = range []error{errDataDb, errStorDb} {
if err != nil {
log.Fatalf("Could not open database connection: %v", err)
}
}
var rater *rpc.Client
if !*toStorDb { // Connections to history and rater
if *historyServer != "" { // Init scribeAgent
if scribeAgent, err := history.NewProxyScribe(*historyServer, *rpcEncoding); err != nil {
log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error())
return
} else {
engine.SetHistoryScribe(scribeAgent)
gob.Register(&engine.Destination{})
defer scribeAgent.Client.Close()
}
} else {
log.Print("WARNING: Rates history archiving is disabled!")
}
if *raterAddress != "" {
if *rpcEncoding == "json" {
rater, err = jsonrpc.Dial("tcp", *raterAddress)
} else {
rater, err = rpc.Dial("tcp", *raterAddress)
}
if err != nil {
log.Fatalf("Could not connect to rater: %s", err.Error())
return
}
} else {
log.Print("WARNING: Rates automatic cache reloading is disabled!")
}
}
var loader engine.TPLoader
if *toStorDb { // Import files from a directory into storDb
if *tpid == "" {
log.Fatal("TPid required, please define it via *-tpid* command argument.")
// Init necessary db connections, only if not already
if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb
if *fromStorDb {
dataDb, errDataDb = engine.ConfigureDataStorage(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass, *dbdata_encoding)
storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding)
} else if *toStorDb { // Import from csv files to storDb
storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding)
} else { // Default load from csv files to dataDb
dataDb, errDataDb = engine.ConfigureDataStorage(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass, *dbdata_encoding)
}
csvImporter := engine.TPCSVImporter{*tpid, storDb, *dataPath, ',', *verbose, *runId}
if errImport := csvImporter.Run(); errImport != nil {
log.Fatal(errImport)
// Defer databases opened to be closed when we are done
for _, db := range []engine.Storage{dataDb, storDb} {
if db != nil {
defer db.Close()
}
}
// Stop on db errors
for _, err = range []error{errDataDb, errStorDb} {
if err != nil {
log.Fatalf("Could not open database connection: %v", err)
}
}
if *toStorDb { // Import files from a directory into storDb
if *tpid == "" {
log.Fatal("TPid required, please define it via *-tpid* command argument.")
}
csvImporter := engine.TPCSVImporter{*tpid, storDb, *dataPath, ',', *verbose, *runId}
if errImport := csvImporter.Run(); errImport != nil {
log.Fatal(errImport)
}
return
}
return
}
if *fromStorDb { // Load Tariff Plan from storDb into dataDb
loader = engine.NewDbReader(storDb, dataDb, *tpid)
} else { // Default load from csv files to dataDb
@@ -190,11 +164,37 @@ func main() {
if *dryRun { // We were just asked to parse the data, not saving it
return
}
if *historyServer != "" { // Init scribeAgent so we can store the differences
if scribeAgent, err := history.NewProxyScribe(*historyServer, *rpcEncoding); err != nil {
log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error())
return
} else {
engine.SetHistoryScribe(scribeAgent)
gob.Register(&engine.Destination{})
defer scribeAgent.Client.Close()
}
} else {
log.Print("WARNING: Rates history archiving is disabled!")
}
if *raterAddress != "" { // Init connection to rater so we can reload it's data
if *rpcEncoding == "json" {
rater, err = jsonrpc.Dial("tcp", *raterAddress)
} else {
rater, err = rpc.Dial("tcp", *raterAddress)
}
if err != nil {
log.Fatalf("Could not connect to rater: %s", err.Error())
return
}
} else {
log.Print("WARNING: Rates automatic cache reloading is disabled!")
}
// write maps to database
if err := loader.WriteToDatabase(*flush, *verbose); err != nil {
log.Fatal("Could not write to database: ", err)
}
// Reload cache
// Reload scheduler and cache
if rater != nil {
reply := ""
actIds,_ := loader.GetLoadedIds(engine.ACTION_TIMING_PREFIX)