diff --git a/config/eescfg.go b/config/eescfg.go index 5ac447a0d..32faa24b6 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -188,6 +188,7 @@ type EventExporterOpts struct { CertPath *string CAPath *string TLS *bool + ConnIDs *[]string RPCConnTimeout *time.Duration RPCReplyTimeout *time.Duration } diff --git a/ees/ee.go b/ees/ee.go index 1d91246ee..d561eeb55 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -41,7 +41,8 @@ type EventExporter interface { } // NewEventExporter produces exporters -func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS) (ee EventExporter, err error) { +func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, + filterS *engine.FilterS, connMngr *engine.ConnManager) (ee EventExporter, err error) { var dc *utils.SafeMapStorage if dc, err = newEEMetrics(utils.FirstNonEmpty( cfg.Timezone, @@ -79,7 +80,7 @@ func NewEventExporter(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, fi case utils.MetaLog: return NewLogEE(cfg, dc), nil case utils.MetaRpc: - return NewRpcEE(cfg, dc) + return NewRpcEE(cfg, dc, connMngr) default: return nil, fmt.Errorf("unsupported exporter type: <%s>", cfg.Type) } diff --git a/ees/ee_test.go b/ees/ee_test.go index aa2797067..cf881fbbf 100644 --- a/ees/ee_test.go +++ b/ees/ee_test.go @@ -33,7 +33,7 @@ func TestNewEventExporter(t *testing.T) { cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaFileCSV cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0 filterS := engine.NewFilterS(cgrCfg, nil, nil) - ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS) + ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil) errExpect := "open /var/spool/cgrates/ees/*default_" if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) @@ -69,7 +69,7 @@ func TestNewEventExporterCase2(t *testing.T) { cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaFileFWV cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0 filterS := engine.NewFilterS(cgrCfg, nil, nil) - ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS) + ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil) errExpect := "open /var/spool/cgrates/ees/*default_" if strings.Contains(errExpect, err.Error()) { t.Errorf("Expected %+v but got %+v", errExpect, err) @@ -105,7 +105,7 @@ func TestNewEventExporterCase3(t *testing.T) { cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaHTTPPost cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0 filterS := engine.NewFilterS(cgrCfg, nil, nil) - ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS) + ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil) if err != nil { t.Error(err) } @@ -133,7 +133,7 @@ func TestNewEventExporterCase4(t *testing.T) { cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaHTTPjsonMap cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0 filterS := engine.NewFilterS(cgrCfg, nil, nil) - ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS) + ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil) if err != nil { t.Error(err) } @@ -161,7 +161,7 @@ func TestNewEventExporterCase6(t *testing.T) { cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaVirt cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0 filterS := engine.NewFilterS(cgrCfg, nil, nil) - ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS) + ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil) if err != nil { t.Error(err) } @@ -186,7 +186,7 @@ func TestNewEventExporterDefaultCase(t *testing.T) { cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaNone cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0 filterS := engine.NewFilterS(cgrCfg, nil, nil) - _, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS) + _, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil) errExpect := fmt.Sprintf("unsupported exporter type: <%s>", utils.MetaNone) if err.Error() != errExpect { t.Errorf("Expected %+v \n but got %+v", errExpect, err) @@ -200,7 +200,7 @@ func TestNewEventExporterCase7(t *testing.T) { cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0 cgrCfg.EEsCfg().Exporters[0].ExportPath = "/invalid/path" filterS := engine.NewFilterS(cgrCfg, nil, nil) - ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS) + ee, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil) if err != nil { t.Error(err) } @@ -230,7 +230,7 @@ func TestNewEventExporterCase8(t *testing.T) { cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQL cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0 filterS := engine.NewFilterS(cgrCfg, nil, nil) - _, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS) + _, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, filterS, nil) errExpect := "MANDATORY_IE_MISSING: [sqlTableName]" if err == nil || err.Error() != errExpect { t.Errorf("Expected %+v \n but got %+v", errExpect, err) @@ -241,7 +241,7 @@ func TestNewEventExporterCase8(t *testing.T) { func TestNewEventExporterDcCase(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.GeneralCfg().DefaultTimezone = "invalid_timezone" - _, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, nil) + _, err := NewEventExporter(cgrCfg.EEsCfg().Exporters[0], cgrCfg, nil, nil) errExpect := "unknown time zone invalid_timezone" if err == nil || err.Error() != errExpect { t.Errorf("Expected %+v \n but got %+v", errExpect, err) diff --git a/ees/ees.go b/ees/ees.go index e4c3341c4..fea62701c 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -172,7 +172,7 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGR } } if !isCached { - if ee, err = NewEventExporter(eeS.cfg.EEsCfg().Exporters[cfgIdx], eeS.cfg, eeS.filterS); err != nil { + if ee, err = NewEventExporter(eeS.cfg.EEsCfg().Exporters[cfgIdx], eeS.cfg, eeS.filterS, eeS.connMgr); err != nil { return } if hasCache { diff --git a/ees/ees_test.go b/ees/ees_test.go index 1664d081e..811fc6a00 100644 --- a/ees/ees_test.go +++ b/ees/ees_test.go @@ -276,7 +276,7 @@ func TestV1ProcessEvent4(t *testing.T) { utils.MetaHTTPPost: ltcache.NewCache(1, time.Second, false, onCacheEvicted), } - newEeS, err := NewEventExporter(cfg.EEsCfg().Exporters[0], cfg, filterS) + newEeS, err := NewEventExporter(cfg.EEsCfg().Exporters[0], cfg, filterS, nil) if err != nil { t.Error(err) } diff --git a/ees/libcdre.go b/ees/libcdre.go index 2a7b23e0a..5adc20610 100644 --- a/ees/libcdre.go +++ b/ees/libcdre.go @@ -178,7 +178,7 @@ func (expEv *ExportEvents) ReplayFailedPosts(attempts int) (failedEvents *Export Opts: expEv.Opts, Attempts: attempts, FailedPostsDir: utils.MetaNone, - }, config.CgrConfig(), nil); err != nil { + }, config.CgrConfig(), nil, nil); err != nil { return } keyFunc := func() string { return utils.EmptyString } diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go index fab9b3e0f..f9b3a8de0 100644 --- a/ees/nats_it_test.go +++ b/ees/nats_it_test.go @@ -53,7 +53,7 @@ func TestNatsEE(t *testing.T) { break } } - evExp, err := NewEventExporter(cfg, cgrCfg, nil) + evExp, err := NewEventExporter(cfg, cgrCfg, nil, nil) if err != nil { t.Fatal(err) } @@ -143,7 +143,7 @@ func TestNatsEE2(t *testing.T) { break } } - evExp, err := NewEventExporter(cfg, cgrCfg, nil) + evExp, err := NewEventExporter(cfg, cgrCfg, nil, nil) if err != nil { t.Fatal(err) } diff --git a/ees/rpc.go b/ees/rpc.go index d4f383c58..5c3647124 100644 --- a/ees/rpc.go +++ b/ees/rpc.go @@ -25,11 +25,12 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" - "github.com/cgrates/rpcclient" ) -func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (e *RPCee, err error) { +func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage, + connMgr *engine.ConnManager) (e *RPCee, err error) { e = &RPCee{ cfg: cfg, dc: dc, @@ -39,9 +40,10 @@ func NewRpcEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (e *RPCee, } type RPCee struct { - cfg *config.EventExporterCfg - dc *utils.SafeMapStorage - conn *rpcclient.RPCClient + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage + connMgr *engine.ConnManager + // conn *rpcclient.RPCClient //opts codec string @@ -50,6 +52,7 @@ type RPCee struct { keyPath string certPath string caPath string + connIDs []string connTimeout time.Duration replyTimeout time.Duration @@ -61,12 +64,6 @@ func (e *RPCee) Cfg() (eCfg *config.EventExporterCfg) { } func (e *RPCee) Connect() (err error) { - e.Lock() - if e.conn, err = rpcclient.NewRPCClient(context.TODO(), utils.TCP, e.cfg.ExportPath, e.tls, - e.keyPath, e.certPath, e.caPath, 1, 1, e.connTimeout, e.replyTimeout, e.codec, nil, false, nil); err != nil { - return - } - e.Unlock() return } @@ -74,13 +71,13 @@ func (e *RPCee) ExportEvent(ctx *context.Context, args interface{}, _ string) (e e.Lock() defer e.Unlock() var rply interface{} - return e.conn.Call(ctx, e.serviceMethod, args, &rply) + return e.connMgr.Call(ctx, e.connIDs, e.serviceMethod, args, &rply) } func (e *RPCee) Close() (err error) { e.Lock() defer e.Unlock() - e.conn = nil + e.connMgr = nil return } @@ -128,6 +125,9 @@ func (e *RPCee) parseOpts() (err error) { if e.cfg.Opts.TLS != nil { e.tls = *e.cfg.Opts.TLS } + if e.cfg.Opts.ConnIDs != nil { + e.connIDs = *e.cfg.Opts.ConnIDs + } if e.cfg.Opts.RPCConnTimeout != nil { e.connTimeout = *e.cfg.Opts.RPCConnTimeout }