diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 358254e02..3c3c646ed 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -320,7 +320,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan, rlsChan chan rpcclie exitChan <- true } -func startSmKamailio(internalRaterChan, internalCDRSChan, internalRLsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) { +func startSmKamailio(internalRaterChan, internalCDRSChan, internalRsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) { utils.Logger.Info("Starting CGRateS SMKamailio service.") var ralsConn, cdrsConn, rlSConn *rpcclient.RpcClientPool if len(cfg.SmKamConfig.RALsConns) != 0 { @@ -343,7 +343,7 @@ func startSmKamailio(internalRaterChan, internalCDRSChan, internalRLsChan chan r } if len(cfg.SmKamConfig.RLsConns) != 0 { rlSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.SmKamConfig.RLsConns, internalRLsChan, cfg.InternalTtl) + cfg.SmKamConfig.RLsConns, internalRsChan, cfg.InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RLsConns: %s", err.Error())) exitChan <- true @@ -527,33 +527,33 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, data internalUserSChan <- userServer } -func startResourceLimiterService(internalRLSChan, internalStatSConn chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, +func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dataDB engine.DataDB, server *utils.Server, exitChan chan bool) { var statsConn *rpcclient.RpcClientPool if len(cfg.ResourceLimiterCfg().StatSConns) != 0 { // Stats connection init statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.ResourceLimiterCfg().StatSConns, internalStatSConn, cfg.InternalTtl) if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) + utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) exitChan <- true return } } - rls, err := engine.NewResourceService(cfg, dataDB, statsConn) + rS, err := engine.NewResourceService(cfg, dataDB, statsConn) if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) + utils.Logger.Crit(fmt.Sprintf(" Could not init, error: %s", err.Error())) exitChan <- true return } - utils.Logger.Info(fmt.Sprintf("Starting ResourceLimiter service")) - if err := rls.ListenAndServe(); err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) + utils.Logger.Info(fmt.Sprintf("Starting Resource Service")) + if err := rS.ListenAndServe(exitChan); err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not start, error: %s", err.Error())) exitChan <- true return } - rsV1 := v1.NewResourceSV1(rls) + rsV1 := v1.NewResourceSV1(rS) server.RpcRegister(rsV1) - internalRLSChan <- rsV1 + internalRsChan <- rsV1 } // startStatService fires up the StatS @@ -581,7 +581,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cfg func startRpc(server *utils.Server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan, internalPubSubSChan, internalUserSChan, - internalAliaseSChan, internalRLsChan, internalStatSChan chan rpcclient.RpcClientConnection, internalSMGChan chan *sessionmanager.SMGeneric) { + internalAliaseSChan, internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection, internalSMGChan chan *sessionmanager.SMGeneric) { select { // Any of the rpc methods will unlock listening to rpc requests case resp := <-internalRaterChan: internalRaterChan <- resp @@ -599,8 +599,8 @@ func startRpc(server *utils.Server, internalRaterChan, internalAliaseSChan <- aliases case smg := <-internalSMGChan: internalSMGChan <- smg - case rls := <-internalRLsChan: - internalRLsChan <- rls + case rls := <-internalRsChan: + internalRsChan <- rls case statS := <-internalStatSChan: internalStatSChan <- statS } @@ -755,7 +755,7 @@ func main() { internalUserSChan := make(chan rpcclient.RpcClientConnection, 1) internalAliaseSChan := make(chan rpcclient.RpcClientConnection, 1) internalSMGChan := make(chan *sessionmanager.SMGeneric, 1) - internalRLSChan := make(chan rpcclient.RpcClientConnection, 1) + internalRsChan := make(chan rpcclient.RpcClientConnection, 1) internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) // Start ServiceManager @@ -794,14 +794,14 @@ func main() { } // Start SM-FreeSWITCH if cfg.SmFsConfig.Enabled { - go startSmFreeSWITCH(internalRaterChan, internalCdrSChan, internalRLSChan, cdrDb, exitChan) + go startSmFreeSWITCH(internalRaterChan, internalCdrSChan, internalRsChan, cdrDb, exitChan) // close all sessions on shutdown go shutdownSessionmanagerSingnalHandler(exitChan) } // Start SM-Kamailio if cfg.SmKamConfig.Enabled { - go startSmKamailio(internalRaterChan, internalCdrSChan, internalRLSChan, cdrDb, exitChan) + go startSmKamailio(internalRaterChan, internalCdrSChan, internalRsChan, cdrDb, exitChan) } // Start SM-OpenSIPS @@ -849,7 +849,7 @@ func main() { // Start RL service if cfg.ResourceLimiterCfg().Enabled { - go startResourceLimiterService(internalRLSChan, + go startResourceService(internalRsChan, internalStatSChan, cfg, dataDB, server, exitChan) } @@ -859,7 +859,7 @@ func main() { // Serve rpc connections go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalHistorySChan, - internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRLSChan, internalStatSChan, internalSMGChan) + internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan) <-exitChan if *pidFile != "" { diff --git a/engine/resources.go b/engine/resources.go index 0c3baeae1..5a9f86e80 100755 --- a/engine/resources.go +++ b/engine/resources.go @@ -27,10 +27,15 @@ import ( "github.com/cgrates/cgrates/cache" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) +func init() { + rand.Seed(time.Now().UnixNano()) +} + // ResourceCfg represents the user configuration for the resource type ResourceCfg struct { ID string // identifier of this resource @@ -189,6 +194,9 @@ func (rs Resources) AllocateResource(ru *ResourceUsage, dryRun bool) (alcMessage if len(rs) == 0 { return utils.META_NONE, nil } + lockIDs := utils.PrefixSliceItems(rs.ids(), utils.ResourcesPrefix) + guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...) + defer guardian.Guardian.UnguardIDs(lockIDs...) // Simulate resource usage for _, r := range rs { if r.rCfg.Limit >= r.totalUsage()+ru.Units { @@ -220,6 +228,7 @@ func NewResourceService(cfg *config.CGRConfig, dataDB DataDB, statS rpcclient.Rp // ResourceService is the service handling resources type ResourceService struct { + cfg *config.CGRConfig dataDB DataDB // So we can load the data in cache and index it statS rpcclient.RpcClientConnection eventResources map[string][]string // map[ruID][]string{rID} for faster queries @@ -247,7 +256,7 @@ func (rS *ResourceService) ServiceShutdown() error { return nil } -// storeResource stores the necessary storedMetrics to dataDB +// StoreResource stores the resource in DB and corrects dirty flag func (rS *ResourceService) StoreResource(r *Resource) (err error) { if r.dirty == nil || !*r.dirty { return @@ -262,10 +271,10 @@ func (rS *ResourceService) StoreResource(r *Resource) (err error) { return } -// storeResources executes one task of complete backup +// storeResources represents one task of complete backup func (rS *ResourceService) storeResources() { var failedRIDs []string - for { + for { // don't stop untill we store all dirty resources rS.srMux.Lock() rID := rS.storedResources.GetOne() if rID != "" { @@ -316,6 +325,9 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) if !has { return nil } + lockIDs := utils.PrefixSliceItems(rIDs, utils.ResourcesPrefix) + guardian.Guardian.GuardIDs(rS.cfg.LockingTimeout, lockIDs...) + defer guardian.Guardian.UnguardIDs(lockIDs...) rs = make(Resources, len(rIDs)) for i, rID := range rIDs { if r, err := rS.dataDB.GetResource(rID, false, ""); err != nil { @@ -336,11 +348,14 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources) // matchingResourcesForEvent returns ordered list of matching resources which are active by the time of the call func (rS *ResourceService) matchingResourcesForEvent(ev map[string]interface{}) (rs Resources, err error) { matchingResources := make(map[string]*Resource) - rlIDs, err := matchingItemIDsForEvent(ev, rS.dataDB, utils.ResourcesIndex) + rIDs, err := matchingItemIDsForEvent(ev, rS.dataDB, utils.ResourcesIndex) if err != nil { return nil, err } - for resName := range rlIDs { + lockIDs := utils.PrefixSliceItems(rIDs.Slice(), utils.ResourcesPrefix) + guardian.Guardian.GuardIDs(rS.cfg.LockingTimeout, lockIDs...) + defer guardian.Guardian.UnguardIDs(lockIDs...) + for resName := range rIDs { rCfg, err := rS.dataDB.GetResourceCfg(resName, false, utils.NonTransactional) if err != nil { if err == utils.ErrNotFound { @@ -391,7 +406,7 @@ func (rS *ResourceService) matchingResourcesForEvent(ev map[string]interface{}) return } -// V1ResourcesForEvent returns active resource limits matching the event +// V1ResourcesForEvent returns active resource configs matching the event func (rS *ResourceService) V1ResourcesForEvent(ev map[string]interface{}, reply *[]*ResourceCfg) error { matchingRLForEv, err := rS.matchingResourcesForEvent(ev) if err != nil { diff --git a/engine/resources_test.go b/engine/resources_test.go index 74f8cbba6..7aaffb314 100644 --- a/engine/resources_test.go +++ b/engine/resources_test.go @@ -25,13 +25,13 @@ import ( ) var ( - r, r2 *Resource - ru, ru2, ru3 *ResourceUsage - rs Resources + r1, r2 *Resource + ru1, ru2, ru3 *ResourceUsage + rs Resources ) -func TestResourceRecordUsage(t *testing.T) { - ru = &ResourceUsage{ +func TestRSRecordUsage(t *testing.T) { + ru1 = &ResourceUsage{ ID: "RU1", ExpiryTime: time.Date(2014, 7, 3, 13, 43, 0, 1, time.UTC), Units: 2, @@ -43,7 +43,8 @@ func TestResourceRecordUsage(t *testing.T) { Units: 2, } - r = &Resource{ + r1 = &Resource{ + ID: "RL1", rCfg: &ResourceCfg{ ID: "RL1", Filters: []*RequestFilter{ @@ -70,73 +71,60 @@ func TestResourceRecordUsage(t *testing.T) { AllocationMessage: "ALLOC", }, Usages: map[string]*ResourceUsage{ - ru.ID: ru, + ru1.ID: ru1, }, - TTLIdx: []string{ru.ID}, + TTLIdx: []string{ru1.ID}, tUsage: utils.Float64Pointer(2), } - if err := r.recordUsage(ru2); err != nil { + if err := r1.recordUsage(ru2); err != nil { t.Error(err.Error()) } else { - if err := r.recordUsage(ru); err == nil { + if err := r1.recordUsage(ru1); err == nil { t.Error("Duplicate ResourceUsage id should not be allowed") } - if _, found := r.Usages[ru2.ID]; !found { + if _, found := r1.Usages[ru2.ID]; !found { t.Error("ResourceUsage was not recorded") } - if *r.tUsage != 4 { - t.Errorf("Expecting: %+v, received: %+v", 4, r.tUsage) + if *r1.tUsage != 4 { + t.Errorf("Expecting: %+v, received: %+v", 4, r1.tUsage) } } } -func TestRLClearUsage(t *testing.T) { - r.Usages = map[string]*ResourceUsage{ - ru.ID: ru, +func TestRSRemoveExpiredUnits(t *testing.T) { + r1.Usages = map[string]*ResourceUsage{ + ru1.ID: ru1, } - *r.tUsage = 3 - r.clearUsage(ru.ID) - if len(r.Usages) != 0 { - t.Errorf("Expecting: %+v, received: %+v", 0, len(r.Usages)) + *r1.tUsage = 2 + + r1.removeExpiredUnits() + + if len(r1.Usages) != 0 { + t.Errorf("Expecting: %+v, received: %+v", 0, len(r1.Usages)) } - if *r.tUsage != 1 { - t.Errorf("Expecting: %+v, received: %+v", 1, r.tUsage) + if len(r1.TTLIdx) != 0 { + t.Errorf("Expecting: %+v, received: %+v", 0, len(r1.TTLIdx)) + } + if *r1.tUsage != 0 { + t.Errorf("Expecting: %+v, received: %+v", 0, r1.tUsage) } } -func TestRLRemoveExpiredUnits(t *testing.T) { - r.Usages = map[string]*ResourceUsage{ - ru.ID: ru, +func TestRSUsedUnits(t *testing.T) { + r1.Usages = map[string]*ResourceUsage{ + ru1.ID: ru1, } - *r.tUsage = 2 - - r.removeExpiredUnits() - - if len(r.Usages) != 0 { - t.Errorf("Expecting: %+v, received: %+v", 0, len(r.Usages)) - } - if len(r.TTLIdx) != 0 { - t.Errorf("Expecting: %+v, received: %+v", 0, len(r.TTLIdx)) - } - if *r.tUsage != 0 { - t.Errorf("Expecting: %+v, received: %+v", 0, r.tUsage) - } -} - -func TestRLUsedUnits(t *testing.T) { - r.Usages = map[string]*ResourceUsage{ - ru.ID: ru, - } - *r.tUsage = 2 - if usedUnits := r.totalUsage(); usedUnits != 2 { + r1.tUsage = nil + if usedUnits := r1.totalUsage(); usedUnits != 2 { t.Errorf("Expecting: %+v, received: %+v", 2, usedUnits) } } -func TestRsort(t *testing.T) { +func TestRSRsort(t *testing.T) { r2 = &Resource{ + ID: "RL2", rCfg: &ResourceCfg{ ID: "RL2", Filters: []*RequestFilter{ @@ -168,7 +156,7 @@ func TestRsort(t *testing.T) { tUsage: utils.Float64Pointer(2), } - rs = Resources{r2, r} + rs = Resources{r2, r1} rs.Sort() if rs[0].rCfg.ID != "RL1" { @@ -176,7 +164,18 @@ func TestRsort(t *testing.T) { } } -func TestRsClearUsage(t *testing.T) { +func TestRSClearUsage(t *testing.T) { + r1.Usages = map[string]*ResourceUsage{ + ru1.ID: ru1, + } + r1.tUsage = nil + r1.clearUsage(ru1.ID) + if len(r1.Usages) != 0 { + t.Errorf("Expecting: %+v, received: %+v", 0, len(r1.Usages)) + } + if r1.totalUsage() != 0 { + t.Errorf("Expecting: %+v, received: %+v", 0, r1.tUsage) + } if err := r2.clearUsage(ru2.ID); err != nil { t.Error(err) } else if len(r2.Usages) != 0 { @@ -186,22 +185,26 @@ func TestRsClearUsage(t *testing.T) { } } -func TestRsRecordUsages(t *testing.T) { - if err := rs.recordUsage(ru); err == nil { +func TestRSRecordUsages(t *testing.T) { + r1.Usages = map[string]*ResourceUsage{ + ru1.ID: ru1, + } + r1.tUsage = nil + if err := rs.recordUsage(ru1); err == nil { t.Error("should get duplicated error") } } -func TestRsAllocateResource(t *testing.T) { - rs.clearUsage(ru.ID) +func TestRSAllocateResource(t *testing.T) { + rs.clearUsage(ru1.ID) rs.clearUsage(ru2.ID) rs[0].rCfg.UsageTTL = time.Duration(20 * time.Second) rs[1].rCfg.UsageTTL = time.Duration(20 * time.Second) - //ru.ExpiryTime = time.Now() + //ru1.ExpiryTime = time.Now() //ru2.Time = time.Now() - if alcMessage, err := rs.AllocateResource(ru, false); err != nil { + if alcMessage, err := rs.AllocateResource(ru1, false); err != nil { t.Error(err.Error()) } else { if alcMessage != "ALLOC" { @@ -216,7 +219,7 @@ func TestRsAllocateResource(t *testing.T) { rs[0].rCfg.Limit = 2 rs[1].rCfg.Limit = 4 - if alcMessage, err := rs.AllocateResource(ru, true); err != nil { + if alcMessage, err := rs.AllocateResource(ru1, true); err != nil { t.Error(err.Error()) } else { if alcMessage != "RL2" { diff --git a/utils/slice.go b/utils/slice.go index 38c606bda..1102c90f1 100644 --- a/utils/slice.go +++ b/utils/slice.go @@ -66,3 +66,11 @@ func AvgNegative(values []float64) float64 { } return Avg(values) } + +func PrefixSliceItems(slc []string, prfx string) (out []string) { + out = make([]string, len(slc)) + for i, itm := range slc { + out[i] = prfx + itm + } + return +}