diff --git a/ees/ee_test.go b/ees/ee_test.go index 1dc4e0820..6a891895d 100644 --- a/ees/ee_test.go +++ b/ees/ee_test.go @@ -110,6 +110,9 @@ func TestNewEventExporterCase3(t *testing.T) { "Local", utils.EmptyString, )) + if err != nil { + t.Error(err) + } eeExpect, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) @@ -134,6 +137,9 @@ func TestNewEventExporterCase4(t *testing.T) { "Local", utils.EmptyString, )) + if err != nil { + t.Error(err) + } eeExpect, err := NewHTTPjsonMapEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) @@ -158,6 +164,9 @@ func TestNewEventExporterCase5(t *testing.T) { "Local", utils.EmptyString, )) + if err != nil { + t.Error(err) + } eeExpect, err := NewPosterJSONMapEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) diff --git a/ees/ees.go b/ees/ees.go index df1390bd0..6ee7f73ef 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -125,7 +125,6 @@ func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []st eeS.cfg.EEsNoLksCfg().AttributeSConns, nil, utils.AttributeSv1ProcessEvent, attrArgs, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { - cgrEv = rplyEv.CGREvent } else if err != nil && err.Error() == utils.ErrNotFound.Error() { err = nil // cancel ErrNotFound diff --git a/ees/ees_test.go b/ees/ees_test.go index 0de421e3b..56003b0aa 100644 --- a/ees/ees_test.go +++ b/ees/ees_test.go @@ -54,7 +54,7 @@ func TestListenAndServe(t *testing.T) { cfgRld := make(chan struct{}, 1) cfgRld <- struct{}{} go func() { - time.Sleep(10) + time.Sleep(10 * time.Nanosecond) stopChan <- struct{}{} }() var err error diff --git a/ees/elastic.go b/ees/elastic.go index 7c5897c63..1d0cd8a10 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -119,7 +119,7 @@ func (eEe *ElasticEe) ID() string { // OnEvicted implements EventExporter, doing the cleanup before exit func (eEe *ElasticEe) OnEvicted(_ string, _ interface{}) { - return + } // ExportEvent implements EventExporter diff --git a/ees/filecsv.go b/ees/filecsv.go index d4fff06a4..0e1bd5205 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -88,7 +88,6 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) { utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file", utils.EventExporterS, fCsv.id, err.Error())) } - return } // ExportEvent implements EventExporter diff --git a/ees/filefwv.go b/ees/filefwv.go index ffc5945f1..cd989b64f 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -78,7 +78,6 @@ func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) { utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file", utils.EventExporterS, fFwv.id, err.Error())) } - return } // ExportEvent implements EventExporter diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index 99970bfce..79916b198 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -64,7 +64,6 @@ func (httpEE *HTTPjsonMapEE) ID() string { // OnEvicted implements EventExporter, doing the cleanup before exit func (httpEE *HTTPjsonMapEE) OnEvicted(string, interface{}) { - return } // ExportEvent implements EventExporter diff --git a/ees/httpjsonmap_test.go b/ees/httpjsonmap_test.go index 8f91cde67..199e247fa 100644 --- a/ees/httpjsonmap_test.go +++ b/ees/httpjsonmap_test.go @@ -159,6 +159,9 @@ func TestHttpJsonMapExportEvent3(t *testing.T) { "Local", utils.EmptyString, )) + if err != nil { + t.Error(err) + } httpEE, err := NewHTTPjsonMapEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) @@ -202,6 +205,9 @@ func TestHttpJsonMapExportEvent4(t *testing.T) { "Local", utils.EmptyString, )) + if err != nil { + t.Error(err) + } httpEE, err := NewHTTPjsonMapEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) @@ -249,6 +255,9 @@ func TestHttpJsonMapExportEvent5(t *testing.T) { "Local", utils.EmptyString, )) + if err != nil { + t.Error(err) + } httpEE, err := NewHTTPjsonMapEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) diff --git a/ees/httppost.go b/ees/httppost.go index b411950a1..87af99174 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -58,7 +58,6 @@ func (httpPost *HTTPPost) ID() string { // OnEvicted implements EventExporter, doing the cleanup before exit func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) { - return } // ExportEvent implements EventExporter diff --git a/ees/posterjsonmap.go b/ees/posterjsonmap.go index 28be1d499..54dada8f1 100644 --- a/ees/posterjsonmap.go +++ b/ees/posterjsonmap.go @@ -76,7 +76,6 @@ func (pstrEE *PosterJSONMapEE) ID() string { // OnEvicted implements EventExporter, doing the cleanup before exit func (pstrEE *PosterJSONMapEE) OnEvicted(string, interface{}) { pstrEE.poster.Close() - return } // ExportEvent implements EventExporter diff --git a/ees/sql.go b/ees/sql.go index 2119f9624..aecb58551 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -39,7 +39,7 @@ func NewSQLEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, sqlEe = &SQLEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} - dialect, err := sqlEe.NewSQLEeUrl(cgrCfg) + dialect, err := sqlEe.NewSQLEeURL(cgrCfg) if err != nil { return } @@ -62,7 +62,7 @@ type SQLEe struct { dc utils.MapStorage } -func (sqlEe *SQLEe) NewSQLEeUrl(cgrCfg *config.CGRConfig) (dialect gorm.Dialector, err error) { +func (sqlEe *SQLEe) NewSQLEeURL(cgrCfg *config.CGRConfig) (dialect gorm.Dialector, err error) { var u *url.URL // var err error if u, err = url.Parse(strings.TrimPrefix(cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].ExportPath, utils.Meta)); err != nil { diff --git a/ees/sql_test.go b/ees/sql_test.go index 2d1bd3e9e..3b1957780 100644 --- a/ees/sql_test.go +++ b/ees/sql_test.go @@ -75,7 +75,7 @@ func TestNewSQLeUrl(t *testing.T) { } sqlEe := &SQLEe{id: cgrCfg.EEsCfg().Exporters[0].ID, cgrCfg: cgrCfg, cfgIdx: 0, filterS: filterS, dc: dc} - _, err = sqlEe.NewSQLEeUrl(cgrCfg) + _, err = sqlEe.NewSQLEeURL(cgrCfg) errExpect := "db type <> not supported" if err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) @@ -101,7 +101,7 @@ func TestNewSQLeUrlSQL(t *testing.T) { cgrCfg: cgrCfg, cfgIdx: 0, filterS: filterS, dc: dc} dialectExpect := mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", "cgrates", "CGRateS.org", "127.0.0.1", "3306", "mysql")) - if dialect, err := sqlEe.NewSQLEeUrl(cgrCfg); err != nil { + if dialect, err := sqlEe.NewSQLEeURL(cgrCfg); err != nil { t.Error(err) } else if !reflect.DeepEqual(dialect, dialectExpect) { t.Errorf("Expected %v but received %v", utils.ToJSON(dialectExpect), utils.ToJSON(dialect)) @@ -127,7 +127,7 @@ func TestNewSQLeUrlPostgres(t *testing.T) { cgrCfg: cgrCfg, cfgIdx: 0, filterS: filterS, dc: dc} dialectExpect := postgres.Open(fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", "127.0.0.1", "3306", "postgres", "cgrates", "CGRateS.org", utils.SQLDefaultSSLMode)) - if dialect, err := sqlEe.NewSQLEeUrl(cgrCfg); err != nil { + if dialect, err := sqlEe.NewSQLEeURL(cgrCfg); err != nil { t.Error(err) } else if !reflect.DeepEqual(dialect, dialectExpect) { t.Errorf("Expected %v but received %v", utils.ToJSON(dialectExpect), utils.ToJSON(dialect)) @@ -152,7 +152,7 @@ func TestNewSQLeExportPathError(t *testing.T) { sqlEe := &SQLEe{id: cgrCfg.EEsCfg().Exporters[0].ID, cgrCfg: cgrCfg, cfgIdx: 0, filterS: filterS, dc: dc} errExpect := `parse ":foo": missing protocol scheme` - if _, err := sqlEe.NewSQLEeUrl(cgrCfg); err == nil || err.Error() != errExpect { + if _, err := sqlEe.NewSQLEeURL(cgrCfg); err == nil || err.Error() != errExpect { t.Errorf("Expected %v but received %v", errExpect, err) } } diff --git a/ees/virtualee.go b/ees/virtualee.go index 8e8f6168e..b79718568 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -56,7 +56,6 @@ func (vEe *VirtualEe) ID() string { // OnEvicted implements EventExporter, doing the cleanup before exit func (vEe *VirtualEe) OnEvicted(_ string, _ interface{}) { - return } // ExportEvent implements EventExporter diff --git a/engine/lib_test.go b/engine/lib_test.go new file mode 100644 index 000000000..30dbcdfd9 --- /dev/null +++ b/engine/lib_test.go @@ -0,0 +1,47 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "errors" + "flag" + "net/rpc" + "net/rpc/jsonrpc" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +var ( + dataDir = flag.String("data_dir", "/usr/share/cgrates", "CGR data dir path here") + waitRater = flag.Int("wait_rater", 100, "Number of miliseconds to wait for rater to start and cache") + encoding = flag.String("rpc", utils.MetaJSON, "what encoding whould be uused for rpc comunication") + dbType = flag.String("dbtype", utils.MetaInternal, "The type of DataBase (Internal/Mongo/mySql)") +) + +func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) { + switch *encoding { + case utils.MetaJSON: + return jsonrpc.Dial(utils.TCP, cfg.RPCJSONListen) + case utils.MetaGOB: + return rpc.Dial(utils.TCP, cfg.RPCGOBListen) + default: + return nil, errors.New("UNSUPPORTED_RPC") + } +}