diff --git a/apier/v1/stats_it_test.go b/apier/v1/stats_it_test.go index 88838d257..6d395c106 100644 --- a/apier/v1/stats_it_test.go +++ b/apier/v1/stats_it_test.go @@ -70,7 +70,7 @@ var ( testV1STSInitDataDb, testV1STSStartEngine, testV1STSRpcConn, - testV1STSFromFolder, + testV1STSSetStats, testV1STSGetStats, testV1STSProcessEvent, testV1STSGetStatsAfterRestart, @@ -138,13 +138,54 @@ func testV1STSRpcConn(t *testing.T) { } } -func testV1STSFromFolder(t *testing.T) { +func testV1STSSetStats(t *testing.T) { + // Create a StatProfile equivalent to "Stats1" from oldtutorial, in order to customize its TTL to be longer than the configured StoreInterval. + statConfig = &engine.StatQueueWithCache{ + StatQueueProfile: &engine.StatQueueProfile{ + Tenant: "cgrates.org", + ID: "Stats1", + FilterIDs: []string{"*string:~*req.Account:1001;1002"}, + QueueLength: 100, + TTL: 10 * time.Second, + Metrics: []*engine.MetricWithFilters{ + { + MetricID: utils.MetaASR, + }, + { + MetricID: utils.MetaACC, + }, + { + MetricID: utils.MetaTCC, + }, + { + MetricID: utils.MetaACD, + }, + { + MetricID: utils.MetaTCD, + }, + { + MetricID: "*sum:~*req.Usage", + }, + { + MetricID: "*average:~*req.Usage", + }, + { + MetricID: utils.MetaPDD, + FilterIDs: []string{"*exists:~*req.PDD:"}, + }, + }, + ThresholdIDs: []string{"*none"}, + Blocker: false, + Stored: true, + Weight: 20, + MinItems: 2, + }, + } + var reply string - attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*utils.DataDir, "tariffplans", "oldtutorial")} - if err := stsV1Rpc.Call(utils.APIerSv1LoadTariffPlanFromFolder, attrs, &reply); err != nil { + if err := stsV1Rpc.Call(utils.APIerSv1SetStatQueueProfile, statConfig, &reply); err != nil { t.Error(err) } - time.Sleep(500 * time.Millisecond) } func testV1STSGetStats(t *testing.T) { @@ -303,7 +344,10 @@ func testV1STSGetStatsAfterRestart(t *testing.T) { if stsV1ConfDIR == "tutinternal" { t.SkipNow() } + + // SIGKILL prevents graceful shutdown, so without waiting for StoreInterval, current stats won't be backed up. time.Sleep(time.Second) + if _, err := engine.StopStartEngine(stsV1CfgPath, *utils.WaitRater); err != nil { t.Fatal(err) } diff --git a/engine/libstats.go b/engine/libstats.go index b8c8f3113..8544b9fa5 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -21,7 +21,6 @@ package engine import ( "fmt" "sort" - "sync" "time" "github.com/cgrates/cgrates/config" @@ -133,7 +132,6 @@ type SQItem struct { // StatQueue represents an individual stats instance type StatQueue struct { - lk sync.RWMutex // protect the elements from within Tenant string ID string SQItems []SQItem @@ -144,18 +142,6 @@ type StatQueue struct { ttl *time.Duration // timeToLeave, picked on each init } -// RLock only to implement sync.RWMutex methods -func (sq *StatQueue) RLock() { sq.lk.RLock() } - -// RUnlock only to implement sync.RWMutex methods -func (sq *StatQueue) RUnlock() { sq.lk.RUnlock() } - -// Lock only to implement sync.RWMutex methods -func (sq *StatQueue) Lock() { sq.lk.Lock() } - -// Unlock only to implement sync.RWMutex methods -func (sq *StatQueue) Unlock() { sq.lk.Unlock() } - // TenantID will compose the unique identifier for the StatQueue out of Tenant and ID func (sq *StatQueue) TenantID() string { return utils.ConcatenatedKey(sq.Tenant, sq.ID) @@ -163,7 +149,7 @@ func (sq *StatQueue) TenantID() string { // ProcessEvent processes a utils.CGREvent, returns true if processed func (sq *StatQueue) ProcessEvent(ev *utils.CGREvent, filterS *FilterS) (err error) { - if err = sq.remExpired(); err != nil { + if _, err = sq.remExpired(); err != nil { return } if err = sq.remOnQueueLength(); err != nil { @@ -188,7 +174,7 @@ func (sq *StatQueue) remEventWithID(evID string) (err error) { } // remExpired expires items in queue -func (sq *StatQueue) remExpired() (err error) { +func (sq *StatQueue) remExpired() (removed int, err error) { var expIdx *int // index of last item to be expired for i, item := range sq.SQItems { if item.ExpiryTime == nil { @@ -205,7 +191,8 @@ func (sq *StatQueue) remExpired() (err error) { if expIdx == nil { return } - sq.SQItems = sq.SQItems[*expIdx+1:] + removed = *expIdx + 1 + sq.SQItems = sq.SQItems[removed:] return } diff --git a/engine/libstats_test.go b/engine/libstats_test.go index fba07b1dc..7e4add923 100644 --- a/engine/libstats_test.go +++ b/engine/libstats_test.go @@ -20,7 +20,6 @@ package engine import ( "fmt" "reflect" - "sync" "testing" "time" @@ -935,46 +934,3 @@ func TestLibStatsAsStatQueue(t *testing.T) { t.Error(rcv) } } - -func TestLibStatsLock(t *testing.T) { - si := []SQItem{{ - EventID: str, - ExpiryTime: &tm, - }} - msw := map[string]*StatWithCompress{"test": { - Stat: fl, - CompressFactor: nm, - }} - st := StatASR{ - FilterIDs: slc, - Answered: fl, - Count: n, - Events: msw, - MinItems: nm, - } - msm := map[string]StatMetric{"*asr": &st} - - var rwm sync.RWMutex - sq := StatQueue{ - lk: rwm, - Tenant: str, - ID: str, - SQItems: si, - SQMetrics: msm, - MinItems: nm, - } - - nExp := sq - - sq.Lock() - - if reflect.DeepEqual(sq, nExp) { - t.Error("didn't lock") - } - - sq.Unlock() - - if !reflect.DeepEqual(sq, nExp) { - t.Error("didn't lock") - } -} diff --git a/engine/stats.go b/engine/stats.go index e36ec915e..8293c6564 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -223,6 +223,33 @@ func (sS *StatService) matchingStatQueuesForEvent(args *StatsArgsProcessEvent) ( return } +func (sS *StatService) getStatQueue(tnt, id string) (sq *StatQueue, err error) { + if sq, err = sS.dm.GetStatQueue(tnt, id, true, true, utils.EmptyString); err != nil { + return + } + + var removed int + if removed, err = sq.remExpired(); err != nil || removed == 0 { + return + } + sS.storeStatQueue(sq) + return +} + +// storeStatQueue will store the sq if needed +func (sS *StatService) storeStatQueue(sq *StatQueue) { + if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save + *sq.dirty = true // mark it to be saved + if sS.cgrcfg.StatSCfg().StoreInterval == -1 { + sS.StoreStatQueue(sq) + } else { + sS.ssqMux.Lock() + sS.storedStatQueues[sq.TenantID()] = true + sS.ssqMux.Unlock() + } + } +} + // Call implements birpc.ClientConnector interface for internal RPC // here for cases when passing StatsService as rpccclient.RpcClientConnection func (ss *StatService) Call(ctx *context.Context, serviceMethod string, args any, reply any) error { @@ -260,17 +287,7 @@ func (sS *StatService) processEvent(args *StatsArgsProcessEvent) (statQueueIDs [ sq.TenantID(), args.TenantID(), err.Error())) withErrors = true } - if sS.cgrcfg.StatSCfg().StoreInterval != 0 && sq.dirty != nil { // don't save - if sS.cgrcfg.StatSCfg().StoreInterval == -1 { - *sq.dirty = true - sS.StoreStatQueue(sq) - } else { - *sq.dirty = true // mark it to be saved - sS.ssqMux.Lock() - sS.storedStatQueues[sq.TenantID()] = true - sS.ssqMux.Unlock() - } - } + sS.storeStatQueue(sq) if len(sS.cgrcfg.StatSCfg().ThresholdSConns) != 0 { var thIDs []string if len(sq.sqPrfl.ThresholdIDs) != 0 { @@ -358,11 +375,11 @@ func (sS *StatService) V1GetStatQueuesForEvent(args *StatsArgsProcessEvent, repl // V1GetStatQueue returns a StatQueue object func (sS *StatService) V1GetStatQueue(args *utils.TenantIDWithArgDispatcher, reply *StatQueue) (err error) { - if sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, true, true, ""); err != nil { + sq, err := sS.getStatQueue(args.Tenant, args.ID) + if err != nil { return err - } else { - *reply = *sq } + *reply = *sq return } @@ -371,19 +388,17 @@ func (sS *StatService) V1GetQueueStringMetrics(args *utils.TenantID, reply *map[ if missing := utils.MissingStructFields(args, []string{utils.Tenant, utils.ID}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } - sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, true, true, "") + sq, err := sS.getStatQueue(args.Tenant, args.ID) if err != nil { if err != utils.ErrNotFound { err = utils.NewErrServerError(err) } return err } - sq.RLock() metrics := make(map[string]string, len(sq.SQMetrics)) for metricID, metric := range sq.SQMetrics { metrics[metricID] = metric.GetStringValue("") } - sq.RUnlock() *reply = metrics return } @@ -393,19 +408,17 @@ func (sS *StatService) V1GetQueueFloatMetrics(args *utils.TenantID, reply *map[s if missing := utils.MissingStructFields(args, []string{utils.Tenant, utils.ID}); len(missing) != 0 { //Params missing return utils.NewErrMandatoryIeMissing(missing...) } - sq, err := sS.dm.GetStatQueue(args.Tenant, args.ID, true, true, "") + sq, err := sS.getStatQueue(args.Tenant, args.ID) if err != nil { if err != utils.ErrNotFound { err = utils.NewErrServerError(err) } return err } - sq.RLock() metrics := make(map[string]float64, len(sq.SQMetrics)) for metricID, metric := range sq.SQMetrics { metrics[metricID] = metric.GetFloat64Value() } - sq.RUnlock() *reply = metrics return } diff --git a/general_tests/cdrs_it_test.go b/general_tests/cdrs_it_test.go index d31198857..e8de3b390 100644 --- a/general_tests/cdrs_it_test.go +++ b/general_tests/cdrs_it_test.go @@ -187,7 +187,6 @@ func testV2CDRsProcessCDR(t *testing.T) { } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) } - time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated } func testV2CDRsGetCdrs(t *testing.T) { @@ -258,7 +257,6 @@ func testV2CDRsProcessCDR2(t *testing.T) { } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) } - time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated } func testV2CDRsGetCdrs2(t *testing.T) { @@ -331,7 +329,6 @@ func testV2CDRsProcessCDR3(t *testing.T) { } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) } - time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated } func testV2CDRsGetCdrs3(t *testing.T) { @@ -393,7 +390,6 @@ func testV2CDRsProcessCDR4(t *testing.T) { } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) } - time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated } func testV2CDRsGetCdrs4(t *testing.T) { @@ -479,6 +475,7 @@ func testV2CDRsSetStats(t *testing.T) { ID: "STS_PoccessCDR", FilterIDs: []string{"*string:~*req.OriginID:testV2CDRsProcessCDR5"}, // QueueLength: 10, + TTL: -1, Metrics: []*engine.MetricWithFilters{{ MetricID: "*sum:~*req.Usage", }}, @@ -583,7 +580,6 @@ func testV2CDRsProcessCDR5(t *testing.T) { } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) } - time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated } func testV2CDRsGetStats1(t *testing.T) { @@ -650,14 +646,13 @@ func testV2CDRsProcessCDR6(t *testing.T) { } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) } - time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated } func testV2CDRsGetStats2(t *testing.T) { expectedIDs := []string{"STS_PoccessCDR"} var metrics map[string]string expectedMetrics := map[string]string{ - utils.ConcatenatedKey(utils.MetaSum, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep+utils.Usage): "60000000000", + utils.ConcatenatedKey(utils.MetaSum, utils.DynamicDataPrefix+utils.MetaReq+utils.NestingSep+utils.Usage): "120000000000", } if err := cdrsRpc.Call(utils.StatSv1GetQueueStringMetrics, &utils.TenantIDWithArgDispatcher{ @@ -709,7 +704,6 @@ func testV2CDRsProcessCDR7(t *testing.T) { } else if reply != utils.OK { t.Error("Unexpected reply received: ", reply) } - time.Sleep(time.Duration(150) * time.Millisecond) // Give time for CDR to be rated } func testV2CDRsGetCdrs7(t *testing.T) {