ResourceS with backup mechanism

This commit is contained in:
DanB
2017-09-02 21:39:35 +02:00
parent 2d2614d51c
commit 82da7dfe6b

View File

@@ -19,11 +19,13 @@ package engine
import (
"fmt"
"math/rand"
"reflect"
"sort"
"sync"
"time"
"github.com/cgrates/cgrates/cache"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
@@ -61,9 +63,9 @@ type Resource struct {
ID string
Usages map[string]*ResourceUsage
TTLIdx []string // holds ordered list of ResourceIDs based on their TTL, empty if feature is disabled
rCfg *ResourceCfg // optional configuration attached
tUsage *float64 // sum of all usages
dirty *bool // the usages were modified, needs save, *bool so we only save if enabled in config
rCfg *ResourceCfg // for ordering purposes
}
// removeExpiredUnits removes units which are expired from the resource
@@ -218,22 +220,93 @@ func NewResourceService(cfg *config.CGRConfig, dataDB DataDB, statS rpcclient.Rp
// ResourceService is the service handling resources
type ResourceService struct {
dataDB DataDB // So we can load the data in cache and index it
statS rpcclient.RpcClientConnection
eventResources map[string][]string // map[ruID][]string{rID} for faster queries
erMux sync.RWMutex
dataDB DataDB // So we can load the data in cache and index it
statS rpcclient.RpcClientConnection
eventResources map[string][]string // map[ruID][]string{rID} for faster queries
erMux sync.RWMutex
storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool
srMux sync.RWMutex
stopBackup chan struct{} // control storing process
backupInterval time.Duration
}
// Called to start the service
func (rS *ResourceService) ListenAndServe() error {
func (rS *ResourceService) ListenAndServe(exitChan chan bool) error {
go rS.runBackup() // start backup loop
e := <-exitChan
exitChan <- e // put back for the others listening for shutdown request
return nil
}
// Called to shutdown the service
func (rS *ResourceService) ServiceShutdown() error {
utils.Logger.Info("<ResourceS> service shutdown initialized")
close(rS.stopBackup)
rS.storeResources()
utils.Logger.Info("<ResourceS> service shutdown complete")
return nil
}
// storeResource stores the necessary storedMetrics to dataDB
func (rS *ResourceService) StoreResource(r *Resource) (err error) {
if r.dirty == nil || !*r.dirty {
return
}
if err = rS.dataDB.SetResource(r); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<ResourceS> failed saving Resource with ID: %s, error: %s",
r.ID, err.Error()))
} else {
*r.dirty = false
}
return
}
// storeResources executes one task of complete backup
func (rS *ResourceService) storeResources() {
var failedRIDs []string
for {
rS.srMux.Lock()
rID := rS.storedResources.GetOne()
if rID != "" {
delete(rS.storedResources, rID)
}
rS.srMux.Unlock()
if rID == "" {
break // no more keys, backup completed
}
if rIf, ok := cache.Get(utils.ResourcesPrefix + rID); !ok || rIf == nil {
utils.Logger.Warning(fmt.Sprintf("<ResourceS> failed retrieving from cache resource with ID: %s"))
} else if err := rS.StoreResource(rIf.(*Resource)); err != nil {
failedRIDs = append(failedRIDs, rID) // record failure so we can schedule it for next backup
}
// randomize the CPU load and give up thread control
time.Sleep(time.Duration(rand.Intn(1000)) * time.Nanosecond)
}
if len(failedRIDs) != 0 { // there were errors on save, schedule the keys for next backup
rS.srMux.Lock()
for _, rID := range failedRIDs {
rS.storedResources[rID] = true
}
rS.srMux.Unlock()
}
}
// backup will regularly store resources changed to dataDB
func (rS *ResourceService) runBackup() {
if rS.backupInterval <= 0 {
return
}
for {
select {
case <-rS.stopBackup:
return
}
rS.storeResources()
}
time.Sleep(rS.backupInterval)
}
// cachedResourcesForEvent attempts to retrieve cached resources for an event
// returns nil if event not cached or errors occur
func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) {
@@ -295,6 +368,9 @@ func (rS *ResourceService) matchingResourcesForEvent(ev map[string]interface{})
if err != nil {
return nil, err
}
if rCfg.Stored {
r.dirty = utils.BoolPointer(false)
}
r.rCfg = rCfg
matchingResources[rCfg.ID] = r // Cannot save it here since we could have errors after and resource will remain unused
}
@@ -362,9 +438,21 @@ func (rS *ResourceService) V1AllocateResource(args utils.AttrRLsResourceUsage, r
if err != nil {
return err
}
// index it for matching out of cache
rS.erMux.Lock()
rS.eventResources[args.UsageID] = mtcRLs.ids()
rS.erMux.Unlock()
// index it for storing
rS.srMux.Lock()
for _, r := range mtcRLs {
if rS.backupInterval == -1 {
rS.StoreResource(r)
} else if r.dirty != nil {
*r.dirty = true // mark it to be saved
}
rS.storedResources[r.ID] = true
}
rS.srMux.Unlock()
*reply = alcMsg
return
}
@@ -381,6 +469,22 @@ func (rS *ResourceService) V1ReleaseResource(args utils.AttrRLsResourceUsage, re
rS.erMux.Lock()
delete(rS.eventResources, args.UsageID)
rS.erMux.Unlock()
if rS.backupInterval != -1 {
rS.srMux.Lock()
}
for _, r := range mtcRLs {
if r.dirty != nil {
if rS.backupInterval == -1 {
rS.StoreResource(r)
} else {
*r.dirty = true // mark it to be saved
rS.storedResources[r.ID] = true
}
}
}
if rS.backupInterval != -1 {
rS.srMux.Unlock()
}
*reply = utils.OK
return nil
}