Added *roundingDecimals for GetQueueStringMetrics API

This commit is contained in:
Trial97
2021-11-08 12:42:20 +02:00
committed by Dan Christian Bogos
parent 493ff3cd4c
commit 6fe79e016e
11 changed files with 90 additions and 64 deletions

View File

@@ -37,7 +37,7 @@ func NewStatService(dm *DataManager, cgrcfg *config.CGRConfig,
dm: dm,
connMgr: connMgr,
filterS: filterS,
cgrcfg: cgrcfg,
cfg: cgrcfg,
storedStatQueues: make(utils.StringSet),
loopStopped: make(chan struct{}),
stopBackup: make(chan struct{}),
@@ -49,7 +49,7 @@ type StatService struct {
dm *DataManager
connMgr *ConnManager
filterS *FilterS
cgrcfg *config.CGRConfig
cfg *config.CGRConfig
loopStopped chan struct{}
stopBackup chan struct{}
storedStatQueues utils.StringSet // keep a record of stats which need saving, map[statsTenantID]bool
@@ -79,7 +79,7 @@ func (sS *StatService) Shutdown(ctx *context.Context) {
// runBackup will regularly store statQueues changed to dataDB
func (sS *StatService) runBackup(ctx *context.Context) {
storeInterval := sS.cgrcfg.StatSCfg().StoreInterval
storeInterval := sS.cfg.StatSCfg().StoreInterval
if storeInterval <= 0 {
sS.loopStopped <- struct{}{}
return
@@ -162,12 +162,12 @@ func (sS *StatService) matchingStatQueuesForEvent(ctx *context.Context, tnt stri
if len(sqIDs) == 0 {
ignoreFilters = false
sqIDs, err = MatchingItemIDsForEvent(ctx, evNm,
sS.cgrcfg.StatSCfg().StringIndexedFields,
sS.cgrcfg.StatSCfg().PrefixIndexedFields,
sS.cgrcfg.StatSCfg().SuffixIndexedFields,
sS.cfg.StatSCfg().StringIndexedFields,
sS.cfg.StatSCfg().PrefixIndexedFields,
sS.cfg.StatSCfg().SuffixIndexedFields,
sS.dm, utils.CacheStatFilterIndexes, tnt,
sS.cgrcfg.StatSCfg().IndexedSelects,
sS.cgrcfg.StatSCfg().NestedFields,
sS.cfg.StatSCfg().IndexedSelects,
sS.cfg.StatSCfg().NestedFields,
)
if err != nil {
return
@@ -250,9 +250,9 @@ func (sS *StatService) getStatQueue(ctx *context.Context, tnt, id string) (sq *S
// storeStatQueue will store the sq if needed
func (sS *StatService) storeStatQueue(ctx *context.Context, sq *StatQueue) {
if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save
if sS.cfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save
*sq.dirty = true // mark it to be saved
if sS.cgrcfg.StatSCfg().StoreInterval == -1 {
if sS.cfg.StatSCfg().StoreInterval == -1 {
sS.StoreStatQueue(ctx, sq)
} else {
sS.ssqMux.Lock()
@@ -264,7 +264,7 @@ func (sS *StatService) storeStatQueue(ctx *context.Context, sq *StatQueue) {
// processThresholds will pass the event for statQueue to ThresholdS
func (sS *StatService) processThresholds(ctx *context.Context, sQs StatQueues, opts map[string]interface{}) (err error) {
if len(sS.cgrcfg.StatSCfg().ThresholdSConns) == 0 {
if len(sS.cfg.StatSCfg().ThresholdSConns) == 0 {
return
}
if opts == nil {
@@ -292,7 +292,7 @@ func (sS *StatService) processThresholds(ctx *context.Context, sQs StatQueues, o
}
var tIDs []string
if err := sS.connMgr.Call(ctx, sS.cgrcfg.StatSCfg().ThresholdSConns,
if err := sS.connMgr.Call(ctx, sS.cfg.StatSCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
(len(sq.sqPrfl.ThresholdIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
utils.Logger.Warning(
@@ -311,12 +311,12 @@ func (sS *StatService) processThresholds(ctx *context.Context, sQs StatQueues, o
func (sS *StatService) processEvent(ctx *context.Context, tnt string, args *utils.CGREvent) (statQueueIDs []string, err error) {
evNm := args.AsDataProvider()
var sqIDs []string
if sqIDs, err = GetStringSliceOpts(ctx, tnt, args, sS.filterS, sS.cgrcfg.StatSCfg().Opts.ProfileIDs,
if sqIDs, err = GetStringSliceOpts(ctx, tnt, args, sS.filterS, sS.cfg.StatSCfg().Opts.ProfileIDs,
config.StatsProfileIDsDftOpt, utils.OptsStatsProfileIDs); err != nil {
return
}
var ignFilters bool
if ignFilters, err = GetBoolOpts(ctx, tnt, args, sS.filterS, sS.cgrcfg.StatSCfg().Opts.ProfileIgnoreFilters,
if ignFilters, err = GetBoolOpts(ctx, tnt, args, sS.filterS, sS.cfg.StatSCfg().Opts.ProfileIgnoreFilters,
config.StatsProfileIgnoreFilters, utils.MetaProfileIgnoreFilters); err != nil {
return
}
@@ -357,7 +357,7 @@ func (sS *StatService) V1ProcessEvent(ctx *context.Context, args *utils.CGREvent
}
tnt := args.Tenant
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
tnt = sS.cfg.GeneralCfg().DefaultTenant
}
var ids []string
if ids, err = sS.processEvent(ctx, tnt, args); err != nil {
@@ -379,15 +379,15 @@ func (sS *StatService) V1GetStatQueuesForEvent(ctx *context.Context, args *utils
}
tnt := args.Tenant
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
tnt = sS.cfg.GeneralCfg().DefaultTenant
}
var sqIDs []string
if sqIDs, err = GetStringSliceOpts(ctx, tnt, args, sS.filterS, sS.cgrcfg.StatSCfg().Opts.ProfileIDs,
if sqIDs, err = GetStringSliceOpts(ctx, tnt, args, sS.filterS, sS.cfg.StatSCfg().Opts.ProfileIDs,
config.StatsProfileIDsDftOpt, utils.OptsStatsProfileIDs); err != nil {
return
}
var ignFilters bool
if ignFilters, err = GetBoolOpts(ctx, tnt, args, sS.filterS, sS.cgrcfg.StatSCfg().Opts.ProfileIgnoreFilters,
if ignFilters, err = GetBoolOpts(ctx, tnt, args, sS.filterS, sS.cfg.StatSCfg().Opts.ProfileIgnoreFilters,
config.StatsProfileIgnoreFilters, utils.MetaProfileIgnoreFilters); err != nil {
return
}
@@ -408,7 +408,7 @@ func (sS *StatService) V1GetStatQueue(ctx *context.Context, args *utils.TenantID
}
tnt := args.Tenant
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
tnt = sS.cfg.GeneralCfg().DefaultTenant
}
// make sure statQueue is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
@@ -424,13 +424,13 @@ func (sS *StatService) V1GetStatQueue(ctx *context.Context, args *utils.TenantID
}
// V1GetQueueStringMetrics returns the metrics of a Queue as string values
func (sS *StatService) V1GetQueueStringMetrics(ctx *context.Context, args *utils.TenantID, reply *map[string]string) (err error) {
func (sS *StatService) V1GetQueueStringMetrics(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *map[string]string) (err error) {
if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
tnt := args.Tenant
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
tnt = sS.cfg.GeneralCfg().DefaultTenant
}
// make sure statQueue is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
@@ -444,22 +444,28 @@ func (sS *StatService) V1GetQueueStringMetrics(ctx *context.Context, args *utils
}
return err
}
var rnd int
if rnd, err = GetIntOpts(ctx, tnt, &utils.CGREvent{Tenant: tnt}, sS.filterS,
sS.cfg.StatSCfg().Opts.RoundingDecimals, sS.cfg.GeneralCfg().RoundingDecimals,
utils.OptsRoundingDecimals); err != nil {
return
}
metrics := make(map[string]string, len(sq.SQMetrics))
for metricID, metric := range sq.SQMetrics {
metrics[metricID] = metric.GetStringValue(sS.cgrcfg.GeneralCfg().RoundingDecimals) // ToDo
metrics[metricID] = metric.GetStringValue(rnd)
}
*reply = metrics
return
}
// V1GetQueueFloatMetrics returns the metrics as float64 values
func (sS *StatService) V1GetQueueFloatMetrics(ctx *context.Context, args *utils.TenantID, reply *map[string]float64) (err error) {
func (sS *StatService) V1GetQueueFloatMetrics(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *map[string]float64) (err error) {
if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
tnt := args.Tenant
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
tnt = sS.cfg.GeneralCfg().DefaultTenant
}
// make sure statQueue is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
@@ -486,13 +492,13 @@ func (sS *StatService) V1GetQueueFloatMetrics(ctx *context.Context, args *utils.
}
// V1GetQueueDecimalMetrics returns the metrics as decimal values
func (sS *StatService) V1GetQueueDecimalMetrics(ctx *context.Context, args *utils.TenantID, reply *map[string]*utils.Decimal) (err error) {
func (sS *StatService) V1GetQueueDecimalMetrics(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *map[string]*utils.Decimal) (err error) {
if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
tnt := args.Tenant
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
tnt = sS.cfg.GeneralCfg().DefaultTenant
}
// make sure statQueue is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
@@ -517,7 +523,7 @@ func (sS *StatService) V1GetQueueDecimalMetrics(ctx *context.Context, args *util
// V1GetQueueIDs returns list of queueIDs registered for a tenant
func (sS *StatService) V1GetQueueIDs(ctx *context.Context, tenant string, qIDs *[]string) (err error) {
if tenant == utils.EmptyString {
tenant = sS.cgrcfg.GeneralCfg().DefaultTenant
tenant = sS.cfg.GeneralCfg().DefaultTenant
}
prfx := utils.StatQueuePrefix + tenant + utils.ConcatenatedKeySep
keys, err := sS.dm.DataDB().GetKeysForPrefix(ctx, prfx)
@@ -539,7 +545,7 @@ func (sS *StatService) V1ResetStatQueue(ctx *context.Context, tntID *utils.Tenan
}
tnt := tntID.Tenant
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
tnt = sS.cfg.GeneralCfg().DefaultTenant
}
// make sure statQueue is locked at process level
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,

View File

@@ -225,7 +225,7 @@ func TestNewStatService(t *testing.T) {
sSrv := &StatService{
dm: dm,
filterS: fltrS,
cgrcfg: cfg,
cfg: cfg,
storedStatQueues: make(utils.StringSet),
}
result := NewStatService(dm, cfg, fltrS, nil)
@@ -235,8 +235,8 @@ func TestNewStatService(t *testing.T) {
if !reflect.DeepEqual(sSrv.filterS, result.filterS) {
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", sSrv.filterS, result.filterS)
}
if !reflect.DeepEqual(sSrv.cgrcfg, result.cgrcfg) {
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", sSrv.cgrcfg, result.cgrcfg)
if !reflect.DeepEqual(sSrv.cfg, result.cfg) {
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", sSrv.cfg, result.cfg)
}
if !reflect.DeepEqual(sSrv.storedStatQueues, sSrv.storedStatQueues) {
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", sSrv.storedStatQueues, sSrv.storedStatQueues)
@@ -318,7 +318,7 @@ func TestStatQueuesProcessEvent(t *testing.T) {
} else if !reflect.DeepEqual(reply, expected) {
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
}
err = statService.V1GetQueueStringMetrics(context.TODO(), &utils.TenantID{Tenant: testStatsQ[0].Tenant, ID: testStatsQ[0].ID}, &stq)
err = statService.V1GetQueueStringMetrics(context.TODO(), &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: testStatsQ[0].Tenant, ID: testStatsQ[0].ID}}, &stq)
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -330,7 +330,7 @@ func TestStatQueuesProcessEvent(t *testing.T) {
} else if !reflect.DeepEqual(reply, expected) {
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
}
err = statService.V1GetQueueStringMetrics(context.TODO(), &utils.TenantID{Tenant: testStatsQ[1].Tenant, ID: testStatsQ[1].ID}, &stq)
err = statService.V1GetQueueStringMetrics(context.TODO(), &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: testStatsQ[1].Tenant, ID: testStatsQ[1].ID}}, &stq)
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -342,7 +342,7 @@ func TestStatQueuesProcessEvent(t *testing.T) {
} else if !reflect.DeepEqual(reply, expected) {
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
}
err = statService.V1GetQueueStringMetrics(context.TODO(), &utils.TenantID{Tenant: testStatsQ[2].Tenant, ID: testStatsQ[2].ID}, &stq)
err = statService.V1GetQueueStringMetrics(context.TODO(), &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: testStatsQ[2].Tenant, ID: testStatsQ[2].ID}}, &stq)
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -361,7 +361,7 @@ func TestStatQueuesMatchWithIndexFalse(t *testing.T) {
&FilterS{dm: dmSTS, cfg: cfg}, nil)
prepareStatsData(t, dmSTS)
statService.cgrcfg.StatSCfg().IndexedSelects = false
statService.cfg.StatSCfg().IndexedSelects = false
msq, err := statService.matchingStatQueuesForEvent(context.TODO(), testStatsArgs[0].Tenant, nil,
testStatsArgs[0].AsDataProvider(), false)
if err != nil {
@@ -939,7 +939,7 @@ func TestStatQueueReload(t *testing.T) {
filterS: filterS,
stopBackup: make(chan struct{}),
loopStopped: make(chan struct{}, 1),
cgrcfg: cfg,
cfg: cfg,
}
sS.loopStopped <- struct{}{}
sS.Reload(context.Background())
@@ -957,7 +957,7 @@ func TestStatQueueStartLoop(t *testing.T) {
filterS: filterS,
stopBackup: make(chan struct{}),
loopStopped: make(chan struct{}, 1),
cgrcfg: cfg,
cfg: cfg,
}
sS.StartLoop(context.Background())
@@ -2576,9 +2576,7 @@ func TestStatQueueV1GetQueueFloatMetricsOK(t *testing.T) {
utils.MetaTCD: 3600000000000,
}
reply := map[string]float64{}
if err := sS.V1GetQueueFloatMetrics(context.Background(), &utils.TenantID{
ID: "SQ1",
}, &reply); err != nil {
if err := sS.V1GetQueueFloatMetrics(context.Background(), &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{ID: "SQ1"}}, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(reply, expected) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", expected, reply)
@@ -2644,9 +2642,7 @@ func TestStatQueueV1GetQueueFloatMetricsErrNotFound(t *testing.T) {
}
reply := map[string]float64{}
if err := sS.V1GetQueueFloatMetrics(context.Background(), &utils.TenantID{
ID: "SQ2",
}, &reply); err == nil || err != utils.ErrNotFound {
if err := sS.V1GetQueueFloatMetrics(context.Background(), &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{ID: "SQ2"}}, &reply); err == nil || err != utils.ErrNotFound {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrNotFound, err)
}
}
@@ -2709,7 +2705,7 @@ func TestStatQueueV1GetQueueFloatMetricsMissingArgs(t *testing.T) {
experr := `MANDATORY_IE_MISSING: [ID]`
reply := map[string]float64{}
if err := sS.V1GetQueueFloatMetrics(context.Background(), &utils.TenantID{}, &reply); err == nil ||
if err := sS.V1GetQueueFloatMetrics(context.Background(), &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{}}, &reply); err == nil ||
err.Error() != experr {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err)
}
@@ -2731,9 +2727,7 @@ func TestStatQueueV1GetQueueFloatMetricsErrGetStats(t *testing.T) {
experr := `SERVER_ERROR: NO_DATABASE_CONNECTION`
reply := map[string]float64{}
if err := sS.V1GetQueueFloatMetrics(context.Background(), &utils.TenantID{
ID: "SQ1",
}, &reply); err == nil || err.Error() != experr {
if err := sS.V1GetQueueFloatMetrics(context.Background(), &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{ID: "SQ1"}}, &reply); err == nil || err.Error() != experr {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err)
}
}
@@ -2800,9 +2794,7 @@ func TestStatQueueV1GetQueueStringMetricsOK(t *testing.T) {
utils.MetaTCD: "1h0m0s",
}
reply := map[string]string{}
if err := sS.V1GetQueueStringMetrics(context.Background(), &utils.TenantID{
ID: "SQ1",
}, &reply); err != nil {
if err := sS.V1GetQueueStringMetrics(context.Background(), &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{ID: "SQ1"}}, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(reply, expected) {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", expected, reply)
@@ -2866,9 +2858,7 @@ func TestStatQueueV1GetQueueStringMetricsErrNotFound(t *testing.T) {
}
reply := map[string]string{}
if err := sS.V1GetQueueStringMetrics(context.Background(), &utils.TenantID{
ID: "SQ2",
}, &reply); err == nil || err != utils.ErrNotFound {
if err := sS.V1GetQueueStringMetrics(context.Background(), &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{ID: "SQ2"}}, &reply); err == nil || err != utils.ErrNotFound {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", utils.ErrNotFound, err)
}
}
@@ -2931,7 +2921,7 @@ func TestStatQueueV1GetQueueStringMetricsMissingArgs(t *testing.T) {
experr := `MANDATORY_IE_MISSING: [ID]`
reply := map[string]string{}
if err := sS.V1GetQueueStringMetrics(context.Background(), &utils.TenantID{}, &reply); err == nil ||
if err := sS.V1GetQueueStringMetrics(context.Background(), &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{}}, &reply); err == nil ||
err.Error() != experr {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err)
}
@@ -2953,9 +2943,7 @@ func TestStatQueueV1GetQueueStringMetricsErrGetStats(t *testing.T) {
experr := `SERVER_ERROR: NO_DATABASE_CONNECTION`
reply := map[string]string{}
if err := sS.V1GetQueueStringMetrics(context.Background(), &utils.TenantID{
ID: "SQ1",
}, &reply); err == nil || err.Error() != experr {
if err := sS.V1GetQueueStringMetrics(context.Background(), &utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{ID: "SQ1"}}, &reply); err == nil || err.Error() != experr {
t.Errorf("expected: <%+v>, \nreceived: <%+v>", experr, err)
}
}