mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 13:19:53 +05:00
Add blocker for exporters + tests
This commit is contained in:
committed by
Dan Christian Bogos
parent
75b38e24e0
commit
fb354a04a4
@@ -556,6 +556,7 @@ const CGRATES_CFG_JSON = `
|
||||
"attribute_ids": [], // select Attribute profiles instead of discovering them
|
||||
"attribute_context": "", // context used to discover matching Attribute profiles
|
||||
"synchronous": false, // block processing until export has a result
|
||||
"blocker": false, // stops the processing of the following exporters
|
||||
"attempts": 1, // export attempts
|
||||
"fields":[], // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
"failed_posts_dir": "/var/spool/cgrates/failed_posts", // directory path where we store failed requests
|
||||
|
||||
@@ -208,6 +208,7 @@ type EventExporterCfg struct {
|
||||
AttributeSIDs []string // selective AttributeS profiles
|
||||
AttributeSCtx string // context to use when querying AttributeS
|
||||
Synchronous bool
|
||||
Blocker bool
|
||||
Attempts int
|
||||
FailedPostsDir string
|
||||
ConcurrentRequests int
|
||||
@@ -421,6 +422,9 @@ func (eeC *EventExporterCfg) loadFromJSONCfg(jsnEec *EventExporterJsonCfg, msgTe
|
||||
if jsnEec.Synchronous != nil {
|
||||
eeC.Synchronous = *jsnEec.Synchronous
|
||||
}
|
||||
if jsnEec.Blocker != nil {
|
||||
eeC.Blocker = *jsnEec.Blocker
|
||||
}
|
||||
if jsnEec.Attempts != nil {
|
||||
eeC.Attempts = *jsnEec.Attempts
|
||||
}
|
||||
@@ -641,6 +645,7 @@ func (eeC EventExporterCfg) Clone() (cln *EventExporterCfg) {
|
||||
Flags: eeC.Flags.Clone(),
|
||||
AttributeSCtx: eeC.AttributeSCtx,
|
||||
Synchronous: eeC.Synchronous,
|
||||
Blocker: eeC.Blocker,
|
||||
Attempts: eeC.Attempts,
|
||||
ConcurrentRequests: eeC.ConcurrentRequests,
|
||||
Fields: make([]*FCTemplate, len(eeC.Fields)),
|
||||
@@ -835,6 +840,7 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
|
||||
utils.AttributeContextCfg: eeC.AttributeSCtx,
|
||||
utils.AttributeIDsCfg: eeC.AttributeSIDs,
|
||||
utils.SynchronousCfg: eeC.Synchronous,
|
||||
utils.BlockerCfg: eeC.Blocker,
|
||||
utils.AttemptsCfg: eeC.Attempts,
|
||||
utils.ConcurrentRequestsCfg: eeC.ConcurrentRequests,
|
||||
utils.FailedPostsDirCfg: eeC.FailedPostsDir,
|
||||
@@ -913,6 +919,7 @@ type EventExporterJsonCfg struct {
|
||||
Attribute_ids *[]string
|
||||
Attribute_context *string
|
||||
Synchronous *bool
|
||||
Blocker *bool
|
||||
Attempts *int
|
||||
Concurrent_requests *int
|
||||
Failed_posts_dir *string
|
||||
@@ -1341,6 +1348,9 @@ func diffEventExporterJsonCfg(d *EventExporterJsonCfg, v1, v2 *EventExporterCfg,
|
||||
if v1.Synchronous != v2.Synchronous {
|
||||
d.Synchronous = utils.BoolPointer(v2.Synchronous)
|
||||
}
|
||||
if v1.Blocker != v2.Blocker {
|
||||
d.Blocker = utils.BoolPointer(v2.Blocker)
|
||||
}
|
||||
if v1.Attempts != v2.Attempts {
|
||||
d.Attempts = utils.IntPointer(v2.Attempts)
|
||||
}
|
||||
|
||||
48
data/conf/samples/ees_blocker_mysql/cgrates.json
Normal file
48
data/conf/samples/ees_blocker_mysql/cgrates.json
Normal file
@@ -0,0 +1,48 @@
|
||||
{
|
||||
|
||||
"general": {
|
||||
"log_level": 7
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":2012",
|
||||
"rpc_gob": ":2013",
|
||||
"http": ":2080"
|
||||
},
|
||||
|
||||
"data_db": {
|
||||
"db_type": "redis",
|
||||
"db_port": 6379,
|
||||
"db_name": "10"
|
||||
},
|
||||
|
||||
"ees": {
|
||||
"enabled": true,
|
||||
"cache": {
|
||||
"*fileCSV": {"limit": -1, "ttl": "500ms", "static_ttl": false}
|
||||
},
|
||||
"exporters": [
|
||||
{
|
||||
"id": "CSVExporter1",
|
||||
"type": "*fileCSV",
|
||||
"export_path": "/tmp/CSVFile1",
|
||||
"attempts": 1,
|
||||
"blocker": true,
|
||||
"field_separator": ","
|
||||
},
|
||||
{
|
||||
"id": "CSVExporter2",
|
||||
"type": "*fileCSV",
|
||||
"export_path": "/tmp/CSVFile2",
|
||||
"attempts": 1,
|
||||
"field_separator": ","
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
|
||||
"admins": {
|
||||
"enabled": true
|
||||
}
|
||||
|
||||
}
|
||||
@@ -122,8 +122,6 @@ func (eeS *EeS) attrSProcessEvent(ctx *context.Context, cgrEv *utils.CGREvent, a
|
||||
return
|
||||
}
|
||||
|
||||
// V1ProcessEvent will be called each time a new event is received from readers
|
||||
// rply -> map[string]map[string]interface{}
|
||||
func (eeS *EeS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEeIDs, rply *map[string]map[string]interface{}) (err error) {
|
||||
eeS.cfg.RLocks(config.EEsJSON)
|
||||
defer eeS.cfg.RUnlocks(config.EEsJSON)
|
||||
@@ -192,7 +190,7 @@ func (eeS *EeS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEe
|
||||
} else {
|
||||
ctx = context.Background() // is async so lose the API context
|
||||
}
|
||||
// log the message before starting the gorutine, but still execute the exporter
|
||||
// log the message before starting the goroutine, but still execute the exporter
|
||||
if hasVerbose && !eeCfg.Synchronous {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> with id <%s>, running verbosed exporter with syncronous false",
|
||||
@@ -211,6 +209,9 @@ func (eeS *EeS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEe
|
||||
wg.Done()
|
||||
}
|
||||
}(!hasCache, eeCfg.Synchronous, ee)
|
||||
if eeCfg.Blocker {
|
||||
break
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
if withErr {
|
||||
|
||||
157
general_tests/ees_blocker_it_test.go
Normal file
157
general_tests/ees_blocker_it_test.go
Normal file
@@ -0,0 +1,157 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package general_tests
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/birpc"
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
var (
|
||||
eeSBlockerFiles = []string{"/tmp/CSVFile1", "/tmp/CSVFile2"}
|
||||
eesBlockerCfgPath string
|
||||
eesBlockerCfg *config.CGRConfig
|
||||
eesBlockerRPC *birpc.Client
|
||||
eesBlockerConfDIR string //run tests for specific configuration
|
||||
|
||||
eesBlockerTests = []func(t *testing.T){
|
||||
testEEsBlockerCreateFiles,
|
||||
testEEsBlockerLoadConfig,
|
||||
testEEsBlockerInitDataDB,
|
||||
testEEsBlockerStartEngine,
|
||||
testEEsBlockerRpcConn,
|
||||
testEEsBlockerExportEvent,
|
||||
testEEsBlockerVerifyExport,
|
||||
testEEsBlockerStopEngine,
|
||||
testEEsBlockerDeleteFiles,
|
||||
}
|
||||
)
|
||||
|
||||
// Test start here
|
||||
func TestEEsBlocker(t *testing.T) {
|
||||
switch *dbType {
|
||||
case utils.MetaInternal:
|
||||
// eesBlockerConfDIR = "ees_blocker_internal"
|
||||
t.SkipNow()
|
||||
case utils.MetaMySQL:
|
||||
eesBlockerConfDIR = "ees_blocker_mysql"
|
||||
case utils.MetaMongo:
|
||||
// eesBlockerConfDIR = "ees_blocker_mongo"
|
||||
t.SkipNow()
|
||||
case utils.MetaPostgres:
|
||||
t.SkipNow()
|
||||
default:
|
||||
t.Fatal("Unknown Database type")
|
||||
}
|
||||
for _, stest := range eesBlockerTests {
|
||||
t.Run(eesBlockerConfDIR, stest)
|
||||
}
|
||||
}
|
||||
|
||||
func testEEsBlockerCreateFiles(t *testing.T) {
|
||||
for _, dir := range eeSBlockerFiles {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Fatal("Error removing folder: ", dir, err)
|
||||
}
|
||||
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
|
||||
t.Fatal("Error creating folder: ", dir, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testEEsBlockerDeleteFiles(t *testing.T) {
|
||||
for _, dir := range eeSBlockerFiles {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Fatal("Error removing folder: ", dir, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testEEsBlockerLoadConfig(t *testing.T) {
|
||||
var err error
|
||||
eesBlockerCfgPath = path.Join(*dataDir, "conf", "samples", eesBlockerConfDIR)
|
||||
if eesBlockerCfg, err = config.NewCGRConfigFromPath(context.Background(), eesBlockerCfgPath); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testEEsBlockerInitDataDB(t *testing.T) {
|
||||
if err := engine.InitDataDB(eesBlockerCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testEEsBlockerStartEngine(t *testing.T) {
|
||||
if _, err := engine.StopStartEngine(eesBlockerCfgPath, *waitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testEEsBlockerRpcConn(t *testing.T) {
|
||||
var err error
|
||||
eesBlockerRPC, err = newRPCClient(eesBlockerCfg.ListenCfg())
|
||||
if err != nil {
|
||||
t.Fatal("Could not connect to rater: ", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func testEEsBlockerExportEvent(t *testing.T) {
|
||||
exportedEvent := &utils.CGREventWithEeIDs{
|
||||
CGREvent: &utils.CGREvent{
|
||||
Tenant: "cgrates.org",
|
||||
ID: "EEsProcessEvent",
|
||||
Event: map[string]interface{}{
|
||||
"TestCase": "EEsBlockerBehaviour",
|
||||
},
|
||||
APIOpts: map[string]interface{}{},
|
||||
},
|
||||
}
|
||||
|
||||
var reply map[string]map[string]interface{}
|
||||
if err := eesBlockerRPC.Call(context.Background(), utils.EeSv1ProcessEvent, exportedEvent, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func testEEsBlockerVerifyExport(t *testing.T) {
|
||||
for i, dir := range eeSBlockerFiles {
|
||||
if files, err := os.ReadDir(dir); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if i == 0 && len(files) != 1 {
|
||||
t.Errorf("expected to find only 1 file, received <%d>", len(files))
|
||||
} else if i == 1 && len(files) != 0 {
|
||||
t.Errorf("expected to find 0 files, received <%d>", len(files))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testEEsBlockerStopEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(100); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user