Added configuration code for new dispatcher config

This commit is contained in:
Trial97
2019-01-30 16:01:43 +02:00
committed by Dan Christian Bogos
parent 683430ef4c
commit fb1a12a4b7
7 changed files with 156 additions and 46 deletions

View File

@@ -973,15 +973,15 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
var err error
var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns *rpcclient.RpcClientPool
cfg.DispatcherSCfg().DispatchingStrategy = strings.TrimPrefix(cfg.DispatcherSCfg().DispatchingStrategy,
cfg.DispatcherCfg().DispatchingStrategy = strings.TrimPrefix(cfg.DispatcherCfg().DispatchingStrategy,
utils.Meta) // remote * from DispatchingStrategy
if len(cfg.DispatcherSCfg().RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
if len(cfg.DispatcherCfg().RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().RALsConns, internalRaterChan,
cfg.DispatcherCfg().RALsConns, internalRaterChan,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.DispatcherS, err.Error()))
@@ -989,13 +989,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
return
}
}
if len(cfg.DispatcherSCfg().ResSConns) != 0 {
resSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
if len(cfg.DispatcherCfg().ResSConns) != 0 {
resSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().ResSConns, nil,
cfg.DispatcherCfg().ResSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResoruceS: %s", utils.DispatcherS, err.Error()))
@@ -1003,13 +1003,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
return
}
}
if len(cfg.DispatcherSCfg().ThreshSConns) != 0 {
threshSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
if len(cfg.DispatcherCfg().ThreshSConns) != 0 {
threshSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().ThreshSConns, nil,
cfg.DispatcherCfg().ThreshSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.DispatcherS, err.Error()))
@@ -1017,13 +1017,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
return
}
}
if len(cfg.DispatcherSCfg().StatSConns) != 0 {
statSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
if len(cfg.DispatcherCfg().StatSConns) != 0 {
statSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().StatSConns, nil,
cfg.DispatcherCfg().StatSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatQueueS: %s", utils.DispatcherS, err.Error()))
@@ -1031,13 +1031,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
return
}
}
if len(cfg.DispatcherSCfg().SupplSConns) != 0 {
suplSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
if len(cfg.DispatcherCfg().SupplSConns) != 0 {
suplSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().SupplSConns, nil,
cfg.DispatcherCfg().SupplSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SupplierS: %s", utils.DispatcherS, err.Error()))
@@ -1045,13 +1045,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
return
}
}
if len(cfg.DispatcherSCfg().AttrSConns) != 0 {
attrSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
if len(cfg.DispatcherCfg().AttrSConns) != 0 {
attrSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().AttrSConns, nil,
cfg.DispatcherCfg().AttrSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to AttributeS: %s", utils.DispatcherS, err.Error()))
@@ -1059,13 +1059,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
return
}
}
if len(cfg.DispatcherSCfg().SessionSConns) != 0 {
sessionsSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
if len(cfg.DispatcherCfg().SessionSConns) != 0 {
sessionsSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().SessionSConns, nil,
cfg.DispatcherCfg().SessionSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SessionS: %s", utils.DispatcherS, err.Error()))
@@ -1073,13 +1073,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
return
}
}
if len(cfg.DispatcherSCfg().ChargerSConns) != 0 {
chargerSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy,
if len(cfg.DispatcherCfg().ChargerSConns) != 0 {
chargerSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().ChargerSConns, nil,
cfg.DispatcherCfg().ChargerSConns, nil,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ChargerS: %s", utils.DispatcherS, err.Error()))
@@ -1102,31 +1102,31 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc
exitChan <- true
return
}()
if !cfg.ThresholdSCfg().Enabled && len(cfg.DispatcherSCfg().ThreshSConns) != 0 {
if !cfg.ThresholdSCfg().Enabled && len(cfg.DispatcherCfg().ThreshSConns) != 0 {
server.RpcRegisterName(utils.ThresholdSv1,
v1.NewDispatcherThresholdSv1(dspS))
}
if !cfg.StatSCfg().Enabled && len(cfg.DispatcherSCfg().StatSConns) != 0 {
if !cfg.StatSCfg().Enabled && len(cfg.DispatcherCfg().StatSConns) != 0 {
server.RpcRegisterName(utils.StatSv1,
v1.NewDispatcherStatSv1(dspS))
}
if !cfg.ResourceSCfg().Enabled && len(cfg.DispatcherSCfg().ResSConns) != 0 {
if !cfg.ResourceSCfg().Enabled && len(cfg.DispatcherCfg().ResSConns) != 0 {
server.RpcRegisterName(utils.ResourceSv1,
v1.NewDispatcherResourceSv1(dspS))
}
if !cfg.SupplierSCfg().Enabled && len(cfg.DispatcherSCfg().SupplSConns) != 0 {
if !cfg.SupplierSCfg().Enabled && len(cfg.DispatcherCfg().SupplSConns) != 0 {
server.RpcRegisterName(utils.SupplierSv1,
v1.NewDispatcherSupplierSv1(dspS))
}
if !cfg.AttributeSCfg().Enabled && len(cfg.DispatcherSCfg().AttrSConns) != 0 {
if !cfg.AttributeSCfg().Enabled && len(cfg.DispatcherCfg().AttrSConns) != 0 {
server.RpcRegisterName(utils.AttributeSv1,
v1.NewDispatcherAttributeSv1(dspS))
}
if !cfg.SessionSCfg().Enabled && len(cfg.DispatcherSCfg().SessionSConns) != 0 {
if !cfg.SessionSCfg().Enabled && len(cfg.DispatcherCfg().SessionSConns) != 0 {
server.RpcRegisterName(utils.SessionSv1,
v1.NewDispatcherSessionSv1(dspS))
}
if !cfg.ChargerSCfg().Enabled && len(cfg.DispatcherSCfg().ChargerSConns) != 0 {
if !cfg.ChargerSCfg().Enabled && len(cfg.DispatcherCfg().ChargerSConns) != 0 {
server.RpcRegisterName(utils.ChargerSv1,
v1.NewDispatcherChargerSv1(dspS))
}
@@ -1604,7 +1604,7 @@ func main() {
internalRsChan, internalStatSChan,
cfg, dm, server, exitChan, filterSChan, internalAttributeSChan)
}
if cfg.DispatcherSCfg().Enabled {
if cfg.DispatcherCfg().Enabled {
go startDispatcherService(internalDispatcherSChan,
internalRaterChan, cacheS, dm, server, exitChan)
}

View File

@@ -159,6 +159,7 @@ func NewDefaultCGRConfig() (*CGRConfig, error) {
cfg.supplierSCfg = new(SupplierSCfg)
cfg.sureTaxCfg = new(SureTaxCfg)
cfg.dispatcherSCfg = new(DispatcherSCfg)
cfg.dispatcherCfg = new(DispatcherCfg)
cfg.loaderCgrCfg = new(LoaderCgrCfg)
cfg.migratorCgrCfg = new(MigratorCgrCfg)
cfg.mailerCfg = new(MailerCfg)
@@ -305,6 +306,7 @@ type CGRConfig struct {
supplierSCfg *SupplierSCfg // SupplierS config
sureTaxCfg *SureTaxCfg // SureTax config
dispatcherSCfg *DispatcherSCfg // DispatcherS config
dispatcherCfg *DispatcherCfg // Dispatcher config
loaderCgrCfg *LoaderCgrCfg // LoaderCgr config
migratorCgrCfg *MigratorCgrCfg // MigratorCgr config
mailerCfg *MailerCfg // Mailer config
@@ -701,11 +703,11 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
// DispaterS checks
if self.dispatcherSCfg.Enabled {
if self.dispatcherCfg.Enabled {
if !utils.IsSliceMember([]string{utils.MetaFirst, utils.MetaRandom, utils.MetaNext,
utils.MetaBroadcast}, self.dispatcherSCfg.DispatchingStrategy) {
utils.MetaBroadcast}, self.dispatcherCfg.DispatchingStrategy) {
return fmt.Errorf("<%s> unsupported dispatching strategy %s",
utils.DispatcherS, self.dispatcherSCfg.DispatchingStrategy)
utils.DispatcherS, self.dispatcherCfg.DispatchingStrategy)
}
}
// Scheduler check connection with CDR Server
@@ -937,11 +939,19 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) {
return err
}
jsnDispatcherCfg, err := jsnCfg.DispatcherSJsonCfg()
jsnDispatcherCfg, err := jsnCfg.DispatcherJsonCfg()
if err != nil {
return err
}
if self.dispatcherSCfg.loadFromJsonCfg(jsnDispatcherCfg); err != nil {
if self.dispatcherCfg.loadFromJsonCfg(jsnDispatcherCfg); err != nil {
return err
}
jsnDispatcherSCfg, err := jsnCfg.DispatcherSJsonCfg()
if err != nil {
return err
}
if self.dispatcherSCfg.loadFromJsonCfg(jsnDispatcherSCfg); err != nil {
return err
}
@@ -1157,6 +1167,10 @@ func (cfg *CGRConfig) LoaderCfg() []*LoaderSCfg {
return cfg.loaderCfg
}
func (cfg *CGRConfig) DispatcherCfg() *DispatcherCfg {
return cfg.dispatcherCfg
}
func (cfg *CGRConfig) DispatcherSCfg() *DispatcherSCfg {
return cfg.dispatcherSCfg
}

View File

@@ -61,7 +61,8 @@ const (
LoaderJson = "loaders"
MAILER_JSN = "mailer"
SURETAX_JSON = "suretax"
DispatcherSJson = "dispatcher"
DispatcherJson = "dispatcher"
DispatcherSJson = "dispatchers"
CgrLoaderCfgJson = "loader"
CgrMigratorCfgJson = "migrator"
ChargerSCfgJson = "chargers"
@@ -452,6 +453,18 @@ func (self CgrJsonCfg) SureTaxJsonCfg() (*SureTaxJsonCfg, error) {
return cfg, nil
}
func (self CgrJsonCfg) DispatcherJsonCfg() (*DispatcherJsonCfg, error) {
rawCfg, hasKey := self[DispatcherJson]
if !hasKey {
return nil, nil
}
cfg := new(DispatcherJsonCfg)
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
return nil, err
}
return cfg, nil
}
func (self CgrJsonCfg) DispatcherSJsonCfg() (*DispatcherSJsonCfg, error) {
rawCfg, hasKey := self[DispatcherSJson]
if !hasKey {

View File

@@ -1379,8 +1379,8 @@ func TestDfHttpJsonCfg(t *testing.T) {
}
}
func TestDfDispatcherSJsonCfg(t *testing.T) {
eCfg := &DispatcherSJsonCfg{
func TestDfDispatcherJsonCfg(t *testing.T) {
eCfg := &DispatcherJsonCfg{
Enabled: utils.BoolPointer(false),
Rals_conns: &[]*HaPoolJsonCfg{},
Resources_conns: &[]*HaPoolJsonCfg{},
@@ -1392,6 +1392,31 @@ func TestDfDispatcherSJsonCfg(t *testing.T) {
Chargers_conns: &[]*HaPoolJsonCfg{},
Dispatching_strategy: utils.StringPointer(utils.MetaFirst),
}
if cfg, err := dfCgrJsonCfg.DispatcherJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eCfg), utils.ToJSON(cfg))
}
}
func TestDfDispatcherSJsonCfg(t *testing.T) {
eCfg := &DispatcherSJsonCfg{
Enabled: utils.BoolPointer(false),
Conns: &map[string]*[]*HaPoolJsonCfg{
"sessions_eu": &[]*HaPoolJsonCfg{
{Address: utils.StringPointer("127.0.0.1:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)},
{Address: utils.StringPointer("127.0.0.2:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)},
},
"sessions_us": &[]*HaPoolJsonCfg{
{Address: utils.StringPointer("127.0.0.3:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)},
{Address: utils.StringPointer("127.0.0.4:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)},
},
"sessions_others": &[]*HaPoolJsonCfg{
{Address: utils.StringPointer("127.0.0.5:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)},
{Address: utils.StringPointer("127.0.0.6:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)},
},
},
}
if cfg, err := dfCgrJsonCfg.DispatcherSJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {

View File

@@ -1453,8 +1453,8 @@ func TestCgrLoaderCfgITDefaults(t *testing.T) {
}
}
func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) {
eDspSCfg := &DispatcherSCfg{
func TestCgrCfgJSONDefaultDispatcherCfg(t *testing.T) {
eDspSCfg := &DispatcherCfg{
Enabled: false,
RALsConns: []*HaPoolConfig{},
ResSConns: []*HaPoolConfig{},
@@ -1466,6 +1466,29 @@ func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) {
ChargerSConns: []*HaPoolConfig{},
DispatchingStrategy: utils.MetaFirst,
}
if !reflect.DeepEqual(cgrCfg.dispatcherCfg, eDspSCfg) {
t.Errorf("received: %+v, expecting: %+v", cgrCfg.dispatcherSCfg, eDspSCfg)
}
}
func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) {
eDspSCfg := &DispatcherSCfg{
Enabled: false,
Conns: map[string][]*HaPoolConfig{
"sessions_eu": []*HaPoolConfig{
{Address: "127.0.0.1:2012", Transport: utils.MetaJSONrpc},
{Address: "127.0.0.2:2012", Transport: utils.MetaJSONrpc},
},
"sessions_us": []*HaPoolConfig{
{Address: "127.0.0.3:2012", Transport: utils.MetaJSONrpc},
{Address: "127.0.0.4:2012", Transport: utils.MetaJSONrpc},
},
"sessions_others": []*HaPoolConfig{
{Address: "127.0.0.5:2012", Transport: utils.MetaJSONrpc},
{Address: "127.0.0.6:2012", Transport: utils.MetaJSONrpc},
},
},
}
if !reflect.DeepEqual(cgrCfg.dispatcherSCfg, eDspSCfg) {
t.Errorf("received: %+v, expecting: %+v", cgrCfg.dispatcherSCfg, eDspSCfg)
}

View File

@@ -19,7 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package config
// DispatcherSCfg is the configuration of dispatcher service
type DispatcherSCfg struct {
type DispatcherCfg struct {
Enabled bool
RALsConns []*HaPoolConfig
ResSConns []*HaPoolConfig
@@ -32,7 +32,7 @@ type DispatcherSCfg struct {
DispatchingStrategy string
}
func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err error) {
func (dps *DispatcherCfg) loadFromJsonCfg(jsnCfg *DispatcherJsonCfg) (err error) {
if jsnCfg == nil {
return nil
}
@@ -100,3 +100,33 @@ func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err erro
}
return nil
}
// DispatcherSCfg is the configuration of dispatcher service
type DispatcherSCfg struct {
Enabled bool
Conns map[string][]*HaPoolConfig
}
func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err error) {
if jsnCfg == nil {
return nil
}
if jsnCfg.Enabled != nil {
dps.Enabled = *jsnCfg.Enabled
}
if jsnCfg.Conns != nil {
dps.Conns = make(map[string][]*HaPoolConfig, len(*jsnCfg.Conns))
for id, conns := range *jsnCfg.Conns {
if conns == nil {
continue
}
Conns := make([]*HaPoolConfig, len(*conns))
for idx, jsnHaCfg := range *conns {
Conns[idx] = NewDfltHaPoolConfig()
Conns[idx].loadFromJsonCfg(jsnHaCfg)
}
dps.Conns[id] = Conns
}
}
return nil
}

View File

@@ -535,7 +535,7 @@ type SureTaxJsonCfg struct {
}
// Dispatcher service config section
type DispatcherSJsonCfg struct {
type DispatcherJsonCfg struct {
Enabled *bool
Rals_conns *[]*HaPoolJsonCfg
Resources_conns *[]*HaPoolJsonCfg
@@ -548,6 +548,11 @@ type DispatcherSJsonCfg struct {
Dispatching_strategy *string
}
type DispatcherSJsonCfg struct {
Enabled *bool
Conns *map[string]*[]*HaPoolJsonCfg
}
type LoaderCfgJson struct {
Tpid *string
Data_path *string