Remove supplier connection from AttributeS and update check config sanity

This commit is contained in:
TeoV
2018-07-25 03:21:24 -04:00
committed by Dan Christian Bogos
parent 98fd8e1d56
commit 1e584f5297
9 changed files with 11 additions and 38 deletions

View File

@@ -576,20 +576,7 @@ func startUsersServer(internalUserSChan chan rpcclient.RpcClientConnection, dm *
// startAttributeService fires up the AttributeS
func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, cfg *config.CGRConfig, dm *engine.DataManager,
server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS,
internalSupplierSChan chan rpcclient.RpcClientConnection) {
var err error
var sppSConn *rpcclient.RpcClientPool
if len(cfg.AttributeSCfg().SupplierSConns) != 0 { // Stats connection init
sppSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.TLSClientKey, cfg.TLSClientCerificate,
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.AttributeSCfg().SupplierSConns, internalSupplierSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<AttributeS> Could not connect to SupplierS: %s", err.Error()))
exitChan <- true
return
}
}
server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) {
filterS := <-filterSChan
filterSChan <- filterS
<-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles)
@@ -597,7 +584,7 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec
aS, err := engine.NewAttributeService(dm, filterS,
cfg.AttributeSCfg().StringIndexedFields,
cfg.AttributeSCfg().PrefixIndexedFields,
cfg.AttributeSCfg().ProcessRuns, sppSConn)
cfg.AttributeSCfg().ProcessRuns)
if err != nil {
utils.Logger.Crit(
fmt.Sprintf("<%s> Could not init, error: %s",
@@ -1311,7 +1298,7 @@ func main() {
if cfg.AttributeSCfg().Enabled {
go startAttributeService(internalAttributeSChan, cacheS,
cfg, dm, server, exitChan, filterSChan, internalSupplierSChan)
cfg, dm, server, exitChan, filterSChan)
}
if cfg.ChargerSCfg().Enabled {
go startChargerService(internalChargerSChan, cacheS, internalAttributeSChan,

View File

@@ -24,7 +24,6 @@ type AttributeSCfg struct {
StringIndexedFields *[]string
PrefixIndexedFields *[]string
ProcessRuns int
SupplierSConns []*HaPoolConfig
}
func (alS *AttributeSCfg) loadFromJsonCfg(jsnCfg *AttributeSJsonCfg) (err error) {
@@ -51,12 +50,5 @@ func (alS *AttributeSCfg) loadFromJsonCfg(jsnCfg *AttributeSJsonCfg) (err error)
if jsnCfg.Process_runs != nil {
alS.ProcessRuns = *jsnCfg.Process_runs
}
if jsnCfg.Suppliers_conns != nil {
alS.SupplierSConns = make([]*HaPoolConfig, len(*jsnCfg.Suppliers_conns))
for idx, jsnHaCfg := range *jsnCfg.Suppliers_conns {
alS.SupplierSConns[idx] = NewDfltHaPoolConfig()
alS.SupplierSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
return
}

View File

@@ -700,6 +700,11 @@ func (self *CGRConfig) checkConfigSanity() error {
return errors.New("StatS not enabled but requested by SupplierS component.")
}
}
for _, connCfg := range self.supplierSCfg.AttributeSConns {
if connCfg.Address == utils.MetaInternal && !self.attributeSCfg.Enabled {
return errors.New("AttributeS not enabled but requested by SupplierS component.")
}
}
}
// DispaterS checks
if self.dispatcherSCfg != nil && self.dispatcherSCfg.Enabled {

View File

@@ -430,7 +430,6 @@ const CGRATES_CFG_JSON = `
//"string_indexed_fields": [], // query indexes based on these fields for faster processing
"prefix_indexed_fields": [], // query indexes based on these fields for faster processing
"process_runs": 1, // number of run loops when processing event
"suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013>
},

View File

@@ -705,7 +705,6 @@ func TestDfAttributeServJsonCfg(t *testing.T) {
String_indexed_fields: nil,
Prefix_indexed_fields: &[]string{},
Process_runs: utils.IntPointer(1),
Suppliers_conns: &[]*HaPoolJsonCfg{},
}
if cfg, err := dfCgrJsonCfg.AttributeServJsonCfg(); err != nil {
t.Error(err)

View File

@@ -803,7 +803,6 @@ func TestCgrCfgJSONDefaultSAttributeSCfg(t *testing.T) {
StringIndexedFields: nil,
PrefixIndexedFields: &[]string{},
ProcessRuns: 1,
SupplierSConns: []*HaPoolConfig{},
}
if !reflect.DeepEqual(eAliasSCfg, cgrCfg.attributeSCfg) {
t.Errorf("received: %+v, expecting: %+v", eAliasSCfg, cgrCfg.attributeSCfg)

View File

@@ -427,7 +427,6 @@ type AttributeSJsonCfg struct {
String_indexed_fields *[]string
Prefix_indexed_fields *[]string
Process_runs *int
Suppliers_conns *[]*HaPoolJsonCfg
}
// ChargerSJsonCfg service config section

View File

@@ -20,23 +20,17 @@ package engine
import (
"fmt"
"reflect"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func NewAttributeService(dm *DataManager, filterS *FilterS,
stringIndexedFields, prefixIndexedFields *[]string,
processRuns int, sppS rpcclient.RpcClientConnection) (*AttributeService, error) {
if sppS != nil && reflect.ValueOf(sppS).IsNil() { // fix nil value in interface
sppS = nil
}
processRuns int) (*AttributeService, error) {
return &AttributeService{dm: dm, filterS: filterS,
stringIndexedFields: stringIndexedFields,
prefixIndexedFields: prefixIndexedFields,
processRuns: processRuns,
sppS: sppS}, nil
processRuns: processRuns}, nil
}
type AttributeService struct {
@@ -45,7 +39,6 @@ type AttributeService struct {
stringIndexedFields *[]string
prefixIndexedFields *[]string
processRuns int
sppS rpcclient.RpcClientConnection // rpc connection towards SupplierS
}
// ListenAndServe will initialize the service

View File

@@ -181,7 +181,7 @@ func TestAttributePopulateAttrService(t *testing.T) {
if err != nil {
t.Errorf("Error: %+v", err)
}
attrService, err = NewAttributeService(dmAtr, &FilterS{dm: dmAtr, cfg: defaultCfg}, nil, nil, 1, nil)
attrService, err = NewAttributeService(dmAtr, &FilterS{dm: dmAtr, cfg: defaultCfg}, nil, nil, 1)
if err != nil {
t.Errorf("Error: %+v", err)
}