mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-22 23:58:44 +05:00
Continue removing destination
This commit is contained in:
committed by
Dan Christian Bogos
parent
661f1c4172
commit
c3e7337cdf
@@ -43,8 +43,6 @@ var (
|
||||
utils.AccountFilterIndexPrfx: {},
|
||||
}
|
||||
cachePrefixMap = utils.StringSet{
|
||||
utils.DestinationPrefix: {},
|
||||
utils.ReverseDestinationPrefix: {},
|
||||
utils.ResourceProfilesPrefix: {},
|
||||
utils.TimingsPrefix: {},
|
||||
utils.ResourcesPrefix: {},
|
||||
|
||||
@@ -223,19 +223,19 @@ func (fltr *Filter) Compile() (err error) {
|
||||
|
||||
var supportedFiltersType utils.StringSet = utils.NewStringSet([]string{
|
||||
utils.MetaString, utils.MetaPrefix, utils.MetaSuffix,
|
||||
utils.MetaCronExp, utils.MetaRSR, utils.MetaDestinations,
|
||||
utils.MetaCronExp, utils.MetaRSR,
|
||||
utils.MetaEmpty, utils.MetaExists, utils.MetaLessThan, utils.MetaLessOrEqual,
|
||||
utils.MetaGreaterThan, utils.MetaGreaterOrEqual, utils.MetaEqual,
|
||||
utils.MetaNotEqual, utils.MetaIPNet, utils.MetaAPIBan,
|
||||
utils.MetaActivationInterval})
|
||||
var needsFieldName utils.StringSet = utils.NewStringSet([]string{
|
||||
utils.MetaString, utils.MetaPrefix, utils.MetaSuffix,
|
||||
utils.MetaCronExp, utils.MetaRSR, utils.MetaDestinations, utils.MetaLessThan,
|
||||
utils.MetaCronExp, utils.MetaRSR, utils.MetaLessThan,
|
||||
utils.MetaEmpty, utils.MetaExists, utils.MetaLessOrEqual, utils.MetaGreaterThan,
|
||||
utils.MetaGreaterOrEqual, utils.MetaEqual, utils.MetaNotEqual, utils.MetaIPNet, utils.MetaAPIBan,
|
||||
utils.MetaActivationInterval})
|
||||
var needsValues utils.StringSet = utils.NewStringSet([]string{utils.MetaString, utils.MetaPrefix,
|
||||
utils.MetaSuffix, utils.MetaCronExp, utils.MetaRSR, utils.MetaDestinations,
|
||||
utils.MetaSuffix, utils.MetaCronExp, utils.MetaRSR,
|
||||
utils.MetaLessThan, utils.MetaLessOrEqual, utils.MetaGreaterThan, utils.MetaGreaterOrEqual,
|
||||
utils.MetaEqual, utils.MetaNotEqual, utils.MetaIPNet, utils.MetaAPIBan,
|
||||
utils.MetaActivationInterval})
|
||||
@@ -344,8 +344,6 @@ func (fltr *FilterRule) Pass(ctx *context.Context, dDP utils.DataProvider) (resu
|
||||
result, err = fltr.passStringSuffix(dDP)
|
||||
case utils.MetaCronExp, utils.MetaNotCronExp:
|
||||
result, err = fltr.passCronExp(ctx, dDP)
|
||||
case utils.MetaDestinations, utils.MetaNotDestinations:
|
||||
result, err = fltr.passDestinations(ctx, dDP)
|
||||
case utils.MetaRSR, utils.MetaNotRSR:
|
||||
result, err = fltr.passRSR(dDP)
|
||||
case utils.MetaLessThan, utils.MetaLessOrEqual, utils.MetaGreaterThan, utils.MetaGreaterOrEqual:
|
||||
@@ -507,34 +505,6 @@ func (fltr *FilterRule) passCronExp(ctx *context.Context, dDP utils.DataProvider
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (fltr *FilterRule) passDestinations(ctx *context.Context, dDP utils.DataProvider) (bool, error) {
|
||||
dst, err := fltr.rsrElement.ParseDataProvider(dDP)
|
||||
if err != nil {
|
||||
if err == utils.ErrNotFound {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
for _, p := range utils.SplitPrefix(dst, utils.MIN_PREFIX_MATCH) {
|
||||
var destIDs []string
|
||||
if err = connMgr.Call(ctx, config.CgrConfig().FilterSCfg().AdminSConns, utils.APIerSv1GetReverseDestination, &p, &destIDs); err != nil {
|
||||
continue
|
||||
}
|
||||
for _, dID := range destIDs {
|
||||
for _, valDstIDVal := range fltr.rsrValues {
|
||||
valDstID, err := valDstIDVal.ParseDataProvider(dDP)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if valDstID == dID {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (fltr *FilterRule) passRSR(dDP utils.DataProvider) (bool, error) {
|
||||
fld, err := fltr.rsrElement.ParseDataProvider(dDP)
|
||||
if err != nil {
|
||||
|
||||
@@ -382,13 +382,11 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
|
||||
utils.CacheDispatcherRoutes: {},
|
||||
utils.CacheDispatcherLoads: {},
|
||||
utils.CacheDispatchers: {},
|
||||
utils.CacheDestinations: {},
|
||||
utils.CacheEventResources: {},
|
||||
utils.CacheFilters: {},
|
||||
utils.CacheResourceFilterIndexes: {},
|
||||
utils.CacheResourceProfiles: {},
|
||||
utils.CacheResources: {},
|
||||
utils.CacheReverseDestinations: {},
|
||||
utils.CacheRPCResponses: {},
|
||||
utils.CacheStatFilterIndexes: {},
|
||||
utils.CacheStatQueueProfiles: {},
|
||||
@@ -421,7 +419,6 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
|
||||
|
||||
utils.CacheVersions: {},
|
||||
utils.CacheTBLTPTimings: {},
|
||||
utils.CacheTBLTPDestinations: {},
|
||||
utils.CacheTBLTPResources: {},
|
||||
utils.CacheTBLTPStats: {},
|
||||
utils.CacheTBLTPThresholds: {},
|
||||
|
||||
@@ -51,10 +51,6 @@ type DestinationMdl struct {
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
func (DestinationMdl) TableName() string {
|
||||
return utils.TBLTPDestinations
|
||||
}
|
||||
|
||||
type ResourceMdl struct {
|
||||
PK uint `gorm:"primary_key"`
|
||||
Tpid string
|
||||
|
||||
@@ -44,7 +44,6 @@ type CSVStorage struct {
|
||||
sep rune
|
||||
generator func() csvReaderCloser
|
||||
// file names
|
||||
destinationsFn []string
|
||||
timingsFn []string
|
||||
resProfilesFn []string
|
||||
statsFn []string
|
||||
@@ -61,15 +60,13 @@ type CSVStorage struct {
|
||||
}
|
||||
|
||||
// NewCSVStorage creates a CSV storege that takes the data from the paths specified
|
||||
func NewCSVStorage(sep rune,
|
||||
destinationsFn, timingsFn,
|
||||
func NewCSVStorage(sep rune, timingsFn,
|
||||
resProfilesFn, statsFn, thresholdsFn, filterFn, routeProfilesFn,
|
||||
attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn, dispatcherHostsFn,
|
||||
rateProfilesFn, actionProfilesFn, accountsFn []string) *CSVStorage {
|
||||
return &CSVStorage{
|
||||
sep: sep,
|
||||
generator: NewCsvFile,
|
||||
destinationsFn: destinationsFn,
|
||||
timingsFn: timingsFn,
|
||||
resProfilesFn: resProfilesFn,
|
||||
statsFn: statsFn,
|
||||
@@ -92,7 +89,6 @@ func NewFileCSVStorage(sep rune, dataPath string) *CSVStorage {
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
destinationsPaths := appendName(allFoldersPath, utils.DestinationsCsv)
|
||||
timingsPaths := appendName(allFoldersPath, utils.TimingsCsv)
|
||||
resourcesPaths := appendName(allFoldersPath, utils.ResourcesCsv)
|
||||
statsPaths := appendName(allFoldersPath, utils.StatsCsv)
|
||||
@@ -107,7 +103,6 @@ func NewFileCSVStorage(sep rune, dataPath string) *CSVStorage {
|
||||
actionProfilesFn := appendName(allFoldersPath, utils.ActionProfilesCsv)
|
||||
accountsFn := appendName(allFoldersPath, utils.AccountsCsv)
|
||||
return NewCSVStorage(sep,
|
||||
destinationsPaths,
|
||||
timingsPaths,
|
||||
resourcesPaths,
|
||||
statsPaths,
|
||||
@@ -125,12 +120,11 @@ func NewFileCSVStorage(sep rune, dataPath string) *CSVStorage {
|
||||
}
|
||||
|
||||
// NewStringCSVStorage creates a csv storage from strings
|
||||
func NewStringCSVStorage(sep rune,
|
||||
destinationsFn, timingsFn,
|
||||
func NewStringCSVStorage(sep rune, timingsFn,
|
||||
resProfilesFn, statsFn, thresholdsFn, filterFn, routeProfilesFn,
|
||||
attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn, dispatcherHostsFn,
|
||||
rateProfilesFn, actionProfilesFn, accountsFn string) *CSVStorage {
|
||||
c := NewCSVStorage(sep, []string{destinationsFn}, []string{timingsFn},
|
||||
c := NewCSVStorage(sep, []string{timingsFn},
|
||||
[]string{resProfilesFn}, []string{statsFn}, []string{thresholdsFn}, []string{filterFn},
|
||||
[]string{routeProfilesFn}, []string{attributeProfilesFn}, []string{chargerProfilesFn},
|
||||
[]string{dispatcherProfilesFn}, []string{dispatcherHostsFn}, []string{rateProfilesFn},
|
||||
@@ -156,7 +150,6 @@ func NewGoogleCSVStorage(sep rune, spreadsheetID string) (*CSVStorage, error) {
|
||||
return []string{}
|
||||
}
|
||||
c := NewCSVStorage(sep,
|
||||
getIfExist(utils.Destinations),
|
||||
getIfExist(utils.Timings),
|
||||
getIfExist(utils.Resources),
|
||||
getIfExist(utils.Stats),
|
||||
@@ -181,7 +174,6 @@ func NewGoogleCSVStorage(sep rune, spreadsheetID string) (*CSVStorage, error) {
|
||||
|
||||
// NewURLCSVStorage returns a CSVStorage that can parse URLs
|
||||
func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
|
||||
var destinationsPaths []string
|
||||
var timingsPaths []string
|
||||
var resourcesPaths []string
|
||||
var statsPaths []string
|
||||
@@ -198,7 +190,6 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
|
||||
|
||||
for _, baseURL := range strings.Split(dataPath, utils.InfieldSep) {
|
||||
if !strings.HasSuffix(baseURL, utils.CSVSuffix) {
|
||||
destinationsPaths = append(destinationsPaths, joinURL(baseURL, utils.DestinationsCsv))
|
||||
timingsPaths = append(timingsPaths, joinURL(baseURL, utils.TimingsCsv))
|
||||
resourcesPaths = append(resourcesPaths, joinURL(baseURL, utils.ResourcesCsv))
|
||||
statsPaths = append(statsPaths, joinURL(baseURL, utils.StatsCsv))
|
||||
@@ -215,8 +206,6 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
|
||||
continue
|
||||
}
|
||||
switch {
|
||||
case strings.HasSuffix(baseURL, utils.DestinationsCsv):
|
||||
destinationsPaths = append(destinationsPaths, baseURL)
|
||||
case strings.HasSuffix(baseURL, utils.TimingsCsv):
|
||||
timingsPaths = append(timingsPaths, baseURL)
|
||||
case strings.HasSuffix(baseURL, utils.ResourcesCsv):
|
||||
@@ -248,7 +237,6 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
|
||||
}
|
||||
|
||||
c := NewCSVStorage(sep,
|
||||
destinationsPaths,
|
||||
timingsPaths,
|
||||
resourcesPaths,
|
||||
statsPaths,
|
||||
@@ -342,18 +330,6 @@ func (csvs *CSVStorage) GetTPTimings(tpid, id string) ([]*utils.ApierTPTiming, e
|
||||
return tpTimings.AsTPTimings(), nil
|
||||
}
|
||||
|
||||
func (csvs *CSVStorage) GetTPDestinations(tpid, id string) ([]*utils.TPDestination, error) {
|
||||
var tpDests DestinationMdls
|
||||
if err := csvs.proccesData(DestinationMdl{}, csvs.destinationsFn, func(tp interface{}) {
|
||||
d := tp.(DestinationMdl)
|
||||
d.Tpid = tpid
|
||||
tpDests = append(tpDests, d)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tpDests.AsTPDestinations(), nil
|
||||
}
|
||||
|
||||
func (csvs *CSVStorage) GetTPResources(tpid, tenant, id string) ([]*utils.TPResourceProfile, error) {
|
||||
var tpResLimits ResourceMdls
|
||||
if err := csvs.proccesData(ResourceMdl{}, csvs.resProfilesFn, func(tp interface{}) {
|
||||
|
||||
@@ -36,7 +36,6 @@ type Storage interface {
|
||||
Close()
|
||||
Flush(string) error
|
||||
GetKeysForPrefix(ctx *context.Context, prefix string) ([]string, error)
|
||||
RemoveKeysForPrefix(string) error
|
||||
GetVersions(itm string) (vrs Versions, err error)
|
||||
SetVersions(vrs Versions, overwrite bool) (err error)
|
||||
RemoveVersions(vrs Versions) (err error)
|
||||
@@ -49,10 +48,6 @@ type Storage interface {
|
||||
type DataDB interface {
|
||||
Storage
|
||||
HasDataDrv(*context.Context, string, string, string) (bool, error)
|
||||
RemoveDestinationDrv(string, string) error
|
||||
RemoveReverseDestinationDrv(string, string, string) error
|
||||
SetReverseDestinationDrv(string, []string, string) error
|
||||
GetReverseDestinationDrv(string, string) ([]string, error)
|
||||
GetResourceProfileDrv(string, string) (*ResourceProfile, error)
|
||||
SetResourceProfileDrv(*ResourceProfile) error
|
||||
RemoveResourceProfileDrv(string, string) error
|
||||
@@ -142,7 +137,6 @@ type LoadReader interface {
|
||||
GetTpTableIds(string, string, []string,
|
||||
map[string]string, *utils.PaginatorWithSearch) ([]string, error)
|
||||
GetTPTimings(string, string) ([]*utils.ApierTPTiming, error)
|
||||
GetTPDestinations(string, string) ([]*utils.TPDestination, error)
|
||||
GetTPResources(string, string, string) ([]*utils.TPResourceProfile, error)
|
||||
GetTPStats(string, string, string) ([]*utils.TPStatProfile, error)
|
||||
GetTPThresholds(string, string, string) ([]*utils.TPThresholdProfile, error)
|
||||
@@ -160,7 +154,6 @@ type LoadReader interface {
|
||||
type LoadWriter interface {
|
||||
RemTpData(string, string, map[string]string) error
|
||||
SetTPTimings([]*utils.ApierTPTiming) error
|
||||
SetTPDestinations([]*utils.TPDestination) error
|
||||
SetTPResources([]*utils.TPResourceProfile) error
|
||||
SetTPStats([]*utils.TPStatProfile) error
|
||||
SetTPThresholds([]*utils.TPThresholdProfile) error
|
||||
|
||||
@@ -82,7 +82,7 @@ func (iDB *InternalDB) SelectDatabase(string) (err error) {
|
||||
|
||||
// GetKeysForPrefix returns the keys from cache that have the given prefix
|
||||
func (iDB *InternalDB) GetKeysForPrefix(ctx *context.Context, prefix string) (ids []string, err error) {
|
||||
keyLen := len(utils.DestinationPrefix)
|
||||
keyLen := len(utils.AccountPrefix)
|
||||
if len(prefix) < keyLen {
|
||||
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %s", prefix)
|
||||
return
|
||||
@@ -96,18 +96,6 @@ func (iDB *InternalDB) GetKeysForPrefix(ctx *context.Context, prefix string) (id
|
||||
return
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) RemoveKeysForPrefix(prefix string) (err error) {
|
||||
var keys []string
|
||||
if keys, err = iDB.GetKeysForPrefix(context.TODO(), prefix); err != nil {
|
||||
return
|
||||
}
|
||||
for _, key := range keys {
|
||||
Cache.RemoveWithoutReplicate(utils.CacheReverseDestinations, key,
|
||||
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) GetVersions(itm string) (vrs Versions, err error) {
|
||||
x, ok := Cache.Get(utils.CacheVersions, utils.VersionName)
|
||||
if !ok || x == nil {
|
||||
@@ -182,8 +170,6 @@ func (iDB *InternalDB) IsDBEmpty() (isEmpty bool, err error) {
|
||||
|
||||
func (iDB *InternalDB) HasDataDrv(ctx *context.Context, category, subject, tenant string) (bool, error) {
|
||||
switch category {
|
||||
case utils.DestinationPrefix:
|
||||
return Cache.HasItem(utils.CachePrefixToInstance[category], subject), nil
|
||||
case utils.ResourcesPrefix, utils.ResourceProfilesPrefix, utils.StatQueuePrefix,
|
||||
utils.StatQueueProfilePrefix, utils.ThresholdPrefix, utils.ThresholdProfilePrefix,
|
||||
utils.FilterPrefix, utils.RouteProfilePrefix, utils.AttributeProfilePrefix,
|
||||
@@ -193,59 +179,6 @@ func (iDB *InternalDB) HasDataDrv(ctx *context.Context, category, subject, tenan
|
||||
return false, errors.New("Unsupported HasData category")
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) RemoveDestinationDrv(destID string, transactionID string) (err error) {
|
||||
Cache.RemoveWithoutReplicate(utils.CacheDestinations, destID,
|
||||
cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) RemoveReverseDestinationDrv(dstID, prfx, transactionID string) (err error) {
|
||||
var revDst []string
|
||||
if Cache.HasItem(utils.CacheReverseDestinations, prfx) {
|
||||
x, ok := Cache.Get(utils.CacheReverseDestinations, prfx)
|
||||
if !ok || x == nil {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
revDst = x.([]string)
|
||||
}
|
||||
mpRevDst := utils.NewStringSet(revDst)
|
||||
mpRevDst.Remove(dstID)
|
||||
if mpRevDst.Size() != 0 {
|
||||
Cache.SetWithoutReplicate(utils.CacheReverseDestinations, prfx, mpRevDst.AsSlice(), nil,
|
||||
cacheCommit(transactionID), transactionID)
|
||||
} else {
|
||||
Cache.RemoveWithoutReplicate(utils.CacheReverseDestinations, prfx,
|
||||
cacheCommit(transactionID), transactionID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) SetReverseDestinationDrv(destID string, prefixes []string, transactionID string) (err error) {
|
||||
for _, p := range prefixes {
|
||||
var revDst []string
|
||||
if Cache.HasItem(utils.CacheReverseDestinations, p) {
|
||||
if x, ok := Cache.Get(utils.CacheReverseDestinations, p); ok && x != nil {
|
||||
revDst = x.([]string)
|
||||
}
|
||||
}
|
||||
mpRevDst := utils.NewStringSet(revDst)
|
||||
mpRevDst.Add(destID)
|
||||
// for ReverseDestination we will use Groups
|
||||
Cache.SetWithoutReplicate(utils.CacheReverseDestinations, p, mpRevDst.AsSlice(), nil,
|
||||
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) GetReverseDestinationDrv(prefix, transactionID string) (ids []string, err error) {
|
||||
if x, ok := Cache.Get(utils.CacheReverseDestinations, prefix); ok && x != nil {
|
||||
if ids = x.([]string); len(ids) != 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) GetResourceProfileDrv(tenant, id string) (rp *ResourceProfile, err error) {
|
||||
if x, ok := Cache.Get(utils.CacheResourceProfiles, utils.ConcatenatedKey(tenant, id)); ok && x != nil {
|
||||
return x.(*ResourceProfile), nil
|
||||
|
||||
@@ -75,26 +75,6 @@ func (iDB *InternalDB) GetTPTimings(tpid, id string) (timings []*utils.ApierTPTi
|
||||
return
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) GetTPDestinations(tpid, id string) (dsts []*utils.TPDestination, err error) {
|
||||
key := tpid
|
||||
if id != utils.EmptyString {
|
||||
key += utils.ConcatenatedKeySep + id
|
||||
}
|
||||
ids := Cache.GetItemIDs(utils.CacheTBLTPDestinations, key)
|
||||
for _, id := range ids {
|
||||
x, ok := Cache.Get(utils.CacheTBLTPDestinations, id)
|
||||
if !ok || x == nil {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
dsts = append(dsts, x.(*utils.TPDestination))
|
||||
}
|
||||
|
||||
if len(dsts) == 0 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) GetTPResources(tpid, tenant, id string) (resources []*utils.TPResourceProfile, err error) {
|
||||
key := tpid
|
||||
if tenant != utils.EmptyString {
|
||||
@@ -400,16 +380,6 @@ func (iDB *InternalDB) SetTPTimings(timings []*utils.ApierTPTiming) (err error)
|
||||
}
|
||||
return
|
||||
}
|
||||
func (iDB *InternalDB) SetTPDestinations(dests []*utils.TPDestination) (err error) {
|
||||
if len(dests) == 0 {
|
||||
return nil
|
||||
}
|
||||
for _, destination := range dests {
|
||||
Cache.SetWithoutReplicate(utils.CacheTBLTPDestinations, utils.ConcatenatedKey(destination.TPid, destination.ID), destination, nil,
|
||||
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (iDB *InternalDB) SetTPResources(resources []*utils.TPResourceProfile) (err error) {
|
||||
if len(resources) == 0 {
|
||||
|
||||
@@ -318,7 +318,7 @@ func (ms *MongoStorage) ensureIndexesForCol(col string) (err error) { // exporte
|
||||
return
|
||||
}
|
||||
//StorDB
|
||||
case utils.TBLTPTimings, utils.TBLTPDestinations,
|
||||
case utils.TBLTPTimings,
|
||||
utils.TBLTPStats, utils.TBLTPResources, utils.TBLTPDispatchers,
|
||||
utils.TBLTPDispatcherHosts, utils.TBLTPChargers,
|
||||
utils.TBLTPRoutes, utils.TBLTPThresholds:
|
||||
@@ -373,7 +373,7 @@ func (ms *MongoStorage) EnsureIndexes(cols ...string) (err error) {
|
||||
}
|
||||
}
|
||||
if ms.storageType == utils.StorDB {
|
||||
for _, col := range []string{utils.TBLTPTimings, utils.TBLTPDestinations,
|
||||
for _, col := range []string{utils.TBLTPTimings,
|
||||
utils.TBLTPStats, utils.TBLTPResources,
|
||||
utils.CDRsTBL, utils.SessionCostsTBL} {
|
||||
if err = ms.ensureIndexesForCol(col); err != nil {
|
||||
@@ -415,10 +415,6 @@ func (ms *MongoStorage) SelectDatabase(dbName string) (err error) {
|
||||
func (ms *MongoStorage) RemoveKeysForPrefix(prefix string) (err error) {
|
||||
var colName string
|
||||
switch prefix {
|
||||
case utils.DestinationPrefix:
|
||||
colName = ColDst
|
||||
case utils.ReverseDestinationPrefix:
|
||||
colName = ColRds
|
||||
case utils.LoadInstKey:
|
||||
colName = ColLht
|
||||
case utils.VersionPrefix:
|
||||
@@ -546,7 +542,7 @@ func (ms *MongoStorage) getField3(sctx mongo.SessionContext, col, prefix, field
|
||||
// GetKeysForPrefix implementation
|
||||
func (ms *MongoStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (result []string, err error) {
|
||||
var category, subject string
|
||||
keyLen := len(utils.DestinationPrefix)
|
||||
keyLen := len(utils.AccountPrefix)
|
||||
if len(prefix) < keyLen {
|
||||
return nil, fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix)
|
||||
}
|
||||
@@ -555,10 +551,6 @@ func (ms *MongoStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (r
|
||||
subject = fmt.Sprintf("^%s", prefix[keyLen:]) // old way, no tenant support
|
||||
err = ms.query(ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
switch category {
|
||||
case utils.DestinationPrefix:
|
||||
result, err = ms.getField(sctx, ColDst, utils.DestinationPrefix, subject, "key")
|
||||
case utils.ReverseDestinationPrefix:
|
||||
result, err = ms.getField(sctx, ColRds, utils.ReverseDestinationPrefix, subject, "key")
|
||||
case utils.ResourceProfilesPrefix:
|
||||
result, err = ms.getField2(sctx, ColRsP, utils.ResourceProfilesPrefix, subject, tntID)
|
||||
case utils.ResourcesPrefix:
|
||||
@@ -629,8 +621,6 @@ func (ms *MongoStorage) HasDataDrv(ctx *context.Context, category, subject, tena
|
||||
err = ms.query(ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
var count int64
|
||||
switch category {
|
||||
case utils.DestinationPrefix:
|
||||
count, err = ms.getCol(ColDst).CountDocuments(sctx, bson.M{"key": subject})
|
||||
case utils.ResourcesPrefix:
|
||||
count, err = ms.getCol(ColRes).CountDocuments(sctx, bson.M{"tenant": tenant, "id": subject})
|
||||
case utils.ResourceProfilesPrefix:
|
||||
|
||||
@@ -180,33 +180,6 @@ func (ms *MongoStorage) GetTPTimings(tpid, id string) ([]*utils.ApierTPTiming, e
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPDestinations(tpid, id string) ([]*utils.TPDestination, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
filter["id"] = id
|
||||
}
|
||||
var results []*utils.TPDestination
|
||||
err := ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.TBLTPDestinations).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for cur.Next(sctx) {
|
||||
var el utils.TPDestination
|
||||
err := cur.Decode(&el)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
results = append(results, &el)
|
||||
}
|
||||
if len(results) == 0 {
|
||||
return utils.ErrNotFound
|
||||
}
|
||||
return cur.Close(sctx)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTPResources(tpid, tenant, id string) ([]*utils.TPResourceProfile, error) {
|
||||
filter := bson.M{"tpid": tpid}
|
||||
if id != "" {
|
||||
@@ -330,24 +303,6 @@ func (ms *MongoStorage) SetTPTimings(tps []*utils.ApierTPTiming) error {
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPDestinations(tpDsts []*utils.TPDestination) (err error) {
|
||||
if len(tpDsts) == 0 {
|
||||
return nil
|
||||
}
|
||||
return ms.query(context.TODO(), func(sctx mongo.SessionContext) (err error) {
|
||||
for _, tp := range tpDsts {
|
||||
_, err = ms.getCol(utils.TBLTPDestinations).UpdateOne(sctx, bson.M{"tpid": tp.TPid, "id": tp.ID},
|
||||
bson.M{"$set": tp},
|
||||
options.Update().SetUpsert(true),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetTPResources(tpRLs []*utils.TPResourceProfile) (err error) {
|
||||
if len(tpRLs) == 0 {
|
||||
return
|
||||
|
||||
@@ -242,9 +242,6 @@ func (rs *RedisStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k
|
||||
func (rs *RedisStorage) HasDataDrv(ctx *context.Context, category, subject, tenant string) (exists bool, err error) {
|
||||
var i int
|
||||
switch category {
|
||||
case utils.DestinationPrefix:
|
||||
err = rs.Cmd(&i, redisEXISTS, category+subject)
|
||||
return i == 1, err
|
||||
case utils.ResourcesPrefix, utils.ResourceProfilesPrefix, utils.StatQueuePrefix,
|
||||
utils.StatQueueProfilePrefix, utils.ThresholdPrefix, utils.ThresholdProfilePrefix,
|
||||
utils.FilterPrefix, utils.RouteProfilePrefix, utils.AttributeProfilePrefix,
|
||||
@@ -256,33 +253,6 @@ func (rs *RedisStorage) HasDataDrv(ctx *context.Context, category, subject, tena
|
||||
return false, errors.New("unsupported HasData category")
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetReverseDestinationDrv(key, transactionID string) (ids []string, err error) {
|
||||
if err = rs.Cmd(&ids, redisSMEMBERS, utils.ReverseDestinationPrefix+key); err != nil {
|
||||
return
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
err = utils.ErrNotFound
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetReverseDestinationDrv(destID string, prefixes []string, transactionID string) (err error) {
|
||||
for _, p := range prefixes {
|
||||
if err = rs.Cmd(nil, redisSADD, utils.ReverseDestinationPrefix+p, destID); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) RemoveDestinationDrv(destID, transactionID string) (err error) {
|
||||
return rs.Cmd(nil, redisDEL, utils.DestinationPrefix+destID)
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) RemoveReverseDestinationDrv(dstID, prfx, transactionID string) (err error) {
|
||||
return rs.Cmd(nil, redisSREM, utils.ReverseDestinationPrefix+prfx, dstID)
|
||||
}
|
||||
|
||||
// Limit will only retrieve the last n items out of history, newest first
|
||||
func (rs *RedisStorage) GetLoadHistory(limit int, skipCache bool,
|
||||
transactionID string) (loadInsts []*utils.LoadInstance, err error) {
|
||||
|
||||
@@ -99,7 +99,7 @@ func (sqls *SQLStorage) CreateTablesFromScript(scriptPath string) error {
|
||||
|
||||
func (sqls *SQLStorage) IsDBEmpty() (resp bool, err error) {
|
||||
tbls := []string{
|
||||
utils.TBLTPTimings, utils.TBLTPDestinations,
|
||||
utils.TBLTPTimings,
|
||||
utils.TBLTPResources, utils.TBLTPStats, utils.TBLTPThresholds,
|
||||
utils.TBLTPFilters, utils.SessionCostsTBL, utils.CDRsTBL,
|
||||
utils.TBLVersions, utils.TBLTPRoutes, utils.TBLTPAttributes, utils.TBLTPChargers,
|
||||
@@ -123,7 +123,6 @@ func (sqls *SQLStorage) GetTpIds(colName string) ([]string, error) {
|
||||
if colName == "" {
|
||||
for _, clNm := range []string{
|
||||
utils.TBLTPTimings,
|
||||
utils.TBLTPDestinations,
|
||||
utils.TBLTPResources,
|
||||
utils.TBLTPStats,
|
||||
utils.TBLTPThresholds,
|
||||
@@ -229,7 +228,7 @@ func (sqls *SQLStorage) RemTpData(table, tpid string, args map[string]string) er
|
||||
tx := sqls.db.Begin()
|
||||
|
||||
if len(table) == 0 { // Remove tpid out of all tables
|
||||
for _, tblName := range []string{utils.TBLTPTimings, utils.TBLTPDestinations,
|
||||
for _, tblName := range []string{utils.TBLTPTimings,
|
||||
utils.TBLTPResources, utils.TBLTPStats, utils.TBLTPThresholds,
|
||||
utils.TBLTPFilters, utils.TBLTPRoutes, utils.TBLTPAttributes,
|
||||
utils.TBLTPChargers, utils.TBLTPDispatchers, utils.TBLTPDispatcherHosts, utils.TBLTPAccounts,
|
||||
|
||||
@@ -96,18 +96,6 @@ func (tpExp *TPExporter) Run() error {
|
||||
toExportMap[utils.TimingsCsv][i] = sd
|
||||
}
|
||||
|
||||
storDataDestinations, err := tpExp.storDB.GetTPDestinations(tpExp.tpID, "")
|
||||
if err != nil && err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpDestinations))
|
||||
withError = true
|
||||
}
|
||||
for _, sd := range storDataDestinations {
|
||||
sdModels := APItoModelDestination(sd)
|
||||
for _, sdModel := range sdModels {
|
||||
toExportMap[utils.DestinationsCsv] = append(toExportMap[utils.DestinationsCsv], sdModel)
|
||||
}
|
||||
}
|
||||
|
||||
storDataResources, err := tpExp.storDB.GetTPResources(tpExp.tpID, "", "")
|
||||
if err != nil && err.Error() != utils.ErrNotFound.Error() {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpResources))
|
||||
|
||||
@@ -41,7 +41,6 @@ type TPCSVImporter struct {
|
||||
// Change it to func(string) error as soon as Travis updates.
|
||||
var fileHandlers = map[string]func(*TPCSVImporter, string) error{
|
||||
utils.TimingsCsv: (*TPCSVImporter).importTimings,
|
||||
utils.DestinationsCsv: (*TPCSVImporter).importDestinations,
|
||||
utils.ResourcesCsv: (*TPCSVImporter).importResources,
|
||||
utils.StatsCsv: (*TPCSVImporter).importStats,
|
||||
utils.ThresholdsCsv: (*TPCSVImporter).importThresholds,
|
||||
@@ -92,21 +91,6 @@ func (tpImp *TPCSVImporter) importTimings(fn string) error {
|
||||
return tpImp.StorDB.SetTPTimings(tps)
|
||||
}
|
||||
|
||||
func (tpImp *TPCSVImporter) importDestinations(fn string) error {
|
||||
if tpImp.Verbose {
|
||||
log.Printf("Processing file: <%s> ", fn)
|
||||
}
|
||||
tps, err := tpImp.csvr.GetTPDestinations(tpImp.TPid, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i := 0; i < len(tps); i++ {
|
||||
tps[i].TPid = tpImp.TPid
|
||||
}
|
||||
|
||||
return tpImp.StorDB.SetTPDestinations(tps)
|
||||
}
|
||||
|
||||
func (tpImp *TPCSVImporter) importResources(fn string) error {
|
||||
if tpImp.Verbose {
|
||||
log.Printf("Processing file: <%s> ", fn)
|
||||
|
||||
@@ -1149,8 +1149,6 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
|
||||
return
|
||||
}
|
||||
// take IDs for each type
|
||||
dstIds, _ := tpr.GetLoadedIds(utils.DestinationPrefix)
|
||||
revDstIDs, _ := tpr.GetLoadedIds(utils.ReverseDestinationPrefix)
|
||||
tmgIds, _ := tpr.GetLoadedIds(utils.TimingsPrefix)
|
||||
rspIDs, _ := tpr.GetLoadedIds(utils.ResourceProfilesPrefix)
|
||||
resIDs, _ := tpr.GetLoadedIds(utils.ResourcesPrefix)
|
||||
@@ -1170,23 +1168,21 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
|
||||
|
||||
//compose Reload Cache argument
|
||||
cacheArgs := map[string][]string{
|
||||
utils.DestinationIDs: dstIds,
|
||||
utils.ReverseDestinationIDs: revDstIDs,
|
||||
utils.TimingIDs: tmgIds,
|
||||
utils.ResourceProfileIDs: rspIDs,
|
||||
utils.ResourceIDs: resIDs,
|
||||
utils.StatsQueueIDs: stqIDs,
|
||||
utils.StatsQueueProfileIDs: stqpIDs,
|
||||
utils.ThresholdIDs: trsIDs,
|
||||
utils.ThresholdProfileIDs: trspfIDs,
|
||||
utils.FilterIDs: flrIDs,
|
||||
utils.RouteProfileIDs: routeIDs,
|
||||
utils.AttributeProfileIDs: apfIDs,
|
||||
utils.ChargerProfileIDs: chargerIDs,
|
||||
utils.DispatcherProfileIDs: dppIDs,
|
||||
utils.DispatcherHostIDs: dphIDs,
|
||||
utils.RateProfileIDs: ratePrfIDs,
|
||||
utils.ActionProfileIDs: actionPrfIDs,
|
||||
utils.TimingIDs: tmgIds,
|
||||
utils.ResourceProfileIDs: rspIDs,
|
||||
utils.ResourceIDs: resIDs,
|
||||
utils.StatsQueueIDs: stqIDs,
|
||||
utils.StatsQueueProfileIDs: stqpIDs,
|
||||
utils.ThresholdIDs: trsIDs,
|
||||
utils.ThresholdProfileIDs: trspfIDs,
|
||||
utils.FilterIDs: flrIDs,
|
||||
utils.RouteProfileIDs: routeIDs,
|
||||
utils.AttributeProfileIDs: apfIDs,
|
||||
utils.ChargerProfileIDs: chargerIDs,
|
||||
utils.DispatcherProfileIDs: dppIDs,
|
||||
utils.DispatcherHostIDs: dphIDs,
|
||||
utils.RateProfileIDs: ratePrfIDs,
|
||||
utils.ActionProfileIDs: actionPrfIDs,
|
||||
}
|
||||
|
||||
// verify if we need to clear indexes
|
||||
|
||||
@@ -140,23 +140,21 @@ func (vers Versions) Compare(curent Versions, storType string, isDataDB bool) st
|
||||
// CurrentDataDBVersions returns the needed DataDB versions
|
||||
func CurrentDataDBVersions() Versions {
|
||||
return Versions{
|
||||
utils.StatS: 4,
|
||||
utils.Accounts: 3,
|
||||
utils.Actions: 2,
|
||||
utils.Thresholds: 4,
|
||||
utils.Routes: 2,
|
||||
utils.Attributes: 7,
|
||||
utils.Timing: 1,
|
||||
utils.RQF: 5,
|
||||
utils.Resource: 1,
|
||||
utils.Subscribers: 1,
|
||||
utils.Destinations: 1,
|
||||
utils.ReverseDestinations: 1,
|
||||
utils.Chargers: 2,
|
||||
utils.Dispatchers: 2,
|
||||
utils.LoadIDsVrs: 1,
|
||||
utils.RateProfiles: 1,
|
||||
utils.ActionProfiles: 1,
|
||||
utils.StatS: 4,
|
||||
utils.Accounts: 3,
|
||||
utils.Actions: 2,
|
||||
utils.Thresholds: 4,
|
||||
utils.Routes: 2,
|
||||
utils.Attributes: 7,
|
||||
utils.Timing: 1,
|
||||
utils.RQF: 5,
|
||||
utils.Resource: 1,
|
||||
utils.Subscribers: 1,
|
||||
utils.Chargers: 2,
|
||||
utils.Dispatchers: 2,
|
||||
utils.LoadIDsVrs: 1,
|
||||
utils.RateProfiles: 1,
|
||||
utils.ActionProfiles: 1,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,7 +171,6 @@ func CurrentStorDBVersions() Versions {
|
||||
utils.TpResources: 1,
|
||||
utils.TpTiming: 1,
|
||||
utils.TpResource: 1,
|
||||
utils.TpDestinations: 1,
|
||||
utils.TpChargers: 1,
|
||||
utils.TpDispatchers: 1,
|
||||
utils.TpRateProfiles: 1,
|
||||
|
||||
Reference in New Issue
Block a user