Start update event from RALs to ThresholdS and StatS

This commit is contained in:
TeoV
2020-10-21 18:08:11 +03:00
committed by Dan Christian Bogos
parent 5ad6245501
commit 18c4d1f94b
5 changed files with 159 additions and 95 deletions

View File

@@ -244,6 +244,7 @@ var (
testV1TSRemThresholdProfileWithoutTenant,
testV1TSProcessEventWithoutTenant,
testV1TSGetThresholdsWithoutTenant,
testV1TSProcessAccountUpdateEvent,
testV1TSStopEngine,
}
)
@@ -865,3 +866,79 @@ func testV1TSGetThresholdsWithoutTenant(t *testing.T) {
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(expectedThreshold.Tenant), utils.ToJSON(reply.Tenant))
}
}
func testV1TSProcessAccountUpdateEvent(t *testing.T) {
var result string
tPrfl := &engine.ThresholdWithCache{
ThresholdProfile: &engine.ThresholdProfile{
Tenant: tenant,
ID: "TH_ACNT_UPDATE_EV",
FilterIDs: []string{
"*string:~*opts.eventType:AccountUpdate",
"*gt:~*asm.BalanceSummaries.HolidayBalance.Value:1.0"},
MaxHits: 10,
MinSleep: time.Duration(1 * time.Second),
Weight: 20.0,
ActionIDs: []string{"LOG_WARNING"},
Async: true,
},
}
if err := tSv1Rpc.Call(utils.APIerSv1SetThresholdProfile, tPrfl, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
var reply *engine.ThresholdProfile
if err := tSv1Rpc.Call(utils.APIerSv1GetThresholdProfile,
&utils.TenantID{Tenant: tenant, ID: "TH_ACNT_UPDATE_EV"}, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(tPrfl.ThresholdProfile, reply) {
t.Errorf("Expecting: %+v, received: %+v", tPrfl.ThresholdProfile, reply)
}
attrSetBalance := &utils.AttrSetBalance{
Tenant: "cgrates.org",
Account: "testV1TSProcessAccountUpdateEvent",
BalanceType: "*monetary",
Value: 1.5,
Balance: map[string]interface{}{
utils.ID: "HolidayBalance",
},
}
if err := tSv1Rpc.Call(utils.APIerSv1SetBalance, attrSetBalance, &result); err != nil {
t.Error("Got error on APIerSv1.SetBalance: ", err.Error())
} else if result != utils.OK {
t.Errorf("Calling APIerSv1.SetBalance received: %s", result)
}
var acnt *engine.Account
attrs := &utils.AttrGetAccount{
Tenant: "cgrates.org",
Account: "testV1TSProcessAccountUpdateEvent",
}
if err := tSv1Rpc.Call(utils.APIerSv2GetAccount, attrs, &acnt); err != nil {
t.Error(err)
}
tEv := &engine.ThresholdsArgsProcessEvent{
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{ // hitting THD_ACNT_BALANCE_1
Tenant: "cgrates.org",
ID: "SIMULATE_ACNT_UPDATE_EV",
Event: acnt.AsAccountSummary().AsMapInterface(),
},
Opts: map[string]interface{}{
utils.MetaEventType: utils.AccountUpdate,
},
},
}
var ids []string
eIDs := []string{"THD_ACNT_BALANCE_1"}
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &ids); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(ids, eIDs) {
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
}
}

View File

@@ -1097,53 +1097,46 @@ func (acc *Account) AsAccountSummary() *AccountSummary {
// Publish sends the account to stats and threshold
func (acc *Account) Publish() {
acntTnt := utils.NewTenantID(acc.ID)
acntSummary := acc.AsAccountSummary()
cgrEv := &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: acntTnt.Tenant,
Tenant: acntSummary.Tenant,
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.EventType: utils.AccountUpdate,
utils.EventSource: utils.AccountService,
utils.Account: acntTnt.ID,
utils.AllowNegative: acc.AllowNegative,
utils.Disabled: acc.Disabled,
},
Event: acntSummary.AsMapInterface(),
},
Opts: map[string]interface{}{
utils.MetaEventType: utils.AccountUpdate,
},
}
if len(config.CgrConfig().RalsCfg().StatSConns) != 0 {
go func() {
var reply []string
if err := connMgr.Call(config.CgrConfig().RalsCfg().StatSConns, nil,
utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREventWithOpts: cgrEv}, &reply); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing balance event %+v with StatS.",
err.Error(), cgrEv))
}
}()
}
if len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 {
go func() {
var tIDs []string
if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil,
utils.ThresholdSv1ProcessEvent,
&ThresholdsArgsProcessEvent{CGREventWithOpts: cgrEv}, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing account event %+v with ThresholdS.", err.Error(), cgrEv))
}
}()
var tIDs []string
if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil,
utils.ThresholdSv1ProcessEvent, &ThresholdsArgsProcessEvent{
CGREventWithOpts: cgrEv,
}, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing account event %+v with ThresholdS.", err.Error(), cgrEv))
}
}
if len(config.CgrConfig().RalsCfg().StatSConns) != 0 {
var stsIDs []string
if err := connMgr.Call(config.CgrConfig().RalsCfg().StatSConns, nil,
utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{
CGREventWithOpts: cgrEv,
}, &stsIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing account event %+v with StatS.", err.Error(), cgrEv))
}
}
}
// NewAccountSummaryFromJSON creates a new AcccountSummary from a json string
func NewAccountSummaryFromJSON(jsn string) (acntSummary *AccountSummary, err error) {
if !utils.SliceHasMember([]string{"", "null"}, jsn) { // Unmarshal only when content
json.Unmarshal([]byte(jsn), &acntSummary)
err = json.Unmarshal([]byte(jsn), &acntSummary)
}
return
}
@@ -1238,3 +1231,14 @@ func (as *AccountSummary) FieldAsInterface(fldPath []string) (val interface{}, e
return as.Disabled, nil
}
}
func (as *AccountSummary) AsMapInterface() map[string]interface{} {
return map[string]interface{}{
utils.Tenant: as.Tenant,
utils.ID: as.ID,
utils.AllowNegative: as.AllowNegative,
utils.Disabled: as.Disabled,
utils.BalanceSummaries: as.BalanceSummaries,
}
}

