Update stat queue on Get APIs. Fixes #2188

This commit is contained in:
Trial97
2020-10-30 16:37:24 +02:00
committed by Dan Christian Bogos
parent 5da4cf6d1c
commit 3888094001
5 changed files with 116 additions and 26 deletions

View File

@@ -91,6 +91,7 @@ var (
testV1STSOverWriteStats,
testV1STSProcessStatWithThreshold2,
testV1STSSimulateAccountUpdate,
testV1STSGetStatQueueWithoutExpired,
testV1STSStopEngine,
}
)
@@ -1450,3 +1451,68 @@ func testV1STSSimulateAccountUpdate(t *testing.T) {
t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics)
}
}
func testV1STSGetStatQueueWithoutExpired(t *testing.T) {
var result string
var reply *engine.StatQueueProfile
statConfig = &engine.StatQueueWithCache{
StatQueueProfile: &engine.StatQueueProfile{
Tenant: "cgrates.org",
ID: "Sq1Nanao",
FilterIDs: []string{"*string:~*req.StatQ:Sq1Nanao"},
QueueLength: 10,
TTL: 1,
Metrics: []*engine.MetricWithFilters{{
MetricID: utils.MetaTCD,
}},
Blocker: true,
Stored: true,
Weight: 200,
MinItems: 1,
},
}
if err := stsV1Rpc.Call(utils.APIerSv1SetStatQueueProfile, statConfig, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
if err := stsV1Rpc.Call(utils.APIerSv1GetStatQueueProfile,
&utils.TenantID{Tenant: "cgrates.org", ID: "Sq1Nanao"}, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(statConfig.StatQueueProfile, reply) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(statConfig.StatQueueProfile), utils.ToJSON(reply))
}
var metrics map[string]string
expectedMetrics := map[string]string{
utils.MetaTCD: utils.NOT_AVAILABLE,
}
//process event
var reply2 []string
expected := []string{"Sq1Nanao"}
args := engine.StatsArgsProcessEvent{
CGREventWithOpts: &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "event1012",
Event: map[string]interface{}{
"StatQ": "Sq1Nanao",
utils.Usage: 10,
},
},
},
}
if err := stsV1Rpc.Call(utils.StatSv1ProcessEvent, &args, &reply2); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(reply2, expected) {
t.Errorf("Expecting: %+v, received: %+v", expected, reply2)
}
//verify metrics after first process
if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics,
&utils.TenantIDWithOpts{
TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "Sq1Nanao"}}, &metrics); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expectedMetrics, metrics) {
t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics)
}
}

View File

@@ -1,4 +1,4 @@
#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],Weight[11],ThresholdIDs[12]
cgrates.org,Stats1,FLTR_STS1,2014-07-29T15:00:00Z,100,1s,2,*asr;*acc;*tcc;*acd;*tcd,,true,false,20,*none
cgrates.org,Stats1,FLTR_STS1,2014-07-29T15:00:00Z,100,3s,2,*asr;*acc;*tcc;*acd;*tcd,,true,false,20,*none
cgrates.org,Stats1,,,,,,*sum#~*req.Usage;*average#~*req.Usage,,,,,
cgrates.org,Stats1,,,,,,*pdd,*exists:~*req.PDD:,,,,
1 #Tenant[0] Id[1] FilterIDs[2] ActivationInterval[3] QueueLength[4] TTL[5] MinItems[6] Metrics[7] MetricFilterIDs[8] Stored[9] Blocker[10] Weight[11] ThresholdIDs[12]
2 cgrates.org Stats1 FLTR_STS1 2014-07-29T15:00:00Z 100 1s 3s 2 *asr;*acc;*tcc;*acd;*tcd true false 20 *none
3 cgrates.org Stats1 *sum#~*req.Usage;*average#~*req.Usage
4 cgrates.org Stats1 *pdd *exists:~*req.PDD:

View File

