Add connection from ThresholdS to ResourceS through ConnManager

This commit is contained in:
TeoV
2019-12-09 03:58:43 -05:00
parent 27789c4d36
commit 9435b99f1d
39 changed files with 80 additions and 148 deletions

View File

@@ -496,6 +496,7 @@ func main() {
internalChargerSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
internalResourceSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
// init CacheS
cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan)
@@ -519,7 +520,7 @@ func main() {
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): internalChargerSChan,
utils.GuardianSv1: internalGuardianSChan,
//utils.LoaderSv1: ldrs.GetIntenternalChan(),
//utils.ResourceSv1: reS.GetIntenternalChan(),
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources): internalResourceSChan,
//utils.Responder: rals.GetResponder().GetIntenternalChan(),
//utils.SchedulerSv1: schS.GetIntenternalChan(),
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS): internalSessionSChan,
@@ -540,7 +541,7 @@ func main() {
stS := services.NewStatService(cfg, dmService, cacheS, filterSChan, server,
internalStatSChan, connManager.GetConnMgr())
reS := services.NewResourceService(cfg, dmService, cacheS, filterSChan, server,
tS.GetIntenternalChan(), dspS.GetIntenternalChan())
internalResourceSChan, connManager.GetConnMgr())
supS := services.NewSupplierService(cfg, dmService, cacheS, filterSChan, server,
attrS.GetIntenternalChan(), stS.GetIntenternalChan(),
reS.GetIntenternalChan(), dspS.GetIntenternalChan())

View File

@@ -203,9 +203,7 @@ func TestCGRConfigReloadResourceS(t *testing.T) {
StringIndexedFields: &[]string{utils.Account},
PrefixIndexedFields: &[]string{},
IndexedSelects: true,
ThresholdSConns: []*RemoteHost{
&RemoteHost{Address: "127.0.0.1:2012", Transport: utils.MetaJSON},
},
ThresholdSConns: []string{utils.MetaLocalHost},
}
if !reflect.DeepEqual(expAttr, cfg.ResourceSCfg()) {
t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.ResourceSCfg()))

View File

