Cache with -1 for unlimited, ResourceS with short and long caching
@@ -35,8 +35,7 @@ func TestHistoryRatinPlans(t *testing.T) {
 func TestHistoryDestinations(t *testing.T) {
 	scribe := historyScribe.(*history.MockScribe)
 	buf := scribe.GetBuffer(history.DESTINATIONS_FN)
-	expected := `{"Id":"*ddc_test","Prefixes":["333","666"]},
-{"Id":"ALL","Prefixes":["49","41","43"]},
+	expected := `{"Id":"ALL","Prefixes":["49","41","43"]},
 {"Id":"DST_TCDDBSWF","Prefixes":["1716"]},
 {"Id":"DST_UK_Mobile_BIG5","Prefixes":["447956"]},
 {"Id":"EU_LANDLINE","Prefixes":["444"]},
@@ -197,22 +197,22 @@ func (le *LCREntry) GetQOSLimits() (minASR, maxASR float64, minPDD, maxPDD, minA
 	if maxASR, err = strconv.ParseFloat(params[1], 64); err != nil {
 		maxASR = -1
 	}
-	if minPDD, err = utils.ParseDurationWithSecs(params[2]); err != nil {
+	if minPDD, err = utils.ParseDurationWithSecs(params[2]); err != nil || params[2] == "" {
 		minPDD = -1
 	}
-	if maxPDD, err = utils.ParseDurationWithSecs(params[3]); err != nil {
+	if maxPDD, err = utils.ParseDurationWithSecs(params[3]); err != nil || params[3] == "" {
 		maxPDD = -1
 	}
-	if minACD, err = utils.ParseDurationWithSecs(params[4]); err != nil {
+	if minACD, err = utils.ParseDurationWithSecs(params[4]); err != nil || params[4] == "" {
 		minACD = -1
 	}
-	if maxACD, err = utils.ParseDurationWithSecs(params[5]); err != nil {
+	if maxACD, err = utils.ParseDurationWithSecs(params[5]); err != nil || params[5] == "" {
 		maxACD = -1
 	}
-	if minTCD, err = utils.ParseDurationWithSecs(params[6]); err != nil {
+	if minTCD, err = utils.ParseDurationWithSecs(params[6]); err != nil || params[6] == "" {
 		minTCD = -1
 	}
-	if maxTCD, err = utils.ParseDurationWithSecs(params[7]); err != nil {
+	if maxTCD, err = utils.ParseDurationWithSecs(params[7]); err != nil || params[7] == "" {
 		maxTCD = -1
 	}
 	if minACC, err = strconv.ParseFloat(params[8], 64); err != nil {
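
Throughout GetQOSLimits, -1 doubles as the "no limit" sentinel that later bound checks skip, so an empty CSV field must not survive as a zero value. A minimal standalone sketch of the same parse-or-default idiom; the qosBound helper is illustrative, not part of the commit:

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/cgrates/cgrates/utils"
    )

    // qosBound mirrors the pattern above: an empty or unparsable field
    // maps to -1, which downstream checks read as "no limit enforced".
    func qosBound(field string) time.Duration {
    	d, err := utils.ParseDurationWithSecs(field)
    	if err != nil || field == "" {
    		return -1
    	}
    	return d
    }

    func main() {
    	fmt.Println(qosBound("2.5")) // 2.5s
    	fmt.Println(qosBound(""))    // -1ns, i.e. unlimited
    }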
@@ -119,7 +119,8 @@ func TestLcrGetQosLimitsSome(t *testing.T) {
 		minAcc != 1 || maxAcc != -1 ||
 		minTcc != 3 || maxTcc != -1 ||
 		minDdc != -1 || maxDdc != 2 {
-		t.Error("Wrong qos limits parsed: ", minAsr, maxAsr, minAcd, maxAcd, minTcd, maxTcd, minTcc, maxTcc, minDdc, maxDdc)
+		t.Errorf("Wrong qos limits parsed: <%v>, <%v>, <%v>, <%v>, <%v>, <%v>, <%v>, <%v>, <%v>, <%v>",
+			minAsr, maxAsr, minAcd, maxAcd, minTcd, maxTcd, minTcc, maxTcc, minDdc, maxDdc)
 	}
 }

@@ -29,6 +29,7 @@ import (
 	"github.com/cgrates/cgrates/config"
 	"github.com/cgrates/cgrates/guardian"
 	"github.com/cgrates/cgrates/utils"
+	"github.com/cgrates/ltcache"
 	"github.com/cgrates/rpcclient"
 )

@@ -223,20 +224,22 @@ func NewResourceService(cfg *config.CGRConfig, dataDB DataDB, statS rpcclient.Rp
 	if statS != nil && reflect.ValueOf(statS).IsNil() {
 		statS = nil
 	}
-	return &ResourceService{dataDB: dataDB, statS: statS}, nil
+	return &ResourceService{dataDB: dataDB, statS: statS,
+		scEventResources: ltcache.New(ltcache.UnlimitedCaching, time.Duration(1)*time.Minute, false, nil),
+		lcEventResources: ltcache.New(ltcache.UnlimitedCaching, ltcache.UnlimitedCaching, false, nil)}, nil
 }

 // ResourceService is the service handling resources
 type ResourceService struct {
-	cfg             *config.CGRConfig
-	dataDB          DataDB // So we can load the data in cache and index it
-	statS           rpcclient.RpcClientConnection
-	eventResources  map[string][]string // map[ruID][]string{rID} for faster queries
-	erMux           sync.RWMutex
-	storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool
-	srMux           sync.RWMutex
-	stopBackup      chan struct{} // control storing process
-	backupInterval  time.Duration
+	cfg              *config.CGRConfig
+	dataDB           DataDB // So we can load the data in cache and index it
+	statS            rpcclient.RpcClientConnection // allows applying filters based on stats
+	scEventResources *ltcache.Cache // short cache map[ruID], keeps references to matched resources for events in allow queries
+	lcEventResources *ltcache.Cache // long cache recording resources for events in the allocation phase
+	storedResources  utils.StringMap // keep a record of resources which need saving, map[resID]bool
+	srMux            sync.RWMutex
+	stopBackup       chan struct{} // control storing process
+	backupInterval   time.Duration
 }

 // Called to start the service
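
The two caches differ only in lifetime: the short cache keeps an entry for one minute after it is set, while the long cache passes ltcache.UnlimitedCaching (-1, per the commit title) for both the item limit and the TTL, so entries live until explicitly removed. A minimal sketch, assuming only the ltcache.New(maxItems, ttl, staticTTL, onEvicted) signature already visible in the diff:

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/cgrates/ltcache"
    )

    func main() {
    	// Short cache: no item limit, entries expire one minute after Set.
    	sc := ltcache.New(ltcache.UnlimitedCaching, time.Minute, false, nil)
    	// Long cache: -1 for both limit and TTL disables both; entries persist.
    	lc := ltcache.New(ltcache.UnlimitedCaching, ltcache.UnlimitedCaching, false, nil)

    	sc.Set("evUUID1", []string{"ResGroup1"})
    	lc.Set("evUUID1", []string{"ResGroup1"})

    	if v, has := sc.Get("evUUID1"); has {
    		fmt.Println(v.([]string)) // a hit until the 1m TTL fires
    	}
    }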
@@ -318,25 +321,39 @@ func (rS *ResourceService) runBackup() {

 // cachedResourcesForEvent attempts to retrieve cached resources for an event
 // returns nil if event not cached or errors occur
 // returns []Resource if negative reply was cached
 func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) {
-	rS.erMux.RLock()
-	rIDs, has := rS.eventResources[evUUID]
-	rS.erMux.RUnlock()
+	var shortCached bool
+	rIDsIf, has := rS.lcEventResources.Get(evUUID)
 	if !has {
-		return nil
+		if rIDsIf, has = rS.scEventResources.Get(evUUID); !has {
+			return nil
+		}
+		shortCached = true
 	}
+	var rIDs []string
+	if rIDsIf != nil {
+		rIDs = rIDsIf.([]string)
+	}
-	rs = make(Resources, len(rIDs))
+	if len(rIDs) == 0 {
+		return
+	}

 	lockIDs := utils.PrefixSliceItems(rIDs, utils.ResourcesPrefix)
 	guardian.Guardian.GuardIDs(rS.cfg.LockingTimeout, lockIDs...)
 	defer guardian.Guardian.UnguardIDs(lockIDs...)
+	rs = make(Resources, len(rIDs))
 	for i, rID := range rIDs {
 		if r, err := rS.dataDB.GetResource(rID, false, ""); err != nil {
 			utils.Logger.Warning(
 				fmt.Sprintf("<ResourceS> force-uncaching resources for evUUID: <%s>, error: <%s>",
 					evUUID, err.Error()))
-			rS.erMux.Lock()
-			delete(rS.eventResources, evUUID)
-			rS.erMux.Unlock()
+			// on errors, cleanup cache so we recache
+			if shortCached {
+				rS.scEventResources.Remove(evUUID)
+			} else {
+				rS.lcEventResources.Remove(evUUID)
+			}
 			return nil
 		} else {
 			rs[i] = r
@@ -410,7 +427,7 @@ func (rS *ResourceService) matchingResourcesForEvent(ev map[string]interface{})
 func (rS *ResourceService) V1ResourcesForEvent(ev map[string]interface{}, reply *[]*ResourceCfg) error {
 	matchingRLForEv, err := rS.matchingResourcesForEvent(ev)
 	if err != nil {
-		return utils.NewErrServerError(err)
+		return err
 	}
 	if len(matchingRLForEv) == 0 {
 		return utils.ErrNotFound
@@ -428,11 +445,13 @@ func (rS *ResourceService) V1AllowUsage(args utils.AttrRLsResourceUsage, allow *
 		if mtcRLs, err = rS.matchingResourcesForEvent(args.Event); err != nil {
 			return err
 		}
+		rS.scEventResources.Set(args.UsageID, mtcRLs.ids())
 	}
 	if _, err = mtcRLs.AllocateResource(
 		&ResourceUsage{ID: args.UsageID,
 			Units: args.Units}, true); err != nil {
 		if err == utils.ErrResourceUnavailable {
+			rS.scEventResources.Set(args.UsageID, nil)
 			return // not error but still not allowed
 		}
 		return err
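
Note the second Set: when allocation would fail, V1AllowUsage caches nil under the usage ID, so a repeated allow query inside the one-minute window can be answered from the short cache instead of re-matching; the `if rIDsIf != nil` branch in cachedResourcesForEvent is what distinguishes that stored nil from a plain miss. A small illustrative sketch of the idea (not from the commit):

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/cgrates/ltcache"
    )

    func main() {
    	sc := ltcache.New(ltcache.UnlimitedCaching, time.Minute, false, nil)
    	// A nil value under the key is still a hit, distinguishing
    	// "known denied" from "never looked up".
    	sc.Set("evDenied", nil)
    	if v, has := sc.Get("evDenied"); has && v == nil {
    		fmt.Println("cached negative reply: deny without re-matching")
    	}
    }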
@@ -443,20 +462,33 @@ func (rS *ResourceService) V1AllowUsage(args utils.AttrRLsResourceUsage, allow *

 // V1AllocateResource is called when a resource requires allocation
 func (rS *ResourceService) V1AllocateResource(args utils.AttrRLsResourceUsage, reply *string) (err error) {
+	var wasCached bool
 	mtcRLs := rS.cachedResourcesForEvent(args.UsageID)
 	if mtcRLs == nil {
 		if mtcRLs, err = rS.matchingResourcesForEvent(args.Event); err != nil {
 			return
 		}
+	} else {
+		wasCached = true
 	}
 	alcMsg, err := mtcRLs.AllocateResource(&ResourceUsage{ID: args.UsageID, Units: args.Units}, false)
 	if err != nil {
 		return err
 	}

 	// index it for matching out of cache
-	rS.erMux.Lock()
-	rS.eventResources[args.UsageID] = mtcRLs.ids()
-	rS.erMux.Unlock()
+	var wasShortCached bool
+	if wasCached {
+		if _, has := rS.scEventResources.Get(args.UsageID); has {
+			// remove from short cache to populate event cache
+			wasShortCached = true
+			rS.scEventResources.Remove(args.UsageID)
+		}
+	}
+	if wasShortCached || !wasCached {
+		rS.lcEventResources.Set(args.UsageID, mtcRLs.ids())
+	}

 	// index it for storing
 	rS.srMux.Lock()
 	for _, r := range mtcRLs {
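
The wasCached/wasShortCached bookkeeping implements a promotion: resource IDs cached by an earlier allow query (short cache, 1m TTL) move into the long cache on successful allocation, so the association outlives the TTL until V1ReleaseResource drops it. A hypothetical standalone sketch of that move; the promote helper is illustrative, not part of the commit:

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/cgrates/ltcache"
    )

    // promote moves an event's cached resource IDs from the TTL-bound short
    // cache into the unlimited long cache, as V1AllocateResource does above.
    func promote(sc, lc *ltcache.Cache, evUUID string, rIDs []string) {
    	if _, has := sc.Get(evUUID); has {
    		sc.Remove(evUUID)
    	}
    	lc.Set(evUUID, rIDs)
    }

    func main() {
    	sc := ltcache.New(ltcache.UnlimitedCaching, time.Minute, false, nil)
    	lc := ltcache.New(ltcache.UnlimitedCaching, ltcache.UnlimitedCaching, false, nil)
    	sc.Set("ev1", []string{"ResGroup1"})
    	promote(sc, lc, "ev1", []string{"ResGroup1"})
    	_, inShort := sc.Get("ev1")
    	_, inLong := lc.Get("ev1")
    	fmt.Println(inShort, inLong) // false true
    }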
@@ -481,9 +513,7 @@ func (rS *ResourceService) V1ReleaseResource(args utils.AttrRLsResourceUsage, re
 		}
 	}
 	mtcRLs.clearUsage(args.UsageID)
-	rS.erMux.Lock()
-	delete(rS.eventResources, args.UsageID)
-	rS.erMux.Unlock()
+	rS.lcEventResources.Remove(args.UsageID)
 	if rS.backupInterval != -1 {
 		rS.srMux.Lock()
 	}

@@ -71,7 +71,8 @@ func (s storage) smembers(key string, ms Marshaler) (idMap utils.StringMap, ok b
 }

 func NewMapStorage() (*MapStorage, error) {
-	return &MapStorage{dict: make(map[string][]byte), ms: NewCodecMsgpackMarshaler(), cacheCfg: config.CgrConfig().CacheConfig}, nil
+	return &MapStorage{dict: make(map[string][]byte), ms: NewCodecMsgpackMarshaler(),
+		cacheCfg: config.CgrConfig().CacheConfig}, nil
 }

 func NewMapStorageJson() (mpStorage *MapStorage, err error) {
@@ -247,7 +248,7 @@ func (ms *MapStorage) CacheDataFromDB(prefix string, IDs []string, mustBeCached
 	if cCfg, has := ms.cacheCfg[utils.CachePrefixToInstance[prefix]]; has {
 		nrItems = cCfg.Limit
 	}
-	if nrItems != 0 && nrItems < len(IDs) {
+	if nrItems > 0 && nrItems < len(IDs) {
 		IDs = IDs[:nrItems]
 	}
 }
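
This tightened check is what makes -1 safe as an "unlimited" cache limit: with the old `nrItems != 0` test, a limit of -1 also satisfies `nrItems < len(IDs)`, and the slice expression IDs[:-1] panics; `nrItems > 0` truncates only for a real positive limit. A standalone demonstration (capIDs is an illustrative helper, not from the commit):

    package main

    import "fmt"

    // capIDs mirrors the corrected check in CacheDataFromDB: only a positive
    // limit truncates; 0 (disabled) and -1 (unlimited) leave IDs untouched.
    func capIDs(ids []string, limit int) []string {
    	if limit > 0 && limit < len(ids) {
    		return ids[:limit]
    	}
    	return ids
    }

    func main() {
    	ids := []string{"a", "b", "c"}
    	fmt.Println(capIDs(ids, 2))  // [a b]
    	fmt.Println(capIDs(ids, -1)) // [a b c]; "!= 0" would have sliced ids[:-1]
    }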