replaced logdb with pubsub

This commit is contained in:
Radu Ioan Fericean
2016-07-18 21:40:25 +03:00
parent f4b3839bff
commit 8e4573cc66
16 changed files with 34 additions and 118 deletions

View File

@@ -43,7 +43,6 @@ type ApierV1 struct {
RatingDb engine.RatingStorage
AccountDb engine.AccountingStorage
CdrDb engine.CdrStorage
LogDb engine.LogStorage
Sched *scheduler.Scheduler
Config *config.CGRConfig
Responder *engine.Responder

View File

@@ -289,7 +289,7 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien
exitChan <- true
}
func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine.LogStorage, cdrDb engine.CdrStorage,
func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage,
internalRaterChan chan rpcclient.RpcClientConnection, internalPubSubSChan chan rpcclient.RpcClientConnection,
internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
internalCdrStatSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) {
@@ -514,7 +514,6 @@ func main() {
}
var ratingDb engine.RatingStorage
var accountDb engine.AccountingStorage
var logDb engine.LogStorage
var loadDb engine.LoadStorage
var cdrDb engine.CdrStorage
if cfg.RALsEnabled || cfg.SchedulerEnabled || cfg.CDRStatsEnabled { // Only connect to dataDb if necessary
@@ -542,17 +541,16 @@ func main() {
}
}
if cfg.RALsEnabled || cfg.CDRSEnabled || cfg.SchedulerEnabled { // Only connect to storDb if necessary
logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort,
storDb, err := engine.ConfigureStorStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort,
cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns, cfg.StorDBCDRSIndexes)
if err != nil { // Cannot configure logger database, show stopper
utils.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err))
return
}
defer logDb.Close()
engine.SetStorageLogger(logDb)
// loadDb,cdrDb and logDb are all mapped on the same stordb storage
loadDb = logDb.(engine.LoadStorage)
cdrDb = logDb.(engine.CdrStorage)
defer storDb.Close()
// loadDb,cdrDb and storDb are all mapped on the same stordb storage
loadDb = storDb.(engine.LoadStorage)
cdrDb = storDb.(engine.CdrStorage)
engine.SetCdrStorage(cdrDb)
}
@@ -586,7 +584,7 @@ func main() {
// Start rater service
if cfg.RALsEnabled {
go startRater(internalRaterChan, cacheDoneChan, internalBalancerChan, internalSchedulerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
server, ratingDb, accountDb, loadDb, cdrDb, logDb, &stopHandled, exitChan)
server, ratingDb, accountDb, loadDb, cdrDb, &stopHandled, exitChan)
}
// Start Scheduler
@@ -596,7 +594,7 @@ func main() {
// Start CDR Server
if cfg.CDRSEnabled {
go startCDRS(internalCdrSChan, logDb, cdrDb, internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalCdrStatSChan, server, exitChan)
go startCDRS(internalCdrSChan, cdrDb, internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalCdrStatSChan, server, exitChan)
}
// Start CDR Stats server

View File

@@ -55,8 +55,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
internalCdrStatSChan chan rpcclient.RpcClientConnection, internalHistorySChan chan rpcclient.RpcClientConnection,
internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection,
server *utils.Server,
ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, logDb engine.LogStorage,
stopHandled *bool, exitChan chan bool) {
ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool, exitChan chan bool) {
var waitTasks []chan struct{}
//Cache load
@@ -209,7 +208,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
}
responder := &engine.Responder{Bal: bal, ExitChan: exitChan}
responder.SetTimeToLive(cfg.ResponseCacheTTL, nil)
apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Sched: sched,
apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, Sched: sched,
Config: cfg, Responder: responder}
if cdrStats != nil { // ToDo: Fix here properly the init of stats
responder.Stats = cdrStats

View File

@@ -360,7 +360,11 @@ func (at *ActionTiming) Execute() (err error) {
utils.Logger.Warning(fmt.Sprintf("Error executing action plan: %v", err))
return err
}
storageLogger.LogActionTiming(utils.SCHED_SOURCE, at, aac)
Publish(CgrEvent{
"Uuid": at.Uuid,
"Id": at.actionPlanID,
"ActionIds": at.ActionsID,
})
return
}

View File

@@ -107,7 +107,11 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro
at.Executed = false
}
if !transactionFailed && ub != nil && !removeAccountActionFound {
storageLogger.LogActionTrigger(ub.ID, utils.RATER_SOURCE, at, aac)
Publish(CgrEvent{
"Uuid": at.UniqueID,
"Id": at.ID,
"ActionIds": at.ActionsID,
})
accountingStorage.SetAccount(ub)
}
return

View File

