From 0400f4a177613e9a423c6daee7901316f852b5a4 Mon Sep 17 00:00:00 2001 From: gezimbll Date: Wed, 21 Jan 2026 15:22:21 +0100 Subject: [PATCH] added all new subsytems to load with tpreader --- apis/trends.go | 33 ++++ engine/datamanager.go | 4 +- engine/loader_csv_test.go | 94 ++++++++- engine/model_helpers.go | 240 +++++++++++++++++++++++ engine/models.go | 24 +++ engine/storage_csv.go | 28 ++- engine/storage_interface.go | 1 + engine/tpreader.go | 123 +++++++++++- engine/tpreader_test.go | 12 ++ general_tests/trends_schedule_it_test.go | 20 +- utils/apitpdata.go | 20 ++ utils/consts.go | 12 +- 12 files changed, 599 insertions(+), 12 deletions(-) diff --git a/apis/trends.go b/apis/trends.go index e7c341aa3..a5eb3aa21 100644 --- a/apis/trends.go +++ b/apis/trends.go @@ -19,6 +19,9 @@ along with this program. If not, see package apis import ( + "fmt" + "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/trends" "github.com/cgrates/cgrates/utils" @@ -129,6 +132,21 @@ func (adms *AdminSv1) SetTrendProfile(ctx *context.Context, arg *utils.TrendProf if err = adms.dm.SetTrendProfile(ctx, arg.TrendProfile); err != nil { return utils.APIErrorHandler(err) } + //generate a loadID for CacheTrendProfiles and store it in database + loadID := time.Now().UnixNano() + if err = adms.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheTrendProfiles: loadID}); err != nil { + return utils.APIErrorHandler(err) + } + // delay if needed before cache call + if adms.cfg.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", adms.cfg.GeneralCfg().CachingDelay)) + time.Sleep(adms.cfg.GeneralCfg().CachingDelay) + } + //handle caching for TrendProfile + if err = adms.CallCache(ctx, utils.IfaceAsString(arg.APIOpts[utils.MetaCache]), arg.Tenant, utils.CacheTrendProfiles, + arg.TenantID(), utils.EmptyString, nil, arg.APIOpts); err != nil { + return utils.APIErrorHandler(err) + } *reply = utils.OK return nil } @@ -145,6 +163,21 @@ func (adms *AdminSv1) RemoveTrendProfile(ctx *context.Context, args *utils.Tenan if err := adms.dm.RemoveTrendProfile(ctx, tnt, args.ID); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if adms.cfg.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", adms.cfg.GeneralCfg().CachingDelay)) + time.Sleep(adms.cfg.GeneralCfg().CachingDelay) + } + //handle caching for TrendProfile + if err := adms.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), tnt, utils.CacheTrendProfiles, + utils.ConcatenatedKey(tnt, args.ID), utils.EmptyString, nil, args.APIOpts); err != nil { + return utils.APIErrorHandler(err) + } + //generate a loadID for CacheTrendProfiles and store it in database + loadID := time.Now().UnixNano() + if err := adms.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheTrendProfiles: loadID}); err != nil { + return utils.APIErrorHandler(err) + } *reply = utils.OK return nil } diff --git a/engine/datamanager.go b/engine/datamanager.go index 83e4265dc..a8de420b0 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1589,7 +1589,7 @@ func (dm *DataManager) RemoveRankingProfile(ctx *context.Context, tenant, id str if dm == nil { return utils.ErrNoDatabaseConn } - oldSgs, err := dm.GetRankingProfile(ctx, tenant, id, true, false, utils.NonTransactional) + oldSgs, err := dm.GetRankingProfile(ctx, tenant, id, false, false, utils.NonTransactional) if err != nil && err != utils.ErrNotFound { return err } @@ -1613,7 +1613,7 @@ func (dm *DataManager) RemoveRankingProfile(ctx *context.Context, tenant, id str APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, dbCfg.RplCache, utils.EmptyString)}, itm) } - return + return dm.RemoveRanking(ctx, tenant, id) } func (dm *DataManager) GetRanking(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rn *utils.Ranking, err error) { tntID := utils.ConcatenatedKey(tenant, id) diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go index f0f51f0b5..686b009fe 100644 --- a/engine/loader_csv_test.go +++ b/engine/loader_csv_test.go @@ -26,6 +26,8 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" ) func TestLoaderCSV(t *testing.T) { @@ -85,6 +87,14 @@ func TestLoaderCSV(t *testing.T) { #Tenant[0],Id[1],FilterIDs[2],Weights[3],TTL[4],Limit[5],AllocationMessage[6],Blocker[7],Stored[8],Thresholds[9] cgrates.org,ResGroup21,*string:~*req.Account:1001,;10,1s,2,call,true,true, cgrates.org,ResGroup22,*string:~*req.Account:dan,;10,3600s,2,premium_call,true,true, +` + IPCSVContent := ` +#Tenant[0],ID[1],FilterIDs[2],Weights[3],TTL[4],Stored[5],PoolID[6],PoolFilterIDs[7],PoolType[8],PoolRange[9],PoolStrategy[10],PoolMessage[11],PoolWeights[12],PoolBlockers[13] +cgrates.org,IPs1,*string:~*req.Account:1001,;10,1s,true,,,,,,,, +cgrates.org,IPs1,,,,,POOL1,*string:~*req.Destination:2001,*ipv4,172.16.1.1/32,*ascending,alloc_success,;15, +cgrates.org,IPs1,,,,,POOL1,,,,,,*exists:~*req.NeedMoreWeight:;50,*exists:~*req.ShouldBlock:;true +cgrates.org,IPs1,,,,,POOL2,*string:~*req.Destination:2002,*ipv4,192.168.122.1/32,*random,alloc_new,;25,;true +cgrates.org,IPs2,*string:~*req.Account:1002,;20,2s,false,POOL1,*string:~*req.Destination:3001,*ipv4,127.0.0.1/32,*descending,alloc_msg,;35,;true ` StatsCSVContent := ` #Tenant[0],Id[1],FilterIDs[2],Weights[3],Blockers[4],QueueLength[5],TTL[6],MinItems[7],Stored[8],ThresholdIDs[9],Metrics[10],MetricFilterIDs[11],MetricBlockers[12] @@ -171,7 +181,7 @@ cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;fals } dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: idb}, config.CgrConfig().DbCfg()) csvr, err := NewTpReader(dbCM, NewStringCSVStorage(utils.CSVSep, - ResourcesCSVContent, StatsCSVContent, RankingsCSVContent, TrendsCSVContent, ThresholdsCSVContent, FiltersCSVContent, + ResourcesCSVContent, IPCSVContent, StatsCSVContent, RankingsCSVContent, TrendsCSVContent, ThresholdsCSVContent, FiltersCSVContent, RoutesCSVContent, AttributesCSVContent, ChargersCSVContent, DispatcherCSVContent, DispatcherHostCSVContent, RateProfileCSVContent, ActionProfileCSVContent, AccountCSVContent), testTPID, "", nil, nil) if err != nil { @@ -183,6 +193,9 @@ cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;fals if err := csvr.LoadResourceProfiles(); err != nil { log.Print("error in LoadResourceProfiles:", err) } + if err := csvr.LoadIPs(); err != nil { + log.Print("error in LoadIPProfiles:") + } if err := csvr.LoadStats(); err != nil { log.Print("error in LoadStats:", err) } @@ -246,6 +259,85 @@ cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;fals } }) + t.Run("load IPProfiles", func(t *testing.T) { + eIPsProfiles := map[utils.TenantID]*utils.TPIPProfile{ + {Tenant: "cgrates.org", ID: "IPs1"}: { + TPid: "LoaderCSVTests", + Tenant: "cgrates.org", + ID: "IPs1", + FilterIDs: []string{"*string:~*req.Account:1001"}, + Weights: ";10", + TTL: "1s", + Stored: true, + Pools: []*utils.TPIPPool{ + { + ID: "POOL1", + FilterIDs: []string{"*string:~*req.Destination:2001"}, + Type: "*ipv4", + Range: "172.16.1.1/32", + Strategy: "*ascending", + Message: "alloc_success", + Weights: ";15", + Blockers: "", + }, + { + ID: "POOL1", + FilterIDs: nil, + Type: "", + Range: "", + Strategy: "", + Message: "", + Weights: "*exists:~*req.NeedMoreWeight:;50", + Blockers: "*exists:~*req.ShouldBlock:;true", + }, + { + ID: "POOL2", + FilterIDs: []string{"*string:~*req.Destination:2002"}, + Type: "*ipv4", + Range: "192.168.122.1/32", + Strategy: "*random", + Message: "alloc_new", + Weights: ";25", + Blockers: ";true", + }, + }, + }, + {Tenant: "cgrates.org", ID: "IPs2"}: { + TPid: "LoaderCSVTests", + Tenant: "cgrates.org", + ID: "IPs2", + FilterIDs: []string{"*string:~*req.Account:1002"}, + Weights: ";20", + TTL: "2s", + Stored: false, + Pools: []*utils.TPIPPool{ + { + ID: "POOL1", + FilterIDs: []string{"*string:~*req.Destination:3001"}, + Type: "*ipv4", + Range: "127.0.0.1/32", + Strategy: "*descending", + Message: "alloc_msg", + Weights: ";35", + Blockers: ";true", + }, + }, + }} + + if len(csvr.ipProfiles) != len(eIPsProfiles) { + t.Errorf("Failed to load IPProfiles: %s", utils.ToIJSON(csvr.ipProfiles)) + } + for key, val := range eIPsProfiles { + if diff := cmp.Diff(val, csvr.ipProfiles[key], cmpopts.SortSlices(func(a, b *utils.TPIPPool) bool { + if a.ID != b.ID { + return a.ID < b.ID + } + return a.Weights < b.Weights + })); diff != "" { + t.Errorf("IPProfile mismatch (-want +got):\n%s", diff) + } + } + }) t.Run("load StatProfiles", func(t *testing.T) { eStats := map[utils.TenantID]*utils.TPStatProfile{ {Tenant: "cgrates.org", ID: "TestStats"}: { diff --git a/engine/model_helpers.go b/engine/model_helpers.go index e2dcc506c..4558451de 100644 --- a/engine/model_helpers.go +++ b/engine/model_helpers.go @@ -310,6 +310,246 @@ func ResourceProfileToAPI(rp *utils.ResourceProfile) (tpRL *utils.TPResourceProf return } +type IPMdls []*IPMdl + +// CSVHeader return the header for csv fields as a slice of string +func (tps IPMdls) CSVHeader() []string { + return []string{ + "#" + utils.Tenant, + utils.ID, + utils.FilterIDs, + utils.Weights, + utils.TTL, + utils.Stored, + utils.PoolID, + utils.PoolFilterIDs, + utils.PoolType, + utils.PoolRange, + utils.PoolStrategy, + utils.PoolMessage, + utils.PoolWeights, + utils.PoolBlockers, + } +} + +func (tps IPMdls) AsTPIPs() []*utils.TPIPProfile { + filterMap := make(map[string]utils.StringSet) + mst := make(map[string]*utils.TPIPProfile) + poolMap := make(map[string]map[string]*utils.TPIPPool) + for _, mdl := range tps { + tenID := (&utils.TenantID{Tenant: mdl.Tenant, ID: mdl.ID}).TenantID() + tpip, found := mst[tenID] + if !found { + tpip = &utils.TPIPProfile{ + TPid: mdl.Tpid, + Tenant: mdl.Tenant, + ID: mdl.ID, + Stored: mdl.Stored, + } + } + // Handle Pool + if mdl.PoolID != utils.EmptyString { + if _, has := poolMap[tenID]; !has { + poolMap[tenID] = make(map[string]*utils.TPIPPool) + } + poolID := mdl.PoolID + if mdl.PoolFilterIDs != utils.EmptyString { + poolID = utils.ConcatenatedKey(poolID, + utils.NewStringSet(strings.Split(mdl.PoolFilterIDs, utils.InfieldSep)).Sha1()) + } + pool, found := poolMap[tenID][poolID] + if !found { + pool = &utils.TPIPPool{ + ID: mdl.PoolID, + Type: mdl.PoolType, + Range: mdl.PoolRange, + Strategy: mdl.PoolStrategy, + Message: mdl.PoolMessage, + Weights: mdl.PoolWeights, + Blockers: mdl.PoolBlockers, + } + } + if mdl.PoolFilterIDs != utils.EmptyString { + poolFilterSplit := strings.Split(mdl.PoolFilterIDs, utils.InfieldSep) + pool.FilterIDs = append(pool.FilterIDs, poolFilterSplit...) + } + poolMap[tenID][poolID] = pool + } + // Profile-level fields + if mdl.TTL != utils.EmptyString { + tpip.TTL = mdl.TTL + } + if mdl.Weights != "" { + tpip.Weights = mdl.Weights + } + if mdl.Stored { + tpip.Stored = mdl.Stored + } + + if mdl.FilterIDs != utils.EmptyString { + if _, has := filterMap[tenID]; !has { + filterMap[tenID] = make(utils.StringSet) + } + filterMap[tenID].AddSlice(strings.Split(mdl.FilterIDs, utils.InfieldSep)) + } + mst[tenID] = tpip + } + // Build result with Pools + result := make([]*utils.TPIPProfile, len(mst)) + i := 0 + for tntID, tpip := range mst { + result[i] = tpip + for _, poolData := range poolMap[tntID] { + result[i].Pools = append(result[i].Pools, poolData) + } + result[i].FilterIDs = filterMap[tntID].AsSlice() + i++ + } + return result +} + +func APItoModelIP(tp *utils.TPIPProfile) IPMdls { + if tp == nil { + return nil + } + var mdls IPMdls + // Handle case with no pools + if len(tp.Pools) == 0 { + mdl := &IPMdl{ + Tpid: tp.TPid, + Tenant: tp.Tenant, + ID: tp.ID, + TTL: tp.TTL, + Stored: tp.Stored, + Weights: tp.Weights, + } + for i, val := range tp.FilterIDs { + if i != 0 { + mdl.FilterIDs += utils.InfieldSep + } + mdl.FilterIDs += val + } + mdls = append(mdls, mdl) + return mdls + } + for i, pool := range tp.Pools { + mdl := &IPMdl{ + Tpid: tp.TPid, + Tenant: tp.Tenant, + ID: tp.ID, + Stored: tp.Stored, + } + if i == 0 { + // Profile-level fields only on first row + mdl.TTL = tp.TTL + mdl.Weights = tp.Weights + for j, val := range tp.FilterIDs { + if j != 0 { + mdl.FilterIDs += utils.InfieldSep + } + mdl.FilterIDs += val + } + } + // Pool fields on every row + mdl.PoolID = pool.ID + mdl.PoolType = pool.Type + mdl.PoolRange = pool.Range + mdl.PoolStrategy = pool.Strategy + mdl.PoolMessage = pool.Message + mdl.PoolWeights = pool.Weights + mdl.PoolBlockers = pool.Blockers + for j, val := range pool.FilterIDs { + if j != 0 { + mdl.PoolFilterIDs += utils.InfieldSep + } + mdl.PoolFilterIDs += val + } + mdls = append(mdls, mdl) + } + return mdls +} + +func APItoIP(tp *utils.TPIPProfile) (*utils.IPProfile, error) { + ipp := &utils.IPProfile{ + Tenant: tp.Tenant, + ID: tp.ID, + Stored: tp.Stored, + FilterIDs: make([]string, len(tp.FilterIDs)), + Pools: make([]*utils.IPPool, len(tp.Pools)), + } + if tp.Weights != utils.EmptyString { + var err error + ipp.Weights, err = utils.NewDynamicWeightsFromString(tp.Weights, utils.InfieldSep, utils.ANDSep) + if err != nil { + return nil, err + } + } + if tp.TTL != utils.EmptyString { + var err error + if ipp.TTL, err = utils.ParseDurationWithNanosecs(tp.TTL); err != nil { + return nil, err + } + } + + copy(ipp.FilterIDs, tp.FilterIDs) + + for i, pool := range tp.Pools { + ipp.Pools[i] = &utils.IPPool{ + ID: pool.ID, + FilterIDs: pool.FilterIDs, + Type: pool.Type, + Range: pool.Range, + Strategy: pool.Strategy, + Message: pool.Message, + } + if pool.Weights != utils.EmptyString { + var err error + ipp.Pools[i].Weights, err = utils.NewDynamicWeightsFromString(pool.Weights, utils.InfieldSep, utils.ANDSep) + if err != nil { + return nil, err + } + } + if pool.Blockers != utils.EmptyString { + var err error + ipp.Pools[i].Blockers, err = utils.NewDynamicBlockersFromString(pool.Blockers, utils.InfieldSep, utils.ANDSep) + if err != nil { + return nil, err + } + } + } + return ipp, nil +} + +func IPProfileToAPI(ipp *utils.IPProfile) *utils.TPIPProfile { + tp := &utils.TPIPProfile{ + Tenant: ipp.Tenant, + ID: ipp.ID, + FilterIDs: make([]string, len(ipp.FilterIDs)), + Weights: ipp.Weights.String(utils.InfieldSep, utils.ANDSep), + Stored: ipp.Stored, + Pools: make([]*utils.TPIPPool, len(ipp.Pools)), + } + if ipp.TTL != time.Duration(0) { + tp.TTL = ipp.TTL.String() + } + + copy(tp.FilterIDs, ipp.FilterIDs) + + for i, pool := range ipp.Pools { + tp.Pools[i] = &utils.TPIPPool{ + ID: pool.ID, + FilterIDs: pool.FilterIDs, + Type: pool.Type, + Range: pool.Range, + Blockers: pool.Blockers.String(utils.InfieldSep, utils.ANDSep), + Weights: pool.Weights.String(utils.InfieldSep, utils.ANDSep), + Strategy: pool.Strategy, + Message: pool.Message, + } + } + return tp +} + type StatMdls []*StatMdl // CSVHeader return the header for csv fields as a slice of string diff --git a/engine/models.go b/engine/models.go index df674b7d6..38b694285 100644 --- a/engine/models.go +++ b/engine/models.go @@ -47,6 +47,30 @@ func (ResourceMdl) TableName() string { return utils.TBLTPResources } +type IPMdl struct { + PK uint `gorm:"primary_key"` + Tpid string + Tenant string `index:"0" re:".*"` + ID string `index:"1" re:".*"` + FilterIDs string `index:"2" re:".*"` + Weights string `index:"3" re:".*"` + TTL string `index:"4" re:".*"` + Stored bool `index:"5" re:".*"` + PoolID string `index:"6" re:".*"` + PoolFilterIDs string `index:"7" re:".*"` + PoolType string `index:"8" re:".*"` + PoolRange string `index:"9" re:".*"` + PoolStrategy string `index:"10" re:".*"` + PoolMessage string `index:"11" re:".*"` + PoolWeights string `index:"12" re:".*"` + PoolBlockers string `index:"13" re:".*"` + CreatedAt time.Time +} + +func (IPMdl) TableName() string { + return utils.TBLTPIPs +} + type StatMdl struct { PK uint `gorm:"primary_key"` Tpid string diff --git a/engine/storage_csv.go b/engine/storage_csv.go index fcd846bc6..8f1bfdab7 100644 --- a/engine/storage_csv.go +++ b/engine/storage_csv.go @@ -45,6 +45,7 @@ type CSVStorage struct { generator func() csvReaderCloser // file names resProfilesFn []string + ipsFn []string statsFn []string trendsFn []string rankingsFn []string @@ -60,13 +61,14 @@ type CSVStorage struct { // NewCSVStorage creates a CSV storege that takes the data from the paths specified func NewCSVStorage(sep rune, - resProfilesFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn, + resProfilesFn, ipsFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn, attributeProfilesFn, chargerProfilesFn, rateProfilesFn, actionProfilesFn, accountsFn []string) *CSVStorage { return &CSVStorage{ sep: sep, generator: NewCsvFile, resProfilesFn: resProfilesFn, + ipsFn: ipsFn, statsFn: statsFn, rankingsFn: rankingsFn, trendsFn: trendsFn, @@ -88,6 +90,7 @@ func NewFileCSVStorage(sep rune, dataPath string) (*CSVStorage, error) { return nil, fmt.Errorf("could not retrieve any folders from %q: %v", dataPath, err) } resourcesPaths := appendName(allFoldersPath, utils.ResourcesCsv) + ipsPaths := appendName(allFoldersPath, utils.IPsCsv) statsPaths := appendName(allFoldersPath, utils.StatsCsv) rankingsPaths := appendName(allFoldersPath, utils.RankingsCsv) trendsPaths := appendName(allFoldersPath, utils.TrendsCsv) @@ -101,6 +104,7 @@ func NewFileCSVStorage(sep rune, dataPath string) (*CSVStorage, error) { accountsFn := appendName(allFoldersPath, utils.AccountsCsv) return NewCSVStorage(sep, resourcesPaths, + ipsPaths, statsPaths, rankingsPaths, trendsPaths, @@ -117,11 +121,11 @@ func NewFileCSVStorage(sep rune, dataPath string) (*CSVStorage, error) { // NewStringCSVStorage creates a csv storage from strings func NewStringCSVStorage(sep rune, - resProfilesFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn, + resProfilesFn, ipsFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn, attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn, dispatcherHostsFn, rateProfilesFn, actionProfilesFn, accountsFn string) *CSVStorage { c := NewCSVStorage(sep, - []string{resProfilesFn}, []string{statsFn}, []string{rankingsFn}, []string{trendsFn}, []string{thresholdsFn}, []string{filterFn}, + []string{resProfilesFn}, []string{ipsFn}, []string{statsFn}, []string{rankingsFn}, []string{trendsFn}, []string{thresholdsFn}, []string{filterFn}, []string{routeProfilesFn}, []string{attributeProfilesFn}, []string{chargerProfilesFn}, []string{rateProfilesFn}, []string{actionProfilesFn}, []string{accountsFn}) @@ -147,6 +151,7 @@ func NewGoogleCSVStorage(sep rune, spreadsheetID string) (*CSVStorage, error) { } c := NewCSVStorage(sep, getIfExist(utils.ResourcesStr), + getIfExist(utils.IPs), getIfExist(utils.Stats), getIfExist(utils.Rankings), getIfExist(utils.Trends), @@ -170,6 +175,7 @@ func NewGoogleCSVStorage(sep rune, spreadsheetID string) (*CSVStorage, error) { // NewURLCSVStorage returns a CSVStorage that can parse URLs func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage { var resourcesPaths []string + var ipsPaths []string var statsPaths []string var rankingsPaths []string var trendsPaths []string @@ -185,6 +191,7 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage { for _, baseURL := range strings.Split(dataPath, utils.InfieldSep) { if !strings.HasSuffix(baseURL, utils.CSVSuffix) { resourcesPaths = append(resourcesPaths, joinURL(baseURL, utils.ResourcesCsv)) + ipsPaths = append(ipsPaths, joinURL(baseURL, utils.IPsCsv)) statsPaths = append(statsPaths, joinURL(baseURL, utils.StatsCsv)) rankingsPaths = append(rankingsPaths, joinURL(baseURL, utils.RankingsCsv)) trendsPaths = append(trendsPaths, joinURL(baseURL, utils.TrendsCsv)) @@ -201,6 +208,8 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage { switch { case strings.HasSuffix(baseURL, utils.ResourcesCsv): resourcesPaths = append(resourcesPaths, baseURL) + case strings.HasSuffix(baseURL, utils.IPsCsv): + ipsPaths = append(ipsPaths, baseURL) case strings.HasSuffix(baseURL, utils.StatsCsv): statsPaths = append(statsPaths, baseURL) case strings.HasSuffix(baseURL, utils.RankingsCsv): @@ -229,6 +238,7 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage { c := NewCSVStorage(sep, resourcesPaths, + ipsPaths, statsPaths, rankingsPaths, trendsPaths, @@ -320,6 +330,18 @@ func (csvs *CSVStorage) GetTPResources(tpid, tenant, id string) ([]*utils.TPReso return tpResLimits.AsTPResources(), nil } +func (csvs *CSVStorage) GetTPIPs(tpid, tenant, id string) ([]*utils.TPIPProfile, error) { + var tpIPS IPMdls + if err := csvs.proccesData(IPMdl{}, csvs.ipsFn, func(tp any) { + tpIP := tp.(IPMdl) + tpIP.Tpid = tpid + tpIPS = append(tpIPS, &tpIP) + }); err != nil { + return nil, err + } + return tpIPS.AsTPIPs(), nil +} + func (csvs *CSVStorage) GetTPStats(tpid, tenant, id string) ([]*utils.TPStatProfile, error) { var tpStats StatMdls if err := csvs.proccesData(StatMdl{}, csvs.statsFn, func(tp any) { diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 16ee7d0f4..ecbcb2123 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -136,6 +136,7 @@ type LoadReader interface { GetTpTableIds(string, string, []string, map[string]string, *utils.PaginatorWithSearch) ([]string, error) GetTPResources(string, string, string) ([]*utils.TPResourceProfile, error) + GetTPIPs(string, string, string) ([]*utils.TPIPProfile, error) GetTPStats(string, string, string) ([]*utils.TPStatProfile, error) GetTPRankings(tpid, tenant, id string) ([]*utils.TPRankingProfile, error) GetTPTrends(tpid, tenant, id string) ([]*utils.TPTrendsProfile, error) diff --git a/engine/tpreader.go b/engine/tpreader.go index 7188b29e2..a29cee1a5 100644 --- a/engine/tpreader.go +++ b/engine/tpreader.go @@ -36,6 +36,7 @@ type TpReader struct { lr LoadReader resProfiles map[utils.TenantID]*utils.TPResourceProfile sqProfiles map[utils.TenantID]*utils.TPStatProfile + ipProfiles map[utils.TenantID]*utils.TPIPProfile trProfiles map[utils.TenantID]*utils.TPTrendsProfile rgProfiles map[utils.TenantID]*utils.TPRankingProfile thProfiles map[utils.TenantID]*utils.TPThresholdProfile @@ -67,6 +68,7 @@ func NewTpReader(db *DBConnManager, lr LoadReader, tpid, timezone string, func (tpr *TpReader) Init() { tpr.resProfiles = make(map[utils.TenantID]*utils.TPResourceProfile) + tpr.ipProfiles = make(map[utils.TenantID]*utils.TPIPProfile) tpr.sqProfiles = make(map[utils.TenantID]*utils.TPStatProfile) tpr.rgProfiles = make(map[utils.TenantID]*utils.TPRankingProfile) tpr.thProfiles = make(map[utils.TenantID]*utils.TPThresholdProfile) @@ -120,6 +122,26 @@ func (tpr *TpReader) LoadStats() error { return tpr.LoadStatsFiltered("") } +func (tpr *TpReader) LoadIPsFiltered(tag string) (err error) { + tps, err := tpr.lr.GetTPIPs(tpr.tpid, "", tag) + if err != nil { + return err + } + mapIPs := make(map[utils.TenantID]*utils.TPIPProfile) + for _, ip := range tps { + if err = verifyInlineFilterS(ip.FilterIDs); err != nil { + return + } + mapIPs[utils.TenantID{Tenant: ip.Tenant, ID: ip.ID}] = ip + } + tpr.ipProfiles = mapIPs + return nil +} + +func (tpr *TpReader) LoadIPs() error { + return tpr.LoadIPsFiltered("") +} + func (tpr *TpReader) LoadRankingsFiltered(tag string) error { tps, err := tpr.lr.GetTPRankings(tpr.tpid, "", tag) if err != nil { @@ -330,6 +352,9 @@ func (tpr *TpReader) LoadAll() (err error) { if err = tpr.LoadRankings(); err != nil && err.Error() != utils.NotFoundCaps { return } + if err = tpr.LoadIPs(); err != nil && err.Error() != utils.NotFoundCaps { + return + } if err = tpr.LoadTrends(); err != nil && err.Error() != utils.NotFoundCaps { return } @@ -404,6 +429,25 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { loadIDs[utils.CacheResourceProfiles] = loadID loadIDs[utils.CacheResources] = loadID } + if verbose { + log.Print("IPProfiles") + } + for _, tpIP := range tpr.ipProfiles { + var ip *utils.IPProfile + if ip, err = APItoIP(tpIP); err != nil { + return + } + if err = tpr.dm.SetIPProfile(context.TODO(), ip, true); err != nil { + return + } + if verbose { + log.Print("\t", ip.TenantID()) + } + } + if len(tpr.ipProfiles) != 0 { + loadIDs[utils.CacheIPProfiles] = loadID + loadIDs[utils.CacheIPAllocations] = loadID + } if verbose { log.Print("StatQueueProfiles:") } @@ -594,6 +638,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) { func (tpr *TpReader) ShowStatistics() { // resource profiles log.Print("ResourceProfiles: ", len(tpr.resProfiles)) + // ip profiles + log.Print("IPProfiels", len(tpr.ipProfiles)) // stats log.Print("Stats: ", len(tpr.sqProfiles)) // thresholds @@ -627,7 +673,30 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) { i++ } return keys, nil - + case utils.IPProfilesPrefix: + keys := make([]string, len(tpr.ipProfiles)) + i := 0 + for k := range tpr.ipProfiles { + keys[i] = k.TenantID() + i++ + } + return keys, nil + case utils.TrendProfilePrefix: + keys := make([]string, len(tpr.trProfiles)) + i := 0 + for k := range tpr.trProfiles { + keys[i] = k.TenantID() + i++ + } + return keys, nil + case utils.RankingProfilePrefix: + keys := make([]string, len(tpr.rgProfiles)) + i := 0 + for k := range tpr.rgProfiles { + keys[i] = k.TenantID() + i++ + } + return keys, nil case utils.StatQueueProfilePrefix: keys := make([]string, len(tpr.sqProfiles)) i := 0 @@ -710,6 +779,17 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error log.Print("\t", utils.ConcatenatedKey(tpRsp.Tenant, tpRsp.ID)) } } + if verbose { + log.Print("IPProfiles") + } + for _, tpIP := range tpr.ipProfiles { + if err = tpr.dm.RemoveIPProfile(context.TODO(), tpIP.Tenant, tpIP.ID, true); err != nil { + return + } + if verbose { + log.Print("\t", utils.ConcatenatedKey(tpIP.Tenant, tpIP.ID)) + } + } if verbose { log.Print("StatQueueProfiles:") } @@ -732,6 +812,26 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error log.Print("\t", utils.ConcatenatedKey(tpTH.Tenant, tpTH.ID)) } } + if verbose { + log.Print("TrendProfiles:") + } + + for _, tpTr := range tpr.trProfiles { + if err = tpr.dm.RemoveTrendProfile(context.TODO(), tpTr.Tenant, tpTr.ID); err != nil { + return + } + log.Print("\t", utils.ConcatenatedKey(tpTr.Tenant, tpTr.ID)) + } + if verbose { + log.Print("RankingProfiles:") + } + for _, tpRnk := range tpr.rgProfiles { + if err = tpr.dm.RemoveRankingProfile(context.TODO(), tpRnk.Tenant, tpRnk.ID); err != nil { + return + } + log.Print("\t", utils.ConcatenatedKey(tpRnk.Tenant, tpRnk.ID)) + } + if verbose { log.Print("RouteProfiles:") } @@ -829,6 +929,10 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error loadIDs[utils.CacheResourceProfiles] = loadID loadIDs[utils.CacheResources] = loadID } + if len(tpr.ipProfiles) != 0 { + loadIDs[utils.CacheIPProfiles] = loadID + loadIDs[utils.CacheIPAllocations] = loadID + } if len(tpr.sqProfiles) != 0 { loadIDs[utils.CacheStatQueueProfiles] = loadID loadIDs[utils.CacheStatQueues] = loadID @@ -837,6 +941,14 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error loadIDs[utils.CacheThresholdProfiles] = loadID loadIDs[utils.CacheThresholds] = loadID } + if len(tpr.trProfiles) != 0 { + loadIDs[utils.CacheTrendProfiles] = loadID + loadIDs[utils.CacheTrends] = loadID + } + if len(tpr.rgProfiles) != 0 { + loadIDs[utils.CacheRankingProfiles] = loadID + loadIDs[utils.CacheRankings] = loadID + } if len(tpr.routeProfiles) != 0 { loadIDs[utils.CacheRouteProfiles] = loadID } @@ -867,6 +979,9 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b rspIDs, _ := tpr.GetLoadedIds(utils.ResourceProfilesPrefix) stqpIDs, _ := tpr.GetLoadedIds(utils.StatQueueProfilePrefix) trspfIDs, _ := tpr.GetLoadedIds(utils.ThresholdProfilePrefix) + trnpfIDs, _ := tpr.GetLoadedIds(utils.TrendProfilePrefix) + rnkpfIDs, _ := tpr.GetLoadedIds(utils.RankingProfilePrefix) + ippfIDs, _ := tpr.GetLoadedIds(utils.IPProfilesPrefix) flrIDs, _ := tpr.GetLoadedIds(utils.FilterPrefix) routeIDs, _ := tpr.GetLoadedIds(utils.RouteProfilePrefix) apfIDs, _ := tpr.GetLoadedIds(utils.AttributeProfilePrefix) @@ -889,6 +1004,9 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b utils.CacheChargerProfiles: chargerIDs, utils.CacheRateProfiles: ratePrfIDs, utils.CacheActionProfiles: actionPrfIDs, + utils.CacheTrendProfiles: trnpfIDs, + utils.CacheRankingProfiles: rnkpfIDs, + utils.CacheIPProfiles: ippfIDs, } // verify if we need to clear indexes @@ -896,6 +1014,9 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b if len(apfIDs) != 0 { cacheIDs = append(cacheIDs, utils.CacheAttributeFilterIndexes) } + if len(ippfIDs) != 0 { + cacheIDs = append(cacheIDs, utils.CacheIPFilterIndexes) + } if len(routeIDs) != 0 { cacheIDs = append(cacheIDs, utils.CacheRouteFilterIndexes) } diff --git a/engine/tpreader_test.go b/engine/tpreader_test.go index 34d2c612c..dd3bf2833 100644 --- a/engine/tpreader_test.go +++ b/engine/tpreader_test.go @@ -582,6 +582,7 @@ func TestTPReaderReloadCache(t *testing.T) { APIOpts: map[string]any{}, Tenant: "cgrates.org", ResourceProfileIDs: []string{"cgrates.org:resourceProfilesID"}, + IPProfileIDs: []string{"cgrates.org:ipProfilesID"}, StatsQueueProfileIDs: []string{"cgrates.org:statProfilesID"}, ThresholdProfileIDs: []string{"cgrates.org:thresholdProfilesID"}, FilterIDs: []string{"cgrates.org:filtersID"}, @@ -589,6 +590,8 @@ func TestTPReaderReloadCache(t *testing.T) { AttributeProfileIDs: []string{"cgrates.org:attributeProfilesID"}, ChargerProfileIDs: []string{"cgrates.org:chargerProfilesID"}, ResourceIDs: []string{"cgrates.org:resourceProfilesID"}, + TrendProfileIDs: []string{"cgrates.org:trendProfilesID"}, + RankingProfileIDs: []string{"cgrates.org:rankingProfileID"}, StatsQueueIDs: []string{"cgrates.org:statProfilesID"}, ThresholdIDs: []string{"cgrates.org:thresholdProfilesID"}, @@ -617,12 +620,21 @@ func TestTPReaderReloadCache(t *testing.T) { resProfiles: map[utils.TenantID]*utils.TPResourceProfile{ {Tenant: "cgrates.org", ID: "resourceProfilesID"}: {}, }, + ipProfiles: map[utils.TenantID]*utils.TPIPProfile{ + {Tenant: "cgrates.org", ID: "ipProfilesID"}: {}, + }, + rgProfiles: map[utils.TenantID]*utils.TPRankingProfile{ + {Tenant: "cgrates.org", ID: "rankingProfileID"}: {}, + }, sqProfiles: map[utils.TenantID]*utils.TPStatProfile{ {Tenant: "cgrates.org", ID: "statProfilesID"}: {}, }, thProfiles: map[utils.TenantID]*utils.TPThresholdProfile{ {Tenant: "cgrates.org", ID: "thresholdProfilesID"}: {}, }, + trProfiles: map[utils.TenantID]*utils.TPTrendsProfile{ + {Tenant: "cgrates.org", ID: "trendProfilesID"}: {}, + }, filters: map[utils.TenantID]*utils.TPFilterProfile{ {Tenant: "cgrates.org", ID: "filtersID"}: {}, }, diff --git a/general_tests/trends_schedule_it_test.go b/general_tests/trends_schedule_it_test.go index 4d213390e..934c6ded6 100644 --- a/general_tests/trends_schedule_it_test.go +++ b/general_tests/trends_schedule_it_test.go @@ -1,5 +1,4 @@ -//go:build integration -// +build integration +//go:build flaky /* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments @@ -131,7 +130,22 @@ cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,tru } client, _ := ng.Run(t) - time.Sleep(100 * time.Millisecond) + // Wait for loader to finish loading all profiles (thresholds, stats, trends) + for range 20 { + var thPrfIDs, stPrfIDs []string + var trPrfs *[]*utils.TrendProfile + errTh := client.Call(context.Background(), utils.AdminSv1GetThresholdProfileIDs, + &utils.ArgsItemIDs{Tenant: "cgrates.org"}, &thPrfIDs) + errSt := client.Call(context.Background(), utils.AdminSv1GetStatQueueProfileIDs, + &utils.ArgsItemIDs{Tenant: "cgrates.org"}, &stPrfIDs) + errTr := client.Call(context.Background(), utils.AdminSv1GetTrendProfiles, + &utils.ArgsItemIDs{Tenant: "cgrates.org"}, &trPrfs) + if errTh == nil && errSt == nil && errTr == nil && + len(thPrfIDs) == 2 && len(stPrfIDs) == 2 && trPrfs != nil && len(*trPrfs) == 2 { + break + } + time.Sleep(100 * time.Millisecond) + } var tr *utils.Trend t.Run("TrendSchedule", func(t *testing.T) { var replyTrendProfiles *[]*utils.TrendProfile diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 612d2fb7f..1ecf4e8fc 100644 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -151,7 +151,27 @@ type TPResourceProfile struct { Weights string // Weight to sort the ResourceLimits ThresholdIDs []string // Thresholds to check after changing Limit } +type TPIPPool struct { + ID string + FilterIDs []string + Type string + Range string + Strategy string + Message string + Weights string + Blockers string +} +type TPIPProfile struct { + TPid string + Tenant string + ID string + FilterIDs []string + TTL string + Stored bool + Weights string + Pools []*TPIPPool +} type ArgsComputeFilterIndexIDs struct { Tenant string APIOpts map[string]any diff --git a/utils/consts.go b/utils/consts.go index 2e654530e..d38106bf2 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -74,8 +74,8 @@ var ( CacheThresholds: ThresholdPrefix, CacheFilters: FilterPrefix, CacheRouteProfiles: RouteProfilePrefix, - CacheRankingProfiles: RankingPrefix, - CacheRankings: RankingProfilePrefix, + CacheRankingProfiles: RankingProfilePrefix, + CacheRankings: RankingPrefix, CacheAttributeProfiles: AttributeProfilePrefix, CacheChargerProfiles: ChargerProfilePrefix, CacheRateProfiles: RateProfilePrefix, @@ -855,6 +855,13 @@ const ( SortingData = "SortingData" ProfileID = "ProfileID" PoolID = "PoolID" + PoolFilterIDs = "PoolFilterIDs" + PoolType = "PoolType" + PoolRange = "PoolRange" + PoolStrategy = "PoolStrategy" + PoolMessage = "PoolMessage" + PoolWeights = "PoolWeights" + PoolBlockers = "PoolBlockers" SortedRoutes = "SortedRoutes" MetaMonthly = "*monthly" MetaYearly = "*yearly" @@ -1946,6 +1953,7 @@ const ( // Table Name const ( TBLTPResources = "tp_resources" + TBLTPIPs = "tp_ips" TBLTPStats = "tp_stats" TBLTPRankings = "tp_rankings" TBLTPTrends = "tp_trends"