make accounts storable in mysql

This commit is contained in:
arberkatellari
2025-10-30 09:16:08 +02:00
committed by Dan Christian Bogos
parent 7273384828
commit 1657f015fc
13 changed files with 376 additions and 83 deletions

View File

@@ -85,6 +85,10 @@ func TestAccSIT(t *testing.T) {
case utils.MetaMongo:
accPrfConfigDIR = "tutmongo"
case utils.MetaMySQL:
accPrfConfigDIR = "mysql_acc"
for _, stest := range sTestsAccPrf {
t.Run(accPrfConfigDIR, stest)
}
accPrfConfigDIR = "tutmysql"
case utils.MetaPostgres:
accPrfConfigDIR = "tutpostgres"
@@ -1507,7 +1511,7 @@ func testAccRefundCharges(t *testing.T) {
}, &result); err != nil {
t.Error(err)
} else {
if *utils.DBType == utils.MetaPostgres {
if *utils.DBType == utils.MetaPostgres || accPrfConfigDIR == "mysql_acc" {
acc.Account.Balances["CB"].Units = utils.NewDecimalFromFloat64(50)
}
if !reflect.DeepEqual(result, acc.Account) {

View File

@@ -125,8 +125,10 @@ const CGRATES_CFG_JSON = `
},
},
"items":{
// compatible db types: <*internal|*redis|*mongo>
// compatible with all db types
"*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
// compatible db types: <*internal|*redis|*mongo>
"*actions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*resources": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},

View File

@@ -0,0 +1,147 @@
{
// CGRateS Configuration file
//
"general": {
"reply_timeout": "50s",
},
"logger": {
"level": 7
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_conns": {
"*default": {
"db_type": "redis", // data_db type: <redis|mongo>
"db_port": 6379, // data_db port to reach the database
"db_name": "10", // data_db database name to connect to
},
"StorDB": { // The id of the DB connection
"db_type": "mysql", // db type: <internal|redis|mysql|mongo|postgres>
"db_host": "127.0.0.1", // the host to connect to
"db_port": 3306, // db port to reach the database
"db_name": "cgrates", // db database name to connect to
"db_user": "cgrates", // username to use when connecting to the database
"db_password": "CGRateS.org" // password to use when connecting to the database
},
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"loaders": [
{
"id": "*default",
"enabled": true,
"tenant": "cgrates.org",
"lockfile_path": ".cgr.lck",
"tp_in_dir": "/usr/share/cgrates/tariffplans/testit",
"tp_out_dir": "",
},
],
"cdrs": {
"enabled": true,
"chargers_conns":["*internal"],
},
"attributes": {
"enabled": true,
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"accounts_conns": ["*localhost"]
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"]
},
"stats": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"],
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"routes": {
"enabled": true,
"prefix_indexed_fields":["*req.Destination"],
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"rates_conns": ["*internal"],
},
"sessions": {
"enabled": true,
"routes_conns": ["*internal"],
"resources_conns": ["*internal"],
"attributes_conns": ["*internal"],
"rates_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"],
},
"migrator":{
"users_filters":["Account"],
},
"admins": {
"enabled": true,
},
"rates": {
"enabled": true
},
"actions": {
"enabled": true,
"accounts_conns": ["*localhost"]
},
"accounts": {
"enabled": true
},
"filters": {
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"accounts_conns": ["*internal"],
},
}

View File

@@ -0,0 +1,141 @@
{
// CGRateS Configuration file
//
"general": {
"reply_timeout": "50s",
},
"logger": {
"level": 7
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_conns": {
"*default": { // The id of the DB connection
"db_type": "redis", // db type: <internal|redis|mysql|mongo|postgres>
"db_host": "127.0.0.1",
"db_port": 6379, // db port to reach the database
"db_name": "10", // db database name to connect to
"db_user": "cgrates",
},
"StorDB": { // The id of the DB connection
"db_type": "mysql", // db type: <internal|redis|mysql|mongo|postgres>
"db_host": "127.0.0.1", // the host to connect to
"db_port": 3306, // db port to reach the database
"db_name": "cgrates", // db database name to connect to
"db_user": "cgrates", // username to use when connecting to the database
"db_password": "CGRateS.org" // password to use when connecting to the database
},
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"cdrs": {
"enabled": true,
"chargers_conns":["*internal"],
},
"attributes": {
"enabled": true,
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"accounts_conns": ["*localhost"]
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"]
},
"stats": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"],
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"routes": {
"enabled": true,
"prefix_indexed_fields":["*req.Destination"],
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"rates_conns": ["*internal"],
},
"sessions": {
"enabled": true,
"routes_conns": ["*internal"],
"resources_conns": ["*internal"],
"attributes_conns": ["*internal"],
"rates_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"],
},
"migrator":{
"users_filters":["Account"],
},
"admins": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
"rates": {
"enabled": true
},
"actions": {
"enabled": true,
"accounts_conns": ["*localhost"]
},
"accounts": {
"enabled": true
},
"filters": {
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"accounts_conns": ["*internal"],
},
"tpes": {
"enabled": true
},
}

View File

@@ -2,4 +2,13 @@
-- Table structure for table `accounts`
--
-- placeholder file
DROP TABLE IF EXISTS accounts;
CREATE TABLE accounts (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`account` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX accounts_idx ON accounts (`id`);

View File

@@ -16,12 +16,14 @@ DIR="$(dirname "$(readlink -f "$0")")"
mysql -u $1 -p$2 -h $host < "$DIR"/create_db_with_users.sql
cu=$?
mysql -u $1 -p$2 -h $host -D cgrates < "$DIR"/create_accounts_tables.sql
acct=$?
mysql -u $1 -p$2 -h $host -D cgrates < "$DIR"/create_cdrs_tables.sql
cdrt=$?
mysql -u $1 -p$2 -h $host -D cgrates < "$DIR"/create_tariffplan_tables.sql
tpt=$?
if [ $cu = 0 ] && [ $cdrt = 0 ] && [ $tpt = 0 ]; then
if [ $cu = 0 ] && [ $acct = 0 ] && [ $cdrt = 0 ] && [ $tpt = 0 ]; then
echo "\n\t+++ CGR-DB successfully set-up! +++\n"
exit 0
fi

View File

@@ -6,8 +6,8 @@ DROP TABLE IF EXISTS accounts;
CREATE TABLE accounts (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id varchar(64) NOT NULL,
account jsonb NOT NULL,
id VARCHAR(64) NOT NULL,
account JSONB NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX accounts_unique ON accounts ("id");
CREATE UNIQUE INDEX accounts_idx ON accounts ("id");

View File

@@ -1060,23 +1060,24 @@ func (fltr *FilterRule) FilterToSQLQuery() (conditions []string) {
if len(fltr.Values) == 0 {
switch fltr.Type {
case utils.MetaExists, utils.MetaNotExists:
if not {
if not { // not existing means Column IS NULL
if firstItem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf("%s IS NOT NULL", restOfItems))
conditions = append(conditions, fmt.Sprintf("%s IS NULL", restOfItems))
return
}
queryPart := fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NOT NULL", firstItem, restOfItems)
queryPart := fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NULL", firstItem, restOfItems)
if strings.HasPrefix(restOfItems, `"*`) {
queryPart = fmt.Sprintf("JSON_UNQUOTE(%s)", queryPart)
}
conditions = append(conditions, queryPart)
return
}
// existing means Column IS NOT NULL
if firstItem == utils.EmptyString {
conditions = append(conditions, fmt.Sprintf("%s IS NULL", restOfItems))
conditions = append(conditions, fmt.Sprintf("%s IS NOT NULL", restOfItems))
return
}
queryPart := fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NULL", firstItem, restOfItems)
queryPart := fmt.Sprintf("JSON_VALUE(%s, '$.%s') IS NOT NULL", firstItem, restOfItems)
if strings.HasPrefix(restOfItems, `"*`) {
queryPart = fmt.Sprintf("JSON_UNQUOTE(%s)", queryPart)
}

View File

@@ -22,7 +22,6 @@ import (
"fmt"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
"gorm.io/driver/mysql"
"gorm.io/gorm"
@@ -33,8 +32,9 @@ type MySQLStorage struct {
SQLStorage
}
func NewMySQLStorage(host, port, name, user, password string,
maxConn, maxIdleConn, logLevel int, connMaxLifetime time.Duration, location string, dsnParams map[string]string) (*SQLStorage, error) {
func NewMySQLStorage(host, port, name, user, password string, maxConn, maxIdleConn,
logLevel int, connMaxLifetime time.Duration, location string,
dsnParams map[string]string) (*SQLStorage, error) {
connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=%s&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
user, password, host, port, name, location)
db, err := gorm.Open(mysql.Open(connectString+AppendToMysqlDSNOpts(dsnParams)), &gorm.Config{AllowGlobalUpdate: true, Logger: logger.Default.LogMode(logger.LogLevel(logLevel))})
@@ -92,21 +92,6 @@ func (msqlS *MySQLStorage) SetVersions(vrs Versions, overwrite bool) (err error)
return
}
// DataDB method not implemented yet
func (msqlS *MySQLStorage) GetAccountDrv(ctx *context.Context, tenant, id string) (ap *utils.Account, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (msqlS *MySQLStorage) SetAccountDrv(ctx *context.Context, ap *utils.Account) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (msqlS *MySQLStorage) RemoveAccountDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
func (msqlS *MySQLStorage) extraFieldsExistsQry(field string) string {
return fmt.Sprintf(" extra_fields LIKE '%%\"%s\":%%'", field)
}

View File

@@ -22,7 +22,6 @@ import (
"fmt"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
"gorm.io/driver/postgres"
"gorm.io/gorm"
@@ -102,50 +101,6 @@ func (poS *PostgresStorage) SetVersions(vrs Versions, overwrite bool) (err error
return
}
func (poS *PostgresStorage) GetAccountDrv(_ *context.Context, tenant, id string) (ap *utils.Account, err error) {
var result []*AccountJSONMdl
if err = poS.db.Model(&AccountJSONMdl{}).Where(&AccountJSONMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return utils.MapStringInterfaceToAccount(result[0].Account)
}
func (poS *PostgresStorage) SetAccountDrv(_ *context.Context, ap *utils.Account) (err error) {
tx := poS.db.Begin()
mdl := &AccountJSONMdl{
Tenant: ap.Tenant,
ID: ap.ID,
Account: ap.AsMapStringInterface(),
}
if err = tx.Model(&AccountJSONMdl{}).Where(
AccountJSONMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
AccountJSONMdl{Account: mdl.Account}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (poS *PostgresStorage) RemoveAccountDrv(_ *context.Context, tenant, id string) (err error) {
tx := poS.db.Begin()
if err = tx.Model(&AccountJSONMdl{}).Where(&AccountJSONMdl{Tenant: tenant, ID: id}).
Delete(&AccountJSONMdl{}).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (poS *PostgresStorage) extraFieldsExistsQry(field string) string {
return fmt.Sprintf(" extra_fields ?'%s'", field)
}

View File

@@ -74,13 +74,6 @@ func (sqls *SQLStorage) SelectDatabase(dbName string) (err error) {
return
}
// returns all keys in table matching the prefix
func (sqls *SQLStorage) getAllIndexKeys(_ *context.Context, table, prefix string) (ids []string, err error) {
err = sqls.db.Table(table).Select("id").Where("id LIKE ?", prefix+"%").
Pluck("id", &ids).Error
return
}
// returns all keys in table matching the Tenant and ID
func (sqls *SQLStorage) getAllKeysMatchingTenantID(_ *context.Context, table string, tntID *utils.TenantID) (ids []string, err error) {
matchingTntID := []utils.TenantID{}
@@ -387,6 +380,55 @@ func (sqls *SQLStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err
return
}
// GetAccountDrv will get the account from the DB matching the tenant and id provided.
// Decimal fields ending in `.0` will be read as whole numbers but still in decimal type.
// (50.0 -> 50)
func (sqls *SQLStorage) GetAccountDrv(ctx *context.Context, tenant, id string) (ap *utils.Account, err error) {
var result []*AccountJSONMdl
if err = sqls.db.Model(&AccountJSONMdl{}).Where(&AccountJSONMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return utils.MapStringInterfaceToAccount(result[0].Account)
}
// SetAccountDrv will set in DB the provided Account
func (sqls *SQLStorage) SetAccountDrv(ctx *context.Context, ap *utils.Account) (err error) {
tx := sqls.db.Begin()
mdl := &AccountJSONMdl{
Tenant: ap.Tenant,
ID: ap.ID,
Account: ap.AsMapStringInterface(),
}
if err = tx.Model(&AccountJSONMdl{}).Where(
AccountJSONMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
AccountJSONMdl{Account: mdl.Account}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
// RemoveAccountDrv will remove from DB the account matching the tenamt and id provided
func (sqls *SQLStorage) RemoveAccountDrv(ctx *context.Context, tenant, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&AccountJSONMdl{}).Where(&AccountJSONMdl{Tenant: tenant, ID: id}).
Delete(&AccountJSONMdl{}).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
// AddLoadHistory DataDB method not implemented yet
func (sqls *SQLStorage) AddLoadHistory(ldInst *utils.LoadInstance,
loadHistSize int, transactionID string) error {

View File

@@ -64,7 +64,8 @@ func NewDataDBConn(dbType, host, port, name, user,
opts.SQLMaxIdleConns, opts.SQLLogLevel, opts.SQLConnMaxLifetime,
opts.MySQLLocation, opts.SQLDSNParams)
case utils.MetaInternal:
d, err = NewInternalDB(stringIndexedFields, prefixIndexedFields, opts.ToTransCacheOpts(), itmsCfg)
d, err = NewInternalDB(stringIndexedFields, prefixIndexedFields,
opts.ToTransCacheOpts(), itmsCfg)
default:
err = fmt.Errorf("unsupported db_type <%s>", dbType)
}

View File

@@ -66,6 +66,10 @@ func TestAccIT(t *testing.T) {
case utils.MetaInternal:
accConfDIR = "acc_generaltest_internal"
case utils.MetaMySQL:
accConfDIR = "acc_generaltest_mysql_acc"
for _, stest := range sTestsAcc {
t.Run(accConfDIR, stest)
}
accConfDIR = "acc_generaltest_mysql"
case utils.MetaMongo:
accConfDIR = "acc_generaltest_mongo"