diff --git a/.github/workflows/integration_tests.yaml b/.github/workflows/integration_tests.yaml index 945f1d013..dea10fd62 100644 --- a/.github/workflows/integration_tests.yaml +++ b/.github/workflows/integration_tests.yaml @@ -58,5 +58,4 @@ jobs: - name: Run integration tests run: | - sudo env "PATH=$PATH" ./integration_test.sh - + sudo env "PATH=$PATH" ./integration_test.sh \ No newline at end of file diff --git a/apier/v1/tptrends_it_test.go b/apier/v1/tptrends_it_test.go index aa8f61e9a..7dcd4ae4e 100644 --- a/apier/v1/tptrends_it_test.go +++ b/apier/v1/tptrends_it_test.go @@ -113,7 +113,7 @@ func testTPTrendsRpcConn(t *testing.T) { func testTPTrendsGetTPTrendBeforeSet(t *testing.T) { var reply *utils.TPTrendsProfile - if err := tpTrendRPC.Call(context.Background(), utils.APIerSv1GetTPRanking, + if err := tpTrendRPC.Call(context.Background(), utils.APIerSv1GetTPTrend, &utils.TPTntID{TPid: "TPS1", Tenant: "cgrates.org", ID: "Trend1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { t.Error(err) @@ -122,11 +122,14 @@ func testTPTrendsGetTPTrendBeforeSet(t *testing.T) { func testTPTrendsSetTPTrend(t *testing.T) { tpTrend = &utils.TPTrendsProfile{ - Tenant: "cgrates.org", - TPid: "TPS1", - ID: "Trend1", - QueryInterval: "1m", - ThresholdIDs: []string{"ThreshValue", "ThreshValueTwo"}, + Tenant: "cgrates.org", + TPid: "TPS1", + ID: "Trend1", + ThresholdIDs: []string{"ThreshValue", "ThreshValueTwo"}, + Metrics: []utils.MetricWithSettings{ + {MetricID: "Metric"}, + }, + Trend: "*average", } sort.Strings(tpTrend.ThresholdIDs) var result string @@ -145,7 +148,7 @@ func testTPTrendsGetTPTrendAfterSet(t *testing.T) { } sort.Strings(respond.ThresholdIDs) if !reflect.DeepEqual(tpTrend, respond) { - t.Errorf("Expecting: %+v, received: %+v", tpTrend, respond) + t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(tpTrend), utils.ToJSON(respond)) } } diff --git a/apier/v1/trends_it_test.go b/apier/v1/trends_it_test.go index bfd111492..132417143 100644 --- a/apier/v1/trends_it_test.go +++ b/apier/v1/trends_it_test.go @@ -23,7 +23,6 @@ package v1 import ( "path" "testing" - "time" "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" @@ -114,11 +113,9 @@ func testTrendSRPCConn(t *testing.T) { func testTrendSLoadAdd(t *testing.T) { trendProfile := &engine.TrendProfileWithAPIOpts{ TrendProfile: &engine.TrendProfile{ - Tenant: "cgrates.org", - ID: "TR_AVG", - QueryInterval: 2 * time.Minute, - StatID: "Stat1", - Trend: "*average", + Tenant: "cgrates.org", + ID: "TR_AVG", + StatID: "Stat1", }, } @@ -138,10 +135,9 @@ func testTrendSetTrendProfile(t *testing.T) { ) trendProfile = &engine.TrendProfileWithAPIOpts{ TrendProfile: &engine.TrendProfile{ - Tenant: "cgrates.org", - ID: "Trend1", - QueryInterval: time.Second * 15, - ThresholdIDs: []string{"THD1", "THD2"}}, + Tenant: "cgrates.org", + ID: "Trend1", + ThresholdIDs: []string{"THD1", "THD2"}}, } if err := trendRPC.Call(context.Background(), utils.APIerSv1GetTrendProfile, &utils.TenantID{Tenant: "cgrates.org", ID: "Trend1"}, &reply); err == nil || diff --git a/data/conf/samples/tutpostgres/cgrates.json b/data/conf/samples/tutpostgres/cgrates.json index edf8600f6..561251d84 100644 --- a/data/conf/samples/tutpostgres/cgrates.json +++ b/data/conf/samples/tutpostgres/cgrates.json @@ -4,7 +4,6 @@ // Used for cgradmin // Starts rater, scheduler - "listen": { "rpc_json": ":2012", // RPC JSON listening address "rpc_gob": ":2013", // RPC GOB listening address diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql index 2af8c6047..d3062ca17 100644 --- a/data/storage/mysql/create_tariffplan_tables.sql +++ b/data/storage/mysql/create_tariffplan_tables.sql @@ -327,12 +327,14 @@ CREATE TABLE tp_trends( `tpid` varchar(64) NOT NULL, `tenant` varchar(64) NOT NULL, `id` varchar(64) NOT NULL, - `query_interval` varchar(64) NOT NULL, + `schedule` varchar(64) NOT NULL, `stat_id` varchar(64) NOT NULL, + `metrics` varchar(128) NOT NULL, + `trend_swing_margin` decimal(8,2) NOT NULL, `queue_length` int(11) NOT NULL, `ttl` varchar(32) NOT NULL, - `purge_filter_ids` varchar(64) NOT NULL, `trend` varchar(64) NOT NULL, + `trend_type` varchar(64) NOT NULL, `threshold_ids` varchar(64) NOT NULL, `created_at` TIMESTAMP, PRIMARY KEY (`pk`), diff --git a/data/storage/postgres/create_tariffplan_tables.sql b/data/storage/postgres/create_tariffplan_tables.sql index 5b6d0f93c..060c2f227 100644 --- a/data/storage/postgres/create_tariffplan_tables.sql +++ b/data/storage/postgres/create_tariffplan_tables.sql @@ -320,12 +320,14 @@ CREATE TABLE tp_trends( "tpid" varchar(64) NOT NULL, "tenant" varchar(64) NOT NULL, "id" varchar(64) NOT NULL, - "query_interval" varchar(64) NOT NULL, + "schedule" varchar(64) NOT NULL, "stat_id" varchar(64) NOT NULL, + "metrics" varchar(128) NOT NULL, + "trend_swing_margin" decimal(8,2) NOT NULL, "queue_length" INTEGER NOT NULL, "ttl" varchar(32) NOT NULL, - "purge_filter_ids" varchar(64) NOT NULL, "trend" varchar(32) NOT NULL, + "trend_type" varchar(64) NOT NULL, "threshold_ids" varchar(64) NOT NULL, "created_at" TIMESTAMP ); diff --git a/engine/datamanager.go b/engine/datamanager.go index 59cdcc7db..555550ae7 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1450,19 +1450,17 @@ func (dm *DataManager) SetTrendProfile(trp *TrendProfile) (err error) { config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) } if oldTrd == nil || - oldTrd.Trend != trp.Trend || - oldTrd.QueueLength != trp.QueueLength { + oldTrd.QueueLength != trp.QueueLength || + oldTrd.Schedule != trp.Schedule { err = dm.SetTrend(&Trend{ Tenant: trp.Tenant, ID: trp.ID, - Trend: trp.Trend, }) } else if _, errTr := dm.GetTrend(trp.Tenant, trp.ID, true, false, utils.NonTransactional); errTr == utils.ErrNotFound { err = dm.SetTrend(&Trend{ Tenant: trp.Tenant, ID: trp.ID, - Trend: trp.Trend, }) } return diff --git a/engine/model_helpers.go b/engine/model_helpers.go index 49064cba1..cdb476a4f 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -1643,49 +1643,47 @@ func RankingProfileToAPI(sg *RankingProfile) (tpSG *utils.TPRankingProfile) { type TrendsMdls []*TrendsMdl func (tps TrendsMdls) CSVHeader() (result []string) { - return []string{"#" + utils.Tenant, utils.ID, utils.QueryInterval, utils.StatID, - utils.TTL, utils.PurgeFilterIDs, utils.Trend, utils.ThresholdIDs} + return []string{"#" + utils.Tenant, utils.ID, utils.Schedule, utils.StatID, + utils.Metrics, utils.QueueLength, utils.TTL, utils.TrendType, utils.ThresholdIDs} } func (models TrendsMdls) AsTPTrends() (result []*utils.TPTrendsProfile) { thresholdsMap := make(map[string]utils.StringSet) - purgeFiltersIDsMap := make(map[string]utils.StringSet) - msr := make(map[string]*utils.TPTrendsProfile) + trendMetricsMap := make(map[string]map[string]*utils.MetricWithSettings) + mtr := make(map[string]*utils.TPTrendsProfile) for _, model := range models { key := &utils.TenantID{Tenant: model.Tenant, ID: model.ID} - sr, found := msr[key.TenantID()] + tr, found := mtr[key.TenantID()] if !found { - sr = &utils.TPTrendsProfile{ - Tenant: model.Tenant, - TPid: model.Tpid, - ID: model.ID, - QueryInterval: model.QueryInterval, - StatID: model.StatID, - Trend: model.Trend, - TTL: model.TTL, - QueueLength: model.QueueLength, + tr = &utils.TPTrendsProfile{ + Tenant: model.Tenant, + TPid: model.Tpid, + ID: model.ID, + Schedule: model.Schedule, + StatID: model.StatID, + TTL: model.TTL, + Trend: model.Trend, + TrendType: model.TrendType, + QueueLength: model.QueueLength, } } - if model.QueryInterval != utils.EmptyString { - sr.QueryInterval = model.QueryInterval + if model.Schedule != utils.EmptyString { + tr.Schedule = model.Schedule } if model.StatID != utils.EmptyString { - sr.StatID = model.StatID - } - if model.Trend != utils.EmptyString { - sr.Trend = model.Trend + tr.StatID = model.StatID } if model.TTL != utils.EmptyString { - sr.TTL = model.TTL + tr.TTL = model.TTL + } + if model.Trend != utils.EmptyString { + tr.Trend = model.Trend + } + if model.TrendType != utils.EmptyString { + tr.TrendType = model.TrendType } if model.QueueLength != 0 { - sr.QueueLength = model.QueueLength - } - if model.PurgeFilterIDs != utils.EmptyString { - if _, has := purgeFiltersIDsMap[key.TenantID()]; !has { - purgeFiltersIDsMap[key.TenantID()] = make(utils.StringSet) - } - purgeFiltersIDsMap[key.TenantID()].AddSlice(strings.Split(model.PurgeFilterIDs, utils.InfieldSep)) + tr.QueueLength = model.QueueLength } if model.ThresholdIDs != utils.EmptyString { if _, has := thresholdsMap[key.TenantID()]; !has { @@ -1693,109 +1691,116 @@ func (models TrendsMdls) AsTPTrends() (result []*utils.TPTrendsProfile) { } thresholdsMap[key.TenantID()].AddSlice(strings.Split(model.ThresholdIDs, utils.InfieldSep)) } - msr[key.TenantID()] = sr + if model.Metrics != utils.EmptyString { + if _, has := trendMetricsMap[key.TenantID()]; !has { + trendMetricsMap[key.TenantID()] = make(map[string]*utils.MetricWithSettings) + } + metricsSplit := strings.Split(model.Metrics, utils.InfieldSep) + for _, metricID := range metricsSplit { + trMetric, found := trendMetricsMap[key.TenantID()][metricID] + if !found { + trMetric = &utils.MetricWithSettings{ + MetricID: metricID, + } + } + if model.TrendSwingMargin != 0 { + trMetric.TrendSwingMargin = model.TrendSwingMargin + } + trendMetricsMap[key.TenantID()][metricID] = trMetric + } + } + mtr[key.TenantID()] = tr } - result = make([]*utils.TPTrendsProfile, len(msr)) + result = make([]*utils.TPTrendsProfile, len(mtr)) i := 0 - for tntId, sr := range msr { + for tntId, sr := range mtr { result[i] = sr - result[i].PurgeFilterIDs = purgeFiltersIDsMap[tntId].AsSlice() result[i].ThresholdIDs = thresholdsMap[tntId].AsSlice() + for _, metric := range trendMetricsMap[tntId] { + result[i].Metrics = append(result[i].Metrics, *metric) + } i++ } return } -func APItoModelTrends(tpSR *utils.TPTrendsProfile) (mdls TrendsMdls) { - if tpSR == nil { - return - } - if len(tpSR.PurgeFilterIDs) == 0 { - mdl := &TrendsMdl{ - Tpid: tpSR.TPid, - Tenant: tpSR.Tenant, - ID: tpSR.ID, - QueryInterval: tpSR.QueryInterval, - StatID: tpSR.StatID, - QueueLength: tpSR.QueueLength, - Trend: tpSR.Trend, - } - for i, threshold := range tpSR.ThresholdIDs { - if i != 0 { - mdl.ThresholdIDs += utils.InfieldSep +func APItoModelTrends(tr *utils.TPTrendsProfile) (mdls TrendsMdls) { + if tr != nil && len(tr.Metrics) != 0 { + for i, metric := range tr.Metrics { + mdl := &TrendsMdl{ + Tpid: tr.TPid, + Tenant: tr.Tenant, + ID: tr.ID, } - mdl.ThresholdIDs += threshold - } - mdls = append(mdls, mdl) - } - for i, filterID := range tpSR.PurgeFilterIDs { - mdl := &TrendsMdl{ - Tpid: tpSR.TPid, - Tenant: tpSR.Tenant, - ID: tpSR.ID, - } - if i == 0 { - mdl.QueueLength = tpSR.QueueLength - mdl.QueryInterval = tpSR.QueryInterval - mdl.StatID = tpSR.StatID - mdl.TTL = tpSR.TTL - mdl.Trend = tpSR.Trend - for i, threshold := range tpSR.ThresholdIDs { - if i != 0 { - mdl.ThresholdIDs += utils.InfieldSep + if i == 0 { + for i, threshold := range tr.ThresholdIDs { + if i != 0 { + mdl.ThresholdIDs += utils.InfieldSep + } + mdl.ThresholdIDs += threshold } - mdl.ThresholdIDs += threshold + mdl.Schedule = tr.Schedule + mdl.QueueLength = tr.QueueLength + mdl.StatID = tr.StatID + mdl.Trend = tr.Trend + mdl.TrendType = tr.TrendType + mdl.TTL = tr.TTL } + mdl.TrendSwingMargin = metric.TrendSwingMargin + mdl.Metrics = metric.MetricID + mdls = append(mdls, mdl) } - mdl.PurgeFilterIDs = filterID - mdls = append(mdls, mdl) } return } -func APItoTrends(tpSR *utils.TPTrendsProfile) (sr *TrendProfile, err error) { +func APItoTrends(tr *utils.TPTrendsProfile) (sr *TrendProfile, err error) { sr = &TrendProfile{ - Tenant: tpSR.Tenant, - ID: tpSR.ID, - StatID: tpSR.StatID, - QueueLength: tpSR.QueueLength, - Trend: tpSR.Trend, - PurgeFilterIDs: make([]string, len(tpSR.PurgeFilterIDs)), - ThresholdIDs: make([]string, len(tpSR.ThresholdIDs)), + Tenant: tr.Tenant, + ID: tr.ID, + StatID: tr.StatID, + Schedule: tr.Schedule, + QueueLength: tr.QueueLength, + Metrics: make([]MetricWithSettings, len(tr.Metrics)), + TrendType: tr.TrendType, + ThresholdIDs: make([]string, len(tr.ThresholdIDs)), } - if tpSR.TTL != utils.EmptyString { - if sr.TTL, err = utils.ParseDurationWithNanosecs(tpSR.TTL); err != nil { + if tr.TTL != utils.EmptyString { + if sr.TTL, err = utils.ParseDurationWithNanosecs(tr.TTL); err != nil { return } } - if tpSR.QueryInterval != utils.EmptyString { - if sr.QueryInterval, err = utils.ParseDurationWithNanosecs(tpSR.QueryInterval); err != nil { - return + copy(sr.ThresholdIDs, tr.ThresholdIDs) + for i, metric := range sr.Metrics { + tr.Metrics[i] = utils.MetricWithSettings{ + MetricID: metric.MetricID, + TrendSwingMargin: metric.TrendSwingMargin, } } - copy(sr.ThresholdIDs, tpSR.ThresholdIDs) - copy(sr.PurgeFilterIDs, tpSR.PurgeFilterIDs) return } -func TrendProfileToAPI(sr *TrendProfile) (tpSR *utils.TPTrendsProfile) { +func TrendProfileToAPI(tr *TrendProfile) (tpSR *utils.TPTrendsProfile) { tpSR = &utils.TPTrendsProfile{ - Tenant: sr.Tenant, - ID: sr.ID, - PurgeFilterIDs: make([]string, len(sr.PurgeFilterIDs)), - ThresholdIDs: make([]string, len(sr.ThresholdIDs)), - StatID: sr.StatID, - QueueLength: sr.QueueLength, - Trend: sr.Trend, + Tenant: tr.Tenant, + ID: tr.ID, + Schedule: tr.Schedule, + StatID: tr.StatID, + ThresholdIDs: make([]string, len(tr.ThresholdIDs)), + Metrics: make([]utils.MetricWithSettings, len(tr.Metrics)), + QueueLength: tr.QueueLength, + TrendType: tr.TrendType, } - if sr.TTL != time.Duration(0) { - tpSR.TTL = sr.TTL.String() + if tr.TTL != time.Duration(0) { + tpSR.TTL = tr.TTL.String() } - if sr.QueryInterval != time.Duration(0) { - tpSR.QueryInterval = sr.QueryInterval.String() + copy(tpSR.ThresholdIDs, tr.ThresholdIDs) + for i, metric := range tr.Metrics { + tpSR.Metrics[i] = utils.MetricWithSettings{ + MetricID: metric.MetricID, + TrendSwingMargin: metric.TrendSwingMargin, + } } - copy(tpSR.ThresholdIDs, sr.ThresholdIDs) - copy(tpSR.PurgeFilterIDs, sr.PurgeFilterIDs) return } diff --git a/engine/models.go b/engine/models.go index de39434fe..3e4884816 100644 --- a/engine/models.go +++ b/engine/models.go @@ -302,18 +302,20 @@ func (RankingsMdl) TableName() string { } type TrendsMdl struct { - PK uint `gorm:"primary_key"` - Tpid string - Tenant string `index:"0" re:".*"` - ID string `index:"1" re:".*"` - QueryInterval string `index:"2" re:".*"` - StatID string `index:"3" re:".*"` - QueueLength int `index:"4" re:".*"` - TTL string `index:"5" re:".*"` - PurgeFilterIDs string `index:"6" re:".*"` - Trend string `index:"7" re:".*"` - ThresholdIDs string `index:"8" re:".*"` - CreatedAt time.Time + PK uint `gorm:"primary_key"` + Tpid string + Tenant string `index:"0" re:".*"` + ID string `index:"1" re:".*"` + Schedule string `index:"2" re:".*"` + StatID string `index:"3" re:".*"` + Metrics string `index:"4" re:".*"` + TrendSwingMargin float64 `index:"5" re:".*"` + QueueLength int `index:"6" re:".*"` + TTL string `index:"7" re:".*"` + Trend string `index:"8" re:".*"` + TrendType string `index:"9" re:".*"` + ThresholdIDs string `index:"10" re:".*"` + CreatedAt time.Time } func (TrendsMdl) TableName() string { diff --git a/engine/storage_sql.go b/engine/storage_sql.go index c250a1d7b..0a8d9430c 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -1462,7 +1462,7 @@ func (sqls *SQLStorage) GetTPStats(tpid, tenant, id string) ([]*utils.TPStatProf } func (sqls *SQLStorage) GetTPTrends(tpid, tenant, id string) ([]*utils.TPTrendsProfile, error) { - var srs TrendsMdls + var trs TrendsMdls q := sqls.db.Where("tpid = ?", tpid) if len(id) != 0 { q = q.Where("id = ?", id) @@ -1470,14 +1470,14 @@ func (sqls *SQLStorage) GetTPTrends(tpid, tenant, id string) ([]*utils.TPTrendsP if len(tenant) != 0 { q = q.Where("tenant = ?", tenant) } - if err := q.Find(&srs).Error; err != nil { + if err := q.Find(&trs).Error; err != nil { return nil, err } - asrs := srs.AsTPTrends() - if len(asrs) == 0 { - return asrs, utils.ErrNotFound + atrs := trs.AsTPTrends() + if len(atrs) == 0 { + return atrs, utils.ErrNotFound } - return asrs, nil + return atrs, nil } func (sqls *SQLStorage) GetTPRankings(tpid string, tenant string, id string) ([]*utils.TPRankingProfile, error) { diff --git a/engine/trends.go b/engine/trends.go index 768d1bda9..0e001753e 100644 --- a/engine/trends.go +++ b/engine/trends.go @@ -26,15 +26,21 @@ import ( ) type TrendProfile struct { - Tenant string - ID string - QueryInterval time.Duration - StatID string - QueueLength int - TTL time.Duration - PurgeFilterIDs []string - Trend string - ThresholdIDs []string + Tenant string + ID string + Schedule string // Cron expression scheduling gathering of the metrics + StatID string + Metrics []MetricWithSettings + QueueLength int + TTL time.Duration + TrendType string // *last, *average + ThresholdIDs []string +} + +// MetricWithSettings adds specific settings to the Metric +type MetricWithSettings struct { + MetricID string + TrendSwingMargin float64 // allow this margin for *neutral trend } type TrendProfileWithAPIOpts struct { @@ -53,11 +59,18 @@ type TrendWithAPIOpts struct { // Trend is the unit matched by filters type Trend struct { - Tenant string - ID string - Trend string - QueueLength int - trPrfl *TrendProfile + Tenant string + ID string + RunTimes []time.Time + Metrics map[time.Time]map[string]MetricWithTrend + totals map[string]float64 // cached sum, used for average calculations +} + +// MetricWithTrend represents one read from StatS +type MetricWithTrend struct { + ID string // Metric ID + Value float64 // Metric Value + Trend string // *positive, *negative, *neutral } func (tr *Trend) TenantID() string { diff --git a/utils/apitpdata.go b/utils/apitpdata.go index ffbf7a61b..b928b462f 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -1000,18 +1000,25 @@ type TPRankingProfile struct { ThresholdIDs []string } +// MetricWithSettings adds specific settings to the Metric +type MetricWithSettings struct { + MetricID string + TrendSwingMargin float64 // allow this margin for *neutral trend +} + // TPTrendProfile is used in APIs to manage remotely offline TrendProfile type TPTrendsProfile struct { - TPid string - Tenant string - ID string - QueryInterval string - StatID string - QueueLength int - TTL string - PurgeFilterIDs []string - Trend string - ThresholdIDs []string + TPid string + Tenant string + ID string + Schedule string + StatID string + Metrics []MetricWithSettings + QueueLength int + TTL string + Trend string + TrendType string + ThresholdIDs []string } // TPThresholdProfile is used in APIs to manage remotely offline ThresholdProfile diff --git a/utils/consts.go b/utils/consts.go index 4ea8bb877..479a38ad9 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -637,12 +637,11 @@ const ( RouteFilterIDs = "RouteFilterIDs" AttributeFilterIDs = "AttributeFilterIDs" QueueLength = "QueueLength" - QueryInterval = "QueryInterval" - PurgeFilterIDs = "PurgeFilterIDs" - Trend = "Trend" + TrendType = "TrendType" TTL = "TTL" MinItems = "MinItems" MetricIDs = "MetricIDs" + Metrics = "Metrics" MetricFilterIDs = "MetricFilterIDs" FieldName = "FieldName" Path = "Path"