added it tests and new APIs for TrendS

This commit is contained in:
gezimbll
2024-09-26 17:01:04 +02:00
committed by Dan Christian Bogos
parent d9a5458a43
commit b134df83b7
10 changed files with 232 additions and 32 deletions

View File

@@ -106,9 +106,19 @@ func (apierSv1 *APIerSv1) RemoveTrendProfile(ctx *context.Context, args *utils.T
// NewTrendSv1 initializes TrendSV1
func NewTrendSv1(trs *engine.TrendS) *TrendSv1 {
return &TrendSv1{}
return &TrendSv1{
trS: trs,
}
}
type TrendSv1 struct {
trS *engine.TrendS
}
func (trs *TrendSv1) ScheduleQueries(ctx *context.Context, args *utils.ArgScheduleTrendQueries, scheduled *int) error {
return trs.trS.V1ScheduleQueries(ctx, args, scheduled)
}
func (trs *TrendSv1) GetTrend(ctx *context.Context, args *utils.ArgGetTrend, trend *engine.Trend) error {
return trs.trS.V1GetTrend(ctx, args, trend)
}

View File

@@ -143,7 +143,7 @@ func initConfigSv1(internalConfigChan chan birpc.ClientConnector,
func startRPC(server *cores.Server, internalRaterChan,
internalCdrSChan, internalRsChan, internalStatSChan,
internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan,
internalAttrSChan, internalChargerSChan, internalThdSChan, internalTrendSChan, internalSuplSChan,
internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan,
internalLoaderSChan, internalRALsv1Chan, internalCacheSChan,
internalEEsChan, internalERsChan chan birpc.ClientConnector,
@@ -166,6 +166,8 @@ func startRPC(server *cores.Server, internalRaterChan,
internalChargerSChan <- chrgS
case thS := <-internalThdSChan:
internalThdSChan <- thS
case trS := <-internalTrendSChan:
internalTrendSChan <- trS
case splS := <-internalSuplSChan:
internalSuplSChan <- splS
case analyzerS := <-internalAnalyzerSChan:
@@ -584,7 +586,7 @@ func main() {
tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, server, internalThresholdSChan, anz, srvDep)
stS := services.NewStatService(cfg, dmService, cacheS, filterSChan, server,
internalStatSChan, connManager, anz, srvDep)
srS := services.NewTrendService(cfg, dmService, cacheS, filterSChan, server,
trS := services.NewTrendService(cfg, dmService, cacheS, filterSChan, server,
internalTrendSChan, connManager, anz, srvDep)
sgS := services.NewRankingService(cfg, dmService, cacheS, filterSChan, server,
internalRankingSChan, connManager, anz, srvDep)
@@ -613,7 +615,7 @@ func main() {
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server,
internalLoaderSChan, connManager, anz, srvDep)
srvManager.AddServices(gvService, attrS, chrS, tS, stS, srS, sgS, reS, routeS, schS, rals,
srvManager.AddServices(gvService, attrS, chrS, tS, stS, trS, sgS, reS, routeS, schS, rals,
apiSv1, apiSv2, cdrS, smg, coreS,
services.NewDNSAgent(cfg, filterSChan, shdChan, connManager, srvDep),
services.NewFreeswitchAgent(cfg, shdChan, connManager, srvDep),
@@ -657,6 +659,7 @@ func main() {
engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, internalSchedulerSChan)
engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, internalSessionSChan)
engine.IntRPC.AddInternalRPCClient(utils.StatSv1, internalStatSChan)
engine.IntRPC.AddInternalRPCClient(utils.TrendSv1, internalTrendSChan)
engine.IntRPC.AddInternalRPCClient(utils.RouteSv1, internalRouteSChan)
engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, internalThresholdSChan)
engine.IntRPC.AddInternalRPCClient(utils.ServiceManagerV1, internalServeManagerChan)
@@ -680,7 +683,7 @@ func main() {
go startRPC(server, internalResponderChan, internalCDRServerChan,
internalResourceSChan, internalStatSChan,
internalAttributeSChan, internalChargerSChan, internalThresholdSChan,
internalRouteSChan, internalSessionSChan, internalAnalyzerSChan,
internalTrendSChan, internalRouteSChan, internalSessionSChan, internalAnalyzerSChan,
internalDispatcherSChan, internalLoaderSChan, internalRALsChan,
internalCacheSChan, internalEEsChan, internalERsChan, shdChan)

View File

@@ -25,6 +25,11 @@
"db_type": "*internal"
},
"trends": {
"enabled": true,
"stats_conns":["*internal"],
},
"rals": {
"enabled": true,

View File

@@ -111,8 +111,6 @@ type Trend struct {
//
// thread safe since it should be used close to source
func (t *Trend) Compile(cleanTtl time.Duration, qLength int) {
t.tMux.Lock()
defer t.tMux.Unlock()
t.cleanup(cleanTtl, qLength)
if t.mTotals == nil { // indexes were not yet built
t.computeIndexes()
@@ -121,23 +119,26 @@ func (t *Trend) Compile(cleanTtl time.Duration, qLength int) {
// cleanup will clean stale data out of
func (t *Trend) cleanup(ttl time.Duration, qLength int) (altered bool) {
expTime := time.Now().Add(-ttl)
var expIdx *int
for i, rT := range t.RunTimes {
if rT.After(expTime) {
continue
if ttl >= 0 {
expTime := time.Now().Add(-ttl)
var expIdx *int
for i, rT := range t.RunTimes {
if rT.After(expTime) {
continue
}
expIdx = &i
delete(t.Metrics, rT)
}
expIdx = &i
delete(t.Metrics, rT)
}
if expIdx != nil {
if len(t.RunTimes)-1 == *expIdx {
t.RunTimes = make([]time.Time, 0)
} else {
t.RunTimes = t.RunTimes[*expIdx+1:]
if expIdx != nil {
if len(t.RunTimes)-1 == *expIdx {
t.RunTimes = make([]time.Time, 0)
} else {
t.RunTimes = t.RunTimes[*expIdx+1:]
}
altered = true
}
altered = true
}
diffLen := len(t.RunTimes) - qLength
if qLength > 0 && diffLen > 0 {
var rmTms []time.Time

View File

@@ -39,8 +39,9 @@ func NewTrendS(dm *DataManager,
connMgr: connMgr,
filterS: filterS,
cgrcfg: cgrcfg,
crn: cron.New(),
loopStopped: make(chan struct{}),
crnTQsMux: new(sync.RWMutex),
crnTQsMux: &sync.RWMutex{},
crnTQs: make(map[string]map[string]cron.EntryID),
}
}
@@ -82,6 +83,7 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
ID: tP.ID,
RunTimes: make([]time.Time, 0),
Metrics: make(map[time.Time]map[string]*MetricWithTrend),
tMux: new(sync.RWMutex),
}
} else if err != nil {
utils.Logger.Warning(
@@ -90,15 +92,16 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
utils.TrendS, tP.Tenant, tP.ID, err.Error()))
return
}
if trend.tMux == nil {
trend.tMux = new(sync.RWMutex)
}
trend.tMux.Lock()
defer trend.tMux.Unlock()
trend.cleanup(tP.TTL, tP.QueueLength)
if trend.mTotals == nil { // indexes were not yet built
if len(trend.mTotals) == 0 { // indexes were not yet built
trend.computeIndexes()
}
now := time.Now()
var metrics []string
if len(tP.Metrics) != 0 {
@@ -113,6 +116,9 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
return // nothing to compute
}
trend.RunTimes = append(trend.RunTimes, now)
if trend.Metrics == nil {
trend.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
}
trend.Metrics[now] = make(map[string]*MetricWithTrend)
for _, mID := range metrics {
mWt := &MetricWithTrend{ID: mID}
@@ -139,6 +145,15 @@ func (tS *TrendS) computeTrend(tP *TrendProfile) {
}
func (tS *TrendS) StartScheduling() {
tS.crn.Start()
}
func (tS *TrendS) StopScheduling() {
ctx := tS.crn.Stop()
<-ctx.Done()
}
// scheduleTrendQueries will schedule/re-schedule specific trend queries
func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []string) (scheduled int, err error) {
var partial bool
@@ -161,8 +176,9 @@ func (tS *TrendS) scheduleTrendQueries(ctx *context.Context, tnt string, tIDs []
"<%s> scheduling TrendProfile <%s:%s>, error: <%s>",
utils.TrendS, tnt, tID, err.Error()))
partial = true
} else {
} else { // log the entry ID for debugging
tS.crnTQsMux.Lock()
tS.crnTQs[tP.Tenant] = make(map[string]cron.EntryID)
tS.crnTQs[tP.Tenant][tP.ID] = entryID
tS.crnTQsMux.Unlock()
}
@@ -183,3 +199,10 @@ func (tS *TrendS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgSchedul
}
return
}
func (tS *TrendS) V1GetTrend(ctx *context.Context, arg *utils.ArgGetTrend, trend *Trend) (err error) {
var tr *Trend
tr, err = tS.dm.GetTrend(arg.Tenant, arg.ID, true, true, utils.NonTransactional)
*trend = *tr
return
}

View File

@@ -84,7 +84,4 @@ func TestNewTrendS(t *testing.T) {
t.Errorf("Expected crnTQs to be empty, but got length %d", len(trendS.crnTQs))
}
if trendS.crn != nil {
t.Errorf("Expected crn to be nil, but got %v", trendS.crn)
}
}

View File

@@ -0,0 +1,153 @@
//go:build integration
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package general_tests
import (
"fmt"
"math/rand"
"testing"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func TestTrendSchedule(t *testing.T) {
switch *utils.DBType {
case utils.MetaInternal:
case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("unsupported dbtype value")
}
content := `{
"general": {
"log_level": 7,
},
"data_db": {
"db_type": "*internal"
},
"stor_db": {
"db_type": "*internal"
},
"trends": {
"enabled": true,
"stats_conns":["*localhost"],
},
"stats": {
"enabled": true,
"store_interval": "-1",
},
"apiers": {
"enabled": true,
},
}
`
tpFiles := map[string]string{
utils.TrendsCsv: `#Tenant[0],Id[1],Schedule[2],StatID[3],Metrics[4],TTL[5],QueueLength[6],MinItems[7],CorrelationType[8],Tolerance[9],Stored[10],ThresholdIDs[11]
cgrates.org,TREND_1,@every 1s,Stats1_1,,-1,-1,1,*last,1,false,
cgrates.org,TREND_2,@every 1s,Stats1_2,,-1,-1,1,*last,1,false,`,
utils.StatsCsv: `#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],Weight[11],ThresholdIDs[12]
cgrates.org,Stats1_1,*string:~*req.Account:1001,,,,,*tcc;*acd;*tcd,,,,,
cgrates.org,Stats1_2,*string:~*req.Account:1002,,,,,*sum#~*req.Usage;*pdd,,,,,`}
testEnv := TestEnvironment{
Name: "TestTrendSchedule",
ConfigJSON: content,
TpFiles: tpFiles,
}
client, _ := testEnv.Setup(t, *utils.WaitRater)
t.Run("CheckTrendSchedule", func(t *testing.T) {
var scheduled int
if err := client.Call(context.Background(), utils.TrendSv1ScheduleQueries,
&utils.ArgScheduleTrendQueries{TrendIDs: []string{"TREND_1", "TREND_2"}, TenantIDWithAPIOpts: utils.TenantIDWithAPIOpts{TenantID: &utils.TenantID{Tenant: "cgrates.org"}}}, &scheduled); err != nil {
t.Fatal(err)
} else if scheduled != 2 {
t.Errorf("expected 2, got %d", scheduled)
}
})
t.Run("ProcessStats", func(t *testing.T) {
var reply []string
if err := client.Call(context.Background(), utils.StatSv1ProcessEvent, &utils.CGREvent{
Tenant: "cgrates.org",
ID: fmt.Sprintf("event%d", 1),
Event: map[string]any{
utils.AccountField: "1001",
utils.AnswerTime: time.Date(2024, 8, 22, 14, 25, 0, 0, time.UTC),
utils.Usage: time.Duration(rand.Intn(3600)+60) * time.Second,
utils.Cost: rand.Float64()*20 + 0.1,
utils.PDD: time.Duration(rand.Intn(20)+1) * time.Second,
}}, &reply); err != nil {
t.Error(err)
}
})
time.Sleep(1 * time.Second)
t.Run("TestGetTrend", func(t *testing.T) {
var tr engine.Trend
if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil {
t.Error(err)
} else if len(tr.RunTimes) != 1 && len(tr.Metrics) != 1 {
t.Error("expected metrics to be calculated")
}
})
t.Run("ProcessStats", func(t *testing.T) {
var reply []string
if err := client.Call(context.Background(), utils.StatSv1ProcessEvent, &utils.CGREvent{
Tenant: "cgrates.org",
ID: fmt.Sprintf("event%d", 2),
Event: map[string]any{
utils.AccountField: "1001",
utils.AnswerTime: time.Date(2024, 9, 22, 14, 25, 0, 0, time.UTC),
utils.Usage: time.Duration(rand.Intn(3600)+60) * time.Second / 2,
utils.Cost: rand.Float64() * 30,
utils.PDD: time.Duration(rand.Intn(20)+4) * time.Second,
}}, &reply); err != nil {
t.Error(err)
}
})
time.Sleep(1 * time.Second)
t.Run("TestGetTrend", func(t *testing.T) {
var tr engine.Trend
if err := client.Call(context.Background(), utils.TrendSv1GetTrend, &utils.ArgGetTrend{ID: "TREND_1", TenantWithAPIOpts: utils.TenantWithAPIOpts{Tenant: "cgrates.org"}}, &tr); err != nil {
t.Error(err)
} else if len(tr.RunTimes) != 2 && len(tr.Metrics) != 2 {
t.Error("expected metrics to be calculated")
} else if tr.Metrics[tr.RunTimes[1]]["*acd"].TrendLabel != utils.MetaNegative {
t.Error("expected TrendLabel to be negative")
} else if tr.Metrics[tr.RunTimes[1]]["*tcc"].TrendLabel != utils.MetaPositive {
t.Error("expected TrendLabel to be positive")
} else if tr.Metrics[tr.RunTimes[1]]["*tcd"].TrendLabel != utils.MetaPositive {
t.Error("expected TrendLabel to be positive")
}
})
}

View File

@@ -78,12 +78,12 @@ func (trs *TrendService) Start() error {
dbchan := trs.dm.GetDMChan()
dm := <-dbchan
dbchan <- dm
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem",
utils.CoreS, utils.TrendS))
trs.Lock()
defer trs.Unlock()
trs.trs = engine.NewTrendS(dm, trs.connMgr, filterS, trs.cfg)
trs.trs.StartScheduling()
srv, err := engine.NewService(v1.NewTrendSv1(trs.trs))
if err != nil {
return err
@@ -105,6 +105,7 @@ func (tr *TrendService) Shutdown() (err error) {
defer tr.srvDep[utils.DataDB].Done()
tr.Lock()
defer tr.Unlock()
tr.trs.StopScheduling()
<-tr.connChan
return
}
@@ -113,7 +114,7 @@ func (tr *TrendService) Shutdown() (err error) {
func (tr *TrendService) IsRunning() bool {
tr.RLock()
defer tr.RUnlock()
return false
return tr.trs != nil
}
// ServiceName returns the service name

View File

@@ -1658,3 +1658,8 @@ type ArgScheduleTrendQueries struct {
TenantIDWithAPIOpts
TrendIDs []string
}
type ArgGetTrend struct {
TenantWithAPIOpts
ID string
}

View File

@@ -1703,6 +1703,8 @@ const (
APIerSv1GetTrendProfileIDs = "APIerSv1.GetTrendProfileIDs"
APIerSv1GetTrendProfiles = "APIerSv1.GetTrendProfiles"
TrendSv1Ping = "TrendSv1.Ping"
TrendSv1ScheduleQueries = "TrendSv1.ScheduleQueries"
TrendSv1GetTrend = "TrendSv1.GetTrend"
)
// RankingS APIs