@@ -891,7 +891,7 @@ func TestDfResourceLimiterSJsonCfg(t *testing.T) {
eCfg := &ResourceSJsonCfg{
Enabled: utils.BoolPointer(false),
Indexed_selects: utils.BoolPointer(true),
Thresholds_conns: &[]*RemoteHostJson{},
Thresholds_conns: &[]string{},
Store_interval: utils.StringPointer(""),
String_indexed_fields: nil,
Prefix_indexed_fields: &[]string{},

View File

@@ -829,7 +829,7 @@ func TestCgrCfgJSONDefaultsResLimCfg(t *testing.T) {
eResLiCfg := &ResourceSConfig{
Enabled: false,
IndexedSelects: true,
ThresholdSConns: []*RemoteHost{},
ThresholdSConns: []string{},
StoreInterval: 0,
StringIndexedFields: nil,
PrefixIndexedFields: &[]string{},

View File

@@ -327,11 +327,14 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
}
// ResourceLimiter checks
if cfg.resourceSCfg.Enabled && !cfg.thresholdSCfg.Enabled && !cfg.dispatcherSCfg.Enabled {
for _, connCfg := range cfg.resourceSCfg.ThresholdSConns {
if connCfg.Address == utils.MetaInternal {
if cfg.resourceSCfg.Enabled {
for _, connID := range cfg.resourceSCfg.ThresholdSConns {
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.thresholdSCfg.Enabled {
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ThresholdS, utils.ResourceS)
}
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.ResourceS, connID)
}
}
}
// StatS checks

View File

@@ -528,12 +528,8 @@ func TestConfigSanityHTTPAgent(t *testing.T) {
func TestConfigSanityResourceLimiter(t *testing.T) {
cfg, _ = NewDefaultCGRConfig()
cfg.resourceSCfg = &ResourceSConfig{
Enabled: true,
ThresholdSConns: []*RemoteHost{
&RemoteHost{
Address: utils.MetaInternal,
},
},
Enabled: true,
ThresholdSConns: []string{utils.MetaInternal},
}
expected := "<ThresholdS> not enabled but requested by <ResourceS> component."
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {

View File

@@ -414,7 +414,7 @@ type ChargerSJsonCfg struct {
type ResourceSJsonCfg struct {
Enabled *bool
Indexed_selects *bool
Thresholds_conns *[]*RemoteHostJson
Thresholds_conns *[]string
Store_interval *string
String_indexed_fields *[]string
Prefix_indexed_fields *[]string

View File

@@ -27,7 +27,7 @@ import (
type ResourceSConfig struct {
Enabled bool
IndexedSelects bool
ThresholdSConns []*RemoteHost // Connections towards StatS
ThresholdSConns []string
StoreInterval time.Duration // Dump regularly from cache into dataDB
StringIndexedFields *[]string
PrefixIndexedFields *[]string
@@ -44,10 +44,14 @@ func (rlcfg *ResourceSConfig) loadFromJsonCfg(jsnCfg *ResourceSJsonCfg) (err err
rlcfg.IndexedSelects = *jsnCfg.Indexed_selects
}
if jsnCfg.Thresholds_conns != nil {
rlcfg.ThresholdSConns = make([]*RemoteHost, len(*jsnCfg.Thresholds_conns))
for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns {
rlcfg.ThresholdSConns[idx] = NewDfltRemoteHost()
rlcfg.ThresholdSConns[idx].loadFromJsonCfg(jsnHaCfg)
rlcfg.ThresholdSConns = make([]string, len(*jsnCfg.Thresholds_conns))
for idx, conn := range *jsnCfg.Thresholds_conns {
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
if conn == utils.MetaInternal {
rlcfg.ThresholdSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)
} else {
rlcfg.ThresholdSConns[idx] = conn
}
}
}
if jsnCfg.Store_interval != nil {

View File

@@ -47,7 +47,7 @@ func TestResourceSConfigloadFromJsonCfg(t *testing.T) {
expected = ResourceSConfig{
Enabled: true,
StoreInterval: time.Duration(time.Second),
ThresholdSConns: []*RemoteHost{},
ThresholdSConns: []string{},
PrefixIndexedFields: &[]string{"index1", "index2"},
}
if jsnCfg, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil {

View File

@@ -83,9 +83,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"],
},

View File

@@ -83,9 +83,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -85,9 +85,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -45,9 +45,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -79,9 +79,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -122,9 +122,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -104,9 +104,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -76,9 +76,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -70,9 +70,7 @@
"resources": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"],
},

View File

@@ -70,9 +70,7 @@
"resources": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"],
},

View File

@@ -30,9 +30,6 @@
"resources": {
"enabled": true,
"stats_conns": [
//{"address": "*internal"}
],
"cache_dump_interval": "0s",
"usage_ttl": "3h",
},

View File

@@ -56,9 +56,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"}
],
"thresholds_conns": ["*localhost"],
},

View File

@@ -64,9 +64,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "127.0.0.1:2013", "transport": "*gob"}
],
"thresholds_conns": ["conn1"],
},

View File

@@ -65,9 +65,7 @@
"resources": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -80,9 +80,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -141,9 +141,7 @@
"resources": {
"enabled": true,
"thresholds_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"}
],
"thresholds_conns": ["*localhost"],
"string_indexed_fields": ["Account"]
},

View File

@@ -148,18 +148,14 @@
"resources": {
"enabled": true,
"thresholds_conns": [
{"address": "127.0.0.1:2013", "transport":"*gob"}
],
"thresholds_conns": ["conn1"],
"string_indexed_fields": ["Account"]
},
"stats": {
"enabled": true,
"thresholds_conns": [
{"address": "127.0.0.1:2013", "transport":"*gob"}
],
"thresholds_conns": ["conn1"],
"string_indexed_fields": ["Account"]
},

View File

@@ -80,9 +80,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -79,9 +79,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -85,9 +85,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -106,9 +106,7 @@
"resources": {
"enabled": true,
"thresholds_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"}
],
"thresholds_conns": ["*localhost"],
"string_indexed_fields": ["Account"]
},

View File

@@ -114,9 +114,7 @@
"resources": {
"enabled": true,
"thresholds_conns": [
{"address": "127.0.0.1:2013", "transport":"*gob"}
],
"thresholds_conns": ["conn1"],
"string_indexed_fields": ["Account"]
},

View File

@@ -204,9 +204,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"],
},

View File

@@ -53,9 +53,7 @@
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"]
},

View File

@@ -95,9 +95,7 @@
"resources": {
"enabled": true,
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"],
"string_indexed_fields": ["Account"],
"prefix_indexed_fields": ["Destination"],
},

View File

@@ -95,9 +95,7 @@
"resources": {
"enabled": true,
"thresholds_conns": [
{"address": "*internal"}
],
"thresholds_conns": ["*internal"],
"string_indexed_fields": ["Account"],
"prefix_indexed_fields": ["Destination"],
},

View File

