diff --git a/data/conf/samples/actions_internal/cgradmin.json b/data/conf/samples/actions_internal/cgradmin.json index 14e1aa673..f2023176c 100644 --- a/data/conf/samples/actions_internal/cgradmin.json +++ b/data/conf/samples/actions_internal/cgradmin.json @@ -57,9 +57,68 @@ }, +"ees": { + "enabled": true, + "exporters": [ + { + "id": "sqs_fail", + "type": "*sqs_json_map", + // export_path for sqs: "endpoint" + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "kafka_fail", + "type": "*kafka_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "amqp_fail", + "type": "*amqp_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "s3_fail", + "type": "*s3_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "aws_fail", + "type": "*amqpv1_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + ], +}, + + "apiers": { "enabled": true, "scheduler_conns": ["*internal"], + "ees_conns": ["*localhost"] }, } diff --git a/data/conf/samples/actions_internal_gob/cgradmin.json b/data/conf/samples/actions_internal_gob/cgradmin.json index f1ba2e61f..fa1d8f5c7 100644 --- a/data/conf/samples/actions_internal_gob/cgradmin.json +++ b/data/conf/samples/actions_internal_gob/cgradmin.json @@ -66,9 +66,69 @@ }, +"ees": { + "enabled": true, + "exporters": [ + { + "id": "sqs_fail", + "type": "*sqs_json_map", + // export_path for sqs: "endpoint" + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "kafka_fail", + "type": "*kafka_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "amqp_fail", + "type": "*amqp_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "s3_fail", + "type": "*s3_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "aws_fail", + "type": "*amqpv1_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + ], +}, + + "apiers": { "enabled": true, "scheduler_conns": ["*internal"], + "ees_conns": ["*localhost"] }, + } diff --git a/data/conf/samples/actions_mongo/cgradmin.json b/data/conf/samples/actions_mongo/cgradmin.json index f8e7c123d..ac0067c10 100644 --- a/data/conf/samples/actions_mongo/cgradmin.json +++ b/data/conf/samples/actions_mongo/cgradmin.json @@ -62,9 +62,69 @@ }, +"ees": { + "enabled": true, + "exporters": [ + { + "id": "sqs_fail", + "type": "*sqs_json_map", + // export_path for sqs: "endpoint" + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "kafka_fail", + "type": "*kafka_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "amqp_fail", + "type": "*amqp_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "s3_fail", + "type": "*s3_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "aws_fail", + "type": "*amqpv1_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + ], +}, + + "apiers": { "enabled": true, "scheduler_conns": ["*internal"], + "ees_conns": ["*localhost"] }, + } diff --git a/data/conf/samples/actions_mongo_gob/cgradmin.json b/data/conf/samples/actions_mongo_gob/cgradmin.json index f10cc7bb9..7e3bbe9e5 100644 --- a/data/conf/samples/actions_mongo_gob/cgradmin.json +++ b/data/conf/samples/actions_mongo_gob/cgradmin.json @@ -63,9 +63,69 @@ }, +"ees": { + "enabled": true, + "exporters": [ + { + "id": "sqs_fail", + "type": "*sqs_json_map", + // export_path for sqs: "endpoint" + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "kafka_fail", + "type": "*kafka_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "amqp_fail", + "type": "*amqp_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "s3_fail", + "type": "*s3_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "aws_fail", + "type": "*amqpv1_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + ], +}, + + "apiers": { "enabled": true, "scheduler_conns": ["*internal"], + "ees_conns": ["*localhost"] }, + } diff --git a/data/conf/samples/actions_mysql/cgradmin.json b/data/conf/samples/actions_mysql/cgradmin.json index f86e5dc04..c2d86acf9 100644 --- a/data/conf/samples/actions_mysql/cgradmin.json +++ b/data/conf/samples/actions_mysql/cgradmin.json @@ -59,9 +59,69 @@ }, +"ees": { + "enabled": true, + "exporters": [ + { + "id": "sqs_fail", + "type": "*sqs_json_map", + // export_path for sqs: "endpoint" + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "kafka_fail", + "type": "*kafka_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "amqp_fail", + "type": "*amqp_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "s3_fail", + "type": "*s3_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "aws_fail", + "type": "*amqpv1_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + ], +}, + + "apiers": { "enabled": true, "scheduler_conns": ["*internal"], + "ees_conns": ["*localhost"] }, + } diff --git a/data/conf/samples/actions_mysql_gob/cgradmin.json b/data/conf/samples/actions_mysql_gob/cgradmin.json index db4ea3648..525dcbaa3 100644 --- a/data/conf/samples/actions_mysql_gob/cgradmin.json +++ b/data/conf/samples/actions_mysql_gob/cgradmin.json @@ -65,9 +65,69 @@ "store_interval": "1s" }, +"ees": { + "enabled": true, + "exporters": [ + { + "id": "sqs_fail", + "type": "*sqs_json_map", + // export_path for sqs: "endpoint" + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "kafka_fail", + "type": "*kafka_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "amqp_fail", + "type": "*amqp_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "s3_fail", + "type": "*s3_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + { + "id": "aws_fail", + "type": "*amqpv1_json_map", + "export_path": "notAValidURL", + "tenant": "cgrates.org", + "attempts": 1, + "fields":[ + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + ], + }, + ], +}, + + "apiers": { "enabled": true, "scheduler_conns": ["*internal"], + "ees_conns": ["*localhost"] }, + } diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index b06454e05..30b587111 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -31,7 +31,7 @@ import ( func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (pstrJSON *PosterJSONMapEE, err error) { - dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID + dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID pstrJSON = &PosterJSONMapEE{ id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, cgrCfg: cgrCfg, diff --git a/engine/action.go b/engine/action.go index 3a4e61296..6bafcc33f 100644 --- a/engine/action.go +++ b/engine/action.go @@ -103,6 +103,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) { utils.MetaRemoveExpired: removeExpired, utils.MetaPostEvent: postEvent, utils.MetaCDRAccount: resetAccountCDR, + utils.MetaExport: export, } f, exists := actionFuncMap[typ] return f, exists @@ -1023,3 +1024,43 @@ func resetAccountCDR(ub *Account, action *Action, acts Actions, _ interface{}) e } return nil } + +func export(ub *Account, a *Action, acs Actions, extraData interface{}) (err error) { + var cgrEv *utils.CGREvent + switch { + case ub != nil: + cgrEv = &utils.CGREvent{ + Tenant: utils.NewTenantID(ub.ID).Tenant, + ID: utils.GenUUID(), + Event: map[string]interface{}{ + utils.Account: ub.ID, + utils.EventType: utils.AccountUpdate, + utils.EventSource: utils.AccountService, + utils.AllowNegative: ub.AllowNegative, + utils.Disabled: ub.Disabled, + utils.BalanceMap: ub.BalanceMap, + utils.UnitCounters: ub.UnitCounters, + utils.ActionTriggers: ub.ActionTriggers, + utils.UpdateTime: ub.UpdateTime, + }, + } + case extraData != nil: + ev, canCast := extraData.(*utils.CGREvent) + if !canCast { + return + } + cgrEv = ev // only export CGREvents + default: + return // nothing to post + } + args := &utils.CGREventWithIDs{ + IDs: strings.Split(a.ExtraParameters, utils.INFIELD_SEP), + CGREventWithOpts: &utils.CGREventWithOpts{ + Opts: make(map[string]interface{}), + CGREvent: cgrEv, + }, + } + var rply map[string]map[string]interface{} + return connMgr.Call(config.CgrConfig().ApierCfg().EEsConns, nil, + utils.EventExporterSv1ProcessEvent, args, &rply) +} diff --git a/general_tests/poster_it_test.go b/general_tests/poster_it_test.go index be6f5a7ed..2776f42a1 100644 --- a/general_tests/poster_it_test.go +++ b/general_tests/poster_it_test.go @@ -19,7 +19,20 @@ along with this program. If not, see */ package general_tests -/* +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/rpc" + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + var ( pstrCfg *config.CGRConfig pstrRpc *rpc.Client @@ -97,13 +110,13 @@ func testPosterITRpcConn(t *testing.T) { t.Fatal(err) } } + func testPosterReadFolder(format string) (expEv *engine.ExportEvents, err error) { filesInDir, _ := ioutil.ReadDir(pstrCfg.GeneralCfg().FailedPostsDir) if len(filesInDir) == 0 { err = fmt.Errorf("No files in directory: %s", pstrCfg.GeneralCfg().FailedPostsDir) return } - for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config fileName := file.Name() filePath := path.Join(pstrCfg.GeneralCfg().FailedPostsDir, fileName) @@ -134,7 +147,7 @@ func testPosterITAMQP(t *testing.T) { attrsAA := &utils.AttrSetActions{ ActionsId: "ACT_AMQP", Actions: []*utils.TPAction{ // set a action with a wrong endpoint to easily check if it was executed - {Identifier: utils.MetaAMQPjsonMap, ExtraParameters: "endpoint"}, + {Identifier: utils.MetaExport, ExtraParameters: "amqp_fail"}, }, } if err := pstrRpc.Call(utils.APIerSv2SetActions, attrsAA, &reply); err != nil && err.Error() != utils.ErrExists.Error() { @@ -158,12 +171,12 @@ func testPosterITAMQP(t *testing.T) { t.Fatalf("Expected 1 event received: %d events", len(ev.Events)) } body := ev.Events[0].([]byte) - var acc engine.Account + var acc map[string]interface{} if err := json.Unmarshal(body, &acc); err != nil { t.Fatal(err) } - if acc.ID != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) { - t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc.ID) + if acc[utils.Account] != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) { + t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc[utils.Account]) } } @@ -172,7 +185,7 @@ func testPosterITAMQPv1(t *testing.T) { attrsAA := &utils.AttrSetActions{ ActionsId: "ACT_AMQPv1", Actions: []*utils.TPAction{ // set a action with a wrong endpoint to easily check if it was executed - {Identifier: utils.MetaAMQPV1jsonMap, ExtraParameters: "endpoint"}, + {Identifier: utils.MetaExport, ExtraParameters: "aws_fail"}, }, } if err := pstrRpc.Call(utils.APIerSv2SetActions, attrsAA, &reply); err != nil && err.Error() != utils.ErrExists.Error() { @@ -196,12 +209,12 @@ func testPosterITAMQPv1(t *testing.T) { t.Fatalf("Expected 1 event received: %d events", len(ev.Events)) } body := ev.Events[0].([]byte) - var acc engine.Account + var acc map[string]interface{} if err := json.Unmarshal(body, &acc); err != nil { t.Fatal(err) } - if acc.ID != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) { - t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc.ID) + if acc[utils.Account] != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) { + t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc[utils.Account]) } } @@ -210,7 +223,7 @@ func testPosterITSQS(t *testing.T) { attrsAA := &utils.AttrSetActions{ ActionsId: "ACT_SQS", Actions: []*utils.TPAction{ // set a action with a wrong endpoint to easily check if it was executed - {Identifier: utils.MetaSQSjsonMap, ExtraParameters: "endpoint"}, + {Identifier: utils.MetaExport, ExtraParameters: "sqs_fail"}, }, } if err := pstrRpc.Call(utils.APIerSv2SetActions, attrsAA, &reply); err != nil && err.Error() != utils.ErrExists.Error() { @@ -234,12 +247,12 @@ func testPosterITSQS(t *testing.T) { t.Fatalf("Expected 1 event received: %d events", len(ev.Events)) } body := ev.Events[0].([]byte) - var acc engine.Account + var acc map[string]interface{} if err := json.Unmarshal(body, &acc); err != nil { t.Fatal(err) } - if acc.ID != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) { - t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc.ID) + if acc[utils.Account] != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) { + t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc[utils.Account]) } } @@ -248,7 +261,7 @@ func testPosterITS3(t *testing.T) { attrsAA := &utils.AttrSetActions{ ActionsId: "ACT_S3", Actions: []*utils.TPAction{ // set a action with a wrong endpoint to easily check if it was executed - {Identifier: utils.MetaS3jsonMap, ExtraParameters: "endpoint"}, + {Identifier: utils.MetaExport, ExtraParameters: "s3_fail"}, }, } if err := pstrRpc.Call(utils.APIerSv2SetActions, attrsAA, &reply); err != nil && err.Error() != utils.ErrExists.Error() { @@ -272,12 +285,12 @@ func testPosterITS3(t *testing.T) { t.Fatalf("Expected 1 event received: %d events", len(ev.Events)) } body := ev.Events[0].([]byte) - var acc engine.Account + var acc map[string]interface{} if err := json.Unmarshal(body, &acc); err != nil { t.Fatal(err) } - if acc.ID != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) { - t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc.ID) + if acc[utils.Account] != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) { + t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc[utils.Account]) } } @@ -286,7 +299,7 @@ func testPosterITKafka(t *testing.T) { attrsAA := &utils.AttrSetActions{ ActionsId: "ACT_Kafka", Actions: []*utils.TPAction{ // set a action with a wrong endpoint to easily check if it was executed - {Identifier: utils.MetaKafkajsonMap, ExtraParameters: "endpoint"}, + {Identifier: utils.MetaExport, ExtraParameters: "kafka_fail"}, }, } if err := pstrRpc.Call(utils.APIerSv2SetActions, attrsAA, &reply); err != nil && err.Error() != utils.ErrExists.Error() { @@ -310,12 +323,12 @@ func testPosterITKafka(t *testing.T) { t.Fatalf("Expected 1 event received: %d events", len(ev.Events)) } body := ev.Events[0].([]byte) - var acc engine.Account + var acc map[string]interface{} if err := json.Unmarshal(body, &acc); err != nil { t.Fatal(err) } - if acc.ID != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) { - t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc.ID) + if acc[utils.Account] != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) { + t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc[utils.Account]) } } @@ -324,4 +337,3 @@ func testPosterITStopCgrEngine(t *testing.T) { t.Error(err) } } -*/ diff --git a/packages/debian/changelog b/packages/debian/changelog index c4c576fb4..8e5b2ae3c 100644 --- a/packages/debian/changelog +++ b/packages/debian/changelog @@ -99,7 +99,8 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium * [DataDB] Moved all specific DB options in opts * [Config] Add new section "template" * [LoaderS] Add support for *template type - + * [ActionS] Replaced the poster action with *export that will send the event to EEs + -- DanB Wed, 19 Feb 2020 13:25:52 +0200 cgrates (0.10.0) UNRELEASED; urgency=medium diff --git a/utils/consts.go b/utils/consts.go index a411b32a9..fd7348d29 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -504,6 +504,9 @@ const ( Actions = "Actions" ActionPlans = "ActionPlans" ActionTriggers = "ActionTriggers" + BalanceMap = "BalanceMap" + UnitCounters = "UnitCounters" + UpdateTime = "UpdateTime" SharedGroups = "SharedGroups" Timings = "Timings" Rates = "Rates"