Exclude expired metrics before retrieval

Remove all StatQueue locking methods (unused).
This commit is contained in:
ionutboangiu
2024-08-06 20:16:08 +03:00
committed by Dan Christian Bogos
parent 7cf8c69bc8
commit 656911e4aa
5 changed files with 88 additions and 94 deletions

View File

@@ -70,7 +70,7 @@ var (
testV1STSInitDataDb,
testV1STSStartEngine,
testV1STSRpcConn,
testV1STSFromFolder,
testV1STSSetStats,
testV1STSGetStats,
testV1STSProcessEvent,
testV1STSGetStatsAfterRestart,
@@ -138,13 +138,54 @@ func testV1STSRpcConn(t *testing.T) {
}
}
func testV1STSFromFolder(t *testing.T) {
func testV1STSSetStats(t *testing.T) {
// Create a StatProfile equivalent to "Stats1" from oldtutorial, in order to customize its TTL to be longer than the configured StoreInterval.
statConfig = &engine.StatQueueWithCache{
StatQueueProfile: &engine.StatQueueProfile{
Tenant: "cgrates.org",
ID: "Stats1",
FilterIDs: []string{"*string:~*req.Account:1001;1002"},
QueueLength: 100,
TTL: 10 * time.Second,
Metrics: []*engine.MetricWithFilters{
{
MetricID: utils.MetaASR,
},
{
MetricID: utils.MetaACC,
},
{
MetricID: utils.MetaTCC,
},
{
MetricID: utils.MetaACD,
},
{
MetricID: utils.MetaTCD,
},
{
MetricID: "*sum:~*req.Usage",
},
{
MetricID: "*average:~*req.Usage",
},
{
MetricID: utils.MetaPDD,
FilterIDs: []string{"*exists:~*req.PDD:"},
},
},
ThresholdIDs: []string{"*none"},
Blocker: false,
Stored: true,
Weight: 20,
MinItems: 2,
},
}
var reply string
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*utils.DataDir, "tariffplans", "oldtutorial")}
if err := stsV1Rpc.Call(utils.APIerSv1LoadTariffPlanFromFolder, attrs, &reply); err != nil {
if err := stsV1Rpc.Call(utils.APIerSv1SetStatQueueProfile, statConfig, &reply); err != nil {
t.Error(err)
}
time.Sleep(500 * time.Millisecond)
}
func testV1STSGetStats(t *testing.T) {
@@ -303,7 +344,10 @@ func testV1STSGetStatsAfterRestart(t *testing.T) {
if stsV1ConfDIR == "tutinternal" {
t.SkipNow()
}
// SIGKILL prevents graceful shutdown, so without waiting for StoreInterval, current stats won't be backed up.
time.Sleep(time.Second)
if _, err := engine.StopStartEngine(stsV1CfgPath, *utils.WaitRater); err != nil {
t.Fatal(err)
}

View File

@@ -21,7 +21,6 @@ package engine
import (
"fmt"
"sort"
"sync"
"time"
"github.com/cgrates/cgrates/config"
@@ -133,7 +132,6 @@ type SQItem struct {
// StatQueue represents an individual stats instance
type StatQueue struct {
lk sync.RWMutex // protect the elements from within
Tenant string
ID string
SQItems []SQItem
@@ -144,18 +142,6 @@ type StatQueue struct {
ttl *time.Duration // timeToLeave, picked on each init
}
// RLock only to implement sync.RWMutex methods
func (sq *StatQueue) RLock() { sq.lk.RLock() }
// RUnlock only to implement sync.RWMutex methods
func (sq *StatQueue) RUnlock() { sq.lk.RUnlock() }
// Lock only to implement sync.RWMutex methods
func (sq *StatQueue) Lock() { sq.lk.Lock() }
// Unlock only to implement sync.RWMutex methods
func (sq *StatQueue) Unlock() { sq.lk.Unlock() }
// TenantID will compose the unique identifier for the StatQueue out of Tenant and ID
func (sq *StatQueue) TenantID() string {
return utils.ConcatenatedKey(sq.Tenant, sq.ID)
@@ -163,7 +149,7 @@ func (sq *StatQueue) TenantID() string {
// ProcessEvent processes a utils.CGREvent, returns true if processed
func (sq *StatQueue) ProcessEvent(ev *utils.CGREvent, filterS *FilterS) (err error) {
if err = sq.remExpired(); err != nil {
if _, err = sq.remExpired(); err != nil {
return
}
if err = sq.remOnQueueLength(); err != nil {
@@ -188,7 +174,7 @@ func (sq *StatQueue) remEventWithID(evID string) (err error) {
}
// remExpired expires items in queue
func (sq *StatQueue) remExpired() (err error) {
func (sq *StatQueue) remExpired() (removed int, err error) {
var expIdx *int // index of last item to be expired
for i, item := range sq.SQItems {
if item.ExpiryTime == nil {
@@ -205,7 +191,8 @@ func (sq *StatQueue) remExpired() (err error) {
if expIdx == nil {
return
}
sq.SQItems = sq.SQItems[*expIdx+1:]
removed = *expIdx + 1
sq.SQItems = sq.SQItems[removed:]
return
}

View File

@@ -20,7 +20,6 @@ package engine
import (
"fmt"
"reflect"
"sync"
"testing"
"time"
@@ -935,46 +934,3 @@ func TestLibStatsAsStatQueue(t *testing.T) {
t.Error(rcv)
}
}
func TestLibStatsLock(t *testing.T) {
si := []SQItem{{
EventID: str,
ExpiryTime: &tm,
}}
msw := map[string]*StatWithCompress{"test": {
Stat: fl,
CompressFactor: nm,
}}
st := StatASR{
FilterIDs: slc,
Answered: fl,
Count: n,
Events: msw,
MinItems: nm,
}
msm := map[string]StatMetric{"*asr": &st}
var rwm sync.RWMutex
sq := StatQueue{
lk: rwm,
Tenant: str,
ID: str,
SQItems: si,
SQMetrics: msm,
MinItems: nm,
}
nExp := sq
sq.Lock()
if reflect.DeepEqual(sq, nExp) {
t.Error("didn't lock")
}
sq.Unlock()
if !reflect.DeepEqual(sq, nExp) {
t.Error("didn't lock")
}
}

View File

@@ -223,6 +223,33 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) (
return
}
func (sS *StatService) getStatQueue(tnt, id string) (sq *StatQueue, err error) {
if sq, err = sS.dm.GetStatQueue(tnt, id, true, true, utils.EmptyString); err != nil {
return
}
var removed int
if removed, err = sq.remExpired(); err != nil || removed == 0 {
return
}
sS.storeStatQueue(sq)
return
}
// storeStatQueue will store the sq if needed
func (sS *StatService) storeStatQueue(sq *StatQueue) {
if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save
*sq.dirty = true // mark it to be saved
if sS.cgrcfg.StatSCfg().StoreInterval == -1 {
sS.StoreStatQueue(sq)
} else {
sS.ssqMux.Lock()
sS.storedStatQueues[sq.TenantID()] = true
sS.ssqMux.Unlock()
}
}
}
// Call implements birpc.ClientConnector interface for internal RPC
// here for cases when passing StatsService as rpccclient.RpcClientConnection
func (ss *StatService) Call(ctx *context.Context, serviceMethod string, args any, reply any) error {
@@ -260,17 +287,7 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [
sq.TenantID(), args.TenantID(), err.Error()))
withErrors = true
}
if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save
if sS.cgrcfg.StatSCfg().StoreInterval == -1 {
*sq.dirty = true
sS.StoreStatQueue(sq)
} else {
*sq.dirty = true // mark it to be saved
sS.ssqMux.Lock()
sS.storedStatQueues[sq.TenantID()] = true
sS.ssqMux.Unlock()
}
}
sS.storeStatQueue(sq)
if len(sS.cgrcfg.StatSCfg().ThresholdSConns) != 0 {
var thIDs []string
if len(sq.sqPrfl.ThresholdIDs) != 0 {
@@ -358,11 +375,11 @@ func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, repl
// V1GetStatQueue returns a StatQueue object
func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithArgDispatcher, reply *StatQueue) (err error) {
if sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, true, true, ""); err != nil {
sq, err := sS.getStatQueue(args.Tenant, args.ID)
if err != nil {
return err
} else {
*reply = *sq
}
*reply = *sq
return
}
@@ -371,19 +388,17 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[
if missing := utils.MissingStructFields(args, []string{utils.Tenant, utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, true, true, "")
sq, err := sS.getStatQueue(args.Tenant, args.ID)
if err != nil {
if err != utils.ErrNotFound {
err = utils.NewErrServerError(err)
}
return err
}
sq.RLock()
metrics := make(map[string]string, len(sq.SQMetrics))
for metricID, metric := range sq.SQMetrics {
metrics[metricID] = metric.GetStringValue("")
}
sq.RUnlock()
*reply = metrics
return
}
@@ -393,19 +408,17 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s
if missing := utils.MissingStructFields(args, []string{utils.Tenant, utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, true, true, "")
sq, err := sS.getStatQueue(args.Tenant, args.ID)
if err != nil {
if err != utils.ErrNotFound {
err = utils.NewErrServerError(err)
}
return err
}
sq.RLock()
metrics := make(map[string]float64, len(sq.SQMetrics))
for metricID, metric := range sq.SQMetrics {
metrics[metricID] = metric.GetFloat64Value()
}
sq.RUnlock()
*reply = metrics
return
}

View File

@@ -187,7 +187,6 @@ func testV2CDRsProcessCDR(t *testing.T) {
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated
}
func testV2CDRsGetCdrs(t *testing.T) {
@@ -258,7 +257,6 @@ func testV2CDRsProcessCDR2(t *testing.T) {
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated
}
func testV2CDRsGetCdrs2(t *testing.T) {
@@ -331,7 +329,6 @@ func testV2CDRsProcessCDR3(t *testing.T) {
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated
}
func testV2CDRsGetCdrs3(t *testing.T) {
@@ -393,7 +390,6 @@ func testV2CDRsProcessCDR4(t *testing.T) {
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated
}
func testV2CDRsGetCdrs4(t *testing.T) {
@@ -479,6 +475,7 @@ func testV2CDRsSetStats(t *testing.T) {
ID: "STS_PoccessCDR",
FilterIDs: []string{"*string:~*req.OriginID:testV2CDRsProcessCDR5"},
// QueueLength: 10,
TTL: -1,
Metrics: []*engine.MetricWithFilters{{
MetricID: "*sum:~*req.Usage",
}},
@@ -583,7 +580,6 @@ func testV2CDRsProcessCDR5(t *testing.T) {
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated
}
func testV2CDRsGetStats1(t *testing.T) {
@@ -650,14 +646,13 @@ func testV2CDRsProcessCDR6(t *testing.T) {
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated
}
func testV2CDRsGetStats2(t *testing.T) {
expectedIDs := []string{"STS_PoccessCDR"}
var metrics map[string]string
expectedMetrics := map[string]string{
utils.ConcatenatedKey(utils.MetaSum, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep+utils.Usage): "60000000000",
utils.ConcatenatedKey(utils.MetaSum, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep+utils.Usage): "120000000000",
}
if err := cdrsRpc.Call(utils.StatSv1GetQueueStringMetrics,
&utils.TenantIDWithArgDispatcher{
@@ -709,7 +704,6 @@ func testV2CDRsProcessCDR7(t *testing.T) {
} else if reply != utils.OK {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated
}
func testV2CDRsGetCdrs7(t *testing.T) {