Start ResourceS listening loop in separate goroutine

This commit is contained in:
DanB
2017-09-04 16:15:37 +02:00
parent e6c97a6d59
commit e94c974df8
5 changed files with 72 additions and 82 deletions

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
/*
import (
"net/rpc"
"net/rpc/jsonrpc"
@@ -43,42 +42,42 @@ var (
)
var sTestsRLSV1 = []func(t *testing.T){
testV1RLSLoadConfig,
testV1RLSInitDataDb,
testV1RLSResetStorDb,
testV1RLSStartEngine,
testV1RLSRpcConn,
testV1RLSFromFolder,
testV1RLSGetResourcesFromEvent,
testV1RLSAllocateResource,
testV1RLSAllowUsage,
testV1RLSReleaseResource,
testV1RLSGetResourceConfigBeforeSet,
testV1RLSSetResourceConfig,
testV1RLSGetResourceConfigAfterSet,
testV1RLSUpdateResourceConfig,
testV1RLSGetResourceConfigAfterUpdate,
testV1RLSRemResourceCOnfig,
testV1RLSGetResourceConfigAfterDelete,
testV1RLSStopEngine,
testV1RsLoadConfig,
testV1RsInitDataDb,
testV1RsResetStorDb,
testV1RsStartEngine,
testV1RsRpcConn,
testV1RsFromFolder,
testV1RsGetResourcesFromEvent,
testV1RsAllocateResource,
testV1RsAllowUsage,
testV1RsReleaseResource,
testV1RsGetResourceConfigBeforeSet,
testV1RsSetResourceConfig,
testV1RsGetResourceConfigAfterSet,
testV1RsUpdateResourceConfig,
testV1RsGetResourceConfigAfterUpdate,
testV1RsRemResourceCOnfig,
testV1RsGetResourceConfigAfterDelete,
testV1RsStopEngine,
}
//Test start here
func TestRLSV1ITMySQL(t *testing.T) {
func TestRsV1ITMySQL(t *testing.T) {
rlsV1ConfDIR = "tutmysql"
for _, stest := range sTestsRLSV1 {
t.Run(rlsV1ConfDIR, stest)
}
}
func TestRLSV1ITMongo(t *testing.T) {
func TestRsV1ITMongo(t *testing.T) {
rlsV1ConfDIR = "tutmongo"
for _, stest := range sTestsRLSV1 {
t.Run(rlsV1ConfDIR, stest)
}
}
func testV1RLSLoadConfig(t *testing.T) {
func testV1RsLoadConfig(t *testing.T) {
var err error
rlsV1CfgPath = path.Join(*dataDir, "conf", "samples", rlsV1ConfDIR)
if rlsV1Cfg, err = config.NewCGRConfigFromFolder(rlsV1CfgPath); err != nil {
@@ -92,26 +91,26 @@ func testV1RLSLoadConfig(t *testing.T) {
}
}
func testV1RLSInitDataDb(t *testing.T) {
func testV1RsInitDataDb(t *testing.T) {
if err := engine.InitDataDb(rlsV1Cfg); err != nil {
t.Fatal(err)
}
}
// Wipe out the cdr database
func testV1RLSResetStorDb(t *testing.T) {
func testV1RsResetStorDb(t *testing.T) {
if err := engine.InitStorDb(rlsV1Cfg); err != nil {
t.Fatal(err)
}
}
func testV1RLSStartEngine(t *testing.T) {
func testV1RsStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(rlsV1CfgPath, resDelay); err != nil {
t.Fatal(err)
}
}
func testV1RLSRpcConn(t *testing.T) {
func testV1RsRpcConn(t *testing.T) {
var err error
rlsV1Rpc, err = jsonrpc.Dial("tcp", rlsV1Cfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
@@ -119,7 +118,7 @@ func testV1RLSRpcConn(t *testing.T) {
}
}
func testV1RLSFromFolder(t *testing.T) {
func testV1RsFromFolder(t *testing.T) {
var reply string
time.Sleep(time.Duration(2000) * time.Millisecond)
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "tutorial")}
@@ -129,7 +128,7 @@ func testV1RLSFromFolder(t *testing.T) {
time.Sleep(time.Duration(1000) * time.Millisecond)
}
func testV1RLSGetResourcesFromEvent(t *testing.T) {
func testV1RsGetResourcesFromEvent(t *testing.T) {
var reply *[]*engine.ResourceCfg
ev := map[string]interface{}{"Unknown": "unknown"}
if err := rlsV1Rpc.Call("ResourceSV1.GetResourcesForEvent", ev, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
@@ -171,7 +170,7 @@ func testV1RLSGetResourcesFromEvent(t *testing.T) {
}
}
func testV1RLSAllocateResource(t *testing.T) {
func testV1RsAllocateResource(t *testing.T) {
var reply string
attrRU := utils.AttrRLsResourceUsage{
@@ -216,7 +215,7 @@ func testV1RLSAllocateResource(t *testing.T) {
}
func testV1RLSAllowUsage(t *testing.T) {
func testV1RsAllowUsage(t *testing.T) {
var reply bool
attrRU := utils.AttrRLsResourceUsage{
UsageID: "651a8db2-4f67-4cf8-b622-169e8a482e51",
@@ -239,7 +238,7 @@ func testV1RLSAllowUsage(t *testing.T) {
}
}
func testV1RLSReleaseResource(t *testing.T) {
func testV1RsReleaseResource(t *testing.T) {
var reply interface{}
attrRU := utils.AttrRLsResourceUsage{
@@ -265,14 +264,14 @@ func testV1RLSReleaseResource(t *testing.T) {
}
func testV1RLSGetResourceConfigBeforeSet(t *testing.T) {
func testV1RsGetResourceConfigBeforeSet(t *testing.T) {
var reply *string
if err := rlsV1Rpc.Call("ApierV1.GetResourceConfig", &AttrGetResCfg{ID: "RCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
}
func testV1RLSSetResourceConfig(t *testing.T) {
func testV1RsSetResourceConfig(t *testing.T) {
rlsConfig = &engine.ResourceCfg{
ID: "RCFG1",
Filters: []*engine.RequestFilter{
@@ -302,7 +301,7 @@ func testV1RLSSetResourceConfig(t *testing.T) {
}
}
func testV1RLSGetResourceConfigAfterSet(t *testing.T) {
func testV1RsGetResourceConfigAfterSet(t *testing.T) {
var reply *engine.ResourceCfg
if err := rlsV1Rpc.Call("ApierV1.GetResourceConfig", &AttrGetResCfg{ID: rlsConfig.ID}, &reply); err != nil {
t.Error(err)
@@ -311,7 +310,7 @@ func testV1RLSGetResourceConfigAfterSet(t *testing.T) {
}
}
func testV1RLSUpdateResourceConfig(t *testing.T) {
func testV1RsUpdateResourceConfig(t *testing.T) {
var result string
rlsConfig.Filters = []*engine.RequestFilter{
&engine.RequestFilter{
@@ -332,7 +331,7 @@ func testV1RLSUpdateResourceConfig(t *testing.T) {
}
}
func testV1RLSGetResourceConfigAfterUpdate(t *testing.T) {
func testV1RsGetResourceConfigAfterUpdate(t *testing.T) {
var reply *engine.ResourceCfg
if err := rlsV1Rpc.Call("ApierV1.GetResourceConfig", &AttrGetResCfg{ID: rlsConfig.ID}, &reply); err != nil {
t.Error(err)
@@ -341,7 +340,7 @@ func testV1RLSGetResourceConfigAfterUpdate(t *testing.T) {
}
}
func testV1RLSRemResourceCOnfig(t *testing.T) {
func testV1RsRemResourceCOnfig(t *testing.T) {
var resp string
if err := rlsV1Rpc.Call("ApierV1.RemResourceConfig", &AttrGetResCfg{ID: rlsConfig.ID}, &resp); err != nil {
t.Error(err)
@@ -350,16 +349,15 @@ func testV1RLSRemResourceCOnfig(t *testing.T) {
}
}
func testV1RLSGetResourceConfigAfterDelete(t *testing.T) {
func testV1RsGetResourceConfigAfterDelete(t *testing.T) {
var reply *string
if err := rlsV1Rpc.Call("ApierV1.GetResourceConfig", &AttrGetResCfg{ID: "RCFG1"}, &reply); err == nil || err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
}
func testV1RLSStopEngine(t *testing.T) {
func testV1RsStopEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}
}
*/

