From ae9dfaed7b5574572bbd5bb25146cf85c9c5ec15 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 20 Sep 2017 14:54:07 +0200 Subject: [PATCH] StatS processEvent and V1 APIs --- engine/stats.go | 94 ++++++++++++++++++++++++++++++------------------- utils/errors.go | 3 +- 2 files changed, 60 insertions(+), 37 deletions(-) diff --git a/engine/stats.go b/engine/stats.go index 9275e6f93..c60b5a1bf 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -19,6 +19,7 @@ along with this program. If not, see package engine import ( + "errors" "fmt" "math/rand" "reflect" @@ -128,6 +129,9 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) { // matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues, err error) { + if ev.Tenant == "" { + return nil, errors.New("missing Tenant information") + } matchingSQs := make(map[string]*StatQueue) sqIDs, err := matchingItemIDsForEvent(ev.Fields, sS.dm.DataDB(), utils.StatQueuesStringIndex+ev.Tenant) if err != nil { @@ -191,7 +195,7 @@ func (sS *StatService) matchingStatQueuesForEvent(ev *StatEvent) (sqs StatQueues } // Call implements rpcclient.RpcClientConnection interface for internal RPC -// here for testing purposes +// here for cases when passing StatsService as rpccclient.RpcClientConnection (ie. in ResourceS) func (ss *StatService) Call(serviceMethod string, args interface{}, reply interface{}) error { methodSplit := strings.Split(serviceMethod, ".") if len(methodSplit) != 2 { @@ -216,55 +220,68 @@ func (ss *StatService) Call(serviceMethod string, args interface{}, reply interf return err } -/* - - -// processEvent processes a StatsEvent through the queues and caches it when needed -func (ss *StatService) processEvent(ev StatsEvent) (err error) { - evStatsID := ev.ID() - if evStatsID == "" { // ID is mandatory - return errors.New("missing ID field") +// processEvent processes a new event, dispatching to matching queues +// queues matching are also cached to speed up +func (sS *StatService) processEvent(ev *StatEvent) (err error) { + if missing := utils.MissingStructFields(ev, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) } - for _, stInst := range sS.queues { - if err := stInst.ProcessEvent(ev); err != nil { + matchSQs, err := sS.matchingStatQueuesForEvent(ev) + if err != nil { + return err + } + if len(matchSQs) == 0 { + return utils.ErrNotFound + } + var withErrors bool + for _, sq := range matchSQs { + if err = sq.ProcessEvent(ev); err != nil { utils.Logger.Warning( - fmt.Sprintf(" QueueID: %s, ignoring event with ID: %s, error: %s", - stInst.cfg.ID, evStatsID, err.Error())) + fmt.Sprintf(" Queue: %s, ignoring event: %s, error: %s", + sq.TenantID(), ev.TenantID(), err.Error())) + withErrors = true } - if stInst.cfg.Blocker { + if sq.sqPrfl.Blocker { break } } + if withErrors { + err = utils.ErrPartiallyExecuted + } return } // V1ProcessEvent implements StatV1 method for processing an Event -func (ss *StatService) V1ProcessEvent(ev StatsEvent, reply *string) (err error) { +func (sS *StatService) V1ProcessEvent(ev *StatEvent, reply *string) (err error) { if err = sS.processEvent(ev); err == nil { *reply = utils.OK } return } -// V1GetQueueIDs returns list of queue IDs configured in the service -func (ss *StatService) V1GetQueueIDs(ignored struct{}, reply *[]string) (err error) { - if len(sS.queuesCache) == 0 { - return utils.ErrNotFound - } - for k := range sS.queuesCache { - *reply = append(*reply, k) +// V1StatQueuesForEvent implements StatV1 method for processing an Event +func (sS *StatService) V1StatQueuesForEvent(ev *StatEvent, reply *StatQueues) (err error) { + var sQs StatQueues + if sQs, err = sS.matchingStatQueuesForEvent(ev); err == nil { + *reply = sQs } return } -// V1GetStringMetrics returns the metrics as string values -func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]string) (err error) { - sq, has := sS.queuesCache[queueID] - if !has { - return utils.ErrNotFound +// V1GetQueueStringMetrics returns the metrics of a Queue as string values +func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[string]string) (err error) { + if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) } - metrics := make(map[string]string, len(sq.sqMetrics)) - for metricID, metric := range sq.sqMetrics { + sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "") + if err != nil { + if err != utils.ErrNotFound { + err = utils.NewErrServerError(err) + } + return err + } + metrics := make(map[string]string, len(sq.SQMetrics)) + for metricID, metric := range sq.SQMetrics { metrics[metricID] = metric.GetStringValue("") } *reply = metrics @@ -272,16 +289,21 @@ func (ss *StatService) V1GetStringMetrics(queueID string, reply *map[string]stri } // V1GetFloatMetrics returns the metrics as float64 values -func (ss *StatService) V1GetFloatMetrics(queueID string, reply *map[string]float64) (err error) { - sq, has := sS.queuesCache[queueID] - if !has { - return utils.ErrNotFound +func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[string]float64) (err error) { + if missing := utils.MissingStructFields(args, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) } - metrics := make(map[string]float64, len(sq.sqMetrics)) - for metricID, metric := range sq.sqMetrics { + sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, false, "") + if err != nil { + if err != utils.ErrNotFound { + err = utils.NewErrServerError(err) + } + return err + } + metrics := make(map[string]float64, len(sq.SQMetrics)) + for metricID, metric := range sq.SQMetrics { metrics[metricID] = metric.GetFloat64Value() } *reply = metrics return } -*/ diff --git a/utils/errors.go b/utils/errors.go index fe7708d67..0143b9c22 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -24,7 +24,7 @@ import ( ) var ( - ErrNoMoreData = errors.New("no more data") + ErrNoMoreData = errors.New("NO_MORE_DATA") ErrNotImplemented = errors.New("NOT_IMPLEMENTED") ErrNotFound = errors.New("NOT_FOUND") ErrTimedOut = errors.New("TIMED_OUT") @@ -45,6 +45,7 @@ var ( ErrNotConvertible = errors.New("NOT_CONVERTIBLE") ErrResourceUnavailable = errors.New("RESOURCE_UNAVAILABLE") ErrNoActiveSession = errors.New("NO_ACTIVE_SESSION") + ErrPartiallyExecuted = errors.New("PARTIALLY_EXECUTED") ) // NewCGRError initialises a new CGRError