diff --git a/config/config_defaults.go b/config/config_defaults.go index 8d54befa6..a5a2b3d19 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -556,6 +556,7 @@ const CGRATES_CFG_JSON = ` "attribute_ids": [], // select Attribute profiles instead of discovering them "attribute_context": "", // context used to discover matching Attribute profiles "synchronous": false, // block processing until export has a result + "blocker": false, // stops the processing of the following exporters "attempts": 1, // export attempts "fields":[], // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value "failed_posts_dir": "/var/spool/cgrates/failed_posts", // directory path where we store failed requests diff --git a/config/eescfg.go b/config/eescfg.go index 07fa4fa04..1487ced7c 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -208,6 +208,7 @@ type EventExporterCfg struct { AttributeSIDs []string // selective AttributeS profiles AttributeSCtx string // context to use when querying AttributeS Synchronous bool + Blocker bool Attempts int FailedPostsDir string ConcurrentRequests int @@ -421,6 +422,9 @@ func (eeC *EventExporterCfg) loadFromJSONCfg(jsnEec *EventExporterJsonCfg, msgTe if jsnEec.Synchronous != nil { eeC.Synchronous = *jsnEec.Synchronous } + if jsnEec.Blocker != nil { + eeC.Blocker = *jsnEec.Blocker + } if jsnEec.Attempts != nil { eeC.Attempts = *jsnEec.Attempts } @@ -641,6 +645,7 @@ func (eeC EventExporterCfg) Clone() (cln *EventExporterCfg) { Flags: eeC.Flags.Clone(), AttributeSCtx: eeC.AttributeSCtx, Synchronous: eeC.Synchronous, + Blocker: eeC.Blocker, Attempts: eeC.Attempts, ConcurrentRequests: eeC.ConcurrentRequests, Fields: make([]*FCTemplate, len(eeC.Fields)), @@ -835,6 +840,7 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str utils.AttributeContextCfg: eeC.AttributeSCtx, utils.AttributeIDsCfg: eeC.AttributeSIDs, utils.SynchronousCfg: eeC.Synchronous, + utils.BlockerCfg: eeC.Blocker, utils.AttemptsCfg: eeC.Attempts, utils.ConcurrentRequestsCfg: eeC.ConcurrentRequests, utils.FailedPostsDirCfg: eeC.FailedPostsDir, @@ -913,6 +919,7 @@ type EventExporterJsonCfg struct { Attribute_ids *[]string Attribute_context *string Synchronous *bool + Blocker *bool Attempts *int Concurrent_requests *int Failed_posts_dir *string @@ -1341,6 +1348,9 @@ func diffEventExporterJsonCfg(d *EventExporterJsonCfg, v1, v2 *EventExporterCfg, if v1.Synchronous != v2.Synchronous { d.Synchronous = utils.BoolPointer(v2.Synchronous) } + if v1.Blocker != v2.Blocker { + d.Blocker = utils.BoolPointer(v2.Blocker) + } if v1.Attempts != v2.Attempts { d.Attempts = utils.IntPointer(v2.Attempts) } diff --git a/data/conf/samples/ees_blocker_mysql/cgrates.json b/data/conf/samples/ees_blocker_mysql/cgrates.json new file mode 100644 index 000000000..d3f747b1c --- /dev/null +++ b/data/conf/samples/ees_blocker_mysql/cgrates.json @@ -0,0 +1,48 @@ +{ + + "general": { + "log_level": 7 + }, + + "listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080" + }, + + "data_db": { + "db_type": "redis", + "db_port": 6379, + "db_name": "10" + }, + + "ees": { + "enabled": true, + "cache": { + "*fileCSV": {"limit": -1, "ttl": "500ms", "static_ttl": false} + }, + "exporters": [ + { + "id": "CSVExporter1", + "type": "*fileCSV", + "export_path": "/tmp/CSVFile1", + "attempts": 1, + "blocker": true, + "field_separator": "," + }, + { + "id": "CSVExporter2", + "type": "*fileCSV", + "export_path": "/tmp/CSVFile2", + "attempts": 1, + "field_separator": "," + } + ] + }, + + + "admins": { + "enabled": true + } + +} \ No newline at end of file diff --git a/ees/ees.go b/ees/ees.go index 78bb9a4e7..201d49888 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -122,8 +122,6 @@ func (eeS *EeS) attrSProcessEvent(ctx *context.Context, cgrEv *utils.CGREvent, a return } -// V1ProcessEvent will be called each time a new event is received from readers -// rply -> map[string]map[string]interface{} func (eeS *EeS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEeIDs, rply *map[string]map[string]interface{}) (err error) { eeS.cfg.RLocks(config.EEsJSON) defer eeS.cfg.RUnlocks(config.EEsJSON) @@ -192,7 +190,7 @@ func (eeS *EeS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEe } else { ctx = context.Background() // is async so lose the API context } - // log the message before starting the gorutine, but still execute the exporter + // log the message before starting the goroutine, but still execute the exporter if hasVerbose && !eeCfg.Synchronous { utils.Logger.Warning( fmt.Sprintf("<%s> with id <%s>, running verbosed exporter with syncronous false", @@ -211,6 +209,9 @@ func (eeS *EeS) V1ProcessEvent(ctx *context.Context, cgrEv *utils.CGREventWithEe wg.Done() } }(!hasCache, eeCfg.Synchronous, ee) + if eeCfg.Blocker { + break + } } wg.Wait() if withErr { diff --git a/general_tests/ees_blocker_it_test.go b/general_tests/ees_blocker_it_test.go new file mode 100644 index 000000000..4e4e95a98 --- /dev/null +++ b/general_tests/ees_blocker_it_test.go @@ -0,0 +1,157 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "os" + "path" + "testing" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + eeSBlockerFiles = []string{"/tmp/CSVFile1", "/tmp/CSVFile2"} + eesBlockerCfgPath string + eesBlockerCfg *config.CGRConfig + eesBlockerRPC *birpc.Client + eesBlockerConfDIR string //run tests for specific configuration + + eesBlockerTests = []func(t *testing.T){ + testEEsBlockerCreateFiles, + testEEsBlockerLoadConfig, + testEEsBlockerInitDataDB, + testEEsBlockerStartEngine, + testEEsBlockerRpcConn, + testEEsBlockerExportEvent, + testEEsBlockerVerifyExport, + testEEsBlockerStopEngine, + testEEsBlockerDeleteFiles, + } +) + +// Test start here +func TestEEsBlocker(t *testing.T) { + switch *dbType { + case utils.MetaInternal: + // eesBlockerConfDIR = "ees_blocker_internal" + t.SkipNow() + case utils.MetaMySQL: + eesBlockerConfDIR = "ees_blocker_mysql" + case utils.MetaMongo: + // eesBlockerConfDIR = "ees_blocker_mongo" + t.SkipNow() + case utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("Unknown Database type") + } + for _, stest := range eesBlockerTests { + t.Run(eesBlockerConfDIR, stest) + } +} + +func testEEsBlockerCreateFiles(t *testing.T) { + for _, dir := range eeSBlockerFiles { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } +} + +func testEEsBlockerDeleteFiles(t *testing.T) { + for _, dir := range eeSBlockerFiles { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + } +} + +func testEEsBlockerLoadConfig(t *testing.T) { + var err error + eesBlockerCfgPath = path.Join(*dataDir, "conf", "samples", eesBlockerConfDIR) + if eesBlockerCfg, err = config.NewCGRConfigFromPath(context.Background(), eesBlockerCfgPath); err != nil { + t.Error(err) + } +} + +func testEEsBlockerInitDataDB(t *testing.T) { + if err := engine.InitDataDB(eesBlockerCfg); err != nil { + t.Fatal(err) + } +} + +func testEEsBlockerStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(eesBlockerCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testEEsBlockerRpcConn(t *testing.T) { + var err error + eesBlockerRPC, err = newRPCClient(eesBlockerCfg.ListenCfg()) + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +func testEEsBlockerExportEvent(t *testing.T) { + exportedEvent := &utils.CGREventWithEeIDs{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "EEsProcessEvent", + Event: map[string]interface{}{ + "TestCase": "EEsBlockerBehaviour", + }, + APIOpts: map[string]interface{}{}, + }, + } + + var reply map[string]map[string]interface{} + if err := eesBlockerRPC.Call(context.Background(), utils.EeSv1ProcessEvent, exportedEvent, &reply); err != nil { + t.Error(err) + } +} + +func testEEsBlockerVerifyExport(t *testing.T) { + for i, dir := range eeSBlockerFiles { + if files, err := os.ReadDir(dir); err != nil { + t.Fatal(err) + } else if i == 0 && len(files) != 1 { + t.Errorf("expected to find only 1 file, received <%d>", len(files)) + } else if i == 1 && len(files) != 0 { + t.Errorf("expected to find 0 files, received <%d>", len(files)) + } + } +} + +func testEEsBlockerStopEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +}