mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Update Threshold ProcessEvent in Stats and Resource
This commit is contained in:
committed by
Dan Christian Bogos
parent
94363df4d6
commit
e55d6629dc
@@ -269,9 +269,10 @@ func (kev KamEvent) AsKamAuthReply(authArgs *sessions.V1AuthorizeArgs,
|
||||
if authArgs.GetSuppliers && authReply.Suppliers != nil {
|
||||
kar.Suppliers = authReply.Suppliers.Digest()
|
||||
}
|
||||
if authArgs.ProcessThresholds != nil && *authArgs.ProcessThresholds {
|
||||
kar.ThresholdHits = *authReply.ThresholdHits
|
||||
}
|
||||
|
||||
// if authArgs.ProcessThresholds != nil && *authArgs.ProcessThresholds {
|
||||
// kar.ThresholdIDs = *authReply.ThresholdHits
|
||||
// }
|
||||
return
|
||||
}
|
||||
|
||||
@@ -344,8 +345,8 @@ type KamAuthReply struct {
|
||||
ResourceAllocation string
|
||||
MaxUsage int // Maximum session time in case of success, -1 for unlimited
|
||||
Suppliers string // List of suppliers, comma separated
|
||||
ThresholdHits int
|
||||
Error string // Reply in case of error
|
||||
//ThresholdIDs string need to check if can use []string
|
||||
Error string // Reply in case of error
|
||||
}
|
||||
|
||||
func (self *KamAuthReply) String() string {
|
||||
|
||||
@@ -174,8 +174,8 @@ func testV1FIdxCaProcessEventWithNotFound(t *testing.T) {
|
||||
Event: map[string]interface{}{
|
||||
utils.EventType: utils.BalanceUpdate,
|
||||
utils.Account: "1001"}}}
|
||||
var hits int
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err.Error() != utils.ErrNotFound.Error() {
|
||||
var thIDs []string
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &thIDs); err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
if indexes, err = onStor.GetFilterReverseIndexes(utils.PrefixToIndexCache[utils.ThresholdProfilePrefix],
|
||||
@@ -239,14 +239,14 @@ func testV1FIdxCaSetThresholdProfile(t *testing.T) {
|
||||
Event: map[string]interface{}{
|
||||
utils.EventType: utils.BalanceUpdate,
|
||||
utils.Account: "1001"}}}
|
||||
var hits int
|
||||
eHits := 1
|
||||
var thIDs []string
|
||||
eIDs := []string{"TEST_PROFILE1"}
|
||||
//Testing ProcessEvent on set thresholdprofile using apier
|
||||
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err != nil {
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &thIDs); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
} else if !reflect.DeepEqual(thIDs, eIDs) {
|
||||
t.Errorf("Expecting hits: %s, received: %s", eIDs, thIDs)
|
||||
}
|
||||
//test to make sure indexes are made as expected
|
||||
fldNameVal := map[string]string{"TEST_PROFILE1": ""}
|
||||
@@ -275,14 +275,14 @@ func testV1FIdxCaGetThresholdFromTP(t *testing.T) {
|
||||
utils.Account: "1001",
|
||||
utils.BalanceID: utils.META_DEFAULT,
|
||||
utils.Units: 12.3}}}
|
||||
var hits int
|
||||
eHits := 1
|
||||
var thIDs []string
|
||||
eIDs := []string{"THD_ACNT_BALANCE_1"}
|
||||
//Testing ProcessEvent on set thresholdprofile using apier
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent,
|
||||
tEv, &hits); err != nil {
|
||||
tEv, &thIDs); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
} else if !reflect.DeepEqual(thIDs, eIDs) {
|
||||
t.Errorf("Expecting hits: %s, received: %s", eIDs, thIDs)
|
||||
}
|
||||
//test to make sure indexes are made as expected
|
||||
idx := map[string]utils.StringMap{
|
||||
@@ -355,13 +355,13 @@ func testV1FIdxCaUpdateThresholdProfile(t *testing.T) {
|
||||
Event: map[string]interface{}{
|
||||
utils.EventType: utils.AccountUpdate,
|
||||
utils.Account: "1001"}}}
|
||||
var hits int
|
||||
eHits := 0
|
||||
var thIDs []string
|
||||
eIDs := []string{}
|
||||
//Testing ProcessEvent on set thresholdprofile after update making sure there are no hits
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err != nil {
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &thIDs); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
} else if !reflect.DeepEqual(thIDs, eIDs) {
|
||||
t.Errorf("Expecting : %+v, received: %+v", eIDs, thIDs)
|
||||
}
|
||||
//matches thresholdprofile after update
|
||||
tEv2 := &engine.ArgsProcessEvent{
|
||||
@@ -371,12 +371,12 @@ func testV1FIdxCaUpdateThresholdProfile(t *testing.T) {
|
||||
Event: map[string]interface{}{
|
||||
utils.EventType: utils.AccountUpdate,
|
||||
utils.Account: "1002"}}}
|
||||
eHits = 1
|
||||
eIDs = []string{"TEST_PROFILE1"}
|
||||
//Testing ProcessEvent on set thresholdprofile after update
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv2, &hits); err != nil {
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv2, &thIDs); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
} else if !reflect.DeepEqual(thIDs, eIDs) {
|
||||
t.Errorf("Expecting : %s, received: %s", eIDs, thIDs)
|
||||
}
|
||||
//test to make sure indexes are made as expecte
|
||||
fldNameVal := map[string]string{"TEST_PROFILE1": ""}
|
||||
@@ -426,6 +426,7 @@ func testV1FIdxCaUpdateThresholdProfileFromTP(t *testing.T) {
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "THD_ACNT_BALANCE_1"}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
reply.FilterIDs = []string{"TestFilter3"}
|
||||
reply.ActivationInterval = &utils.ActivationInterval{ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC)}
|
||||
|
||||
@@ -442,13 +443,13 @@ func testV1FIdxCaUpdateThresholdProfileFromTP(t *testing.T) {
|
||||
Event: map[string]interface{}{
|
||||
utils.Account: "1002",
|
||||
utils.EventType: utils.BalanceUpdate}}}
|
||||
var hits int
|
||||
eHits := 0
|
||||
var thIDs []string
|
||||
eIDs := []string{}
|
||||
//Testing ProcessEvent on set thresholdprofile using apier
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err != nil {
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &thIDs); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
} else if !reflect.DeepEqual(thIDs, eIDs) {
|
||||
t.Errorf("Expecting : %s, received: %s", eIDs, thIDs)
|
||||
}
|
||||
tEv2 := &engine.ArgsProcessEvent{
|
||||
CGREvent: utils.CGREvent{
|
||||
@@ -457,12 +458,12 @@ func testV1FIdxCaUpdateThresholdProfileFromTP(t *testing.T) {
|
||||
Event: map[string]interface{}{
|
||||
utils.Account: "1003",
|
||||
utils.EventType: utils.BalanceUpdate}}}
|
||||
eHits = 1
|
||||
eIDs = []string{"THD_ACNT_BALANCE_1"}
|
||||
//Testing ProcessEvent on set thresholdprofile using apier
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv2, &hits); err != nil {
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv2, &thIDs); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
} else if !reflect.DeepEqual(thIDs, eIDs) {
|
||||
t.Errorf("Expecting : %s, received: %s", eIDs, thIDs)
|
||||
}
|
||||
//test to make sure indexes are made as expecte
|
||||
fldNameVal := map[string]string{"TEST_PROFILE1": ""}
|
||||
@@ -488,12 +489,12 @@ func testV1FIdxCaRemoveThresholdProfile(t *testing.T) {
|
||||
Event: map[string]interface{}{
|
||||
utils.Account: "1002",
|
||||
utils.EventType: utils.AccountUpdate}}}
|
||||
var hits int
|
||||
eHits := 1
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err != nil {
|
||||
var thIDs []string
|
||||
eIDs := []string{"TEST_PROFILE1"}
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &thIDs); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
} else if !reflect.DeepEqual(thIDs, eIDs) {
|
||||
t.Errorf("Expecting : %s, received: %s", eIDs, thIDs)
|
||||
}
|
||||
|
||||
tEv2 := &engine.ArgsProcessEvent{
|
||||
@@ -503,11 +504,11 @@ func testV1FIdxCaRemoveThresholdProfile(t *testing.T) {
|
||||
Event: map[string]interface{}{
|
||||
utils.Account: "1003",
|
||||
utils.EventType: utils.BalanceUpdate}}}
|
||||
eHits = 1
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv2, &hits); err != nil {
|
||||
eIDs = []string{"THD_ACNT_BALANCE_1"}
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv2, &thIDs); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
} else if !reflect.DeepEqual(thIDs, eIDs) {
|
||||
t.Errorf("Expecting : %s, received: %s", eIDs, thIDs)
|
||||
}
|
||||
//Remove threshold profile that was set form api
|
||||
if err := tFIdxCaRpc.Call("ApierV1.RemoveThresholdProfile",
|
||||
@@ -536,17 +537,16 @@ func testV1FIdxCaRemoveThresholdProfile(t *testing.T) {
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
t.Error(err)
|
||||
}
|
||||
eHits = 0
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &hits); err != nil {
|
||||
eIDs = []string{}
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &thIDs); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
} else if !reflect.DeepEqual(thIDs, eIDs) {
|
||||
t.Errorf("Expecting : %s, received: %s", eIDs, thIDs)
|
||||
}
|
||||
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv2, &hits); err != nil {
|
||||
if err := tFIdxCaRpc.Call(utils.ThresholdSv1ProcessEvent, tEv2, &thIDs); err != nil {
|
||||
t.Error(err)
|
||||
} else if hits != eHits {
|
||||
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
|
||||
} else if !reflect.DeepEqual(thIDs, eIDs) {
|
||||
t.Errorf("Expecting : %s, received: %s", eIDs, thIDs)
|
||||
}
|
||||
//test to make sure indexes are made as expected
|
||||
fldNameVal2 := map[string]string{"THD_ACNT_BALANCE_1": "", "TEST_PROFILE1": ""}
|
||||
@@ -839,13 +839,14 @@ func testV1FIdxCaUpdateStatQueueProfileFromTP(t *testing.T) {
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
}
|
||||
var reply *engine.StatQueueProfile
|
||||
var reply engine.StatQueueProfile
|
||||
if err := tFIdxCaRpc.Call("ApierV1.GetStatQueueProfile",
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "Stats1"}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
(*reply).FilterIDs = []string{"FLTR_3"}
|
||||
(*reply).ActivationInterval = &utils.ActivationInterval{ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
reply.FilterIDs = []string{"FLTR_3"}
|
||||
reply.ActivationInterval = &utils.ActivationInterval{ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)}
|
||||
if err := tFIdxCaRpc.Call("ApierV1.SetStatQueueProfile",
|
||||
reply, &result); err != nil {
|
||||
t.Error(err)
|
||||
@@ -1206,10 +1207,11 @@ func testV1FIdxCaUpdateAttributeProfileFromTP(t *testing.T) {
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
}
|
||||
var reply *engine.AttributeProfile
|
||||
var reply engine.AttributeProfile
|
||||
if err := tFIdxCaRpc.Call("ApierV1.GetAttributeProfile", &utils.TenantID{Tenant: "cgrates.org", ID: "ATTR_1"}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
reply.FilterIDs = []string{"TestFilter3"}
|
||||
if err := tFIdxCaRpc.Call("ApierV1.SetAttributeProfile", reply, &result); err != nil {
|
||||
t.Error(err)
|
||||
@@ -1616,16 +1618,16 @@ func testV1FIdxCaUpdateResourceProfileFromTP(t *testing.T) {
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
}
|
||||
var reply *engine.ResourceProfile
|
||||
var reply engine.ResourceProfile
|
||||
if err := tFIdxCaRpc.Call("ApierV1.GetResourceProfile",
|
||||
&utils.TenantID{Tenant: "cgrates.org", ID: "ResGroup1"}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
reply.FilterIDs = []string{"FLTR_RES_RCFG3"}
|
||||
reply.ActivationInterval = &utils.ActivationInterval{ActivationTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC)}
|
||||
|
||||
if err := tFIdxCaRpc.Call("ApierV1.SetResourceProfile", reply, &result); err != nil {
|
||||
if err := tFIdxCaRpc.Call("ApierV1.SetResourceProfile", &reply, &result); err != nil {
|
||||
t.Error(err)
|
||||
} else if result != utils.OK {
|
||||
t.Error("Unexpected reply returned", result)
|
||||
|
||||
@@ -193,8 +193,8 @@ func TestSessionSv1ItAuth(t *testing.T) {
|
||||
if *rply.ResourceAllocation == "" {
|
||||
t.Errorf("Unexpected ResourceAllocation: %s", *rply.ResourceAllocation)
|
||||
}
|
||||
if *rply.ThresholdHits != 1 {
|
||||
t.Errorf("Unexpected ThresholdHits: %v", *rply.ThresholdHits)
|
||||
if !reflect.DeepEqual(*rply.ThresholdIDs, []string{"THD_ACNT_1001"}) {
|
||||
t.Errorf("Unexpected ThresholdIDs: %v", *rply.ThresholdIDs)
|
||||
}
|
||||
// Hit threshold and execute action (topup with 10 units)
|
||||
expectedAccount := &engine.Account{
|
||||
@@ -258,8 +258,8 @@ func TestSessionSv1ItInitiateSession(t *testing.T) {
|
||||
args, &rply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if *rply.ThresholdHits != 1 {
|
||||
t.Errorf("Unexpected ThresholdHits: %v", *rply.ThresholdHits)
|
||||
if !reflect.DeepEqual(*rply.ThresholdIDs, []string{"THD_ACNT_1001"}) {
|
||||
t.Errorf("Unexpected ThresholdIDs: %v", *rply.ThresholdIDs)
|
||||
}
|
||||
expectedAccount := &engine.Account{
|
||||
ID: "cgrates.org:1001",
|
||||
|
||||
@@ -257,50 +257,48 @@ func testV1TSProcessEvent(t *testing.T) {
|
||||
} else if !reflect.DeepEqual(ids, eIDs) {
|
||||
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
|
||||
}
|
||||
eIDs = []string{"cgrates.org:THD_ACNT_BALANCE_1:event2"}
|
||||
eIDs = []string{"THD_ACNT_BALANCE_1"}
|
||||
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[1], &ids); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(ids, eIDs) {
|
||||
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
|
||||
}
|
||||
eIDs = []string{"cgrates.org:THD_STATS_1:event3"}
|
||||
eIDs = []string{"THD_STATS_1"}
|
||||
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[2], &ids); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(ids, eIDs) {
|
||||
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
|
||||
}
|
||||
eIDs = []string{"cgrates.org:THD_STATS_2:event4", "cgrates.org:THD_STATS_1:event4"}
|
||||
eIDs2 := []string{"cgrates.org:THD_STATS_1:event4", "cgrates.org:THD_STATS_2:event4"}
|
||||
eIDs = []string{"THD_STATS_2", "THD_STATS_1"}
|
||||
eIDs2 := []string{"THD_STATS_1", "THD_STATS_2"}
|
||||
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[3], &ids); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(ids, eIDs) && !reflect.DeepEqual(ids, eIDs2) {
|
||||
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
|
||||
}
|
||||
eIDs = []string{"cgrates.org:THD_STATS_3:event5"}
|
||||
eIDs = []string{"THD_STATS_3"}
|
||||
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[4], &ids); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(ids, eIDs) {
|
||||
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
|
||||
}
|
||||
eIDs = []string{"cgrates.org:THD_RES_1:event6"}
|
||||
eIDs = []string{"THD_RES_1"}
|
||||
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[5], &ids); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(ids, eIDs) {
|
||||
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
|
||||
}
|
||||
eIDs = []string{"cgrates.org:THD_RES_1:event6"}
|
||||
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[6], &ids); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(ids, eIDs) {
|
||||
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
|
||||
}
|
||||
eIDs = []string{"cgrates.org:THD_RES_1:event6"}
|
||||
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[7], &ids); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(ids, eIDs) {
|
||||
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
|
||||
}
|
||||
eIDs = []string{"cgrates.org:THD_CDRS_1:cdrev1"}
|
||||
eIDs = []string{"THD_CDRS_1"}
|
||||
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[8], &ids); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(ids, eIDs) {
|
||||
|
||||
@@ -519,8 +519,8 @@ func (rS *ResourceService) processThresholds(r *Resource) (err error) {
|
||||
utils.EventType: utils.ResourceUpdate,
|
||||
utils.ResourceID: r.ID,
|
||||
utils.Usage: r.totalUsage()}}}
|
||||
var hits int
|
||||
if err = rS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &hits); err != nil &&
|
||||
var tIDs []string
|
||||
if err = rS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<ResourceS> error: %s processing event %+v with ThresholdS.", err.Error(), thEv))
|
||||
|
||||
@@ -260,8 +260,8 @@ func (sS *StatService) processEvent(ev *utils.CGREvent) (err error) {
|
||||
thEv.Event[metricID] = metric.GetValue()
|
||||
}
|
||||
if sS.thdS != nil {
|
||||
var hits int
|
||||
if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &hits); err != nil &&
|
||||
var tIDs []string
|
||||
if err := sS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<StatS> error: %s processing event %+v with ThresholdS.", err.Error(), thEv))
|
||||
|
||||
@@ -280,15 +280,15 @@ type ArgsProcessEvent struct {
|
||||
}
|
||||
|
||||
// processEvent processes a new event, dispatching to matching thresholds
|
||||
func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (eventIDs []string, err error) {
|
||||
func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (thresholdsIDs []string, err error) {
|
||||
matchTs, err := tS.matchingThresholdsForEvent(args)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var withErrors bool
|
||||
var evIds []string
|
||||
var tIDs []string
|
||||
for _, t := range matchTs {
|
||||
evIds = append(evIds, utils.ConcatenatedKey(t.TenantID(), args.CGREvent.ID))
|
||||
tIDs = append(tIDs, t.ID)
|
||||
t.Hits += 1
|
||||
err = t.ProcessEvent(args, tS.dm)
|
||||
if err != nil {
|
||||
@@ -319,10 +319,10 @@ func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (eventIDs []str
|
||||
tS.stMux.Unlock()
|
||||
}
|
||||
}
|
||||
if len(evIds) != 0 {
|
||||
eventIDs = append(eventIDs, evIds...)
|
||||
if len(tIDs) != 0 {
|
||||
thresholdsIDs = append(thresholdsIDs, tIDs...)
|
||||
} else {
|
||||
eventIDs = []string{}
|
||||
thresholdsIDs = []string{}
|
||||
}
|
||||
if withErrors {
|
||||
err = utils.ErrPartiallyExecuted
|
||||
|
||||
@@ -1321,7 +1321,7 @@ type V1AuthorizeReply struct {
|
||||
ResourceAllocation *string
|
||||
MaxUsage *time.Duration
|
||||
Suppliers *engine.SortedSuppliers
|
||||
ThresholdHits *int
|
||||
ThresholdIDs *[]string
|
||||
}
|
||||
|
||||
// AsCGRReply is part of utils.CGRReplier interface
|
||||
@@ -1345,8 +1345,8 @@ func (v1AuthReply *V1AuthorizeReply) AsCGRReply() (cgrReply utils.CGRReply, err
|
||||
if v1AuthReply.Suppliers != nil {
|
||||
cgrReply[utils.CapSuppliers] = v1AuthReply.Suppliers.Digest()
|
||||
}
|
||||
if v1AuthReply.ThresholdHits != nil {
|
||||
cgrReply[utils.CapThresholdHits] = *v1AuthReply.ThresholdHits
|
||||
if v1AuthReply.ThresholdIDs != nil {
|
||||
cgrReply[utils.CapThresholdHits] = *v1AuthReply.ThresholdIDs
|
||||
}
|
||||
cgrReply[utils.Error] = "" // so we can compare in filters
|
||||
return
|
||||
@@ -1442,16 +1442,16 @@ func (smg *SMGeneric) BiRPCv1AuthorizeEvent(clnt rpcclient.RpcClientConnection,
|
||||
if smg.thdS == nil {
|
||||
return utils.NewErrNotConnected(utils.ThresholdS)
|
||||
}
|
||||
var hits int
|
||||
var tIDs []string
|
||||
thEv := &engine.ArgsProcessEvent{
|
||||
CGREvent: args.CGREvent,
|
||||
}
|
||||
if err := smg.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &hits); err != nil &&
|
||||
if err := smg.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<SessionS> error: %s processing event %+v with ThresholdS.", err.Error(), thEv))
|
||||
}
|
||||
authReply.ThresholdHits = &hits
|
||||
authReply.ThresholdIDs = &tIDs
|
||||
}
|
||||
checkStatQueues := smg.statS != nil
|
||||
if args.ProcessStatQueues != nil {
|
||||
@@ -1522,7 +1522,7 @@ type V1InitSessionReply struct {
|
||||
Attributes *engine.AttrSProcessEventReply
|
||||
ResourceAllocation *string
|
||||
MaxUsage *time.Duration
|
||||
ThresholdHits *int
|
||||
ThresholdIDs *[]string
|
||||
}
|
||||
|
||||
// AsCGRReply is part of utils.CGRReplier interface
|
||||
@@ -1543,8 +1543,8 @@ func (v1Rply *V1InitSessionReply) AsCGRReply() (cgrReply utils.CGRReply, err err
|
||||
if v1Rply.MaxUsage != nil {
|
||||
cgrReply[utils.CapMaxUsage] = *v1Rply.MaxUsage
|
||||
}
|
||||
if v1Rply.ThresholdHits != nil {
|
||||
cgrReply[utils.CapThresholdHits] = *v1Rply.ThresholdHits
|
||||
if v1Rply.ThresholdIDs != nil {
|
||||
cgrReply[utils.CapThresholdHits] = *v1Rply.ThresholdIDs
|
||||
}
|
||||
cgrReply[utils.Error] = ""
|
||||
return
|
||||
@@ -1616,16 +1616,16 @@ func (smg *SMGeneric) BiRPCv1InitiateSession(clnt rpcclient.RpcClientConnection,
|
||||
if smg.thdS == nil {
|
||||
return utils.NewErrNotConnected(utils.ThresholdS)
|
||||
}
|
||||
var hits int
|
||||
var tIDs []string
|
||||
thEv := &engine.ArgsProcessEvent{
|
||||
CGREvent: args.CGREvent,
|
||||
}
|
||||
if err := smg.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &hits); err != nil &&
|
||||
if err := smg.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<SessionS> error: %s processing event %+v with ThresholdS.", err.Error(), thEv))
|
||||
}
|
||||
rply.ThresholdHits = &hits
|
||||
rply.ThresholdIDs = &tIDs
|
||||
}
|
||||
checkStatQueues := smg.statS != nil
|
||||
if args.ProcessStatQueues != nil {
|
||||
@@ -1807,11 +1807,11 @@ func (smg *SMGeneric) BiRPCv1TerminateSession(clnt rpcclient.RpcClientConnection
|
||||
if smg.thdS == nil {
|
||||
return utils.NewErrNotConnected(utils.ThresholdS)
|
||||
}
|
||||
var hits int
|
||||
var tIDs []string
|
||||
thEv := &engine.ArgsProcessEvent{
|
||||
CGREvent: args.CGREvent,
|
||||
}
|
||||
if err := smg.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &hits); err != nil &&
|
||||
if err := smg.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<SessionS> error: %s processing event %+v with ThresholdS.", err.Error(), thEv))
|
||||
|
||||
Reference in New Issue
Block a user