concurrency efs

This commit is contained in:
adi
2022-08-08 14:48:43 +03:00
committed by Dan Christian Bogos
parent 4c9b5b5cbe
commit 97d14b2801
4 changed files with 83 additions and 22 deletions

View File

@@ -1,7 +1,8 @@
{
"logger": {
"type": "*kafkaLog", // controls the destination of logs <*syslog|*stdout|*kafka>
"level": 6, // system level precision for floats
"level": 6, // system level precision for floats
"efs_conns": ["*localhost"],
"opts": {
"kafka_conn": "unavailable_conn:9092", // the connection trough kafka
"kafka_topic": "TutorialTopic", // the topic from where the events are exported
@@ -15,7 +16,6 @@
"efs": {
"enabled": true,
//"efs_conns": ["*localhost"]
},
"admins": {

View File

@@ -22,6 +22,8 @@ along with this program. If not, see <http://.gnu.org/licenses/>
package efs
import (
"bytes"
"encoding/gob"
"flag"
"fmt"
"os"
@@ -53,10 +55,38 @@ var (
testEfsInitDataDb,
testEfsStartEngine,
testEfSRPCConn,
//testEfsProcessEvent,
testEfsSKillEngine,
}
)
func TestDecodeExportEvents(t *testing.T) {
dirPath := "/var/spool/cgrates/failed_posts"
filesInDir, err := os.ReadDir(dirPath)
if err != nil {
t.Error(err)
}
for _, file := range filesInDir {
content, err := os.ReadFile(path.Join(dirPath, file.Name()))
if err != nil {
t.Error(err)
}
dec := gob.NewDecoder(bytes.NewBuffer(content))
gob.Register(new(utils.CGREvent))
singleEvent := new(FailedExportersLogg)
if err := dec.Decode(&singleEvent); err != nil {
t.Error(err)
} else {
strContent, err := utils.ToUnescapedJSON(singleEvent)
if err != nil {
t.Error(err)
}
fmt.Printf("singleEvent: %v \n", string(strContent))
}
}
}
func TestEfS(t *testing.T) {
switch *dbType {
case utils.MetaInternal:
@@ -103,7 +133,6 @@ func testEfsInitDataDb(t *testing.T) {
// Start CGR Engine
func testEfsStartEngine(t *testing.T) {
fmt.Println(efsCfgPath)
if _, err := engine.StopStartEngine(efsCfgPath, 100); err != nil {
t.Fatal(err)
}
@@ -113,12 +142,41 @@ func testEfSRPCConn(t *testing.T) {
var err error
efsRpc, err = jsonrpc.Dial(utils.TCP, efsCfg.ListenCfg().RPCJSONListen)
if err != nil {
t.Fatal(err)
}
}
//Kill the engine when it is about to be finished
func testEfsProcessEvent(t *testing.T) {
args := &utils.ArgsFailedPosts{
Tenant: "cgrates.org",
Path: "localhost:9092",
Event: &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]interface{}{
utils.AccountField: "1002",
utils.Destination: "1003",
},
},
FailedDir: "/var/spool/cgrates/failed_posts",
Module: utils.Kafka,
APIOpts: map[string]interface{}{
utils.Level: efsCfg.LoggerCfg().Level,
utils.Format: "TutorialTopic",
utils.Conn: "localhost:9092",
utils.FailedPostsDir: "/var/spool/cgrates/failed_posts",
utils.Attempts: efsCfg.LoggerCfg().Opts.KafkaAttempts,
},
}
var reply string
if err := efsRpc.Call(context.Background(), utils.EfSv1ProcessEvent,
args, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Unexpected reply returned")
}
}
// Kill the engine when it is about to be finished
func testEfsSKillEngine(t *testing.T) {
time.Sleep(7 * time.Second)
if err := engine.KillEngine(100); err != nil {

View File

@@ -105,22 +105,24 @@ func (el *ExportLogger) call(m string, level int) (err error) {
Value: content,
}); err != nil {
// if there are any errors in kafka, we will post in FailedPostDirectory
args := &utils.ArgsFailedPosts{
Tenant: el.Tenant,
Path: el.Writer.Addr.String(),
Event: eventExport,
FailedDir: el.FldPostDir,
Module: utils.Kafka,
APIOpts: el.GetMeta(),
}
var reply string
if err = el.connMgr.Call(el.ctx, el.cfg.LoggerCfg().EFsConns,
utils.EfSv1ProcessEvent, args, &reply); err != nil {
log.Printf("err la sefprocessEvent: %v", err)
/* utils.Logger.Warning(
fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>",
utils.Logger, utils.EFs, err.Error())) */
}
go func() {
args := &utils.ArgsFailedPosts{
Tenant: el.Tenant,
Path: el.Writer.Addr.String(),
Event: eventExport,
FailedDir: el.FldPostDir,
Module: utils.Kafka,
APIOpts: el.GetMeta(),
}
var reply string
if err = el.connMgr.Call(el.ctx, el.cfg.LoggerCfg().EFsConns,
utils.EfSv1ProcessEvent, args, &reply); err != nil {
log.Printf("err la sefprocessEvent: %v", err)
/* utils.Logger.Warning(
fmt.Sprintf("<%s> Exporter could not writte failed event with <%s> service because err: <%s>",
utils.Logger, utils.EFs, err.Error())) */
}
}()
// also the content should be printed as a stdout logger type
return utils.ErrLoggerChanged
}

View File

@@ -23,6 +23,7 @@ import (
"io"
"os"
"path"
"runtime"
"runtime/pprof"
"sync"
"time"
@@ -370,7 +371,7 @@ func (cgr *CGREngine) Init(ctx *context.Context, shtDw context.CancelFunc, flags
return fmt.Errorf("Could not initialize syslog connection, err: <%s>", err)
}
efs.SetFailedPostCacheTTL(cgr.cfg.EFsCfg().FailedPostsTTL) // init failedPosts to posts loggers/exporters in case of failing
//utils.Logger.Info(fmt.Sprintf("<CoreS> starting version <%s><%s>", vers, runtime.Version()))
utils.Logger.Info(fmt.Sprintf("<CoreS> starting version <%s><%s>", vers, runtime.Version()))
cgr.cfg.LazySanityCheck()
return cgr.InitServices(*flags.HttpPrfPath, cpuPrfF, *flags.MemPrfDir, memPrfStop)