Add connections from Suppliers through ConnManager

This commit is contained in:
TeoV
2019-12-09 06:25:20 -05:00
parent 9435b99f1d
commit 18150825bb
33 changed files with 139 additions and 287 deletions

View File

@@ -497,6 +497,7 @@ func main() {
internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
internalStatSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
internalResourceSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
internalSupplierSChan := make(chan rpcclient.RpcClientConnection, 1) // needed to avod cyclic dependency
// init CacheS
cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan)
@@ -542,9 +543,7 @@ func main() {
internalStatSChan, connManager.GetConnMgr())
reS := services.NewResourceService(cfg, dmService, cacheS, filterSChan, server,
internalResourceSChan, connManager.GetConnMgr())
supS := services.NewSupplierService(cfg, dmService, cacheS, filterSChan, server,
attrS.GetIntenternalChan(), stS.GetIntenternalChan(),
reS.GetIntenternalChan(), dspS.GetIntenternalChan())
supS := services.NewSupplierService(cfg, dmService, cacheS, filterSChan, server, internalSupplierSChan, connManager.GetConnMgr())
schS := services.NewSchedulerService(cfg, dmService, cacheS, filterSChan, server, internalCDRServerChan, dspS.GetIntenternalChan())
rals := services.NewRalService(cfg, dmService, storDBService, cacheS, filterSChan, server,
tS.GetIntenternalChan(), stS.GetIntenternalChan(), internalCacheSChan,

View File

@@ -228,9 +228,9 @@ func TestCGRConfigReloadSupplierS(t *testing.T) {
Enabled: true,
StringIndexedFields: &[]string{"LCRProfile"},
PrefixIndexedFields: &[]string{utils.Destination},
ResourceSConns: []*RemoteHost{},
StatSConns: []*RemoteHost{},
AttributeSConns: []*RemoteHost{},
ResourceSConns: []string{},
StatSConns: []string{},
AttributeSConns: []string{},
IndexedSelects: true,
DefaultRatio: 1,
}

View File

@@ -941,9 +941,9 @@ func TestDfSupplierSJsonCfg(t *testing.T) {
Indexed_selects: utils.BoolPointer(true),
String_indexed_fields: nil,
Prefix_indexed_fields: &[]string{},
Attributes_conns: &[]*RemoteHostJson{},
Resources_conns: &[]*RemoteHostJson{},
Stats_conns: &[]*RemoteHostJson{},
Attributes_conns: &[]string{},
Resources_conns: &[]string{},
Stats_conns: &[]string{},
Default_ratio: utils.IntPointer(1),
}
if cfg, err := dfCgrJsonCfg.SupplierSJsonCfg(); err != nil {

View File

@@ -873,9 +873,9 @@ func TestCgrCfgJSONDefaultSupplierSCfg(t *testing.T) {
IndexedSelects: true,
StringIndexedFields: nil,
PrefixIndexedFields: &[]string{},
AttributeSConns: []*RemoteHost{},
ResourceSConns: []*RemoteHost{},
StatSConns: []*RemoteHost{},
AttributeSConns: []string{},
ResourceSConns: []string{},
StatSConns: []string{},
DefaultRatio: 1,
}
if !reflect.DeepEqual(eSupplSCfg, cgrCfg.supplierSCfg) {

View File

@@ -349,26 +349,29 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
}
// SupplierS checks
if cfg.supplierSCfg.Enabled && !cfg.dispatcherSCfg.Enabled {
if !cfg.resourceSCfg.Enabled {
for _, connCfg := range cfg.supplierSCfg.ResourceSConns {
if connCfg.Address == utils.MetaInternal {
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ResourceS, utils.SupplierS)
}
if cfg.supplierSCfg.Enabled {
for _, connID := range cfg.supplierSCfg.AttributeSConns {
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.attributeSCfg.Enabled {
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.AttributeS, utils.SupplierS)
}
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.SupplierS, connID)
}
}
if !cfg.statsCfg.Enabled {
for _, connCfg := range cfg.supplierSCfg.StatSConns {
if connCfg.Address == utils.MetaInternal {
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.StatService, utils.SupplierS)
}
for _, connID := range cfg.supplierSCfg.StatSConns {
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.statsCfg.Enabled {
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.StatService, utils.SupplierS)
}
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.SupplierS, connID)
}
}
if !cfg.attributeSCfg.Enabled {
for _, connCfg := range cfg.supplierSCfg.AttributeSConns {
if connCfg.Address == utils.MetaInternal {
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.AttributeS, utils.SupplierS)
}
for _, connID := range cfg.supplierSCfg.ResourceSConns {
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.resourceSCfg.Enabled {
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ResourceS, utils.SupplierS)
}
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
return fmt.Errorf("<%s> Connection with id: <%s> not defined", utils.SupplierS, connID)
}
}
}

