Make Action Profiles storable in MySQL and Postgres

This commit is contained in:
arberkatellari
2025-11-04 10:36:03 +02:00
committed by Dan Christian Bogos
parent 89653a9c80
commit d78f34bdc5
18 changed files with 346 additions and 53 deletions

View File

@@ -104,7 +104,7 @@ func TestActionsIT(t *testing.T) {
case utils.MetaMongo:
actConfigDIR = "apis_actions_mongo"
case utils.MetaRedis:
t.SkipNow()
actConfigDIR = "apis_actions_redis"
case utils.MetaMySQL:
actConfigDIR = "apis_actions_mysql"
case utils.MetaPostgres:

View File

@@ -62,7 +62,7 @@ var (
func TestAnalyzerSIT(t *testing.T) {
switch *utils.DBType {
case utils.MetaInternal, utils.MetaMySQL, utils.MetaPostgres:
case utils.MetaInternal, utils.MetaRedis, utils.MetaMySQL, utils.MetaPostgres:
t.SkipNow()
case utils.MetaMongo:
anzCfgPath = path.Join(*utils.DataDir, "conf", "samples", "analyzers")

View File

@@ -129,7 +129,8 @@ const CGRATES_CFG_JSON = `
"*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*ip_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*ip_allocations": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*action_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
// compatible db types: <*internal|*redis|*mongo>
"*actions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
@@ -143,7 +144,6 @@ const CGRATES_CFG_JSON = `
"*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*charger_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*action_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*versions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*resource_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"},

View File

@@ -1061,7 +1061,7 @@ func (cfg *CGRConfig) checkConfigSanity() error {
utils.Internal, utils.Redis, utils.Mongo}
if item != utils.MetaAccounts && item != utils.MetaIPProfiles &&
item != utils.MetaIPAllocations {
item != utils.MetaIPAllocations && item != utils.MetaActionProfiles {
if item == utils.MetaCDRs {
if !slices.Contains(storDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type) {
return fmt.Errorf("<%s> db item can only be of types <%v>, got <%s>", item,

View File

@@ -21,7 +21,8 @@
},
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*action_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
}
},

View File

@@ -23,7 +23,8 @@
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*action_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
}
},

View File

@@ -0,0 +1,55 @@
{
"logger": {
"level": 7
},
"db": {
"db_conns": {
"*default": {
"db_type": "redis",
"db_port": 6379,
"db_name": "10"
},
"StorDB": { // The id of the DB connection
"db_type": "mysql", // db type: <internal|redis|mysql|mongo|postgres>
"db_host": "127.0.0.1", // the host to connect to
"db_port": 3306, // db port to reach the database
"db_name": "cgrates", // db database name to connect to
"db_user": "cgrates", // username to use when connecting to the database
"db_password": "CGRateS.org" // password to use when connecting to the database
},
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"actions": {
"enabled": true,
"thresholds_conns": ["*internal"],
"stats_conns": ["*internal"],
"accounts_conns": ["*internal"]
},
"accounts": {
"enabled": true
},
"stats": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": ["*internal"]
},
"thresholds": {
"enabled": true,
"store_interval": "-1",
"actions_conns": ["*internal"]
},
"admins": {
"enabled": true
}
}

View File

@@ -36,7 +36,7 @@
},
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
}
},

View File

@@ -36,7 +36,7 @@
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
"*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
}
},

View File

