Revise InternalDB constructor

This commit is contained in:
arberkatellari
2025-04-07 17:14:14 +02:00
committed by Dan Christian Bogos
parent d4b8963a6c
commit de349e2482
12 changed files with 104 additions and 77 deletions

View File

@@ -145,7 +145,7 @@ const CGRATES_CFG_JSON = `
"internalDBStartTimeout": "5m", // the amount of wait time until timeout for DB startup
"internalDBDumpInterval": "0s", // dump datadb regularly to a file: "0" - disables it; "-1" - dump on each set/remove; <""|$dur>
"internalDBRewriteInterval": "0s", // rewrite dump files regularly: "0" - disables it; "-1" - rewrite on engine start; "-2" - rewrite on engine shutdown; <""|$dur>
"internalDBWriteLimit": 100, // maximum size in MiB that can be written in a singular dump file
"internalDBFileSizeLimit": "1GB", // maximum size that can be written in a singular dump file
"redisMaxConns": 10, // the connection pool size
"redisConnectAttempts": 20, // the maximum amount of dial attempts
"redisSentinel": "", // the name of sentinel when used
@@ -182,7 +182,7 @@ const CGRATES_CFG_JSON = `
"internalDBStartTimeout": "5m", // the amount of wait time until timeout for DB startup
"internalDBDumpInterval": "0s", // dump datadb regularly to a file: "0" - disables it; "-1" - dump on each set/remove; <""|$dur>
"internalDBRewriteInterval": "0s", // rewrite dump files regularly: "0" - disables it; "-1" - rewrite on engine start; "-2" - rewrite on engine shutdown; <""|$dur>
"internalDBWriteLimit": 100, // maximum size in MiB that can be written in a singular dump file
"internalDBFileSizeLimit": "1GB", // maximum size that can be written in a singular dump file
"sqlMaxOpenConns": 100, // maximum database connections opened, not applying for mongo
"sqlMaxIdleConns": 10, // maximum database connections idle, not applying for mongo
"sqlLogLevel": 3, // sql logger verbosity: 1=Silent, 2=Error, 3=Warn, 4=Info

View File

