diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index d09ec98dd..88d41988a 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -576,7 +576,20 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, dm * // startAttributeService fires up the AttributeS func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager, - server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { + server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS, + internalSupplierSChan chan rpcclient.RpcClientConnection) { + var err error + var sppSConn *rpcclient.RpcClientPool + if len(cfg.AttributeSCfg().SupplierSConns) != 0 { // Stats connection init + sppSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate, + cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.AttributeSCfg().SupplierSConns, internalSupplierSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf(" Could not connect to SupplierS: %s", err.Error())) + exitChan <- true + return + } + } filterS := <-filterSChan filterSChan <- filterS <-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles) @@ -584,7 +597,7 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec aS, err := engine.NewAttributeService(dm, filterS, cfg.AttributeSCfg().StringIndexedFields, cfg.AttributeSCfg().PrefixIndexedFields, - cfg.AttributeSCfg().ProcessRuns) + cfg.AttributeSCfg().ProcessRuns, sppSConn) if err != nil { utils.Logger.Crit( fmt.Sprintf("<%s> Could not init, error: %s", @@ -1299,7 +1312,7 @@ func main() { if cfg.AttributeSCfg().Enabled { go startAttributeService(internalAttributeSChan, cacheS, - cfg, dm, server, exitChan, filterSChan) + cfg, dm, server, exitChan, filterSChan, internalSupplierSChan) } if cfg.ChargerSCfg().Enabled { go startChargerService(internalChargerSChan, cacheS, internalAttributeSChan, diff --git a/config/attributescfg.go b/config/attributescfg.go index 477d26b46..aebdccd0b 100644 --- a/config/attributescfg.go +++ b/config/attributescfg.go @@ -24,6 +24,7 @@ type AttributeSCfg struct { StringIndexedFields *[]string PrefixIndexedFields *[]string ProcessRuns int + SupplierSConns []*HaPoolConfig } func (alS *AttributeSCfg) loadFromJsonCfg(jsnCfg *AttributeSJsonCfg) (err error) { @@ -50,5 +51,12 @@ func (alS *AttributeSCfg) loadFromJsonCfg(jsnCfg *AttributeSJsonCfg) (err error) if jsnCfg.Process_runs != nil { alS.ProcessRuns = *jsnCfg.Process_runs } + if jsnCfg.Suppliers_conns != nil { + alS.SupplierSConns = make([]*HaPoolConfig, len(*jsnCfg.Suppliers_conns)) + for idx, jsnHaCfg := range *jsnCfg.Suppliers_conns { + alS.SupplierSConns[idx] = NewDfltHaPoolConfig() + alS.SupplierSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } return } diff --git a/config/config_defaults.go b/config/config_defaults.go index 96b162379..9fa50985a 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -431,6 +431,7 @@ const CGRATES_CFG_JSON = ` //"string_indexed_fields": [], // query indexes based on these fields for faster processing "prefix_indexed_fields": [], // query indexes based on these fields for faster processing "process_runs": 1, // number of run loops when processing event + "suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013> }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 651a2f157..ccf659855 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -706,6 +706,7 @@ func TestDfAttributeServJsonCfg(t *testing.T) { String_indexed_fields: nil, Prefix_indexed_fields: &[]string{}, Process_runs: utils.IntPointer(1), + Suppliers_conns: &[]*HaPoolJsonCfg{}, } if cfg, err := dfCgrJsonCfg.AttributeServJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index 5fe044144..3e399cea5 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -806,6 +806,7 @@ func TestCgrCfgJSONDefaultSAttributeSCfg(t *testing.T) { StringIndexedFields: nil, PrefixIndexedFields: &[]string{}, ProcessRuns: 1, + SupplierSConns: []*HaPoolConfig{}, } if !reflect.DeepEqual(eAliasSCfg, cgrCfg.attributeSCfg) { t.Errorf("received: %+v, expecting: %+v", eAliasSCfg, cgrCfg.attributeSCfg) diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 401096bf3..ec8acc076 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -428,6 +428,7 @@ type AttributeSJsonCfg struct { String_indexed_fields *[]string Prefix_indexed_fields *[]string Process_runs *int + Suppliers_conns *[]*HaPoolJsonCfg } // ChargerSJsonCfg service config section diff --git a/config/supplierscfg.go b/config/supplierscfg.go index 6800eca70..945432e02 100644 --- a/config/supplierscfg.go +++ b/config/supplierscfg.go @@ -18,7 +18,6 @@ along with this program. If not, see package config -// // SupplierSCfg is the configuration of supplier service type SupplierSCfg struct { Enabled bool diff --git a/engine/attributes.go b/engine/attributes.go index 8a012c38d..9e1cfb1cb 100644 --- a/engine/attributes.go +++ b/engine/attributes.go @@ -20,17 +20,23 @@ package engine import ( "fmt" + "reflect" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) func NewAttributeService(dm *DataManager, filterS *FilterS, stringIndexedFields, prefixIndexedFields *[]string, - processRuns int) (*AttributeService, error) { + processRuns int, sppS rpcclient.RpcClientConnection) (*AttributeService, error) { + if sppS != nil && reflect.ValueOf(sppS).IsNil() { // fix nil value in interface + sppS = nil + } return &AttributeService{dm: dm, filterS: filterS, stringIndexedFields: stringIndexedFields, prefixIndexedFields: prefixIndexedFields, - processRuns: processRuns}, nil + processRuns: processRuns, + sppS: sppS}, nil } type AttributeService struct { @@ -39,6 +45,7 @@ type AttributeService struct { stringIndexedFields *[]string prefixIndexedFields *[]string processRuns int + sppS rpcclient.RpcClientConnection // rpc connection towards SupplierS } // ListenAndServe will initialize the service diff --git a/engine/attributes_test.go b/engine/attributes_test.go index 5bf1f70aa..2c5eef9bf 100644 --- a/engine/attributes_test.go +++ b/engine/attributes_test.go @@ -181,7 +181,7 @@ func TestAttributePopulateAttrService(t *testing.T) { if err != nil { t.Errorf("Error: %+v", err) } - attrService, err = NewAttributeService(dmAtr, &FilterS{dm: dmAtr, cfg: defaultCfg}, nil, nil, 1) + attrService, err = NewAttributeService(dmAtr, &FilterS{dm: dmAtr, cfg: defaultCfg}, nil, nil, 1, nil) if err != nil { t.Errorf("Error: %+v", err) }