mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
computeRanking and V1GetRanking functions
This commit is contained in:
@@ -71,15 +71,20 @@ func (rkP *RankingProfile) Clone() (cln *RankingProfile) {
|
||||
}
|
||||
|
||||
// NewRankingFromProfile is a constructor for an empty ranking out of it's profile
|
||||
func NewRankingFromProfile(rkP *RankingProfile) *Ranking {
|
||||
return &Ranking{
|
||||
func NewRankingFromProfile(rkP *RankingProfile) (rk *Ranking) {
|
||||
rk = &Ranking{
|
||||
Tenant: rkP.Tenant,
|
||||
ID: rkP.ID,
|
||||
Sorting: rkP.Sorting,
|
||||
StatMetrics: make(map[string]map[string]float64),
|
||||
|
||||
rkPrfl: rkP,
|
||||
metricIDs: utils.NewStringSet(rkP.MetricIDs),
|
||||
}
|
||||
if rkP.SortingParameters != nil {
|
||||
copy(rk.SortingParameters, rkP.SortingParameters)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type RankingWithAPIOpts struct {
|
||||
|
||||
@@ -75,24 +75,19 @@ type RankingS struct {
|
||||
//
|
||||
// it is to be called by Cron service
|
||||
func (rkS *RankingS) computeRanking(rkP *RankingProfile) {
|
||||
|
||||
/*rk, err := rkS.dm.GetRanking(tP.Tenant, tP.ID, true, true, utils.NonTransactional)
|
||||
rk, err := rkS.dm.GetRanking(rkP.Tenant, rkP.ID, true, true, utils.NonTransactional)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> querying rkP with id: <%s:%s> dm error: <%s>",
|
||||
utils.RankingS, tP.Tenant, tP.ID, err.Error()))
|
||||
"<%s> querying RankingProfile with ID: <%s:%s> dm error: <%s>",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
return
|
||||
}
|
||||
*/
|
||||
rk := NewRankingFromProfile(rkP)
|
||||
rk.rMux.Lock()
|
||||
defer rk.rMux.Unlock()
|
||||
/*if trnd.tPrfl == nil {
|
||||
trnd.tPrfl = tP
|
||||
if rk.rkPrfl == nil {
|
||||
rk.rkPrfl = rkP
|
||||
}
|
||||
*/
|
||||
|
||||
for _, statID := range rkP.StatIDs {
|
||||
var floatMetrics map[string]float64
|
||||
if err := rkS.connMgr.Call(context.Background(), rkS.cgrcfg.RankingSCfg().StatSConns,
|
||||
@@ -101,7 +96,7 @@ func (rkS *RankingS) computeRanking(rkP *RankingProfile) {
|
||||
&floatMetrics); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> computing Ranking with id: <%s:%s> for stats <%s> error: <%s>",
|
||||
"<%s> computing Ranking with ID: <%s:%s> for Stats <%s> error: <%s>",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, statID, err.Error()))
|
||||
return
|
||||
}
|
||||
@@ -119,16 +114,21 @@ func (rkS *RankingS) computeRanking(rkP *RankingProfile) {
|
||||
rk.StatMetrics[statID][metricID] = val
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
if err = rkS.storeRanking(rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> setting Ranking with id: <%s:%s> DM error: <%s>",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
return
|
||||
}
|
||||
*/
|
||||
if rk.SortedStatIDs, err = rankingSortStats(rkP.Sorting,
|
||||
rkP.SortingParameters, rk.StatMetrics); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> sorting stats for Ranking with ID: <%s:%s> error: <%s>",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
return
|
||||
}
|
||||
if err = rkS.storeRanking(rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
"<%s> storing Ranking with ID: <%s:%s> DM error: <%s>",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
return
|
||||
}
|
||||
if err := rkS.processThresholds(rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf(
|
||||
@@ -141,7 +141,6 @@ func (rkS *RankingS) computeRanking(rkP *RankingProfile) {
|
||||
"<%s> Trend with id <%s:%s> error: <%s> with EEs",
|
||||
utils.RankingS, rkP.Tenant, rkP.ID, err.Error()))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// processThresholds will pass the Ranking event to ThresholdS
|
||||
@@ -149,7 +148,7 @@ func (rkS *RankingS) processThresholds(rk *Ranking) (err error) {
|
||||
if len(rk.SortedStatIDs) == 0 {
|
||||
return
|
||||
}
|
||||
if len(rkS.cgrcfg.TrendSCfg().ThresholdSConns) == 0 {
|
||||
if len(rkS.cgrcfg.RankingSCfg().ThresholdSConns) == 0 {
|
||||
return
|
||||
}
|
||||
opts := map[string]any{
|
||||
@@ -176,7 +175,7 @@ func (rkS *RankingS) processThresholds(rk *Ranking) (err error) {
|
||||
}
|
||||
var withErrs bool
|
||||
var rkIDs []string
|
||||
if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.TrendSCfg().ThresholdSConns,
|
||||
if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.RankingSCfg().ThresholdSConns,
|
||||
utils.ThresholdSv1ProcessEvent, ev, &rkIDs); err != nil &&
|
||||
(len(thIDs) != 0 || err.Error() != utils.ErrNotFound.Error()) {
|
||||
utils.Logger.Warning(
|
||||
@@ -194,7 +193,7 @@ func (rkS *RankingS) processEEs(rk *Ranking) (err error) {
|
||||
if len(rk.SortedStatIDs) == 0 {
|
||||
return
|
||||
}
|
||||
if len(rkS.cgrcfg.TrendSCfg().EEsConns) == 0 {
|
||||
if len(rkS.cgrcfg.RankingSCfg().EEsConns) == 0 {
|
||||
return
|
||||
}
|
||||
opts := map[string]any{
|
||||
@@ -211,7 +210,7 @@ func (rkS *RankingS) processEEs(rk *Ranking) (err error) {
|
||||
}
|
||||
var withErrs bool
|
||||
var reply map[string]map[string]any
|
||||
if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.TrendSCfg().EEsConns,
|
||||
if err := rkS.connMgr.Call(context.TODO(), rkS.cgrcfg.RankingSCfg().EEsConns,
|
||||
utils.EeSv1ProcessEvent, ev, &reply); err != nil &&
|
||||
err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(
|
||||
@@ -226,15 +225,12 @@ func (rkS *RankingS) processEEs(rk *Ranking) (err error) {
|
||||
|
||||
// storeTrend will store or schedule the trend based on settings
|
||||
func (rkS *RankingS) storeRanking(rk *Ranking) (err error) {
|
||||
if rkS.cgrcfg.TrendSCfg().StoreInterval == 0 {
|
||||
if rkS.cgrcfg.RankingSCfg().StoreInterval == 0 {
|
||||
return
|
||||
}
|
||||
/*
|
||||
if rkS.cgrcfg.TrendSCfg().StoreInterval == -1 {
|
||||
return rkS.dm.SetRanking(rk)
|
||||
}
|
||||
*/
|
||||
|
||||
if rkS.cgrcfg.RankingSCfg().StoreInterval == -1 {
|
||||
return rkS.dm.SetRanking(rk)
|
||||
}
|
||||
// schedule the asynchronous save, relies for Ranking to be in cache
|
||||
rkS.sRksMux.Lock()
|
||||
rkS.storedRankings.Add(rk.rkPrfl.TenantID())
|
||||
@@ -268,14 +264,12 @@ func (rkS *RankingS) storeRankings() {
|
||||
}
|
||||
rk := rkIf.(*Ranking)
|
||||
rk.rMux.RLock()
|
||||
/*
|
||||
if err := rkS.dm.SetRanking(rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
|
||||
utils.RankingS, rkID, err))
|
||||
failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup
|
||||
}
|
||||
*/
|
||||
if err := rkS.dm.SetRanking(rk); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> failed storing Trend with ID: %q, err: %q",
|
||||
utils.RankingS, rkID, err))
|
||||
failedRkIDs = append(failedRkIDs, rkID) // record failure so we can schedule it for next backup
|
||||
}
|
||||
rk.rMux.RUnlock()
|
||||
// randomize the CPU load and give up thread control
|
||||
runtime.Gosched()
|
||||
@@ -289,7 +283,7 @@ func (rkS *RankingS) storeRankings() {
|
||||
|
||||
// asyncStoreRankings runs as a backround process, calling storeRankings based on storeInterval
|
||||
func (rkS *RankingS) asyncStoreRankings() {
|
||||
storeInterval := rkS.cgrcfg.TrendSCfg().StoreInterval
|
||||
storeInterval := rkS.cgrcfg.RankingSCfg().StoreInterval
|
||||
if storeInterval <= 0 {
|
||||
close(rkS.storingStopped)
|
||||
return
|
||||
@@ -358,7 +352,7 @@ func (rkS *RankingS) Reload() {
|
||||
// scheduleAutomaticQueries will schedule the queries at start/reload based on configured
|
||||
func (rkS *RankingS) scheduleAutomaticQueries() error {
|
||||
schedData := make(map[string][]string)
|
||||
for k, v := range rkS.cgrcfg.TrendSCfg().ScheduledIDs {
|
||||
for k, v := range rkS.cgrcfg.RankingSCfg().ScheduledIDs {
|
||||
schedData[k] = v
|
||||
}
|
||||
var tnts []string
|
||||
@@ -438,10 +432,8 @@ func (rkS *RankingS) V1ScheduleQueries(ctx *context.Context, args *utils.ArgSche
|
||||
return
|
||||
}
|
||||
|
||||
/*
|
||||
// V1GetRanking is the API to return the Ranking instance
|
||||
//
|
||||
func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.ArgGetRanking, retRanking *Ranking) (err error) {
|
||||
func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, retRanking *Ranking) (err error) {
|
||||
if missing := utils.MissingStructFields(arg, []string{utils.ID}); len(missing) != 0 { //Params missing
|
||||
return utils.NewErrMandatoryIeMissing(missing...)
|
||||
}
|
||||
@@ -449,49 +441,22 @@ func (rkS *RankingS) V1GetRanking(ctx *context.Context, arg *utils.ArgGetRanking
|
||||
if rk, err = rkS.dm.GetRanking(arg.Tenant, arg.ID, true, true, utils.NonTransactional); err != nil {
|
||||
return
|
||||
}
|
||||
retRanking.Tenant = trnd.Tenant // avoid vet complaining for mutex copying
|
||||
retTrend.ID = trnd.ID
|
||||
starrkIDx := arg.RunIndexStart
|
||||
if starrkIDx > len(trnd.RunTimes) {
|
||||
starrkIDx = len(trnd.RunTimes)
|
||||
}
|
||||
endIdx := arg.RunIndexEnd
|
||||
if endIdx > len(trnd.RunTimes) ||
|
||||
endIdx < starrkIDx ||
|
||||
endIdx == 0 {
|
||||
endIdx = len(trnd.RunTimes)
|
||||
}
|
||||
runTimes := trnd.RunTimes[starrkIDx:endIdx]
|
||||
if len(runTimes) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
var tStart, tEnd time.Time
|
||||
if arg.RunTimeStart == utils.EmptyString {
|
||||
tStart = runTimes[0]
|
||||
} else if tStart, err = utils.ParseTimeDetectLayout(arg.RunTimeStart, rkS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil {
|
||||
return
|
||||
}
|
||||
if arg.RunTimeEnd == utils.EmptyString {
|
||||
tEnd = runTimes[len(runTimes)-1].Add(time.Duration(1))
|
||||
} else if tEnd, err = utils.ParseTimeDetectLayout(arg.RunTimeEnd, rkS.cgrcfg.GeneralCfg().DefaultTimezone); err != nil {
|
||||
return
|
||||
}
|
||||
retTrend.RunTimes = make([]time.Time, 0, len(runTimes))
|
||||
for _, runTime := range runTimes {
|
||||
if !runTime.Before(tStart) && runTime.Before(tEnd) {
|
||||
retTrend.RunTimes = append(retTrend.RunTimes, runTime)
|
||||
rk.rMux.RLock()
|
||||
defer rk.rMux.RUnlock()
|
||||
retRanking.Tenant = rk.Tenant // avoid vet complaining for mutex copying
|
||||
retRanking.ID = rk.ID
|
||||
retRanking.StatMetrics = make(map[string]map[string]float64)
|
||||
for statID, metrics := range rk.StatMetrics {
|
||||
retRanking.StatMetrics[statID] = make(map[string]float64)
|
||||
for metricID, val := range metrics {
|
||||
retRanking.StatMetrics[statID][metricID] = val
|
||||
}
|
||||
}
|
||||
if len(retTrend.RunTimes) == 0 { // filtered out all
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
retTrend.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
|
||||
for _, runTime := range retTrend.RunTimes {
|
||||
retTrend.Metrics[runTime] = trnd.Metrics[runTime]
|
||||
}
|
||||
retRanking.Sorting = rk.Sorting
|
||||
copy(retRanking.SortingParameters, rk.SortingParameters)
|
||||
copy(retRanking.SortedStatIDs, rk.SortedStatIDs)
|
||||
return
|
||||
}
|
||||
*/
|
||||
|
||||
// V1GetSchedule returns the active schedule for Raking queries
|
||||
func (rkS *RankingS) V1GetSchedule(ctx *context.Context, args *utils.ArgScheduledRankings, schedRankings *[]utils.ScheduledRanking) (err error) {
|
||||
|
||||
Reference in New Issue
Block a user