diff --git a/engine/stats.go b/engine/stats.go
index 9275e6f93..c60b5a1bf 100644
--- a/engine/stats.go
+++ b/engine/stats.go
@@ -19,6 +19,7 @@ along with this program. If not, see
package engine
import (
+ "errors"
"fmt"
"math/rand"
"reflect"
@@ -128,6 +129,9 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) {
// matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call
func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues, err error) {
+ if ev.Tenant == "" {
+ return nil, errors.New("missing Tenant information")
+ }
matchingSQs := make(map[string]*StatQueue)
sqIDs, err := matchingItemIDsForEvent(ev.Fields, sS.dm.DataDB(), utils.StatQueuesStringIndex+ev.Tenant)
if err != nil {
@@ -191,7 +195,7 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
-// here for testing purposes
+// here for cases when passing StatsService as rpccclient.RpcClientConnection (ie. in ResourceS)
func (ss *StatService) Call(serviceMethod string, args interface{}, reply interface{}) error {
methodSplit := strings.Split(serviceMethod, ".")
if len(methodSplit) != 2 {
@@ -216,55 +220,68 @@ func (ss *StatService) Call(serviceMethod string, args interface{}, reply interf
return err
}
-/*
-
-
-// processEvent processes a StatsEvent through the queues and caches it when needed
-func (ss *StatService) processEvent(ev StatsEvent) (err error) {
- evStatsID := ev.ID()
- if evStatsID == "" { // ID is mandatory
- return errors.New("missing ID field")
+// processEvent processes a new event, dispatching to matching queues
+// queues matching are also cached to speed up
+func (sS *StatService) processEvent(ev *StatEvent) (err error) {
+ if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
+ return utils.NewErrMandatoryIeMissing(missing...)
}
- for _, stInst := range sS.queues {
- if err := stInst.ProcessEvent(ev); err != nil {
+ matchSQs, err := sS.matchingStatQueuesForEvent(ev)
+ if err != nil {
+ return err
+ }
+ if len(matchSQs) == 0 {
+ return utils.ErrNotFound
+ }
+ var withErrors bool
+ for _, sq := range matchSQs {
+ if err = sq.ProcessEvent(ev); err != nil {
utils.Logger.Warning(
- fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s",
- stInst.cfg.ID, evStatsID, err.Error()))
+ fmt.Sprintf(" Queue: %s, ignoring event: %s, error: %s",
+ sq.TenantID(), ev.TenantID(), err.Error()))
+ withErrors = true
}
- if stInst.cfg.Blocker {
+ if sq.sqPrfl.Blocker {
break
}
}
+ if withErrors {
+ err = utils.ErrPartiallyExecuted
+ }
return
}
// V1ProcessEvent implements StatV1 method for processing an Event
-func (ss *StatService) V1ProcessEvent(ev StatsEvent, reply *string) (err error) {
+func (sS *StatService) V1ProcessEvent(ev *StatEvent, reply *string) (err error) {
if err = sS.processEvent(ev); err == nil {
*reply = utils.OK
}
return
}
-// V1GetQueueIDs returns list of queue IDs configured in the service
-func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) {
- if len(sS.queuesCache) == 0 {
- return utils.ErrNotFound
- }
- for k := range sS.queuesCache {
- *reply = append(*reply, k)
+// V1StatQueuesForEvent implements StatV1 method for processing an Event
+func (sS *StatService) V1StatQueuesForEvent(ev *StatEvent, reply *StatQueues) (err error) {
+ var sQs StatQueues
+ if sQs, err = sS.matchingStatQueuesForEvent(ev); err == nil {
+ *reply = sQs
}
return
}
-// V1GetStringMetrics returns the metrics as string values
-func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]string) (err error) {
- sq, has := sS.queuesCache[queueID]
- if !has {
- return utils.ErrNotFound
+// V1GetQueueStringMetrics returns the metrics of a Queue as string values
+func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[string]string) (err error) {
+ if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
+ return utils.NewErrMandatoryIeMissing(missing...)
}
- metrics := make(map[string]string, len(sq.sqMetrics))
- for metricID, metric := range sq.sqMetrics {
+ sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "")
+ if err != nil {
+ if err != utils.ErrNotFound {
+ err = utils.NewErrServerError(err)
+ }
+ return err
+ }
+ metrics := make(map[string]string, len(sq.SQMetrics))
+ for metricID, metric := range sq.SQMetrics {
metrics[metricID] = metric.GetStringValue("")
}
*reply = metrics
@@ -272,16 +289,21 @@ func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]stri
}
// V1GetFloatMetrics returns the metrics as float64 values
-func (ss *StatService) V1GetFloatMetrics(queueID string, reply *map[string]float64) (err error) {
- sq, has := sS.queuesCache[queueID]
- if !has {
- return utils.ErrNotFound
+func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[string]float64) (err error) {
+ if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
+ return utils.NewErrMandatoryIeMissing(missing...)
}
- metrics := make(map[string]float64, len(sq.sqMetrics))
- for metricID, metric := range sq.sqMetrics {
+ sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "")
+ if err != nil {
+ if err != utils.ErrNotFound {
+ err = utils.NewErrServerError(err)
+ }
+ return err
+ }
+ metrics := make(map[string]float64, len(sq.SQMetrics))
+ for metricID, metric := range sq.SQMetrics {
metrics[metricID] = metric.GetFloat64Value()
}
*reply = metrics
return
}
-*/
diff --git a/utils/errors.go b/utils/errors.go
index fe7708d67..0143b9c22 100644
--- a/utils/errors.go
+++ b/utils/errors.go
@@ -24,7 +24,7 @@ import (
)
var (
- ErrNoMoreData = errors.New("no more data")
+ ErrNoMoreData = errors.New("NO_MORE_DATA")
ErrNotImplemented = errors.New("NOT_IMPLEMENTED")
ErrNotFound = errors.New("NOT_FOUND")
ErrTimedOut = errors.New("TIMED_OUT")
@@ -45,6 +45,7 @@ var (
ErrNotConvertible = errors.New("NOT_CONVERTIBLE")
ErrResourceUnavailable = errors.New("RESOURCE_UNAVAILABLE")
ErrNoActiveSession = errors.New("NO_ACTIVE_SESSION")
+ ErrPartiallyExecuted = errors.New("PARTIALLY_EXECUTED")
)
// NewCGRError initialises a new CGRError