diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json
index 0f0adfe1c..ff29f72a5 100755
--- a/data/conf/cgrates/cgrates.json
+++ b/data/conf/cgrates/cgrates.json
@@ -1080,6 +1080,8 @@
// "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur>
// "store_uncompressed_limit": 0, // used to compress data
// "thresholds_conns": [], // connections to ThresholdS for StatUpdates, empty to disable thresholds functionality: <""|*internal|$rpc_conns_id>
+// "ees_conns": [], // connections to EEs for StatUpdates, empty to disable export functionality: <""|*internal|$rpc_conns_id>
+// "ees_exporter_ids": [], // list of EventExporter profiles to use for real-time StatUpdate exports
// "indexed_selects": true, // enable profile matching exclusively on indexes
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing
diff --git a/dispatchers/efs.go b/dispatchers/efs.go
index cda1baf9f..a1c4cbb35 100644
--- a/dispatchers/efs.go
+++ b/dispatchers/efs.go
@@ -21,6 +21,7 @@ package dispatchers
import (
"github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/utils"
)
@@ -51,9 +52,9 @@ func (dS *DispatcherService) EfSv1ProcessEvent(ctx *context.Context, args *utils
}
return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaEFs, utils.EfSv1ProcessEvent, args, reply)
}
-func (dS *DispatcherService) EfSv1ReplayEvents(ctx *context.Context, args *utils.ArgsReplayFailedPosts, reply *string) (err error) {
+func (dS *DispatcherService) EfSv1ReplayEvents(ctx *context.Context, args efs.ReplayEventsParams, reply *string) (err error) {
tnt := dS.cfg.GeneralCfg().DefaultTenant
- if args != nil && len(args.Tenant) != 0 {
+ if len(args.Tenant) != 0 {
tnt = args.Tenant
}
ev := make(map[string]any)
diff --git a/dispatchers/ers.go b/dispatchers/ers.go
index 2f4973e95..996479918 100644
--- a/dispatchers/ers.go
+++ b/dispatchers/ers.go
@@ -21,6 +21,7 @@ package dispatchers
import (
"github.com/cgrates/birpc/context"
+ "github.com/cgrates/cgrates/ers"
"github.com/cgrates/cgrates/utils"
)
@@ -39,7 +40,7 @@ func (dS *DispatcherService) ErSv1Ping(ctx *context.Context, args *utils.CGREven
}
return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaERs, utils.ErSv1Ping, args, reply)
}
-func (dS *DispatcherService) ErSv1RunReader(ctx *context.Context, args utils.StringWithAPIOpts, reply *string) (err error) {
+func (dS *DispatcherService) ErSv1RunReader(ctx *context.Context, args ers.V1RunReaderParams, reply *string) (err error) {
tnt := dS.cfg.GeneralCfg().DefaultTenant
if len(args.Tenant) != 0 {
tnt = args.Tenant
diff --git a/ees/libcdre.go b/ees/libcdre.go
deleted file mode 100644
index d64dc19c8..000000000
--- a/ees/libcdre.go
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package ees
-
-/*
-// NewFailoverPosterFromFile returns ExportEvents from the file
-// used only on replay failed post
-func NewFailoverPosterFromFile(filePath, providerType string) (failPoster utils.FailoverPoster, err error) {
- var fileContent []byte
- err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error {
- if fileContent, err = os.ReadFile(filePath); err != nil {
- return err
- }
- return os.Remove(filePath)
- }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
- if err != nil {
- return
- }
- dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
- // unmarshall it
- expEv := new(utils.FailedExportersLogg)
- err = dec.Decode(&expEv)
- switch providerType {
- case utils.EEs:
- opts, err := AsOptsEESConfig(expEv.Opts)
- if err != nil {
- return nil, err
- }
- failPoster = &FailedExportersEEs{
- module: expEv.Module,
- failedPostsDir: expEv.FailedPostsDir,
- Path: expEv.Path,
- Opts: opts,
- Events: expEv.Events,
- Format: expEv.Format,
- }
- case utils.Kafka:
- failPoster = expEv
- }
- return
-}
-
-func AsOptsEESConfig(opts map[string]any) (*config.EventExporterOpts, error) {
- optsCfg := new(config.EventExporterOpts)
- if len(opts) == 0 {
- return optsCfg, nil
- }
- if _, has := opts[utils.CSVFieldSepOpt]; has {
- optsCfg.CSVFieldSeparator = utils.StringPointer(utils.IfaceAsString(utils.CSVFieldSepOpt))
- }
- if _, has := opts[utils.ElsIndex]; has {
- optsCfg.ElsIndex = utils.StringPointer(utils.IfaceAsString(utils.ElsIndex))
- }
- if _, has := opts[utils.ElsIfPrimaryTerm]; has {
- x, err := utils.IfaceAsInt(utils.ElsIfPrimaryTerm)
- if err != nil {
- return nil, err
- }
- optsCfg.ElsIfPrimaryTerm = utils.IntPointer(x)
- }
- if _, has := opts[utils.ElsIfSeqNo]; has {
- x, err := utils.IfaceAsInt(utils.ElsIfSeqNo)
- if err != nil {
- return nil, err
- }
- optsCfg.ElsIfSeqNo = utils.IntPointer(x)
- }
- if _, has := opts[utils.ElsOpType]; has {
- optsCfg.ElsOpType = utils.StringPointer(utils.IfaceAsString(utils.ElsOpType))
- }
- if _, has := opts[utils.ElsPipeline]; has {
- optsCfg.ElsPipeline = utils.StringPointer(utils.IfaceAsString(utils.ElsPipeline))
- }
- if _, has := opts[utils.ElsRouting]; has {
- optsCfg.ElsRouting = utils.StringPointer(utils.IfaceAsString(utils.ElsRouting))
- }
- if _, has := opts[utils.ElsTimeout]; has {
- t, err := utils.IfaceAsDuration(utils.ElsTimeout)
- if err != nil {
- return nil, err
- }
- optsCfg.ElsTimeout = &t
- }
- if _, has := opts[utils.ElsVersionLow]; has {
- x, err := utils.IfaceAsInt(utils.ElsVersionLow)
- if err != nil {
- return nil, err
- }
- optsCfg.ElsVersion = utils.IntPointer(x)
- }
- if _, has := opts[utils.ElsVersionType]; has {
- optsCfg.ElsVersionType = utils.StringPointer(utils.IfaceAsString(utils.ElsVersionType))
- }
- if _, has := opts[utils.ElsWaitForActiveShards]; has {
- optsCfg.ElsWaitForActiveShards = utils.StringPointer(utils.IfaceAsString(utils.ElsWaitForActiveShards))
- }
- if _, has := opts[utils.SQLMaxIdleConnsCfg]; has {
- x, err := utils.IfaceAsInt(utils.SQLMaxIdleConnsCfg)
- if err != nil {
- return nil, err
- }
- optsCfg.SQLMaxIdleConns = utils.IntPointer(x)
- }
- if _, has := opts[utils.SQLMaxOpenConns]; has {
- x, err := utils.IfaceAsInt(utils.SQLMaxOpenConns)
- if err != nil {
- return nil, err
- }
- optsCfg.SQLMaxOpenConns = utils.IntPointer(x)
- }
- if _, has := opts[utils.SQLConnMaxLifetime]; has {
- t, err := utils.IfaceAsDuration(utils.SQLConnMaxLifetime)
- if err != nil {
- return nil, err
- }
- optsCfg.SQLConnMaxLifetime = &t
- }
- if _, has := opts[utils.MYSQLDSNParams]; has {
- optsCfg.MYSQLDSNParams = opts[utils.SQLConnMaxLifetime].(map[string]string)
- }
- if _, has := opts[utils.SQLTableNameOpt]; has {
- optsCfg.SQLTableName = utils.StringPointer(utils.IfaceAsString(utils.SQLTableNameOpt))
- }
- if _, has := opts[utils.SQLDBNameOpt]; has {
- optsCfg.SQLDBName = utils.StringPointer(utils.IfaceAsString(utils.SQLDBNameOpt))
- }
- if _, has := opts[utils.PgSSLModeCfg]; has {
- optsCfg.PgSSLMode = utils.StringPointer(utils.IfaceAsString(utils.PgSSLModeCfg))
- }
- if _, has := opts[utils.KafkaTopic]; has {
- optsCfg.KafkaTopic = utils.StringPointer(utils.IfaceAsString(utils.KafkaTopic))
- }
- if _, has := opts[utils.AMQPQueueID]; has {
- optsCfg.AMQPQueueID = utils.StringPointer(utils.IfaceAsString(utils.AMQPQueueID))
- }
- if _, has := opts[utils.AMQPRoutingKey]; has {
- optsCfg.AMQPRoutingKey = utils.StringPointer(utils.IfaceAsString(utils.AMQPRoutingKey))
- }
- if _, has := opts[utils.AMQPExchange]; has {
- optsCfg.AMQPExchange = utils.StringPointer(utils.IfaceAsString(utils.AMQPExchange))
- }
- if _, has := opts[utils.AMQPExchangeType]; has {
- optsCfg.AMQPExchangeType = utils.StringPointer(utils.IfaceAsString(utils.AMQPExchangeType))
- }
- if _, has := opts[utils.AWSRegion]; has {
- optsCfg.AWSRegion = utils.StringPointer(utils.IfaceAsString(utils.AWSRegion))
- }
- if _, has := opts[utils.AWSKey]; has {
- optsCfg.AWSKey = utils.StringPointer(utils.IfaceAsString(utils.AWSKey))
- }
- if _, has := opts[utils.AWSSecret]; has {
- optsCfg.AWSSecret = utils.StringPointer(utils.IfaceAsString(utils.AWSSecret))
- }
- if _, has := opts[utils.AWSToken]; has {
- optsCfg.AWSToken = utils.StringPointer(utils.IfaceAsString(utils.AWSToken))
- }
- if _, has := opts[utils.SQSQueueID]; has {
- optsCfg.SQSQueueID = utils.StringPointer(utils.IfaceAsString(utils.SQSQueueID))
- }
- if _, has := opts[utils.S3Bucket]; has {
- optsCfg.S3BucketID = utils.StringPointer(utils.IfaceAsString(utils.S3Bucket))
- }
- if _, has := opts[utils.S3FolderPath]; has {
- optsCfg.S3FolderPath = utils.StringPointer(utils.IfaceAsString(utils.S3FolderPath))
- }
- if _, has := opts[utils.NatsJetStream]; has {
- x, err := utils.IfaceAsBool(utils.NatsJetStream)
- if err != nil {
- return nil, err
- }
- optsCfg.NATSJetStream = utils.BoolPointer(x)
- }
- if _, has := opts[utils.NatsSubject]; has {
- optsCfg.NATSSubject = utils.StringPointer(utils.IfaceAsString(utils.NatsSubject))
- }
- if _, has := opts[utils.NatsJWTFile]; has {
- optsCfg.NATSJWTFile = utils.StringPointer(utils.IfaceAsString(utils.NatsJWTFile))
- }
- if _, has := opts[utils.NatsSeedFile]; has {
- optsCfg.NATSSeedFile = utils.StringPointer(utils.IfaceAsString(utils.NatsSeedFile))
- }
- if _, has := opts[utils.NatsCertificateAuthority]; has {
- optsCfg.NATSCertificateAuthority = utils.StringPointer(utils.IfaceAsString(utils.NatsCertificateAuthority))
- }
- if _, has := opts[utils.NatsClientCertificate]; has {
- optsCfg.NATSClientCertificate = utils.StringPointer(utils.IfaceAsString(utils.NatsClientCertificate))
- }
- if _, has := opts[utils.NatsClientKey]; has {
- optsCfg.NATSClientKey = utils.StringPointer(utils.IfaceAsString(utils.NatsClientKey))
- }
- if _, has := opts[utils.NatsJetStreamMaxWait]; has {
- t, err := utils.IfaceAsDuration(utils.NatsJetStreamMaxWait)
- if err != nil {
- return nil, err
- }
- optsCfg.NATSJetStreamMaxWait = &t
- }
- if _, has := opts[utils.RpcCodec]; has {
- optsCfg.RPCCodec = utils.StringPointer(utils.IfaceAsString(utils.RpcCodec))
- }
- if _, has := opts[utils.ServiceMethod]; has {
- optsCfg.ServiceMethod = utils.StringPointer(utils.IfaceAsString(utils.ServiceMethod))
- }
- if _, has := opts[utils.KeyPath]; has {
- optsCfg.KeyPath = utils.StringPointer(utils.IfaceAsString(utils.KeyPath))
- }
- if _, has := opts[utils.CertPath]; has {
- optsCfg.CertPath = utils.StringPointer(utils.IfaceAsString(utils.CertPath))
- }
- if _, has := opts[utils.CaPath]; has {
- optsCfg.CAPath = utils.StringPointer(utils.IfaceAsString(utils.CaPath))
- }
- if _, has := opts[utils.Tls]; has {
- x, err := utils.IfaceAsBool(utils.Tls)
- if err != nil {
- return nil, err
- }
- optsCfg.TLS = utils.BoolPointer(x)
- }
- if _, has := opts[utils.ConnIDs]; has {
- optsCfg.ConnIDs = opts[utils.ConnIDs].(*[]string)
- }
- if _, has := opts[utils.RpcConnTimeout]; has {
- t, err := utils.IfaceAsDuration(utils.RpcConnTimeout)
- if err != nil {
- return nil, err
- }
- optsCfg.RPCConnTimeout = &t
- }
- if _, has := opts[utils.RpcReplyTimeout]; has {
- t, err := utils.IfaceAsDuration(utils.RpcReplyTimeout)
- if err != nil {
- return nil, err
- }
- optsCfg.RPCReplyTimeout = &t
- }
- if _, has := opts[utils.RPCAPIOpts]; has {
- optsCfg.RPCAPIOpts = opts[utils.RPCAPIOpts].(map[string]any)
- }
- return optsCfg, nil
-}
-
-// FailedExportersEEs used to save the failed post to file
-type FailedExportersEEs struct {
- lk sync.RWMutex
- Path string
- Opts *config.EventExporterOpts
- Format string
- Events []any
- failedPostsDir string
- module string
-}
-
-// AddEvent adds one event
-func (expEv *FailedExportersEEs) AddEvent(ev any) {
- expEv.lk.Lock()
- expEv.Events = append(expEv.Events, ev)
- expEv.lk.Unlock()
-}
-
-// ReplayFailedPosts tryies to post cdrs again
-func (expEv *FailedExportersEEs) ReplayFailedPosts(attempts int) (err error) {
- eesFailedEvents := &FailedExportersEEs{
- Path: expEv.Path,
- Opts: expEv.Opts,
- Format: expEv.Format,
- }
-
- eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Format, expEv.Path,
- utils.MetaNone, attempts, expEv.Opts)
- var ee EventExporter
- if ee, err = NewEventExporter(eeCfg, config.CgrConfig(), nil, nil); err != nil {
- return
- }
- keyFunc := func() string { return utils.EmptyString }
- if expEv.Format == utils.MetaKafkajsonMap || expEv.Format == utils.MetaS3jsonMap {
- keyFunc = utils.UUIDSha1Prefix
- }
- for _, ev := range expEv.Events {
- if err = ExportWithAttempts(context.Background(), ee, ev, keyFunc()); err != nil {
- eesFailedEvents.AddEvent(ev)
- }
- }
- ee.Close()
- if len(eesFailedEvents.Events) > 0 {
- err = utils.ErrPartiallyExecuted
- } else {
- eesFailedEvents = nil
- }
- return
-}
-*/
diff --git a/ees/libcdre_it_test.go b/ees/libcdre_it_test.go
deleted file mode 100644
index ac4f07ee4..000000000
--- a/ees/libcdre_it_test.go
+++ /dev/null
@@ -1,106 +0,0 @@
-//go:build integration
-// +build integration
-
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package ees
-
-/*
-func TestWriteFldPosts(t *testing.T) {
- // can't convert
- var notanExportEvent string
- writeFailedPosts("somestring", notanExportEvent)
- // can convert & write
- dir := "/tmp/engine/libcdre_test/"
- exportEvent := &ExportEvents{
- failedPostsDir: dir,
- module: "module",
- }
- if err := os.RemoveAll(dir); err != nil {
- t.Fatal("Error removing folder: ", dir, err)
- }
- if err := os.MkdirAll(dir, 0755); err != nil {
- t.Fatal("Error creating folder: ", dir, err)
- }
- config.CgrConfig().GeneralCfg().FailedPostsDir = dir
- writeFailedPosts("itmID", exportEvent)
-
- if filename, err := filepath.Glob(filepath.Join(dir, "module|*.gob")); err != nil {
- t.Error(err)
- } else if len(filename) == 0 {
- t.Error("Expecting one file")
- } else if len(filename) > 1 {
- t.Error("Expecting only one file")
- }
-}
-
-func TestWriteToFile(t *testing.T) {
- filePath := "/tmp/engine/libcdre_test/writeToFile.txt"
- exportEvent := &ExportEvents{}
- //call WriteToFile function
- if err := exportEvent.WriteToFile(filePath); err != nil {
- t.Error(err)
- }
- // check if the file exists / throw error if the file doesn't exist
- if _, err := os.Stat(filePath); os.IsNotExist(err) {
- t.Fatalf("File doesn't exists")
- }
- //check if the file was written correctly
- rcv, err := NewExportEventsFromFile(filePath)
- if err != nil {
- t.Errorf("Error deconding the file content: %+v", err)
- }
- if !reflect.DeepEqual(rcv, exportEvent) {
- t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(exportEvent), utils.ToJSON(rcv))
- }
- //populate the exportEvent struct
- exportEvent = &ExportEvents{
- Events: []any{"something1", "something2"},
- Path: "path",
- Format: "test",
- }
- filePath = "/tmp/engine/libcdre_test/writeToFile2.txt"
- if err := exportEvent.WriteToFile(filePath); err != nil {
- t.Error(err)
- }
- // check if the file exists / throw error if the file doesn't exist
- if _, err := os.Stat(filePath); os.IsNotExist(err) {
- t.Fatalf("File doesn't exists")
- }
- //check if the file was written correctly
- rcv, err = NewExportEventsFromFile(filePath)
- if err != nil {
- t.Errorf("Error deconding the file content: %+v", err)
- }
- if !reflect.DeepEqual(rcv, exportEvent) {
- t.Errorf("Expected: %+v,\nReceived: %+v", utils.ToJSON(exportEvent), utils.ToJSON(rcv))
- }
- //wrong path *reading
- exportEvent = &ExportEvents{}
- filePath = "/tmp/engine/libcdre_test/wrongpath.txt"
- if _, err = NewExportEventsFromFile(filePath); err == nil || err.Error() != "open /tmp/engine/libcdre_test/wrongpath.txt: no such file or directory" {
- t.Errorf("Expecting: 'open /tmp/engine/libcdre_test/wrongpath.txt: no such file or directory',\nReceived: '%+v'", err)
- }
- //wrong path *writing
- filePath = utils.EmptyString
- if err := exportEvent.WriteToFile(filePath); err == nil || err.Error() != "open : no such file or directory" {
- t.Error(err)
- }
-}
-*/
diff --git a/ees/libcdre_test.go b/ees/libcdre_test.go
deleted file mode 100644
index 6903d3988..000000000
--- a/ees/libcdre_test.go
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package ees
-
-/*
-func TestSetFldPostCacheTTL(t *testing.T) {
- var1 := failedPostCache
- SetFailedPostCacheTTL(50 * time.Millisecond)
- var2 := failedPostCache
- if reflect.DeepEqual(var1, var2) {
- t.Error("Expecting to be different")
- }
-}
-
-func TestAddFldPost(t *testing.T) {
- SetFailedPostCacheTTL(5 * time.Second)
- AddFailedPost("", "path1", "format1", "module1", "1", &config.EventExporterOpts{})
- x, ok := failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1", "module1"))
- if !ok {
- t.Error("Error reading from cache")
- }
- if x == nil {
- t.Error("Received an empty element")
- }
-
- failedPost, canCast := x.(*ExportEvents)
- if !canCast {
- t.Error("Error when casting")
- }
- eOut := &ExportEvents{
- Path: "path1",
- Format: "format1",
- module: "module1",
- Events: []any{"1"},
- Opts: &config.EventExporterOpts{},
- }
- if !reflect.DeepEqual(eOut, failedPost) {
- t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
- }
- AddFailedPost("", "path1", "format1", "module1", "2", &config.EventExporterOpts{})
- AddFailedPost("", "path2", "format2", "module2", "3", &config.EventExporterOpts{
- SQSQueueID: utils.StringPointer("qID"),
- })
- x, ok = failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1", "module1"))
- if !ok {
- t.Error("Error reading from cache")
- }
- if x == nil {
- t.Error("Received an empty element")
- }
- failedPost, canCast = x.(*ExportEvents)
- if !canCast {
- t.Error("Error when casting")
- }
- eOut = &ExportEvents{
- Path: "path1",
- Format: "format1",
- module: "module1",
- Events: []any{"1", "2"},
- Opts: &config.EventExporterOpts{},
- }
- if !reflect.DeepEqual(eOut, failedPost) {
- t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
- }
- x, ok = failedPostCache.Get(utils.ConcatenatedKey("", "path2", "format2", "module2", "qID"))
- if !ok {
- t.Error("Error reading from cache")
- }
- if x == nil {
- t.Error("Received an empty element")
- }
- failedPost, canCast = x.(*ExportEvents)
- if !canCast {
- t.Error("Error when casting")
- }
- eOut = &ExportEvents{
- Path: "path2",
- Format: "format2",
- module: "module2",
- Events: []any{"3"},
- Opts: &config.EventExporterOpts{
- SQSQueueID: utils.StringPointer("qID"),
- },
- }
- if !reflect.DeepEqual(eOut, failedPost) {
- t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
- }
- for _, id := range failedPostCache.GetItemIDs("") {
- failedPostCache.Set(id, nil, nil)
- }
-}
-
-func TestFilePath(t *testing.T) {
- exportEvent := &ExportEvents{}
- rcv := exportEvent.FilePath()
- if rcv[0] != '|' {
- t.Errorf("Expecting: '|', received: %+v", rcv[0])
- } else if rcv[8:] != ".gob" {
- t.Errorf("Expecting: '.gob', received: %+v", rcv[8:])
- }
- exportEvent = &ExportEvents{
- module: "module",
- }
- rcv = exportEvent.FilePath()
- if rcv[:7] != "module|" {
- t.Errorf("Expecting: 'module|', received: %+v", rcv[:7])
- } else if rcv[14:] != ".gob" {
- t.Errorf("Expecting: '.gob', received: %+v", rcv[14:])
- }
-
-}
-
-func TestSetModule(t *testing.T) {
- exportEvent := &ExportEvents{}
- eOut := &ExportEvents{
- module: "module",
- }
- exportEvent.SetModule("module")
- if !reflect.DeepEqual(eOut, exportEvent) {
- t.Errorf("Expecting: %+v, received: %+v", eOut, exportEvent)
- }
-}
-
-func TestAddEvent(t *testing.T) {
- exportEvent := &ExportEvents{}
- eOut := &ExportEvents{Events: []any{"event1"}}
- exportEvent.AddEvent("event1")
- if !reflect.DeepEqual(eOut, exportEvent) {
- t.Errorf("Expecting: %+v, received: %+v", eOut, exportEvent)
- }
- exportEvent = &ExportEvents{}
- eOut = &ExportEvents{Events: []any{"event1", "event2", "event3"}}
- exportEvent.AddEvent("event1")
- exportEvent.AddEvent("event2")
- exportEvent.AddEvent("event3")
- if !reflect.DeepEqual(eOut, exportEvent) {
- t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(exportEvent))
- }
-}
-*/
diff --git a/efs/efs.go b/efs/efs.go
index bc9faf283..d066c1fff 100644
--- a/efs/efs.go
+++ b/efs/efs.go
@@ -19,8 +19,10 @@ along with this program. If not, see
package efs
import (
- "os"
- "path"
+ "fmt"
+ "io/fs"
+ "path/filepath"
+ "slices"
"strings"
"sync"
@@ -76,14 +78,14 @@ func (efs *EfS) V1ProcessEvent(ctx *context.Context, args *utils.ArgsFailedPosts
}
case utils.Kafka:
}
- var failedPost *FailedExportersLogg
+ var failedPost *FailedExportersLog
if x, ok := failedPostCache.Get(key); ok {
if x != nil {
- failedPost = x.(*FailedExportersLogg)
+ failedPost = x.(*FailedExportersLog)
}
}
if failedPost == nil {
- failedPost = &FailedExportersLogg{
+ failedPost = &FailedExportersLog{
Path: args.Path,
Format: format,
Opts: args.APIOpts,
@@ -97,56 +99,64 @@ func (efs *EfS) V1ProcessEvent(ctx *context.Context, args *utils.ArgsFailedPosts
return nil
}
+// ReplayEventsParams contains parameters for replaying failed posts.
+type ReplayEventsParams struct {
+ Tenant string
+ Provider string // source of failed posts
+ SourcePath string // path for events to be replayed
+ FailedPath string // path for events that failed to replay, *none to discard, defaults to SourceDir if empty
+ Modules []string // list of modules to replay requests for, nil for all
+}
+
// V1ReplayEvents will read the Events from gob files that were failed to be exported and try to re-export them again.
-func (efS *EfS) V1ReplayEvents(ctx *context.Context, args *utils.ArgsReplayFailedPosts, reply *string) error {
- failedPostsDir := efS.cfg.EFsCfg().FailedPostsDir
- if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != utils.EmptyString {
- failedPostsDir = *args.FailedRequestsInDir
+func (efS *EfS) V1ReplayEvents(ctx *context.Context, args ReplayEventsParams, reply *string) error {
+
+ // Set default directories if not provided.
+ if args.SourcePath == "" {
+ args.SourcePath = efS.cfg.EFsCfg().FailedPostsDir
}
- failedOutDir := failedPostsDir
- if args.FailedRequestsOutDir != nil && *args.FailedRequestsOutDir != utils.EmptyString {
- failedOutDir = *args.FailedRequestsOutDir
+ if args.FailedPath == "" {
+ args.FailedPath = args.SourcePath
}
- // check all the files in the FailedPostsInDirectory
- filesInDir, err := os.ReadDir(failedPostsDir)
- if err != nil {
- return err
- }
- if len(filesInDir) == 0 {
- return utils.ErrNotFound
- }
- // check every file and check if any of them match the modules
- for _, file := range filesInDir {
- if len(args.Modules) != 0 {
- var allowedModule bool
- for _, module := range args.Modules {
- if strings.HasPrefix(file.Name(), module) {
- allowedModule = true
- break
- }
- }
- if !allowedModule {
- continue
- }
+
+ if err := filepath.WalkDir(args.SourcePath, func(path string, d fs.DirEntry, err error) error {
+ if err != nil {
+ utils.Logger.Warning(fmt.Sprintf("<%s> failed to access path %s: %v", utils.EFs, path, err))
+ return nil // skip paths that cause an error
}
- filePath := path.Join(failedPostsDir, file.Name())
- var expEv FailoverPoster
- if expEv, err = NewFailoverPosterFromFile(filePath, args.TypeProvider, efS); err != nil {
- return err
+ if d.IsDir() {
+ return nil // skip directories
}
- // check if the failed out dir path is the same as the same in dir in order to export again in case of failure
+
+ // Skip files not belonging to the specified modules.
+ if len(args.Modules) != 0 && !slices.ContainsFunc(args.Modules, func(mod string) bool {
+ return strings.HasPrefix(d.Name(), mod)
+ }) {
+ utils.Logger.Info(fmt.Sprintf("<%s> skipping file %s: not found within specified modules", utils.EFs, d.Name()))
+ return nil
+ }
+
+ expEv, err := NewFailoverPosterFromFile(path, args.Provider, efS)
+ if err != nil {
+ return fmt.Errorf("failed to init failover poster from %s: %v", path, err)
+ }
+
+ // Determine the failover path.
failoverPath := utils.MetaNone
- if failedOutDir != utils.MetaNone {
- failoverPath = path.Join(failedOutDir, file.Name())
+ if args.FailedPath != utils.MetaNone {
+ failoverPath = filepath.Join(args.FailedPath, d.Name())
}
err = expEv.ReplayFailedPosts(ctx, efS.cfg.EFsCfg().PosterAttempts, args.Tenant)
- if err != nil && failedOutDir != utils.MetaNone { // Got error from HTTPPoster could be that content was not written, we need to write it ourselves
+ if err != nil && failoverPath != utils.MetaNone {
+ // Write the events that failed to be replayed to the failover directory.
if err = WriteToFile(failoverPath, expEv); err != nil {
- return utils.NewErrServerError(err)
+ return fmt.Errorf("failed to write the events that failed to be replayed to %s: %v", path, err)
}
}
-
+ return nil
+ }); err != nil {
+ return utils.NewErrServerError(err)
}
*reply = utils.OK
return nil
diff --git a/efs/efs_it_test.go b/efs/efs_it_test.go
index d0851c5e7..c37d7c639 100644
--- a/efs/efs_it_test.go
+++ b/efs/efs_it_test.go
@@ -25,6 +25,7 @@ import (
"encoding/gob"
"fmt"
"os"
+ "os/exec"
"path"
"testing"
"time"
@@ -35,6 +36,9 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
+ "github.com/google/go-cmp/cmp"
+ "github.com/google/go-cmp/cmp/cmpopts"
+ "github.com/nats-io/nats.go"
)
var (
@@ -70,7 +74,7 @@ func TestDecodeExportEvents(t *testing.T) {
}
dec := gob.NewDecoder(bytes.NewBuffer(content))
gob.Register(new(utils.CGREvent))
- singleEvent := new(FailedExportersLogg)
+ singleEvent := new(FailedExportersLog)
if err := dec.Decode(&singleEvent); err != nil {
t.Error(err)
} else {
@@ -183,3 +187,184 @@ func testEfsSKillEngine(t *testing.T) {
t.Error(err)
}
}
+
+// TestEFsReplayEvents tests the implementation of the EfSv1.ReplayEvents.
+func TestEFsReplayEvents(t *testing.T) {
+ switch *utils.DBType {
+ case utils.MetaInternal:
+ case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
+ t.SkipNow()
+ default:
+ t.Fatal("unsupported dbtype value")
+ }
+ failedDir := t.TempDir()
+ content := fmt.Sprintf(`{
+"data_db": {
+ "db_type": "*internal"
+},
+"stor_db": {
+ "db_type": "*internal"
+},
+"admins": {
+ "enabled": true
+},
+"efs": {
+ "enabled": true,
+ "failed_posts_ttl": "3ms",
+ "poster_attempts": 1
+},
+"ees": {
+ "enabled": true,
+ "exporters": [
+ {
+ "id": "nats_exporter",
+ "type": "*natsJSONMap",
+ "flags": ["*log"],
+ "efs_conns": ["*localhost"],
+ "export_path": "nats://localhost:4222",
+ "attempts": 1,
+ "failed_posts_dir": "%s",
+ "synchronous": true,
+ "opts": {
+ "natsSubject": "processed_cdrs",
+ },
+ "fields":[
+ {"tag": "TestField", "path": "*exp.TestField", "type": "*variable", "value": "~*req.TestField"},
+ ]
+ }
+ ]
+}
+}`, failedDir)
+
+ testEnv := engine.TestEnvironment{
+ ConfigJSON: content,
+ // LogBuffer: &bytes.Buffer{},
+ Encoding: *utils.Encoding,
+ }
+ // defer fmt.Println(testEnv.LogBuffer)
+ client, _ := testEnv.Setup(t, context.Background())
+ // helper to sort slices
+ less := func(a, b string) bool { return a < b }
+ // amount of events to export/replay
+ count := 5
+ t.Run("successful nats export", func(t *testing.T) {
+ cmd := exec.Command("nats-server")
+ if err := cmd.Start(); err != nil {
+ t.Fatalf("failed to start nats-server: %v", err)
+ }
+ time.Sleep(50 * time.Millisecond)
+ defer cmd.Process.Kill()
+ nc, err := nats.Connect("nats://localhost:4222", nats.Timeout(time.Second), nats.DrainTimeout(time.Second))
+ if err != nil {
+ t.Fatalf("failed to connect to nats-server: %v", err)
+ }
+ defer nc.Drain()
+ ch := make(chan *nats.Msg, count)
+ sub, err := nc.ChanQueueSubscribe("processed_cdrs", "", ch)
+ if err != nil {
+ t.Fatalf("failed to subscribe to nats queue: %v", err)
+ }
+ var reply map[string]map[string]any
+ for i := range count {
+ if err := client.Call(context.Background(), utils.EeSv1ProcessEvent, &utils.CGREventWithEeIDs{
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "test",
+ Event: map[string]any{
+ "TestField": i,
+ },
+ },
+ }, &reply); err != nil {
+ t.Errorf("EeSv1.ProcessEvent returned unexpected err: %v", err)
+ }
+ }
+ time.Sleep(1 * time.Millisecond) // wait for the channel to receive the replayed exports
+ want := make([]string, 0, count)
+ for i := range count {
+ want = append(want, fmt.Sprintf(`{"TestField":"%d"}`, i))
+ }
+ if err := sub.Unsubscribe(); err != nil {
+ t.Errorf("failed to unsubscribe from nats subject: %v", err)
+ }
+ close(ch)
+ got := make([]string, 0, count)
+ for elem := range ch {
+ got = append(got, string(elem.Data))
+ }
+ if diff := cmp.Diff(want, got, cmpopts.SortSlices(less)); diff != "" {
+ t.Errorf("unexpected nats messages received over channel (-want +got): \n%s", diff)
+ }
+ })
+ t.Run("replay failed nats export", func(t *testing.T) {
+ t.Skip("skipping due to gob decoding err")
+ var exportReply map[string]map[string]any
+ for i := range count {
+ err := client.Call(context.Background(), utils.EeSv1ProcessEvent,
+ &utils.CGREventWithEeIDs{
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ ID: "test",
+ Event: map[string]any{
+ "TestField": i,
+ },
+ },
+ }, &exportReply)
+ if err == nil || err.Error() != utils.ErrPartiallyExecuted.Error() {
+ t.Errorf("EeSv1.ProcessEvent err = %v, want %v", err, utils.ErrPartiallyExecuted)
+ }
+ }
+ time.Sleep(5 * time.Millisecond)
+ replayFailedDir := t.TempDir()
+ var replayReply string
+ if err := client.Call(context.Background(), utils.EfSv1ReplayEvents, ReplayEventsParams{
+ Tenant: "cgrates.org",
+ Provider: utils.EEs,
+ SourcePath: failedDir,
+ FailedPath: replayFailedDir,
+ Modules: []string{"test", "EEs"},
+ }, &replayReply); err != nil {
+ t.Errorf("EfSv1.ReplayEvents returned unexpected err: %v", err)
+ }
+ cmd := exec.Command("nats-server")
+ if err := cmd.Start(); err != nil {
+ t.Fatalf("failed to start nats-server: %v", err)
+ }
+ time.Sleep(50 * time.Millisecond)
+ defer cmd.Process.Kill()
+ nc, err := nats.Connect("nats://localhost:4222", nats.Timeout(time.Second), nats.DrainTimeout(time.Second))
+ if err != nil {
+ t.Fatalf("failed to connect to nats-server: %v", err)
+ }
+ defer nc.Drain()
+ ch := make(chan *nats.Msg, count)
+ sub, err := nc.ChanQueueSubscribe("processed_cdrs", "", ch)
+ if err != nil {
+ t.Fatalf("failed to subscribe to nats queue: %v", err)
+ }
+ if err := client.Call(context.Background(), utils.EfSv1ReplayEvents, ReplayEventsParams{
+ Tenant: "cgrates.org",
+ Provider: utils.EEs,
+ SourcePath: replayFailedDir,
+ FailedPath: utils.MetaNone,
+ Modules: []string{"test", "EEs"},
+ }, &replayReply); err != nil {
+ t.Errorf("EfSv1.ReplayEvents returned unexpected err: %v", err)
+ }
+ time.Sleep(time.Millisecond) // wait for the channel to receive the replayed exports
+ want := make([]string, 0, count)
+ for i := range count {
+ want = append(want, fmt.Sprintf(`{"TestField":"%d"}`, i))
+ }
+ if err := sub.Unsubscribe(); err != nil {
+ t.Errorf("failed to unsubscribe from nats subject: %v", err)
+ }
+ close(ch)
+ got := make([]string, 0, count)
+ for elem := range ch {
+ got = append(got, string(elem.Data))
+ }
+ if diff := cmp.Diff(want, got, cmpopts.SortSlices(less)); diff != "" {
+ t.Errorf("unexpected nats messages received over channel (-want +got): \n%s", diff)
+ }
+ })
+}
diff --git a/efs/failed_ees.go b/efs/failed_ees.go
index db934abda..80fe050a4 100644
--- a/efs/failed_ees.go
+++ b/efs/failed_ees.go
@@ -250,7 +250,7 @@ func (expEv *FailedExportersEEs) AddEvent(ev any) {
// ReplayFailedPosts tryies to post cdrs again
func (expEv *FailedExportersEEs) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) (err error) {
- eesFailedEvents := &FailedExportersEEs{
+ failedEvents := &FailedExportersEEs{
Path: expEv.Path,
Opts: expEv.Opts,
Format: expEv.Format,
@@ -268,14 +268,16 @@ func (expEv *FailedExportersEEs) ReplayFailedPosts(ctx *context.Context, attempt
}
for _, ev := range expEv.Events {
if err = ees.ExportWithAttempts(context.Background(), ee, ev, keyFunc(), expEv.connMngr, tnt); err != nil {
- eesFailedEvents.AddEvent(ev)
+ failedEvents.AddEvent(ev)
}
}
ee.Close()
- if len(eesFailedEvents.Events) > 0 {
- err = utils.ErrPartiallyExecuted
- } else {
- eesFailedEvents = nil
+ switch len(failedEvents.Events) {
+ case 0: // none failed to be replayed
+ return nil
+ case len(expEv.Events): // all failed, return last encountered error
+ return err
+ default:
+ return utils.ErrPartiallyExecuted
}
- return
}
diff --git a/efs/failed_loggs.go b/efs/failed_logs.go
similarity index 68%
rename from efs/failed_loggs.go
rename to efs/failed_logs.go
index 2074329dd..938bb2337 100644
--- a/efs/failed_loggs.go
+++ b/efs/failed_logs.go
@@ -31,8 +31,8 @@ import (
"github.com/segmentio/kafka-go"
)
-// FailedExportersLogg is a failover poster for kafka logger type
-type FailedExportersLogg struct {
+// FailedExportersLog is a failover poster for kafka logger type
+type FailedExportersLog struct {
lk sync.RWMutex
Path string
Opts map[string]any // this is meta
@@ -46,50 +46,51 @@ type FailedExportersLogg struct {
}
// AddEvent adds one event
-func (expEv *FailedExportersLogg) AddEvent(ev any) {
+func (expEv *FailedExportersLog) AddEvent(ev any) {
expEv.lk.Lock()
+ defer expEv.lk.Unlock()
expEv.Events = append(expEv.Events, ev)
- expEv.lk.Unlock()
}
// NewExportEventsFromFile returns ExportEvents from the file
// used only on replay failed post
-func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err error) {
- var fileContent []byte
- if fileContent, err = os.ReadFile(filePath); err != nil {
+func NewExportEventsFromFile(filePath string) (*FailedExportersLog, error) {
+ content, err := os.ReadFile(filePath)
+ if err != nil {
return nil, err
}
- if err = os.Remove(filePath); err != nil {
+ if err := os.Remove(filePath); err != nil {
return nil, err
}
- dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
- // unmarshall it
- expEv = new(FailedExportersLogg)
- err = dec.Decode(&expEv)
- return
+ var expEv FailedExportersLog
+ dec := gob.NewDecoder(bytes.NewBuffer(content))
+ if err := dec.Decode(&expEv); err != nil {
+ return nil, err
+ }
+ return &expEv, nil
}
// ReplayFailedPosts tryies to post cdrs again
-func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) (err error) {
+func (expEv *FailedExportersLog) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) error {
nodeID := utils.IfaceAsString(expEv.Opts[utils.NodeID])
logLvl, err := utils.IfaceAsInt(expEv.Opts[utils.Level])
if err != nil {
- return
+ return err
}
expLogger := engine.NewExportLogger(ctx, nodeID, tnt, logLvl,
expEv.connMngr, expEv.cfg)
for _, event := range expEv.Events {
- var content []byte
- if content, err = utils.ToUnescapedJSON(event); err != nil {
- return
+ content, err := utils.ToUnescapedJSON(event)
+ if err != nil {
+ return err
}
- if err = expLogger.Writer.WriteMessages(context.Background(), kafka.Message{
+ if err := expLogger.Writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(utils.GenUUID()),
Value: content,
}); err != nil {
var reply string
// if there are any errors in kafka, we will post in FailedPostDirectory
- if err = expEv.connMngr.Call(ctx, expEv.cfg.LoggerCfg().EFsConns, utils.EfSv1ProcessEvent,
+ return expEv.connMngr.Call(ctx, expEv.cfg.LoggerCfg().EFsConns, utils.EfSv1ProcessEvent,
&utils.ArgsFailedPosts{
Tenant: tnt,
Path: expLogger.Writer.Addr.String(),
@@ -97,11 +98,8 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp
FailedDir: expLogger.FldPostDir,
Module: utils.Kafka,
APIOpts: expLogger.GetMeta(),
- }, &reply); err != nil {
- return err
- }
- return nil
+ }, &reply)
}
}
- return err
+ return nil
}
diff --git a/efs/libefs.go b/efs/libefs.go
index 29095dadf..2b0125cd9 100644
--- a/efs/libefs.go
+++ b/efs/libefs.go
@@ -21,6 +21,7 @@ package efs
import (
"bytes"
"encoding/gob"
+ "errors"
"fmt"
"os"
"path"
@@ -45,23 +46,22 @@ func SetFailedPostCacheTTL(ttl time.Duration) {
}
func writeFailedPosts(_ string, value any) {
- expEv, canConvert := value.(*FailedExportersLogg)
+ expEv, canConvert := value.(*FailedExportersLog)
if !canConvert {
return
}
filePath := expEv.FilePath()
expEv.lk.RLock()
+ defer expEv.lk.RUnlock()
if err := WriteToFile(filePath, expEv); err != nil {
utils.Logger.Warning(fmt.Sprintf("Unable to write failed post to file <%s> because <%s>",
filePath, err))
- expEv.lk.RUnlock()
return
}
- expEv.lk.RUnlock()
}
// FilePath returns the file path it should use for saving the failed events
-func (expEv *FailedExportersLogg) FilePath() string {
+func (expEv *FailedExportersLog) FilePath() string {
return path.Join(expEv.FailedPostsDir, expEv.Module+utils.PipeSep+utils.UUIDSha1Prefix()+utils.GOBSuffix)
}
@@ -84,28 +84,32 @@ func WriteToFile(filePath string, expEv FailoverPoster) (err error) {
// NewFailoverPosterFromFile returns ExportEvents from the file
// used only on replay failed post
-func NewFailoverPosterFromFile(filePath, providerType string, efs *EfS) (failPoster FailoverPoster, err error) {
- var fileContent []byte
- err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error {
- if fileContent, err = os.ReadFile(filePath); err != nil {
- return err
+func NewFailoverPosterFromFile(filePath, provider string, efs *EfS) (FailoverPoster, error) {
+ var content []byte
+ err := guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error {
+ var readErr error
+ if content, readErr = os.ReadFile(filePath); readErr != nil {
+ return readErr
}
return os.Remove(filePath)
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
if err != nil {
- return
+ return nil, err
}
- dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
- // unmarshall it
- expEv := new(FailedExportersLogg)
- err = dec.Decode(&expEv)
- switch providerType {
+
+ dec := gob.NewDecoder(bytes.NewBuffer(content))
+ var expEv FailedExportersLog
+ if err := dec.Decode(&expEv); err != nil {
+ return nil, err
+ }
+
+ switch provider {
case utils.EEs:
opts, err := AsOptsEESConfig(expEv.Opts)
if err != nil {
return nil, err
}
- failPoster = &FailedExportersEEs{
+ return &FailedExportersEEs{
module: expEv.Module,
failedPostsDir: expEv.FailedPostsDir,
Path: expEv.Path,
@@ -114,11 +118,12 @@ func NewFailoverPosterFromFile(filePath, providerType string, efs *EfS) (failPos
Format: expEv.Format,
connMngr: efs.connMgr,
- }
+ }, nil
case utils.Kafka:
expEv.cfg = efs.cfg
expEv.connMngr = efs.connMgr
- failPoster = expEv
+ return &expEv, nil
+ default:
+ return nil, errors.New("invalid provider")
}
- return
}
diff --git a/go.mod b/go.mod
index cc006fd44..f810eafd8 100644
--- a/go.mod
+++ b/go.mod
@@ -30,6 +30,7 @@ require (
github.com/fiorix/go-diameter/v4 v4.0.4
github.com/fsnotify/fsnotify v1.7.0
github.com/go-sql-driver/mysql v1.8.1
+ github.com/google/go-cmp v0.6.0
github.com/mediocregopher/radix/v3 v3.8.1
github.com/miekg/dns v1.1.62
github.com/nats-io/nats-server/v2 v2.10.18
diff --git a/utils/coreutils.go b/utils/coreutils.go
index c0e1e7f57..a775dd92a 100644
--- a/utils/coreutils.go
+++ b/utils/coreutils.go
@@ -823,14 +823,6 @@ type ArgsFailedPosts struct {
APIOpts map[string]any // Specially for the meta
}
-type ArgsReplayFailedPosts struct {
- Tenant string
- TypeProvider string
- FailedRequestsInDir *string // if defined it will be our source of requests to be replayed
- FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded
- Modules []string // list of modules for which replay the requests, nil for all
-}
-
// GetIndexesArg the API argumets to specify an index
type GetIndexesArg struct {
IdxItmType string
diff --git a/utils/failover_export.go b/utils/failover_export.go
deleted file mode 100644
index b64ee3378..000000000
--- a/utils/failover_export.go
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package utils
-
-/*
-var failedPostCache *ltcache.Cache
-
-func init() {
- failedPostCache = ltcache.NewCache(-1, 5*time.Second, true, writeFailedPosts)
-}
-
-// SetFailedPostCacheTTL recreates the failed cache
-func SetFailedPostCacheTTL(ttl time.Duration) {
- failedPostCache = ltcache.NewCache(-1, ttl, true, writeFailedPosts)
-}
-
-func writeFailedPosts(_ string, value any) {
- expEv, canConvert := value.(*FailedExportersLogg)
- if !canConvert {
- return
- }
- filePath := expEv.FilePath()
- expEv.lk.RLock()
- if err := WriteToFile(filePath, expEv); err != nil {
- Logger.Warning(fmt.Sprintf("Unable to write failed post to file <%s> because <%s>",
- filePath, err))
- expEv.lk.RUnlock()
- return
- }
- expEv.lk.RUnlock()
-}
-
-// FilePath returns the file path it should use for saving the failed events
-func (expEv *FailedExportersLogg) FilePath() string {
- return path.Join(expEv.FailedPostsDir, expEv.Module+PipeSep+UUIDSha1Prefix()+GOBSuffix)
-}
-
-// WriteToFile writes the events to file
-func WriteToFile(filePath string, expEv FailoverPoster) (err error) {
- fileOut, err := os.Create(filePath)
- if err != nil {
- return err
- }
- encd := gob.NewEncoder(fileOut)
- gob.Register(new(CGREvent))
- err = encd.Encode(expEv)
- fileOut.Close()
- return
-}
-
-type FailedExportersLogg struct {
- lk sync.RWMutex
- Path string
- Opts map[string]any // THIS WILL BE META
- Format string
- Events []any
- FailedPostsDir string
- Module string
-}
-
-func AddFailedMessage(failedPostsDir, expPath, format,
- module string, ev any, opts map[string]any) {
- key := ConcatenatedKey(failedPostsDir, expPath, format, module)
- switch module {
- case EEs:
- // also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id
- var amqpQueueID string
- var s3BucketID string
- var sqsQueueID string
- var kafkaTopic string
- if _, has := opts[AMQPQueueID]; has {
- amqpQueueID = IfaceAsString(opts[AMQPQueueID])
- }
- if _, has := opts[S3Bucket]; has {
- s3BucketID = IfaceAsString(opts[S3Bucket])
- }
- if _, has := opts[SQSQueueID]; has {
- sqsQueueID = IfaceAsString(opts[SQSQueueID])
- }
- if _, has := opts[kafkaTopic]; has {
- kafkaTopic = IfaceAsString(opts[KafkaTopic])
- }
- if qID := FirstNonEmpty(amqpQueueID, s3BucketID,
- sqsQueueID, kafkaTopic); len(qID) != 0 {
- key = ConcatenatedKey(key, qID)
- }
- case Kafka:
- }
- var failedPost *FailedExportersLogg
- if x, ok := failedPostCache.Get(key); ok {
- if x != nil {
- failedPost = x.(*FailedExportersLogg)
- }
- }
- if failedPost == nil {
- failedPost = &FailedExportersLogg{
- Path: expPath,
- Format: format,
- Opts: opts,
- Module: module,
- FailedPostsDir: failedPostsDir,
- }
- failedPostCache.Set(key, failedPost, nil)
- }
- failedPost.AddEvent(ev)
-}
-
-// AddEvent adds one event
-func (expEv *FailedExportersLogg) AddEvent(ev any) {
- expEv.lk.Lock()
- expEv.Events = append(expEv.Events, ev)
- expEv.lk.Unlock()
-}
-
-// NewExportEventsFromFile returns ExportEvents from the file
-// used only on replay failed post
-func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err error) {
- var fileContent []byte
- if fileContent, err = os.ReadFile(filePath); err != nil {
- return nil, err
- }
- if err = os.Remove(filePath); err != nil {
- return nil, err
- }
- dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
- // unmarshall it
- expEv = new(FailedExportersLogg)
- err = dec.Decode(&expEv)
- return
-}
-
-type FailoverPoster interface {
- ReplayFailedPosts(int, string) error
-}
-
-// ReplayFailedPosts tryies to post cdrs again
-func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int, tnt string) (err error) {
- nodeID := IfaceAsString(expEv.Opts[NodeID])
- logLvl, err := IfaceAsInt(expEv.Opts[Level])
- if err != nil {
- return
- }
- expLogger := NewExportLogger(nodeID, tnt, logLvl,
- expEv.Path, expEv.Format, attempts, expEv.FailedPostsDir)
- for _, event := range expEv.Events {
- var content []byte
- if content, err = ToUnescapedJSON(event); err != nil {
- return
- }
- if err = expLogger.Writer.WriteMessages(context.Background(), kafka.Message{
- Key: []byte(GenUUID()),
- Value: content,
- }); err != nil {
- // if there are any errors in kafka, we will post in FailedPostDirectory
- AddFailedMessage(expLogger.FldPostDir, expLogger.Writer.Addr.String(), MetaKafkaLog, Kafka,
- event, expLogger.GetMeta())
- return nil
- }
- }
- return err
-}
-*/