mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-21 15:18:44 +05:00
splited storage interface in multiple interfaces
This commit is contained in:
@@ -31,7 +31,7 @@ const (
|
||||
)
|
||||
|
||||
type ApierV1 struct {
|
||||
StorDb engine.DataStorage
|
||||
StorDb engine.LoadStorage
|
||||
DataDb engine.DataStorage
|
||||
Sched *scheduler.Scheduler
|
||||
}
|
||||
|
||||
10
cdrs/cdrs.go
10
cdrs/cdrs.go
@@ -29,7 +29,7 @@ import (
|
||||
|
||||
var (
|
||||
cfg *config.CGRConfig // Share the configuration with the rest of the package
|
||||
storage engine.DataStorage
|
||||
storage engine.CdrStorage
|
||||
medi *mediator.Mediator
|
||||
)
|
||||
|
||||
@@ -43,7 +43,7 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
|
||||
} else {
|
||||
//TODO: use the connection to mediator
|
||||
}
|
||||
} ()
|
||||
}()
|
||||
} else {
|
||||
engine.Logger.Err(fmt.Sprintf("Could not create CDR entry: %v", err))
|
||||
}
|
||||
@@ -68,7 +68,7 @@ func cgrCdrHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
type CDRS struct{}
|
||||
|
||||
func New(s engine.DataStorage, m *mediator.Mediator, c *config.CGRConfig) *CDRS {
|
||||
func New(s engine.CdrStorage, m *mediator.Mediator, c *config.CGRConfig) *CDRS {
|
||||
storage = s
|
||||
medi = m
|
||||
cfg = c
|
||||
@@ -76,7 +76,7 @@ func New(s engine.DataStorage, m *mediator.Mediator, c *config.CGRConfig) *CDRS
|
||||
}
|
||||
|
||||
func (cdrs *CDRS) StartCapturingCDRs() {
|
||||
http.HandleFunc("/cgr_json", cgrCdrHandler) // Attach CGR CDR Handler
|
||||
http.HandleFunc("/freeswitch_json", fsCdrHandler) // Attach FreeSWITCH JSON CDR Handler
|
||||
http.HandleFunc("/cgr_json", cgrCdrHandler) // Attach CGR CDR Handler
|
||||
http.HandleFunc("/freeswitch_json", fsCdrHandler) // Attach FreeSWITCH JSON CDR Handler
|
||||
http.ListenAndServe(cfg.CDRSListen, nil)
|
||||
}
|
||||
|
||||
@@ -64,7 +64,7 @@ var (
|
||||
err error
|
||||
)
|
||||
|
||||
func listenToRPCRequests(rpcResponder interface{}, apier *apier.ApierV1, rpcAddress string, rpc_encoding string, getter engine.DataStorage, loggerDb engine.DataStorage) {
|
||||
func listenToRPCRequests(rpcResponder interface{}, apier *apier.ApierV1, rpcAddress string, rpc_encoding string, getter engine.DataStorage, loggerDb engine.LogStorage) {
|
||||
l, err := net.Listen("tcp", rpcAddress)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<Rater> Could not listen to %v: %v", rpcAddress, err))
|
||||
@@ -94,7 +94,7 @@ func listenToRPCRequests(rpcResponder interface{}, apier *apier.ApierV1, rpcAddr
|
||||
}
|
||||
}
|
||||
|
||||
func startMediator(responder *engine.Responder, loggerDb engine.DataStorage) {
|
||||
func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage) {
|
||||
var connector engine.Connector
|
||||
if cfg.MediatorRater == INTERNAL {
|
||||
connector = responder
|
||||
@@ -125,7 +125,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.DataStorage) {
|
||||
connector = &engine.RPCClientConnector{Client: client}
|
||||
}
|
||||
var err error
|
||||
medi, err = mediator.NewMediator(connector, loggerDb, cfg)
|
||||
medi, err = mediator.NewMediator(connector, loggerDb, cdrDb, cfg)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("Mediator config parsing error: %v", err))
|
||||
exitChan <- true
|
||||
@@ -136,7 +136,7 @@ func startMediator(responder *engine.Responder, loggerDb engine.DataStorage) {
|
||||
}
|
||||
}
|
||||
|
||||
func startSessionManager(responder *engine.Responder, loggerDb engine.DataStorage) {
|
||||
func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage) {
|
||||
var connector engine.Connector
|
||||
if cfg.SMRater == INTERNAL {
|
||||
connector = responder
|
||||
@@ -183,7 +183,7 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.DataStorag
|
||||
exitChan <- true
|
||||
}
|
||||
|
||||
func startCDRS(responder *engine.Responder, loggerDb engine.DataStorage) {
|
||||
func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage) {
|
||||
if cfg.CDRSMediator == INTERNAL {
|
||||
for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable
|
||||
time.Sleep(time.Duration(i/2) * time.Second)
|
||||
@@ -196,7 +196,7 @@ func startCDRS(responder *engine.Responder, loggerDb engine.DataStorage) {
|
||||
exitChan <- true
|
||||
}
|
||||
}
|
||||
cs := cdrs.New(loggerDb, medi, cfg)
|
||||
cs := cdrs.New(cdrDb, medi, cfg)
|
||||
cs.StartCapturingCDRs()
|
||||
exitChan <- true
|
||||
}
|
||||
@@ -308,25 +308,28 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
var getter, loggerDb engine.DataStorage
|
||||
getter, err = engine.ConfigureDatabase(cfg.DataDBType, cfg.DataDBHost, cfg.DataDBPort, cfg.DataDBName, cfg.DataDBUser, cfg.DataDBPass)
|
||||
var dataDb engine.DataStorage
|
||||
var logDb engine.LogStorage
|
||||
var loadDb engine.LoadStorage
|
||||
var cdrDb engine.CdrStorage
|
||||
dataDb, err = engine.ConfigureDataStorage(cfg.DataDBType, cfg.DataDBHost, cfg.DataDBPort, cfg.DataDBName, cfg.DataDBUser, cfg.DataDBPass)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
defer getter.Close()
|
||||
engine.SetDataStorage(getter)
|
||||
defer dataDb.Close()
|
||||
engine.SetDataStorage(dataDb)
|
||||
if cfg.StorDBType == SAME {
|
||||
loggerDb = getter
|
||||
logDb = dataDb.(engine.LogStorage)
|
||||
} else {
|
||||
loggerDb, err = engine.ConfigureDatabase(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass)
|
||||
logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass)
|
||||
if err != nil { // Cannot configure logger database, show stopper
|
||||
engine.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
defer loggerDb.Close()
|
||||
engine.SetStorageLogger(loggerDb)
|
||||
defer logDb.Close()
|
||||
engine.SetStorageLogger(logDb)
|
||||
engine.SetRoundingMethodAndDecimals(cfg.RoundingMethod, cfg.RoundingDecimals)
|
||||
|
||||
if cfg.SMDebitInterval > 0 {
|
||||
@@ -341,16 +344,16 @@ func main() {
|
||||
go stopRaterSingnalHandler()
|
||||
}
|
||||
responder := &engine.Responder{ExitChan: exitChan}
|
||||
apier := &apier.ApierV1{StorDb: loggerDb, DataDb: getter}
|
||||
apier := &apier.ApierV1{StorDb: loadDb, DataDb: dataDb}
|
||||
if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterListen != INTERNAL {
|
||||
engine.Logger.Info(fmt.Sprintf("Starting CGRateS Rater on %s.", cfg.RaterListen))
|
||||
go listenToRPCRequests(responder, apier, cfg.RaterListen, cfg.RPCEncoding, getter, loggerDb)
|
||||
go listenToRPCRequests(responder, apier, cfg.RaterListen, cfg.RPCEncoding, dataDb, logDb)
|
||||
}
|
||||
if cfg.BalancerEnabled {
|
||||
engine.Logger.Info(fmt.Sprintf("Starting CGRateS Balancer on %s.", cfg.BalancerListen))
|
||||
go stopBalancerSingnalHandler()
|
||||
responder.Bal = bal
|
||||
go listenToRPCRequests(responder, apier, cfg.BalancerListen, cfg.RPCEncoding, getter, loggerDb)
|
||||
go listenToRPCRequests(responder, apier, cfg.BalancerListen, cfg.RPCEncoding, dataDb, logDb)
|
||||
if cfg.RaterEnabled {
|
||||
engine.Logger.Info("Starting internal engine.")
|
||||
bal.AddClient("local", new(engine.ResponderWorker))
|
||||
@@ -361,28 +364,28 @@ func main() {
|
||||
engine.Logger.Info("Starting CGRateS Scheduler.")
|
||||
go func() {
|
||||
sched := scheduler.NewScheduler()
|
||||
go reloadSchedulerSingnalHandler(sched, getter)
|
||||
go reloadSchedulerSingnalHandler(sched, dataDb)
|
||||
apier.Sched = sched
|
||||
sched.LoadActionTimings(getter)
|
||||
sched.LoadActionTimings(dataDb)
|
||||
sched.Loop()
|
||||
}()
|
||||
}
|
||||
|
||||
if cfg.SMEnabled {
|
||||
engine.Logger.Info("Starting CGRateS SessionManager.")
|
||||
go startSessionManager(responder, loggerDb)
|
||||
go startSessionManager(responder, logDb)
|
||||
// close all sessions on shutdown
|
||||
go shutdownSessionmanagerSingnalHandler()
|
||||
}
|
||||
|
||||
if cfg.MediatorEnabled {
|
||||
engine.Logger.Info("Starting CGRateS Mediator.")
|
||||
go startMediator(responder, loggerDb)
|
||||
go startMediator(responder, logDb, cdrDb)
|
||||
}
|
||||
|
||||
if cfg.CDRSEnabled {
|
||||
engine.Logger.Info("Starting CGRateS CDR Server.")
|
||||
go startCDRS(responder, loggerDb)
|
||||
go startCDRS(responder, cdrDb)
|
||||
}
|
||||
|
||||
if cfg.HistoryServerEnabled || cfg.HistoryAgentEnabled {
|
||||
|
||||
@@ -66,18 +66,19 @@ func main() {
|
||||
return
|
||||
}
|
||||
var errDataDb, errStorDb, err error
|
||||
var dataDb, storDb engine.DataStorage
|
||||
var dataDb engine.DataStorage
|
||||
var storDb engine.LoadStorage
|
||||
// Init necessary db connections
|
||||
if *fromStorDb {
|
||||
dataDb, errDataDb = engine.ConfigureDatabase(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass)
|
||||
storDb, errStorDb = engine.ConfigureDatabase(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass)
|
||||
dataDb, errDataDb = engine.ConfigureDataStorage(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass)
|
||||
storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass)
|
||||
} else if *toStorDb { // Import from csv files to storDb
|
||||
storDb, errStorDb = engine.ConfigureDatabase(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass)
|
||||
storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass)
|
||||
} else { // Default load from csv files to dataDb
|
||||
dataDb, errDataDb = engine.ConfigureDatabase(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass)
|
||||
dataDb, errDataDb = engine.ConfigureDataStorage(*data_db_type, *data_db_host, *data_db_port, *data_db_name, *data_db_user, *data_db_pass)
|
||||
}
|
||||
// Defer databases opened to be closed when we are done
|
||||
for _, db := range []engine.DataStorage{dataDb, storDb} {
|
||||
for _, db := range []engine.Storage{dataDb, storDb} {
|
||||
if db != nil {
|
||||
defer db.Close()
|
||||
}
|
||||
|
||||
@@ -744,7 +744,7 @@ func TestActionTriggerLogging(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error("Error getting actions for the action timing: ", err)
|
||||
}
|
||||
storageGetter.LogActionTrigger("rif", RATER_SOURCE, at, as)
|
||||
storageLogger.LogActionTrigger("rif", RATER_SOURCE, at, as)
|
||||
//expected := "rif*some_uuid;MONETARY;OUT;NAT;TEST_ACTIONS;100;10;false*|TOPUP|MONETARY|OUT|10|0"
|
||||
var key string
|
||||
atMap, _ := storageGetter.GetAllActionTimings()
|
||||
@@ -784,7 +784,7 @@ func TestActionTimingLogging(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Error("Error getting actions for the action trigger: ", err)
|
||||
}
|
||||
storageGetter.LogActionTiming(SCHED_SOURCE, at, as)
|
||||
storageLogger.LogActionTiming(SCHED_SOURCE, at, as)
|
||||
//expected := "some uuid|test|one,two,three|;1,2,3,4,5,6,7,8,9,10,11,12;1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31;1,2,3,4,5;18:00:00;00:00:00;10;0;1;60;1|10|TEST_ACTIONS*|TOPUP|MONETARY|OUT|10|0"
|
||||
var key string
|
||||
atMap, _ := storageGetter.GetAllActionTimings()
|
||||
|
||||
@@ -36,6 +36,14 @@ func init() {
|
||||
Logger = new(utils.StdLogger)
|
||||
Logger.Err(fmt.Sprintf("Could not connect to syslog: %v", err))
|
||||
}
|
||||
//db_server := "127.0.0.1"
|
||||
//db_server := "192.168.0.17"
|
||||
m, _ := NewMapStorage()
|
||||
storageGetter, _ = m.(DataStorage)
|
||||
//storageGetter, _ = NewMongoStorage(db_server, "27017", "cgrates_test", "", "")
|
||||
//storageGetter, _ = NewRedisStorage(db_server+":6379", 11, "")
|
||||
|
||||
storageLogger = storageGetter.(LogStorage)
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -45,13 +53,9 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
Logger utils.LoggerInterface
|
||||
db_server = "127.0.0.1"
|
||||
//db_server = "192.168.0.17"
|
||||
storageGetter, _ = NewMapStorage()
|
||||
//storageGetter, _ = NewMongoStorage(db_server, "27017", "cgrates_test", "", "")
|
||||
//storageGetter, _ = NewRedisStorage(db_server+":6379", 11, "")
|
||||
storageLogger = storageGetter
|
||||
Logger utils.LoggerInterface
|
||||
storageGetter DataStorage
|
||||
storageLogger LogStorage
|
||||
debitPeriod = 10 * time.Second
|
||||
roundingMethod = "*middle"
|
||||
roundingDecimals = 4
|
||||
@@ -73,7 +77,7 @@ func SetRoundingMethodAndDecimals(rm string, rd int) {
|
||||
/*
|
||||
Sets the database for logging (can be de same as storage getter or different db)
|
||||
*/
|
||||
func SetStorageLogger(sg DataStorage) {
|
||||
func SetStorageLogger(sg LogStorage) {
|
||||
storageLogger = sg
|
||||
}
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ func populateDB() {
|
||||
},
|
||||
}
|
||||
if storageGetter != nil {
|
||||
storageGetter.Flush()
|
||||
storageGetter.(Storage).Flush()
|
||||
storageGetter.SetUserBalance(broker)
|
||||
storageGetter.SetUserBalance(minu)
|
||||
} else {
|
||||
|
||||
@@ -101,7 +101,7 @@ func (csvr *CSVReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
return errors.New("No database connection!")
|
||||
}
|
||||
if flush {
|
||||
storage.Flush()
|
||||
storage.(Storage).Flush()
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Destinations")
|
||||
|
||||
@@ -27,7 +27,7 @@ import (
|
||||
|
||||
type DbReader struct {
|
||||
tpid string
|
||||
storDb DataStorage
|
||||
storDb LoadStorage
|
||||
dataDb DataStorage
|
||||
actions map[string][]*Action
|
||||
actionsTimings map[string][]*ActionTiming
|
||||
@@ -41,7 +41,7 @@ type DbReader struct {
|
||||
ratingProfiles map[string]*RatingProfile
|
||||
}
|
||||
|
||||
func NewDbReader(storDB DataStorage, storage DataStorage, tpid string) *DbReader {
|
||||
func NewDbReader(storDB LoadStorage, storage DataStorage, tpid string) *DbReader {
|
||||
c := new(DbReader)
|
||||
c.storDb = storDB
|
||||
c.dataDb = storage
|
||||
@@ -54,7 +54,7 @@ func NewDbReader(storDB DataStorage, storage DataStorage, tpid string) *DbReader
|
||||
func (dbr *DbReader) WriteToDatabase(flush, verbose bool) (err error) {
|
||||
storage := dbr.dataDb
|
||||
if flush {
|
||||
storage.Flush()
|
||||
storage.(Storage).Flush()
|
||||
}
|
||||
if verbose {
|
||||
log.Print("Destinations")
|
||||
|
||||
@@ -169,7 +169,7 @@ func (rs *Responder) Shutdown(arg string, reply *string) (err error) {
|
||||
if rs.Bal != nil {
|
||||
rs.Bal.Shutdown("Responder.Shutdown")
|
||||
}
|
||||
storageGetter.Close()
|
||||
storageGetter.(Storage).Close()
|
||||
defer func() { rs.ExitChan <- true }()
|
||||
*reply = "Done!"
|
||||
return
|
||||
|
||||
@@ -47,16 +47,49 @@ const (
|
||||
RATER_SOURCE = "RAT"
|
||||
)
|
||||
|
||||
type Storage interface {
|
||||
Close()
|
||||
Flush() error
|
||||
}
|
||||
|
||||
/*
|
||||
Interface for storage providers.
|
||||
*/
|
||||
type DataStorage interface {
|
||||
Close()
|
||||
Flush() error
|
||||
Storage
|
||||
GetRatingProfile(string) (*RatingProfile, error)
|
||||
SetRatingProfile(*RatingProfile) error
|
||||
GetDestination(string) (*Destination, error)
|
||||
SetDestination(*Destination) error
|
||||
// End Apier functions
|
||||
GetActions(string) (Actions, error)
|
||||
SetActions(string, Actions) error
|
||||
GetUserBalance(string) (*UserBalance, error)
|
||||
SetUserBalance(*UserBalance) error
|
||||
GetActionTimings(string) (ActionTimings, error)
|
||||
SetActionTimings(string, ActionTimings) error
|
||||
GetAllActionTimings() (map[string]ActionTimings, error)
|
||||
}
|
||||
|
||||
type CdrStorage interface {
|
||||
Storage
|
||||
SetCdr(utils.CDR) error
|
||||
SetRatedCdr(utils.CDR, *CallCost, string) error
|
||||
GetAllRatedCdr() ([]utils.CDR, error)
|
||||
}
|
||||
|
||||
type LogStorage interface {
|
||||
Storage
|
||||
//GetAllActionTimingsLogs() (map[string]ActionsTimings, error)
|
||||
LogCallCost(uuid, source string, cc *CallCost) error
|
||||
LogError(uuid, source, errstr string) error
|
||||
LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) error
|
||||
LogActionTiming(source string, at *ActionTiming, as Actions) error
|
||||
GetCallCostLog(uuid, source string) (*CallCost, error)
|
||||
}
|
||||
|
||||
type LoadStorage interface {
|
||||
Storage
|
||||
// Apier functions
|
||||
GetTPIds() ([]string, error)
|
||||
SetTPTiming(string, *Timing) error
|
||||
@@ -97,23 +130,6 @@ type DataStorage interface {
|
||||
ExistsTPAccountActions(string, string) (bool, error)
|
||||
SetTPAccountActions(string, map[string]*AccountAction) error
|
||||
GetTPAccountActionIds(string) ([]string, error)
|
||||
// End Apier functions
|
||||
GetActions(string) (Actions, error)
|
||||
SetActions(string, Actions) error
|
||||
GetUserBalance(string) (*UserBalance, error)
|
||||
SetUserBalance(*UserBalance) error
|
||||
GetActionTimings(string) (ActionTimings, error)
|
||||
SetActionTimings(string, ActionTimings) error
|
||||
GetAllActionTimings() (map[string]ActionTimings, error)
|
||||
SetCdr(utils.CDR) error
|
||||
SetRatedCdr(utils.CDR, *CallCost, string) error
|
||||
GetAllRatedCdr() ([]utils.CDR, error)
|
||||
//GetAllActionTimingsLogs() (map[string]ActionsTimings, error)
|
||||
LogCallCost(uuid, source string, cc *CallCost) error
|
||||
LogError(uuid, source, errstr string) error
|
||||
LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) error
|
||||
LogActionTiming(source string, at *ActionTiming, as Actions) error
|
||||
GetCallCostLog(uuid, source string) (*CallCost, error)
|
||||
// loader functions
|
||||
GetTpDestinations(string, string) ([]*Destination, error)
|
||||
GetTpTimings(string, string) (map[string]*Timing, error)
|
||||
|
||||
@@ -32,7 +32,7 @@ type MapStorage struct {
|
||||
ms Marshaler
|
||||
}
|
||||
|
||||
func NewMapStorage() (DataStorage, error) {
|
||||
func NewMapStorage() (Storage, error) {
|
||||
return &MapStorage{dict: make(map[string][]byte), ms: new(MsgpackMarshaler)}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ type MongoStorage struct {
|
||||
db *mgo.Database
|
||||
}
|
||||
|
||||
func NewMongoStorage(host, port, db, user, pass string) (DataStorage, error) {
|
||||
func NewMongoStorage(host, port, db, user, pass string) (Storage, error) {
|
||||
dial := fmt.Sprintf(host)
|
||||
if user != "" && pass != "" {
|
||||
dial = fmt.Sprintf("%s:%s@%s", user, pass, dial)
|
||||
|
||||
@@ -28,7 +28,7 @@ type MySQLStorage struct {
|
||||
*SQLStorage
|
||||
}
|
||||
|
||||
func NewMySQLStorage(host, port, name, user, password string) (DataStorage, error) {
|
||||
func NewMySQLStorage(host, port, name, user, password string) (Storage, error) {
|
||||
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8", user, password, host, port, name))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -28,7 +28,7 @@ type PostgresStorage struct {
|
||||
*SQLStorage
|
||||
}
|
||||
|
||||
func NewPostgresStorage(host, port, name, user, password string) (DataStorage, error) {
|
||||
func NewPostgresStorage(host, port, name, user, password string) (Storage, error) {
|
||||
db, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", host, port, name, user, password))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -35,7 +35,7 @@ type RedisStorage struct {
|
||||
ms Marshaler
|
||||
}
|
||||
|
||||
func NewRedisStorage(address string, db int, pass string) (DataStorage, error) {
|
||||
func NewRedisStorage(address string, db int, pass string) (Storage, error) {
|
||||
addrSplit := strings.Split(address, ":")
|
||||
host := addrSplit[0]
|
||||
port := 6379
|
||||
|
||||
@@ -30,7 +30,9 @@ type SQLStorage struct {
|
||||
Db *sql.DB
|
||||
}
|
||||
|
||||
func (self *SQLStorage) Close() {}
|
||||
func (self *SQLStorage) Close() {
|
||||
self.Close()
|
||||
}
|
||||
|
||||
func (self *SQLStorage) Flush() (err error) {
|
||||
return
|
||||
|
||||
@@ -26,7 +26,8 @@ import (
|
||||
|
||||
// Various helpers to deal with database
|
||||
|
||||
func ConfigureDatabase(db_type, host, port, name, user, pass string) (db DataStorage, err error) {
|
||||
func ConfigureDataStorage(db_type, host, port, name, user, pass string) (db DataStorage, err error) {
|
||||
var d Storage
|
||||
switch db_type {
|
||||
case utils.REDIS:
|
||||
var db_nb int
|
||||
@@ -38,13 +39,80 @@ func ConfigureDatabase(db_type, host, port, name, user, pass string) (db DataSto
|
||||
if port != "" {
|
||||
host += ":" + port
|
||||
}
|
||||
db, err = NewRedisStorage(host, db_nb, pass)
|
||||
d, err = NewRedisStorage(host, db_nb, pass)
|
||||
db = d.(DataStorage)
|
||||
case utils.MONGO:
|
||||
db, err = NewMongoStorage(host, port, name, user, pass)
|
||||
case utils.POSTGRES:
|
||||
db, err = NewPostgresStorage(host, port, name, user, pass)
|
||||
case utils.MYSQL:
|
||||
db, err = NewMySQLStorage(host, port, name, user, pass)
|
||||
d, err = NewMongoStorage(host, port, name, user, pass)
|
||||
db = d.(DataStorage)
|
||||
default:
|
||||
err = errors.New("unknown db")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func ConfigureLogStorage(db_type, host, port, name, user, pass string) (db LogStorage, err error) {
|
||||
var d Storage
|
||||
switch db_type {
|
||||
case utils.REDIS:
|
||||
var db_nb int
|
||||
db_nb, err = strconv.Atoi(name)
|
||||
if err != nil {
|
||||
Logger.Crit("Redis db name must be an integer!")
|
||||
return nil, err
|
||||
}
|
||||
if port != "" {
|
||||
host += ":" + port
|
||||
}
|
||||
d, err = NewRedisStorage(host, db_nb, pass)
|
||||
db = d.(LogStorage)
|
||||
case utils.MONGO:
|
||||
d, err = NewMongoStorage(host, port, name, user, pass)
|
||||
db = d.(LogStorage)
|
||||
case utils.POSTGRES:
|
||||
d, err = NewPostgresStorage(host, port, name, user, pass)
|
||||
db = d.(LogStorage)
|
||||
case utils.MYSQL:
|
||||
d, err = NewMySQLStorage(host, port, name, user, pass)
|
||||
db = d.(LogStorage)
|
||||
default:
|
||||
err = errors.New("unknown db")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func ConfigureLoadStorage(db_type, host, port, name, user, pass string) (db LoadStorage, err error) {
|
||||
var d Storage
|
||||
switch db_type {
|
||||
case utils.POSTGRES:
|
||||
d, err = NewPostgresStorage(host, port, name, user, pass)
|
||||
db = d.(LoadStorage)
|
||||
case utils.MYSQL:
|
||||
d, err = NewMySQLStorage(host, port, name, user, pass)
|
||||
db = d.(LoadStorage)
|
||||
default:
|
||||
err = errors.New("unknown db")
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func ConfigureCdrStorage(db_type, host, port, name, user, pass string) (db CdrStorage, err error) {
|
||||
var d Storage
|
||||
switch db_type {
|
||||
case utils.POSTGRES:
|
||||
d, err = NewPostgresStorage(host, port, name, user, pass)
|
||||
db = d.(CdrStorage)
|
||||
case utils.MYSQL:
|
||||
d, err = NewMySQLStorage(host, port, name, user, pass)
|
||||
db = d.(CdrStorage)
|
||||
default:
|
||||
err = errors.New("unknown db")
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ import (
|
||||
// Import tariff plan from csv into storDb
|
||||
type TPCSVImporter struct {
|
||||
TPid string // Load data on this tpid
|
||||
StorDb DataStorage // StorDb connection handle
|
||||
StorDb LoadStorage // StorDb connection handle
|
||||
DirPath string // Directory path to import from
|
||||
Sep rune // Separator in the csv file
|
||||
Verbose bool // If true will print a detailed information instead of silently discarding it
|
||||
|
||||
@@ -35,10 +35,10 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewMediator(connector engine.Connector, storDb engine.DataStorage, cfg *config.CGRConfig) (m *Mediator, err error) {
|
||||
func NewMediator(connector engine.Connector, logDb engine.LogStorage, cdrDb engine.CdrStorage, cfg *config.CGRConfig) (m *Mediator, err error) {
|
||||
m = &Mediator{
|
||||
connector: connector,
|
||||
storDb: storDb,
|
||||
logDb: logDb,
|
||||
cgrCfg: cfg,
|
||||
}
|
||||
m.fieldNames = make(map[string][]string)
|
||||
@@ -52,7 +52,8 @@ func NewMediator(connector engine.Connector, storDb engine.DataStorage, cfg *con
|
||||
|
||||
type Mediator struct {
|
||||
connector engine.Connector
|
||||
storDb engine.DataStorage
|
||||
logDb engine.LogStorage
|
||||
cdrDb engine.CdrStorage
|
||||
cgrCfg *config.CGRConfig
|
||||
cdrInDir, cdrOutDir string
|
||||
accIdField string
|
||||
@@ -166,8 +167,8 @@ func (self *Mediator) TrackCDRFiles() (err error) {
|
||||
// Retrive the cost from logging database
|
||||
func (self *Mediator) getCostsFromDB(cdr utils.CDR) (cc *engine.CallCost, err error) {
|
||||
for i := 0; i < 3; i++ { // Mechanism to avoid concurrency between SessionManager writing the costs and mediator picking them up
|
||||
cc, err = self.storDb.GetCallCostLog(cdr.GetCgrId(), engine.SESSION_MANAGER_SOURCE) //ToDo: What are we getting when there is no log?
|
||||
if cc != nil { // There were no errors, chances are that we got what we are looking for
|
||||
cc, err = self.logDb.GetCallCostLog(cdr.GetCgrId(), engine.SESSION_MANAGER_SOURCE) //ToDo: What are we getting when there is no log?
|
||||
if cc != nil { // There were no errors, chances are that we got what we are looking for
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Duration(i) * time.Second)
|
||||
@@ -204,10 +205,10 @@ func (self *Mediator) getCostsFromRater(cdr utils.CDR) (*engine.CallCost, error)
|
||||
err = self.connector.GetCost(cd, cc)
|
||||
}
|
||||
if err != nil {
|
||||
self.storDb.LogError(cdr.GetCgrId(), engine.MEDIATOR_SOURCE, err.Error())
|
||||
self.logDb.LogError(cdr.GetCgrId(), engine.MEDIATOR_SOURCE, err.Error())
|
||||
} else {
|
||||
// If the mediator calculated a price it will write it to logdb
|
||||
self.storDb.LogCallCost(cdr.GetCgrId(), engine.MEDIATOR_SOURCE, cc)
|
||||
self.logDb.LogCallCost(cdr.GetCgrId(), engine.MEDIATOR_SOURCE, cc)
|
||||
}
|
||||
return cc, err
|
||||
}
|
||||
@@ -273,9 +274,9 @@ func (self *Mediator) MediateCSVCDR(cdrfn string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (self *Mediator) MediateDBCDR(cdr utils.CDR, db engine.DataStorage) error {
|
||||
func (self *Mediator) MediateDBCDR(cdr utils.CDR, db engine.CdrStorage) error {
|
||||
var qryCC *engine.CallCost
|
||||
cc := &engine.CallCost{Cost:-1}
|
||||
cc := &engine.CallCost{Cost: -1}
|
||||
var errCost error
|
||||
if cdr.GetReqType() == utils.PREPAID || cdr.GetReqType() == utils.POSTPAID {
|
||||
// Should be previously calculated and stored in DB
|
||||
@@ -293,5 +294,5 @@ func (self *Mediator) MediateDBCDR(cdr utils.CDR, db engine.DataStorage) error {
|
||||
if errCost != nil {
|
||||
extraInfo = errCost.Error()
|
||||
}
|
||||
return self.storDb.SetRatedCdr(cdr, cc, extraInfo)
|
||||
return self.cdrDb.SetRatedCdr(cdr, cc, extraInfo)
|
||||
}
|
||||
|
||||
@@ -42,10 +42,10 @@ type FSSessionManager struct {
|
||||
sessions []*Session
|
||||
connector engine.Connector
|
||||
debitPeriod time.Duration
|
||||
loggerDB engine.DataStorage
|
||||
loggerDB engine.LogStorage
|
||||
}
|
||||
|
||||
func NewFSSessionManager(storage engine.DataStorage, connector engine.Connector, debitPeriod time.Duration) *FSSessionManager {
|
||||
func NewFSSessionManager(storage engine.LogStorage, connector engine.Connector, debitPeriod time.Duration) *FSSessionManager {
|
||||
return &FSSessionManager{loggerDB: storage, connector: connector, debitPeriod: debitPeriod}
|
||||
}
|
||||
|
||||
@@ -344,7 +344,7 @@ func (sm *FSSessionManager) GetDebitPeriod() time.Duration {
|
||||
return sm.debitPeriod
|
||||
}
|
||||
|
||||
func (sm *FSSessionManager) GetDbLogger() engine.DataStorage {
|
||||
func (sm *FSSessionManager) GetDbLogger() engine.LogStorage {
|
||||
return sm.loggerDB
|
||||
}
|
||||
|
||||
|
||||
@@ -30,6 +30,6 @@ type SessionManager interface {
|
||||
RemoveSession(*Session)
|
||||
LoopAction(*Session, *engine.CallDescriptor, float64)
|
||||
GetDebitPeriod() time.Duration
|
||||
GetDbLogger() engine.DataStorage
|
||||
GetDbLogger() engine.LogStorage
|
||||
Shutdown() error
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user