diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 521e37eac..455fef218 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -31,7 +31,7 @@ const ( ) type ApierV1 struct { - StorDb engine.DataStorage + StorDb engine.LoadStorage DataDb engine.DataStorage Sched *scheduler.Scheduler } diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index 6da450eb4..7cb0688a1 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -29,7 +29,7 @@ import ( var ( cfg *config.CGRConfig // Share the configuration with the rest of the package - storage engine.DataStorage + storage engine.CdrStorage medi *mediator.Mediator ) @@ -43,7 +43,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { } else { //TODO: use the connection to mediator } - } () + }() } else { engine.Logger.Err(fmt.Sprintf("Could not create CDR entry: %v", err)) } @@ -68,7 +68,7 @@ func cgrCdrHandler(w http.ResponseWriter, r *http.Request) { type CDRS struct{} -func New(s engine.DataStorage, m *mediator.Mediator, c *config.CGRConfig) *CDRS { +func New(s engine.CdrStorage, m *mediator.Mediator, c *config.CGRConfig) *CDRS { storage = s medi = m cfg = c @@ -76,7 +76,7 @@ func New(s engine.DataStorage, m *mediator.Mediator, c *config.CGRConfig) *CDRS } func (cdrs *CDRS) StartCapturingCDRs() { - http.HandleFunc("/cgr_json", cgrCdrHandler) // Attach CGR CDR Handler - http.HandleFunc("/freeswitch_json", fsCdrHandler) // Attach FreeSWITCH JSON CDR Handler + http.HandleFunc("/cgr_json", cgrCdrHandler) // Attach CGR CDR Handler + http.HandleFunc("/freeswitch_json", fsCdrHandler) // Attach FreeSWITCH JSON CDR Handler http.ListenAndServe(cfg.CDRSListen, nil) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 1b7e3a8cc..cf7c96d53 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -64,7 +64,7 @@ var ( err error ) -func listenToRPCRequests(rpcResponder interface{}, apier *apier.ApierV1, rpcAddress string, rpc_encoding string, getter engine.DataStorage, loggerDb engine.DataStorage) { +func listenToRPCRequests(rpcResponder interface{}, apier *apier.ApierV1, rpcAddress string, rpc_encoding string, getter engine.DataStorage, loggerDb engine.LogStorage) { l, err := net.Listen("tcp", rpcAddress) if err != nil { engine.Logger.Crit(fmt.Sprintf(" Could not listen to %v: %v", rpcAddress, err)) @@ -94,7 +94,7 @@ func listenToRPCRequests(rpcResponder interface{}, apier *apier.ApierV1, rpcAddr } } -func startMediator(responder *engine.Responder, loggerDb engine.DataStorage) { +func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage) { var connector engine.Connector if cfg.MediatorRater == INTERNAL { connector = responder @@ -125,7 +125,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.DataStorage) { connector = &engine.RPCClientConnector{Client: client} } var err error - medi, err = mediator.NewMediator(connector, loggerDb, cfg) + medi, err = mediator.NewMediator(connector, loggerDb, cdrDb, cfg) if err != nil { engine.Logger.Crit(fmt.Sprintf("Mediator config parsing error: %v", err)) exitChan <- true @@ -136,7 +136,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.DataStorage) { } } -func startSessionManager(responder *engine.Responder, loggerDb engine.DataStorage) { +func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage) { var connector engine.Connector if cfg.SMRater == INTERNAL { connector = responder @@ -183,7 +183,7 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.DataStorag exitChan <- true } -func startCDRS(responder *engine.Responder, loggerDb engine.DataStorage) { +func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage) { if cfg.CDRSMediator == INTERNAL { for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable time.Sleep(time.Duration(i/2) * time.Second) @@ -196,7 +196,7 @@ func startCDRS(responder *engine.Responder, loggerDb engine.DataStorage) { exitChan <- true } } - cs := cdrs.New(loggerDb, medi, cfg) + cs := cdrs.New(cdrDb, medi, cfg) cs.StartCapturingCDRs() exitChan <- true } @@ -308,25 +308,28 @@ func main() { return } - var getter, loggerDb engine.DataStorage - getter, err = engine.ConfigureDatabase(cfg.DataDBType, cfg.DataDBHost, cfg.DataDBPort, cfg.DataDBName, cfg.DataDBUser, cfg.DataDBPass) + var dataDb engine.DataStorage + var logDb engine.LogStorage + var loadDb engine.LoadStorage + var cdrDb engine.CdrStorage + dataDb, err = engine.ConfigureDataStorage(cfg.DataDBType, cfg.DataDBHost, cfg.DataDBPort, cfg.DataDBName, cfg.DataDBUser, cfg.DataDBPass) if err != nil { // Cannot configure getter database, show stopper engine.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err)) return } - defer getter.Close() - engine.SetDataStorage(getter) + defer dataDb.Close() + engine.SetDataStorage(dataDb) if cfg.StorDBType == SAME { - loggerDb = getter + logDb = dataDb.(engine.LogStorage) } else { - loggerDb, err = engine.ConfigureDatabase(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass) + logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass) if err != nil { // Cannot configure logger database, show stopper engine.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err)) return } } - defer loggerDb.Close() - engine.SetStorageLogger(loggerDb) + defer logDb.Close() + engine.SetStorageLogger(logDb) engine.SetRoundingMethodAndDecimals(cfg.RoundingMethod, cfg.RoundingDecimals) if cfg.SMDebitInterval > 0 { @@ -341,16 +344,16 @@ func main() { go stopRaterSingnalHandler() } responder := &engine.Responder{ExitChan: exitChan} - apier := &apier.ApierV1{StorDb: loggerDb, DataDb: getter} + apier := &apier.ApierV1{StorDb: loadDb, DataDb: dataDb} if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterListen != INTERNAL { engine.Logger.Info(fmt.Sprintf("Starting CGRateS Rater on %s.", cfg.RaterListen)) - go listenToRPCRequests(responder, apier, cfg.RaterListen, cfg.RPCEncoding, getter, loggerDb) + go listenToRPCRequests(responder, apier, cfg.RaterListen, cfg.RPCEncoding, dataDb, logDb) } if cfg.BalancerEnabled { engine.Logger.Info(fmt.Sprintf("Starting CGRateS Balancer on %s.", cfg.BalancerListen)) go stopBalancerSingnalHandler() responder.Bal = bal - go listenToRPCRequests(responder, apier, cfg.BalancerListen, cfg.RPCEncoding, getter, loggerDb) + go listenToRPCRequests(responder, apier, cfg.BalancerListen, cfg.RPCEncoding, dataDb, logDb) if cfg.RaterEnabled { engine.Logger.Info("Starting internal engine.") bal.AddClient("local", new(engine.ResponderWorker)) @@ -361,28 +364,28 @@ func main() { engine.Logger.Info("Starting CGRateS Scheduler.") go func() { sched := scheduler.NewScheduler() - go reloadSchedulerSingnalHandler(sched, getter) + go reloadSchedulerSingnalHandler(sched, dataDb) apier.Sched = sched - sched.LoadActionTimings(getter) + sched.LoadActionTimings(dataDb) sched.Loop() }() } if cfg.SMEnabled { engine.Logger.Info("Starting CGRateS SessionManager.") - go startSessionManager(responder, loggerDb) + go startSessionManager(responder, logDb) // close all sessions on shutdown go shutdownSessionmanagerSingnalHandler() } if cfg.MediatorEnabled { engine.Logger.Info("Starting CGRateS Mediator.") - go startMediator(responder, loggerDb) + go startMediator(responder, logDb, cdrDb) } if cfg.CDRSEnabled { engine.Logger.Info("Starting CGRateS CDR Server.") - go startCDRS(responder, loggerDb) + go startCDRS(responder, cdrDb) } if cfg.HistoryServerEnabled || cfg.HistoryAgentEnabled { diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 984122ae3..aff61c0f1 100644 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -66,18 +66,19 @@ func main() { return } var errDataDb, errStorDb, err error - var dataDb, storDb engine.DataStorage + var dataDb engine.DataStorage + var storDb engine.LoadStorage // Init necessary db connections if *fromStorDb { - dataDb, errDataDb = engine.ConfigureDatabase(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass) - storDb, errStorDb = engine.ConfigureDatabase(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass) + dataDb, errDataDb = engine.ConfigureDataStorage(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass) + storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass) } else if *toStorDb { // Import from csv files to storDb - storDb, errStorDb = engine.ConfigureDatabase(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass) + storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass) } else { // Default load from csv files to dataDb - dataDb, errDataDb = engine.ConfigureDatabase(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass) + dataDb, errDataDb = engine.ConfigureDataStorage(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass) } // Defer databases opened to be closed when we are done - for _, db := range []engine.DataStorage{dataDb, storDb} { + for _, db := range []engine.Storage{dataDb, storDb} { if db != nil { defer db.Close() } diff --git a/engine/actions_test.go b/engine/actions_test.go index 964e49dec..87fea53ee 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -744,7 +744,7 @@ func TestActionTriggerLogging(t *testing.T) { if err != nil { t.Error("Error getting actions for the action timing: ", err) } - storageGetter.LogActionTrigger("rif", RATER_SOURCE, at, as) + storageLogger.LogActionTrigger("rif", RATER_SOURCE, at, as) //expected := "rif*some_uuid;MONETARY;OUT;NAT;TEST_ACTIONS;100;10;false*|TOPUP|MONETARY|OUT|10|0" var key string atMap, _ := storageGetter.GetAllActionTimings() @@ -784,7 +784,7 @@ func TestActionTimingLogging(t *testing.T) { if err != nil { t.Error("Error getting actions for the action trigger: ", err) } - storageGetter.LogActionTiming(SCHED_SOURCE, at, as) + storageLogger.LogActionTiming(SCHED_SOURCE, at, as) //expected := "some uuid|test|one,two,three|;1,2,3,4,5,6,7,8,9,10,11,12;1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31;1,2,3,4,5;18:00:00;00:00:00;10;0;1;60;1|10|TEST_ACTIONS*|TOPUP|MONETARY|OUT|10|0" var key string atMap, _ := storageGetter.GetAllActionTimings() diff --git a/engine/calldesc.go b/engine/calldesc.go index 60ee36878..78da15fa5 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -36,6 +36,14 @@ func init() { Logger = new(utils.StdLogger) Logger.Err(fmt.Sprintf("Could not connect to syslog: %v", err)) } + //db_server := "127.0.0.1" + //db_server := "192.168.0.17" + m, _ := NewMapStorage() + storageGetter, _ = m.(DataStorage) + //storageGetter, _ = NewMongoStorage(db_server, "27017", "cgrates_test", "", "") + //storageGetter, _ = NewRedisStorage(db_server+":6379", 11, "") + + storageLogger = storageGetter.(LogStorage) } const ( @@ -45,13 +53,9 @@ const ( ) var ( - Logger utils.LoggerInterface - db_server = "127.0.0.1" - //db_server = "192.168.0.17" - storageGetter, _ = NewMapStorage() - //storageGetter, _ = NewMongoStorage(db_server, "27017", "cgrates_test", "", "") - //storageGetter, _ = NewRedisStorage(db_server+":6379", 11, "") - storageLogger = storageGetter + Logger utils.LoggerInterface + storageGetter DataStorage + storageLogger LogStorage debitPeriod = 10 * time.Second roundingMethod = "*middle" roundingDecimals = 4 @@ -73,7 +77,7 @@ func SetRoundingMethodAndDecimals(rm string, rd int) { /* Sets the database for logging (can be de same as storage getter or different db) */ -func SetStorageLogger(sg DataStorage) { +func SetStorageLogger(sg LogStorage) { storageLogger = sg } diff --git a/engine/calldesc_test.go b/engine/calldesc_test.go index cffa333d5..f5337cc6d 100644 --- a/engine/calldesc_test.go +++ b/engine/calldesc_test.go @@ -53,7 +53,7 @@ func populateDB() { }, } if storageGetter != nil { - storageGetter.Flush() + storageGetter.(Storage).Flush() storageGetter.SetUserBalance(broker) storageGetter.SetUserBalance(minu) } else { diff --git a/engine/loader_csv.go b/engine/loader_csv.go index b0ae606df..657c96c47 100644 --- a/engine/loader_csv.go +++ b/engine/loader_csv.go @@ -101,7 +101,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) { return errors.New("No database connection!") } if flush { - storage.Flush() + storage.(Storage).Flush() } if verbose { log.Print("Destinations") diff --git a/engine/loader_db.go b/engine/loader_db.go index e585d7a97..4c55a7030 100644 --- a/engine/loader_db.go +++ b/engine/loader_db.go @@ -27,7 +27,7 @@ import ( type DbReader struct { tpid string - storDb DataStorage + storDb LoadStorage dataDb DataStorage actions map[string][]*Action actionsTimings map[string][]*ActionTiming @@ -41,7 +41,7 @@ type DbReader struct { ratingProfiles map[string]*RatingProfile } -func NewDbReader(storDB DataStorage, storage DataStorage, tpid string) *DbReader { +func NewDbReader(storDB LoadStorage, storage DataStorage, tpid string) *DbReader { c := new(DbReader) c.storDb = storDB c.dataDb = storage @@ -54,7 +54,7 @@ func NewDbReader(storDB DataStorage, storage DataStorage, tpid string) *DbReader func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) { storage := dbr.dataDb if flush { - storage.Flush() + storage.(Storage).Flush() } if verbose { log.Print("Destinations") diff --git a/engine/responder.go b/engine/responder.go index 413ccfad8..631194729 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -169,7 +169,7 @@ func (rs *Responder) Shutdown(arg string, reply *string) (err error) { if rs.Bal != nil { rs.Bal.Shutdown("Responder.Shutdown") } - storageGetter.Close() + storageGetter.(Storage).Close() defer func() { rs.ExitChan <- true }() *reply = "Done!" return diff --git a/engine/storage_interface.go b/engine/storage_interface.go index cdcdd24da..c6187e848 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -47,16 +47,49 @@ const ( RATER_SOURCE = "RAT" ) +type Storage interface { + Close() + Flush() error +} + /* Interface for storage providers. */ type DataStorage interface { - Close() - Flush() error + Storage GetRatingProfile(string) (*RatingProfile, error) SetRatingProfile(*RatingProfile) error GetDestination(string) (*Destination, error) SetDestination(*Destination) error + // End Apier functions + GetActions(string) (Actions, error) + SetActions(string, Actions) error + GetUserBalance(string) (*UserBalance, error) + SetUserBalance(*UserBalance) error + GetActionTimings(string) (ActionTimings, error) + SetActionTimings(string, ActionTimings) error + GetAllActionTimings() (map[string]ActionTimings, error) +} + +type CdrStorage interface { + Storage + SetCdr(utils.CDR) error + SetRatedCdr(utils.CDR, *CallCost, string) error + GetAllRatedCdr() ([]utils.CDR, error) +} + +type LogStorage interface { + Storage + //GetAllActionTimingsLogs() (map[string]ActionsTimings, error) + LogCallCost(uuid, source string, cc *CallCost) error + LogError(uuid, source, errstr string) error + LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) error + LogActionTiming(source string, at *ActionTiming, as Actions) error + GetCallCostLog(uuid, source string) (*CallCost, error) +} + +type LoadStorage interface { + Storage // Apier functions GetTPIds() ([]string, error) SetTPTiming(string, *Timing) error @@ -97,23 +130,6 @@ type DataStorage interface { ExistsTPAccountActions(string, string) (bool, error) SetTPAccountActions(string, map[string]*AccountAction) error GetTPAccountActionIds(string) ([]string, error) - // End Apier functions - GetActions(string) (Actions, error) - SetActions(string, Actions) error - GetUserBalance(string) (*UserBalance, error) - SetUserBalance(*UserBalance) error - GetActionTimings(string) (ActionTimings, error) - SetActionTimings(string, ActionTimings) error - GetAllActionTimings() (map[string]ActionTimings, error) - SetCdr(utils.CDR) error - SetRatedCdr(utils.CDR, *CallCost, string) error - GetAllRatedCdr() ([]utils.CDR, error) - //GetAllActionTimingsLogs() (map[string]ActionsTimings, error) - LogCallCost(uuid, source string, cc *CallCost) error - LogError(uuid, source, errstr string) error - LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) error - LogActionTiming(source string, at *ActionTiming, as Actions) error - GetCallCostLog(uuid, source string) (*CallCost, error) // loader functions GetTpDestinations(string, string) ([]*Destination, error) GetTpTimings(string, string) (map[string]*Timing, error) diff --git a/engine/storage_map.go b/engine/storage_map.go index 40192bbc5..2e108267d 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -32,7 +32,7 @@ type MapStorage struct { ms Marshaler } -func NewMapStorage() (DataStorage, error) { +func NewMapStorage() (Storage, error) { return &MapStorage{dict: make(map[string][]byte), ms: new(MsgpackMarshaler)}, nil } diff --git a/engine/storage_mongo.go b/engine/storage_mongo.go index 313093139..7bf735a60 100644 --- a/engine/storage_mongo.go +++ b/engine/storage_mongo.go @@ -34,7 +34,7 @@ type MongoStorage struct { db *mgo.Database } -func NewMongoStorage(host, port, db, user, pass string) (DataStorage, error) { +func NewMongoStorage(host, port, db, user, pass string) (Storage, error) { dial := fmt.Sprintf(host) if user != "" && pass != "" { dial = fmt.Sprintf("%s:%s@%s", user, pass, dial) diff --git a/engine/storage_mysql.go b/engine/storage_mysql.go index c9792d638..6e3338ed9 100644 --- a/engine/storage_mysql.go +++ b/engine/storage_mysql.go @@ -28,7 +28,7 @@ type MySQLStorage struct { *SQLStorage } -func NewMySQLStorage(host, port, name, user, password string) (DataStorage, error) { +func NewMySQLStorage(host, port, name, user, password string) (Storage, error) { db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8", user, password, host, port, name)) if err != nil { return nil, err diff --git a/engine/storage_postgres.go b/engine/storage_postgres.go index 3ad3076b8..420ee39f8 100644 --- a/engine/storage_postgres.go +++ b/engine/storage_postgres.go @@ -28,7 +28,7 @@ type PostgresStorage struct { *SQLStorage } -func NewPostgresStorage(host, port, name, user, password string) (DataStorage, error) { +func NewPostgresStorage(host, port, name, user, password string) (Storage, error) { db, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", host, port, name, user, password)) if err != nil { return nil, err diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 9f1d107d7..572ca34ac 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -35,7 +35,7 @@ type RedisStorage struct { ms Marshaler } -func NewRedisStorage(address string, db int, pass string) (DataStorage, error) { +func NewRedisStorage(address string, db int, pass string) (Storage, error) { addrSplit := strings.Split(address, ":") host := addrSplit[0] port := 6379 diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 8c239a02e..00674df31 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -30,7 +30,9 @@ type SQLStorage struct { Db *sql.DB } -func (self *SQLStorage) Close() {} +func (self *SQLStorage) Close() { + self.Close() +} func (self *SQLStorage) Flush() (err error) { return diff --git a/engine/storage_utils.go b/engine/storage_utils.go index b791f056a..7436118f7 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -26,7 +26,8 @@ import ( // Various helpers to deal with database -func ConfigureDatabase(db_type, host, port, name, user, pass string) (db DataStorage, err error) { +func ConfigureDataStorage(db_type, host, port, name, user, pass string) (db DataStorage, err error) { + var d Storage switch db_type { case utils.REDIS: var db_nb int @@ -38,13 +39,80 @@ func ConfigureDatabase(db_type, host, port, name, user, pass string) (db DataSto if port != "" { host += ":" + port } - db, err = NewRedisStorage(host, db_nb, pass) + d, err = NewRedisStorage(host, db_nb, pass) + db = d.(DataStorage) case utils.MONGO: - db, err = NewMongoStorage(host, port, name, user, pass) - case utils.POSTGRES: - db, err = NewPostgresStorage(host, port, name, user, pass) - case utils.MYSQL: - db, err = NewMySQLStorage(host, port, name, user, pass) + d, err = NewMongoStorage(host, port, name, user, pass) + db = d.(DataStorage) + default: + err = errors.New("unknown db") + } + if err != nil { + return nil, err + } + return db, nil +} + +func ConfigureLogStorage(db_type, host, port, name, user, pass string) (db LogStorage, err error) { + var d Storage + switch db_type { + case utils.REDIS: + var db_nb int + db_nb, err = strconv.Atoi(name) + if err != nil { + Logger.Crit("Redis db name must be an integer!") + return nil, err + } + if port != "" { + host += ":" + port + } + d, err = NewRedisStorage(host, db_nb, pass) + db = d.(LogStorage) + case utils.MONGO: + d, err = NewMongoStorage(host, port, name, user, pass) + db = d.(LogStorage) + case utils.POSTGRES: + d, err = NewPostgresStorage(host, port, name, user, pass) + db = d.(LogStorage) + case utils.MYSQL: + d, err = NewMySQLStorage(host, port, name, user, pass) + db = d.(LogStorage) + default: + err = errors.New("unknown db") + } + if err != nil { + return nil, err + } + return db, nil +} + +func ConfigureLoadStorage(db_type, host, port, name, user, pass string) (db LoadStorage, err error) { + var d Storage + switch db_type { + case utils.POSTGRES: + d, err = NewPostgresStorage(host, port, name, user, pass) + db = d.(LoadStorage) + case utils.MYSQL: + d, err = NewMySQLStorage(host, port, name, user, pass) + db = d.(LoadStorage) + default: + err = errors.New("unknown db") + } + if err != nil { + return nil, err + } + return db, nil +} + +func ConfigureCdrStorage(db_type, host, port, name, user, pass string) (db CdrStorage, err error) { + var d Storage + switch db_type { + case utils.POSTGRES: + d, err = NewPostgresStorage(host, port, name, user, pass) + db = d.(CdrStorage) + case utils.MYSQL: + d, err = NewMySQLStorage(host, port, name, user, pass) + db = d.(CdrStorage) default: err = errors.New("unknown db") } diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go index b4cdd77b3..56824abd4 100644 --- a/engine/tpimporter_csv.go +++ b/engine/tpimporter_csv.go @@ -30,7 +30,7 @@ import ( // Import tariff plan from csv into storDb type TPCSVImporter struct { TPid string // Load data on this tpid - StorDb DataStorage // StorDb connection handle + StorDb LoadStorage // StorDb connection handle DirPath string // Directory path to import from Sep rune // Separator in the csv file Verbose bool // If true will print a detailed information instead of silently discarding it diff --git a/mediator/mediator.go b/mediator/mediator.go index b1685cbe9..263fbbe26 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -35,10 +35,10 @@ import ( "time" ) -func NewMediator(connector engine.Connector, storDb engine.DataStorage, cfg *config.CGRConfig) (m *Mediator, err error) { +func NewMediator(connector engine.Connector, logDb engine.LogStorage, cdrDb engine.CdrStorage, cfg *config.CGRConfig) (m *Mediator, err error) { m = &Mediator{ connector: connector, - storDb: storDb, + logDb: logDb, cgrCfg: cfg, } m.fieldNames = make(map[string][]string) @@ -52,7 +52,8 @@ func NewMediator(connector engine.Connector, storDb engine.DataStorage, cfg *con type Mediator struct { connector engine.Connector - storDb engine.DataStorage + logDb engine.LogStorage + cdrDb engine.CdrStorage cgrCfg *config.CGRConfig cdrInDir, cdrOutDir string accIdField string @@ -166,8 +167,8 @@ func (self *Mediator) TrackCDRFiles() (err error) { // Retrive the cost from logging database func (self *Mediator) getCostsFromDB(cdr utils.CDR) (cc *engine.CallCost, err error) { for i := 0; i < 3; i++ { // Mechanism to avoid concurrency between SessionManager writing the costs and mediator picking them up - cc, err = self.storDb.GetCallCostLog(cdr.GetCgrId(), engine.SESSION_MANAGER_SOURCE) //ToDo: What are we getting when there is no log? - if cc != nil { // There were no errors, chances are that we got what we are looking for + cc, err = self.logDb.GetCallCostLog(cdr.GetCgrId(), engine.SESSION_MANAGER_SOURCE) //ToDo: What are we getting when there is no log? + if cc != nil { // There were no errors, chances are that we got what we are looking for break } time.Sleep(time.Duration(i) * time.Second) @@ -204,10 +205,10 @@ func (self *Mediator) getCostsFromRater(cdr utils.CDR) (*engine.CallCost, error) err = self.connector.GetCost(cd, cc) } if err != nil { - self.storDb.LogError(cdr.GetCgrId(), engine.MEDIATOR_SOURCE, err.Error()) + self.logDb.LogError(cdr.GetCgrId(), engine.MEDIATOR_SOURCE, err.Error()) } else { // If the mediator calculated a price it will write it to logdb - self.storDb.LogCallCost(cdr.GetCgrId(), engine.MEDIATOR_SOURCE, cc) + self.logDb.LogCallCost(cdr.GetCgrId(), engine.MEDIATOR_SOURCE, cc) } return cc, err } @@ -273,9 +274,9 @@ func (self *Mediator) MediateCSVCDR(cdrfn string) (err error) { return } -func (self *Mediator) MediateDBCDR(cdr utils.CDR, db engine.DataStorage) error { +func (self *Mediator) MediateDBCDR(cdr utils.CDR, db engine.CdrStorage) error { var qryCC *engine.CallCost - cc := &engine.CallCost{Cost:-1} + cc := &engine.CallCost{Cost: -1} var errCost error if cdr.GetReqType() == utils.PREPAID || cdr.GetReqType() == utils.POSTPAID { // Should be previously calculated and stored in DB @@ -293,5 +294,5 @@ func (self *Mediator) MediateDBCDR(cdr utils.CDR, db engine.DataStorage) error { if errCost != nil { extraInfo = errCost.Error() } - return self.storDb.SetRatedCdr(cdr, cc, extraInfo) + return self.cdrDb.SetRatedCdr(cdr, cc, extraInfo) } diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 1a439e9a2..fe989f0a9 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -42,10 +42,10 @@ type FSSessionManager struct { sessions []*Session connector engine.Connector debitPeriod time.Duration - loggerDB engine.DataStorage + loggerDB engine.LogStorage } -func NewFSSessionManager(storage engine.DataStorage, connector engine.Connector, debitPeriod time.Duration) *FSSessionManager { +func NewFSSessionManager(storage engine.LogStorage, connector engine.Connector, debitPeriod time.Duration) *FSSessionManager { return &FSSessionManager{loggerDB: storage, connector: connector, debitPeriod: debitPeriod} } @@ -344,7 +344,7 @@ func (sm *FSSessionManager) GetDebitPeriod() time.Duration { return sm.debitPeriod } -func (sm *FSSessionManager) GetDbLogger() engine.DataStorage { +func (sm *FSSessionManager) GetDbLogger() engine.LogStorage { return sm.loggerDB } diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index b254e3dcd..ef7311e75 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -30,6 +30,6 @@ type SessionManager interface { RemoveSession(*Session) LoopAction(*Session, *engine.CallDescriptor, float64) GetDebitPeriod() time.Duration - GetDbLogger() engine.DataStorage + GetDbLogger() engine.LogStorage Shutdown() error }