diff --git a/config/config_defaults.go b/config/config_defaults.go index 33b769585..137d93db5 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -181,7 +181,9 @@ const CGRATES_CFG_JSON = ` "*filters": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*route_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, - + "*ranking_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, + "*rankings": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, + // compatible db types: <*internal|*redis|*mongo> "*actions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, @@ -189,8 +191,6 @@ const CGRATES_CFG_JSON = ` "*ip_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, "*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, "*threshold_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, - "*ranking_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, - "*rankings": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*trend_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*trends": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*route_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, diff --git a/config/configsanity.go b/config/configsanity.go index d7cc38468..d7432e60b 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -1043,6 +1043,8 @@ func (cfg *CGRConfig) checkConfigSanity() error { utils.MetaFilters, utils.MetaRouteProfiles, utils.MetaRateProfiles, + utils.MetaRankingProfiles, + utils.MetaRankings, } for _, dbcfg := range cfg.dbCfg.DBConns { if dbcfg.Type == utils.MetaInternal { diff --git a/data/storage/mysql/create_db_tables.sql b/data/storage/mysql/create_db_tables.sql index 903f6b216..bdd042d28 100644 --- a/data/storage/mysql/create_db_tables.sql +++ b/data/storage/mysql/create_db_tables.sql @@ -176,4 +176,26 @@ CREATE TABLE rates ( PRIMARY KEY (`pk`), UNIQUE KEY unique_tenant_id_rate_profile_id (`tenant`, `id`, `rate_profile_id`), FOREIGN KEY (rate_profile_id) REFERENCES rate_profiles (id) -); \ No newline at end of file +); + +DROP TABLE IF EXISTS ranking_profiles; +CREATE TABLE ranking_profiles ( + `pk` int(11) NOT NULL AUTO_INCREMENT, + `tenant` VARCHAR(40) NOT NULL, + `id` VARCHAR(64) NOT NULL, + `ranking_profile` JSON NOT NULL, + PRIMARY KEY (`pk`), + UNIQUE KEY unique_tenant_id (`tenant`, `id`) +); +CREATE UNIQUE INDEX ranking_profiles_idx ON ranking_profiles (`id`); + +DROP TABLE IF EXISTS rankings; +CREATE TABLE rankings ( + `pk` int(11) NOT NULL AUTO_INCREMENT, + `tenant` VARCHAR(40) NOT NULL, + `id` VARCHAR(64) NOT NULL, + `ranking` JSON NOT NULL, + PRIMARY KEY (`pk`), + UNIQUE KEY unique_tenant_id (`tenant`, `id`) +); +CREATE UNIQUE INDEX rankings_idx ON rankings (`id`); diff --git a/data/storage/postgres/create_db_tables.sql b/data/storage/postgres/create_db_tables.sql index 1f11c66b0..c75e9942c 100644 --- a/data/storage/postgres/create_db_tables.sql +++ b/data/storage/postgres/create_db_tables.sql @@ -172,4 +172,26 @@ CREATE TABLE rates ( rate_profile_id VARCHAR(64) NOT NULL, UNIQUE (tenant, id, rate_profile_id), FOREIGN KEY (rate_profile_id) REFERENCES rate_profiles (id) -); \ No newline at end of file +); + + +DROP TABLE IF EXISTS ranking_profiles; +CREATE TABLE ranking_profiles ( + pk SERIAL PRIMARY KEY, + tenant VARCHAR(40) NOT NULL, + id VARCHAR(64) NOT NULL, + ranking_profile JSONB NOT NULL, + UNIQUE (tenant, id) +); +CREATE UNIQUE INDEX ranking_profiles_idx ON ranking_profiles ("id"); + + +DROP TABLE IF EXISTS rankings; +CREATE TABLE rankings ( + pk SERIAL PRIMARY KEY, + tenant VARCHAR(40) NOT NULL, + id VARCHAR(64) NOT NULL, + ranking JSONB NOT NULL, + UNIQUE (tenant, id) +); +CREATE UNIQUE INDEX rankings_idx ON rankings ("id"); diff --git a/engine/libtest.go b/engine/libtest.go index f72b8e204..2aac54b5e 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -410,6 +410,13 @@ func (ng TestEngine) Run(t testing.TB, extraFlags ...string) (*birpc.Client, *co return client, newCfg } +// Opts contains opts of database +type Opts struct { + InternalDBDumpPath *string `json:"internalDBDumpPath,omitempty"` + InternalDBDumpInterval *string `json:"internalDBDumpInterval,omitempty"` + InternalDBRewriteInterval *string `json:"internalDBRewriteInterval,omitempty"` +} + // DBConn contains database connection parameters. type DBConn struct { Type *string `json:"db_type,omitempty"` @@ -418,6 +425,7 @@ type DBConn struct { Name *string `json:"db_name,omitempty"` User *string `json:"db_user,omitempty"` Password *string `json:"db_password,omitempty"` + Opts Opts `json:"opts,omitempty"` } // Item contains db item parameters @@ -760,6 +768,33 @@ var ( }, }, } + RedisDBCfg = DBCfg{ + DB: &DBParams{ + DBConns: map[string]DBConn{ + utils.MetaDefault: { + Type: utils.StringPointer(utils.MetaRedis), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(6379), + Name: utils.StringPointer("10"), + User: utils.StringPointer(utils.CGRateSLwr), + }, + utils.StorDB: { + Type: utils.StringPointer(utils.MetaMySQL), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(3306), + Name: utils.StringPointer(utils.CGRateSLwr), + User: utils.StringPointer(utils.CGRateSLwr), + Password: utils.StringPointer("CGRateS.org"), + }, + }, + Items: map[string]Item{ + utils.MetaCDRs: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + }, + }, + } MongoDBCfg = DBCfg{ DB: &DBParams{ DBConns: map[string]DBConn{ diff --git a/engine/models.go b/engine/models.go index a1feaed20..179924093 100644 --- a/engine/models.go +++ b/engine/models.go @@ -560,3 +560,25 @@ type RateMdl struct { func (RateMdl) TableName() string { return utils.TBLRates } + +type RankingProfileMdl struct { + PK uint `gorm:"primary_key"` + Tenant string `index:"0" re:".*"` + ID string `index:"1" re:".*"` + RankingProfile utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"` +} + +func (RankingProfileMdl) TableName() string { + return utils.TBLRankingProfiles +} + +type RankingJSONMdl struct { + PK uint `gorm:"primary_key"` + Tenant string `index:"0" re:".*"` + ID string `index:"1" re:".*"` + Ranking utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"` +} + +func (RankingJSONMdl) TableName() string { + return utils.TBLRankings +} diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 904bd1feb..44095ffa1 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -129,6 +129,10 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRouteProfiles, tntID) case utils.RateProfilePrefix: keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRateProfiles, tntID) + case utils.RankingProfilePrefix: + keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRankingProfiles, tntID) + case utils.RankingPrefix: + keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRankings, tntID) default: err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix) } @@ -1208,6 +1212,94 @@ func (sqls *SQLStorage) RemoveRateProfileDrv(ctx *context.Context, tenant, id st return } +func (sqls *SQLStorage) SetRankingProfileDrv(ctx *context.Context, rp *utils.RankingProfile) (err error) { + tx := sqls.db.Begin() + mdl := &RankingProfileMdl{ + Tenant: rp.Tenant, + ID: rp.ID, + RankingProfile: rp.AsMapStringInterface(), + } + if err = tx.Model(&RankingProfileMdl{}).Where( + RankingProfileMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete( + RankingProfileMdl{}).Error; err != nil { + tx.Rollback() + return + } + if err = tx.Save(mdl).Error; err != nil { + tx.Rollback() + return + } + tx.Commit() + return +} + +func (sqls *SQLStorage) GetRankingProfileDrv(ctx *context.Context, tenant string, id string) (rp *utils.RankingProfile, err error) { + var result []*RankingProfileMdl + if err = sqls.db.Model(&RankingProfileMdl{}).Where(&RankingProfileMdl{Tenant: tenant, + ID: id}).Find(&result).Error; err != nil { + return nil, err + } + if len(result) == 0 { + return nil, utils.ErrNotFound + } + return utils.MapStringInterfaceToRankingProfile(result[0].RankingProfile), nil +} + +func (sqls *SQLStorage) RemRankingProfileDrv(ctx *context.Context, tenant string, id string) (err error) { + tx := sqls.db.Begin() + if err = tx.Model(&RankingProfileMdl{}).Where(&RankingProfileMdl{Tenant: tenant, ID: id}). + Delete(&RankingProfileMdl{}).Error; err != nil { + tx.Rollback() + return err + } + tx.Commit() + return +} + +func (sqls *SQLStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (rn *utils.Ranking, err error) { + var result []*RankingJSONMdl + if err = sqls.db.Model(&RankingJSONMdl{}).Where(&RankingJSONMdl{Tenant: tenant, + ID: id}).Find(&result).Error; err != nil { + return nil, err + } + if len(result) == 0 { + return nil, utils.ErrNotFound + } + return utils.MapStringInterfaceToRanking(result[0].Ranking), nil +} + +func (sqls *SQLStorage) SetRankingDrv(_ *context.Context, rn *utils.Ranking) (err error) { + tx := sqls.db.Begin() + mdl := &RankingJSONMdl{ + Tenant: rn.Tenant, + ID: rn.ID, + Ranking: rn.AsMapStringInterface(), + } + if err = tx.Model(&RankingJSONMdl{}).Where( + RankingJSONMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete( + RankingJSONMdl{}).Error; err != nil { + tx.Rollback() + return + } + if err = tx.Save(mdl).Error; err != nil { + tx.Rollback() + return + } + tx.Commit() + return +} + +func (sqls *SQLStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string) (err error) { + tx := sqls.db.Begin() + if err = tx.Model(&RankingJSONMdl{}).Where(&RankingJSONMdl{Tenant: tenant, ID: id}). + Delete(&RankingJSONMdl{}).Error; err != nil { + tx.Rollback() + return err + } + tx.Commit() + return +} + // Used to check if specific subject is stored using prefix key attached to entity func (sqls *SQLStorage) HasDataDrv(ctx *context.Context, category, subject, tenant string) (has bool, err error) { var categoryModelMap = map[string]any{ @@ -1226,6 +1318,8 @@ func (sqls *SQLStorage) HasDataDrv(ctx *context.Context, category, subject, tena utils.AttributeProfilePrefix: &AttributeProfileMdl{}, utils.ChargerProfilePrefix: &ChargerProfileMdl{}, utils.RateProfilePrefix: &RateProfileJSONMdl{}, + utils.RankingPrefix: &RankingJSONMdl{}, + utils.RankingProfilePrefix: &RankingProfileMdl{}, // utils.TrendPrefix: &TrendJSONMdl{}, // utils.TrendProfilePrefix: &TrendProfileMdl{}, } @@ -1307,41 +1401,6 @@ func (sqls *SQLStorage) RemoveTrendDrv(ctx *context.Context, tenant, id string) return utils.ErrNotImplemented } -// DataDB method not implemented yet -func (sqls *SQLStorage) SetRankingProfileDrv(ctx *context.Context, sg *utils.RankingProfile) (err error) { - return utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) GetRankingProfileDrv(ctx *context.Context, tenant string, id string) (sg *utils.RankingProfile, err error) { - return nil, utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) RemRankingProfileDrv(ctx *context.Context, tenant string, id string) (err error) { - return utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (rn *utils.Ranking, err error) { - return nil, utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) SetRankingDrv(_ *context.Context, rn *utils.Ranking) (err error) { - return utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string) (err error) { - return utils.ErrNotImplemented -} - -// GetStorageType returns the storage type that is being used -func (sqls *SQLStorage) GetStorageType() string { - return utils.MetaMySQL -} - // DataDB method not implemented yet func (sqls *SQLStorage) GetItemLoadIDsDrv(ctx *context.Context, itemIDPrefix string) (loadIDs map[string]int64, err error) { return nil, utils.ErrNotImplemented diff --git a/general_tests/rankings_stored_it_test.go b/general_tests/rankings_stored_it_test.go index edabc2f3e..ad830e660 100644 --- a/general_tests/rankings_stored_it_test.go +++ b/general_tests/rankings_stored_it_test.go @@ -22,6 +22,7 @@ package general_tests import ( "fmt" + "os" "slices" "testing" "time" @@ -35,14 +36,119 @@ import ( func TestRankingStore(t *testing.T) { var dbConfig engine.DBCfg switch *utils.DBType { + case utils.MetaInternal: + dbConfig = engine.DBCfg{ + DB: &engine.DBParams{ + DBConns: map[string]engine.DBConn{ + utils.MetaDefault: { + Type: utils.StringPointer(utils.MetaInternal), + Opts: engine.Opts{ + InternalDBDumpPath: utils.StringPointer("/tmp/internal_db"), + }, + }, + }, + }, + } + if err := os.MkdirAll("/tmp/internal_db", 0755); err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + if err := os.RemoveAll("/tmp/internal_db"); err != nil { + t.Error(err) + } + }) case utils.MetaRedis: - t.SkipNow() + dbConfig = engine.RedisDBCfg case utils.MetaMySQL: - dbConfig = engine.MySQLDBCfg + dbConfig = engine.DBCfg{ + DB: &engine.DBParams{ + DBConns: map[string]engine.DBConn{ + utils.MetaDefault: { + Type: utils.StringPointer(utils.MetaRedis), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(6379), + Name: utils.StringPointer("10"), + User: utils.StringPointer(utils.CGRateSLwr), + }, + utils.StorDB: { + Type: utils.StringPointer(utils.MetaMySQL), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(3306), + Name: utils.StringPointer(utils.CGRateSLwr), + User: utils.StringPointer(utils.CGRateSLwr), + Password: utils.StringPointer("CGRateS.org"), + }, + }, + Items: map[string]engine.Item{ + utils.MetaCDRs: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaStatQueueProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaStatQueues: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaRankingProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaRankings: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + }, + }, + } case utils.MetaMongo: dbConfig = engine.MongoDBCfg - case utils.MetaInternal, utils.MetaPostgres: - t.SkipNow() + case utils.MetaPostgres: + dbConfig = engine.DBCfg{ + DB: &engine.DBParams{ + DBConns: map[string]engine.DBConn{ + utils.MetaDefault: { + Type: utils.StringPointer(utils.MetaRedis), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(6379), + Name: utils.StringPointer("10"), + User: utils.StringPointer(utils.CGRateSLwr), + }, + utils.StorDB: { + Type: utils.StringPointer(utils.MetaPostgres), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(5432), + Name: utils.StringPointer(utils.CGRateSLwr), + User: utils.StringPointer(utils.CGRateSLwr), + Password: utils.StringPointer("CGRateS.org"), + }, + }, + Items: map[string]engine.Item{ + utils.MetaCDRs: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaStatQueueProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaStatQueues: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaRankingProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaRankings: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + }, + }, + } default: t.Fatal("unsupported dbtype value") } diff --git a/utils/consts.go b/utils/consts.go index 23a0bc874..89a1a1f95 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1927,6 +1927,8 @@ const ( TBLFilters = "filters" TBLRouteProfiles = "route_profiles" TBLRateProfiles = "rate_profiles" + TBLRankingProfiles = "ranking_profiles" + TBLRankings = "rankings" TBLRates = "rates" OldSMCosts = "sm_costs" TBLTPDispatchers = "tp_dispatcher_profiles" diff --git a/utils/rankings.go b/utils/rankings.go index c52fc8063..0a00031ae 100644 --- a/utils/rankings.go +++ b/utils/rankings.go @@ -203,6 +203,50 @@ func (rp *RankingProfile) FieldAsInterface(fldPath []string) (_ any, err error) } } +// AsMapStringInterface converts RankingProfile struct to map[string]any +func (rp *RankingProfile) AsMapStringInterface() map[string]any { + if rp == nil { + return nil + } + return map[string]any{ + Tenant: rp.Tenant, + ID: rp.ID, + Schedule: rp.Schedule, + StatIDs: rp.StatIDs, + MetricIDs: rp.MetricIDs, + Sorting: rp.Sorting, + SortingParameters: rp.SortingParameters, + Stored: rp.Stored, + ThresholdIDs: rp.ThresholdIDs, + } +} + +// MapStringInterfaceToRankingProfile converts map[string]any to RankingProfile struct +func MapStringInterfaceToRankingProfile(m map[string]any) *RankingProfile { + rp := &RankingProfile{} + + if v, ok := m[Tenant].(string); ok { + rp.Tenant = v + } + if v, ok := m[ID].(string); ok { + rp.ID = v + } + if v, ok := m[Schedule].(string); ok { + rp.Schedule = v + } + rp.StatIDs = InterfaceToStringSlice(m[StatIDs]) + rp.MetricIDs = InterfaceToStringSlice(m[MetricIDs]) + if v, ok := m[Sorting].(string); ok { + rp.Sorting = v + } + rp.SortingParameters = InterfaceToStringSlice(m[SortingParameters]) + if v, ok := m[Stored].(bool); ok { + rp.Stored = v + } + rp.ThresholdIDs = InterfaceToStringSlice(m[ThresholdIDs]) + return rp +} + // RankingProfileLockKey returns the ID used to lock a RankingProfile with guardian. func RankingProfileLockKey(tnt, id string) string { return ConcatenatedKey(CacheRankingProfiles, tnt, id) @@ -352,6 +396,57 @@ func (r *Ranking) MetricIDs() StringSet { return r.metricIDs } +// AsMapStringInterface converts Ranking struct to map[string]any +func (rp *Ranking) AsMapStringInterface() map[string]any { + if rp == nil { + return nil + } + return map[string]any{ + Tenant: rp.Tenant, + ID: rp.ID, + LastUpdate: rp.LastUpdate, + Metrics: rp.Metrics, + Sorting: rp.Sorting, + SortingParameters: rp.SortingParameters, + SortedStatIDs: rp.SortedStatIDs, + } +} + +// MapStringInterfaceToRanking converts map[string]any to Ranking struct +func MapStringInterfaceToRanking(m map[string]any) *Ranking { + rp := &Ranking{} + if v, ok := m[Tenant].(string); ok { + rp.Tenant = v + } + if v, ok := m[ID].(string); ok { + rp.ID = v + } + if v, ok := m[LastUpdate].(string); ok { + if t, err := time.Parse(time.RFC3339, v); err == nil { + rp.LastUpdate = t + } + } + if v, ok := m[Metrics].(map[string]any); ok { + rp.Metrics = make(map[string]map[string]float64) + for key, val := range v { + if innerMap, ok := val.(map[string]any); ok { + rp.Metrics[key] = make(map[string]float64) + for innerKey, innerVal := range innerMap { + if floatVal, ok := innerVal.(float64); ok { + rp.Metrics[key][innerKey] = floatVal + } + } + } + } + } + if v, ok := m[Sorting].(string); ok { + rp.Sorting = v + } + rp.SortingParameters = InterfaceToStringSlice(m[SortingParameters]) + rp.SortedStatIDs = InterfaceToStringSlice(m[SortedStatIDs]) + return rp +} + // rankingSorter defines interface for different ranking sorting strategies. type rankingSorter interface { sortStatIDs() []string // sortStatIDs returns the sorted list of statIDs