diff --git a/data/conf/samples/efs/efs_internal/cgrates.json b/data/conf/samples/efs/efs_internal/cgrates.json
index 7a2d4f989..bc9ec1856 100644
--- a/data/conf/samples/efs/efs_internal/cgrates.json
+++ b/data/conf/samples/efs/efs_internal/cgrates.json
@@ -1,7 +1,8 @@
{
"logger": {
"type": "*kafkaLog", // controls the destination of logs <*syslog|*stdout|*kafka>
- "level": 6, // system level precision for floats
+ "level": 6, // system level precision for floats
+ "efs_conns": ["*localhost"],
"opts": {
"kafka_conn": "unavailable_conn:9092", // the connection trough kafka
"kafka_topic": "TutorialTopic", // the topic from where the events are exported
@@ -15,7 +16,6 @@
"efs": {
"enabled": true,
- //"efs_conns": ["*localhost"]
},
"admins": {
diff --git a/efs/efs_it_test.go b/efs/efs_it_test.go
index 292e5af3d..af0de542b 100644
--- a/efs/efs_it_test.go
+++ b/efs/efs_it_test.go
@@ -22,6 +22,8 @@ along with this program. If not, see
package efs
import (
+ "bytes"
+ "encoding/gob"
"flag"
"fmt"
"os"
@@ -53,10 +55,38 @@ var (
testEfsInitDataDb,
testEfsStartEngine,
testEfSRPCConn,
+ //testEfsProcessEvent,
testEfsSKillEngine,
}
)
+func TestDecodeExportEvents(t *testing.T) {
+ dirPath := "/var/spool/cgrates/failed_posts"
+ filesInDir, err := os.ReadDir(dirPath)
+ if err != nil {
+ t.Error(err)
+ }
+ for _, file := range filesInDir {
+ content, err := os.ReadFile(path.Join(dirPath, file.Name()))
+ if err != nil {
+ t.Error(err)
+ }
+ dec := gob.NewDecoder(bytes.NewBuffer(content))
+ gob.Register(new(utils.CGREvent))
+ singleEvent := new(FailedExportersLogg)
+ if err := dec.Decode(&singleEvent); err != nil {
+ t.Error(err)
+ } else {
+ strContent, err := utils.ToUnescapedJSON(singleEvent)
+ if err != nil {
+ t.Error(err)
+ }
+ fmt.Printf("singleEvent: %v \n", string(strContent))
+ }
+ }
+
+}
+
func TestEfS(t *testing.T) {
switch *dbType {
case utils.MetaInternal:
@@ -103,7 +133,6 @@ func testEfsInitDataDb(t *testing.T) {
// Start CGR Engine
func testEfsStartEngine(t *testing.T) {
- fmt.Println(efsCfgPath)
if _, err := engine.StopStartEngine(efsCfgPath, 100); err != nil {
t.Fatal(err)
}
@@ -113,12 +142,41 @@ func testEfSRPCConn(t *testing.T) {
var err error
efsRpc, err = jsonrpc.Dial(utils.TCP, efsCfg.ListenCfg().RPCJSONListen)
if err != nil {
-
t.Fatal(err)
}
}
-//Kill the engine when it is about to be finished
+func testEfsProcessEvent(t *testing.T) {
+ args := &utils.ArgsFailedPosts{
+ Tenant: "cgrates.org",
+ Path: "localhost:9092",
+ Event: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ Event: map[string]interface{}{
+ utils.AccountField: "1002",
+ utils.Destination: "1003",
+ },
+ },
+ FailedDir: "/var/spool/cgrates/failed_posts",
+ Module: utils.Kafka,
+ APIOpts: map[string]interface{}{
+ utils.Level: efsCfg.LoggerCfg().Level,
+ utils.Format: "TutorialTopic",
+ utils.Conn: "localhost:9092",
+ utils.FailedPostsDir: "/var/spool/cgrates/failed_posts",
+ utils.Attempts: efsCfg.LoggerCfg().Opts.KafkaAttempts,
+ },
+ }
+ var reply string
+ if err := efsRpc.Call(context.Background(), utils.EfSv1ProcessEvent,
+ args, &reply); err != nil {
+ t.Error(err)
+ } else if reply != utils.OK {
+ t.Errorf("Unexpected reply returned")
+ }
+}
+
+// Kill the engine when it is about to be finished
func testEfsSKillEngine(t *testing.T) {
time.Sleep(7 * time.Second)
if err := engine.KillEngine(100); err != nil {
diff --git a/engine/kafka_logger.go b/engine/kafka_logger.go
index a0ddf520b..83cd43f5d 100644
--- a/engine/kafka_logger.go
+++ b/engine/kafka_logger.go
@@ -105,22 +105,24 @@ func (el *ExportLogger) call(m string, level int) (err error) {
Value: content,
}); err != nil {
// if there are any errors in kafka, we will post in FailedPostDirectory
- args := &utils.ArgsFailedPosts{
- Tenant: el.Tenant,
- Path: el.Writer.Addr.String(),
- Event: eventExport,
- FailedDir: el.FldPostDir,
- Module: utils.Kafka,
- APIOpts: el.GetMeta(),
- }
- var reply string
- if err = el.connMgr.Call(el.ctx, el.cfg.LoggerCfg().EFsConns,
- utils.EfSv1ProcessEvent, args, &reply); err != nil {
- log.Printf("err la sefprocessEvent: %v", err)
- /* utils.Logger.Warning(
- fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>",
- utils.Logger, utils.EFs, err.Error())) */
- }
+ go func() {
+ args := &utils.ArgsFailedPosts{
+ Tenant: el.Tenant,
+ Path: el.Writer.Addr.String(),
+ Event: eventExport,
+ FailedDir: el.FldPostDir,
+ Module: utils.Kafka,
+ APIOpts: el.GetMeta(),
+ }
+ var reply string
+ if err = el.connMgr.Call(el.ctx, el.cfg.LoggerCfg().EFsConns,
+ utils.EfSv1ProcessEvent, args, &reply); err != nil {
+ log.Printf("err la sefprocessEvent: %v", err)
+ /* utils.Logger.Warning(
+ fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>",
+ utils.Logger, utils.EFs, err.Error())) */
+ }
+ }()
// also the content should be printed as a stdout logger type
return utils.ErrLoggerChanged
}
diff --git a/services/cgr-engine.go b/services/cgr-engine.go
index 42c1f0c62..4bd73f07a 100644
--- a/services/cgr-engine.go
+++ b/services/cgr-engine.go
@@ -23,6 +23,7 @@ import (
"io"
"os"
"path"
+ "runtime"
"runtime/pprof"
"sync"
"time"
@@ -370,7 +371,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags
return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err)
}
efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing
- //utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version()))
+ utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version()))
cgr.cfg.LazySanityCheck()
return cgr.InitServices(*flags.HttpPrfPath, cpuPrfF, *flags.MemPrfDir, memPrfStop)