View File

@@ -553,33 +553,22 @@ func TestConfigSanitySupplierS(t *testing.T) {
cfg, _ = NewDefaultCGRConfig()
cfg.supplierSCfg.Enabled = true
cfg.supplierSCfg.ResourceSConns = []*RemoteHost{
&RemoteHost{
Address: utils.MetaInternal,
},
}
cfg.supplierSCfg.ResourceSConns = []string{utils.MetaInternal}
expected := "<ResourceS> not enabled but requested by <SupplierS> component."
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.resourceSCfg.Enabled = true
cfg.supplierSCfg.StatSConns = []*RemoteHost{
&RemoteHost{
Address: utils.MetaInternal,
},
}
cfg.supplierSCfg.StatSConns = []string{utils.MetaInternal}
expected = "<StatS> not enabled but requested by <SupplierS> component."
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)
}
cfg.statsCfg.Enabled = true
cfg.supplierSCfg.AttributeSConns = []*RemoteHost{
&RemoteHost{
Address: utils.MetaInternal,
},
}
cfg.supplierSCfg.AttributeSConns = []string{utils.MetaInternal}
expected = "<AttributeS> not enabled but requested by <SupplierS> component."
if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected {
t.Errorf("Expecting: %+q received: %+q", expected, err)

View File

@@ -446,9 +446,9 @@ type SupplierSJsonCfg struct {
Indexed_selects *bool
String_indexed_fields *[]string
Prefix_indexed_fields *[]string
Attributes_conns *[]*RemoteHostJson
Resources_conns *[]*RemoteHostJson
Stats_conns *[]*RemoteHostJson
Attributes_conns *[]string
Resources_conns *[]string
Stats_conns *[]string
Default_ratio *int
}

View File

@@ -18,15 +18,17 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package config
import "github.com/cgrates/cgrates/utils"
// SupplierSCfg is the configuration of supplier service
type SupplierSCfg struct {
Enabled bool
IndexedSelects bool
StringIndexedFields *[]string
PrefixIndexedFields *[]string
AttributeSConns []*RemoteHost
ResourceSConns []*RemoteHost
StatSConns []*RemoteHost
AttributeSConns []string
ResourceSConns []string
StatSConns []string
DefaultRatio int
}
@@ -55,24 +57,36 @@ func (spl *SupplierSCfg) loadFromJsonCfg(jsnCfg *SupplierSJsonCfg) (err error) {
spl.PrefixIndexedFields = &pif
}
if jsnCfg.Attributes_conns != nil {
spl.AttributeSConns = make([]*RemoteHost, len(*jsnCfg.Attributes_conns))
for idx, jsnHaCfg := range *jsnCfg.Attributes_conns {
spl.AttributeSConns[idx] = NewDfltRemoteHost()
spl.AttributeSConns[idx].loadFromJsonCfg(jsnHaCfg)
spl.AttributeSConns = make([]string, len(*jsnCfg.Attributes_conns))
for idx, conn := range *jsnCfg.Attributes_conns {
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
if conn == utils.MetaInternal {
spl.AttributeSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes)
} else {
spl.AttributeSConns[idx] = conn
}
}
}
if jsnCfg.Resources_conns != nil {
spl.ResourceSConns = make([]*RemoteHost, len(*jsnCfg.Resources_conns))
for idx, jsnHaCfg := range *jsnCfg.Resources_conns {
spl.ResourceSConns[idx] = NewDfltRemoteHost()
spl.ResourceSConns[idx].loadFromJsonCfg(jsnHaCfg)
spl.ResourceSConns = make([]string, len(*jsnCfg.Resources_conns))
for idx, conn := range *jsnCfg.Resources_conns {
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
if conn == utils.MetaInternal {
spl.ResourceSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources)
} else {
spl.ResourceSConns[idx] = conn
}
}
}
if jsnCfg.Stats_conns != nil {
spl.StatSConns = make([]*RemoteHost, len(*jsnCfg.Stats_conns))
for idx, jsnHaCfg := range *jsnCfg.Stats_conns {
spl.StatSConns[idx] = NewDfltRemoteHost()
spl.StatSConns[idx].loadFromJsonCfg(jsnHaCfg)
spl.StatSConns = make([]string, len(*jsnCfg.Stats_conns))
for idx, conn := range *jsnCfg.Stats_conns {
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
if conn == utils.MetaInternal {
spl.StatSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)
} else {
spl.StatSConns[idx] = conn
}
}
}
if jsnCfg.Default_ratio != nil {

View File

@@ -47,9 +47,9 @@ func TestSupplierSCfgloadFromJsonCfg(t *testing.T) {
}`
expected = SupplierSCfg{
PrefixIndexedFields: &[]string{"index1", "index2"},
AttributeSConns: []*RemoteHost{},
ResourceSConns: []*RemoteHost{},
StatSConns: []*RemoteHost{},
AttributeSConns: []string{},
ResourceSConns: []string{},
StatSConns: []string{},
DefaultRatio: 1,
}
if jsnCfg, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil {

View File

@@ -102,9 +102,7 @@
"suppliers": {
"enabled": true,
"stats_conns": [
{"address": "*internal"},
],
"stats_conns": ["*internal"],
},

View File

@@ -102,9 +102,7 @@
"suppliers": {
"enabled": true,
"stats_conns": [
{"address": "*internal"},
],
"stats_conns": ["*internal"],
},

View File

@@ -103,9 +103,7 @@
"suppliers": {
"enabled": true,
"stats_conns": [
{"address": "*internal"},
],
"stats_conns": ["*localhost"],
},

View File

@@ -51,9 +51,7 @@
"suppliers": {
"enabled": true,
"attributes_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"}
],
"attributes_conns": ["*localhost"],
},

View File

@@ -98,12 +98,8 @@
"suppliers": {
"enabled": true,
"stats_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
],
"resources_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
],
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
},

View File

@@ -141,12 +141,8 @@
"suppliers": {
"enabled": true,
"prefix_indexed_fields":["Destination"],
"stats_conns": [
{"address": "*internal"},
],
"resources_conns": [
{"address": "*internal"},
],
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
},

View File

@@ -84,12 +84,8 @@
"suppliers": {
"enabled": true,
"prefix_indexed_fields":["Destination"],
"stats_conns": [
{"address": "*internal"},
],
"resources_conns": [
{"address": "*internal"},
],
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
},

View File

@@ -99,12 +99,8 @@
"suppliers": {
"enabled": true,
"stats_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
],
"resources_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
],
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
},

View File

@@ -99,12 +99,8 @@
"suppliers": {
"enabled": true,
"stats_conns": [
{"address": "127.0.0.1:2013", "transport":"*gob"},
],
"resources_conns": [
{"address": "127.0.0.1:2013", "transport":"*gob"},
],
"stats_conns": ["conn1"],
"resources_conns": ["conn1"],
},

View File

@@ -98,9 +98,7 @@
"suppliers": {
"enabled": true,
"stats_conns": [
{"address": "*internal"},
],
"stats_conns": ["*internal"],
},

View File

@@ -104,12 +104,8 @@
"suppliers": {
"enabled": true,
"prefix_indexed_fields":["Destination"],
"stats_conns": [
{"address": "*internal"},
],
"resources_conns": [
{"address": "*internal"},
],
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
},

View File

@@ -223,9 +223,7 @@
"suppliers": {
"enabled": true,
"prefix_indexed_fields":["Destination",],
"stats_conns": [
{"address": "*internal"},
],
"stats_conns": ["*internal"],
},

View File

@@ -127,12 +127,8 @@
"suppliers": {
"enabled": true,
"resources_conns": [
{"address": "*internal"},
],
"stats_conns": [
{"address": "*internal"},
],
"resources_conns": ["*internal"],
"stats_conns": ["*internal"],
"string_indexed_fields": ["Account"],
},

View File

@@ -124,12 +124,8 @@
"suppliers": {
"enabled": true,
"resources_conns": [
{"address": "*internal"},
],
"stats_conns": [
{"address": "*internal"},
],
"resources_conns": ["*internal"],
"stats_conns": ["*internal"],
"string_indexed_fields": ["Account"],
},

View File

@@ -123,12 +123,8 @@
"suppliers": {
"enabled": true,
"resources_conns": [
{"address": "*internal"},
],
"stats_conns": [
{"address": "*internal"},
],
"resources_conns": ["*internal"],
"stats_conns": ["*internal"],
"string_indexed_fields": ["Account"],
},

View File

@@ -116,12 +116,8 @@
"suppliers": {
"enabled": true,
"resources_conns": [
{"address": "*internal"}
],
"stats_conns": [
{"address": "*internal"}
],
"resources_conns": ["*internal"],
"stats_conns": ["*internal"],
"string_indexed_fields": ["Account"],
"prefix_indexed_fields": ["Destination"],
},

View File

@@ -127,12 +127,8 @@
"suppliers": {
"enabled": true,
"resources_conns": [
{"address": "*internal"},
],
"stats_conns": [
{"address": "*internal"},
],
"resources_conns": ["*internal"],
"stats_conns": ["*internal"],
"string_indexed_fields": ["Account"],
},

View File

@@ -124,12 +124,8 @@
"suppliers": {
"enabled": true,
"resources_conns": [
{"address": "*internal"},
],
"stats_conns": [
{"address": "*internal"},
],
"resources_conns": ["*internal"],
"stats_conns": ["*internal"],
"string_indexed_fields": ["Account"],
},

View File

@@ -123,12 +123,8 @@
"suppliers": {
"enabled": true,
"resources_conns": [
{"address": "*internal"},
],
"stats_conns": [
{"address": "*internal"},
],
"resources_conns": ["*internal"],
"stats_conns": ["*internal"],
"string_indexed_fields": ["Account"],
},

View File

@@ -116,12 +116,8 @@
"suppliers": {
"enabled": true,
"resources_conns": [
{"address": "*internal"}
],
"stats_conns": [
{"address": "*internal"}
],
"resources_conns": ["*internal"],
"stats_conns": ["*internal"],
"string_indexed_fields": ["Account"],
"prefix_indexed_fields": ["Destination"],
},

View File

@@ -20,7 +20,6 @@ package engine
import (
"fmt"
"reflect"
"sort"
"strconv"
"strings"
@@ -28,7 +27,6 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// Supplier defines supplier related information used within a SupplierProfile
@@ -110,24 +108,12 @@ func (lps SupplierProfiles) Sort() {
// NewSupplierService initializes the Supplier Service
func NewSupplierService(dm *DataManager,
filterS *FilterS, cgrcfg *config.CGRConfig, resourceS,
statS, attributeS rpcclient.ClientConnector) (spS *SupplierService, err error) {
if attributeS != nil && reflect.ValueOf(attributeS).IsNil() { // fix nil value in interface
attributeS = nil
}
if resourceS != nil && reflect.ValueOf(resourceS).IsNil() { // fix nil value in interface
resourceS = nil
}
if statS != nil && reflect.ValueOf(statS).IsNil() { // fix nil value in interface
statS = nil
}
filterS *FilterS, cgrcfg *config.CGRConfig, connMgr *ConnManager) (spS *SupplierService, err error) {
spS = &SupplierService{
dm: dm,
filterS: filterS,
attributeS: attributeS,
resourceS: resourceS,
statS: statS,
cgrcfg: cgrcfg,
dm: dm,
filterS: filterS,
cgrcfg: cgrcfg,
connMgr: connMgr,
}
if spS.sorter, err = NewSupplierSortDispatcher(spS); err != nil {
return nil, err
@@ -137,13 +123,11 @@ func NewSupplierService(dm *DataManager,
// SupplierService is the service computing Supplier queries
type SupplierService struct {
dm *DataManager
filterS *FilterS
cgrcfg *config.CGRConfig
attributeS rpcclient.ClientConnector
resourceS rpcclient.ClientConnector
statS rpcclient.ClientConnector
sorter SupplierSortDispatcher
dm *DataManager
filterS *FilterS
cgrcfg *config.CGRConfig
sorter SupplierSortDispatcher
connMgr *ConnManager
}
// ListenAndServe will initialize the service
@@ -309,10 +293,10 @@ func (spS *SupplierService) costForEvent(ev *utils.CGREvent,
func (spS *SupplierService) statMetrics(statIDs []string, tenant string) (stsMetric map[string]float64, err error) {
stsMetric = make(map[string]float64)
provStsMetrics := make(map[string][]float64)
if spS.statS != nil {
if len(spS.cgrcfg.SupplierSCfg().StatSConns) != 0 {
for _, statID := range statIDs {
var metrics map[string]float64
if err = spS.statS.Call(utils.StatSv1GetQueueFloatMetrics,
if err = spS.connMgr.Call(spS.cgrcfg.SupplierSCfg().StatSConns, utils.StatSv1GetQueueFloatMetrics,
&utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: tenant, ID: statID}}, &metrics); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
@@ -338,12 +322,12 @@ func (spS *SupplierService) statMetrics(statIDs []string, tenant string) (stsMet
// first metric found is always returned
func (spS *SupplierService) statMetricsForLoadDistribution(statIDs []string, tenant string) (result float64, err error) {
provStsMetrics := make(map[string][]float64)
if spS.statS != nil {
if len(spS.cgrcfg.SupplierSCfg().StatSConns) != 0 {
for _, statID := range statIDs {
// check if we get an ID in the following form (StatID:MetricID)
statWithMetric := strings.Split(statID, utils.InInFieldSep)
var metrics map[string]float64
if err = spS.statS.Call(utils.StatSv1GetQueueFloatMetrics,
if err = spS.connMgr.Call(spS.cgrcfg.SupplierSCfg().StatSConns, utils.StatSv1GetQueueFloatMetrics,
&utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: tenant, ID: statWithMetric[0]}}, &metrics); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
@@ -376,10 +360,10 @@ func (spS *SupplierService) statMetricsForLoadDistribution(statIDs []string, ten
// resourceUsage returns sum of all resource usages out of list
func (spS *SupplierService) resourceUsage(resIDs []string, tenant string) (tUsage float64, err error) {
if spS.resourceS != nil {
if len(spS.cgrcfg.SupplierSCfg().ResourceSConns) != 0 {
for _, resID := range resIDs {
var res Resource
if err = spS.resourceS.Call(utils.ResourceSv1GetResource,
if err = spS.connMgr.Call(spS.cgrcfg.SupplierSCfg().ResourceSConns, utils.ResourceSv1GetResource,
&utils.TenantID{Tenant: tenant, ID: resID}, &res); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
@@ -593,14 +577,14 @@ func (spS *SupplierService) V1GetSuppliers(args *ArgsGetSuppliers, reply *Sorted
} else if args.CGREvent.Event == nil {
return utils.NewErrMandatoryIeMissing(utils.Event)
}
if spS.attributeS != nil {
if len(spS.cgrcfg.SupplierSCfg().AttributeSConns) != 0 {
attrArgs := &AttrArgsProcessEvent{
Context: utils.StringPointer(utils.MetaSuppliers),
CGREvent: args.CGREvent,
ArgDispatcher: args.ArgDispatcher,
}
var rplyEv AttrSProcessEventReply
if err := spS.attributeS.Call(utils.AttributeSv1ProcessEvent,
if err := spS.connMgr.Call(spS.cgrcfg.SupplierSCfg().AttributeSConns, utils.AttributeSv1ProcessEvent,
attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 {
args.CGREvent = rplyEv.CGREvent
} else if err.Error() != utils.ErrNotFound.Error() {
@@ -635,21 +619,3 @@ func (spS *SupplierService) V1GetSupplierProfilesForEvent(args *utils.CGREventWi
*reply = sPs
return
}
// SetAttributeSConnection sets the new connection to the attribute service
// only used on reload
func (spS *SupplierService) SetAttributeSConnection(attrS rpcclient.ClientConnector) {
spS.attributeS = attrS
}
// SetStatSConnection sets the new connection to the stat service
// only used on reload
func (spS *SupplierService) SetStatSConnection(stS rpcclient.ClientConnector) {
spS.statS = stS
}
// SetResourceSConnection sets the new connection to the resource service
// only used on reload
func (spS *SupplierService) SetResourceSConnection(rS rpcclient.ClientConnector) {
spS.resourceS = rS
}

View File

@@ -303,8 +303,7 @@ func TestSuppliersPopulateSupplierService(t *testing.T) {
defaultCfg.SupplierSCfg().StringIndexedFields = nil
defaultCfg.SupplierSCfg().PrefixIndexedFields = nil
splService, err = NewSupplierService(dmSPP, &FilterS{
dm: dmSPP,
cfg: defaultCfg}, defaultCfg, nil, nil, nil)
dm: dmSPP, cfg: defaultCfg}, defaultCfg, nil)
if err != nil {
t.Errorf("Error: %+v", err)
}

View File

@@ -33,34 +33,28 @@ import (
// NewSupplierService returns the Supplier Service
func NewSupplierService(cfg *config.CGRConfig, dm *DataDBService,
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
server *utils.Server, attrsChan, stsChan, resChan,
dispatcherChan chan rpcclient.ClientConnector) servmanager.Service {
server *utils.Server, internalSupplierSChan chan rpcclient.RpcClientConnection,
connMgr *engine.ConnManager) servmanager.Service {
return &SupplierService{
connChan: make(chan rpcclient.ClientConnector, 1),
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
attrsChan: attrsChan,
stsChan: stsChan,
resChan: resChan,
dispatcherChan: dispatcherChan,
connChan: internalSupplierSChan,
cfg: cfg,
dm: dm,
cacheS: cacheS,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
}
}
// SupplierService implements Service interface
type SupplierService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *DataDBService
cacheS *engine.CacheS
filterSChan chan *engine.FilterS
server *utils.Server
attrsChan chan rpcclient.ClientConnector
stsChan chan rpcclient.ClientConnector
resChan chan rpcclient.ClientConnector
dispatcherChan chan rpcclient.ClientConnector
cfg *config.CGRConfig
dm *DataDBService
cacheS *engine.CacheS
filterSChan chan *engine.FilterS
server *utils.Server
connMgr *engine.ConnManager
splS *engine.SupplierService
rpc *v1.SupplierSv1
@@ -79,30 +73,10 @@ func (splS *SupplierService) Start() (err error) {
filterS := <-splS.filterSChan
splS.filterSChan <- filterS
var attrSConn, resourceSConn, statSConn rpcclient.ClientConnector
attrSConn, err = NewConnection(splS.cfg, splS.attrsChan, splS.dispatcherChan, splS.cfg.SupplierSCfg().AttributeSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SupplierS, utils.SupplierS, err.Error()))
return
}
statSConn, err = NewConnection(splS.cfg, splS.stsChan, splS.dispatcherChan, splS.cfg.SupplierSCfg().StatSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
utils.SupplierS, err.Error()))
return
}
resourceSConn, err = NewConnection(splS.cfg, splS.resChan, splS.dispatcherChan, splS.cfg.SupplierSCfg().ResourceSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
utils.SupplierS, err.Error()))
return
}
splS.Lock()
defer splS.Unlock()
splS.splS, err = engine.NewSupplierService(splS.dm.GetDM(), filterS, splS.cfg,
resourceSConn, statSConn, attrSConn)
splS.connMgr)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s",
utils.SupplierS, err.Error()))
@@ -125,30 +99,6 @@ func (splS *SupplierService) GetIntenternalChan() (conn chan rpcclient.ClientCon
// Reload handles the change of config
func (splS *SupplierService) Reload() (err error) {
var attrSConn, resourceSConn, statSConn rpcclient.ClientConnector
attrSConn, err = NewConnection(splS.cfg, splS.attrsChan, splS.dispatcherChan, splS.cfg.SupplierSCfg().AttributeSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SupplierS, utils.SupplierS, err.Error()))
return
}
statSConn, err = NewConnection(splS.cfg, splS.stsChan, splS.dispatcherChan, splS.cfg.SupplierSCfg().StatSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
utils.SupplierS, err.Error()))
return
}
resourceSConn, err = NewConnection(splS.cfg, splS.resChan, splS.dispatcherChan, splS.cfg.SupplierSCfg().ResourceSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
utils.SupplierS, err.Error()))
return
}
splS.Lock()
splS.splS.SetAttributeSConnection(attrSConn)
splS.splS.SetStatSConnection(statSConn)
splS.splS.SetResourceSConnection(resourceSConn)
splS.Unlock()
return
}

View File

@@ -53,7 +53,7 @@ func TestSupplierSReload(t *testing.T) {
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg)
sts := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil)
supS := NewSupplierService(cfg, db, chS, filterSChan, server, nil, sts.GetIntenternalChan(), nil, nil)
supS := NewSupplierService(cfg, db, chS, filterSChan, server, make(chan rpcclient.RpcClientConnection, 1), nil)
srvMngr.AddServices(NewConnManagerService(cfg, nil), supS, sts, NewLoaderService(cfg, db, filterSChan, server, nil, nil, engineShutdown), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)