diff --git a/apier/v1/trends.go b/apier/v1/trends.go index 31b123d0a..cee7095ca 100644 --- a/apier/v1/trends.go +++ b/apier/v1/trends.go @@ -106,9 +106,19 @@ func (apierSv1 *APIerSv1) RemoveTrendProfile(ctx *context.Context, args *utils.T // NewTrendSv1 initializes TrendSV1 func NewTrendSv1(trs *engine.TrendS) *TrendSv1 { - return &TrendSv1{} + return &TrendSv1{ + trS: trs, + } } type TrendSv1 struct { trS *engine.TrendS } + +func (trs *TrendSv1) ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleTrendQueries, scheduled *int) error { + return trs.trS.V1ScheduleQueries(ctx, args, scheduled) +} + +func (trs *TrendSv1) GetTrend(ctx *context.Context, args *utils.ArgGetTrend, trend *engine.Trend) error { + return trs.trS.V1GetTrend(ctx, args, trend) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 119f69ddf..a6bd3e0e9 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -143,7 +143,7 @@ func initConfigSv1(internalConfigChan chan birpc.ClientConnector, func startRPC(server *cores.Server, internalRaterChan, internalCdrSChan, internalRsChan, internalStatSChan, - internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan, + internalAttrSChan, internalChargerSChan, internalThdSChan, internalTrendSChan, internalSuplSChan, internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsv1Chan, internalCacheSChan, internalEEsChan, internalERsChan chan birpc.ClientConnector, @@ -166,6 +166,8 @@ func startRPC(server *cores.Server, internalRaterChan, internalChargerSChan <- chrgS case thS := <-internalThdSChan: internalThdSChan <- thS + case trS := <-internalTrendSChan: + internalTrendSChan <- trS case splS := <-internalSuplSChan: internalSuplSChan <- splS case analyzerS := <-internalAnalyzerSChan: @@ -584,7 +586,7 @@ func main() { tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan, anz, srvDep) stS := services.NewStatService(cfg, dmService, cacheS, filterSChan, server, internalStatSChan, connManager, anz, srvDep) - srS := services.NewTrendService(cfg, dmService, cacheS, filterSChan, server, + trS := services.NewTrendService(cfg, dmService, cacheS, filterSChan, server, internalTrendSChan, connManager, anz, srvDep) sgS := services.NewRankingService(cfg, dmService, cacheS, filterSChan, server, internalRankingSChan, connManager, anz, srvDep) @@ -613,7 +615,7 @@ func main() { ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalLoaderSChan, connManager, anz, srvDep) - srvManager.AddServices(gvService, attrS, chrS, tS, stS, srS, sgS, reS, routeS, schS, rals, + srvManager.AddServices(gvService, attrS, chrS, tS, stS, trS, sgS, reS, routeS, schS, rals, apiSv1, apiSv2, cdrS, smg, coreS, services.NewDNSAgent(cfg, filterSChan, shdChan, connManager, srvDep), services.NewFreeswitchAgent(cfg, shdChan, connManager, srvDep), @@ -657,6 +659,7 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, internalSchedulerSChan) engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, internalSessionSChan) engine.IntRPC.AddInternalRPCClient(utils.StatSv1, internalStatSChan) + engine.IntRPC.AddInternalRPCClient(utils.TrendSv1, internalTrendSChan) engine.IntRPC.AddInternalRPCClient(utils.RouteSv1, internalRouteSChan) engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, internalThresholdSChan) engine.IntRPC.AddInternalRPCClient(utils.ServiceManagerV1, internalServeManagerChan) @@ -680,7 +683,7 @@ func main() { go startRPC(server, internalResponderChan, internalCDRServerChan, internalResourceSChan, internalStatSChan, internalAttributeSChan, internalChargerSChan, internalThresholdSChan, - internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, + internalTrendSChan, internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsChan, internalCacheSChan, internalEEsChan, internalERsChan, shdChan) diff --git a/data/conf/samples/tutinternal/cgrates.json b/data/conf/samples/tutinternal/cgrates.json index 65c848c8c..7ce1d2e39 100644 --- a/data/conf/samples/tutinternal/cgrates.json +++ b/data/conf/samples/tutinternal/cgrates.json @@ -25,6 +25,11 @@ "db_type": "*internal" }, +"trends": { + "enabled": true, + "stats_conns":["*internal"], +}, + "rals": { "enabled": true, diff --git a/engine/libtrends.go b/engine/libtrends.go index 5fec45b44..665a4f788 100644 --- a/engine/libtrends.go +++ b/engine/libtrends.go @@ -111,8 +111,6 @@ type Trend struct { // // thread safe since it should be used close to source func (t *Trend) Compile(cleanTtl time.Duration, qLength int) { - t.tMux.Lock() - defer t.tMux.Unlock() t.cleanup(cleanTtl, qLength) if t.mTotals == nil { // indexes were not yet built t.computeIndexes() @@ -121,23 +119,26 @@ func (t *Trend) Compile(cleanTtl time.Duration, qLength int) { // cleanup will clean stale data out of func (t *Trend) cleanup(ttl time.Duration, qLength int) (altered bool) { - expTime := time.Now().Add(-ttl) - var expIdx *int - for i, rT := range t.RunTimes { - if rT.After(expTime) { - continue + if ttl >= 0 { + expTime := time.Now().Add(-ttl) + var expIdx *int + for i, rT := range t.RunTimes { + if rT.After(expTime) { + continue + } + expIdx = &i + delete(t.Metrics, rT) } - expIdx = &i - delete(t.Metrics, rT) - } - if expIdx != nil { - if len(t.RunTimes)-1 == *expIdx { - t.RunTimes = make([]time.Time, 0) - } else { - t.RunTimes = t.RunTimes[*expIdx+1:] + if expIdx != nil { + if len(t.RunTimes)-1 == *expIdx { + t.RunTimes = make([]time.Time, 0) + } else { + t.RunTimes = t.RunTimes[*expIdx+1:] + } + altered = true } - altered = true } + diffLen := len(t.RunTimes) - qLength if qLength > 0 && diffLen > 0 { var rmTms []time.Time diff --git a/engine/trends.go b/engine/trends.go index c7bc790a1..c8f7068e4 100644 --- a/engine/trends.go +++ b/engine/trends.go @@ -39,8 +39,9 @@ func NewTrendS(dm *DataManager, connMgr: connMgr, filterS: filterS, cgrcfg: cgrcfg, + crn: cron.New(), loopStopped: make(chan struct{}), - crnTQsMux: new(sync.RWMutex), + crnTQsMux: &sync.RWMutex{}, crnTQs: make(map[string]map[string]cron.EntryID), } } @@ -82,6 +83,7 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) { ID: tP.ID, RunTimes: make([]time.Time, 0), Metrics: make(map[time.Time]map[string]*MetricWithTrend), + tMux: new(sync.RWMutex), } } else if err != nil { utils.Logger.Warning( @@ -90,15 +92,16 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) { utils.TrendS, tP.Tenant, tP.ID, err.Error())) return } - + if trend.tMux == nil { + trend.tMux = new(sync.RWMutex) + } trend.tMux.Lock() defer trend.tMux.Unlock() - trend.cleanup(tP.TTL, tP.QueueLength) - if trend.mTotals == nil { // indexes were not yet built + + if len(trend.mTotals) == 0 { // indexes were not yet built trend.computeIndexes() } - now := time.Now() var metrics []string if len(tP.Metrics) != 0 { @@ -113,6 +116,9 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) { return // nothing to compute } trend.RunTimes = append(trend.RunTimes, now) + if trend.Metrics == nil { + trend.Metrics = make(map[time.Time]map[string]*MetricWithTrend) + } trend.Metrics[now] = make(map[string]*MetricWithTrend) for _, mID := range metrics { mWt := &MetricWithTrend{ID: mID} @@ -139,6 +145,15 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) { } +func (tS *TrendS) StartScheduling() { + tS.crn.Start() +} + +func (tS *TrendS) StopScheduling() { + ctx := tS.crn.Stop() + <-ctx.Done() +} + // scheduleTrendQueries will schedule/re-schedule specific trend queries func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (scheduled int, err error) { var partial bool @@ -161,8 +176,9 @@ func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs [] "<%s> scheduling TrendProfile <%s:%s>, error: <%s>", utils.TrendS, tnt, tID, err.Error())) partial = true - } else { + } else { // log the entry ID for debugging tS.crnTQsMux.Lock() + tS.crnTQs[tP.Tenant] = make(map[string]cron.EntryID) tS.crnTQs[tP.Tenant][tP.ID] = entryID tS.crnTQsMux.Unlock() } @@ -183,3 +199,10 @@ func (tS *TrendS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgSchedul } return } + +func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, trend *Trend) (err error) { + var tr *Trend + tr, err = tS.dm.GetTrend(arg.Tenant, arg.ID, true, true, utils.NonTransactional) + *trend = *tr + return +} diff --git a/engine/trends_test.go b/engine/trends_test.go index 505e335d1..4138a3e78 100644 --- a/engine/trends_test.go +++ b/engine/trends_test.go @@ -84,7 +84,4 @@ func TestNewTrendS(t *testing.T) { t.Errorf("Expected crnTQs to be empty, but got length %d", len(trendS.crnTQs)) } - if trendS.crn != nil { - t.Errorf("Expected crn to be nil, but got %v", trendS.crn) - } } diff --git a/general_tests/trends_schedule_it_test.go b/general_tests/trends_schedule_it_test.go new file mode 100644 index 000000000..1983e67fe --- /dev/null +++ b/general_tests/trends_schedule_it_test.go @@ -0,0 +1,153 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestTrendSchedule(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + content := `{ + +"general": { + "log_level": 7, +}, + +"data_db": { + "db_type": "*internal" +}, + +"stor_db": { + "db_type": "*internal" +}, + +"trends": { + "enabled": true, + "stats_conns":["*localhost"], +}, + +"stats": { + "enabled": true, + "store_interval": "-1", +}, + +"apiers": { + "enabled": true, +}, + +} +` + tpFiles := map[string]string{ + utils.TrendsCsv: `#Tenant[0],Id[1],Schedule[2],StatID[3],Metrics[4],TTL[5],QueueLength[6],MinItems[7],CorrelationType[8],Tolerance[9],Stored[10],ThresholdIDs[11] +cgrates.org,TREND_1,@every 1s,Stats1_1,,-1,-1,1,*last,1,false, +cgrates.org,TREND_2,@every 1s,Stats1_2,,-1,-1,1,*last,1,false,`, + utils.StatsCsv: `#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],Weight[11],ThresholdIDs[12] +cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,,*tcc;*acd;*tcd,,,,, +cgrates.org,Stats1_2,*string:~*req.Account:1002,,,,,*sum#~*req.Usage;*pdd,,,,,`} + + testEnv := TestEnvironment{ + Name: "TestTrendSchedule", + ConfigJSON: content, + TpFiles: tpFiles, + } + + client, _ := testEnv.Setup(t, *utils.WaitRater) + t.Run("CheckTrendSchedule", func(t *testing.T) { + var scheduled int + if err := client.Call(context.Background(), utils.TrendSv1ScheduleQueries, + &utils.ArgScheduleTrendQueries{TrendIDs: []string{"TREND_1", "TREND_2"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil { + t.Fatal(err) + } else if scheduled != 2 { + t.Errorf("expected 2, got %d", scheduled) + } + }) + t.Run("ProcessStats", func(t *testing.T) { + var reply []string + if err := client.Call(context.Background(), utils.StatSv1ProcessEvent, &utils.CGREvent{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("event%d", 1), + Event: map[string]any{ + utils.AccountField: "1001", + utils.AnswerTime: time.Date(2024, 8, 22, 14, 25, 0, 0, time.UTC), + utils.Usage: time.Duration(rand.Intn(3600)+60) * time.Second, + utils.Cost: rand.Float64()*20 + 0.1, + utils.PDD: time.Duration(rand.Intn(20)+1) * time.Second, + }}, &reply); err != nil { + t.Error(err) + } + + }) + time.Sleep(1 * time.Second) + t.Run("TestGetTrend", func(t *testing.T) { + var tr engine.Trend + if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil { + t.Error(err) + } else if len(tr.RunTimes) != 1 && len(tr.Metrics) != 1 { + t.Error("expected metrics to be calculated") + } + }) + + t.Run("ProcessStats", func(t *testing.T) { + var reply []string + if err := client.Call(context.Background(), utils.StatSv1ProcessEvent, &utils.CGREvent{ + Tenant: "cgrates.org", + ID: fmt.Sprintf("event%d", 2), + Event: map[string]any{ + utils.AccountField: "1001", + utils.AnswerTime: time.Date(2024, 9, 22, 14, 25, 0, 0, time.UTC), + utils.Usage: time.Duration(rand.Intn(3600)+60) * time.Second / 2, + utils.Cost: rand.Float64() * 30, + utils.PDD: time.Duration(rand.Intn(20)+4) * time.Second, + }}, &reply); err != nil { + t.Error(err) + } + }) + + time.Sleep(1 * time.Second) + t.Run("TestGetTrend", func(t *testing.T) { + var tr engine.Trend + if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil { + t.Error(err) + } else if len(tr.RunTimes) != 2 && len(tr.Metrics) != 2 { + t.Error("expected metrics to be calculated") + } else if tr.Metrics[tr.RunTimes[1]]["*acd"].TrendLabel != utils.MetaNegative { + t.Error("expected TrendLabel to be negative") + } else if tr.Metrics[tr.RunTimes[1]]["*tcc"].TrendLabel != utils.MetaPositive { + t.Error("expected TrendLabel to be positive") + } else if tr.Metrics[tr.RunTimes[1]]["*tcd"].TrendLabel != utils.MetaPositive { + t.Error("expected TrendLabel to be positive") + } + }) +} diff --git a/services/trends.go b/services/trends.go index e24c13891..755f234c4 100644 --- a/services/trends.go +++ b/services/trends.go @@ -78,12 +78,12 @@ func (trs *TrendService) Start() error { dbchan := trs.dm.GetDMChan() dm := <-dbchan dbchan <- dm - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TrendS)) trs.Lock() defer trs.Unlock() trs.trs = engine.NewTrendS(dm, trs.connMgr, filterS, trs.cfg) + trs.trs.StartScheduling() srv, err := engine.NewService(v1.NewTrendSv1(trs.trs)) if err != nil { return err @@ -105,6 +105,7 @@ func (tr *TrendService) Shutdown() (err error) { defer tr.srvDep[utils.DataDB].Done() tr.Lock() defer tr.Unlock() + tr.trs.StopScheduling() <-tr.connChan return } @@ -113,7 +114,7 @@ func (tr *TrendService) Shutdown() (err error) { func (tr *TrendService) IsRunning() bool { tr.RLock() defer tr.RUnlock() - return false + return tr.trs != nil } // ServiceName returns the service name diff --git a/utils/apitpdata.go b/utils/apitpdata.go index fba8a43ca..aeba0fadd 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1658,3 +1658,8 @@ type ArgScheduleTrendQueries struct { TenantIDWithAPIOpts TrendIDs []string } + +type ArgGetTrend struct { + TenantWithAPIOpts + ID string +} diff --git a/utils/consts.go b/utils/consts.go index 9bfd0da7c..148b5af46 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1703,6 +1703,8 @@ const ( APIerSv1GetTrendProfileIDs = "APIerSv1.GetTrendProfileIDs" APIerSv1GetTrendProfiles = "APIerSv1.GetTrendProfiles" TrendSv1Ping = "TrendSv1.Ping" + TrendSv1ScheduleQueries = "TrendSv1.ScheduleQueries" + TrendSv1GetTrend = "TrendSv1.GetTrend" ) // RankingS APIs