From da358161a7c0cc5039fb5a0ff28b0fa8abf87116 Mon Sep 17 00:00:00 2001 From: adi Date: Fri, 22 Jul 2022 17:04:21 +0300 Subject: [PATCH] EFs service to logger/ees and APIS --- actions/export.go | 6 +- apis/efs.go | 24 +- ees/ees.go | 35 ++- ees/libcdre.go | 14 +- ees/nats_it_test.go | 4 +- ees/poster_it_test.go | 6 +- efs/efs.go | 62 +++++ efs/failed_ees.go | 286 ++++++++++++++++++++ efs/failed_loggs.go | 111 ++++++++ efs/libefs.go | 124 +++++++++ general_tests/cdrs_post_failover_it_test.go | 14 +- services/cgr-engine.go | 3 +- utils/coreutils.go | 9 + utils/failover_export.go | 25 +- utils/kafka_logger.go | 94 +++---- 15 files changed, 702 insertions(+), 115 deletions(-) create mode 100644 efs/failed_ees.go create mode 100644 efs/failed_loggs.go create mode 100644 efs/libefs.go diff --git a/actions/export.go b/actions/export.go index eb8d8b170..2d8d638d4 100644 --- a/actions/export.go +++ b/actions/export.go @@ -78,8 +78,10 @@ func (aL *actHTTPPost) execute(ctx *context.Context, data utils.MapStorage, _ st var partExec bool for _, pstr := range aL.pstrs { if async, has := aL.cfg().Opts[utils.MetaAsync]; has && utils.IfaceAsString(async) == utils.TrueStr { - go ees.ExportWithAttempts(context.Background(), pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString) - } else if err = ees.ExportWithAttempts(ctx, pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString); err != nil { + go ees.ExportWithAttempts(context.Background(), pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString, + nil, aL.config.GeneralCfg().DefaultTenant) + } else if err = ees.ExportWithAttempts(ctx, pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString, + nil, aL.config.GeneralCfg().DefaultTenant); err != nil { if pstr.Cfg().FailedPostsDir != utils.MetaNone { err = nil } else { diff --git a/apis/efs.go b/apis/efs.go index a5abf0b53..3891148d7 100644 --- a/apis/efs.go +++ b/apis/efs.go @@ -25,7 +25,6 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/ees" "github.com/cgrates/cgrates/efs" "github.com/cgrates/cgrates/utils" ) @@ -44,18 +43,13 @@ type EfSv1 struct { ping } -type ArgsReplayFailedPosts struct { - TypeProvider string - FailedRequestsInDir *string // if defined it will be our source of requests to be replayed - FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded - Modules []string // list of modules for which replay the requests, nil for all +// ProcessEvent will write into gob formnat file the Events that were failed to be exported. +func (efS *EfSv1) ProcessEvent(ctx *context.Context, args *utils.ArgsFailedPosts, reply *string) error { + return efS.efs.V1ProcessEvent(ctx, args, reply) } -func (efS *EfSv1) ProcessEvent(ctx *context.Context, args *ArgsReplayFailedPosts, reply *string) error { - return nil -} - -func (efS *EfSv1) ReplayEvents(ctx *context.Context, args *ArgsReplayFailedPosts, reply *string) error { +// ReplayEvents will read the Events from gob files that were failed to be exported and try to re-export them again. +func (efS *EfSv1) ReplayEvents(ctx *context.Context, args *efs.ArgsReplayFailedPosts, reply *string) error { failedPostsDir := efS.cfg.LoggerCfg().Opts.FailedPostsDir if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != utils.EmptyString { failedPostsDir = *args.FailedRequestsInDir @@ -87,8 +81,8 @@ func (efS *EfSv1) ReplayEvents(ctx *context.Context, args *ArgsReplayFailedPosts } } filePath := path.Join(failedPostsDir, file.Name()) - var expEv utils.FailoverPoster - if expEv, err = ees.NewFailoverPosterFromFile(filePath, args.TypeProvider); err != nil { + var expEv efs.FailoverPoster + if expEv, err = efs.NewFailoverPosterFromFile(filePath, args.TypeProvider, efS.efs); err != nil { return err } // check if the failed out dir path is the same as the same in dir in order to export again in case of failure @@ -97,9 +91,9 @@ func (efS *EfSv1) ReplayEvents(ctx *context.Context, args *ArgsReplayFailedPosts failoverPath = path.Join(failedOutDir, file.Name()) } - err = expEv.ReplayFailedPosts(efS.cfg.EFsCfg().PosterAttempts) + err = expEv.ReplayFailedPosts(ctx, efS.cfg.EFsCfg().PosterAttempts, args.Tenant) if err != nil && failedOutDir != utils.MetaNone { // Got error from HTTPPoster could be that content was not written, we need to write it ourselves - if err = utils.WriteToFile(failoverPath, expEv); err != nil { + if err = efs.WriteToFile(failoverPath, expEv); err != nil { return utils.NewErrServerError(err) } } diff --git a/ees/ees.go b/ees/ees.go index d5df79aae..8b37b6c7d 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -197,7 +197,7 @@ func (eeS *EeS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEe utils.EEs, ee.Cfg().ID)) } go func(evict, sync bool, ee EventExporter) { - if err := exportEventWithExporter(ctx, ee, cgrEv.CGREvent, evict, eeS.cfg, eeS.fltrS); err != nil { + if err := exportEventWithExporter(ctx, ee, eeS.connMgr, cgrEv.CGREvent, evict, eeS.cfg, eeS.fltrS, cgrEv.Tenant); err != nil { withErr = true } if sync { @@ -243,7 +243,8 @@ func (eeS *EeS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEe return } -func exportEventWithExporter(ctx *context.Context, exp EventExporter, ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS) (err error) { +func exportEventWithExporter(ctx *context.Context, exp EventExporter, connMngr *engine.ConnManager, + ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS, tnt string) (err error) { defer func() { updateEEMetrics(exp.GetMetrics(), ev.ID, ev.Event, err != nil, utils.FirstNonEmpty(exp.Cfg().Timezone, cfg.GeneralCfg().DefaultTimezone)) @@ -276,16 +277,34 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, ev *utils. } extraData := exp.ExtraData(ev) - return ExportWithAttempts(ctx, exp, eEv, extraData) + return ExportWithAttempts(ctx, exp, eEv, extraData, connMngr, tnt) } -func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key interface{}) (err error) { +func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key interface{}, + connMnngr *engine.ConnManager, tnt string) (err error) { if exp.Cfg().FailedPostsDir != utils.MetaNone { defer func() { if err != nil { - utils.AddFailedMessage(exp.Cfg().FailedPostsDir, exp.Cfg().ExportPath, - exp.Cfg().Type, utils.EEs, - eEv, exp.Cfg().Opts.AsMapInterface()) + args := &utils.ArgsFailedPosts{ + Tenant: tnt, + Path: exp.Cfg().ExportPath, + Event: eEv, + FailedDir: exp.Cfg().FailedPostsDir, + Module: utils.EEs, + APIOpts: exp.Cfg().Opts.AsMapInterface(), + } + var reply string + if err = connMnngr.Call(ctx, exp.Cfg().EFsConns, + utils.EfSv1ProcessEvent, args, &reply); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> Exporter <%s> could not be written with <%s> service because err: <%s>", + utils.EEs, exp.Cfg().ID, utils.EFs, err.Error())) + /* + utils.AddFailedMessage(exp.Cfg().FailedPostsDir, exp.Cfg().ExportPath, + exp.Cfg().Type, utils.EEs, + eEv, exp.Cfg().Opts.AsMapInterface()) + */ + } } }() } @@ -431,7 +450,7 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents // exported will be true if there will be at least one exporter archived exported = true - if err = exportEventWithExporter(ctx, ee, cgrEv, false, eeS.cfg, eeS.fltrS); err != nil { + if err = exportEventWithExporter(ctx, ee, eeS.connMgr, cgrEv, false, eeS.cfg, eeS.fltrS, cgrEv.Tenant); err != nil { return err } } diff --git a/ees/libcdre.go b/ees/libcdre.go index 7211bd5ae..6e7458a31 100644 --- a/ees/libcdre.go +++ b/ees/libcdre.go @@ -18,18 +18,7 @@ along with this program. If not, see package ees -import ( - "bytes" - "encoding/gob" - "os" - "sync" - - "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/guardian" - "github.com/cgrates/cgrates/utils" -) - +/* // NewFailoverPosterFromFile returns ExportEvents from the file // used only on replay failed post func NewFailoverPosterFromFile(filePath, providerType string) (failPoster utils.FailoverPoster, err error) { @@ -321,3 +310,4 @@ func (expEv *FailedExportersEEs) ReplayFailedPosts(attempts int) (err error) { } return } +*/ diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go index f9b3a8de0..773b12c7a 100644 --- a/ees/nats_it_test.go +++ b/ees/nats_it_test.go @@ -106,7 +106,7 @@ func TestNatsEE(t *testing.T) { "Destination": "1002", }, } - if err := exportEventWithExporter(context.Background(), evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil { + if err := exportEventWithExporter(context.Background(), evExp, nil, cgrEv, true, cgrCfg, new(engine.FilterS), "cgrates.org"); err != nil { t.Fatal(err) } testCleanDirectory(t) @@ -172,7 +172,7 @@ func TestNatsEE2(t *testing.T) { "Destination": "1002", }, } - if err := exportEventWithExporter(context.Background(), evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil { + if err := exportEventWithExporter(context.Background(), evExp, nil, cgrEv, true, cgrCfg, new(engine.FilterS), "cgrates.org"); err != nil { t.Fatal(err) } testCleanDirectory(t) diff --git a/ees/poster_it_test.go b/ees/poster_it_test.go index 1cdc31ecf..cdc18e97b 100644 --- a/ees/poster_it_test.go +++ b/ees/poster_it_test.go @@ -160,7 +160,7 @@ func TestSQSPoster(t *testing.T) { Attempts: 5, Opts: opts, }, nil) - if err := ExportWithAttempts(context.Background(), pstr, []byte(body), ""); err != nil { + if err := ExportWithAttempts(context.Background(), pstr, []byte(body), "", nil, "cgrates.org"); err != nil { t.Fatal(err) } @@ -239,7 +239,7 @@ func TestS3Poster(t *testing.T) { Attempts: 5, Opts: opts, }, nil) - if err := ExportWithAttempts(context.Background(), pstr, []byte(body), key); err != nil { + if err := ExportWithAttempts(context.Background(), pstr, []byte(body), key, nil, "cgrates.org"); err != nil { t.Fatal(err) } key += ".json" @@ -294,7 +294,7 @@ func TestAMQPv1Poster(t *testing.T) { Attempts: 5, Opts: opts, }, nil) - if err := ExportWithAttempts(context.Background(), pstr, []byte(body), ""); err != nil { + if err := ExportWithAttempts(context.Background(), pstr, []byte(body), "", nil, "cgrates.org"); err != nil { t.Fatal(err) } // Create client diff --git a/efs/efs.go b/efs/efs.go index b57442723..0ffcef06a 100644 --- a/efs/efs.go +++ b/efs/efs.go @@ -21,8 +21,10 @@ package efs import ( "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" ) type EfS struct { @@ -38,3 +40,63 @@ func NewEfs(cfg *config.CGRConfig, connMgr *engine.ConnManager) *EfS { connMgr: connMgr, } } + +// V1ProcessEvent will write into gob formnat file the Events that were failed to be exported. +func (efs *EfS) V1ProcessEvent(ctx *context.Context, args *utils.ArgsFailedPosts, reply *string) error { + var format string + if _, has := args.APIOpts[utils.Format]; has { + format = utils.IfaceAsString(args.APIOpts[utils.Format]) + } + key := utils.ConcatenatedKey(args.FailedDir, args.Path, format, args.Module) + switch args.Module { + case utils.EEs: + // also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id + var amqpQueueID string + var s3BucketID string + var sqsQueueID string + var kafkaTopic string + if _, has := args.APIOpts[utils.AMQPQueueID]; has { + amqpQueueID = utils.IfaceAsString(args.APIOpts[utils.AMQPQueueID]) + } + if _, has := args.APIOpts[utils.S3Bucket]; has { + s3BucketID = utils.IfaceAsString(args.APIOpts[utils.S3Bucket]) + } + if _, has := args.APIOpts[utils.SQSQueueID]; has { + sqsQueueID = utils.IfaceAsString(args.APIOpts[utils.SQSQueueID]) + } + if _, has := args.APIOpts[kafkaTopic]; has { + kafkaTopic = utils.IfaceAsString(args.APIOpts[utils.KafkaTopic]) + } + if qID := utils.FirstNonEmpty(amqpQueueID, s3BucketID, + sqsQueueID, kafkaTopic); len(qID) != 0 { + key = utils.ConcatenatedKey(key, qID) + } + case utils.Kafka: + } + var failedPost *FailedExportersLogg + if x, ok := failedPostCache.Get(key); ok { + if x != nil { + failedPost = x.(*FailedExportersLogg) + } + } + if failedPost == nil { + failedPost = &FailedExportersLogg{ + Path: args.Path, + Format: format, + Opts: args.APIOpts, + Module: args.Module, + FailedPostsDir: args.FailedDir, + } + failedPostCache.Set(key, failedPost, nil) + } + failedPost.AddEvent(args.Event) + return nil +} + +type ArgsReplayFailedPosts struct { + Tenant string + TypeProvider string + FailedRequestsInDir *string // if defined it will be our source of requests to be replayed + FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded + Modules []string // list of modules for which replay the requests, nil for all +} diff --git a/efs/failed_ees.go b/efs/failed_ees.go new file mode 100644 index 000000000..db2a82777 --- /dev/null +++ b/efs/failed_ees.go @@ -0,0 +1,286 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package efs + +import ( + "sync" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/ees" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +// FailedExportersEEs used to save the failed post to file +type FailedExportersEEs struct { + lk sync.RWMutex + Path string + Opts *config.EventExporterOpts + Format string + Events []interface{} + failedPostsDir string + module string + + connMngr *engine.ConnManager +} + +func AsOptsEESConfig(opts map[string]interface{}) (*config.EventExporterOpts, error) { + optsCfg := new(config.EventExporterOpts) + if len(opts) == 0 { + return optsCfg, nil + } + if _, has := opts[utils.CSVFieldSepOpt]; has { + optsCfg.CSVFieldSeparator = utils.StringPointer(utils.IfaceAsString(utils.CSVFieldSepOpt)) + } + if _, has := opts[utils.ElsIndex]; has { + optsCfg.ElsIndex = utils.StringPointer(utils.IfaceAsString(utils.ElsIndex)) + } + if _, has := opts[utils.ElsIfPrimaryTerm]; has { + x, err := utils.IfaceAsInt(utils.ElsIfPrimaryTerm) + if err != nil { + return nil, err + } + optsCfg.ElsIfPrimaryTerm = utils.IntPointer(x) + } + if _, has := opts[utils.ElsIfSeqNo]; has { + x, err := utils.IfaceAsInt(utils.ElsIfSeqNo) + if err != nil { + return nil, err + } + optsCfg.ElsIfSeqNo = utils.IntPointer(x) + } + if _, has := opts[utils.ElsOpType]; has { + optsCfg.ElsOpType = utils.StringPointer(utils.IfaceAsString(utils.ElsOpType)) + } + if _, has := opts[utils.ElsPipeline]; has { + optsCfg.ElsPipeline = utils.StringPointer(utils.IfaceAsString(utils.ElsPipeline)) + } + if _, has := opts[utils.ElsRouting]; has { + optsCfg.ElsRouting = utils.StringPointer(utils.IfaceAsString(utils.ElsRouting)) + } + if _, has := opts[utils.ElsTimeout]; has { + t, err := utils.IfaceAsDuration(utils.ElsTimeout) + if err != nil { + return nil, err + } + optsCfg.ElsTimeout = &t + } + if _, has := opts[utils.ElsVersionLow]; has { + x, err := utils.IfaceAsInt(utils.ElsVersionLow) + if err != nil { + return nil, err + } + optsCfg.ElsVersion = utils.IntPointer(x) + } + if _, has := opts[utils.ElsVersionType]; has { + optsCfg.ElsVersionType = utils.StringPointer(utils.IfaceAsString(utils.ElsVersionType)) + } + if _, has := opts[utils.ElsWaitForActiveShards]; has { + optsCfg.ElsWaitForActiveShards = utils.StringPointer(utils.IfaceAsString(utils.ElsWaitForActiveShards)) + } + if _, has := opts[utils.SQLMaxIdleConnsCfg]; has { + x, err := utils.IfaceAsInt(utils.SQLMaxIdleConnsCfg) + if err != nil { + return nil, err + } + optsCfg.SQLMaxIdleConns = utils.IntPointer(x) + } + if _, has := opts[utils.SQLMaxOpenConns]; has { + x, err := utils.IfaceAsInt(utils.SQLMaxOpenConns) + if err != nil { + return nil, err + } + optsCfg.SQLMaxOpenConns = utils.IntPointer(x) + } + if _, has := opts[utils.SQLConnMaxLifetime]; has { + t, err := utils.IfaceAsDuration(utils.SQLConnMaxLifetime) + if err != nil { + return nil, err + } + optsCfg.SQLConnMaxLifetime = &t + } + if _, has := opts[utils.MYSQLDSNParams]; has { + optsCfg.MYSQLDSNParams = opts[utils.SQLConnMaxLifetime].(map[string]string) + } + if _, has := opts[utils.SQLTableNameOpt]; has { + optsCfg.SQLTableName = utils.StringPointer(utils.IfaceAsString(utils.SQLTableNameOpt)) + } + if _, has := opts[utils.SQLDBNameOpt]; has { + optsCfg.SQLDBName = utils.StringPointer(utils.IfaceAsString(utils.SQLDBNameOpt)) + } + if _, has := opts[utils.PgSSLModeCfg]; has { + optsCfg.PgSSLMode = utils.StringPointer(utils.IfaceAsString(utils.PgSSLModeCfg)) + } + if _, has := opts[utils.KafkaTopic]; has { + optsCfg.KafkaTopic = utils.StringPointer(utils.IfaceAsString(utils.KafkaTopic)) + } + if _, has := opts[utils.AMQPQueueID]; has { + optsCfg.AMQPQueueID = utils.StringPointer(utils.IfaceAsString(utils.AMQPQueueID)) + } + if _, has := opts[utils.AMQPRoutingKey]; has { + optsCfg.AMQPRoutingKey = utils.StringPointer(utils.IfaceAsString(utils.AMQPRoutingKey)) + } + if _, has := opts[utils.AMQPExchange]; has { + optsCfg.AMQPExchange = utils.StringPointer(utils.IfaceAsString(utils.AMQPExchange)) + } + if _, has := opts[utils.AMQPExchangeType]; has { + optsCfg.AMQPExchangeType = utils.StringPointer(utils.IfaceAsString(utils.AMQPExchangeType)) + } + if _, has := opts[utils.AWSRegion]; has { + optsCfg.AWSRegion = utils.StringPointer(utils.IfaceAsString(utils.AWSRegion)) + } + if _, has := opts[utils.AWSKey]; has { + optsCfg.AWSKey = utils.StringPointer(utils.IfaceAsString(utils.AWSKey)) + } + if _, has := opts[utils.AWSSecret]; has { + optsCfg.AWSSecret = utils.StringPointer(utils.IfaceAsString(utils.AWSSecret)) + } + if _, has := opts[utils.AWSToken]; has { + optsCfg.AWSToken = utils.StringPointer(utils.IfaceAsString(utils.AWSToken)) + } + if _, has := opts[utils.SQSQueueID]; has { + optsCfg.SQSQueueID = utils.StringPointer(utils.IfaceAsString(utils.SQSQueueID)) + } + if _, has := opts[utils.S3Bucket]; has { + optsCfg.S3BucketID = utils.StringPointer(utils.IfaceAsString(utils.S3Bucket)) + } + if _, has := opts[utils.S3FolderPath]; has { + optsCfg.S3FolderPath = utils.StringPointer(utils.IfaceAsString(utils.S3FolderPath)) + } + if _, has := opts[utils.NatsJetStream]; has { + x, err := utils.IfaceAsBool(utils.NatsJetStream) + if err != nil { + return nil, err + } + optsCfg.NATSJetStream = utils.BoolPointer(x) + } + if _, has := opts[utils.NatsSubject]; has { + optsCfg.NATSSubject = utils.StringPointer(utils.IfaceAsString(utils.NatsSubject)) + } + if _, has := opts[utils.NatsJWTFile]; has { + optsCfg.NATSJWTFile = utils.StringPointer(utils.IfaceAsString(utils.NatsJWTFile)) + } + if _, has := opts[utils.NatsSeedFile]; has { + optsCfg.NATSSeedFile = utils.StringPointer(utils.IfaceAsString(utils.NatsSeedFile)) + } + if _, has := opts[utils.NatsCertificateAuthority]; has { + optsCfg.NATSCertificateAuthority = utils.StringPointer(utils.IfaceAsString(utils.NatsCertificateAuthority)) + } + if _, has := opts[utils.NatsClientCertificate]; has { + optsCfg.NATSClientCertificate = utils.StringPointer(utils.IfaceAsString(utils.NatsClientCertificate)) + } + if _, has := opts[utils.NatsClientKey]; has { + optsCfg.NATSClientKey = utils.StringPointer(utils.IfaceAsString(utils.NatsClientKey)) + } + if _, has := opts[utils.NatsJetStreamMaxWait]; has { + t, err := utils.IfaceAsDuration(utils.NatsJetStreamMaxWait) + if err != nil { + return nil, err + } + optsCfg.NATSJetStreamMaxWait = &t + } + if _, has := opts[utils.RpcCodec]; has { + optsCfg.RPCCodec = utils.StringPointer(utils.IfaceAsString(utils.RpcCodec)) + } + if _, has := opts[utils.ServiceMethod]; has { + optsCfg.ServiceMethod = utils.StringPointer(utils.IfaceAsString(utils.ServiceMethod)) + } + if _, has := opts[utils.KeyPath]; has { + optsCfg.KeyPath = utils.StringPointer(utils.IfaceAsString(utils.KeyPath)) + } + if _, has := opts[utils.CertPath]; has { + optsCfg.CertPath = utils.StringPointer(utils.IfaceAsString(utils.CertPath)) + } + if _, has := opts[utils.CaPath]; has { + optsCfg.CAPath = utils.StringPointer(utils.IfaceAsString(utils.CaPath)) + } + if _, has := opts[utils.Tls]; has { + x, err := utils.IfaceAsBool(utils.Tls) + if err != nil { + return nil, err + } + optsCfg.TLS = utils.BoolPointer(x) + } + if _, has := opts[utils.ConnIDs]; has { + optsCfg.ConnIDs = opts[utils.ConnIDs].(*[]string) + } + if _, has := opts[utils.RpcConnTimeout]; has { + t, err := utils.IfaceAsDuration(utils.RpcConnTimeout) + if err != nil { + return nil, err + } + optsCfg.RPCConnTimeout = &t + } + if _, has := opts[utils.RpcReplyTimeout]; has { + t, err := utils.IfaceAsDuration(utils.RpcReplyTimeout) + if err != nil { + return nil, err + } + optsCfg.RPCReplyTimeout = &t + } + if _, has := opts[utils.RPCAPIOpts]; has { + optsCfg.RPCAPIOpts = opts[utils.RPCAPIOpts].(map[string]interface{}) + } + return optsCfg, nil +} + +// AddEvent adds one event +func (expEv *FailedExportersEEs) AddEvent(ev interface{}) { + expEv.lk.Lock() + expEv.Events = append(expEv.Events, ev) + expEv.lk.Unlock() +} + +// ReplayFailedPosts tryies to post cdrs again +func (expEv *FailedExportersEEs) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) (err error) { + eesFailedEvents := &FailedExportersEEs{ + Path: expEv.Path, + Opts: expEv.Opts, + Format: expEv.Format, + } + + var ee ees.EventExporter + if ee, err = ees.NewEventExporter(&config.EventExporterCfg{ + ID: "ReplayFailedPosts", + Type: expEv.Format, + ExportPath: expEv.Path, + Opts: expEv.Opts, + Attempts: attempts, + FailedPostsDir: utils.MetaNone, + }, config.CgrConfig(), nil, nil); err != nil { + return + } + keyFunc := func() string { return utils.EmptyString } + if expEv.Format == utils.MetaKafkajsonMap || expEv.Format == utils.MetaS3jsonMap { + keyFunc = utils.UUIDSha1Prefix + } + for _, ev := range expEv.Events { + if err = ees.ExportWithAttempts(context.Background(), ee, ev, keyFunc(), expEv.connMngr, tnt); err != nil { + eesFailedEvents.AddEvent(ev) + } + } + ee.Close() + if len(eesFailedEvents.Events) > 0 { + err = utils.ErrPartiallyExecuted + } else { + eesFailedEvents = nil + } + return +} diff --git a/efs/failed_loggs.go b/efs/failed_loggs.go new file mode 100644 index 000000000..d0bd74b25 --- /dev/null +++ b/efs/failed_loggs.go @@ -0,0 +1,111 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package efs + +import ( + "bytes" + "encoding/gob" + "os" + "sync" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/segmentio/kafka-go" +) + +// FailedExportersLogg is a failover poster for kafka logger type +type FailedExportersLogg struct { + lk sync.RWMutex + Path string + Opts map[string]interface{} // this is meta + Format string + Events []interface{} + FailedPostsDir string + Module string + + efsConns []string + connMngr *engine.ConnManager +} + +// AddEvent adds one event +func (expEv *FailedExportersLogg) AddEvent(ev interface{}) { + expEv.lk.Lock() + expEv.Events = append(expEv.Events, ev) + expEv.lk.Unlock() +} + +// NewExportEventsFromFile returns ExportEvents from the file +// used only on replay failed post +func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err error) { + var fileContent []byte + if fileContent, err = os.ReadFile(filePath); err != nil { + return nil, err + } + if err = os.Remove(filePath); err != nil { + return nil, err + } + dec := gob.NewDecoder(bytes.NewBuffer(fileContent)) + // unmarshall it + expEv = new(FailedExportersLogg) + err = dec.Decode(&expEv) + return +} + +// ReplayFailedPosts tryies to post cdrs again +func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) (err error) { + nodeID := utils.IfaceAsString(expEv.Opts[utils.NodeID]) + logLvl, err := utils.IfaceAsInt(expEv.Opts[utils.Level]) + if err != nil { + return + } + expLogger := utils.NewExportLogger(nodeID, tnt, logLvl, + expEv.Path, expEv.Format, attempts, expEv.FailedPostsDir) + for _, event := range expEv.Events { + var content []byte + if content, err = utils.ToUnescapedJSON(event); err != nil { + return + } + if err = expLogger.Writer.WriteMessages(context.Background(), kafka.Message{ + Key: []byte(utils.GenUUID()), + Value: content, + }); err != nil { + var reply string + // if there are any errors in kafka, we will post in FailedPostDirectory + if err = expEv.connMngr.Call(ctx, expEv.efsConns, utils.EfSv1ProcessEvent, + &utils.ArgsFailedPosts{ + Tenant: tnt, + Path: expLogger.Writer.Addr.String(), + Event: event, + FailedDir: expLogger.FldPostDir, + Module: utils.Kafka, + APIOpts: expLogger.GetMeta(), + }, &reply); err != nil { + return err + /* + utils.AddFailedMessage(expLogger.FldPostDir, expLogger.Writer.Addr.String(), utils. + MetaKafkaLog, utils.Kafka, + event, expLogger.GetMeta()) + */ + } + return nil + } + } + return err +} diff --git a/efs/libefs.go b/efs/libefs.go new file mode 100644 index 000000000..f883d537c --- /dev/null +++ b/efs/libefs.go @@ -0,0 +1,124 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package efs + +import ( + "bytes" + "encoding/gob" + "fmt" + "os" + "path" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/guardian" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" +) + +var failedPostCache *ltcache.Cache + +func init() { + failedPostCache = ltcache.NewCache(-1, 5*time.Second, true, writeFailedPosts) +} + +// SetFailedPostCacheTTL recreates the failed cache +func SetFailedPostCacheTTL(ttl time.Duration) { + failedPostCache = ltcache.NewCache(-1, ttl, true, writeFailedPosts) +} + +func writeFailedPosts(_ string, value interface{}) { + expEv, canConvert := value.(*FailedExportersLogg) + if !canConvert { + return + } + filePath := expEv.FilePath() + expEv.lk.RLock() + if err := WriteToFile(filePath, expEv); err != nil { + utils.Logger.Warning(fmt.Sprintf("Unable to write failed post to file <%s> because <%s>", + filePath, err)) + expEv.lk.RUnlock() + return + } + expEv.lk.RUnlock() +} + +// FilePath returns the file path it should use for saving the failed events +func (expEv *FailedExportersLogg) FilePath() string { + return path.Join(expEv.FailedPostsDir, expEv.Module+utils.PipeSep+utils.UUIDSha1Prefix()+utils.GOBSuffix) +} + +type FailoverPoster interface { + ReplayFailedPosts(*context.Context, int, string) error +} + +// WriteToFile writes the events to file +func WriteToFile(filePath string, expEv FailoverPoster) (err error) { + fileOut, err := os.Create(filePath) + if err != nil { + return err + } + encd := gob.NewEncoder(fileOut) + gob.Register(new(utils.CGREvent)) + err = encd.Encode(expEv) + fileOut.Close() + return +} + +// NewFailoverPosterFromFile returns ExportEvents from the file +// used only on replay failed post +func NewFailoverPosterFromFile(filePath, providerType string, efs *EfS) (failPoster FailoverPoster, err error) { + var fileContent []byte + err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error { + if fileContent, err = os.ReadFile(filePath); err != nil { + return err + } + return os.Remove(filePath) + }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath) + if err != nil { + return + } + dec := gob.NewDecoder(bytes.NewBuffer(fileContent)) + // unmarshall it + expEv := new(FailedExportersLogg) + err = dec.Decode(&expEv) + switch providerType { + case utils.EEs: + opts, err := AsOptsEESConfig(expEv.Opts) + if err != nil { + return nil, err + } + failPoster = &FailedExportersEEs{ + module: expEv.Module, + failedPostsDir: expEv.FailedPostsDir, + Path: expEv.Path, + Opts: opts, + Events: expEv.Events, + Format: expEv.Format, + + connMngr: efs.connMgr, + } + case utils.Kafka: + expEv.efsConns = efs.cfg.LoggerCfg().EFsConns + expEv.connMngr = efs.connMgr + failPoster = expEv + } + return +} diff --git a/general_tests/cdrs_post_failover_it_test.go b/general_tests/cdrs_post_failover_it_test.go index 4473b3d67..e563b74a4 100644 --- a/general_tests/cdrs_post_failover_it_test.go +++ b/general_tests/cdrs_post_failover_it_test.go @@ -30,7 +30,7 @@ import ( "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/ees" + "github.com/cgrates/cgrates/efs" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/utils" @@ -203,19 +203,19 @@ func testCDRsPostFailoverToFile(t *testing.T) { fileName := file.Name() filePath := path.Join(cdrsPostFailCfg.EFsCfg().FailedPostsDir, fileName) - ev, err := ees.NewFailoverPosterFromFile(filePath, utils.EEs) + ev, err := efs.NewFailoverPosterFromFile(filePath, utils.EEs) if err != nil { t.Errorf("<%s> for file <%s>", err, fileName) continue - } else if len(ev.(*ees.FailedExportersEEs).Events) == 0 { + } else if len(ev.(*efs.FailedExportersEEs).Events) == 0 { t.Error("Expected at least one event") continue } - if ev.(*ees.FailedExportersEEs).Format != utils.MetaS3jsonMap { - t.Errorf("Expected event to use %q received: %q", utils.MetaS3jsonMap, ev.(*ees.FailedExportersEEs).Format) + if ev.(*efs.FailedExportersEEs).Format != utils.MetaS3jsonMap { + t.Errorf("Expected event to use %q received: %q", utils.MetaS3jsonMap, ev.(*efs.FailedExportersEEs).Format) } - if len(ev.(*ees.FailedExportersEEs).Events) != 3 { - t.Errorf("Expected all the events to be saved in the same file, ony %v saved in this file.", len(ev.(*ees.FailedExportersEEs).Events)) + if len(ev.(*efs.FailedExportersEEs).Events) != 3 { + t.Errorf("Expected all the events to be saved in the same file, ony %v saved in this file.", len(ev.(*efs.FailedExportersEEs).Events)) } } } diff --git a/services/cgr-engine.go b/services/cgr-engine.go index 0cf235458..ebd57f3dd 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -32,6 +32,7 @@ import ( "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/efs" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/registrarc" "github.com/cgrates/cgrates/servmanager" @@ -364,7 +365,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags cgr.cfg.LoggerCfg().Opts.FailedPostsDir); err != nil { return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err) } - utils.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing + efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version())) cgr.cfg.LazySanityCheck() diff --git a/utils/coreutils.go b/utils/coreutils.go index 7d0add8ba..233086c0e 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -780,6 +780,15 @@ func IsURL(path string) bool { strings.HasPrefix(path, "http://") } +type ArgsFailedPosts struct { + Tenant string + Path string // Path of the exported type + Event interface{} // Event that must be written in file + FailedDir string // Directory that contains the file with Failed post + Module string // Type of efs <*ees|*kafkaLogger> + APIOpts map[string]interface{} // Specially for the meta +} + // GetIndexesArg the API argumets to specify an index type GetIndexesArg struct { IdxItmType string diff --git a/utils/failover_export.go b/utils/failover_export.go index 91c7524f6..d2c278600 100644 --- a/utils/failover_export.go +++ b/utils/failover_export.go @@ -18,20 +18,7 @@ along with this program. If not, see package utils -import ( - "bytes" - "encoding/gob" - "fmt" - "os" - "path" - "sync" - "time" - - "github.com/cgrates/birpc/context" - "github.com/cgrates/ltcache" - "github.com/segmentio/kafka-go" -) - +/* var failedPostCache *ltcache.Cache func init() { @@ -159,13 +146,12 @@ func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err e } type FailoverPoster interface { - ReplayFailedPosts(int) error + ReplayFailedPosts(int, string) error } // ReplayFailedPosts tryies to post cdrs again -func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int) (err error) { +func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int, tnt string) (err error) { nodeID := IfaceAsString(expEv.Opts[NodeID]) - tnt := IfaceAsString(expEv.Opts[Tenant]) logLvl, err := IfaceAsInt(expEv.Opts[Level]) if err != nil { return @@ -177,15 +163,16 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int) (err error) { if content, err = ToUnescapedJSON(event); err != nil { return } - if err = expLogger.writer.WriteMessages(context.Background(), kafka.Message{ + if err = expLogger.Writer.WriteMessages(context.Background(), kafka.Message{ Key: []byte(GenUUID()), Value: content, }); err != nil { // if there are any errors in kafka, we will post in FailedPostDirectory - AddFailedMessage(expLogger.fldPostDir, expLogger.writer.Addr.String(), MetaKafkaLog, Kafka, + AddFailedMessage(expLogger.FldPostDir, expLogger.Writer.Addr.String(), MetaKafkaLog, Kafka, event, expLogger.GetMeta()) return nil } } return err } +*/ diff --git a/utils/kafka_logger.go b/utils/kafka_logger.go index 8ba4d926a..7b44bc805 100644 --- a/utils/kafka_logger.go +++ b/utils/kafka_logger.go @@ -31,22 +31,22 @@ import ( type ExportLogger struct { sync.Mutex - logLevel int - fldPostDir string - writer *kafka.Writer - nodeID string - tenant string + LogLevel int + FldPostDir string + Writer *kafka.Writer + NodeID string + Tenant string } // NewExportLogger will export loggers to kafka func NewExportLogger(nodeID, tenant string, level int, connOpts, connTopic string, attempts int, fldPostDir string) (el *ExportLogger) { el = &ExportLogger{ - logLevel: level, - fldPostDir: fldPostDir, - nodeID: nodeID, - tenant: tenant, - writer: &kafka.Writer{ + LogLevel: level, + FldPostDir: fldPostDir, + NodeID: nodeID, + Tenant: tenant, + Writer: &kafka.Writer{ Addr: kafka.TCP(connOpts), Topic: connTopic, MaxAttempts: attempts, @@ -56,18 +56,18 @@ func NewExportLogger(nodeID, tenant string, level int, } func (el *ExportLogger) Close() (err error) { - if el.writer != nil { - err = el.writer.Close() - el.writer = nil + if el.Writer != nil { + err = el.Writer.Close() + el.Writer = nil } return } func (el *ExportLogger) call(m string, level int) (err error) { eventExport := &CGREvent{ - Tenant: el.tenant, + Tenant: el.Tenant, Event: map[string]interface{}{ - NodeID: el.nodeID, + NodeID: el.NodeID, Message: m, Severity: level, Timestamp: time.Now().Format("2006-01-02 15:04:05"), @@ -78,15 +78,17 @@ func (el *ExportLogger) call(m string, level int) (err error) { if content, err = ToUnescapedJSON(eventExport); err != nil { return } - if err = el.writer.WriteMessages(context.Background(), kafka.Message{ + if err = el.Writer.WriteMessages(context.Background(), kafka.Message{ Key: []byte(GenUUID()), Value: content, }); err != nil { - // if there are any errors in kafka, we will post in FailedPostDirectory - AddFailedMessage(el.fldPostDir, el.writer.Addr.String(), MetaKafkaLog, Kafka, - eventExport, el.GetMeta()) - // also the content should be printed as a stdout logger type - return ErrLoggerChanged + /* + // if there are any errors in kafka, we will post in FailedPostDirectory + AddFailedMessage(el.FldPostDir, el.Writer.Addr.String(), MetaKafkaLog, Kafka, + eventExport, el.GetMeta()) + // also the content should be printed as a stdout logger type + return ErrLoggerChanged + */ } return } @@ -103,22 +105,22 @@ func (sl *ExportLogger) GetSyslog() *syslog.Writer { // GetLogLevel() returns the level logger number for the server func (el *ExportLogger) GetLogLevel() int { - return el.logLevel + return el.LogLevel } // SetLogLevel changes the log level func (el *ExportLogger) SetLogLevel(level int) { - el.logLevel = level + el.LogLevel = level } // Alert logs to EEs with alert level func (el *ExportLogger) Alert(m string) (err error) { - if el.logLevel < LOGLEVEL_ALERT { + if el.LogLevel < LOGLEVEL_ALERT { return nil } if err = el.call(m, LOGLEVEL_ALERT); err != nil { if err == ErrLoggerChanged { - NewStdLogger(el.nodeID, el.logLevel).Alert(m) + NewStdLogger(el.NodeID, el.LogLevel).Alert(m) err = nil } } @@ -127,12 +129,12 @@ func (el *ExportLogger) Alert(m string) (err error) { // Crit logs to EEs with critical level func (el *ExportLogger) Crit(m string) (err error) { - if el.logLevel < LOGLEVEL_CRITICAL { + if el.LogLevel < LOGLEVEL_CRITICAL { return nil } if el.call(m, LOGLEVEL_CRITICAL); err != nil { if err == ErrLoggerChanged { - NewStdLogger(el.nodeID, el.logLevel).Crit(m) + NewStdLogger(el.NodeID, el.LogLevel).Crit(m) err = nil } } @@ -141,12 +143,12 @@ func (el *ExportLogger) Crit(m string) (err error) { // Debug logs to EEs with debug level func (el *ExportLogger) Debug(m string) (err error) { - if el.logLevel < LOGLEVEL_DEBUG { + if el.LogLevel < LOGLEVEL_DEBUG { return nil } if err = el.call(m, LOGLEVEL_DEBUG); err != nil { if err == ErrLoggerChanged { - NewStdLogger(el.nodeID, el.logLevel).Debug(m) + NewStdLogger(el.NodeID, el.LogLevel).Debug(m) err = nil } } @@ -155,12 +157,12 @@ func (el *ExportLogger) Debug(m string) (err error) { // Emerg logs to EEs with emergency level func (el *ExportLogger) Emerg(m string) (err error) { - if el.logLevel < LOGLEVEL_EMERGENCY { + if el.LogLevel < LOGLEVEL_EMERGENCY { return nil } if err = el.call(m, LOGLEVEL_EMERGENCY); err != nil { if err == ErrLoggerChanged { - NewStdLogger(el.nodeID, el.logLevel).Emerg(m) + NewStdLogger(el.NodeID, el.LogLevel).Emerg(m) err = nil } } @@ -169,12 +171,12 @@ func (el *ExportLogger) Emerg(m string) (err error) { // Err logs to EEs with error level func (el *ExportLogger) Err(m string) (err error) { - if el.logLevel < LOGLEVEL_ERROR { + if el.LogLevel < LOGLEVEL_ERROR { return nil } if err = el.call(m, LOGLEVEL_ERROR); err != nil { if err == ErrLoggerChanged { - NewStdLogger(el.nodeID, el.logLevel).Err(m) + NewStdLogger(el.NodeID, el.LogLevel).Err(m) err = nil } } @@ -183,12 +185,12 @@ func (el *ExportLogger) Err(m string) (err error) { // Info logs to EEs with info level func (el *ExportLogger) Info(m string) (err error) { - if el.logLevel < LOGLEVEL_INFO { + if el.LogLevel < LOGLEVEL_INFO { return nil } if err = el.call(m, LOGLEVEL_INFO); err != nil { if err == ErrLoggerChanged { - NewStdLogger(el.nodeID, el.logLevel).Info(m) + NewStdLogger(el.NodeID, el.LogLevel).Info(m) err = nil } } @@ -197,12 +199,12 @@ func (el *ExportLogger) Info(m string) (err error) { // Notice logs to EEs with notice level func (el *ExportLogger) Notice(m string) (err error) { - if el.logLevel < LOGLEVEL_NOTICE { + if el.LogLevel < LOGLEVEL_NOTICE { return nil } if err = el.call(m, LOGLEVEL_NOTICE); err != nil { if err == ErrLoggerChanged { - NewStdLogger(el.nodeID, el.logLevel).Notice(m) + NewStdLogger(el.NodeID, el.LogLevel).Notice(m) err = nil } } @@ -211,12 +213,12 @@ func (el *ExportLogger) Notice(m string) (err error) { // Warning logs to EEs with warning level func (el *ExportLogger) Warning(m string) (err error) { - if el.logLevel < LOGLEVEL_WARNING { + if el.LogLevel < LOGLEVEL_WARNING { return nil } if err = el.call(m, LOGLEVEL_WARNING); err != nil { if err == ErrLoggerChanged { - NewStdLogger(el.nodeID, el.logLevel).Warning(m) + NewStdLogger(el.NodeID, el.LogLevel).Warning(m) err = nil } } @@ -225,12 +227,12 @@ func (el *ExportLogger) Warning(m string) (err error) { func (el *ExportLogger) GetMeta() map[string]interface{} { return map[string]interface{}{ - Tenant: el.tenant, - NodeID: el.nodeID, - Level: el.logLevel, - Format: el.writer.Topic, - Conn: el.writer.Addr.String(), - FailedPostsDir: el.fldPostDir, - Attempts: el.writer.MaxAttempts, + Tenant: el.Tenant, + NodeID: el.NodeID, + Level: el.LogLevel, + Format: el.Writer.Topic, + Conn: el.Writer.Addr.String(), + FailedPostsDir: el.FldPostDir, + Attempts: el.Writer.MaxAttempts, } }