From 8787d463eb359882b82fb28a8de800ce158e0c49 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 4 May 2021 18:02:42 +0300 Subject: [PATCH] Prepared flatStore --- ees/sql.go | 6 +++--- ees/sql_it_test.go | 16 ++++++++-------- ees/sql_test.go | 16 ++++++++-------- ers/flatstore.go | 7 +++++++ ers/sql.go | 8 ++++---- ers/sql_test.go | 36 ++++++++++++++++++------------------ utils/consts.go | 6 ++++-- 7 files changed, 52 insertions(+), 43 deletions(-) diff --git a/ees/sql.go b/ees/sql.go index 65eb2f6f0..4fbac55b8 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -71,7 +71,7 @@ func (sqlEe *SQLEe) NewSQLEeUrl(cgrCfg *config.CGRConfig) (dialect gorm.Dialecto password, _ := u.User.Password() dbname := utils.SQLDefaultDBName - if vals, has := cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Opts[utils.SQLDBName]; has { + if vals, has := cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Opts[utils.SQLDBNameOpt]; has { dbname = utils.IfaceAsString(vals) } ssl := utils.SQLDefaultSSLMode @@ -79,8 +79,8 @@ func (sqlEe *SQLEe) NewSQLEeUrl(cgrCfg *config.CGRConfig) (dialect gorm.Dialecto ssl = utils.IfaceAsString(vals) } // tableName is mandatory in opts - if iface, has := cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Opts[utils.SQLTableName]; !has { - return nil, utils.NewErrMandatoryIeMissing(utils.SQLTableName) + if iface, has := cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Opts[utils.SQLTableNameOpt]; !has { + return nil, utils.NewErrMandatoryIeMissing(utils.SQLTableNameOpt) } else { sqlEe.tableName = utils.IfaceAsString(iface) } diff --git a/ees/sql_it_test.go b/ees/sql_it_test.go index 2cd0290ed..b78108643 100644 --- a/ees/sql_it_test.go +++ b/ees/sql_it_test.go @@ -300,8 +300,8 @@ func TestOpenDB3Err(t *testing.T) { func TestSQLExportEvent1(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableName] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBName] = "cgrates" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "cgrates" cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306` cgrEv := new(utils.CGREvent) newIDb := engine.NewInternalDB(nil, nil, true) @@ -343,8 +343,8 @@ func TestSQLExportEvent1(t *testing.T) { func TestSQLExportEvent2(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableName] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBName] = "cgrates" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "cgrates" cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306` cgrEv := new(utils.CGREvent) newIDb := engine.NewInternalDB(nil, nil, true) @@ -386,8 +386,8 @@ func TestSQLExportEvent2(t *testing.T) { func TestSQLExportEvent3(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableName] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBName] = "cgrates" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "cgrates" cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306` cgrEv := new(utils.CGREvent) newIDb := engine.NewInternalDB(nil, nil, true) @@ -429,8 +429,8 @@ func TestSQLExportEvent3(t *testing.T) { func TestSQLExportEvent4(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableName] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBName] = "cgrates" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "cgrates" cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306` cgrEv := new(utils.CGREvent) newIDb := engine.NewInternalDB(nil, nil, true) diff --git a/ees/sql_test.go b/ees/sql_test.go index 3ce875632..abfaf4f03 100644 --- a/ees/sql_test.go +++ b/ees/sql_test.go @@ -60,8 +60,8 @@ func TestSqlGetMetrics(t *testing.T) { func TestNewSQLeUrl(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableName] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBName] = "postgres" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "postgres" cgrCfg.EEsCfg().Exporters[0].Opts[utils.SSLModeCfg] = "test" newIDb := engine.NewInternalDB(nil, nil, true) newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil) @@ -84,8 +84,8 @@ func TestNewSQLeUrl(t *testing.T) { func TestNewSQLeUrlSQL(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableName] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBName] = "mysql" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "mysql" cgrCfg.EEsCfg().Exporters[0].ExportPath = `mysql://cgrates:CGRateS.org@127.0.0.1:3306` newIDb := engine.NewInternalDB(nil, nil, true) newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil) @@ -110,8 +110,8 @@ func TestNewSQLeUrlSQL(t *testing.T) { func TestNewSQLeUrlPostgres(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableName] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBName] = "postgres" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "postgres" cgrCfg.EEsCfg().Exporters[0].ExportPath = `postgres://cgrates:CGRateS.org@127.0.0.1:3306` newIDb := engine.NewInternalDB(nil, nil, true) newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil) @@ -136,8 +136,8 @@ func TestNewSQLeUrlPostgres(t *testing.T) { func TestNewSQLeExportPathError(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableName] = "expTable" - cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBName] = "postgres" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLTableNameOpt] = "expTable" + cgrCfg.EEsCfg().Exporters[0].Opts[utils.SQLDBNameOpt] = "postgres" cgrCfg.EEsCfg().Exporters[0].ExportPath = ":foo" newIDb := engine.NewInternalDB(nil, nil, true) newDM := engine.NewDataManager(newIDb, cgrCfg.CacheCfg(), nil) diff --git a/ers/flatstore.go b/ers/flatstore.go index 240c64a36..c20638414 100644 --- a/ers/flatstore.go +++ b/ers/flatstore.go @@ -38,6 +38,13 @@ import ( "github.com/cgrates/cgrates/utils" ) +type fstRecord struct { + inv []string + bye []string + ack []string + fileName string +} + func NewFlatstoreER(cfg *config.CGRConfig, cfgIdx int, rdrEvents chan *erEvent, rdrErr chan error, fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { diff --git a/ers/sql.go b/ers/sql.go index 9b82fae2f..1ec4591cb 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -269,7 +269,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts map[string]interfac rdr.connType = u.Scheme dbname := utils.SQLDefaultDBName - if vals, has := opts[utils.SQLDBName]; has { + if vals, has := opts[utils.SQLDBNameOpt]; has { dbname = utils.IfaceAsString(vals) } ssl := utils.SQLDefaultSSLMode @@ -278,7 +278,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts map[string]interfac } rdr.tableName = utils.CDRsTBL - if vals, has := opts[utils.SQLTableName]; has { + if vals, has := opts[utils.SQLTableNameOpt]; has { rdr.tableName = utils.IfaceAsString(vals) } switch rdr.connType { @@ -318,7 +318,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts map[string]interfac } outDBname = utils.SQLDefaultDBName - if vals, has := processedOpt[utils.SQLDBName]; has { + if vals, has := processedOpt[utils.SQLDBNameOpt]; has { outDBname = utils.IfaceAsString(vals) } outSSL = utils.SQLDefaultSSLMode @@ -326,7 +326,7 @@ func (rdr *SQLEventReader) setURL(inURL, outURL string, opts map[string]interfac outSSL = utils.IfaceAsString(vals) } rdr.expTableName = utils.CDRsTBL - if vals, has := processedOpt[utils.SQLTableName]; has { + if vals, has := processedOpt[utils.SQLTableNameOpt]; has { rdr.expTableName = utils.IfaceAsString(vals) } diff --git a/ers/sql_test.go b/ers/sql_test.go index 9bc28a17d..7db51a633 100644 --- a/ers/sql_test.go +++ b/ers/sql_test.go @@ -41,13 +41,13 @@ func TestSQLSetURL(t *testing.T) { inURL := "*mysql://cgrates:CGRateS.org@127.0.0.1:3306" outURL := "*mysql://cgrates:CGRateS.org@127.0.0.1:3306" if err := sql.setURL(inURL, outURL, map[string]interface{}{ - utils.SQLDBName: "cgrates2", - utils.SQLTableName: "cdrs2", - utils.SSLModeCfg: "enabled", + utils.SQLDBNameOpt: "cgrates2", + utils.SQLTableNameOpt: "cdrs2", + utils.SSLModeCfg: "enabled", - utils.SQLDBName + utils.ProcessedOpt: "cgrates3", - utils.SQLTableName + utils.ProcessedOpt: "cdrs3", - utils.SSLModeCfg + utils.ProcessedOpt: "enabled", + utils.SQLDBNameOpt + utils.ProcessedOpt: "cgrates3", + utils.SQLTableNameOpt + utils.ProcessedOpt: "cdrs3", + utils.SSLModeCfg + utils.ProcessedOpt: "enabled", }); err != nil { t.Fatal(err) } else if expsql.connString != sql.connString { @@ -76,13 +76,13 @@ func TestSQLSetURL(t *testing.T) { inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306" outURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306" if err := sql.setURL(inURL, outURL, map[string]interface{}{ - utils.SQLDBName: "cgrates2", - utils.SQLTableName: "cdrs2", - utils.SSLModeCfg: "enabled", + utils.SQLDBNameOpt: "cgrates2", + utils.SQLTableNameOpt: "cdrs2", + utils.SSLModeCfg: "enabled", - utils.SQLDBName + utils.ProcessedOpt: "cgrates3", - utils.SQLTableName + utils.ProcessedOpt: "cdrs3", - utils.SSLModeCfg + utils.ProcessedOpt: "enabled", + utils.SQLDBNameOpt + utils.ProcessedOpt: "cgrates3", + utils.SQLTableNameOpt + utils.ProcessedOpt: "cdrs3", + utils.SSLModeCfg + utils.ProcessedOpt: "enabled", }); err != nil { t.Fatal(err) } else if expsql.connString != sql.connString { @@ -111,13 +111,13 @@ func TestSQLSetURL(t *testing.T) { inURL = "*postgres://cgrates:CGRateS.org@127.0.0.1:3306" outURL = "" if err := sql.setURL(inURL, outURL, map[string]interface{}{ - utils.SQLDBName: "cgrates2", - utils.SQLTableName: "cdrs2", - utils.SSLModeCfg: "enabled", + utils.SQLDBNameOpt: "cgrates2", + utils.SQLTableNameOpt: "cdrs2", + utils.SSLModeCfg: "enabled", - utils.SQLDBName + utils.ProcessedOpt: "cgrates2", - utils.SQLTableName + utils.ProcessedOpt: "cdrs2", - utils.SSLModeCfg + utils.ProcessedOpt: "enabled", + utils.SQLDBNameOpt + utils.ProcessedOpt: "cgrates2", + utils.SQLTableNameOpt + utils.ProcessedOpt: "cdrs2", + utils.SSLModeCfg + utils.ProcessedOpt: "enabled", }); err != nil { t.Fatal(err) } else if expsql.connString != sql.connString { diff --git a/utils/consts.go b/utils/consts.go index 209cdc9d5..0dc2082eb 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2549,8 +2549,8 @@ const ( SQLDefaultDBName = "cgrates" SQLDefaultSSLMode = "disable" - SQLDBName = "sqlDBName" - SQLTableName = "sqlTableName" + SQLDBNameOpt = "sqlDBName" + SQLTableNameOpt = "sqlTableName" // fileCSV RowLengthOpt = "RowLength" @@ -2566,6 +2566,8 @@ const ( FlatstorePrfx = "fst" FstFailedCallsPrefixOpt = "fstFailedCallsPrefix" FstPartialRecordCacheOpt = "fstRecordCacheTTL" + FstMethodOpt = "fstMethod" // + FstMadatoryACKOpt = "fstMadatoryACK" // fileXML XMLRootPathOpt = "xmlRootPath"