Added connMngr in ees *rpc exporter

This commit is contained in:
porosnicuadrian
2021-11-05 16:06:37 +02:00
committed by Dan Christian Bogos
parent 08a1bd0d10
commit 82c8809d02
8 changed files with 31 additions and 29 deletions

View File

@@ -188,6 +188,7 @@ type EventExporterOpts struct {
CertPath *string
CAPath *string
TLS *bool
ConnIDs *[]string
RPCConnTimeout *time.Duration
RPCReplyTimeout *time.Duration
}

View File

@@ -41,7 +41,8 @@ type EventExporter interface {
}
// NewEventExporter produces exporters
func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS) (ee EventExporter, err error) {
func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig,
filterS *engine.FilterS, connMngr *engine.ConnManager) (ee EventExporter, err error) {
var dc *utils.SafeMapStorage
if dc, err = newEEMetrics(utils.FirstNonEmpty(
cfg.Timezone,
@@ -79,7 +80,7 @@ func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, fi
case utils.MetaLog:
return NewLogEE(cfg, dc), nil
case utils.MetaRpc:
return NewRpcEE(cfg, dc)
return NewRpcEE(cfg, dc, connMngr)
default:
return nil, fmt.Errorf("unsupported exporter type: <%s>", cfg.Type)
}

View File

@@ -33,7 +33,7 @@ func TestNewEventExporter(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaFileCSV
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
errExpect := "open /var/spool/cgrates/ees/*default_"
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
@@ -69,7 +69,7 @@ func TestNewEventExporterCase2(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaFileFWV
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
errExpect := "open /var/spool/cgrates/ees/*default_"
if strings.Contains(errExpect, err.Error()) {
t.Errorf("Expected %+v but got %+v", errExpect, err)
@@ -105,7 +105,7 @@ func TestNewEventExporterCase3(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaHTTPPost
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
if err != nil {
t.Error(err)
}
@@ -133,7 +133,7 @@ func TestNewEventExporterCase4(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaHTTPjsonMap
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
if err != nil {
t.Error(err)
}
@@ -161,7 +161,7 @@ func TestNewEventExporterCase6(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaVirt
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
if err != nil {
t.Error(err)
}
@@ -186,7 +186,7 @@ func TestNewEventExporterDefaultCase(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaNone
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
errExpect := fmt.Sprintf("unsupported exporter type: <%s>", utils.MetaNone)
if err.Error() != errExpect {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)
@@ -200,7 +200,7 @@ func TestNewEventExporterCase7(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
cgrCfg.EEsCfg().Exporters[0].ExportPath = "/invalid/path"
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
if err != nil {
t.Error(err)
}
@@ -230,7 +230,7 @@ func TestNewEventExporterCase8(t *testing.T) {
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQL
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS)
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil)
errExpect := "MANDATORY_IE_MISSING: [sqlTableName]"
if err == nil || err.Error() != errExpect {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)
@@ -241,7 +241,7 @@ func TestNewEventExporterCase8(t *testing.T) {
func TestNewEventExporterDcCase(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.GeneralCfg().DefaultTimezone = "invalid_timezone"
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, nil)
_, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, nil, nil)
errExpect := "unknown time zone invalid_timezone"
if err == nil || err.Error() != errExpect {
t.Errorf("Expected %+v \n but got %+v", errExpect, err)

View File

@@ -172,7 +172,7 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGR
}
}
if !isCached {
if ee, err = NewEventExporter(eeS.cfg.EEsCfg().Exporters[cfgIdx], eeS.cfg, eeS.filterS); err != nil {
if ee, err = NewEventExporter(eeS.cfg.EEsCfg().Exporters[cfgIdx], eeS.cfg, eeS.filterS, eeS.connMgr); err != nil {
return
}
if hasCache {

View File

@@ -276,7 +276,7 @@ func TestV1ProcessEvent4(t *testing.T) {
utils.MetaHTTPPost: ltcache.NewCache(1,
time.Second, false, onCacheEvicted),
}
newEeS, err := NewEventExporter(cfg.EEsCfg().Exporters[0], cfg, filterS)
newEeS, err := NewEventExporter(cfg.EEsCfg().Exporters[0], cfg, filterS, nil)
if err != nil {
t.Error(err)
}

View File

@@ -178,7 +178,7 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export
Opts: expEv.Opts,
Attempts: attempts,
FailedPostsDir: utils.MetaNone,
}, config.CgrConfig(), nil); err != nil {
}, config.CgrConfig(), nil, nil); err != nil {
return
}
keyFunc := func() string { return utils.EmptyString }

View File

@@ -53,7 +53,7 @@ func TestNatsEE(t *testing.T) {
break
}
}
evExp, err := NewEventExporter(cfg, cgrCfg, nil)
evExp, err := NewEventExporter(cfg, cgrCfg, nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -143,7 +143,7 @@ func TestNatsEE2(t *testing.T) {
break
}
}
evExp, err := NewEventExporter(cfg, cgrCfg, nil)
evExp, err := NewEventExporter(cfg, cgrCfg, nil, nil)
if err != nil {
t.Fatal(err)
}

View File

@@ -25,11 +25,12 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (e *RPCee, err error) {
func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage,
connMgr *engine.ConnManager) (e *RPCee, err error) {
e = &RPCee{
cfg: cfg,
dc: dc,
@@ -39,9 +40,10 @@ func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (e *RPCee,
}
type RPCee struct {
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
conn *rpcclient.RPCClient
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
connMgr *engine.ConnManager
// conn *rpcclient.RPCClient
//opts
codec string
@@ -50,6 +52,7 @@ type RPCee struct {
keyPath string
certPath string
caPath string
connIDs []string
connTimeout time.Duration
replyTimeout time.Duration
@@ -61,12 +64,6 @@ func (e *RPCee) Cfg() (eCfg *config.EventExporterCfg) {
}
func (e *RPCee) Connect() (err error) {
e.Lock()
if e.conn, err = rpcclient.NewRPCClient(context.TODO(), utils.TCP, e.cfg.ExportPath, e.tls,
e.keyPath, e.certPath, e.caPath, 1, 1, e.connTimeout, e.replyTimeout, e.codec, nil, false, nil); err != nil {
return
}
e.Unlock()
return
}
@@ -74,13 +71,13 @@ func (e *RPCee) ExportEvent(ctx *context.Context, args interface{}, _ string) (e
e.Lock()
defer e.Unlock()
var rply interface{}
return e.conn.Call(ctx, e.serviceMethod, args, &rply)
return e.connMgr.Call(ctx, e.connIDs, e.serviceMethod, args, &rply)
}
func (e *RPCee) Close() (err error) {
e.Lock()
defer e.Unlock()
e.conn = nil
e.connMgr = nil
return
}
@@ -128,6 +125,9 @@ func (e *RPCee) parseOpts() (err error) {
if e.cfg.Opts.TLS != nil {
e.tls = *e.cfg.Opts.TLS
}
if e.cfg.Opts.ConnIDs != nil {
e.connIDs = *e.cfg.Opts.ConnIDs
}
if e.cfg.Opts.RPCConnTimeout != nil {
e.connTimeout = *e.cfg.Opts.RPCConnTimeout
}