diff --git a/apis/trends.go b/apis/trends.go
index e7c341aa3..a5eb3aa21 100644
--- a/apis/trends.go
+++ b/apis/trends.go
@@ -19,6 +19,9 @@ along with this program. If not, see
package apis
import (
+ "fmt"
+ "time"
+
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/trends"
"github.com/cgrates/cgrates/utils"
@@ -129,6 +132,21 @@ func (adms *AdminSv1) SetTrendProfile(ctx *context.Context, arg *utils.TrendProf
if err = adms.dm.SetTrendProfile(ctx, arg.TrendProfile); err != nil {
return utils.APIErrorHandler(err)
}
+ // generate a loadID for CacheTrendProfiles and store it in database
+ loadID := time.Now().UnixNano()
+ if err = adms.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheTrendProfiles: loadID}); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ // delay if needed before cache call
+ if adms.cfg.GeneralCfg().CachingDelay != 0 {
+ utils.Logger.Info(fmt.Sprintf("Delaying cache call for %v", adms.cfg.GeneralCfg().CachingDelay))
+ time.Sleep(adms.cfg.GeneralCfg().CachingDelay)
+ }
+ // handle caching for TrendProfile
+ if err = adms.CallCache(ctx, utils.IfaceAsString(arg.APIOpts[utils.MetaCache]), arg.Tenant, utils.CacheTrendProfiles,
+ arg.TenantID(), utils.EmptyString, nil, arg.APIOpts); err != nil {
+ return utils.APIErrorHandler(err)
+ }
*reply = utils.OK
return nil
}
@@ -145,6 +163,21 @@ func (adms *AdminSv1) RemoveTrendProfile(ctx *context.Context, args *utils.Tenan
if err := adms.dm.RemoveTrendProfile(ctx, tnt, args.ID); err != nil {
return utils.APIErrorHandler(err)
}
+ // delay if needed before cache call
+ if adms.cfg.GeneralCfg().CachingDelay != 0 {
+ utils.Logger.Info(fmt.Sprintf("Delaying cache call for %v", adms.cfg.GeneralCfg().CachingDelay))
+ time.Sleep(adms.cfg.GeneralCfg().CachingDelay)
+ }
+ // handle caching for TrendProfile
+ if err := adms.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), tnt, utils.CacheTrendProfiles,
+ utils.ConcatenatedKey(tnt, args.ID), utils.EmptyString, nil, args.APIOpts); err != nil {
+ return utils.APIErrorHandler(err)
+ }
+ // generate a loadID for CacheTrendProfiles and store it in database
+ loadID := time.Now().UnixNano()
+ if err := adms.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheTrendProfiles: loadID}); err != nil {
+ return utils.APIErrorHandler(err)
+ }
*reply = utils.OK
return nil
}
diff --git a/engine/datamanager.go b/engine/datamanager.go
index 83e4265dc..a8de420b0 100644
--- a/engine/datamanager.go
+++ b/engine/datamanager.go
@@ -1589,7 +1589,7 @@ func (dm *DataManager) RemoveRankingProfile(ctx *context.Context, tenant, id str
if dm == nil {
return utils.ErrNoDatabaseConn
}
- oldSgs, err := dm.GetRankingProfile(ctx, tenant, id, true, false, utils.NonTransactional)
+ oldSgs, err := dm.GetRankingProfile(ctx, tenant, id, false, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound {
return err
}
@@ -1613,7 +1613,7 @@ func (dm *DataManager) RemoveRankingProfile(ctx *context.Context, tenant, id str
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
dbCfg.RplCache, utils.EmptyString)}, itm)
}
- return
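+ // removing the profile also drops the computed Ranking stored under the same tenant/ID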
+ return dm.RemoveRanking(ctx, tenant, id)
}
func (dm *DataManager) GetRanking(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rn *utils.Ranking, err error) {
tntID := utils.ConcatenatedKey(tenant, id)
diff --git a/engine/loader_csv_test.go b/engine/loader_csv_test.go
index f0f51f0b5..686b009fe 100644
--- a/engine/loader_csv_test.go
+++ b/engine/loader_csv_test.go
@@ -26,6 +26,8 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
+ "github.com/google/go-cmp/cmp"
+ "github.com/google/go-cmp/cmp/cmpopts"
)
func TestLoaderCSV(t *testing.T) {
@@ -85,6 +87,14 @@ func TestLoaderCSV(t *testing.T) {
#Tenant[0],Id[1],FilterIDs[2],Weights[3],TTL[4],Limit[5],AllocationMessage[6],Blocker[7],Stored[8],Thresholds[9]
cgrates.org,ResGroup21,*string:~*req.Account:1001,;10,1s,2,call,true,true,
cgrates.org,ResGroup22,*string:~*req.Account:dan,;10,3600s,2,premium_call,true,true,
+`
+ IPCSVContent := `
+#Tenant[0],ID[1],FilterIDs[2],Weights[3],TTL[4],Stored[5],PoolID[6],PoolFilterIDs[7],PoolType[8],PoolRange[9],PoolStrategy[10],PoolMessage[11],PoolWeights[12],PoolBlockers[13]
+cgrates.org,IPs1,*string:~*req.Account:1001,;10,1s,true,,,,,,,,
+cgrates.org,IPs1,,,,,POOL1,*string:~*req.Destination:2001,*ipv4,172.16.1.1/32,*ascending,alloc_success,;15,
+cgrates.org,IPs1,,,,,POOL1,,,,,,*exists:~*req.NeedMoreWeight:;50,*exists:~*req.ShouldBlock:;true
+cgrates.org,IPs1,,,,,POOL2,*string:~*req.Destination:2002,*ipv4,192.168.122.1/32,*random,alloc_new,;25,;true
+cgrates.org,IPs2,*string:~*req.Account:1002,;20,2s,false,POOL1,*string:~*req.Destination:3001,*ipv4,127.0.0.1/32,*descending,alloc_msg,;35,;true
`
StatsCSVContent := `
#Tenant[0],Id[1],FilterIDs[2],Weights[3],Blockers[4],QueueLength[5],TTL[6],MinItems[7],Stored[8],ThresholdIDs[9],Metrics[10],MetricFilterIDs[11],MetricBlockers[12]
@@ -171,7 +181,7 @@ cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;fals
}
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: idb}, config.CgrConfig().DbCfg())
csvr, err := NewTpReader(dbCM, NewStringCSVStorage(utils.CSVSep,
- ResourcesCSVContent, StatsCSVContent, RankingsCSVContent, TrendsCSVContent, ThresholdsCSVContent, FiltersCSVContent,
+ ResourcesCSVContent, IPCSVContent, StatsCSVContent, RankingsCSVContent, TrendsCSVContent, ThresholdsCSVContent, FiltersCSVContent,
RoutesCSVContent, AttributesCSVContent, ChargersCSVContent, DispatcherCSVContent,
DispatcherHostCSVContent, RateProfileCSVContent, ActionProfileCSVContent, AccountCSVContent), testTPID, "", nil, nil)
if err != nil {
@@ -183,6 +193,9 @@ cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;fals
if err := csvr.LoadResourceProfiles(); err != nil {
log.Print("error in LoadResourceProfiles:", err)
}
+ if err := csvr.LoadIPs(); err != nil {
+ log.Print("error in LoadIPProfiles:")
+ }
if err := csvr.LoadStats(); err != nil {
log.Print("error in LoadStats:", err)
}
@@ -246,6 +259,85 @@ cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;fals
}
})
+ t.Run("load IPProfiles", func(t *testing.T) {
+ eIPsProfiles := map[utils.TenantID]*utils.TPIPProfile{
+ {Tenant: "cgrates.org", ID: "IPs1"}: {
+ TPid: "LoaderCSVTests",
+ Tenant: "cgrates.org",
+ ID: "IPs1",
+ FilterIDs: []string{"*string:~*req.Account:1001"},
+ Weights: ";10",
+ TTL: "1s",
+ Stored: true,
+ Pools: []*utils.TPIPPool{
+ {
+ ID: "POOL1",
+ FilterIDs: []string{"*string:~*req.Destination:2001"},
+ Type: "*ipv4",
+ Range: "172.16.1.1/32",
+ Strategy: "*ascending",
+ Message: "alloc_success",
+ Weights: ";15",
+ Blockers: "",
+ },
+ {
+ ID: "POOL1",
+ FilterIDs: nil,
+ Type: "",
+ Range: "",
+ Strategy: "",
+ Message: "",
+ Weights: "*exists:~*req.NeedMoreWeight:;50",
+ Blockers: "*exists:~*req.ShouldBlock:;true",
+ },
+ {
+ ID: "POOL2",
+ FilterIDs: []string{"*string:~*req.Destination:2002"},
+ Type: "*ipv4",
+ Range: "192.168.122.1/32",
+ Strategy: "*random",
+ Message: "alloc_new",
+ Weights: ";25",
+ Blockers: ";true",
+ },
+ },
+ },
+ {Tenant: "cgrates.org", ID: "IPs2"}: {
+ TPid: "LoaderCSVTests",
+ Tenant: "cgrates.org",
+ ID: "IPs2",
+ FilterIDs: []string{"*string:~*req.Account:1002"},
+ Weights: ";20",
+ TTL: "2s",
+ Stored: false,
+ Pools: []*utils.TPIPPool{
+ {
+ ID: "POOL1",
+ FilterIDs: []string{"*string:~*req.Destination:3001"},
+ Type: "*ipv4",
+ Range: "127.0.0.1/32",
+ Strategy: "*descending",
+ Message: "alloc_msg",
+ Weights: ";35",
+ Blockers: ";true",
+ },
+ },
+ }}
+
+ if len(csvr.ipProfiles) != len(eIPsProfiles) {
+ t.Errorf("Failed to load IPProfiles: %s", utils.ToIJSON(csvr.ipProfiles))
+ }
+ for key, val := range eIPsProfiles {
+ if diff := cmp.Diff(val, csvr.ipProfiles[key], cmpopts.SortSlices(func(a, b *utils.TPIPPool) bool {
+ if a.ID != b.ID {
+ return a.ID < b.ID
+ }
+ return a.Weights < b.Weights
+ })); diff != "" {
+ t.Errorf("IPProfile mismatch (-want +got):\n%s", diff)
+ }
+ }
+ })
t.Run("load StatProfiles", func(t *testing.T) {
eStats := map[utils.TenantID]*utils.TPStatProfile{
{Tenant: "cgrates.org", ID: "TestStats"}: {
diff --git a/engine/model_helpers.go b/engine/model_helpers.go
index e2dcc506c..4558451de 100644
--- a/engine/model_helpers.go
+++ b/engine/model_helpers.go
@@ -310,6 +310,246 @@ func ResourceProfileToAPI(rp *utils.ResourceProfile) (tpRL *utils.TPResourceProf
return
}
+type IPMdls []*IPMdl
+
+// CSVHeader returns the header for csv fields as a slice of strings
+func (tps IPMdls) CSVHeader() []string {
+ return []string{
+ "#" + utils.Tenant,
+ utils.ID,
+ utils.FilterIDs,
+ utils.Weights,
+ utils.TTL,
+ utils.Stored,
+ utils.PoolID,
+ utils.PoolFilterIDs,
+ utils.PoolType,
+ utils.PoolRange,
+ utils.PoolStrategy,
+ utils.PoolMessage,
+ utils.PoolWeights,
+ utils.PoolBlockers,
+ }
+}
+
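+// AsTPIPs aggregates the flat CSV rows into TPIPProfile objects, merging rows that share the same Tenant and ID.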
+func (tps IPMdls) AsTPIPs() []*utils.TPIPProfile {
+ filterMap := make(map[string]utils.StringSet)
+ mst := make(map[string]*utils.TPIPProfile)
+ poolMap := make(map[string]map[string]*utils.TPIPPool)
+ for _, mdl := range tps {
+ tenID := (&utils.TenantID{Tenant: mdl.Tenant, ID: mdl.ID}).TenantID()
+ tpip, found := mst[tenID]
+ if !found {
+ tpip = &utils.TPIPProfile{
+ TPid: mdl.Tpid,
+ Tenant: mdl.Tenant,
+ ID: mdl.ID,
+ Stored: mdl.Stored,
+ }
+ }
+ // Handle Pool
+ if mdl.PoolID != utils.EmptyString {
+ if _, has := poolMap[tenID]; !has {
+ poolMap[tenID] = make(map[string]*utils.TPIPPool)
+ }
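+ // rows sharing a PoolID but carrying different PoolFilterIDs stay separate pool entries, keyed by PoolID plus a hash of the filter set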
+ poolID := mdl.PoolID
+ if mdl.PoolFilterIDs != utils.EmptyString {
+ poolID = utils.ConcatenatedKey(poolID,
+ utils.NewStringSet(strings.Split(mdl.PoolFilterIDs, utils.InfieldSep)).Sha1())
+ }
+ pool, found := poolMap[tenID][poolID]
+ if !found {
+ pool = &utils.TPIPPool{
+ ID: mdl.PoolID,
+ Type: mdl.PoolType,
+ Range: mdl.PoolRange,
+ Strategy: mdl.PoolStrategy,
+ Message: mdl.PoolMessage,
+ Weights: mdl.PoolWeights,
+ Blockers: mdl.PoolBlockers,
+ }
+ }
+ if mdl.PoolFilterIDs != utils.EmptyString {
+ poolFilterSplit := strings.Split(mdl.PoolFilterIDs, utils.InfieldSep)
+ pool.FilterIDs = append(pool.FilterIDs, poolFilterSplit...)
+ }
+ poolMap[tenID][poolID] = pool
+ }
+ // Profile-level fields
+ if mdl.TTL != utils.EmptyString {
+ tpip.TTL = mdl.TTL
+ }
+ if mdl.Weights != "" {
+ tpip.Weights = mdl.Weights
+ }
+ if mdl.Stored {
+ tpip.Stored = mdl.Stored
+ }
+
+ if mdl.FilterIDs != utils.EmptyString {
+ if _, has := filterMap[tenID]; !has {
+ filterMap[tenID] = make(utils.StringSet)
+ }
+ filterMap[tenID].AddSlice(strings.Split(mdl.FilterIDs, utils.InfieldSep))
+ }
+ mst[tenID] = tpip
+ }
+ // Build result with Pools
+ result := make([]*utils.TPIPProfile, len(mst))
+ i := 0
+ for tntID, tpip := range mst {
+ result[i] = tpip
+ for _, poolData := range poolMap[tntID] {
+ result[i].Pools = append(result[i].Pools, poolData)
+ }
+ result[i].FilterIDs = filterMap[tntID].AsSlice()
+ i++
+ }
+ return result
+}
+
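+// APItoModelIP flattens a TPIPProfile into CSV rows: profile-level fields are written only on the first row, pool fields on every row.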
+func APItoModelIP(tp *utils.TPIPProfile) IPMdls {
+ if tp == nil {
+ return nil
+ }
+ var mdls IPMdls
+ // Handle case with no pools
+ if len(tp.Pools) == 0 {
+ mdl := &IPMdl{
+ Tpid: tp.TPid,
+ Tenant: tp.Tenant,
+ ID: tp.ID,
+ TTL: tp.TTL,
+ Stored: tp.Stored,
+ Weights: tp.Weights,
+ }
+ for i, val := range tp.FilterIDs {
+ if i != 0 {
+ mdl.FilterIDs += utils.InfieldSep
+ }
+ mdl.FilterIDs += val
+ }
+ mdls = append(mdls, mdl)
+ return mdls
+ }
+ for i, pool := range tp.Pools {
+ mdl := &IPMdl{
+ Tpid: tp.TPid,
+ Tenant: tp.Tenant,
+ ID: tp.ID,
+ Stored: tp.Stored,
+ }
+ if i == 0 {
+ // Profile-level fields only on first row
+ mdl.TTL = tp.TTL
+ mdl.Weights = tp.Weights
+ for j, val := range tp.FilterIDs {
+ if j != 0 {
+ mdl.FilterIDs += utils.InfieldSep
+ }
+ mdl.FilterIDs += val
+ }
+ }
+ // Pool fields on every row
+ mdl.PoolID = pool.ID
+ mdl.PoolType = pool.Type
+ mdl.PoolRange = pool.Range
+ mdl.PoolStrategy = pool.Strategy
+ mdl.PoolMessage = pool.Message
+ mdl.PoolWeights = pool.Weights
+ mdl.PoolBlockers = pool.Blockers
+ for j, val := range pool.FilterIDs {
+ if j != 0 {
+ mdl.PoolFilterIDs += utils.InfieldSep
+ }
+ mdl.PoolFilterIDs += val
+ }
+ mdls = append(mdls, mdl)
+ }
+ return mdls
+}
+
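+// APItoIP converts a TPIPProfile into the engine's IPProfile, parsing TTL, weights and blockers from their string forms.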
+func APItoIP(tp *utils.TPIPProfile) (*utils.IPProfile, error) {
+ ipp := &utils.IPProfile{
+ Tenant: tp.Tenant,
+ ID: tp.ID,
+ Stored: tp.Stored,
+ FilterIDs: make([]string, len(tp.FilterIDs)),
+ Pools: make([]*utils.IPPool, len(tp.Pools)),
+ }
+ if tp.Weights != utils.EmptyString {
+ var err error
+ ipp.Weights, err = utils.NewDynamicWeightsFromString(tp.Weights, utils.InfieldSep, utils.ANDSep)
+ if err != nil {
+ return nil, err
+ }
+ }
+ if tp.TTL != utils.EmptyString {
+ var err error
+ if ipp.TTL, err = utils.ParseDurationWithNanosecs(tp.TTL); err != nil {
+ return nil, err
+ }
+ }
+
+ copy(ipp.FilterIDs, tp.FilterIDs)
+
+ for i, pool := range tp.Pools {
+ ipp.Pools[i] = &utils.IPPool{
+ ID: pool.ID,
+ FilterIDs: pool.FilterIDs,
+ Type: pool.Type,
+ Range: pool.Range,
+ Strategy: pool.Strategy,
+ Message: pool.Message,
+ }
+ if pool.Weights != utils.EmptyString {
+ var err error
+ ipp.Pools[i].Weights, err = utils.NewDynamicWeightsFromString(pool.Weights, utils.InfieldSep, utils.ANDSep)
+ if err != nil {
+ return nil, err
+ }
+ }
+ if pool.Blockers != utils.EmptyString {
+ var err error
+ ipp.Pools[i].Blockers, err = utils.NewDynamicBlockersFromString(pool.Blockers, utils.InfieldSep, utils.ANDSep)
+ if err != nil {
+ return nil, err
+ }
+ }
+ }
+ return ipp, nil
+}
+
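+// IPProfileToAPI is the reverse conversion, rendering an IPProfile back into its TariffPlan representation.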
+func IPProfileToAPI(ipp *utils.IPProfile) *utils.TPIPProfile {
+ tp := &utils.TPIPProfile{
+ Tenant: ipp.Tenant,
+ ID: ipp.ID,
+ FilterIDs: make([]string, len(ipp.FilterIDs)),
+ Weights: ipp.Weights.String(utils.InfieldSep, utils.ANDSep),
+ Stored: ipp.Stored,
+ Pools: make([]*utils.TPIPPool, len(ipp.Pools)),
+ }
+ if ipp.TTL != time.Duration(0) {
+ tp.TTL = ipp.TTL.String()
+ }
+
+ copy(tp.FilterIDs, ipp.FilterIDs)
+
+ for i, pool := range ipp.Pools {
+ tp.Pools[i] = &utils.TPIPPool{
+ ID: pool.ID,
+ FilterIDs: pool.FilterIDs,
+ Type: pool.Type,
+ Range: pool.Range,
+ Blockers: pool.Blockers.String(utils.InfieldSep, utils.ANDSep),
+ Weights: pool.Weights.String(utils.InfieldSep, utils.ANDSep),
+ Strategy: pool.Strategy,
+ Message: pool.Message,
+ }
+ }
+ return tp
+}
+
type StatMdls []*StatMdl
// CSVHeader return the header for csv fields as a slice of string
diff --git a/engine/models.go b/engine/models.go
index df674b7d6..38b694285 100644
--- a/engine/models.go
+++ b/engine/models.go
@@ -47,6 +47,30 @@ func (ResourceMdl) TableName() string {
return utils.TBLTPResources
}
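+// IPMdl maps one row of the tp_ips table; the index tags mark the CSV column positions.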
+type IPMdl struct {
+ PK uint `gorm:"primary_key"`
+ Tpid string
+ Tenant string `index:"0" re:".*"`
+ ID string `index:"1" re:".*"`
+ FilterIDs string `index:"2" re:".*"`
+ Weights string `index:"3" re:".*"`
+ TTL string `index:"4" re:".*"`
+ Stored bool `index:"5" re:".*"`
+ PoolID string `index:"6" re:".*"`
+ PoolFilterIDs string `index:"7" re:".*"`
+ PoolType string `index:"8" re:".*"`
+ PoolRange string `index:"9" re:".*"`
+ PoolStrategy string `index:"10" re:".*"`
+ PoolMessage string `index:"11" re:".*"`
+ PoolWeights string `index:"12" re:".*"`
+ PoolBlockers string `index:"13" re:".*"`
+ CreatedAt time.Time
+}
+
+func (IPMdl) TableName() string {
+ return utils.TBLTPIPs
+}
+
type StatMdl struct {
PK uint `gorm:"primary_key"`
Tpid string
diff --git a/engine/storage_csv.go b/engine/storage_csv.go
index fcd846bc6..8f1bfdab7 100644
--- a/engine/storage_csv.go
+++ b/engine/storage_csv.go
@@ -45,6 +45,7 @@ type CSVStorage struct {
generator func() csvReaderCloser
// file names
resProfilesFn []string
+ ipsFn []string
statsFn []string
trendsFn []string
rankingsFn []string
@@ -60,13 +61,14 @@ type CSVStorage struct {
// NewCSVStorage creates a CSV storege that takes the data from the paths specified
func NewCSVStorage(sep rune,
- resProfilesFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn,
+ resProfilesFn, ipsFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn,
attributeProfilesFn, chargerProfilesFn,
rateProfilesFn, actionProfilesFn, accountsFn []string) *CSVStorage {
return &CSVStorage{
sep: sep,
generator: NewCsvFile,
resProfilesFn: resProfilesFn,
+ ipsFn: ipsFn,
statsFn: statsFn,
rankingsFn: rankingsFn,
trendsFn: trendsFn,
@@ -88,6 +90,7 @@ func NewFileCSVStorage(sep rune, dataPath string) (*CSVStorage, error) {
return nil, fmt.Errorf("could not retrieve any folders from %q: %v", dataPath, err)
}
resourcesPaths := appendName(allFoldersPath, utils.ResourcesCsv)
+ ipsPaths := appendName(allFoldersPath, utils.IPsCsv)
statsPaths := appendName(allFoldersPath, utils.StatsCsv)
rankingsPaths := appendName(allFoldersPath, utils.RankingsCsv)
trendsPaths := appendName(allFoldersPath, utils.TrendsCsv)
@@ -101,6 +104,7 @@ func NewFileCSVStorage(sep rune, dataPath string) (*CSVStorage, error) {
accountsFn := appendName(allFoldersPath, utils.AccountsCsv)
return NewCSVStorage(sep,
resourcesPaths,
+ ipsPaths,
statsPaths,
rankingsPaths,
trendsPaths,
@@ -117,11 +121,11 @@ func NewFileCSVStorage(sep rune, dataPath string) (*CSVStorage, error) {
// NewStringCSVStorage creates a csv storage from strings
func NewStringCSVStorage(sep rune,
- resProfilesFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn,
+ resProfilesFn, ipsFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn,
attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn, dispatcherHostsFn,
rateProfilesFn, actionProfilesFn, accountsFn string) *CSVStorage {
c := NewCSVStorage(sep,
- []string{resProfilesFn}, []string{statsFn}, []string{rankingsFn}, []string{trendsFn}, []string{thresholdsFn}, []string{filterFn},
+ []string{resProfilesFn}, []string{ipsFn}, []string{statsFn}, []string{rankingsFn}, []string{trendsFn}, []string{thresholdsFn}, []string{filterFn},
[]string{routeProfilesFn}, []string{attributeProfilesFn}, []string{chargerProfilesFn},
[]string{rateProfilesFn},
[]string{actionProfilesFn}, []string{accountsFn})
@@ -147,6 +151,7 @@ func NewGoogleCSVStorage(sep rune, spreadsheetID string) (*CSVStorage, error) {
}
c := NewCSVStorage(sep,
getIfExist(utils.ResourcesStr),
+ getIfExist(utils.IPs),
getIfExist(utils.Stats),
getIfExist(utils.Rankings),
getIfExist(utils.Trends),
@@ -170,6 +175,7 @@ func NewGoogleCSVStorage(sep rune, spreadsheetID string) (*CSVStorage, error) {
// NewURLCSVStorage returns a CSVStorage that can parse URLs
func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
var resourcesPaths []string
+ var ipsPaths []string
var statsPaths []string
var rankingsPaths []string
var trendsPaths []string
@@ -185,6 +191,7 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
for _, baseURL := range strings.Split(dataPath, utils.InfieldSep) {
if !strings.HasSuffix(baseURL, utils.CSVSuffix) {
resourcesPaths = append(resourcesPaths, joinURL(baseURL, utils.ResourcesCsv))
+ ipsPaths = append(ipsPaths, joinURL(baseURL, utils.IPsCsv))
statsPaths = append(statsPaths, joinURL(baseURL, utils.StatsCsv))
rankingsPaths = append(rankingsPaths, joinURL(baseURL, utils.RankingsCsv))
trendsPaths = append(trendsPaths, joinURL(baseURL, utils.TrendsCsv))
@@ -201,6 +208,8 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
switch {
case strings.HasSuffix(baseURL, utils.ResourcesCsv):
resourcesPaths = append(resourcesPaths, baseURL)
+ case strings.HasSuffix(baseURL, utils.IPsCsv):
+ ipsPaths = append(ipsPaths, baseURL)
case strings.HasSuffix(baseURL, utils.StatsCsv):
statsPaths = append(statsPaths, baseURL)
case strings.HasSuffix(baseURL, utils.RankingsCsv):
@@ -229,6 +238,7 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
c := NewCSVStorage(sep,
resourcesPaths,
+ ipsPaths,
statsPaths,
rankingsPaths,
trendsPaths,
@@ -320,6 +330,18 @@ func (csvs *CSVStorage) GetTPResources(tpid, tenant, id string) ([]*utils.TPReso
return tpResLimits.AsTPResources(), nil
}
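+// GetTPIPs reads the IP CSV data and returns the rows grouped as TPIPProfile objects.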
+func (csvs *CSVStorage) GetTPIPs(tpid, tenant, id string) ([]*utils.TPIPProfile, error) {
+ var tpIPS IPMdls
+ if err := csvs.proccesData(IPMdl{}, csvs.ipsFn, func(tp any) {
+ tpIP := tp.(IPMdl)
+ tpIP.Tpid = tpid
+ tpIPS = append(tpIPS, &tpIP)
+ }); err != nil {
+ return nil, err
+ }
+ return tpIPS.AsTPIPs(), nil
+}
+
func (csvs *CSVStorage) GetTPStats(tpid, tenant, id string) ([]*utils.TPStatProfile, error) {
var tpStats StatMdls
if err := csvs.proccesData(StatMdl{}, csvs.statsFn, func(tp any) {
diff --git a/engine/storage_interface.go b/engine/storage_interface.go
index 16ee7d0f4..ecbcb2123 100644
--- a/engine/storage_interface.go
+++ b/engine/storage_interface.go
@@ -136,6 +136,7 @@ type LoadReader interface {
GetTpTableIds(string, string, []string,
map[string]string, *utils.PaginatorWithSearch) ([]string, error)
GetTPResources(string, string, string) ([]*utils.TPResourceProfile, error)
+ GetTPIPs(string, string, string) ([]*utils.TPIPProfile, error)
GetTPStats(string, string, string) ([]*utils.TPStatProfile, error)
GetTPRankings(tpid, tenant, id string) ([]*utils.TPRankingProfile, error)
GetTPTrends(tpid, tenant, id string) ([]*utils.TPTrendsProfile, error)
diff --git a/engine/tpreader.go b/engine/tpreader.go
index 7188b29e2..a29cee1a5 100644
--- a/engine/tpreader.go
+++ b/engine/tpreader.go
@@ -36,6 +36,7 @@ type TpReader struct {
lr LoadReader
resProfiles map[utils.TenantID]*utils.TPResourceProfile
sqProfiles map[utils.TenantID]*utils.TPStatProfile
+ ipProfiles map[utils.TenantID]*utils.TPIPProfile
trProfiles map[utils.TenantID]*utils.TPTrendsProfile
rgProfiles map[utils.TenantID]*utils.TPRankingProfile
thProfiles map[utils.TenantID]*utils.TPThresholdProfile
@@ -67,6 +68,7 @@ func NewTpReader(db *DBConnManager, lr LoadReader, tpid, timezone string,
func (tpr *TpReader) Init() {
tpr.resProfiles = make(map[utils.TenantID]*utils.TPResourceProfile)
+ tpr.ipProfiles = make(map[utils.TenantID]*utils.TPIPProfile)
tpr.sqProfiles = make(map[utils.TenantID]*utils.TPStatProfile)
tpr.rgProfiles = make(map[utils.TenantID]*utils.TPRankingProfile)
tpr.thProfiles = make(map[utils.TenantID]*utils.TPThresholdProfile)
@@ -120,6 +122,26 @@ func (tpr *TpReader) LoadStats() error {
return tpr.LoadStatsFiltered("")
}
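+// LoadIPsFiltered fetches IP profiles (optionally filtered by tag) from the LoadReader, validates their inline filters and caches them on the reader.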
+func (tpr *TpReader) LoadIPsFiltered(tag string) (err error) {
+ tps, err := tpr.lr.GetTPIPs(tpr.tpid, "", tag)
+ if err != nil {
+ return err
+ }
+ mapIPs := make(map[utils.TenantID]*utils.TPIPProfile)
+ for _, ip := range tps {
+ if err = verifyInlineFilterS(ip.FilterIDs); err != nil {
+ return
+ }
+ mapIPs[utils.TenantID{Tenant: ip.Tenant, ID: ip.ID}] = ip
+ }
+ tpr.ipProfiles = mapIPs
+ return nil
+}
+
+func (tpr *TpReader) LoadIPs() error {
+ return tpr.LoadIPsFiltered("")
+}
+
func (tpr *TpReader) LoadRankingsFiltered(tag string) error {
tps, err := tpr.lr.GetTPRankings(tpr.tpid, "", tag)
if err != nil {
@@ -330,6 +352,9 @@ func (tpr *TpReader) LoadAll() (err error) {
if err = tpr.LoadRankings(); err != nil && err.Error() != utils.NotFoundCaps {
return
}
+ if err = tpr.LoadIPs(); err != nil && err.Error() != utils.NotFoundCaps {
+ return
+ }
if err = tpr.LoadTrends(); err != nil && err.Error() != utils.NotFoundCaps {
return
}
@@ -404,6 +429,25 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
loadIDs[utils.CacheResourceProfiles] = loadID
loadIDs[utils.CacheResources] = loadID
}
+ if verbose {
+ log.Print("IPProfiles")
+ }
+ for _, tpIP := range tpr.ipProfiles {
+ var ip *utils.IPProfile
+ if ip, err = APItoIP(tpIP); err != nil {
+ return
+ }
+ if err = tpr.dm.SetIPProfile(context.TODO(), ip, true); err != nil {
+ return
+ }
+ if verbose {
+ log.Print("\t", ip.TenantID())
+ }
+ }
+ if len(tpr.ipProfiles) != 0 {
+ loadIDs[utils.CacheIPProfiles] = loadID
+ loadIDs[utils.CacheIPAllocations] = loadID
+ }
if verbose {
log.Print("StatQueueProfiles:")
}
@@ -594,6 +638,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
func (tpr *TpReader) ShowStatistics() {
// resource profiles
log.Print("ResourceProfiles: ", len(tpr.resProfiles))
+ // ip profiles
+ log.Print("IPProfiels", len(tpr.ipProfiles))
// stats
log.Print("Stats: ", len(tpr.sqProfiles))
// thresholds
@@ -627,7 +673,30 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) {
i++
}
return keys, nil
-
+ case utils.IPProfilesPrefix:
+ keys := make([]string, len(tpr.ipProfiles))
+ i := 0
+ for k := range tpr.ipProfiles {
+ keys[i] = k.TenantID()
+ i++
+ }
+ return keys, nil
+ case utils.TrendProfilePrefix:
+ keys := make([]string, len(tpr.trProfiles))
+ i := 0
+ for k := range tpr.trProfiles {
+ keys[i] = k.TenantID()
+ i++
+ }
+ return keys, nil
+ case utils.RankingProfilePrefix:
+ keys := make([]string, len(tpr.rgProfiles))
+ i := 0
+ for k := range tpr.rgProfiles {
+ keys[i] = k.TenantID()
+ i++
+ }
+ return keys, nil
case utils.StatQueueProfilePrefix:
keys := make([]string, len(tpr.sqProfiles))
i := 0
@@ -710,6 +779,17 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
log.Print("\t", utils.ConcatenatedKey(tpRsp.Tenant, tpRsp.ID))
}
}
+ if verbose {
+ log.Print("IPProfiles")
+ }
+ for _, tpIP := range tpr.ipProfiles {
+ if err = tpr.dm.RemoveIPProfile(context.TODO(), tpIP.Tenant, tpIP.ID, true); err != nil {
+ return
+ }
+ if verbose {
+ log.Print("\t", utils.ConcatenatedKey(tpIP.Tenant, tpIP.ID))
+ }
+ }
if verbose {
log.Print("StatQueueProfiles:")
}
@@ -732,6 +812,26 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
log.Print("\t", utils.ConcatenatedKey(tpTH.Tenant, tpTH.ID))
}
}
+ if verbose {
+ log.Print("TrendProfiles:")
+ }
+
+ for _, tpTr := range tpr.trProfiles {
+ if err = tpr.dm.RemoveTrendProfile(context.TODO(), tpTr.Tenant, tpTr.ID); err != nil {
+ return
+ }
+ log.Print("\t", utils.ConcatenatedKey(tpTr.Tenant, tpTr.ID))
+ }
+ if verbose {
+ log.Print("RankingProfiles:")
+ }
+ for _, tpRnk := range tpr.rgProfiles {
+ if err = tpr.dm.RemoveRankingProfile(context.TODO(), tpRnk.Tenant, tpRnk.ID); err != nil {
+ return
+ }
+ log.Print("\t", utils.ConcatenatedKey(tpRnk.Tenant, tpRnk.ID))
+ }
+
if verbose {
log.Print("RouteProfiles:")
}
@@ -829,6 +929,10 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
loadIDs[utils.CacheResourceProfiles] = loadID
loadIDs[utils.CacheResources] = loadID
}
+ if len(tpr.ipProfiles) != 0 {
+ loadIDs[utils.CacheIPProfiles] = loadID
+ loadIDs[utils.CacheIPAllocations] = loadID
+ }
if len(tpr.sqProfiles) != 0 {
loadIDs[utils.CacheStatQueueProfiles] = loadID
loadIDs[utils.CacheStatQueues] = loadID
@@ -837,6 +941,14 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
loadIDs[utils.CacheThresholdProfiles] = loadID
loadIDs[utils.CacheThresholds] = loadID
}
+ if len(tpr.trProfiles) != 0 {
+ loadIDs[utils.CacheTrendProfiles] = loadID
+ loadIDs[utils.CacheTrends] = loadID
+ }
+ if len(tpr.rgProfiles) != 0 {
+ loadIDs[utils.CacheRankingProfiles] = loadID
+ loadIDs[utils.CacheRankings] = loadID
+ }
if len(tpr.routeProfiles) != 0 {
loadIDs[utils.CacheRouteProfiles] = loadID
}
@@ -867,6 +979,9 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
rspIDs, _ := tpr.GetLoadedIds(utils.ResourceProfilesPrefix)
stqpIDs, _ := tpr.GetLoadedIds(utils.StatQueueProfilePrefix)
trspfIDs, _ := tpr.GetLoadedIds(utils.ThresholdProfilePrefix)
+ trnpfIDs, _ := tpr.GetLoadedIds(utils.TrendProfilePrefix)
+ rnkpfIDs, _ := tpr.GetLoadedIds(utils.RankingProfilePrefix)
+ ippfIDs, _ := tpr.GetLoadedIds(utils.IPProfilesPrefix)
flrIDs, _ := tpr.GetLoadedIds(utils.FilterPrefix)
routeIDs, _ := tpr.GetLoadedIds(utils.RouteProfilePrefix)
apfIDs, _ := tpr.GetLoadedIds(utils.AttributeProfilePrefix)
@@ -889,6 +1004,9 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
utils.CacheChargerProfiles: chargerIDs,
utils.CacheRateProfiles: ratePrfIDs,
utils.CacheActionProfiles: actionPrfIDs,
+ utils.CacheTrendProfiles: trnpfIDs,
+ utils.CacheRankingProfiles: rnkpfIDs,
+ utils.CacheIPProfiles: ippfIDs,
}
// verify if we need to clear indexes
@@ -896,6 +1014,9 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
if len(apfIDs) != 0 {
cacheIDs = append(cacheIDs, utils.CacheAttributeFilterIndexes)
}
+ if len(ippfIDs) != 0 {
+ cacheIDs = append(cacheIDs, utils.CacheIPFilterIndexes)
+ }
if len(routeIDs) != 0 {
cacheIDs = append(cacheIDs, utils.CacheRouteFilterIndexes)
}
diff --git a/engine/tpreader_test.go b/engine/tpreader_test.go
index 34d2c612c..dd3bf2833 100644
--- a/engine/tpreader_test.go
+++ b/engine/tpreader_test.go
@@ -582,6 +582,7 @@ func TestTPReaderReloadCache(t *testing.T) {
APIOpts: map[string]any{},
Tenant: "cgrates.org",
ResourceProfileIDs: []string{"cgrates.org:resourceProfilesID"},
+ IPProfileIDs: []string{"cgrates.org:ipProfilesID"},
StatsQueueProfileIDs: []string{"cgrates.org:statProfilesID"},
ThresholdProfileIDs: []string{"cgrates.org:thresholdProfilesID"},
FilterIDs: []string{"cgrates.org:filtersID"},
@@ -589,6 +590,8 @@ func TestTPReaderReloadCache(t *testing.T) {
AttributeProfileIDs: []string{"cgrates.org:attributeProfilesID"},
ChargerProfileIDs: []string{"cgrates.org:chargerProfilesID"},
ResourceIDs: []string{"cgrates.org:resourceProfilesID"},
+ TrendProfileIDs: []string{"cgrates.org:trendProfilesID"},
+ RankingProfileIDs: []string{"cgrates.org:rankingProfileID"},
StatsQueueIDs: []string{"cgrates.org:statProfilesID"},
ThresholdIDs: []string{"cgrates.org:thresholdProfilesID"},
@@ -617,12 +620,21 @@ func TestTPReaderReloadCache(t *testing.T) {
resProfiles: map[utils.TenantID]*utils.TPResourceProfile{
{Tenant: "cgrates.org", ID: "resourceProfilesID"}: {},
},
+ ipProfiles: map[utils.TenantID]*utils.TPIPProfile{
+ {Tenant: "cgrates.org", ID: "ipProfilesID"}: {},
+ },
+ rgProfiles: map[utils.TenantID]*utils.TPRankingProfile{
+ {Tenant: "cgrates.org", ID: "rankingProfileID"}: {},
+ },
sqProfiles: map[utils.TenantID]*utils.TPStatProfile{
{Tenant: "cgrates.org", ID: "statProfilesID"}: {},
},
thProfiles: map[utils.TenantID]*utils.TPThresholdProfile{
{Tenant: "cgrates.org", ID: "thresholdProfilesID"}: {},
},
+ trProfiles: map[utils.TenantID]*utils.TPTrendsProfile{
+ {Tenant: "cgrates.org", ID: "trendProfilesID"}: {},
+ },
filters: map[utils.TenantID]*utils.TPFilterProfile{
{Tenant: "cgrates.org", ID: "filtersID"}: {},
},
diff --git a/general_tests/trends_schedule_it_test.go b/general_tests/trends_schedule_it_test.go
index 4d213390e..934c6ded6 100644
--- a/general_tests/trends_schedule_it_test.go
+++ b/general_tests/trends_schedule_it_test.go
@@ -1,5 +1,4 @@
-//go:build integration
-// +build integration
+//go:build flaky
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
@@ -131,7 +130,22 @@ cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,tru
}
client, _ := ng.Run(t)
- time.Sleep(100 * time.Millisecond)
+ // Wait for loader to finish loading all profiles (thresholds, stats, trends)
+ for range 20 {
+ var thPrfIDs, stPrfIDs []string
+ var trPrfs *[]*utils.TrendProfile
+ errTh := client.Call(context.Background(), utils.AdminSv1GetThresholdProfileIDs,
+ &utils.ArgsItemIDs{Tenant: "cgrates.org"}, &thPrfIDs)
+ errSt := client.Call(context.Background(), utils.AdminSv1GetStatQueueProfileIDs,
+ &utils.ArgsItemIDs{Tenant: "cgrates.org"}, &stPrfIDs)
+ errTr := client.Call(context.Background(), utils.AdminSv1GetTrendProfiles,
+ &utils.ArgsItemIDs{Tenant: "cgrates.org"}, &trPrfs)
+ if errTh == nil && errSt == nil && errTr == nil &&
+ len(thPrfIDs) == 2 && len(stPrfIDs) == 2 && trPrfs != nil && len(*trPrfs) == 2 {
+ break
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
var tr *utils.Trend
t.Run("TrendSchedule", func(t *testing.T) {
var replyTrendProfiles *[]*utils.TrendProfile
diff --git a/utils/apitpdata.go b/utils/apitpdata.go
index 612d2fb7f..1ecf4e8fc 100644
--- a/utils/apitpdata.go
+++ b/utils/apitpdata.go
@@ -151,7 +151,27 @@ type TPResourceProfile struct {
Weights string // Weight to sort the ResourceLimits
ThresholdIDs []string // Thresholds to check after changing Limit
}
+type TPIPPool struct {
+ ID string
+ FilterIDs []string
+ Type string
+ Range string
+ Strategy string
+ Message string
+ Weights string
+ Blockers string
+}
+type TPIPProfile struct {
+ TPid string
+ Tenant string
+ ID string
+ FilterIDs []string
+ TTL string
+ Stored bool
+ Weights string
+ Pools []*TPIPPool
+}
type ArgsComputeFilterIndexIDs struct {
Tenant string
APIOpts map[string]any
diff --git a/utils/consts.go b/utils/consts.go
index 2e654530e..d38106bf2 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -74,8 +74,8 @@ var (
CacheThresholds: ThresholdPrefix,
CacheFilters: FilterPrefix,
CacheRouteProfiles: RouteProfilePrefix,
- CacheRankingProfiles: RankingPrefix,
- CacheRankings: RankingProfilePrefix,
+ CacheRankingProfiles: RankingProfilePrefix,
+ CacheRankings: RankingPrefix,
CacheAttributeProfiles: AttributeProfilePrefix,
CacheChargerProfiles: ChargerProfilePrefix,
CacheRateProfiles: RateProfilePrefix,
@@ -855,6 +855,13 @@ const (
SortingData = "SortingData"
ProfileID = "ProfileID"
PoolID = "PoolID"
+ PoolFilterIDs = "PoolFilterIDs"
+ PoolType = "PoolType"
+ PoolRange = "PoolRange"
+ PoolStrategy = "PoolStrategy"
+ PoolMessage = "PoolMessage"
+ PoolWeights = "PoolWeights"
+ PoolBlockers = "PoolBlockers"
SortedRoutes = "SortedRoutes"
MetaMonthly = "*monthly"
MetaYearly = "*yearly"
@@ -1946,6 +1953,7 @@ const (
// Table Name
const (
TBLTPResources = "tp_resources"
+ TBLTPIPs = "tp_ips"
TBLTPStats = "tp_stats"
TBLTPRankings = "tp_rankings"
TBLTPTrends = "tp_trends"