Make Thresholds and ThresholdProfiles storable in MySQL and Postgres

This commit is contained in:
arberkatellari
2025-11-10 14:52:52 +02:00
committed by Dan Christian Bogos
parent dcdf55e5d4
commit 61e7bacab1
13 changed files with 371 additions and 37 deletions

View File

@@ -78,11 +78,11 @@ func TestThresholdsIT(t *testing.T) {
case utils.MetaMongo:
thConfigDIR = "thresholds_mongo"
case utils.MetaRedis:
t.SkipNow()
thConfigDIR = "thresholds_redis"
case utils.MetaMySQL:
thConfigDIR = "thresholds_mysql"
case utils.MetaPostgres:
t.SkipNow()
thConfigDIR = "thresholds_postgres"
default:
t.Fatal("Unknown Database type")
}

View File

@@ -176,11 +176,11 @@ const CGRATES_CFG_JSON = `
"*resources": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
// compatible db types: <*internal|*redis|*mongo>
"*actions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*filters": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*route_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
@@ -203,7 +203,7 @@ const CGRATES_CFG_JSON = `
"*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"},
// compatible db types: <*internal|*mysql|*mongo|*postgres>
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"} // Compatible only for Internal, MySQL, Mongo and PostgresSQL databases
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}
},
},

View File

