ThresholdS support in balance updates

This commit is contained in:
DanB
2017-10-21 21:23:59 +02:00
parent afe3d6d6da
commit 3c6b016a77
10 changed files with 121 additions and 41 deletions

View File

@@ -55,13 +55,12 @@ var tEvs = []*engine.ThresholdEvent{
Tenant: "cgrates.org",
ID: "event2",
Fields: map[string]interface{}{
utils.EventType: utils.BalanceUpdate,
utils.EventSource: utils.AccountService,
utils.ACCOUNT: "1002",
utils.BalanceType: utils.MONETARY,
utils.BalanceID: utils.META_DEFAULT,
utils.BalanceValue: 12.3,
utils.ExpiryTime: "2009-11-10T23:00:00Z"}},
utils.EventType: utils.BalanceUpdate,
utils.EventSource: utils.AccountService,
utils.ACCOUNT: "1002",
utils.BalanceID: utils.META_DEFAULT,
utils.Units: 12.3,
utils.ExpiryTime: "2009-11-10T23:00:00Z"}},
&engine.ThresholdEvent{ // hitting THD_STATS_1
Tenant: "cgrates.org",
ID: "event3",

View File

@@ -790,7 +790,8 @@ func main() {
// Start rater service
if cfg.RALsEnabled {
go startRater(internalRaterChan, cacheDoneChan, internalCdrStatSChan, internalStatSChan,
go startRater(internalRaterChan, cacheDoneChan, internalThresholdSChan,
internalCdrStatSChan, internalStatSChan,
internalHistorySChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan,
srvManager, server, dm, loadDb, cdrDb, &stopHandled, exitChan)
}

View File

@@ -32,7 +32,7 @@ import (
// Starts rater and reports on chan
func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneChan chan struct{},
internalCdrStatSChan, internalStatSChan, internalHistorySChan,
internalThdSChan, internalCdrStatSChan, internalStatSChan, internalHistorySChan,
internalPubSubSChan, internalUserSChan, internalAliaseSChan chan rpcclient.RpcClientConnection,
serviceManager *servmanager.ServiceManager, server *utils.Server,
dm *engine.DataManager, loadDb engine.LoadStorage, cdrDb engine.CdrStorage, stopHandled *bool, exitChan chan bool) {
@@ -99,6 +99,22 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
cacheDoneChan <- struct{}{}
}()
var thdS *rpcclient.RpcClientPool
if len(cfg.RALsThresholdSConns) != 0 { // Connections to ThresholdS
thdsTaskChan := make(chan struct{})
waitTasks = append(waitTasks, thdsTaskChan)
go func() {
defer close(thdsTaskChan)
thdS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsThresholdSConns, internalThdSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to ThresholdS, error: %s", err.Error()))
exitChan <- true
return
}
}()
}
var cdrStats *rpcclient.RpcClientPool
if len(cfg.RALsCDRStatSConns) != 0 { // Connections to CDRStats
cdrstatTaskChan := make(chan struct{})
@@ -114,6 +130,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
}
}()
}
var stats *rpcclient.RpcClientPool
if len(cfg.RALsStatSConns) != 0 { // Connections to CDRStats
statsTaskChan := make(chan struct{})
@@ -129,6 +146,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
}
}()
}
if len(cfg.RALsHistorySConns) != 0 { // Connection to HistoryS,
histTaskChan := make(chan struct{})
waitTasks = append(waitTasks, histTaskChan)
@@ -144,6 +162,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
}
}()
}
if len(cfg.RALsPubSubSConns) != 0 { // Connection to pubsubs
pubsubTaskChan := make(chan struct{})
waitTasks = append(waitTasks, pubsubTaskChan)
@@ -159,6 +178,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
}
}()
}
if len(cfg.RALsAliasSConns) != 0 { // Connection to AliasService
aliasesTaskChan := make(chan struct{})
waitTasks = append(waitTasks, aliasesTaskChan)
@@ -174,6 +194,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
}
}()
}
var usersConns rpcclient.RpcClientConnection
if len(cfg.RALsUserSConns) != 0 { // Connection to UserService
usersTaskChan := make(chan struct{})
@@ -196,7 +217,11 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
responder := &engine.Responder{ExitChan: exitChan}
responder.SetTimeToLive(cfg.ResponseCacheTTL, nil)
apierRpcV1 := &v1.ApierV1{StorDb: loadDb, DataManager: dm, CdrDb: cdrDb,
Config: cfg, Responder: responder, ServManager: serviceManager, HTTPPoster: utils.NewHTTPPoster(cfg.HttpSkipTlsVerify, cfg.ReplyTimeout)}
Config: cfg, Responder: responder, ServManager: serviceManager,
HTTPPoster: utils.NewHTTPPoster(cfg.HttpSkipTlsVerify, cfg.ReplyTimeout)}
if thdS != nil {
engine.SetThresholdS(thdS) // temporary architectural fix until we will have separate AccountS
}
if cdrStats != nil { // ToDo: Fix here properly the init of stats
responder.Stats = cdrStats
apierRpcV1.CdrStatsSrv = cdrStats

View File

@@ -49,6 +49,9 @@
"rals": {
"enabled": true,
"thresholds_conns": [
{"address": "*internal"}
],
"cdrstats_conns": [
{"address": "*internal"}
],

View File

@@ -42,6 +42,9 @@
"rals": {
"enabled": true,
"thresholds_conns": [
{"address": "*internal"}
],
"cdrstats_conns": [
{"address": "*internal"}
],

View File

@@ -20,7 +20,10 @@
"rals": {
"enabled": true, // enable Rater service: <true|false>
"enabled": true,
"thresholds_conns": [
{"address": "*internal"}
],
"cdrstats_conns": [
{"address": "*internal"}
],

View File

@@ -1,7 +1,7 @@
#Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],Recurrent[6],MinHits[7],MinSleep[8],Blocker[9],Weight[10],ActionIDs[11],Async[12]
cgrates.org,THD_ACNT_BALANCE_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
cgrates.org,THD_ACNT_BALANCE_1,*string,EventSource,AccountS,,,,,,,
cgrates.org,THD_ACNT_BALANCE_1,*gte,BalanceValue,10.0,,,,,,,
cgrates.org,THD_ACNT_BALANCE_1,*string,EventType,BalanceUpdate,,,,,,,
cgrates.org,THD_ACNT_BALANCE_1,*gte,Units,10.0,,,,,,,
cgrates.org,THD_ACNT_EXPIRED,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
cgrates.org,THD_ACNT_EXPIRED,*gte,ExpiryTime,*now,,,,,,,
cgrates.org,THD_STATS_1,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
1 #Tenant[0],Id[1],FilterType[2],FilterFieldName[3],FilterFieldValues[4],ActivationInterval[5],Recurrent[6],MinHits[7],MinSleep[8],Blocker[9],Weight[10],ActionIDs[11],Async[12]
2 cgrates.org,THD_ACNT_BALANCE_1,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
3 cgrates.org,THD_ACNT_BALANCE_1,*string,EventSource,AccountS,,,,,,, cgrates.org,THD_ACNT_BALANCE_1,*string,EventType,BalanceUpdate,,,,,,,
4 cgrates.org,THD_ACNT_BALANCE_1,*gte,BalanceValue,10.0,,,,,,, cgrates.org,THD_ACNT_BALANCE_1,*gte,Units,10.0,,,,,,,
5 cgrates.org,THD_ACNT_EXPIRED,*string,Account,1001;1002,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,
6 cgrates.org,THD_ACNT_EXPIRED,*gte,ExpiryTime,*now,,,,,,,
7 cgrates.org,THD_STATS_1,*string,EventSource,StatS,2014-07-29T15:00:00Z,true,1s,false,10,LOG_WARNING,

View File

@@ -766,41 +766,82 @@ func (bc Balances) HasBalance(balance *Balance) bool {
}
func (bc Balances) SaveDirtyBalances(acc *Account) {
savedAccounts := make(map[string]bool)
savedAccounts := make(map[string]*Account)
for _, b := range bc {
if b.dirty {
// publish event
accountId := ""
allowNegative := ""
disabled := ""
if b.account != nil { // only publish modifications for balances with account set
//utils.LogStack()
accountId = b.account.ID
allowNegative = strconv.FormatBool(b.account.AllowNegative)
disabled = strconv.FormatBool(b.account.Disabled)
Publish(CgrEvent{
"EventName": utils.EVT_ACCOUNT_BALANCE_MODIFIED,
"Uuid": b.Uuid,
"Id": b.ID,
"Value": strconv.FormatFloat(b.Value, 'f', -1, 64),
"ExpirationDate": b.ExpirationDate.String(),
"Weight": strconv.FormatFloat(b.Weight, 'f', -1, 64),
"DestinationIDs": b.DestinationIDs.String(),
"Directions": b.Directions.String(),
"RatingSubject": b.RatingSubject,
"Categories": b.Categories.String(),
"SharedGroups": b.SharedGroups.String(),
"TimingIDs": b.TimingIDs.String(),
"Account": accountId,
"AccountAllowNegative": allowNegative,
"AccountDisabled": disabled,
})
if b.account == nil { // only publish modifications for balances with account set
continue
}
accountId = b.account.ID
acntTnt := utils.NewTenantID(accountId)
if thresholdS != nil {
ev := &ThresholdEvent{
Tenant: acntTnt.Tenant,
ID: utils.GenUUID(),
Fields: map[string]interface{}{
utils.EventType: utils.BalanceUpdate,
utils.EventSource: utils.AccountService,
utils.ACCOUNT: acntTnt.ID,
utils.BalanceID: b.ID,
utils.Units: b.Value}}
if !b.ExpirationDate.IsZero() {
ev.Fields[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339)
}
var hits int
if err := thresholdS.Call("ThresholdSV1.ProcessEvent", ev, &hits); err != nil {
utils.Logger.Warning(fmt.Sprintf("<AccountS> error: %s processing balance event %+v with thresholds.", err.Error(), ev))
}
}
//utils.LogStack()
allowNegative = strconv.FormatBool(b.account.AllowNegative)
disabled = strconv.FormatBool(b.account.Disabled)
Publish(CgrEvent{
"EventName": utils.EVT_ACCOUNT_BALANCE_MODIFIED,
"Uuid": b.Uuid,
"Id": b.ID,
"Value": strconv.FormatFloat(b.Value, 'f', -1, 64),
"ExpirationDate": b.ExpirationDate.String(),
"Weight": strconv.FormatFloat(b.Weight, 'f', -1, 64),
"DestinationIDs": b.DestinationIDs.String(),
"Directions": b.Directions.String(),
"RatingSubject": b.RatingSubject,
"Categories": b.Categories.String(),
"SharedGroups": b.SharedGroups.String(),
"TimingIDs": b.TimingIDs.String(),
"Account": accountId,
"AccountAllowNegative": allowNegative,
"AccountDisabled": disabled,
})
}
if b.account != nil && b.account != acc && b.dirty && savedAccounts[b.account.ID] == nil {
dm.DataDB().SetAccount(b.account)
savedAccounts[b.account.ID] = b.account
}
}
if len(savedAccounts) != 0 && thresholdS != nil {
for _, acnt := range savedAccounts {
acntTnt := utils.NewTenantID(acnt.ID)
ev := &ThresholdEvent{
Tenant: acntTnt.Tenant,
ID: utils.GenUUID(),
Fields: map[string]interface{}{
utils.EventType: utils.AccountUpdate,
utils.EventSource: utils.AccountService,
utils.ACCOUNT: acntTnt.ID,
utils.AllowNegative: acnt.AllowNegative,
utils.Disabled: acnt.Disabled}}
var hits int
if err := thresholdS.Call("ThresholdSV1.ProcessEvent", ev, &hits); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing account event %+v with thresholds.", err.Error(), ev))
}
}
if b.account != nil && b.account != acc && b.dirty && savedAccounts[b.account.ID] == false {
dm.DataDB().SetAccount(b.account)
savedAccounts[b.account.ID] = true
}
}
}

View File

@@ -71,6 +71,7 @@ var (
cdrStorage CdrStorage
debitPeriod = 10 * time.Second
globalRoundingDecimals = 6
thresholdS rpcclient.RpcClientConnection // used by RALs to communicate with ThresholdS
historyScribe rpcclient.RpcClientConnection
pubSubServer rpcclient.RpcClientConnection
userService rpcclient.RpcClientConnection
@@ -84,6 +85,10 @@ func SetDataStorage(dm2 *DataManager) {
dm = dm2
}
func SetThresholdS(thdS rpcclient.RpcClientConnection) {
thresholdS = thdS
}
// Sets the global rounding method and decimal precision for GetCost method
func SetRoundingDecimals(rd int) {
globalRoundingDecimals = rd

View File

@@ -470,7 +470,7 @@ const (
StatID = "StatID"
BalanceType = "BalanceType"
BalanceID = "BalanceID"
BalanceValue = "BalanceValue"
Units = "Units"
ResourceS = "ResourceS"
CacheThresholdProfiles = "threshold_profiles"
CacheThresholds = "thresholds"