Make Filters and RouteProfiles storable in MySQL and Postgres

This commit is contained in:
arberkatellari
2025-11-12 14:25:02 +02:00
committed by Dan Christian Bogos
parent 61e7bacab1
commit c51d3f27de
26 changed files with 944 additions and 57 deletions

View File

@@ -123,6 +123,10 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLThresholdProfiles, tntID)
case utils.ThresholdPrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLThresholds, tntID)
case utils.FilterPrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLFilters, tntID)
case utils.RouteProfilePrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRouteProfiles, tntID)
default:
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix)
}
@@ -948,6 +952,126 @@ func (sqls *SQLStorage) RemoveThresholdDrv(ctx *context.Context, tenant, id stri
return
}
func (sqls *SQLStorage) GetFilterDrv(ctx *context.Context, tenant, id string) (f *Filter, err error) {
var result []*FilterJSONMdl
if err = sqls.db.Model(&FilterJSONMdl{}).Where(&FilterJSONMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return MapStringInterfaceToFilter(result[0].Filter)
}
func (sqls *SQLStorage) SetFilterDrv(ctx *context.Context, f *Filter) (err error) {
tx := sqls.db.Begin()
mdl := &FilterJSONMdl{
Tenant: f.Tenant,
ID: f.ID,
Filter: f.AsMapStringInterface(),
}
if err = tx.Model(&FilterJSONMdl{}).Where(
FilterJSONMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
FilterJSONMdl{}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (sqls *SQLStorage) RemoveFilterDrv(ctx *context.Context, tenant, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&FilterJSONMdl{}).Where(&FilterJSONMdl{Tenant: tenant, ID: id}).
Delete(&FilterJSONMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return
}
func (sqls *SQLStorage) GetRouteProfileDrv(ctx *context.Context, tenant, id string) (rp *utils.RouteProfile, err error) {
var result []*RouteProfileMdl
if err = sqls.db.Model(&RouteProfileMdl{}).Where(&RouteProfileMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return utils.MapStringInterfaceToRouteProfile(result[0].RouteProfile), nil
}
func (sqls *SQLStorage) SetRouteProfileDrv(ctx *context.Context, rp *utils.RouteProfile) (err error) {
tx := sqls.db.Begin()
mdl := &RouteProfileMdl{
Tenant: rp.Tenant,
ID: rp.ID,
RouteProfile: rp.AsMapStringInterface(),
}
if err = tx.Model(&RouteProfileMdl{}).Where(
RouteProfileMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
RouteProfileMdl{}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (sqls *SQLStorage) RemoveRouteProfileDrv(ctx *context.Context, tenant, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&RouteProfileMdl{}).Where(&RouteProfileMdl{Tenant: tenant, ID: id}).
Delete(&RouteProfileMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return
}
// Used to check if specific subject is stored using prefix key attached to entity
func (sqls *SQLStorage) HasDataDrv(ctx *context.Context, category, subject, tenant string) (has bool, err error) {
var categoryModelMap = map[string]any{
utils.AccountPrefix: &AccountJSONMdl{},
utils.ActionProfilePrefix: &ActionProfileJSONMdl{},
utils.ResourcesPrefix: &ResourceJSONMdl{},
utils.ResourceProfilesPrefix: &ResourceProfileMdl{},
utils.IPAllocationsPrefix: &IPAllocationMdl{},
utils.IPProfilesPrefix: &IPProfileMdl{},
utils.StatQueuePrefix: &StatQueueMdl{},
utils.StatQueueProfilePrefix: &StatQueueProfileMdl{},
utils.ThresholdPrefix: &ThresholdJSONMdl{},
utils.ThresholdProfilePrefix: &ThresholdProfileMdl{},
utils.FilterPrefix: &FilterJSONMdl{},
utils.RouteProfilePrefix: &RouteProfileMdl{},
utils.AttributeProfilePrefix: &AttributeProfileMdl{},
utils.ChargerProfilePrefix: &ChargerProfileMdl{},
// utils.TrendPrefix: &TrendJSONMdl{},
// utils.TrendProfilePrefix: &TrendProfileMdl{},
// utils.RateProfilePrefix: &RateProfileJSONMdl{},
}
model, ok := categoryModelMap[category]
if !ok {
return false, fmt.Errorf("unsupported category in HasDataDrv: %s", category)
}
var count int64 // if it finds 1, return
err = sqls.db.Model(model).Where("tenant = ? AND id = ?", tenant, subject).
Limit(1).Count(&count).Error
return count > 0, err
}
// AddLoadHistory DataDB method not implemented yet
func (sqls *SQLStorage) AddLoadHistory(ldInst *utils.LoadInstance,
loadHistSize int, transactionID string) error {
@@ -979,11 +1103,6 @@ func (sqls *SQLStorage) RewriteDataDB() (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) HasDataDrv(ctx *context.Context, category, subject, tenant string) (exists bool, err error) {
return false, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetLoadHistory(limit int, skipCache bool,
transactionID string) (loadInsts []*utils.LoadInstance, err error) {
@@ -1050,36 +1169,6 @@ func (sqls *SQLStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetFilterDrv(ctx *context.Context, tenant, id string) (r *Filter, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetFilterDrv(ctx *context.Context, r *Filter) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveFilterDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetRouteProfileDrv(ctx *context.Context, tenant, id string) (r *utils.RouteProfile, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetRouteProfileDrv(ctx *context.Context, r *utils.RouteProfile) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveRouteProfileDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// GetStorageType returns the storage type that is being used
func (sqls *SQLStorage) GetStorageType() string {
return utils.MetaMySQL