mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Resource structure updated, db keys changed
This commit is contained in:
@@ -29,6 +29,20 @@ import (
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// ResourceCfg represents the user configuration for the resource
|
||||
type ResourceCfg struct {
|
||||
ID string // identifier of this resource
|
||||
Filters []*RequestFilter // filters for the request
|
||||
ActivationInterval *utils.ActivationInterval // time when this resource becomes active and expires
|
||||
UsageTTL time.Duration // auto-expire the usage after this duration
|
||||
Limit float64 // limit value
|
||||
AllocationMessage string // message returned by the winning resource on allocation
|
||||
Blocker bool // blocker flag to stop processing on filters matched
|
||||
Stored bool
|
||||
Weight float64 // Weight to sort the resources
|
||||
Thresholds []string // Thresholds to check after changing Limit
|
||||
}
|
||||
|
||||
// ResourceUsage represents an usage counted
|
||||
type ResourceUsage struct {
|
||||
ID string // Unique identifier of this ResourceUsage, Eg: FreeSWITCH UUID
|
||||
@@ -41,96 +55,86 @@ func (ru *ResourceUsage) isActive(atTime time.Time) bool {
|
||||
return ru.ExpiryTime.IsZero() || ru.ExpiryTime.Sub(atTime) > 0
|
||||
}
|
||||
|
||||
// ResourceCfg represents the user configuration for the resource
|
||||
type ResourceCfg struct {
|
||||
ID string // identifier of this resource
|
||||
Filters []*RequestFilter // filters for the request
|
||||
ActivationInterval *utils.ActivationInterval // time when this resource becomes active and expires
|
||||
UsageTTL time.Duration // expire the usage after this duration
|
||||
Limit float64 // limit value
|
||||
AllocationMessage string // message returned by the winning resource on allocation
|
||||
Blocker bool // blocker flag to stop processing on filters matched
|
||||
Stored bool
|
||||
Weight float64 // Weight to sort the resources
|
||||
Thresholds []string // Thresholds to check after changing Limit
|
||||
}
|
||||
|
||||
// StoredResourceUsages is stored on demand into dataDB
|
||||
type StoredResourceUsages map[string]*ResourceUsage
|
||||
|
||||
func NewResource(rCfg *ResourceCfg) *Resource {
|
||||
return &Resource{rCfg: rCfg,
|
||||
usages: make(map[string]*ResourceUsage)}
|
||||
}
|
||||
|
||||
// Resource represents a resource in the system
|
||||
// not thread safe, needs locking at process level
|
||||
type Resource struct {
|
||||
sync.RWMutex
|
||||
rCfg *ResourceCfg
|
||||
usages map[string]*ResourceUsage
|
||||
ttlUsages []*ResourceUsage // used to speed up expirying of the usages
|
||||
tUsage *float64 // sum of all usages
|
||||
dirty bool // the usages were modified, needs save
|
||||
ID string
|
||||
Usages map[string]*ResourceUsage
|
||||
TTLIdx []string // holds ordered list of ResourceIDs based on their TTL, empty if feature is disabled
|
||||
rCfg *ResourceCfg // optional configuration attached
|
||||
tUsage *float64 // sum of all usages
|
||||
dirty *bool // the usages were modified, needs save, *bool so we only save if enabled in config
|
||||
}
|
||||
|
||||
// removeExpiredUnits removes units which are expired
|
||||
// removeExpiredUnits removes units which are expired from the resource
|
||||
func (r *Resource) removeExpiredUnits() {
|
||||
var firstActive int
|
||||
r.RLock()
|
||||
for _, rv := range r.ttlUsages {
|
||||
if rv.isActive(time.Now()) {
|
||||
for _, rID := range r.TTLIdx {
|
||||
if r, has := r.Usages[rID]; has && r.isActive(time.Now()) {
|
||||
break
|
||||
}
|
||||
firstActive += 1
|
||||
}
|
||||
r.RUnlock()
|
||||
if firstActive == 0 {
|
||||
return
|
||||
}
|
||||
r.Lock()
|
||||
for _, ru := range r.ttlUsages[:firstActive] {
|
||||
delete(r.usages, ru.ID)
|
||||
for _, rID := range r.TTLIdx[:firstActive] {
|
||||
ru, has := r.Usages[rID]
|
||||
if !has {
|
||||
continue
|
||||
}
|
||||
delete(r.Usages, rID)
|
||||
*r.tUsage -= ru.Units
|
||||
ru = nil // empty it so we avoid memleak
|
||||
if *r.tUsage < 0 { // something went wrong
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("resetting total usage for resourceID: %s, usage smaller than 0: %f", r.ID, *r.tUsage))
|
||||
r.tUsage = nil
|
||||
}
|
||||
}
|
||||
r.ttlUsages = r.ttlUsages[firstActive:]
|
||||
r.Unlock()
|
||||
r.TTLIdx = r.TTLIdx[firstActive:]
|
||||
}
|
||||
|
||||
// totalUsage returns the sum of all usage units
|
||||
func (r *Resource) totalUsage() float64 {
|
||||
func (r *Resource) totalUsage() (tU float64) {
|
||||
if r.tUsage == nil {
|
||||
var tu float64
|
||||
for _, ru := range r.usages {
|
||||
for _, ru := range r.Usages {
|
||||
tu += ru.Units
|
||||
}
|
||||
r.tUsage = &tu
|
||||
}
|
||||
return *r.tUsage
|
||||
if r.tUsage != nil {
|
||||
tU = *r.tUsage
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// recordUsage records a new usage
|
||||
func (r *Resource) recordUsage(ru *ResourceUsage) (err error) {
|
||||
if _, hasID := r.usages[ru.ID]; hasID {
|
||||
return fmt.Errorf("Duplicate resource usage with id: %s", ru.ID)
|
||||
if _, hasID := r.Usages[ru.ID]; hasID {
|
||||
return fmt.Errorf("duplicate resource usage with id: %s", ru.ID)
|
||||
}
|
||||
r.Usages[ru.ID] = ru
|
||||
if r.tUsage != nil {
|
||||
*r.tUsage += ru.Units
|
||||
}
|
||||
r.usages[ru.ID] = ru
|
||||
*r.tUsage += ru.Units
|
||||
return
|
||||
}
|
||||
|
||||
// clearUsage clears the usage for an ID
|
||||
func (r *Resource) clearUsage(ruID string) error {
|
||||
ru, hasIt := r.usages[ruID]
|
||||
func (r *Resource) clearUsage(ruID string) (err error) {
|
||||
ru, hasIt := r.Usages[ruID]
|
||||
if !hasIt {
|
||||
return fmt.Errorf("Cannot find usage record with id: %s", ruID)
|
||||
}
|
||||
delete(r.usages, ru.ID)
|
||||
*r.tUsage -= ru.Units
|
||||
return nil
|
||||
delete(r.Usages, ruID)
|
||||
if r.tUsage != nil {
|
||||
*r.tUsage -= ru.Units
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Resources is an ordered list of Resources based on Weight
|
||||
// Resources is an orderable list of Resources based on Weight
|
||||
type Resources []*Resource
|
||||
|
||||
// sort based on Weight
|
||||
@@ -174,16 +178,6 @@ func (rs Resources) AllocateResource(ru *ResourceUsage, dryRun bool) (alcMessage
|
||||
if len(rs) == 0 {
|
||||
return utils.META_NONE, nil
|
||||
}
|
||||
// lock resources so we can safely take decisions, need all to be locked before proceeding
|
||||
for _, r := range rs {
|
||||
if dryRun { // dryRun only needs read
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
} else {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
}
|
||||
}
|
||||
// Simulate resource usage
|
||||
for _, r := range rs {
|
||||
if r.rCfg.Limit >= r.totalUsage()+ru.Units {
|
||||
@@ -214,10 +208,12 @@ func NewResourceService(cfg *config.CGRConfig, dataDB DataDB,
|
||||
return &ResourceService{dataDB: dataDB, statS: statS}, nil
|
||||
}
|
||||
|
||||
// ResourceService is the service handling channel limits
|
||||
// ResourceService is the service handling resources
|
||||
type ResourceService struct {
|
||||
dataDB DataDB // So we can load the data in cache and index it
|
||||
statS rpcclient.RpcClientConnection
|
||||
sync.RWMutex
|
||||
dataDB DataDB // So we can load the data in cache and index it
|
||||
statS rpcclient.RpcClientConnection
|
||||
eventResources map[string][]string // map[ruID][]string{rID} for faster queries
|
||||
}
|
||||
|
||||
// Called to start the service
|
||||
@@ -253,7 +249,7 @@ func (rS *ResourceService) matchingResourcesForEvent(
|
||||
passAllFilters := true
|
||||
for _, fltr := range rCfg.Filters {
|
||||
if pass, err := fltr.Pass(ev, "", rS.statS); err != nil {
|
||||
return nil, utils.NewErrServerError(err)
|
||||
return nil, err
|
||||
} else if !pass {
|
||||
passAllFilters = false
|
||||
continue
|
||||
@@ -262,7 +258,12 @@ func (rS *ResourceService) matchingResourcesForEvent(
|
||||
if !passAllFilters {
|
||||
continue
|
||||
}
|
||||
matchingResources[rCfg.ID] = NewResource(rCfg) // Cannot save it here since we could have errors after and resource will remain unused
|
||||
r, err := rS.dataDB.GetResource(rCfg.ID, false, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.rCfg = rCfg
|
||||
matchingResources[rCfg.ID] = r // Cannot save it here since we could have errors after and resource will remain unused
|
||||
}
|
||||
// All good, convert from Map to Slice so we can sort
|
||||
rs = make(Resources, len(matchingResources))
|
||||
@@ -273,8 +274,9 @@ func (rS *ResourceService) matchingResourcesForEvent(
|
||||
}
|
||||
rs.Sort()
|
||||
for i, r := range rs {
|
||||
if r.rCfg.Blocker {
|
||||
if r.rCfg.Blocker { // blocker will stop processing
|
||||
rs = rs[:i+1]
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
|
||||
@@ -69,11 +69,11 @@ func TestResourceRecordUsage(t *testing.T) {
|
||||
UsageTTL: time.Duration(1 * time.Millisecond),
|
||||
AllocationMessage: "ALLOC",
|
||||
},
|
||||
usages: map[string]*ResourceUsage{
|
||||
Usages: map[string]*ResourceUsage{
|
||||
ru.ID: ru,
|
||||
},
|
||||
ttlUsages: []*ResourceUsage{ru},
|
||||
tUsage: utils.Float64Pointer(2),
|
||||
TTLIdx: []string{ru.ID},
|
||||
tUsage: utils.Float64Pointer(2),
|
||||
}
|
||||
|
||||
if err := r.recordUsage(ru2); err != nil {
|
||||
@@ -82,7 +82,7 @@ func TestResourceRecordUsage(t *testing.T) {
|
||||
if err := r.recordUsage(ru); err == nil {
|
||||
t.Error("Duplicate ResourceUsage id should not be allowed")
|
||||
}
|
||||
if _, found := r.usages[ru2.ID]; !found {
|
||||
if _, found := r.Usages[ru2.ID]; !found {
|
||||
t.Error("ResourceUsage was not recorded")
|
||||
}
|
||||
if *r.tUsage != 4 {
|
||||
@@ -93,13 +93,13 @@ func TestResourceRecordUsage(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRLClearUsage(t *testing.T) {
|
||||
r.usages = map[string]*ResourceUsage{
|
||||
r.Usages = map[string]*ResourceUsage{
|
||||
ru.ID: ru,
|
||||
}
|
||||
*r.tUsage = 3
|
||||
r.clearUsage(ru.ID)
|
||||
if len(r.usages) != 0 {
|
||||
t.Errorf("Expecting: %+v, received: %+v", 0, len(r.usages))
|
||||
if len(r.Usages) != 0 {
|
||||
t.Errorf("Expecting: %+v, received: %+v", 0, len(r.Usages))
|
||||
}
|
||||
if *r.tUsage != 1 {
|
||||
t.Errorf("Expecting: %+v, received: %+v", 1, r.tUsage)
|
||||
@@ -107,18 +107,18 @@ func TestRLClearUsage(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRLRemoveExpiredUnits(t *testing.T) {
|
||||
r.usages = map[string]*ResourceUsage{
|
||||
r.Usages = map[string]*ResourceUsage{
|
||||
ru.ID: ru,
|
||||
}
|
||||
*r.tUsage = 2
|
||||
|
||||
r.removeExpiredUnits()
|
||||
|
||||
if len(r.usages) != 0 {
|
||||
t.Errorf("Expecting: %+v, received: %+v", 0, len(r.usages))
|
||||
if len(r.Usages) != 0 {
|
||||
t.Errorf("Expecting: %+v, received: %+v", 0, len(r.Usages))
|
||||
}
|
||||
if len(r.ttlUsages) != 0 {
|
||||
t.Errorf("Expecting: %+v, received: %+v", 0, len(r.ttlUsages))
|
||||
if len(r.TTLIdx) != 0 {
|
||||
t.Errorf("Expecting: %+v, received: %+v", 0, len(r.TTLIdx))
|
||||
}
|
||||
if *r.tUsage != 0 {
|
||||
t.Errorf("Expecting: %+v, received: %+v", 0, r.tUsage)
|
||||
@@ -126,7 +126,7 @@ func TestRLRemoveExpiredUnits(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRLUsedUnits(t *testing.T) {
|
||||
r.usages = map[string]*ResourceUsage{
|
||||
r.Usages = map[string]*ResourceUsage{
|
||||
ru.ID: ru,
|
||||
}
|
||||
*r.tUsage = 2
|
||||
@@ -162,7 +162,7 @@ func TestRsort(t *testing.T) {
|
||||
UsageTTL: time.Duration(1 * time.Millisecond),
|
||||
},
|
||||
// AllocationMessage: "ALLOC2",
|
||||
usages: map[string]*ResourceUsage{
|
||||
Usages: map[string]*ResourceUsage{
|
||||
ru2.ID: ru2,
|
||||
},
|
||||
tUsage: utils.Float64Pointer(2),
|
||||
@@ -179,8 +179,8 @@ func TestRsort(t *testing.T) {
|
||||
func TestRsClearUsage(t *testing.T) {
|
||||
if err := r2.clearUsage(ru2.ID); err != nil {
|
||||
t.Error(err)
|
||||
} else if len(r2.usages) != 0 {
|
||||
t.Errorf("Unexpected usages %+v", r2.usages)
|
||||
} else if len(r2.Usages) != 0 {
|
||||
t.Errorf("Unexpected usages %+v", r2.Usages)
|
||||
} else if *r2.tUsage != 0 {
|
||||
t.Errorf("Unexpected tUsage %+v", r2.tUsage)
|
||||
}
|
||||
|
||||
@@ -101,6 +101,9 @@ type DataDB interface {
|
||||
GetResourceCfg(string, bool, string) (*ResourceCfg, error)
|
||||
SetResourceCfg(*ResourceCfg, string) error
|
||||
RemoveResourceCfg(string, string) error
|
||||
GetResource(string, bool, string) (*Resource, error)
|
||||
SetResource(*Resource) error
|
||||
RemoveResource(string, string) error
|
||||
GetTiming(string, bool, string) (*utils.TPTiming, error)
|
||||
SetTiming(*utils.TPTiming, string) error
|
||||
RemoveTiming(string, string) error
|
||||
|
||||
@@ -1311,7 +1311,7 @@ func (ms *MapStorage) GetStructVersion() (rsv *StructVersion, err error) {
|
||||
func (ms *MapStorage) GetResourceCfg(id string, skipCache bool, transactionID string) (rl *ResourceCfg, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
key := utils.ResourcesPrefix + id
|
||||
key := utils.ResourceConfigsPrefix + id
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x != nil {
|
||||
@@ -1345,20 +1345,64 @@ func (ms *MapStorage) SetResourceCfg(r *ResourceCfg, transactionID string) error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key := utils.ResourcesPrefix + r.ID
|
||||
ms.dict[key] = result
|
||||
ms.dict[utils.ResourceConfigsPrefix+r.ID] = result
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MapStorage) RemoveResourceCfg(id string, transactionID string) error {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
key := utils.ResourcesPrefix + id
|
||||
key := utils.ResourceConfigsPrefix + id
|
||||
delete(ms.dict, key)
|
||||
cache.RemKey(key, cacheCommit(transactionID), transactionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetResource(id string, skipCache bool, transactionID string) (r *Resource, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
key := utils.ResourcesPrefix + id
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x != nil {
|
||||
return x.(*Resource), nil
|
||||
}
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
}
|
||||
values, ok := ms.dict[key]
|
||||
if !ok {
|
||||
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
err = ms.ms.Unmarshal(values, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cache.Set(key, r, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) SetResource(r *Resource) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
result, err := ms.ms.Marshal(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ms.dict[utils.ResourcesPrefix+r.ID] = result
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) RemoveResource(id string, transactionID string) (err error) {
|
||||
ms.mu.Lock()
|
||||
defer ms.mu.Unlock()
|
||||
key := utils.ResourcesPrefix + id
|
||||
delete(ms.dict, key)
|
||||
cache.RemKey(key, cacheCommit(transactionID), transactionID)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MapStorage) GetTiming(id string, skipCache bool, transactionID string) (t *utils.TPTiming, err error) {
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
|
||||
@@ -35,31 +35,31 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
colDst = "destinations"
|
||||
colRds = "reverse_destinations"
|
||||
colAct = "actions"
|
||||
colApl = "action_plans"
|
||||
colAAp = "account_action_plans"
|
||||
colTsk = "tasks"
|
||||
colAtr = "action_triggers"
|
||||
colRpl = "rating_plans"
|
||||
colRpf = "rating_profiles"
|
||||
colAcc = "accounts"
|
||||
colShg = "shared_groups"
|
||||
colLcr = "lcr_rules"
|
||||
colDcs = "derived_chargers"
|
||||
colAls = "aliases"
|
||||
colRls = "reverse_aliases"
|
||||
colStq = "stat_qeues"
|
||||
colPbs = "pubsub"
|
||||
colUsr = "users"
|
||||
colCrs = "cdr_stats"
|
||||
colLht = "load_history"
|
||||
colVer = "versions"
|
||||
colRL = "resource_limits"
|
||||
colSts = "stats"
|
||||
colRFI = "request_filter_indexes"
|
||||
colTmg = "timings"
|
||||
colDst = "destinations"
|
||||
colRds = "reverse_destinations"
|
||||
colAct = "actions"
|
||||
colApl = "action_plans"
|
||||
colAAp = "account_action_plans"
|
||||
colTsk = "tasks"
|
||||
colAtr = "action_triggers"
|
||||
colRpl = "rating_plans"
|
||||
colRpf = "rating_profiles"
|
||||
colAcc = "accounts"
|
||||
colShg = "shared_groups"
|
||||
colLcr = "lcr_rules"
|
||||
colDcs = "derived_chargers"
|
||||
colAls = "aliases"
|
||||
colRCfgs = "reverse_aliases"
|
||||
colStq = "stat_qeues"
|
||||
colPbs = "pubsub"
|
||||
colUsr = "users"
|
||||
colCrs = "cdr_stats"
|
||||
colLht = "load_history"
|
||||
colVer = "versions"
|
||||
colRCfg = "resource_configs"
|
||||
colSts = "stats"
|
||||
colRFI = "request_filter_indexes"
|
||||
colTmg = "timings"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -149,7 +149,7 @@ func (ms *MongoStorage) EnsureIndexes() (err error) {
|
||||
}
|
||||
var colectNames []string // collection names containing this index
|
||||
if ms.storageType == utils.DataDB {
|
||||
colectNames = []string{colAct, colApl, colAAp, colAtr, colDcs, colRls, colRpl, colLcr, colDst, colRds, colAls, colUsr, colLht}
|
||||
colectNames = []string{colAct, colApl, colAAp, colAtr, colDcs, colRCfgs, colRpl, colLcr, colDst, colRds, colAls, colUsr, colLht}
|
||||
}
|
||||
for _, col := range colectNames {
|
||||
if err = db.C(col).EnsureIndex(idx); err != nil {
|
||||
@@ -317,13 +317,13 @@ func (ms *MongoStorage) getColNameForPrefix(prefix string) (name string, ok bool
|
||||
utils.LCR_PREFIX: colLcr,
|
||||
utils.DERIVEDCHARGERS_PREFIX: colDcs,
|
||||
utils.ALIASES_PREFIX: colAls,
|
||||
utils.REVERSE_ALIASES_PREFIX: colRls,
|
||||
utils.REVERSE_ALIASES_PREFIX: colRCfgs,
|
||||
utils.PUBSUB_SUBSCRIBERS_PREFIX: colPbs,
|
||||
utils.USERS_PREFIX: colUsr,
|
||||
utils.CDR_STATS_PREFIX: colCrs,
|
||||
utils.LOADINST_KEY: colLht,
|
||||
utils.VERSION_PREFIX: colVer,
|
||||
utils.ResourcesPrefix: colRL,
|
||||
utils.ResourcesPrefix: colRCfg,
|
||||
utils.StatsPrefix: colSts,
|
||||
utils.TimingsPrefix: colTmg,
|
||||
}
|
||||
@@ -620,12 +620,12 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er
|
||||
result = append(result, utils.ACTION_PLAN_PREFIX+keyResult.Key)
|
||||
}
|
||||
case utils.REVERSE_ALIASES_PREFIX:
|
||||
iter := db.C(colRls).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter()
|
||||
iter := db.C(colRCfgs).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"key": 1}).Iter()
|
||||
for iter.Next(&keyResult) {
|
||||
result = append(result, utils.REVERSE_ALIASES_PREFIX+keyResult.Key)
|
||||
}
|
||||
case utils.ResourcesPrefix:
|
||||
iter := db.C(colRL).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
iter := db.C(colRCfg).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
for iter.Next(&idResult) {
|
||||
result = append(result, utils.ResourcesPrefix+idResult.Id)
|
||||
}
|
||||
@@ -635,12 +635,12 @@ func (ms *MongoStorage) GetKeysForPrefix(prefix string) (result []string, err er
|
||||
result = append(result, utils.StatsPrefix+idResult.Id)
|
||||
}
|
||||
case utils.AccountActionPlansPrefix:
|
||||
iter := db.C(colRL).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
iter := db.C(colRCfg).Find(bson.M{"key": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
for iter.Next(&idResult) {
|
||||
result = append(result, utils.AccountActionPlansPrefix+keyResult.Key)
|
||||
}
|
||||
case utils.TimingsPrefix:
|
||||
iter := db.C(colRL).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
iter := db.C(colRCfg).Find(bson.M{"id": bson.M{"$regex": bson.RegEx{Pattern: subject}}}).Select(bson.M{"id": 1}).Iter()
|
||||
for iter.Next(&idResult) {
|
||||
result = append(result, utils.TimingsPrefix+idResult.Id)
|
||||
}
|
||||
@@ -1313,7 +1313,7 @@ func (ms *MongoStorage) GetReverseAlias(reverseID string, skipCache bool, transa
|
||||
Key string
|
||||
Value []string
|
||||
}
|
||||
session, col := ms.conn(colRls)
|
||||
session, col := ms.conn(colRCfgs)
|
||||
defer session.Close()
|
||||
if err = col.Find(bson.M{"key": reverseID}).One(&result); err != nil {
|
||||
if err == mgo.ErrNotFound {
|
||||
@@ -1328,7 +1328,7 @@ func (ms *MongoStorage) GetReverseAlias(reverseID string, skipCache bool, transa
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetReverseAlias(al *Alias, transactionID string) (err error) {
|
||||
session, col := ms.conn(colRls)
|
||||
session, col := ms.conn(colRCfgs)
|
||||
defer session.Close()
|
||||
for _, value := range al.Values {
|
||||
for target, pairs := range value.Pairs {
|
||||
@@ -1367,7 +1367,7 @@ func (ms *MongoStorage) RemoveAlias(key, transactionID string) (err error) {
|
||||
cCommit := cacheCommit(transactionID)
|
||||
cache.RemKey(key, cCommit, transactionID)
|
||||
session.Close()
|
||||
session, col = ms.conn(colRls)
|
||||
session, col = ms.conn(colRCfgs)
|
||||
defer session.Close()
|
||||
for _, value := range al.Values {
|
||||
tmpKey := utils.ConcatenatedKey(al.GetId(), value.DestinationId)
|
||||
@@ -1842,7 +1842,7 @@ func (ms *MongoStorage) GetStructVersion() (rsv *StructVersion, err error) {
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetResourceCfg(id string, skipCache bool, transactionID string) (rl *ResourceCfg, err error) {
|
||||
key := utils.ResourcesPrefix + id
|
||||
key := utils.ResourceConfigsPrefix + id
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
@@ -1851,7 +1851,7 @@ func (ms *MongoStorage) GetResourceCfg(id string, skipCache bool, transactionID
|
||||
return x.(*ResourceCfg), nil
|
||||
}
|
||||
}
|
||||
session, col := ms.conn(colRL)
|
||||
session, col := ms.conn(colRCfg)
|
||||
defer session.Close()
|
||||
rl = new(ResourceCfg)
|
||||
if err = col.Find(bson.M{"id": id}).One(rl); err != nil {
|
||||
@@ -1871,22 +1871,34 @@ func (ms *MongoStorage) GetResourceCfg(id string, skipCache bool, transactionID
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetResourceCfg(rl *ResourceCfg, transactionID string) (err error) {
|
||||
session, col := ms.conn(colRL)
|
||||
session, col := ms.conn(colRCfg)
|
||||
defer session.Close()
|
||||
_, err = col.Upsert(bson.M{"id": rl.ID}, rl)
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) RemoveResourceCfg(id string, transactionID string) (err error) {
|
||||
session, col := ms.conn(colRL)
|
||||
session, col := ms.conn(colRCfg)
|
||||
defer session.Close()
|
||||
if err = col.Remove(bson.M{"id": id}); err != nil {
|
||||
return
|
||||
}
|
||||
cache.RemKey(utils.ResourcesPrefix+id, cacheCommit(transactionID), transactionID)
|
||||
cache.RemKey(utils.ResourceConfigsPrefix+id, cacheCommit(transactionID), transactionID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetResource(id string, skipCache bool, transactionID string) (r *Resource, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) SetResource(r *Resource) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) RemoveResource(id string, transactionID string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (ms *MongoStorage) GetTiming(id string, skipCache bool, transactionID string) (t *utils.TPTiming, err error) {
|
||||
key := utils.TimingsPrefix + id
|
||||
if !skipCache {
|
||||
|
||||
@@ -1372,7 +1372,7 @@ func (rs *RedisStorage) GetStructVersion() (rsv *StructVersion, err error) {
|
||||
|
||||
func (rs *RedisStorage) GetResourceCfg(id string,
|
||||
skipCache bool, transactionID string) (rl *ResourceCfg, err error) {
|
||||
key := utils.ResourcesPrefix + id
|
||||
key := utils.ResourceConfigsPrefix + id
|
||||
if !skipCache {
|
||||
if x, ok := cache.Get(key); ok {
|
||||
if x == nil {
|
||||
@@ -1406,11 +1406,11 @@ func (rs *RedisStorage) SetResourceCfg(r *ResourceCfg, transactionID string) err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return rs.Cmd("SET", utils.ResourcesPrefix+r.ID, result).Err
|
||||
return rs.Cmd("SET", utils.ResourceConfigsPrefix+r.ID, result).Err
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) RemoveResourceCfg(id string, transactionID string) (err error) {
|
||||
key := utils.ResourcesPrefix + id
|
||||
key := utils.ResourceConfigsPrefix + id
|
||||
if err = rs.Cmd("DEL", key).Err; err != nil {
|
||||
return
|
||||
}
|
||||
@@ -1418,6 +1418,18 @@ func (rs *RedisStorage) RemoveResourceCfg(id string, transactionID string) (err
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetResource(id string, skipCache bool, transactionID string) (r *Resource, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) SetResource(r *Resource) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) RemoveResource(id string, transactionID string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (rs *RedisStorage) GetTiming(id string, skipCache bool, transactionID string) (t *utils.TPTiming, err error) {
|
||||
key := utils.TimingsPrefix + id
|
||||
if !skipCache {
|
||||
@@ -1541,7 +1553,7 @@ func (rs *RedisStorage) RemoveVersions(vrs Versions) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// GetStatsQueue retrieves a StatsQueue from dataDB
|
||||
// GetStatsConfig retrieves a StatsConfig from dataDB
|
||||
func (rs *RedisStorage) GetStatsConfig(sqID string) (sq *StatsConfig, err error) {
|
||||
key := utils.StatsConfigPrefix + sqID
|
||||
var values []byte
|
||||
|
||||
@@ -54,6 +54,7 @@ var (
|
||||
CacheAliases: ALIASES_PREFIX,
|
||||
CacheReverseAliases: REVERSE_ALIASES_PREFIX,
|
||||
CacheDerivedChargers: DERIVEDCHARGERS_PREFIX,
|
||||
CacheResourceConfigs: ResourceConfigsPrefix,
|
||||
CacheResources: ResourcesPrefix,
|
||||
CacheTimings: TimingsPrefix,
|
||||
}
|
||||
@@ -236,6 +237,7 @@ const (
|
||||
REVERSE_ALIASES_PREFIX = "rls_"
|
||||
ResourcesPrefix = "res_"
|
||||
ResourcesIndex = "rsi_"
|
||||
ResourceConfigsPrefix = "rcf_"
|
||||
StatsPrefix = "sts_"
|
||||
StatsIndex = "sti_"
|
||||
ThresholdsPrefix = "ths_"
|
||||
@@ -421,6 +423,7 @@ const (
|
||||
CacheReverseAliases = "reverse_aliases"
|
||||
CacheDerivedChargers = "derived_chargers"
|
||||
CacheResources = "resources"
|
||||
CacheResourceConfigs = "resource_configs"
|
||||
CacheTimings = "timings"
|
||||
StatS = "stats"
|
||||
CostSource = "CostSource"
|
||||
|
||||
Reference in New Issue
Block a user