mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-21 23:28:44 +05:00
Make Indexes storable in MySQL and Postgres
This commit is contained in:
committed by
Dan Christian Bogos
parent
7722265e11
commit
0da0f69e1c
@@ -745,13 +745,6 @@ var (
|
||||
DB: &DBParams{
|
||||
DBConns: map[string]DBConn{
|
||||
utils.MetaDefault: {
|
||||
Type: utils.StringPointer(utils.MetaRedis),
|
||||
Host: utils.StringPointer("127.0.0.1"),
|
||||
Port: utils.IntPointer(6379),
|
||||
Name: utils.StringPointer("10"),
|
||||
User: utils.StringPointer(utils.CGRateSLwr),
|
||||
},
|
||||
utils.StorDB: {
|
||||
Type: utils.StringPointer(utils.MetaMySQL),
|
||||
Host: utils.StringPointer("127.0.0.1"),
|
||||
Port: utils.IntPointer(3306),
|
||||
@@ -760,12 +753,6 @@ var (
|
||||
Password: utils.StringPointer("CGRateS.org"),
|
||||
},
|
||||
},
|
||||
Items: map[string]Item{
|
||||
utils.MetaCDRs: {
|
||||
Limit: utils.IntPointer(-1),
|
||||
DbConn: utils.StringPointer(utils.StorDB),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
RedisDBCfg = DBCfg{
|
||||
@@ -826,13 +813,6 @@ var (
|
||||
DB: &DBParams{
|
||||
DBConns: map[string]DBConn{
|
||||
utils.MetaDefault: {
|
||||
Type: utils.StringPointer(utils.MetaRedis),
|
||||
Host: utils.StringPointer("127.0.0.1"),
|
||||
Port: utils.IntPointer(6379),
|
||||
Name: utils.StringPointer("10"),
|
||||
User: utils.StringPointer(utils.CGRateSLwr),
|
||||
},
|
||||
utils.StorDB: {
|
||||
Type: utils.StringPointer(utils.MetaPostgres),
|
||||
Host: utils.StringPointer("127.0.0.1"),
|
||||
Port: utils.IntPointer(5432),
|
||||
@@ -841,12 +821,6 @@ var (
|
||||
Password: utils.StringPointer("CGRateS.org"),
|
||||
},
|
||||
},
|
||||
Items: map[string]Item{
|
||||
utils.MetaCDRs: {
|
||||
Limit: utils.IntPointer(-1),
|
||||
DbConn: utils.StringPointer(utils.StorDB),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
@@ -537,7 +537,7 @@ func (RouteProfileMdl) TableName() string {
|
||||
return utils.TBLRouteProfiles
|
||||
}
|
||||
|
||||
// Doesnt include Rates in RateProfile json, Rates taken from Rate using foreign keys
|
||||
// Doesnt include Rates in RateProfile json, Rates taken from RateMdl using foreign keys
|
||||
type RateProfileJSONMdl struct {
|
||||
PK uint `gorm:"primary_key"`
|
||||
Tenant string `index:"0" re:".*"`
|
||||
@@ -622,3 +622,15 @@ type LoadIDMdl struct {
|
||||
func (LoadIDMdl) TableName() string {
|
||||
return utils.TBLLoadIDs
|
||||
}
|
||||
|
||||
type IndexMdl struct {
|
||||
PK uint `gorm:"primary_key"`
|
||||
Tenant string `index:"0" re:".*"`
|
||||
Type string `index:"1" re:".*"`
|
||||
Key string `index:"2" re:".*"`
|
||||
Value utils.JSONB `gorm:"type:jsonb" index:"3" re:".*"`
|
||||
}
|
||||
|
||||
func (IndexMdl) TableName() string {
|
||||
return utils.TBLIndexes
|
||||
}
|
||||
|
||||
@@ -1446,7 +1446,7 @@ func (ms *MongoStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id
|
||||
})
|
||||
}
|
||||
|
||||
// GetIndexesDrv retrieves Indexes from dataDB
|
||||
// GetIndexesDrv retrieves Indexes from DB
|
||||
// the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context
|
||||
// id is used as a concatenated key in case of filterIndexes the id will be filterType:fieldName:fieldVal
|
||||
func (ms *MongoStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (map[string]utils.StringSet, error) {
|
||||
@@ -1503,7 +1503,7 @@ func (ms *MongoStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx,
|
||||
return indexes, nil
|
||||
}
|
||||
|
||||
// SetIndexesDrv stores Indexes into DataDB
|
||||
// SetIndexesDrv stores Indexes into DB
|
||||
// the key is the tenant of the item or in case of context dependent profiles is a concatenatedKey between tenant and context
|
||||
func (ms *MongoStorage) SetIndexesDrv(ctx *context.Context, idxItmType, tntCtx string,
|
||||
indexes map[string]utils.StringSet, commit bool, transactionID string) error {
|
||||
|
||||
@@ -964,7 +964,7 @@ func (rs *RedisStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id
|
||||
return rs.Cmd(nil, redisDEL, utils.ActionProfilePrefix+utils.ConcatenatedKey(tenant, id))
|
||||
}
|
||||
|
||||
// GetIndexesDrv retrieves Indexes from dataDB
|
||||
// GetIndexesDrv retrieves Indexes from DB
|
||||
func (rs *RedisStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) {
|
||||
mp := make(map[string]string)
|
||||
dbKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
|
||||
@@ -997,7 +997,7 @@ func (rs *RedisStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx,
|
||||
return
|
||||
}
|
||||
|
||||
// SetIndexesDrv stores Indexes into DataDB
|
||||
// SetIndexesDrv stores Indexes into DB
|
||||
func (rs *RedisStorage) SetIndexesDrv(ctx *context.Context, idxItmType, tntCtx string,
|
||||
indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) {
|
||||
originKey := utils.CacheInstanceToPrefix[idxItmType] + tntCtx
|
||||
|
||||
@@ -20,7 +20,6 @@ package engine
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
@@ -62,7 +61,7 @@ func (sqls *SQLStorage) ExportGormDB() *gorm.DB {
|
||||
}
|
||||
|
||||
func (sqls *SQLStorage) Flush(scriptsPath string) (err error) {
|
||||
for _, scriptName := range []string{utils.CreateAccountsTablesSQL,
|
||||
for _, scriptName := range []string{utils.CreateDBTablesSQL,
|
||||
utils.CreateCDRsTablesSQL, utils.CreateTariffPlanTablesSQL} {
|
||||
if err := sqls.CreateTablesFromScript(path.Join(scriptsPath, scriptName)); err != nil {
|
||||
return err
|
||||
@@ -140,6 +139,32 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k
|
||||
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLTrendProfiles, tntID)
|
||||
case utils.TrendPrefix:
|
||||
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLTrends, tntID)
|
||||
case utils.AttributeFilterIndexes:
|
||||
keys, err = sqls.getAllIndexKeys(utils.AttributeFilterIndexes, tntID.Tenant)
|
||||
case utils.ResourceFilterIndexes:
|
||||
keys, err = sqls.getAllIndexKeys(utils.ResourceFilterIndexes, tntID.Tenant)
|
||||
case utils.IPFilterIndexes:
|
||||
keys, err = sqls.getAllIndexKeys(utils.IPFilterIndexes, tntID.Tenant)
|
||||
case utils.StatFilterIndexes:
|
||||
keys, err = sqls.getAllIndexKeys(utils.StatFilterIndexes, tntID.Tenant)
|
||||
case utils.ThresholdFilterIndexes:
|
||||
keys, err = sqls.getAllIndexKeys(utils.ThresholdFilterIndexes, tntID.Tenant)
|
||||
case utils.RouteFilterIndexes:
|
||||
keys, err = sqls.getAllIndexKeys(utils.RouteFilterIndexes, tntID.Tenant)
|
||||
case utils.ChargerFilterIndexes:
|
||||
keys, err = sqls.getAllIndexKeys(utils.ChargerFilterIndexes, tntID.Tenant)
|
||||
case utils.ActionPlanIndexes:
|
||||
keys, err = sqls.getAllIndexKeys(utils.ActionPlanIndexes, tntID.Tenant)
|
||||
case utils.ActionProfilesFilterIndexPrfx:
|
||||
keys, err = sqls.getAllIndexKeys(utils.ActionProfilesFilterIndexPrfx, tntID.Tenant)
|
||||
case utils.AccountFilterIndexPrfx:
|
||||
keys, err = sqls.getAllIndexKeys(utils.AccountFilterIndexPrfx, tntID.Tenant)
|
||||
case utils.RateProfilesFilterIndexPrfx:
|
||||
keys, err = sqls.getAllIndexKeys(utils.RateProfilesFilterIndexPrfx, tntID.Tenant)
|
||||
case utils.RateFilterIndexPrfx:
|
||||
keys, err = sqls.getAllIndexKeys(utils.RateFilterIndexPrfx, tntID.Tenant)
|
||||
case utils.FilterIndexPrfx:
|
||||
keys, err = sqls.getAllIndexKeys(utils.FilterIndexPrfx, tntID.Tenant)
|
||||
default:
|
||||
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix)
|
||||
}
|
||||
@@ -167,13 +192,30 @@ func (sqls *SQLStorage) CreateTablesFromScript(scriptPath string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sqls *SQLStorage) IsDBEmpty() (resp bool, err error) {
|
||||
for _, tbl := range []string{utils.CDRsTBL, utils.TBLVersions} {
|
||||
if sqls.db.Migrator().HasTable(tbl) {
|
||||
return false, nil
|
||||
func (sqls *SQLStorage) IsDBEmpty() (bool, error) {
|
||||
sqlTables := []string{
|
||||
utils.CDRsTBL, utils.TBLVersions, utils.TBLAccounts,
|
||||
utils.TBLIPProfiles, utils.TBLIPAllocations, utils.TBLActionProfiles,
|
||||
utils.TBLChargerProfiles, utils.TBLAttributeProfiles, utils.TBLResourceProfiles,
|
||||
utils.TBLResources, utils.TBLStatQueueProfiles, utils.TBLStatQueues,
|
||||
utils.TBLThresholdProfiles, utils.TBLThresholds, utils.TBLFilters,
|
||||
utils.TBLRouteProfiles, utils.TBLRateProfiles, utils.TBLRates,
|
||||
utils.TBLRankingProfiles, utils.TBLRankings, utils.TBLTrendProfiles,
|
||||
utils.TBLTrends, utils.TBLLoadIDs, utils.TBLIndexes,
|
||||
}
|
||||
for _, tbl := range sqlTables {
|
||||
if !sqls.db.Migrator().HasTable(tbl) {
|
||||
continue
|
||||
}
|
||||
var count int64
|
||||
if err := sqls.db.Table(tbl).Count(&count).Error; err != nil {
|
||||
return false, err
|
||||
}
|
||||
if count > 0 {
|
||||
return false, nil // Table contains data
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
return true, nil // All tables empty
|
||||
}
|
||||
|
||||
// GetVersions returns slice of all versions or a specific version if tag is specified
|
||||
@@ -1073,24 +1115,20 @@ func (sqls *SQLStorage) SetRateProfileDrv(ctx *context.Context, rpp *utils.RateP
|
||||
return
|
||||
}
|
||||
}
|
||||
var existingRP RateProfileJSONMdl
|
||||
result := tx.Where(RateProfileJSONMdl{Tenant: rpMdl.Tenant, ID: rpMdl.ID}).First(&existingRP)
|
||||
switch result.Error {
|
||||
case nil: // Record exists, update it
|
||||
rpMdl.PK = existingRP.PK
|
||||
if err = tx.Save(rpMdl).Error; err != nil {
|
||||
tx.Rollback()
|
||||
return
|
||||
}
|
||||
case gorm.ErrRecordNotFound: // Record doesn't exist, create it
|
||||
if err = tx.Create(rpMdl).Error; err != nil {
|
||||
tx.Rollback()
|
||||
return
|
||||
}
|
||||
default:
|
||||
var existingRP []*RateProfileJSONMdl
|
||||
result := tx.Where(RateProfileJSONMdl{Tenant: rpMdl.Tenant, ID: rpMdl.ID}).Find(&existingRP)
|
||||
if result.Error != nil {
|
||||
tx.Rollback()
|
||||
return result.Error
|
||||
}
|
||||
if result.RowsAffected > 0 {
|
||||
// Update existing
|
||||
rpMdl.PK = existingRP[0].PK
|
||||
}
|
||||
if err = tx.Save(rpMdl).Error; err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
for rID, rate := range rpp.Rates {
|
||||
rMdl := &RateMdl{
|
||||
Tenant: rpp.Tenant,
|
||||
@@ -1106,24 +1144,20 @@ func (sqls *SQLStorage) SetRateProfileDrv(ctx *context.Context, rpp *utils.RateP
|
||||
return
|
||||
}
|
||||
}
|
||||
var existingRT RateMdl
|
||||
result := tx.Where(RateMdl{Tenant: rMdl.Tenant, ID: rMdl.ID, RateProfileID: rpMdl.ID}).First(&existingRT)
|
||||
switch result.Error {
|
||||
case nil: // Record exists, update it
|
||||
rMdl.PK = existingRT.PK
|
||||
if err = tx.Save(rMdl).Error; err != nil {
|
||||
tx.Rollback()
|
||||
return
|
||||
}
|
||||
case gorm.ErrRecordNotFound: // Record doesn't exist, create it
|
||||
if err = tx.Create(rMdl).Error; err != nil {
|
||||
tx.Rollback()
|
||||
return
|
||||
}
|
||||
default:
|
||||
var existingRT []*RateMdl
|
||||
result := tx.Where(RateMdl{Tenant: rMdl.Tenant, ID: rMdl.ID, RateProfileID: rpMdl.ID}).Find(&existingRT)
|
||||
if result.Error != nil {
|
||||
tx.Rollback()
|
||||
return result.Error
|
||||
}
|
||||
if result.RowsAffected > 0 {
|
||||
// Update existing
|
||||
rMdl.PK = existingRT[0].PK
|
||||
}
|
||||
if err = tx.Save(rMdl).Error; err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
}
|
||||
tx.Commit()
|
||||
return
|
||||
@@ -1439,14 +1473,13 @@ func (sqls *SQLStorage) AddLoadHistory(ldInst *utils.LoadInstance,
|
||||
// Make sure we do it locked since other instances can modify the history while we read it.
|
||||
err := guardian.Guardian.Guard(context.TODO(), func(ctx *context.Context) error {
|
||||
return sqls.db.Transaction(func(tx *gorm.DB) error {
|
||||
var mdl *LoadInstanceMdl
|
||||
result := tx.Table(utils.LoadInstKey).Where(&LoadInstanceMdl{Key: utils.LoadInstKey}).First(&mdl)
|
||||
if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return result.Error
|
||||
var mdl []*LoadInstanceMdl
|
||||
if qErr := tx.Table(utils.LoadInstKey).Where(&LoadInstanceMdl{Key: utils.LoadInstKey}).Find(&mdl).Error; qErr != nil {
|
||||
return qErr
|
||||
}
|
||||
var existingLoadHistory []*utils.LoadInstance
|
||||
if result.Error == nil && len(mdl.LoadInstance) > 0 {
|
||||
existingLoadHistory = utils.MapStringInterfaceToLoadInstances(mdl.LoadInstance)
|
||||
if len(mdl) != 0 && len(mdl[0].LoadInstance) > 0 {
|
||||
existingLoadHistory = utils.MapStringInterfaceToLoadInstances(mdl[0].LoadInstance)
|
||||
}
|
||||
|
||||
// Insert at the first position
|
||||
@@ -1499,14 +1532,13 @@ func (sqls *SQLStorage) GetLoadHistory(limit int, skipCache bool,
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
}
|
||||
var mdl *LoadInstanceMdl
|
||||
result := sqls.db.Table(utils.LoadInstKey).Where(&LoadInstanceMdl{Key: utils.LoadInstKey}).First(&mdl)
|
||||
if result.Error != nil && !errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return nil, result.Error
|
||||
} else if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
var mdl []*LoadInstanceMdl
|
||||
if err := sqls.db.Table(utils.LoadInstKey).Where(&LoadInstanceMdl{Key: utils.LoadInstKey}).Find(&mdl).Error; err != nil {
|
||||
return nil, err
|
||||
} else if len(mdl) == 0 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
loadInstances := utils.MapStringInterfaceToLoadInstances(mdl.LoadInstance)
|
||||
loadInstances := utils.MapStringInterfaceToLoadInstances(mdl[0].LoadInstance)
|
||||
cCommit := cacheCommit(transactionID)
|
||||
if errCh := Cache.Remove(context.TODO(), utils.LoadInstKey, utils.EmptyString, cCommit, transactionID); errCh != nil {
|
||||
return nil, errCh
|
||||
@@ -1521,22 +1553,17 @@ func (sqls *SQLStorage) GetLoadHistory(limit int, skipCache bool,
|
||||
}
|
||||
|
||||
func (sqls *SQLStorage) GetItemLoadIDsDrv(ctx *context.Context, itemIDPrefix string) (loadIDs map[string]int64, err error) {
|
||||
var mdl LoadIDMdl
|
||||
var mdl []LoadIDMdl
|
||||
tx := sqls.db.Table(utils.TBLLoadIDs)
|
||||
result := tx.First(&mdl)
|
||||
if result.Error != nil {
|
||||
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return nil, result.Error
|
||||
}
|
||||
if len(mdl.LoadIDs) == 0 {
|
||||
if err := tx.Find(&mdl).Error; err != nil {
|
||||
return nil, err
|
||||
} else if len(mdl) == 0 || len(mdl[0].LoadIDs) == 0 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
loadIDs = make(map[string]int64)
|
||||
// Filter by prefix if specified
|
||||
if itemIDPrefix != utils.EmptyString {
|
||||
for key, value := range mdl.LoadIDs {
|
||||
for key, value := range mdl[0].LoadIDs {
|
||||
if strings.HasPrefix(key, itemIDPrefix) {
|
||||
loadIDs[key] = int64(value.(float64))
|
||||
}
|
||||
@@ -1545,7 +1572,7 @@ func (sqls *SQLStorage) GetItemLoadIDsDrv(ctx *context.Context, itemIDPrefix str
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
} else {
|
||||
for k, v := range mdl.LoadIDs {
|
||||
for k, v := range mdl[0].LoadIDs {
|
||||
loadIDs[k] = int64(v.(float64))
|
||||
}
|
||||
}
|
||||
@@ -1554,24 +1581,23 @@ func (sqls *SQLStorage) GetItemLoadIDsDrv(ctx *context.Context, itemIDPrefix str
|
||||
|
||||
func (sqls *SQLStorage) SetLoadIDsDrv(ctx *context.Context, loadIDs map[string]int64) error {
|
||||
return sqls.db.Transaction(func(tx *gorm.DB) error {
|
||||
var existing LoadIDMdl
|
||||
result := tx.Table(utils.TBLLoadIDs).First(&existing)
|
||||
if result.Error == nil {
|
||||
for k, v := range loadIDs {
|
||||
existing.LoadIDs[k] = v
|
||||
}
|
||||
} else if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
var existing []*LoadIDMdl
|
||||
if err := tx.Table(utils.TBLLoadIDs).Find(&existing).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if len(existing) == 0 {
|
||||
loadIdsMapAny := make(map[string]any)
|
||||
for k, v := range loadIDs {
|
||||
loadIdsMapAny[k] = v
|
||||
}
|
||||
existing = LoadIDMdl{
|
||||
existing = append([]*LoadIDMdl{}, &LoadIDMdl{
|
||||
LoadIDs: loadIdsMapAny,
|
||||
}
|
||||
} else {
|
||||
return result.Error
|
||||
})
|
||||
}
|
||||
return tx.Table(utils.TBLLoadIDs).Save(&existing).Error
|
||||
for k, v := range loadIDs {
|
||||
existing[0].LoadIDs[k] = v
|
||||
}
|
||||
return tx.Table(utils.TBLLoadIDs).Save(&existing[0]).Error
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1585,45 +1611,116 @@ func (sqls *SQLStorage) RemoveLoadIDsDrv() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// Only intended for InternalDB
|
||||
func (sqls *SQLStorage) BackupConfigDB(backupFolderPath string, zip bool) (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
func (sqls *SQLStorage) getAllIndexKeys(tenant, typePrefix string) ([]string, error) {
|
||||
var keys []string
|
||||
if err := sqls.db.Model(&IndexMdl{}).
|
||||
Where("tenant = ? AND type LIKE ?", tenant, typePrefix+"%").
|
||||
Pluck("key", &keys).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return keys, nil
|
||||
}
|
||||
|
||||
// BackupDataDB used only for InternalDB
|
||||
func (sqls *SQLStorage) BackupDataDB(backupFolderPath string, zip bool) (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Will dump everything inside DB to a file, only for InternalDB
|
||||
func (sqls *SQLStorage) DumpConfigDB() (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Will dump everything inside DB to a file, only for InternalDB
|
||||
func (sqls *SQLStorage) DumpDataDB() (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Will rewrite every dump file of DataDB, only for InternalDB
|
||||
func (sqls *SQLStorage) RewriteDataDB() (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// GetIndexesDrv DataDB method not implemented yet
|
||||
// GetIndexesDrv retrieves Indexes from DB
|
||||
// tenants, item types, keys and values are stored in seperate columns
|
||||
func (sqls *SQLStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
originItemType := utils.CacheInstanceToPrefix[idxItmType]
|
||||
itemType := originItemType
|
||||
if transactionID != utils.EmptyString {
|
||||
itemType = "tmp_" + utils.ConcatenatedKey(originItemType, transactionID)
|
||||
}
|
||||
var indexesFound []*IndexMdl
|
||||
if err := sqls.db.Transaction(func(tx *gorm.DB) error {
|
||||
tx = tx.Where(&IndexMdl{Tenant: tntCtx, Type: itemType})
|
||||
if len(idxKey) != 0 {
|
||||
tx = tx.Where(&IndexMdl{Key: idxKey})
|
||||
}
|
||||
return tx.Find(&indexesFound).Error
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(indexesFound) == 0 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
indexes = make(map[string]utils.StringSet)
|
||||
for _, indexFound := range indexesFound {
|
||||
indexes[indexFound.Key] = utils.MapStringAnyToStringSet(indexFound.Value)
|
||||
}
|
||||
return indexes, nil
|
||||
}
|
||||
|
||||
// SetIndexesDrv DataDB method not implemented yet
|
||||
// SetIndexesDrv stores Indexes into DB
|
||||
// tenants, item types, keys and values are stored in seperate columns
|
||||
func (sqls *SQLStorage) SetIndexesDrv(ctx *context.Context, idxItmType, tntCtx string,
|
||||
indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
originItemType := utils.CacheInstanceToPrefix[idxItmType]
|
||||
itemType := originItemType
|
||||
if transactionID != utils.EmptyString {
|
||||
itemType = "tmp_" + utils.ConcatenatedKey(originItemType, transactionID)
|
||||
}
|
||||
if commit && transactionID != utils.EmptyString { // only fully commit transactions
|
||||
keys, err := sqls.getAllIndexKeys(tntCtx, itemType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = sqls.db.Transaction(func(tx *gorm.DB) error {
|
||||
for _, key := range keys {
|
||||
// ensure no duplicates exist
|
||||
if err := tx.Where(&IndexMdl{Tenant: tntCtx, Type: originItemType,
|
||||
Key: key}, key).Delete(&IndexMdl{}).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if err := tx.Model(&IndexMdl{}).Where(&IndexMdl{Tenant: tntCtx,
|
||||
Type: "tmp_" + utils.ConcatenatedKey(originItemType, transactionID),
|
||||
Key: key}). // only update the key
|
||||
Update("type", originItemType).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
var lastErr error
|
||||
for idxKey, itmMp := range indexes {
|
||||
err := sqls.db.Transaction(func(tx *gorm.DB) error {
|
||||
if err := tx.Where(&IndexMdl{Tenant: tntCtx, Type: itemType, Key: idxKey}).
|
||||
Delete(&IndexMdl{}).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if len(itmMp) == 0 {
|
||||
// DELETE entry it empty index
|
||||
return nil
|
||||
}
|
||||
// upsert entry
|
||||
return tx.Save(&IndexMdl{
|
||||
Tenant: tntCtx,
|
||||
Type: itemType,
|
||||
Key: idxKey,
|
||||
Value: itmMp.ToMapStringAny(),
|
||||
}).Error
|
||||
})
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
// DataDB method not implemented yet
|
||||
// RemoveIndexesDrv removes the indexes
|
||||
func (sqls *SQLStorage) RemoveIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
if len(idxKey) != 0 {
|
||||
return sqls.db.Transaction(func(tx *gorm.DB) error {
|
||||
return tx.Where(&IndexMdl{Tenant: tntCtx, Type: idxItmType, Key: idxKey}).
|
||||
Delete(&IndexMdl{}).Error
|
||||
})
|
||||
}
|
||||
return sqls.db.Transaction(func(tx *gorm.DB) error {
|
||||
return tx.Where(&IndexMdl{Tenant: tntCtx, Type: idxItmType}).
|
||||
Delete(&IndexMdl{}).Error
|
||||
})
|
||||
}
|
||||
|
||||
// DataDB method not implemented yet
|
||||
@@ -1651,6 +1748,31 @@ func (sqls *SQLStorage) SetSection(_ *context.Context, section string, jsn any)
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Only intended for InternalDB
|
||||
func (sqls *SQLStorage) BackupConfigDB(backupFolderPath string, zip bool) (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// BackupDataDB used only for InternalDB
|
||||
func (sqls *SQLStorage) BackupDataDB(backupFolderPath string, zip bool) (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Will dump everything inside DB to a file, only for InternalDB
|
||||
func (sqls *SQLStorage) DumpConfigDB() (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Will dump everything inside DB to a file, only for InternalDB
|
||||
func (sqls *SQLStorage) DumpDataDB() (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Will rewrite every dump file of DataDB, only for InternalDB
|
||||
func (sqls *SQLStorage) RewriteDataDB() (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Only intended for InternalDB
|
||||
func (sqls *SQLStorage) RewriteConfigDB() (err error) {
|
||||
return utils.ErrNotImplemented
|
||||
|
||||
@@ -103,7 +103,7 @@ func (vers Versions) Compare(curent Versions, storType string) string {
|
||||
switch storType {
|
||||
case utils.MetaMongo:
|
||||
message = dataDBVers
|
||||
case utils.MetaInternal:
|
||||
case utils.MetaInternal, utils.MetaMySQL, utils.MetaPostgres:
|
||||
message = allVers
|
||||
case utils.MetaRedis:
|
||||
message = dataDBVers
|
||||
@@ -163,14 +163,10 @@ func CurrentAllDBVersions() Versions {
|
||||
// CurrentDBVersions returns versions based on dbType
|
||||
func CurrentDBVersions(storType string) Versions {
|
||||
switch storType {
|
||||
case utils.MetaMongo:
|
||||
return CurrentAllDBVersions()
|
||||
case utils.MetaInternal:
|
||||
case utils.MetaMongo, utils.MetaPostgres, utils.MetaMySQL, utils.MetaInternal:
|
||||
return CurrentAllDBVersions()
|
||||
case utils.MetaRedis:
|
||||
return CurrentDataDBVersions()
|
||||
case utils.MetaPostgres, utils.MetaMySQL:
|
||||
return CurrentStorDBVersions()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user