Merbe PublishBalance in PublishAccount action

This commit is contained in:
TeoV
2020-10-22 18:04:54 +03:00
committed by Dan Christian Bogos
parent 18c4d1f94b
commit 1d99855364
9 changed files with 61 additions and 107 deletions

View File

@@ -507,7 +507,7 @@ func (apierSv1 *APIerSv1) modifyBalance(aType string, attr *AttrAddBalance, repl
Balance: balance,
}
publishAction := &engine.Action{
ActionType: utils.MetaPublishBalance,
ActionType: utils.MetaPublishAccount,
}
acts := engine.Actions{a, publishAction}
if attr.Cdrlog {
@@ -585,7 +585,7 @@ func (apierSv1 *APIerSv1) SetBalance(attr *utils.AttrSetBalance, reply *string)
Balance: balance,
}
publishAction := &engine.Action{
ActionType: utils.MetaPublishBalance,
ActionType: utils.MetaPublishAccount,
}
acts := engine.Actions{a, publishAction}
if attr.Cdrlog {
@@ -667,7 +667,7 @@ func (apierSv1 *APIerSv1) SetBalances(attr *utils.AttrSetBalances, reply *string
Balance: balFltr,
}
publishAction := &engine.Action{
ActionType: utils.MetaPublishBalance,
ActionType: utils.MetaPublishAccount,
}
acts := engine.Actions{a, publishAction}
if bal.Cdrlog {

View File

@@ -869,31 +869,33 @@ func testV1TSGetThresholdsWithoutTenant(t *testing.T) {
func testV1TSProcessAccountUpdateEvent(t *testing.T) {
var result string
tPrfl := &engine.ThresholdWithCache{
thAcntUpdate := &engine.ThresholdWithCache{
ThresholdProfile: &engine.ThresholdProfile{
Tenant: tenant,
Tenant: "cgrates.org",
ID: "TH_ACNT_UPDATE_EV",
FilterIDs: []string{
"*string:~*opts.eventType:AccountUpdate",
"*gt:~*asm.BalanceSummaries.HolidayBalance.Value:1.0"},
"*string:~*opts.*eventType:AccountUpdate",
"*string:~*asm.ID:testV1TSProcessAccountUpdateEvent",
"*gt:~*asm.BalanceSummaries.HolidayBalance.Value:1.0",
},
MaxHits: 10,
MinSleep: time.Duration(1 * time.Second),
MinSleep: time.Duration(10 * time.Millisecond),
Weight: 20.0,
ActionIDs: []string{"LOG_WARNING"},
Async: true,
},
}
if err := tSv1Rpc.Call(utils.APIerSv1SetThresholdProfile, tPrfl, &result); err != nil {
if err := tSv1Rpc.Call(utils.APIerSv1SetThresholdProfile, thAcntUpdate, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
var reply *engine.ThresholdProfile
if err := tSv1Rpc.Call(utils.APIerSv1GetThresholdProfile,
&utils.TenantID{Tenant: tenant, ID: "TH_ACNT_UPDATE_EV"}, &reply); err != nil {
&utils.TenantID{Tenant: "cgrates.org", ID: "TH_ACNT_UPDATE_EV"}, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, reply) {
t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply)
} else if !reflect.DeepEqual(thAcntUpdate.ThresholdProfile, reply) {
t.Errorf("Expecting: %+v, received: %+v", thAcntUpdate.ThresholdProfile, reply)
}
attrSetBalance := &utils.AttrSetBalance{
@@ -920,9 +922,9 @@ func testV1TSProcessAccountUpdateEvent(t *testing.T) {
t.Error(err)
}
tEv := &engine.ThresholdsArgsProcessEvent{
acntUpdateEv := &engine.ThresholdsArgsProcessEvent{
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{ // hitting THD_ACNT_BALANCE_1
CGREvent: &utils.CGREvent{ // hitting TH_ACNT_UPDATE_EV
Tenant: "cgrates.org",
ID: "SIMULATE_ACNT_UPDATE_EV",
Event: acnt.AsAccountSummary().AsMapInterface(),
@@ -934,8 +936,8 @@ func testV1TSProcessAccountUpdateEvent(t *testing.T) {
}
var ids []string
eIDs := []string{"THD_ACNT_BALANCE_1"}
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &ids); err != nil {
eIDs := []string{"TH_ACNT_UPDATE_EV"}
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, acntUpdateEv, &ids); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(ids, eIDs) {
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)

View File

@@ -22,6 +22,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
"strings"
"time"
@@ -1102,6 +1103,7 @@ func (acc *Account) Publish() {
CGREvent: &utils.CGREvent{
Tenant: acntSummary.Tenant,
ID: utils.GenUUID(),
Time: utils.TimePointer(time.Now()),
Event: acntSummary.AsMapInterface(),
},
Opts: map[string]interface{}{
@@ -1232,6 +1234,25 @@ func (as *AccountSummary) FieldAsInterface(fldPath []string) (val interface{}, e
}
}
func (as *AccountSummary) FieldAsString(fldPath []string) (val string, err error) {
var iface interface{}
iface, err = as.FieldAsInterface(fldPath)
if err != nil {
return
}
return utils.IfaceAsString(iface), nil
}
// String implements utils.DataProvider
func (as *AccountSummary) String() string {
return utils.ToIJSON(as)
}
// RemoteHost implements utils.DataProvider
func (ar *AccountSummary) RemoteHost() net.Addr {
return utils.LocalAddr()
}
func (as *AccountSummary) AsMapInterface() map[string]interface{} {
return map[string]interface{}{
utils.Tenant: as.Tenant,
@@ -1240,5 +1261,4 @@ func (as *AccountSummary) AsMapInterface() map[string]interface{} {
utils.Disabled: as.Disabled,
utils.BalanceSummaries: as.BalanceSummaries,
}
}

View File

@@ -98,7 +98,6 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
utils.TopUpZeroNegative: topupZeroNegativeAction,
utils.SetExpiry: setExpiryAction,
utils.MetaPublishAccount: publishAccount,
utils.MetaPublishBalance: publishBalance,
utils.MetaRemoveSessionCosts: removeSessionCosts,
utils.MetaRemoveExpired: removeExpired,
utils.MetaPostEvent: postEvent,
@@ -758,30 +757,6 @@ func publishAccount(ub *Account, a *Action, acs Actions, extraData interface{})
return errors.New("nil account")
}
ub.Publish()
for bType := range ub.BalanceMap {
for _, b := range ub.BalanceMap[bType] {
if b.account == nil {
b.account = ub
}
b.Publish()
}
}
return nil
}
// publishAccount will publish the account as well as each balance received to ThresholdS
func publishBalance(ub *Account, a *Action, acs Actions, extraData interface{}) error {
if ub == nil {
return errors.New("nil account")
}
for bType := range ub.BalanceMap {
for _, b := range ub.BalanceMap[bType] {
if b.account == nil {
b.account = ub
}
b.Publish()
}
}
return nil
}

View File

@@ -684,59 +684,6 @@ func (b *Balance) AsBalanceSummary(typ string) *BalanceSummary {
return bd
}
func (b *Balance) Publish() {
if b.account == nil {
return
}
accountID := b.account.ID
acntTnt := utils.NewTenantID(accountID)
cgrEv := &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: acntTnt.Tenant,
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.EventType: utils.BalanceUpdate,
utils.EventSource: utils.AccountService,
utils.Account: acntTnt.ID,
utils.BalanceID: b.ID,
utils.Units: b.Value,
},
},
Opts: map[string]interface{}{
utils.MetaEventType: utils.BalanceUpdate,
},
}
if !b.ExpirationDate.IsZero() {
cgrEv.Event[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339)
}
if len(config.CgrConfig().RalsCfg().StatSConns) != 0 {
go func() {
var reply []string
if err := connMgr.Call(config.CgrConfig().RalsCfg().StatSConns, nil,
utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{
CGREventWithOpts: cgrEv}, &reply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing balance event %+v with StatS.",
err.Error(), cgrEv))
}
}()
}
if len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 {
go func() {
var tIDs []string
if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil,
utils.ThresholdSv1ProcessEvent, &ThresholdsArgsProcessEvent{
CGREventWithOpts: cgrEv}, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing balance event %+v with ThresholdS.",
err.Error(), cgrEv))
}
}()
}
}
/*
Structure to store minute buckets according to weight, precision or price.
*/
@@ -825,6 +772,7 @@ func (bc Balances) SaveDirtyBalances(acc *Account) {
CGREvent: &utils.CGREvent{
Tenant: acntSummary.Tenant,
ID: utils.GenUUID(),
Time: utils.TimePointer(time.Now()),
Event: acntSummary.AsMapInterface(),
},
Opts: map[string]interface{}{

View File

@@ -63,7 +63,7 @@ func (dDP *dynamicDP) RemoteHost() net.Addr {
var initialDPPrefixes = utils.NewStringSet([]string{utils.MetaReq, utils.MetaVars,
utils.MetaCgreq, utils.MetaCgrep, utils.MetaRep, utils.MetaCGRAReq,
utils.MetaAct, utils.MetaEC, utils.MetaUCH, utils.MetaOpts, utils.MetaAsm})
utils.MetaAct, utils.MetaEC, utils.MetaUCH, utils.MetaOpts})
func (dDP *dynamicDP) FieldAsInterface(fldPath []string) (val interface{}, err error) {
if len(fldPath) == 0 {
@@ -141,7 +141,7 @@ func (dDP *dynamicDP) fieldAsInterface(fldPath []string) (val interface{}, err e
return nil, err
}
dDP.cache.Set(fldPath[:1], acntSummary)
return acntSummary.FieldAsInterface(fldPath[2:])
return acntSummary.FieldAsInterface(fldPath[1:])
default: // in case of constant we give an empty DataProvider ( empty navigable map )
}
return nil, utils.ErrNotFound

View File

@@ -93,19 +93,25 @@ func (t *Threshold) ProcessEvent(args *ThresholdsArgsProcessEvent, dm *DataManag
t.Hits > t.tPrfl.MaxHits) {
return
}
acnt, _ := args.FieldAsString(utils.Account)
var acntID string
if acnt != "" {
acntID = utils.ConcatenatedKey(args.Tenant, acnt)
var tntAcnt string
var acnt string
if utils.IfaceAsString(args.Opts[utils.MetaEventType]) == utils.AccountUpdate {
acnt, _ = args.FieldAsString(utils.ID)
} else {
acnt, _ = args.FieldAsString(utils.Account)
}
if acnt != utils.EmptyString {
tntAcnt = utils.ConcatenatedKey(args.Tenant, acnt)
}
for _, actionSetID := range t.tPrfl.ActionIDs {
at := &ActionTiming{
Uuid: utils.GenUUID(),
ActionsID: actionSetID,
ExtraData: args.CGREventWithOpts,
}
if acntID != "" {
at.accountIDs = utils.NewStringMap(acntID)
if tntAcnt != utils.EmptyString {
at.accountIDs = utils.NewStringMap(tntAcnt)
}
if t.tPrfl.Async {
go func() {
@@ -272,6 +278,9 @@ func (tS *ThresholdService) matchingThresholdsForEvent(tnt string, args *Thresho
}
t, err := tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, true, true, "")
if err != nil {
if err == utils.ErrNotFound { // corner case where the threshold was removed due to MaxHits
continue
}
return nil, err
}
if t.dirty == nil || tPrfl.MaxHits == -1 || t.Hits < tPrfl.MaxHits {

View File

@@ -252,9 +252,10 @@ func testV1AccSendToThreshold(t *testing.T) {
tPrfl := &engine.ThresholdWithCache{
ThresholdProfile: &engine.ThresholdProfile{
Tenant: "cgrates.org",
ID: "THD_AccDisableAndLog",
FilterIDs: []string{"*string:~*req.Account:testAccThreshold"},
Tenant: "cgrates.org",
ID: "THD_AccDisableAndLog",
FilterIDs: []string{"*string:~*opts.*eventType:AccountUpdate",
"*string:~*asm.ID:testAccThreshold"},
MaxHits: -1,
MinSleep: time.Duration(1 * time.Second),
Weight: 20.0,

View File

@@ -990,7 +990,6 @@ const (
TopUpZeroNegative = "*topup_zero_negative"
SetExpiry = "*set_expiry"
MetaPublishAccount = "*publish_account"
MetaPublishBalance = "*publish_balance"
MetaRemoveSessionCosts = "*remove_session_costs"
MetaRemoveExpired = "*remove_expired"
MetaPostEvent = "*post_event"