ResourceS locking

This commit is contained in:
DanB
2017-09-03 14:08:06 +02:00
parent 82da7dfe6b
commit df38ed3da9
4 changed files with 107 additions and 81 deletions

View File

@@ -320,7 +320,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan, rlsChan chan rpcclie
exitChan <- true
}
func startSmKamailio(internalRaterChan, internalCDRSChan, internalRLsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
func startSmKamailio(internalRaterChan, internalCDRSChan, internalRsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS SMKamailio service.")
var ralsConn, cdrsConn, rlSConn *rpcclient.RpcClientPool
if len(cfg.SmKamConfig.RALsConns) != 0 {
@@ -343,7 +343,7 @@ func startSmKamailio(internalRaterChan, internalCDRSChan, internalRLsChan chan r
}
if len(cfg.SmKamConfig.RLsConns) != 0 {
rlSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmKamConfig.RLsConns, internalRLsChan, cfg.InternalTtl)
cfg.SmKamConfig.RLsConns, internalRsChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMKamailio> Could not connect to RLsConns: %s", err.Error()))
exitChan <- true
@@ -527,33 +527,33 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, data
internalUserSChan <- userServer
}
func startResourceLimiterService(internalRLSChan, internalStatSConn chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dataDB engine.DataDB, server *utils.Server, exitChan chan bool) {
var statsConn *rpcclient.RpcClientPool
if len(cfg.ResourceLimiterCfg().StatSConns) != 0 { // Stats connection init
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.ResourceLimiterCfg().StatSConns, internalStatSConn, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<RLs> Could not connect to StatS: %s", err.Error()))
utils.Logger.Crit(fmt.Sprintf("<ResourceS> Could not connect to StatS: %s", err.Error()))
exitChan <- true
return
}
}
rls, err := engine.NewResourceService(cfg, dataDB, statsConn)
rS, err := engine.NewResourceService(cfg, dataDB, statsConn)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<RLs> Could not init, error: %s", err.Error()))
utils.Logger.Crit(fmt.Sprintf("<ResourceS> Could not init, error: %s", err.Error()))
exitChan <- true
return
}
utils.Logger.Info(fmt.Sprintf("Starting ResourceLimiter service"))
if err := rls.ListenAndServe(); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RLs> Could not start, error: %s", err.Error()))
utils.Logger.Info(fmt.Sprintf("Starting Resource Service"))
if err := rS.ListenAndServe(exitChan); err != nil {
utils.Logger.Crit(fmt.Sprintf("<ResourceS> Could not start, error: %s", err.Error()))
exitChan <- true
return
}
rsV1 := v1.NewResourceSV1(rls)
rsV1 := v1.NewResourceSV1(rS)
server.RpcRegister(rsV1)
internalRLSChan <- rsV1
internalRsChan <- rsV1
}
// startStatService fires up the StatS
@@ -581,7 +581,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg
func startRpc(server *utils.Server, internalRaterChan,
internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan,
internalAliaseSChan, internalRLsChan, internalStatSChan chan rpcclient.RpcClientConnection, internalSMGChan chan *sessionmanager.SMGeneric) {
internalAliaseSChan, internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection, internalSMGChan chan *sessionmanager.SMGeneric) {
select { // Any of the rpc methods will unlock listening to rpc requests
case resp := <-internalRaterChan:
internalRaterChan <- resp
@@ -599,8 +599,8 @@ func startRpc(server *utils.Server, internalRaterChan,
internalAliaseSChan <- aliases
case smg := <-internalSMGChan:
internalSMGChan <- smg
case rls := <-internalRLsChan:
internalRLsChan <- rls
case rls := <-internalRsChan:
internalRsChan <- rls
case statS := <-internalStatSChan:
internalStatSChan <- statS
}
@@ -755,7 +755,7 @@ func main() {
internalUserSChan := make(chan rpcclient.RpcClientConnection, 1)
internalAliaseSChan := make(chan rpcclient.RpcClientConnection, 1)
internalSMGChan := make(chan *sessionmanager.SMGeneric, 1)
internalRLSChan := make(chan rpcclient.RpcClientConnection, 1)
internalRsChan := make(chan rpcclient.RpcClientConnection, 1)
internalStatSChan := make(chan rpcclient.RpcClientConnection, 1)
// Start ServiceManager
@@ -794,14 +794,14 @@ func main() {
}
// Start SM-FreeSWITCH
if cfg.SmFsConfig.Enabled {
go startSmFreeSWITCH(internalRaterChan, internalCdrSChan, internalRLSChan, cdrDb, exitChan)
go startSmFreeSWITCH(internalRaterChan, internalCdrSChan, internalRsChan, cdrDb, exitChan)
// close all sessions on shutdown
go shutdownSessionmanagerSingnalHandler(exitChan)
}
// Start SM-Kamailio
if cfg.SmKamConfig.Enabled {
go startSmKamailio(internalRaterChan, internalCdrSChan, internalRLSChan, cdrDb, exitChan)
go startSmKamailio(internalRaterChan, internalCdrSChan, internalRsChan, cdrDb, exitChan)
}
// Start SM-OpenSIPS
@@ -849,7 +849,7 @@ func main() {
// Start RL service
if cfg.ResourceLimiterCfg().Enabled {
go startResourceLimiterService(internalRLSChan,
go startResourceService(internalRsChan,
internalStatSChan, cfg, dataDB, server, exitChan)
}
@@ -859,7 +859,7 @@ func main() {
// Serve rpc connections
go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan,
internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRLSChan, internalStatSChan, internalSMGChan)
internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan)
<-exitChan
if *pidFile != "" {

View File

@@ -27,10 +27,15 @@ import (
"github.com/cgrates/cgrates/cache"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
// ResourceCfg represents the user configuration for the resource
type ResourceCfg struct {
ID string // identifier of this resource
@@ -189,6 +194,9 @@ func (rs Resources) AllocateResource(ru *ResourceUsage, dryRun bool) (alcMessage
if len(rs) == 0 {
return utils.META_NONE, nil
}
lockIDs := utils.PrefixSliceItems(rs.ids(), utils.ResourcesPrefix)
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
// Simulate resource usage
for _, r := range rs {
if r.rCfg.Limit >= r.totalUsage()+ru.Units {
@@ -220,6 +228,7 @@ func NewResourceService(cfg *config.CGRConfig, dataDB DataDB, statS rpcclient.Rp
// ResourceService is the service handling resources
type ResourceService struct {
cfg *config.CGRConfig
dataDB DataDB // So we can load the data in cache and index it
statS rpcclient.RpcClientConnection
eventResources map[string][]string // map[ruID][]string{rID} for faster queries
@@ -247,7 +256,7 @@ func (rS *ResourceService) ServiceShutdown() error {
return nil
}
// storeResource stores the necessary storedMetrics to dataDB
// StoreResource stores the resource in DB and corrects dirty flag
func (rS *ResourceService) StoreResource(r *Resource) (err error) {
if r.dirty == nil || !*r.dirty {
return
@@ -262,10 +271,10 @@ func (rS *ResourceService) StoreResource(r *Resource) (err error) {
return
}
// storeResources executes one task of complete backup
// storeResources represents one task of complete backup
func (rS *ResourceService) storeResources() {
var failedRIDs []string
for {
for { // don't stop untill we store all dirty resources
rS.srMux.Lock()
rID := rS.storedResources.GetOne()
if rID != "" {
@@ -316,6 +325,9 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
if !has {
return nil
}
lockIDs := utils.PrefixSliceItems(rIDs, utils.ResourcesPrefix)
guardian.Guardian.GuardIDs(rS.cfg.LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
rs = make(Resources, len(rIDs))
for i, rID := range rIDs {
if r, err := rS.dataDB.GetResource(rID, false, ""); err != nil {
@@ -336,11 +348,14 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
// matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call
func (rS *ResourceService) matchingResourcesForEvent(ev map[string]interface{}) (rs Resources, err error) {
matchingResources := make(map[string]*Resource)
rlIDs, err := matchingItemIDsForEvent(ev, rS.dataDB, utils.ResourcesIndex)
rIDs, err := matchingItemIDsForEvent(ev, rS.dataDB, utils.ResourcesIndex)
if err != nil {
return nil, err
}
for resName := range rlIDs {
lockIDs := utils.PrefixSliceItems(rIDs.Slice(), utils.ResourcesPrefix)
guardian.Guardian.GuardIDs(rS.cfg.LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
for resName := range rIDs {
rCfg, err := rS.dataDB.GetResourceCfg(resName, false, utils.NonTransactional)
if err != nil {
if err == utils.ErrNotFound {
@@ -391,7 +406,7 @@ func (rS *ResourceService) matchingResourcesForEvent(ev map[string]interface{})
return
}
// V1ResourcesForEvent returns active resource limits matching the event
// V1ResourcesForEvent returns active resource configs matching the event
func (rS *ResourceService) V1ResourcesForEvent(ev map[string]interface{}, reply *[]*ResourceCfg) error {
matchingRLForEv, err := rS.matchingResourcesForEvent(ev)
if err != nil {

View File

@@ -25,13 +25,13 @@ import (
)
var (
r, r2 *Resource
ru, ru2, ru3 *ResourceUsage
rs Resources
r1, r2 *Resource
ru1, ru2, ru3 *ResourceUsage
rs Resources
)
func TestResourceRecordUsage(t *testing.T) {
ru = &ResourceUsage{
func TestRSRecordUsage(t *testing.T) {
ru1 = &ResourceUsage{
ID: "RU1",
ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC),
Units: 2,
@@ -43,7 +43,8 @@ func TestResourceRecordUsage(t *testing.T) {
Units: 2,
}
r = &Resource{
r1 = &Resource{
ID: "RL1",
rCfg: &ResourceCfg{
ID: "RL1",
Filters: []*RequestFilter{
@@ -70,73 +71,60 @@ func TestResourceRecordUsage(t *testing.T) {
AllocationMessage: "ALLOC",
},
Usages: map[string]*ResourceUsage{
ru.ID: ru,
ru1.ID: ru1,
},
TTLIdx: []string{ru.ID},
TTLIdx: []string{ru1.ID},
tUsage: utils.Float64Pointer(2),
}
if err := r.recordUsage(ru2); err != nil {
if err := r1.recordUsage(ru2); err != nil {
t.Error(err.Error())
} else {
if err := r.recordUsage(ru); err == nil {
if err := r1.recordUsage(ru1); err == nil {
t.Error("Duplicate ResourceUsage id should not be allowed")
}
if _, found := r.Usages[ru2.ID]; !found {
if _, found := r1.Usages[ru2.ID]; !found {
t.Error("ResourceUsage was not recorded")
}
if *r.tUsage != 4 {
t.Errorf("Expecting: %+v, received: %+v", 4, r.tUsage)
if *r1.tUsage != 4 {
t.Errorf("Expecting: %+v, received: %+v", 4, r1.tUsage)
}
}
}
func TestRLClearUsage(t *testing.T) {
r.Usages = map[string]*ResourceUsage{
ru.ID: ru,
func TestRSRemoveExpiredUnits(t *testing.T) {
r1.Usages = map[string]*ResourceUsage{
ru1.ID: ru1,
}
*r.tUsage = 3
r.clearUsage(ru.ID)
if len(r.Usages) != 0 {
t.Errorf("Expecting: %+v, received: %+v", 0, len(r.Usages))
*r1.tUsage = 2
r1.removeExpiredUnits()
if len(r1.Usages) != 0 {
t.Errorf("Expecting: %+v, received: %+v", 0, len(r1.Usages))
}
if *r.tUsage != 1 {
t.Errorf("Expecting: %+v, received: %+v", 1, r.tUsage)
if len(r1.TTLIdx) != 0 {
t.Errorf("Expecting: %+v, received: %+v", 0, len(r1.TTLIdx))
}
if *r1.tUsage != 0 {
t.Errorf("Expecting: %+v, received: %+v", 0, r1.tUsage)
}
}
func TestRLRemoveExpiredUnits(t *testing.T) {
r.Usages = map[string]*ResourceUsage{
ru.ID: ru,
func TestRSUsedUnits(t *testing.T) {
r1.Usages = map[string]*ResourceUsage{
ru1.ID: ru1,
}
*r.tUsage = 2
r.removeExpiredUnits()
if len(r.Usages) != 0 {
t.Errorf("Expecting: %+v, received: %+v", 0, len(r.Usages))
}
if len(r.TTLIdx) != 0 {
t.Errorf("Expecting: %+v, received: %+v", 0, len(r.TTLIdx))
}
if *r.tUsage != 0 {
t.Errorf("Expecting: %+v, received: %+v", 0, r.tUsage)
}
}
func TestRLUsedUnits(t *testing.T) {
r.Usages = map[string]*ResourceUsage{
ru.ID: ru,
}
*r.tUsage = 2
if usedUnits := r.totalUsage(); usedUnits != 2 {
r1.tUsage = nil
if usedUnits := r1.totalUsage(); usedUnits != 2 {
t.Errorf("Expecting: %+v, received: %+v", 2, usedUnits)
}
}
func TestRsort(t *testing.T) {
func TestRSRsort(t *testing.T) {
r2 = &Resource{
ID: "RL2",
rCfg: &ResourceCfg{
ID: "RL2",
Filters: []*RequestFilter{
@@ -168,7 +156,7 @@ func TestRsort(t *testing.T) {
tUsage: utils.Float64Pointer(2),
}
rs = Resources{r2, r}
rs = Resources{r2, r1}
rs.Sort()
if rs[0].rCfg.ID != "RL1" {
@@ -176,7 +164,18 @@ func TestRsort(t *testing.T) {
}
}
func TestRsClearUsage(t *testing.T) {
func TestRSClearUsage(t *testing.T) {
r1.Usages = map[string]*ResourceUsage{
ru1.ID: ru1,
}
r1.tUsage = nil
r1.clearUsage(ru1.ID)
if len(r1.Usages) != 0 {
t.Errorf("Expecting: %+v, received: %+v", 0, len(r1.Usages))
}
if r1.totalUsage() != 0 {
t.Errorf("Expecting: %+v, received: %+v", 0, r1.tUsage)
}
if err := r2.clearUsage(ru2.ID); err != nil {
t.Error(err)
} else if len(r2.Usages) != 0 {
@@ -186,22 +185,26 @@ func TestRsClearUsage(t *testing.T) {
}
}
func TestRsRecordUsages(t *testing.T) {
if err := rs.recordUsage(ru); err == nil {
func TestRSRecordUsages(t *testing.T) {
r1.Usages = map[string]*ResourceUsage{
ru1.ID: ru1,
}
r1.tUsage = nil
if err := rs.recordUsage(ru1); err == nil {
t.Error("should get duplicated error")
}
}
func TestRsAllocateResource(t *testing.T) {
rs.clearUsage(ru.ID)
func TestRSAllocateResource(t *testing.T) {
rs.clearUsage(ru1.ID)
rs.clearUsage(ru2.ID)
rs[0].rCfg.UsageTTL = time.Duration(20 * time.Second)
rs[1].rCfg.UsageTTL = time.Duration(20 * time.Second)
//ru.ExpiryTime = time.Now()
//ru1.ExpiryTime = time.Now()
//ru2.Time = time.Now()
if alcMessage, err := rs.AllocateResource(ru, false); err != nil {
if alcMessage, err := rs.AllocateResource(ru1, false); err != nil {
t.Error(err.Error())
} else {
if alcMessage != "ALLOC" {
@@ -216,7 +219,7 @@ func TestRsAllocateResource(t *testing.T) {
rs[0].rCfg.Limit = 2
rs[1].rCfg.Limit = 4
if alcMessage, err := rs.AllocateResource(ru, true); err != nil {
if alcMessage, err := rs.AllocateResource(ru1, true); err != nil {
t.Error(err.Error())
} else {
if alcMessage != "RL2" {

View File

@@ -66,3 +66,11 @@ func AvgNegative(values []float64) float64 {
}
return Avg(values)
}
func PrefixSliceItems(slc []string, prfx string) (out []string) {
out = make([]string, len(slc))
for i, itm := range slc {
out[i] = prfx + itm
}
return
}