Revise EfSv1.ReplayEvents API + tests

- renamed parameter type: ArgsReplyFailedPosts -> ReplayEventsParams
- renamed param fields:
  - FailedRequestsInDir -> SourcePath
  - FailedRequestsOutDir -> FailedPath
  - TypeProvider -> Provider
- changed param fields types from *string to string
- used the SourcePath and FailedPath params directly instead of creating separate variables
- used filepath.WalkDir instead of reading the directory and looping over the entries
- used slices.ContainsFunc to check if the file belongs to any module (if 1+ is specified)
- used filepath.Join instead of path.Join
- used the path provided by WalkFunc instead of building the file paths ourselves
- made error returns more descriptive
- added logs for directories/files that are skipped
- paths that cannot be accessed are skipped after logging the error
This commit is contained in:
ionutboangiu
2024-09-25 18:47:46 +03:00
committed by Dan Christian Bogos
parent f26a0c2d0b
commit 9b8ac3199b
14 changed files with 303 additions and 854 deletions

View File

@@ -1080,6 +1080,8 @@
// "store_interval": "", // dump cache regularly to dataDB, 0 - dump at start/shutdown: <""|$dur>
// "store_uncompressed_limit": 0, // used to compress data
// "thresholds_conns": [], // connections to ThresholdS for StatUpdates, empty to disable thresholds functionality: <""|*internal|$rpc_conns_id>
// "ees_conns": [], // connections to EEs for StatUpdates, empty to disable export functionality: <""|*internal|$rpc_conns_id>
// "ees_exporter_ids": [], // list of EventExporter profiles to use for real-time StatUpdate exports
// "indexed_selects": true, // enable profile matching exclusively on indexes
// //"string_indexed_fields": [], // query indexes based on these fields for faster processing
// "prefix_indexed_fields": [], // query indexes based on these fields for faster processing

View File

