diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 607d996dc..08330c22c 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -973,15 +973,15 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc var err error var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns *rpcclient.RpcClientPool - cfg.DispatcherSCfg().DispatchingStrategy = strings.TrimPrefix(cfg.DispatcherSCfg().DispatchingStrategy, + cfg.DispatcherCfg().DispatchingStrategy = strings.TrimPrefix(cfg.DispatcherCfg().DispatchingStrategy, utils.Meta) // remote * from DispatchingStrategy - if len(cfg.DispatcherSCfg().RALsConns) != 0 { - ralsConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, + if len(cfg.DispatcherCfg().RALsConns) != 0 { + ralsConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().RALsConns, internalRaterChan, + cfg.DispatcherCfg().RALsConns, internalRaterChan, cfg.GeneralCfg().InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.DispatcherS, err.Error())) @@ -989,13 +989,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc return } } - if len(cfg.DispatcherSCfg().ResSConns) != 0 { - resSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, + if len(cfg.DispatcherCfg().ResSConns) != 0 { + resSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().ResSConns, nil, + cfg.DispatcherCfg().ResSConns, nil, cfg.GeneralCfg().InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResoruceS: %s", utils.DispatcherS, err.Error())) @@ -1003,13 +1003,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc return } } - if len(cfg.DispatcherSCfg().ThreshSConns) != 0 { - threshSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, + if len(cfg.DispatcherCfg().ThreshSConns) != 0 { + threshSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().ThreshSConns, nil, + cfg.DispatcherCfg().ThreshSConns, nil, cfg.GeneralCfg().InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.DispatcherS, err.Error())) @@ -1017,13 +1017,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc return } } - if len(cfg.DispatcherSCfg().StatSConns) != 0 { - statSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, + if len(cfg.DispatcherCfg().StatSConns) != 0 { + statSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().StatSConns, nil, + cfg.DispatcherCfg().StatSConns, nil, cfg.GeneralCfg().InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatQueueS: %s", utils.DispatcherS, err.Error())) @@ -1031,13 +1031,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc return } } - if len(cfg.DispatcherSCfg().SupplSConns) != 0 { - suplSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, + if len(cfg.DispatcherCfg().SupplSConns) != 0 { + suplSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().SupplSConns, nil, + cfg.DispatcherCfg().SupplSConns, nil, cfg.GeneralCfg().InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SupplierS: %s", utils.DispatcherS, err.Error())) @@ -1045,13 +1045,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc return } } - if len(cfg.DispatcherSCfg().AttrSConns) != 0 { - attrSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, + if len(cfg.DispatcherCfg().AttrSConns) != 0 { + attrSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().AttrSConns, nil, + cfg.DispatcherCfg().AttrSConns, nil, cfg.GeneralCfg().InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to AttributeS: %s", utils.DispatcherS, err.Error())) @@ -1059,13 +1059,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc return } } - if len(cfg.DispatcherSCfg().SessionSConns) != 0 { - sessionsSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, + if len(cfg.DispatcherCfg().SessionSConns) != 0 { + sessionsSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().SessionSConns, nil, + cfg.DispatcherCfg().SessionSConns, nil, cfg.GeneralCfg().InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SessionS: %s", utils.DispatcherS, err.Error())) @@ -1073,13 +1073,13 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc return } } - if len(cfg.DispatcherSCfg().ChargerSConns) != 0 { - chargerSConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, + if len(cfg.DispatcherCfg().ChargerSConns) != 0 { + chargerSConns, err = engine.NewRPCPool(cfg.DispatcherCfg().DispatchingStrategy, cfg.TlsCfg().ClientKey, cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().ChargerSConns, nil, + cfg.DispatcherCfg().ChargerSConns, nil, cfg.GeneralCfg().InternalTtl) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ChargerS: %s", utils.DispatcherS, err.Error())) @@ -1102,31 +1102,31 @@ func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcc exitChan <- true return }() - if !cfg.ThresholdSCfg().Enabled && len(cfg.DispatcherSCfg().ThreshSConns) != 0 { + if !cfg.ThresholdSCfg().Enabled && len(cfg.DispatcherCfg().ThreshSConns) != 0 { server.RpcRegisterName(utils.ThresholdSv1, v1.NewDispatcherThresholdSv1(dspS)) } - if !cfg.StatSCfg().Enabled && len(cfg.DispatcherSCfg().StatSConns) != 0 { + if !cfg.StatSCfg().Enabled && len(cfg.DispatcherCfg().StatSConns) != 0 { server.RpcRegisterName(utils.StatSv1, v1.NewDispatcherStatSv1(dspS)) } - if !cfg.ResourceSCfg().Enabled && len(cfg.DispatcherSCfg().ResSConns) != 0 { + if !cfg.ResourceSCfg().Enabled && len(cfg.DispatcherCfg().ResSConns) != 0 { server.RpcRegisterName(utils.ResourceSv1, v1.NewDispatcherResourceSv1(dspS)) } - if !cfg.SupplierSCfg().Enabled && len(cfg.DispatcherSCfg().SupplSConns) != 0 { + if !cfg.SupplierSCfg().Enabled && len(cfg.DispatcherCfg().SupplSConns) != 0 { server.RpcRegisterName(utils.SupplierSv1, v1.NewDispatcherSupplierSv1(dspS)) } - if !cfg.AttributeSCfg().Enabled && len(cfg.DispatcherSCfg().AttrSConns) != 0 { + if !cfg.AttributeSCfg().Enabled && len(cfg.DispatcherCfg().AttrSConns) != 0 { server.RpcRegisterName(utils.AttributeSv1, v1.NewDispatcherAttributeSv1(dspS)) } - if !cfg.SessionSCfg().Enabled && len(cfg.DispatcherSCfg().SessionSConns) != 0 { + if !cfg.SessionSCfg().Enabled && len(cfg.DispatcherCfg().SessionSConns) != 0 { server.RpcRegisterName(utils.SessionSv1, v1.NewDispatcherSessionSv1(dspS)) } - if !cfg.ChargerSCfg().Enabled && len(cfg.DispatcherSCfg().ChargerSConns) != 0 { + if !cfg.ChargerSCfg().Enabled && len(cfg.DispatcherCfg().ChargerSConns) != 0 { server.RpcRegisterName(utils.ChargerSv1, v1.NewDispatcherChargerSv1(dspS)) } @@ -1604,7 +1604,7 @@ func main() { internalRsChan, internalStatSChan, cfg, dm, server, exitChan, filterSChan, internalAttributeSChan) } - if cfg.DispatcherSCfg().Enabled { + if cfg.DispatcherCfg().Enabled { go startDispatcherService(internalDispatcherSChan, internalRaterChan, cacheS, dm, server, exitChan) } diff --git a/config/config.go b/config/config.go index b3d75d8a1..f4f10c64c 100755 --- a/config/config.go +++ b/config/config.go @@ -159,6 +159,7 @@ func NewDefaultCGRConfig() (*CGRConfig, error) { cfg.supplierSCfg = new(SupplierSCfg) cfg.sureTaxCfg = new(SureTaxCfg) cfg.dispatcherSCfg = new(DispatcherSCfg) + cfg.dispatcherCfg = new(DispatcherCfg) cfg.loaderCgrCfg = new(LoaderCgrCfg) cfg.migratorCgrCfg = new(MigratorCgrCfg) cfg.mailerCfg = new(MailerCfg) @@ -305,6 +306,7 @@ type CGRConfig struct { supplierSCfg *SupplierSCfg // SupplierS config sureTaxCfg *SureTaxCfg // SureTax config dispatcherSCfg *DispatcherSCfg // DispatcherS config + dispatcherCfg *DispatcherCfg // Dispatcher config loaderCgrCfg *LoaderCgrCfg // LoaderCgr config migratorCgrCfg *MigratorCgrCfg // MigratorCgr config mailerCfg *MailerCfg // Mailer config @@ -701,11 +703,11 @@ func (self *CGRConfig) checkConfigSanity() error { } } // DispaterS checks - if self.dispatcherSCfg.Enabled { + if self.dispatcherCfg.Enabled { if !utils.IsSliceMember([]string{utils.MetaFirst, utils.MetaRandom, utils.MetaNext, - utils.MetaBroadcast}, self.dispatcherSCfg.DispatchingStrategy) { + utils.MetaBroadcast}, self.dispatcherCfg.DispatchingStrategy) { return fmt.Errorf("<%s> unsupported dispatching strategy %s", - utils.DispatcherS, self.dispatcherSCfg.DispatchingStrategy) + utils.DispatcherS, self.dispatcherCfg.DispatchingStrategy) } } // Scheduler check connection with CDR Server @@ -937,11 +939,19 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) { return err } - jsnDispatcherCfg, err := jsnCfg.DispatcherSJsonCfg() + jsnDispatcherCfg, err := jsnCfg.DispatcherJsonCfg() if err != nil { return err } - if self.dispatcherSCfg.loadFromJsonCfg(jsnDispatcherCfg); err != nil { + if self.dispatcherCfg.loadFromJsonCfg(jsnDispatcherCfg); err != nil { + return err + } + + jsnDispatcherSCfg, err := jsnCfg.DispatcherSJsonCfg() + if err != nil { + return err + } + if self.dispatcherSCfg.loadFromJsonCfg(jsnDispatcherSCfg); err != nil { return err } @@ -1157,6 +1167,10 @@ func (cfg *CGRConfig) LoaderCfg() []*LoaderSCfg { return cfg.loaderCfg } +func (cfg *CGRConfig) DispatcherCfg() *DispatcherCfg { + return cfg.dispatcherCfg +} + func (cfg *CGRConfig) DispatcherSCfg() *DispatcherSCfg { return cfg.dispatcherSCfg } diff --git a/config/config_json.go b/config/config_json.go index 45deab1c7..6bdaa79bd 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -61,7 +61,8 @@ const ( LoaderJson = "loaders" MAILER_JSN = "mailer" SURETAX_JSON = "suretax" - DispatcherSJson = "dispatcher" + DispatcherJson = "dispatcher" + DispatcherSJson = "dispatchers" CgrLoaderCfgJson = "loader" CgrMigratorCfgJson = "migrator" ChargerSCfgJson = "chargers" @@ -452,6 +453,18 @@ func (self CgrJsonCfg) SureTaxJsonCfg() (*SureTaxJsonCfg, error) { return cfg, nil } +func (self CgrJsonCfg) DispatcherJsonCfg() (*DispatcherJsonCfg, error) { + rawCfg, hasKey := self[DispatcherJson] + if !hasKey { + return nil, nil + } + cfg := new(DispatcherJsonCfg) + if err := json.Unmarshal(*rawCfg, cfg); err != nil { + return nil, err + } + return cfg, nil +} + func (self CgrJsonCfg) DispatcherSJsonCfg() (*DispatcherSJsonCfg, error) { rawCfg, hasKey := self[DispatcherSJson] if !hasKey { diff --git a/config/config_json_test.go b/config/config_json_test.go index 03b9fa39c..2076fe594 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -1379,8 +1379,8 @@ func TestDfHttpJsonCfg(t *testing.T) { } } -func TestDfDispatcherSJsonCfg(t *testing.T) { - eCfg := &DispatcherSJsonCfg{ +func TestDfDispatcherJsonCfg(t *testing.T) { + eCfg := &DispatcherJsonCfg{ Enabled: utils.BoolPointer(false), Rals_conns: &[]*HaPoolJsonCfg{}, Resources_conns: &[]*HaPoolJsonCfg{}, @@ -1392,6 +1392,31 @@ func TestDfDispatcherSJsonCfg(t *testing.T) { Chargers_conns: &[]*HaPoolJsonCfg{}, Dispatching_strategy: utils.StringPointer(utils.MetaFirst), } + if cfg, err := dfCgrJsonCfg.DispatcherJsonCfg(); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eCfg, cfg) { + t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eCfg), utils.ToJSON(cfg)) + } +} + +func TestDfDispatcherSJsonCfg(t *testing.T) { + eCfg := &DispatcherSJsonCfg{ + Enabled: utils.BoolPointer(false), + Conns: &map[string]*[]*HaPoolJsonCfg{ + "sessions_eu": &[]*HaPoolJsonCfg{ + {Address: utils.StringPointer("127.0.0.1:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)}, + {Address: utils.StringPointer("127.0.0.2:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)}, + }, + "sessions_us": &[]*HaPoolJsonCfg{ + {Address: utils.StringPointer("127.0.0.3:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)}, + {Address: utils.StringPointer("127.0.0.4:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)}, + }, + "sessions_others": &[]*HaPoolJsonCfg{ + {Address: utils.StringPointer("127.0.0.5:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)}, + {Address: utils.StringPointer("127.0.0.6:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)}, + }, + }, + } if cfg, err := dfCgrJsonCfg.DispatcherSJsonCfg(); err != nil { t.Error(err) } else if !reflect.DeepEqual(eCfg, cfg) { diff --git a/config/config_test.go b/config/config_test.go index 0173d2f64..a56112d59 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -1453,8 +1453,8 @@ func TestCgrLoaderCfgITDefaults(t *testing.T) { } } -func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) { - eDspSCfg := &DispatcherSCfg{ +func TestCgrCfgJSONDefaultDispatcherCfg(t *testing.T) { + eDspSCfg := &DispatcherCfg{ Enabled: false, RALsConns: []*HaPoolConfig{}, ResSConns: []*HaPoolConfig{}, @@ -1466,6 +1466,29 @@ func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) { ChargerSConns: []*HaPoolConfig{}, DispatchingStrategy: utils.MetaFirst, } + if !reflect.DeepEqual(cgrCfg.dispatcherCfg, eDspSCfg) { + t.Errorf("received: %+v, expecting: %+v", cgrCfg.dispatcherSCfg, eDspSCfg) + } +} + +func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) { + eDspSCfg := &DispatcherSCfg{ + Enabled: false, + Conns: map[string][]*HaPoolConfig{ + "sessions_eu": []*HaPoolConfig{ + {Address: "127.0.0.1:2012", Transport: utils.MetaJSONrpc}, + {Address: "127.0.0.2:2012", Transport: utils.MetaJSONrpc}, + }, + "sessions_us": []*HaPoolConfig{ + {Address: "127.0.0.3:2012", Transport: utils.MetaJSONrpc}, + {Address: "127.0.0.4:2012", Transport: utils.MetaJSONrpc}, + }, + "sessions_others": []*HaPoolConfig{ + {Address: "127.0.0.5:2012", Transport: utils.MetaJSONrpc}, + {Address: "127.0.0.6:2012", Transport: utils.MetaJSONrpc}, + }, + }, + } if !reflect.DeepEqual(cgrCfg.dispatcherSCfg, eDspSCfg) { t.Errorf("received: %+v, expecting: %+v", cgrCfg.dispatcherSCfg, eDspSCfg) } diff --git a/config/dispatchercfg.go b/config/dispatchercfg.go index 6904a1f6f..3ae63e115 100755 --- a/config/dispatchercfg.go +++ b/config/dispatchercfg.go @@ -19,7 +19,7 @@ along with this program. If not, see package config // DispatcherSCfg is the configuration of dispatcher service -type DispatcherSCfg struct { +type DispatcherCfg struct { Enabled bool RALsConns []*HaPoolConfig ResSConns []*HaPoolConfig @@ -32,7 +32,7 @@ type DispatcherSCfg struct { DispatchingStrategy string } -func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err error) { +func (dps *DispatcherCfg) loadFromJsonCfg(jsnCfg *DispatcherJsonCfg) (err error) { if jsnCfg == nil { return nil } @@ -100,3 +100,33 @@ func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err erro } return nil } + +// DispatcherSCfg is the configuration of dispatcher service +type DispatcherSCfg struct { + Enabled bool + Conns map[string][]*HaPoolConfig +} + +func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err error) { + if jsnCfg == nil { + return nil + } + if jsnCfg.Enabled != nil { + dps.Enabled = *jsnCfg.Enabled + } + if jsnCfg.Conns != nil { + dps.Conns = make(map[string][]*HaPoolConfig, len(*jsnCfg.Conns)) + for id, conns := range *jsnCfg.Conns { + if conns == nil { + continue + } + Conns := make([]*HaPoolConfig, len(*conns)) + for idx, jsnHaCfg := range *conns { + Conns[idx] = NewDfltHaPoolConfig() + Conns[idx].loadFromJsonCfg(jsnHaCfg) + } + dps.Conns[id] = Conns + } + } + return nil +} diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 899a0cc96..495b53e39 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -535,7 +535,7 @@ type SureTaxJsonCfg struct { } // Dispatcher service config section -type DispatcherSJsonCfg struct { +type DispatcherJsonCfg struct { Enabled *bool Rals_conns *[]*HaPoolJsonCfg Resources_conns *[]*HaPoolJsonCfg @@ -548,6 +548,11 @@ type DispatcherSJsonCfg struct { Dispatching_strategy *string } +type DispatcherSJsonCfg struct { + Enabled *bool + Conns *map[string]*[]*HaPoolJsonCfg +} + type LoaderCfgJson struct { Tpid *string Data_path *string