New Action: *publish_account to force publishing account information to ThresholdS, fixes #1059

This commit is contained in:
DanB
2018-04-22 19:50:46 +02:00
parent ef7a32cda1
commit 5bc107fecf
3 changed files with 75 additions and 1 deletions

View File

@@ -1074,6 +1074,30 @@ func (acc *Account) AsAccountSummary() *AccountSummary {
return ad
}
func (acnt *Account) Publish() {
if thresholdS == nil {
return
}
acntTnt := utils.NewTenantID(acnt.ID)
thEv := &ArgsProcessEvent{
CGREvent: utils.CGREvent{
Tenant: acntTnt.Tenant,
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.EventType: utils.AccountUpdate,
utils.EventSource: utils.AccountService,
utils.Account: acntTnt.ID,
utils.AllowNegative: acnt.AllowNegative,
utils.Disabled: acnt.Disabled}}}
var hits int
if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent,
thEv, &hits); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing account event %+v with ThresholdS.", err.Error(), thEv))
}
}
func NewAccountSummaryFromJSON(jsn string) (acntSummary *AccountSummary, err error) {
if !utils.IsSliceMember([]string{"", "null"}, jsn) { // Unmarshal only when content
json.Unmarshal([]byte(jsn), &acntSummary)

View File

@@ -81,6 +81,7 @@ const (
CGR_RPC = "*cgr_rpc"
TopUpZeroNegative = "*topup_zero_negative"
SetExpiry = "*set_expiry"
MetaPublishAccount = "*publish_account"
)
func (a *Action) Clone() *Action {
@@ -119,6 +120,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
CGR_RPC: cgrRPCAction,
TopUpZeroNegative: topupZeroNegativeAction,
SetExpiry: setExpiryAction,
MetaPublishAccount: publishAccount,
}
f, exists := actionFuncMap[typ]
return f, exists
@@ -773,6 +775,24 @@ func setExpiryAction(account *Account, sq *CDRStatsQueueTriggered, a *Action, ac
return nil
}
// publishAccount will publish the account as well as each balance received to ThresholdS
func publishAccount(acnt *Account, sq *CDRStatsQueueTriggered,
a *Action, acs Actions) error {
if acnt == nil {
return errors.New("nil account")
}
acnt.Publish()
for bType := range acnt.BalanceMap {
for _, b := range acnt.BalanceMap[bType] {
if b.account == nil {
b.account = acnt
}
b.Publish()
}
}
return nil
}
// Structure to store actions according to weight
type Actions []*Action

View File

@@ -695,6 +695,35 @@ func (b *Balance) AsBalanceSummary(typ string) *BalanceSummary {
return bd
}
func (b *Balance) Publish() {
if b.account == nil ||
thresholdS == nil {
return
}
accountId := b.account.ID
acntTnt := utils.NewTenantID(accountId)
thEv := &ArgsProcessEvent{
CGREvent: utils.CGREvent{
Tenant: acntTnt.Tenant,
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.EventType: utils.BalanceUpdate,
utils.EventSource: utils.AccountService,
utils.Account: acntTnt.ID,
utils.BalanceID: b.ID,
utils.Units: b.Value}}}
if !b.ExpirationDate.IsZero() {
thEv.Event[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339)
}
var hits int
if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &hits); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing balance event %+v with ThresholdS.",
err.Error(), thEv))
}
}
/*
Structure to store minute buckets according to weight, precision or price.
*/
@@ -800,7 +829,8 @@ func (bc Balances) SaveDirtyBalances(acc *Account) {
if err := thresholdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &hits); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing balance event %+v with ThresholdS.", err.Error(), thEv))
fmt.Sprintf("<AccountS> error: %s processing balance event %+v with ThresholdS.",
err.Error(), thEv))
}
}
//utils.LogStack()