@@ -1014,9 +1014,9 @@ func (cfg *CGRConfig) checkConfigSanity() error {
if cfg.storDbCfg.Type == utils.MetaInternal &&
(cfg.storDbCfg.Opts.InternalDBDumpInterval != 0 ||
cfg.storDbCfg.Opts.InternalDBRewriteInterval != 0) &&
cfg.storDbCfg.Opts.InternalDBWriteLimit <= 0 {
return fmt.Errorf("<%s> internalDBWriteLimit field cannot be equal or smaller than 0: <%v>", utils.StorDB,
cfg.storDbCfg.Opts.InternalDBWriteLimit)
cfg.storDbCfg.Opts.InternalDBFileSizeLimit <= 0 {
return fmt.Errorf("<%s> internalDBFileSizeLimit field cannot be equal or smaller than 0: <%v>", utils.StorDB,
cfg.storDbCfg.Opts.InternalDBFileSizeLimit)
}
if cfg.storDbCfg.Type == utils.MetaPostgres {
if !slices.Contains([]string{utils.PgSSLModeDisable, utils.PgSSLModeAllow,
@@ -1038,9 +1038,9 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
if (cfg.dataDbCfg.Opts.InternalDBDumpInterval != 0 ||
cfg.dataDbCfg.Opts.InternalDBRewriteInterval != 0) &&
cfg.dataDbCfg.Opts.InternalDBWriteLimit <= 0 {
return fmt.Errorf("<%s> internalDBWriteLimit field cannot be equal or smaller than 0: <%v>", utils.DataDB,
cfg.dataDbCfg.Opts.InternalDBWriteLimit)
cfg.dataDbCfg.Opts.InternalDBFileSizeLimit <= 0 {
return fmt.Errorf("<%s> internalDBFileSizeLimit field cannot be equal or smaller than 0: <%v>", utils.DataDB,
cfg.dataDbCfg.Opts.InternalDBFileSizeLimit)
}
}
for item, val := range cfg.dataDbCfg.Items {

View File

@@ -25,6 +25,7 @@ import (
"time"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
)
type DataDBOpts struct {
@@ -33,7 +34,7 @@ type DataDBOpts struct {
InternalDBStartTimeout time.Duration // Transcache recover from dump files timeout duration
InternalDBDumpInterval time.Duration // Regularly dump database to file
InternalDBRewriteInterval time.Duration // Regularly rewrite dump files
InternalDBWriteLimit int // maximum size in MiB that can be written in a singular dump file
InternalDBFileSizeLimit int64 // maximum size that can be written in a singular dump file
RedisMaxConns int
RedisConnectAttempts int
RedisSentinel string
@@ -95,8 +96,10 @@ func (dbOpts *DataDBOpts) loadFromJSONCfg(jsnCfg *DBOptsJson) (err error) {
return err
}
}
if jsnCfg.InternalDBWriteLimit != nil {
dbOpts.InternalDBWriteLimit = *jsnCfg.InternalDBWriteLimit
if jsnCfg.InternalDBFileSizeLimit != nil {
if dbOpts.InternalDBFileSizeLimit, err = utils.ParseBinarySize(*jsnCfg.InternalDBFileSizeLimit); err != nil {
return err
}
}
if jsnCfg.RedisMaxConns != nil {
dbOpts.RedisMaxConns = *jsnCfg.RedisMaxConns
@@ -251,7 +254,7 @@ func (dbOpts *DataDBOpts) Clone() *DataDBOpts {
InternalDBStartTimeout: dbOpts.InternalDBStartTimeout,
InternalDBDumpInterval: dbOpts.InternalDBDumpInterval,
InternalDBRewriteInterval: dbOpts.InternalDBRewriteInterval,
InternalDBWriteLimit: dbOpts.InternalDBWriteLimit,
InternalDBFileSizeLimit: dbOpts.InternalDBFileSizeLimit,
RedisMaxConns: dbOpts.RedisMaxConns,
RedisConnectAttempts: dbOpts.RedisConnectAttempts,
RedisSentinel: dbOpts.RedisSentinel,
@@ -309,7 +312,7 @@ func (dbcfg *DataDbCfg) AsMapInterface() (mp map[string]any) {
utils.InternalDBStartTimeoutCfg: dbcfg.Opts.InternalDBStartTimeout,
utils.InternalDBDumpIntervalCfg: dbcfg.Opts.InternalDBDumpInterval,
utils.InternalDBRewriteIntervalCfg: dbcfg.Opts.InternalDBRewriteInterval,
utils.InternalDBWriteLimitCfg: dbcfg.Opts.InternalDBWriteLimit,
utils.InternalDBFileSizeLimitCfg: dbcfg.Opts.InternalDBFileSizeLimit,
utils.RedisMaxConnsCfg: dbcfg.Opts.RedisMaxConns,
utils.RedisConnectAttemptsCfg: dbcfg.Opts.RedisConnectAttempts,
utils.RedisSentinelNameCfg: dbcfg.Opts.RedisSentinel,
@@ -354,6 +357,21 @@ func (dbcfg *DataDbCfg) AsMapInterface() (mp map[string]any) {
return
}
// ToTransCacheOpts builds an ltcache.TransCacheOpts from the internal-DB
// dump/rewrite settings of DataDBOpts. A nil receiver yields a nil result,
// which disables the offline collector.
func (d *DataDBOpts) ToTransCacheOpts() (tco *ltcache.TransCacheOpts) {
	if d == nil {
		return nil
	}
	tco = new(ltcache.TransCacheOpts)
	tco.DumpPath = d.InternalDBDumpPath
	tco.BackupPath = d.InternalDBBackupPath
	tco.StartTimeout = d.InternalDBStartTimeout
	tco.DumpInterval = d.InternalDBDumpInterval
	tco.RewriteInterval = d.InternalDBRewriteInterval
	tco.FileSizeLimit = d.InternalDBFileSizeLimit
	return tco
}
// ItemOpt the options for the stored items
type ItemOpt struct {
Limit int

View File

@@ -110,7 +110,7 @@ type DBOptsJson struct {
InternalDBStartTimeout *string `json:"internalDBStartTimeout"`
InternalDBDumpInterval *string `json:"internalDBDumpInterval"`
InternalDBRewriteInterval *string `json:"internalDBRewriteInterval"`
InternalDBWriteLimit *int `json:"internalDBWriteLimit"`
InternalDBFileSizeLimit *string `json:"internalDBFileSizeLimit"`
RedisMaxConns *int `json:"redisMaxConns"`
RedisConnectAttempts *int `json:"redisConnectAttempts"`
RedisSentinel *string `json:"redisSentinel"`

View File

@@ -25,6 +25,7 @@ import (
"time"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
)
type StorDBOpts struct {
@@ -33,7 +34,7 @@ type StorDBOpts struct {
InternalDBStartTimeout time.Duration // Transcache recover from dump files timeout duration
InternalDBDumpInterval time.Duration // Regularly dump database to file
InternalDBRewriteInterval time.Duration // Regularly rewrite dump files
InternalDBWriteLimit int // maximum size in MiB that can be written in a singular dump file
InternalDBFileSizeLimit int64 // maximum size that can be written in a singular dump file
SQLMaxOpenConns int
SQLMaxIdleConns int
SQLConnMaxLifetime time.Duration
@@ -92,8 +93,10 @@ func (dbOpts *StorDBOpts) loadFromJSONCfg(jsnCfg *DBOptsJson) (err error) {
return err
}
}
if jsnCfg.InternalDBWriteLimit != nil {
dbOpts.InternalDBWriteLimit = *jsnCfg.InternalDBWriteLimit
if jsnCfg.InternalDBFileSizeLimit != nil {
if dbOpts.InternalDBFileSizeLimit, err = utils.ParseBinarySize(*jsnCfg.InternalDBFileSizeLimit); err != nil {
return err
}
}
if jsnCfg.SQLMaxOpenConns != nil {
dbOpts.SQLMaxOpenConns = *jsnCfg.SQLMaxOpenConns
@@ -225,7 +228,7 @@ func (dbOpts *StorDBOpts) Clone() *StorDBOpts {
InternalDBStartTimeout: dbOpts.InternalDBStartTimeout,
InternalDBDumpInterval: dbOpts.InternalDBDumpInterval,
InternalDBRewriteInterval: dbOpts.InternalDBRewriteInterval,
InternalDBWriteLimit: dbOpts.InternalDBWriteLimit,
InternalDBFileSizeLimit: dbOpts.InternalDBFileSizeLimit,
SQLMaxOpenConns: dbOpts.SQLMaxOpenConns,
SQLMaxIdleConns: dbOpts.SQLMaxIdleConns,
SQLConnMaxLifetime: dbOpts.SQLConnMaxLifetime,
@@ -287,7 +290,7 @@ func (dbcfg *StorDbCfg) AsMapInterface() (mp map[string]any) {
utils.InternalDBStartTimeoutCfg: dbcfg.Opts.InternalDBStartTimeout,
utils.InternalDBDumpIntervalCfg: dbcfg.Opts.InternalDBDumpInterval,
utils.InternalDBRewriteIntervalCfg: dbcfg.Opts.InternalDBRewriteInterval,
utils.InternalDBWriteLimitCfg: dbcfg.Opts.InternalDBWriteLimit,
utils.InternalDBFileSizeLimitCfg: dbcfg.Opts.InternalDBFileSizeLimit,
utils.SQLMaxOpenConnsCfg: dbcfg.Opts.SQLMaxOpenConns,
utils.SQLMaxIdleConnsCfg: dbcfg.Opts.SQLMaxIdleConns,
utils.SQLConnMaxLifetime: dbcfg.Opts.SQLConnMaxLifetime.String(),
@@ -339,3 +342,18 @@ func (dbcfg *StorDbCfg) AsMapInterface() (mp map[string]any) {
}
return
}
// ToTransCacheOpts builds an ltcache.TransCacheOpts from the internal-DB
// dump/rewrite settings of StorDBOpts. A nil receiver yields a nil result,
// which disables the offline collector.
func (s *StorDBOpts) ToTransCacheOpts() (tco *ltcache.TransCacheOpts) {
	if s == nil {
		return nil
	}
	tco = &ltcache.TransCacheOpts{
		DumpPath:        s.InternalDBDumpPath,
		BackupPath:      s.InternalDBBackupPath,
		StartTimeout:    s.InternalDBStartTimeout,
		DumpInterval:    s.InternalDBDumpInterval,
		RewriteInterval: s.InternalDBRewriteInterval,
		FileSizeLimit:   s.InternalDBFileSizeLimit,
	}
	return tco
}

View File

@@ -34,7 +34,8 @@ var (
)
func init() {
dm = NewDataManager(NewInternalDB(nil, nil, true, false, config.CgrConfig().DataDbCfg().Items), config.CgrConfig().CacheCfg(), connMgr)
idb, _ := NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items) // make TransCacheOpts nil to not create dump files for the DB
dm = NewDataManager(idb, config.CgrConfig().CacheCfg(), connMgr)
httpPstrTransport = config.CgrConfig().HTTPCfg().ClientOpts
}

View File

@@ -45,52 +45,35 @@ type InternalDB struct {
}
// NewInternalDB constructs an InternalDB
func NewInternalDB(stringIndexedFields, prefixIndexedFields []string, isDataDB, clone bool,
itmsCfg map[string]*config.ItemOpt) *InternalDB {
func NewInternalDB(stringIndexedFields, prefixIndexedFields []string, isDataDB bool,
transCacheOpts *ltcache.TransCacheOpts, itmsCfg map[string]*config.ItemOpt) (iDB *InternalDB,
err error) {
tcCfg := make(map[string]*ltcache.CacheConfig, len(itmsCfg))
for k, cPcfg := range itmsCfg {
tcCfg[k] = &ltcache.CacheConfig{
MaxItems: cPcfg.Limit,
TTL: cPcfg.TTL,
StaticTTL: cPcfg.StaticTTL,
Clone: clone,
Clone: true, // cloning is mandatory for databases
}
}
if transCacheOpts != nil && transCacheOpts.DumpInterval == 0 && transCacheOpts.RewriteInterval == 0 {
transCacheOpts = nil // create TransCache without offline collector if neither
// DumpInterval or RewriteInterval are provided
}
tc, err := ltcache.NewTransCacheWithOfflineCollector(transCacheOpts, tcCfg, utils.Logger)
if err != nil {
return nil, err
}
ms, _ := NewMarshaler(config.CgrConfig().GeneralCfg().DBDataEncoding)
return newInternalDB(stringIndexedFields, prefixIndexedFields, isDataDB, ms,
ltcache.NewTransCache(tcCfg))
}
// newInternalDB constructs an InternalDB struct with a recovered or new TransCache
func newInternalDB(stringIndexedFields, prefixIndexedFields []string, isDataDB bool, ms Marshaler, db *ltcache.TransCache) *InternalDB {
return &InternalDB{
stringIndexedFields: stringIndexedFields,
prefixIndexedFields: prefixIndexedFields,
cnter: utils.NewCounter(time.Now().UnixNano(), 0),
ms: ms,
db: db,
db: tc,
isDataDB: isDataDB,
}
}
// Will recover a database from a dump file to memory
func RecoverDB(stringIndexedFields, prefixIndexedFields []string, isDataDB bool,
itmsCfg map[string]*config.ItemOpt, fldrPath, backupPath string, timeout time.Duration, dumpInterval, rewriteInterval time.Duration, writeLimit int) (*InternalDB, error) {
tcCfg := make(map[string]*ltcache.CacheConfig, len(itmsCfg))
for k, cPcfg := range itmsCfg {
tcCfg[k] = &ltcache.CacheConfig{
MaxItems: cPcfg.Limit,
TTL: cPcfg.TTL,
StaticTTL: cPcfg.StaticTTL,
Clone: true,
}
}
ms, _ := NewMarshaler(config.CgrConfig().GeneralCfg().DBDataEncoding)
tc, err := ltcache.NewTransCacheWithOfflineCollector(fldrPath, backupPath, timeout, dumpInterval, rewriteInterval, writeLimit, tcCfg, utils.Logger)
if err != nil {
return nil, err
}
return newInternalDB(stringIndexedFields, prefixIndexedFields, isDataDB, ms, tc), nil
}, nil
}
// SetStringIndexedFields set the stringIndexedFields, used at StorDB reload (is thread safe)

View File

@@ -55,17 +55,7 @@ func NewDataDBConn(dbType, host, port, name, user,
case utils.MetaMongo:
d, err = NewMongoStorage(opts.MongoConnScheme, host, port, name, user, pass, marshaler, utils.DataDB, nil, opts.MongoQueryTimeout)
case utils.MetaInternal:
if config.CgrConfig().DataDbCfg().Opts.InternalDBDumpInterval != 0 {
d, err = RecoverDB(nil, nil, true, itmsCfg,
config.CgrConfig().DataDbCfg().Opts.InternalDBDumpPath,
config.CgrConfig().DataDbCfg().Opts.InternalDBBackupPath,
config.CgrConfig().DataDbCfg().Opts.InternalDBStartTimeout,
config.CgrConfig().DataDbCfg().Opts.InternalDBDumpInterval,
config.CgrConfig().DataDbCfg().Opts.InternalDBRewriteInterval,
config.CgrConfig().DataDbCfg().Opts.InternalDBWriteLimit)
return
}
d = NewInternalDB(nil, nil, true, true, itmsCfg)
d, err = NewInternalDB(nil, nil, true, opts.ToTransCacheOpts(), itmsCfg)
default:
err = fmt.Errorf("unsupported db_type <%s>", dbType)
}
@@ -87,17 +77,7 @@ func NewStorDBConn(dbType, host, port, name, user, pass, marshaler string,
db, err = NewMySQLStorage(host, port, name, user, pass, marshaler, opts.SQLMaxOpenConns, opts.SQLMaxIdleConns, opts.SQLLogLevel,
opts.SQLConnMaxLifetime, opts.MySQLLocation, opts.MySQLDSNParams)
case utils.MetaInternal:
if config.CgrConfig().StorDbCfg().Opts.InternalDBDumpInterval != 0 {
db, err = RecoverDB(stringIndexedFields, prefixIndexedFields, false, itmsCfg,
config.CgrConfig().StorDbCfg().Opts.InternalDBDumpPath,
config.CgrConfig().DataDbCfg().Opts.InternalDBBackupPath,
config.CgrConfig().StorDbCfg().Opts.InternalDBStartTimeout,
config.CgrConfig().StorDbCfg().Opts.InternalDBDumpInterval,
config.CgrConfig().StorDbCfg().Opts.InternalDBRewriteInterval,
config.CgrConfig().StorDbCfg().Opts.InternalDBWriteLimit)
return
}
db = NewInternalDB(stringIndexedFields, prefixIndexedFields, false, true, itmsCfg)
db, err = NewInternalDB(stringIndexedFields, prefixIndexedFields, false, opts.ToTransCacheOpts(), itmsCfg)
default:
err = fmt.Errorf("unknown db '%s' valid options are [%s, %s, %s, %s]",
dbType, utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres, utils.MetaInternal)

2
go.mod
View File

@@ -25,7 +25,7 @@ require (
github.com/cgrates/go-diameter v0.0.0-20250228104837-c21fdf924ab5
github.com/cgrates/janusgo v0.0.0-20240503152118-188a408d7e73
github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf
github.com/cgrates/ltcache v0.0.0-20250404091005-a6ffec15918c
github.com/cgrates/ltcache v0.0.0-20250409175814-a90b4db74697
github.com/cgrates/radigo v0.0.0-20240123163129-491c899df727
github.com/cgrates/rpcclient v0.0.0-20240816141816-52dd1074499e
github.com/cgrates/sipingo v1.0.1-0.20200514112313-699ebc1cdb8e

4
go.sum
View File

@@ -85,8 +85,8 @@ github.com/cgrates/janusgo v0.0.0-20240503152118-188a408d7e73 h1:7AYhvpegrSkY9tL
github.com/cgrates/janusgo v0.0.0-20240503152118-188a408d7e73/go.mod h1:XBQDDjrIn+RCS4PDApYjTWwdp51NbqYfUGAYtzSB5ag=
github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf h1:GbMJzvtwdX1OCEmsqSts/cRCIcIMvo8AYtC2dQExWlg=
github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf/go.mod h1:oEq/JbubkOD2pXHvDy4r7519NkxriONisrnVpkCaNJw=
github.com/cgrates/ltcache v0.0.0-20250404091005-a6ffec15918c h1:IvC0/acpyQBcRl8JshUnCmaJjIY3/lkOw/AKDdUXc3w=
github.com/cgrates/ltcache v0.0.0-20250404091005-a6ffec15918c/go.mod h1:E6jHedwqFkZsXZG7MVvnCagpFQgKqDF6tg/mUqRRu34=
github.com/cgrates/ltcache v0.0.0-20250409175814-a90b4db74697 h1:zVnfz6L9vVZxYYr6F/JqjkrJzBjpqMJquejHN4RrqCI=
github.com/cgrates/ltcache v0.0.0-20250409175814-a90b4db74697/go.mod h1:E6jHedwqFkZsXZG7MVvnCagpFQgKqDF6tg/mUqRRu34=
github.com/cgrates/radigo v0.0.0-20240123163129-491c899df727 h1:rhYHlbfEPDNreekd1ZtUYi/NbFm5cEl8twQZ3c/0nYU=
github.com/cgrates/radigo v0.0.0-20240123163129-491c899df727/go.mod h1:W/5LcOm9jaz0NfIFT09bxjddEai8DTSfw9poqDqtAX4=
github.com/cgrates/rpcclient v0.0.0-20240816141816-52dd1074499e h1:rwvvB0F9WZNmBriZSNo6dbYy1H26yueeLBkuxQMUW0E=

View File

@@ -2191,7 +2191,7 @@ const (
InternalDBStartTimeoutCfg = "internalDBStartTimeout"
InternalDBDumpIntervalCfg = "internalDBDumpInterval"
InternalDBRewriteIntervalCfg = "internalDBRewriteInterval"
InternalDBWriteLimitCfg = "internalDBWriteLimit"
InternalDBFileSizeLimitCfg = "internalDBFileSizeLimit"
RedisMaxConnsCfg = "redisMaxConns"
RedisConnectAttemptsCfg = "redisConnectAttempts"
RedisSentinelNameCfg = "redisSentinel"

View File

@@ -1093,3 +1093,30 @@ type PanicMessageArgs struct {
APIOpts map[string]any
Message string
}
// ParseBinarySize converts a human-readable byte size such as "100MB" or
// "1.5GB" into its value in bytes. Units are case-insensitive and use
// binary (1024-based) multipliers: B, K/KB, M/MB, G/GB. A fractional
// numeric part is allowed; the result is truncated to int64. An error is
// returned when the string does not match <number><unit> or when the unit
// is not recognized.
func ParseBinarySize(size string) (int64, error) {
	var num float64
	var unit string
	// "%f%s" accepts an optional fraction followed by the unit token;
	// a bare number without a unit is rejected here on purpose.
	if _, err := fmt.Sscanf(size, "%f%s", &num, &unit); err != nil {
		return 0, fmt.Errorf("invalid size format: %s", size)
	}
	// A switch over the constant unit set avoids allocating a lookup map
	// on every call.
	var mult int64
	switch strings.ToUpper(unit) {
	case "B":
		mult = 1
	case "K", "KB":
		mult = 1 << 10
	case "M", "MB":
		mult = 1 << 20
	case "G", "GB":
		mult = 1 << 30
	default:
		return 0, fmt.Errorf("unknown unit: %s", unit)
	}
	return int64(num * float64(mult)), nil
}