Add Connection from AttributeS to SupplierS

This commit is contained in:
TeoV
2018-07-24 05:03:34 -04:00
committed by Dan Christian Bogos
parent 1a94f4fe38
commit a6cac1de00
9 changed files with 38 additions and 7 deletions

View File

@@ -576,7 +576,20 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, dm *
// startAttributeService fires up the AttributeS
func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager,
server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS,
internalSupplierSChan chan rpcclient.RpcClientConnection) {
var err error
var sppSConn *rpcclient.RpcClientPool
if len(cfg.AttributeSCfg().SupplierSConns) != 0 { // Stats connection init
sppSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.AttributeSCfg().SupplierSConns, internalSupplierSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<AttributeS> Could not connect to SupplierS: %s", err.Error()))
exitChan <- true
return
}
}
filterS := <-filterSChan
filterSChan <- filterS
<-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles)
@@ -584,7 +597,7 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec
aS, err := engine.NewAttributeService(dm, filterS,
cfg.AttributeSCfg().StringIndexedFields,
cfg.AttributeSCfg().PrefixIndexedFields,
cfg.AttributeSCfg().ProcessRuns)
cfg.AttributeSCfg().ProcessRuns, sppSConn)
if err != nil {
utils.Logger.Crit(
fmt.Sprintf("<%s> Could not init, error: %s",
@@ -1299,7 +1312,7 @@ func main() {
if cfg.AttributeSCfg().Enabled {
go startAttributeService(internalAttributeSChan, cacheS,
cfg, dm, server, exitChan, filterSChan)
cfg, dm, server, exitChan, filterSChan, internalSupplierSChan)
}
if cfg.ChargerSCfg().Enabled {
go startChargerService(internalChargerSChan, cacheS, internalAttributeSChan,

View File

@@ -24,6 +24,7 @@ type AttributeSCfg struct {
StringIndexedFields *[]string
PrefixIndexedFields *[]string
ProcessRuns int
SupplierSConns []*HaPoolConfig
}
func (alS *AttributeSCfg) loadFromJsonCfg(jsnCfg *AttributeSJsonCfg) (err error) {
@@ -50,5 +51,12 @@ func (alS *AttributeSCfg) loadFromJsonCfg(jsnCfg *AttributeSJsonCfg) (err error)
if jsnCfg.Process_runs != nil {
alS.ProcessRuns = *jsnCfg.Process_runs
}
if jsnCfg.Suppliers_conns != nil {
alS.SupplierSConns = make([]*HaPoolConfig, len(*jsnCfg.Suppliers_conns))
for idx, jsnHaCfg := range *jsnCfg.Suppliers_conns {
alS.SupplierSConns[idx] = NewDfltHaPoolConfig()
alS.SupplierSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
return
}

View File

@@ -431,6 +431,7 @@ const CGRATES_CFG_JSON = `
//"string_indexed_fields": [], // query indexes based on these fields for faster processing
"prefix_indexed_fields": [], // query indexes based on these fields for faster processing
"process_runs": 1, // number of run loops when processing event
"suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013>
},

View File

@@ -706,6 +706,7 @@ func TestDfAttributeServJsonCfg(t *testing.T) {
String_indexed_fields: nil,
Prefix_indexed_fields: &[]string{},
Process_runs: utils.IntPointer(1),
Suppliers_conns: &[]*HaPoolJsonCfg{},
}
if cfg, err := dfCgrJsonCfg.AttributeServJsonCfg(); err != nil {
t.Error(err)

View File

@@ -806,6 +806,7 @@ func TestCgrCfgJSONDefaultSAttributeSCfg(t *testing.T) {
StringIndexedFields: nil,
PrefixIndexedFields: &[]string{},
ProcessRuns: 1,
SupplierSConns: []*HaPoolConfig{},
}
if !reflect.DeepEqual(eAliasSCfg, cgrCfg.attributeSCfg) {
t.Errorf("received: %+v, expecting: %+v", eAliasSCfg, cgrCfg.attributeSCfg)

View File

@@ -428,6 +428,7 @@ type AttributeSJsonCfg struct {
String_indexed_fields *[]string
Prefix_indexed_fields *[]string
Process_runs *int
Suppliers_conns *[]*HaPoolJsonCfg
}
// ChargerSJsonCfg service config section

View File

@@ -18,7 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package config
//
// SupplierSCfg is the configuration of supplier service
type SupplierSCfg struct {
Enabled bool

View File

@@ -20,17 +20,23 @@ package engine
import (
"fmt"
"reflect"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func NewAttributeService(dm *DataManager, filterS *FilterS,
stringIndexedFields, prefixIndexedFields *[]string,
processRuns int) (*AttributeService, error) {
processRuns int, sppS rpcclient.RpcClientConnection) (*AttributeService, error) {
if sppS != nil && reflect.ValueOf(sppS).IsNil() { // fix nil value in interface
sppS = nil
}
return &AttributeService{dm: dm, filterS: filterS,
stringIndexedFields: stringIndexedFields,
prefixIndexedFields: prefixIndexedFields,
processRuns: processRuns}, nil
processRuns: processRuns,
sppS: sppS}, nil
}
type AttributeService struct {
@@ -39,6 +45,7 @@ type AttributeService struct {
stringIndexedFields *[]string
prefixIndexedFields *[]string
processRuns int
sppS rpcclient.RpcClientConnection // rpc connection towards SupplierS
}
// ListenAndServe will initialize the service

View File

@@ -181,7 +181,7 @@ func TestAttributePopulateAttrService(t *testing.T) {
if err != nil {
t.Errorf("Error: %+v", err)
}
attrService, err = NewAttributeService(dmAtr, &FilterS{dm: dmAtr, cfg: defaultCfg}, nil, nil, 1)
attrService, err = NewAttributeService(dmAtr, &FilterS{dm: dmAtr, cfg: defaultCfg}, nil, nil, 1, nil)
if err != nil {
t.Errorf("Error: %+v", err)
}