From b8cc20bb7cad0d1dc350ecfc5f2c60b7acd606b3 Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Tue, 14 Jan 2025 17:57:57 +0200 Subject: [PATCH] Add sqlBatchSize to ERS SQL OPTS and adjust delete functionality --- config/config_defaults.go | 1 + config/erscfg.go | 11 +++++++++++ config/erscfg_test.go | 4 +++- config/libconfig_json.go | 1 + .../ers_mysql_delete_indexed_fields/cgrates.json | 1 + data/conf/samples/ers_mysql_filters/cgrates.json | 1 + data/conf/samples/ers_mysql_meta_delete/cgrates.json | 1 + data/conf/samples/ers_mysql_move/cgrates.json | 1 + data/conf/samples/ers_mysql_raw_update/cgrates.json | 1 + ers/sql.go | 5 ++++- utils/consts.go | 1 + 11 files changed, 26 insertions(+), 2 deletions(-) diff --git a/config/config_defaults.go b/config/config_defaults.go index 6b66be146..6eceaf6a4 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -451,6 +451,7 @@ const CGRATES_CFG_JSON = ` // SQL // "sqlDBName": "cgrates", // the name of the database from were the events are read // "sqlTableName": "cdrs", // the name of the table from were the events are read + // "sqlBatchSize: 0, // number of SQL rows that can be selected at a time. 0 or lower for unlimited // "sqlDeleteIndexedFields": [], // list of fields to DELETE from the table // "pgSSLMode": "disable", // the ssl mode for postgres db diff --git a/config/erscfg.go b/config/erscfg.go index d5b88e927..5be1c20ae 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -258,6 +258,7 @@ func (kafkaROpts *KafkaROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err type SQLROpts struct { DBName *string TableName *string + BatchSize *int DeleteIndexedFields *[]string PgSSLMode *string } @@ -269,6 +270,9 @@ func (sqlOpts *SQLROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error if jsnCfg.SQLTableName != nil { sqlOpts.TableName = jsnCfg.SQLTableName } + if jsnCfg.SQLBatchSize != nil { + sqlOpts.BatchSize = jsnCfg.SQLBatchSize + } if jsnCfg.SQLDeleteIndexedFields != nil { dif := make([]string, len(*jsnCfg.SQLDeleteIndexedFields)) copy(dif, *jsnCfg.SQLDeleteIndexedFields) @@ -671,6 +675,10 @@ func (sqlOpts *SQLROpts) Clone() *SQLROpts { cln.TableName = new(string) *cln.TableName = *sqlOpts.TableName } + if sqlOpts.BatchSize != nil { + cln.BatchSize = new(int) + *cln.BatchSize = *sqlOpts.BatchSize + } if sqlOpts.DeleteIndexedFields != nil { idx := make([]string, len(*sqlOpts.DeleteIndexedFields)) copy(idx, *sqlOpts.DeleteIndexedFields) @@ -928,6 +936,9 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string if sqlOpts.TableName != nil { opts[utils.SQLTableNameOpt] = *sqlOpts.TableName } + if sqlOpts.BatchSize != nil { + opts[utils.SQLBatchSize] = *sqlOpts.BatchSize + } if sqlOpts.DeleteIndexedFields != nil { deleteIndexedFields := make([]string, len(*sqlOpts.DeleteIndexedFields)) copy(deleteIndexedFields, *sqlOpts.DeleteIndexedFields) diff --git a/config/erscfg_test.go b/config/erscfg_test.go index b1c22cc51..f8c908f9c 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -1439,8 +1439,9 @@ func TestEventReaderOptsCfg(t *testing.T) { KafkaTopic: utils.StringPointer("kafka"), KafkaMaxWait: utils.StringPointer("1m"), SQLDBName: utils.StringPointer("dbname"), - SQLDeleteIndexedFields: utils.SliceStringPointer([]string{"id"}), SQLTableName: utils.StringPointer("tablename"), + SQLBatchSize: utils.IntPointer(-1), + SQLDeleteIndexedFields: utils.SliceStringPointer([]string{"id"}), PgSSLMode: utils.StringPointer("sslmode"), AWSRegion: utils.StringPointer("eu"), AWSKey: utils.StringPointer("key"), @@ -1521,6 +1522,7 @@ func TestEventReaderCfgClone(t *testing.T) { SQL: &SQLROpts{ DBName: utils.StringPointer("dbname"), TableName: utils.StringPointer("tablename"), + BatchSize: utils.IntPointer(0), DeleteIndexedFields: utils.SliceStringPointer([]string{"id"}), PgSSLMode: utils.StringPointer("sslmode"), }, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index f0420701d..039216e20 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -248,6 +248,7 @@ type EventReaderOptsJson struct { KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"` SQLDBName *string `json:"sqlDBName"` SQLTableName *string `json:"sqlTableName"` + SQLBatchSize *int `json:"sqlBatchSize"` SQLDeleteIndexedFields *[]string `json:"sqlDeleteIndexedFields"` PgSSLMode *string `json:"pgSSLMode"` AWSRegion *string `json:"awsRegion"` diff --git a/data/conf/samples/ers_mysql_delete_indexed_fields/cgrates.json b/data/conf/samples/ers_mysql_delete_indexed_fields/cgrates.json index 6eab99ca9..2dfc81bc1 100644 --- a/data/conf/samples/ers_mysql_delete_indexed_fields/cgrates.json +++ b/data/conf/samples/ers_mysql_delete_indexed_fields/cgrates.json @@ -26,6 +26,7 @@ "opts": { "sqlDBName":"cgrates2", "sqlTableName":"cdrs", + "sqlBatchSize": 2, "sqlDeleteIndexedFields": ["id"], }, "start_delay": "500ms", // wait for db to be populated before starting reader diff --git a/data/conf/samples/ers_mysql_filters/cgrates.json b/data/conf/samples/ers_mysql_filters/cgrates.json index bc9c33021..dc0dc357e 100644 --- a/data/conf/samples/ers_mysql_filters/cgrates.json +++ b/data/conf/samples/ers_mysql_filters/cgrates.json @@ -26,6 +26,7 @@ "opts": { "sqlDBName":"cgrates2", "sqlTableName":"cdrs", + "sqlBatchSize": 20, }, "start_delay": "500ms", // wait for db to be populated before starting reader "tenant": "cgrates.org", diff --git a/data/conf/samples/ers_mysql_meta_delete/cgrates.json b/data/conf/samples/ers_mysql_meta_delete/cgrates.json index ab47b761a..fe978ad9f 100644 --- a/data/conf/samples/ers_mysql_meta_delete/cgrates.json +++ b/data/conf/samples/ers_mysql_meta_delete/cgrates.json @@ -26,6 +26,7 @@ "opts": { "sqlDBName":"cgrates2", "sqlTableName":"cdrs", + "sqlBatchSize": 1, }, "start_delay": "500ms", // wait for db to be populated before starting reader "processed_path": "*delete", diff --git a/data/conf/samples/ers_mysql_move/cgrates.json b/data/conf/samples/ers_mysql_move/cgrates.json index d034ae7de..456f44728 100644 --- a/data/conf/samples/ers_mysql_move/cgrates.json +++ b/data/conf/samples/ers_mysql_move/cgrates.json @@ -42,6 +42,7 @@ "opts": { "sqlDBName":"cgrates2", "sqlTableName":"cdrs", + "sqlBatchSize": 0, "sqlDeleteIndexedFields": ["id"], }, "start_delay": "500ms", // wait for db to be populated before starting reader diff --git a/data/conf/samples/ers_mysql_raw_update/cgrates.json b/data/conf/samples/ers_mysql_raw_update/cgrates.json index 4e5ff1576..5086b1486 100644 --- a/data/conf/samples/ers_mysql_raw_update/cgrates.json +++ b/data/conf/samples/ers_mysql_raw_update/cgrates.json @@ -25,6 +25,7 @@ "opts": { "sqlDBName": "cgrates2", "sqlTableName":"cdrs", + "sqlBatchSize": -1, "sqlUpdateIndexedFields": ["id", "cgrid"], }, "flags": ["*log"], diff --git a/ers/sql.go b/ers/sql.go index 0a26bd0c6..b4814154b 100644 --- a/ers/sql.go +++ b/ers/sql.go @@ -167,6 +167,9 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { rdr.rdrErr <- err return } + if rdr.Config().Opts.SQL.BatchSize != nil && *rdr.Config().Opts.SQL.BatchSize > 0 { + tx.Limit(*rdr.Config().Opts.SQL.BatchSize) // limit how much can be selected per iteration + } rows, err := tx.Rows() // get all rows selected if err != nil { rdr.rdrErr <- err @@ -228,7 +231,7 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) { } } } - if err = tx.Delete(nil, sqlClauseVars).Error; err != nil { // to ensure we don't read it again + if err = db.Table(rdr.tableName).Delete(nil, sqlClauseVars).Error; err != nil { // to ensure we don't read it again utils.Logger.Warning( fmt.Sprintf("<%s> deleting message %s error: %s", utils.ERs, utils.ToJSON(ev), err.Error())) diff --git a/utils/consts.go b/utils/consts.go index 8b3f685e7..8ed3c612b 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -2809,6 +2809,7 @@ const ( SQLDBNameOpt = "sqlDBName" SQLTableNameOpt = "sqlTableName" + SQLBatchSize = "sqlBatchSize" SQLDeleteIndexedFieldsOpt = "sqlDeleteIndexedFields" SQLUpdateIndexedFieldsOpt = "sqlUpdateIndexedFields"