mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Revise kafka writer definition
This commit is contained in:
committed by
Dan Christian Bogos
parent
40943dda09
commit
39d50fd211
@@ -88,19 +88,19 @@ func (pstr *KafkaPoster) Close() {
|
||||
pstr.Lock()
|
||||
if pstr.writer != nil {
|
||||
pstr.writer.Close()
|
||||
pstr.writer = nil
|
||||
}
|
||||
pstr.writer = nil
|
||||
pstr.Unlock()
|
||||
}
|
||||
|
||||
func (pstr *KafkaPoster) newPostWriter() {
|
||||
pstr.Lock()
|
||||
if pstr.writer == nil {
|
||||
pstr.writer = kafka.NewWriter(kafka.WriterConfig{
|
||||
Brokers: []string{pstr.dialURL},
|
||||
MaxAttempts: pstr.attempts,
|
||||
pstr.writer = &kafka.Writer{
|
||||
Addr: kafka.TCP(pstr.dialURL),
|
||||
Topic: pstr.topic,
|
||||
})
|
||||
MaxAttempts: pstr.attempts,
|
||||
}
|
||||
}
|
||||
pstr.Unlock()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user