Files
cgrates/actions/actions_test.go
ionutboangiu c3bf93f1b6 Fix context lifecycle in scheduled actions
Remove ctx field from scheduledActs struct and create a fresh context
when actions execute via cron. This prevents "context canceled" errors
that occurred when stored contexts from API calls were used for delayed
execution. The context is now properly received from the caller in case
of "*asap" actions.
2025-05-26 08:19:43 +02:00

1184 lines
37 KiB
Go

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package actions
import (
"bytes"
"fmt"
"reflect"
"strings"
"testing"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/rpcclient"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func TestMatchingActionProfilesForEvent(t *testing.T) {
defer engine.Cache.Clear(nil)
engine.Cache.Clear(nil)
cfg := config.NewDefaultCGRConfig()
data, _ := engine.NewInternalDB(nil, nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg, nil)
filters := engine.NewFilterS(cfg, nil, dm)
acts := NewActionS(cfg, filters, dm, nil)
evNM := utils.MapStorage{
utils.MetaReq: map[string]any{
utils.AccountField: "1001",
utils.Destination: 1002,
},
utils.MetaOpts: map[string]any{},
}
actPrf := &utils.ActionProfile{
Tenant: "cgrates.org",
ID: "AP1",
FilterIDs: []string{"*string:~*req.Account:1001|1002|1003", "*prefix:~*req.Destination:10"},
Actions: []*utils.APAction{
{
ID: "TOPUP",
FilterIDs: []string{},
Type: "*topup",
Diktats: []*utils.APDiktat{{
Path: "~*balance.TestBalance.Value",
Value: "10",
}},
},
},
}
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
t.Error(err)
}
expActionPrf := []*utils.ActionProfile{actPrf}
if rcv, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org",
evNM, []string{}, false); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(rcv, expActionPrf) {
t.Errorf("Expected %+v, received %+v", utils.ToJSON(expActionPrf), utils.ToJSON(rcv))
}
evNM = utils.MapStorage{
utils.MetaReq: map[string]any{
utils.AccountField: "10",
},
utils.MetaOpts: map[string]any{},
}
//This Event is not matching with our filter
if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org",
evNM, []string{}, false); err == nil || err != utils.ErrNotFound {
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
}
evNM = utils.MapStorage{
utils.MetaReq: map[string]any{
utils.AccountField: "1001",
},
utils.MetaOpts: map[string]any{},
}
actPrfIDs := []string{"inexisting_id"}
//Unable to get from database an ActionProfile if the ID won't match
if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org",
evNM, actPrfIDs, false); err == nil || err != utils.ErrNotFound {
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
}
actPrfIDs = []string{"AP1"}
if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org",
evNM, actPrfIDs, false); err == nil || err != utils.ErrNotFound {
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
}
actPrf.FilterIDs = append(actPrf.FilterIDs, "*ai:~*req.AnswerTime:2012-07-21T00:00:00Z|2012-08-21T00:00:00Z")
//this event is not active in this interval time
if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org",
evNM, actPrfIDs, false); err == nil || err != utils.ErrNotFound {
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
}
//when dataManager is nil, it won't be able to get ActionsProfile from database
acts.dm = nil
if _, err := acts.matchingActionProfilesForEvent(context.Background(), "INVALID_TENANT",
evNM, actPrfIDs, false); err == nil || err != utils.ErrNoDatabaseConn {
t.Errorf("Expected %+v, received %+v", utils.ErrNoDatabaseConn, err)
}
acts.dm = engine.NewDataManager(data, cfg, nil)
actPrf.FilterIDs = []string{"invalid_filters"}
//Set in database and invalid filter, so it won t pass
if err := acts.dm.SetActionProfile(context.Background(), actPrf, false); err != nil {
t.Error(err)
}
engine.Cache.Clear(nil)
expected := "NOT_FOUND:invalid_filters"
if _, err := acts.matchingActionProfilesForEvent(context.Background(), "cgrates.org",
evNM, actPrfIDs, false); err == nil || err.Error() != expected {
t.Errorf("Expected %+v, received %+v", expected, err)
}
if err := acts.dm.RemoveActionProfile(context.Background(), actPrf.Tenant,
actPrf.ID, false); err != nil {
t.Error(err)
}
}
func TestScheduledActions(t *testing.T) {
engine.Cache.Clear(nil)
cfg := config.NewDefaultCGRConfig()
data, _ := engine.NewInternalDB(nil, nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg, nil)
filters := engine.NewFilterS(cfg, nil, dm)
acts := NewActionS(cfg, filters, dm, nil)
cgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
ID: "TEST_ACTIONS1",
Event: map[string]any{
utils.AccountField: "1001",
utils.Destination: 1002,
},
}
evNM := utils.MapStorage{
utils.MetaReq: cgrEv.Event,
utils.MetaOpts: map[string]any{},
}
actPrf := &utils.ActionProfile{
Tenant: "cgrates.org",
ID: "AP2",
FilterIDs: []string{"*string:~*req.Account:1001|1002|1003", "*prefix:~*req.Destination:10"},
Actions: []*utils.APAction{
{
ID: "TOPUP",
FilterIDs: []string{},
Type: utils.MetaLog,
Diktats: []*utils.APDiktat{{
Path: "~*balance.TestBalance.Value",
Value: "10",
}},
},
},
}
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
t.Error(err)
}
if rcv, err := acts.scheduledActions(context.Background(), cgrEv.Tenant, cgrEv, []string{}, false, false); err != nil {
t.Error(err)
} else {
expSchedActs := newScheduledActs(context.Background(), cgrEv.Tenant, cgrEv.ID, utils.MetaNone, utils.EmptyString,
utils.EmptyString, evNM, rcv[0].acts)
if reflect.DeepEqual(expSchedActs, rcv) {
t.Errorf("Expected %+v, received %+v", expSchedActs, rcv)
}
}
cgrEv = &utils.CGREvent{
Tenant: "cgrates.org",
ID: "TEST_ACTIONS1",
Event: map[string]any{
utils.AccountsStr: "10",
},
}
if _, err := acts.scheduledActions(context.Background(), cgrEv.Tenant, cgrEv, []string{}, false, false); err == nil || err != utils.ErrNotFound {
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
}
}
func TestScheduleAction(t *testing.T) {
engine.Cache.Clear(nil)
defer engine.Cache.Clear(nil)
cfg := config.NewDefaultCGRConfig()
data, _ := engine.NewInternalDB(nil, nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg, nil)
filters := engine.NewFilterS(cfg, nil, dm)
acts := NewActionS(cfg, filters, dm, nil)
cgrEv := []*utils.CGREvent{
{
Tenant: "cgrates.org",
ID: "TEST_ACTIONS1",
Event: map[string]any{
utils.AccountField: "1001",
utils.Destination: 1002,
},
},
}
actPrf := &utils.ActionProfile{
Tenant: "cgrates.org",
ID: "AP3",
FilterIDs: []string{"*string:~*req.Account:1001|1002|1003", "*prefix:~*req.Destination:10"},
Schedule: "* * * * *",
Actions: []*utils.APAction{
{
ID: "TOPUP",
FilterIDs: []string{},
Type: utils.MetaLog,
Diktats: []*utils.APDiktat{{
Path: "~*balance.TestBalance.Value",
Value: "10",
}},
},
},
}
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
t.Error(err)
}
if err := acts.scheduleActions(context.Background(), cgrEv, []string{}, false, true); err != nil {
t.Error(err)
}
//Cannot schedule an action if the ID is invalid
if err := acts.scheduleActions(context.Background(), cgrEv, []string{"INVALID_ID1"}, false, true); err == nil || err != utils.ErrPartiallyExecuted {
t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err)
}
//When schedule is "*asap", the action will execute immediately
actPrf.Schedule = utils.MetaASAP
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
t.Error(err)
}
if err := acts.scheduleActions(context.Background(), cgrEv, []string{}, false, true); err != nil {
t.Error(err)
}
engine.Cache.Clear(nil)
//Cannot execute the action if the cron is invalid
actPrf.Schedule = "* * * *"
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
t.Error(err)
}
if err := acts.scheduleActions(context.Background(), cgrEv, []string{}, false, true); err == nil || err != utils.ErrPartiallyExecuted {
t.Error(err)
}
}
func TestAsapExecuteActions(t *testing.T) {
newData := &dataDBMockError{}
cfg := config.NewDefaultCGRConfig()
dm := engine.NewDataManager(newData, cfg, nil)
filters := engine.NewFilterS(cfg, nil, dm)
acts := NewActionS(cfg, filters, dm, nil)
cgrEv := []*utils.CGREvent{
{
Tenant: "cgrates.org",
ID: "CHANGED_ID",
Event: map[string]any{
utils.AccountField: "1001",
utils.Destination: 1002,
},
},
}
evNM := utils.MapStorage{
utils.MetaReq: cgrEv[0].Event,
utils.MetaOpts: map[string]any{},
}
expSchedActs := newScheduledActs(context.Background(), cgrEv[0].Tenant, cgrEv[0].ID, utils.MetaNone, utils.EmptyString,
utils.EmptyString, evNM, nil)
if err := acts.asapExecuteActions(context.Background(), expSchedActs); err == nil || err != utils.ErrNoDatabaseConn {
t.Errorf("Expected %+v, received %+v", utils.ErrNoDatabaseConn, err)
}
data, _ := engine.NewInternalDB(nil, nil, nil, cfg.DataDbCfg().Items)
acts.dm = engine.NewDataManager(data, cfg, nil)
expSchedActs = newScheduledActs(context.Background(), cgrEv[0].Tenant, "another_id", utils.MetaNone, utils.EmptyString,
utils.EmptyString, evNM, nil)
if err := acts.asapExecuteActions(context.Background(), expSchedActs); err == nil || err != utils.ErrNotFound {
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
}
}
func TestV1ScheduleActions(t *testing.T) {
engine.Cache.Clear(nil)
cfg := config.NewDefaultCGRConfig()
data, _ := engine.NewInternalDB(nil, nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg, nil)
filters := engine.NewFilterS(cfg, nil, dm)
acts := NewActionS(cfg, filters, dm, nil)
var reply string
ev := &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]any{
utils.AccountField: "1001",
utils.Destination: 1002,
},
APIOpts: map[string]any{},
}
actPrf := &utils.ActionProfile{
Tenant: "cgrates.org",
ID: "AP4",
FilterIDs: []string{"*string:~*req.Account:1001|1002|1003", "*prefix:~*req.Destination:10"},
Schedule: utils.MetaASAP,
Actions: []*utils.APAction{
{
ID: "TOPUP",
FilterIDs: []string{},
Type: utils.MetaLog,
Diktats: []*utils.APDiktat{{
Path: "~*balance.TestBalance.Value",
Value: "10",
}},
},
},
}
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
t.Error(err)
}
if err := acts.V1ScheduleActions(context.Background(), ev, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Unexpected reply %+v", reply)
}
ev.APIOpts[utils.OptsActionsProfileIDs] = []string{"invalid_id"}
if err := acts.V1ScheduleActions(context.Background(), ev, &reply); err == nil || err != utils.ErrPartiallyExecuted {
t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err)
}
if err := acts.dm.RemoveActionProfile(context.Background(), actPrf.Tenant, actPrf.ID, true); err != nil {
t.Error(err)
}
}
func TestV1ExecuteActions(t *testing.T) {
engine.Cache.Clear(nil)
cfg := config.NewDefaultCGRConfig()
data, _ := engine.NewInternalDB(nil, nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg, nil)
filters := engine.NewFilterS(cfg, nil, dm)
acts := NewActionS(cfg, filters, dm, nil)
var reply string
ev := &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]any{
utils.AccountField: "1001",
utils.Destination: 1002,
},
APIOpts: map[string]any{},
}
actPrf := &utils.ActionProfile{
Tenant: "cgrates.org",
ID: "AP5",
FilterIDs: []string{"*string:~*req.Account:1001|1002|1003", "*prefix:~*req.Destination:10"},
Schedule: utils.MetaASAP,
Actions: []*utils.APAction{
{
ID: "TOPUP",
FilterIDs: []string{},
Type: utils.MetaLog,
Diktats: []*utils.APDiktat{{
Path: "~*balance.TestBalance.Value",
Value: "10",
}},
},
},
}
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
t.Error(err)
}
if err := acts.V1ExecuteActions(context.Background(), ev, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Unexpected reply %+v", reply)
}
ev.APIOpts[utils.OptsActionsProfileIDs] = []string{"invalid_id"}
if err := acts.V1ExecuteActions(context.Background(), ev, &reply); err == nil || err != utils.ErrNotFound {
t.Errorf("Expected %+v, received %+v", utils.ErrNotFound, err)
}
newData := &dataDBMockError{}
newDm := engine.NewDataManager(newData, cfg, nil)
newActs := NewActionS(cfg, filters, newDm, nil)
ev.APIOpts[utils.OptsActionsProfileIDs] = []string{}
if err := newActs.V1ExecuteActions(context.Background(), ev, &reply); err == nil || err != utils.ErrPartiallyExecuted {
t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err)
}
if err := acts.dm.RemoveActionProfile(context.Background(), actPrf.Tenant, actPrf.ID, true); err != nil {
t.Error(err)
}
}
type dataDBMockError struct {
engine.DataDBMock
}
func (dbM *dataDBMockError) GetActionProfileDrv(*context.Context, string, string) (*utils.ActionProfile, error) {
return &utils.ActionProfile{
Tenant: "cgrates.org",
ID: "AP6",
FilterIDs: []string{"*string:~*req.Account:1001|1002|1003", "*prefix:~*req.Destination:10"},
Actions: []*utils.APAction{
{
ID: "TOPUP",
FilterIDs: []string{},
Type: utils.MetaLog,
Diktats: []*utils.APDiktat{{
Path: "~*balance.TestBalance.Value",
Value: "10",
}},
},
},
}, nil
}
func (dbM *dataDBMockError) SetActionProfileDrv(*context.Context, *utils.ActionProfile) error {
return utils.ErrNoDatabaseConn
}
func (dbM *dataDBMockError) GetIndexesDrv(ctx *context.Context, idxItmType, tntCtx, idxKey, transactionID string) (indexes map[string]utils.StringSet, err error) {
return nil, nil
}
func TestLogActionExecute(t *testing.T) {
evNM := utils.MapStorage{
utils.MetaReq: map[string]any{
utils.AccountField: "10",
},
utils.MetaOpts: map[string]any{},
}
tmpLogger := utils.Logger
defer func() {
utils.Logger = tmpLogger
}()
var buf bytes.Buffer
utils.Logger = utils.NewStdLoggerWithWriter(&buf, "Engine1", 7)
logAction := actLog{}
if err := logAction.execute(context.Background(), evNM, utils.MetaNone); err != nil {
t.Error(err)
}
expected := "CGRateS <Engine1> [INFO] LOG Event: {\"*opts\":{},\"*req\":{\"Account\":\"10\"}}"
if rcv := buf.String(); !strings.Contains(rcv, expected) {
t.Errorf("Expected %+v, received %+v", expected, rcv)
}
}
type testMockCDRsConn struct {
calls map[string]func(_ *context.Context, _, _ any) error
}
func (s *testMockCDRsConn) Call(ctx *context.Context, method string, arg, rply any) error {
call, has := s.calls[method]
if !has {
return rpcclient.ErrUnsupporteServiceMethod
}
return call(ctx, arg, rply)
}
func TestCDRLogActionExecute(t *testing.T) {
sMock := &testMockCDRsConn{
calls: map[string]func(_ *context.Context, _, _ any) error{
utils.CDRsV1ProcessEvent: func(_ *context.Context, arg, rply any) error {
argConv, can := arg.(*utils.CGREvent)
if !can {
return fmt.Errorf("Wrong argument type: %T", arg)
}
if argConv.APIOpts[utils.MetaChargers].(bool) {
return fmt.Errorf("Expected false, received %+v", argConv.APIOpts[utils.MetaChargers])
}
if val, has := argConv.Event[utils.Subject]; !has {
return fmt.Errorf("missing Subject")
} else if strVal := utils.IfaceAsString(val); strVal != "10" {
return fmt.Errorf("Expected %+v, received %+v", "10", strVal)
}
if val, has := argConv.Event[utils.Cost]; !has {
return fmt.Errorf("missing Cost")
} else if strVal := utils.IfaceAsString(val); strVal != "0.15" {
return fmt.Errorf("Expected %+v, received %+v", "0.15", strVal)
}
if val, has := argConv.Event[utils.RequestType]; !has {
return fmt.Errorf("missing RequestType")
} else if strVal := utils.IfaceAsString(val); strVal != utils.MetaNone {
return fmt.Errorf("Expected %+v, received %+v", utils.MetaNone, strVal)
}
if val, has := argConv.Event[utils.RunID]; !has {
return fmt.Errorf("missing RunID")
} else if strVal := utils.IfaceAsString(val); strVal != utils.MetaTopUp {
return fmt.Errorf("Expected %+v, received %+v", utils.MetaNone, strVal)
}
return nil
},
},
}
internalCDRsChann := make(chan birpc.ClientConnector, 1)
internalCDRsChann <- sMock
cfg := config.NewDefaultCGRConfig()
cfg.ActionSCfg().CDRsConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs)}
data, _ := engine.NewInternalDB(nil, nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg, nil)
filterS := engine.NewFilterS(cfg, nil, dm)
connMgr := engine.NewConnManager(config.CgrConfig())
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs), utils.CDRsV1, internalCDRsChann)
apA := &utils.APAction{
ID: "ACT_CDRLOG",
Type: utils.MetaCdrLog,
}
cdrLogAction := &actCDRLog{
config: cfg,
filterS: filterS,
connMgr: connMgr,
aCfg: apA,
}
evNM := utils.MapStorage{
utils.MetaReq: map[string]any{
utils.AccountField: "10",
utils.Tenant: "cgrates.org",
utils.BalanceType: utils.MetaConcrete,
utils.Cost: 0.15,
utils.ActionType: utils.MetaTopUp,
},
utils.MetaOpts: map[string]any{},
}
if err := cdrLogAction.execute(context.Background(), evNM, utils.MetaNone); err != nil {
t.Error(err)
}
}
func TestCDRLogActionWithOpts(t *testing.T) {
// Clear cache because connManager sets the internal connection in cache
engine.Cache.Clear([]string{utils.CacheRPCConnections})
sMock2 := &testMockCDRsConn{
calls: map[string]func(_ *context.Context, _, _ any) error{
utils.CDRsV1ProcessEvent: func(_ *context.Context, arg, rply any) error {
argConv, can := arg.(*utils.CGREvent)
if !can {
return fmt.Errorf("Wrong argument type: %T", arg)
}
if argConv.APIOpts[utils.MetaChargers].(bool) {
return fmt.Errorf("Expected false, received %+v", argConv.APIOpts[utils.MetaChargers])
}
if val, has := argConv.Event[utils.Tenant]; !has {
return fmt.Errorf("missing Tenant")
} else if strVal := utils.IfaceAsString(val); strVal != "cgrates.org" {
return fmt.Errorf("Expected %+v, received %+v", "cgrates.org", strVal)
}
if val, has := argConv.APIOpts["EventFieldOpt"]; !has {
return fmt.Errorf("missing EventFieldOpt from Opts")
} else if strVal := utils.IfaceAsString(val); strVal != "eventValue" {
return fmt.Errorf("Expected %+v, received %+v", "eventValue", strVal)
}
if val, has := argConv.APIOpts["Option1"]; !has {
return fmt.Errorf("missing Option1 from Opts")
} else if strVal := utils.IfaceAsString(val); strVal != "Value1" {
return fmt.Errorf("Expected %+v, received %+v", "Value1", strVal)
}
if val, has := argConv.APIOpts["Option3"]; !has {
return fmt.Errorf("missing Option3 from Opts")
} else if strVal := utils.IfaceAsString(val); strVal != "eventValue" {
return fmt.Errorf("Expected %+v, received %+v", "eventValue", strVal)
}
return nil
},
},
}
internalCDRsChann := make(chan birpc.ClientConnector, 1)
internalCDRsChann <- sMock2
cfg := config.NewDefaultCGRConfig()
cfg.ActionSCfg().CDRsConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs)}
cfg.TemplatesCfg()["CustomTemplate"] = []*config.FCTemplate{
{
Tag: "Tenant",
Type: "*constant",
Path: "*cdr.Tenant",
Value: utils.NewRSRParsersMustCompile("cgrates.org", utils.InfieldSep),
Layout: time.RFC3339,
},
{
Tag: "Opt1",
Type: "*constant",
Path: "*opts.Option1",
Value: utils.NewRSRParsersMustCompile("Value1", utils.InfieldSep),
Layout: time.RFC3339,
},
{
Tag: "Opt2",
Type: "*constant",
Path: "*opts.Option2",
Value: utils.NewRSRParsersMustCompile("Value2", utils.InfieldSep),
Layout: time.RFC3339,
},
{
Tag: "Opt3",
Type: "*variable",
Path: "*opts.Option3",
Value: utils.NewRSRParsersMustCompile("~*opts.EventFieldOpt", utils.InfieldSep),
Layout: time.RFC3339,
},
}
for _, tpl := range cfg.TemplatesCfg()["CustomTemplate"] {
tpl.ComputePath()
}
data, _ := engine.NewInternalDB(nil, nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg, nil)
filterS := engine.NewFilterS(cfg, nil, dm)
connMgr := engine.NewConnManager(config.CgrConfig())
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs), utils.CDRsV1, internalCDRsChann)
apA := &utils.APAction{
ID: "ACT_CDRLOG2",
Type: utils.MetaCdrLog,
Opts: map[string]any{
utils.MetaTemplateID: "CustomTemplate",
},
}
cdrLogAction := &actCDRLog{
config: cfg,
filterS: filterS,
connMgr: connMgr,
aCfg: apA,
}
evNM := utils.MapStorage{
utils.MetaReq: map[string]any{
utils.AccountField: "10",
utils.Tenant: "cgrates.org",
utils.BalanceType: utils.MetaConcrete,
utils.Cost: 0.15,
utils.ActionType: utils.MetaTopUp,
},
utils.MetaOpts: map[string]any{
"EventFieldOpt": "eventValue",
},
}
if err := cdrLogAction.execute(context.Background(), evNM, utils.MetaNone); err != nil {
t.Error(err)
}
}
func TestExportAction(t *testing.T) {
// Clear cache because connManager sets the internal connection in cache
engine.Cache.Clear([]string{utils.CacheRPCConnections})
sMock2 := &testMockCDRsConn{
calls: map[string]func(_ *context.Context, _, _ any) error{
utils.EeSv1ProcessEvent: func(_ *context.Context, arg, rply any) error {
argConv, can := arg.(*utils.CGREventWithEeIDs)
if !can {
return fmt.Errorf("Wrong argument type: %T", arg)
}
if argConv.Tenant != "cgrates.org" {
return fmt.Errorf("Expected %+v, received %+v", "cgrates.org", argConv.Tenant)
}
return nil
},
},
}
internalCDRsChann := make(chan birpc.ClientConnector, 1)
internalCDRsChann <- sMock2
cfg := config.NewDefaultCGRConfig()
cfg.ActionSCfg().EEsConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)}
connMgr := engine.NewConnManager(config.CgrConfig())
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs), utils.EeSv1, internalCDRsChann)
apA := &utils.APAction{
ID: "ACT_CDRLOG2",
Type: utils.MetaExport,
}
exportAction := &actExport{
tnt: "cgrates.org",
config: cfg,
connMgr: connMgr,
aCfg: apA,
}
evNM := utils.MapStorage{
utils.MetaReq: map[string]any{
utils.AccountField: "10",
utils.Tenant: "cgrates.org",
utils.BalanceType: utils.MetaConcrete,
utils.Cost: 0.15,
utils.ActionType: utils.MetaTopUp,
},
utils.MetaOpts: map[string]any{
"EventFieldOpt": "eventValue",
},
}
if err := exportAction.execute(context.Background(), evNM, utils.MetaNone); err != nil {
t.Error(err)
}
}
func TestExportActionWithEeIDs(t *testing.T) {
// Clear cache because connManager sets the internal connection in cache
engine.Cache.Clear([]string{utils.CacheRPCConnections})
sMock2 := &testMockCDRsConn{
calls: map[string]func(_ *context.Context, _, _ any) error{
utils.EeSv1ProcessEvent: func(_ *context.Context, arg, rply any) error {
argConv, can := arg.(*utils.CGREventWithEeIDs)
if !can {
return fmt.Errorf("Wrong argument type: %T", arg)
}
if argConv.Tenant != "cgrates.org" {
return fmt.Errorf("Expected %+v, received %+v", "cgrates.org", argConv.Tenant)
}
if !reflect.DeepEqual(argConv.EeIDs, []string{"Exporter1", "Exporter2", "Exporter3"}) {
return fmt.Errorf("Expected %+v, received %+v", []string{"Exporter1", "Exporter2", "Exporter3"}, argConv.EeIDs)
}
return nil
},
},
}
internalCDRsChann := make(chan birpc.ClientConnector, 1)
internalCDRsChann <- sMock2
cfg := config.NewDefaultCGRConfig()
cfg.ActionSCfg().EEsConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs)}
connMgr := engine.NewConnManager(config.CgrConfig())
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs), utils.EeSv1, internalCDRsChann)
apA := &utils.APAction{
ID: "ACT_CDRLOG2",
Type: utils.MetaExport,
Opts: map[string]any{
utils.MetaExporterIDs: "Exporter1;Exporter2;Exporter3",
},
}
exportAction := &actExport{
tnt: "cgrates.org",
config: cfg,
connMgr: connMgr,
aCfg: apA,
}
evNM := utils.MapStorage{
utils.MetaReq: map[string]any{
utils.AccountField: "10",
utils.Tenant: "cgrates.org",
utils.BalanceType: utils.MetaConcrete,
utils.Cost: 0.15,
utils.ActionType: utils.MetaTopUp,
},
utils.MetaOpts: map[string]any{
"EventFieldOpt": "eventValue",
},
}
if err := exportAction.execute(context.Background(), evNM, utils.MetaNone); err != nil {
t.Error(err)
}
}
func TestExportActionResetThresholdStaticTenantID(t *testing.T) {
// Clear cache because connManager sets the internal connection in cache
engine.Cache.Clear([]string{utils.CacheRPCConnections})
sMock2 := &testMockCDRsConn{
calls: map[string]func(_ *context.Context, _, _ any) error{
utils.ThresholdSv1ResetThreshold: func(_ *context.Context, arg, rply any) error {
argConv, can := arg.(*utils.TenantIDWithAPIOpts)
if !can {
return fmt.Errorf("Wrong argument type: %T", arg)
}
if argConv.Tenant != "cgrates.org" {
return fmt.Errorf("Expected %+v, received %+v", "cgrates.org", argConv.Tenant)
}
if argConv.ID != "TH1" {
return fmt.Errorf("Expected %+v, received %+v", "TH1", argConv.ID)
}
return nil
},
},
}
internalChann := make(chan birpc.ClientConnector, 1)
internalChann <- sMock2
cfg := config.NewDefaultCGRConfig()
cfg.ActionSCfg().ThresholdSConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)}
connMgr := engine.NewConnManager(config.CgrConfig())
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), utils.ThresholdSv1, internalChann)
apA := &utils.APAction{
ID: "ACT_RESET_TH",
Type: utils.MetaResetThreshold,
Diktats: []*utils.APDiktat{{}},
}
exportAction := &actResetThreshold{
tnt: "cgrates.org",
config: cfg,
connMgr: connMgr,
aCfg: apA,
}
evNM := utils.MapStorage{
utils.MetaOpts: map[string]any{},
}
if err := exportAction.execute(context.Background(), evNM, "cgrates.org:TH1"); err != nil {
t.Error(err)
}
}
func TestExportActionResetThresholdStaticID(t *testing.T) {
// Clear cache because connManager sets the internal connection in cache
engine.Cache.Clear([]string{utils.CacheRPCConnections})
sMock2 := &testMockCDRsConn{
calls: map[string]func(_ *context.Context, _, _ any) error{
utils.ThresholdSv1ResetThreshold: func(_ *context.Context, arg, rply any) error {
argConv, can := arg.(*utils.TenantIDWithAPIOpts)
if !can {
return fmt.Errorf("Wrong argument type: %T", arg)
}
if argConv.Tenant != "cgrates.org" {
return fmt.Errorf("Expected %+v, received %+v", "cgrates.org", argConv.Tenant)
}
if argConv.ID != "TH1" {
return fmt.Errorf("Expected %+v, received %+v", "TH1", argConv.ID)
}
return nil
},
},
}
internalChann := make(chan birpc.ClientConnector, 1)
internalChann <- sMock2
cfg := config.NewDefaultCGRConfig()
cfg.ActionSCfg().ThresholdSConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds)}
connMgr := engine.NewConnManager(config.CgrConfig())
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds), utils.ThresholdSv1, internalChann)
apA := &utils.APAction{
ID: "ACT_RESET_TH",
Type: utils.MetaResetThreshold,
Diktats: []*utils.APDiktat{{}},
}
exportAction := &actResetThreshold{
tnt: "cgrates.org",
config: cfg,
connMgr: connMgr,
aCfg: apA,
}
evNM := utils.MapStorage{
utils.MetaOpts: map[string]any{},
}
if err := exportAction.execute(context.Background(), evNM, "TH1"); err != nil {
t.Error(err)
}
}
func TestExportActionResetStatStaticTenantID(t *testing.T) {
// Clear cache because connManager sets the internal connection in cache
engine.Cache.Clear([]string{utils.CacheRPCConnections})
sMock2 := &testMockCDRsConn{
calls: map[string]func(_ *context.Context, _, _ any) error{
utils.StatSv1ResetStatQueue: func(_ *context.Context, arg, rply any) error {
argConv, can := arg.(*utils.TenantIDWithAPIOpts)
if !can {
return fmt.Errorf("Wrong argument type: %T", arg)
}
if argConv.Tenant != "cgrates.org" {
return fmt.Errorf("Expected %+v, received %+v", "cgrates.org", argConv.Tenant)
}
if argConv.ID != "ST1" {
return fmt.Errorf("Expected %+v, received %+v", "TH1", argConv.ID)
}
return nil
},
},
}
internalChann := make(chan birpc.ClientConnector, 1)
internalChann <- sMock2
cfg := config.NewDefaultCGRConfig()
cfg.ActionSCfg().StatSConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)}
connMgr := engine.NewConnManager(config.CgrConfig())
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats), utils.StatSv1, internalChann)
apA := &utils.APAction{
ID: "ACT_RESET_ST",
Type: utils.MetaResetStatQueue,
Diktats: []*utils.APDiktat{{}},
}
exportAction := &actResetStat{
tnt: "cgrates.org",
config: cfg,
connMgr: connMgr,
aCfg: apA,
}
evNM := utils.MapStorage{
utils.MetaOpts: map[string]any{},
}
if err := exportAction.execute(context.Background(), evNM, "cgrates.org:ST1"); err != nil {
t.Error(err)
}
}
func TestExportActionResetStatStaticID(t *testing.T) {
// Clear cache because connManager sets the internal connection in cache
engine.Cache.Clear([]string{utils.CacheRPCConnections})
sMock2 := &testMockCDRsConn{
calls: map[string]func(_ *context.Context, _, _ any) error{
utils.StatSv1ResetStatQueue: func(_ *context.Context, arg, rply any) error {
argConv, can := arg.(*utils.TenantIDWithAPIOpts)
if !can {
return fmt.Errorf("Wrong argument type: %T", arg)
}
if argConv.Tenant != "cgrates.org" {
return fmt.Errorf("Expected %+v, received %+v", "cgrates.org", argConv.Tenant)
}
if argConv.ID != "ST1" {
return fmt.Errorf("Expected %+v, received %+v", "TH1", argConv.ID)
}
return nil
},
},
}
internalChann := make(chan birpc.ClientConnector, 1)
internalChann <- sMock2
cfg := config.NewDefaultCGRConfig()
cfg.ActionSCfg().StatSConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats)}
connMgr := engine.NewConnManager(config.CgrConfig())
connMgr.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats), utils.StatSv1, internalChann)
apA := &utils.APAction{
ID: "ACT_RESET_ST",
Type: utils.MetaResetStatQueue,
Diktats: []*utils.APDiktat{{
Value: "ST1",
}},
}
exportAction := &actResetStat{
tnt: "cgrates.org",
config: cfg,
connMgr: connMgr,
aCfg: apA,
}
evNM := utils.MapStorage{
utils.MetaOpts: map[string]any{},
}
if err := exportAction.execute(context.Background(), evNM, "ST1"); err != nil {
t.Error(err)
}
}
func TestACScheduledActions(t *testing.T) {
engine.Cache.Clear(nil)
defer engine.Cache.Clear(nil)
cfg := config.NewDefaultCGRConfig()
data, _ := engine.NewInternalDB(nil, nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg, nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
actPrf := &utils.ActionProfile{
Tenant: "cgrates.org",
ID: "TestACScheduledActions",
FilterIDs: []string{"*string:~*req.Destination:1005"},
Actions: []*utils.APAction{
{
ID: "TOPUP",
FilterIDs: []string{},
Type: "inexistent_type",
Diktats: []*utils.APDiktat{{
Path: "~*balance.TestBalance.Value",
Value: "10",
}},
},
},
}
if err := dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
t.Error(err)
}
cgrEv := &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]any{
utils.Destination: "1005",
},
}
tmpLogger := utils.Logger
defer func() {
utils.Logger = tmpLogger
}()
var buf bytes.Buffer
utils.Logger = utils.NewStdLoggerWithWriter(&buf, "", 7)
acts := NewActionS(cfg, fltrs, dm, nil)
expected := "WARNING] <ActionS> ignoring ActionProfile with id: <cgrates.org:TestACScheduledActions> creating action: <TOPUP>, error: <unsupported action type: <inexistent_type>>"
if _, err := acts.scheduledActions(context.Background(), "cgrates.org", cgrEv, []string{}, false, true); err != nil {
t.Error(err)
} else if rcv := buf.String(); !strings.Contains(rcv, expected) {
t.Errorf("Expected %+v, received %+v", expected, rcv)
}
actPrf.Actions[0].Type = utils.MetaResetStatQueue
actPrf.Targets = map[string]utils.StringSet{
utils.MetaStats: map[string]struct{}{
"ID_TEST": {},
},
}
engine.Cache.Clear(nil)
if err := dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
t.Error(err)
}
mapStorage := utils.MapStorage{
utils.MetaReq: cgrEv.Event,
utils.MetaOpts: cgrEv.APIOpts,
}
expectedSChed := []*scheduledActs{
{
tenant: "cgrates.org",
apID: "TestACScheduledActions",
trgTyp: utils.MetaStats,
trgID: "ID_TEST",
schedule: utils.EmptyString,
data: mapStorage,
},
}
var err error
var schedActs []*scheduledActs
if schedActs, err = acts.scheduledActions(context.Background(), "cgrates.org", cgrEv, []string{}, false, true); err != nil {
t.Error(err)
} else if len(schedActs) != 1 {
t.Errorf("expected 1 schedActs, received <%v>. \n<%+v>", len(schedActs), schedActs)
}
//execute asap the actions
schedActs[0].trgID = "invalid_type"
if err := acts.asapExecuteActions(context.Background(), schedActs[0]); err == nil || err != utils.ErrPartiallyExecuted {
t.Errorf("Expected %+v, received %+v", utils.ErrPartiallyExecuted, err)
}
schedActs[0].trgID = "ID_TEST"
schedActs[0].acts = nil
schedActs[0].cch = nil
if !reflect.DeepEqual(schedActs, expectedSChed) {
t.Errorf("Expected %+v, received %+v", expectedSChed, schedActs)
}
}
func TestV1ScheduleActionsProfileIgnoreFilters(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ActionSCfg().Opts.ProfileIgnoreFilters = []*config.DynamicBoolOpt{
config.NewDynamicBoolOpt(nil, "", true, nil),
}
data, _ := engine.NewInternalDB(nil, nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg, nil)
filters := engine.NewFilterS(cfg, nil, dm)
acts := NewActionS(cfg, filters, dm, nil)
var reply string
ev := &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]any{
utils.AccountField: "1001",
utils.Destination: 1002,
},
APIOpts: map[string]any{
utils.OptsActionsProfileIDs: []string{"AP7"},
utils.MetaProfileIgnoreFilters: true,
"testFieldIgnore": "testValue",
},
}
actPrf := &utils.ActionProfile{
Tenant: "cgrates.org",
ID: "AP7",
FilterIDs: []string{"*string:~*req.Account:1001|1002|1003", "*prefix:~*req.Destination:10", "*prefix:~*opts.testFieldIgnore:testValue1"},
Schedule: utils.MetaASAP,
Actions: []*utils.APAction{
{
ID: "TOPUP",
FilterIDs: []string{},
Type: utils.MetaLog,
Diktats: []*utils.APDiktat{{
Path: "~*balance.TestBalance.Value",
Value: "10",
}},
},
},
}
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
t.Error(err)
}
if err := acts.V1ScheduleActions(context.Background(), ev, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Unexpected reply %+v", reply)
}
}
func TestV1ExecuteActionsProfileIgnoreFilters(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ActionSCfg().Opts.ProfileIgnoreFilters = []*config.DynamicBoolOpt{
config.NewDynamicBoolOpt(nil, "", true, nil),
}
data, _ := engine.NewInternalDB(nil, nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg, nil)
filters := engine.NewFilterS(cfg, nil, dm)
acts := NewActionS(cfg, filters, dm, nil)
var reply string
ev := &utils.CGREvent{
Tenant: "cgrates.org",
Event: map[string]any{
utils.AccountField: "1001",
utils.Destination: 1002,
},
APIOpts: map[string]any{
utils.OptsActionsProfileIDs: []string{"AP8"},
utils.MetaProfileIgnoreFilters: true,
"testFieldIgnore": "testValue",
},
}
actPrf := &utils.ActionProfile{
Tenant: "cgrates.org",
ID: "AP8",
FilterIDs: []string{"*string:~*req.Account:1001|1002|1003", "*prefix:~*req.Destination:10", "*prefix:~*opts.testFieldIgnore:testValue1"},
Schedule: utils.MetaASAP,
Actions: []*utils.APAction{
{
ID: "TOPUP",
FilterIDs: []string{},
Type: utils.MetaLog,
Diktats: []*utils.APDiktat{{
Path: "~*balance.TestBalance.Value",
Value: "10",
}},
},
},
}
if err := acts.dm.SetActionProfile(context.Background(), actPrf, true); err != nil {
t.Error(err)
}
if err := acts.V1ExecuteActions(context.Background(), ev, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Unexpected reply %+v", reply)
}
}