diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 89df45af2..8597978c8 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -74,67 +74,41 @@ func main() { var errDataDb, errStorDb, err error var dataDb engine.DataStorage var storDb engine.LoadStorage - // Init necessary db connections - if *fromStorDb { - dataDb, errDataDb = engine.ConfigureDataStorage(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass, *dbdata_encoding) - storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding) - } else if *toStorDb { // Import from csv files to storDb - storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding) - } else { // Default load from csv files to dataDb - dataDb, errDataDb = engine.ConfigureDataStorage(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass, *dbdata_encoding) - } - // Defer databases opened to be closed when we are done - for _, db := range []engine.Storage{dataDb, storDb} { - if db != nil { - defer db.Close() - } - } - // Stop on db errors - for _, err = range []error{errDataDb, errStorDb} { - if err != nil { - log.Fatalf("Could not open database connection: %v", err) - } - } var rater *rpc.Client - if !*toStorDb { // Connections to history and rater - if *historyServer != "" { // Init scribeAgent - if scribeAgent, err := history.NewProxyScribe(*historyServer, *rpcEncoding); err != nil { - log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error()) - return - } else { - engine.SetHistoryScribe(scribeAgent) - gob.Register(&engine.Destination{}) - defer scribeAgent.Client.Close() - } - } else { - log.Print("WARNING: Rates history archiving is disabled!") - } - if *raterAddress != "" { - if *rpcEncoding == "json" { - rater, err = jsonrpc.Dial("tcp", *raterAddress) - } else { - rater, err = rpc.Dial("tcp", *raterAddress) - } - if err != nil { - log.Fatalf("Could not connect to rater: %s", err.Error()) - return - } - } else { - log.Print("WARNING: Rates automatic cache reloading is disabled!") - } - } var loader engine.TPLoader - if *toStorDb { // Import files from a directory into storDb - if *tpid == "" { - log.Fatal("TPid required, please define it via *-tpid* command argument.") + // Init necessary db connections, only if not already + if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb + if *fromStorDb { + dataDb, errDataDb = engine.ConfigureDataStorage(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass, *dbdata_encoding) + storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding) + } else if *toStorDb { // Import from csv files to storDb + storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding) + } else { // Default load from csv files to dataDb + dataDb, errDataDb = engine.ConfigureDataStorage(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass, *dbdata_encoding) } - csvImporter := engine.TPCSVImporter{*tpid, storDb, *dataPath, ',', *verbose, *runId} - if errImport := csvImporter.Run(); errImport != nil { - log.Fatal(errImport) + // Defer databases opened to be closed when we are done + for _, db := range []engine.Storage{dataDb, storDb} { + if db != nil { + defer db.Close() + } + } + // Stop on db errors + for _, err = range []error{errDataDb, errStorDb} { + if err != nil { + log.Fatalf("Could not open database connection: %v", err) + } + } + if *toStorDb { // Import files from a directory into storDb + if *tpid == "" { + log.Fatal("TPid required, please define it via *-tpid* command argument.") + } + csvImporter := engine.TPCSVImporter{*tpid, storDb, *dataPath, ',', *verbose, *runId} + if errImport := csvImporter.Run(); errImport != nil { + log.Fatal(errImport) + } + return } - return } - if *fromStorDb { // Load Tariff Plan from storDb into dataDb loader = engine.NewDbReader(storDb, dataDb, *tpid) } else { // Default load from csv files to dataDb @@ -190,11 +164,37 @@ func main() { if *dryRun { // We were just asked to parse the data, not saving it return } + if *historyServer != "" { // Init scribeAgent so we can store the differences + if scribeAgent, err := history.NewProxyScribe(*historyServer, *rpcEncoding); err != nil { + log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error()) + return + } else { + engine.SetHistoryScribe(scribeAgent) + gob.Register(&engine.Destination{}) + defer scribeAgent.Client.Close() + } + } else { + log.Print("WARNING: Rates history archiving is disabled!") + } + if *raterAddress != "" { // Init connection to rater so we can reload it's data + if *rpcEncoding == "json" { + rater, err = jsonrpc.Dial("tcp", *raterAddress) + } else { + rater, err = rpc.Dial("tcp", *raterAddress) + } + if err != nil { + log.Fatalf("Could not connect to rater: %s", err.Error()) + return + } + } else { + log.Print("WARNING: Rates automatic cache reloading is disabled!") + } + // write maps to database if err := loader.WriteToDatabase(*flush, *verbose); err != nil { log.Fatal("Could not write to database: ", err) } - // Reload cache + // Reload scheduler and cache if rater != nil { reply := "" actIds,_ := loader.GetLoadedIds(engine.ACTION_TIMING_PREFIX)