From 3c6b016a77bf2d45e8228d1dcb4ec29fd24fc520 Mon Sep 17 00:00:00 2001 From: DanB Date: Sat, 21 Oct 2017 21:23:59 +0200 Subject: [PATCH] ThresholdS support in balance updates --- apier/v1/thresholds_it_test.go | 13 ++- cmd/cgr-engine/cgr-engine.go | 3 +- cmd/cgr-engine/rater.go | 29 ++++++- data/conf/samples/tutmongo/cgrates.json | 3 + data/conf/samples/tutmysql/cgrates.json | 3 + data/conf/samples/tutpostgres/cgrates.json | 5 +- data/tariffplans/tutorial/Thresholds.csv | 4 +- engine/balances.go | 95 ++++++++++++++++------ engine/calldesc.go | 5 ++ utils/consts.go | 2 +- 10 files changed, 121 insertions(+), 41 deletions(-) diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index 592d6fa0d..8481f96ca 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -55,13 +55,12 @@ var tEvs = []*engine.ThresholdEvent{ Tenant: "cgrates.org", ID: "event2", Fields: map[string]interface{}{ - utils.EventType: utils.BalanceUpdate, - utils.EventSource: utils.AccountService, - utils.ACCOUNT: "1002", - utils.BalanceType: utils.MONETARY, - utils.BalanceID: utils.META_DEFAULT, - utils.BalanceValue: 12.3, - utils.ExpiryTime: "2009-11-10T23:00:00Z"}}, + utils.EventType: utils.BalanceUpdate, + utils.EventSource: utils.AccountService, + utils.ACCOUNT: "1002", + utils.BalanceID: utils.META_DEFAULT, + utils.Units: 12.3, + utils.ExpiryTime: "2009-11-10T23:00:00Z"}}, &engine.ThresholdEvent{ // hitting THD_STATS_1 Tenant: "cgrates.org", ID: "event3", diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 7155897f3..ff3c40776 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -790,7 +790,8 @@ func main() { // Start rater service if cfg.RALsEnabled { - go startRater(internalRaterChan, cacheDoneChan, internalCdrStatSChan, internalStatSChan, + go startRater(internalRaterChan, cacheDoneChan, internalThresholdSChan, + internalCdrStatSChan, internalStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, srvManager, server, dm, loadDb, cdrDb, &stopHandled, exitChan) } diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index eaa5802e6..716c51f18 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -32,7 +32,7 @@ import ( // Starts rater and reports on chan func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{}, - internalCdrStatSChan, internalStatSChan, internalHistorySChan, + internalThdSChan, internalCdrStatSChan, internalStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan chan rpcclient.RpcClientConnection, serviceManager *servmanager.ServiceManager, server *utils.Server, dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool, exitChan chan bool) { @@ -99,6 +99,22 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC cacheDoneChan <- struct{}{} }() + var thdS *rpcclient.RpcClientPool + if len(cfg.RALsThresholdSConns) != 0 { // Connections to ThresholdS + thdsTaskChan := make(chan struct{}) + waitTasks = append(waitTasks, thdsTaskChan) + go func() { + defer close(thdsTaskChan) + thdS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.RALsThresholdSConns, internalThdSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS, error: %s", err.Error())) + exitChan <- true + return + } + }() + } + var cdrStats *rpcclient.RpcClientPool if len(cfg.RALsCDRStatSConns) != 0 { // Connections to CDRStats cdrstatTaskChan := make(chan struct{}) @@ -114,6 +130,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC } }() } + var stats *rpcclient.RpcClientPool if len(cfg.RALsStatSConns) != 0 { // Connections to CDRStats statsTaskChan := make(chan struct{}) @@ -129,6 +146,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC } }() } + if len(cfg.RALsHistorySConns) != 0 { // Connection to HistoryS, histTaskChan := make(chan struct{}) waitTasks = append(waitTasks, histTaskChan) @@ -144,6 +162,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC } }() } + if len(cfg.RALsPubSubSConns) != 0 { // Connection to pubsubs pubsubTaskChan := make(chan struct{}) waitTasks = append(waitTasks, pubsubTaskChan) @@ -159,6 +178,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC } }() } + if len(cfg.RALsAliasSConns) != 0 { // Connection to AliasService aliasesTaskChan := make(chan struct{}) waitTasks = append(waitTasks, aliasesTaskChan) @@ -174,6 +194,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC } }() } + var usersConns rpcclient.RpcClientConnection if len(cfg.RALsUserSConns) != 0 { // Connection to UserService usersTaskChan := make(chan struct{}) @@ -196,7 +217,11 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC responder := &engine.Responder{ExitChan: exitChan} responder.SetTimeToLive(cfg.ResponseCacheTTL, nil) apierRpcV1 := &v1.ApierV1{StorDb: loadDb, DataManager: dm, CdrDb: cdrDb, - Config: cfg, Responder: responder, ServManager: serviceManager, HTTPPoster: utils.NewHTTPPoster(cfg.HttpSkipTlsVerify, cfg.ReplyTimeout)} + Config: cfg, Responder: responder, ServManager: serviceManager, + HTTPPoster: utils.NewHTTPPoster(cfg.HttpSkipTlsVerify, cfg.ReplyTimeout)} + if thdS != nil { + engine.SetThresholdS(thdS) // temporary architectural fix until we will have separate AccountS + } if cdrStats != nil { // ToDo: Fix here properly the init of stats responder.Stats = cdrStats apierRpcV1.CdrStatsSrv = cdrStats diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index d0e3d32d0..54e2ea9d2 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -49,6 +49,9 @@ "rals": { "enabled": true, + "thresholds_conns": [ + {"address": "*internal"} + ], "cdrstats_conns": [ {"address": "*internal"} ], diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index 43c94761e..39eb37ae1 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -42,6 +42,9 @@ "rals": { "enabled": true, + "thresholds_conns": [ + {"address": "*internal"} + ], "cdrstats_conns": [ {"address": "*internal"} ], diff --git a/data/conf/samples/tutpostgres/cgrates.json b/data/conf/samples/tutpostgres/cgrates.json index 7fd54ef10..146170ec2 100644 --- a/data/conf/samples/tutpostgres/cgrates.json +++ b/data/conf/samples/tutpostgres/cgrates.json @@ -20,7 +20,10 @@ "rals": { - "enabled": true, // enable Rater service: + "enabled": true, + "thresholds_conns": [ + {"address": "*internal"} + ], "cdrstats_conns": [ {"address": "*internal"} ], diff --git a/data/tariffplans/tutorial/Thresholds.csv b/data/tariffplans/tutorial/Thresholds.csv index 26ce20ccc..5fd122255 100644 --- a/data/tariffplans/tutorial/Thresholds.csv +++ b/data/tariffplans/tutorial/Thresholds.csv @@ -1,7 +1,7 @@ #Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],Recurrent[6],MinHits[7],MinSleep[8],Blocker[9],Weight[10],ActionIDs[11],Async[12] cgrates.org,THD_ACNT_BALANCE_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, -cgrates.org,THD_ACNT_BALANCE_1,*string,EventSource,AccountS,,,,,,, -cgrates.org,THD_ACNT_BALANCE_1,*gte,BalanceValue,10.0,,,,,,, +cgrates.org,THD_ACNT_BALANCE_1,*string,EventType,BalanceUpdate,,,,,,, +cgrates.org,THD_ACNT_BALANCE_1,*gte,Units,10.0,,,,,,, cgrates.org,THD_ACNT_EXPIRED,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, cgrates.org,THD_ACNT_EXPIRED,*gte,ExpiryTime,*now,,,,,,, cgrates.org,THD_STATS_1,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING, diff --git a/engine/balances.go b/engine/balances.go index 75f224224..08e13e252 100644 --- a/engine/balances.go +++ b/engine/balances.go @@ -766,41 +766,82 @@ func (bc Balances) HasBalance(balance *Balance) bool { } func (bc Balances) SaveDirtyBalances(acc *Account) { - savedAccounts := make(map[string]bool) + savedAccounts := make(map[string]*Account) for _, b := range bc { if b.dirty { // publish event accountId := "" allowNegative := "" disabled := "" - if b.account != nil { // only publish modifications for balances with account set - //utils.LogStack() - accountId = b.account.ID - allowNegative = strconv.FormatBool(b.account.AllowNegative) - disabled = strconv.FormatBool(b.account.Disabled) - Publish(CgrEvent{ - "EventName": utils.EVT_ACCOUNT_BALANCE_MODIFIED, - "Uuid": b.Uuid, - "Id": b.ID, - "Value": strconv.FormatFloat(b.Value, 'f', -1, 64), - "ExpirationDate": b.ExpirationDate.String(), - "Weight": strconv.FormatFloat(b.Weight, 'f', -1, 64), - "DestinationIDs": b.DestinationIDs.String(), - "Directions": b.Directions.String(), - "RatingSubject": b.RatingSubject, - "Categories": b.Categories.String(), - "SharedGroups": b.SharedGroups.String(), - "TimingIDs": b.TimingIDs.String(), - "Account": accountId, - "AccountAllowNegative": allowNegative, - "AccountDisabled": disabled, - }) + if b.account == nil { // only publish modifications for balances with account set + continue + } + accountId = b.account.ID + acntTnt := utils.NewTenantID(accountId) + if thresholdS != nil { + ev := &ThresholdEvent{ + Tenant: acntTnt.Tenant, + ID: utils.GenUUID(), + Fields: map[string]interface{}{ + utils.EventType: utils.BalanceUpdate, + utils.EventSource: utils.AccountService, + utils.ACCOUNT: acntTnt.ID, + utils.BalanceID: b.ID, + utils.Units: b.Value}} + if !b.ExpirationDate.IsZero() { + ev.Fields[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339) + } + var hits int + if err := thresholdS.Call("ThresholdSV1.ProcessEvent", ev, &hits); err != nil { + utils.Logger.Warning(fmt.Sprintf(" error: %s processing balance event %+v with thresholds.", err.Error(), ev)) + } + } + //utils.LogStack() + + allowNegative = strconv.FormatBool(b.account.AllowNegative) + disabled = strconv.FormatBool(b.account.Disabled) + Publish(CgrEvent{ + "EventName": utils.EVT_ACCOUNT_BALANCE_MODIFIED, + "Uuid": b.Uuid, + "Id": b.ID, + "Value": strconv.FormatFloat(b.Value, 'f', -1, 64), + "ExpirationDate": b.ExpirationDate.String(), + "Weight": strconv.FormatFloat(b.Weight, 'f', -1, 64), + "DestinationIDs": b.DestinationIDs.String(), + "Directions": b.Directions.String(), + "RatingSubject": b.RatingSubject, + "Categories": b.Categories.String(), + "SharedGroups": b.SharedGroups.String(), + "TimingIDs": b.TimingIDs.String(), + "Account": accountId, + "AccountAllowNegative": allowNegative, + "AccountDisabled": disabled, + }) + } + if b.account != nil && b.account != acc && b.dirty && savedAccounts[b.account.ID] == nil { + dm.DataDB().SetAccount(b.account) + savedAccounts[b.account.ID] = b.account + } + } + if len(savedAccounts) != 0 && thresholdS != nil { + for _, acnt := range savedAccounts { + acntTnt := utils.NewTenantID(acnt.ID) + ev := &ThresholdEvent{ + Tenant: acntTnt.Tenant, + ID: utils.GenUUID(), + Fields: map[string]interface{}{ + utils.EventType: utils.AccountUpdate, + utils.EventSource: utils.AccountService, + utils.ACCOUNT: acntTnt.ID, + utils.AllowNegative: acnt.AllowNegative, + utils.Disabled: acnt.Disabled}} + var hits int + if err := thresholdS.Call("ThresholdSV1.ProcessEvent", ev, &hits); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" error: %s processing account event %+v with thresholds.", err.Error(), ev)) } } - if b.account != nil && b.account != acc && b.dirty && savedAccounts[b.account.ID] == false { - dm.DataDB().SetAccount(b.account) - savedAccounts[b.account.ID] = true - } + } } diff --git a/engine/calldesc.go b/engine/calldesc.go index 33c4ac44e..1b3a03744 100755 --- a/engine/calldesc.go +++ b/engine/calldesc.go @@ -71,6 +71,7 @@ var ( cdrStorage CdrStorage debitPeriod = 10 * time.Second globalRoundingDecimals = 6 + thresholdS rpcclient.RpcClientConnection // used by RALs to communicate with ThresholdS historyScribe rpcclient.RpcClientConnection pubSubServer rpcclient.RpcClientConnection userService rpcclient.RpcClientConnection @@ -84,6 +85,10 @@ func SetDataStorage(dm2 *DataManager) { dm = dm2 } +func SetThresholdS(thdS rpcclient.RpcClientConnection) { + thresholdS = thdS +} + // Sets the global rounding method and decimal precision for GetCost method func SetRoundingDecimals(rd int) { globalRoundingDecimals = rd diff --git a/utils/consts.go b/utils/consts.go index 54ba3438d..2276ac14a 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -470,7 +470,7 @@ const ( StatID = "StatID" BalanceType = "BalanceType" BalanceID = "BalanceID" - BalanceValue = "BalanceValue" + Units = "Units" ResourceS = "ResourceS" CacheThresholdProfiles = "threshold_profiles" CacheThresholds = "thresholds"