From 657c0422bcbf4b18f7bb265db4ec2004b1b62fdf Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 18 Sep 2018 21:00:49 +0200 Subject: [PATCH] DiameterAgent config - replacing PubSubConns with ThresholdSConns, DiameterProcessor publish_event with threshods_event --- cmd/cgr-engine/cgr-engine.go | 20 +++++++++++--------- config/config.go | 7 ++++--- config/config_defaults.go | 2 +- config/config_json_test.go | 2 +- config/config_test.go | 8 +++++--- config/daconfig.go | 18 +++++++++--------- config/libconfig_json.go | 4 ++-- 7 files changed, 33 insertions(+), 28 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 18dcae1a3..3037e6723 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -278,31 +278,33 @@ func startAsteriskAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit exitChan <- true } -func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { +func startDiameterAgent(internalSMGChan, internalThdSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { var err error utils.Logger.Info("Starting CGRateS DiameterAgent service") - var smgConn, pubsubConn *rpcclient.RpcClientPool + var smgConn, thdSConn *rpcclient.RpcClientPool if len(cfg.DiameterAgentCfg().SessionSConns) != 0 { smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.DiameterAgentCfg().SessionSConns, internalSMGChan, cfg.InternalTtl) if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to SMG: %s", err.Error())) + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.DiameterAgent, utils.SessionS, err.Error())) exitChan <- true return } } - if len(cfg.DiameterAgentCfg().PubSubConns) != 0 { - pubsubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate, + if len(cfg.DiameterAgentCfg().ThresholdSConns) != 0 { + thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, - cfg.DiameterAgentCfg().PubSubConns, internalPubSubSChan, cfg.InternalTtl) + cfg.DiameterAgentCfg().ThresholdSConns, internalThdSChan, cfg.InternalTtl) if err != nil { - utils.Logger.Crit(fmt.Sprintf(" Could not connect to PubSubS: %s", err.Error())) + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.DiameterAgent, utils.ThresholdS, err.Error())) exitChan <- true return } } - da, err := agents.NewDiameterAgent(cfg, smgConn, pubsubConn) + da, err := agents.NewDiameterAgent(cfg, smgConn, thdSConn) if err != nil { utils.Logger.Err(fmt.Sprintf(" error: %s!", err)) exitChan <- true @@ -1298,7 +1300,7 @@ func main() { } if cfg.DiameterAgentCfg().Enabled { - go startDiameterAgent(internalSMGChan, internalPubSubSChan, exitChan) + go startDiameterAgent(internalSMGChan, internalThresholdSChan, exitChan) } if cfg.RadiusAgentCfg().Enabled { diff --git a/config/config.go b/config/config.go index 133983dc0..ae434a679 100755 --- a/config/config.go +++ b/config/config.go @@ -637,9 +637,10 @@ func (self *CGRConfig) checkConfigSanity() error { return errors.New("SMGeneric not enabled but referenced by DiameterAgent component") } } - for _, daPubSubSConn := range self.diameterAgentCfg.PubSubConns { - if daPubSubSConn.Address == utils.MetaInternal && !self.PubSubServerEnabled { - return errors.New("PubSubS not enabled but requested by DiameterAgent component.") + for _, conn := range self.diameterAgentCfg.ThresholdSConns { + if conn.Address == utils.MetaInternal && !self.ThresholdSCfg().Enabled { + return fmt.Errorf("%s not enabled but requested by %s component.", + utils.ThresholdS, utils.DiameterAgent) } } } diff --git a/config/config_defaults.go b/config/config_defaults.go index 5e1791856..d3b0d9405 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -369,7 +369,7 @@ const CGRATES_CFG_JSON = ` "sessions_conns": [ {"address": "*internal"} // connection towards SessionService ], - "pubsubs_conns": [], // address where to reach the pubusb service, empty to disable pubsub functionality: <""|*internal|x.y.z.y:1234> + "thresholds_conns": [], // address where to reach the thresholds service, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234> "create_cdr": true, // create CDR out of CCR terminate and send it to SessionS "cdr_requires_session": true, // only create CDR if there is an active session at terminate "debit_interval": "5m", // interval for CCR updates diff --git a/config/config_json_test.go b/config/config_json_test.go index c24c12f82..d30618aeb 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -602,7 +602,7 @@ func TestDiameterAgentJsonCfg(t *testing.T) { &HaPoolJsonCfg{ Address: utils.StringPointer(utils.MetaInternal), }}, - Pubsubs_conns: &[]*HaPoolJsonCfg{}, + Thresholds_conns: &[]*HaPoolJsonCfg{}, Create_cdr: utils.BoolPointer(true), Cdr_requires_session: utils.BoolPointer(true), Debit_interval: utils.StringPointer("5m"), diff --git a/config/config_test.go b/config/config_test.go index 83ad6bae9..3c0b7f484 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -942,7 +942,7 @@ func TestCgrCfgJSONDefaultsDiameterAgentCfg(t *testing.T) { DictionariesDir: "/usr/share/cgrates/diameter/dict/", SessionSConns: []*HaPoolConfig{ &HaPoolConfig{Address: "*internal"}}, - PubSubConns: []*HaPoolConfig{}, + ThresholdSConns: []*HaPoolConfig{}, CreateCDR: true, DebitInterval: 5 * time.Minute, Timezone: "", @@ -965,8 +965,10 @@ func TestCgrCfgJSONDefaultsDiameterAgentCfg(t *testing.T) { if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.SessionSConns, testDA.SessionSConns) { t.Errorf("expecting: %+v, received: %+v", cgrCfg.diameterAgentCfg.SessionSConns, testDA.SessionSConns) } - if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.PubSubConns, testDA.PubSubConns) { - t.Errorf("expecting: %+v, received: %+v", cgrCfg.diameterAgentCfg.PubSubConns, testDA.PubSubConns) + if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.ThresholdSConns, + testDA.ThresholdSConns) { + t.Errorf("expecting: %+v, received: %+v", + cgrCfg.diameterAgentCfg.ThresholdSConns, testDA.ThresholdSConns) } if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.CreateCDR, testDA.CreateCDR) { t.Errorf("expecting: %+v, received: %+v", cgrCfg.diameterAgentCfg.CreateCDR, testDA.CreateCDR) diff --git a/config/daconfig.go b/config/daconfig.go index e7daf35a8..b009f7afa 100644 --- a/config/daconfig.go +++ b/config/daconfig.go @@ -29,7 +29,7 @@ type DiameterAgentCfg struct { Listen string // address where to listen for diameter requests DictionariesDir string SessionSConns []*HaPoolConfig // connections towards SMG component - PubSubConns []*HaPoolConfig // connection towards pubsubs + ThresholdSConns []*HaPoolConfig // connection towards pubsubs CreateCDR bool CDRRequiresSession bool DebitInterval time.Duration @@ -61,11 +61,11 @@ func (self *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg) erro self.SessionSConns[idx].loadFromJsonCfg(jsnHaCfg) } } - if jsnCfg.Pubsubs_conns != nil { - self.PubSubConns = make([]*HaPoolConfig, len(*jsnCfg.Pubsubs_conns)) - for idx, jsnHaCfg := range *jsnCfg.Pubsubs_conns { - self.PubSubConns[idx] = NewDfltHaPoolConfig() - self.PubSubConns[idx].loadFromJsonCfg(jsnHaCfg) + if jsnCfg.Thresholds_conns != nil { + self.ThresholdSConns = make([]*HaPoolConfig, len(*jsnCfg.Thresholds_conns)) + for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns { + self.ThresholdSConns[idx] = NewDfltHaPoolConfig() + self.ThresholdSConns[idx].loadFromJsonCfg(jsnHaCfg) } } if jsnCfg.Create_cdr != nil { @@ -121,7 +121,7 @@ func (self *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg) erro type DARequestProcessor struct { Id string DryRun bool - PublishEvent bool + ThresholdSEvent bool RequestFilter utils.RSRFields Flags utils.StringMap // Various flags to influence behavior ContinueOnSuccess bool @@ -140,8 +140,8 @@ func (self *DARequestProcessor) loadFromJsonCfg(jsnCfg *DARequestProcessorJsnCfg if jsnCfg.Dry_run != nil { self.DryRun = *jsnCfg.Dry_run } - if jsnCfg.Publish_event != nil { - self.PublishEvent = *jsnCfg.Publish_event + if jsnCfg.Thresholds_event != nil { + self.ThresholdSEvent = *jsnCfg.Thresholds_event } var err error if jsnCfg.Request_filter != nil { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 4ef6ecc6f..a27b3747f 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -331,7 +331,7 @@ type DiameterAgentJsonCfg struct { Listen *string // address where to listen for diameter requests Dictionaries_dir *string // path towards additional dictionaries Sessions_conns *[]*HaPoolJsonCfg // Connections towards generic SM - Pubsubs_conns *[]*HaPoolJsonCfg // connection towards pubsubs + Thresholds_conns *[]*HaPoolJsonCfg // connection towards pubsubs Create_cdr *bool Cdr_requires_session *bool Debit_interval *string @@ -347,7 +347,7 @@ type DiameterAgentJsonCfg struct { type DARequestProcessorJsnCfg struct { Id *string Dry_run *bool - Publish_event *bool + Thresholds_event *bool Request_filter *string Flags *[]string Continue_on_success *bool