From 0da0f69e1cb5876995cb9cc30327ddba32ed367d Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Wed, 19 Nov 2025 14:54:10 +0200 Subject: [PATCH] Make Indexes storable in MySQL and Postgres --- config/config_defaults.go | 2 - config/configsanity.go | 39 +-- data/conf/samples/tutmysql/cgrates.json | 13 +- data/conf/samples/tutpostgres/cgrates.json | 14 +- data/conf/samples/tutredis/cgrates.json | 140 +++++++++ data/storage/mysql/create_db_tables.sql | 14 + data/storage/postgres/create_db_tables.sql | 15 +- engine/libtest.go | 26 -- engine/models.go | 14 +- engine/storage_mongo_datadb.go | 4 +- engine/storage_redis.go | 4 +- engine/storage_sql.go | 324 ++++++++++++++------- engine/version.go | 8 +- utils/consts.go | 3 +- utils/stringset.go | 18 ++ 15 files changed, 436 insertions(+), 202 deletions(-) create mode 100644 data/conf/samples/tutredis/cgrates.json diff --git a/config/config_defaults.go b/config/config_defaults.go index 758629727..7ba1d9871 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -186,8 +186,6 @@ const CGRATES_CFG_JSON = ` "*trend_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*trends": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, - - // compatible db types: <*internal|*redis|*mongo> "*resource_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, "*ip_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, "*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, diff --git a/config/configsanity.go b/config/configsanity.go index 565ab4236..8259c5dd7 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -1026,29 +1026,6 @@ func (cfg *CGRConfig) checkConfigSanity() error { // DataDB sanity checks hasOneInternalDB := false // used to reutrn error in case more then 1 internaldb is found - allDBsItems := []string{ - utils.CacheVersions, - utils.MetaAccounts, - utils.MetaIPProfiles, - utils.MetaIPAllocations, - utils.MetaActionProfiles, - utils.MetaChargerProfiles, - utils.MetaAttributeProfiles, - utils.MetaResourceProfiles, - utils.MetaResources, - utils.MetaStatQueueProfiles, - utils.MetaStatQueues, - utils.MetaThresholdProfiles, - utils.MetaThresholds, - utils.MetaFilters, - utils.MetaRouteProfiles, - utils.MetaRateProfiles, - utils.MetaRankingProfiles, - utils.MetaRankings, - utils.MetaTrendProfiles, - utils.MetaTrends, - utils.MetaLoadIDs, - } for _, dbcfg := range cfg.dbCfg.DBConns { if dbcfg.Type == utils.MetaInternal { if hasOneInternalDB { @@ -1080,19 +1057,11 @@ func (cfg *CGRConfig) checkConfigSanity() error { } storDBTypes := []string{utils.MetaInternal, utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres, utils.Internal, utils.MySQL, utils.Mongo, utils.Postgres} - dataDBTypes := []string{utils.MetaInternal, utils.MetaRedis, utils.MetaMongo, - utils.Internal, utils.Redis, utils.Mongo} - if !slices.Contains(allDBsItems, item) { - if item == utils.MetaCDRs { - if !slices.Contains(storDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type) { - return fmt.Errorf("<%s> db item can only be of types <%v>, got <%s>", item, - storDBTypes[4:], cfg.dbCfg.DBConns[val.DBConn].Type) - } - } else { - if !slices.Contains(dataDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type) { - return fmt.Errorf("<%s> db item can only be of types <%v>, got <%s>", item, dataDBTypes[3:], cfg.dbCfg.DBConns[val.DBConn].Type) - } + if item == utils.MetaCDRs { + if !slices.Contains(storDBTypes, cfg.dbCfg.DBConns[val.DBConn].Type) { + return fmt.Errorf("<%s> db item can only be of types <%v>, got <%s>", item, + storDBTypes[4:], cfg.dbCfg.DBConns[val.DBConn].Type) } } found1RmtConns := false diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index c2678f100..397b13bfb 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -20,24 +20,13 @@ "db": { // database used to store runtime data (eg: accounts, cdr stats) "db_conns": { "*default": { // The id of the DB connection - "db_type": "redis", // db type: - "db_host": "127.0.0.1", - "db_port": 6379, // db port to reach the database - "db_name": "10", // db database name to connect to - "db_user": "cgrates", - }, - "StorDB": { // The id of the DB connection "db_type": "mysql", // db type: "db_host": "127.0.0.1", // the host to connect to "db_port": 3306, // db port to reach the database "db_name": "cgrates", // db database name to connect to "db_user": "cgrates", // username to use when connecting to the database "db_password": "CGRateS.org" // password to use when connecting to the database - }, - }, - "items": { - "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, - "*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + } } }, diff --git a/data/conf/samples/tutpostgres/cgrates.json b/data/conf/samples/tutpostgres/cgrates.json index 483bcfae9..f3f2ade04 100644 --- a/data/conf/samples/tutpostgres/cgrates.json +++ b/data/conf/samples/tutpostgres/cgrates.json @@ -19,26 +19,14 @@ "db": { // database used to store runtime data (eg: accounts, cdr stats) "db_conns": { "*default": { - "db_type": "redis", // data_db type: - "db_host": "127.0.0.1", - "db_port": 6379, // data_db port to reach the database - "db_name": "10", // data_db database name to connect to - "db_user": "cgrates", - }, - "StorDB": { // The id of the DB connection "db_type": "postgres", // db type: "db_host": "127.0.0.1", "db_port": 5432, // db port to reach the database "db_name": "cgrates", // the host to connect to "db_user": "cgrates", "db_password": "CGRateS.org" // password to use when connecting to the database - }, + }, }, - "items": { - "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, - "*accounts": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, - "*load_ids": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} - } }, "cdrs": { diff --git a/data/conf/samples/tutredis/cgrates.json b/data/conf/samples/tutredis/cgrates.json new file mode 100644 index 000000000..31b7a0631 --- /dev/null +++ b/data/conf/samples/tutredis/cgrates.json @@ -0,0 +1,140 @@ +{ +// CGRateS Configuration file +// + + +"general": { + "reply_timeout": "50s", +}, + +"logger": { + "level": 7 +}, + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + +"db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_conns": { + "*default": { // The id of the DB connection + "db_type": "redis", // db type: + "db_host": "127.0.0.1", + "db_port": 6379, // db port to reach the database + "db_name": "10", // db database name to connect to + "db_user": "cgrates", + }, + "StorDB": { // The id of the DB connection + "db_type": "mysql", // db type: + "db_host": "127.0.0.1", // the host to connect to + "db_port": 3306, // db port to reach the database + "db_name": "cgrates", // db database name to connect to + "db_user": "cgrates", // username to use when connecting to the database + "db_password": "CGRateS.org" // password to use when connecting to the database + }, + }, + "items": { + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + } +}, + +"cdrs": { + "enabled": true, + "chargers_conns":["*internal"], +}, + + +"attributes": { + "enabled": true, + "stats_conns": ["*localhost"], + "resources_conns": ["*localhost"], + "accounts_conns": ["*localhost"] +}, + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + + +"resources": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"] +}, + + +"stats": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"], +}, + + +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + + +"routes": { + "enabled": true, + "prefix_indexed_fields":["*req.Destination"], + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], + "rates_conns": ["*internal"], +}, + + +"sessions": { + "enabled": true, + "routes_conns": ["*internal"], + "resources_conns": ["*internal"], + "attributes_conns": ["*internal"], + "rates_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], +}, + + +"migrator":{ + + "users_filters":["Account"], +}, + + +"admins": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + + +"rates": { + "enabled": true +}, + + +"actions": { + "enabled": true, + "accounts_conns": ["*localhost"] +}, + + +"accounts": { + "enabled": true +}, + + +"filters": { + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], + "accounts_conns": ["*internal"], +}, + +"tpes": { + "enabled": true +}, + +} diff --git a/data/storage/mysql/create_db_tables.sql b/data/storage/mysql/create_db_tables.sql index fb58bb20d..7b3b02b31 100644 --- a/data/storage/mysql/create_db_tables.sql +++ b/data/storage/mysql/create_db_tables.sql @@ -235,3 +235,17 @@ CREATE TABLE load_ids ( `load_ids` JSON NOT NULL, PRIMARY KEY (`pk`) ); + +DROP TABLE IF EXISTS indexes; +CREATE TABLE indexes ( + `pk` int(11) NOT NULL AUTO_INCREMENT, + `tenant` VARCHAR(40), + `type` VARCHAR(40) NOT NULL, + `key` VARCHAR(64), + `value` JSON NOT NULL, + PRIMARY KEY (`pk`), + UNIQUE KEY unique_tenant_type_key (`tenant`, `type`, `key`) +); +CREATE INDEX indexes_key_idx ON indexes (`key`); +CREATE INDEX indexes_type_idx ON indexes (`type`); +CREATE INDEX indexes_type_key_idx ON indexes (`type`, `key`); diff --git a/data/storage/postgres/create_db_tables.sql b/data/storage/postgres/create_db_tables.sql index 05b102f12..6850b923d 100644 --- a/data/storage/postgres/create_db_tables.sql +++ b/data/storage/postgres/create_db_tables.sql @@ -229,4 +229,17 @@ DROP TABLE IF EXISTS load_ids; CREATE TABLE load_ids ( pk SERIAL PRIMARY KEY, load_ids JSONB NOT NULL -); \ No newline at end of file +); + +DROP TABLE IF EXISTS indexes; +CREATE TABLE indexes ( + pk SERIAL PRIMARY KEY, + tenant VARCHAR(40), + type VARCHAR(40) NOT NULL, + key VARCHAR(40), + value JSONB NOT NULL, + UNIQUE (tenant, type, key) +); +CREATE INDEX indexes_key_idx ON indexes ("key"); +CREATE INDEX indexes_type_idx ON indexes ("type"); +CREATE INDEX indexes_type_key_idx ON indexes ("type", "key"); diff --git a/engine/libtest.go b/engine/libtest.go index 2aac54b5e..9a58f3965 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -745,13 +745,6 @@ var ( DB: &DBParams{ DBConns: map[string]DBConn{ utils.MetaDefault: { - Type: utils.StringPointer(utils.MetaRedis), - Host: utils.StringPointer("127.0.0.1"), - Port: utils.IntPointer(6379), - Name: utils.StringPointer("10"), - User: utils.StringPointer(utils.CGRateSLwr), - }, - utils.StorDB: { Type: utils.StringPointer(utils.MetaMySQL), Host: utils.StringPointer("127.0.0.1"), Port: utils.IntPointer(3306), @@ -760,12 +753,6 @@ var ( Password: utils.StringPointer("CGRateS.org"), }, }, - Items: map[string]Item{ - utils.MetaCDRs: { - Limit: utils.IntPointer(-1), - DbConn: utils.StringPointer(utils.StorDB), - }, - }, }, } RedisDBCfg = DBCfg{ @@ -826,13 +813,6 @@ var ( DB: &DBParams{ DBConns: map[string]DBConn{ utils.MetaDefault: { - Type: utils.StringPointer(utils.MetaRedis), - Host: utils.StringPointer("127.0.0.1"), - Port: utils.IntPointer(6379), - Name: utils.StringPointer("10"), - User: utils.StringPointer(utils.CGRateSLwr), - }, - utils.StorDB: { Type: utils.StringPointer(utils.MetaPostgres), Host: utils.StringPointer("127.0.0.1"), Port: utils.IntPointer(5432), @@ -841,12 +821,6 @@ var ( Password: utils.StringPointer("CGRateS.org"), }, }, - Items: map[string]Item{ - utils.MetaCDRs: { - Limit: utils.IntPointer(-1), - DbConn: utils.StringPointer(utils.StorDB), - }, - }, }, } ) diff --git a/engine/models.go b/engine/models.go index 96373deb5..df674b7d6 100644 --- a/engine/models.go +++ b/engine/models.go @@ -537,7 +537,7 @@ func (RouteProfileMdl) TableName() string { return utils.TBLRouteProfiles } -// Doesnt include Rates in RateProfile json, Rates taken from Rate using foreign keys +// Doesnt include Rates in RateProfile json, Rates taken from RateMdl using foreign keys type RateProfileJSONMdl struct { PK uint `gorm:"primary_key"` Tenant string `index:"0" re:".*"` @@ -622,3 +622,15 @@ type LoadIDMdl struct { func (LoadIDMdl) TableName() string { return utils.TBLLoadIDs } + +type IndexMdl struct { + PK uint `gorm:"primary_key"` + Tenant string `index:"0" re:".*"` + Type string `index:"1" re:".*"` + Key string `index:"2" re:".*"` + Value utils.JSONB `gorm:"type:jsonb" index:"3" re:".*"` +} + +func (IndexMdl) TableName() string { + return utils.TBLIndexes +} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 70ab1ab09..da27cc3b4 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -1446,7 +1446,7 @@ func (ms *MongoStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id }) } -// GetIndexesDrv retrieves Indexes from dataDB +// GetIndexesDrv retrieves Indexes from DB // the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context // id is used as a concatenated key in case of filterIndexes the id will be filterType:fieldName:fieldVal func (ms *MongoStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (map[string]utils.StringSet, error) { @@ -1503,7 +1503,7 @@ func (ms *MongoStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, return indexes, nil } -// SetIndexesDrv stores Indexes into DataDB +// SetIndexesDrv stores Indexes into DB // the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context func (ms *MongoStorage) SetIndexesDrv(ctx *context.Context, idxItmType, tntCtx string, indexes map[string]utils.StringSet, commit bool, transactionID string) error { diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 072817519..8c982c40f 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -964,7 +964,7 @@ func (rs *RedisStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id return rs.Cmd(nil, redisDEL, utils.ActionProfilePrefix+utils.ConcatenatedKey(tenant, id)) } -// GetIndexesDrv retrieves Indexes from dataDB +// GetIndexesDrv retrieves Indexes from DB func (rs *RedisStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) { mp := make(map[string]string) dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx @@ -997,7 +997,7 @@ func (rs *RedisStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, return } -// SetIndexesDrv stores Indexes into DataDB +// SetIndexesDrv stores Indexes into DB func (rs *RedisStorage) SetIndexesDrv(ctx *context.Context, idxItmType, tntCtx string, indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) { originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx diff --git a/engine/storage_sql.go b/engine/storage_sql.go index afb061697..46a192049 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -20,7 +20,6 @@ package engine import ( "database/sql" - "errors" "fmt" "os" "path" @@ -62,7 +61,7 @@ func (sqls *SQLStorage) ExportGormDB() *gorm.DB { } func (sqls *SQLStorage) Flush(scriptsPath string) (err error) { - for _, scriptName := range []string{utils.CreateAccountsTablesSQL, + for _, scriptName := range []string{utils.CreateDBTablesSQL, utils.CreateCDRsTablesSQL, utils.CreateTariffPlanTablesSQL} { if err := sqls.CreateTablesFromScript(path.Join(scriptsPath, scriptName)); err != nil { return err @@ -140,6 +139,32 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLTrendProfiles, tntID) case utils.TrendPrefix: keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLTrends, tntID) + case utils.AttributeFilterIndexes: + keys, err = sqls.getAllIndexKeys(utils.AttributeFilterIndexes, tntID.Tenant) + case utils.ResourceFilterIndexes: + keys, err = sqls.getAllIndexKeys(utils.ResourceFilterIndexes, tntID.Tenant) + case utils.IPFilterIndexes: + keys, err = sqls.getAllIndexKeys(utils.IPFilterIndexes, tntID.Tenant) + case utils.StatFilterIndexes: + keys, err = sqls.getAllIndexKeys(utils.StatFilterIndexes, tntID.Tenant) + case utils.ThresholdFilterIndexes: + keys, err = sqls.getAllIndexKeys(utils.ThresholdFilterIndexes, tntID.Tenant) + case utils.RouteFilterIndexes: + keys, err = sqls.getAllIndexKeys(utils.RouteFilterIndexes, tntID.Tenant) + case utils.ChargerFilterIndexes: + keys, err = sqls.getAllIndexKeys(utils.ChargerFilterIndexes, tntID.Tenant) + case utils.ActionPlanIndexes: + keys, err = sqls.getAllIndexKeys(utils.ActionPlanIndexes, tntID.Tenant) + case utils.ActionProfilesFilterIndexPrfx: + keys, err = sqls.getAllIndexKeys(utils.ActionProfilesFilterIndexPrfx, tntID.Tenant) + case utils.AccountFilterIndexPrfx: + keys, err = sqls.getAllIndexKeys(utils.AccountFilterIndexPrfx, tntID.Tenant) + case utils.RateProfilesFilterIndexPrfx: + keys, err = sqls.getAllIndexKeys(utils.RateProfilesFilterIndexPrfx, tntID.Tenant) + case utils.RateFilterIndexPrfx: + keys, err = sqls.getAllIndexKeys(utils.RateFilterIndexPrfx, tntID.Tenant) + case utils.FilterIndexPrfx: + keys, err = sqls.getAllIndexKeys(utils.FilterIndexPrfx, tntID.Tenant) default: err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix) } @@ -167,13 +192,30 @@ func (sqls *SQLStorage) CreateTablesFromScript(scriptPath string) error { return nil } -func (sqls *SQLStorage) IsDBEmpty() (resp bool, err error) { - for _, tbl := range []string{utils.CDRsTBL, utils.TBLVersions} { - if sqls.db.Migrator().HasTable(tbl) { - return false, nil +func (sqls *SQLStorage) IsDBEmpty() (bool, error) { + sqlTables := []string{ + utils.CDRsTBL, utils.TBLVersions, utils.TBLAccounts, + utils.TBLIPProfiles, utils.TBLIPAllocations, utils.TBLActionProfiles, + utils.TBLChargerProfiles, utils.TBLAttributeProfiles, utils.TBLResourceProfiles, + utils.TBLResources, utils.TBLStatQueueProfiles, utils.TBLStatQueues, + utils.TBLThresholdProfiles, utils.TBLThresholds, utils.TBLFilters, + utils.TBLRouteProfiles, utils.TBLRateProfiles, utils.TBLRates, + utils.TBLRankingProfiles, utils.TBLRankings, utils.TBLTrendProfiles, + utils.TBLTrends, utils.TBLLoadIDs, utils.TBLIndexes, + } + for _, tbl := range sqlTables { + if !sqls.db.Migrator().HasTable(tbl) { + continue + } + var count int64 + if err := sqls.db.Table(tbl).Count(&count).Error; err != nil { + return false, err + } + if count > 0 { + return false, nil // Table contains data } } - return true, nil + return true, nil // All tables empty } // GetVersions returns slice of all versions or a specific version if tag is specified @@ -1073,24 +1115,20 @@ func (sqls *SQLStorage) SetRateProfileDrv(ctx *context.Context, rpp *utils.RateP return } } - var existingRP RateProfileJSONMdl - result := tx.Where(RateProfileJSONMdl{Tenant: rpMdl.Tenant, ID: rpMdl.ID}).First(&existingRP) - switch result.Error { - case nil: // Record exists, update it - rpMdl.PK = existingRP.PK - if err = tx.Save(rpMdl).Error; err != nil { - tx.Rollback() - return - } - case gorm.ErrRecordNotFound: // Record doesn't exist, create it - if err = tx.Create(rpMdl).Error; err != nil { - tx.Rollback() - return - } - default: + var existingRP []*RateProfileJSONMdl + result := tx.Where(RateProfileJSONMdl{Tenant: rpMdl.Tenant, ID: rpMdl.ID}).Find(&existingRP) + if result.Error != nil { tx.Rollback() return result.Error } + if result.RowsAffected > 0 { + // Update existing + rpMdl.PK = existingRP[0].PK + } + if err = tx.Save(rpMdl).Error; err != nil { + tx.Rollback() + return err + } for rID, rate := range rpp.Rates { rMdl := &RateMdl{ Tenant: rpp.Tenant, @@ -1106,24 +1144,20 @@ func (sqls *SQLStorage) SetRateProfileDrv(ctx *context.Context, rpp *utils.RateP return } } - var existingRT RateMdl - result := tx.Where(RateMdl{Tenant: rMdl.Tenant, ID: rMdl.ID, RateProfileID: rpMdl.ID}).First(&existingRT) - switch result.Error { - case nil: // Record exists, update it - rMdl.PK = existingRT.PK - if err = tx.Save(rMdl).Error; err != nil { - tx.Rollback() - return - } - case gorm.ErrRecordNotFound: // Record doesn't exist, create it - if err = tx.Create(rMdl).Error; err != nil { - tx.Rollback() - return - } - default: + var existingRT []*RateMdl + result := tx.Where(RateMdl{Tenant: rMdl.Tenant, ID: rMdl.ID, RateProfileID: rpMdl.ID}).Find(&existingRT) + if result.Error != nil { tx.Rollback() return result.Error } + if result.RowsAffected > 0 { + // Update existing + rMdl.PK = existingRT[0].PK + } + if err = tx.Save(rMdl).Error; err != nil { + tx.Rollback() + return err + } } tx.Commit() return @@ -1439,14 +1473,13 @@ func (sqls *SQLStorage) AddLoadHistory(ldInst *utils.LoadInstance, // Make sure we do it locked since other instances can modify the history while we read it. err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) error { return sqls.db.Transaction(func(tx *gorm.DB) error { - var mdl *LoadInstanceMdl - result := tx.Table(utils.LoadInstKey).Where(&LoadInstanceMdl{Key: utils.LoadInstKey}).First(&mdl) - if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) { - return result.Error + var mdl []*LoadInstanceMdl + if qErr := tx.Table(utils.LoadInstKey).Where(&LoadInstanceMdl{Key: utils.LoadInstKey}).Find(&mdl).Error; qErr != nil { + return qErr } var existingLoadHistory []*utils.LoadInstance - if result.Error == nil && len(mdl.LoadInstance) > 0 { - existingLoadHistory = utils.MapStringInterfaceToLoadInstances(mdl.LoadInstance) + if len(mdl) != 0 && len(mdl[0].LoadInstance) > 0 { + existingLoadHistory = utils.MapStringInterfaceToLoadInstances(mdl[0].LoadInstance) } // Insert at the first position @@ -1499,14 +1532,13 @@ func (sqls *SQLStorage) GetLoadHistory(limit int, skipCache bool, return nil, utils.ErrNotFound } } - var mdl *LoadInstanceMdl - result := sqls.db.Table(utils.LoadInstKey).Where(&LoadInstanceMdl{Key: utils.LoadInstKey}).First(&mdl) - if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) { - return nil, result.Error - } else if errors.Is(result.Error, gorm.ErrRecordNotFound) { + var mdl []*LoadInstanceMdl + if err := sqls.db.Table(utils.LoadInstKey).Where(&LoadInstanceMdl{Key: utils.LoadInstKey}).Find(&mdl).Error; err != nil { + return nil, err + } else if len(mdl) == 0 { return nil, utils.ErrNotFound } - loadInstances := utils.MapStringInterfaceToLoadInstances(mdl.LoadInstance) + loadInstances := utils.MapStringInterfaceToLoadInstances(mdl[0].LoadInstance) cCommit := cacheCommit(transactionID) if errCh := Cache.Remove(context.TODO(), utils.LoadInstKey, utils.EmptyString, cCommit, transactionID); errCh != nil { return nil, errCh @@ -1521,22 +1553,17 @@ func (sqls *SQLStorage) GetLoadHistory(limit int, skipCache bool, } func (sqls *SQLStorage) GetItemLoadIDsDrv(ctx *context.Context, itemIDPrefix string) (loadIDs map[string]int64, err error) { - var mdl LoadIDMdl + var mdl []LoadIDMdl tx := sqls.db.Table(utils.TBLLoadIDs) - result := tx.First(&mdl) - if result.Error != nil { - if errors.Is(result.Error, gorm.ErrRecordNotFound) { - return nil, utils.ErrNotFound - } - return nil, result.Error - } - if len(mdl.LoadIDs) == 0 { + if err := tx.Find(&mdl).Error; err != nil { + return nil, err + } else if len(mdl) == 0 || len(mdl[0].LoadIDs) == 0 { return nil, utils.ErrNotFound } loadIDs = make(map[string]int64) // Filter by prefix if specified if itemIDPrefix != utils.EmptyString { - for key, value := range mdl.LoadIDs { + for key, value := range mdl[0].LoadIDs { if strings.HasPrefix(key, itemIDPrefix) { loadIDs[key] = int64(value.(float64)) } @@ -1545,7 +1572,7 @@ func (sqls *SQLStorage) GetItemLoadIDsDrv(ctx *context.Context, itemIDPrefix str return nil, utils.ErrNotFound } } else { - for k, v := range mdl.LoadIDs { + for k, v := range mdl[0].LoadIDs { loadIDs[k] = int64(v.(float64)) } } @@ -1554,24 +1581,23 @@ func (sqls *SQLStorage) GetItemLoadIDsDrv(ctx *context.Context, itemIDPrefix str func (sqls *SQLStorage) SetLoadIDsDrv(ctx *context.Context, loadIDs map[string]int64) error { return sqls.db.Transaction(func(tx *gorm.DB) error { - var existing LoadIDMdl - result := tx.Table(utils.TBLLoadIDs).First(&existing) - if result.Error == nil { - for k, v := range loadIDs { - existing.LoadIDs[k] = v - } - } else if errors.Is(result.Error, gorm.ErrRecordNotFound) { + var existing []*LoadIDMdl + if err := tx.Table(utils.TBLLoadIDs).Find(&existing).Error; err != nil { + return err + } + if len(existing) == 0 { loadIdsMapAny := make(map[string]any) for k, v := range loadIDs { loadIdsMapAny[k] = v } - existing = LoadIDMdl{ + existing = append([]*LoadIDMdl{}, &LoadIDMdl{ LoadIDs: loadIdsMapAny, - } - } else { - return result.Error + }) } - return tx.Table(utils.TBLLoadIDs).Save(&existing).Error + for k, v := range loadIDs { + existing[0].LoadIDs[k] = v + } + return tx.Table(utils.TBLLoadIDs).Save(&existing[0]).Error }) } @@ -1585,45 +1611,116 @@ func (sqls *SQLStorage) RemoveLoadIDsDrv() (err error) { return } -// Only intended for InternalDB -func (sqls *SQLStorage) BackupConfigDB(backupFolderPath string, zip bool) (err error) { - return utils.ErrNotImplemented +func (sqls *SQLStorage) getAllIndexKeys(tenant, typePrefix string) ([]string, error) { + var keys []string + if err := sqls.db.Model(&IndexMdl{}). + Where("tenant = ? AND type LIKE ?", tenant, typePrefix+"%"). + Pluck("key", &keys).Error; err != nil { + return nil, err + } + return keys, nil } -// BackupDataDB used only for InternalDB -func (sqls *SQLStorage) BackupDataDB(backupFolderPath string, zip bool) (err error) { - return utils.ErrNotImplemented -} - -// Will dump everything inside DB to a file, only for InternalDB -func (sqls *SQLStorage) DumpConfigDB() (err error) { - return utils.ErrNotImplemented -} - -// Will dump everything inside DB to a file, only for InternalDB -func (sqls *SQLStorage) DumpDataDB() (err error) { - return utils.ErrNotImplemented -} - -// Will rewrite every dump file of DataDB, only for InternalDB -func (sqls *SQLStorage) RewriteDataDB() (err error) { - return utils.ErrNotImplemented -} - -// GetIndexesDrv DataDB method not implemented yet +// GetIndexesDrv retrieves Indexes from DB +// tenants, item types, keys and values are stored in seperate columns func (sqls *SQLStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) { - return nil, utils.ErrNotImplemented + originItemType := utils.CacheInstanceToPrefix[idxItmType] + itemType := originItemType + if transactionID != utils.EmptyString { + itemType = "tmp_" + utils.ConcatenatedKey(originItemType, transactionID) + } + var indexesFound []*IndexMdl + if err := sqls.db.Transaction(func(tx *gorm.DB) error { + tx = tx.Where(&IndexMdl{Tenant: tntCtx, Type: itemType}) + if len(idxKey) != 0 { + tx = tx.Where(&IndexMdl{Key: idxKey}) + } + return tx.Find(&indexesFound).Error + }); err != nil { + return nil, err + } + if len(indexesFound) == 0 { + return nil, utils.ErrNotFound + } + indexes = make(map[string]utils.StringSet) + for _, indexFound := range indexesFound { + indexes[indexFound.Key] = utils.MapStringAnyToStringSet(indexFound.Value) + } + return indexes, nil } -// SetIndexesDrv DataDB method not implemented yet +// SetIndexesDrv stores Indexes into DB +// tenants, item types, keys and values are stored in seperate columns func (sqls *SQLStorage) SetIndexesDrv(ctx *context.Context, idxItmType, tntCtx string, indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) { - return utils.ErrNotImplemented + originItemType := utils.CacheInstanceToPrefix[idxItmType] + itemType := originItemType + if transactionID != utils.EmptyString { + itemType = "tmp_" + utils.ConcatenatedKey(originItemType, transactionID) + } + if commit && transactionID != utils.EmptyString { // only fully commit transactions + keys, err := sqls.getAllIndexKeys(tntCtx, itemType) + if err != nil { + return err + } + err = sqls.db.Transaction(func(tx *gorm.DB) error { + for _, key := range keys { + // ensure no duplicates exist + if err := tx.Where(&IndexMdl{Tenant: tntCtx, Type: originItemType, + Key: key}, key).Delete(&IndexMdl{}).Error; err != nil { + return err + } + if err := tx.Model(&IndexMdl{}).Where(&IndexMdl{Tenant: tntCtx, + Type: "tmp_" + utils.ConcatenatedKey(originItemType, transactionID), + Key: key}). // only update the key + Update("type", originItemType).Error; err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + } + var lastErr error + for idxKey, itmMp := range indexes { + err := sqls.db.Transaction(func(tx *gorm.DB) error { + if err := tx.Where(&IndexMdl{Tenant: tntCtx, Type: itemType, Key: idxKey}). + Delete(&IndexMdl{}).Error; err != nil { + return err + } + if len(itmMp) == 0 { + // DELETE entry it empty index + return nil + } + // upsert entry + return tx.Save(&IndexMdl{ + Tenant: tntCtx, + Type: itemType, + Key: idxKey, + Value: itmMp.ToMapStringAny(), + }).Error + }) + if err != nil { + lastErr = err + } + } + return lastErr } -// DataDB method not implemented yet +// RemoveIndexesDrv removes the indexes func (sqls *SQLStorage) RemoveIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (err error) { - return utils.ErrNotImplemented + if len(idxKey) != 0 { + return sqls.db.Transaction(func(tx *gorm.DB) error { + return tx.Where(&IndexMdl{Tenant: tntCtx, Type: idxItmType, Key: idxKey}). + Delete(&IndexMdl{}).Error + }) + } + return sqls.db.Transaction(func(tx *gorm.DB) error { + return tx.Where(&IndexMdl{Tenant: tntCtx, Type: idxItmType}). + Delete(&IndexMdl{}).Error + }) } // DataDB method not implemented yet @@ -1651,6 +1748,31 @@ func (sqls *SQLStorage) SetSection(_ *context.Context, section string, jsn any) return utils.ErrNotImplemented } +// Only intended for InternalDB +func (sqls *SQLStorage) BackupConfigDB(backupFolderPath string, zip bool) (err error) { + return utils.ErrNotImplemented +} + +// BackupDataDB used only for InternalDB +func (sqls *SQLStorage) BackupDataDB(backupFolderPath string, zip bool) (err error) { + return utils.ErrNotImplemented +} + +// Will dump everything inside DB to a file, only for InternalDB +func (sqls *SQLStorage) DumpConfigDB() (err error) { + return utils.ErrNotImplemented +} + +// Will dump everything inside DB to a file, only for InternalDB +func (sqls *SQLStorage) DumpDataDB() (err error) { + return utils.ErrNotImplemented +} + +// Will rewrite every dump file of DataDB, only for InternalDB +func (sqls *SQLStorage) RewriteDataDB() (err error) { + return utils.ErrNotImplemented +} + // Only intended for InternalDB func (sqls *SQLStorage) RewriteConfigDB() (err error) { return utils.ErrNotImplemented diff --git a/engine/version.go b/engine/version.go index e5d186ea4..995af6034 100644 --- a/engine/version.go +++ b/engine/version.go @@ -103,7 +103,7 @@ func (vers Versions) Compare(curent Versions, storType string) string { switch storType { case utils.MetaMongo: message = dataDBVers - case utils.MetaInternal: + case utils.MetaInternal, utils.MetaMySQL, utils.MetaPostgres: message = allVers case utils.MetaRedis: message = dataDBVers @@ -163,14 +163,10 @@ func CurrentAllDBVersions() Versions { // CurrentDBVersions returns versions based on dbType func CurrentDBVersions(storType string) Versions { switch storType { - case utils.MetaMongo: - return CurrentAllDBVersions() - case utils.MetaInternal: + case utils.MetaMongo, utils.MetaPostgres, utils.MetaMySQL, utils.MetaInternal: return CurrentAllDBVersions() case utils.MetaRedis: return CurrentDataDBVersions() - case utils.MetaPostgres, utils.MetaMySQL: - return CurrentStorDBVersions() } return nil } diff --git a/utils/consts.go b/utils/consts.go index aae40a21b..2dd1398f5 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -290,7 +290,7 @@ const ( LoadIDPrefix = "lid_" LoadInstKey = "load_history" CreateCDRsTablesSQL = "create_cdrs_tables.sql" - CreateAccountsTablesSQL = "create_db_tables.sql" + CreateDBTablesSQL = "create_db_tables.sql" CreateTariffPlanTablesSQL = "create_tariffplan_tables.sql" TestSQL = "TEST_SQL" MetaAsc = "*asc" @@ -1942,6 +1942,7 @@ const ( TBLTrendProfiles = "trend_profiles" TBLTrends = "trends" TBLLoadIDs = "load_ids" + TBLIndexes = "indexes" OldSMCosts = "sm_costs" TBLTPDispatchers = "tp_dispatcher_profiles" TBLTPDispatcherHosts = "tp_dispatcher_hosts" diff --git a/utils/stringset.go b/utils/stringset.go index 158f974f7..722049e85 100644 --- a/utils/stringset.go +++ b/utils/stringset.go @@ -162,6 +162,24 @@ func (s StringSet) FieldAsString(fldPath []string) (_ string, err error) { return "{}", nil // noting in it as is a empty structure } +// ToMapStringAny converts StringSet to map[string]any +func (s StringSet) ToMapStringAny() map[string]any { + out := make(map[string]any, len(s)) + for k, v := range s { + out[k] = v + } + return out +} + +// MapStringAnyToStringSet converts map[string]any to a StringSet. +func MapStringAnyToStringSet(m map[string]any) StringSet { + out := make(StringSet, len(m)) + for k := range m { + out[k] = struct{}{} + } + return out +} + // InterfaceToMapStringStringSet converts map[string]any to map[string]StringSet func InterfaceToMapStringStringSet(m any) map[string]StringSet { v, ok := m.(map[string]any)