From 5bc107fecff2ef3dfe7d8e8f10d905fdf6ebef37 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 22 Apr 2018 19:50:46 +0200 Subject: [PATCH] New Action: *publish_account to force publishing account information to ThresholdS, fixes #1059 --- engine/account.go | 24 ++++++++++++++++++++++++ engine/action.go | 20 ++++++++++++++++++++ engine/balances.go | 32 +++++++++++++++++++++++++++++++- 3 files changed, 75 insertions(+), 1 deletion(-) diff --git a/engine/account.go b/engine/account.go index c7c939ac8..80ddc6cf5 100644 --- a/engine/account.go +++ b/engine/account.go @@ -1074,6 +1074,30 @@ func (acc *Account) AsAccountSummary() *AccountSummary { return ad } +func (acnt *Account) Publish() { + if thresholdS == nil { + return + } + acntTnt := utils.NewTenantID(acnt.ID) + thEv := &ArgsProcessEvent{ + CGREvent: utils.CGREvent{ + Tenant: acntTnt.Tenant, + ID: utils.GenUUID(), + Event: map[string]interface{}{ + utils.EventType: utils.AccountUpdate, + utils.EventSource: utils.AccountService, + utils.Account: acntTnt.ID, + utils.AllowNegative: acnt.AllowNegative, + utils.Disabled: acnt.Disabled}}} + var hits int + if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, + thEv, &hits); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf(" error: %s processing account event %+v with ThresholdS.", err.Error(), thEv)) + } +} + func NewAccountSummaryFromJSON(jsn string) (acntSummary *AccountSummary, err error) { if !utils.IsSliceMember([]string{"", "null"}, jsn) { // Unmarshal only when content json.Unmarshal([]byte(jsn), &acntSummary) diff --git a/engine/action.go b/engine/action.go index 50293b023..7d68ccbf5 100644 --- a/engine/action.go +++ b/engine/action.go @@ -81,6 +81,7 @@ const ( CGR_RPC = "*cgr_rpc" TopUpZeroNegative = "*topup_zero_negative" SetExpiry = "*set_expiry" + MetaPublishAccount = "*publish_account" ) func (a *Action) Clone() *Action { @@ -119,6 +120,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { CGR_RPC: cgrRPCAction, TopUpZeroNegative: topupZeroNegativeAction, SetExpiry: setExpiryAction, + MetaPublishAccount: publishAccount, } f, exists := actionFuncMap[typ] return f, exists @@ -773,6 +775,24 @@ func setExpiryAction(account *Account, sq *CDRStatsQueueTriggered, a *Action, ac return nil } +// publishAccount will publish the account as well as each balance received to ThresholdS +func publishAccount(acnt *Account, sq *CDRStatsQueueTriggered, + a *Action, acs Actions) error { + if acnt == nil { + return errors.New("nil account") + } + acnt.Publish() + for bType := range acnt.BalanceMap { + for _, b := range acnt.BalanceMap[bType] { + if b.account == nil { + b.account = acnt + } + b.Publish() + } + } + return nil +} + // Structure to store actions according to weight type Actions []*Action diff --git a/engine/balances.go b/engine/balances.go index d64f20c7d..c8f7ff0c7 100644 --- a/engine/balances.go +++ b/engine/balances.go @@ -695,6 +695,35 @@ func (b *Balance) AsBalanceSummary(typ string) *BalanceSummary { return bd } +func (b *Balance) Publish() { + if b.account == nil || + thresholdS == nil { + return + } + accountId := b.account.ID + acntTnt := utils.NewTenantID(accountId) + thEv := &ArgsProcessEvent{ + CGREvent: utils.CGREvent{ + Tenant: acntTnt.Tenant, + ID: utils.GenUUID(), + Event: map[string]interface{}{ + utils.EventType: utils.BalanceUpdate, + utils.EventSource: utils.AccountService, + utils.Account: acntTnt.ID, + utils.BalanceID: b.ID, + utils.Units: b.Value}}} + if !b.ExpirationDate.IsZero() { + thEv.Event[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339) + } + var hits int + if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &hits); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf(" error: %s processing balance event %+v with ThresholdS.", + err.Error(), thEv)) + } +} + /* Structure to store minute buckets according to weight, precision or price. */ @@ -800,7 +829,8 @@ func (bc Balances) SaveDirtyBalances(acc *Account) { if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &hits); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( - fmt.Sprintf(" error: %s processing balance event %+v with ThresholdS.", err.Error(), thEv)) + fmt.Sprintf(" error: %s processing balance event %+v with ThresholdS.", + err.Error(), thEv)) } } //utils.LogStack()