Tenant in Resource

This commit is contained in:
DanB
2017-09-15 14:18:56 +02:00
parent 95565e8ff0
commit 9fbe0eefa4
6 changed files with 98 additions and 71 deletions

View File

@@ -82,6 +82,11 @@ type Resource struct {
rPrf *ResourceProfile // for ordering purposes
}
// TenantID returns the unique ID in a multi-tenant environment
func (r *Resource) TenantID() string {
return utils.ConcatenatedKey(r.Tenant, r.ID)
}
// removeExpiredUnits removes units which are expired from the resource
func (r *Resource) removeExpiredUnits() {
var firstActive int
@@ -187,13 +192,21 @@ func (rs Resources) clearUsage(ruID string) (err error) {
return
}
// ids returns list of resource IDs in resources
func (rs Resources) ids() (ids []string) {
ids = make([]string, len(rs))
// tenantIDs returns list of TenantIDs in resources
func (rs Resources) tenantIDs() []*utils.TenantID {
tntIDs := make([]*utils.TenantID, len(rs))
for i, r := range rs {
ids[i] = r.ID
tntIDs[i] = &utils.TenantID{r.Tenant, r.ID}
}
return
return tntIDs
}
func (rs Resources) tenatIDsStr() []string {
ids := make([]string, len(rs))
for i, r := range rs {
ids[i] = r.TenantID()
}
return ids
}
// AllocateResource attempts allocating resources for a *ResourceUsage
@@ -203,7 +216,7 @@ func (rs Resources) AllocateResource(ru *ResourceUsage, dryRun bool) (alcMessage
if len(rs) == 0 {
return "", utils.ErrResourceUnavailable
}
lockIDs := utils.PrefixSliceItems(rs.ids(), utils.ResourcesPrefix)
lockIDs := utils.PrefixSliceItems(rs.tenatIDsStr(), utils.ResourcesPrefix)
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
// Simulate resource usage
@@ -234,7 +247,7 @@ func NewResourceService(dataDB DataDB, storeInterval time.Duration,
statS = nil
}
return &ResourceService{dataDB: dataDB, statS: statS,
lcEventResources: make(map[string][]string),
lcEventResources: make(map[string][]*utils.TenantID),
storedResources: make(utils.StringMap),
storeInterval: storeInterval, stopBackup: make(chan struct{})}, nil
}
@@ -243,7 +256,7 @@ func NewResourceService(dataDB DataDB, storeInterval time.Duration,
type ResourceService struct {
dataDB DataDB // So we can load the data in cache and index it
statS rpcclient.RpcClientConnection // allows applying filters based on stats
lcEventResources map[string][]string // cache recording resources for events in alocation phase
lcEventResources map[string][]*utils.TenantID // cache recording resources for events in alocation phase
lcERMux sync.RWMutex // protects the lcEventResources
storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool
srMux sync.RWMutex // protects storedResources
@@ -340,7 +353,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
if rIDsIf, has := cache.Get(utils.EventResourcesPrefix + evUUID); !has {
return nil
} else if rIDsIf != nil {
rIDs = rIDsIf.([]string)
rIDs = rIDsIf.([]*utils.TenantID)
}
shortCached = true
}
@@ -348,11 +361,14 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
if len(rIDs) == 0 {
return
}
lockIDs := utils.PrefixSliceItems(rIDs, utils.ResourcesPrefix)
lockIDs := make([]string, len(rIDs))
for i, rTid := range rIDs {
lockIDs[i] = utils.ResourcesPrefix + rTid.TenantID()
}
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
for i, rID := range rIDs {
if r, err := rS.dataDB.GetResource(rID, false, ""); err != nil {
for i, rTid := range rIDs {
if r, err := rS.dataDB.GetResource(rTid.Tenant, rTid.ID, false, ""); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ResourceS> force-uncaching resources for evUUID: <%s>, error: <%s>",
evUUID, err.Error()))
@@ -383,7 +399,8 @@ func (rS *ResourceService) matchingResourcesForEvent(ev map[string]interface{})
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
for resName := range rIDs {
rPrf, err := rS.dataDB.GetResourceProfile(resName, false, utils.NonTransactional)
rTntID := utils.NewTenantID(resName)
rPrf, err := rS.dataDB.GetResourceProfile(rTntID.Tenant, rTntID.ID, false, utils.NonTransactional)
if err != nil {
if err == utils.ErrNotFound {
continue
@@ -406,7 +423,7 @@ func (rS *ResourceService) matchingResourcesForEvent(ev map[string]interface{})
if !passAllFilters {
continue
}
r, err := rS.dataDB.GetResource(rPrf.ID, false, "")
r, err := rS.dataDB.GetResource(rPrf.Tenant, rPrf.ID, false, "")
if err != nil {
return nil, err
}
@@ -455,7 +472,7 @@ func (rS *ResourceService) V1AllowUsage(args utils.AttrRLsResourceUsage, allow *
if mtcRLs, err = rS.matchingResourcesForEvent(args.Event); err != nil {
return err
}
cache.Set(utils.EventResourcesPrefix+args.UsageID, mtcRLs.ids(), true, "")
cache.Set(utils.EventResourcesPrefix+args.UsageID, mtcRLs.tenantIDs(), true, "")
}
if _, err = mtcRLs.AllocateResource(
&ResourceUsage{ID: args.UsageID,
@@ -498,7 +515,7 @@ func (rS *ResourceService) V1AllocateResource(args utils.AttrRLsResourceUsage, r
}
if wasShortCached || !wasCached {
rS.lcERMux.Lock()
rS.lcEventResources[args.UsageID] = mtcRLs.ids()
rS.lcEventResources[args.UsageID] = mtcRLs.tenantIDs()
rS.lcERMux.Unlock()
}
// index it for storing

View File

@@ -99,12 +99,12 @@ type DataDB interface {
RemoveAlias(string, string) error
SetReverseAlias(*Alias, string) error
GetReverseAlias(string, bool, string) ([]string, error)
GetResourceProfile(string, bool, string) (*ResourceProfile, error)
GetResourceProfile(string, string, bool, string) (*ResourceProfile, error)
SetResourceProfile(*ResourceProfile, string) error
RemoveResourceProfile(string, string) error
GetResource(string, bool, string) (*Resource, error)
RemoveResourceProfile(string, string, string) error
GetResource(string, string, bool, string) (*Resource, error)
SetResource(*Resource) error
RemoveResource(string, string) error
RemoveResource(string, string, string) error
GetTiming(string, bool, string) (*utils.TPTiming, error)
SetTiming(*utils.TPTiming, string) error
RemoveTiming(string, string) error

View File

@@ -193,6 +193,7 @@ func (ms *MapStorage) PreloadCacheForPrefix(prefix string) error {
// CacheDataFromDB loads data to cache,
// prefix represents the cache prefix, IDs should be nil if all available data should be loaded
// ToDo: convert IDs into []*utils.TenantIDs when infrastructure will be ready
func (ms *MapStorage) CacheDataFromDB(prefix string, IDs []string, mustBeCached bool) (err error) {
if !utils.IsSliceMember([]string{utils.DESTINATION_PREFIX,
utils.REVERSE_DESTINATION_PREFIX,
@@ -273,9 +274,11 @@ func (ms *MapStorage) CacheDataFromDB(prefix string, IDs []string, mustBeCached
case utils.REVERSE_ALIASES_PREFIX:
_, err = ms.GetReverseAlias(dataID, true, utils.NonTransactional)
case utils.ResourceProfilesPrefix:
_, err = ms.GetResourceProfile(dataID, true, utils.NonTransactional)
tntID := utils.NewTenantID(dataID)
_, err = ms.GetResourceProfile(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
case utils.ResourcesPrefix:
_, err = ms.GetResource(dataID, true, utils.NonTransactional)
tntID := utils.NewTenantID(dataID)
_, err = ms.GetResource(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
case utils.TimingsPrefix:
_, err = ms.GetTiming(dataID, true, utils.NonTransactional)
}
@@ -1276,10 +1279,10 @@ func (ms *MapStorage) GetSMCost(cgrid, source, runid, originHost, originID strin
return
}
func (ms *MapStorage) GetResourceProfile(id string, skipCache bool, transactionID string) (rsp *ResourceProfile, err error) {
func (ms *MapStorage) GetResourceProfile(tenant, id string, skipCache bool, transactionID string) (rsp *ResourceProfile, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key := utils.ResourceProfilesPrefix + id
key := utils.ResourceProfilesPrefix + utils.ConcatenatedKey(tenant, id)
if !skipCache {
if x, ok := cache.Get(key); ok {
if x != nil {
@@ -1313,23 +1316,23 @@ func (ms *MapStorage) SetResourceProfile(r *ResourceProfile, transactionID strin
if err != nil {
return err
}
ms.dict[utils.ResourceProfilesPrefix+r.ID] = result
ms.dict[utils.ResourceProfilesPrefix+r.TenantID()] = result
return nil
}
func (ms *MapStorage) RemoveResourceProfile(id string, transactionID string) error {
func (ms *MapStorage) RemoveResourceProfile(tenant, id string, transactionID string) error {
ms.mu.Lock()
defer ms.mu.Unlock()
key := utils.ResourceProfilesPrefix + id
key := utils.ResourceProfilesPrefix + utils.ConcatenatedKey(tenant, id)
delete(ms.dict, key)
cache.RemKey(key, cacheCommit(transactionID), transactionID)
return nil
}
func (ms *MapStorage) GetResource(id string, skipCache bool, transactionID string) (r *Resource, err error) {
func (ms *MapStorage) GetResource(tenant, id string, skipCache bool, transactionID string) (r *Resource, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
key := utils.ResourcesPrefix + id
key := utils.ResourcesPrefix + utils.ConcatenatedKey(tenant, id)
if !skipCache {
if x, ok := cache.Get(key); ok {
if x != nil {
@@ -1358,14 +1361,14 @@ func (ms *MapStorage) SetResource(r *Resource) (err error) {
if err != nil {
return err
}
ms.dict[utils.ResourcesPrefix+r.ID] = result
ms.dict[utils.ResourcesPrefix+r.TenantID()] = result
return
}
func (ms *MapStorage) RemoveResource(id string, transactionID string) (err error) {
func (ms *MapStorage) RemoveResource(tenant, id string, transactionID string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
key := utils.ResourcesPrefix + id
key := utils.ResourcesPrefix + utils.ConcatenatedKey(tenant, id)
delete(ms.dict, key)
cache.RemKey(key, cacheCommit(transactionID), transactionID)
return

View File

@@ -526,9 +526,11 @@ func (ms *MongoStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
case utils.REVERSE_ALIASES_PREFIX:
_, err = ms.GetReverseAlias(dataID, true, utils.NonTransactional)
case utils.ResourceProfilesPrefix:
_, err = ms.GetResourceProfile(dataID, true, utils.NonTransactional)
tntID := utils.NewTenantID(dataID)
_, err = ms.GetResourceProfile(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
case utils.ResourcesPrefix:
_, err = ms.GetResource(dataID, true, utils.NonTransactional)
tntID := utils.NewTenantID(dataID)
_, err = ms.GetResource(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
case utils.TimingsPrefix:
_, err = ms.GetTiming(dataID, true, utils.NonTransactional)
}
@@ -1830,8 +1832,8 @@ func (ms *MongoStorage) GetAllCdrStats() (css []*CdrStats, err error) {
return
}
func (ms *MongoStorage) GetResourceProfile(id string, skipCache bool, transactionID string) (rp *ResourceProfile, err error) {
key := utils.ResourceProfilesPrefix + id
func (ms *MongoStorage) GetResourceProfile(tenant, id string, skipCache bool, transactionID string) (rp *ResourceProfile, err error) {
key := utils.ResourceProfilesPrefix + utils.ConcatenatedKey(tenant, id)
if !skipCache {
if x, ok := cache.Get(key); ok {
if x == nil {
@@ -1843,7 +1845,7 @@ func (ms *MongoStorage) GetResourceProfile(id string, skipCache bool, transactio
session, col := ms.conn(colRsP)
defer session.Close()
rp = new(ResourceProfile)
if err = col.Find(bson.M{"id": id}).One(rp); err != nil {
if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(rp); err != nil {
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
@@ -1862,22 +1864,23 @@ func (ms *MongoStorage) GetResourceProfile(id string, skipCache bool, transactio
func (ms *MongoStorage) SetResourceProfile(rp *ResourceProfile, transactionID string) (err error) {
session, col := ms.conn(colRsP)
defer session.Close()
_, err = col.Upsert(bson.M{"id": rp.ID}, rp)
_, err = col.Upsert(bson.M{"tenant": rp.Tenant, "id": rp.ID}, rp)
return
}
func (ms *MongoStorage) RemoveResourceProfile(id string, transactionID string) (err error) {
func (ms *MongoStorage) RemoveResourceProfile(tenant, id string, transactionID string) (err error) {
session, col := ms.conn(colRsP)
defer session.Close()
if err = col.Remove(bson.M{"id": id}); err != nil {
if err = col.Remove(bson.M{"tenant": tenant, "id": id}); err != nil {
return
}
cache.RemKey(utils.ResourceProfilesPrefix+id, cacheCommit(transactionID), transactionID)
cache.RemKey(utils.ResourceProfilesPrefix+utils.ConcatenatedKey(tenant, id),
cacheCommit(transactionID), transactionID)
return nil
}
func (ms *MongoStorage) GetResource(id string, skipCache bool, transactionID string) (r *Resource, err error) {
key := utils.ResourcesPrefix + id
func (ms *MongoStorage) GetResource(tenant, id string, skipCache bool, transactionID string) (r *Resource, err error) {
key := utils.ResourcesPrefix + utils.ConcatenatedKey(tenant, id)
if !skipCache {
if x, ok := cache.Get(key); ok {
if x == nil {
@@ -1889,7 +1892,7 @@ func (ms *MongoStorage) GetResource(id string, skipCache bool, transactionID str
session, col := ms.conn(colRes)
defer session.Close()
r = new(Resource)
if err = col.Find(bson.M{"id": id}).One(r); err != nil {
if err = col.Find(bson.M{"tenant": tenant, "id": id}).One(r); err != nil {
if err == mgo.ErrNotFound {
err = utils.ErrNotFound
cache.Set(key, nil, cacheCommit(transactionID), transactionID)
@@ -1903,17 +1906,18 @@ func (ms *MongoStorage) GetResource(id string, skipCache bool, transactionID str
func (ms *MongoStorage) SetResource(r *Resource) (err error) {
session, col := ms.conn(colRes)
defer session.Close()
_, err = col.Upsert(bson.M{"id": r.ID}, r)
_, err = col.Upsert(bson.M{"tenant": r.Tenant, "id": r.ID}, r)
return
}
func (ms *MongoStorage) RemoveResource(id string, transactionID string) (err error) {
func (ms *MongoStorage) RemoveResource(tenant, id string, transactionID string) (err error) {
session, col := ms.conn(colRes)
defer session.Close()
if err = col.Remove(bson.M{"id": id}); err != nil {
if err = col.Remove(bson.M{"tenant": tenant, "id": id}); err != nil {
return
}
cache.RemKey(utils.ResourcesPrefix+id, cacheCommit(transactionID), transactionID)
cache.RemKey(utils.ResourcesPrefix+utils.ConcatenatedKey(tenant, id),
cacheCommit(transactionID), transactionID)
return nil
}

View File

@@ -288,9 +288,11 @@ func (rs *RedisStorage) CacheDataFromDB(prfx string, ids []string, mustBeCached
case utils.REVERSE_ALIASES_PREFIX:
_, err = rs.GetReverseAlias(dataID, true, utils.NonTransactional)
case utils.ResourceProfilesPrefix:
_, err = rs.GetResourceProfile(dataID, true, utils.NonTransactional)
tntID := utils.NewTenantID(dataID)
_, err = rs.GetResourceProfile(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
case utils.ResourcesPrefix:
_, err = rs.GetResource(dataID, true, utils.NonTransactional)
tntID := utils.NewTenantID(dataID)
_, err = rs.GetResource(tntID.Tenant, tntID.ID, true, utils.NonTransactional)
case utils.TimingsPrefix:
_, err = rs.GetTiming(dataID, true, utils.NonTransactional)
}
@@ -1350,8 +1352,8 @@ func (rs *RedisStorage) GetAllCdrStats() (css []*CdrStats, err error) {
return
}
func (rs *RedisStorage) GetResourceProfile(id string, skipCache bool, transactionID string) (rsp *ResourceProfile, err error) {
key := utils.ResourceProfilesPrefix + id
func (rs *RedisStorage) GetResourceProfile(tenant, id string, skipCache bool, transactionID string) (rsp *ResourceProfile, err error) {
key := utils.ResourceProfilesPrefix + utils.ConcatenatedKey(tenant, id)
if !skipCache {
if x, ok := cache.Get(key); ok {
if x == nil {
@@ -1385,11 +1387,11 @@ func (rs *RedisStorage) SetResourceProfile(rsp *ResourceProfile, transactionID s
if err != nil {
return err
}
return rs.Cmd("SET", utils.ResourceProfilesPrefix+rsp.ID, result).Err
return rs.Cmd("SET", utils.ResourceProfilesPrefix+rsp.TenantID(), result).Err
}
func (rs *RedisStorage) RemoveResourceProfile(id string, transactionID string) (err error) {
key := utils.ResourceProfilesPrefix + id
func (rs *RedisStorage) RemoveResourceProfile(tenant, id string, transactionID string) (err error) {
key := utils.ResourceProfilesPrefix + utils.ConcatenatedKey(tenant, id)
if err = rs.Cmd("DEL", key).Err; err != nil {
return
}
@@ -1397,8 +1399,8 @@ func (rs *RedisStorage) RemoveResourceProfile(id string, transactionID string) (
return
}
func (rs *RedisStorage) GetResource(id string, skipCache bool, transactionID string) (r *Resource, err error) {
key := utils.ResourcesPrefix + id
func (rs *RedisStorage) GetResource(tenant, id string, skipCache bool, transactionID string) (r *Resource, err error) {
key := utils.ResourcesPrefix + utils.ConcatenatedKey(tenant, id)
if !skipCache {
if x, ok := cache.Get(key); ok {
if x == nil {
@@ -1427,11 +1429,11 @@ func (rs *RedisStorage) SetResource(r *Resource) (err error) {
if err != nil {
return err
}
return rs.Cmd("SET", utils.ResourcesPrefix+r.ID, result).Err
return rs.Cmd("SET", utils.ResourcesPrefix+r.TenantID(), result).Err
}
func (rs *RedisStorage) RemoveResource(id string, transactionID string) (err error) {
key := utils.ResourcesPrefix + id
func (rs *RedisStorage) RemoveResource(tenant, id string, transactionID string) (err error) {
key := utils.ResourcesPrefix + utils.ConcatenatedKey(tenant, id)
if err = rs.Cmd("DEL", key).Err; err != nil {
return
}