@@ -21,6 +21,7 @@ package dispatchers
import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/utils"
)
@@ -51,9 +52,9 @@ func (dS *DispatcherService) EfSv1ProcessEvent(ctx *context.Context, args *utils
}
return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaEFs, utils.EfSv1ProcessEvent, args, reply)
}
func (dS *DispatcherService) EfSv1ReplayEvents(ctx *context.Context, args *utils.ArgsReplayFailedPosts, reply *string) (err error) {
func (dS *DispatcherService) EfSv1ReplayEvents(ctx *context.Context, args efs.ReplayEventsParams, reply *string) (err error) {
tnt := dS.cfg.GeneralCfg().DefaultTenant
if args != nil && len(args.Tenant) != 0 {
if len(args.Tenant) != 0 {
tnt = args.Tenant
}
ev := make(map[string]any)

View File

@@ -21,6 +21,7 @@ package dispatchers
import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/ers"
"github.com/cgrates/cgrates/utils"
)
@@ -39,7 +40,7 @@ func (dS *DispatcherService) ErSv1Ping(ctx *context.Context, args *utils.CGREven
}
return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaERs, utils.ErSv1Ping, args, reply)
}
func (dS *DispatcherService) ErSv1RunReader(ctx *context.Context, args utils.StringWithAPIOpts, reply *string) (err error) {
func (dS *DispatcherService) ErSv1RunReader(ctx *context.Context, args ers.V1RunReaderParams, reply *string) (err error) {
tnt := dS.cfg.GeneralCfg().DefaultTenant
if len(args.Tenant) != 0 {
tnt = args.Tenant

View File

@@ -1,308 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ees
/*
// NewFailoverPosterFromFile returns ExportEvents from the file
// used only on replay failed post
func NewFailoverPosterFromFile(filePath, providerType string) (failPoster utils.FailoverPoster, err error) {
var fileContent []byte
err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error {
if fileContent, err = os.ReadFile(filePath); err != nil {
return err
}
return os.Remove(filePath)
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
if err != nil {
return
}
dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
// unmarshall it
expEv := new(utils.FailedExportersLogg)
err = dec.Decode(&expEv)
switch providerType {
case utils.EEs:
opts, err := AsOptsEESConfig(expEv.Opts)
if err != nil {
return nil, err
}
failPoster = &FailedExportersEEs{
module: expEv.Module,
failedPostsDir: expEv.FailedPostsDir,
Path: expEv.Path,
Opts: opts,
Events: expEv.Events,
Format: expEv.Format,
}
case utils.Kafka:
failPoster = expEv
}
return
}
func AsOptsEESConfig(opts map[string]any) (*config.EventExporterOpts, error) {
optsCfg := new(config.EventExporterOpts)
if len(opts) == 0 {
return optsCfg, nil
}
if _, has := opts[utils.CSVFieldSepOpt]; has {
optsCfg.CSVFieldSeparator = utils.StringPointer(utils.IfaceAsString(utils.CSVFieldSepOpt))
}
if _, has := opts[utils.ElsIndex]; has {
optsCfg.ElsIndex = utils.StringPointer(utils.IfaceAsString(utils.ElsIndex))
}
if _, has := opts[utils.ElsIfPrimaryTerm]; has {
x, err := utils.IfaceAsInt(utils.ElsIfPrimaryTerm)
if err != nil {
return nil, err
}
optsCfg.ElsIfPrimaryTerm = utils.IntPointer(x)
}
if _, has := opts[utils.ElsIfSeqNo]; has {
x, err := utils.IfaceAsInt(utils.ElsIfSeqNo)
if err != nil {
return nil, err
}
optsCfg.ElsIfSeqNo = utils.IntPointer(x)
}
if _, has := opts[utils.ElsOpType]; has {
optsCfg.ElsOpType = utils.StringPointer(utils.IfaceAsString(utils.ElsOpType))
}
if _, has := opts[utils.ElsPipeline]; has {
optsCfg.ElsPipeline = utils.StringPointer(utils.IfaceAsString(utils.ElsPipeline))
}
if _, has := opts[utils.ElsRouting]; has {
optsCfg.ElsRouting = utils.StringPointer(utils.IfaceAsString(utils.ElsRouting))
}
if _, has := opts[utils.ElsTimeout]; has {
t, err := utils.IfaceAsDuration(utils.ElsTimeout)
if err != nil {
return nil, err
}
optsCfg.ElsTimeout = &t
}
if _, has := opts[utils.ElsVersionLow]; has {
x, err := utils.IfaceAsInt(utils.ElsVersionLow)
if err != nil {
return nil, err
}
optsCfg.ElsVersion = utils.IntPointer(x)
}
if _, has := opts[utils.ElsVersionType]; has {
optsCfg.ElsVersionType = utils.StringPointer(utils.IfaceAsString(utils.ElsVersionType))
}
if _, has := opts[utils.ElsWaitForActiveShards]; has {
optsCfg.ElsWaitForActiveShards = utils.StringPointer(utils.IfaceAsString(utils.ElsWaitForActiveShards))
}
if _, has := opts[utils.SQLMaxIdleConnsCfg]; has {
x, err := utils.IfaceAsInt(utils.SQLMaxIdleConnsCfg)
if err != nil {
return nil, err
}
optsCfg.SQLMaxIdleConns = utils.IntPointer(x)
}
if _, has := opts[utils.SQLMaxOpenConns]; has {
x, err := utils.IfaceAsInt(utils.SQLMaxOpenConns)
if err != nil {
return nil, err
}
optsCfg.SQLMaxOpenConns = utils.IntPointer(x)
}
if _, has := opts[utils.SQLConnMaxLifetime]; has {
t, err := utils.IfaceAsDuration(utils.SQLConnMaxLifetime)
if err != nil {
return nil, err
}
optsCfg.SQLConnMaxLifetime = &t
}
if _, has := opts[utils.MYSQLDSNParams]; has {
optsCfg.MYSQLDSNParams = opts[utils.SQLConnMaxLifetime].(map[string]string)
}
if _, has := opts[utils.SQLTableNameOpt]; has {
optsCfg.SQLTableName = utils.StringPointer(utils.IfaceAsString(utils.SQLTableNameOpt))
}
if _, has := opts[utils.SQLDBNameOpt]; has {
optsCfg.SQLDBName = utils.StringPointer(utils.IfaceAsString(utils.SQLDBNameOpt))
}
if _, has := opts[utils.PgSSLModeCfg]; has {
optsCfg.PgSSLMode = utils.StringPointer(utils.IfaceAsString(utils.PgSSLModeCfg))
}
if _, has := opts[utils.KafkaTopic]; has {
optsCfg.KafkaTopic = utils.StringPointer(utils.IfaceAsString(utils.KafkaTopic))
}
if _, has := opts[utils.AMQPQueueID]; has {
optsCfg.AMQPQueueID = utils.StringPointer(utils.IfaceAsString(utils.AMQPQueueID))
}
if _, has := opts[utils.AMQPRoutingKey]; has {
optsCfg.AMQPRoutingKey = utils.StringPointer(utils.IfaceAsString(utils.AMQPRoutingKey))
}
if _, has := opts[utils.AMQPExchange]; has {
optsCfg.AMQPExchange = utils.StringPointer(utils.IfaceAsString(utils.AMQPExchange))
}
if _, has := opts[utils.AMQPExchangeType]; has {
optsCfg.AMQPExchangeType = utils.StringPointer(utils.IfaceAsString(utils.AMQPExchangeType))
}
if _, has := opts[utils.AWSRegion]; has {
optsCfg.AWSRegion = utils.StringPointer(utils.IfaceAsString(utils.AWSRegion))
}
if _, has := opts[utils.AWSKey]; has {
optsCfg.AWSKey = utils.StringPointer(utils.IfaceAsString(utils.AWSKey))
}
if _, has := opts[utils.AWSSecret]; has {
optsCfg.AWSSecret = utils.StringPointer(utils.IfaceAsString(utils.AWSSecret))
}
if _, has := opts[utils.AWSToken]; has {
optsCfg.AWSToken = utils.StringPointer(utils.IfaceAsString(utils.AWSToken))
}
if _, has := opts[utils.SQSQueueID]; has {
optsCfg.SQSQueueID = utils.StringPointer(utils.IfaceAsString(utils.SQSQueueID))
}
if _, has := opts[utils.S3Bucket]; has {
optsCfg.S3BucketID = utils.StringPointer(utils.IfaceAsString(utils.S3Bucket))
}
if _, has := opts[utils.S3FolderPath]; has {
optsCfg.S3FolderPath = utils.StringPointer(utils.IfaceAsString(utils.S3FolderPath))
}
if _, has := opts[utils.NatsJetStream]; has {
x, err := utils.IfaceAsBool(utils.NatsJetStream)
if err != nil {
return nil, err
}
optsCfg.NATSJetStream = utils.BoolPointer(x)
}
if _, has := opts[utils.NatsSubject]; has {
optsCfg.NATSSubject = utils.StringPointer(utils.IfaceAsString(utils.NatsSubject))
}
if _, has := opts[utils.NatsJWTFile]; has {
optsCfg.NATSJWTFile = utils.StringPointer(utils.IfaceAsString(utils.NatsJWTFile))
}
if _, has := opts[utils.NatsSeedFile]; has {
optsCfg.NATSSeedFile = utils.StringPointer(utils.IfaceAsString(utils.NatsSeedFile))
}
if _, has := opts[utils.NatsCertificateAuthority]; has {
optsCfg.NATSCertificateAuthority = utils.StringPointer(utils.IfaceAsString(utils.NatsCertificateAuthority))
}
if _, has := opts[utils.NatsClientCertificate]; has {
optsCfg.NATSClientCertificate = utils.StringPointer(utils.IfaceAsString(utils.NatsClientCertificate))
}
if _, has := opts[utils.NatsClientKey]; has {
optsCfg.NATSClientKey = utils.StringPointer(utils.IfaceAsString(utils.NatsClientKey))
}
if _, has := opts[utils.NatsJetStreamMaxWait]; has {
t, err := utils.IfaceAsDuration(utils.NatsJetStreamMaxWait)
if err != nil {
return nil, err
}
optsCfg.NATSJetStreamMaxWait = &t
}
if _, has := opts[utils.RpcCodec]; has {
optsCfg.RPCCodec = utils.StringPointer(utils.IfaceAsString(utils.RpcCodec))
}
if _, has := opts[utils.ServiceMethod]; has {
optsCfg.ServiceMethod = utils.StringPointer(utils.IfaceAsString(utils.ServiceMethod))
}
if _, has := opts[utils.KeyPath]; has {
optsCfg.KeyPath = utils.StringPointer(utils.IfaceAsString(utils.KeyPath))
}
if _, has := opts[utils.CertPath]; has {
optsCfg.CertPath = utils.StringPointer(utils.IfaceAsString(utils.CertPath))
}
if _, has := opts[utils.CaPath]; has {
optsCfg.CAPath = utils.StringPointer(utils.IfaceAsString(utils.CaPath))
}
if _, has := opts[utils.Tls]; has {
x, err := utils.IfaceAsBool(utils.Tls)
if err != nil {
return nil, err
}
optsCfg.TLS = utils.BoolPointer(x)
}
if _, has := opts[utils.ConnIDs]; has {
optsCfg.ConnIDs = opts[utils.ConnIDs].(*[]string)
}
if _, has := opts[utils.RpcConnTimeout]; has {
t, err := utils.IfaceAsDuration(utils.RpcConnTimeout)
if err != nil {
return nil, err
}
optsCfg.RPCConnTimeout = &t
}
if _, has := opts[utils.RpcReplyTimeout]; has {
t, err := utils.IfaceAsDuration(utils.RpcReplyTimeout)
if err != nil {
return nil, err
}
optsCfg.RPCReplyTimeout = &t
}
if _, has := opts[utils.RPCAPIOpts]; has {
optsCfg.RPCAPIOpts = opts[utils.RPCAPIOpts].(map[string]any)
}
return optsCfg, nil
}
// FailedExportersEEs used to save the failed post to file
type FailedExportersEEs struct {
lk sync.RWMutex
Path string
Opts *config.EventExporterOpts
Format string
Events []any
failedPostsDir string
module string
}
// AddEvent adds one event
func (expEv *FailedExportersEEs) AddEvent(ev any) {
expEv.lk.Lock()
expEv.Events = append(expEv.Events, ev)
expEv.lk.Unlock()
}
// ReplayFailedPosts tryies to post cdrs again
func (expEv *FailedExportersEEs) ReplayFailedPosts(attempts int) (err error) {
eesFailedEvents := &FailedExportersEEs{
Path: expEv.Path,
Opts: expEv.Opts,
Format: expEv.Format,
}
eeCfg := config.NewEventExporterCfg("ReplayFailedPosts", expEv.Format, expEv.Path,
utils.MetaNone, attempts, expEv.Opts)
var ee EventExporter
if ee, err = NewEventExporter(eeCfg, config.CgrConfig(), nil, nil); err != nil {
return
}
keyFunc := func() string { return utils.EmptyString }
if expEv.Format == utils.MetaKafkajsonMap || expEv.Format == utils.MetaS3jsonMap {
keyFunc = utils.UUIDSha1Prefix
}
for _, ev := range expEv.Events {
if err = ExportWithAttempts(context.Background(), ee, ev, keyFunc()); err != nil {
eesFailedEvents.AddEvent(ev)
}
}
ee.Close()
if len(eesFailedEvents.Events) > 0 {
err = utils.ErrPartiallyExecuted
} else {
eesFailedEvents = nil
}
return
}
*/

View File

@@ -1,106 +0,0 @@
//go:build integration
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ees
/*
func TestWriteFldPosts(t *testing.T) {
// can't convert
var notanExportEvent string
writeFailedPosts("somestring", notanExportEvent)
// can convert & write
dir := "/tmp/engine/libcdre_test/"
exportEvent := &ExportEvents{
failedPostsDir: dir,
module: "module",
}
if err := os.RemoveAll(dir); err != nil {
t.Fatal("Error removing folder: ", dir, err)
}
if err := os.MkdirAll(dir, 0755); err != nil {
t.Fatal("Error creating folder: ", dir, err)
}
config.CgrConfig().GeneralCfg().FailedPostsDir = dir
writeFailedPosts("itmID", exportEvent)
if filename, err := filepath.Glob(filepath.Join(dir, "module|*.gob")); err != nil {
t.Error(err)
} else if len(filename) == 0 {
t.Error("Expecting one file")
} else if len(filename) > 1 {
t.Error("Expecting only one file")
}
}
func TestWriteToFile(t *testing.T) {
filePath := "/tmp/engine/libcdre_test/writeToFile.txt"
exportEvent := &ExportEvents{}
//call WriteToFile function
if err := exportEvent.WriteToFile(filePath); err != nil {
t.Error(err)
}
// check if the file exists / throw error if the file doesn't exist
if _, err := os.Stat(filePath); os.IsNotExist(err) {
t.Fatalf("File doesn't exists")
}
//check if the file was written correctly
rcv, err := NewExportEventsFromFile(filePath)
if err != nil {
t.Errorf("Error deconding the file content: %+v", err)
}
if !reflect.DeepEqual(rcv, exportEvent) {
t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(exportEvent), utils.ToJSON(rcv))
}
//populate the exportEvent struct
exportEvent = &ExportEvents{
Events: []any{"something1", "something2"},
Path: "path",
Format: "test",
}
filePath = "/tmp/engine/libcdre_test/writeToFile2.txt"
if err := exportEvent.WriteToFile(filePath); err != nil {
t.Error(err)
}
// check if the file exists / throw error if the file doesn't exist
if _, err := os.Stat(filePath); os.IsNotExist(err) {
t.Fatalf("File doesn't exists")
}
//check if the file was written correctly
rcv, err = NewExportEventsFromFile(filePath)
if err != nil {
t.Errorf("Error deconding the file content: %+v", err)
}
if !reflect.DeepEqual(rcv, exportEvent) {
t.Errorf("Expected: %+v,\nReceived: %+v", utils.ToJSON(exportEvent), utils.ToJSON(rcv))
}
//wrong path *reading
exportEvent = &ExportEvents{}
filePath = "/tmp/engine/libcdre_test/wrongpath.txt"
if _, err = NewExportEventsFromFile(filePath); err == nil || err.Error() != "open /tmp/engine/libcdre_test/wrongpath.txt: no such file or directory" {
t.Errorf("Expecting: 'open /tmp/engine/libcdre_test/wrongpath.txt: no such file or directory',\nReceived: '%+v'", err)
}
//wrong path *writing
filePath = utils.EmptyString
if err := exportEvent.WriteToFile(filePath); err == nil || err.Error() != "open : no such file or directory" {
t.Error(err)
}
}
*/

View File

@@ -1,156 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ees
/*
func TestSetFldPostCacheTTL(t *testing.T) {
var1 := failedPostCache
SetFailedPostCacheTTL(50 * time.Millisecond)
var2 := failedPostCache
if reflect.DeepEqual(var1, var2) {
t.Error("Expecting to be different")
}
}
func TestAddFldPost(t *testing.T) {
SetFailedPostCacheTTL(5 * time.Second)
AddFailedPost("", "path1", "format1", "module1", "1", &config.EventExporterOpts{})
x, ok := failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1", "module1"))
if !ok {
t.Error("Error reading from cache")
}
if x == nil {
t.Error("Received an empty element")
}
failedPost, canCast := x.(*ExportEvents)
if !canCast {
t.Error("Error when casting")
}
eOut := &ExportEvents{
Path: "path1",
Format: "format1",
module: "module1",
Events: []any{"1"},
Opts: &config.EventExporterOpts{},
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
}
AddFailedPost("", "path1", "format1", "module1", "2", &config.EventExporterOpts{})
AddFailedPost("", "path2", "format2", "module2", "3", &config.EventExporterOpts{
SQSQueueID: utils.StringPointer("qID"),
})
x, ok = failedPostCache.Get(utils.ConcatenatedKey("", "path1", "format1", "module1"))
if !ok {
t.Error("Error reading from cache")
}
if x == nil {
t.Error("Received an empty element")
}
failedPost, canCast = x.(*ExportEvents)
if !canCast {
t.Error("Error when casting")
}
eOut = &ExportEvents{
Path: "path1",
Format: "format1",
module: "module1",
Events: []any{"1", "2"},
Opts: &config.EventExporterOpts{},
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
}
x, ok = failedPostCache.Get(utils.ConcatenatedKey("", "path2", "format2", "module2", "qID"))
if !ok {
t.Error("Error reading from cache")
}
if x == nil {
t.Error("Received an empty element")
}
failedPost, canCast = x.(*ExportEvents)
if !canCast {
t.Error("Error when casting")
}
eOut = &ExportEvents{
Path: "path2",
Format: "format2",
module: "module2",
Events: []any{"3"},
Opts: &config.EventExporterOpts{
SQSQueueID: utils.StringPointer("qID"),
},
}
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
}
for _, id := range failedPostCache.GetItemIDs("") {
failedPostCache.Set(id, nil, nil)
}
}
func TestFilePath(t *testing.T) {
exportEvent := &ExportEvents{}
rcv := exportEvent.FilePath()
if rcv[0] != '|' {
t.Errorf("Expecting: '|', received: %+v", rcv[0])
} else if rcv[8:] != ".gob" {
t.Errorf("Expecting: '.gob', received: %+v", rcv[8:])
}
exportEvent = &ExportEvents{
module: "module",
}
rcv = exportEvent.FilePath()
if rcv[:7] != "module|" {
t.Errorf("Expecting: 'module|', received: %+v", rcv[:7])
} else if rcv[14:] != ".gob" {
t.Errorf("Expecting: '.gob', received: %+v", rcv[14:])
}
}
func TestSetModule(t *testing.T) {
exportEvent := &ExportEvents{}
eOut := &ExportEvents{
module: "module",
}
exportEvent.SetModule("module")
if !reflect.DeepEqual(eOut, exportEvent) {
t.Errorf("Expecting: %+v, received: %+v", eOut, exportEvent)
}
}
func TestAddEvent(t *testing.T) {
exportEvent := &ExportEvents{}
eOut := &ExportEvents{Events: []any{"event1"}}
exportEvent.AddEvent("event1")
if !reflect.DeepEqual(eOut, exportEvent) {
t.Errorf("Expecting: %+v, received: %+v", eOut, exportEvent)
}
exportEvent = &ExportEvents{}
eOut = &ExportEvents{Events: []any{"event1", "event2", "event3"}}
exportEvent.AddEvent("event1")
exportEvent.AddEvent("event2")
exportEvent.AddEvent("event3")
if !reflect.DeepEqual(eOut, exportEvent) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(exportEvent))
}
}
*/

View File

@@ -19,8 +19,10 @@ along with this program. If not, see <http://.gnu.org/licenses/>
package efs
import (
"os"
"path"
"fmt"
"io/fs"
"path/filepath"
"slices"
"strings"
"sync"
@@ -76,14 +78,14 @@ func (efs *EfS) V1ProcessEvent(ctx *context.Context, args *utils.ArgsFailedPosts
}
case utils.Kafka:
}
var failedPost *FailedExportersLogg
var failedPost *FailedExportersLog
if x, ok := failedPostCache.Get(key); ok {
if x != nil {
failedPost = x.(*FailedExportersLogg)
failedPost = x.(*FailedExportersLog)
}
}
if failedPost == nil {
failedPost = &FailedExportersLogg{
failedPost = &FailedExportersLog{
Path: args.Path,
Format: format,
Opts: args.APIOpts,
@@ -97,56 +99,64 @@ func (efs *EfS) V1ProcessEvent(ctx *context.Context, args *utils.ArgsFailedPosts
return nil
}
// ReplayEventsParams contains parameters for replaying failed posts.
type ReplayEventsParams struct {
Tenant string
Provider string // source of failed posts
SourcePath string // path for events to be replayed
FailedPath string // path for events that failed to replay, *none to discard, defaults to SourceDir if empty
Modules []string // list of modules to replay requests for, nil for all
}
// V1ReplayEvents will read the Events from gob files that were failed to be exported and try to re-export them again.
func (efS *EfS) V1ReplayEvents(ctx *context.Context, args *utils.ArgsReplayFailedPosts, reply *string) error {
failedPostsDir := efS.cfg.EFsCfg().FailedPostsDir
if args.FailedRequestsInDir != nil && *args.FailedRequestsInDir != utils.EmptyString {
failedPostsDir = *args.FailedRequestsInDir
func (efS *EfS) V1ReplayEvents(ctx *context.Context, args ReplayEventsParams, reply *string) error {
// Set default directories if not provided.
if args.SourcePath == "" {
args.SourcePath = efS.cfg.EFsCfg().FailedPostsDir
}
failedOutDir := failedPostsDir
if args.FailedRequestsOutDir != nil && *args.FailedRequestsOutDir != utils.EmptyString {
failedOutDir = *args.FailedRequestsOutDir
if args.FailedPath == "" {
args.FailedPath = args.SourcePath
}
// check all the files in the FailedPostsInDirectory
filesInDir, err := os.ReadDir(failedPostsDir)
if err != nil {
return err
}
if len(filesInDir) == 0 {
return utils.ErrNotFound
}
// check every file and check if any of them match the modules
for _, file := range filesInDir {
if len(args.Modules) != 0 {
var allowedModule bool
for _, module := range args.Modules {
if strings.HasPrefix(file.Name(), module) {
allowedModule = true
break
}
}
if !allowedModule {
continue
}
if err := filepath.WalkDir(args.SourcePath, func(path string, d fs.DirEntry, err error) error {
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> failed to access path %s: %v", utils.EFs, path, err))
return nil // skip paths that cause an error
}
filePath := path.Join(failedPostsDir, file.Name())
var expEv FailoverPoster
if expEv, err = NewFailoverPosterFromFile(filePath, args.TypeProvider, efS); err != nil {
return err
if d.IsDir() {
return nil // skip directories
}
// check if the failed out dir path is the same as the same in dir in order to export again in case of failure
// Skip files not belonging to the specified modules.
if len(args.Modules) != 0 && !slices.ContainsFunc(args.Modules, func(mod string) bool {
return strings.HasPrefix(d.Name(), mod)
}) {
utils.Logger.Info(fmt.Sprintf("<%s> skipping file %s: not found within specified modules", utils.EFs, d.Name()))
return nil
}
expEv, err := NewFailoverPosterFromFile(path, args.Provider, efS)
if err != nil {
return fmt.Errorf("failed to init failover poster from %s: %v", path, err)
}
// Determine the failover path.
failoverPath := utils.MetaNone
if failedOutDir != utils.MetaNone {
failoverPath = path.Join(failedOutDir, file.Name())
if args.FailedPath != utils.MetaNone {
failoverPath = filepath.Join(args.FailedPath, d.Name())
}
err = expEv.ReplayFailedPosts(ctx, efS.cfg.EFsCfg().PosterAttempts, args.Tenant)
if err != nil && failedOutDir != utils.MetaNone { // Got error from HTTPPoster could be that content was not written, we need to write it ourselves
if err != nil && failoverPath != utils.MetaNone {
// Write the events that failed to be replayed to the failover directory.
if err = WriteToFile(failoverPath, expEv); err != nil {
return utils.NewErrServerError(err)
return fmt.Errorf("failed to write the events that failed to be replayed to %s: %v", path, err)
}
}
return nil
}); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return nil

View File

@@ -25,6 +25,7 @@ import (
"encoding/gob"
"fmt"
"os"
"os/exec"
"path"
"testing"
"time"
@@ -35,6 +36,9 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/nats-io/nats.go"
)
var (
@@ -70,7 +74,7 @@ func TestDecodeExportEvents(t *testing.T) {
}
dec := gob.NewDecoder(bytes.NewBuffer(content))
gob.Register(new(utils.CGREvent))
singleEvent := new(FailedExportersLogg)
singleEvent := new(FailedExportersLog)
if err := dec.Decode(&singleEvent); err != nil {
t.Error(err)
} else {
@@ -183,3 +187,184 @@ func testEfsSKillEngine(t *testing.T) {
t.Error(err)
}
}
// TestEFsReplayEvents tests the implementation of the EfSv1.ReplayEvents.
func TestEFsReplayEvents(t *testing.T) {
switch *utils.DBType {
case utils.MetaInternal:
case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("unsupported dbtype value")
}
failedDir := t.TempDir()
content := fmt.Sprintf(`{
"data_db": {
"db_type": "*internal"
},
"stor_db": {
"db_type": "*internal"
},
"admins": {
"enabled": true
},
"efs": {
"enabled": true,
"failed_posts_ttl": "3ms",
"poster_attempts": 1
},
"ees": {
"enabled": true,
"exporters": [
{
"id": "nats_exporter",
"type": "*natsJSONMap",
"flags": ["*log"],
"efs_conns": ["*localhost"],
"export_path": "nats://localhost:4222",
"attempts": 1,
"failed_posts_dir": "%s",
"synchronous": true,
"opts": {
"natsSubject": "processed_cdrs",
},
"fields":[
{"tag": "TestField", "path": "*exp.TestField", "type": "*variable", "value": "~*req.TestField"},
]
}
]
}
}`, failedDir)
testEnv := engine.TestEnvironment{
ConfigJSON: content,
// LogBuffer: &bytes.Buffer{},
Encoding: *utils.Encoding,
}
// defer fmt.Println(testEnv.LogBuffer)
client, _ := testEnv.Setup(t, context.Background())
// helper to sort slices
less := func(a, b string) bool { return a < b }
// amount of events to export/replay
count := 5
t.Run("successful nats export", func(t *testing.T) {
cmd := exec.Command("nats-server")
if err := cmd.Start(); err != nil {
t.Fatalf("failed to start nats-server: %v", err)
}
time.Sleep(50 * time.Millisecond)
defer cmd.Process.Kill()
nc, err := nats.Connect("nats://localhost:4222", nats.Timeout(time.Second), nats.DrainTimeout(time.Second))
if err != nil {
t.Fatalf("failed to connect to nats-server: %v", err)
}
defer nc.Drain()
ch := make(chan *nats.Msg, count)
sub, err := nc.ChanQueueSubscribe("processed_cdrs", "", ch)
if err != nil {
t.Fatalf("failed to subscribe to nats queue: %v", err)
}
var reply map[string]map[string]any
for i := range count {
if err := client.Call(context.Background(), utils.EeSv1ProcessEvent, &utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "test",
Event: map[string]any{
"TestField": i,
},
},
}, &reply); err != nil {
t.Errorf("EeSv1.ProcessEvent returned unexpected err: %v", err)
}
}
time.Sleep(1 * time.Millisecond) // wait for the channel to receive the replayed exports
want := make([]string, 0, count)
for i := range count {
want = append(want, fmt.Sprintf(`{"TestField":"%d"}`, i))
}
if err := sub.Unsubscribe(); err != nil {
t.Errorf("failed to unsubscribe from nats subject: %v", err)
}
close(ch)
got := make([]string, 0, count)
for elem := range ch {
got = append(got, string(elem.Data))
}
if diff := cmp.Diff(want, got, cmpopts.SortSlices(less)); diff != "" {
t.Errorf("unexpected nats messages received over channel (-want +got): \n%s", diff)
}
})
t.Run("replay failed nats export", func(t *testing.T) {
t.Skip("skipping due to gob decoding err")
var exportReply map[string]map[string]any
for i := range count {
err := client.Call(context.Background(), utils.EeSv1ProcessEvent,
&utils.CGREventWithEeIDs{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "test",
Event: map[string]any{
"TestField": i,
},
},
}, &exportReply)
if err == nil || err.Error() != utils.ErrPartiallyExecuted.Error() {
t.Errorf("EeSv1.ProcessEvent err = %v, want %v", err, utils.ErrPartiallyExecuted)
}
}
time.Sleep(5 * time.Millisecond)
replayFailedDir := t.TempDir()
var replayReply string
if err := client.Call(context.Background(), utils.EfSv1ReplayEvents, ReplayEventsParams{
Tenant: "cgrates.org",
Provider: utils.EEs,
SourcePath: failedDir,
FailedPath: replayFailedDir,
Modules: []string{"test", "EEs"},
}, &replayReply); err != nil {
t.Errorf("EfSv1.ReplayEvents returned unexpected err: %v", err)
}
cmd := exec.Command("nats-server")
if err := cmd.Start(); err != nil {
t.Fatalf("failed to start nats-server: %v", err)
}
time.Sleep(50 * time.Millisecond)
defer cmd.Process.Kill()
nc, err := nats.Connect("nats://localhost:4222", nats.Timeout(time.Second), nats.DrainTimeout(time.Second))
if err != nil {
t.Fatalf("failed to connect to nats-server: %v", err)
}
defer nc.Drain()
ch := make(chan *nats.Msg, count)
sub, err := nc.ChanQueueSubscribe("processed_cdrs", "", ch)
if err != nil {
t.Fatalf("failed to subscribe to nats queue: %v", err)
}
if err := client.Call(context.Background(), utils.EfSv1ReplayEvents, ReplayEventsParams{
Tenant: "cgrates.org",
Provider: utils.EEs,
SourcePath: replayFailedDir,
FailedPath: utils.MetaNone,
Modules: []string{"test", "EEs"},
}, &replayReply); err != nil {
t.Errorf("EfSv1.ReplayEvents returned unexpected err: %v", err)
}
time.Sleep(time.Millisecond) // wait for the channel to receive the replayed exports
want := make([]string, 0, count)
for i := range count {
want = append(want, fmt.Sprintf(`{"TestField":"%d"}`, i))
}
if err := sub.Unsubscribe(); err != nil {
t.Errorf("failed to unsubscribe from nats subject: %v", err)
}
close(ch)
got := make([]string, 0, count)
for elem := range ch {
got = append(got, string(elem.Data))
}
if diff := cmp.Diff(want, got, cmpopts.SortSlices(less)); diff != "" {
t.Errorf("unexpected nats messages received over channel (-want +got): \n%s", diff)
}
})
}

View File

@@ -250,7 +250,7 @@ func (expEv *FailedExportersEEs) AddEvent(ev any) {
// ReplayFailedPosts tryies to post cdrs again
func (expEv *FailedExportersEEs) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) (err error) {
eesFailedEvents := &FailedExportersEEs{
failedEvents := &FailedExportersEEs{
Path: expEv.Path,
Opts: expEv.Opts,
Format: expEv.Format,
@@ -268,14 +268,16 @@ func (expEv *FailedExportersEEs) ReplayFailedPosts(ctx *context.Context, attempt
}
for _, ev := range expEv.Events {
if err = ees.ExportWithAttempts(context.Background(), ee, ev, keyFunc(), expEv.connMngr, tnt); err != nil {
eesFailedEvents.AddEvent(ev)
failedEvents.AddEvent(ev)
}
}
ee.Close()
if len(eesFailedEvents.Events) > 0 {
err = utils.ErrPartiallyExecuted
} else {
eesFailedEvents = nil
switch len(failedEvents.Events) {
case 0: // none failed to be replayed
return nil
case len(expEv.Events): // all failed, return last encountered error
return err
default:
return utils.ErrPartiallyExecuted
}
return
}

View File

@@ -31,8 +31,8 @@ import (
"github.com/segmentio/kafka-go"
)
// FailedExportersLogg is a failover poster for kafka logger type
type FailedExportersLogg struct {
// FailedExportersLog is a failover poster for kafka logger type
type FailedExportersLog struct {
lk sync.RWMutex
Path string
Opts map[string]any // this is meta
@@ -46,50 +46,51 @@ type FailedExportersLogg struct {
}
// AddEvent adds one event
func (expEv *FailedExportersLogg) AddEvent(ev any) {
func (expEv *FailedExportersLog) AddEvent(ev any) {
expEv.lk.Lock()
defer expEv.lk.Unlock()
expEv.Events = append(expEv.Events, ev)
expEv.lk.Unlock()
}
// NewExportEventsFromFile returns ExportEvents from the file
// used only on replay failed post
func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err error) {
var fileContent []byte
if fileContent, err = os.ReadFile(filePath); err != nil {
func NewExportEventsFromFile(filePath string) (*FailedExportersLog, error) {
content, err := os.ReadFile(filePath)
if err != nil {
return nil, err
}
if err = os.Remove(filePath); err != nil {
if err := os.Remove(filePath); err != nil {
return nil, err
}
dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
// unmarshall it
expEv = new(FailedExportersLogg)
err = dec.Decode(&expEv)
return
var expEv FailedExportersLog
dec := gob.NewDecoder(bytes.NewBuffer(content))
if err := dec.Decode(&expEv); err != nil {
return nil, err
}
return &expEv, nil
}
// ReplayFailedPosts tryies to post cdrs again
func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) (err error) {
func (expEv *FailedExportersLog) ReplayFailedPosts(ctx *context.Context, attempts int, tnt string) error {
nodeID := utils.IfaceAsString(expEv.Opts[utils.NodeID])
logLvl, err := utils.IfaceAsInt(expEv.Opts[utils.Level])
if err != nil {
return
return err
}
expLogger := engine.NewExportLogger(ctx, nodeID, tnt, logLvl,
expEv.connMngr, expEv.cfg)
for _, event := range expEv.Events {
var content []byte
if content, err = utils.ToUnescapedJSON(event); err != nil {
return
content, err := utils.ToUnescapedJSON(event)
if err != nil {
return err
}
if err = expLogger.Writer.WriteMessages(context.Background(), kafka.Message{
if err := expLogger.Writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(utils.GenUUID()),
Value: content,
}); err != nil {
var reply string
// if there are any errors in kafka, we will post in FailedPostDirectory
if err = expEv.connMngr.Call(ctx, expEv.cfg.LoggerCfg().EFsConns, utils.EfSv1ProcessEvent,
return expEv.connMngr.Call(ctx, expEv.cfg.LoggerCfg().EFsConns, utils.EfSv1ProcessEvent,
&utils.ArgsFailedPosts{
Tenant: tnt,
Path: expLogger.Writer.Addr.String(),
@@ -97,11 +98,8 @@ func (expEv *FailedExportersLogg) ReplayFailedPosts(ctx *context.Context, attemp
FailedDir: expLogger.FldPostDir,
Module: utils.Kafka,
APIOpts: expLogger.GetMeta(),
}, &reply); err != nil {
return err
}
return nil
}, &reply)
}
}
return err
return nil
}

View File

@@ -21,6 +21,7 @@ package efs
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"os"
"path"
@@ -45,23 +46,22 @@ func SetFailedPostCacheTTL(ttl time.Duration) {
}
func writeFailedPosts(_ string, value any) {
expEv, canConvert := value.(*FailedExportersLogg)
expEv, canConvert := value.(*FailedExportersLog)
if !canConvert {
return
}
filePath := expEv.FilePath()
expEv.lk.RLock()
defer expEv.lk.RUnlock()
if err := WriteToFile(filePath, expEv); err != nil {
utils.Logger.Warning(fmt.Sprintf("Unable to write failed post to file <%s> because <%s>",
filePath, err))
expEv.lk.RUnlock()
return
}
expEv.lk.RUnlock()
}
// FilePath returns the file path it should use for saving the failed events
func (expEv *FailedExportersLogg) FilePath() string {
func (expEv *FailedExportersLog) FilePath() string {
return path.Join(expEv.FailedPostsDir, expEv.Module+utils.PipeSep+utils.UUIDSha1Prefix()+utils.GOBSuffix)
}
@@ -84,28 +84,32 @@ func WriteToFile(filePath string, expEv FailoverPoster) (err error) {
// NewFailoverPosterFromFile returns ExportEvents from the file
// used only on replay failed post
func NewFailoverPosterFromFile(filePath, providerType string, efs *EfS) (failPoster FailoverPoster, err error) {
var fileContent []byte
err = guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error {
if fileContent, err = os.ReadFile(filePath); err != nil {
return err
func NewFailoverPosterFromFile(filePath, provider string, efs *EfS) (FailoverPoster, error) {
var content []byte
err := guardian.Guardian.Guard(context.TODO(), func(_ *context.Context) error {
var readErr error
if content, readErr = os.ReadFile(filePath); readErr != nil {
return readErr
}
return os.Remove(filePath)
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+filePath)
if err != nil {
return
return nil, err
}
dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
// unmarshall it
expEv := new(FailedExportersLogg)
err = dec.Decode(&expEv)
switch providerType {
dec := gob.NewDecoder(bytes.NewBuffer(content))
var expEv FailedExportersLog
if err := dec.Decode(&expEv); err != nil {
return nil, err
}
switch provider {
case utils.EEs:
opts, err := AsOptsEESConfig(expEv.Opts)
if err != nil {
return nil, err
}
failPoster = &FailedExportersEEs{
return &FailedExportersEEs{
module: expEv.Module,
failedPostsDir: expEv.FailedPostsDir,
Path: expEv.Path,
@@ -114,11 +118,12 @@ func NewFailoverPosterFromFile(filePath, providerType string, efs *EfS) (failPos
Format: expEv.Format,
connMngr: efs.connMgr,
}
}, nil
case utils.Kafka:
expEv.cfg = efs.cfg
expEv.connMngr = efs.connMgr
failPoster = expEv
return &expEv, nil
default:
return nil, errors.New("invalid provider")
}
return
}

1
go.mod
View File

@@ -30,6 +30,7 @@ require (
github.com/fiorix/go-diameter/v4 v4.0.4
github.com/fsnotify/fsnotify v1.7.0
github.com/go-sql-driver/mysql v1.8.1
github.com/google/go-cmp v0.6.0
github.com/mediocregopher/radix/v3 v3.8.1
github.com/miekg/dns v1.1.62
github.com/nats-io/nats-server/v2 v2.10.18

View File

@@ -823,14 +823,6 @@ type ArgsFailedPosts struct {
APIOpts map[string]any // Specially for the meta
}
type ArgsReplayFailedPosts struct {
Tenant string
TypeProvider string
FailedRequestsInDir *string // if defined it will be our source of requests to be replayed
FailedRequestsOutDir *string // if defined it will become our destination for files failing to be replayed, *none to be discarded
Modules []string // list of modules for which replay the requests, nil for all
}
// GetIndexesArg the API argumets to specify an index
type GetIndexesArg struct {
IdxItmType string

View File

@@ -1,178 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package utils
/*
var failedPostCache *ltcache.Cache
func init() {
failedPostCache = ltcache.NewCache(-1, 5*time.Second, true, writeFailedPosts)
}
// SetFailedPostCacheTTL recreates the failed cache
func SetFailedPostCacheTTL(ttl time.Duration) {
failedPostCache = ltcache.NewCache(-1, ttl, true, writeFailedPosts)
}
func writeFailedPosts(_ string, value any) {
expEv, canConvert := value.(*FailedExportersLogg)
if !canConvert {
return
}
filePath := expEv.FilePath()
expEv.lk.RLock()
if err := WriteToFile(filePath, expEv); err != nil {
Logger.Warning(fmt.Sprintf("Unable to write failed post to file <%s> because <%s>",
filePath, err))
expEv.lk.RUnlock()
return
}
expEv.lk.RUnlock()
}
// FilePath returns the file path it should use for saving the failed events
func (expEv *FailedExportersLogg) FilePath() string {
return path.Join(expEv.FailedPostsDir, expEv.Module+PipeSep+UUIDSha1Prefix()+GOBSuffix)
}
// WriteToFile writes the events to file
func WriteToFile(filePath string, expEv FailoverPoster) (err error) {
fileOut, err := os.Create(filePath)
if err != nil {
return err
}
encd := gob.NewEncoder(fileOut)
gob.Register(new(CGREvent))
err = encd.Encode(expEv)
fileOut.Close()
return
}
type FailedExportersLogg struct {
lk sync.RWMutex
Path string
Opts map[string]any // THIS WILL BE META
Format string
Events []any
FailedPostsDir string
Module string
}
func AddFailedMessage(failedPostsDir, expPath, format,
module string, ev any, opts map[string]any) {
key := ConcatenatedKey(failedPostsDir, expPath, format, module)
switch module {
case EEs:
// also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id
var amqpQueueID string
var s3BucketID string
var sqsQueueID string
var kafkaTopic string
if _, has := opts[AMQPQueueID]; has {
amqpQueueID = IfaceAsString(opts[AMQPQueueID])
}
if _, has := opts[S3Bucket]; has {
s3BucketID = IfaceAsString(opts[S3Bucket])
}
if _, has := opts[SQSQueueID]; has {
sqsQueueID = IfaceAsString(opts[SQSQueueID])
}
if _, has := opts[kafkaTopic]; has {
kafkaTopic = IfaceAsString(opts[KafkaTopic])
}
if qID := FirstNonEmpty(amqpQueueID, s3BucketID,
sqsQueueID, kafkaTopic); len(qID) != 0 {
key = ConcatenatedKey(key, qID)
}
case Kafka:
}
var failedPost *FailedExportersLogg
if x, ok := failedPostCache.Get(key); ok {
if x != nil {
failedPost = x.(*FailedExportersLogg)
}
}
if failedPost == nil {
failedPost = &FailedExportersLogg{
Path: expPath,
Format: format,
Opts: opts,
Module: module,
FailedPostsDir: failedPostsDir,
}
failedPostCache.Set(key, failedPost, nil)
}
failedPost.AddEvent(ev)
}
// AddEvent adds one event
func (expEv *FailedExportersLogg) AddEvent(ev any) {
expEv.lk.Lock()
expEv.Events = append(expEv.Events, ev)
expEv.lk.Unlock()
}
// NewExportEventsFromFile returns ExportEvents from the file
// used only on replay failed post
func NewExportEventsFromFile(filePath string) (expEv *FailedExportersLogg, err error) {
var fileContent []byte
if fileContent, err = os.ReadFile(filePath); err != nil {
return nil, err
}
if err = os.Remove(filePath); err != nil {
return nil, err
}
dec := gob.NewDecoder(bytes.NewBuffer(fileContent))
// unmarshall it
expEv = new(FailedExportersLogg)
err = dec.Decode(&expEv)
return
}
type FailoverPoster interface {
ReplayFailedPosts(int, string) error
}
// ReplayFailedPosts tryies to post cdrs again
func (expEv *FailedExportersLogg) ReplayFailedPosts(attempts int, tnt string) (err error) {
nodeID := IfaceAsString(expEv.Opts[NodeID])
logLvl, err := IfaceAsInt(expEv.Opts[Level])
if err != nil {
return
}
expLogger := NewExportLogger(nodeID, tnt, logLvl,
expEv.Path, expEv.Format, attempts, expEv.FailedPostsDir)
for _, event := range expEv.Events {
var content []byte
if content, err = ToUnescapedJSON(event); err != nil {
return
}
if err = expLogger.Writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(GenUUID()),
Value: content,
}); err != nil {
// if there are any errors in kafka, we will post in FailedPostDirectory
AddFailedMessage(expLogger.FldPostDir, expLogger.Writer.Addr.String(), MetaKafkaLog, Kafka,
event, expLogger.GetMeta())
return nil
}
}
return err
}
*/