From 97d14b280182e147589f500f50200bd84a478643 Mon Sep 17 00:00:00 2001 From: adi Date: Mon, 8 Aug 2022 14:48:43 +0300 Subject: [PATCH] concurrency efs --- .../samples/efs/efs_internal/cgrates.json | 4 +- efs/efs_it_test.go | 64 ++++++++++++++++++- engine/kafka_logger.go | 34 +++++----- services/cgr-engine.go | 3 +- 4 files changed, 83 insertions(+), 22 deletions(-) diff --git a/data/conf/samples/efs/efs_internal/cgrates.json b/data/conf/samples/efs/efs_internal/cgrates.json index 7a2d4f989..bc9ec1856 100644 --- a/data/conf/samples/efs/efs_internal/cgrates.json +++ b/data/conf/samples/efs/efs_internal/cgrates.json @@ -1,7 +1,8 @@ { "logger": { "type": "*kafkaLog", // controls the destination of logs <*syslog|*stdout|*kafka> - "level": 6, // system level precision for floats + "level": 6, // system level precision for floats + "efs_conns": ["*localhost"], "opts": { "kafka_conn": "unavailable_conn:9092", // the connection trough kafka "kafka_topic": "TutorialTopic", // the topic from where the events are exported @@ -15,7 +16,6 @@ "efs": { "enabled": true, - //"efs_conns": ["*localhost"] }, "admins": { diff --git a/efs/efs_it_test.go b/efs/efs_it_test.go index 292e5af3d..af0de542b 100644 --- a/efs/efs_it_test.go +++ b/efs/efs_it_test.go @@ -22,6 +22,8 @@ along with this program. If not, see package efs import ( + "bytes" + "encoding/gob" "flag" "fmt" "os" @@ -53,10 +55,38 @@ var ( testEfsInitDataDb, testEfsStartEngine, testEfSRPCConn, + //testEfsProcessEvent, testEfsSKillEngine, } ) +func TestDecodeExportEvents(t *testing.T) { + dirPath := "/var/spool/cgrates/failed_posts" + filesInDir, err := os.ReadDir(dirPath) + if err != nil { + t.Error(err) + } + for _, file := range filesInDir { + content, err := os.ReadFile(path.Join(dirPath, file.Name())) + if err != nil { + t.Error(err) + } + dec := gob.NewDecoder(bytes.NewBuffer(content)) + gob.Register(new(utils.CGREvent)) + singleEvent := new(FailedExportersLogg) + if err := dec.Decode(&singleEvent); err != nil { + t.Error(err) + } else { + strContent, err := utils.ToUnescapedJSON(singleEvent) + if err != nil { + t.Error(err) + } + fmt.Printf("singleEvent: %v \n", string(strContent)) + } + } + +} + func TestEfS(t *testing.T) { switch *dbType { case utils.MetaInternal: @@ -103,7 +133,6 @@ func testEfsInitDataDb(t *testing.T) { // Start CGR Engine func testEfsStartEngine(t *testing.T) { - fmt.Println(efsCfgPath) if _, err := engine.StopStartEngine(efsCfgPath, 100); err != nil { t.Fatal(err) } @@ -113,12 +142,41 @@ func testEfSRPCConn(t *testing.T) { var err error efsRpc, err = jsonrpc.Dial(utils.TCP, efsCfg.ListenCfg().RPCJSONListen) if err != nil { - t.Fatal(err) } } -//Kill the engine when it is about to be finished +func testEfsProcessEvent(t *testing.T) { + args := &utils.ArgsFailedPosts{ + Tenant: "cgrates.org", + Path: "localhost:9092", + Event: &utils.CGREvent{ + Tenant: "cgrates.org", + Event: map[string]interface{}{ + utils.AccountField: "1002", + utils.Destination: "1003", + }, + }, + FailedDir: "/var/spool/cgrates/failed_posts", + Module: utils.Kafka, + APIOpts: map[string]interface{}{ + utils.Level: efsCfg.LoggerCfg().Level, + utils.Format: "TutorialTopic", + utils.Conn: "localhost:9092", + utils.FailedPostsDir: "/var/spool/cgrates/failed_posts", + utils.Attempts: efsCfg.LoggerCfg().Opts.KafkaAttempts, + }, + } + var reply string + if err := efsRpc.Call(context.Background(), utils.EfSv1ProcessEvent, + args, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Unexpected reply returned") + } +} + +// Kill the engine when it is about to be finished func testEfsSKillEngine(t *testing.T) { time.Sleep(7 * time.Second) if err := engine.KillEngine(100); err != nil { diff --git a/engine/kafka_logger.go b/engine/kafka_logger.go index a0ddf520b..83cd43f5d 100644 --- a/engine/kafka_logger.go +++ b/engine/kafka_logger.go @@ -105,22 +105,24 @@ func (el *ExportLogger) call(m string, level int) (err error) { Value: content, }); err != nil { // if there are any errors in kafka, we will post in FailedPostDirectory - args := &utils.ArgsFailedPosts{ - Tenant: el.Tenant, - Path: el.Writer.Addr.String(), - Event: eventExport, - FailedDir: el.FldPostDir, - Module: utils.Kafka, - APIOpts: el.GetMeta(), - } - var reply string - if err = el.connMgr.Call(el.ctx, el.cfg.LoggerCfg().EFsConns, - utils.EfSv1ProcessEvent, args, &reply); err != nil { - log.Printf("err la sefprocessEvent: %v", err) - /* utils.Logger.Warning( - fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>", - utils.Logger, utils.EFs, err.Error())) */ - } + go func() { + args := &utils.ArgsFailedPosts{ + Tenant: el.Tenant, + Path: el.Writer.Addr.String(), + Event: eventExport, + FailedDir: el.FldPostDir, + Module: utils.Kafka, + APIOpts: el.GetMeta(), + } + var reply string + if err = el.connMgr.Call(el.ctx, el.cfg.LoggerCfg().EFsConns, + utils.EfSv1ProcessEvent, args, &reply); err != nil { + log.Printf("err la sefprocessEvent: %v", err) + /* utils.Logger.Warning( + fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>", + utils.Logger, utils.EFs, err.Error())) */ + } + }() // also the content should be printed as a stdout logger type return utils.ErrLoggerChanged } diff --git a/services/cgr-engine.go b/services/cgr-engine.go index 42c1f0c62..4bd73f07a 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -23,6 +23,7 @@ import ( "io" "os" "path" + "runtime" "runtime/pprof" "sync" "time" @@ -370,7 +371,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err) } efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing - //utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version())) + utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, runtime.Version())) cgr.cfg.LazySanityCheck() return cgr.InitServices(*flags.HttpPrfPath, cpuPrfF, *flags.MemPrfDir, memPrfStop)