View File

@@ -530,27 +530,31 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, data
func startResourceService(internalRsChan, internalStatSConn chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dataDB engine.DataDB, server *utils.Server, exitChan chan bool) {
var statsConn *rpcclient.RpcClientPool
if len(cfg.ResourceLimiterCfg().StatSConns) != 0 { // Stats connection init
if len(cfg.ResourceSCfg().StatSConns) != 0 { // Stats connection init
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.ResourceLimiterCfg().StatSConns, internalStatSConn, cfg.InternalTtl)
cfg.ResourceSCfg().StatSConns, internalStatSConn, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<ResourceS> Could not connect to StatS: %s", err.Error()))
exitChan <- true
return
}
}
rS, err := engine.NewResourceService(cfg, dataDB, statsConn)
rS, err := engine.NewResourceService(dataDB, cfg.ResourceSCfg().StoreInterval, statsConn)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<ResourceS> Could not init, error: %s", err.Error()))
exitChan <- true
return
}
utils.Logger.Info(fmt.Sprintf("Starting Resource Service"))
if err := rS.ListenAndServe(exitChan); err != nil {
utils.Logger.Crit(fmt.Sprintf("<ResourceS> Could not start, error: %s", err.Error()))
go func() {
if err := rS.ListenAndServe(exitChan); err != nil {
utils.Logger.Crit(fmt.Sprintf("<ResourceS> Could not start, error: %s", err.Error()))
}
rS.Shutdown()
exitChan <- true
return
}
}()
rsV1 := v1.NewResourceSV1(rS)
server.RpcRegister(rsV1)
internalRsChan <- rsV1
@@ -848,7 +852,7 @@ func main() {
}
// Start RL service
if cfg.ResourceLimiterCfg().Enabled {
if cfg.ResourceSCfg().Enabled {
go startResourceService(internalRsChan,
internalStatSChan, cfg, dataDB, server, exitChan)
}

View File

@@ -265,7 +265,7 @@ type CGRConfig struct {
AliasesServerEnabled bool // Starts PubSub as server: <true|false>.
UserServerEnabled bool // Starts User as server: <true|false>
UserServerIndexes []string // List of user profile field indexes
resourceLimiterCfg *ResourceLimiterConfig // Configuration for resource limiter
resourceSCfg *ResourceLimiterConfig // Configuration for resource limiter
statsCfg *StatSCfg // Configuration for StatS
MailerServer string // The server to use when sending emails out
MailerAuthUser string // Authenticate to email server using this user
@@ -417,7 +417,7 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
for _, smFSRLsConn := range self.SmFsConfig.RLsConns {
if smFSRLsConn.Address == utils.MetaInternal && !self.resourceLimiterCfg.Enabled {
if smFSRLsConn.Address == utils.MetaInternal && !self.resourceSCfg.Enabled {
return errors.New("RLs not enabled but referenced by SMFreeSWITCH component")
}
}
@@ -441,7 +441,7 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
for _, smKamRLsConn := range self.SmKamConfig.RLsConns {
if smKamRLsConn.Address == utils.MetaInternal && !self.resourceLimiterCfg.Enabled {
if smKamRLsConn.Address == utils.MetaInternal && !self.resourceSCfg.Enabled {
return errors.New("RLs not enabled but requested by SM-Kamailio component")
}
}
@@ -502,8 +502,8 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// ResourceLimiter checks
if self.resourceLimiterCfg != nil && self.resourceLimiterCfg.Enabled {
for _, connCfg := range self.resourceLimiterCfg.StatSConns {
if self.resourceSCfg != nil && self.resourceSCfg.Enabled {
for _, connCfg := range self.resourceSCfg.StatSConns {
if connCfg.Address == utils.MetaInternal && !self.statsCfg.Enabled {
return errors.New("StatS not enabled but requested by ResourceLimiter component.")
}
@@ -1075,10 +1075,10 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
}
if jsnRLSCfg != nil {
if self.resourceLimiterCfg == nil {
self.resourceLimiterCfg = new(ResourceLimiterConfig)
if self.resourceSCfg == nil {
self.resourceSCfg = new(ResourceLimiterConfig)
}
if self.resourceLimiterCfg.loadFromJsonCfg(jsnRLSCfg); err != nil {
if self.resourceSCfg.loadFromJsonCfg(jsnRLSCfg); err != nil {
return err
}
}
@@ -1146,8 +1146,8 @@ func (self *CGRConfig) RadiusAgentCfg() *RadiusAgentCfg {
}
// ToDo: fix locking here
func (self *CGRConfig) ResourceLimiterCfg() *ResourceLimiterConfig {
return self.resourceLimiterCfg
func (self *CGRConfig) ResourceSCfg() *ResourceLimiterConfig {
return self.resourceSCfg
}
// ToDo: fix locking

View File

@@ -101,20 +101,8 @@
"indexes": ["Uuid"], // user profile field indexes
},
"rls": {
"enabled": true, // starts ResourceLimiter service: <true|false>.
"cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
"cache_dump_interval": "0s", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|*never|dur>
"usage_ttl": "3h", // expire usage records if older than this duration <""|*never|dur>
},
"resources": {
"enabled": true,
"stats_conns": [
//{"address": "*internal"}
],
"cache_dump_interval": "0s",
"usage_ttl": "3h",
},
"stats": {

View File

@@ -220,18 +220,18 @@ func (rs Resources) AllocateResource(ru *ResourceUsage, dryRun bool) (alcMessage
}
// Pas the config as a whole so we can ask access concurrently
func NewResourceService(cfg *config.CGRConfig, dataDB DataDB, statS rpcclient.RpcClientConnection) (*ResourceService, error) {
func NewResourceService(dataDB DataDB, storeInterval time.Duration, statS rpcclient.RpcClientConnection) (*ResourceService, error) {
if statS != nil && reflect.ValueOf(statS).IsNil() {
statS = nil
}
return &ResourceService{dataDB: dataDB, statS: statS,
scEventResources: ltcache.New(ltcache.UnlimitedCaching, time.Duration(1)*time.Minute, false, nil),
lcEventResources: ltcache.New(ltcache.UnlimitedCaching, ltcache.UnlimitedCaching, false, nil)}, nil
lcEventResources: ltcache.New(ltcache.UnlimitedCaching, ltcache.UnlimitedCaching, false, nil),
storeInterval: storeInterval}, nil
}
// ResourceService is the service handling resources
type ResourceService struct {
cfg *config.CGRConfig
dataDB DataDB // So we can load the data in cache and index it
statS rpcclient.RpcClientConnection // allows applying filters based on stats
scEventResources *ltcache.Cache // short cache map[ruID], used to keep references to matched resources for events in allow queries
@@ -239,7 +239,7 @@ type ResourceService struct {
storedResources utils.StringMap // keep a record of resources which need saving, map[resID]bool
srMux sync.RWMutex
stopBackup chan struct{} // control storing process
backupInterval time.Duration
storeInterval time.Duration
}
// Called to start the service
@@ -251,7 +251,7 @@ func (rS *ResourceService) ListenAndServe(exitChan chan bool) error {
}
// Called to shutdown the service
func (rS *ResourceService) ServiceShutdown() error {
func (rS *ResourceService) Shutdown() error {
utils.Logger.Info("<ResourceS> service shutdown initialized")
close(rS.stopBackup)
rS.storeResources()
@@ -306,7 +306,7 @@ func (rS *ResourceService) storeResources() {
// backup will regularly store resources changed to dataDB
func (rS *ResourceService) runBackup() {
if rS.backupInterval <= 0 {
if rS.storeInterval <= 0 {
return
}
for {
@@ -316,7 +316,7 @@ func (rS *ResourceService) runBackup() {
}
rS.storeResources()
}
time.Sleep(rS.backupInterval)
time.Sleep(rS.storeInterval)
}
// cachedResourcesForEvent attempts to retrieve cached resources for an event
@@ -341,7 +341,7 @@ func (rS *ResourceService) cachedResourcesForEvent(evUUID string) (rs Resources)
}
lockIDs := utils.PrefixSliceItems(rIDs, utils.ResourcesPrefix)
guardian.Guardian.GuardIDs(rS.cfg.LockingTimeout, lockIDs...)
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
for i, rID := range rIDs {
if r, err := rS.dataDB.GetResource(rID, false, ""); err != nil {
@@ -370,7 +370,7 @@ func (rS *ResourceService) matchingResourcesForEvent(ev map[string]interface{})
return nil, err
}
lockIDs := utils.PrefixSliceItems(rIDs.Slice(), utils.ResourcesPrefix)
guardian.Guardian.GuardIDs(rS.cfg.LockingTimeout, lockIDs...)
guardian.Guardian.GuardIDs(config.CgrConfig().LockingTimeout, lockIDs...)
defer guardian.Guardian.UnguardIDs(lockIDs...)
for resName := range rIDs {
rCfg, err := rS.dataDB.GetResourceCfg(resName, false, utils.NonTransactional)
@@ -492,7 +492,7 @@ func (rS *ResourceService) V1AllocateResource(args utils.AttrRLsResourceUsage, r
// index it for storing
rS.srMux.Lock()
for _, r := range mtcRLs {
if rS.backupInterval == -1 {
if rS.storeInterval == -1 {
rS.StoreResource(r)
} else if r.dirty != nil {
*r.dirty = true // mark it to be saved
@@ -514,12 +514,12 @@ func (rS *ResourceService) V1ReleaseResource(args utils.AttrRLsResourceUsage, re
}
mtcRLs.clearUsage(args.UsageID)
rS.lcEventResources.Remove(args.UsageID)
if rS.backupInterval != -1 {
if rS.storeInterval != -1 {
rS.srMux.Lock()
}
for _, r := range mtcRLs {
if r.dirty != nil {
if rS.backupInterval == -1 {
if rS.storeInterval == -1 {
rS.StoreResource(r)
} else {
*r.dirty = true // mark it to be saved
@@ -527,7 +527,7 @@ func (rS *ResourceService) V1ReleaseResource(args utils.AttrRLsResourceUsage, re
}
}
}
if rS.backupInterval != -1 {
if rS.storeInterval != -1 {
rS.srMux.Unlock()
}
*reply = utils.OK