diff --git a/engine/action_plan_test.go b/engine/action_plan_test.go index 525478d13..cd7cda20a 100644 --- a/engine/action_plan_test.go +++ b/engine/action_plan_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -422,3 +423,61 @@ func TestActionTimingGetNextStartTime(t *testing.T) { t.Errorf("Expecting: %+v, received: %+v", exp, st) } } + +func TestActionTimingExErr(t *testing.T) { + tmpDm := dm + tmp := Cache + cfg := config.NewDefaultCGRConfig() + defer func() { + dm = tmpDm + Cache = tmp + config.SetCgrConfig(config.NewDefaultCGRConfig()) + }() + db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) + dm := NewDataManager(db, cfg.CacheCfg(), nil) + at := &ActionTiming{ + Timing: &RateInterval{}, + actions: []*Action{ + { + Filters: []string{}, + ExpirationString: "*yearly", + ActionType: "test", + Id: "ZeroMonetary", + Balance: &BalanceFilter{ + Type: utils.StringPointer(utils.MetaMonetary), + + Value: &utils.ValueFormula{Static: 11}, + Weight: utils.Float64Pointer(30), + }, + }, + }, + } + fltrs := NewFilterS(cfg, nil, dm) + if err := at.Execute(nil); err == nil || err != utils.ErrPartiallyExecuted { + t.Error(err) + } + at.actions[0].ActionType = utils.MetaDebitReset + if err := at.Execute(nil); err == nil || err != utils.ErrPartiallyExecuted { + t.Error(err) + } + at.accountIDs = utils.StringMap{"cgrates.org:zeroNegative": true} + at.actions[0].ActionType = utils.MetaResetStatQueue + if err := at.Execute(nil); err == nil || err != utils.ErrPartiallyExecuted { + t.Error(err) + } + Cache.Set(utils.CacheFilters, "cgrates.org:*string:~*req.BalanceMap.*monetary[0].ID:*default", nil, []string{}, true, utils.NonTransactional) + at.actions[0].Filters = []string{"*string:~*req.BalanceMap.*monetary[0].ID:*default"} + if err := at.Execute(fltrs); err != nil { + t.Error(err) + } + SetDataStorage(nil) + if err := at.Execute(nil); err != nil { + t.Error(err) + } +} + +func TestGetDayOrEndOfMonth(t *testing.T) { + if val := getDayOrEndOfMonth(31, time.Date(2022, 12, 22, 12, 0, 0, 0, time.UTC)); val != 31 { + t.Errorf("Should Receive Last Day %v", val) + } +} diff --git a/engine/action_trigger_test.go b/engine/action_trigger_test.go index f1f8f51fc..86fc71faa 100644 --- a/engine/action_trigger_test.go +++ b/engine/action_trigger_test.go @@ -20,6 +20,7 @@ package engine import ( "bytes" + "fmt" "log" "os" "reflect" @@ -264,13 +265,19 @@ func TestATExecute22(t *testing.T) { cfg := config.NewDefaultCGRConfig() tmp := Cache tmpDm := dm - utils.Logger.SetLogLevel(4) - utils.Logger.SetSyslog(nil) - buf := new(bytes.Buffer) - log.SetOutput(buf) - defer func() { + setLogger := func(buf *bytes.Buffer) { + utils.Logger.SetLogLevel(4) + utils.Logger.SetSyslog(nil) + log.SetOutput(buf) + } + removeLogger := func() { utils.Logger.SetLogLevel(0) log.SetOutput(os.Stderr) + } + buf := new(bytes.Buffer) + setLogger(buf) + defer func() { + removeLogger() Cache = tmp SetDataStorage(tmpDm) config.SetCgrConfig(config.NewDefaultCGRConfig()) @@ -290,7 +297,6 @@ func TestATExecute22(t *testing.T) { ThresholdType: utils.TriggerMinEventCounter, ThresholdValue: 10, Recurrent: true, - MinSleep: 10 * time.Minute, } ub := &Account{ ID: "acc_id", @@ -314,6 +320,21 @@ func TestATExecute22(t *testing.T) { }, }, }, []string{}, true, utils.NonTransactional) + db.db.Set(utils.CacheActions, "actID2", Actions{ + { + Id: "cgrates.org:id1", + ActionType: utils.MetaResetStatQueue, + ExpirationString: "*yearly", + Balance: &BalanceFilter{ + Type: utils.StringPointer("test"), + Value: &utils.ValueFormula{Static: 1.1}, + }, + Filters: []string{ + "fltr", + }, + }, + }, []string{}, true, utils.NonTransactional) + fltrNew := &Filter{ ID: "fltr", Tenant: "cgrates.org", @@ -336,4 +357,25 @@ func TestATExecute22(t *testing.T) { if rcvLog := buf.String(); !strings.Contains(rcvLog, expLog) { t.Errorf("expected %v,received%v", expLog, rcvLog) } + removeLogger() + buf2 := new(bytes.Buffer) + setLogger(buf2) + at.ActionsID = "actID2" + expLog = `Error executing action` + if err := at.Execute(ub, fltr); err != nil { + t.Error(err) + } else if rcvLog := buf2.String(); !strings.Contains(rcvLog, expLog) { + t.Errorf("Logger %v doesn't contain %v", rcvLog, expLog) + } + + ub.Disabled = true + if err := at.Execute(ub, fltr); err == nil || err.Error() != fmt.Sprintf("User %s is disabled and there are triggers in action!", ub.ID) { + t.Error(err) + } + ub.Disabled = false + at.MinSleep = 10 * time.Minute + at.LastExecutionTime = time.Now().Add(-5 * time.Minute) + if err := at.Execute(ub, fltr); err != nil { + t.Error(err) + } } diff --git a/engine/caches_test.go b/engine/caches_test.go index b8f730fa8..d3ab76b31 100644 --- a/engine/caches_test.go +++ b/engine/caches_test.go @@ -145,6 +145,10 @@ func TestCacheSV1GetItemIDs(t *testing.T) { } else if !reflect.DeepEqual(reply, exp) { t.Errorf("expected %+v,received %+v", utils.ToJSON(exp), utils.ToJSON(reply)) } + tscache.Remove("cacheID", "itemID", true, utils.NonTransactional) + if err := chS.V1GetItemIDs(args, reply); err == nil || err != utils.ErrNotFound { + t.Error(err) + } } func TestCacheSV1HasItem(t *testing.T) { @@ -262,11 +266,18 @@ func TestCacheSV1GetItem(t *testing.T) { dm: dm, tCache: tscache, } - var reply interface{} = "" + var reply interface{} if err := chS.V1GetItem(args, &reply); err != nil { t.Error(err) + } else if val, cancast := reply.(string); cancast { + if val != "value" { + t.Errorf("expected value,received %v", val) + } + } + tscache.Remove("cacheID", "itemID", true, utils.NonTransactional) + if err := chS.V1GetItem(args, &reply); err == nil || err != utils.ErrNotFound { + t.Error(err) } - } func TestCacheSV1GetItemExpiryTime(t *testing.T) { @@ -589,25 +600,15 @@ func TestCachesPrecache(t *testing.T) { } func TestV1PrecacheStatus(t *testing.T) { - args := &utils.AttrCacheIDsWithAPIOpts{ APIOpts: map[string]interface{}{}, Tenant: "cgrates.org", - CacheIDs: []string{"cache1", "cache2", "cache3"}, + CacheIDs: []string{utils.CacheFilters}, } cfg := config.NewDefaultCGRConfig() - cfg.CacheCfg().Partitions = map[string]*config.CacheParamCfg{ - utils.CacheDestinations: { - Limit: 1, - Precache: true, - TTL: time.Minute * 2, - Remote: true, - }, - } + pcI := map[string]chan struct{}{ - "cache1": make(chan struct{}), - "cache2": make(chan struct{}), - "cache3": make(chan struct{}), + utils.CacheFilters: make(chan struct{}), } db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) dm := NewDataManager(db, cfg.CacheCfg(), nil) @@ -619,15 +620,17 @@ func TestV1PrecacheStatus(t *testing.T) { reply := map[string]string{} exp := map[string]string{ - "cache1": utils.MetaPrecaching, - "cache2": utils.MetaPrecaching, - "cache3": utils.MetaPrecaching, + utils.CacheFilters: utils.MetaPrecaching, } if err := chS.V1PrecacheStatus(args, &reply); err != nil { t.Error(err) } else if !reflect.DeepEqual(exp, reply) { t.Errorf("expected %+v,received %+v", exp, reply) } + args.CacheIDs = []string{} + if err := chS.V1PrecacheStatus(args, &reply); err == nil { + t.Error(err) + } } func TestCacheSV1HasGroup(t *testing.T) { @@ -997,3 +1000,24 @@ func TestCachesGetWithRemote(t *testing.T) { } */ } + +func TestV1LoadCache(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmpDm := dm + defer func() { + dm = tmpDm + }() + attr := &utils.AttrReloadCacheWithAPIOpts{ + Tenant: "cgrates.org", + FilterIDs: []string{"cgrates.org:FLTR_ID"}, + AttributeFilterIndexIDs: []string{"cgrates.org:*any:*string:*req.Account:1001", "cgrates.org:*any:*string:*req.Account:1002"}, + } + chS := NewCacheS(cfg, dm, nil) + var reply string + if err := chS.V1LoadCache(attr, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("reply should be %v", utils.OK) + } + +} diff --git a/engine/datamanager.go b/engine/datamanager.go index b120610a5..15e23d3d0 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -1629,7 +1629,7 @@ func (dm *DataManager) GetActionTriggers(id string, skipCache bool, APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, utils.EmptyString, utils.FirstNonEmpty(config.CgrConfig().DataDbCfg().RmtConnID, config.CgrConfig().GeneralCfg().NodeID)), - }, attrs); err == nil { + }, &attrs); err == nil { err = dm.dataDB.SetActionTriggersDrv(id, attrs) } } diff --git a/engine/datamanager_test.go b/engine/datamanager_test.go index 9fac837f3..460ffe2ea 100644 --- a/engine/datamanager_test.go +++ b/engine/datamanager_test.go @@ -18,6 +18,7 @@ along with this program. If not, see package engine import ( + "errors" "reflect" "testing" "time" @@ -1722,58 +1723,58 @@ func TestGetResourceProfileRemote(t *testing.T) { } } -// func TestGetActionTriggers(t *testing.T) { -// cfg := config.NewDefaultCGRConfig() -// tmpDm := dm -// tmp := Cache -// defer func() { -// config.SetCgrConfig(config.NewDefaultCGRConfig()) -// Cache = tmp -// SetDataStorage(tmpDm) -// }() -// Cache.Clear(nil) -// cfg.DataDbCfg().RmtConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1)} -// cfg.DataDbCfg().RplFiltered = true -// cfg.DataDbCfg().Items = map[string]*config.ItemOpt{ -// utils.CacheActionTriggers: { -// Limit: 3, -// Replicate: true, -// APIKey: "key", -// RouteID: "route", -// Remote: true, -// }, -// } -// aT := ActionTriggers{ -// &ActionTrigger{ -// ID: "Test", -// }, -// } -// clientConn := make(chan rpcclient.ClientConnector, 1) -// clientConn <- &ccMock{ -// calls: map[string]func(args interface{}, reply interface{}) error{ -// utils.ReplicatorSv1GetActionTriggers: func(args, reply interface{}) error { -// strApiOpts, cancast := args.(*utils.StringWithAPIOpts) -// if !cancast { -// return utils.ErrNotConvertible -// } -// dm.DataDB().GetActionTriggersDrv(strApiOpts.Arg) -// *reply.(*ActionTriggers) = aT -// return nil -// }, -// }, -// } -// db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) -// connMgr := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{ -// utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, -// }) -// dm := NewDataManager(db, cfg.CacheCfg(), connMgr) -// config.SetCgrConfig(cfg) -// SetDataStorage(dm) -// Cache.Set(utils.CacheActionTriggers, "Test", ActionTriggers{}, []string{}, false, utils.NonTransactional) -// if _, err := dm.GetActionTriggers(aT[0].ID, false, utils.NonTransactional); err != nil { -// t.Error(err) -// } -// } +func TestGetActionTriggers(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmpDm := dm + tmp := Cache + defer func() { + config.SetCgrConfig(config.NewDefaultCGRConfig()) + Cache = tmp + SetDataStorage(tmpDm) + }() + Cache.Clear(nil) + cfg.DataDbCfg().RmtConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1)} + cfg.DataDbCfg().RplFiltered = true + cfg.DataDbCfg().Items = map[string]*config.ItemOpt{ + utils.CacheActionTriggers: { + Limit: 3, + Replicate: true, + APIKey: "key", + RouteID: "route", + Remote: true, + }, + } + aT := ActionTriggers{ + &ActionTrigger{ + ID: "Test", + }, + } + clientConn := make(chan rpcclient.ClientConnector, 1) + clientConn <- &ccMock{ + calls: map[string]func(args interface{}, reply interface{}) error{ + utils.ReplicatorSv1GetActionTriggers: func(args, reply interface{}) error { + strApiOpts, cancast := args.(*utils.StringWithAPIOpts) + if !cancast { + return utils.ErrNotConvertible + } + dm.DataDB().GetActionTriggersDrv(strApiOpts.Arg) + *reply.(*ActionTriggers) = aT + return nil + }, + }, + } + db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) + connMgr := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, + }) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) + config.SetCgrConfig(cfg) + SetDataStorage(dm) + Cache.Set(utils.CacheActionTriggers, "Test", ActionTriggers{}, []string{}, false, utils.NonTransactional) + if _, err := dm.GetActionTriggers(aT[0].ID, false, utils.NonTransactional); err != nil { + t.Error(err) + } +} func TestGetSharedGroupRemote(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -1966,3 +1967,214 @@ func TestStatQueueProfileRemote(t *testing.T) { t.Error(err) } } + +func TestDMActionsRemote(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmpDm := dm + tmp := Cache + defer func() { + config.SetCgrConfig(config.NewDefaultCGRConfig()) + Cache = tmp + SetDataStorage(tmpDm) + }() + Cache.Clear(nil) + cfg.DataDbCfg().RplConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator)} + cfg.DataDbCfg().RmtConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator)} + cfg.DataDbCfg().RplFiltered = true + cfg.DataDbCfg().Items = map[string]*config.ItemOpt{ + utils.CacheActions: { + Limit: 3, + Replicate: true, + APIKey: "key", + RouteID: "route", + Remote: true, + }, + } + clientConn := make(chan rpcclient.ClientConnector, 1) + clientConn <- &ccMock{ + calls: map[string]func(args interface{}, reply interface{}) error{ + utils.ReplicatorSv1SetActions: func(args, reply interface{}) error { + sArgApiOpts, cancast := args.(SetActionsArgsWithAPIOpts) + if !cancast { + return utils.ErrNotConvertible + } + dm.DataDB().SetActionsDrv(sArgApiOpts.Key, sArgApiOpts.Acs) + return nil + }, + utils.ReplicatorSv1GetActions: func(args, reply interface{}) error { + strApiOpts, cancast := args.(utils.StringWithAPIOpts) + if !cancast { + return utils.ErrNotConvertible + } + dm.DataDB().GetActionsDrv(strApiOpts.Arg) + return nil + }, + utils.ReplicatorSv1RemoveActions: func(args, reply interface{}) error { + strApiOpts, cancast := args.(utils.StringWithAPIOpts) + if !cancast { + return utils.ErrNotConvertible + } + dm.DataDB().RemoveActionsDrv(strApiOpts.Arg) + return nil + }, + }} + acs := Actions{{ + Id: "SHARED", + ActionType: utils.MetaTopUp, + ExpirationString: utils.MetaUnlimited}} + db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) + connMgr := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn, + }) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) + config.SetCgrConfig(cfg) + SetDataStorage(dm) + if err := dm.SetActions("KeyActions", acs); err != nil { + t.Error(err) + } + Cache.Set(utils.CacheActions, "KeyActions", acs, []string{}, true, utils.NonTransactional) + if val, err := dm.GetActions("KeyActions", false, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(acs, val) { + t.Errorf("expected %+v,received %+v", utils.ToJSON(acs), utils.ToJSON(val)) + } + if err := dm.RemoveActions("KeyActions"); err != nil { + t.Error(err) + } + if _, has := db.db.Get(utils.CacheActions, "KeyActions"); has { + t.Error("shouln't be in db cache") + } +} + +func TestGetDispatcherHost(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmpDm := dm + tmp := Cache + defer func() { + config.SetCgrConfig(config.NewDefaultCGRConfig()) + Cache = tmp + SetDataStorage(tmpDm) + }() + Cache.Clear(nil) + cfg.DataDbCfg().RmtConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1)} + cfg.DataDbCfg().RmtConnID = "rmt" + cfg.GeneralCfg().NodeID = "node" + cfg.DataDbCfg().Items = map[string]*config.ItemOpt{ + utils.CacheDispatcherHosts: { + Limit: 3, + Remote: true, + APIKey: "key", + RouteID: "route", + }, + } + dh := &DispatcherHost{ + Tenant: "cgrates.org:HostID", + RemoteHost: &config.RemoteHost{ + ID: "Host1", + Address: "127.0.0.1:2012", + TLS: true, + Transport: utils.MetaJSON, + }, + } + clientConn := make(chan rpcclient.ClientConnector, 1) + clientConn <- &ccMock{ + calls: map[string]func(args interface{}, reply interface{}) error{ + utils.ReplicatorSv1GetDispatcherHost: func(args, reply interface{}) error { + tntApiOpts, cancast := args.(*utils.TenantIDWithAPIOpts) + if !cancast { + return utils.ErrNotConvertible + } + dm.DataDB().GetDispatcherHostDrv(tntApiOpts.Tenant, tntApiOpts.ID) + *reply.(**DispatcherHost) = dh + return nil + }, + }, + } + db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) + connMgr := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, + }) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) + config.SetCgrConfig(cfg) + SetDataStorage(dm) + + if val, err := dm.GetDispatcherHost("cgrates.org", "HostID", false, true, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(dh, val) { + t.Errorf("expected %v,received %v", utils.ToJSON(val), utils.ToJSON(dh)) + } +} + +func TestGetReverseDestinationRemote(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + tmpDm := dm + tmp := Cache + tmpConn := connMgr + defer func() { + config.SetCgrConfig(config.NewDefaultCGRConfig()) + Cache = tmp + connMgr = tmpConn + SetDataStorage(tmpDm) + }() + Cache.Clear(nil) + cfg.DataDbCfg().RmtConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1)} + cfg.CacheCfg().Partitions[utils.CacheReverseDestinations].Replicate = true + cfg.CacheCfg().ReplicationConns = []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches)} + cfg.DataDbCfg().RmtConnID = "rmt" + cfg.GeneralCfg().NodeID = "node" + cfg.DataDbCfg().Items = map[string]*config.ItemOpt{ + utils.CacheReverseDestinations: { + Limit: 3, + Remote: true, + APIKey: "key", + RouteID: "route", + }, + } + ids := []string{"dest1", "dest2"} + clientConn := make(chan rpcclient.ClientConnector, 1) + clientConn <- &ccMock{ + calls: map[string]func(args interface{}, reply interface{}) error{ + utils.ReplicatorSv1GetReverseDestination: func(args, reply interface{}) error { + strApiOpts, cancast := args.(*utils.StringWithAPIOpts) + if !cancast { + return utils.ErrNotConvertible + } + dm.DataDB().GetReverseDestinationDrv(strApiOpts.Arg, utils.NonTransactional) + *reply.(*[]string) = ids + return nil + }, + }, + } + db := NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) + connMgr := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, + }) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) + config.SetCgrConfig(cfg) + SetDataStorage(dm) + if val, err := dm.GetReverseDestination("CRUDReverseDestination", false, true, utils.NonTransactional); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(val, ids) { + t.Errorf("expected %v,received %v", utils.ToJSON(ids), utils.ToJSON(val)) + } + Cache = NewCacheS(cfg, dm, nil) + clientConn2 := make(chan rpcclient.ClientConnector, 1) + clientConn2 <- &ccMock{ + calls: map[string]func(args interface{}, reply interface{}) error{ + utils.CacheSv1ReplicateSet: func(args, reply interface{}) error { + return errors.New("Can't replicate") + }, + }, + } + connMgr2 := NewConnManager(cfg, map[string]chan rpcclient.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn2, + }) + SetConnManager(connMgr2) + if _, err := dm.GetReverseDestination("CRUDReverseDestination", false, true, utils.NonTransactional); err == nil || err.Error() != "Can't replicate" { + t.Error(err) + } + var dm2 *DataManager + if _, err := dm2.GetReverseDestination("CRUDReverseDestination", false, true, utils.NonTransactional); err == nil || err != utils.ErrNoDatabaseConn { + t.Error(err) + } +} diff --git a/engine/dynamicdp_test.go b/engine/dynamicdp_test.go index d418eaf18..73e32a291 100644 --- a/engine/dynamicdp_test.go +++ b/engine/dynamicdp_test.go @@ -176,7 +176,7 @@ func TestDDPFieldAsInterface(t *testing.T) { utils.Logger.SetSyslog(nil) buf2 := new(bytes.Buffer) log.SetOutput(buf2) - expLog = ` when getting GeoLocation for number` + expLog = `when getting GeoLocation for number` if rcvLog := buf2.String(); strings.Contains(rcvLog, expLog) { t.Errorf("Logger %v doesn't contain %v", rcvLog, expLog) }