DiameterAgent config - replacing PubSubConns with ThresholdSConns, DiameterProcessor publish_event with threshods_event

This commit is contained in:
DanB
2018-09-18 21:00:49 +02:00
parent b418cd5303
commit 657c0422bc
7 changed files with 33 additions and 28 deletions

View File

@@ -278,31 +278,33 @@ func startAsteriskAgent(internalSMGChan chan rpcclient.RpcClientConnection, exit
exitChan <- true
}
func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
func startDiameterAgent(internalSMGChan, internalThdSChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
var err error
utils.Logger.Info("Starting CGRateS DiameterAgent service")
var smgConn, pubsubConn *rpcclient.RpcClientPool
var smgConn, thdSConn *rpcclient.RpcClientPool
if len(cfg.DiameterAgentCfg().SessionSConns) != 0 {
smgConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.DiameterAgentCfg().SessionSConns, internalSMGChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to SMG: %s", err.Error()))
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.DiameterAgent, utils.SessionS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DiameterAgentCfg().PubSubConns) != 0 {
pubsubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
if len(cfg.DiameterAgentCfg().ThresholdSConns) != 0 {
thdSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.DiameterAgentCfg().PubSubConns, internalPubSubSChan, cfg.InternalTtl)
cfg.DiameterAgentCfg().ThresholdSConns, internalThdSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to PubSubS: %s", err.Error()))
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.DiameterAgent, utils.ThresholdS, err.Error()))
exitChan <- true
return
}
}
da, err := agents.NewDiameterAgent(cfg, smgConn, pubsubConn)
da, err := agents.NewDiameterAgent(cfg, smgConn, thdSConn)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<DiameterAgent> error: %s!", err))
exitChan <- true
@@ -1298,7 +1300,7 @@ func main() {
}
if cfg.DiameterAgentCfg().Enabled {
go startDiameterAgent(internalSMGChan, internalPubSubSChan, exitChan)
go startDiameterAgent(internalSMGChan, internalThresholdSChan, exitChan)
}
if cfg.RadiusAgentCfg().Enabled {

View File

@@ -637,9 +637,10 @@ func (self *CGRConfig) checkConfigSanity() error {
return errors.New("SMGeneric not enabled but referenced by DiameterAgent component")
}
}
for _, daPubSubSConn := range self.diameterAgentCfg.PubSubConns {
if daPubSubSConn.Address == utils.MetaInternal && !self.PubSubServerEnabled {
return errors.New("PubSubS not enabled but requested by DiameterAgent component.")
for _, conn := range self.diameterAgentCfg.ThresholdSConns {
if conn.Address == utils.MetaInternal && !self.ThresholdSCfg().Enabled {
return fmt.Errorf("%s not enabled but requested by %s component.",
utils.ThresholdS, utils.DiameterAgent)
}
}
}

View File

@@ -369,7 +369,7 @@ const CGRATES_CFG_JSON = `
"sessions_conns": [
{"address": "*internal"} // connection towards SessionService
],
"pubsubs_conns": [], // address where to reach the pubusb service, empty to disable pubsub functionality: <""|*internal|x.y.z.y:1234>
"thresholds_conns": [], // address where to reach the thresholds service, empty to disable thresholds functionality: <""|*internal|x.y.z.y:1234>
"create_cdr": true, // create CDR out of CCR terminate and send it to SessionS
"cdr_requires_session": true, // only create CDR if there is an active session at terminate
"debit_interval": "5m", // interval for CCR updates

View File

@@ -602,7 +602,7 @@ func TestDiameterAgentJsonCfg(t *testing.T) {
&HaPoolJsonCfg{
Address: utils.StringPointer(utils.MetaInternal),
}},
Pubsubs_conns: &[]*HaPoolJsonCfg{},
Thresholds_conns: &[]*HaPoolJsonCfg{},
Create_cdr: utils.BoolPointer(true),
Cdr_requires_session: utils.BoolPointer(true),
Debit_interval: utils.StringPointer("5m"),

View File

@@ -942,7 +942,7 @@ func TestCgrCfgJSONDefaultsDiameterAgentCfg(t *testing.T) {
DictionariesDir: "/usr/share/cgrates/diameter/dict/",
SessionSConns: []*HaPoolConfig{
&HaPoolConfig{Address: "*internal"}},
PubSubConns: []*HaPoolConfig{},
ThresholdSConns: []*HaPoolConfig{},
CreateCDR: true,
DebitInterval: 5 * time.Minute,
Timezone: "",
@@ -965,8 +965,10 @@ func TestCgrCfgJSONDefaultsDiameterAgentCfg(t *testing.T) {
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.SessionSConns, testDA.SessionSConns) {
t.Errorf("expecting: %+v, received: %+v", cgrCfg.diameterAgentCfg.SessionSConns, testDA.SessionSConns)
}
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.PubSubConns, testDA.PubSubConns) {
t.Errorf("expecting: %+v, received: %+v", cgrCfg.diameterAgentCfg.PubSubConns, testDA.PubSubConns)
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.ThresholdSConns,
testDA.ThresholdSConns) {
t.Errorf("expecting: %+v, received: %+v",
cgrCfg.diameterAgentCfg.ThresholdSConns, testDA.ThresholdSConns)
}
if !reflect.DeepEqual(cgrCfg.diameterAgentCfg.CreateCDR, testDA.CreateCDR) {
t.Errorf("expecting: %+v, received: %+v", cgrCfg.diameterAgentCfg.CreateCDR, testDA.CreateCDR)

View File

@@ -29,7 +29,7 @@ type DiameterAgentCfg struct {
Listen string // address where to listen for diameter requests <x.y.z.y:1234>
DictionariesDir string
SessionSConns []*HaPoolConfig // connections towards SMG component
PubSubConns []*HaPoolConfig // connection towards pubsubs
ThresholdSConns []*HaPoolConfig // connection towards pubsubs
CreateCDR bool
CDRRequiresSession bool
DebitInterval time.Duration
@@ -61,11 +61,11 @@ func (self *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg) erro
self.SessionSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Pubsubs_conns != nil {
self.PubSubConns = make([]*HaPoolConfig, len(*jsnCfg.Pubsubs_conns))
for idx, jsnHaCfg := range *jsnCfg.Pubsubs_conns {
self.PubSubConns[idx] = NewDfltHaPoolConfig()
self.PubSubConns[idx].loadFromJsonCfg(jsnHaCfg)
if jsnCfg.Thresholds_conns != nil {
self.ThresholdSConns = make([]*HaPoolConfig, len(*jsnCfg.Thresholds_conns))
for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns {
self.ThresholdSConns[idx] = NewDfltHaPoolConfig()
self.ThresholdSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Create_cdr != nil {
@@ -121,7 +121,7 @@ func (self *DiameterAgentCfg) loadFromJsonCfg(jsnCfg *DiameterAgentJsonCfg) erro
type DARequestProcessor struct {
Id string
DryRun bool
PublishEvent bool
ThresholdSEvent bool
RequestFilter utils.RSRFields
Flags utils.StringMap // Various flags to influence behavior
ContinueOnSuccess bool
@@ -140,8 +140,8 @@ func (self *DARequestProcessor) loadFromJsonCfg(jsnCfg *DARequestProcessorJsnCfg
if jsnCfg.Dry_run != nil {
self.DryRun = *jsnCfg.Dry_run
}
if jsnCfg.Publish_event != nil {
self.PublishEvent = *jsnCfg.Publish_event
if jsnCfg.Thresholds_event != nil {
self.ThresholdSEvent = *jsnCfg.Thresholds_event
}
var err error
if jsnCfg.Request_filter != nil {

View File

@@ -331,7 +331,7 @@ type DiameterAgentJsonCfg struct {
Listen *string // address where to listen for diameter requests <x.y.z.y:1234>
Dictionaries_dir *string // path towards additional dictionaries
Sessions_conns *[]*HaPoolJsonCfg // Connections towards generic SM
Pubsubs_conns *[]*HaPoolJsonCfg // connection towards pubsubs
Thresholds_conns *[]*HaPoolJsonCfg // connection towards pubsubs
Create_cdr *bool
Cdr_requires_session *bool
Debit_interval *string
@@ -347,7 +347,7 @@ type DiameterAgentJsonCfg struct {
type DARequestProcessorJsnCfg struct {
Id *string
Dry_run *bool
Publish_event *bool
Thresholds_event *bool
Request_filter *string
Flags *[]string
Continue_on_success *bool