mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add stats APIs and fill in context where needed
This commit is contained in:
committed by
Dan Christian Bogos
parent
a9c3c8b937
commit
9ee73dd1ae
@@ -229,7 +229,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A
|
||||
cacheIDs[utils.ThresholdFilterIndexIDs] = []string{utils.MetaAny}
|
||||
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Subsystem, utils.CacheThresholdFilterIndexes,
|
||||
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
|
||||
th, e := adms.dm.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional)
|
||||
th, e := adms.dm.GetThresholdProfile(cntxt, tnt, id, true, false, utils.NonTransactional)
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
@@ -248,7 +248,7 @@ func (adms *AdminSv1) ComputeFilterIndexes(cntxt *context.Context, args *utils.A
|
||||
cacheIDs[utils.StatFilterIndexIDs] = []string{utils.MetaAny}
|
||||
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Subsystem, utils.CacheStatFilterIndexes,
|
||||
nil, transactionID, func(tnt, id, ctx string) (*[]string, error) {
|
||||
sq, e := adms.dm.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional)
|
||||
sq, e := adms.dm.GetStatQueueProfile(cntxt, tnt, id, true, false, utils.NonTransactional)
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
@@ -439,7 +439,7 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils.
|
||||
//ThresholdProfile Indexes
|
||||
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Subsystem, utils.CacheThresholdFilterIndexes,
|
||||
&args.ThresholdIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
|
||||
th, e := adms.dm.GetThresholdProfile(tnt, id, true, false, utils.NonTransactional)
|
||||
th, e := adms.dm.GetThresholdProfile(cntxt, tnt, id, true, false, utils.NonTransactional)
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
@@ -457,7 +457,7 @@ func (adms *AdminSv1) ComputeFilterIndexIDs(cntxt *context.Context, args *utils.
|
||||
//StatQueueProfile Indexes
|
||||
if indexes, err = engine.ComputeIndexes(cntxt, adms.dm, tnt, args.Subsystem, utils.CacheStatFilterIndexes,
|
||||
&args.StatIDs, transactionID, func(tnt, id, ctx string) (*[]string, error) {
|
||||
sq, e := adms.dm.GetStatQueueProfile(tnt, id, true, false, utils.NonTransactional)
|
||||
sq, e := adms.dm.GetStatQueueProfile(cntxt, tnt, id, true, false, utils.NonTransactional)
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
|
||||
163
apis/stats.go
Normal file
163
apis/stats.go
Normal file
@@ -0,0 +1,163 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package apis
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// GetStatQueueProfile returns a StatQueue profile
|
||||
func (adms *AdminSv1) GetStatQueueProfile(ctx *context.Context, arg *utils.TenantID, reply *engine.StatQueueProfile) (err error) {
|
||||
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
tnt := arg.Tenant
|
||||
if tnt == utils.EmptyString {
|
||||
tnt = adms.cfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
sCfg, err := adms.dm.GetStatQueueProfile(ctx, tnt, arg.ID,
|
||||
true, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
*reply = *sCfg
|
||||
return
|
||||
}
|
||||
|
||||
// GetStatQueueProfileIDs returns list of statQueueProfile IDs registered for a tenant
|
||||
func (adms *AdminSv1) GetStatQueueProfileIDs(ctx *context.Context, args *utils.PaginatorWithTenant, stsPrfIDs *[]string) error {
|
||||
tnt := args.Tenant
|
||||
if tnt == utils.EmptyString {
|
||||
tnt = adms.cfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
prfx := utils.StatQueueProfilePrefix + tnt + utils.ConcatenatedKeySep
|
||||
keys, err := adms.dm.DataDB().GetKeysForPrefix(ctx, prfx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(keys) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
retIDs := make([]string, len(keys))
|
||||
for i, key := range keys {
|
||||
retIDs[i] = key[len(prfx):]
|
||||
}
|
||||
*stsPrfIDs = args.PaginateStringSlice(retIDs)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetStatQueueProfile alters/creates a StatQueueProfile
|
||||
func (adms *AdminSv1) SetStatQueueProfile(ctx *context.Context, arg *engine.StatQueueProfileWithAPIOpts, reply *string) (err error) {
|
||||
if missing := utils.MissingStructFields(arg.StatQueueProfile, []string{utils.ID}); len(missing) != 0 {
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
if arg.Tenant == utils.EmptyString {
|
||||
arg.Tenant = adms.cfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
if err = adms.dm.SetStatQueueProfile(ctx, arg.StatQueueProfile, true); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
//generate a loadID for CacheStatQueueProfiles and CacheStatQueues and store it in database
|
||||
//make 1 insert for both StatQueueProfile and StatQueue instead of 2
|
||||
loadID := time.Now().UnixNano()
|
||||
if err = adms.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheStatQueueProfiles: loadID, utils.CacheStatQueues: loadID}); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
//handle caching for StatQueueProfile
|
||||
if err = adms.CallCache(ctx, utils.IfaceAsString(arg.APIOpts[utils.CacheOpt]), arg.Tenant, utils.CacheStatQueueProfiles,
|
||||
arg.TenantID(), &arg.FilterIDs, nil, arg.APIOpts); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveStatQueueProfile remove a specific stat configuration
|
||||
func (adms *AdminSv1) RemoveStatQueueProfile(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) error {
|
||||
if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
tnt := args.Tenant
|
||||
if tnt == utils.EmptyString {
|
||||
tnt = adms.cfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
if err := adms.dm.RemoveStatQueueProfile(ctx, tnt, args.ID, utils.NonTransactional, true); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
//handle caching for StatQueueProfile
|
||||
if err := adms.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.CacheOpt]), tnt, utils.CacheStatQueueProfiles,
|
||||
utils.ConcatenatedKey(tnt, args.ID), nil, nil, args.APIOpts); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
//generate a loadID for CacheStatQueueProfiles and CacheStatQueues and store it in database
|
||||
//make 1 insert for both StatQueueProfile and StatQueue instead of 2
|
||||
loadID := time.Now().UnixNano()
|
||||
if err := adms.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheStatQueueProfiles: loadID, utils.CacheStatQueues: loadID}); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewStatSv1 initializes StatSV1
|
||||
func NewStatSv1(sS *engine.StatService) *StatSv1 {
|
||||
return &StatSv1{sS: sS}
|
||||
}
|
||||
|
||||
// StatSv1 exports RPC from RLs
|
||||
type StatSv1 struct {
|
||||
ping
|
||||
sS *engine.StatService
|
||||
}
|
||||
|
||||
// GetQueueIDs returns list of queueIDs registered for a tenant
|
||||
func (stsv1 *StatSv1) GetQueueIDs(ctx *context.Context, tenant *utils.TenantWithAPIOpts, qIDs *[]string) error {
|
||||
return stsv1.sS.V1GetQueueIDs(ctx, tenant.Tenant, qIDs)
|
||||
}
|
||||
|
||||
// ProcessEvent returns processes a new Event
|
||||
func (stsv1 *StatSv1) ProcessEvent(ctx *context.Context, args *engine.StatsArgsProcessEvent, reply *[]string) error {
|
||||
return stsv1.sS.V1ProcessEvent(ctx, args, reply)
|
||||
}
|
||||
|
||||
// GetStatQueuesForEvent returns the list of queues IDs in the system
|
||||
func (stsv1 *StatSv1) GetStatQueuesForEvent(ctx *context.Context, args *engine.StatsArgsProcessEvent, reply *[]string) (err error) {
|
||||
return stsv1.sS.V1GetStatQueuesForEvent(ctx, args, reply)
|
||||
}
|
||||
|
||||
// GetStatQueue returns a StatQueue object
|
||||
func (stsv1 *StatSv1) GetStatQueue(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *engine.StatQueue) (err error) {
|
||||
return stsv1.sS.V1GetStatQueue(ctx, args, reply)
|
||||
}
|
||||
|
||||
// GetQueueStringMetrics returns the string metrics for a Queue
|
||||
func (stsv1 *StatSv1) GetQueueStringMetrics(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *map[string]string) (err error) {
|
||||
return stsv1.sS.V1GetQueueStringMetrics(ctx, args.TenantID, reply)
|
||||
}
|
||||
|
||||
// GetQueueFloatMetrics returns the float metrics for a Queue
|
||||
func (stsv1 *StatSv1) GetQueueFloatMetrics(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *map[string]float64) (err error) {
|
||||
return stsv1.sS.V1GetQueueFloatMetrics(ctx, args.TenantID, reply)
|
||||
}
|
||||
|
||||
// ResetStatQueue resets the stat queue
|
||||
func (stsv1 *StatSv1) ResetStatQueue(ctx *context.Context, tntID *utils.TenantIDWithAPIOpts, reply *string) error {
|
||||
return stsv1.sS.V1ResetStatQueue(ctx, tntID.TenantID, reply)
|
||||
}
|
||||
@@ -147,19 +147,19 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids []
|
||||
_, err = dm.GetResourceProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
case utils.ResourcesPrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = dm.GetResource(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
_, err = dm.GetResource(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
case utils.StatQueueProfilePrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = dm.GetStatQueueProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
_, err = dm.GetStatQueueProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
case utils.StatQueuePrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = dm.GetStatQueue(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
_, err = dm.GetStatQueue(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
case utils.ThresholdProfilePrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = dm.GetThresholdProfile(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
_, err = dm.GetThresholdProfile(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
case utils.ThresholdPrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = dm.GetThreshold(tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
_, err = dm.GetThreshold(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
case utils.FilterPrefix:
|
||||
tntID := utils.NewTenantID(dataID)
|
||||
_, err = dm.GetFilter(ctx, tntID.Tenant, tntID.ID, false, true, utils.NonTransactional)
|
||||
@@ -280,7 +280,7 @@ func (dm *DataManager) CacheDataFromDB(ctx *context.Context, prfx string, ids []
|
||||
|
||||
// GetStatQueue retrieves a StatQueue from dataDB
|
||||
// handles caching and deserialization of metrics
|
||||
func (dm *DataManager) GetStatQueue(tenant, id string,
|
||||
func (dm *DataManager) GetStatQueue(ctx *context.Context, tenant, id string,
|
||||
cacheRead, cacheWrite bool, transactionID string) (sq *StatQueue, err error) {
|
||||
tntID := utils.ConcatenatedKey(tenant, id)
|
||||
if cacheRead {
|
||||
@@ -295,10 +295,10 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
|
||||
err = utils.ErrNoDatabaseConn
|
||||
return
|
||||
}
|
||||
sq, err = dm.dataDB.GetStatQueueDrv(tenant, id)
|
||||
sq, err = dm.dataDB.GetStatQueueDrv(ctx, tenant, id)
|
||||
if err != nil {
|
||||
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; err == utils.ErrNotFound && itm.Remote {
|
||||
if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns, utils.ReplicatorSv1GetStatQueue,
|
||||
if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns, utils.ReplicatorSv1GetStatQueue,
|
||||
&utils.TenantIDWithAPIOpts{
|
||||
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
|
||||
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString,
|
||||
@@ -312,12 +312,12 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
err = dm.dataDB.SetStatQueueDrv(ssq, sq)
|
||||
err = dm.dataDB.SetStatQueueDrv(ctx, ssq, sq)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if err = utils.CastRPCErr(err); err == utils.ErrNotFound && cacheWrite && dm.dataDB.GetStorageType() != utils.Internal {
|
||||
if errCh := Cache.Set(context.TODO(), utils.CacheStatQueues, tntID, nil, nil,
|
||||
if errCh := Cache.Set(ctx, utils.CacheStatQueues, tntID, nil, nil,
|
||||
cacheCommit(transactionID), transactionID); errCh != nil {
|
||||
return nil, errCh
|
||||
}
|
||||
@@ -327,7 +327,7 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
|
||||
}
|
||||
}
|
||||
if cacheWrite {
|
||||
if errCh := Cache.Set(context.TODO(), utils.CacheStatQueues, tntID, sq, nil,
|
||||
if errCh := Cache.Set(ctx, utils.CacheStatQueues, tntID, sq, nil,
|
||||
cacheCommit(transactionID), transactionID); errCh != nil {
|
||||
return nil, errCh
|
||||
}
|
||||
@@ -336,7 +336,7 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
|
||||
}
|
||||
|
||||
// SetStatQueue converts to StoredStatQueue and stores the result in dataDB
|
||||
func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters,
|
||||
func (dm *DataManager) SetStatQueue(ctx *context.Context, sq *StatQueue, metrics []*MetricWithFilters,
|
||||
minItems int, ttl *time.Duration, queueLength int, simpleSet bool) (err error) {
|
||||
if dm == nil {
|
||||
return utils.ErrNoDatabaseConn
|
||||
@@ -345,7 +345,7 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters,
|
||||
tnt := sq.Tenant // save the tenant
|
||||
id := sq.ID // save the ID from the initial StatQueue
|
||||
// handle metrics for statsQueue
|
||||
sq, err = dm.GetStatQueue(tnt, id, true, false, utils.NonTransactional)
|
||||
sq, err = dm.GetStatQueue(ctx, tnt, id, true, false, utils.NonTransactional)
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return
|
||||
}
|
||||
@@ -406,11 +406,11 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters,
|
||||
return
|
||||
}
|
||||
}
|
||||
if err = dm.dataDB.SetStatQueueDrv(ssq, sq); err != nil {
|
||||
if err = dm.dataDB.SetStatQueueDrv(ctx, ssq, sq); err != nil {
|
||||
return
|
||||
}
|
||||
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate {
|
||||
err = replicate(context.TODO(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
|
||||
err = replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
|
||||
config.CgrConfig().DataDbCfg().RplFiltered,
|
||||
utils.StatQueuePrefix, sq.TenantID(), // this are used to get the host IDs from cache
|
||||
utils.ReplicatorSv1SetStatQueue,
|
||||
@@ -423,15 +423,15 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue, metrics []*MetricWithFilters,
|
||||
}
|
||||
|
||||
// RemoveStatQueue removes the StoredStatQueue
|
||||
func (dm *DataManager) RemoveStatQueue(tenant, id string, transactionID string) (err error) {
|
||||
func (dm *DataManager) RemoveStatQueue(ctx *context.Context, tenant, id string, transactionID string) (err error) {
|
||||
if dm == nil {
|
||||
return utils.ErrNoDatabaseConn
|
||||
}
|
||||
if err = dm.dataDB.RemStatQueueDrv(tenant, id); err != nil {
|
||||
if err = dm.dataDB.RemStatQueueDrv(ctx, tenant, id); err != nil {
|
||||
return
|
||||
}
|
||||
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate {
|
||||
replicate(context.TODO(), dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
|
||||
replicate(ctx, dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
|
||||
config.CgrConfig().DataDbCfg().RplFiltered,
|
||||
utils.StatQueuePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
|
||||
utils.ReplicatorSv1RemoveStatQueue,
|
||||
@@ -822,7 +822,7 @@ func (dm *DataManager) GetStatQueueProfile(ctx *context.Context, tenant, id stri
|
||||
err = utils.ErrNoDatabaseConn
|
||||
return
|
||||
}
|
||||
sqp, err = dm.dataDB.GetStatQueueProfileDrv(tenant, id)
|
||||
sqp, err = dm.dataDB.GetStatQueueProfileDrv(ctx, tenant, id)
|
||||
if err != nil {
|
||||
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles]; err == utils.ErrNotFound && itm.Remote {
|
||||
if err = dm.connMgr.Call(ctx, config.CgrConfig().DataDbCfg().RmtConns,
|
||||
@@ -833,7 +833,7 @@ func (dm *DataManager) GetStatQueueProfile(ctx *context.Context, tenant, id stri
|
||||
utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID,
|
||||
config.CgrConfig().GeneralCfg().NodeID)),
|
||||
}, &sqp); err == nil {
|
||||
err = dm.dataDB.SetStatQueueProfileDrv(sqp)
|
||||
err = dm.dataDB.SetStatQueueProfileDrv(ctx, sqp)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
@@ -872,7 +872,7 @@ func (dm *DataManager) SetStatQueueProfile(ctx *context.Context, sqp *StatQueueP
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
if err = dm.DataDB().SetStatQueueProfileDrv(sqp); err != nil {
|
||||
if err = dm.DataDB().SetStatQueueProfileDrv(ctx, sqp); err != nil {
|
||||
return err
|
||||
}
|
||||
if withIndex {
|
||||
@@ -907,7 +907,7 @@ func (dm *DataManager) RemoveStatQueueProfile(ctx *context.Context, tenant, id,
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
if err = dm.DataDB().RemStatQueueProfileDrv(tenant, id); err != nil {
|
||||
if err = dm.DataDB().RemStatQueueProfileDrv(ctx, tenant, id); err != nil {
|
||||
return
|
||||
}
|
||||
if oldSts == nil {
|
||||
|
||||
@@ -58,22 +58,22 @@ type StatService struct {
|
||||
}
|
||||
|
||||
// Shutdown is called to shutdown the service
|
||||
func (sS *StatService) Shutdown() {
|
||||
func (sS *StatService) Shutdown(ctx *context.Context) {
|
||||
utils.Logger.Info("<StatS> service shutdown initialized")
|
||||
close(sS.stopBackup)
|
||||
sS.storeStats()
|
||||
sS.storeStats(ctx)
|
||||
utils.Logger.Info("<StatS> service shutdown complete")
|
||||
}
|
||||
|
||||
// runBackup will regularly store resources changed to dataDB
|
||||
func (sS *StatService) runBackup() {
|
||||
func (sS *StatService) runBackup(ctx *context.Context) {
|
||||
storeInterval := sS.cgrcfg.StatSCfg().StoreInterval
|
||||
if storeInterval <= 0 {
|
||||
sS.loopStoped <- struct{}{}
|
||||
return
|
||||
}
|
||||
for {
|
||||
sS.storeStats()
|
||||
sS.storeStats(ctx)
|
||||
select {
|
||||
case <-sS.stopBackup:
|
||||
sS.loopStoped <- struct{}{}
|
||||
@@ -84,7 +84,7 @@ func (sS *StatService) runBackup() {
|
||||
}
|
||||
|
||||
// storeResources represents one task of complete backup
|
||||
func (sS *StatService) storeStats() {
|
||||
func (sS *StatService) storeStats(ctx *context.Context) {
|
||||
var failedSqIDs []string
|
||||
for { // don't stop untill we store all dirty statQueues
|
||||
sS.ssqMux.Lock()
|
||||
@@ -96,12 +96,12 @@ func (sS *StatService) storeStats() {
|
||||
if sID == "" {
|
||||
break // no more keys, backup completed
|
||||
}
|
||||
guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) (gRes interface{}, gErr error) {
|
||||
guardian.Guardian.Guard(ctx, func(_ *context.Context) (gRes interface{}, gErr error) {
|
||||
if sqIf, ok := Cache.Get(utils.CacheStatQueues, sID); !ok || sqIf == nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed retrieving from cache stat queue with ID: %s",
|
||||
utils.StatService, sID))
|
||||
} else if err := sS.StoreStatQueue(sqIf.(*StatQueue)); err != nil {
|
||||
} else if err := sS.StoreStatQueue(ctx, sqIf.(*StatQueue)); err != nil {
|
||||
failedSqIDs = append(failedSqIDs, sID) // record failure so we can schedule it for next backup
|
||||
}
|
||||
return
|
||||
@@ -117,18 +117,18 @@ func (sS *StatService) storeStats() {
|
||||
}
|
||||
|
||||
// StoreStatQueue stores the statQueue in DB and corrects dirty flag
|
||||
func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) {
|
||||
func (sS *StatService) StoreStatQueue(ctx *context.Context, sq *StatQueue) (err error) {
|
||||
if sq.dirty == nil || !*sq.dirty {
|
||||
return
|
||||
}
|
||||
if err = sS.dm.SetStatQueue(sq, nil, 0, nil, 0, true); err != nil {
|
||||
if err = sS.dm.SetStatQueue(ctx, sq, nil, 0, nil, 0, true); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<StatS> failed saving StatQueue with ID: %s, error: %s",
|
||||
sq.TenantID(), err.Error()))
|
||||
return
|
||||
}
|
||||
//since we no longer handle cache in DataManager do here a manual caching
|
||||
if err = sS.dm.CacheDataFromDB(context.TODO(), utils.StatQueuePrefix, []string{sq.TenantID()}, true); err != nil {
|
||||
if err = sS.dm.CacheDataFromDB(ctx, utils.StatQueuePrefix, []string{sq.TenantID()}, true); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<StatS> failed caching StatQueue with ID: %s, error: %s",
|
||||
sq.TenantID(), err.Error()))
|
||||
@@ -139,10 +139,10 @@ func (sS *StatService) StoreStatQueue(sq *StatQueue) (err error) {
|
||||
}
|
||||
|
||||
// matchingStatQueuesForEvent returns ordered list of matching resources which are active by the time of the call
|
||||
func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string, evNm utils.MapStorage) (sqs StatQueues, err error) {
|
||||
func (sS *StatService) matchingStatQueuesForEvent(ctx *context.Context, tnt string, statsIDs []string, evNm utils.MapStorage) (sqs StatQueues, err error) {
|
||||
sqIDs := utils.NewStringSet(statsIDs)
|
||||
if len(sqIDs) == 0 {
|
||||
sqIDs, err = MatchingItemIDsForEvent(context.TODO(), evNm,
|
||||
sqIDs, err = MatchingItemIDsForEvent(ctx, evNm,
|
||||
sS.cgrcfg.StatSCfg().StringIndexedFields,
|
||||
sS.cgrcfg.StatSCfg().PrefixIndexedFields,
|
||||
sS.cgrcfg.StatSCfg().SuffixIndexedFields,
|
||||
@@ -156,14 +156,14 @@ func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string,
|
||||
}
|
||||
sqs = make(StatQueues, 0, len(sqIDs))
|
||||
for sqID := range sqIDs {
|
||||
sqPrfl, err := sS.dm.GetStatQueueProfile(tnt, sqID, true, true, utils.NonTransactional)
|
||||
sqPrfl, err := sS.dm.GetStatQueueProfile(ctx, tnt, sqID, true, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
if err == utils.ErrNotFound {
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if pass, err := sS.filterS.Pass(context.TODO(), tnt, sqPrfl.FilterIDs,
|
||||
if pass, err := sS.filterS.Pass(ctx, tnt, sqPrfl.FilterIDs,
|
||||
evNm); err != nil {
|
||||
return nil, err
|
||||
} else if !pass {
|
||||
@@ -171,8 +171,8 @@ func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string,
|
||||
}
|
||||
var sq *StatQueue
|
||||
lkID := utils.StatQueuePrefix + utils.ConcatenatedKey(sqPrfl.Tenant, sqPrfl.ID)
|
||||
guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) (gRes interface{}, gErr error) {
|
||||
sq, err = sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, true, true, "")
|
||||
guardian.Guardian.Guard(ctx, func(_ *context.Context) (gRes interface{}, gErr error) {
|
||||
sq, err = sS.dm.GetStatQueue(ctx, sqPrfl.Tenant, sqPrfl.ID, true, true, "")
|
||||
return
|
||||
}, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID)
|
||||
if err != nil {
|
||||
@@ -236,29 +236,29 @@ func (attr *StatsArgsProcessEvent) Clone() *StatsArgsProcessEvent {
|
||||
}
|
||||
}
|
||||
|
||||
func (sS *StatService) getStatQueue(tnt, id string) (sq *StatQueue, err error) {
|
||||
if sq, err = sS.dm.GetStatQueue(tnt, id, true, true, utils.EmptyString); err != nil {
|
||||
func (sS *StatService) getStatQueue(ctx *context.Context, tnt, id string) (sq *StatQueue, err error) {
|
||||
if sq, err = sS.dm.GetStatQueue(ctx, tnt, id, true, true, utils.EmptyString); err != nil {
|
||||
return
|
||||
}
|
||||
lkID := utils.StatQueuePrefix + sq.TenantID()
|
||||
var removed int
|
||||
guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) (gRes interface{}, gErr error) {
|
||||
guardian.Guardian.Guard(ctx, func(_ *context.Context) (gRes interface{}, gErr error) {
|
||||
removed, err = sq.remExpired()
|
||||
return
|
||||
}, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID)
|
||||
if err != nil || removed == 0 {
|
||||
return
|
||||
}
|
||||
sS.storeStatQueue(sq)
|
||||
sS.storeStatQueue(ctx, sq)
|
||||
return
|
||||
}
|
||||
|
||||
// storeStatQueue will store the sq if needed
|
||||
func (sS *StatService) storeStatQueue(sq *StatQueue) {
|
||||
func (sS *StatService) storeStatQueue(ctx *context.Context, sq *StatQueue) {
|
||||
if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save
|
||||
*sq.dirty = true // mark it to be saved
|
||||
if sS.cgrcfg.StatSCfg().StoreInterval == -1 {
|
||||
sS.StoreStatQueue(sq)
|
||||
sS.StoreStatQueue(ctx, sq)
|
||||
} else {
|
||||
sS.ssqMux.Lock()
|
||||
sS.storedStatQueues.Add(sq.TenantID())
|
||||
@@ -269,12 +269,12 @@ func (sS *StatService) storeStatQueue(sq *StatQueue) {
|
||||
|
||||
// processEvent processes a new event, dispatching to matching queues
|
||||
// queues matching are also cached to speed up
|
||||
func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (statQueueIDs []string, err error) {
|
||||
func (sS *StatService) processEvent(ctx *context.Context, tnt string, args *StatsArgsProcessEvent) (statQueueIDs []string, err error) {
|
||||
evNm := utils.MapStorage{
|
||||
utils.MetaReq: args.Event,
|
||||
utils.MetaOpts: args.APIOpts,
|
||||
}
|
||||
matchSQs, err := sS.matchingStatQueuesForEvent(tnt, args.StatIDs, evNm)
|
||||
matchSQs, err := sS.matchingStatQueuesForEvent(ctx, tnt, args.StatIDs, evNm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -290,7 +290,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st
|
||||
for _, sq := range matchSQs {
|
||||
stsIDs = append(stsIDs, sq.ID)
|
||||
lkID := utils.StatQueuePrefix + sq.TenantID()
|
||||
guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) (_ interface{}, _ error) {
|
||||
guardian.Guardian.Guard(ctx, func(_ *context.Context) (_ interface{}, _ error) {
|
||||
err = sq.ProcessEvent(tnt, args.ID, sS.filterS, evNm)
|
||||
return
|
||||
}, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID)
|
||||
@@ -300,7 +300,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st
|
||||
sq.TenantID(), utils.ConcatenatedKey(tnt, args.ID), err.Error()))
|
||||
withErrors = true
|
||||
}
|
||||
sS.storeStatQueue(sq)
|
||||
sS.storeStatQueue(ctx, sq)
|
||||
if len(sS.cgrcfg.StatSCfg().ThresholdSConns) != 0 {
|
||||
var thIDs []string
|
||||
if len(sq.sqPrfl.ThresholdIDs) != 0 {
|
||||
@@ -325,7 +325,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st
|
||||
thEv.Event[metricID] = metric.GetValue(sS.cgrcfg.GeneralCfg().RoundingDecimals)
|
||||
}
|
||||
var tIDs []string
|
||||
if err := sS.connMgr.Call(context.TODO(), sS.cgrcfg.StatSCfg().ThresholdSConns,
|
||||
if err := sS.connMgr.Call(ctx, sS.cgrcfg.StatSCfg().ThresholdSConns,
|
||||
utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
@@ -346,7 +346,7 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st
|
||||
}
|
||||
|
||||
// V1ProcessEvent implements StatV1 method for processing an Event
|
||||
func (sS *StatService) V1ProcessEvent(args *StatsArgsProcessEvent, reply *[]string) (err error) {
|
||||
func (sS *StatService) V1ProcessEvent(ctx *context.Context, args *StatsArgsProcessEvent, reply *[]string) (err error) {
|
||||
if args.CGREvent == nil {
|
||||
return utils.NewErrMandatoryIeMissing(utils.CGREventString)
|
||||
}
|
||||
@@ -360,7 +360,7 @@ func (sS *StatService) V1ProcessEvent(args *StatsArgsProcessEvent, reply *[]stri
|
||||
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
var ids []string
|
||||
if ids, err = sS.processEvent(tnt, args); err != nil {
|
||||
if ids, err = sS.processEvent(ctx, tnt, args); err != nil {
|
||||
return
|
||||
}
|
||||
*reply = ids
|
||||
@@ -368,7 +368,7 @@ func (sS *StatService) V1ProcessEvent(args *StatsArgsProcessEvent, reply *[]stri
|
||||
}
|
||||
|
||||
// V1GetStatQueuesForEvent implements StatV1 method for processing an Event
|
||||
func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, reply *[]string) (err error) {
|
||||
func (sS *StatService) V1GetStatQueuesForEvent(ctx *context.Context, args *StatsArgsProcessEvent, reply *[]string) (err error) {
|
||||
if args.CGREvent == nil {
|
||||
return utils.NewErrMandatoryIeMissing(utils.CGREventString)
|
||||
}
|
||||
@@ -382,7 +382,7 @@ func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, repl
|
||||
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
var sQs StatQueues
|
||||
if sQs, err = sS.matchingStatQueuesForEvent(tnt, args.StatIDs, utils.MapStorage{
|
||||
if sQs, err = sS.matchingStatQueuesForEvent(ctx, tnt, args.StatIDs, utils.MapStorage{
|
||||
utils.MetaReq: args.Event,
|
||||
utils.MetaOpts: args.APIOpts,
|
||||
}); err != nil {
|
||||
@@ -397,7 +397,7 @@ func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, repl
|
||||
}
|
||||
|
||||
// V1GetStatQueue returns a StatQueue object
|
||||
func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithAPIOpts, reply *StatQueue) (err error) {
|
||||
func (sS *StatService) V1GetStatQueue(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *StatQueue) (err error) {
|
||||
if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
@@ -405,7 +405,7 @@ func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithAPIOpts, reply *St
|
||||
if tnt == utils.EmptyString {
|
||||
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
sq, err := sS.getStatQueue(tnt, args.ID)
|
||||
sq, err := sS.getStatQueue(ctx, tnt, args.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -414,7 +414,7 @@ func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithAPIOpts, reply *St
|
||||
}
|
||||
|
||||
// V1GetQueueStringMetrics returns the metrics of a Queue as string values
|
||||
func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[string]string) (err error) {
|
||||
func (sS *StatService) V1GetQueueStringMetrics(ctx *context.Context, args *utils.TenantID, reply *map[string]string) (err error) {
|
||||
if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
@@ -422,7 +422,7 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[
|
||||
if tnt == utils.EmptyString {
|
||||
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
sq, err := sS.getStatQueue(tnt, args.ID)
|
||||
sq, err := sS.getStatQueue(ctx, tnt, args.ID)
|
||||
if err != nil {
|
||||
if err != utils.ErrNotFound {
|
||||
err = utils.NewErrServerError(err)
|
||||
@@ -440,7 +440,7 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[
|
||||
}
|
||||
|
||||
// V1GetQueueFloatMetrics returns the metrics as float64 values
|
||||
func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[string]float64) (err error) {
|
||||
func (sS *StatService) V1GetQueueFloatMetrics(ctx *context.Context, args *utils.TenantID, reply *map[string]float64) (err error) {
|
||||
if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
@@ -448,7 +448,7 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s
|
||||
if tnt == utils.EmptyString {
|
||||
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
sq, err := sS.getStatQueue(tnt, args.ID)
|
||||
sq, err := sS.getStatQueue(ctx, tnt, args.ID)
|
||||
if err != nil {
|
||||
if err != utils.ErrNotFound {
|
||||
err = utils.NewErrServerError(err)
|
||||
@@ -466,12 +466,12 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s
|
||||
}
|
||||
|
||||
// V1GetQueueIDs returns list of queueIDs registered for a tenant
|
||||
func (sS *StatService) V1GetQueueIDs(tenant string, qIDs *[]string) (err error) {
|
||||
func (sS *StatService) V1GetQueueIDs(ctx *context.Context, tenant string, qIDs *[]string) (err error) {
|
||||
if tenant == utils.EmptyString {
|
||||
tenant = sS.cgrcfg.GeneralCfg().DefaultTenant
|
||||
}
|
||||
prfx := utils.StatQueuePrefix + tenant + utils.ConcatenatedKeySep
|
||||
keys, err := sS.dm.DataDB().GetKeysForPrefix(context.TODO(), prfx)
|
||||
keys, err := sS.dm.DataDB().GetKeysForPrefix(ctx, prfx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -484,22 +484,22 @@ func (sS *StatService) V1GetQueueIDs(tenant string, qIDs *[]string) (err error)
|
||||
}
|
||||
|
||||
// Reload stops the backupLoop and restarts it
|
||||
func (sS *StatService) Reload() {
|
||||
func (sS *StatService) Reload(ctx *context.Context) {
|
||||
close(sS.stopBackup)
|
||||
<-sS.loopStoped // wait until the loop is done
|
||||
sS.stopBackup = make(chan struct{})
|
||||
go sS.runBackup()
|
||||
go sS.runBackup(ctx)
|
||||
}
|
||||
|
||||
// StartLoop starsS the gorutine with the backup loop
|
||||
func (sS *StatService) StartLoop() {
|
||||
go sS.runBackup()
|
||||
func (sS *StatService) StartLoop(ctx *context.Context) {
|
||||
go sS.runBackup(ctx)
|
||||
}
|
||||
|
||||
// V1ResetStatQueue resets the stat queue
|
||||
func (sS *StatService) V1ResetStatQueue(tntID *utils.TenantID, rply *string) (err error) {
|
||||
func (sS *StatService) V1ResetStatQueue(ctx *context.Context, tntID *utils.TenantID, rply *string) (err error) {
|
||||
var sq *StatQueue
|
||||
if sq, err = sS.dm.GetStatQueue(tntID.Tenant, tntID.ID,
|
||||
if sq, err = sS.dm.GetStatQueue(ctx, tntID.Tenant, tntID.ID,
|
||||
true, true, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
@@ -517,7 +517,7 @@ func (sS *StatService) V1ResetStatQueue(tntID *utils.TenantID, rply *string) (er
|
||||
sq.SQMetrics[id] = metric
|
||||
}
|
||||
sq.dirty = utils.BoolPointer(true)
|
||||
sS.storeStatQueue(sq)
|
||||
sS.storeStatQueue(ctx, sq)
|
||||
*rply = utils.OK
|
||||
return
|
||||
}
|
||||
|
||||
@@ -60,12 +60,12 @@ type DataDB interface {
|
||||
SetIndexesDrv(ctx *context.Context, idxItmType, tntCtx string,
|
||||
indexes map[string]utils.StringSet, commit bool, transactionID string) (err error)
|
||||
RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err error)
|
||||
GetStatQueueProfileDrv(tenant string, ID string) (sq *StatQueueProfile, err error)
|
||||
SetStatQueueProfileDrv(sq *StatQueueProfile) (err error)
|
||||
RemStatQueueProfileDrv(tenant, id string) (err error)
|
||||
GetStatQueueDrv(tenant, id string) (sq *StatQueue, err error)
|
||||
SetStatQueueDrv(ssq *StoredStatQueue, sq *StatQueue) (err error)
|
||||
RemStatQueueDrv(tenant, id string) (err error)
|
||||
GetStatQueueProfileDrv(ctx *context.Context, tenant string, ID string) (sq *StatQueueProfile, err error)
|
||||
SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error)
|
||||
RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error)
|
||||
GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *StatQueue, err error)
|
||||
SetStatQueueDrv(ctx *context.Context, ssq *StoredStatQueue, sq *StatQueue) (err error)
|
||||
RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error)
|
||||
GetThresholdProfileDrv(ctx *context.Context, tenant string, ID string) (tp *ThresholdProfile, err error)
|
||||
SetThresholdProfileDrv(ctx *context.Context, tp *ThresholdProfile) (err error)
|
||||
RemThresholdProfileDrv(ctx *context.Context, tenant, id string) (err error)
|
||||
|
||||
Reference in New Issue
Block a user