diff --git a/engine/resources.go b/engine/resources.go index ac8d05e0e..0c3baeae1 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -19,11 +19,13 @@ package engine import ( "fmt" + "math/rand" "reflect" "sort" "sync" "time" + "github.com/cgrates/cgrates/cache" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" @@ -61,9 +63,9 @@ type Resource struct { ID string Usages map[string]*ResourceUsage TTLIdx []string // holds ordered list of ResourceIDs based on their TTL, empty if feature is disabled - rCfg *ResourceCfg // optional configuration attached tUsage *float64 // sum of all usages dirty *bool // the usages were modified, needs save, *bool so we only save if enabled in config + rCfg *ResourceCfg // for ordering purposes } // removeExpiredUnits removes units which are expired from the resource @@ -218,22 +220,93 @@ func NewResourceService(cfg *config.CGRConfig, dataDB DataDB, statS rpcclient.Rp // ResourceService is the service handling resources type ResourceService struct { - dataDB DataDB // So we can load the data in cache and index it - statS rpcclient.RpcClientConnection - eventResources map[string][]string // map[ruID][]string{rID} for faster queries - erMux sync.RWMutex + dataDB DataDB // So we can load the data in cache and index it + statS rpcclient.RpcClientConnection + eventResources map[string][]string // map[ruID][]string{rID} for faster queries + erMux sync.RWMutex + storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool + srMux sync.RWMutex + stopBackup chan struct{} // control storing process + backupInterval time.Duration } // Called to start the service -func (rS *ResourceService) ListenAndServe() error { +func (rS *ResourceService) ListenAndServe(exitChan chan bool) error { + go rS.runBackup() // start backup loop + e := <-exitChan + exitChan <- e // put back for the others listening for shutdown request return nil } // Called to shutdown the service func (rS *ResourceService) ServiceShutdown() error { + utils.Logger.Info(" service shutdown initialized") + close(rS.stopBackup) + rS.storeResources() + utils.Logger.Info(" service shutdown complete") return nil } +// storeResource stores the necessary storedMetrics to dataDB +func (rS *ResourceService) StoreResource(r *Resource) (err error) { + if r.dirty == nil || !*r.dirty { + return + } + if err = rS.dataDB.SetResource(r); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" failed saving Resource with ID: %s, error: %s", + r.ID, err.Error())) + } else { + *r.dirty = false + } + return +} + +// storeResources executes one task of complete backup +func (rS *ResourceService) storeResources() { + var failedRIDs []string + for { + rS.srMux.Lock() + rID := rS.storedResources.GetOne() + if rID != "" { + delete(rS.storedResources, rID) + } + rS.srMux.Unlock() + if rID == "" { + break // no more keys, backup completed + } + if rIf, ok := cache.Get(utils.ResourcesPrefix + rID); !ok || rIf == nil { + utils.Logger.Warning(fmt.Sprintf(" failed retrieving from cache resource with ID: %s")) + } else if err := rS.StoreResource(rIf.(*Resource)); err != nil { + failedRIDs = append(failedRIDs, rID) // record failure so we can schedule it for next backup + } + // randomize the CPU load and give up thread control + time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond) + } + if len(failedRIDs) != 0 { // there were errors on save, schedule the keys for next backup + rS.srMux.Lock() + for _, rID := range failedRIDs { + rS.storedResources[rID] = true + } + rS.srMux.Unlock() + } +} + +// backup will regularly store resources changed to dataDB +func (rS *ResourceService) runBackup() { + if rS.backupInterval <= 0 { + return + } + for { + select { + case <-rS.stopBackup: + return + } + rS.storeResources() + } + time.Sleep(rS.backupInterval) +} + // cachedResourcesForEvent attempts to retrieve cached resources for an event // returns nil if event not cached or errors occur func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) { @@ -295,6 +368,9 @@ func (rS *ResourceService) matchingResourcesForEvent(ev map[string]interface{}) if err != nil { return nil, err } + if rCfg.Stored { + r.dirty = utils.BoolPointer(false) + } r.rCfg = rCfg matchingResources[rCfg.ID] = r // Cannot save it here since we could have errors after and resource will remain unused } @@ -362,9 +438,21 @@ func (rS *ResourceService) V1AllocateResource(args utils.AttrRLsResourceUsage, r if err != nil { return err } + // index it for matching out of cache rS.erMux.Lock() rS.eventResources[args.UsageID] = mtcRLs.ids() rS.erMux.Unlock() + // index it for storing + rS.srMux.Lock() + for _, r := range mtcRLs { + if rS.backupInterval == -1 { + rS.StoreResource(r) + } else if r.dirty != nil { + *r.dirty = true // mark it to be saved + } + rS.storedResources[r.ID] = true + } + rS.srMux.Unlock() *reply = alcMsg return } @@ -381,6 +469,22 @@ func (rS *ResourceService) V1ReleaseResource(args utils.AttrRLsResourceUsage, re rS.erMux.Lock() delete(rS.eventResources, args.UsageID) rS.erMux.Unlock() + if rS.backupInterval != -1 { + rS.srMux.Lock() + } + for _, r := range mtcRLs { + if r.dirty != nil { + if rS.backupInterval == -1 { + rS.StoreResource(r) + } else { + *r.dirty = true // mark it to be saved + rS.storedResources[r.ID] = true + } + } + } + if rS.backupInterval != -1 { + rS.srMux.Unlock() + } *reply = utils.OK return nil }