@@ -21,7 +21,6 @@ package engine
import (
"fmt"
"math/rand"
"reflect"
"sort"
"sync"
"time"
@@ -298,16 +297,15 @@ func (rs Resources) allocateResource(ru *ResourceUsage, dryRun bool) (alcMessage
// NewResourceService returns a new ResourceService
func NewResourceService(dm *DataManager, cgrcfg *config.CGRConfig,
thdS rpcclient.ClientConnector, filterS *FilterS) (*ResourceService, error) {
if thdS != nil && reflect.ValueOf(thdS).IsNil() {
thdS = nil
}
return &ResourceService{dm: dm, thdS: thdS,
filterS *FilterS, connMgr *ConnManager) (*ResourceService, error) {
return &ResourceService{dm: dm,
storedResources: make(utils.StringMap),
cgrcfg: cgrcfg,
filterS: filterS,
loopStoped: make(chan struct{}),
stopBackup: make(chan struct{})}, nil
stopBackup: make(chan struct{}),
connMgr: connMgr}, nil
}
// ResourceService is the service handling resources
@@ -320,6 +318,7 @@ type ResourceService struct {
cgrcfg *config.CGRConfig
stopBackup chan struct{} // control storing process
loopStoped chan struct{}
connMgr *ConnManager
}
// Called to start the service
@@ -435,7 +434,7 @@ func (rS *ResourceService) processThresholds(r *Resource, argDispatcher *utils.A
ArgDispatcher: argDispatcher,
}
var tIDs []string
if err = rS.thdS.Call(utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
if err = rS.connMgr.Call(rS.cgrcfg.ResourceSCfg().ThresholdSConns, utils.ThresholdSv1ProcessEvent, thEv, &tIDs); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with %s.",

View File

@@ -372,8 +372,8 @@ func TestResourcePopulateResourceService(t *testing.T) {
defaultCfg.ResourceSCfg().StoreInterval = 1
defaultCfg.ResourceSCfg().StringIndexedFields = nil
defaultCfg.ResourceSCfg().PrefixIndexedFields = nil
resService, err = NewResourceService(dmRES, defaultCfg, nil,
&FilterS{dm: dmRES, cfg: defaultCfg})
resService, err = NewResourceService(dmRES, defaultCfg,
&FilterS{dm: dmRES, cfg: defaultCfg}, nil)
if err != nil {
t.Errorf("Error: %+v", err)
}
@@ -734,8 +734,8 @@ func TestResourceCaching(t *testing.T) {
defaultCfg.ResourceSCfg().StoreInterval = 1
defaultCfg.ResourceSCfg().StringIndexedFields = nil
defaultCfg.ResourceSCfg().PrefixIndexedFields = nil
resService, err = NewResourceService(dmRES, defaultCfg, nil,
&FilterS{dm: dmRES, cfg: defaultCfg})
resService, err = NewResourceService(dmRES, defaultCfg,
&FilterS{dm: dmRES, cfg: defaultCfg}, nil)
if err != nil {
t.Errorf("Error: %+v", err)
}

View File

@@ -33,34 +33,32 @@ import (
// NewResourceService returns the Resource Service
func NewResourceService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, thrsChan,
dispatcherChan chan rpcclient.ClientConnector) servmanager.Service {
server *utils.Server, internalResourceSChan chan rpcclient.RpcClientConnection,
connMgr *engine.ConnManager) servmanager.Service {
return &ResourceService{
connChan: make(chan rpcclient.ClientConnector, 1),
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
thrsChan: thrsChan,
dispatcherChan: dispatcherChan,
connChan: internalResourceSChan,
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
}
}
// ResourceService implements Service interface
type ResourceService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *DataDBService
cacheS *engine.CacheS
filterSChan chan *engine.FilterS
server *utils.Server
thrsChan chan rpcclient.ClientConnector
dispatcherChan chan rpcclient.ClientConnector
cfg *config.CGRConfig
dm *DataDBService
cacheS *engine.CacheS
filterSChan chan *engine.FilterS
server *utils.Server
reS *engine.ResourceService
rpc *v1.ResourceSv1
connChan chan rpcclient.ClientConnector
connChan chan rpcclient.RpcClientConnection
connMgr *engine.ConnManager
}
// Start should handle the sercive start
@@ -76,15 +74,9 @@ func (reS *ResourceService) Start() (err error) {
filterS := <-reS.filterSChan
reS.filterSChan <- filterS
var thdSConn rpcclient.ClientConnector
if thdSConn, err = NewConnection(reS.cfg, reS.thrsChan, reS.dispatcherChan, reS.cfg.ResourceSCfg().ThresholdSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.ResourceS, err.Error()))
return
}
reS.Lock()
defer reS.Unlock()
reS.reS, err = engine.NewResourceService(reS.dm.GetDM(), reS.cfg, thdSConn, filterS)
reS.reS, err = engine.NewResourceService(reS.dm.GetDM(), reS.cfg, filterS, reS.connMgr)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.ResourceS, err.Error()))
return
@@ -106,13 +98,7 @@ func (reS *ResourceService) GetIntenternalChan() (conn chan rpcclient.ClientConn
// Reload handles the change of config
func (reS *ResourceService) Reload() (err error) {
var thdSConn rpcclient.ClientConnector
if thdSConn, err = NewConnection(reS.cfg, reS.thrsChan, reS.dispatcherChan, reS.cfg.ResourceSCfg().ThresholdSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.ResourceS, err.Error()))
return
}
reS.Lock()
reS.reS.SetThresholdConnection(thdSConn)
reS.reS.Reload()
reS.Unlock()
return

View File

@@ -55,7 +55,7 @@ func TestResourceSReload(t *testing.T) {
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg)
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1))
reS := NewResourceService(cfg, db, chS, filterSChan, server, tS.GetIntenternalChan(), nil)
reS := NewResourceService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil)
srvMngr.AddServices(NewConnManagerService(cfg, nil), tS, reS, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)