Make resources storable in MySQL and Postgres

This commit is contained in:
arberkatellari
2025-11-06 15:20:02 +02:00
committed by Dan Christian Bogos
parent d760ab319e
commit bb5d589dce
14 changed files with 421 additions and 47 deletions

View File

@@ -172,11 +172,11 @@ const CGRATES_CFG_JSON = `
"*versions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*charger_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*resources": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
// compatible db types: <*internal|*redis|*mongo>
"*actions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*resources": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},

View File

@@ -1033,6 +1033,8 @@ func (cfg *CGRConfig) checkConfigSanity() error {
utils.MetaActionProfiles,
utils.MetaChargerProfiles,
utils.MetaAttributeProfiles,
utils.MetaResourceProfiles,
utils.MetaResources,
}
for _, dbcfg := range cfg.dbCfg.DBConns {
if dbcfg.Type == utils.MetaInternal {

View File

@@ -9,9 +9,9 @@
"*default": {
"db_type": "*internal",
"opts":{
"internalDBRewriteInterval": "0s",
"internalDBDumpInterval": "0s"
}
"internalDBDumpPath": "/tmp/internal_db/db",
"internalDBDumpInterval": "-1"
}
}
}
},

View File

@@ -23,7 +23,9 @@
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*resources": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},

View File

@@ -0,0 +1,50 @@
{
"logger": {
"level": 7
},
"db": {
"db_conns": {
"*default": {
"db_type": "redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10",
"db_user": "cgrates"
},
"StorDB": {
"db_type": "postgres",
"db_host": "127.0.0.1",
"db_port": 5432,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*resources": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"actions": {
"enabled": true
},
"thresholds": {
"enabled": true,
"actions_conns": ["*internal"]
},
"resources": {
"enabled": true,
"thresholds_conns": ["*internal"]
},
"admins": {
"enabled": true
}
}

View File

@@ -0,0 +1,48 @@
{
"logger": {
"level": 7
},
"db": {
"db_conns": {
"*default": {
"db_type": "redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10",
"db_user": "cgrates"
},
"StorDB": {
"db_type": "mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"actions": {
"enabled": true
},
"thresholds": {
"enabled": true,
"actions_conns": ["*internal"]
},
"resources": {
"enabled": true,
"thresholds_conns": ["*internal"]
},
"admins": {
"enabled": true
}
}

View File

@@ -66,4 +66,26 @@ CREATE TABLE attribute_profiles (
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX attribute_profiles_idx ON attribute_profiles (`id`);
CREATE UNIQUE INDEX attribute_profiles_idx ON attribute_profiles (`id`);
DROP TABLE IF EXISTS resource_profiles;
CREATE TABLE resource_profiles (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`resource_profile` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX resource_profiles_idx ON resource_profiles (`id`);
DROP TABLE IF EXISTS resources;
CREATE TABLE resources (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`resource` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX resources_idx ON resources (`id`);

View File

@@ -65,3 +65,25 @@ CREATE TABLE attribute_profiles (
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX attribute_profiles_idx ON attribute_profiles ("id");
DROP TABLE IF EXISTS resource_profiles;
CREATE TABLE resource_profiles (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
resource_profile JSONB NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX resource_profiles_idx ON resource_profiles ("id");
DROP TABLE IF EXISTS resources;
CREATE TABLE resources (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
resource JSONB NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX resources_idx ON resources ("id");

View File

@@ -448,3 +448,25 @@ type AttributeProfileMdl struct {
func (AttributeProfileMdl) TableName() string {
return utils.TBLAttributeProfiles
}
type ResourceProfileMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
ResourceProfile utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (ResourceProfileMdl) TableName() string {
return utils.TBLResourceProfiles
}
type ResourceJSONMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
Resource utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (ResourceJSONMdl) TableName() string {
return utils.TBLResources
}

View File

@@ -110,6 +110,10 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLChargerProfiles, tntID)
case utils.AttributeProfilePrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLAttributeProfiles, tntID)
case utils.ResourceProfilesPrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLResourceProfiles, tntID)
case utils.ResourcesPrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLResources, tntID)
default:
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix)
}
@@ -627,7 +631,6 @@ func (sqls *SQLStorage) GetAttributeProfileDrv(ctx *context.Context, tenant, id
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return utils.MapStringInterfaceToAttributeProfile(result[0].AttributeProfile)
}
@@ -663,6 +666,94 @@ func (sqls *SQLStorage) RemoveAttributeProfileDrv(ctx *context.Context, tenant,
return
}
func (sqls *SQLStorage) GetResourceProfileDrv(ctx *context.Context, tenant, id string) (rsp *utils.ResourceProfile, err error) {
var result []*ResourceProfileMdl
if err = sqls.db.Model(&ResourceProfileMdl{}).Where(&ResourceProfileMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return utils.MapStringInterfaceToResourceProfile(result[0].ResourceProfile)
}
func (sqls *SQLStorage) SetResourceProfileDrv(ctx *context.Context, rsp *utils.ResourceProfile) (err error) {
tx := sqls.db.Begin()
mdl := &ResourceProfileMdl{
Tenant: rsp.Tenant,
ID: rsp.ID,
ResourceProfile: rsp.AsMapStringInterface(),
}
if err = tx.Model(&ResourceProfileMdl{}).Where(
ResourceProfileMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
ResourceProfileMdl{}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (sqls *SQLStorage) RemoveResourceProfileDrv(ctx *context.Context, tenant, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&ResourceProfileMdl{}).Where(&ResourceProfileMdl{Tenant: tenant, ID: id}).
Delete(&ResourceProfileMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return
}
func (sqls *SQLStorage) GetResourceDrv(ctx *context.Context, tenant, id string) (r *utils.Resource, err error) {
var result []*ResourceJSONMdl
if err = sqls.db.Model(&ResourceJSONMdl{}).Where(&ResourceJSONMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return utils.MapStringInterfaceToResource(result[0].Resource), nil
}
func (sqls *SQLStorage) SetResourceDrv(ctx *context.Context, r *utils.Resource) (err error) {
tx := sqls.db.Begin()
mdl := &ResourceJSONMdl{
Tenant: r.Tenant,
ID: r.ID,
Resource: r.AsMapStringInterface(),
}
if err = tx.Model(&ResourceJSONMdl{}).Where(
ResourceJSONMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
ResourceJSONMdl{}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (sqls *SQLStorage) RemoveResourceDrv(ctx *context.Context, tenant, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&ResourceJSONMdl{}).Where(&ResourceJSONMdl{Tenant: tenant, ID: id}).
Delete(&ResourceJSONMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return
}
// AddLoadHistory DataDB method not implemented yet
func (sqls *SQLStorage) AddLoadHistory(ldInst *utils.LoadInstance,
loadHistSize int, transactionID string) error {
@@ -705,36 +796,6 @@ func (sqls *SQLStorage) GetLoadHistory(limit int, skipCache bool,
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetResourceProfileDrv(ctx *context.Context, tenant, id string) (rsp *utils.ResourceProfile, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetResourceProfileDrv(ctx *context.Context, rsp *utils.ResourceProfile) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveResourceProfileDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetResourceDrv(ctx *context.Context, tenant, id string) (r *utils.Resource, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetResourceDrv(ctx *context.Context, r *utils.Resource) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveResourceDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// GetStatQueueProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) GetStatQueueProfileDrv(ctx *context.Context, tenant string, id string) (sq *StatQueueProfile, err error) {
return nil, utils.ErrNotImplemented

View File

@@ -21,6 +21,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>
package general_tests
import (
"os"
"path"
"testing"
@@ -55,15 +56,23 @@ var (
func TestRsV1IT(t *testing.T) {
switch *utils.DBType {
case utils.MetaInternal:
rlsV1ConfDIR = "tutinternal"
rlsV1ConfDIR = "resources_internal"
if err := os.MkdirAll("/tmp/internal_db/db", 0755); err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := os.RemoveAll("/tmp/internal_db"); err != nil {
t.Error(err)
}
})
case utils.MetaRedis:
t.SkipNow()
case utils.MetaMySQL:
rlsV1ConfDIR = "tutmysql"
case utils.MetaMySQL:
rlsV1ConfDIR = "resources_mysql"
case utils.MetaMongo:
rlsV1ConfDIR = "tutmongo"
case utils.MetaPostgres:
t.SkipNow()
rlsV1ConfDIR = "resources_postgres"
default:
t.Fatal("Unknown Database type")
}

View File

@@ -25,6 +25,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"os"
"path"
"reflect"
"sort"
@@ -92,16 +93,23 @@ var (
func TestResourceSIT(t *testing.T) {
switch *utils.DBType {
case utils.MetaInternal:
// rsConfigDIR = "resources_internal"
t.SkipNow()
rsConfigDIR = "resources_internal"
if err := os.MkdirAll("/tmp/internal_db/db", 0755); err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := os.RemoveAll("/tmp/internal_db"); err != nil {
t.Error(err)
}
})
case utils.MetaMongo:
rsConfigDIR = "resources_mongo"
case utils.MetaRedis:
t.SkipNow()
rsConfigDIR = "resources_redis"
case utils.MetaMySQL:
rsConfigDIR = "resources_mysql"
case utils.MetaPostgres:
t.SkipNow()
rsConfigDIR = "resources_postgres"
default:
t.Fatal("Unknown Database type")
}

View File

@@ -527,6 +527,8 @@ const (
Weight = "Weight"
Limit = "Limit"
UsageTTL = "UsageTTL"
Usages = "Usages"
TTLIdx = "TTLIdx"
AllocationMessage = "AllocationMessage"
AddressPool = "AddressPool"
Pools = "Pools"
@@ -1915,6 +1917,8 @@ const (
TBLActionProfiles = "action_profiles"
TBLChargerProfiles = "charger_profiles"
TBLAttributeProfiles = "attribute_profiles"
TBLResourceProfiles = "resource_profiles"
TBLResources = "resources"
OldSMCosts = "sm_costs"
TBLTPDispatchers = "tp_dispatcher_profiles"
TBLTPDispatcherHosts = "tp_dispatcher_hosts"

View File

@@ -112,7 +112,7 @@ type Resource struct {
Tenant string
ID string
Usages map[string]*ResourceUsage
TTLIdx []string // holds ordered list of ResourceIDs based on their TTL, empty if feature is disableda
TTLIdx []string // holds ordered list of ResourceIDs based on their TTL, empty if feature is disabled
}
// Clone clones *Resource (lkID excluded)
@@ -163,6 +163,75 @@ func (r *Resource) TotalUsage() float64 {
return tu
}
// AsMapStringInterface converts Resource struct to map[string]any
func (rp *Resource) AsMapStringInterface() map[string]any {
if rp == nil {
return nil
}
return map[string]any{
Tenant: rp.Tenant,
ID: rp.ID,
Usages: rp.Usages,
TTLIdx: rp.TTLIdx,
}
}
// MapStringInterfaceToResource converts map[string]any to Resource struct
func MapStringInterfaceToResource(m map[string]any) *Resource {
rp := &Resource{}
if v, ok := m[Tenant].(string); ok {
rp.Tenant = v
}
if v, ok := m[ID].(string); ok {
rp.ID = v
}
rp.Usages = InterfaceToMapStringResourceUsage(m[Usages])
rp.TTLIdx = InterfaceToStringSlice(m[TTLIdx])
return rp
}
// InterfaceToMapStringResourceUsage converts any to map[string]*ResourceUsage
func InterfaceToMapStringResourceUsage(v any) map[string]*ResourceUsage {
if v == nil {
return nil
}
switch val := v.(type) {
case map[string]*ResourceUsage:
return val
case map[string]any:
result := make(map[string]*ResourceUsage)
for k, v := range val {
if balMap, ok := v.(map[string]any); ok {
result[k] = MapStringInterfaceToResourceUsage(balMap)
} else if bal, ok := v.(*ResourceUsage); ok {
result[k] = bal
}
}
return result
}
return nil
}
// MapStringInterfaceToResourceUsage converts map[string]any to *ResourceUsage
func MapStringInterfaceToResourceUsage(m map[string]any) *ResourceUsage {
resUsage := &ResourceUsage{}
if v, ok := m[Tenant].(string); ok {
resUsage.Tenant = v
}
if v, ok := m[ID].(string); ok {
resUsage.ID = v
}
if v, ok := m[ExpiryTime].(string); ok {
if t, err := time.Parse(time.RFC3339, v); err == nil {
resUsage.ExpiryTime = t
}
}
if v, ok := m[Units].(float64); ok {
resUsage.Units = v
}
return resUsage
}
// Available returns the available number of units
// Exported method to be used by filterS
func (r *ResourceWithConfig) Available() float64 {
@@ -301,3 +370,58 @@ func ResourceProfileLockKey(tnt, id string) string {
func ResourceLockKey(tnt, id string) string {
return ConcatenatedKey(CacheResources, tnt, id)
}
// AsMapStringInterface converts ResourceProfile struct to map[string]any
func (rp *ResourceProfile) AsMapStringInterface() map[string]any {
if rp == nil {
return nil
}
return map[string]any{
Tenant: rp.Tenant,
ID: rp.ID,
FilterIDs: rp.FilterIDs,
UsageTTL: rp.FilterIDs,
Limit: rp.Limit,
AllocationMessage: rp.AllocationMessage,
Blocker: rp.Blocker,
Stored: rp.Stored,
Weights: rp.Weights,
ThresholdIDs: rp.ThresholdIDs,
}
}
// MapStringInterfaceToResourceProfile converts map[string]any to ResourceProfile struct
func MapStringInterfaceToResourceProfile(m map[string]any) (rp *ResourceProfile, err error) {
rp = &ResourceProfile{}
if v, ok := m[Tenant].(string); ok {
rp.Tenant = v
}
if v, ok := m[ID].(string); ok {
rp.ID = v
}
rp.FilterIDs = InterfaceToStringSlice(m[FilterIDs])
if v, ok := m[UsageTTL].(string); ok {
if dur, err := time.ParseDuration(v); err != nil {
return nil, err
} else {
rp.UsageTTL = dur
}
} else if v, ok := m[UsageTTL].(float64); ok { // for -1 cases
rp.UsageTTL = time.Duration(v)
}
if v, ok := m[Limit].(float64); ok {
rp.Limit = v
}
if v, ok := m[AllocationMessage].(string); ok {
rp.AllocationMessage = v
}
if v, ok := m[Blocker].(bool); ok {
rp.Blocker = v
}
if v, ok := m[Stored].(bool); ok {
rp.Stored = v
}
rp.Weights = InterfaceToDynamicWeights(m[Weights])
rp.ThresholdIDs = InterfaceToStringSlice(m[ThresholdIDs])
return
}