From 61e7bacab1f71b6460a0e6948e93e8714b528dfc Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Mon, 10 Nov 2025 14:52:52 +0200 Subject: [PATCH] Make Thresholds and ThresholdProfiles storable in MySQL and Postgres --- apis/thresholds_it_test.go | 4 +- config/config_defaults.go | 6 +- config/configsanity.go | 2 + .../samples/thresholds_mysql/cgrates.json | 4 +- .../samples/thresholds_postgres/cgrates.json | 47 +++++++ .../samples/thresholds_redis/cgrates.json | 45 +++++++ data/storage/mysql/create_db_tables.sql | 24 +++- data/storage/postgres/create_db_tables.sql | 22 ++++ engine/models.go | 22 ++++ engine/storage_sql.go | 122 +++++++++++++----- engine/thresholds.go | 92 +++++++++++++ general_tests/dynamic_thresholds_it_test.go | 16 +++ utils/consts.go | 2 + 13 files changed, 371 insertions(+), 37 deletions(-) create mode 100644 data/conf/samples/thresholds_postgres/cgrates.json create mode 100644 data/conf/samples/thresholds_redis/cgrates.json diff --git a/apis/thresholds_it_test.go b/apis/thresholds_it_test.go index 8465988ad..2f501a37a 100644 --- a/apis/thresholds_it_test.go +++ b/apis/thresholds_it_test.go @@ -78,11 +78,11 @@ func TestThresholdsIT(t *testing.T) { case utils.MetaMongo: thConfigDIR = "thresholds_mongo" case utils.MetaRedis: - t.SkipNow() + thConfigDIR = "thresholds_redis" case utils.MetaMySQL: thConfigDIR = "thresholds_mysql" case utils.MetaPostgres: - t.SkipNow() + thConfigDIR = "thresholds_postgres" default: t.Fatal("Unknown Database type") } diff --git a/config/config_defaults.go b/config/config_defaults.go index 585d3b93b..3b39a4bfb 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -176,11 +176,11 @@ const CGRATES_CFG_JSON = ` "*resources": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, + "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, + "*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, // compatible db types: <*internal|*redis|*mongo> "*actions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, - "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, - "*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*filters": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*route_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, @@ -203,7 +203,7 @@ const CGRATES_CFG_JSON = ` "*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, // compatible db types: <*internal|*mysql|*mongo|*postgres> - "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"} // Compatible only for Internal, MySQL, Mongo and PostgresSQL databases + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"} }, }, diff --git a/config/configsanity.go b/config/configsanity.go index 1af67639d..afb2f426a 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -1037,6 +1037,8 @@ func (cfg *CGRConfig) checkConfigSanity() error { utils.MetaResources, utils.MetaStatQueueProfiles, utils.MetaStatQueues, + utils.MetaThresholdProfiles, + utils.MetaThresholds, } for _, dbcfg := range cfg.dbCfg.DBConns { if dbcfg.Type == utils.MetaInternal { diff --git a/data/conf/samples/thresholds_mysql/cgrates.json b/data/conf/samples/thresholds_mysql/cgrates.json index 129c19df0..872983029 100644 --- a/data/conf/samples/thresholds_mysql/cgrates.json +++ b/data/conf/samples/thresholds_mysql/cgrates.json @@ -24,7 +24,9 @@ } }, "items": { - "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} } }, diff --git a/data/conf/samples/thresholds_postgres/cgrates.json b/data/conf/samples/thresholds_postgres/cgrates.json new file mode 100644 index 000000000..f664fb1b9 --- /dev/null +++ b/data/conf/samples/thresholds_postgres/cgrates.json @@ -0,0 +1,47 @@ +{ + // CGRateS Configuration file + // will be used in apis/thresholds_it_test.go + "logger": { + "level": 7 + }, + + "db": { + "db_conns": { + "*default": { + "db_type": "redis", + "db_host": "127.0.0.1", + "db_port": 6379, + "db_name": "10", + "db_user": "cgrates" + }, + "StorDB": { + "db_type": "postgres", + "db_host": "127.0.0.1", + "db_port": 5432, + "db_name": "cgrates", + "db_user": "cgrates", + "db_password": "CGRateS.org" + } + }, + "items": { + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + } + }, + + "actions": { + "enabled": true, + "thresholds_conns": ["*internal"], + }, + + "thresholds": { + "enabled": true, + "store_interval": "-1", + "actions_conns": ["*internal"], + }, + + "admins": { + "enabled": true, + } + } \ No newline at end of file diff --git a/data/conf/samples/thresholds_redis/cgrates.json b/data/conf/samples/thresholds_redis/cgrates.json new file mode 100644 index 000000000..129c19df0 --- /dev/null +++ b/data/conf/samples/thresholds_redis/cgrates.json @@ -0,0 +1,45 @@ +{ + // CGRateS Configuration file + // will be used in apis/thresholds_it_test.go + "logger": { + "level": 7 + }, + + "db": { + "db_conns": { + "*default": { + "db_type": "redis", + "db_host": "127.0.0.1", + "db_port": 6379, + "db_name": "10", + "db_user": "cgrates" + }, + "StorDB": { + "db_type": "mysql", + "db_host": "127.0.0.1", + "db_port": 3306, + "db_name": "cgrates", + "db_user": "cgrates", + "db_password": "CGRateS.org" + } + }, + "items": { + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + } + }, + + "actions": { + "enabled": true, + "thresholds_conns": ["*internal"], + }, + + "thresholds": { + "enabled": true, + "store_interval": "-1", + "actions_conns": ["*internal"], + }, + + "admins": { + "enabled": true, + } + } \ No newline at end of file diff --git a/data/storage/mysql/create_db_tables.sql b/data/storage/mysql/create_db_tables.sql index 9fbdd9c70..8d81d4035 100644 --- a/data/storage/mysql/create_db_tables.sql +++ b/data/storage/mysql/create_db_tables.sql @@ -110,4 +110,26 @@ CREATE TABLE stat_queues ( PRIMARY KEY (`pk`), UNIQUE KEY unique_tenant_id (`tenant`, `id`) ); -CREATE UNIQUE INDEX stat_queues_idx ON stat_queues (`id`); \ No newline at end of file +CREATE UNIQUE INDEX stat_queues_idx ON stat_queues (`id`); + +DROP TABLE IF EXISTS threshold_profiles; +CREATE TABLE threshold_profiles ( + `pk` int(11) NOT NULL AUTO_INCREMENT, + `tenant` VARCHAR(40) NOT NULL, + `id` VARCHAR(64) NOT NULL, + `threshold_profile` JSON NOT NULL, + PRIMARY KEY (`pk`), + UNIQUE KEY unique_tenant_id (`tenant`, `id`) +); +CREATE UNIQUE INDEX threshold_profiles_idx ON threshold_profiles (`id`); + +DROP TABLE IF EXISTS thresholds; +CREATE TABLE thresholds ( + `pk` int(11) NOT NULL AUTO_INCREMENT, + `tenant` VARCHAR(40) NOT NULL, + `id` VARCHAR(64) NOT NULL, + `threshold` JSON NOT NULL, + PRIMARY KEY (`pk`), + UNIQUE KEY unique_tenant_id (`tenant`, `id`) +); +CREATE UNIQUE INDEX thresholds_idx ON thresholds (`id`); \ No newline at end of file diff --git a/data/storage/postgres/create_db_tables.sql b/data/storage/postgres/create_db_tables.sql index 9b39d121e..93d3458ce 100644 --- a/data/storage/postgres/create_db_tables.sql +++ b/data/storage/postgres/create_db_tables.sql @@ -108,3 +108,25 @@ CREATE TABLE stat_queues ( UNIQUE (tenant, id) ); CREATE UNIQUE INDEX stat_queues_idx ON stat_queues ("id"); + + +DROP TABLE IF EXISTS threshold_profiles; +CREATE TABLE threshold_profiles ( + pk SERIAL PRIMARY KEY, + tenant VARCHAR(40) NOT NULL, + id VARCHAR(64) NOT NULL, + threshold_profile JSONB NOT NULL, + UNIQUE (tenant, id) +); +CREATE UNIQUE INDEX threshold_profiles_idx ON threshold_profiles ("id"); + + +DROP TABLE IF EXISTS thresholds; +CREATE TABLE thresholds ( + pk SERIAL PRIMARY KEY, + tenant VARCHAR(40) NOT NULL, + id VARCHAR(64) NOT NULL, + threshold JSONB NOT NULL, + UNIQUE (tenant, id) +); +CREATE UNIQUE INDEX thresholds_idx ON thresholds ("id"); diff --git a/engine/models.go b/engine/models.go index 26beac4b5..2f05c702c 100644 --- a/engine/models.go +++ b/engine/models.go @@ -492,3 +492,25 @@ type StatQueueMdl struct { func (StatQueueMdl) TableName() string { return utils.TBLStatQueues } + +type ThresholdProfileMdl struct { + PK uint `gorm:"primary_key"` + Tenant string `index:"0" re:".*"` + ID string `index:"1" re:".*"` + ThresholdProfile utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"` +} + +func (ThresholdProfileMdl) TableName() string { + return utils.TBLThresholdProfiles +} + +type ThresholdJSONMdl struct { + PK uint `gorm:"primary_key"` + Tenant string `index:"0" re:".*"` + ID string `index:"1" re:".*"` + Threshold utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"` +} + +func (ThresholdJSONMdl) TableName() string { + return utils.TBLThresholds +} diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 4e62b2206..543aec309 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -119,6 +119,10 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLStatQueueProfiles, tntID) case utils.StatQueuePrefix: keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLStatQueues, tntID) + case utils.ThresholdProfilePrefix: + keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLThresholdProfiles, tntID) + case utils.ThresholdPrefix: + keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLThresholds, tntID) default: err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix) } @@ -856,6 +860,94 @@ func (sqls *SQLStorage) RemStatQueueDrv(ctx *context.Context, tenant, id string) return } +func (sqls *SQLStorage) GetThresholdProfileDrv(ctx *context.Context, tenant, id string) (tp *ThresholdProfile, err error) { + var result []*ThresholdProfileMdl + if err = sqls.db.Model(&ThresholdProfileMdl{}).Where(&ThresholdProfileMdl{Tenant: tenant, + ID: id}).Find(&result).Error; err != nil { + return nil, err + } + if len(result) == 0 { + return nil, utils.ErrNotFound + } + return MapStringInterfaceToThresholdProfile(result[0].ThresholdProfile) +} + +func (sqls *SQLStorage) SetThresholdProfileDrv(ctx *context.Context, tp *ThresholdProfile) (err error) { + tx := sqls.db.Begin() + mdl := &ThresholdProfileMdl{ + Tenant: tp.Tenant, + ID: tp.ID, + ThresholdProfile: tp.AsMapStringInterface(), + } + if err = tx.Model(&ThresholdProfileMdl{}).Where( + ThresholdProfileMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete( + ThresholdProfileMdl{}).Error; err != nil { + tx.Rollback() + return + } + if err = tx.Save(mdl).Error; err != nil { + tx.Rollback() + return + } + tx.Commit() + return +} + +func (sqls *SQLStorage) RemThresholdProfileDrv(ctx *context.Context, tenant, id string) (err error) { + tx := sqls.db.Begin() + if err = tx.Model(&ThresholdProfileMdl{}).Where(&ThresholdProfileMdl{Tenant: tenant, ID: id}). + Delete(&ThresholdProfileMdl{}).Error; err != nil { + tx.Rollback() + return err + } + tx.Commit() + return +} + +func (sqls *SQLStorage) GetThresholdDrv(ctx *context.Context, tenant, id string) (tp *Threshold, err error) { + var result []*ThresholdJSONMdl + if err = sqls.db.Model(&ThresholdJSONMdl{}).Where(&ThresholdJSONMdl{Tenant: tenant, + ID: id}).Find(&result).Error; err != nil { + return nil, err + } + if len(result) == 0 { + return nil, utils.ErrNotFound + } + return MapStringInterfaceToThreshold(result[0].Threshold) +} + +func (sqls *SQLStorage) SetThresholdDrv(ctx *context.Context, t *Threshold) (err error) { + tx := sqls.db.Begin() + mdl := &ThresholdJSONMdl{ + Tenant: t.Tenant, + ID: t.ID, + Threshold: t.AsMapStringInterface(), + } + if err = tx.Model(&ThresholdJSONMdl{}).Where( + ThresholdJSONMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete( + ThresholdJSONMdl{}).Error; err != nil { + tx.Rollback() + return + } + if err = tx.Save(mdl).Error; err != nil { + tx.Rollback() + return + } + tx.Commit() + return +} + +func (sqls *SQLStorage) RemoveThresholdDrv(ctx *context.Context, tenant, id string) (err error) { + tx := sqls.db.Begin() + if err = tx.Model(&ThresholdJSONMdl{}).Where(&ThresholdJSONMdl{Tenant: tenant, ID: id}). + Delete(&ThresholdJSONMdl{}).Error; err != nil { + tx.Rollback() + return err + } + tx.Commit() + return +} + // AddLoadHistory DataDB method not implemented yet func (sqls *SQLStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize int, transactionID string) error { @@ -958,36 +1050,6 @@ func (sqls *SQLStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string return utils.ErrNotImplemented } -// GetThresholdProfileDrv DataDB method not implemented yet -func (sqls *SQLStorage) GetThresholdProfileDrv(ctx *context.Context, tenant, ID string) (tp *ThresholdProfile, err error) { - return nil, utils.ErrNotImplemented -} - -// SetThresholdProfileDrv DataDB method not implemented yet -func (sqls *SQLStorage) SetThresholdProfileDrv(ctx *context.Context, tp *ThresholdProfile) (err error) { - return utils.ErrNotImplemented -} - -// RemThresholdProfileDrv DataDB method not implemented yet -func (sqls *SQLStorage) RemThresholdProfileDrv(ctx *context.Context, tenant, id string) (err error) { - return utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) GetThresholdDrv(ctx *context.Context, tenant, id string) (r *Threshold, err error) { - return nil, utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) SetThresholdDrv(ctx *context.Context, r *Threshold) (err error) { - return utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) RemoveThresholdDrv(ctx *context.Context, tenant, id string) (err error) { - return utils.ErrNotImplemented -} - // DataDB method not implemented yet func (sqls *SQLStorage) GetFilterDrv(ctx *context.Context, tenant, id string) (r *Filter, err error) { return nil, utils.ErrNotImplemented diff --git a/engine/thresholds.go b/engine/thresholds.go index 19cad6c44..335021fd6 100644 --- a/engine/thresholds.go +++ b/engine/thresholds.go @@ -220,6 +220,40 @@ func (t *Threshold) isLocked() bool { return t.lkID != utils.EmptyString } +// AsMapStringInterface converts Threshold struct to map[string]any +func (t *Threshold) AsMapStringInterface() map[string]any { + if t == nil { + return nil + } + return map[string]any{ + utils.Tenant: t.Tenant, + utils.ID: t.ID, + utils.Hits: t.Hits, + utils.Snooze: t.Snooze, + } +} + +// MapStringInterfaceToThreshold converts map[string]any to Threshold struct +func MapStringInterfaceToThreshold(m map[string]any) (*Threshold, error) { + th := &Threshold{} + + if v, ok := m[utils.Tenant].(string); ok { + th.Tenant = v + } + if v, ok := m[utils.ID].(string); ok { + th.ID = v + } + if v, ok := m[utils.Hits].(float64); ok { + th.Hits = int(v) + } + if v, ok := m[utils.Snooze].(string); ok { + if t, err := time.Parse(time.RFC3339, v); err == nil { + th.Snooze = t + } + } + return th, nil +} + // unlockThresholds unlocks all locked Thresholds in the given slice. func unlockThresholds(ts []*Threshold) { for _, t := range ts { @@ -841,3 +875,61 @@ func (tp *ThresholdProfile) FieldAsInterface(fldPath []string) (_ any, err error return tp.EeIDs, nil } } + +// AsMapStringInterface converts ThresholdProfile struct to map[string]any +func (tp *ThresholdProfile) AsMapStringInterface() map[string]any { + if tp == nil { + return nil + } + return map[string]any{ + utils.Tenant: tp.Tenant, + utils.ID: tp.ID, + utils.FilterIDs: tp.FilterIDs, + utils.MaxHits: tp.MaxHits, + utils.MinHits: tp.MinHits, + utils.MinSleep: tp.MinSleep, + utils.Blocker: tp.Blocker, + utils.Weights: tp.Weights, + utils.ActionProfileIDs: tp.ActionProfileIDs, + utils.Async: tp.Async, + utils.EeIDs: tp.EeIDs, + } +} + +// MapStringInterfaceToThresholdProfile converts map[string]any to ThresholdProfile struct +func MapStringInterfaceToThresholdProfile(m map[string]any) (*ThresholdProfile, error) { + tp := &ThresholdProfile{} + + if v, ok := m[utils.Tenant].(string); ok { + tp.Tenant = v + } + if v, ok := m[utils.ID].(string); ok { + tp.ID = v + } + tp.FilterIDs = utils.InterfaceToStringSlice(m[utils.FilterIDs]) + if v, ok := m[utils.MaxHits].(float64); ok { + tp.MaxHits = int(v) + } + if v, ok := m[utils.MinHits].(float64); ok { + tp.MinHits = int(v) + } + if v, ok := m[utils.MinSleep].(string); ok { + if dur, err := time.ParseDuration(v); err != nil { + return nil, err + } else { + tp.MinSleep = dur + } + } else if v, ok := m[utils.MinSleep].(float64); ok { // for -1 cases + tp.MinSleep = time.Duration(v) + } + if v, ok := m[utils.Blocker].(bool); ok { + tp.Blocker = v + } + tp.Weights = utils.InterfaceToDynamicWeights(m[utils.Weights]) + tp.ActionProfileIDs = utils.InterfaceToStringSlice(m[utils.ActionProfileIDs]) + if v, ok := m[utils.Async].(bool); ok { + tp.Async = v + } + tp.EeIDs = utils.InterfaceToStringSlice(m[utils.EeIDs]) + return tp, nil +} diff --git a/general_tests/dynamic_thresholds_it_test.go b/general_tests/dynamic_thresholds_it_test.go index e798855b9..4985435b0 100644 --- a/general_tests/dynamic_thresholds_it_test.go +++ b/general_tests/dynamic_thresholds_it_test.go @@ -58,6 +58,14 @@ func TestDynThdIT(t *testing.T) { Limit: utils.IntPointer(-1), DbConn: utils.StringPointer(utils.StorDB), }, + utils.MetaThresholdProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaThresholds: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, }, }} case utils.MetaMongo: @@ -79,6 +87,14 @@ func TestDynThdIT(t *testing.T) { Limit: utils.IntPointer(-1), DbConn: utils.StringPointer(utils.StorDB), }, + utils.MetaThresholdProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaThresholds: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, }, }} default: diff --git a/utils/consts.go b/utils/consts.go index 30a461df9..652589e9c 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1922,6 +1922,8 @@ const ( TBLResources = "resources" TBLStatQueueProfiles = "stat_queue_profiles" TBLStatQueues = "stat_queues" + TBLThresholdProfiles = "threshold_profiles" + TBLThresholds = "thresholds" OldSMCosts = "sm_costs" TBLTPDispatchers = "tp_dispatcher_profiles" TBLTPDispatcherHosts = "tp_dispatcher_hosts"