Add config for dispatcher

This commit is contained in:
TeoV
2018-04-10 10:47:20 -04:00
committed by Dan Christian Bogos
parent 492f703d43
commit a5f77d5a8b
10 changed files with 154 additions and 4 deletions

View File

@@ -337,7 +337,7 @@ variable_rtp_audio_out_dtmf_packet_count: 0
variable_rtp_audio_out_cng_packet_count: 0
variable_rtp_audio_rtcp_packet_count: 1450
variable_rtp_audio_rtcp_octet_count: 45940
variable_cgr_subsystems: *resources%3B*attributes%3B*sessions%3B*suppliers%3B*suppliers_event_cost%3B*suppliers_ignore_errors%3B*accounts`
variable_cgr_subsystems: *resources%3B*attributes%3B*sessions%3B*suppliers_event_cost%3B*suppliers_ignore_errors%3B*accounts`
func TestEventCreation(t *testing.T) {
body := `Event-Name: RE_SCHEDULE

View File

@@ -1007,6 +1007,14 @@ func main() {
cfg, dm, server, exitChan, filterSChan)
}
if cfg.DispatcherSCfg().Enabled {
/*
go startDispatcherService(internalSupplierSChan, cacheS,
internalRsChan, internalStatSChan,
cfg, dm, server, exitChan, filterSChan)
*/
}
go loaderService(cacheS, cfg, dm, server, exitChan)
// Serve rpc connections

View File

@@ -144,6 +144,9 @@ func NewDefaultCGRConfig() (*CGRConfig, error) {
cfg.diameterAgentCfg = new(DiameterAgentCfg)
cfg.radiusAgentCfg = new(RadiusAgentCfg)
cfg.filterSCfg = new(FilterSCfg)
cfg.dispatcherSCfg = &DispatcherSCfg{
Enabled: true,
}
cfg.ConfigReloads = make(map[string]chan struct{})
cfg.ConfigReloads[utils.CDRC] = make(chan struct{}, 1)
cfg.ConfigReloads[utils.CDRC] <- struct{}{} // Unlock the channel
@@ -346,6 +349,7 @@ type CGRConfig struct {
thresholdSCfg *ThresholdSCfg // configuration for ThresholdS
supplierSCfg *SupplierSCfg // configuration for SupplierS
loaderCfg []*LoaderConfig // configuration for Loader
dispatcherSCfg *DispatcherSCfg // configuration for Dispatcher
MailerServer string // The server to use when sending emails out
MailerAuthUser string // Authenticate to email server using this user
MailerAuthPass string // Authenticate to email server with this password
@@ -657,7 +661,9 @@ func (self *CGRConfig) checkConfigSanity() error {
}
}
}
// DispaterS checks
if self.dispatcherSCfg != nil && self.dispatcherSCfg.Enabled {
}
return nil
}
@@ -816,6 +822,11 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) {
return err
}
jsnDispatcherCfg, err := jsnCfg.DispatcherSJsonCfg()
if err != nil {
return err
}
if jsnDataDbCfg != nil {
if jsnDataDbCfg.Db_type != nil {
self.DataDbType = *jsnDataDbCfg.Db_type
@@ -1349,6 +1360,15 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) {
}
}
if jsnDispatcherCfg != nil {
if self.dispatcherSCfg == nil {
self.dispatcherSCfg = new(DispatcherSCfg)
}
if self.dispatcherSCfg.loadFromJsonCfg(jsnDispatcherCfg); err != nil {
return err
}
}
return nil
}

View File

@@ -64,6 +64,7 @@ const (
LoaderJson = "loaders"
MAILER_JSN = "mailer"
SURETAX_JSON = "suretax"
DispatcherSJson = "dispatcher"
)
// Loads the json config out of io.Reader, eg other sources than file, maybe over http
@@ -436,3 +437,15 @@ func (self CgrJsonCfg) SureTaxJsonCfg() (*SureTaxJsonCfg, error) {
}
return cfg, nil
}
func (self CgrJsonCfg) DispatcherSJsonCfg() (*DispatcherSJsonCfg, error) {
rawCfg, hasKey := self[DispatcherSJson]
if !hasKey {
return nil, nil
}
cfg := new(DispatcherSJsonCfg)
if err := json.Unmarshal(*rawCfg, cfg); err != nil {
return nil, err
}
return cfg, nil
}

View File

@@ -960,3 +960,17 @@ func TestDfHttpJsonCfg(t *testing.T) {
t.Error("Received: ", cfg)
}
}
// Will be activated after finish config struct
/*
func TestDfDispatcherSJsonCfg(t *testing.T) {
eCfg := &DispatcherSJsonCfg{
Enabled: utils.BoolPointer(true),
}
if cfg, err := dfCgrJsonCfg.DispatcherSJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Errorf("expecting: %+v, received: %+v", eCfg, cfg)
}
}
*/

View File

@@ -287,13 +287,11 @@ func TestCgrCfgJSONDefaultsStorDB(t *testing.T) {
}
func TestCgrCfgJSONDefaultsRALs(t *testing.T) {
//asd
eHaPoolcfg := []*HaPoolConfig{}
if cgrCfg.RALsEnabled != false {
t.Error(cgrCfg.RALsEnabled)
}
if !reflect.DeepEqual(cgrCfg.RALsThresholdSConns, eHaPoolcfg) {
t.Error(cgrCfg.RALsThresholdSConns)
}
@@ -988,3 +986,12 @@ func TestLoaderDefaults(t *testing.T) {
t.Errorf("received: %+v, expecting: %+v", eCfg, cgrCfg.loaderCfg)
}
}
func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) {
eDspSCfg := &DispatcherSCfg{
Enabled: true,
}
if !reflect.DeepEqual(cgrCfg.dispatcherSCfg, eDspSCfg) {
t.Errorf("received: %+v, expecting: %+v", cgrCfg.dispatcherSCfg, eDspSCfg)
}
}

34
config/dispatchercfg.go Executable file
View File

@@ -0,0 +1,34 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package config
// DispatcherSCfg is the configuration of dispatcher service
type DispatcherSCfg struct {
Enabled bool
}
func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err error) {
if jsnCfg == nil {
return nil
}
if jsnCfg.Enabled != nil {
dps.Enabled = *jsnCfg.Enabled
}
return nil
}

View File

@@ -485,3 +485,8 @@ type SureTaxJsonCfg struct {
Sales_type_code *string
Tax_exemption_code_list *string
}
// Dispatcher service config section
type DispatcherSJsonCfg struct {
Enabled *bool
}

48
dispatcher/dispatcher.go Executable file
View File

@@ -0,0 +1,48 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package dispatcher
import (
"github.com/cgrates/cgrates/utils"
)
// NewDispatcherService initializes a DispatcherService
func NewDispatcherService() (dspS *DispatcherService, err error) {
dspS = &DispatcherService{}
return
}
// DispatcherService is the service handling dispatcher
type DispatcherService struct {
}
// ListenAndServe will initialize the service
func (spS *DispatcherService) ListenAndServe(exitChan chan bool) error {
utils.Logger.Info("Starting Dispatcher Service")
e := <-exitChan
exitChan <- e // put back for the others listening for shutdown request
return nil
}
// Shutdown is called to shutdown the service
func (spS *DispatcherService) Shutdown() error {
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherS))
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherS))
return nil
}

View File

@@ -559,6 +559,7 @@ const (
StatService = "StatS"
FilterS = "FilterS"
ThresholdS = "ThresholdS"
DispatcherS = "DispatcherS"
)
//Migrator Metas