ThresholdSv1.ProcessEvent return now ids [tenant:thresholdID:EventID]

This commit is contained in:
TeoV
2018-05-16 07:03:35 -04:00
committed by Dan Christian Bogos
parent 9621cd2baf
commit 94363df4d6
4 changed files with 57 additions and 50 deletions

View File

@@ -54,8 +54,8 @@ func (tSv1 *ThresholdSv1) GetThreshold(tntID *utils.TenantID, t *engine.Threshol
}
// ProcessEvent will process an Event
func (tSv1 *ThresholdSv1) ProcessEvent(args *engine.ArgsProcessEvent, hits *int) error {
return tSv1.tS.V1ProcessEvent(args, hits)
func (tSv1 *ThresholdSv1) ProcessEvent(args *engine.ArgsProcessEvent, tIDs *[]string) error {
return tSv1.tS.V1ProcessEvent(args, tIDs)
}
// GetThresholdProfile returns a Threshold Profile

View File

@@ -191,7 +191,7 @@ func testV1TSLoadConfig(t *testing.T) {
case "tutmongo": // Mongo needs more time to reset db, need to investigate
thdsDelay = 4000
default:
thdsDelay = 1000
thdsDelay = 2000
}
}
@@ -250,60 +250,61 @@ func testV1TSGetThresholds(t *testing.T) {
}
func testV1TSProcessEvent(t *testing.T) {
var hits int
eHits := 0
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[0], &hits); err != nil {
var ids []string
eIDs := []string{}
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[0], &ids); err != nil {
t.Error(err)
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
} else if !reflect.DeepEqual(ids, eIDs) {
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
}
eHits = 1
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[1], &hits); err != nil {
eIDs = []string{"cgrates.org:THD_ACNT_BALANCE_1:event2"}
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[1], &ids); err != nil {
t.Error(err)
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
} else if !reflect.DeepEqual(ids, eIDs) {
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
}
eHits = 1
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[2], &hits); err != nil {
eIDs = []string{"cgrates.org:THD_STATS_1:event3"}
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[2], &ids); err != nil {
t.Error(err)
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
} else if !reflect.DeepEqual(ids, eIDs) {
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
}
eHits = 2
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[3], &hits); err != nil {
eIDs = []string{"cgrates.org:THD_STATS_2:event4", "cgrates.org:THD_STATS_1:event4"}
eIDs2 := []string{"cgrates.org:THD_STATS_1:event4", "cgrates.org:THD_STATS_2:event4"}
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[3], &ids); err != nil {
t.Error(err)
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
} else if !reflect.DeepEqual(ids, eIDs) && !reflect.DeepEqual(ids, eIDs2) {
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
}
eHits = 1
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[4], &hits); err != nil {
eIDs = []string{"cgrates.org:THD_STATS_3:event5"}
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[4], &ids); err != nil {
t.Error(err)
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
} else if !reflect.DeepEqual(ids, eIDs) {
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
}
eHits = 1
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[5], &hits); err != nil {
eIDs = []string{"cgrates.org:THD_RES_1:event6"}
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[5], &ids); err != nil {
t.Error(err)
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
} else if !reflect.DeepEqual(ids, eIDs) {
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
}
eHits = 1
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[6], &hits); err != nil {
eIDs = []string{"cgrates.org:THD_RES_1:event6"}
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[6], &ids); err != nil {
t.Error(err)
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
} else if !reflect.DeepEqual(ids, eIDs) {
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
}
eHits = 1
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[7], &hits); err != nil {
eIDs = []string{"cgrates.org:THD_RES_1:event6"}
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[7], &ids); err != nil {
t.Error(err)
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
} else if !reflect.DeepEqual(ids, eIDs) {
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
}
eHits = 1
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[8], &hits); err != nil {
eIDs = []string{"cgrates.org:THD_CDRS_1:cdrev1"}
if err := tSv1Rpc.Call(utils.ThresholdSv1ProcessEvent, tEvs[8], &ids); err != nil {
t.Error(err)
} else if hits != eHits {
t.Errorf("Expecting hits: %d, received: %d", eHits, hits)
} else if !reflect.DeepEqual(ids, eIDs) {
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
}
}
@@ -334,6 +335,7 @@ func testV1TSGetThresholdsAfterRestart(t *testing.T) {
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
time.Sleep(time.Duration(1 * time.Second))
var td engine.Threshold
if err := tSv1Rpc.Call(utils.ThresholdSv1GetThreshold,
&utils.TenantID{Tenant: "cgrates.org", ID: "THD_ACNT_BALANCE_1"}, &td); err != nil {
@@ -341,7 +343,6 @@ func testV1TSGetThresholdsAfterRestart(t *testing.T) {
} else if td.Snooze.IsZero() { // make sure Snooze time was reset during execution
t.Errorf("received: %+v", td)
}
time.Sleep(time.Duration(1 * time.Second))
}
func testV1TSSetThresholdProfile(t *testing.T) {

View File

@@ -65,6 +65,6 @@ func (self *CmdThresholdProcessEvent) PostprocessRpcParams() error {
}
func (self *CmdThresholdProcessEvent) RpcResult() interface{} {
var s int
return &s
var ids []string
return &ids
}

View File

@@ -280,20 +280,21 @@ type ArgsProcessEvent struct {
}
// processEvent processes a new event, dispatching to matching thresholds
func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (hits int, err error) {
func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (eventIDs []string, err error) {
matchTs, err := tS.matchingThresholdsForEvent(args)
if err != nil {
return 0, err
return nil, err
}
hits = len(matchTs)
var withErrors bool
var evIds []string
for _, t := range matchTs {
evIds = append(evIds, utils.ConcatenatedKey(t.TenantID(), args.CGREvent.ID))
t.Hits += 1
err = t.ProcessEvent(args, tS.dm)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ThresholdService> threshold: %s, ignoring event: %s, error: %s",
t.TenantID(), args.TenantID(), err.Error()))
t.TenantID(), args.CGREvent.TenantID(), err.Error()))
withErrors = true
continue
}
@@ -318,6 +319,11 @@ func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (hits int, err
tS.stMux.Unlock()
}
}
if len(evIds) != 0 {
eventIDs = append(eventIDs, evIds...)
} else {
eventIDs = []string{}
}
if withErrors {
err = utils.ErrPartiallyExecuted
}
@@ -325,16 +331,16 @@ func (tS *ThresholdService) processEvent(args *ArgsProcessEvent) (hits int, err
}
// V1ProcessEvent implements ThresholdService method for processing an Event
func (tS *ThresholdService) V1ProcessEvent(args *ArgsProcessEvent, reply *int) (err error) {
func (tS *ThresholdService) V1ProcessEvent(args *ArgsProcessEvent, reply *[]string) (err error) {
if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
} else if args.CGREvent.Event == nil {
return utils.NewErrMandatoryIeMissing("Event")
}
if hits, err := tS.processEvent(args); err != nil {
if ids, err := tS.processEvent(args); err != nil {
return err
} else {
*reply = hits
*reply = ids
}
return
}