Make Ranking and RankingProfiles storable in MySQL and Postgres

This commit is contained in:
arberkatellari
2025-11-14 14:02:21 +02:00
committed by Dan Christian Bogos
parent da41db3f56
commit 1d866c4d35
10 changed files with 409 additions and 44 deletions

View File

@@ -181,7 +181,9 @@ const CGRATES_CFG_JSON = `
"*filters": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*route_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*ranking_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*rankings": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
// compatible db types: <*internal|*redis|*mongo>
"*actions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
@@ -189,8 +191,6 @@ const CGRATES_CFG_JSON = `
"*ip_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"},
"*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"},
"*threshold_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"},
"*ranking_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*rankings": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*trend_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*trends": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*route_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"},

View File

@@ -1043,6 +1043,8 @@ func (cfg *CGRConfig) checkConfigSanity() error {
utils.MetaFilters,
utils.MetaRouteProfiles,
utils.MetaRateProfiles,
utils.MetaRankingProfiles,
utils.MetaRankings,
}
for _, dbcfg := range cfg.dbCfg.DBConns {
if dbcfg.Type == utils.MetaInternal {

View File

@@ -176,4 +176,26 @@ CREATE TABLE rates (
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id_rate_profile_id (`tenant`, `id`, `rate_profile_id`),
FOREIGN KEY (rate_profile_id) REFERENCES rate_profiles (id)
);
);
DROP TABLE IF EXISTS ranking_profiles;
CREATE TABLE ranking_profiles (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`ranking_profile` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX ranking_profiles_idx ON ranking_profiles (`id`);
DROP TABLE IF EXISTS rankings;
CREATE TABLE rankings (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`ranking` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX rankings_idx ON rankings (`id`);

View File

@@ -172,4 +172,26 @@ CREATE TABLE rates (
rate_profile_id VARCHAR(64) NOT NULL,
UNIQUE (tenant, id, rate_profile_id),
FOREIGN KEY (rate_profile_id) REFERENCES rate_profiles (id)
);
);
DROP TABLE IF EXISTS ranking_profiles;
CREATE TABLE ranking_profiles (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
ranking_profile JSONB NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX ranking_profiles_idx ON ranking_profiles ("id");
DROP TABLE IF EXISTS rankings;
CREATE TABLE rankings (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
ranking JSONB NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX rankings_idx ON rankings ("id");

View File

@@ -410,6 +410,13 @@ func (ng TestEngine) Run(t testing.TB, extraFlags ...string) (*birpc.Client, *co
return client, newCfg
}
// Opts contains opts of database
type Opts struct {
InternalDBDumpPath *string `json:"internalDBDumpPath,omitempty"`
InternalDBDumpInterval *string `json:"internalDBDumpInterval,omitempty"`
InternalDBRewriteInterval *string `json:"internalDBRewriteInterval,omitempty"`
}
// DBConn contains database connection parameters.
type DBConn struct {
Type *string `json:"db_type,omitempty"`
@@ -418,6 +425,7 @@ type DBConn struct {
Name *string `json:"db_name,omitempty"`
User *string `json:"db_user,omitempty"`
Password *string `json:"db_password,omitempty"`
Opts Opts `json:"opts,omitempty"`
}
// Item contains db item parameters
@@ -760,6 +768,33 @@ var (
},
},
}
RedisDBCfg = DBCfg{
DB: &DBParams{
DBConns: map[string]DBConn{
utils.MetaDefault: {
Type: utils.StringPointer(utils.MetaRedis),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(6379),
Name: utils.StringPointer("10"),
User: utils.StringPointer(utils.CGRateSLwr),
},
utils.StorDB: {
Type: utils.StringPointer(utils.MetaMySQL),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(3306),
Name: utils.StringPointer(utils.CGRateSLwr),
User: utils.StringPointer(utils.CGRateSLwr),
Password: utils.StringPointer("CGRateS.org"),
},
},
Items: map[string]Item{
utils.MetaCDRs: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
},
},
}
MongoDBCfg = DBCfg{
DB: &DBParams{
DBConns: map[string]DBConn{

View File

@@ -560,3 +560,25 @@ type RateMdl struct {
func (RateMdl) TableName() string {
return utils.TBLRates
}
type RankingProfileMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
RankingProfile utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (RankingProfileMdl) TableName() string {
return utils.TBLRankingProfiles
}
type RankingJSONMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
Ranking utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (RankingJSONMdl) TableName() string {
return utils.TBLRankings
}

View File

@@ -129,6 +129,10 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRouteProfiles, tntID)
case utils.RateProfilePrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRateProfiles, tntID)
case utils.RankingProfilePrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRankingProfiles, tntID)
case utils.RankingPrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRankings, tntID)
default:
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix)
}
@@ -1208,6 +1212,94 @@ func (sqls *SQLStorage) RemoveRateProfileDrv(ctx *context.Context, tenant, id st
return
}
func (sqls *SQLStorage) SetRankingProfileDrv(ctx *context.Context, rp *utils.RankingProfile) (err error) {
tx := sqls.db.Begin()
mdl := &RankingProfileMdl{
Tenant: rp.Tenant,
ID: rp.ID,
RankingProfile: rp.AsMapStringInterface(),
}
if err = tx.Model(&RankingProfileMdl{}).Where(
RankingProfileMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
RankingProfileMdl{}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (sqls *SQLStorage) GetRankingProfileDrv(ctx *context.Context, tenant string, id string) (rp *utils.RankingProfile, err error) {
var result []*RankingProfileMdl
if err = sqls.db.Model(&RankingProfileMdl{}).Where(&RankingProfileMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return utils.MapStringInterfaceToRankingProfile(result[0].RankingProfile), nil
}
func (sqls *SQLStorage) RemRankingProfileDrv(ctx *context.Context, tenant string, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&RankingProfileMdl{}).Where(&RankingProfileMdl{Tenant: tenant, ID: id}).
Delete(&RankingProfileMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return
}
func (sqls *SQLStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (rn *utils.Ranking, err error) {
var result []*RankingJSONMdl
if err = sqls.db.Model(&RankingJSONMdl{}).Where(&RankingJSONMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return utils.MapStringInterfaceToRanking(result[0].Ranking), nil
}
func (sqls *SQLStorage) SetRankingDrv(_ *context.Context, rn *utils.Ranking) (err error) {
tx := sqls.db.Begin()
mdl := &RankingJSONMdl{
Tenant: rn.Tenant,
ID: rn.ID,
Ranking: rn.AsMapStringInterface(),
}
if err = tx.Model(&RankingJSONMdl{}).Where(
RankingJSONMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
RankingJSONMdl{}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (sqls *SQLStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&RankingJSONMdl{}).Where(&RankingJSONMdl{Tenant: tenant, ID: id}).
Delete(&RankingJSONMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return
}
// Used to check if specific subject is stored using prefix key attached to entity
func (sqls *SQLStorage) HasDataDrv(ctx *context.Context, category, subject, tenant string) (has bool, err error) {
var categoryModelMap = map[string]any{
@@ -1226,6 +1318,8 @@ func (sqls *SQLStorage) HasDataDrv(ctx *context.Context, category, subject, tena
utils.AttributeProfilePrefix: &AttributeProfileMdl{},
utils.ChargerProfilePrefix: &ChargerProfileMdl{},
utils.RateProfilePrefix: &RateProfileJSONMdl{},
utils.RankingPrefix: &RankingJSONMdl{},
utils.RankingProfilePrefix: &RankingProfileMdl{},
// utils.TrendPrefix: &TrendJSONMdl{},
// utils.TrendProfilePrefix: &TrendProfileMdl{},
}
@@ -1307,41 +1401,6 @@ func (sqls *SQLStorage) RemoveTrendDrv(ctx *context.Context, tenant, id string)
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetRankingProfileDrv(ctx *context.Context, sg *utils.RankingProfile) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetRankingProfileDrv(ctx *context.Context, tenant string, id string) (sg *utils.RankingProfile, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemRankingProfileDrv(ctx *context.Context, tenant string, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (rn *utils.Ranking, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetRankingDrv(_ *context.Context, rn *utils.Ranking) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// GetStorageType returns the storage type that is being used
func (sqls *SQLStorage) GetStorageType() string {
return utils.MetaMySQL
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetItemLoadIDsDrv(ctx *context.Context, itemIDPrefix string) (loadIDs map[string]int64, err error) {
return nil, utils.ErrNotImplemented

View File

@@ -22,6 +22,7 @@ package general_tests
import (
"fmt"
"os"
"slices"
"testing"
"time"
@@ -35,14 +36,119 @@ import (
func TestRankingStore(t *testing.T) {
var dbConfig engine.DBCfg
switch *utils.DBType {
case utils.MetaInternal:
dbConfig = engine.DBCfg{
DB: &engine.DBParams{
DBConns: map[string]engine.DBConn{
utils.MetaDefault: {
Type: utils.StringPointer(utils.MetaInternal),
Opts: engine.Opts{
InternalDBDumpPath: utils.StringPointer("/tmp/internal_db"),
},
},
},
},
}
if err := os.MkdirAll("/tmp/internal_db", 0755); err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := os.RemoveAll("/tmp/internal_db"); err != nil {
t.Error(err)
}
})
case utils.MetaRedis:
t.SkipNow()
dbConfig = engine.RedisDBCfg
case utils.MetaMySQL:
dbConfig = engine.MySQLDBCfg
dbConfig = engine.DBCfg{
DB: &engine.DBParams{
DBConns: map[string]engine.DBConn{
utils.MetaDefault: {
Type: utils.StringPointer(utils.MetaRedis),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(6379),
Name: utils.StringPointer("10"),
User: utils.StringPointer(utils.CGRateSLwr),
},
utils.StorDB: {
Type: utils.StringPointer(utils.MetaMySQL),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(3306),
Name: utils.StringPointer(utils.CGRateSLwr),
User: utils.StringPointer(utils.CGRateSLwr),
Password: utils.StringPointer("CGRateS.org"),
},
},
Items: map[string]engine.Item{
utils.MetaCDRs: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaStatQueueProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaStatQueues: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaRankingProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaRankings: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
},
},
}
case utils.MetaMongo:
dbConfig = engine.MongoDBCfg
case utils.MetaInternal, utils.MetaPostgres:
t.SkipNow()
case utils.MetaPostgres:
dbConfig = engine.DBCfg{
DB: &engine.DBParams{
DBConns: map[string]engine.DBConn{
utils.MetaDefault: {
Type: utils.StringPointer(utils.MetaRedis),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(6379),
Name: utils.StringPointer("10"),
User: utils.StringPointer(utils.CGRateSLwr),
},
utils.StorDB: {
Type: utils.StringPointer(utils.MetaPostgres),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(5432),
Name: utils.StringPointer(utils.CGRateSLwr),
User: utils.StringPointer(utils.CGRateSLwr),
Password: utils.StringPointer("CGRateS.org"),
},
},
Items: map[string]engine.Item{
utils.MetaCDRs: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaStatQueueProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaStatQueues: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaRankingProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaRankings: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
},
},
}
default:
t.Fatal("unsupported dbtype value")
}

View File

@@ -1927,6 +1927,8 @@ const (
TBLFilters = "filters"
TBLRouteProfiles = "route_profiles"
TBLRateProfiles = "rate_profiles"
TBLRankingProfiles = "ranking_profiles"
TBLRankings = "rankings"
TBLRates = "rates"
OldSMCosts = "sm_costs"
TBLTPDispatchers = "tp_dispatcher_profiles"

View File

@@ -203,6 +203,50 @@ func (rp *RankingProfile) FieldAsInterface(fldPath []string) (_ any, err error)
}
}
// AsMapStringInterface converts RankingProfile struct to map[string]any
func (rp *RankingProfile) AsMapStringInterface() map[string]any {
if rp == nil {
return nil
}
return map[string]any{
Tenant: rp.Tenant,
ID: rp.ID,
Schedule: rp.Schedule,
StatIDs: rp.StatIDs,
MetricIDs: rp.MetricIDs,
Sorting: rp.Sorting,
SortingParameters: rp.SortingParameters,
Stored: rp.Stored,
ThresholdIDs: rp.ThresholdIDs,
}
}
// MapStringInterfaceToRankingProfile converts map[string]any to RankingProfile struct
func MapStringInterfaceToRankingProfile(m map[string]any) *RankingProfile {
rp := &RankingProfile{}
if v, ok := m[Tenant].(string); ok {
rp.Tenant = v
}
if v, ok := m[ID].(string); ok {
rp.ID = v
}
if v, ok := m[Schedule].(string); ok {
rp.Schedule = v
}
rp.StatIDs = InterfaceToStringSlice(m[StatIDs])
rp.MetricIDs = InterfaceToStringSlice(m[MetricIDs])
if v, ok := m[Sorting].(string); ok {
rp.Sorting = v
}
rp.SortingParameters = InterfaceToStringSlice(m[SortingParameters])
if v, ok := m[Stored].(bool); ok {
rp.Stored = v
}
rp.ThresholdIDs = InterfaceToStringSlice(m[ThresholdIDs])
return rp
}
// RankingProfileLockKey returns the ID used to lock a RankingProfile with guardian.
func RankingProfileLockKey(tnt, id string) string {
return ConcatenatedKey(CacheRankingProfiles, tnt, id)
@@ -352,6 +396,57 @@ func (r *Ranking) MetricIDs() StringSet {
return r.metricIDs
}
// AsMapStringInterface converts Ranking struct to map[string]any
func (rp *Ranking) AsMapStringInterface() map[string]any {
if rp == nil {
return nil
}
return map[string]any{
Tenant: rp.Tenant,
ID: rp.ID,
LastUpdate: rp.LastUpdate,
Metrics: rp.Metrics,
Sorting: rp.Sorting,
SortingParameters: rp.SortingParameters,
SortedStatIDs: rp.SortedStatIDs,
}
}
// MapStringInterfaceToRanking converts map[string]any to Ranking struct
func MapStringInterfaceToRanking(m map[string]any) *Ranking {
rp := &Ranking{}
if v, ok := m[Tenant].(string); ok {
rp.Tenant = v
}
if v, ok := m[ID].(string); ok {
rp.ID = v
}
if v, ok := m[LastUpdate].(string); ok {
if t, err := time.Parse(time.RFC3339, v); err == nil {
rp.LastUpdate = t
}
}
if v, ok := m[Metrics].(map[string]any); ok {
rp.Metrics = make(map[string]map[string]float64)
for key, val := range v {
if innerMap, ok := val.(map[string]any); ok {
rp.Metrics[key] = make(map[string]float64)
for innerKey, innerVal := range innerMap {
if floatVal, ok := innerVal.(float64); ok {
rp.Metrics[key][innerKey] = floatVal
}
}
}
}
}
if v, ok := m[Sorting].(string); ok {
rp.Sorting = v
}
rp.SortingParameters = InterfaceToStringSlice(m[SortingParameters])
rp.SortedStatIDs = InterfaceToStringSlice(m[SortedStatIDs])
return rp
}
// rankingSorter defines interface for different ranking sorting strategies.
type rankingSorter interface {
sortStatIDs() []string // sortStatIDs returns the sorted list of statIDs