@@ -1059,7 +1059,11 @@ func TestActionTriggerLogging(t *testing.T) {
if err != nil {
t.Error("Error getting actions for the action timing: ", as, err)
}
storageLogger.LogActionTrigger("rif", utils.RATER_SOURCE, at, as)
Publish(CgrEvent{
"Uuid": at.UniqueID,
"Id": at.ID,
"ActionIds": at.ActionsID,
})
//expected := "rif*some_uuid;MONETARY;OUT;NAT;TEST_ACTIONS;100;10;false*|TOPUP|MONETARY|OUT|10|0"
var key string
atMap, _ := ratingStorage.GetAllActionPlans()
@@ -1097,11 +1101,14 @@ func TestActionPlanLogging(t *testing.T) {
Weight: 10.0,
ActionsID: "TEST_ACTIONS",
}
as, err := ratingStorage.GetActions(at.ActionsID, false)
if err != nil {
t.Error("Error getting actions for the action trigger: ", err)
}
storageLogger.LogActionTiming(utils.SCHED_SOURCE, at, as)
Publish(CgrEvent{
"Uuid": at.Uuid,
"Id": at.actionPlanID,
"ActionIds": at.ActionsID,
})
//expected := "some uuid|test|one,two,three|;1,2,3,4,5,6,7,8,9,10,11,12;1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31;1,2,3,4,5;18:00:00;00:00:00;10;0;1;60;1|10|TEST_ACTIONS*|TOPUP|MONETARY|OUT|10|0"
var key string
atMap, _ := ratingStorage.GetAllActionPlans()

View File

@@ -64,13 +64,11 @@ func init() {
log.Fatal(err)
}
}
storageLogger = ratingStorage.(LogStorage)
}
var (
ratingStorage RatingStorage
accountingStorage AccountingStorage
storageLogger LogStorage
cdrStorage CdrStorage
debitPeriod = 10 * time.Second
globalRoundingDecimals = 6
@@ -104,13 +102,6 @@ func SetLcrSubjectPrefixMatching(flag bool) {
lcrSubjectPrefixMatching = flag
}
/*
Sets the database for logging (can be de same as storage getter or different db)
*/
func SetStorageLogger(sg LogStorage) {
storageLogger = sg
}
/*
Sets the database for CDR storing, used by *cdrlog in first place
*/

View File

@@ -602,7 +602,6 @@ func (rs *Responder) Shutdown(arg string, reply *string) (err error) {
}
ratingStorage.Close()
accountingStorage.Close()
storageLogger.Close()
cdrStorage.Close()
defer func() { rs.ExitChan <- true }()
*reply = "Done!"

View File

@@ -106,13 +106,6 @@ type CdrStorage interface {
GetCDRs(*utils.CDRsFilter, bool) ([]*CDR, int64, error)
}
type LogStorage interface {
Storage
//GetAllActionTimingsLogs() (map[string]ActionsTimings, error)
LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) error
LogActionTiming(source string, at *ActionTiming, as Actions) error
}
type LoadStorage interface {
Storage
LoadReader

View File

@@ -963,36 +963,6 @@ func (ms *MapStorage) GetSMCost(cgrid, source, runid, originHost, originID strin
return
}
func (ms *MapStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
mat, err := ms.ms.Marshal(at)
if err != nil {
return
}
mas, err := ms.ms.Marshal(&as)
if err != nil {
return
}
ms.dict[utils.LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s*%s", ubId, string(mat), string(mas)))
return
}
func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
mat, err := ms.ms.Marshal(at)
if err != nil {
return
}
mas, err := ms.ms.Marshal(&as)
if err != nil {
return
}
ms.dict[utils.LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s", string(mat), string(mas)))
return
}
func (ms *MapStorage) SetStructVersion(v *StructVersion) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()

View File

@@ -50,8 +50,6 @@ const (
colUsr = "users"
colCrs = "cdr_stats"
colLht = "load_history"
colLogAtr = "action_trigger_logs"
colLogApl = "action_plan_logs"
colLogErr = "error_logs"
colVer = "versions"
)

View File

@@ -736,29 +736,6 @@ func (ms *MongoStorage) SetTpAccountActions(tps []TpAccountAction) error {
return err
}
func (ms *MongoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) {
session, col := ms.conn(colLogAtr)
defer session.Close()
return col.Insert(&struct {
ubId string
ActionTrigger *ActionTrigger
Actions Actions
LogTime time.Time
Source string
}{ubId, at, as, time.Now(), source})
}
func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) {
session, col := ms.conn(colLogApl)
defer session.Close()
return col.Insert(&struct {
ActionPlan *ActionTiming
Actions Actions
LogTime time.Time
Source string
}{at, as, time.Now(), source})
}
func (ms *MongoStorage) SetSMCost(smc *SMCost) error {
session, col := ms.conn(utils.TBLSMCosts)
defer session.Close()

View File

@@ -1196,30 +1196,6 @@ func (rs *RedisStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCos
return
}
func (rs *RedisStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) {
mat, err := rs.ms.Marshal(at)
if err != nil {
return
}
mas, err := rs.ms.Marshal(as)
if err != nil {
return
}
return rs.db.Cmd("SET", utils.LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v*%v", ubId, string(mat), string(mas)))).Err
}
func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) {
mat, err := rs.ms.Marshal(at)
if err != nil {
return
}
mas, err := rs.ms.Marshal(as)
if err != nil {
return
}
return rs.db.Cmd("SET", utils.LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v", string(mat), string(mas)))).Err
}
func (rs *RedisStorage) SetStructVersion(v *StructVersion) (err error) {
var result []byte
result, err = rs.ms.Marshal(v)

View File

@@ -253,7 +253,6 @@ func TestStoreInterfaces(t *testing.T) {
var _ AccountingStorage = rds
sql := new(SQLStorage)
var _ CdrStorage = sql
var _ LogStorage = sql
}
func TestDifferentUuid(t *testing.T) {

View File

@@ -82,8 +82,8 @@ func ConfigureAccountingStorage(db_type, host, port, name, user, pass, marshaler
return d, nil
}
func ConfigureLogStorage(db_type, host, port, name, user, pass, marshaler string, maxConn, maxIdleConn int, cdrsIndexes []string) (db LogStorage, err error) {
var d LogStorage
func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler string, maxConn, maxIdleConn int, cdrsIndexes []string) (db Storage, err error) {
var d Storage
switch db_type {
/*
case utils.REDIS:

View File

@@ -2,4 +2,6 @@ package utils
const (
EVT_ACCOUNT_BALANCE_MODIFIED = "ACCOUNT_BALANCE_MODIFIED"
EVT_ACTION_TRIGGER_FIRED = "ACTION_TRIGGER_FIRED"
EVT_ACTION_TIMING_FIRED = "ACTION_TRIGGER_FIRED"
)