From 9ee73dd1ae1b40678fceced723cb28ff606281a0 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Mon, 24 May 2021 15:52:34 +0300 Subject: [PATCH] Add stats APIs and fill in context where needed --- apis/filter_indexes.go | 8 +- apis/stats.go | 163 ++++++++++++++++++++++++++++++++++++ engine/datamanager.go | 44 +++++----- engine/stats.go | 92 ++++++++++---------- engine/storage_interface.go | 12 +-- 5 files changed, 241 insertions(+), 78 deletions(-) create mode 100644 apis/stats.go diff --git a/apis/filter_indexes.go b/apis/filter_indexes.go index ddbf4019d..cf73412bb 100644 --- a/apis/filter_indexes.go +++ b/apis/filter_indexes.go @@ -229,7 +229,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A cacheIDs[utils.ThresholdFilterIndexIDs] = []string{utils.MetaAny} if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Subsystem, utils.CacheThresholdFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { - th, e := adms.dm.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional) + th, e := adms.dm.GetThresholdProfile(cntxt, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -248,7 +248,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A cacheIDs[utils.StatFilterIndexIDs] = []string{utils.MetaAny} if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Subsystem, utils.CacheStatFilterIndexes, nil, transactionID, func(tnt, id, ctx string) (*[]string, error) { - sq, e := adms.dm.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional) + sq, e := adms.dm.GetStatQueueProfile(cntxt, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -439,7 +439,7 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. //ThresholdProfile Indexes if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Subsystem, utils.CacheThresholdFilterIndexes, &args.ThresholdIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { - th, e := adms.dm.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional) + th, e := adms.dm.GetThresholdProfile(cntxt, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } @@ -457,7 +457,7 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils. //StatQueueProfile Indexes if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Subsystem, utils.CacheStatFilterIndexes, &args.StatIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) { - sq, e := adms.dm.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional) + sq, e := adms.dm.GetStatQueueProfile(cntxt, tnt, id, true, false, utils.NonTransactional) if e != nil { return nil, e } diff --git a/apis/stats.go b/apis/stats.go new file mode 100644 index 000000000..1781a50c4 --- /dev/null +++ b/apis/stats.go @@ -0,0 +1,163 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package apis + +import ( + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +// GetStatQueueProfile returns a StatQueue profile +func (adms *AdminSv1) GetStatQueueProfile(ctx *context.Context, arg *utils.TenantID, reply *engine.StatQueueProfile) (err error) { + if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + tnt := arg.Tenant + if tnt == utils.EmptyString { + tnt = adms.cfg.GeneralCfg().DefaultTenant + } + sCfg, err := adms.dm.GetStatQueueProfile(ctx, tnt, arg.ID, + true, true, utils.NonTransactional) + if err != nil { + return utils.APIErrorHandler(err) + } + *reply = *sCfg + return +} + +// GetStatQueueProfileIDs returns list of statQueueProfile IDs registered for a tenant +func (adms *AdminSv1) GetStatQueueProfileIDs(ctx *context.Context, args *utils.PaginatorWithTenant, stsPrfIDs *[]string) error { + tnt := args.Tenant + if tnt == utils.EmptyString { + tnt = adms.cfg.GeneralCfg().DefaultTenant + } + prfx := utils.StatQueueProfilePrefix + tnt + utils.ConcatenatedKeySep + keys, err := adms.dm.DataDB().GetKeysForPrefix(ctx, prfx) + if err != nil { + return err + } + if len(keys) == 0 { + return utils.ErrNotFound + } + retIDs := make([]string, len(keys)) + for i, key := range keys { + retIDs[i] = key[len(prfx):] + } + *stsPrfIDs = args.PaginateStringSlice(retIDs) + return nil +} + +// SetStatQueueProfile alters/creates a StatQueueProfile +func (adms *AdminSv1) SetStatQueueProfile(ctx *context.Context, arg *engine.StatQueueProfileWithAPIOpts, reply *string) (err error) { + if missing := utils.MissingStructFields(arg.StatQueueProfile, []string{utils.ID}); len(missing) != 0 { + return utils.NewErrMandatoryIeMissing(missing...) + } + if arg.Tenant == utils.EmptyString { + arg.Tenant = adms.cfg.GeneralCfg().DefaultTenant + } + if err = adms.dm.SetStatQueueProfile(ctx, arg.StatQueueProfile, true); err != nil { + return utils.APIErrorHandler(err) + } + //generate a loadID for CacheStatQueueProfiles and CacheStatQueues and store it in database + //make 1 insert for both StatQueueProfile and StatQueue instead of 2 + loadID := time.Now().UnixNano() + if err = adms.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheStatQueueProfiles: loadID, utils.CacheStatQueues: loadID}); err != nil { + return utils.APIErrorHandler(err) + } + //handle caching for StatQueueProfile + if err = adms.CallCache(ctx, utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant, utils.CacheStatQueueProfiles, + arg.TenantID(), &arg.FilterIDs, nil, arg.APIOpts); err != nil { + return utils.APIErrorHandler(err) + } + *reply = utils.OK + return nil +} + +// RemoveStatQueueProfile remove a specific stat configuration +func (adms *AdminSv1) RemoveStatQueueProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) error { + if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } + tnt := args.Tenant + if tnt == utils.EmptyString { + tnt = adms.cfg.GeneralCfg().DefaultTenant + } + if err := adms.dm.RemoveStatQueueProfile(ctx, tnt, args.ID, utils.NonTransactional, true); err != nil { + return utils.APIErrorHandler(err) + } + //handle caching for StatQueueProfile + if err := adms.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), tnt, utils.CacheStatQueueProfiles, + utils.ConcatenatedKey(tnt, args.ID), nil, nil, args.APIOpts); err != nil { + return utils.APIErrorHandler(err) + } + //generate a loadID for CacheStatQueueProfiles and CacheStatQueues and store it in database + //make 1 insert for both StatQueueProfile and StatQueue instead of 2 + loadID := time.Now().UnixNano() + if err := adms.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheStatQueueProfiles: loadID, utils.CacheStatQueues: loadID}); err != nil { + return utils.APIErrorHandler(err) + } + *reply = utils.OK + return nil +} + +// NewStatSv1 initializes StatSV1 +func NewStatSv1(sS *engine.StatService) *StatSv1 { + return &StatSv1{sS: sS} +} + +// StatSv1 exports RPC from RLs +type StatSv1 struct { + ping + sS *engine.StatService +} + +// GetQueueIDs returns list of queueIDs registered for a tenant +func (stsv1 *StatSv1) GetQueueIDs(ctx *context.Context, tenant *utils.TenantWithAPIOpts, qIDs *[]string) error { + return stsv1.sS.V1GetQueueIDs(ctx, tenant.Tenant, qIDs) +} + +// ProcessEvent returns processes a new Event +func (stsv1 *StatSv1) ProcessEvent(ctx *context.Context, args *engine.StatsArgsProcessEvent, reply *[]string) error { + return stsv1.sS.V1ProcessEvent(ctx, args, reply) +} + +// GetStatQueuesForEvent returns the list of queues IDs in the system +func (stsv1 *StatSv1) GetStatQueuesForEvent(ctx *context.Context, args *engine.StatsArgsProcessEvent, reply *[]string) (err error) { + return stsv1.sS.V1GetStatQueuesForEvent(ctx, args, reply) +} + +// GetStatQueue returns a StatQueue object +func (stsv1 *StatSv1) GetStatQueue(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *engine.StatQueue) (err error) { + return stsv1.sS.V1GetStatQueue(ctx, args, reply) +} + +// GetQueueStringMetrics returns the string metrics for a Queue +func (stsv1 *StatSv1) GetQueueStringMetrics(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *map[string]string) (err error) { + return stsv1.sS.V1GetQueueStringMetrics(ctx, args.TenantID, reply) +} + +// GetQueueFloatMetrics returns the float metrics for a Queue +func (stsv1 *StatSv1) GetQueueFloatMetrics(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *map[string]float64) (err error) { + return stsv1.sS.V1GetQueueFloatMetrics(ctx, args.TenantID, reply) +} + +// ResetStatQueue resets the stat queue +func (stsv1 *StatSv1) ResetStatQueue(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *string) error { + return stsv1.sS.V1ResetStatQueue(ctx, tntID.TenantID, reply) +} diff --git a/engine/datamanager.go b/engine/datamanager.go index e6b0d607f..1ac6100fa 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -147,19 +147,19 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids [] _, err = dm.GetResourceProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) case utils.ResourcesPrefix: tntID := utils.NewTenantID(dataID) - _, err = dm.GetResource(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) + _, err = dm.GetResource(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) case utils.StatQueueProfilePrefix: tntID := utils.NewTenantID(dataID) - _, err = dm.GetStatQueueProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) + _, err = dm.GetStatQueueProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) case utils.StatQueuePrefix: tntID := utils.NewTenantID(dataID) - _, err = dm.GetStatQueue(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) + _, err = dm.GetStatQueue(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) case utils.ThresholdProfilePrefix: tntID := utils.NewTenantID(dataID) - _, err = dm.GetThresholdProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) + _, err = dm.GetThresholdProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) case utils.ThresholdPrefix: tntID := utils.NewTenantID(dataID) - _, err = dm.GetThreshold(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) + _, err = dm.GetThreshold(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) case utils.FilterPrefix: tntID := utils.NewTenantID(dataID) _, err = dm.GetFilter(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional) @@ -280,7 +280,7 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids [] // GetStatQueue retrieves a StatQueue from dataDB // handles caching and deserialization of metrics -func (dm *DataManager) GetStatQueue(tenant, id string, +func (dm *DataManager) GetStatQueue(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (sq *StatQueue, err error) { tntID := utils.ConcatenatedKey(tenant, id) if cacheRead { @@ -295,10 +295,10 @@ func (dm *DataManager) GetStatQueue(tenant, id string, err = utils.ErrNoDatabaseConn return } - sq, err = dm.dataDB.GetStatQueueDrv(tenant, id) + sq, err = dm.dataDB.GetStatQueueDrv(ctx, tenant, id) if err != nil { if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; err == utils.ErrNotFound && itm.Remote { - if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns, utils.ReplicatorSv1GetStatQueue, + if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns, utils.ReplicatorSv1GetStatQueue, &utils.TenantIDWithAPIOpts{ TenantID: &utils.TenantID{Tenant: tenant, ID: id}, APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString, @@ -312,12 +312,12 @@ func (dm *DataManager) GetStatQueue(tenant, id string, return nil, err } } - err = dm.dataDB.SetStatQueueDrv(ssq, sq) + err = dm.dataDB.SetStatQueueDrv(ctx, ssq, sq) } } if err != nil { if err = utils.CastRPCErr(err); err == utils.ErrNotFound && cacheWrite && dm.dataDB.GetStorageType() != utils.Internal { - if errCh := Cache.Set(context.TODO(), utils.CacheStatQueues, tntID, nil, nil, + if errCh := Cache.Set(ctx, utils.CacheStatQueues, tntID, nil, nil, cacheCommit(transactionID), transactionID); errCh != nil { return nil, errCh } @@ -327,7 +327,7 @@ func (dm *DataManager) GetStatQueue(tenant, id string, } } if cacheWrite { - if errCh := Cache.Set(context.TODO(), utils.CacheStatQueues, tntID, sq, nil, + if errCh := Cache.Set(ctx, utils.CacheStatQueues, tntID, sq, nil, cacheCommit(transactionID), transactionID); errCh != nil { return nil, errCh } @@ -336,7 +336,7 @@ func (dm *DataManager) GetStatQueue(tenant, id string, } // SetStatQueue converts to StoredStatQueue and stores the result in dataDB -func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters, +func (dm *DataManager) SetStatQueue(ctx *context.Context, sq *StatQueue, metrics []*MetricWithFilters, minItems int, ttl *time.Duration, queueLength int, simpleSet bool) (err error) { if dm == nil { return utils.ErrNoDatabaseConn @@ -345,7 +345,7 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters, tnt := sq.Tenant // save the tenant id := sq.ID // save the ID from the initial StatQueue // handle metrics for statsQueue - sq, err = dm.GetStatQueue(tnt, id, true, false, utils.NonTransactional) + sq, err = dm.GetStatQueue(ctx, tnt, id, true, false, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return } @@ -406,11 +406,11 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters, return } } - if err = dm.dataDB.SetStatQueueDrv(ssq, sq); err != nil { + if err = dm.dataDB.SetStatQueueDrv(ctx, ssq, sq); err != nil { return } if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { - err = replicate(context.TODO(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, config.CgrConfig().DataDbCfg().RplFiltered, utils.StatQueuePrefix, sq.TenantID(), // this are used to get the host IDs from cache utils.ReplicatorSv1SetStatQueue, @@ -423,15 +423,15 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters, } // RemoveStatQueue removes the StoredStatQueue -func (dm *DataManager) RemoveStatQueue(tenant, id string, transactionID string) (err error) { +func (dm *DataManager) RemoveStatQueue(ctx *context.Context, tenant, id string, transactionID string) (err error) { if dm == nil { return utils.ErrNoDatabaseConn } - if err = dm.dataDB.RemStatQueueDrv(tenant, id); err != nil { + if err = dm.dataDB.RemStatQueueDrv(ctx, tenant, id); err != nil { return } if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { - replicate(context.TODO(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, config.CgrConfig().DataDbCfg().RplFiltered, utils.StatQueuePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache utils.ReplicatorSv1RemoveStatQueue, @@ -822,7 +822,7 @@ func (dm *DataManager) GetStatQueueProfile(ctx *context.Context, tenant, id stri err = utils.ErrNoDatabaseConn return } - sqp, err = dm.dataDB.GetStatQueueProfileDrv(tenant, id) + sqp, err = dm.dataDB.GetStatQueueProfileDrv(ctx, tenant, id) if err != nil { if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles]; err == utils.ErrNotFound && itm.Remote { if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns, @@ -833,7 +833,7 @@ func (dm *DataManager) GetStatQueueProfile(ctx *context.Context, tenant, id stri utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID, config.CgrConfig().GeneralCfg().NodeID)), }, &sqp); err == nil { - err = dm.dataDB.SetStatQueueProfileDrv(sqp) + err = dm.dataDB.SetStatQueueProfileDrv(ctx, sqp) } } if err != nil { @@ -872,7 +872,7 @@ func (dm *DataManager) SetStatQueueProfile(ctx *context.Context, sqp *StatQueueP if err != nil && err != utils.ErrNotFound { return err } - if err = dm.DataDB().SetStatQueueProfileDrv(sqp); err != nil { + if err = dm.DataDB().SetStatQueueProfileDrv(ctx, sqp); err != nil { return err } if withIndex { @@ -907,7 +907,7 @@ func (dm *DataManager) RemoveStatQueueProfile(ctx *context.Context, tenant, id, if err != nil && err != utils.ErrNotFound { return err } - if err = dm.DataDB().RemStatQueueProfileDrv(tenant, id); err != nil { + if err = dm.DataDB().RemStatQueueProfileDrv(ctx, tenant, id); err != nil { return } if oldSts == nil { diff --git a/engine/stats.go b/engine/stats.go index ee8adea6b..1ea59d435 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -58,22 +58,22 @@ type StatService struct { } // Shutdown is called to shutdown the service -func (sS *StatService) Shutdown() { +func (sS *StatService) Shutdown(ctx *context.Context) { utils.Logger.Info(" service shutdown initialized") close(sS.stopBackup) - sS.storeStats() + sS.storeStats(ctx) utils.Logger.Info(" service shutdown complete") } // runBackup will regularly store resources changed to dataDB -func (sS *StatService) runBackup() { +func (sS *StatService) runBackup(ctx *context.Context) { storeInterval := sS.cgrcfg.StatSCfg().StoreInterval if storeInterval <= 0 { sS.loopStoped <- struct{}{} return } for { - sS.storeStats() + sS.storeStats(ctx) select { case <-sS.stopBackup: sS.loopStoped <- struct{}{} @@ -84,7 +84,7 @@ func (sS *StatService) runBackup() { } // storeResources represents one task of complete backup -func (sS *StatService) storeStats() { +func (sS *StatService) storeStats(ctx *context.Context) { var failedSqIDs []string for { // don't stop untill we store all dirty statQueues sS.ssqMux.Lock() @@ -96,12 +96,12 @@ func (sS *StatService) storeStats() { if sID == "" { break // no more keys, backup completed } - guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) (gRes interface{}, gErr error) { + guardian.Guardian.Guard(ctx, func(_ *context.Context) (gRes interface{}, gErr error) { if sqIf, ok := Cache.Get(utils.CacheStatQueues, sID); !ok || sqIf == nil { utils.Logger.Warning( fmt.Sprintf("<%s> failed retrieving from cache stat queue with ID: %s", utils.StatService, sID)) - } else if err := sS.StoreStatQueue(sqIf.(*StatQueue)); err != nil { + } else if err := sS.StoreStatQueue(ctx, sqIf.(*StatQueue)); err != nil { failedSqIDs = append(failedSqIDs, sID) // record failure so we can schedule it for next backup } return @@ -117,18 +117,18 @@ func (sS *StatService) storeStats() { } // StoreStatQueue stores the statQueue in DB and corrects dirty flag -func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) { +func (sS *StatService) StoreStatQueue(ctx *context.Context, sq *StatQueue) (err error) { if sq.dirty == nil || !*sq.dirty { return } - if err = sS.dm.SetStatQueue(sq, nil, 0, nil, 0, true); err != nil { + if err = sS.dm.SetStatQueue(ctx, sq, nil, 0, nil, 0, true); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed saving StatQueue with ID: %s, error: %s", sq.TenantID(), err.Error())) return } //since we no longer handle cache in DataManager do here a manual caching - if err = sS.dm.CacheDataFromDB(context.TODO(), utils.StatQueuePrefix, []string{sq.TenantID()}, true); err != nil { + if err = sS.dm.CacheDataFromDB(ctx, utils.StatQueuePrefix, []string{sq.TenantID()}, true); err != nil { utils.Logger.Warning( fmt.Sprintf(" failed caching StatQueue with ID: %s, error: %s", sq.TenantID(), err.Error())) @@ -139,10 +139,10 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) { } // matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call -func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string, evNm utils.MapStorage) (sqs StatQueues, err error) { +func (sS *StatService) matchingStatQueuesForEvent(ctx *context.Context, tnt string, statsIDs []string, evNm utils.MapStorage) (sqs StatQueues, err error) { sqIDs := utils.NewStringSet(statsIDs) if len(sqIDs) == 0 { - sqIDs, err = MatchingItemIDsForEvent(context.TODO(), evNm, + sqIDs, err = MatchingItemIDsForEvent(ctx, evNm, sS.cgrcfg.StatSCfg().StringIndexedFields, sS.cgrcfg.StatSCfg().PrefixIndexedFields, sS.cgrcfg.StatSCfg().SuffixIndexedFields, @@ -156,14 +156,14 @@ func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string, } sqs = make(StatQueues, 0, len(sqIDs)) for sqID := range sqIDs { - sqPrfl, err := sS.dm.GetStatQueueProfile(tnt, sqID, true, true, utils.NonTransactional) + sqPrfl, err := sS.dm.GetStatQueueProfile(ctx, tnt, sqID, true, true, utils.NonTransactional) if err != nil { if err == utils.ErrNotFound { continue } return nil, err } - if pass, err := sS.filterS.Pass(context.TODO(), tnt, sqPrfl.FilterIDs, + if pass, err := sS.filterS.Pass(ctx, tnt, sqPrfl.FilterIDs, evNm); err != nil { return nil, err } else if !pass { @@ -171,8 +171,8 @@ func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string, } var sq *StatQueue lkID := utils.StatQueuePrefix + utils.ConcatenatedKey(sqPrfl.Tenant, sqPrfl.ID) - guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) (gRes interface{}, gErr error) { - sq, err = sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, true, true, "") + guardian.Guardian.Guard(ctx, func(_ *context.Context) (gRes interface{}, gErr error) { + sq, err = sS.dm.GetStatQueue(ctx, sqPrfl.Tenant, sqPrfl.ID, true, true, "") return }, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID) if err != nil { @@ -236,29 +236,29 @@ func (attr *StatsArgsProcessEvent) Clone() *StatsArgsProcessEvent { } } -func (sS *StatService) getStatQueue(tnt, id string) (sq *StatQueue, err error) { - if sq, err = sS.dm.GetStatQueue(tnt, id, true, true, utils.EmptyString); err != nil { +func (sS *StatService) getStatQueue(ctx *context.Context, tnt, id string) (sq *StatQueue, err error) { + if sq, err = sS.dm.GetStatQueue(ctx, tnt, id, true, true, utils.EmptyString); err != nil { return } lkID := utils.StatQueuePrefix + sq.TenantID() var removed int - guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) (gRes interface{}, gErr error) { + guardian.Guardian.Guard(ctx, func(_ *context.Context) (gRes interface{}, gErr error) { removed, err = sq.remExpired() return }, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID) if err != nil || removed == 0 { return } - sS.storeStatQueue(sq) + sS.storeStatQueue(ctx, sq) return } // storeStatQueue will store the sq if needed -func (sS *StatService) storeStatQueue(sq *StatQueue) { +func (sS *StatService) storeStatQueue(ctx *context.Context, sq *StatQueue) { if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save *sq.dirty = true // mark it to be saved if sS.cgrcfg.StatSCfg().StoreInterval == -1 { - sS.StoreStatQueue(sq) + sS.StoreStatQueue(ctx, sq) } else { sS.ssqMux.Lock() sS.storedStatQueues.Add(sq.TenantID()) @@ -269,12 +269,12 @@ func (sS *StatService) storeStatQueue(sq *StatQueue) { // processEvent processes a new event, dispatching to matching queues // queues matching are also cached to speed up -func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (statQueueIDs []string, err error) { +func (sS *StatService) processEvent(ctx *context.Context, tnt string, args *StatsArgsProcessEvent) (statQueueIDs []string, err error) { evNm := utils.MapStorage{ utils.MetaReq: args.Event, utils.MetaOpts: args.APIOpts, } - matchSQs, err := sS.matchingStatQueuesForEvent(tnt, args.StatIDs, evNm) + matchSQs, err := sS.matchingStatQueuesForEvent(ctx, tnt, args.StatIDs, evNm) if err != nil { return nil, err } @@ -290,7 +290,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st for _, sq := range matchSQs { stsIDs = append(stsIDs, sq.ID) lkID := utils.StatQueuePrefix + sq.TenantID() - guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) (_ interface{}, _ error) { + guardian.Guardian.Guard(ctx, func(_ *context.Context) (_ interface{}, _ error) { err = sq.ProcessEvent(tnt, args.ID, sS.filterS, evNm) return }, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID) @@ -300,7 +300,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st sq.TenantID(), utils.ConcatenatedKey(tnt, args.ID), err.Error())) withErrors = true } - sS.storeStatQueue(sq) + sS.storeStatQueue(ctx, sq) if len(sS.cgrcfg.StatSCfg().ThresholdSConns) != 0 { var thIDs []string if len(sq.sqPrfl.ThresholdIDs) != 0 { @@ -325,7 +325,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st thEv.Event[metricID] = metric.GetValue(sS.cgrcfg.GeneralCfg().RoundingDecimals) } var tIDs []string - if err := sS.connMgr.Call(context.TODO(), sS.cgrcfg.StatSCfg().ThresholdSConns, + if err := sS.connMgr.Call(ctx, sS.cgrcfg.StatSCfg().ThresholdSConns, utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil && err.Error() != utils.ErrNotFound.Error() { utils.Logger.Warning( @@ -346,7 +346,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st } // V1ProcessEvent implements StatV1 method for processing an Event -func (sS *StatService) V1ProcessEvent(args *StatsArgsProcessEvent, reply *[]string) (err error) { +func (sS *StatService) V1ProcessEvent(ctx *context.Context, args *StatsArgsProcessEvent, reply *[]string) (err error) { if args.CGREvent == nil { return utils.NewErrMandatoryIeMissing(utils.CGREventString) } @@ -360,7 +360,7 @@ func (sS *StatService) V1ProcessEvent(args *StatsArgsProcessEvent, reply *[]stri tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } var ids []string - if ids, err = sS.processEvent(tnt, args); err != nil { + if ids, err = sS.processEvent(ctx, tnt, args); err != nil { return } *reply = ids @@ -368,7 +368,7 @@ func (sS *StatService) V1ProcessEvent(args *StatsArgsProcessEvent, reply *[]stri } // V1GetStatQueuesForEvent implements StatV1 method for processing an Event -func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, reply *[]string) (err error) { +func (sS *StatService) V1GetStatQueuesForEvent(ctx *context.Context, args *StatsArgsProcessEvent, reply *[]string) (err error) { if args.CGREvent == nil { return utils.NewErrMandatoryIeMissing(utils.CGREventString) } @@ -382,7 +382,7 @@ func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, repl tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } var sQs StatQueues - if sQs, err = sS.matchingStatQueuesForEvent(tnt, args.StatIDs, utils.MapStorage{ + if sQs, err = sS.matchingStatQueuesForEvent(ctx, tnt, args.StatIDs, utils.MapStorage{ utils.MetaReq: args.Event, utils.MetaOpts: args.APIOpts, }); err != nil { @@ -397,7 +397,7 @@ func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, repl } // V1GetStatQueue returns a StatQueue object -func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithAPIOpts, reply *StatQueue) (err error) { +func (sS *StatService) V1GetStatQueue(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *StatQueue) (err error) { if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } @@ -405,7 +405,7 @@ func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithAPIOpts, reply *St if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - sq, err := sS.getStatQueue(tnt, args.ID) + sq, err := sS.getStatQueue(ctx, tnt, args.ID) if err != nil { return err } @@ -414,7 +414,7 @@ func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithAPIOpts, reply *St } // V1GetQueueStringMetrics returns the metrics of a Queue as string values -func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[string]string) (err error) { +func (sS *StatService) V1GetQueueStringMetrics(ctx *context.Context, args *utils.TenantID, reply *map[string]string) (err error) { if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } @@ -422,7 +422,7 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[ if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - sq, err := sS.getStatQueue(tnt, args.ID) + sq, err := sS.getStatQueue(ctx, tnt, args.ID) if err != nil { if err != utils.ErrNotFound { err = utils.NewErrServerError(err) @@ -440,7 +440,7 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[ } // V1GetQueueFloatMetrics returns the metrics as float64 values -func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[string]float64) (err error) { +func (sS *StatService) V1GetQueueFloatMetrics(ctx *context.Context, args *utils.TenantID, reply *map[string]float64) (err error) { if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } @@ -448,7 +448,7 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant } - sq, err := sS.getStatQueue(tnt, args.ID) + sq, err := sS.getStatQueue(ctx, tnt, args.ID) if err != nil { if err != utils.ErrNotFound { err = utils.NewErrServerError(err) @@ -466,12 +466,12 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s } // V1GetQueueIDs returns list of queueIDs registered for a tenant -func (sS *StatService) V1GetQueueIDs(tenant string, qIDs *[]string) (err error) { +func (sS *StatService) V1GetQueueIDs(ctx *context.Context, tenant string, qIDs *[]string) (err error) { if tenant == utils.EmptyString { tenant = sS.cgrcfg.GeneralCfg().DefaultTenant } prfx := utils.StatQueuePrefix + tenant + utils.ConcatenatedKeySep - keys, err := sS.dm.DataDB().GetKeysForPrefix(context.TODO(), prfx) + keys, err := sS.dm.DataDB().GetKeysForPrefix(ctx, prfx) if err != nil { return err } @@ -484,22 +484,22 @@ func (sS *StatService) V1GetQueueIDs(tenant string, qIDs *[]string) (err error) } // Reload stops the backupLoop and restarts it -func (sS *StatService) Reload() { +func (sS *StatService) Reload(ctx *context.Context) { close(sS.stopBackup) <-sS.loopStoped // wait until the loop is done sS.stopBackup = make(chan struct{}) - go sS.runBackup() + go sS.runBackup(ctx) } // StartLoop starsS the gorutine with the backup loop -func (sS *StatService) StartLoop() { - go sS.runBackup() +func (sS *StatService) StartLoop(ctx *context.Context) { + go sS.runBackup(ctx) } // V1ResetStatQueue resets the stat queue -func (sS *StatService) V1ResetStatQueue(tntID *utils.TenantID, rply *string) (err error) { +func (sS *StatService) V1ResetStatQueue(ctx *context.Context, tntID *utils.TenantID, rply *string) (err error) { var sq *StatQueue - if sq, err = sS.dm.GetStatQueue(tntID.Tenant, tntID.ID, + if sq, err = sS.dm.GetStatQueue(ctx, tntID.Tenant, tntID.ID, true, true, utils.NonTransactional); err != nil { return } @@ -517,7 +517,7 @@ func (sS *StatService) V1ResetStatQueue(tntID *utils.TenantID, rply *string) (er sq.SQMetrics[id] = metric } sq.dirty = utils.BoolPointer(true) - sS.storeStatQueue(sq) + sS.storeStatQueue(ctx, sq) *rply = utils.OK return } diff --git a/engine/storage_interface.go b/engine/storage_interface.go index a8d45bca1..4f067c3d5 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -60,12 +60,12 @@ type DataDB interface { SetIndexesDrv(ctx *context.Context, idxItmType, tntCtx string, indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error) - GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error) - SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) - RemStatQueueProfileDrv(tenant, id string) (err error) - GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error) - SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error) - RemStatQueueDrv(tenant, id string) (err error) + GetStatQueueProfileDrv(ctx *context.Context, tenant string, ID string) (sq *StatQueueProfile, err error) + SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error) + RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) + GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *StatQueue, err error) + SetStatQueueDrv(ctx *context.Context, ssq *StoredStatQueue, sq *StatQueue) (err error) + RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error) GetThresholdProfileDrv(ctx *context.Context, tenant string, ID string) (tp *ThresholdProfile, err error) SetThresholdProfileDrv(ctx *context.Context, tp *ThresholdProfile) (err error) RemThresholdProfileDrv(ctx *context.Context, tenant, id string) (err error)