From 8e4573cc66aa487051446decad8de345efbf5805 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Mon, 18 Jul 2016 21:40:25 +0300 Subject: [PATCH] replaced logdb with pubsub --- apier/v1/apier.go | 1 - cmd/cgr-engine/cgr-engine.go | 18 ++++++++---------- cmd/cgr-engine/rater.go | 5 ++--- engine/action_plan.go | 6 +++++- engine/action_trigger.go | 6 +++++- engine/actions_test.go | 13 ++++++++++--- engine/calldesc.go | 9 --------- engine/responder.go | 1 - engine/storage_interface.go | 7 ------- engine/storage_map.go | 30 ------------------------------ engine/storage_mongo_datadb.go | 2 -- engine/storage_mongo_stordb.go | 23 ----------------------- engine/storage_redis.go | 24 ------------------------ engine/storage_test.go | 1 - engine/storage_utils.go | 4 ++-- utils/events.go | 2 ++ 16 files changed, 34 insertions(+), 118 deletions(-) diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 07b204da4..9749f97e9 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -43,7 +43,6 @@ type ApierV1 struct { RatingDb engine.RatingStorage AccountDb engine.AccountingStorage CdrDb engine.CdrStorage - LogDb engine.LogStorage Sched *scheduler.Scheduler Config *config.CGRConfig Responder *engine.Responder diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 48b1ae840..26da8300c 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -289,7 +289,7 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien exitChan <- true } -func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine.LogStorage, cdrDb engine.CdrStorage, +func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, internalRaterChan chan rpcclient.RpcClientConnection, internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection, internalCdrStatSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { @@ -514,7 +514,6 @@ func main() { } var ratingDb engine.RatingStorage var accountDb engine.AccountingStorage - var logDb engine.LogStorage var loadDb engine.LoadStorage var cdrDb engine.CdrStorage if cfg.RALsEnabled || cfg.SchedulerEnabled || cfg.CDRStatsEnabled { // Only connect to dataDb if necessary @@ -542,17 +541,16 @@ func main() { } } if cfg.RALsEnabled || cfg.CDRSEnabled || cfg.SchedulerEnabled { // Only connect to storDb if necessary - logDb, err = engine.ConfigureLogStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, + storDb, err := engine.ConfigureStorStorage(cfg.StorDBType, cfg.StorDBHost, cfg.StorDBPort, cfg.StorDBName, cfg.StorDBUser, cfg.StorDBPass, cfg.DBDataEncoding, cfg.StorDBMaxOpenConns, cfg.StorDBMaxIdleConns, cfg.StorDBCDRSIndexes) if err != nil { // Cannot configure logger database, show stopper utils.Logger.Crit(fmt.Sprintf("Could not configure logger database: %s exiting!", err)) return } - defer logDb.Close() - engine.SetStorageLogger(logDb) - // loadDb,cdrDb and logDb are all mapped on the same stordb storage - loadDb = logDb.(engine.LoadStorage) - cdrDb = logDb.(engine.CdrStorage) + defer storDb.Close() + // loadDb,cdrDb and storDb are all mapped on the same stordb storage + loadDb = storDb.(engine.LoadStorage) + cdrDb = storDb.(engine.CdrStorage) engine.SetCdrStorage(cdrDb) } @@ -586,7 +584,7 @@ func main() { // Start rater service if cfg.RALsEnabled { go startRater(internalRaterChan, cacheDoneChan, internalBalancerChan, internalSchedulerChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, - server, ratingDb, accountDb, loadDb, cdrDb, logDb, &stopHandled, exitChan) + server, ratingDb, accountDb, loadDb, cdrDb, &stopHandled, exitChan) } // Start Scheduler @@ -596,7 +594,7 @@ func main() { // Start CDR Server if cfg.CDRSEnabled { - go startCDRS(internalCdrSChan, logDb, cdrDb, internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalCdrStatSChan, server, exitChan) + go startCDRS(internalCdrSChan, cdrDb, internalRaterChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalCdrStatSChan, server, exitChan) } // Start CDR Stats server diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index cbc8fbd1e..7b7533871 100644 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -55,8 +55,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC internalCdrStatSChan chan rpcclient.RpcClientConnection, internalHistorySChan chan rpcclient.RpcClientConnection, internalPubSubSChan chan rpcclient.RpcClientConnection, internalUserSChan chan rpcclient.RpcClientConnection, internalAliaseSChan chan rpcclient.RpcClientConnection, server *utils.Server, - ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, logDb engine.LogStorage, - stopHandled *bool, exitChan chan bool) { + ratingDb engine.RatingStorage, accountDb engine.AccountingStorage, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool, exitChan chan bool) { var waitTasks []chan struct{} //Cache load @@ -209,7 +208,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC } responder := &engine.Responder{Bal: bal, ExitChan: exitChan} responder.SetTimeToLive(cfg.ResponseCacheTTL, nil) - apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, LogDb: logDb, Sched: sched, + apierRpcV1 := &v1.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, Sched: sched, Config: cfg, Responder: responder} if cdrStats != nil { // ToDo: Fix here properly the init of stats responder.Stats = cdrStats diff --git a/engine/action_plan.go b/engine/action_plan.go index 563ea9413..a88bb6500 100644 --- a/engine/action_plan.go +++ b/engine/action_plan.go @@ -360,7 +360,11 @@ func (at *ActionTiming) Execute() (err error) { utils.Logger.Warning(fmt.Sprintf("Error executing action plan: %v", err)) return err } - storageLogger.LogActionTiming(utils.SCHED_SOURCE, at, aac) + Publish(CgrEvent{ + "Uuid": at.Uuid, + "Id": at.actionPlanID, + "ActionIds": at.ActionsID, + }) return } diff --git a/engine/action_trigger.go b/engine/action_trigger.go index 3843dd2d3..409e7d22a 100644 --- a/engine/action_trigger.go +++ b/engine/action_trigger.go @@ -107,7 +107,11 @@ func (at *ActionTrigger) Execute(ub *Account, sq *StatsQueueTriggered) (err erro at.Executed = false } if !transactionFailed && ub != nil && !removeAccountActionFound { - storageLogger.LogActionTrigger(ub.ID, utils.RATER_SOURCE, at, aac) + Publish(CgrEvent{ + "Uuid": at.UniqueID, + "Id": at.ID, + "ActionIds": at.ActionsID, + }) accountingStorage.SetAccount(ub) } return diff --git a/engine/actions_test.go b/engine/actions_test.go index 7ad8fd533..157ce5d44 100644 --- a/engine/actions_test.go +++ b/engine/actions_test.go @@ -1059,7 +1059,11 @@ func TestActionTriggerLogging(t *testing.T) { if err != nil { t.Error("Error getting actions for the action timing: ", as, err) } - storageLogger.LogActionTrigger("rif", utils.RATER_SOURCE, at, as) + Publish(CgrEvent{ + "Uuid": at.UniqueID, + "Id": at.ID, + "ActionIds": at.ActionsID, + }) //expected := "rif*some_uuid;MONETARY;OUT;NAT;TEST_ACTIONS;100;10;false*|TOPUP|MONETARY|OUT|10|0" var key string atMap, _ := ratingStorage.GetAllActionPlans() @@ -1097,11 +1101,14 @@ func TestActionPlanLogging(t *testing.T) { Weight: 10.0, ActionsID: "TEST_ACTIONS", } - as, err := ratingStorage.GetActions(at.ActionsID, false) if err != nil { t.Error("Error getting actions for the action trigger: ", err) } - storageLogger.LogActionTiming(utils.SCHED_SOURCE, at, as) + Publish(CgrEvent{ + "Uuid": at.Uuid, + "Id": at.actionPlanID, + "ActionIds": at.ActionsID, + }) //expected := "some uuid|test|one,two,three|;1,2,3,4,5,6,7,8,9,10,11,12;1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31;1,2,3,4,5;18:00:00;00:00:00;10;0;1;60;1|10|TEST_ACTIONS*|TOPUP|MONETARY|OUT|10|0" var key string atMap, _ := ratingStorage.GetAllActionPlans() diff --git a/engine/calldesc.go b/engine/calldesc.go index fea8ebe98..b2e804ce1 100644 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -64,13 +64,11 @@ func init() { log.Fatal(err) } } - storageLogger = ratingStorage.(LogStorage) } var ( ratingStorage RatingStorage accountingStorage AccountingStorage - storageLogger LogStorage cdrStorage CdrStorage debitPeriod = 10 * time.Second globalRoundingDecimals = 6 @@ -104,13 +102,6 @@ func SetLcrSubjectPrefixMatching(flag bool) { lcrSubjectPrefixMatching = flag } -/* -Sets the database for logging (can be de same as storage getter or different db) -*/ -func SetStorageLogger(sg LogStorage) { - storageLogger = sg -} - /* Sets the database for CDR storing, used by *cdrlog in first place */ diff --git a/engine/responder.go b/engine/responder.go index 2ff31e60b..6de84a947 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -602,7 +602,6 @@ func (rs *Responder) Shutdown(arg string, reply *string) (err error) { } ratingStorage.Close() accountingStorage.Close() - storageLogger.Close() cdrStorage.Close() defer func() { rs.ExitChan <- true }() *reply = "Done!" diff --git a/engine/storage_interface.go b/engine/storage_interface.go index b56ae0411..3e1553636 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -106,13 +106,6 @@ type CdrStorage interface { GetCDRs(*utils.CDRsFilter, bool) ([]*CDR, int64, error) } -type LogStorage interface { - Storage - //GetAllActionTimingsLogs() (map[string]ActionsTimings, error) - LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) error - LogActionTiming(source string, at *ActionTiming, as Actions) error -} - type LoadStorage interface { Storage LoadReader diff --git a/engine/storage_map.go b/engine/storage_map.go index b4f12aafd..8dc7db777 100644 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -963,36 +963,6 @@ func (ms *MapStorage) GetSMCost(cgrid, source, runid, originHost, originID strin return } -func (ms *MapStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { - ms.mu.Lock() - defer ms.mu.Unlock() - mat, err := ms.ms.Marshal(at) - if err != nil { - return - } - mas, err := ms.ms.Marshal(&as) - if err != nil { - return - } - ms.dict[utils.LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s*%s", ubId, string(mat), string(mas))) - return -} - -func (ms *MapStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { - ms.mu.Lock() - defer ms.mu.Unlock() - mat, err := ms.ms.Marshal(at) - if err != nil { - return - } - mas, err := ms.ms.Marshal(&as) - if err != nil { - return - } - ms.dict[utils.LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano)] = []byte(fmt.Sprintf("%s*%s", string(mat), string(mas))) - return -} - func (ms *MapStorage) SetStructVersion(v *StructVersion) (err error) { ms.mu.Lock() defer ms.mu.Unlock() diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 4ea52887b..5a7d6eeb2 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -50,8 +50,6 @@ const ( colUsr = "users" colCrs = "cdr_stats" colLht = "load_history" - colLogAtr = "action_trigger_logs" - colLogApl = "action_plan_logs" colLogErr = "error_logs" colVer = "versions" ) diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index b07c8ba65..d8e91917a 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -736,29 +736,6 @@ func (ms *MongoStorage) SetTpAccountActions(tps []TpAccountAction) error { return err } -func (ms *MongoStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { - session, col := ms.conn(colLogAtr) - defer session.Close() - return col.Insert(&struct { - ubId string - ActionTrigger *ActionTrigger - Actions Actions - LogTime time.Time - Source string - }{ubId, at, as, time.Now(), source}) -} - -func (ms *MongoStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { - session, col := ms.conn(colLogApl) - defer session.Close() - return col.Insert(&struct { - ActionPlan *ActionTiming - Actions Actions - LogTime time.Time - Source string - }{at, as, time.Now(), source}) -} - func (ms *MongoStorage) SetSMCost(smc *SMCost) error { session, col := ms.conn(utils.TBLSMCosts) defer session.Close() diff --git a/engine/storage_redis.go b/engine/storage_redis.go index c989301c4..f67296c3e 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1196,30 +1196,6 @@ func (rs *RedisStorage) GetCallCostLog(cgrid, source, runid string) (cc *CallCos return } -func (rs *RedisStorage) LogActionTrigger(ubId, source string, at *ActionTrigger, as Actions) (err error) { - mat, err := rs.ms.Marshal(at) - if err != nil { - return - } - mas, err := rs.ms.Marshal(as) - if err != nil { - return - } - return rs.db.Cmd("SET", utils.LOG_ACTION_TRIGGER_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v*%v", ubId, string(mat), string(mas)))).Err -} - -func (rs *RedisStorage) LogActionTiming(source string, at *ActionTiming, as Actions) (err error) { - mat, err := rs.ms.Marshal(at) - if err != nil { - return - } - mas, err := rs.ms.Marshal(as) - if err != nil { - return - } - return rs.db.Cmd("SET", utils.LOG_ACTION_TIMMING_PREFIX+source+"_"+time.Now().Format(time.RFC3339Nano), []byte(fmt.Sprintf("%v*%v", string(mat), string(mas)))).Err -} - func (rs *RedisStorage) SetStructVersion(v *StructVersion) (err error) { var result []byte result, err = rs.ms.Marshal(v) diff --git a/engine/storage_test.go b/engine/storage_test.go index 8aeb0c4ac..491693041 100644 --- a/engine/storage_test.go +++ b/engine/storage_test.go @@ -253,7 +253,6 @@ func TestStoreInterfaces(t *testing.T) { var _ AccountingStorage = rds sql := new(SQLStorage) var _ CdrStorage = sql - var _ LogStorage = sql } func TestDifferentUuid(t *testing.T) { diff --git a/engine/storage_utils.go b/engine/storage_utils.go index eda2205b0..d7bc99975 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -82,8 +82,8 @@ func ConfigureAccountingStorage(db_type, host, port, name, user, pass, marshaler return d, nil } -func ConfigureLogStorage(db_type, host, port, name, user, pass, marshaler string, maxConn, maxIdleConn int, cdrsIndexes []string) (db LogStorage, err error) { - var d LogStorage +func ConfigureStorStorage(db_type, host, port, name, user, pass, marshaler string, maxConn, maxIdleConn int, cdrsIndexes []string) (db Storage, err error) { + var d Storage switch db_type { /* case utils.REDIS: diff --git a/utils/events.go b/utils/events.go index 98007c821..d303b1e70 100644 --- a/utils/events.go +++ b/utils/events.go @@ -2,4 +2,6 @@ package utils const ( EVT_ACCOUNT_BALANCE_MODIFIED = "ACCOUNT_BALANCE_MODIFIED" + EVT_ACTION_TRIGGER_FIRED = "ACTION_TRIGGER_FIRED" + EVT_ACTION_TIMING_FIRED = "ACTION_TRIGGER_FIRED" )