@@ -33,4 +33,15 @@ CREATE TABLE ip_allocations (
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX ip_allocations_idx ON ip_allocations (`id`);
CREATE UNIQUE INDEX ip_allocations_idx ON ip_allocations (`id`);
DROP TABLE IF EXISTS action_profiles;
CREATE TABLE action_profiles (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`action_profile` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX action_profiles_idx ON action_profiles (`id`);

View File

@@ -32,3 +32,14 @@ CREATE TABLE ip_allocations (
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX ip_allocations_idx ON ip_allocations ("id");
DROP TABLE IF EXISTS action_profiles;
CREATE TABLE action_profiles (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
action_profile JSONB NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX action_profiles_idx ON action_profiles ("id");

View File

@@ -415,3 +415,14 @@ type IPAllocationMdl struct {
func (IPAllocationMdl) TableName() string {
return utils.TBLIPAllocations
}
type ActionProfileJSONMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
ActionProfile utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (ActionProfileJSONMdl) TableName() string {
return utils.TBLActionProfilesJSON
}

View File

@@ -104,6 +104,8 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLIPProfiles, tntID)
case utils.IPAllocationsPrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLIPAllocations, tntID)
case utils.ActionProfilePrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLActionProfilesJSON, tntID)
default:
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix)
}
@@ -488,7 +490,7 @@ func (sqls *SQLStorage) GetIPAllocationsDrv(ctx *context.Context, tenant, id str
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return utils.MapStringInterfaceToIPAllocations(result[0].IPAllocation)
return utils.MapStringInterfaceToIPAllocations(result[0].IPAllocation), nil
}
func (sqls *SQLStorage) SetIPAllocationsDrv(ctx *context.Context, ip *utils.IPAllocations) error {
@@ -523,6 +525,51 @@ func (sqls *SQLStorage) RemoveIPAllocationsDrv(ctx *context.Context, tenant, id
return nil
}
func (sqls *SQLStorage) GetActionProfileDrv(ctx *context.Context, tenant, id string) (ap *utils.ActionProfile, err error) {
var result []*ActionProfileJSONMdl
if err := sqls.db.Model(&ActionProfileJSONMdl{}).Where(&ActionProfileJSONMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
ap, err = utils.MapStringInterfaceToActionProfile(result[0].ActionProfile)
return
}
func (sqls *SQLStorage) SetActionProfileDrv(ctx *context.Context, ap *utils.ActionProfile) (err error) {
tx := sqls.db.Begin()
mdl := &ActionProfileJSONMdl{
Tenant: ap.Tenant,
ID: ap.ID,
ActionProfile: ap.AsMapStringInterface(),
}
if err := tx.Model(&ActionProfileJSONMdl{}).Where(
ActionProfileJSONMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
ActionProfileJSONMdl{}).Error; err != nil {
tx.Rollback()
return err
}
if err := tx.Save(mdl).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return nil
}
func (sqls *SQLStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id string) (err error) {
tx := sqls.db.Begin()
if err := tx.Model(&ActionProfileJSONMdl{}).Where(&ActionProfileJSONMdl{Tenant: tenant, ID: id}).
Delete(&ActionProfileJSONMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return nil
}
// AddLoadHistory DataDB method not implemented yet
func (sqls *SQLStorage) AddLoadHistory(ldInst *utils.LoadInstance,
loadHistSize int, transactionID string) error {
@@ -815,20 +862,6 @@ func (sqls *SQLStorage) RemoveRateProfileDrv(ctx *context.Context, tenant, id st
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetActionProfileDrv(ctx *context.Context, tenant, id string) (ap *utils.ActionProfile, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetActionProfileDrv(ctx *context.Context, ap *utils.ActionProfile) (err error) {
return utils.ErrNotImplemented
}
func (sqls *SQLStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// GetIndexesDrv DataDB method not implemented yet
func (sqls *SQLStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) {
return nil, utils.ErrNotImplemented

View File

@@ -40,13 +40,47 @@ func TestDynThdIT(t *testing.T) {
case utils.MetaInternal:
dbCfg = engine.InternalDBCfg
case utils.MetaRedis:
t.SkipNow()
case utils.MetaMySQL:
dbCfg = engine.MySQLDBCfg
case utils.MetaMySQL:
dbCfg = engine.DBCfg{DB: &engine.DBParams{
DBConns: map[string]engine.DBConn{
utils.StorDB: {
Type: utils.StringPointer(utils.MetaMySQL),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(3306),
Name: utils.StringPointer(utils.CGRateSLwr),
User: utils.StringPointer(utils.CGRateSLwr),
Password: utils.StringPointer("CGRateS.org"),
},
},
Items: map[string]engine.Item{
utils.MetaActionProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
},
}}
case utils.MetaMongo:
dbCfg = engine.MongoDBCfg
case utils.MetaPostgres:
dbCfg = engine.PostgresDBCfg
dbCfg = engine.DBCfg{DB: &engine.DBParams{
DBConns: map[string]engine.DBConn{
utils.StorDB: {
Type: utils.StringPointer(utils.MetaPostgres),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(5432),
Name: utils.StringPointer(utils.CGRateSLwr),
User: utils.StringPointer(utils.CGRateSLwr),
Password: utils.StringPointer("CGRateS.org"),
},
},
Items: map[string]engine.Item{
utils.MetaActionProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
},
}}
default:
t.Fatal("Unknown Database type")
}

View File

@@ -291,6 +291,120 @@ func (ap *ActionProfile) FieldAsInterface(fldPath []string) (_ any, err error) {
}
}
// AsMapStringInterface converts ActionProfile struct to map[string]any
func (ap *ActionProfile) AsMapStringInterface() map[string]any {
if ap == nil {
return nil
}
return map[string]any{
Tenant: ap.Tenant,
ID: ap.ID,
FilterIDs: ap.FilterIDs,
Weights: ap.Weights,
Blockers: ap.Blockers,
Schedule: ap.Schedule,
Targets: ap.Targets,
Actions: ap.Actions,
}
}
// MapStringInterfaceToActionProfile converts map[string]any to ActionProfile struct
func MapStringInterfaceToActionProfile(m map[string]any) (ap *ActionProfile, err error) {
ap = &ActionProfile{}
if v, ok := m[Tenant].(string); ok {
ap.Tenant = v
}
if v, ok := m[ID].(string); ok {
ap.ID = v
}
ap.FilterIDs = InterfaceToStringSlice(m[FilterIDs])
ap.Weights = InterfaceToDynamicWeights(m[Weights])
ap.Blockers = InterfaceToDynamicBlockers(m[Blockers])
if v, ok := m[Schedule].(string); ok {
ap.Schedule = v
}
ap.Targets = InterfaceToMapStringStringSet(m[Targets])
if ap.Actions, err = InterfaceToAPActions(m[Actions]); err != nil {
return nil, err
}
return ap, nil
}
// InterfaceToAPActions converts any to []*APAction
func InterfaceToAPActions(m any) ([]*APAction, error) {
v, ok := m.([]any)
if !ok {
return nil, nil
}
actions := make([]*APAction, 0, len(v))
for _, actionAny := range v {
if actionMap, ok := actionAny.(map[string]any); ok {
action, err := MapToAPAction(actionMap)
if err != nil {
return nil, err
}
actions = append(actions, action)
}
}
return actions, nil
}
// MapToAPAction converts map[string]any to APAction struct
func MapToAPAction(m map[string]any) (*APAction, error) {
action := &APAction{}
if v, ok := m[ID].(string); ok {
action.ID = v
}
action.FilterIDs = InterfaceToStringSlice(m[FilterIDs])
if v, ok := m[TTL].(string); ok {
if dur, err := time.ParseDuration(v); err != nil {
return nil, err
} else {
action.TTL = dur
}
} else if v, ok := m[TTL].(float64); ok { // for -1 cases
action.TTL = time.Duration(v)
}
if v, ok := m[Type].(string); ok {
action.Type = v
}
if v, ok := m[Opts].(map[string]any); ok {
action.Opts = v
}
action.Weights = InterfaceToDynamicWeights(m[Weights])
action.Blockers = InterfaceToDynamicBlockers(m[Blockers])
if v, ok := m[Diktats].([]any); ok {
action.Diktats = make([]*APDiktat, 0, len(v))
for _, diktatAny := range v {
if diktatMap, ok := diktatAny.(map[string]any); ok {
diktat, err := MapToAPDiktat(diktatMap)
if err != nil {
return nil, err
}
action.Diktats = append(action.Diktats, diktat)
}
}
}
return action, nil
}
// MapToAPDiktat converts map[string]any to APDiktat struct
func MapToAPDiktat(m map[string]any) (*APDiktat, error) {
diktat := &APDiktat{}
if v, ok := m[ID].(string); ok {
diktat.ID = v
}
diktat.FilterIDs = InterfaceToStringSlice(m[FilterIDs])
if v, ok := m[Opts].(map[string]any); ok {
diktat.Opts = v
}
diktat.Weights = InterfaceToDynamicWeights(m[Weights])
diktat.Blockers = InterfaceToDynamicBlockers(m[Blockers])
return diktat, nil
}
// APAction defines action related information used within an ActionProfile.
type APAction struct {
ID string // Action ID

View File

@@ -1897,27 +1897,28 @@ const (
// Table Name
const (
TBLTPResources = "tp_resources"
TBLTPStats = "tp_stats"
TBLTPRankings = "tp_rankings"
TBLTPTrends = "tp_trends"
TBLTPThresholds = "tp_thresholds"
TBLTPFilters = "tp_filters"
SessionCostsTBL = "session_costs"
CDRsTBL = "cdrs"
TBLTPRoutes = "tp_routes"
TBLTPAttributes = "tp_attributes"
TBLTPChargers = "tp_chargers"
TBLVersions = "versions"
TBLAccounts = "accounts"
TBLIPProfiles = "ip_profiles"
TBLIPAllocations = "ip_allocations"
OldSMCosts = "sm_costs"
TBLTPDispatchers = "tp_dispatcher_profiles"
TBLTPDispatcherHosts = "tp_dispatcher_hosts"
TBLTPRateProfiles = "tp_rate_profiles"
TBLTPActionProfiles = "tp_action_profiles"
TBLTPAccounts = "tp_accounts"
TBLTPResources = "tp_resources"
TBLTPStats = "tp_stats"
TBLTPRankings = "tp_rankings"
TBLTPTrends = "tp_trends"
TBLTPThresholds = "tp_thresholds"
TBLTPFilters = "tp_filters"
SessionCostsTBL = "session_costs"
CDRsTBL = "cdrs"
TBLTPRoutes = "tp_routes"
TBLTPAttributes = "tp_attributes"
TBLTPChargers = "tp_chargers"
TBLVersions = "versions"
TBLAccounts = "accounts"
TBLIPProfiles = "ip_profiles"
TBLIPAllocations = "ip_allocations"
TBLActionProfilesJSON = "action_profiles"
OldSMCosts = "sm_costs"
TBLTPDispatchers = "tp_dispatcher_profiles"
TBLTPDispatcherHosts = "tp_dispatcher_hosts"
TBLTPRateProfiles = "tp_rate_profiles"
TBLTPActionProfiles = "tp_action_profiles"
TBLTPAccounts = "tp_accounts"
)
// Cache Name

View File

@@ -658,7 +658,9 @@ func MapStringInterfaceToIPProfile(m map[string]any) (*IPProfile, error) {
ipp.FilterIDs = InterfaceToStringSlice(m[FilterIDs])
ipp.Weights = InterfaceToDynamicWeights(m[Weights])
if v, ok := m[TTL].(string); ok {
if dur, err := time.ParseDuration(v); err == nil {
if dur, err := time.ParseDuration(v); err != nil {
return nil, err
} else {
ipp.TTL = dur
}
} else if v, ok := m[TTL].(float64); ok { // for -1 cases
@@ -752,7 +754,7 @@ func IPAllocationsLockKey(tnt, id string) string {
return ConcatenatedKey(CacheIPAllocations, tnt, id)
}
// AsMapStringInterface converts IPProfile struct to map[string]any
// AsMapStringInterface converts IPAllocations struct to map[string]any
func (p *IPAllocations) AsMapStringInterface() map[string]any {
if p == nil {
return nil
@@ -766,7 +768,7 @@ func (p *IPAllocations) AsMapStringInterface() map[string]any {
}
// MapStringInterfaceToIPAllocations converts map[string]any to IPAllocations struct
func MapStringInterfaceToIPAllocations(m map[string]any) (*IPAllocations, error) {
func MapStringInterfaceToIPAllocations(m map[string]any) *IPAllocations {
ipa := &IPAllocations{}
if v, ok := m[Tenant].(string); ok {
@@ -777,7 +779,7 @@ func MapStringInterfaceToIPAllocations(m map[string]any) (*IPAllocations, error)
}
ipa.Allocations = InterfaceToAllocations(m[Allocations])
ipa.TTLIndex = InterfaceToStringSlice(m[TTLIndex])
return ipa, nil
return ipa
}
// InterfaceToAllocations converts any to map[string]*PoolAllocation

View File

@@ -161,3 +161,22 @@ func (s StringSet) FieldAsString(fldPath []string) (_ string, err error) {
}
return "{}", nil // noting in it as is a empty structure
}
// InterfaceToMapStringStringSet converts map[string]any to map[string]StringSet
func InterfaceToMapStringStringSet(m any) map[string]StringSet {
v, ok := m.(map[string]any)
if !ok {
return nil
}
targets := make(map[string]StringSet)
for key, val := range v {
if valMap, ok := val.(map[string]any); ok {
stringSet := make(StringSet)
for k := range valMap {
stringSet[k] = struct{}{}
}
targets[key] = stringSet
}
}
return targets
}