diff --git a/actions/export.go b/actions/export.go
index eb8d8b170..2d8d638d4 100644
--- a/actions/export.go
+++ b/actions/export.go
@@ -78,8 +78,10 @@ func (aL *actHTTPPost) execute(ctx *context.Context, data utils.MapStorage, _ st
var partExec bool
for _, pstr := range aL.pstrs {
if async, has := aL.cfg().Opts[utils.MetaAsync]; has && utils.IfaceAsString(async) == utils.TrueStr {
- go ees.ExportWithAttempts(context.Background(), pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString)
- } else if err = ees.ExportWithAttempts(ctx, pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString); err != nil {
+ go ees.ExportWithAttempts(context.Background(), pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString,
+ nil, aL.config.GeneralCfg().DefaultTenant)
+ } else if err = ees.ExportWithAttempts(ctx, pstr, &ees.HTTPPosterRequest{Body: body, Header: make(http.Header)}, utils.EmptyString,
+ nil, aL.config.GeneralCfg().DefaultTenant); err != nil {
if pstr.Cfg().FailedPostsDir != utils.MetaNone {
err = nil
} else {
diff --git a/apis/efs.go b/apis/efs.go
index a5abf0b53..3891148d7 100644
--- a/apis/efs.go
+++ b/apis/efs.go
@@ -25,7 +25,6 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
- "github.com/cgrates/cgrates/ees"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/utils"
)
@@ -44,18 +43,13 @@ type EfSv1 struct {
ping
}
-type ArgsReplayFailedPosts struct {
- TypeProvider string
- FailedRequestsInDir *string // if defined it will be our source of requests to be replayed
- FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded
- Modules []string // list of modules for which replay the requests, nil for all
+// ProcessEvent will write into gob formnat file the Events that were failed to be exported.
+func (efS *EfSv1) ProcessEvent(ctx *context.Context, args *utils.ArgsFailedPosts, reply *string) error {
+ return efS.efs.V1ProcessEvent(ctx, args, reply)
}
-func (efS *EfSv1) ProcessEvent(ctx *context.Context, args *ArgsReplayFailedPosts, reply *string) error {
- return nil
-}
-
-func (efS *EfSv1) ReplayEvents(ctx *context.Context, args *ArgsReplayFailedPosts, reply *string) error {
+// ReplayEvents will read the Events from gob files that were failed to be exported and try to re-export them again.
+func (efS *EfSv1) ReplayEvents(ctx *context.Context, args *efs.ArgsReplayFailedPosts, reply *string) error {
failedPostsDir := efS.cfg.LoggerCfg().Opts.FailedPostsDir
if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != utils.EmptyString {
failedPostsDir = *args.FailedRequestsInDir
@@ -87,8 +81,8 @@ func (efS *EfSv1) ReplayEvents(ctx *context.Context, args *ArgsReplayFailedPosts
}
}
filePath := path.Join(failedPostsDir, file.Name())
- var expEv utils.FailoverPoster
- if expEv, err = ees.NewFailoverPosterFromFile(filePath, args.TypeProvider); err != nil {
+ var expEv efs.FailoverPoster
+ if expEv, err = efs.NewFailoverPosterFromFile(filePath, args.TypeProvider, efS.efs); err != nil {
return err
}
// check if the failed out dir path is the same as the same in dir in order to export again in case of failure
@@ -97,9 +91,9 @@ func (efS *EfSv1) ReplayEvents(ctx *context.Context, args *ArgsReplayFailedPosts
failoverPath = path.Join(failedOutDir, file.Name())
}
- err = expEv.ReplayFailedPosts(efS.cfg.EFsCfg().PosterAttempts)
+ err = expEv.ReplayFailedPosts(ctx, efS.cfg.EFsCfg().PosterAttempts, args.Tenant)
if err != nil && failedOutDir != utils.MetaNone { // Got error from HTTPPoster could be that content was not written, we need to write it ourselves
- if err = utils.WriteToFile(failoverPath, expEv); err != nil {
+ if err = efs.WriteToFile(failoverPath, expEv); err != nil {
return utils.NewErrServerError(err)
}
}
diff --git a/ees/ees.go b/ees/ees.go
index d5df79aae..8b37b6c7d 100644
--- a/ees/ees.go
+++ b/ees/ees.go
@@ -197,7 +197,7 @@ func (eeS *EeS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEe
utils.EEs, ee.Cfg().ID))
}
go func(evict, sync bool, ee EventExporter) {
- if err := exportEventWithExporter(ctx, ee, cgrEv.CGREvent, evict, eeS.cfg, eeS.fltrS); err != nil {
+ if err := exportEventWithExporter(ctx, ee, eeS.connMgr, cgrEv.CGREvent, evict, eeS.cfg, eeS.fltrS, cgrEv.Tenant); err != nil {
withErr = true
}
if sync {
@@ -243,7 +243,8 @@ func (eeS *EeS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEe
return
}
-func exportEventWithExporter(ctx *context.Context, exp EventExporter, ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS) (err error) {
+func exportEventWithExporter(ctx *context.Context, exp EventExporter, connMngr *engine.ConnManager,
+ ev *utils.CGREvent, oneTime bool, cfg *config.CGRConfig, filterS *engine.FilterS, tnt string) (err error) {
defer func() {
updateEEMetrics(exp.GetMetrics(), ev.ID, ev.Event, err != nil, utils.FirstNonEmpty(exp.Cfg().Timezone,
cfg.GeneralCfg().DefaultTimezone))
@@ -276,16 +277,34 @@ func exportEventWithExporter(ctx *context.Context, exp EventExporter, ev *utils.
}
extraData := exp.ExtraData(ev)
- return ExportWithAttempts(ctx, exp, eEv, extraData)
+ return ExportWithAttempts(ctx, exp, eEv, extraData, connMngr, tnt)
}
-func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key interface{}) (err error) {
+func ExportWithAttempts(ctx *context.Context, exp EventExporter, eEv interface{}, key interface{},
+ connMnngr *engine.ConnManager, tnt string) (err error) {
if exp.Cfg().FailedPostsDir != utils.MetaNone {
defer func() {
if err != nil {
- utils.AddFailedMessage(exp.Cfg().FailedPostsDir, exp.Cfg().ExportPath,
- exp.Cfg().Type, utils.EEs,
- eEv, exp.Cfg().Opts.AsMapInterface())
+ args := &utils.ArgsFailedPosts{
+ Tenant: tnt,
+ Path: exp.Cfg().ExportPath,
+ Event: eEv,
+ FailedDir: exp.Cfg().FailedPostsDir,
+ Module: utils.EEs,
+ APIOpts: exp.Cfg().Opts.AsMapInterface(),
+ }
+ var reply string
+ if err = connMnngr.Call(ctx, exp.Cfg().EFsConns,
+ utils.EfSv1ProcessEvent, args, &reply); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> Exporter <%s> could not be written with <%s> service because err: <%s>",
+ utils.EEs, exp.Cfg().ID, utils.EFs, err.Error()))
+ /*
+ utils.AddFailedMessage(exp.Cfg().FailedPostsDir, exp.Cfg().ExportPath,
+ exp.Cfg().Type, utils.EEs,
+ eEv, exp.Cfg().Opts.AsMapInterface())
+ */
+ }
}
}()
}
@@ -431,7 +450,7 @@ func (eeS *EeS) V1ArchiveEventsInReply(ctx *context.Context, args *ArchiveEvents
// exported will be true if there will be at least one exporter archived
exported = true
- if err = exportEventWithExporter(ctx, ee, cgrEv, false, eeS.cfg, eeS.fltrS); err != nil {
+ if err = exportEventWithExporter(ctx, ee, eeS.connMgr, cgrEv, false, eeS.cfg, eeS.fltrS, cgrEv.Tenant); err != nil {
return err
}
}
diff --git a/ees/libcdre.go b/ees/libcdre.go
index 7211bd5ae..6e7458a31 100644
--- a/ees/libcdre.go
+++ b/ees/libcdre.go
@@ -18,18 +18,7 @@ along with this program. If not, see
package ees
-import (
- "bytes"
- "encoding/gob"
- "os"
- "sync"
-
- "github.com/cgrates/birpc/context"
- "github.com/cgrates/cgrates/config"
- "github.com/cgrates/cgrates/guardian"
- "github.com/cgrates/cgrates/utils"
-)
-
+/*
// NewFailoverPosterFromFile returns ExportEvents from the file
// used only on replay failed post
func NewFailoverPosterFromFile(filePath, providerType string) (failPoster utils.FailoverPoster, err error) {
@@ -321,3 +310,4 @@ func (expEv *FailedExportersEEs) ReplayFailedPosts(attempts int) (err error) {
}
return
}
+*/
diff --git a/ees/nats_it_test.go b/ees/nats_it_test.go
index f9b3a8de0..773b12c7a 100644
--- a/ees/nats_it_test.go
+++ b/ees/nats_it_test.go
@@ -106,7 +106,7 @@ func TestNatsEE(t *testing.T) {
"Destination": "1002",
},
}
- if err := exportEventWithExporter(context.Background(), evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil {
+ if err := exportEventWithExporter(context.Background(), evExp, nil, cgrEv, true, cgrCfg, new(engine.FilterS), "cgrates.org"); err != nil {
t.Fatal(err)
}
testCleanDirectory(t)
@@ -172,7 +172,7 @@ func TestNatsEE2(t *testing.T) {
"Destination": "1002",
},
}
- if err := exportEventWithExporter(context.Background(), evExp, cgrEv, true, cgrCfg, new(engine.FilterS)); err != nil {
+ if err := exportEventWithExporter(context.Background(), evExp, nil, cgrEv, true, cgrCfg, new(engine.FilterS), "cgrates.org"); err != nil {
t.Fatal(err)
}
testCleanDirectory(t)
diff --git a/ees/poster_it_test.go b/ees/poster_it_test.go
index 1cdc31ecf..cdc18e97b 100644
--- a/ees/poster_it_test.go
+++ b/ees/poster_it_test.go
@@ -160,7 +160,7 @@ func TestSQSPoster(t *testing.T) {
Attempts: 5,
Opts: opts,
}, nil)
- if err := ExportWithAttempts(context.Background(), pstr, []byte(body), ""); err != nil {
+ if err := ExportWithAttempts(context.Background(), pstr, []byte(body), "", nil, "cgrates.org"); err != nil {
t.Fatal(err)
}
@@ -239,7 +239,7 @@ func TestS3Poster(t *testing.T) {
Attempts: 5,
Opts: opts,
}, nil)
- if err := ExportWithAttempts(context.Background(), pstr, []byte(body), key); err != nil {
+ if err := ExportWithAttempts(context.Background(), pstr, []byte(body), key, nil, "cgrates.org"); err != nil {
t.Fatal(err)
}
key += ".json"
@@ -294,7 +294,7 @@ func TestAMQPv1Poster(t *testing.T) {
Attempts: 5,
Opts: opts,
}, nil)
- if err := ExportWithAttempts(context.Background(), pstr, []byte(body), ""); err != nil {
+ if err := ExportWithAttempts(context.Background(), pstr, []byte(body), "", nil, "cgrates.org"); err != nil {
t.Fatal(err)
}
// Create client
diff --git a/efs/efs.go b/efs/efs.go
index b57442723..0ffcef06a 100644
--- a/efs/efs.go
+++ b/efs/efs.go
@@ -21,8 +21,10 @@ package efs
import (
"sync"
+ "github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
)
type EfS struct {
@@ -38,3 +40,63 @@ func NewEfs(cfg *config.CGRConfig, connMgr *engine.ConnManager) *EfS {
connMgr: connMgr,
}
}
+
+// V1ProcessEvent will write into gob formnat file the Events that were failed to be exported.
+func (efs *EfS) V1ProcessEvent(ctx *context.Context, args *utils.ArgsFailedPosts, reply *string) error {
+ var format string
+ if _, has := args.APIOpts[utils.Format]; has {
+ format = utils.IfaceAsString(args.APIOpts[utils.Format])
+ }
+ key := utils.ConcatenatedKey(args.FailedDir, args.Path, format, args.Module)
+ switch args.Module {
+ case utils.EEs:
+ // also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id
+ var amqpQueueID string
+ var s3BucketID string
+ var sqsQueueID string
+ var kafkaTopic string
+ if _, has := args.APIOpts[utils.AMQPQueueID]; has {
+ amqpQueueID = utils.IfaceAsString(args.APIOpts[utils.AMQPQueueID])
+ }
+ if _, has := args.APIOpts[utils.S3Bucket]; has {
+ s3BucketID = utils.IfaceAsString(args.APIOpts[utils.S3Bucket])
+ }
+ if _, has := args.APIOpts[utils.SQSQueueID]; has {
+ sqsQueueID = utils.IfaceAsString(args.APIOpts[utils.SQSQueueID])
+ }
+ if _, has := args.APIOpts[kafkaTopic]; has {
+ kafkaTopic = utils.IfaceAsString(args.APIOpts[utils.KafkaTopic])
+ }
+ if qID := utils.FirstNonEmpty(amqpQueueID, s3BucketID,
+ sqsQueueID, kafkaTopic); len(qID) != 0 {
+ key = utils.ConcatenatedKey(key, qID)
+ }
+ case utils.Kafka:
+ }
+ var failedPost *FailedExportersLogg
+ if x, ok := failedPostCache.Get(key); ok {
+ if x != nil {
+ failedPost = x.(*FailedExportersLogg)
+ }
+ }
+ if failedPost == nil {
+ failedPost = &FailedExportersLogg{
+ Path: args.Path,
+ Format: format,
+ Opts: args.APIOpts,
+ Module: args.Module,
+ FailedPostsDir: args.FailedDir,
+ }
+ failedPostCache.Set(key, failedPost, nil)
+ }
+ failedPost.AddEvent(args.Event)
+ return nil
+}
+
+type ArgsReplayFailedPosts struct {
+ Tenant string
+ TypeProvider string
+ FailedRequestsInDir *string // if defined it will be our source of requests to be replayed
+ FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded
+ Modules []string // list of modules for which replay the requests, nil for all
+}
diff --git a/efs/failed_ees.go b/efs/failed_ees.go
new file mode 100644
index 000000000..db2a82777
--- /dev/null
+++ b/efs/failed_ees.go
@@ -0,0 +1,286 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package efs
+
+import (
+ "sync"
+
+ "github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/ees"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+// FailedExportersEEs used to save the failed post to file
+type FailedExportersEEs struct {
+ lk sync.RWMutex
+ Path string
+ Opts *config.EventExporterOpts
+ Format string
+ Events []interface{}
+ failedPostsDir string
+ module string
+
+ connMngr *engine.ConnManager
+}
+
+func AsOptsEESConfig(opts map[string]interface{}) (*config.EventExporterOpts, error) {
+ optsCfg := new(config.EventExporterOpts)
+ if len(opts) == 0 {
+ return optsCfg, nil
+ }
+ if _, has := opts[utils.CSVFieldSepOpt]; has {
+ optsCfg.CSVFieldSeparator = utils.StringPointer(utils.IfaceAsString(utils.CSVFieldSepOpt))
+ }
+ if _, has := opts[utils.ElsIndex]; has {
+ optsCfg.ElsIndex = utils.StringPointer(utils.IfaceAsString(utils.ElsIndex))
+ }
+ if _, has := opts[utils.ElsIfPrimaryTerm]; has {
+ x, err := utils.IfaceAsInt(utils.ElsIfPrimaryTerm)
+ if err != nil {
+ return nil, err
+ }
+ optsCfg.ElsIfPrimaryTerm = utils.IntPointer(x)
+ }
+ if _, has := opts[utils.ElsIfSeqNo]; has {
+ x, err := utils.IfaceAsInt(utils.ElsIfSeqNo)
+ if err != nil {
+ return nil, err
+ }
+ optsCfg.ElsIfSeqNo = utils.IntPointer(x)
+ }
+ if _, has := opts[utils.ElsOpType]; has {
+ optsCfg.ElsOpType = utils.StringPointer(utils.IfaceAsString(utils.ElsOpType))
+ }
+ if _, has := opts[utils.ElsPipeline]; has {
+ optsCfg.ElsPipeline = utils.StringPointer(utils.IfaceAsString(utils.ElsPipeline))
+ }
+ if _, has := opts[utils.ElsRouting]; has {
+ optsCfg.ElsRouting = utils.StringPointer(utils.IfaceAsString(utils.ElsRouting))
+ }
+ if _, has := opts[utils.ElsTimeout]; has {
+ t, err := utils.IfaceAsDuration(utils.ElsTimeout)
+ if err != nil {
+ return nil, err
+ }
+ optsCfg.ElsTimeout = &t
+ }
+ if _, has := opts[utils.ElsVersionLow]; has {
+ x, err := utils.IfaceAsInt(utils.ElsVersionLow)
+ if err != nil {
+ return nil, err
+ }
+ optsCfg.ElsVersion = utils.IntPointer(x)
+ }
+ if _, has := opts[utils.ElsVersionType]; has {
+ optsCfg.ElsVersionType = utils.StringPointer(utils.IfaceAsString(utils.ElsVersionType))
+ }
+ if _, has := opts[utils.ElsWaitForActiveShards]; has {
+ optsCfg.ElsWaitForActiveShards = utils.StringPointer(utils.IfaceAsString(utils.ElsWaitForActiveShards))
+ }
+ if _, has := opts[utils.SQLMaxIdleConnsCfg]; has {
+ x, err := utils.IfaceAsInt(utils.SQLMaxIdleConnsCfg)
+ if err != nil {
+ return nil, err
+ }
+ optsCfg.SQLMaxIdleConns = utils.IntPointer(x)
+ }
+ if _, has := opts[utils.SQLMaxOpenConns]; has {
+ x, err := utils.IfaceAsInt(utils.SQLMaxOpenConns)
+ if err != nil {
+ return nil, err
+ }
+ optsCfg.SQLMaxOpenConns = utils.IntPointer(x)
+ }
+ if _, has := opts[utils.SQLConnMaxLifetime]; has {
+ t, err := utils.IfaceAsDuration(utils.SQLConnMaxLifetime)
+ if err != nil {
+ return nil, err
+ }
+ optsCfg.SQLConnMaxLifetime = &t
+ }
+ if _, has := opts[utils.MYSQLDSNParams]; has {
+ optsCfg.MYSQLDSNParams = opts[utils.SQLConnMaxLifetime].(map[string]string)
+ }
+ if _, has := opts[utils.SQLTableNameOpt]; has {
+ optsCfg.SQLTableName = utils.StringPointer(utils.IfaceAsString(utils.SQLTableNameOpt))
+ }
+ if _, has := opts[utils.SQLDBNameOpt]; has {
+ optsCfg.SQLDBName = utils.StringPointer(utils.IfaceAsString(utils.SQLDBNameOpt))
+ }
+ if _, has := opts[utils.PgSSLModeCfg]; has {
+ optsCfg.PgSSLMode = utils.StringPointer(utils.IfaceAsString(utils.PgSSLModeCfg))
+ }
+ if _, has := opts[utils.KafkaTopic]; has {
+ optsCfg.KafkaTopic = utils.StringPointer(utils.IfaceAsString(utils.KafkaTopic))
+ }
+ if _, has := opts[utils.AMQPQueueID]; has {
+ optsCfg.AMQPQueueID = utils.StringPointer(utils.IfaceAsString(utils.AMQPQueueID))
+ }
+ if _, has := opts[utils.AMQPRoutingKey]; has {
+ optsCfg.AMQPRoutingKey = utils.StringPointer(utils.IfaceAsString(utils.AMQPRoutingKey))
+ }
+ if _, has := opts[utils.AMQPExchange]; has {
+ optsCfg.AMQPExchange = utils.StringPointer(utils.IfaceAsString(utils.AMQPExchange))
+ }
+ if _, has := opts[utils.AMQPExchangeType]; has {
+ optsCfg.AMQPExchangeType = utils.StringPointer(utils.IfaceAsString(utils.AMQPExchangeType))
+ }
+ if _, has := opts[utils.AWSRegion]; has {
+ optsCfg.AWSRegion = utils.StringPointer(utils.IfaceAsString(utils.AWSRegion))
+ }
+ if _, has := opts[utils.AWSKey]; has {
+ optsCfg.AWSKey = utils.StringPointer(utils.IfaceAsString(utils.AWSKey))
+ }
+ if _, has := opts[utils.AWSSecret]; has {
+ optsCfg.AWSSecret = utils.StringPointer(utils.IfaceAsString(utils.AWSSecret))
+ }
+ if _, has := opts[utils.AWSToken]; has {
+ optsCfg.AWSToken = utils.StringPointer(utils.IfaceAsString(utils.AWSToken))
+ }
+ if _, has := opts[utils.SQSQueueID]; has {
+ optsCfg.SQSQueueID = utils.StringPointer(utils.IfaceAsString(utils.SQSQueueID))
+ }
+ if _, has := opts[utils.S3Bucket]; has {
+ optsCfg.S3BucketID = utils.StringPointer(utils.IfaceAsString(utils.S3Bucket))
+ }
+ if _, has := opts[utils.S3FolderPath]; has {
+ optsCfg.S3FolderPath = utils.StringPointer(utils.IfaceAsString(utils.S3FolderPath))
+ }
+ if _, has := opts[utils.NatsJetStream]; has {
+ x, err := utils.IfaceAsBool(utils.NatsJetStream)
+ if err != nil {
+ return nil, err
+ }
+ optsCfg.NATSJetStream = utils.BoolPointer(x)
+ }
+ if _, has := opts[utils.NatsSubject]; has {
+ optsCfg.NATSSubject = utils.StringPointer(utils.IfaceAsString(utils.NatsSubject))
+ }
+ if _, has := opts[utils.NatsJWTFile]; has {
+ optsCfg.NATSJWTFile = utils.StringPointer(utils.IfaceAsString(utils.NatsJWTFile))
+ }
+ if _, has := opts[utils.NatsSeedFile]; has {
+ optsCfg.NATSSeedFile = utils.StringPointer(utils.IfaceAsString(utils.NatsSeedFile))
+ }
+ if _, has := opts[utils.NatsCertificateAuthority]; has {
+ optsCfg.NATSCertificateAuthority = utils.StringPointer(utils.IfaceAsString(utils.NatsCertificateAuthority))
+ }
+ if _, has := opts[utils.NatsClientCertificate]; has {
+ optsCfg.NATSClientCertificate = utils.StringPointer(utils.IfaceAsString(utils.NatsClientCertificate))
+ }
+ if _, has := opts[utils.NatsClientKey]; has {
+ optsCfg.NATSClientKey = utils.StringPointer(utils.IfaceAsString(utils.NatsClientKey))
+ }
+ if _, has := opts[utils.NatsJetStreamMaxWait]; has {
+ t, err := utils.IfaceAsDuration(utils.NatsJetStreamMaxWait)
+ if err != nil {
+ return nil, err
+ }
+ optsCfg.NATSJetStreamMaxWait = &t
+ }
+ if _, has := opts[utils.RpcCodec]; has {
+ optsCfg.RPCCodec = utils.StringPointer(utils.IfaceAsString(utils.RpcCodec))
+ }
+ if _, has := opts[utils.ServiceMethod]; has {
+ optsCfg.ServiceMethod = utils.StringPointer(utils.IfaceAsString(utils.ServiceMethod))
+ }
+ if _, has := opts[utils.KeyPath]; has {
+ optsCfg.KeyPath = utils.StringPointer(utils.IfaceAsString(utils.KeyPath))
+ }
+ if _, has := opts[utils.CertPath]; has {
+ optsCfg.CertPath = utils.StringPointer(utils.IfaceAsString(utils.CertPath))
+ }
+ if _, has := opts[utils.CaPath]; has {
+ optsCfg.CAPath = utils.StringPointer(utils.IfaceAsString(utils.CaPath))
+ }
+ if _, has := opts[utils.Tls]; has {
+ x, err := utils.IfaceAsBool(utils.Tls)
+ if err != nil {
+ return nil, err
+ }
+ optsCfg.TLS = utils.BoolPointer(x)
+ }
+ if _, has := opts[utils.ConnIDs]; has {
+ optsCfg.ConnIDs = opts[utils.ConnIDs].(*[]string)
+ }
+ if _, has := opts[utils.RpcConnTimeout]; has {
+ t, err := utils.IfaceAsDuration(utils.RpcConnTimeout)
+ if err != nil {
+ return nil, err
+ }
+ optsCfg.RPCConnTimeout = &t
+ }
+ if _, has := opts[utils.RpcReplyTimeout]; has {
+ t, err := utils.IfaceAsDuration(utils.RpcReplyTimeout)
+ if err != nil {
+ return nil, err
+ }
+ optsCfg.RPCReplyTimeout = &t
+ }
+ if _, has := opts[utils.RPCAPIOpts]; has {
+ optsCfg.RPCAPIOpts = opts[utils.RPCAPIOpts].(map[string]interface{})
+ }
+ return optsCfg, nil
+}
+
+// AddEvent adds one event
+func (expEv *FailedExportersEEs) AddEvent(ev interface{}) {
+ expEv.lk.Lock()
+ expEv.Events = append(expEv.Events, ev)
+ expEv.lk.Unlock()
+}
+
+// ReplayFailedPosts tryies to post cdrs again
+func (expEv *FailedExportersEEs) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) (err error) {
+ eesFailedEvents := &FailedExportersEEs{
+ Path: expEv.Path,
+ Opts: expEv.Opts,
+ Format: expEv.Format,
+ }
+
+ var ee ees.EventExporter
+ if ee, err = ees.NewEventExporter(&config.EventExporterCfg{
+ ID: "ReplayFailedPosts",
+ Type: expEv.Format,
+ ExportPath: expEv.Path,
+ Opts: expEv.Opts,
+ Attempts: attempts,
+ FailedPostsDir: utils.MetaNone,
+ }, config.CgrConfig(), nil, nil); err != nil {
+ return
+ }
+ keyFunc := func() string { return utils.EmptyString }
+ if expEv.Format == utils.MetaKafkajsonMap || expEv.Format == utils.MetaS3jsonMap {
+ keyFunc = utils.UUIDSha1Prefix
+ }
+ for _, ev := range expEv.Events {
+ if err = ees.ExportWithAttempts(context.Background(), ee, ev, keyFunc(), expEv.connMngr, tnt); err != nil {
+ eesFailedEvents.AddEvent(ev)
+ }
+ }
+ ee.Close()
+ if len(eesFailedEvents.Events) > 0 {
+ err = utils.ErrPartiallyExecuted
+ } else {
+ eesFailedEvents = nil
+ }
+ return
+}
diff --git a/efs/failed_loggs.go b/efs/failed_loggs.go
new file mode 100644
index 000000000..d0bd74b25
--- /dev/null
+++ b/efs/failed_loggs.go
@@ -0,0 +1,111 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package efs
+
+import (
+ "bytes"
+ "encoding/gob"
+ "os"
+ "sync"
+
+ "github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/segmentio/kafka-go"
+)
+
+// FailedExportersLogg is a failover poster for kafka logger type
+type FailedExportersLogg struct {
+ lk sync.RWMutex
+ Path string
+ Opts map[string]interface{} // this is meta
+ Format string
+ Events []interface{}
+ FailedPostsDir string
+ Module string
+
+ efsConns []string
+ connMngr *engine.ConnManager
+}
+
+// AddEvent adds one event
+func (expEv *FailedExportersLogg) AddEvent(ev interface{}) {
+ expEv.lk.Lock()
+ expEv.Events = append(expEv.Events, ev)
+ expEv.lk.Unlock()
+}
+
+// NewExportEventsFromFile returns ExportEvents from the file
+// used only on replay failed post
+func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err error) {
+ var fileContent []byte
+ if fileContent, err = os.ReadFile(filePath); err != nil {
+ return nil, err
+ }
+ if err = os.Remove(filePath); err != nil {
+ return nil, err
+ }
+ dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
+ // unmarshall it
+ expEv = new(FailedExportersLogg)
+ err = dec.Decode(&expEv)
+ return
+}
+
+// ReplayFailedPosts tryies to post cdrs again
+func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) (err error) {
+ nodeID := utils.IfaceAsString(expEv.Opts[utils.NodeID])
+ logLvl, err := utils.IfaceAsInt(expEv.Opts[utils.Level])
+ if err != nil {
+ return
+ }
+ expLogger := utils.NewExportLogger(nodeID, tnt, logLvl,
+ expEv.Path, expEv.Format, attempts, expEv.FailedPostsDir)
+ for _, event := range expEv.Events {
+ var content []byte
+ if content, err = utils.ToUnescapedJSON(event); err != nil {
+ return
+ }
+ if err = expLogger.Writer.WriteMessages(context.Background(), kafka.Message{
+ Key: []byte(utils.GenUUID()),
+ Value: content,
+ }); err != nil {
+ var reply string
+ // if there are any errors in kafka, we will post in FailedPostDirectory
+ if err = expEv.connMngr.Call(ctx, expEv.efsConns, utils.EfSv1ProcessEvent,
+ &utils.ArgsFailedPosts{
+ Tenant: tnt,
+ Path: expLogger.Writer.Addr.String(),
+ Event: event,
+ FailedDir: expLogger.FldPostDir,
+ Module: utils.Kafka,
+ APIOpts: expLogger.GetMeta(),
+ }, &reply); err != nil {
+ return err
+ /*
+ utils.AddFailedMessage(expLogger.FldPostDir, expLogger.Writer.Addr.String(), utils.
+ MetaKafkaLog, utils.Kafka,
+ event, expLogger.GetMeta())
+ */
+ }
+ return nil
+ }
+ }
+ return err
+}
diff --git a/efs/libefs.go b/efs/libefs.go
new file mode 100644
index 000000000..f883d537c
--- /dev/null
+++ b/efs/libefs.go
@@ -0,0 +1,124 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package efs
+
+import (
+ "bytes"
+ "encoding/gob"
+ "fmt"
+ "os"
+ "path"
+ "time"
+
+ "github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/guardian"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/ltcache"
+)
+
+var failedPostCache *ltcache.Cache
+
+func init() {
+ failedPostCache = ltcache.NewCache(-1, 5*time.Second, true, writeFailedPosts)
+}
+
+// SetFailedPostCacheTTL recreates the failed cache
+func SetFailedPostCacheTTL(ttl time.Duration) {
+ failedPostCache = ltcache.NewCache(-1, ttl, true, writeFailedPosts)
+}
+
+func writeFailedPosts(_ string, value interface{}) {
+ expEv, canConvert := value.(*FailedExportersLogg)
+ if !canConvert {
+ return
+ }
+ filePath := expEv.FilePath()
+ expEv.lk.RLock()
+ if err := WriteToFile(filePath, expEv); err != nil {
+ utils.Logger.Warning(fmt.Sprintf("Unable to write failed post to file <%s> because <%s>",
+ filePath, err))
+ expEv.lk.RUnlock()
+ return
+ }
+ expEv.lk.RUnlock()
+}
+
+// FilePath returns the file path it should use for saving the failed events
+func (expEv *FailedExportersLogg) FilePath() string {
+ return path.Join(expEv.FailedPostsDir, expEv.Module+utils.PipeSep+utils.UUIDSha1Prefix()+utils.GOBSuffix)
+}
+
+type FailoverPoster interface {
+ ReplayFailedPosts(*context.Context, int, string) error
+}
+
+// WriteToFile writes the events to file
+func WriteToFile(filePath string, expEv FailoverPoster) (err error) {
+ fileOut, err := os.Create(filePath)
+ if err != nil {
+ return err
+ }
+ encd := gob.NewEncoder(fileOut)
+ gob.Register(new(utils.CGREvent))
+ err = encd.Encode(expEv)
+ fileOut.Close()
+ return
+}
+
+// NewFailoverPosterFromFile returns ExportEvents from the file
+// used only on replay failed post
+func NewFailoverPosterFromFile(filePath, providerType string, efs *EfS) (failPoster FailoverPoster, err error) {
+ var fileContent []byte
+ err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error {
+ if fileContent, err = os.ReadFile(filePath); err != nil {
+ return err
+ }
+ return os.Remove(filePath)
+ }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
+ if err != nil {
+ return
+ }
+ dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
+ // unmarshall it
+ expEv := new(FailedExportersLogg)
+ err = dec.Decode(&expEv)
+ switch providerType {
+ case utils.EEs:
+ opts, err := AsOptsEESConfig(expEv.Opts)
+ if err != nil {
+ return nil, err
+ }
+ failPoster = &FailedExportersEEs{
+ module: expEv.Module,
+ failedPostsDir: expEv.FailedPostsDir,
+ Path: expEv.Path,
+ Opts: opts,
+ Events: expEv.Events,
+ Format: expEv.Format,
+
+ connMngr: efs.connMgr,
+ }
+ case utils.Kafka:
+ expEv.efsConns = efs.cfg.LoggerCfg().EFsConns
+ expEv.connMngr = efs.connMgr
+ failPoster = expEv
+ }
+ return
+}
diff --git a/general_tests/cdrs_post_failover_it_test.go b/general_tests/cdrs_post_failover_it_test.go
index 4473b3d67..e563b74a4 100644
--- a/general_tests/cdrs_post_failover_it_test.go
+++ b/general_tests/cdrs_post_failover_it_test.go
@@ -30,7 +30,7 @@ import (
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
- "github.com/cgrates/cgrates/ees"
+ "github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/utils"
@@ -203,19 +203,19 @@ func testCDRsPostFailoverToFile(t *testing.T) {
fileName := file.Name()
filePath := path.Join(cdrsPostFailCfg.EFsCfg().FailedPostsDir, fileName)
- ev, err := ees.NewFailoverPosterFromFile(filePath, utils.EEs)
+ ev, err := efs.NewFailoverPosterFromFile(filePath, utils.EEs)
if err != nil {
t.Errorf("<%s> for file <%s>", err, fileName)
continue
- } else if len(ev.(*ees.FailedExportersEEs).Events) == 0 {
+ } else if len(ev.(*efs.FailedExportersEEs).Events) == 0 {
t.Error("Expected at least one event")
continue
}
- if ev.(*ees.FailedExportersEEs).Format != utils.MetaS3jsonMap {
- t.Errorf("Expected event to use %q received: %q", utils.MetaS3jsonMap, ev.(*ees.FailedExportersEEs).Format)
+ if ev.(*efs.FailedExportersEEs).Format != utils.MetaS3jsonMap {
+ t.Errorf("Expected event to use %q received: %q", utils.MetaS3jsonMap, ev.(*efs.FailedExportersEEs).Format)
}
- if len(ev.(*ees.FailedExportersEEs).Events) != 3 {
- t.Errorf("Expected all the events to be saved in the same file, ony %v saved in this file.", len(ev.(*ees.FailedExportersEEs).Events))
+ if len(ev.(*efs.FailedExportersEEs).Events) != 3 {
+ t.Errorf("Expected all the events to be saved in the same file, ony %v saved in this file.", len(ev.(*efs.FailedExportersEEs).Events))
}
}
}
diff --git a/services/cgr-engine.go b/services/cgr-engine.go
index 0cf235458..ebd57f3dd 100644
--- a/services/cgr-engine.go
+++ b/services/cgr-engine.go
@@ -32,6 +32,7 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
+ "github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/registrarc"
"github.com/cgrates/cgrates/servmanager"
@@ -364,7 +365,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags
cgr.cfg.LoggerCfg().Opts.FailedPostsDir); err != nil {
return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err)
}
- utils.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing
+ efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing
utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version()))
cgr.cfg.LazySanityCheck()
diff --git a/utils/coreutils.go b/utils/coreutils.go
index 7d0add8ba..233086c0e 100644
--- a/utils/coreutils.go
+++ b/utils/coreutils.go
@@ -780,6 +780,15 @@ func IsURL(path string) bool {
strings.HasPrefix(path, "http://")
}
+type ArgsFailedPosts struct {
+ Tenant string
+ Path string // Path of the exported type
+ Event interface{} // Event that must be written in file
+ FailedDir string // Directory that contains the file with Failed post
+ Module string // Type of efs <*ees|*kafkaLogger>
+ APIOpts map[string]interface{} // Specially for the meta
+}
+
// GetIndexesArg the API argumets to specify an index
type GetIndexesArg struct {
IdxItmType string
diff --git a/utils/failover_export.go b/utils/failover_export.go
index 91c7524f6..d2c278600 100644
--- a/utils/failover_export.go
+++ b/utils/failover_export.go
@@ -18,20 +18,7 @@ along with this program. If not, see
package utils
-import (
- "bytes"
- "encoding/gob"
- "fmt"
- "os"
- "path"
- "sync"
- "time"
-
- "github.com/cgrates/birpc/context"
- "github.com/cgrates/ltcache"
- "github.com/segmentio/kafka-go"
-)
-
+/*
var failedPostCache *ltcache.Cache
func init() {
@@ -159,13 +146,12 @@ func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err e
}
type FailoverPoster interface {
- ReplayFailedPosts(int) error
+ ReplayFailedPosts(int, string) error
}
// ReplayFailedPosts tryies to post cdrs again
-func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int) (err error) {
+func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int, tnt string) (err error) {
nodeID := IfaceAsString(expEv.Opts[NodeID])
- tnt := IfaceAsString(expEv.Opts[Tenant])
logLvl, err := IfaceAsInt(expEv.Opts[Level])
if err != nil {
return
@@ -177,15 +163,16 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int) (err error) {
if content, err = ToUnescapedJSON(event); err != nil {
return
}
- if err = expLogger.writer.WriteMessages(context.Background(), kafka.Message{
+ if err = expLogger.Writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(GenUUID()),
Value: content,
}); err != nil {
// if there are any errors in kafka, we will post in FailedPostDirectory
- AddFailedMessage(expLogger.fldPostDir, expLogger.writer.Addr.String(), MetaKafkaLog, Kafka,
+ AddFailedMessage(expLogger.FldPostDir, expLogger.Writer.Addr.String(), MetaKafkaLog, Kafka,
event, expLogger.GetMeta())
return nil
}
}
return err
}
+*/
diff --git a/utils/kafka_logger.go b/utils/kafka_logger.go
index 8ba4d926a..7b44bc805 100644
--- a/utils/kafka_logger.go
+++ b/utils/kafka_logger.go
@@ -31,22 +31,22 @@ import (
type ExportLogger struct {
sync.Mutex
- logLevel int
- fldPostDir string
- writer *kafka.Writer
- nodeID string
- tenant string
+ LogLevel int
+ FldPostDir string
+ Writer *kafka.Writer
+ NodeID string
+ Tenant string
}
// NewExportLogger will export loggers to kafka
func NewExportLogger(nodeID, tenant string, level int,
connOpts, connTopic string, attempts int, fldPostDir string) (el *ExportLogger) {
el = &ExportLogger{
- logLevel: level,
- fldPostDir: fldPostDir,
- nodeID: nodeID,
- tenant: tenant,
- writer: &kafka.Writer{
+ LogLevel: level,
+ FldPostDir: fldPostDir,
+ NodeID: nodeID,
+ Tenant: tenant,
+ Writer: &kafka.Writer{
Addr: kafka.TCP(connOpts),
Topic: connTopic,
MaxAttempts: attempts,
@@ -56,18 +56,18 @@ func NewExportLogger(nodeID, tenant string, level int,
}
func (el *ExportLogger) Close() (err error) {
- if el.writer != nil {
- err = el.writer.Close()
- el.writer = nil
+ if el.Writer != nil {
+ err = el.Writer.Close()
+ el.Writer = nil
}
return
}
func (el *ExportLogger) call(m string, level int) (err error) {
eventExport := &CGREvent{
- Tenant: el.tenant,
+ Tenant: el.Tenant,
Event: map[string]interface{}{
- NodeID: el.nodeID,
+ NodeID: el.NodeID,
Message: m,
Severity: level,
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
@@ -78,15 +78,17 @@ func (el *ExportLogger) call(m string, level int) (err error) {
if content, err = ToUnescapedJSON(eventExport); err != nil {
return
}
- if err = el.writer.WriteMessages(context.Background(), kafka.Message{
+ if err = el.Writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(GenUUID()),
Value: content,
}); err != nil {
- // if there are any errors in kafka, we will post in FailedPostDirectory
- AddFailedMessage(el.fldPostDir, el.writer.Addr.String(), MetaKafkaLog, Kafka,
- eventExport, el.GetMeta())
- // also the content should be printed as a stdout logger type
- return ErrLoggerChanged
+ /*
+ // if there are any errors in kafka, we will post in FailedPostDirectory
+ AddFailedMessage(el.FldPostDir, el.Writer.Addr.String(), MetaKafkaLog, Kafka,
+ eventExport, el.GetMeta())
+ // also the content should be printed as a stdout logger type
+ return ErrLoggerChanged
+ */
}
return
}
@@ -103,22 +105,22 @@ func (sl *ExportLogger) GetSyslog() *syslog.Writer {
// GetLogLevel() returns the level logger number for the server
func (el *ExportLogger) GetLogLevel() int {
- return el.logLevel
+ return el.LogLevel
}
// SetLogLevel changes the log level
func (el *ExportLogger) SetLogLevel(level int) {
- el.logLevel = level
+ el.LogLevel = level
}
// Alert logs to EEs with alert level
func (el *ExportLogger) Alert(m string) (err error) {
- if el.logLevel < LOGLEVEL_ALERT {
+ if el.LogLevel < LOGLEVEL_ALERT {
return nil
}
if err = el.call(m, LOGLEVEL_ALERT); err != nil {
if err == ErrLoggerChanged {
- NewStdLogger(el.nodeID, el.logLevel).Alert(m)
+ NewStdLogger(el.NodeID, el.LogLevel).Alert(m)
err = nil
}
}
@@ -127,12 +129,12 @@ func (el *ExportLogger) Alert(m string) (err error) {
// Crit logs to EEs with critical level
func (el *ExportLogger) Crit(m string) (err error) {
- if el.logLevel < LOGLEVEL_CRITICAL {
+ if el.LogLevel < LOGLEVEL_CRITICAL {
return nil
}
if el.call(m, LOGLEVEL_CRITICAL); err != nil {
if err == ErrLoggerChanged {
- NewStdLogger(el.nodeID, el.logLevel).Crit(m)
+ NewStdLogger(el.NodeID, el.LogLevel).Crit(m)
err = nil
}
}
@@ -141,12 +143,12 @@ func (el *ExportLogger) Crit(m string) (err error) {
// Debug logs to EEs with debug level
func (el *ExportLogger) Debug(m string) (err error) {
- if el.logLevel < LOGLEVEL_DEBUG {
+ if el.LogLevel < LOGLEVEL_DEBUG {
return nil
}
if err = el.call(m, LOGLEVEL_DEBUG); err != nil {
if err == ErrLoggerChanged {
- NewStdLogger(el.nodeID, el.logLevel).Debug(m)
+ NewStdLogger(el.NodeID, el.LogLevel).Debug(m)
err = nil
}
}
@@ -155,12 +157,12 @@ func (el *ExportLogger) Debug(m string) (err error) {
// Emerg logs to EEs with emergency level
func (el *ExportLogger) Emerg(m string) (err error) {
- if el.logLevel < LOGLEVEL_EMERGENCY {
+ if el.LogLevel < LOGLEVEL_EMERGENCY {
return nil
}
if err = el.call(m, LOGLEVEL_EMERGENCY); err != nil {
if err == ErrLoggerChanged {
- NewStdLogger(el.nodeID, el.logLevel).Emerg(m)
+ NewStdLogger(el.NodeID, el.LogLevel).Emerg(m)
err = nil
}
}
@@ -169,12 +171,12 @@ func (el *ExportLogger) Emerg(m string) (err error) {
// Err logs to EEs with error level
func (el *ExportLogger) Err(m string) (err error) {
- if el.logLevel < LOGLEVEL_ERROR {
+ if el.LogLevel < LOGLEVEL_ERROR {
return nil
}
if err = el.call(m, LOGLEVEL_ERROR); err != nil {
if err == ErrLoggerChanged {
- NewStdLogger(el.nodeID, el.logLevel).Err(m)
+ NewStdLogger(el.NodeID, el.LogLevel).Err(m)
err = nil
}
}
@@ -183,12 +185,12 @@ func (el *ExportLogger) Err(m string) (err error) {
// Info logs to EEs with info level
func (el *ExportLogger) Info(m string) (err error) {
- if el.logLevel < LOGLEVEL_INFO {
+ if el.LogLevel < LOGLEVEL_INFO {
return nil
}
if err = el.call(m, LOGLEVEL_INFO); err != nil {
if err == ErrLoggerChanged {
- NewStdLogger(el.nodeID, el.logLevel).Info(m)
+ NewStdLogger(el.NodeID, el.LogLevel).Info(m)
err = nil
}
}
@@ -197,12 +199,12 @@ func (el *ExportLogger) Info(m string) (err error) {
// Notice logs to EEs with notice level
func (el *ExportLogger) Notice(m string) (err error) {
- if el.logLevel < LOGLEVEL_NOTICE {
+ if el.LogLevel < LOGLEVEL_NOTICE {
return nil
}
if err = el.call(m, LOGLEVEL_NOTICE); err != nil {
if err == ErrLoggerChanged {
- NewStdLogger(el.nodeID, el.logLevel).Notice(m)
+ NewStdLogger(el.NodeID, el.LogLevel).Notice(m)
err = nil
}
}
@@ -211,12 +213,12 @@ func (el *ExportLogger) Notice(m string) (err error) {
// Warning logs to EEs with warning level
func (el *ExportLogger) Warning(m string) (err error) {
- if el.logLevel < LOGLEVEL_WARNING {
+ if el.LogLevel < LOGLEVEL_WARNING {
return nil
}
if err = el.call(m, LOGLEVEL_WARNING); err != nil {
if err == ErrLoggerChanged {
- NewStdLogger(el.nodeID, el.logLevel).Warning(m)
+ NewStdLogger(el.NodeID, el.LogLevel).Warning(m)
err = nil
}
}
@@ -225,12 +227,12 @@ func (el *ExportLogger) Warning(m string) (err error) {
func (el *ExportLogger) GetMeta() map[string]interface{} {
return map[string]interface{}{
- Tenant: el.tenant,
- NodeID: el.nodeID,
- Level: el.logLevel,
- Format: el.writer.Topic,
- Conn: el.writer.Addr.String(),
- FailedPostsDir: el.fldPostDir,
- Attempts: el.writer.MaxAttempts,
+ Tenant: el.Tenant,
+ NodeID: el.NodeID,
+ Level: el.LogLevel,
+ Format: el.Writer.Topic,
+ Conn: el.Writer.Addr.String(),
+ FailedPostsDir: el.FldPostDir,
+ Attempts: el.Writer.MaxAttempts,
}
}