Add sqlBatchSize to ERS SQL OPTS and adjust delete functionality

This commit is contained in:
arberkatellari
2025-01-14 17:57:57 +02:00
committed by Dan Christian Bogos
parent 7664ffccb4
commit b8cc20bb7c
11 changed files with 26 additions and 2 deletions

View File

@@ -451,6 +451,7 @@ const CGRATES_CFG_JSON = `
// SQL
// "sqlDBName": "cgrates", // the name of the database from were the events are read
// "sqlTableName": "cdrs", // the name of the table from were the events are read
// "sqlBatchSize: 0, // number of SQL rows that can be selected at a time. 0 or lower for unlimited
// "sqlDeleteIndexedFields": [], // list of fields to DELETE from the table
// "pgSSLMode": "disable", // the ssl mode for postgres db

View File

@@ -258,6 +258,7 @@ func (kafkaROpts *KafkaROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err
type SQLROpts struct {
DBName *string
TableName *string
BatchSize *int
DeleteIndexedFields *[]string
PgSSLMode *string
}
@@ -269,6 +270,9 @@ func (sqlOpts *SQLROpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error
if jsnCfg.SQLTableName != nil {
sqlOpts.TableName = jsnCfg.SQLTableName
}
if jsnCfg.SQLBatchSize != nil {
sqlOpts.BatchSize = jsnCfg.SQLBatchSize
}
if jsnCfg.SQLDeleteIndexedFields != nil {
dif := make([]string, len(*jsnCfg.SQLDeleteIndexedFields))
copy(dif, *jsnCfg.SQLDeleteIndexedFields)
@@ -671,6 +675,10 @@ func (sqlOpts *SQLROpts) Clone() *SQLROpts {
cln.TableName = new(string)
*cln.TableName = *sqlOpts.TableName
}
if sqlOpts.BatchSize != nil {
cln.BatchSize = new(int)
*cln.BatchSize = *sqlOpts.BatchSize
}
if sqlOpts.DeleteIndexedFields != nil {
idx := make([]string, len(*sqlOpts.DeleteIndexedFields))
copy(idx, *sqlOpts.DeleteIndexedFields)
@@ -928,6 +936,9 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
if sqlOpts.TableName != nil {
opts[utils.SQLTableNameOpt] = *sqlOpts.TableName
}
if sqlOpts.BatchSize != nil {
opts[utils.SQLBatchSize] = *sqlOpts.BatchSize
}
if sqlOpts.DeleteIndexedFields != nil {
deleteIndexedFields := make([]string, len(*sqlOpts.DeleteIndexedFields))
copy(deleteIndexedFields, *sqlOpts.DeleteIndexedFields)

View File

@@ -1439,8 +1439,9 @@ func TestEventReaderOptsCfg(t *testing.T) {
KafkaTopic: utils.StringPointer("kafka"),
KafkaMaxWait: utils.StringPointer("1m"),
SQLDBName: utils.StringPointer("dbname"),
SQLDeleteIndexedFields: utils.SliceStringPointer([]string{"id"}),
SQLTableName: utils.StringPointer("tablename"),
SQLBatchSize: utils.IntPointer(-1),
SQLDeleteIndexedFields: utils.SliceStringPointer([]string{"id"}),
PgSSLMode: utils.StringPointer("sslmode"),
AWSRegion: utils.StringPointer("eu"),
AWSKey: utils.StringPointer("key"),
@@ -1521,6 +1522,7 @@ func TestEventReaderCfgClone(t *testing.T) {
SQL: &SQLROpts{
DBName: utils.StringPointer("dbname"),
TableName: utils.StringPointer("tablename"),
BatchSize: utils.IntPointer(0),
DeleteIndexedFields: utils.SliceStringPointer([]string{"id"}),
PgSSLMode: utils.StringPointer("sslmode"),
},

View File

@@ -248,6 +248,7 @@ type EventReaderOptsJson struct {
KafkaSkipTLSVerify *bool `json:"kafkaSkipTLSVerify"`
SQLDBName *string `json:"sqlDBName"`
SQLTableName *string `json:"sqlTableName"`
SQLBatchSize *int `json:"sqlBatchSize"`
SQLDeleteIndexedFields *[]string `json:"sqlDeleteIndexedFields"`
PgSSLMode *string `json:"pgSSLMode"`
AWSRegion *string `json:"awsRegion"`

View File

@@ -26,6 +26,7 @@
"opts": {
"sqlDBName":"cgrates2",
"sqlTableName":"cdrs",
"sqlBatchSize": 2,
"sqlDeleteIndexedFields": ["id"],
},
"start_delay": "500ms", // wait for db to be populated before starting reader

View File

@@ -26,6 +26,7 @@
"opts": {
"sqlDBName":"cgrates2",
"sqlTableName":"cdrs",
"sqlBatchSize": 20,
},
"start_delay": "500ms", // wait for db to be populated before starting reader
"tenant": "cgrates.org",

View File

@@ -26,6 +26,7 @@
"opts": {
"sqlDBName":"cgrates2",
"sqlTableName":"cdrs",
"sqlBatchSize": 1,
},
"start_delay": "500ms", // wait for db to be populated before starting reader
"processed_path": "*delete",

View File

@@ -42,6 +42,7 @@
"opts": {
"sqlDBName":"cgrates2",
"sqlTableName":"cdrs",
"sqlBatchSize": 0,
"sqlDeleteIndexedFields": ["id"],
},
"start_delay": "500ms", // wait for db to be populated before starting reader

View File

@@ -25,6 +25,7 @@
"opts": {
"sqlDBName": "cgrates2",
"sqlTableName":"cdrs",
"sqlBatchSize": -1,
"sqlUpdateIndexedFields": ["id", "cgrid"],
},
"flags": ["*log"],

View File

@@ -167,6 +167,9 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
rdr.rdrErr <- err
return
}
if rdr.Config().Opts.SQL.BatchSize != nil && *rdr.Config().Opts.SQL.BatchSize > 0 {
tx.Limit(*rdr.Config().Opts.SQL.BatchSize) // limit how much can be selected per iteration
}
rows, err := tx.Rows() // get all rows selected
if err != nil {
rdr.rdrErr <- err
@@ -228,7 +231,7 @@ func (rdr *SQLEventReader) readLoop(db *gorm.DB, sqlDB io.Closer) {
}
}
}
if err = tx.Delete(nil, sqlClauseVars).Error; err != nil { // to ensure we don't read it again
if err = db.Table(rdr.tableName).Delete(nil, sqlClauseVars).Error; err != nil { // to ensure we don't read it again
utils.Logger.Warning(
fmt.Sprintf("<%s> deleting message %s error: %s",
utils.ERs, utils.ToJSON(ev), err.Error()))

View File

@@ -2809,6 +2809,7 @@ const (
SQLDBNameOpt = "sqlDBName"
SQLTableNameOpt = "sqlTableName"
SQLBatchSize = "sqlBatchSize"
SQLDeleteIndexedFieldsOpt = "sqlDeleteIndexedFields"
SQLUpdateIndexedFieldsOpt = "sqlUpdateIndexedFields"