mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
added tests for filter statmetric
This commit is contained in:
committed by
Dan Christian Bogos
parent
3d9a94c5c5
commit
22b96ca1e3
@@ -84,7 +84,7 @@ func (apierSv1 *APIerSv1) SetRankingProfile(ctx *context.Context, arg *engine.Ra
|
||||
}
|
||||
// delay if needed before cache call
|
||||
if apierSv1.Config.GeneralCfg().CachingDelay != 0 {
|
||||
utils.Logger.Info(fmt.Sprintf("<SetStatQueueProfile> Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay))
|
||||
utils.Logger.Info(fmt.Sprintf("<SetRankingProfile> Delaying cache call for %v", apierSv1.Config.GeneralCfg().CachingDelay))
|
||||
time.Sleep(apierSv1.Config.GeneralCfg().CachingDelay)
|
||||
}
|
||||
//handle caching for RankingProfile
|
||||
|
||||
@@ -1328,14 +1328,14 @@ func (dm *DataManager) SetTrendProfile(srp *TrendProfile) (err error) {
|
||||
}
|
||||
|
||||
func (dm *DataManager) RemoveTrendProfile(tenant, id string) (err error) {
|
||||
oldSgs, err := dm.GetTrendProfile(tenant, id)
|
||||
oldTrs, err := dm.GetTrendProfile(tenant, id)
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
if err = dm.DataDB().RemTrendProfileDrv(tenant, id); err != nil {
|
||||
return
|
||||
}
|
||||
if oldSgs == nil {
|
||||
if oldTrs == nil {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles]; itm.Replicate {
|
||||
@@ -1369,7 +1369,7 @@ func (dm *DataManager) GetRankingProfile(tenant, id string, cacheRead, cacheWrit
|
||||
if err != nil {
|
||||
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles]; err == utils.ErrNotFound && itm.Remote {
|
||||
if err = dm.connMgr.Call(context.TODO(), config.CgrConfig().DataDbCfg().RmtConns,
|
||||
utils.ReplicatorSv1GetSagProfile,
|
||||
utils.ReplicatorSv1GetRankingProfile,
|
||||
&utils.TenantIDWithAPIOpts{
|
||||
TenantID: &utils.TenantID{Tenant: tenant, ID: id},
|
||||
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString,
|
||||
@@ -1410,7 +1410,7 @@ func (dm *DataManager) SetRankingProfile(sgp *RankingProfile) (err error) {
|
||||
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
|
||||
config.CgrConfig().DataDbCfg().RplFiltered,
|
||||
utils.RankingsProfilePrefix, sgp.TenantID(),
|
||||
utils.ReplicatorSv1SetSagProfile,
|
||||
utils.ReplicatorSv1SetRankingProfile,
|
||||
&RankingProfileWithAPIOpts{
|
||||
RankingProfile: sgp,
|
||||
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
|
||||
|
||||
@@ -2759,3 +2759,65 @@ func TestHttpInlineFilter(t *testing.T) {
|
||||
t.Error("should had passed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterStats(t *testing.T) {
|
||||
tmpConn := connMgr
|
||||
defer func() {
|
||||
connMgr = tmpConn
|
||||
}()
|
||||
clientConn := make(chan context.ClientConnector, 1)
|
||||
clientConn <- &ccMock{
|
||||
calls: map[string]func(ctx *context.Context, args any, reply any) error{
|
||||
utils.StatSv1GetQueueFloatMetrics: func(ctx *context.Context, args, reply any) error {
|
||||
*reply.(*map[string]float64) = map[string]float64{
|
||||
"*sum#~*req.Usage": 45,
|
||||
utils.MetaTCD: 1,
|
||||
utils.MetaAverage + utils.HashtagSep + utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.Usage: 15,
|
||||
utils.MetaACC: 22.2,
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
connMgr = NewConnManager(config.NewDefaultCGRConfig(), map[string]chan context.ClientConnector{
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats): clientConn,
|
||||
})
|
||||
testCases := []struct {
|
||||
name string
|
||||
infilter string
|
||||
experr error
|
||||
expPass bool
|
||||
}{
|
||||
{"ComposedStatMetric", "*gte:~*stats.SQ_1002.*sum#~*req.Usage:20", nil, false},
|
||||
{"TCDStatMetric", "*lte:~*stats.SQ_1.*tcd:5", nil, true},
|
||||
{"AverageStatMetric", "*gt:~*stats.SQ_2.*average#~req.Usage:12", nil, false},
|
||||
{"AverageCallCostStatMetric", "*eq:~*stats.SQ_3.*acc:22.2", nil, true},
|
||||
}
|
||||
initDP := utils.MapStorage{}
|
||||
dp := newDynamicDP(nil, []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)}, nil, "cgrates.org", initDP)
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fl, err := NewFilterFromInline("cgrates.org", tc.infilter)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
pass, err := fl.Rules[0].Pass(dp)
|
||||
if tc.experr != nil {
|
||||
if err == nil {
|
||||
t.Fatalf("Expected to receive error ,got nil")
|
||||
}
|
||||
if err != tc.experr {
|
||||
t.Errorf("Expected error %q,got %q instead\n", tc.experr, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("Expected no error,got %q instead\n", err)
|
||||
}
|
||||
if pass != tc.expPass {
|
||||
t.Error("Exected filter rule to pass")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1481,7 +1481,7 @@ func (sqls *SQLStorage) GetTPTrends(tpid, tenant, id string) ([]*utils.TPTrendsP
|
||||
}
|
||||
|
||||
func (sqls *SQLStorage) GetTPRankings(tpid string, tenant string, id string) ([]*utils.TPRankingProfile, error) {
|
||||
var sgs RankingsMdls
|
||||
var rgs RankingsMdls
|
||||
q := sqls.db.Where("tpid = ?", tpid)
|
||||
if len(id) != 0 {
|
||||
q = q.Where("id = ?", id)
|
||||
@@ -1489,10 +1489,10 @@ func (sqls *SQLStorage) GetTPRankings(tpid string, tenant string, id string) ([]
|
||||
if len(tenant) != 0 {
|
||||
q = q.Where("tenant = ?", tenant)
|
||||
}
|
||||
if err := q.Find(&sgs).Error; err != nil {
|
||||
if err := q.Find(&rgs).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
asgs := sgs.AsTPRanking()
|
||||
asgs := rgs.AsTPRanking()
|
||||
if len(asgs) == 0 {
|
||||
return asgs, utils.ErrNotFound
|
||||
}
|
||||
|
||||
@@ -438,27 +438,27 @@ func (ldr *Loader) storeLoadedData(loaderType string,
|
||||
case utils.MetaRankings:
|
||||
cacheIDs = []string{utils.CacheRankingFilterIndexes}
|
||||
for _, lDataSet := range lds {
|
||||
stsModels := make(engine.RankingsMdls, len(lDataSet))
|
||||
rnkModels := make(engine.RankingsMdls, len(lDataSet))
|
||||
for i, ld := range lDataSet {
|
||||
stsModels[i] = new(engine.RankingsMdl)
|
||||
if err = utils.UpdateStructWithIfaceMap(stsModels[i], ld); err != nil {
|
||||
rnkModels[i] = new(engine.RankingsMdl)
|
||||
if err = utils.UpdateStructWithIfaceMap(rnkModels[i], ld); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, tpSgs := range stsModels.AsTPRanking() {
|
||||
sgsPrf, err := engine.APItoRanking(tpSgs)
|
||||
for _, tpRgs := range rnkModels.AsTPRanking() {
|
||||
rgsPrf, err := engine.APItoRanking(tpRgs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ldr.dryRun {
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s-%s> DRY_RUN: RankingProfile: %s",
|
||||
utils.LoaderS, ldr.ldrID, utils.ToJSON(sgsPrf)))
|
||||
utils.LoaderS, ldr.ldrID, utils.ToJSON(rgsPrf)))
|
||||
continue
|
||||
}
|
||||
// get IDs so we can reload in cache
|
||||
ids = append(ids, sgsPrf.TenantID())
|
||||
if err := ldr.dm.SetRankingProfile(sgsPrf); err != nil {
|
||||
ids = append(ids, rgsPrf.TenantID())
|
||||
if err := ldr.dm.SetRankingProfile(rgsPrf); err != nil {
|
||||
return err
|
||||
}
|
||||
cacheArgs[utils.CacheRankingFilterIndexes] = ids
|
||||
|
||||
@@ -64,18 +64,17 @@ type RankingService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (sag *RankingService) Start() error {
|
||||
if sag.IsRunning() {
|
||||
func (rg *RankingService) Start() error {
|
||||
if rg.IsRunning() {
|
||||
return utils.ErrServiceAlreadyRunning
|
||||
}
|
||||
sag.srvDep[utils.DataDB].Add(1)
|
||||
<-sag.cacheS.GetPrecacheChannel(utils.CacheStatQueueProfiles)
|
||||
<-sag.cacheS.GetPrecacheChannel(utils.CacheStatQueues)
|
||||
<-sag.cacheS.GetPrecacheChannel(utils.CacheStatFilterIndexes)
|
||||
rg.srvDep[utils.DataDB].Add(1)
|
||||
<-rg.cacheS.GetPrecacheChannel(utils.CacheRankingProfiles)
|
||||
<-rg.cacheS.GetPrecacheChannel(utils.CacheRankingFilterIndexes)
|
||||
|
||||
filterS := <-sag.filterSChan
|
||||
sag.filterSChan <- filterS
|
||||
dbchan := sag.dm.GetDMChan()
|
||||
filterS := <-rg.filterSChan
|
||||
rg.filterSChan <- filterS
|
||||
dbchan := rg.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
@@ -85,42 +84,42 @@ func (sag *RankingService) Start() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !sag.cfg.DispatcherSCfg().Enabled {
|
||||
if !rg.cfg.DispatcherSCfg().Enabled {
|
||||
for _, s := range srv {
|
||||
sag.server.RpcRegister(s)
|
||||
rg.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
sag.connChan <- sag.anz.GetInternalCodec(srv, utils.StatS)
|
||||
rg.connChan <- rg.anz.GetInternalCodec(srv, utils.StatS)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (sag *RankingService) Reload() (err error) {
|
||||
func (rg *RankingService) Reload() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (sag *RankingService) Shutdown() (err error) {
|
||||
defer sag.srvDep[utils.DataDB].Done()
|
||||
sag.Lock()
|
||||
defer sag.Unlock()
|
||||
<-sag.connChan
|
||||
func (rg *RankingService) Shutdown() (err error) {
|
||||
defer rg.srvDep[utils.DataDB].Done()
|
||||
rg.Lock()
|
||||
defer rg.Unlock()
|
||||
<-rg.connChan
|
||||
return
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (sag *RankingService) IsRunning() bool {
|
||||
sag.RLock()
|
||||
defer sag.RUnlock()
|
||||
func (rg *RankingService) IsRunning() bool {
|
||||
rg.RLock()
|
||||
defer rg.RUnlock()
|
||||
return false
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (sag *RankingService) ServiceName() string {
|
||||
func (rg *RankingService) ServiceName() string {
|
||||
return utils.RankingS
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (sag *RankingService) ShouldRun() bool {
|
||||
return sag.cfg.RankingSCfg().Enabled
|
||||
func (rg *RankingService) ShouldRun() bool {
|
||||
return rg.cfg.RankingSCfg().Enabled
|
||||
}
|
||||
|
||||
@@ -64,16 +64,16 @@ type TrendService struct {
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (sa *TrendService) Start() error {
|
||||
if sa.IsRunning() {
|
||||
func (tr *TrendService) Start() error {
|
||||
if tr.IsRunning() {
|
||||
return utils.ErrServiceAlreadyRunning
|
||||
}
|
||||
sa.srvDep[utils.DataDB].Add(1)
|
||||
<-sa.cacheS.GetPrecacheChannel(utils.CacheStatFilterIndexes)
|
||||
tr.srvDep[utils.DataDB].Add(1)
|
||||
<-tr.cacheS.GetPrecacheChannel(utils.CacheTrendProfiles)
|
||||
|
||||
filterS := <-sa.filterSChan
|
||||
sa.filterSChan <- filterS
|
||||
dbchan := sa.dm.GetDMChan()
|
||||
filterS := <-tr.filterSChan
|
||||
tr.filterSChan <- filterS
|
||||
dbchan := tr.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
@@ -83,42 +83,42 @@ func (sa *TrendService) Start() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !sa.cfg.DispatcherSCfg().Enabled {
|
||||
if !tr.cfg.DispatcherSCfg().Enabled {
|
||||
for _, s := range srv {
|
||||
sa.server.RpcRegister(s)
|
||||
tr.server.RpcRegister(s)
|
||||
}
|
||||
}
|
||||
sa.connChan <- sa.anz.GetInternalCodec(srv, utils.StatS)
|
||||
tr.connChan <- tr.anz.GetInternalCodec(srv, utils.StatS)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (sa *TrendService) Reload() (err error) {
|
||||
func (tr *TrendService) Reload() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (sa *TrendService) Shutdown() (err error) {
|
||||
defer sa.srvDep[utils.DataDB].Done()
|
||||
sa.Lock()
|
||||
defer sa.Unlock()
|
||||
<-sa.connChan
|
||||
func (tr *TrendService) Shutdown() (err error) {
|
||||
defer tr.srvDep[utils.DataDB].Done()
|
||||
tr.Lock()
|
||||
defer tr.Unlock()
|
||||
<-tr.connChan
|
||||
return
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (sa *TrendService) IsRunning() bool {
|
||||
sa.RLock()
|
||||
defer sa.RUnlock()
|
||||
func (tr *TrendService) IsRunning() bool {
|
||||
tr.RLock()
|
||||
defer tr.RUnlock()
|
||||
return false
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (sa *TrendService) ServiceName() string {
|
||||
func (tr *TrendService) ServiceName() string {
|
||||
return utils.TrendS
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (sa *TrendService) ShouldRun() bool {
|
||||
return sa.cfg.TrendSCfg().Enabled
|
||||
func (tr *TrendService) ShouldRun() bool {
|
||||
return tr.cfg.TrendSCfg().Enabled
|
||||
}
|
||||
|
||||
@@ -1258,7 +1258,7 @@ const (
|
||||
ReplicatorSv1GetThreshold = "ReplicatorSv1.GetThreshold"
|
||||
ReplicatorSv1GetThresholdProfile = "ReplicatorSv1.GetThresholdProfile"
|
||||
ReplicatorSv1GetStatQueueProfile = "ReplicatorSv1.GetStatQueueProfile"
|
||||
ReplicatorSv1GetSagProfile = "ReplicatorSv1.GetSagProfile"
|
||||
ReplicatorSv1GetRankingProfile = "ReplicatorSv1.GetRankingProfile"
|
||||
ReplicatorSv1GetTrendProfile = "ReplicatorSv1.GetTrendProfile"
|
||||
ReplicatorSv1GetTiming = "ReplicatorSv1.GetTiming"
|
||||
ReplicatorSv1GetResource = "ReplicatorSv1.GetResource"
|
||||
@@ -1285,7 +1285,7 @@ const (
|
||||
ReplicatorSv1SetStatQueue = "ReplicatorSv1.SetStatQueue"
|
||||
ReplicatorSv1SetFilter = "ReplicatorSv1.SetFilter"
|
||||
ReplicatorSv1SetStatQueueProfile = "ReplicatorSv1.SetStatQueueProfile"
|
||||
ReplicatorSv1SetSagProfile = "ReplicatorSv1.SetSagProfile"
|
||||
ReplicatorSv1SetRankingProfile = "ReplicatorSv1.SetRankingProfile"
|
||||
ReplicatorSv1SetTrendProfile = "ReplicatorSv1.SetTrendProfile"
|
||||
ReplicatorSv1SetTiming = "ReplicatorSv1.SetTiming"
|
||||
ReplicatorSv1SetResource = "ReplicatorSv1.SetResource"
|
||||
|
||||
Reference in New Issue
Block a user