@@ -1037,6 +1037,8 @@ func (cfg *CGRConfig) checkConfigSanity() error {
utils.MetaResources,
utils.MetaStatQueueProfiles,
utils.MetaStatQueues,
utils.MetaThresholdProfiles,
utils.MetaThresholds,
}
for _, dbcfg := range cfg.dbCfg.DBConns {
if dbcfg.Type == utils.MetaInternal {

View File

@@ -24,7 +24,9 @@
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},

View File

@@ -0,0 +1,47 @@
{
// CGRateS Configuration file
// will be used in apis/thresholds_it_test.go
"logger": {
"level": 7
},
"db": {
"db_conns": {
"*default": {
"db_type": "redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10",
"db_user": "cgrates"
},
"StorDB": {
"db_type": "postgres",
"db_host": "127.0.0.1",
"db_port": 5432,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"actions": {
"enabled": true,
"thresholds_conns": ["*internal"],
},
"thresholds": {
"enabled": true,
"store_interval": "-1",
"actions_conns": ["*internal"],
},
"admins": {
"enabled": true,
}
}

View File

@@ -0,0 +1,45 @@
{
// CGRateS Configuration file
// will be used in apis/thresholds_it_test.go
"logger": {
"level": 7
},
"db": {
"db_conns": {
"*default": {
"db_type": "redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10",
"db_user": "cgrates"
},
"StorDB": {
"db_type": "mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"actions": {
"enabled": true,
"thresholds_conns": ["*internal"],
},
"thresholds": {
"enabled": true,
"store_interval": "-1",
"actions_conns": ["*internal"],
},
"admins": {
"enabled": true,
}
}

View File

@@ -110,4 +110,26 @@ CREATE TABLE stat_queues (
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX stat_queues_idx ON stat_queues (`id`);
CREATE UNIQUE INDEX stat_queues_idx ON stat_queues (`id`);
DROP TABLE IF EXISTS threshold_profiles;
CREATE TABLE threshold_profiles (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`threshold_profile` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX threshold_profiles_idx ON threshold_profiles (`id`);
DROP TABLE IF EXISTS thresholds;
CREATE TABLE thresholds (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`threshold` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX thresholds_idx ON thresholds (`id`);

View File

@@ -108,3 +108,25 @@ CREATE TABLE stat_queues (
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX stat_queues_idx ON stat_queues ("id");
DROP TABLE IF EXISTS threshold_profiles;
CREATE TABLE threshold_profiles (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
threshold_profile JSONB NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX threshold_profiles_idx ON threshold_profiles ("id");
DROP TABLE IF EXISTS thresholds;
CREATE TABLE thresholds (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
threshold JSONB NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX thresholds_idx ON thresholds ("id");

View File

@@ -492,3 +492,25 @@ type StatQueueMdl struct {
func (StatQueueMdl) TableName() string {
return utils.TBLStatQueues
}
type ThresholdProfileMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
ThresholdProfile utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (ThresholdProfileMdl) TableName() string {
return utils.TBLThresholdProfiles
}
type ThresholdJSONMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
Threshold utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (ThresholdJSONMdl) TableName() string {
return utils.TBLThresholds
}

View File

@@ -119,6 +119,10 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLStatQueueProfiles, tntID)
case utils.StatQueuePrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLStatQueues, tntID)
case utils.ThresholdProfilePrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLThresholdProfiles, tntID)
case utils.ThresholdPrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLThresholds, tntID)
default:
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix)
}
@@ -856,6 +860,94 @@ func (sqls *SQLStorage) RemStatQueueDrv(ctx *context.Context, tenant, id string)
return
}
func (sqls *SQLStorage) GetThresholdProfileDrv(ctx *context.Context, tenant, id string) (tp *ThresholdProfile, err error) {
var result []*ThresholdProfileMdl
if err = sqls.db.Model(&ThresholdProfileMdl{}).Where(&ThresholdProfileMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return MapStringInterfaceToThresholdProfile(result[0].ThresholdProfile)
}
func (sqls *SQLStorage) SetThresholdProfileDrv(ctx *context.Context, tp *ThresholdProfile) (err error) {
tx := sqls.db.Begin()
mdl := &ThresholdProfileMdl{
Tenant: tp.Tenant,
ID: tp.ID,
ThresholdProfile: tp.AsMapStringInterface(),
}
if err = tx.Model(&ThresholdProfileMdl{}).Where(
ThresholdProfileMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
ThresholdProfileMdl{}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (sqls *SQLStorage) RemThresholdProfileDrv(ctx *context.Context, tenant, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&ThresholdProfileMdl{}).Where(&ThresholdProfileMdl{Tenant: tenant, ID: id}).
Delete(&ThresholdProfileMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return
}
func (sqls *SQLStorage) GetThresholdDrv(ctx *context.Context, tenant, id string) (tp *Threshold, err error) {
var result []*ThresholdJSONMdl
if err = sqls.db.Model(&ThresholdJSONMdl{}).Where(&ThresholdJSONMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return MapStringInterfaceToThreshold(result[0].Threshold)
}
func (sqls *SQLStorage) SetThresholdDrv(ctx *context.Context, t *Threshold) (err error) {
tx := sqls.db.Begin()
mdl := &ThresholdJSONMdl{
Tenant: t.Tenant,
ID: t.ID,
Threshold: t.AsMapStringInterface(),
}
if err = tx.Model(&ThresholdJSONMdl{}).Where(
ThresholdJSONMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
ThresholdJSONMdl{}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (sqls *SQLStorage) RemoveThresholdDrv(ctx *context.Context, tenant, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&ThresholdJSONMdl{}).Where(&ThresholdJSONMdl{Tenant: tenant, ID: id}).
Delete(&ThresholdJSONMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return
}
// AddLoadHistory DataDB method not implemented yet
func (sqls *SQLStorage) AddLoadHistory(ldInst *utils.LoadInstance,
loadHistSize int, transactionID string) error {
@@ -958,36 +1050,6 @@ func (sqls *SQLStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string
return utils.ErrNotImplemented
}
// GetThresholdProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) GetThresholdProfileDrv(ctx *context.Context, tenant, ID string) (tp *ThresholdProfile, err error) {
return nil, utils.ErrNotImplemented
}
// SetThresholdProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) SetThresholdProfileDrv(ctx *context.Context, tp *ThresholdProfile) (err error) {
return utils.ErrNotImplemented
}
// RemThresholdProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) RemThresholdProfileDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetThresholdDrv(ctx *context.Context, tenant, id string) (r *Threshold, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetThresholdDrv(ctx *context.Context, r *Threshold) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveThresholdDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetFilterDrv(ctx *context.Context, tenant, id string) (r *Filter, err error) {
return nil, utils.ErrNotImplemented

View File

@@ -220,6 +220,40 @@ func (t *Threshold) isLocked() bool {
return t.lkID != utils.EmptyString
}
// AsMapStringInterface converts Threshold struct to map[string]any
func (t *Threshold) AsMapStringInterface() map[string]any {
if t == nil {
return nil
}
return map[string]any{
utils.Tenant: t.Tenant,
utils.ID: t.ID,
utils.Hits: t.Hits,
utils.Snooze: t.Snooze,
}
}
// MapStringInterfaceToThreshold converts map[string]any to Threshold struct
func MapStringInterfaceToThreshold(m map[string]any) (*Threshold, error) {
th := &Threshold{}
if v, ok := m[utils.Tenant].(string); ok {
th.Tenant = v
}
if v, ok := m[utils.ID].(string); ok {
th.ID = v
}
if v, ok := m[utils.Hits].(float64); ok {
th.Hits = int(v)
}
if v, ok := m[utils.Snooze].(string); ok {
if t, err := time.Parse(time.RFC3339, v); err == nil {
th.Snooze = t
}
}
return th, nil
}
// unlockThresholds unlocks all locked Thresholds in the given slice.
func unlockThresholds(ts []*Threshold) {
for _, t := range ts {
@@ -841,3 +875,61 @@ func (tp *ThresholdProfile) FieldAsInterface(fldPath []string) (_ any, err error
return tp.EeIDs, nil
}
}
// AsMapStringInterface converts ThresholdProfile struct to map[string]any
func (tp *ThresholdProfile) AsMapStringInterface() map[string]any {
if tp == nil {
return nil
}
return map[string]any{
utils.Tenant: tp.Tenant,
utils.ID: tp.ID,
utils.FilterIDs: tp.FilterIDs,
utils.MaxHits: tp.MaxHits,
utils.MinHits: tp.MinHits,
utils.MinSleep: tp.MinSleep,
utils.Blocker: tp.Blocker,
utils.Weights: tp.Weights,
utils.ActionProfileIDs: tp.ActionProfileIDs,
utils.Async: tp.Async,
utils.EeIDs: tp.EeIDs,
}
}
// MapStringInterfaceToThresholdProfile converts map[string]any to ThresholdProfile struct
func MapStringInterfaceToThresholdProfile(m map[string]any) (*ThresholdProfile, error) {
tp := &ThresholdProfile{}
if v, ok := m[utils.Tenant].(string); ok {
tp.Tenant = v
}
if v, ok := m[utils.ID].(string); ok {
tp.ID = v
}
tp.FilterIDs = utils.InterfaceToStringSlice(m[utils.FilterIDs])
if v, ok := m[utils.MaxHits].(float64); ok {
tp.MaxHits = int(v)
}
if v, ok := m[utils.MinHits].(float64); ok {
tp.MinHits = int(v)
}
if v, ok := m[utils.MinSleep].(string); ok {
if dur, err := time.ParseDuration(v); err != nil {
return nil, err
} else {
tp.MinSleep = dur
}
} else if v, ok := m[utils.MinSleep].(float64); ok { // for -1 cases
tp.MinSleep = time.Duration(v)
}
if v, ok := m[utils.Blocker].(bool); ok {
tp.Blocker = v
}
tp.Weights = utils.InterfaceToDynamicWeights(m[utils.Weights])
tp.ActionProfileIDs = utils.InterfaceToStringSlice(m[utils.ActionProfileIDs])
if v, ok := m[utils.Async].(bool); ok {
tp.Async = v
}
tp.EeIDs = utils.InterfaceToStringSlice(m[utils.EeIDs])
return tp, nil
}

View File

@@ -58,6 +58,14 @@ func TestDynThdIT(t *testing.T) {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaThresholdProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaThresholds: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
},
}}
case utils.MetaMongo:
@@ -79,6 +87,14 @@ func TestDynThdIT(t *testing.T) {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaThresholdProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaThresholds: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
},
}}
default:

View File

@@ -1922,6 +1922,8 @@ const (
TBLResources = "resources"
TBLStatQueueProfiles = "stat_queue_profiles"
TBLStatQueues = "stat_queues"
TBLThresholdProfiles = "threshold_profiles"
TBLThresholds = "thresholds"
OldSMCosts = "sm_costs"
TBLTPDispatchers = "tp_dispatcher_profiles"
TBLTPDispatcherHosts = "tp_dispatcher_hosts"