@@ -547,15 +547,14 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
if dm.dataDB.GetStorageType() != utils.MetaInternal {
// in case of internal we don't marshal
if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil {
return
return nil, err
}
}
err = dm.dataDB.SetStatQueueDrv(ssq, sq)
}
}
if err != nil {
err = utils.CastRPCErr(err)
if err == utils.ErrNotFound && cacheWrite {
if err = utils.CastRPCErr(err); err == utils.ErrNotFound && cacheWrite {
if errCh := Cache.Set(utils.CacheStatQueues, tntID, nil, nil,
cacheCommit(transactionID), transactionID); errCh != nil {
return nil, errCh

View File

@@ -79,11 +79,11 @@ func NewStoredStatQueue(sq *StatQueue, ms Marshaler) (sSQ *StoredStatQueue, err
sSQ.SQItems[i] = sqItm
}
for metricID, metric := range sq.SQMetrics {
if marshaled, err := metric.Marshal(ms); err != nil {
marshaled, err := metric.Marshal(ms)
if err != nil {
return nil, err
} else {
sSQ.SQMetrics[metricID] = marshaled
}
sSQ.SQMetrics[metricID] = marshaled
}
return
}
@@ -242,11 +242,7 @@ func (sq *StatQueue) addStatEvent(tnt, evID string, filterS *FilterS, evNm utils
if sq.ttl != nil {
expTime = utils.TimePointer(time.Now().Add(*sq.ttl))
}
sq.SQItems = append(sq.SQItems,
struct {
EventID string
ExpiryTime *time.Time
}{evID, expTime})
sq.SQItems = append(sq.SQItems, SQItem{EventID: evID, ExpiryTime: expTime})
var pass bool
// recreate the request without *opts
dDP := newDynamicDP(config.CgrConfig().FilterSCfg().ResourceSConns, config.CgrConfig().FilterSCfg().StatSConns,

View File

@@ -114,7 +114,7 @@ func (sS *StatService) storeStats() {
failedSqIDs = append(failedSqIDs, sID) // record failure so we can schedule it for next backup
}
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
}, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID)
// randomize the CPU load and give up thread control
time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond)
}
@@ -189,7 +189,7 @@ func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string,
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
sq, err = sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, true, true, "")
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
}, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID)
if err != nil {
return nil, err
}
@@ -257,6 +257,33 @@ func (attr *StatsArgsProcessEvent) Clone() *StatsArgsProcessEvent {
}
}
func (sS *StatService) removeExpiredWithStore(sq *StatQueue) (err error) {
lkID := utils.StatQueuePrefix + sq.TenantID()
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
err = sq.remExpired()
return
}, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID)
if err = sq.remExpired(); err != nil {
return
}
sS.storeStatQueue(sq)
return
}
// storeStatQueue will store the sq if needed
func (sS *StatService) storeStatQueue(sq *StatQueue) {
if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save
*sq.dirty = true // mark it to be saved
if sS.cgrcfg.StatSCfg().StoreInterval == -1 {
sS.StoreStatQueue(sq)
} else {
sS.ssqMux.Lock()
sS.storedStatQueues[sq.TenantID()] = true
sS.ssqMux.Unlock()
}
}
}
// processEvent processes a new event, dispatching to matching queues
// queues matching are also cached to speed up
func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (statQueueIDs []string, err error) {
@@ -283,24 +310,14 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
err = sq.ProcessEvent(tnt, args.ID, sS.filterS, evNm)
return
}, config.CgrConfig().GeneralCfg().LockingTimeout, lkID)
}, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<StatS> Queue: %s, ignoring event: %s, error: %s",
sq.TenantID(), utils.ConcatenatedKey(tnt, args.ID), err.Error()))
withErrors = true
}
if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save
if sS.cgrcfg.StatSCfg().StoreInterval == -1 {
*sq.dirty = true
sS.StoreStatQueue(sq)
} else {
*sq.dirty = true // mark it to be saved
sS.ssqMux.Lock()
sS.storedStatQueues[sq.TenantID()] = true
sS.ssqMux.Unlock()
}
}
sS.storeStatQueue(sq)
if len(sS.cgrcfg.StatSCfg().ThresholdSConns) != 0 {
var thIDs []string
if len(sq.sqPrfl.ThresholdIDs) != 0 {
@@ -400,6 +417,9 @@ func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, repl
// V1GetStatQueue returns a StatQueue object
func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithOpts, reply *StatQueue) (err error) {
if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing
return utils.NewErrMandatoryIeMissing(missing...)
}
tnt := args.Tenant
if tnt == utils.EmptyString {
tnt = sS.cgrcfg.GeneralCfg().DefaultTenant
@@ -408,6 +428,9 @@ func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithOpts, reply *StatQ
if err != nil {
return err
}
if err = sS.removeExpiredWithStore(sq); err != nil {
return
}
*reply = *sq
return
}
@@ -428,6 +451,9 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[
}
return err
}
if err = sS.removeExpiredWithStore(sq); err != nil {
return
}
sq.RLock()
metrics := make(map[string]string, len(sq.SQMetrics))
for metricID, metric := range sq.SQMetrics {
@@ -454,6 +480,9 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s
}
return err
}
if err = sS.removeExpiredWithStore(sq); err != nil {
return
}
sq.RLock()
metrics := make(map[string]float64, len(sq.SQMetrics))
for metricID, metric := range sq.SQMetrics {