DispatcherS connections initialization, string and prefix indexes configuration, cleanup old dispatcherCfg

This commit is contained in:
DanB
2019-02-01 13:17:41 +01:00
parent d4d9939feb
commit 347050a5b2
9 changed files with 85 additions and 222 deletions

View File

@@ -965,31 +965,35 @@ func loaderService(cacheS *engine.CacheS, cfg *config.CGRConfig,
}
// startDispatcherService fires up the DispatcherS
func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, dm *engine.DataManager,
server *utils.Server, exitChan chan bool) {
func startDispatcherService(internalDispatcherSChan,
intAttrSChan chan rpcclient.RpcClientConnection,
cfg *config.CGRConfig,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
dm *engine.DataManager, server *utils.Server, exitChan chan bool) {
utils.Logger.Info("Starting CGRateS Dispatcher service.")
fltrS := <-filterSChan
filterSChan <- fltrS
var err error
//var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns *rpcclient.RpcClientPool
/*
if len(cfg.DispatcherSCfg().RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().RALsConns, internalRaterChan,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
conns := make(map[string]*rpcclient.RpcClientPool)
for connID, haPoolCfg := range cfg.DispatcherSCfg().Conns {
var connPool *rpcclient.RpcClientPool
if connPool, err = engine.NewRPCPool(
rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
haPoolCfg, nil, time.Duration(0)); err != nil {
utils.Logger.Crit(
fmt.Sprintf("<%s> could not connect to connID: <%s>, err: <%s>",
utils.DispatcherS, err.Error()))
exitChan <- true
return
}
*/
conns[connID] = connPool
}
dspS, err := dispatchers.NewDispatcherService(dm, cfg)
dspS, err := dispatchers.NewDispatcherService(dm, cfg, fltrS, conns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error()))
exitChan <- true
@@ -1508,9 +1512,10 @@ func main() {
internalRsChan, internalStatSChan,
cfg, dm, server, exitChan, filterSChan, internalAttributeSChan)
}
if cfg.DispatcherCfg().Enabled {
if cfg.DispatcherSCfg().Enabled {
go startDispatcherService(internalDispatcherSChan,
internalRaterChan, cacheS, dm, server, exitChan)
internalAttributeSChan, cfg, cacheS, filterSChan,
dm, server, exitChan)
}
if cfg.AnalyzerSCfg().Enabled {

View File

@@ -159,7 +159,6 @@ func NewDefaultCGRConfig() (*CGRConfig, error) {
cfg.supplierSCfg = new(SupplierSCfg)
cfg.sureTaxCfg = new(SureTaxCfg)
cfg.dispatcherSCfg = new(DispatcherSCfg)
cfg.dispatcherCfg = new(DispatcherCfg)
cfg.loaderCgrCfg = new(LoaderCgrCfg)
cfg.migratorCgrCfg = new(MigratorCgrCfg)
cfg.mailerCfg = new(MailerCfg)
@@ -306,7 +305,6 @@ type CGRConfig struct {
supplierSCfg *SupplierSCfg // SupplierS config
sureTaxCfg *SureTaxCfg // SureTax config
dispatcherSCfg *DispatcherSCfg // DispatcherS config
dispatcherCfg *DispatcherCfg // Dispatcher config
loaderCgrCfg *LoaderCgrCfg // LoaderCgr config
migratorCgrCfg *MigratorCgrCfg // MigratorCgr config
mailerCfg *MailerCfg // Mailer config
@@ -702,12 +700,18 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
}
// DispaterS checks
if self.dispatcherCfg.Enabled {
if !utils.IsSliceMember([]string{utils.MetaFirst, utils.MetaRandom, utils.MetaNext,
utils.MetaBroadcast}, self.dispatcherCfg.DispatchingStrategy) {
return fmt.Errorf("<%s> unsupported dispatching strategy %s",
utils.DispatcherS, self.dispatcherCfg.DispatchingStrategy)
if self.dispatcherSCfg.Enabled {
if len(self.dispatcherSCfg.Conns) == 0 {
return fmt.Errorf("<%s> no connections defined", utils.DispatcherS)
}
for connID, haPool := range self.dispatcherSCfg.Conns {
for _, connCfg := range haPool {
if connCfg.Address == utils.MetaInternal {
return fmt.Errorf(
"<%s> connID: <%s> %s connections are not supported",
utils.DispatcherS, connID, utils.MetaInternal)
}
}
}
}
// Scheduler check connection with CDR Server
@@ -939,14 +943,6 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) {
return err
}
jsnDispatcherCfg, err := jsnCfg.DispatcherJsonCfg()
if err != nil {
return err
}
if self.dispatcherCfg.loadFromJsonCfg(jsnDispatcherCfg); err != nil {
return err
}
jsnDispatcherSCfg, err := jsnCfg.DispatcherSJsonCfg()
if err != nil {
return err
@@ -1167,10 +1163,6 @@ func (cfg *CGRConfig) LoaderCfg() []*LoaderSCfg {
return cfg.loaderCfg
}
func (cfg *CGRConfig) DispatcherCfg() *DispatcherCfg {
return cfg.dispatcherCfg
}
func (cfg *CGRConfig) DispatcherSCfg() *DispatcherSCfg {
return cfg.dispatcherSCfg
}

View File

@@ -723,22 +723,10 @@ const CGRATES_CFG_JSON = `
},
"dispatcher":{
"enabled": false, // starts DispatcherS service: <true|false>.
"rals_conns": [], // address where to reach the RALs for dispatcherS <*internal>
"resources_conns": [], // address where to reach the ResourceS <""|127.0.0.1:2013>
"thresholds_conns": [], // address where to reach the ThresholdS <""|127.0.0.1:2013>
"stats_conns": [], // address where to reach the StatS <""|127.0.0.1:2013>
"suppliers_conns": [], // address where to reach the SupplierS <""|127.0.0.1:2013>
"attributes_conns": [], // address where to reach the AttributeS <""|127.0.0.1:2013>
"sessions_conns": [], // connection towards SessionService
"chargers_conns": [], // address where to reach the ChargerS <""|127.0.0.1:2013>
"dispatching_strategy":"*first", // strategy for dispatching <*first|*random|*next|*broadcast>
},
"dispatchers":{
"enabled": false, // starts DispatcherS service: <true|false>.
//"string_indexed_fields": [], // query indexes based on these fields for faster processing
"prefix_indexed_fields": [], // query indexes based on these fields for faster processing
"conns": {
"sessions_eu": [
{"address": "127.0.0.1:2012", "transport": "*json"},

View File

@@ -453,18 +453,6 @@ func (self CgrJsonCfg) SureTaxJsonCfg() (*SureTaxJsonCfg, error) {
return cfg, nil
}
func (self CgrJsonCfg) DispatcherJsonCfg() (*DispatcherJsonCfg, error) {
rawCfg, hasKey := self[DispatcherJson]
if !hasKey {
return nil, nil
}
cfg := new(DispatcherJsonCfg)
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
return nil, err
}
return cfg, nil
}
func (self CgrJsonCfg) DispatcherSJsonCfg() (*DispatcherSJsonCfg, error) {
rawCfg, hasKey := self[DispatcherSJson]
if !hasKey {

View File

@@ -1379,29 +1379,11 @@ func TestDfHttpJsonCfg(t *testing.T) {
}
}
func TestDfDispatcherJsonCfg(t *testing.T) {
eCfg := &DispatcherJsonCfg{
Enabled: utils.BoolPointer(false),
Rals_conns: &[]*HaPoolJsonCfg{},
Resources_conns: &[]*HaPoolJsonCfg{},
Thresholds_conns: &[]*HaPoolJsonCfg{},
Stats_conns: &[]*HaPoolJsonCfg{},
Suppliers_conns: &[]*HaPoolJsonCfg{},
Attributes_conns: &[]*HaPoolJsonCfg{},
Sessions_conns: &[]*HaPoolJsonCfg{},
Chargers_conns: &[]*HaPoolJsonCfg{},
Dispatching_strategy: utils.StringPointer(utils.MetaFirst),
}
if cfg, err := dfCgrJsonCfg.DispatcherJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eCfg), utils.ToJSON(cfg))
}
}
func TestDfDispatcherSJsonCfg(t *testing.T) {
eCfg := &DispatcherSJsonCfg{
Enabled: utils.BoolPointer(false),
Enabled: utils.BoolPointer(false),
String_indexed_fields: nil,
Prefix_indexed_fields: &[]string{},
Conns: &map[string]*[]*HaPoolJsonCfg{
"sessions_eu": &[]*HaPoolJsonCfg{
{Address: utils.StringPointer("127.0.0.1:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)},

View File

@@ -1453,27 +1453,11 @@ func TestCgrLoaderCfgITDefaults(t *testing.T) {
}
}
func TestCgrCfgJSONDefaultDispatcherCfg(t *testing.T) {
eDspSCfg := &DispatcherCfg{
Enabled: false,
RALsConns: []*HaPoolConfig{},
ResSConns: []*HaPoolConfig{},
ThreshSConns: []*HaPoolConfig{},
StatSConns: []*HaPoolConfig{},
SupplSConns: []*HaPoolConfig{},
AttrSConns: []*HaPoolConfig{},
SessionSConns: []*HaPoolConfig{},
ChargerSConns: []*HaPoolConfig{},
DispatchingStrategy: utils.MetaFirst,
}
if !reflect.DeepEqual(cgrCfg.dispatcherCfg, eDspSCfg) {
t.Errorf("received: %+v, expecting: %+v", cgrCfg.dispatcherSCfg, eDspSCfg)
}
}
func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) {
eDspSCfg := &DispatcherSCfg{
Enabled: false,
Enabled: false,
StringIndexedFields: nil,
PrefixIndexedFields: &[]string{},
Conns: map[string][]*HaPoolConfig{
"sessions_eu": []*HaPoolConfig{
{Address: "127.0.0.1:2012", Transport: utils.MetaJSONrpc},

View File

@@ -18,93 +18,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package config
// DispatcherSCfg is the configuration of dispatcher service
type DispatcherCfg struct {
Enabled bool
RALsConns []*HaPoolConfig
ResSConns []*HaPoolConfig
ThreshSConns []*HaPoolConfig
StatSConns []*HaPoolConfig
SupplSConns []*HaPoolConfig
AttrSConns []*HaPoolConfig
SessionSConns []*HaPoolConfig
ChargerSConns []*HaPoolConfig
DispatchingStrategy string
}
func (dps *DispatcherCfg) loadFromJsonCfg(jsnCfg *DispatcherJsonCfg) (err error) {
if jsnCfg == nil {
return nil
}
if jsnCfg.Enabled != nil {
dps.Enabled = *jsnCfg.Enabled
}
if jsnCfg.Rals_conns != nil {
dps.RALsConns = make([]*HaPoolConfig, len(*jsnCfg.Rals_conns))
for idx, jsnHaCfg := range *jsnCfg.Rals_conns {
dps.RALsConns[idx] = NewDfltHaPoolConfig()
dps.RALsConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Resources_conns != nil {
dps.ResSConns = make([]*HaPoolConfig, len(*jsnCfg.Resources_conns))
for idx, jsnHaCfg := range *jsnCfg.Resources_conns {
dps.ResSConns[idx] = NewDfltHaPoolConfig()
dps.ResSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Thresholds_conns != nil {
dps.ThreshSConns = make([]*HaPoolConfig, len(*jsnCfg.Thresholds_conns))
for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns {
dps.ThreshSConns[idx] = NewDfltHaPoolConfig()
dps.ThreshSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Stats_conns != nil {
dps.StatSConns = make([]*HaPoolConfig, len(*jsnCfg.Stats_conns))
for idx, jsnHaCfg := range *jsnCfg.Stats_conns {
dps.StatSConns[idx] = NewDfltHaPoolConfig()
dps.StatSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Suppliers_conns != nil {
dps.SupplSConns = make([]*HaPoolConfig, len(*jsnCfg.Suppliers_conns))
for idx, jsnHaCfg := range *jsnCfg.Suppliers_conns {
dps.SupplSConns[idx] = NewDfltHaPoolConfig()
dps.SupplSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Attributes_conns != nil {
dps.AttrSConns = make([]*HaPoolConfig, len(*jsnCfg.Attributes_conns))
for idx, jsnHaCfg := range *jsnCfg.Attributes_conns {
dps.AttrSConns[idx] = NewDfltHaPoolConfig()
dps.AttrSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Sessions_conns != nil {
dps.SessionSConns = make([]*HaPoolConfig, len(*jsnCfg.Sessions_conns))
for idx, jsnHaCfg := range *jsnCfg.Sessions_conns {
dps.SessionSConns[idx] = NewDfltHaPoolConfig()
dps.SessionSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Chargers_conns != nil {
dps.ChargerSConns = make([]*HaPoolConfig, len(*jsnCfg.Chargers_conns))
for idx, jsnHaCfg := range *jsnCfg.Chargers_conns {
dps.ChargerSConns[idx] = NewDfltHaPoolConfig()
dps.ChargerSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Dispatching_strategy != nil {
dps.DispatchingStrategy = *jsnCfg.Dispatching_strategy
}
return nil
}
// DispatcherSCfg is the configuration of dispatcher service
type DispatcherSCfg struct {
Enabled bool
Conns map[string][]*HaPoolConfig
Enabled bool
StringIndexedFields *[]string
PrefixIndexedFields *[]string
Conns map[string][]*HaPoolConfig
}
func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err error) {
@@ -114,6 +33,20 @@ func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err erro
if jsnCfg.Enabled != nil {
dps.Enabled = *jsnCfg.Enabled
}
if jsnCfg.String_indexed_fields != nil {
sif := make([]string, len(*jsnCfg.String_indexed_fields))
for i, fID := range *jsnCfg.String_indexed_fields {
sif[i] = fID
}
dps.StringIndexedFields = &sif
}
if jsnCfg.Prefix_indexed_fields != nil {
pif := make([]string, len(*jsnCfg.Prefix_indexed_fields))
for i, fID := range *jsnCfg.Prefix_indexed_fields {
pif[i] = fID
}
dps.PrefixIndexedFields = &pif
}
if jsnCfg.Conns != nil {
dps.Conns = make(map[string][]*HaPoolConfig, len(*jsnCfg.Conns))
for id, conns := range *jsnCfg.Conns {

View File

@@ -534,23 +534,11 @@ type SureTaxJsonCfg struct {
Tax_exemption_code_list *string
}
// Dispatcher service config section
type DispatcherJsonCfg struct {
Enabled *bool
Rals_conns *[]*HaPoolJsonCfg
Resources_conns *[]*HaPoolJsonCfg
Thresholds_conns *[]*HaPoolJsonCfg
Stats_conns *[]*HaPoolJsonCfg
Suppliers_conns *[]*HaPoolJsonCfg
Attributes_conns *[]*HaPoolJsonCfg
Sessions_conns *[]*HaPoolJsonCfg
Chargers_conns *[]*HaPoolJsonCfg
Dispatching_strategy *string
}
type DispatcherSJsonCfg struct {
Enabled *bool
Conns *map[string]*[]*HaPoolJsonCfg
Enabled *bool
String_indexed_fields *[]string
Prefix_indexed_fields *[]string
Conns *map[string]*[]*HaPoolJsonCfg
}
type LoaderCfgJson struct {

View File

@@ -27,21 +27,21 @@ import (
"github.com/cgrates/rpcclient"
)
// NewDispatcherService initializes a DispatcherService
// NewDispatcherService constructs a DispatcherService
func NewDispatcherService(dm *engine.DataManager,
cfg *config.CGRConfig) (*DispatcherService, error) {
return &DispatcherService{dm: dm, cfg: cfg}, nil
cfg *config.CGRConfig, fltrS *engine.FilterS,
conns map[string]*rpcclient.RpcClientPool) (*DispatcherService, error) {
return &DispatcherService{dm: dm, cfg: cfg,
fltrS: fltrS, conns: conns}, nil
}
// DispatcherService is the service handling dispatching towards internal components
// designed to handle automatic partitioning and failover
type DispatcherService struct {
dm *engine.DataManager
cfg *config.CGRConfig
filterS *engine.FilterS
stringIndexedFields *[]string
prefixIndexedFields *[]string
conns map[string]*rpcclient.RpcClientPool // available connections, accessed based on connID
dm *engine.DataManager
cfg *config.CGRConfig
fltrS *engine.FilterS
conns map[string]*rpcclient.RpcClientPool // available connections, accessed based on connID
}
// ListenAndServe will initialize the service
@@ -69,8 +69,11 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
idxKeyPrfx = utils.ConcatenatedKey(ev.Tenant, subsys)
}
matchingPrfls := make(map[string]*engine.DispatcherProfile)
prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event, dS.stringIndexedFields, dS.prefixIndexedFields,
dS.dm, utils.CacheDispatcherFilterIndexes, idxKeyPrfx, dS.cfg.FilterSCfg().IndexedSelects)
prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event,
dS.cfg.DispatcherSCfg().StringIndexedFields,
dS.cfg.DispatcherSCfg().PrefixIndexedFields,
dS.dm, utils.CacheDispatcherFilterIndexes,
idxKeyPrfx, dS.cfg.FilterSCfg().IndexedSelects)
if err != nil {
return nil, err
}
@@ -97,7 +100,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent,
!prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active
continue
}
if pass, err := dS.filterS.Pass(ev.Tenant, prfl.FilterIDs,
if pass, err := dS.fltrS.Pass(ev.Tenant, prfl.FilterIDs,
config.NewNavigableMap(ev.Event)); err != nil {
return nil, err
} else if !pass {