View File

@@ -813,77 +813,47 @@ func (bc Balances) HasBalance(balance *Balance) bool {
func (bc Balances) SaveDirtyBalances(acc *Account) {
savedAccounts := make(map[string]*Account)
for _, b := range bc {
if b.dirty {
// publish event
if b.account == nil { // only publish modifications for balances with account set
continue
}
acntTnt := utils.NewTenantID(b.account.ID)
thEv := &ThresholdsArgsProcessEvent{
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: acntTnt.Tenant,
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.EventType: utils.BalanceUpdate,
utils.EventSource: utils.AccountService,
utils.Account: acntTnt.ID,
utils.BalanceID: b.ID,
utils.Units: b.Value,
},
},
Opts: map[string]interface{}{
utils.MetaEventType: utils.BalanceUpdate,
},
},
}
if !b.ExpirationDate.IsZero() {
thEv.Event[utils.ExpiryTime] = b.ExpirationDate.Format(time.RFC3339)
}
if len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 {
var tIDs []string
if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil,
utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing balance event %+v with ThresholdS.",
err.Error(), thEv))
}
}
}
if b.account != nil && b.account != acc && b.dirty && savedAccounts[b.account.ID] == nil {
dm.SetAccount(b.account)
savedAccounts[b.account.ID] = b.account
}
}
if len(savedAccounts) != 0 && len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 {
if len(savedAccounts) != 0 {
for _, acnt := range savedAccounts {
acntTnt := utils.NewTenantID(acnt.ID)
thEv := &ThresholdsArgsProcessEvent{
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: acntTnt.Tenant,
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.EventType: utils.AccountUpdate,
utils.EventSource: utils.AccountService,
utils.Account: acntTnt.ID,
utils.AllowNegative: acnt.AllowNegative,
utils.Disabled: acnt.Disabled,
},
},
Opts: map[string]interface{}{
utils.MetaEventType: utils.AccountUpdate,
},
acntSummary := acnt.AsAccountSummary()
cgrEv := &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: acntSummary.Tenant,
ID: utils.GenUUID(),
Event: acntSummary.AsMapInterface(),
},
Opts: map[string]interface{}{
utils.MetaEventType: utils.AccountUpdate,
},
}
var tIDs []string
if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil,
utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing account event %+v with ThresholdS.", err.Error(), thEv))
if len(config.CgrConfig().RalsCfg().ThresholdSConns) != 0 {
var tIDs []string
if err := connMgr.Call(config.CgrConfig().RalsCfg().ThresholdSConns, nil,
utils.ThresholdSv1ProcessEvent, &ThresholdsArgsProcessEvent{
CGREventWithOpts: cgrEv,
}, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing account event %+v with ThresholdS.", err.Error(), cgrEv))
}
}
if len(config.CgrConfig().RalsCfg().StatSConns) != 0 {
var stsIDs []string
if err := connMgr.Call(config.CgrConfig().RalsCfg().StatSConns, nil,
utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{
CGREventWithOpts: cgrEv,
}, &stsIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<AccountS> error: %s processing account event %+v with StatS.", err.Error(), cgrEv))
}
}
}
}

View File

@@ -63,7 +63,7 @@ func (dDP *dynamicDP) RemoteHost() net.Addr {
var initialDPPrefixes = utils.NewStringSet([]string{utils.MetaReq, utils.MetaVars,
utils.MetaCgreq, utils.MetaCgrep, utils.MetaRep, utils.MetaCGRAReq,
utils.MetaAct, utils.MetaEC, utils.MetaUCH, utils.MetaOpts})
utils.MetaAct, utils.MetaEC, utils.MetaUCH, utils.MetaOpts, utils.MetaAsm})
func (dDP *dynamicDP) FieldAsInterface(fldPath []string) (val interface{}, err error) {
if len(fldPath) == 0 {
@@ -130,6 +130,18 @@ func (dDP *dynamicDP) fieldAsInterface(fldPath []string) (val interface{}, err e
}
dDP.cache.Set(fldPath[:2], dp)
return dp.FieldAsInterface(fldPath[2:])
case utils.MetaAsm:
// sample of fieldName ~*asm.BalanceSummaries.HolidayBalance.Value
stringReq, err := dDP.initialDP.FieldAsString([]string{utils.MetaReq})
if err != nil {
return nil, err
}
acntSummary, err := NewAccountSummaryFromJSON(stringReq)
if err != nil {
return nil, err
}
dDP.cache.Set(fldPath[:1], acntSummary)
return acntSummary.FieldAsInterface(fldPath[2:])
default: // in case of constant we give an empty DataProvider ( empty navigable map )
}
return nil, utils.ErrNotFound

View File

@@ -708,6 +708,7 @@ const (
MetaUrl = "*url"
MetaXml = "*xml"
MetaReq = "*req"
MetaAsm = "*asm"
MetaVars = "*vars"
MetaRep = "*rep"
MetaExp = "*exp"