From 18c4d1f94b2da4c7f43e21859bf9f3268968971d Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 21 Oct 2020 18:08:11 +0300 Subject: [PATCH] Start update event from RALs to ThresholdS and StatS --- apier/v1/thresholds_it_test.go | 77 ++++++++++++++++++++++++++++ engine/account.go | 68 ++++++++++++------------ engine/balances.go | 94 ++++++++++++---------------------- engine/dynamicdp.go | 14 ++++- utils/consts.go | 1 + 5 files changed, 159 insertions(+), 95 deletions(-) diff --git a/apier/v1/thresholds_it_test.go b/apier/v1/thresholds_it_test.go index 9d0896f92..7b9b78207 100644 --- a/apier/v1/thresholds_it_test.go +++ b/apier/v1/thresholds_it_test.go @@ -244,6 +244,7 @@ var ( testV1TSRemThresholdProfileWithoutTenant, testV1TSProcessEventWithoutTenant, testV1TSGetThresholdsWithoutTenant, + testV1TSProcessAccountUpdateEvent, testV1TSStopEngine, } ) @@ -865,3 +866,79 @@ func testV1TSGetThresholdsWithoutTenant(t *testing.T) { t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(expectedThreshold.Tenant), utils.ToJSON(reply.Tenant)) } } + +func testV1TSProcessAccountUpdateEvent(t *testing.T) { + var result string + tPrfl := &engine.ThresholdWithCache{ + ThresholdProfile: &engine.ThresholdProfile{ + Tenant: tenant, + ID: "TH_ACNT_UPDATE_EV", + FilterIDs: []string{ + "*string:~*opts.eventType:AccountUpdate", + "*gt:~*asm.BalanceSummaries.HolidayBalance.Value:1.0"}, + MaxHits: 10, + MinSleep: time.Duration(1 * time.Second), + Weight: 20.0, + ActionIDs: []string{"LOG_WARNING"}, + Async: true, + }, + } + if err := tSv1Rpc.Call(utils.APIerSv1SetThresholdProfile, tPrfl, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + var reply *engine.ThresholdProfile + if err := tSv1Rpc.Call(utils.APIerSv1GetThresholdProfile, + &utils.TenantID{Tenant: tenant, ID: "TH_ACNT_UPDATE_EV"}, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tPrfl.ThresholdProfile, reply) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply) + } + + attrSetBalance := &utils.AttrSetBalance{ + Tenant: "cgrates.org", + Account: "testV1TSProcessAccountUpdateEvent", + BalanceType: "*monetary", + Value: 1.5, + Balance: map[string]interface{}{ + utils.ID: "HolidayBalance", + }, + } + if err := tSv1Rpc.Call(utils.APIerSv1SetBalance, attrSetBalance, &result); err != nil { + t.Error("Got error on APIerSv1.SetBalance: ", err.Error()) + } else if result != utils.OK { + t.Errorf("Calling APIerSv1.SetBalance received: %s", result) + } + + var acnt *engine.Account + attrs := &utils.AttrGetAccount{ + Tenant: "cgrates.org", + Account: "testV1TSProcessAccountUpdateEvent", + } + if err := tSv1Rpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil { + t.Error(err) + } + + tEv := &engine.ThresholdsArgsProcessEvent{ + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ // hitting THD_ACNT_BALANCE_1 + Tenant: "cgrates.org", + ID: "SIMULATE_ACNT_UPDATE_EV", + Event: acnt.AsAccountSummary().AsMapInterface(), + }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, + }, + }, + } + + var ids []string + eIDs := []string{"THD_ACNT_BALANCE_1"} + if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &ids); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(ids, eIDs) { + t.Errorf("Expecting ids: %s, received: %s", eIDs, ids) + } + +} diff --git a/engine/account.go b/engine/account.go index be642a5d6..d013e7517 100644 --- a/engine/account.go +++ b/engine/account.go @@ -1097,53 +1097,46 @@ func (acc *Account) AsAccountSummary() *AccountSummary { // Publish sends the account to stats and threshold func (acc *Account) Publish() { - acntTnt := utils.NewTenantID(acc.ID) + acntSummary := acc.AsAccountSummary() cgrEv := &utils.CGREventWithOpts{ CGREvent: &utils.CGREvent{ - Tenant: acntTnt.Tenant, + Tenant: acntSummary.Tenant, ID: utils.GenUUID(), - Event: map[string]interface{}{ - utils.EventType: utils.AccountUpdate, - utils.EventSource: utils.AccountService, - utils.Account: acntTnt.ID, - utils.AllowNegative: acc.AllowNegative, - utils.Disabled: acc.Disabled, - }, + Event: acntSummary.AsMapInterface(), }, Opts: map[string]interface{}{ utils.MetaEventType: utils.AccountUpdate, }, } - if len(config.CgrConfig().RalsCfg().StatSConns) != 0 { - go func() { - var reply []string - if err := connMgr.Call(config.CgrConfig().RalsCfg().StatSConns, nil, - utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREventWithOpts: cgrEv}, &reply); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf(" error: %s processing balance event %+v with StatS.", - err.Error(), cgrEv)) - } - }() - } if len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 { - go func() { - var tIDs []string - if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil, - utils.ThresholdSv1ProcessEvent, - &ThresholdsArgsProcessEvent{CGREventWithOpts: cgrEv}, &tIDs); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf(" error: %s processing account event %+v with ThresholdS.", err.Error(), cgrEv)) - } - }() + var tIDs []string + if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil, + utils.ThresholdSv1ProcessEvent, &ThresholdsArgsProcessEvent{ + CGREventWithOpts: cgrEv, + }, &tIDs); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf(" error: %s processing account event %+v with ThresholdS.", err.Error(), cgrEv)) + } } + if len(config.CgrConfig().RalsCfg().StatSConns) != 0 { + var stsIDs []string + if err := connMgr.Call(config.CgrConfig().RalsCfg().StatSConns, nil, + utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{ + CGREventWithOpts: cgrEv, + }, &stsIDs); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf(" error: %s processing account event %+v with StatS.", err.Error(), cgrEv)) + } + } + } // NewAccountSummaryFromJSON creates a new AcccountSummary from a json string func NewAccountSummaryFromJSON(jsn string) (acntSummary *AccountSummary, err error) { if !utils.SliceHasMember([]string{"", "null"}, jsn) { // Unmarshal only when content - json.Unmarshal([]byte(jsn), &acntSummary) + err = json.Unmarshal([]byte(jsn), &acntSummary) } return } @@ -1238,3 +1231,14 @@ func (as *AccountSummary) FieldAsInterface(fldPath []string) (val interface{}, e return as.Disabled, nil } } + +func (as *AccountSummary) AsMapInterface() map[string]interface{} { + return map[string]interface{}{ + utils.Tenant: as.Tenant, + utils.ID: as.ID, + utils.AllowNegative: as.AllowNegative, + utils.Disabled: as.Disabled, + utils.BalanceSummaries: as.BalanceSummaries, + } + +} diff --git a/engine/balances.go b/engine/balances.go index e7e20349f..2ea34a006 100644 --- a/engine/balances.go +++ b/engine/balances.go @@ -813,77 +813,47 @@ func (bc Balances) HasBalance(balance *Balance) bool { func (bc Balances) SaveDirtyBalances(acc *Account) { savedAccounts := make(map[string]*Account) for _, b := range bc { - if b.dirty { - // publish event - if b.account == nil { // only publish modifications for balances with account set - continue - } - acntTnt := utils.NewTenantID(b.account.ID) - thEv := &ThresholdsArgsProcessEvent{ - CGREventWithOpts: &utils.CGREventWithOpts{ - CGREvent: &utils.CGREvent{ - Tenant: acntTnt.Tenant, - ID: utils.GenUUID(), - Event: map[string]interface{}{ - utils.EventType: utils.BalanceUpdate, - utils.EventSource: utils.AccountService, - utils.Account: acntTnt.ID, - utils.BalanceID: b.ID, - utils.Units: b.Value, - }, - }, - Opts: map[string]interface{}{ - utils.MetaEventType: utils.BalanceUpdate, - }, - }, - } - if !b.ExpirationDate.IsZero() { - thEv.Event[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339) - } - if len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 { - var tIDs []string - if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil, - utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf(" error: %s processing balance event %+v with ThresholdS.", - err.Error(), thEv)) - } - } - } if b.account != nil && b.account != acc && b.dirty && savedAccounts[b.account.ID] == nil { dm.SetAccount(b.account) savedAccounts[b.account.ID] = b.account } } - if len(savedAccounts) != 0 && len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 { + if len(savedAccounts) != 0 { for _, acnt := range savedAccounts { - acntTnt := utils.NewTenantID(acnt.ID) - thEv := &ThresholdsArgsProcessEvent{ - CGREventWithOpts: &utils.CGREventWithOpts{ - CGREvent: &utils.CGREvent{ - Tenant: acntTnt.Tenant, - ID: utils.GenUUID(), - Event: map[string]interface{}{ - utils.EventType: utils.AccountUpdate, - utils.EventSource: utils.AccountService, - utils.Account: acntTnt.ID, - utils.AllowNegative: acnt.AllowNegative, - utils.Disabled: acnt.Disabled, - }, - }, - Opts: map[string]interface{}{ - utils.MetaEventType: utils.AccountUpdate, - }, + acntSummary := acnt.AsAccountSummary() + cgrEv := &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: acntSummary.Tenant, + ID: utils.GenUUID(), + Event: acntSummary.AsMapInterface(), + }, + Opts: map[string]interface{}{ + utils.MetaEventType: utils.AccountUpdate, }, } - var tIDs []string - if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil, - utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf(" error: %s processing account event %+v with ThresholdS.", err.Error(), thEv)) + if len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 { + var tIDs []string + if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil, + utils.ThresholdSv1ProcessEvent, &ThresholdsArgsProcessEvent{ + CGREventWithOpts: cgrEv, + }, &tIDs); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf(" error: %s processing account event %+v with ThresholdS.", err.Error(), cgrEv)) + } } + if len(config.CgrConfig().RalsCfg().StatSConns) != 0 { + var stsIDs []string + if err := connMgr.Call(config.CgrConfig().RalsCfg().StatSConns, nil, + utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{ + CGREventWithOpts: cgrEv, + }, &stsIDs); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf(" error: %s processing account event %+v with StatS.", err.Error(), cgrEv)) + } + } + } } diff --git a/engine/dynamicdp.go b/engine/dynamicdp.go index 41d0a0e08..5ddeb6385 100644 --- a/engine/dynamicdp.go +++ b/engine/dynamicdp.go @@ -63,7 +63,7 @@ func (dDP *dynamicDP) RemoteHost() net.Addr { var initialDPPrefixes = utils.NewStringSet([]string{utils.MetaReq, utils.MetaVars, utils.MetaCgreq, utils.MetaCgrep, utils.MetaRep, utils.MetaCGRAReq, - utils.MetaAct, utils.MetaEC, utils.MetaUCH, utils.MetaOpts}) + utils.MetaAct, utils.MetaEC, utils.MetaUCH, utils.MetaOpts, utils.MetaAsm}) func (dDP *dynamicDP) FieldAsInterface(fldPath []string) (val interface{}, err error) { if len(fldPath) == 0 { @@ -130,6 +130,18 @@ func (dDP *dynamicDP) fieldAsInterface(fldPath []string) (val interface{}, err e } dDP.cache.Set(fldPath[:2], dp) return dp.FieldAsInterface(fldPath[2:]) + case utils.MetaAsm: + // sample of fieldName ~*asm.BalanceSummaries.HolidayBalance.Value + stringReq, err := dDP.initialDP.FieldAsString([]string{utils.MetaReq}) + if err != nil { + return nil, err + } + acntSummary, err := NewAccountSummaryFromJSON(stringReq) + if err != nil { + return nil, err + } + dDP.cache.Set(fldPath[:1], acntSummary) + return acntSummary.FieldAsInterface(fldPath[2:]) default: // in case of constant we give an empty DataProvider ( empty navigable map ) } return nil, utils.ErrNotFound diff --git a/utils/consts.go b/utils/consts.go index feaefd0bc..37836db98 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -708,6 +708,7 @@ const ( MetaUrl = "*url" MetaXml = "*xml" MetaReq = "*req" + MetaAsm = "*asm" MetaVars = "*vars" MetaRep = "*rep" MetaExp = "*exp"