diff --git a/apier/v1/accounts.go b/apier/v1/accounts.go index 15e25b514..412d0d1d8 100644 --- a/apier/v1/accounts.go +++ b/apier/v1/accounts.go @@ -507,7 +507,7 @@ func (apierSv1 *APIerSv1) modifyBalance(aType string, attr *AttrAddBalance, repl Balance: balance, } publishAction := &engine.Action{ - ActionType: utils.MetaPublishBalance, + ActionType: utils.MetaPublishAccount, } acts := engine.Actions{a, publishAction} if attr.Cdrlog { @@ -585,7 +585,7 @@ func (apierSv1 *APIerSv1) SetBalance(attr *utils.AttrSetBalance, reply *string) Balance: balance, } publishAction := &engine.Action{ - ActionType: utils.MetaPublishBalance, + ActionType: utils.MetaPublishAccount, } acts := engine.Actions{a, publishAction} if attr.Cdrlog { @@ -667,7 +667,7 @@ func (apierSv1 *APIerSv1) SetBalances(attr *utils.AttrSetBalances, reply *string Balance: balFltr, } publishAction := &engine.Action{ - ActionType: utils.MetaPublishBalance, + ActionType: utils.MetaPublishAccount, } acts := engine.Actions{a, publishAction} if bal.Cdrlog { diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index 7b9b78207..c777e9b15 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -869,31 +869,33 @@ func testV1TSGetThresholdsWithoutTenant(t *testing.T) { func testV1TSProcessAccountUpdateEvent(t *testing.T) { var result string - tPrfl := &engine.ThresholdWithCache{ + thAcntUpdate := &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ - Tenant: tenant, + Tenant: "cgrates.org", ID: "TH_ACNT_UPDATE_EV", FilterIDs: []string{ - "*string:~*opts.eventType:AccountUpdate", - "*gt:~*asm.BalanceSummaries.HolidayBalance.Value:1.0"}, + "*string:~*opts.*eventType:AccountUpdate", + "*string:~*asm.ID:testV1TSProcessAccountUpdateEvent", + "*gt:~*asm.BalanceSummaries.HolidayBalance.Value:1.0", + }, MaxHits: 10, - MinSleep: time.Duration(1 * time.Second), + MinSleep: time.Duration(10 * time.Millisecond), Weight: 20.0, ActionIDs: []string{"LOG_WARNING"}, Async: true, }, } - if err := tSv1Rpc.Call(utils.APIerSv1SetThresholdProfile, tPrfl, &result); err != nil { + if err := tSv1Rpc.Call(utils.APIerSv1SetThresholdProfile, thAcntUpdate, &result); err != nil { t.Error(err) } else if result != utils.OK { t.Error("Unexpected reply returned", result) } var reply *engine.ThresholdProfile if err := tSv1Rpc.Call(utils.APIerSv1GetThresholdProfile, - &utils.TenantID{Tenant: tenant, ID: "TH_ACNT_UPDATE_EV"}, &reply); err != nil { + &utils.TenantID{Tenant: "cgrates.org", ID: "TH_ACNT_UPDATE_EV"}, &reply); err != nil { t.Error(err) - } else if !reflect.DeepEqual(tPrfl.ThresholdProfile, reply) { - t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply) + } else if !reflect.DeepEqual(thAcntUpdate.ThresholdProfile, reply) { + t.Errorf("Expecting: %+v, received: %+v", thAcntUpdate.ThresholdProfile, reply) } attrSetBalance := &utils.AttrSetBalance{ @@ -920,9 +922,9 @@ func testV1TSProcessAccountUpdateEvent(t *testing.T) { t.Error(err) } - tEv := &engine.ThresholdsArgsProcessEvent{ + acntUpdateEv := &engine.ThresholdsArgsProcessEvent{ CGREventWithOpts: &utils.CGREventWithOpts{ - CGREvent: &utils.CGREvent{ // hitting THD_ACNT_BALANCE_1 + CGREvent: &utils.CGREvent{ // hitting TH_ACNT_UPDATE_EV Tenant: "cgrates.org", ID: "SIMULATE_ACNT_UPDATE_EV", Event: acnt.AsAccountSummary().AsMapInterface(), @@ -934,8 +936,8 @@ func testV1TSProcessAccountUpdateEvent(t *testing.T) { } var ids []string - eIDs := []string{"THD_ACNT_BALANCE_1"} - if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &ids); err != nil { + eIDs := []string{"TH_ACNT_UPDATE_EV"} + if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, acntUpdateEv, &ids); err != nil { t.Error(err) } else if !reflect.DeepEqual(ids, eIDs) { t.Errorf("Expecting ids: %s, received: %s", eIDs, ids) diff --git a/engine/account.go b/engine/account.go index d013e7517..3acb3f227 100644 --- a/engine/account.go +++ b/engine/account.go @@ -22,6 +22,7 @@ import ( "encoding/json" "errors" "fmt" + "net" "strings" "time" @@ -1102,6 +1103,7 @@ func (acc *Account) Publish() { CGREvent: &utils.CGREvent{ Tenant: acntSummary.Tenant, ID: utils.GenUUID(), + Time: utils.TimePointer(time.Now()), Event: acntSummary.AsMapInterface(), }, Opts: map[string]interface{}{ @@ -1232,6 +1234,25 @@ func (as *AccountSummary) FieldAsInterface(fldPath []string) (val interface{}, e } } +func (as *AccountSummary) FieldAsString(fldPath []string) (val string, err error) { + var iface interface{} + iface, err = as.FieldAsInterface(fldPath) + if err != nil { + return + } + return utils.IfaceAsString(iface), nil +} + +// String implements utils.DataProvider +func (as *AccountSummary) String() string { + return utils.ToIJSON(as) +} + +// RemoteHost implements utils.DataProvider +func (ar *AccountSummary) RemoteHost() net.Addr { + return utils.LocalAddr() +} + func (as *AccountSummary) AsMapInterface() map[string]interface{} { return map[string]interface{}{ utils.Tenant: as.Tenant, @@ -1240,5 +1261,4 @@ func (as *AccountSummary) AsMapInterface() map[string]interface{} { utils.Disabled: as.Disabled, utils.BalanceSummaries: as.BalanceSummaries, } - } diff --git a/engine/action.go b/engine/action.go index c4e4d5c37..a0556be0a 100644 --- a/engine/action.go +++ b/engine/action.go @@ -98,7 +98,6 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { utils.TopUpZeroNegative: topupZeroNegativeAction, utils.SetExpiry: setExpiryAction, utils.MetaPublishAccount: publishAccount, - utils.MetaPublishBalance: publishBalance, utils.MetaRemoveSessionCosts: removeSessionCosts, utils.MetaRemoveExpired: removeExpired, utils.MetaPostEvent: postEvent, @@ -758,30 +757,6 @@ func publishAccount(ub *Account, a *Action, acs Actions, extraData interface{}) return errors.New("nil account") } ub.Publish() - for bType := range ub.BalanceMap { - for _, b := range ub.BalanceMap[bType] { - if b.account == nil { - b.account = ub - } - b.Publish() - } - } - return nil -} - -// publishAccount will publish the account as well as each balance received to ThresholdS -func publishBalance(ub *Account, a *Action, acs Actions, extraData interface{}) error { - if ub == nil { - return errors.New("nil account") - } - for bType := range ub.BalanceMap { - for _, b := range ub.BalanceMap[bType] { - if b.account == nil { - b.account = ub - } - b.Publish() - } - } return nil } diff --git a/engine/balances.go b/engine/balances.go index 2ea34a006..7ac13387a 100644 --- a/engine/balances.go +++ b/engine/balances.go @@ -684,59 +684,6 @@ func (b *Balance) AsBalanceSummary(typ string) *BalanceSummary { return bd } -func (b *Balance) Publish() { - if b.account == nil { - return - } - accountID := b.account.ID - acntTnt := utils.NewTenantID(accountID) - cgrEv := &utils.CGREventWithOpts{ - CGREvent: &utils.CGREvent{ - Tenant: acntTnt.Tenant, - ID: utils.GenUUID(), - Event: map[string]interface{}{ - utils.EventType: utils.BalanceUpdate, - utils.EventSource: utils.AccountService, - utils.Account: acntTnt.ID, - utils.BalanceID: b.ID, - utils.Units: b.Value, - }, - }, - Opts: map[string]interface{}{ - utils.MetaEventType: utils.BalanceUpdate, - }, - } - if !b.ExpirationDate.IsZero() { - cgrEv.Event[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339) - } - if len(config.CgrConfig().RalsCfg().StatSConns) != 0 { - go func() { - var reply []string - if err := connMgr.Call(config.CgrConfig().RalsCfg().StatSConns, nil, - utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{ - CGREventWithOpts: cgrEv}, &reply); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf(" error: %s processing balance event %+v with StatS.", - err.Error(), cgrEv)) - } - }() - } - if len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 { - go func() { - var tIDs []string - if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil, - utils.ThresholdSv1ProcessEvent, &ThresholdsArgsProcessEvent{ - CGREventWithOpts: cgrEv}, &tIDs); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf(" error: %s processing balance event %+v with ThresholdS.", - err.Error(), cgrEv)) - } - }() - } -} - /* Structure to store minute buckets according to weight, precision or price. */ @@ -825,6 +772,7 @@ func (bc Balances) SaveDirtyBalances(acc *Account) { CGREvent: &utils.CGREvent{ Tenant: acntSummary.Tenant, ID: utils.GenUUID(), + Time: utils.TimePointer(time.Now()), Event: acntSummary.AsMapInterface(), }, Opts: map[string]interface{}{ diff --git a/engine/dynamicdp.go b/engine/dynamicdp.go index 5ddeb6385..b9fada387 100644 --- a/engine/dynamicdp.go +++ b/engine/dynamicdp.go @@ -63,7 +63,7 @@ func (dDP *dynamicDP) RemoteHost() net.Addr { var initialDPPrefixes = utils.NewStringSet([]string{utils.MetaReq, utils.MetaVars, utils.MetaCgreq, utils.MetaCgrep, utils.MetaRep, utils.MetaCGRAReq, - utils.MetaAct, utils.MetaEC, utils.MetaUCH, utils.MetaOpts, utils.MetaAsm}) + utils.MetaAct, utils.MetaEC, utils.MetaUCH, utils.MetaOpts}) func (dDP *dynamicDP) FieldAsInterface(fldPath []string) (val interface{}, err error) { if len(fldPath) == 0 { @@ -141,7 +141,7 @@ func (dDP *dynamicDP) fieldAsInterface(fldPath []string) (val interface{}, err e return nil, err } dDP.cache.Set(fldPath[:1], acntSummary) - return acntSummary.FieldAsInterface(fldPath[2:]) + return acntSummary.FieldAsInterface(fldPath[1:]) default: // in case of constant we give an empty DataProvider ( empty navigable map ) } return nil, utils.ErrNotFound diff --git a/engine/thresholds.go b/engine/thresholds.go index 8e070656a..cbb99d74a 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -93,19 +93,25 @@ func (t *Threshold) ProcessEvent(args *ThresholdsArgsProcessEvent, dm *DataManag t.Hits > t.tPrfl.MaxHits) { return } - acnt, _ := args.FieldAsString(utils.Account) - var acntID string - if acnt != "" { - acntID = utils.ConcatenatedKey(args.Tenant, acnt) + var tntAcnt string + var acnt string + if utils.IfaceAsString(args.Opts[utils.MetaEventType]) == utils.AccountUpdate { + acnt, _ = args.FieldAsString(utils.ID) + } else { + acnt, _ = args.FieldAsString(utils.Account) } + if acnt != utils.EmptyString { + tntAcnt = utils.ConcatenatedKey(args.Tenant, acnt) + } + for _, actionSetID := range t.tPrfl.ActionIDs { at := &ActionTiming{ Uuid: utils.GenUUID(), ActionsID: actionSetID, ExtraData: args.CGREventWithOpts, } - if acntID != "" { - at.accountIDs = utils.NewStringMap(acntID) + if tntAcnt != utils.EmptyString { + at.accountIDs = utils.NewStringMap(tntAcnt) } if t.tPrfl.Async { go func() { @@ -272,6 +278,9 @@ func (tS *ThresholdService) matchingThresholdsForEvent(tnt string, args *Thresho } t, err := tS.dm.GetThreshold(tPrfl.Tenant, tPrfl.ID, true, true, "") if err != nil { + if err == utils.ErrNotFound { // corner case where the threshold was removed due to MaxHits + continue + } return nil, err } if t.dirty == nil || tPrfl.MaxHits == -1 || t.Hits < tPrfl.MaxHits { diff --git a/general_tests/accounts_it_test.go b/general_tests/accounts_it_test.go index 8138070b2..c15706b15 100644 --- a/general_tests/accounts_it_test.go +++ b/general_tests/accounts_it_test.go @@ -252,9 +252,10 @@ func testV1AccSendToThreshold(t *testing.T) { tPrfl := &engine.ThresholdWithCache{ ThresholdProfile: &engine.ThresholdProfile{ - Tenant: "cgrates.org", - ID: "THD_AccDisableAndLog", - FilterIDs: []string{"*string:~*req.Account:testAccThreshold"}, + Tenant: "cgrates.org", + ID: "THD_AccDisableAndLog", + FilterIDs: []string{"*string:~*opts.*eventType:AccountUpdate", + "*string:~*asm.ID:testAccThreshold"}, MaxHits: -1, MinSleep: time.Duration(1 * time.Second), Weight: 20.0, diff --git a/utils/consts.go b/utils/consts.go index 37836db98..8d840ab20 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -990,7 +990,6 @@ const ( TopUpZeroNegative = "*topup_zero_negative" SetExpiry = "*set_expiry" MetaPublishAccount = "*publish_account" - MetaPublishBalance = "*publish_balance" MetaRemoveSessionCosts = "*remove_session_costs" MetaRemoveExpired = "*remove_expired" MetaPostEvent = "*post_event"