|
|
|
|
@@ -223,16 +223,16 @@ func (rs Resources) clearUsage(ruTntID string) (err error) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// tenantIDs returns list of TenantIDs in resources
|
|
|
|
|
func (rs Resources) tenantIDs() []*utils.TenantID {
|
|
|
|
|
tntIDs := make([]*utils.TenantID, len(rs))
|
|
|
|
|
for i, r := range rs {
|
|
|
|
|
tntIDs[i] = &utils.TenantID{r.Tenant, r.ID}
|
|
|
|
|
// resIDsMp returns a map of resource IDs which is used for caching
|
|
|
|
|
func (rs Resources) resIDsMp() (mp utils.StringMap) {
|
|
|
|
|
mp = make(utils.StringMap)
|
|
|
|
|
for _, r := range rs {
|
|
|
|
|
mp[r.ID] = true
|
|
|
|
|
}
|
|
|
|
|
return tntIDs
|
|
|
|
|
return mp
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rs Resources) tenatIDsStr() []string {
|
|
|
|
|
func (rs Resources) tenatIDs() []string {
|
|
|
|
|
ids := make([]string, len(rs))
|
|
|
|
|
for i, r := range rs {
|
|
|
|
|
ids[i] = r.TenantID()
|
|
|
|
|
@@ -240,6 +240,14 @@ func (rs Resources) tenatIDsStr() []string {
|
|
|
|
|
return ids
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rs Resources) IDs() []string {
|
|
|
|
|
ids := make([]string, len(rs))
|
|
|
|
|
for i, r := range rs {
|
|
|
|
|
ids[i] = r.ID
|
|
|
|
|
}
|
|
|
|
|
return ids
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// allocateResource attempts allocating resources for a *ResourceUsage
|
|
|
|
|
// simulates on dryRun
|
|
|
|
|
// returns utils.ErrResourceUnavailable if allocation is not possible
|
|
|
|
|
@@ -247,7 +255,7 @@ func (rs Resources) allocateResource(ru *ResourceUsage, dryRun bool) (alcMessage
|
|
|
|
|
if len(rs) == 0 {
|
|
|
|
|
return "", utils.ErrResourceUnavailable
|
|
|
|
|
}
|
|
|
|
|
lockIDs := utils.PrefixSliceItems(rs.tenatIDsStr(), utils.ResourcesPrefix)
|
|
|
|
|
lockIDs := utils.PrefixSliceItems(rs.tenatIDs(), utils.ResourcesPrefix)
|
|
|
|
|
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
|
|
|
|
|
// Simulate resource usage
|
|
|
|
|
for _, r := range rs {
|
|
|
|
|
@@ -290,7 +298,6 @@ func NewResourceService(dm *DataManager, storeInterval time.Duration,
|
|
|
|
|
thdS = nil
|
|
|
|
|
}
|
|
|
|
|
return &ResourceService{dm: dm, thdS: thdS,
|
|
|
|
|
lcEventResources: make(map[string][]*utils.TenantID),
|
|
|
|
|
storedResources: make(utils.StringMap),
|
|
|
|
|
storeInterval: storeInterval,
|
|
|
|
|
filterS: filterS,
|
|
|
|
|
@@ -306,12 +313,10 @@ type ResourceService struct {
|
|
|
|
|
filterS *FilterS
|
|
|
|
|
stringIndexedFields *[]string // speed up query on indexes
|
|
|
|
|
prefixIndexedFields *[]string
|
|
|
|
|
lcEventResources map[string][]*utils.TenantID // cache recording resources for events in alocation phase
|
|
|
|
|
lcERMux sync.RWMutex // protects the lcEventResources
|
|
|
|
|
storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool
|
|
|
|
|
srMux sync.RWMutex // protects storedResources
|
|
|
|
|
storeInterval time.Duration // interval to dump data on
|
|
|
|
|
stopBackup chan struct{} // control storing process
|
|
|
|
|
storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool
|
|
|
|
|
srMux sync.RWMutex // protects storedResources
|
|
|
|
|
storeInterval time.Duration // interval to dump data on
|
|
|
|
|
stopBackup chan struct{} // control storing process
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Called to start the service
|
|
|
|
|
@@ -342,7 +347,7 @@ func (rS *ResourceService) StoreResource(r *Resource) (err error) {
|
|
|
|
|
r.ID, err.Error()))
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
//since we no longer handle cache in DataManager do here a manul caching
|
|
|
|
|
//since we no longer handle cache in DataManager do here a manual caching
|
|
|
|
|
if err = rS.dm.CacheDataFromDB(utils.ResourcesPrefix, []string{r.TenantID()}, true); err != nil {
|
|
|
|
|
utils.Logger.Warning(
|
|
|
|
|
fmt.Sprintf("<ResourceS> failed caching Resource with ID: %s, error: %s",
|
|
|
|
|
@@ -399,114 +404,6 @@ func (rS *ResourceService) runBackup() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// cachedResourcesForEvent attempts to retrieve cached resources for an event
|
|
|
|
|
// returns nil if event not cached or errors occur
|
|
|
|
|
// returns []Resource if negative reply was cached
|
|
|
|
|
func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) {
|
|
|
|
|
var shortCached bool
|
|
|
|
|
rS.lcERMux.RLock()
|
|
|
|
|
rIDs, has := rS.lcEventResources[evUUID]
|
|
|
|
|
rS.lcERMux.RUnlock()
|
|
|
|
|
if !has {
|
|
|
|
|
if rIDsIf, has := Cache.Get(utils.CacheEventResources, evUUID); !has {
|
|
|
|
|
return nil
|
|
|
|
|
} else if rIDsIf != nil {
|
|
|
|
|
rIDs = rIDsIf.([]*utils.TenantID)
|
|
|
|
|
}
|
|
|
|
|
shortCached = true
|
|
|
|
|
}
|
|
|
|
|
rs = make(Resources, len(rIDs))
|
|
|
|
|
if len(rIDs) == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
lockIDs := make([]string, len(rIDs))
|
|
|
|
|
for i, rTid := range rIDs {
|
|
|
|
|
lockIDs[i] = utils.ResourcesPrefix + rTid.TenantID()
|
|
|
|
|
}
|
|
|
|
|
guardian.Guardian.Guard(func() (gRes interface{}, gErr error) {
|
|
|
|
|
for i, rTid := range rIDs {
|
|
|
|
|
if r, err := rS.dm.GetResource(rTid.Tenant, rTid.ID, true, true, ""); err != nil {
|
|
|
|
|
utils.Logger.Warning(
|
|
|
|
|
fmt.Sprintf("<ResourceS> force-uncaching resources for evUUID: <%s>, error: <%s>",
|
|
|
|
|
evUUID, err.Error()))
|
|
|
|
|
// on errors, cleanup cache so we recache
|
|
|
|
|
if shortCached {
|
|
|
|
|
Cache.Remove(utils.CacheEventResources, evUUID, true, "")
|
|
|
|
|
} else {
|
|
|
|
|
rS.lcERMux.Lock()
|
|
|
|
|
delete(rS.lcEventResources, evUUID)
|
|
|
|
|
rS.lcERMux.Unlock()
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
} else {
|
|
|
|
|
rs[i] = r
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}, config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call
|
|
|
|
|
func (rS *ResourceService) matchingResourcesForEvent(ev *utils.CGREvent, usageTTL *time.Duration) (rs Resources, err error) {
|
|
|
|
|
matchingResources := make(map[string]*Resource)
|
|
|
|
|
rIDs, err := MatchingItemIDsForEvent(ev.Event, rS.stringIndexedFields, rS.prefixIndexedFields,
|
|
|
|
|
rS.dm, utils.CacheResourceFilterIndexes, ev.Tenant, rS.filterS.cfg.ResourceSCfg().IndexedSelects)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
for resName := range rIDs {
|
|
|
|
|
rPrf, err := rS.dm.GetResourceProfile(ev.Tenant, resName, true, true, utils.NonTransactional)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if err == utils.ErrNotFound {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if rPrf.ActivationInterval != nil && ev.Time != nil &&
|
|
|
|
|
!rPrf.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if pass, err := rS.filterS.Pass(ev.Tenant, rPrf.FilterIDs,
|
|
|
|
|
config.NewNavigableMap(ev.Event)); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
} else if !pass {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
r, err := rS.dm.GetResource(rPrf.Tenant, rPrf.ID, true, true, "")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if rPrf.Stored && r.dirty == nil {
|
|
|
|
|
r.dirty = utils.BoolPointer(false)
|
|
|
|
|
}
|
|
|
|
|
if usageTTL != nil {
|
|
|
|
|
if *usageTTL != 0 {
|
|
|
|
|
r.ttl = usageTTL
|
|
|
|
|
}
|
|
|
|
|
} else if rPrf.UsageTTL >= 0 {
|
|
|
|
|
r.ttl = utils.DurationPointer(rPrf.UsageTTL)
|
|
|
|
|
}
|
|
|
|
|
r.rPrf = rPrf
|
|
|
|
|
matchingResources[rPrf.ID] = r
|
|
|
|
|
}
|
|
|
|
|
// All good, convert from Map to Slice so we can sort
|
|
|
|
|
rs = make(Resources, len(matchingResources))
|
|
|
|
|
i := 0
|
|
|
|
|
for _, r := range matchingResources {
|
|
|
|
|
rs[i] = r
|
|
|
|
|
i++
|
|
|
|
|
}
|
|
|
|
|
rs.Sort()
|
|
|
|
|
for i, r := range rs {
|
|
|
|
|
if r.rPrf.Blocker { // blocker will stop processing
|
|
|
|
|
rs = rs[:i+1]
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// processThresholds will pass the event for resource to ThresholdS
|
|
|
|
|
func (rS *ResourceService) processThresholds(r *Resource, argDispatcher *utils.ArgDispatcher) (err error) {
|
|
|
|
|
if rS.thdS == nil {
|
|
|
|
|
@@ -535,30 +432,129 @@ func (rS *ResourceService) processThresholds(r *Resource, argDispatcher *utils.A
|
|
|
|
|
if err = rS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
|
|
|
|
|
err.Error() != utils.ErrNotFound.Error() {
|
|
|
|
|
utils.Logger.Warning(
|
|
|
|
|
fmt.Sprintf("<ResourceS> error: %s processing event %+v with ThresholdS.", err.Error(), thEv))
|
|
|
|
|
fmt.Sprintf("<%s> error: %s processing event %+v with %s.",
|
|
|
|
|
utils.ResourceS, err.Error(), thEv, utils.ThresholdS))
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call
|
|
|
|
|
func (rS *ResourceService) matchingResourcesForEvent(ev *utils.CGREvent,
|
|
|
|
|
evUUID string, usageTTL *time.Duration) (rs Resources, err error) {
|
|
|
|
|
matchingResources := make(map[string]*Resource)
|
|
|
|
|
var isCached bool
|
|
|
|
|
var rIDs utils.StringMap
|
|
|
|
|
if x, ok := Cache.Get(utils.CacheEventResources, evUUID); ok { // The ResourceIDs were cached as utils.StringMap{"resID":bool}
|
|
|
|
|
isCached = true
|
|
|
|
|
if x == nil {
|
|
|
|
|
err = utils.ErrNotFound
|
|
|
|
|
}
|
|
|
|
|
rIDs = x.(utils.StringMap)
|
|
|
|
|
} else { // select the resourceIDs out of dataDB
|
|
|
|
|
rIDs, err = MatchingItemIDsForEvent(ev.Event, rS.stringIndexedFields, rS.prefixIndexedFields,
|
|
|
|
|
rS.dm, utils.CacheResourceFilterIndexes, ev.Tenant, rS.filterS.cfg.ResourceSCfg().IndexedSelects)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
if err == utils.ErrNotFound {
|
|
|
|
|
Cache.Set(utils.CacheEventResources, evUUID, nil, nil, true, "") // cache negative match
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
lockIDs := utils.PrefixSliceItems(rs.IDs(), utils.ResourcesPrefix)
|
|
|
|
|
guardian.Guardian.Guard(func() (gIface interface{}, gErr error) {
|
|
|
|
|
for resName := range rIDs {
|
|
|
|
|
var rPrf *ResourceProfile
|
|
|
|
|
if rPrf, err = rS.dm.GetResourceProfile(ev.Tenant, resName,
|
|
|
|
|
true, true, utils.NonTransactional); err != nil {
|
|
|
|
|
if err == utils.ErrNotFound {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if rPrf.ActivationInterval != nil && ev.Time != nil &&
|
|
|
|
|
!rPrf.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if pass, err := rS.filterS.Pass(ev.Tenant, rPrf.FilterIDs,
|
|
|
|
|
config.NewNavigableMap(ev.Event)); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
} else if !pass {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
r, err := rS.dm.GetResource(rPrf.Tenant, rPrf.ID, true, true, "")
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if rPrf.Stored && r.dirty == nil {
|
|
|
|
|
r.dirty = utils.BoolPointer(false)
|
|
|
|
|
}
|
|
|
|
|
if usageTTL != nil {
|
|
|
|
|
if *usageTTL != 0 {
|
|
|
|
|
r.ttl = usageTTL
|
|
|
|
|
}
|
|
|
|
|
} else if rPrf.UsageTTL >= 0 {
|
|
|
|
|
r.ttl = utils.DurationPointer(rPrf.UsageTTL)
|
|
|
|
|
}
|
|
|
|
|
r.rPrf = rPrf
|
|
|
|
|
matchingResources[rPrf.ID] = r
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}, config.CgrConfig().GeneralCfg().LockingTimeout, lockIDs...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if isCached {
|
|
|
|
|
Cache.Remove(utils.CacheEventResources, evUUID,
|
|
|
|
|
cacheCommit(utils.NonTransactional), utils.NonTransactional)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// All good, convert from Map to Slice so we can sort
|
|
|
|
|
rs = make(Resources, len(matchingResources))
|
|
|
|
|
i := 0
|
|
|
|
|
for _, r := range matchingResources {
|
|
|
|
|
rs[i] = r
|
|
|
|
|
i++
|
|
|
|
|
}
|
|
|
|
|
rs.Sort()
|
|
|
|
|
for i, r := range rs {
|
|
|
|
|
if r.rPrf.Blocker { // blocker will stop processing
|
|
|
|
|
rs = rs[:i+1]
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Cache.Set(utils.CacheEventResources, evUUID, rs.resIDsMp(), nil, true, "")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// V1ResourcesForEvent returns active resource configs matching the event
|
|
|
|
|
func (rS *ResourceService) V1ResourcesForEvent(args utils.ArgRSv1ResourceUsage, reply *Resources) (err error) {
|
|
|
|
|
if missing := utils.MissingStructFields(&args.CGREvent, []string{"Tenant", "ID"}); len(missing) != 0 { //Params missing
|
|
|
|
|
if missing := utils.MissingStructFields(&args.CGREvent, []string{utils.Tenant, utils.ID, utils.Event}); len(missing) != 0 { //Params missing
|
|
|
|
|
return utils.NewErrMandatoryIeMissing(missing...)
|
|
|
|
|
} else if args.Event == nil {
|
|
|
|
|
return utils.NewErrMandatoryIeMissing("Event")
|
|
|
|
|
} else if args.UsageID == "" {
|
|
|
|
|
return utils.NewErrMandatoryIeMissing(utils.UsageID)
|
|
|
|
|
}
|
|
|
|
|
var mtcRLs Resources
|
|
|
|
|
if args.UsageID != "" { // only cached if UsageID is present
|
|
|
|
|
mtcRLs = rS.cachedResourcesForEvent(args.TenantID())
|
|
|
|
|
}
|
|
|
|
|
if mtcRLs == nil {
|
|
|
|
|
if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageTTL); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
|
|
// RPC caching
|
|
|
|
|
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
|
|
|
|
|
cacheKey := utils.ConcatenatedKey(utils.ResourceSv1GetResourcesForEvent, args.TenantID())
|
|
|
|
|
refID := guardian.Guardian.GuardIDs("",
|
|
|
|
|
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
|
|
|
|
|
defer guardian.Guardian.UnguardIDs(refID)
|
|
|
|
|
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
|
|
|
|
|
cachedResp := itm.(*utils.CachedRPCResponse)
|
|
|
|
|
if cachedResp.Error == nil {
|
|
|
|
|
*reply = *cachedResp.Result.(*Resources)
|
|
|
|
|
}
|
|
|
|
|
return cachedResp.Error
|
|
|
|
|
}
|
|
|
|
|
Cache.Set(utils.CacheEventResources, args.TenantID(), mtcRLs.tenantIDs(), nil, true, "")
|
|
|
|
|
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
|
|
|
|
|
&utils.CachedRPCResponse{Result: reply, Error: err},
|
|
|
|
|
nil, true, utils.NonTransactional)
|
|
|
|
|
}
|
|
|
|
|
if len(mtcRLs) == 0 {
|
|
|
|
|
return utils.ErrNotFound
|
|
|
|
|
// end of RPC caching
|
|
|
|
|
|
|
|
|
|
var mtcRLs Resources
|
|
|
|
|
if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageID, args.UsageTTL); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
*reply = mtcRLs
|
|
|
|
|
return
|
|
|
|
|
@@ -566,23 +562,36 @@ func (rS *ResourceService) V1ResourcesForEvent(args utils.ArgRSv1ResourceUsage,
|
|
|
|
|
|
|
|
|
|
// V1AuthorizeResources queries service to find if an Usage is allowed
|
|
|
|
|
func (rS *ResourceService) V1AuthorizeResources(args utils.ArgRSv1ResourceUsage, reply *string) (err error) {
|
|
|
|
|
var alcMessage string
|
|
|
|
|
if missing := utils.MissingStructFields(&args.CGREvent, []string{"Tenant"}); len(missing) != 0 { //Params missing
|
|
|
|
|
if missing := utils.MissingStructFields(&args.CGREvent, []string{utils.Tenant, utils.ID, utils.Event}); len(missing) != 0 { //Params missing
|
|
|
|
|
return utils.NewErrMandatoryIeMissing(missing...)
|
|
|
|
|
} else if args.UsageID == "" {
|
|
|
|
|
return utils.NewErrMandatoryIeMissing(utils.UsageID)
|
|
|
|
|
}
|
|
|
|
|
if missing := utils.MissingStructFields(&args, []string{"UsageID"}); len(missing) != 0 { //Params missing
|
|
|
|
|
return utils.NewErrMandatoryIeMissing(missing...)
|
|
|
|
|
}
|
|
|
|
|
if args.CGREvent.Event == nil {
|
|
|
|
|
return utils.NewErrMandatoryIeMissing("Event")
|
|
|
|
|
}
|
|
|
|
|
mtcRLs := rS.cachedResourcesForEvent(args.TenantID())
|
|
|
|
|
if mtcRLs == nil {
|
|
|
|
|
if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageTTL); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
|
|
// RPC caching
|
|
|
|
|
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
|
|
|
|
|
cacheKey := utils.ConcatenatedKey(utils.ResourceSv1AuthorizeResources, args.TenantID())
|
|
|
|
|
refID := guardian.Guardian.GuardIDs("",
|
|
|
|
|
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
|
|
|
|
|
defer guardian.Guardian.UnguardIDs(refID)
|
|
|
|
|
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
|
|
|
|
|
cachedResp := itm.(*utils.CachedRPCResponse)
|
|
|
|
|
if cachedResp.Error == nil {
|
|
|
|
|
*reply = *cachedResp.Result.(*string)
|
|
|
|
|
}
|
|
|
|
|
return cachedResp.Error
|
|
|
|
|
}
|
|
|
|
|
Cache.Set(utils.CacheEventResources, args.TenantID(), mtcRLs.tenantIDs(), nil, true, "")
|
|
|
|
|
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
|
|
|
|
|
&utils.CachedRPCResponse{Result: reply, Error: err},
|
|
|
|
|
nil, true, utils.NonTransactional)
|
|
|
|
|
}
|
|
|
|
|
// end of RPC caching
|
|
|
|
|
|
|
|
|
|
var mtcRLs Resources
|
|
|
|
|
if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageID, args.UsageTTL); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
var alcMessage string
|
|
|
|
|
if alcMessage, err = mtcRLs.allocateResource(
|
|
|
|
|
&ResourceUsage{
|
|
|
|
|
Tenant: args.CGREvent.Tenant,
|
|
|
|
|
@@ -590,9 +599,8 @@ func (rS *ResourceService) V1AuthorizeResources(args utils.ArgRSv1ResourceUsage,
|
|
|
|
|
Units: args.Units}, true); err != nil {
|
|
|
|
|
if err == utils.ErrResourceUnavailable {
|
|
|
|
|
err = utils.ErrResourceUnauthorized
|
|
|
|
|
Cache.Set(utils.CacheEventResources, args.UsageID, nil, nil, true, "")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
*reply = alcMessage
|
|
|
|
|
return
|
|
|
|
|
@@ -600,44 +608,44 @@ func (rS *ResourceService) V1AuthorizeResources(args utils.ArgRSv1ResourceUsage,
|
|
|
|
|
|
|
|
|
|
// V1AllocateResource is called when a resource requires allocation
|
|
|
|
|
func (rS *ResourceService) V1AllocateResource(args utils.ArgRSv1ResourceUsage, reply *string) (err error) {
|
|
|
|
|
if missing := utils.MissingStructFields(&args.CGREvent, []string{"Tenant"}); len(missing) != 0 { //Params missing
|
|
|
|
|
if missing := utils.MissingStructFields(&args.CGREvent, []string{utils.Tenant, utils.ID, utils.Event}); len(missing) != 0 { //Params missing
|
|
|
|
|
return utils.NewErrMandatoryIeMissing(missing...)
|
|
|
|
|
} else if args.UsageID == "" {
|
|
|
|
|
return utils.NewErrMandatoryIeMissing(utils.UsageID)
|
|
|
|
|
}
|
|
|
|
|
if missing := utils.MissingStructFields(&args, []string{"UsageID"}); len(missing) != 0 { //Params missing
|
|
|
|
|
return utils.NewErrMandatoryIeMissing(missing...)
|
|
|
|
|
}
|
|
|
|
|
if args.CGREvent.Event == nil {
|
|
|
|
|
return utils.NewErrMandatoryIeMissing("Event")
|
|
|
|
|
}
|
|
|
|
|
var wasCached bool
|
|
|
|
|
mtcRLs := rS.cachedResourcesForEvent(args.UsageID)
|
|
|
|
|
if mtcRLs == nil {
|
|
|
|
|
if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageTTL); err != nil {
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
// RPC caching
|
|
|
|
|
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
|
|
|
|
|
cacheKey := utils.ConcatenatedKey(utils.ResourceSv1AllocateResources, args.TenantID())
|
|
|
|
|
refID := guardian.Guardian.GuardIDs("",
|
|
|
|
|
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
|
|
|
|
|
defer guardian.Guardian.UnguardIDs(refID)
|
|
|
|
|
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
|
|
|
|
|
cachedResp := itm.(*utils.CachedRPCResponse)
|
|
|
|
|
if cachedResp.Error == nil {
|
|
|
|
|
*reply = *cachedResp.Result.(*string)
|
|
|
|
|
}
|
|
|
|
|
return cachedResp.Error
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
wasCached = true
|
|
|
|
|
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
|
|
|
|
|
&utils.CachedRPCResponse{Result: reply, Error: err},
|
|
|
|
|
nil, true, utils.NonTransactional)
|
|
|
|
|
}
|
|
|
|
|
alcMsg, err := mtcRLs.allocateResource(
|
|
|
|
|
&ResourceUsage{Tenant: args.CGREvent.Tenant, ID: args.UsageID, Units: args.Units}, false)
|
|
|
|
|
if err != nil {
|
|
|
|
|
// end of RPC caching
|
|
|
|
|
|
|
|
|
|
var mtcRLs Resources
|
|
|
|
|
if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageID,
|
|
|
|
|
args.UsageTTL); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var alcMsg string
|
|
|
|
|
if alcMsg, err = mtcRLs.allocateResource(
|
|
|
|
|
&ResourceUsage{Tenant: args.CGREvent.Tenant, ID: args.UsageID,
|
|
|
|
|
Units: args.Units}, false); err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// index it for matching out of cache
|
|
|
|
|
var wasShortCached bool
|
|
|
|
|
if wasCached {
|
|
|
|
|
if _, has := Cache.Get(utils.CacheEventResources, args.UsageID); has {
|
|
|
|
|
// remove from short cache to populate event cache
|
|
|
|
|
wasShortCached = true
|
|
|
|
|
Cache.Remove(utils.CacheEventResources, args.UsageID, true, "")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if wasShortCached || !wasCached {
|
|
|
|
|
rS.lcERMux.Lock()
|
|
|
|
|
rS.lcEventResources[args.UsageID] = mtcRLs.tenantIDs()
|
|
|
|
|
rS.lcERMux.Unlock()
|
|
|
|
|
}
|
|
|
|
|
// index it for storing
|
|
|
|
|
for _, r := range mtcRLs {
|
|
|
|
|
if rS.storeInterval == 0 || r.dirty == nil {
|
|
|
|
|
@@ -659,25 +667,39 @@ func (rS *ResourceService) V1AllocateResource(args utils.ArgRSv1ResourceUsage, r
|
|
|
|
|
|
|
|
|
|
// V1ReleaseResource is called when we need to clear an allocation
|
|
|
|
|
func (rS *ResourceService) V1ReleaseResource(args utils.ArgRSv1ResourceUsage, reply *string) (err error) {
|
|
|
|
|
if missing := utils.MissingStructFields(&args.CGREvent, []string{"Tenant"}); len(missing) != 0 { //Params missing
|
|
|
|
|
if missing := utils.MissingStructFields(&args.CGREvent, []string{utils.Tenant, utils.ID, utils.Event}); len(missing) != 0 { //Params missing
|
|
|
|
|
return utils.NewErrMandatoryIeMissing(missing...)
|
|
|
|
|
} else if args.UsageID == "" {
|
|
|
|
|
return utils.NewErrMandatoryIeMissing(utils.UsageID)
|
|
|
|
|
}
|
|
|
|
|
if missing := utils.MissingStructFields(&args, []string{"UsageID"}); len(missing) != 0 { //Params missing
|
|
|
|
|
return utils.NewErrMandatoryIeMissing(missing...)
|
|
|
|
|
}
|
|
|
|
|
if args.CGREvent.Event == nil {
|
|
|
|
|
return utils.NewErrMandatoryIeMissing("Event")
|
|
|
|
|
}
|
|
|
|
|
mtcRLs := rS.cachedResourcesForEvent(args.UsageID)
|
|
|
|
|
if mtcRLs == nil {
|
|
|
|
|
if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageTTL); err != nil {
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
// RPC caching
|
|
|
|
|
if config.CgrConfig().CacheCfg()[utils.CacheRPCResponses].Limit != 0 {
|
|
|
|
|
cacheKey := utils.ConcatenatedKey(utils.ResourceSv1ReleaseResources, args.TenantID())
|
|
|
|
|
refID := guardian.Guardian.GuardIDs("",
|
|
|
|
|
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
|
|
|
|
|
defer guardian.Guardian.UnguardIDs(refID)
|
|
|
|
|
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
|
|
|
|
|
cachedResp := itm.(*utils.CachedRPCResponse)
|
|
|
|
|
if cachedResp.Error == nil {
|
|
|
|
|
*reply = *cachedResp.Result.(*string)
|
|
|
|
|
}
|
|
|
|
|
return cachedResp.Error
|
|
|
|
|
}
|
|
|
|
|
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
|
|
|
|
|
&utils.CachedRPCResponse{Result: reply, Error: err},
|
|
|
|
|
nil, true, utils.NonTransactional)
|
|
|
|
|
}
|
|
|
|
|
// end of RPC caching
|
|
|
|
|
|
|
|
|
|
var mtcRLs Resources
|
|
|
|
|
if mtcRLs, err = rS.matchingResourcesForEvent(&args.CGREvent, args.UsageID,
|
|
|
|
|
args.UsageTTL); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
mtcRLs.clearUsage(args.UsageID)
|
|
|
|
|
rS.lcERMux.Lock()
|
|
|
|
|
delete(rS.lcEventResources, args.UsageID)
|
|
|
|
|
rS.lcERMux.Unlock()
|
|
|
|
|
|
|
|
|
|
// Handle storing
|
|
|
|
|
if rS.storeInterval != -1 {
|
|
|
|
|
rS.srMux.Lock()
|
|
|
|
|
}
|
|
|
|
|
@@ -695,6 +717,7 @@ func (rS *ResourceService) V1ReleaseResource(args utils.ArgRSv1ResourceUsage, re
|
|
|
|
|
if rS.storeInterval != -1 {
|
|
|
|
|
rS.srMux.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
*reply = utils.OK
|
|
|
|
|
return nil
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|