make accounts storable in postgres

arberkatellari
2025-10-27 18:01:31 +02:00
committed by Dan Christian Bogos
parent 8af781fb75
commit 7273384828
15 changed files with 434 additions and 43 deletions

View File

@@ -1507,8 +1507,11 @@ func testAccRefundCharges(t *testing.T) {
}, &result); err != nil {
t.Error(err)
} else {
if *utils.DBType == utils.MetaPostgres {
acc.Account.Balances["CB"].Units = utils.NewDecimalFromFloat64(50)
}
if !reflect.DeepEqual(result, acc.Account) {
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(acc.Account), utils.ToJSON(result))
t.Errorf("Expected %+v \n, received %+v", acc.Account, result)
}
}
}
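A hedged note on the Postgres-only branch above: when the account is read back through the jsonb column it is rebuilt via the float64 conversion path added to utils in this commit (NewDecimalFromInterface falling through to NewDecimalFromFloat64), so the test normalizes its expected Units through the same constructor before comparing. A minimal sketch of that normalization; the helper name is illustrative only:

// normalizeUnitsForJSONB mirrors what the jsonb round-trip does to Balance.Units:
// the value comes back as a float64 and is rebuilt with NewDecimalFromFloat64.
func normalizeUnitsForJSONB(units float64) *utils.Decimal {
	return utils.NewDecimalFromFloat64(units) // same constructor the driver path ends up in
}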

View File

@@ -1060,15 +1060,16 @@ func (cfg *CGRConfig) checkConfigSanity() error {
dataDBTypes := []string{utils.MetaInternal, utils.MetaRedis, utils.MetaMongo,
utils.Internal, utils.Redis, utils.Mongo}
// if *cdrs item db type is not supported, return error
if item == utils.MetaCDRs {
if !slices.Contains(storDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type) {
return fmt.Errorf("<%s> db item can only be of types <%v>, got <%s>", item,
storDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type)
}
} else {
if !slices.Contains(dataDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type) {
return fmt.Errorf("<%s> db item can only be of types <%v>, got <%s>", item, dataDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type)
if item != utils.MetaAccounts {
if item == utils.MetaCDRs {
if !slices.Contains(storDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type) {
return fmt.Errorf("<%s> db item can only be of types <%v>, got <%s>", item,
storDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type)
}
} else {
if !slices.Contains(dataDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type) {
return fmt.Errorf("<%s> db item can only be of types <%v>, got <%s>", item, dataDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type)
}
}
}
found1RmtConns := false
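The sanity-check change above exempts *accounts cache items from the DB-type restriction altogether, so they may point at a StorDB connection such as *postgres, while *cdrs still requires a StorDB type and every other item a DataDB type. An illustrative restatement of the new rule (not the engine code itself; the helper name is hypothetical):

// checkItemDBType condenses the decision tree from checkConfigSanity above.
func checkItemDBType(item, connType string, storDBTypes, dataDBTypes []string) error {
	if item == utils.MetaAccounts {
		return nil // accounts may live in either a DataDB or a StorDB connection
	}
	allowed := dataDBTypes
	if item == utils.MetaCDRs {
		allowed = storDBTypes
	}
	if !slices.Contains(allowed, connType) {
		return fmt.Errorf("<%s> db item can only be of types <%v>, got <%s>", item, allowed, connType)
	}
	return nil
}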

View File

@@ -5,6 +5,10 @@
// Starts rater, scheduler
"logger": {
"level": 7
},
"listen": {
"rpc_json": ":2012", // RPC JSON listening address
"rpc_gob": ":2013", // RPC GOB listening address
@@ -31,7 +35,8 @@
},
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},

View File

@@ -0,0 +1,5 @@
--
-- Table structure for table `accounts`
--
-- placeholder file

View File

@@ -0,0 +1,13 @@
--
-- Table structure for table `accounts`
--
DROP TABLE IF EXISTS accounts;
CREATE TABLE accounts (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
account jsonb NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX accounts_unique ON accounts ("id");

View File

@@ -17,12 +17,14 @@ DIR="$(dirname "$(readlink -f "$0")")"
export PGPASSWORD="CGRateS.org"
psql -U $user -h $host -d cgrates -f "$DIR"/create_accounts_tables.sql
acct=$?
psql -U $user -h $host -d cgrates -f "$DIR"/create_cdrs_tables.sql
cdrt=$?
psql -U $user -h $host -d cgrates -f "$DIR"/create_tariffplan_tables.sql
tpt=$?
if [ $cdrt = 0 ] && [ $tpt = 0 ]; then
if [ $cdrt = 0 ] && [ $tpt = 0 ] && [ $acct = 0 ]; then
echo "\n\t+++ CGR-DB successfully set-up! +++\n"
exit 0
fi

View File

@@ -398,11 +398,16 @@ func (ng TestEngine) Run(t testing.TB, extraFlags ...string) (*birpc.Client, *co
if ng.TpPath == "" {
ng.TpPath = cfg.LoaderCfg()[0].TpInDir
}
if cfg.LoaderCfg().Enabled() {
// the config files may be edited on disk after cfg was parsed; reload the config from path so the returned value reflects those changes
newCfg, err := config.NewCGRConfigFromPath(context.Background(), cfg.ConfigPath)
if err != nil {
t.Fatal(err)
}
if newCfg.LoaderCfg().Enabled() {
WaitForServiceStart(t, client, utils.LoaderS, 200*time.Millisecond)
}
loadCSVs(t, ng.TpPath, ng.TpFiles)
return client, cfg
return client, newCfg
}
// DBConn contains database connection parameters.

View File

@@ -382,3 +382,14 @@ type AccountMdl struct {
func (AccountMdl) TableName() string {
return utils.TBLTPAccounts
}
// AccountJSONMdl maps the accounts table, storing the Account object as jsonb
type AccountJSONMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
Account utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (AccountJSONMdl) TableName() string {
return utils.TBLAccounts
}
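A short usage sketch for the model above, assuming it lives in the engine package alongside the other GORM models and that db is an already-open *gorm.DB pointing at the Postgres StorDB; this mirrors the lookup the Postgres driver performs:

// look up one account row by tenant and id through the jsonb-backed model
var rows []*AccountJSONMdl
err := db.Model(&AccountJSONMdl{}).
	Where(&AccountJSONMdl{Tenant: "cgrates.org", ID: "1001"}).
	Find(&rows).Error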

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
"gorm.io/driver/mysql"
"gorm.io/gorm"
@@ -91,6 +92,21 @@ func (msqlS *MySQLStorage) SetVersions(vrs Versions, overwrite bool) (err error)
return
}
// DataDB method not implemented yet
func (msqlS *MySQLStorage) GetAccountDrv(ctx *context.Context, tenant, id string) (ap *utils.Account, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (msqlS *MySQLStorage) SetAccountDrv(ctx *context.Context, ap *utils.Account) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (msqlS *MySQLStorage) RemoveAccountDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
func (msqlS *MySQLStorage) extraFieldsExistsQry(field string) string {
return fmt.Sprintf(" extra_fields LIKE '%%\"%s\":%%'", field)
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
"gorm.io/driver/postgres"
"gorm.io/gorm"
@@ -33,8 +34,8 @@ type PostgresStorage struct {
}
// NewPostgresStorage returns the postgres DB
func NewPostgresStorage(host, port, name, user, password,
sslmode, sslcert, sslkey, sslpassword, sslcertmode, sslrootcert string,
func NewPostgresStorage(host, port, name, user, password, sslmode, sslcert,
sslkey, sslpassword, sslcertmode, sslrootcert string,
maxConn, maxIdleConn, sqlLogLevel int, connMaxLifetime time.Duration) (*SQLStorage, error) {
connStr := fmt.Sprintf(
"host=%s port=%s dbname=%s user=%s password=%s sslmode=%s",
@@ -101,6 +102,50 @@ func (poS *PostgresStorage) SetVersions(vrs Versions, overwrite bool) (err error
return
}
// GetAccountDrv retrieves the Account stored under tenant and id from the accounts table
func (poS *PostgresStorage) GetAccountDrv(_ *context.Context, tenant, id string) (ap *utils.Account, err error) {
var result []*AccountJSONMdl
if err = poS.db.Model(&AccountJSONMdl{}).Where(&AccountJSONMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return utils.MapStringInterfaceToAccount(result[0].Account)
}
// SetAccountDrv stores the Account as jsonb, replacing any previous entry for the same tenant and id
func (poS *PostgresStorage) SetAccountDrv(_ *context.Context, ap *utils.Account) (err error) {
tx := poS.db.Begin()
mdl := &AccountJSONMdl{
Tenant: ap.Tenant,
ID: ap.ID,
Account: ap.AsMapStringInterface(),
}
if err = tx.Model(&AccountJSONMdl{}).Where(
AccountJSONMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
AccountJSONMdl{Account: mdl.Account}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
// RemoveAccountDrv deletes the Account stored under tenant and id
func (poS *PostgresStorage) RemoveAccountDrv(_ *context.Context, tenant, id string) (err error) {
tx := poS.db.Begin()
if err = tx.Model(&AccountJSONMdl{}).Where(&AccountJSONMdl{Tenant: tenant, ID: id}).
Delete(&AccountJSONMdl{}).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (poS *PostgresStorage) extraFieldsExistsQry(field string) string {
return fmt.Sprintf(" extra_fields ?'%s'", field)
}
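A hedged round-trip sketch for the three driver methods above; poS is assumed to be an already-initialised *PostgresStorage, and since the drivers ignore their context argument, nil is passed here:

func roundTripAccount(poS *PostgresStorage) error {
	acc := &utils.Account{Tenant: "cgrates.org", ID: "ACC_PG"}
	if err := poS.SetAccountDrv(nil, acc); err != nil { // stored as jsonb in the accounts table
		return err
	}
	if _, err := poS.GetAccountDrv(nil, acc.Tenant, acc.ID); err != nil {
		return err
	}
	return poS.RemoveAccountDrv(nil, acc.Tenant, acc.ID)
}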

View File

@@ -58,7 +58,8 @@ func (sqls *SQLStorage) ExportGormDB() *gorm.DB {
}
func (sqls *SQLStorage) Flush(scriptsPath string) (err error) {
for _, scriptName := range []string{utils.CreateCDRsTablesSQL, utils.CreateTariffPlanTablesSQL} {
for _, scriptName := range []string{utils.CreateAccountsTablesSQL,
utils.CreateCDRsTablesSQL, utils.CreateTariffPlanTablesSQL} {
if err := sqls.CreateTablesFromScript(path.Join(scriptsPath, scriptName)); err != nil {
return err
}
@@ -73,8 +74,44 @@ func (sqls *SQLStorage) SelectDatabase(dbName string) (err error) {
return
}
func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) ([]string, error) {
return nil, utils.ErrNotImplemented
// returns all keys in table matching the prefix
func (sqls *SQLStorage) getAllIndexKeys(_ *context.Context, table, prefix string) (ids []string, err error) {
err = sqls.db.Table(table).Select("id").Where("id LIKE ?", prefix+"%").
Pluck("id", &ids).Error
return
}
// returns all keys in table matching the tenant exactly and the ID as a prefix
func (sqls *SQLStorage) getAllKeysMatchingTenantID(_ *context.Context, table string, tntID *utils.TenantID) (ids []string, err error) {
matchingTntID := []utils.TenantID{}
err = sqls.db.Table(table).Select("tenant, id").Where("tenant = ? AND id LIKE ?", tntID.Tenant, tntID.ID+"%").
Find(&matchingTntID).Error
ids = make([]string, len(matchingTntID))
for i, result := range matchingTntID {
ids[i] = utils.ConcatenatedKey(result.Tenant, result.ID)
}
return
}
// GetKeysForPrefix will look for keys matching the prefix given
func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (keys []string, err error) {
keyLen := len(utils.AccountPrefix)
if len(prefix) < keyLen {
return nil, fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix)
}
category := prefix[:keyLen]
tntID := utils.NewTenantID(prefix[keyLen:])
switch category {
case utils.AccountPrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLAccounts, tntID)
default:
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix)
}
for i := range keys { // re-add the prefix so the keys match the redis-style keys expected by callers
keys[i] = category + keys[i]
}
return keys, err
}
func (sqls *SQLStorage) CreateTablesFromScript(scriptPath string) error {
@@ -702,21 +739,6 @@ func (sqls *SQLStorage) RemoveIndexesDrv(ctx *context.Context, idxItmType, tntCt
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetAccountDrv(ctx *context.Context, tenant, id string) (ap *utils.Account, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetAccountDrv(ctx *context.Context, ap *utils.Account) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveAccountDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetConfigSectionsDrv(ctx *context.Context, nodeID string, sectionIDs []string) (sectionMap map[string][]byte, err error) {
return nil, utils.ErrNotImplemented
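A hedged usage sketch for GetKeysForPrefix above: only the *Account category is wired up so far, the prefix is utils.AccountPrefix followed by a tenant[:id] fragment, and the returned keys keep the prefix so redis-style callers work unchanged; sqls is assumed to be an initialised *SQLStorage and the context is ignored by the underlying queries:

keys, err := sqls.GetKeysForPrefix(nil, utils.AccountPrefix+"cgrates.org:10")
// on success, keys come back as e.g. utils.AccountPrefix + "cgrates.org:1001",
// i.e. tenant and id concatenated and re-prefixed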

View File

@@ -45,21 +45,24 @@ func NewDataDBConn(dbType, host, port, name, user,
host += ":" + port
}
d, err = NewRedisStorage(host, dbNo, user, pass, marshaler,
opts.RedisMaxConns, opts.RedisConnectAttempts,
opts.RedisSentinel,
opts.RedisMaxConns, opts.RedisConnectAttempts, opts.RedisSentinel,
opts.RedisCluster, opts.RedisClusterSync, opts.RedisClusterOndownDelay,
opts.RedisConnectTimeout, opts.RedisReadTimeout, opts.RedisWriteTimeout,
opts.RedisPoolPipelineWindow, opts.RedisPoolPipelineLimit,
opts.RedisTLS, opts.RedisClientCertificate, opts.RedisClientKey, opts.RedisCACertificate)
opts.RedisTLS, opts.RedisClientCertificate, opts.RedisClientKey,
opts.RedisCACertificate)
case utils.MetaMongo:
d, err = NewMongoStorage(opts.MongoConnScheme, host, port, name, user, pass, marshaler, stringIndexedFields, opts.MongoQueryTimeout)
d, err = NewMongoStorage(opts.MongoConnScheme, host, port, name, user, pass,
marshaler, stringIndexedFields, opts.MongoQueryTimeout)
case utils.MetaPostgres:
d, err = NewPostgresStorage(host, port, name, user, pass, opts.PgSSLMode,
opts.PgSSLCert, opts.PgSSLKey, opts.PgSSLPassword, opts.PgSSLCertMode, opts.PgSSLRootCert,
opts.SQLMaxOpenConns, opts.SQLMaxIdleConns, opts.SQLLogLevel, opts.SQLConnMaxLifetime)
opts.PgSSLCert, opts.PgSSLKey, opts.PgSSLPassword, opts.PgSSLCertMode,
opts.PgSSLRootCert, opts.SQLMaxOpenConns, opts.SQLMaxIdleConns,
opts.SQLLogLevel, opts.SQLConnMaxLifetime)
case utils.MetaMySQL:
d, err = NewMySQLStorage(host, port, name, user, pass, opts.SQLMaxOpenConns, opts.SQLMaxIdleConns,
opts.SQLLogLevel, opts.SQLConnMaxLifetime, opts.MySQLLocation, opts.SQLDSNParams)
d, err = NewMySQLStorage(host, port, name, user, pass, opts.SQLMaxOpenConns,
opts.SQLMaxIdleConns, opts.SQLLogLevel, opts.SQLConnMaxLifetime,
opts.MySQLLocation, opts.SQLDSNParams)
case utils.MetaInternal:
d, err = NewInternalDB(stringIndexedFields, prefixIndexedFields, opts.ToTransCacheOpts(), itmsCfg)
default:

View File

@@ -371,6 +371,265 @@ func (acc *Account) CacheClone() any {
return acc.Clone()
}
// AsMapStringInterface returns the Account as a map[string]any, ready to be stored as jsonb
func (acc *Account) AsMapStringInterface() map[string]any {
if acc == nil {
return nil
}
return map[string]any{
Tenant: acc.Tenant,
ID: acc.ID,
FilterIDs: acc.FilterIDs,
Weights: acc.Weights,
Blockers: acc.Blockers,
Opts: acc.Opts,
Balances: acc.Balances,
ThresholdIDs: acc.ThresholdIDs,
}
}
// MapStringInterfaceToAccount converts map[string]any to Account struct
func MapStringInterfaceToAccount(m map[string]any) (*Account, error) {
acc := &Account{}
if v, ok := m[Tenant].(string); ok {
acc.Tenant = v
}
if v, ok := m[ID].(string); ok {
acc.ID = v
}
acc.FilterIDs = InterfaceToStringSlice(m[FilterIDs])
acc.ThresholdIDs = InterfaceToStringSlice(m[ThresholdIDs])
acc.Weights = InterfaceToDynamicWeights(m[Weights])
acc.Blockers = InterfaceToDynamicBlockers(m[Blockers])
if v, ok := m[Opts].(map[string]any); ok {
acc.Opts = v
}
if balances, err := InterfaceToBalances(m[Balances]); err != nil {
return nil, err
} else {
acc.Balances = balances
}
return acc, nil
}
// InterfaceToStringSlice converts any to []string
func InterfaceToStringSlice(v any) []string {
if v == nil {
return nil
}
switch val := v.(type) {
case []string:
return val
case []any:
result := make([]string, 0, len(val))
for _, item := range val {
if s, ok := item.(string); ok {
result = append(result, s)
}
}
return result
}
return nil
}
// InterfaceToDynamicWeights converts any to DynamicWeights
func InterfaceToDynamicWeights(v any) DynamicWeights {
if v == nil {
return nil
}
switch val := v.(type) {
case DynamicWeights:
return val
case []*DynamicWeight:
return DynamicWeights(val)
case []any:
result := make(DynamicWeights, 0, len(val))
for _, item := range val {
if dwMap, ok := item.(map[string]any); ok {
dw := &DynamicWeight{
FilterIDs: InterfaceToStringSlice(dwMap[FilterIDs]),
}
if weight, ok := dwMap[Weight].(float64); ok {
dw.Weight = weight
}
result = append(result, dw)
}
}
return result
}
return nil
}
// InterfaceToDynamicBlockers converts any to DynamicBlockers
func InterfaceToDynamicBlockers(v any) DynamicBlockers {
if v == nil {
return nil
}
switch val := v.(type) {
case DynamicBlockers:
return val
case []*DynamicBlocker:
return DynamicBlockers(val)
case []any:
result := make(DynamicBlockers, 0, len(val))
for _, item := range val {
if dbMap, ok := item.(map[string]any); ok {
db := &DynamicBlocker{
FilterIDs: InterfaceToStringSlice(dbMap[FilterIDs]),
}
if blocker, ok := dbMap[Blocker].(bool); ok {
db.Blocker = blocker
}
result = append(result, db)
}
}
return result
}
return nil
}
// NewDecimalFromInterface converts any to *Decimal
func NewDecimalFromInterface(v any) (*Decimal, error) {
if v == nil {
return nil, nil
}
switch val := v.(type) {
case *Decimal:
return val, nil
case string:
return NewDecimalFromString(val)
case float64:
return NewDecimalFromFloat64(val), nil
}
return nil, nil
}
// InterfaceToUnitFactors converts any to []*UnitFactor
func InterfaceToUnitFactors(v any) ([]*UnitFactor, error) {
if v == nil {
return nil, nil
}
switch val := v.(type) {
case []*UnitFactor:
return val, nil
case []any:
result := make([]*UnitFactor, 0, len(val))
for _, item := range val {
if ufMap, ok := item.(map[string]any); ok {
uf := &UnitFactor{
FilterIDs: InterfaceToStringSlice(ufMap[FilterIDs]),
}
factor, err := NewDecimalFromInterface(ufMap[Factor])
if err != nil {
return nil, err
}
uf.Factor = factor
result = append(result, uf)
}
}
return result, nil
}
return nil, nil
}
// InterfaceToCostIncrements converts any to []*CostIncrement
func InterfaceToCostIncrements(v any) ([]*CostIncrement, error) {
if v == nil {
return nil, nil
}
switch val := v.(type) {
case []*CostIncrement:
return val, nil
case []any:
result := make([]*CostIncrement, 0, len(val))
for _, item := range val {
if ciMap, ok := item.(map[string]any); ok {
ci := &CostIncrement{
FilterIDs: InterfaceToStringSlice(ciMap[FilterIDs]),
}
var err error
if ci.Increment, err = NewDecimalFromInterface(ciMap[Increment]); err != nil {
return nil, err
}
if ci.FixedFee, err = NewDecimalFromInterface(ciMap[FixedFee]); err != nil {
return nil, err
}
if ci.RecurrentFee, err = NewDecimalFromInterface(ciMap[RecurrentFee]); err != nil {
return nil, err
}
result = append(result, ci)
}
}
return result, nil
}
return nil, nil
}
// InterfaceToBalances converts any to map[string]*Balance
func InterfaceToBalances(v any) (map[string]*Balance, error) {
if v == nil {
return nil, nil
}
switch val := v.(type) {
case map[string]*Balance:
return val, nil
case map[string]any:
result := make(map[string]*Balance)
for k, v := range val {
if balMap, ok := v.(map[string]any); ok {
bal, err := MapStringInterfaceToBalance(balMap)
if err != nil {
return nil, err
}
result[k] = bal
} else if bal, ok := v.(*Balance); ok {
result[k] = bal
}
}
return result, nil
}
return nil, nil
}
// MapStringInterfaceToBalance converts map[string]any to *Balance
func MapStringInterfaceToBalance(m map[string]any) (*Balance, error) {
bal := &Balance{}
if v, ok := m[ID].(string); ok {
bal.ID = v
}
if v, ok := m[Type].(string); ok {
bal.Type = v
}
bal.FilterIDs = InterfaceToStringSlice(m[FilterIDs])
bal.AttributeIDs = InterfaceToStringSlice(m[AttributeIDs])
bal.RateProfileIDs = InterfaceToStringSlice(m[RateProfileIDs])
bal.Weights = InterfaceToDynamicWeights(m[Weights])
bal.Blockers = InterfaceToDynamicBlockers(m[Blockers])
if v, ok := m[Opts].(map[string]any); ok {
bal.Opts = v
}
var err error
if bal.Units, err = NewDecimalFromInterface(m[Units]); err != nil {
return nil, err
}
if bal.UnitFactors, err = InterfaceToUnitFactors(m[UnitFactors]); err != nil {
return nil, err
}
if bal.CostIncrements, err = InterfaceToCostIncrements(m[CostIncrements]); err != nil {
return nil, err
}
return bal, nil
}
// Clone returns a clone of the ActivationInterval
func (aI *ActivationInterval) Clone() *ActivationInterval {
if aI == nil {
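The conversion pair above is what the Postgres driver relies on: AsMapStringInterface feeds the jsonb column and MapStringInterfaceToAccount rebuilds the Account on the way back. A hedged round-trip sketch, with tenant, id and balance values purely illustrative:

func exampleAccountRoundTrip() (*Account, error) {
	acc := &Account{
		Tenant: "cgrates.org",
		ID:     "ACC1",
		Balances: map[string]*Balance{
			"MB": {ID: "MB", Type: MetaMonetary, Units: NewDecimalFromFloat64(10)},
		},
	}
	m := acc.AsMapStringInterface()       // map[string]any, ready for the jsonb column
	return MapStringInterfaceToAccount(m) // rebuilds the struct; an error can only come from decimal parsing
}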

View File

@@ -286,6 +286,7 @@ const (
LoadIDPrefix = "lid_"
LoadInstKey = "load_history"
CreateCDRsTablesSQL = "create_cdrs_tables.sql"
CreateAccountsTablesSQL = "create_accounts_tables.sql"
CreateTariffPlanTablesSQL = "create_tariffplan_tables.sql"
TestSQL = "TEST_SQL"
MetaAsc = "*asc"
@@ -1906,6 +1907,7 @@ const (
TBLTPAttributes = "tp_attributes"
TBLTPChargers = "tp_chargers"
TBLVersions = "versions"
TBLAccounts = "accounts"
OldSMCosts = "sm_costs"
TBLTPDispatchers = "tp_dispatcher_profiles"
TBLTPDispatcherHosts = "tp_dispatcher_hosts"

View File

@@ -144,7 +144,6 @@ func SubstractDecimal(x, y *Decimal) *Decimal {
// NewDecimalFromFloat64 is a constructor for Decimal out of float64
// passing through a string is necessary due to differences between the decimal and binary representations of float64
func NewDecimalFromFloat64(f float64) *Decimal {
// Might want to use SetFloat here.
d, _ := decimal.WithContext(DecimalContext).SetString(strconv.FormatFloat(f, 'f', -1, 64))
return &Decimal{d}
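A short worked example of why the constructor formats the float64 as a string first: 0.1 has no exact binary representation, so feeding the raw float into the decimal would carry the binary rounding error along, while strconv.FormatFloat with precision -1 produces the shortest decimal string that round-trips, which is then parsed exactly. Illustrative snippet using only the standard strconv and fmt packages:

fmt.Println(strconv.FormatFloat(0.1, 'f', 20, 64)) // 0.10000000000000000555 - the binary value's error is visible
fmt.Println(strconv.FormatFloat(0.1, 'f', -1, 64)) // 0.1 - what NewDecimalFromFloat64 actually parses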