diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index 6b1f3ee95..50c8a6a72 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -91,6 +91,7 @@ var ( testV1STSOverWriteStats, testV1STSProcessStatWithThreshold2, testV1STSSimulateAccountUpdate, + testV1STSGetStatQueueWithoutExpired, testV1STSStopEngine, } ) @@ -1450,3 +1451,68 @@ func testV1STSSimulateAccountUpdate(t *testing.T) { t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics) } } + +func testV1STSGetStatQueueWithoutExpired(t *testing.T) { + var result string + var reply *engine.StatQueueProfile + statConfig = &engine.StatQueueWithCache{ + StatQueueProfile: &engine.StatQueueProfile{ + Tenant: "cgrates.org", + ID: "Sq1Nanao", + FilterIDs: []string{"*string:~*req.StatQ:Sq1Nanao"}, + QueueLength: 10, + TTL: 1, + Metrics: []*engine.MetricWithFilters{{ + MetricID: utils.MetaTCD, + }}, + Blocker: true, + Stored: true, + Weight: 200, + MinItems: 1, + }, + } + if err := stsV1Rpc.Call(utils.APIerSv1SetStatQueueProfile, statConfig, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + if err := stsV1Rpc.Call(utils.APIerSv1GetStatQueueProfile, + &utils.TenantID{Tenant: "cgrates.org", ID: "Sq1Nanao"}, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(statConfig.StatQueueProfile, reply) { + t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(statConfig.StatQueueProfile), utils.ToJSON(reply)) + } + + var metrics map[string]string + expectedMetrics := map[string]string{ + utils.MetaTCD: utils.NOT_AVAILABLE, + } + //process event + var reply2 []string + expected := []string{"Sq1Nanao"} + args := engine.StatsArgsProcessEvent{ + CGREventWithOpts: &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1012", + Event: map[string]interface{}{ + "StatQ": "Sq1Nanao", + utils.Usage: 10, + }, + }, + }, + } + if err := stsV1Rpc.Call(utils.StatSv1ProcessEvent, &args, &reply2); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(reply2, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply2) + } + //verify metrics after first process + if err := stsV1Rpc.Call(utils.StatSv1GetQueueStringMetrics, + &utils.TenantIDWithOpts{ + TenantID: &utils.TenantID{Tenant: "cgrates.org", ID: "Sq1Nanao"}}, &metrics); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(expectedMetrics, metrics) { + t.Errorf("expecting: %+v, received reply: %s", expectedMetrics, metrics) + } +} diff --git a/data/tariffplans/oldtutorial/Stats.csv b/data/tariffplans/oldtutorial/Stats.csv index 03243eab1..c5d67031d 100644 --- a/data/tariffplans/oldtutorial/Stats.csv +++ b/data/tariffplans/oldtutorial/Stats.csv @@ -1,4 +1,4 @@ #Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],Weight[11],ThresholdIDs[12] -cgrates.org,Stats1,FLTR_STS1,2014-07-29T15:00:00Z,100,1s,2,*asr;*acc;*tcc;*acd;*tcd,,true,false,20,*none +cgrates.org,Stats1,FLTR_STS1,2014-07-29T15:00:00Z,100,3s,2,*asr;*acc;*tcc;*acd;*tcd,,true,false,20,*none cgrates.org,Stats1,,,,,,*sum#~*req.Usage;*average#~*req.Usage,,,,, cgrates.org,Stats1,,,,,,*pdd,*exists:~*req.PDD:,,,, \ No newline at end of file diff --git a/engine/datamanager.go b/engine/datamanager.go index 47e296d47..d1f5722f3 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -547,15 +547,14 @@ func (dm *DataManager) GetStatQueue(tenant, id string, if dm.dataDB.GetStorageType() != utils.MetaInternal { // in case of internal we don't marshal if ssq, err = NewStoredStatQueue(sq, dm.ms); err != nil { - return + return nil, err } } err = dm.dataDB.SetStatQueueDrv(ssq, sq) } } if err != nil { - err = utils.CastRPCErr(err) - if err == utils.ErrNotFound && cacheWrite { + if err = utils.CastRPCErr(err); err == utils.ErrNotFound && cacheWrite { if errCh := Cache.Set(utils.CacheStatQueues, tntID, nil, nil, cacheCommit(transactionID), transactionID); errCh != nil { return nil, errCh diff --git a/engine/libstats.go b/engine/libstats.go index 9fa8a6c40..fe5d1059e 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -79,11 +79,11 @@ func NewStoredStatQueue(sq *StatQueue, ms Marshaler) (sSQ *StoredStatQueue, err sSQ.SQItems[i] = sqItm } for metricID, metric := range sq.SQMetrics { - if marshaled, err := metric.Marshal(ms); err != nil { + marshaled, err := metric.Marshal(ms) + if err != nil { return nil, err - } else { - sSQ.SQMetrics[metricID] = marshaled } + sSQ.SQMetrics[metricID] = marshaled } return } @@ -242,11 +242,7 @@ func (sq *StatQueue) addStatEvent(tnt, evID string, filterS *FilterS, evNm utils if sq.ttl != nil { expTime = utils.TimePointer(time.Now().Add(*sq.ttl)) } - sq.SQItems = append(sq.SQItems, - struct { - EventID string - ExpiryTime *time.Time - }{evID, expTime}) + sq.SQItems = append(sq.SQItems, SQItem{EventID: evID, ExpiryTime: expTime}) var pass bool // recreate the request without *opts dDP := newDynamicDP(config.CgrConfig().FilterSCfg().ResourceSConns, config.CgrConfig().FilterSCfg().StatSConns, diff --git a/engine/stats.go b/engine/stats.go index ff3875106..bf345ec20 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -114,7 +114,7 @@ func (sS *StatService) storeStats() { failedSqIDs = append(failedSqIDs, sID) // record failure so we can schedule it for next backup } return - }, config.CgrConfig().GeneralCfg().LockingTimeout, lkID) + }, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID) // randomize the CPU load and give up thread control time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) } @@ -189,7 +189,7 @@ func (sS *StatService) matchingStatQueuesForEvent(tnt string, statsIDs []string, guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { sq, err = sS.dm.GetStatQueue(sqPrfl.Tenant, sqPrfl.ID, true, true, "") return - }, config.CgrConfig().GeneralCfg().LockingTimeout, lkID) + }, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID) if err != nil { return nil, err } @@ -257,6 +257,33 @@ func (attr *StatsArgsProcessEvent) Clone() *StatsArgsProcessEvent { } } +func (sS *StatService) removeExpiredWithStore(sq *StatQueue) (err error) { + lkID := utils.StatQueuePrefix + sq.TenantID() + guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { + err = sq.remExpired() + return + }, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID) + if err = sq.remExpired(); err != nil { + return + } + sS.storeStatQueue(sq) + return +} + +// storeStatQueue will store the sq if needed +func (sS *StatService) storeStatQueue(sq *StatQueue) { + if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save + *sq.dirty = true // mark it to be saved + if sS.cgrcfg.StatSCfg().StoreInterval == -1 { + sS.StoreStatQueue(sq) + } else { + sS.ssqMux.Lock() + sS.storedStatQueues[sq.TenantID()] = true + sS.ssqMux.Unlock() + } + } +} + // processEvent processes a new event, dispatching to matching queues // queues matching are also cached to speed up func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (statQueueIDs []string, err error) { @@ -283,24 +310,14 @@ func (sS *StatService) processEvent(tnt string, args *StatsArgsProcessEvent) (st guardian.Guardian.Guard(func() (gRes interface{}, gErr error) { err = sq.ProcessEvent(tnt, args.ID, sS.filterS, evNm) return - }, config.CgrConfig().GeneralCfg().LockingTimeout, lkID) + }, sS.cgrcfg.GeneralCfg().LockingTimeout, lkID) if err != nil { utils.Logger.Warning( fmt.Sprintf(" Queue: %s, ignoring event: %s, error: %s", sq.TenantID(), utils.ConcatenatedKey(tnt, args.ID), err.Error())) withErrors = true } - if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save - if sS.cgrcfg.StatSCfg().StoreInterval == -1 { - *sq.dirty = true - sS.StoreStatQueue(sq) - } else { - *sq.dirty = true // mark it to be saved - sS.ssqMux.Lock() - sS.storedStatQueues[sq.TenantID()] = true - sS.ssqMux.Unlock() - } - } + sS.storeStatQueue(sq) if len(sS.cgrcfg.StatSCfg().ThresholdSConns) != 0 { var thIDs []string if len(sq.sqPrfl.ThresholdIDs) != 0 { @@ -400,6 +417,9 @@ func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, repl // V1GetStatQueue returns a StatQueue object func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithOpts, reply *StatQueue) (err error) { + if missing := utils.MissingStructFields(args, []string{utils.ID}); len(missing) != 0 { //Params missing + return utils.NewErrMandatoryIeMissing(missing...) + } tnt := args.Tenant if tnt == utils.EmptyString { tnt = sS.cgrcfg.GeneralCfg().DefaultTenant @@ -408,6 +428,9 @@ func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithOpts, reply *StatQ if err != nil { return err } + if err = sS.removeExpiredWithStore(sq); err != nil { + return + } *reply = *sq return } @@ -428,6 +451,9 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[ } return err } + if err = sS.removeExpiredWithStore(sq); err != nil { + return + } sq.RLock() metrics := make(map[string]string, len(sq.SQMetrics)) for metricID, metric := range sq.SQMetrics { @@ -454,6 +480,9 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s } return err } + if err = sS.removeExpiredWithStore(sq); err != nil { + return + } sq.RLock() metrics := make(map[string]float64, len(sq.SQMetrics)) for metricID, metric := range sq.SQMetrics {