Add multiple DB connections functionality

This commit is contained in:
arberkatellari
2025-10-16 15:54:55 +02:00
committed by Dan Christian Bogos
parent 119481b3d1
commit 5f2b1f67df
109 changed files with 3361 additions and 4613 deletions

View File

@@ -585,6 +585,14 @@ func (dbM *DataDBMock) BackupDataDB(backupFolderPath string, zip bool) (err erro
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) BackupStorDBDump(backupFolderPath string, zip bool) (err error) {
func (dbM *DataDBMock) SetCDR(_ *context.Context, cdr *utils.CGREvent, allowUpdate bool) error {
return utils.ErrNotImplemented
}
func (dbM *DataDBMock) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]any) ([]*utils.CDR, error) {
return nil, utils.ErrNotImplemented
}
func (dbM *DataDBMock) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err error) {
return utils.ErrNotImplemented
}

File diff suppressed because it is too large Load Diff

View File

@@ -50,8 +50,13 @@ func MatchingItemIDsForEvent(ctx *context.Context, ev utils.MapStorage, stringFl
lockID := utils.CacheInstanceToPrefix[cacheID] + itemIDPrefix
guardian.Guardian.Guard(ctx, func(ctx *context.Context) (_ error) {
if !indexedSelects {
var dataDB DataDB
dataDB, _, err = dm.dbConns.GetConn(cacheID)
if err != nil {
return
}
var keysWithID []string
if keysWithID, err = dm.DataDB().GetKeysForPrefix(ctx, utils.CacheIndexesToPrefix[cacheID]); err != nil {
if keysWithID, err = dataDB.GetKeysForPrefix(ctx, utils.CacheIndexesToPrefix[cacheID]); err != nil {
return
}
var sliceIDs []string

View File

@@ -293,9 +293,13 @@ func ComputeIndexes(ctx *context.Context, dm *DataManager, tnt, grp, idxItmType
var profilesIDs []string
if IDs == nil { // get all items
Cache.Clear([]string{idxItmType})
dataDB, _, err := dm.dbConns.GetConn(idxItmType)
if err != nil {
return nil, err
}
var ids []string
if ids, err = dm.DataDB().GetKeysForPrefix(ctx, utils.CacheIndexesToPrefix[idxItmType]); err != nil {
return
if ids, err = dataDB.GetKeysForPrefix(ctx, utils.CacheIndexesToPrefix[idxItmType]); err != nil {
return nil, err
}
for _, id := range ids {
profilesIDs = append(profilesIDs, utils.SplitConcatenatedKey(id)[1])

View File

@@ -287,8 +287,12 @@ func GetFltrIdxHealth(ctx *context.Context, dm *DataManager, fltrCache, fltrIdxC
MissingFilters: make(map[string][]string),
}
objPrfx := utils.CacheIndexesToPrefix[indxType]
dataDB, _, err := dm.DBConns().GetConn(indxType)
if err != nil {
return
}
var ids []string
if ids, err = dm.dataDB.GetKeysForPrefix(ctx, objPrfx); err != nil {
if ids, err = dataDB.GetKeysForPrefix(ctx, objPrfx); err != nil {
return
}
for _, id := range ids { // get all the objects from DB
@@ -307,7 +311,7 @@ func GetFltrIdxHealth(ctx *context.Context, dm *DataManager, fltrCache, fltrIdxC
// check the indexes( index->filter->obj relation)
idxPrfx := utils.CacheInstanceToPrefix[indxType]
var indexKeys []string
if indexKeys, err = dm.dataDB.GetKeysForPrefix(ctx, idxPrfx); err != nil {
if indexKeys, err = dataDB.GetKeysForPrefix(ctx, idxPrfx); err != nil {
return
}
missingObj := utils.StringSet{}
@@ -381,8 +385,12 @@ func getRevFltrIdxHealthFromObj(ctx *context.Context, dm *DataManager, fltrCache
MissingFilters: make(map[string][]string),
}
objPrfx := utils.CacheIndexesToPrefix[indxType]
dataDB, _, err := dm.DBConns().GetConn(indxType)
if err != nil {
return
}
var ids []string
if ids, err = dm.dataDB.GetKeysForPrefix(ctx, objPrfx); err != nil {
if ids, err = dataDB.GetKeysForPrefix(ctx, objPrfx); err != nil {
return
}
for _, id := range ids { // get all the objects
@@ -425,8 +433,12 @@ func getRevFltrIdxHealthFromObj(ctx *context.Context, dm *DataManager, fltrCache
// getRevFltrIdxHealthFromReverse parses the reverse indexes and updates the reply
func getRevFltrIdxHealthFromReverse(ctx *context.Context, dm *DataManager, fltrCache, revFltrIdxCache *ltcache.Cache, objCaches map[string]*ltcache.Cache, rply map[string]*ReverseFilterIHReply) (_ map[string]*ReverseFilterIHReply, err error) {
dataDB, _, err := dm.DBConns().GetConn(utils.CacheReverseFilterIndexes)
if err != nil {
return
}
var revIndexKeys []string
if revIndexKeys, err = dm.dataDB.GetKeysForPrefix(ctx, utils.FilterIndexPrfx); err != nil {
if revIndexKeys, err = dataDB.GetKeysForPrefix(ctx, utils.FilterIndexPrfx); err != nil {
return
}
missingObj := utils.StringSet{}
@@ -548,8 +560,12 @@ func GetFltrIdxHealthForRateRates(ctx *context.Context, dm *DataManager, fltrCac
BrokenIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
}
dataDB, _, err := dm.DBConns().GetConn(utils.MetaRateProfiles)
if err != nil {
return
}
var ids []string
if ids, err = dm.dataDB.GetKeysForPrefix(ctx, utils.RateProfilePrefix); err != nil {
if ids, err = dataDB.GetKeysForPrefix(ctx, utils.RateProfilePrefix); err != nil {
return
}
for _, id := range ids {
@@ -569,7 +585,7 @@ func GetFltrIdxHealthForRateRates(ctx *context.Context, dm *DataManager, fltrCac
// check the indexes( index->filter->obj relation)
var indexKeys []string
if indexKeys, err = dm.dataDB.GetKeysForPrefix(ctx, utils.RateFilterIndexPrfx); err != nil {
if indexKeys, err = dataDB.GetKeysForPrefix(ctx, utils.RateFilterIndexPrfx); err != nil {
return
}
for _, dataID := range indexKeys {
@@ -649,8 +665,12 @@ func getRevFltrIdxHealthFromRateRates(ctx *context.Context, dm *DataManager, flt
BrokenReverseIndexes: make(map[string][]string),
MissingFilters: make(map[string][]string),
}
dataDB, _, err := dm.DBConns().GetConn(utils.MetaRateProfiles)
if err != nil {
return
}
var ids []string
if ids, err = dm.dataDB.GetKeysForPrefix(ctx, utils.RateProfilePrefix); err != nil {
if ids, err = dataDB.GetKeysForPrefix(ctx, utils.RateProfilePrefix); err != nil {
return
}
for _, id := range ids {

View File

@@ -608,7 +608,11 @@ func (sS *StatS) V1GetQueueIDs(ctx *context.Context, args *utils.TenantWithAPIOp
tenant = sS.cfg.GeneralCfg().DefaultTenant
}
prfx := utils.StatQueuePrefix + tenant + utils.ConcatenatedKeySep
keys, err := sS.dm.DataDB().GetKeysForPrefix(ctx, prfx)
dataDB, _, err := sS.dm.DBConns().GetConn(utils.MetaStatQueues)
if err != nil {
return err
}
keys, err := dataDB.GetKeysForPrefix(ctx, prfx)
if err != nil {
return err
}

View File

@@ -110,6 +110,9 @@ type DataDB interface {
GetConfigSectionsDrv(*context.Context, string, []string) (map[string][]byte, error)
SetConfigSectionsDrv(*context.Context, string, map[string][]byte) error
RemoveConfigSectionsDrv(*context.Context, string, []string) error
SetCDR(*context.Context, *utils.CGREvent, bool) error
GetCDRs(*context.Context, []*Filter, map[string]any) ([]*utils.CDR, error)
RemoveCDRs(*context.Context, []*Filter) error
DumpDataDB() error
RewriteDataDB() error
BackupDataDB(string, bool) error
@@ -121,16 +124,6 @@ type DataDBDriver interface {
config.ConfigDB
}
type StorDB interface {
Storage
SetCDR(*context.Context, *utils.CGREvent, bool) error
GetCDRs(*context.Context, []*Filter, map[string]any) ([]*utils.CDR, error)
RemoveCDRs(*context.Context, []*Filter) error
DumpStorDB() error
RewriteStorDB() error
BackupStorDB(string, bool) error
}
type LoadStorage interface {
Storage
LoadReader

View File

@@ -262,18 +262,3 @@ func (iDB *InternalDB) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err
}
return
}
// Will dump everything inside stordb to files
func (iDB *InternalDB) DumpStorDB() (err error) {
return iDB.db.DumpAll()
}
// Will rewrite every dump file of StorDB
func (iDB *InternalDB) RewriteStorDB() (err error) {
return iDB.db.RewriteAll()
}
// BackupStorDB will momentarely stop any dumping and rewriting until all dump folder is backed up in folder path backupFolderPath, making zip true will create a zip file in the path instead
func (iDB *InternalDB) BackupStorDB(backupFolderPath string, zip bool) (err error) {
return iDB.db.BackupDumpFolder(backupFolderPath, zip)
}

View File

@@ -177,12 +177,11 @@ func mapStringAnyDecoderWithDecimal(dc bsoncodec.DecodeContext, vr bsonrw.ValueR
// NewMongoStorage initializes a new MongoDB storage instance with provided connection parameters and settings.
// Returns an error if the setup fails.
func NewMongoStorage(scheme, host, port, db, user, pass, mrshlerStr string, storageType string,
func NewMongoStorage(scheme, host, port, db, user, pass, mrshlerStr string,
cdrsIndexes []string, ttl time.Duration) (*MongoStorage, error) {
mongoStorage := &MongoStorage{
ctxTTL: ttl,
cdrsIndexes: cdrsIndexes,
storageType: storageType,
counter: utils.NewCounter(time.Now().UnixNano(), 0),
}
uri := composeMongoURI(scheme, host, port, db, user, pass)
@@ -248,7 +247,6 @@ type MongoStorage struct {
ctxTTL time.Duration
ctxTTLMutex sync.RWMutex // used for TTL reload
db string
storageType string // DataDB/StorDB
ms utils.Marshaler
cdrsIndexes []string
counter *utils.Counter
@@ -262,11 +260,6 @@ func (ms *MongoStorage) query(ctx *context.Context, argfunc func(ctx mongo.Sessi
return ms.client.UseSession(ctxSession, argfunc)
}
// IsDataDB returns whether or not the storage is used for DataDB.
func (ms *MongoStorage) IsDataDB() bool {
return ms.storageType == utils.DataDB
}
// SetTTL sets the context TTL used for queries (Thread-safe).
func (ms *MongoStorage) SetTTL(ttl time.Duration) {
ms.ctxTTLMutex.Lock()
@@ -348,15 +341,11 @@ func (ms *MongoStorage) ensureIndexesForCol(col string) error { // exported for
// EnsureIndexes creates database indexes for the specified collections.
func (ms *MongoStorage) EnsureIndexes(cols ...string) error {
if len(cols) == 0 {
if ms.IsDataDB() {
cols = []string{
ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColIndx,
ColRsP, ColRes, ColIPp, ColIPs, ColSqs, ColSqp, ColTps, ColThs, ColRts,
ColAttr, ColFlt, ColCpp, ColRpp, ColApp, ColRpf, ColShg, ColAcc, ColAnp,
ColTrd, ColTrs,
}
} else {
cols = []string{utils.CDRsTBL}
cols = []string{
ColAct, ColApl, ColAAp, ColAtr, ColRpl, ColDst, ColRds, ColLht, ColIndx,
ColRsP, ColRes, ColIPp, ColIPs, ColSqs, ColSqp, ColTps, ColThs, ColRts,
ColAttr, ColFlt, ColCpp, ColRpp, ColApp, ColRpf, ColShg, ColAcc, ColAnp,
ColTrd, ColTrs, utils.CDRsTBL,
}
}
for _, col := range cols {

View File

@@ -405,18 +405,3 @@ func (ms *MongoStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err
})
return err
}
// RewriteStorDB used only for InternalDB
func (ms *MongoStorage) DumpStorDB() (err error) {
return utils.ErrNotImplemented
}
// RewriteStorDB used only for InternalDB
func (ms *MongoStorage) RewriteStorDB() (err error) {
return utils.ErrNotImplemented
}
// BackupStorDB used only for InternalDB
func (ms *MongoStorage) BackupStorDB(backupFolderPath string, zip bool) (err error) {
return utils.ErrNotImplemented
}

View File

@@ -55,7 +55,7 @@ func NewMySQLStorage(host, port, name, user, password string,
return &SQLStorage{
DB: mySQLStorage.DB,
db: mySQLStorage.db,
StorDB: mySQLStorage,
DataDB: mySQLStorage,
SQLImpl: mySQLStorage,
}, nil
}

View File

@@ -32,7 +32,7 @@ type PostgresStorage struct {
SQLStorage
}
// NewPostgresStorage returns the posgres storDB
// NewPostgresStorage returns the posgres DB
func NewPostgresStorage(host, port, name, user, password,
sslmode, sslcert, sslkey, sslpassword, sslcertmode, sslrootcert string,
maxConn, maxIdleConn, sqlLogLevel int, connMaxLifetime time.Duration) (*SQLStorage, error) {
@@ -73,7 +73,7 @@ func NewPostgresStorage(host, port, name, user, password,
return &SQLStorage{
DB: pgStor.DB,
db: pgStor.db,
StorDB: pgStor,
DataDB: pgStor,
SQLImpl: pgStor,
}, nil
}

View File

@@ -1113,6 +1113,21 @@ func (rs *RedisStorage) RemoveConfigSectionsDrv(ctx *context.Context, nodeID str
return
}
// StorDB method not implemented yet
func (rs *RedisStorage) SetCDR(_ *context.Context, cdr *utils.CGREvent, allowUpdate bool) error {
return utils.ErrNotImplemented
}
// StorDB method not implemented yet
func (rs *RedisStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts map[string]any) ([]*utils.CDR, error) {
return nil, utils.ErrNotImplemented
}
// StorDB method not implemented yet
func (rs *RedisStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err error) {
return utils.ErrNotImplemented
}
// DumpDataDB will dump all of datadb from memory to a file, only for InternalDB
func (rs *RedisStorage) DumpDataDB() error {
return utils.ErrNotImplemented

View File

@@ -44,7 +44,7 @@ type SQLImpl interface {
type SQLStorage struct {
DB *sql.DB
db *gorm.DB
StorDB
DataDB
SQLImpl
}
@@ -350,17 +350,399 @@ func (sqls *SQLStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err
return
}
// Will dump everything inside stordb to a file, only for InternalDB
func (sqls *SQLStorage) DumpStorDB() (err error) {
// AddLoadHistory DataDB method not implemented yet
func (sqls *SQLStorage) AddLoadHistory(ldInst *utils.LoadInstance,
loadHistSize int, transactionID string) error {
return utils.ErrNotImplemented
}
// Will rewrite every dump file of StorDB, only for InternalDB
func (sqls *SQLStorage) RewriteStorDB() (err error) {
// Only intended for InternalDB
func (sqls *SQLStorage) BackupConfigDB(backupFolderPath string, zip bool) (err error) {
return utils.ErrNotImplemented
}
// BackupStorDB used only for InternalDB
func (sqls *SQLStorage) BackupStorDB(backupFolderPath string, zip bool) (err error) {
// BackupDataDB used only for InternalDB
func (sqls *SQLStorage) BackupDataDB(backupFolderPath string, zip bool) (err error) {
return utils.ErrNotImplemented
}
// Will dump everything inside DB to a file, only for InternalDB
func (sqls *SQLStorage) DumpConfigDB() (err error) {
return utils.ErrNotImplemented
}
// Will dump everything inside DB to a file, only for InternalDB
func (sqls *SQLStorage) DumpDataDB() (err error) {
return utils.ErrNotImplemented
}
// Will rewrite every dump file of DataDB, only for InternalDB
func (sqls *SQLStorage) RewriteDataDB() (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) HasDataDrv(ctx *context.Context, category, subject, tenant string) (exists bool, err error) {
return false, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetLoadHistory(limit int, skipCache bool,
transactionID string) (loadInsts []*utils.LoadInstance, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetResourceProfileDrv(ctx *context.Context, tenant, id string) (rsp *utils.ResourceProfile, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetResourceProfileDrv(ctx *context.Context, rsp *utils.ResourceProfile) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveResourceProfileDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetResourceDrv(ctx *context.Context, tenant, id string) (r *utils.Resource, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetResourceDrv(ctx *context.Context, r *utils.Resource) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveResourceDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetIPProfileDrv(ctx *context.Context, tenant, id string) (*utils.IPProfile, error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetIPProfileDrv(ctx *context.Context, ipp *utils.IPProfile) error {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveIPProfileDrv(ctx *context.Context, tenant, id string) error {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetIPAllocationsDrv(ctx *context.Context, tenant, id string) (*utils.IPAllocations, error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetIPAllocationsDrv(ctx *context.Context, ip *utils.IPAllocations) error {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveIPAllocationsDrv(ctx *context.Context, tenant, id string) error {
return utils.ErrNotImplemented
}
// GetStatQueueProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) GetStatQueueProfileDrv(ctx *context.Context, tenant string, id string) (sq *StatQueueProfile, err error) {
return nil, utils.ErrNotImplemented
}
// SetStatQueueProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error) {
return utils.ErrNotImplemented
}
// RemStatQueueProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// GetStatQueueDrv DataDB method not implemented yet
func (sqls *SQLStorage) GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *StatQueue, err error) {
return nil, utils.ErrNotImplemented
}
// SetStatQueueDrv DataDB method not implemented yet
func (sqls *SQLStorage) SetStatQueueDrv(ctx *context.Context, ssq *StoredStatQueue, sq *StatQueue) (err error) {
return utils.ErrNotImplemented
}
// RemStatQueueDrv DataDB method not implemented yet
func (sqls *SQLStorage) RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetTrendProfileDrv(ctx *context.Context, sg *utils.TrendProfile) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sg *utils.TrendProfile, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemTrendProfileDrv(ctx *context.Context, tenant string, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r *utils.Trend, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetTrendDrv(ctx *context.Context, r *utils.Trend) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveTrendDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetRankingProfileDrv(ctx *context.Context, sg *utils.RankingProfile) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetRankingProfileDrv(ctx *context.Context, tenant string, id string) (sg *utils.RankingProfile, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemRankingProfileDrv(ctx *context.Context, tenant string, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetRankingDrv(ctx *context.Context, tenant, id string) (rn *utils.Ranking, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetRankingDrv(_ *context.Context, rn *utils.Ranking) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// GetThresholdProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) GetThresholdProfileDrv(ctx *context.Context, tenant, ID string) (tp *ThresholdProfile, err error) {
return nil, utils.ErrNotImplemented
}
// SetThresholdProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) SetThresholdProfileDrv(ctx *context.Context, tp *ThresholdProfile) (err error) {
return utils.ErrNotImplemented
}
// RemThresholdProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) RemThresholdProfileDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetThresholdDrv(ctx *context.Context, tenant, id string) (r *Threshold, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetThresholdDrv(ctx *context.Context, r *Threshold) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveThresholdDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetFilterDrv(ctx *context.Context, tenant, id string) (r *Filter, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetFilterDrv(ctx *context.Context, r *Filter) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveFilterDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetRouteProfileDrv(ctx *context.Context, tenant, id string) (r *utils.RouteProfile, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetRouteProfileDrv(ctx *context.Context, r *utils.RouteProfile) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveRouteProfileDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetAttributeProfileDrv(ctx *context.Context, tenant, id string) (r *utils.AttributeProfile, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetAttributeProfileDrv(ctx *context.Context, r *utils.AttributeProfile) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveAttributeProfileDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetChargerProfileDrv(_ *context.Context, tenant, id string) (r *utils.ChargerProfile, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetChargerProfileDrv(_ *context.Context, r *utils.ChargerProfile) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveChargerProfileDrv(_ *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// GetStorageType returns the storage type that is being used
func (sqls *SQLStorage) GetStorageType() string {
return utils.MetaMySQL
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetItemLoadIDsDrv(ctx *context.Context, itemIDPrefix string) (loadIDs map[string]int64, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetLoadIDsDrv(ctx *context.Context, loadIDs map[string]int64) error {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveLoadIDsDrv() (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetRateProfileDrv(ctx *context.Context, rpp *utils.RateProfile, optOverwrite bool) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetRateProfileDrv(ctx *context.Context, tenant, id string) (rpp *utils.RateProfile, err error) {
return nil, utils.ErrNotImplemented
}
// GetRateProfileRateIDsDrv DataDB method not implemented yet
func (sqls *SQLStorage) GetRateProfileRatesDrv(ctx *context.Context, tnt, profileID, rtPrfx string, needIDs bool) (rateIDs []string, rates []*utils.Rate, err error) {
return nil, nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveRateProfileDrv(ctx *context.Context, tenant, id string, rateIDs *[]string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetActionProfileDrv(ctx *context.Context, tenant, id string) (ap *utils.ActionProfile, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetActionProfileDrv(ctx *context.Context, ap *utils.ActionProfile) (err error) {
return utils.ErrNotImplemented
}
func (sqls *SQLStorage) RemoveActionProfileDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// GetIndexesDrv DataDB method not implemented yet
func (sqls *SQLStorage) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) {
return nil, utils.ErrNotImplemented
}
// SetIndexesDrv DataDB method not implemented yet
func (sqls *SQLStorage) SetIndexesDrv(ctx *context.Context, idxItmType, tntCtx string,
indexes map[string]utils.StringSet, commit bool, transactionID string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetAccountDrv(ctx *context.Context, tenant, id string) (ap *utils.Account, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetAccountDrv(ctx *context.Context, ap *utils.Account) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveAccountDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetConfigSectionsDrv(ctx *context.Context, nodeID string, sectionIDs []string) (sectionMap map[string][]byte, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetConfigSectionsDrv(ctx *context.Context, nodeID string, sectionsData map[string][]byte) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveConfigSectionsDrv(ctx *context.Context, nodeID string, sectionIDs []string) (err error) {
return utils.ErrNotImplemented
}
// ConfigDB method not implemented yet
func (sqls *SQLStorage) GetSection(ctx *context.Context, section string, val any) (err error) {
return utils.ErrNotImplemented
}
// ConfigDB method not implemented yet
func (sqls *SQLStorage) SetSection(_ *context.Context, section string, jsn any) (err error) {
return utils.ErrNotImplemented
}
// Only intended for InternalDB
func (sqls *SQLStorage) RewriteConfigDB() (err error) {
return utils.ErrNotImplemented
}

View File

@@ -31,8 +31,8 @@ import (
// NewDataDBConn creates a DataDB connection
func NewDataDBConn(dbType, host, port, name, user,
pass, marshaler string, opts *config.DataDBOpts,
itmsCfg map[string]*config.ItemOpts) (d DataDBDriver, err error) {
pass, marshaler string, stringIndexedFields, prefixIndexedFields []string,
opts *config.DBOpts, itmsCfg map[string]*config.ItemOpts) (d DataDBDriver, err error) {
switch dbType {
case utils.MetaRedis:
var dbNo int
@@ -52,34 +52,18 @@ func NewDataDBConn(dbType, host, port, name, user,
opts.RedisPoolPipelineWindow, opts.RedisPoolPipelineLimit,
opts.RedisTLS, opts.RedisClientCertificate, opts.RedisClientKey, opts.RedisCACertificate)
case utils.MetaMongo:
d, err = NewMongoStorage(opts.MongoConnScheme, host, port, name, user, pass, marshaler, utils.DataDB, nil, opts.MongoQueryTimeout)
case utils.MetaInternal:
d, err = NewInternalDB(nil, nil, opts.ToTransCacheOpts(), itmsCfg)
default:
err = fmt.Errorf("unsupported db_type <%s>", dbType)
}
return
}
// NewStorDBConn returns a StorDB(implements Storage interface) based on dbType
func NewStorDBConn(dbType, host, port, name, user, pass, marshaler string,
stringIndexedFields, prefixIndexedFields []string,
opts *config.StorDBOpts, itmsCfg map[string]*config.ItemOpts) (db StorDB, err error) {
switch dbType {
case utils.MetaMongo:
db, err = NewMongoStorage(opts.MongoConnScheme, host, port, name, user, pass, marshaler, utils.MetaStorDB, stringIndexedFields, opts.MongoQueryTimeout)
d, err = NewMongoStorage(opts.MongoConnScheme, host, port, name, user, pass, marshaler, stringIndexedFields, opts.MongoQueryTimeout)
case utils.MetaPostgres:
db, err = NewPostgresStorage(host, port, name, user, pass, opts.PgSSLMode,
d, err = NewPostgresStorage(host, port, name, user, pass, opts.PgSSLMode,
opts.PgSSLCert, opts.PgSSLKey, opts.PgSSLPassword, opts.PgSSLCertMode, opts.PgSSLRootCert,
opts.SQLMaxOpenConns, opts.SQLMaxIdleConns, opts.SQLLogLevel, opts.SQLConnMaxLifetime)
case utils.MetaMySQL:
db, err = NewMySQLStorage(host, port, name, user, pass, opts.SQLMaxOpenConns, opts.SQLMaxIdleConns,
d, err = NewMySQLStorage(host, port, name, user, pass, opts.SQLMaxOpenConns, opts.SQLMaxIdleConns,
opts.SQLLogLevel, opts.SQLConnMaxLifetime, opts.MySQLLocation, opts.SQLDSNParams)
case utils.MetaInternal:
db, err = NewInternalDB(stringIndexedFields, prefixIndexedFields, opts.ToTransCacheOpts(), itmsCfg)
d, err = NewInternalDB(stringIndexedFields, prefixIndexedFields, opts.ToTransCacheOpts(), itmsCfg)
default:
err = fmt.Errorf("unknown db '%s' valid options are [%s, %s, %s, %s]",
dbType, utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres, utils.MetaInternal)
err = fmt.Errorf("unsupported db_type <%s>", dbType)
}
return
}

View File

@@ -642,7 +642,11 @@ func (tS *ThresholdS) V1GetThresholdIDs(ctx *context.Context, args *utils.Tenant
tenant = tS.cfg.GeneralCfg().DefaultTenant
}
prfx := utils.ThresholdPrefix + tenant + utils.ConcatenatedKeySep
keys, err := tS.dm.DataDB().GetKeysForPrefix(ctx, prfx)
dataDB, _, err := tS.dm.DBConns().GetConn(utils.MetaThresholds)
if err != nil {
return err
}
keys, err := dataDB.GetKeysForPrefix(ctx, prfx)
if err != nil {
return err
}

View File

@@ -50,9 +50,8 @@ type TpReader struct {
//schedulerConns []string
}
func NewTpReader(db DataDB, lr LoadReader, tpid, timezone string,
func NewTpReader(db *DBConnManager, lr LoadReader, tpid, timezone string,
cacheConns, schedulerConns []string) (*TpReader, error) {
tpr := &TpReader{
tpid: tpid,
timezone: timezone,
@@ -359,8 +358,10 @@ func (tpr *TpReader) LoadAll() (err error) {
}
func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
if tpr.dm.dataDB == nil {
return errors.New("no database connection")
for _, db := range tpr.dm.DataDB() {
if db == nil {
return errors.New("no database connection")
}
}
//generate a loadID
loadID := time.Now().UnixNano()

View File

@@ -20,6 +20,7 @@ package engine
import (
"fmt"
"maps"
"github.com/cgrates/cgrates/utils"
)
@@ -52,8 +53,7 @@ func CheckVersions(storage Storage) error {
// Retrieve the current DB versions.
storType := storage.GetStorageType()
isDataDB := isDataDB(storage)
currentVersions := CurrentDBVersions(storType, isDataDB)
currentVersions := CurrentDBVersions(storType)
dbVersions, err := storage.GetVersions("")
if err == utils.ErrNotFound {
@@ -70,21 +70,15 @@ func CheckVersions(storage Storage) error {
return err
}
// Compare db versions with current versions.
message := dbVersions.Compare(currentVersions, storType, isDataDB)
message := dbVersions.Compare(currentVersions, storType)
if message != "" {
return fmt.Errorf("Migration needed: please backup cgr data and run: <%s>", message)
}
return nil
}
// relevant only for mongoDB
func isDataDB(storage Storage) bool {
conv, ok := storage.(*MongoStorage)
return ok && conv.IsDataDB()
}
func setDBVersions(storage Storage, overwrite bool) (err error) {
x := CurrentDBVersions(storage.GetStorageType(), isDataDB(storage))
x := CurrentDBVersions(storage.GetStorageType())
// no data, write version
if err = storage.SetVersions(x, overwrite); err != nil {
utils.Logger.Warning(fmt.Sprintf("Could not write current version to db: %v", err))
@@ -104,7 +98,7 @@ func OverwriteDBVersions(storage Storage) (err error) {
}
// Compare returns the migration message if the versions are not the latest
func (vers Versions) Compare(curent Versions, storType string, isDataDB bool) string {
func (vers Versions) Compare(curent Versions, storType string) string {
var message map[string]string
switch storType {
case utils.MetaMongo:
@@ -162,20 +156,15 @@ func CurrentStorDBVersions() Versions {
func CurrentAllDBVersions() Versions {
dataDBVersions := CurrentDataDBVersions()
allVersions := make(Versions)
for k, v := range dataDBVersions {
allVersions[k] = v
}
maps.Copy(allVersions, dataDBVersions)
return allVersions
}
// CurrentDBVersions returns versions based on dbType
func CurrentDBVersions(storType string, isDataDB bool) Versions {
func CurrentDBVersions(storType string) Versions {
switch storType {
case utils.MetaMongo:
if isDataDB {
return CurrentDataDBVersions()
}
return CurrentStorDBVersions()
return CurrentAllDBVersions()
case utils.MetaInternal:
return CurrentAllDBVersions()
case utils.MetaRedis: