Added integration tests for *export action

This commit is contained in:
Trial97
2020-09-04 12:38:36 +03:00
committed by Dan Christian Bogos
parent 228665989e
commit 5c1cd3379e
11 changed files with 441 additions and 25 deletions

View File

@@ -57,9 +57,68 @@
},
"ees": {
"enabled": true,
"exporters": [
{
"id": "sqs_fail",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint"
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "kafka_fail",
"type": "*kafka_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "amqp_fail",
"type": "*amqp_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "s3_fail",
"type": "*s3_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "aws_fail",
"type": "*amqpv1_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
],
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
"ees_conns": ["*localhost"]
},
}

View File

@@ -66,9 +66,69 @@
},
"ees": {
"enabled": true,
"exporters": [
{
"id": "sqs_fail",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint"
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "kafka_fail",
"type": "*kafka_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "amqp_fail",
"type": "*amqp_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "s3_fail",
"type": "*s3_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "aws_fail",
"type": "*amqpv1_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
],
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
"ees_conns": ["*localhost"]
},
}

View File

@@ -62,9 +62,69 @@
},
"ees": {
"enabled": true,
"exporters": [
{
"id": "sqs_fail",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint"
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "kafka_fail",
"type": "*kafka_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "amqp_fail",
"type": "*amqp_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "s3_fail",
"type": "*s3_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "aws_fail",
"type": "*amqpv1_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
],
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
"ees_conns": ["*localhost"]
},
}

View File

@@ -63,9 +63,69 @@
},
"ees": {
"enabled": true,
"exporters": [
{
"id": "sqs_fail",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint"
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "kafka_fail",
"type": "*kafka_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "amqp_fail",
"type": "*amqp_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "s3_fail",
"type": "*s3_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "aws_fail",
"type": "*amqpv1_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
],
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
"ees_conns": ["*localhost"]
},
}

View File

@@ -59,9 +59,69 @@
},
"ees": {
"enabled": true,
"exporters": [
{
"id": "sqs_fail",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint"
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "kafka_fail",
"type": "*kafka_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "amqp_fail",
"type": "*amqp_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "s3_fail",
"type": "*s3_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "aws_fail",
"type": "*amqpv1_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
],
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
"ees_conns": ["*localhost"]
},
}

View File

@@ -65,9 +65,69 @@
"store_interval": "1s"
},
"ees": {
"enabled": true,
"exporters": [
{
"id": "sqs_fail",
"type": "*sqs_json_map",
// export_path for sqs: "endpoint"
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "kafka_fail",
"type": "*kafka_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "amqp_fail",
"type": "*amqp_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "s3_fail",
"type": "*s3_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
{
"id": "aws_fail",
"type": "*amqpv1_json_map",
"export_path": "notAValidURL",
"tenant": "cgrates.org",
"attempts": 1,
"fields":[
{"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"},
],
},
],
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
"ees_conns": ["*localhost"]
},
}

View File

@@ -31,7 +31,7 @@ import (
func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc utils.MapStorage) (pstrJSON *PosterJSONMapEE, err error) {
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
dc[utils.ExporterID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
pstrJSON = &PosterJSONMapEE{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,

View File

@@ -103,6 +103,7 @@ func getActionFunc(typ string) (actionTypeFunc, bool) {
utils.MetaRemoveExpired: removeExpired,
utils.MetaPostEvent: postEvent,
utils.MetaCDRAccount: resetAccountCDR,
utils.MetaExport: export,
}
f, exists := actionFuncMap[typ]
return f, exists
@@ -1023,3 +1024,43 @@ func resetAccountCDR(ub *Account, action *Action, acts Actions, _ interface{}) e
}
return nil
}
func export(ub *Account, a *Action, acs Actions, extraData interface{}) (err error) {
var cgrEv *utils.CGREvent
switch {
case ub != nil:
cgrEv = &utils.CGREvent{
Tenant: utils.NewTenantID(ub.ID).Tenant,
ID: utils.GenUUID(),
Event: map[string]interface{}{
utils.Account: ub.ID,
utils.EventType: utils.AccountUpdate,
utils.EventSource: utils.AccountService,
utils.AllowNegative: ub.AllowNegative,
utils.Disabled: ub.Disabled,
utils.BalanceMap: ub.BalanceMap,
utils.UnitCounters: ub.UnitCounters,
utils.ActionTriggers: ub.ActionTriggers,
utils.UpdateTime: ub.UpdateTime,
},
}
case extraData != nil:
ev, canCast := extraData.(*utils.CGREvent)
if !canCast {
return
}
cgrEv = ev // only export CGREvents
default:
return // nothing to post
}
args := &utils.CGREventWithIDs{
IDs: strings.Split(a.ExtraParameters, utils.INFIELD_SEP),
CGREventWithOpts: &utils.CGREventWithOpts{
Opts: make(map[string]interface{}),
CGREvent: cgrEv,
},
}
var rply map[string]map[string]interface{}
return connMgr.Call(config.CgrConfig().ApierCfg().EEsConns, nil,
utils.EventExporterSv1ProcessEvent, args, &rply)
}

View File

@@ -19,7 +19,20 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package general_tests
/*
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/rpc"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
pstrCfg *config.CGRConfig
pstrRpc *rpc.Client
@@ -97,13 +110,13 @@ func testPosterITRpcConn(t *testing.T) {
t.Fatal(err)
}
}
func testPosterReadFolder(format string) (expEv *engine.ExportEvents, err error) {
filesInDir, _ := ioutil.ReadDir(pstrCfg.GeneralCfg().FailedPostsDir)
if len(filesInDir) == 0 {
err = fmt.Errorf("No files in directory: %s", pstrCfg.GeneralCfg().FailedPostsDir)
return
}
for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config
fileName := file.Name()
filePath := path.Join(pstrCfg.GeneralCfg().FailedPostsDir, fileName)
@@ -134,7 +147,7 @@ func testPosterITAMQP(t *testing.T) {
attrsAA := &utils.AttrSetActions{
ActionsId: "ACT_AMQP",
Actions: []*utils.TPAction{ // set a action with a wrong endpoint to easily check if it was executed
{Identifier: utils.MetaAMQPjsonMap, ExtraParameters: "endpoint"},
{Identifier: utils.MetaExport, ExtraParameters: "amqp_fail"},
},
}
if err := pstrRpc.Call(utils.APIerSv2SetActions, attrsAA, &reply); err != nil && err.Error() != utils.ErrExists.Error() {
@@ -158,12 +171,12 @@ func testPosterITAMQP(t *testing.T) {
t.Fatalf("Expected 1 event received: %d events", len(ev.Events))
}
body := ev.Events[0].([]byte)
var acc engine.Account
var acc map[string]interface{}
if err := json.Unmarshal(body, &acc); err != nil {
t.Fatal(err)
}
if acc.ID != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) {
t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc.ID)
if acc[utils.Account] != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) {
t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc[utils.Account])
}
}
@@ -172,7 +185,7 @@ func testPosterITAMQPv1(t *testing.T) {
attrsAA := &utils.AttrSetActions{
ActionsId: "ACT_AMQPv1",
Actions: []*utils.TPAction{ // set a action with a wrong endpoint to easily check if it was executed
{Identifier: utils.MetaAMQPV1jsonMap, ExtraParameters: "endpoint"},
{Identifier: utils.MetaExport, ExtraParameters: "aws_fail"},
},
}
if err := pstrRpc.Call(utils.APIerSv2SetActions, attrsAA, &reply); err != nil && err.Error() != utils.ErrExists.Error() {
@@ -196,12 +209,12 @@ func testPosterITAMQPv1(t *testing.T) {
t.Fatalf("Expected 1 event received: %d events", len(ev.Events))
}
body := ev.Events[0].([]byte)
var acc engine.Account
var acc map[string]interface{}
if err := json.Unmarshal(body, &acc); err != nil {
t.Fatal(err)
}
if acc.ID != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) {
t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc.ID)
if acc[utils.Account] != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) {
t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc[utils.Account])
}
}
@@ -210,7 +223,7 @@ func testPosterITSQS(t *testing.T) {
attrsAA := &utils.AttrSetActions{
ActionsId: "ACT_SQS",
Actions: []*utils.TPAction{ // set a action with a wrong endpoint to easily check if it was executed
{Identifier: utils.MetaSQSjsonMap, ExtraParameters: "endpoint"},
{Identifier: utils.MetaExport, ExtraParameters: "sqs_fail"},
},
}
if err := pstrRpc.Call(utils.APIerSv2SetActions, attrsAA, &reply); err != nil && err.Error() != utils.ErrExists.Error() {
@@ -234,12 +247,12 @@ func testPosterITSQS(t *testing.T) {
t.Fatalf("Expected 1 event received: %d events", len(ev.Events))
}
body := ev.Events[0].([]byte)
var acc engine.Account
var acc map[string]interface{}
if err := json.Unmarshal(body, &acc); err != nil {
t.Fatal(err)
}
if acc.ID != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) {
t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc.ID)
if acc[utils.Account] != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) {
t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc[utils.Account])
}
}
@@ -248,7 +261,7 @@ func testPosterITS3(t *testing.T) {
attrsAA := &utils.AttrSetActions{
ActionsId: "ACT_S3",
Actions: []*utils.TPAction{ // set a action with a wrong endpoint to easily check if it was executed
{Identifier: utils.MetaS3jsonMap, ExtraParameters: "endpoint"},
{Identifier: utils.MetaExport, ExtraParameters: "s3_fail"},
},
}
if err := pstrRpc.Call(utils.APIerSv2SetActions, attrsAA, &reply); err != nil && err.Error() != utils.ErrExists.Error() {
@@ -272,12 +285,12 @@ func testPosterITS3(t *testing.T) {
t.Fatalf("Expected 1 event received: %d events", len(ev.Events))
}
body := ev.Events[0].([]byte)
var acc engine.Account
var acc map[string]interface{}
if err := json.Unmarshal(body, &acc); err != nil {
t.Fatal(err)
}
if acc.ID != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) {
t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc.ID)
if acc[utils.Account] != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) {
t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc[utils.Account])
}
}
@@ -286,7 +299,7 @@ func testPosterITKafka(t *testing.T) {
attrsAA := &utils.AttrSetActions{
ActionsId: "ACT_Kafka",
Actions: []*utils.TPAction{ // set a action with a wrong endpoint to easily check if it was executed
{Identifier: utils.MetaKafkajsonMap, ExtraParameters: "endpoint"},
{Identifier: utils.MetaExport, ExtraParameters: "kafka_fail"},
},
}
if err := pstrRpc.Call(utils.APIerSv2SetActions, attrsAA, &reply); err != nil && err.Error() != utils.ErrExists.Error() {
@@ -310,12 +323,12 @@ func testPosterITKafka(t *testing.T) {
t.Fatalf("Expected 1 event received: %d events", len(ev.Events))
}
body := ev.Events[0].([]byte)
var acc engine.Account
var acc map[string]interface{}
if err := json.Unmarshal(body, &acc); err != nil {
t.Fatal(err)
}
if acc.ID != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) {
t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc.ID)
if acc[utils.Account] != utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account) {
t.Errorf("Expected %q ,received %q", utils.ConcatenatedKey(pstrAccount.Tenant, pstrAccount.Account), acc[utils.Account])
}
}
@@ -324,4 +337,3 @@ func testPosterITStopCgrEngine(t *testing.T) {
t.Error(err)
}
}
*/

View File

@@ -99,7 +99,8 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium
* [DataDB] Moved all specific DB options in opts
* [Config] Add new section "template"
* [LoaderS] Add support for *template type
* [ActionS] Replaced the poster action with *export that will send the event to EEs
-- DanB <danb@cgrates.org> Wed, 19 Feb 2020 13:25:52 +0200
cgrates (0.10.0) UNRELEASED; urgency=medium

View File

@@ -504,6 +504,9 @@ const (
Actions = "Actions"
ActionPlans = "ActionPlans"
ActionTriggers = "ActionTriggers"
BalanceMap = "BalanceMap"
UnitCounters = "UnitCounters"
UpdateTime = "UpdateTime"
SharedGroups = "SharedGroups"
Timings = "Timings"
Rates = "Rates"