diff --git a/data/conf/samples/dispatcher_opts_host1/cgrates.json b/data/conf/samples/dispatcher_opts_host1/cgrates.json deleted file mode 100644 index 31b3e6602..000000000 --- a/data/conf/samples/dispatcher_opts_host1/cgrates.json +++ /dev/null @@ -1,53 +0,0 @@ -{ - -"general": { - "node_id": "HOST1", - "log_level": 7 -}, - -"listen": { - "rpc_json": ":2012", - "rpc_gob": ":2013", - "http": ":2080" -}, - -"data_db": { - "db_type": "redis", - "db_port": 6379, - "db_name": "10" -}, - -"stor_db": { - "db_password": "CGRateS.org" -}, - -"dispatchers":{ - "enabled": true, - "prevent_loop": true -}, - -"caches":{ - "partitions": { - "*dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, - "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, - "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false} - }, - //"replication_conns": ["gob_cache"] , - "remote_conns": ["gob_cache"] -}, - -"rpc_conns": { - "gob_cache": { - "strategy": "*first", - "conns": [ - {"address": "127.0.0.1:4013", "transport":"*gob"} - ] - } -}, - -"apiers": { - "enabled": true -} - - -} \ No newline at end of file diff --git a/data/conf/samples/dispatcher_opts_host2/cgrates.json b/data/conf/samples/dispatcher_opts_host2/cgrates.json deleted file mode 100644 index 46ce4b2ae..000000000 --- a/data/conf/samples/dispatcher_opts_host2/cgrates.json +++ /dev/null @@ -1,52 +0,0 @@ -{ - -"general": { - "node_id": "HOST2", - "log_level": 7 -}, - -"listen": { - "rpc_json": ":4012", - "rpc_gob": ":4013", - "http": ":4080" -}, - -"data_db": { - "db_type": "redis", - "db_port": 6379, - "db_name": "10" -}, - -"stor_db": { - "db_password": "CGRateS.org" -}, - -"dispatchers":{ - "enabled": true, - "prevent_loop": true -}, - -"caches":{ - "partitions": { - "*dispatcher_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, - "*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false}, - "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "remote":true, "replicate": false} - }, - //"replication_conns": ["gob_cache"], - "remote_conns": ["gob_cache"] -}, - -"apiers": { - "enabled": true -}, - -"rpc_conns": { - "gob_cache": { - "strategy": "*first", - "conns": [ - {"address": "127.0.0.1:4013", "transport":"*gob"} - ] - } -} - -} \ No newline at end of file diff --git a/data/conf/samples/dispatcher_opts_setter/cgrates.json b/data/conf/samples/dispatcher_opts_setter/cgrates.json deleted file mode 100644 index f0dc0a028..000000000 --- a/data/conf/samples/dispatcher_opts_setter/cgrates.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - -"general": { - "node_id": "DispatcherOpts_Setter", - "log_level": 7 -}, - -"listen": { - "rpc_json": ":6012", - "rpc_gob": ":6013", - "http": ":6080" -}, - -"data_db": { - "db_type": "redis", - "db_port": 6379, - "db_name": "10" -}, - -"stor_db": { - "db_password": "CGRateS.org" -}, - -"dispatchers":{ - "enabled": false -}, - -"apiers": { - "enabled": true, - "caches_conns":["broadcast_cache"] -}, - -"rpc_conns": { - "broadcast_cache": { - "strategy": "*broadcast", - "conns": [ - {"address": "127.0.0.1:2012", "transport":"*json"}, - {"address": "127.0.0.1:4012", "transport":"*json"}, - {"address": "127.0.0.1:6012", "transport":"*json"} - ] - } -} - -} \ No newline at end of file diff --git a/general_tests/dispatcher_opts_it_test.go b/general_tests/dispatcher_opts_it_test.go deleted file mode 100644 index 08f466dd2..000000000 --- a/general_tests/dispatcher_opts_it_test.go +++ /dev/null @@ -1,830 +0,0 @@ -//go:build integration -// +build integration - -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package general_tests - -import ( - "path" - "reflect" - "testing" - "time" - - "github.com/cgrates/birpc" - "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -var ( - setterCfgPath string - cfg2CfgPath string - cfg1CfgPath string - setterCfg *config.CGRConfig - cfg2OptsCfg *config.CGRConfig - cfg1Cfg *config.CGRConfig - setterRPC *birpc.Client - cgr2RPC *birpc.Client - cgr1RPC *birpc.Client - cfg1ConfigDIR string - cfg2ConfigDIR string - setterConfigDIR string - dpsOptsTest = []func(t *testing.T){ - // FIRST PART OF THE TEST - // Start engine with Dispatcher on engine 2012 - testDispatcherCgr1InitCfg, - testDispatcherCgr1InitDataDb, - testDispatcherCgr1StartEngine, - testDispatcherCgr1RPCConn, - - // Sending Status requests in both engines, with *dispatchers:false - testDispatcherCgr2InitCfg, - testDispatcherCgr2StartEngine, - testDispatcherCgr2RPCConn, - - testDispatcherCgr1CoreStatus, // *disaptchers:false - testDispatcherCgr2CoreStatus, // *disaptchers:false - - testDispatcherGetItemBothEnginesFirstAttempt, // NOT FOUND - - testDispatcherCgr1StopEngine, - testDispatcherCgr2StopEngine, - - // SECOND PART OF THE TEST - // START HOST2 engine - - testDispatcherSetterInitCfg, - testDispatcherSetterStartEngine, - testDispatcherSetterRPCConn, - - testDispatcherCgr2StartEngine, - testDispatcherCgr2RPCConn, - - testDispatcherSetterSetDispatcherProfile, // contains both hosts, HOST1 prio, host2 backup - - testDispatcherCgr2CoreStatusWithRouteID, // HOST2 matched because HOST1 is not started yet - testDispatcherCgr2GetItemHOST2, - - // START HOST1 engine - testDispatcherCgr1StartEngine, - testDispatcherCgr1RPCConn, - testDispatcherCgr1CoreStatusWithRouteIDSecondAttempt, // same HOST2 will be matched, due to routeID - - // clear cache in order to remove routeID - testDisaptcherCacheClear, - testDispatcherCgr1CoreStatusWithRouteIDButHost1, // due to clearing cache, HOST1 will be matched - - // verify cache of dispatchers, SetDispatcherProfile API should reload the dispatchers cache (instance, profile and route) - testDispatcherCgr1CheckCacheAfterRouting, - testDispatcherSetterSetDispatcherProfileOverwrite, - testDispatcherCheckCacheAfterSetDispatcherDSP1, - testDispatcherSetterSetAnotherProifle, //DSP2 - testDispatcherCheckCacheAfterSetDispatcherDSP1, //we set DSP2, so for DSP1 nothing changed - testDispatcherCheckCacheAfterSetDispatcherDSP2, //NOT_FOUND for every get, cause it was not used that profile before - - testDispatcherCgr1StopEngine, - testDispatcherCgr2StopEngine, - } -) - -func TestDispatcherOpts(t *testing.T) { - for _, test := range dpsOptsTest { - t.Run("dispatcher-opts", test) - } -} - -func testDispatcherCgr1InitCfg(t *testing.T) { - cfg1ConfigDIR = "dispatcher_opts_host1" - var err error - cfg1CfgPath = path.Join(*utils.DataDir, "conf", "samples", cfg1ConfigDIR) - cfg1Cfg, err = config.NewCGRConfigFromPath(cfg1CfgPath) - if err != nil { - t.Error(err) - } -} - -func testDispatcherCgr1InitDataDb(t *testing.T) { - if err := engine.InitDataDb(cfg1Cfg); err != nil { - t.Fatal(err) - } -} - -// Start CGR Engine woth Dispatcher enabled -func testDispatcherCgr1StartEngine(t *testing.T) { - if _, err := engine.StartEngine(cfg1CfgPath, *utils.WaitRater); err != nil { - t.Fatal(err) - } -} - -func testDispatcherCgr1RPCConn(t *testing.T) { - var err error - cgr1RPC, err = newRPCClient(cfg1Cfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed - if err != nil { - t.Fatal(err) - } -} - -func testDispatcherCgr2InitCfg(t *testing.T) { - cfg2ConfigDIR = "dispatcher_opts_host2" //changed with the cfg with dispatcher on - var err error - cfg2CfgPath = path.Join(*utils.DataDir, "conf", "samples", cfg2ConfigDIR) - cfg2OptsCfg, err = config.NewCGRConfigFromPath(cfg2CfgPath) - if err != nil { - t.Error(err) - } -} - -// Start CGR Engine woth Dispatcher enabled -func testDispatcherCgr2StartEngine(t *testing.T) { - if _, err := engine.StartEngine(cfg2CfgPath, *utils.WaitRater); err != nil { - t.Fatal(err) - } -} - -func testDispatcherCgr2RPCConn(t *testing.T) { - var err error - cgr2RPC, err = newRPCClient(cfg2OptsCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed - if err != nil { - t.Fatal(err) - } -} - -func testDispatcherCgr1CoreStatus(t *testing.T) { - // HOST1 host matched :2012 - var reply map[string]any - ev := utils.TenantWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.OptsRouteID: "account#dan.bogos", - utils.MetaDispatchers: false, - }, - } - if err := cgr1RPC.Call(context.Background(), utils.CoreSv1Status, &ev, &reply); err != nil { - t.Error(err) - } else if reply[utils.NodeID] != "HOST1" { - t.Errorf("Expected HOST1, received %v", reply[utils.NodeID]) - } -} - -func testDispatcherCgr2CoreStatus(t *testing.T) { - // HOST2 host matched because it was called from engine with port :4012 -> host2 - var reply map[string]any - ev := utils.TenantWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.OptsRouteID: "account#dan.bogos", - utils.MetaDispatchers: false, - }, - } - if err := cgr2RPC.Call(context.Background(), utils.CoreSv1Status, &ev, &reply); err != nil { - t.Error(err) - } else if reply[utils.NodeID] != "HOST2" { - t.Errorf("Expected HOST2, received %v", reply[utils.NodeID]) - } -} - -func testDispatcherGetItemBothEnginesFirstAttempt(t *testing.T) { - // get for *dispatcher_routes - argsCache := &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatcherRoutes, - ItemID: "account#dan.bogos:*core", - }, - } - var reply any - if err := cgr2RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } - - // get for *dispatcher_profiles - argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatcherProfiles, - ItemID: "cgrates.org:DSP1", - }, - } - if err := cgr2RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } - - // get for *dispatchers - argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatchers, - ItemID: "cgrates.org:DSP1", - }, - } - if err := cgr2RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Error(err) - } -} - -func testDispatcherSetterInitCfg(t *testing.T) { - setterConfigDIR = "dispatcher_opts_setter" - var err error - setterCfgPath = path.Join(*utils.DataDir, "conf", "samples", setterConfigDIR) - setterCfg, err = config.NewCGRConfigFromPath(setterCfgPath) - if err != nil { - t.Error(err) - } -} - -func testDispatcherSetterStartEngine(t *testing.T) { - if _, err := engine.StartEngine(setterCfgPath, *utils.WaitRater); err != nil { - t.Fatal(err) - } -} - -func testDispatcherSetterRPCConn(t *testing.T) { - var err error - setterRPC, err = newRPCClient(setterCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed - if err != nil { - t.Fatal(err) - } -} - -func testDispatcherSetterSetDispatcherProfile(t *testing.T) { - // Set DispatcherHost - var replyStr string - setDispatcherHost := &engine.DispatcherHostWithAPIOpts{ - DispatcherHost: &engine.DispatcherHost{ - Tenant: "cgrates.org", - RemoteHost: &config.RemoteHost{ - ID: "HOST1", - Address: "127.0.0.1:2012", // CGR1 - Transport: "*json", - ConnectAttempts: 1, - Reconnects: 3, - ConnectTimeout: time.Minute, - ReplyTimeout: 2 * time.Minute, - }, - }, - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - } - if err := setterRPC.Call(context.Background(), utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil { - t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err) - } else if replyStr != utils.OK { - t.Error("Unexpected reply returned", replyStr) - } - - setDispatcherHost = &engine.DispatcherHostWithAPIOpts{ - DispatcherHost: &engine.DispatcherHost{ - Tenant: "cgrates.org", - RemoteHost: &config.RemoteHost{ - ID: "HOST2", - Address: "127.0.0.1:4012", // CGR2 - Transport: "*json", - ConnectAttempts: 1, - Reconnects: 3, - ConnectTimeout: time.Minute, - ReplyTimeout: 2 * time.Minute, - }, - }, - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - } - if err := setterRPC.Call(context.Background(), utils.APIerSv1SetDispatcherHost, setDispatcherHost, &replyStr); err != nil { - t.Error("Unexpected error when calling APIerSv1.SetDispatcherHost: ", err) - } else if replyStr != utils.OK { - t.Error("Unexpected reply returned", replyStr) - } - - // Set DispatcherProfile - setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{ - DispatcherProfile: &engine.DispatcherProfile{ - Tenant: "cgrates.org", - ID: "DSP1", - Strategy: "*weight", - Weight: 10, - Subsystems: []string{utils.MetaAny}, - Hosts: engine.DispatcherHostProfiles{ - { - ID: "HOST1", - Weight: 10, - }, - { - ID: "HOST2", - Weight: 5, - }, - }, - }, - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - } - if err := setterRPC.Call(context.Background(), utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil { - t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err) - } else if replyStr != utils.OK { - t.Error("Unexpected reply returned", replyStr) - } -} - -func testDispatcherCgr2CoreStatusWithRouteID(t *testing.T) { - var reply map[string]any - ev := utils.TenantWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.OptsRouteID: "account#dan.bogos", - }, - } - // even if HOST1 is prio, this engine was not staretd yet, so HOST2 matched - if err := cgr2RPC.Call(context.Background(), utils.CoreSv1Status, &ev, &reply); err != nil { - t.Error(err) - } else if reply[utils.NodeID] != "HOST2" { - t.Errorf("Expected HOST2, received %v", reply[utils.NodeID]) - } -} - -func testDispatcherCgr1CoreStatusWithRouteIDSecondAttempt(t *testing.T) { - var reply map[string]any - ev := utils.TenantWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.OptsRouteID: "account#dan.bogos", - }, - } - // same HOST2 will be matched, due to routeID - if err := cgr1RPC.Call(context.Background(), utils.CoreSv1Status, &ev, &reply); err != nil { - t.Error(err) - } else if reply[utils.NodeID] != "HOST2" { - t.Errorf("Expected HOST2, received %v", reply[utils.NodeID]) - } -} - -func testDispatcherCgr2GetItemHOST2(t *testing.T) { - // get for *dispatcher_routes - argsCache := &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatcherRoutes, - ItemID: "account#dan.bogos:*core", - }, - } - var reply any - if err := cgr2RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err != nil { - t.Error(err) - } else { - expected := map[string]any{ - utils.Tenant: "cgrates.org", - utils.ProfileID: "DSP1", - "HostID": "HOST2", - } - if !reflect.DeepEqual(expected, reply) { - t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply)) - } - } - - // get for *dispatcher_profiles - argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatcherProfiles, - ItemID: "cgrates.org:DSP1", - }, - } - if err := cgr2RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err != nil { - t.Error(err) - } else { - expected := map[string]any{ - utils.FilterIDs: nil, - "Hosts": []any{ - map[string]any{ - utils.Blocker: false, - utils.FilterIDs: nil, - utils.ID: "HOST1", - utils.Params: nil, - utils.Weight: 10., - }, - map[string]any{ - utils.Blocker: false, - utils.FilterIDs: nil, - utils.ID: "HOST2", - utils.Params: nil, - utils.Weight: 5., - }, - }, - utils.ActivationIntervalString: nil, - utils.ID: "DSP1", - utils.Strategy: "*weight", - utils.Subsystems: []any{"*any"}, - "StrategyParams": nil, - utils.Tenant: "cgrates.org", - utils.Weight: 10., - } - if !reflect.DeepEqual(expected, reply) { - t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply)) - } - } - - // get for *dispatchers - argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatchers, - ItemID: "cgrates.org:DSP1", - }, - } - // reply here is an interface type(singleResultDispatcher), it exists - if err := cgr2RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err != nil { - t.Error(err) - } -} - -func testDisaptcherCacheClear(t *testing.T) { - var reply string - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{ - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - }, &reply); err != nil { - t.Fatal(err) - } else if reply != utils.OK { - t.Errorf("Unexpected reply returned") - } - - if err := cgr2RPC.Call(context.Background(), utils.CacheSv1Clear, &utils.AttrCacheIDsWithAPIOpts{ - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - }, &reply); err != nil { - t.Fatal(err) - } else if reply != utils.OK { - t.Errorf("Unexpected reply returned") - } -} - -func testDispatcherCgr1CoreStatusWithRouteIDButHost1(t *testing.T) { - var reply map[string]any - ev := utils.TenantWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.OptsRouteID: "account#dan.bogos", - }, - } - // as the cache was cleared, HOST1 will match due to his high prio, and it will be set as *dispatcher_routes as HOST1 - if err := cgr1RPC.Call(context.Background(), utils.CoreSv1Status, &ev, &reply); err != nil { - t.Error(err) - } else if reply[utils.NodeID] != "HOST1" { - t.Errorf("Expected HOST1, received %v", reply[utils.NodeID]) - } -} - -func testDispatcherCgr1CheckCacheAfterRouting(t *testing.T) { - // get for *dispatcher_routes - argsCache := &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatcherRoutes, - ItemID: "account#dan.bogos:*core", - }, - } - var reply any - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err != nil { - t.Error(err) - } else { - expected := map[string]any{ - utils.Tenant: "cgrates.org", - utils.ProfileID: "DSP1", - "HostID": "HOST1", - } - if !reflect.DeepEqual(expected, reply) { - t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply)) - } - } - - // get for *dispatcher_profiles - argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatcherProfiles, - ItemID: "cgrates.org:DSP1", - }, - } - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err != nil { - t.Error(err) - } else { - expected := map[string]any{ - utils.ActivationIntervalString: nil, - utils.FilterIDs: nil, - "Hosts": []any{ - map[string]any{ - utils.Blocker: false, - utils.FilterIDs: nil, - utils.ID: "HOST1", - utils.Params: nil, - utils.Weight: 10., - }, - map[string]any{ - utils.Blocker: false, - utils.FilterIDs: nil, - utils.ID: "HOST2", - utils.Params: nil, - utils.Weight: 5., - }, - }, - utils.ID: "DSP1", - utils.Strategy: "*weight", - utils.Subsystems: []any{"*any"}, - "StrategyParams": nil, - utils.Tenant: "cgrates.org", - utils.Weight: 10., - } - if !reflect.DeepEqual(expected, reply) { - t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply)) - } - } - - // get for *dispatchers - argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatchers, - ItemID: "cgrates.org:DSP1", - }, - } - // reply here is an interface type(singleResultDispatcher), it exists - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err != nil { - t.Error(err) - } -} - -func testDispatcherSetterSetDispatcherProfileOverwrite(t *testing.T) { - // as the cache was cleared, and previously the HOST1 matched, setting the profile with only HOST2 will remove the - // DispatcherRoutes, DispatcherProfile and the DispatcherInstance - var replyStr string - // Set DispatcherProfile - setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{ - DispatcherProfile: &engine.DispatcherProfile{ - Tenant: "cgrates.org", - ID: "DSP1", - Strategy: "*weight", - Weight: 10, - Subsystems: []string{utils.MetaAny}, - Hosts: engine.DispatcherHostProfiles{ - { - ID: "HOST2", - Weight: 5, - }, - }, - }, - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - } - if err := setterRPC.Call(context.Background(), utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil { - t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err) - } else if replyStr != utils.OK { - t.Error("Unexpected reply returned", replyStr) - } - time.Sleep(100 * time.Millisecond) -} - -func testDispatcherCheckCacheAfterSetDispatcherDSP1(t *testing.T) { - // get for *dispatcher_routes - argsCache := &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - "adi3": "nu", - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatcherRoutes, - ItemID: "account#dan.bogos:*core", - }, - } - var reply any // Should receive NOT_FOUND, as CallCache that was called in API will remove the DispatcherRoute - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Errorf("Unexpected error returned: %v", err) - } - - // get for *dispatcher_profiles - argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - "adi2": "nu", - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatcherProfiles, - ItemID: "cgrates.org:DSP1", - }, - } - // as the DSP1 profile was overwritten, only HOST2 in profile will be contained - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err != nil { - t.Error(err) - } else { - expected := map[string]any{ - utils.FilterIDs: nil, - "Hosts": []any{ - map[string]any{ - utils.Blocker: false, - utils.FilterIDs: nil, - utils.ID: "HOST2", - utils.Params: nil, - utils.Weight: 5., - }, - }, - "ActivationInterval": nil, - "Subsystems": []any{"*any"}, - utils.ID: "DSP1", - utils.Strategy: "*weight", - "StrategyParams": nil, - utils.Tenant: "cgrates.org", - utils.Weight: 10., - } - if !reflect.DeepEqual(expected, reply) { - t.Errorf("Expected %+v, \n received %+v", utils.ToJSON(expected), utils.ToJSON(reply)) - } - } - - // get for *dispatchers - argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - "adi1": "nu", - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatchers, - ItemID: "cgrates.org:DSP1", - }, - } - // DispatcherInstance should also be removed, so it will be NOT_FOUND - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Errorf("Unexpected error returned: %v and reply: %v", err, reply) - } -} - -func testDispatcherSetterSetAnotherProifle(t *testing.T) { - var replyStr string - // Set DispatcherProfile DSP2 with the existing hosts - setDispatcherProfile := &engine.DispatcherProfileWithAPIOpts{ - DispatcherProfile: &engine.DispatcherProfile{ - Tenant: "cgrates.org", - ID: "DSP2", - Strategy: "*weight", - Weight: 20, - Subsystems: []string{utils.MetaAny}, - Hosts: engine.DispatcherHostProfiles{ - { - ID: "HOST1", - Weight: 50, - }, - { - ID: "HOST2", - Weight: 125, - }, - }, - }, - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - } - if err := setterRPC.Call(context.Background(), utils.APIerSv1SetDispatcherProfile, setDispatcherProfile, &replyStr); err != nil { - t.Error("Unexpected error when calling APIerSv1.SetDispatcherProfile: ", err) - } else if replyStr != utils.OK { - t.Error("Unexpected reply returned", replyStr) - } - time.Sleep(100 * time.Millisecond) -} - -func testDispatcherCheckCacheAfterSetDispatcherDSP2(t *testing.T) { - // get for *dispatcher_routes - argsCache := &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatcherRoutes, - ItemID: "account#dan.bogos:*core", - }, - } - var reply any - // NOT_FOUND - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Errorf("Unexpected error returned: %v", err) - } - - // get for *dispatcher_profiles - argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatcherProfiles, - ItemID: "cgrates.org:DSP2", - }, - } - // NOT_FOUND - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Errorf("Unexpected error returned: %v", err) - } - - // get for *dispatchers - argsCache = &utils.ArgsGetCacheItemWithAPIOpts{ - Tenant: "cgrates.org", - APIOpts: map[string]any{ - utils.MetaDispatchers: false, - }, - ArgsGetCacheItem: utils.ArgsGetCacheItem{ - CacheID: utils.CacheDispatchers, - ItemID: "cgrates.org:DSP2", - }, - } - // NOT_FOUND - if err := cgr1RPC.Call(context.Background(), utils.CacheSv1GetItem, argsCache, - &reply); err == nil || err.Error() != utils.ErrNotFound.Error() { - t.Errorf("Unexpected error returned: %v", err) - } -} - -func testDispatcherCgr1StopEngine(t *testing.T) { - if err := engine.KillEngine(*utils.WaitRater); err != nil { - t.Error(err) - } -} - -func testDispatcherCgr2StopEngine(t *testing.T) { - if err := engine.KillEngine(*utils.WaitRater); err != nil { - t.Error(err) - } -} diff --git a/general_tests/dsp_routes_it_test.go b/general_tests/dsp_routes_it_test.go new file mode 100644 index 000000000..ffd9fb684 --- /dev/null +++ b/general_tests/dsp_routes_it_test.go @@ -0,0 +1,465 @@ +//go:build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package general_tests + +import ( + "fmt" + "reflect" + "strconv" + "strings" + "testing" + "time" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +const ( + host1Cfg = `{ +"general": { + "node_id": "host1", + "log_level": 7 +}, +"listen": { + "rpc_json": ":4012", + "rpc_gob": ":4013", + "http": ":4080" +}, +"dispatchers":{ + "enabled": true, + "prevent_loop": true +}, +"caches":{ + "partitions": { + "*dispatcher_profiles": { + "limit": -1, + "remote":true + }, + "*dispatcher_routes": { + "limit": -1, + "remote":true + }, + "*dispatchers": { + "limit": -1, + "remote":true + } + }, + "remote_conns": ["gob_cache"] +}, +"rpc_conns": { + "gob_cache": { + "strategy": "*first", + "conns": [ + { + "address": "127.0.0.1:6013", + "transport": "*gob" + } + ] + } +}, +"apiers": { + "enabled": true +} +}` + host2Cfg = `{ +"general": { + "node_id": "host2", + "log_level": 7 +}, +"listen": { + "rpc_json": ":6012", + "rpc_gob": ":6013", + "http": ":6080" +}, +"dispatchers":{ + "enabled": true, + "prevent_loop": true +}, +"caches":{ + "partitions": { + "*dispatcher_profiles": { + "limit": -1, + "remote":true + }, + "*dispatcher_routes": { + "limit": -1, + "remote":true + }, + "*dispatchers": { + "limit": -1, + "remote":true + } + }, + "remote_conns": ["gob_cache"] +}, +"apiers": { + "enabled": true +}, +"rpc_conns": { + "gob_cache": { + "strategy": "*first", + "conns": [ + { + "address": "127.0.0.1:6013", + "transport":"*gob" + } + ] + } +} +}` + hostSetterCfg = `{ +"general": { + "node_id": "setter", + "log_level": 7 +}, +"apiers": { + "enabled": true, + "caches_conns": ["broadcast_cache"] +}, +"rpc_conns": { + "broadcast_cache": { + "strategy": "*broadcast", + "conns": [ + { + "address": "127.0.0.1:2012", + "transport": "*json" + }, + { + "address": "127.0.0.1:4012", + "transport": "*json" + }, + { + "address": "127.0.0.1:6012", + "transport": "*json" + } + ] + } +} +}` +) + +func TestDispatcherRoutesNotFound(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + + host1 := TestEngine{ // first engine, port 4012 + ConfigJSON: host1Cfg, + } + ng1Client, _ := host1.Run(t) + host2 := TestEngine{ // second engine, port 6012 + ConfigJSON: host2Cfg, + } + ng2Client, _ := host2.Run(t) + + // Send Status requests with *dispatchers on false. + checkStatus(t, ng1Client, false, "account#dan.bogos", "host1") + checkStatus(t, ng2Client, false, "account#dan.bogos", "host2") + + // Check that dispatcher routes were not cached due to *dispatchers being false. + getCacheItem(t, ng1Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", nil) + getCacheItem(t, ng1Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", nil) +} + +func TestDispatcherRoutes(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("unsupported dbtype value") + } + + setter := TestEngine{ // engine used to set dispatcher hosts/profiles (:2012) + ConfigJSON: hostSetterCfg, + } + setterClient, _ := setter.Run(t) + + // Starting only the second dispatcher engine, for now. + host2 := TestEngine{ + ConfigJSON: host2Cfg, + PreserveDataDB: true, + PreserveStorDB: true, + } + ng2Client, _ := host2.Run(t) + + setDispatcherHost(t, setterClient, "host1", 4012) + setDispatcherHost(t, setterClient, "host2", 6012) + setDispatcherProfile(t, setterClient, "dsp_test", "host1;10", "host2;5") + + // Send status request to the second engine. "host2" will match, even though "host1" has the bigger weight. + // That's because the first engine has not been started yet. + checkStatus(t, ng2Client, true, "account#dan.bogos", "host2") + + // Check that the dispatcher route has been cached (same for the profile and the dispatcher itself). + getCacheItem(t, ng2Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", map[string]any{ + utils.Tenant: "cgrates.org", + utils.ProfileID: "dsp_test", + "HostID": "host2", + }) + getCacheItem(t, ng2Client, false, utils.CacheDispatcherProfiles, "cgrates.org:dsp_test", map[string]any{ + utils.FilterIDs: nil, + "Hosts": []any{ + map[string]any{ + utils.Blocker: false, + utils.FilterIDs: nil, + utils.ID: "host1", + utils.Params: nil, + utils.Weight: 10., + }, + map[string]any{ + utils.Blocker: false, + utils.FilterIDs: nil, + utils.ID: "host2", + utils.Params: nil, + utils.Weight: 5., + }, + }, + utils.ActivationIntervalString: nil, + utils.ID: "dsp_test", + utils.Strategy: "*weight", + utils.Subsystems: []any{"*any"}, + "StrategyParams": nil, + utils.Tenant: "cgrates.org", + utils.Weight: 0., + }) + + // Reply represents a singleResultDispatcher. Unexported, so it's enough to check if it exists. + getCacheItem(t, ng2Client, false, utils.CacheDispatchers, "cgrates.org:dsp_test", map[string]any{}) + + // Start the first engine. + host1 := TestEngine{ + ConfigJSON: host1Cfg, + PreserveDataDB: true, + PreserveStorDB: true, + } + ng1Client, _ := host1.Run(t) + + // "host2" will match again due to being cached previously. + checkStatus(t, ng1Client, true, "account#dan.bogos", "host2") + + // Clear cache and try again. + clearCache(t, ng1Client) + clearCache(t, ng2Client) + + // This time it will match "host1" which has the bigger weight. + checkStatus(t, ng1Client, true, "account#dan.bogos", "host1") + + // Check the relevant cache items. Should be the same as before, the difference being the HostID + // from *dispatcher_routes ("host1" instead of "host2"). + getCacheItem(t, ng1Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", + map[string]any{ + utils.Tenant: "cgrates.org", + utils.ProfileID: "dsp_test", + "HostID": "host1", + }) + getCacheItem(t, ng1Client, false, utils.CacheDispatcherProfiles, "cgrates.org:dsp_test", + map[string]any{ + utils.ActivationIntervalString: nil, + utils.FilterIDs: nil, + "Hosts": []any{ + map[string]any{ + utils.Blocker: false, + utils.FilterIDs: nil, + utils.ID: "host1", + utils.Params: nil, + utils.Weight: 10., + }, + map[string]any{ + utils.Blocker: false, + utils.FilterIDs: nil, + utils.ID: "host2", + utils.Params: nil, + utils.Weight: 5., + }, + }, + utils.ID: "dsp_test", + utils.Strategy: "*weight", + utils.Subsystems: []any{"*any"}, + "StrategyParams": nil, + utils.Tenant: "cgrates.org", + utils.Weight: 0., + }) + getCacheItem(t, ng1Client, false, utils.CacheDispatchers, "cgrates.org:dsp_test", map[string]any{}) + + // Overwrite the DispatcherProfile (removed host1). + setDispatcherProfile(t, setterClient, "dsp_test", "host2;5") + time.Sleep(5 * time.Millisecond) // wait for cache updates to reach all external engines + + // Check that related cache items have been updated automatically. + + // Check that cache dispatcher route/ dispatcher instance was cleared, + // as previously "host1" matched (which is now removed). + getCacheItem(t, ng1Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", nil) + getCacheItem(t, ng1Client, false, utils.CacheDispatcherProfiles, "cgrates.org:dsp_test", + map[string]any{ + utils.ActivationIntervalString: nil, + utils.FilterIDs: nil, + "Hosts": []any{ + map[string]any{ + utils.Blocker: false, + utils.FilterIDs: nil, + utils.ID: "host2", + utils.Params: nil, + utils.Weight: 5., + }, + }, + utils.ID: "dsp_test", + utils.Strategy: "*weight", + utils.Subsystems: []any{"*any"}, + "StrategyParams": nil, + utils.Tenant: "cgrates.org", + utils.Weight: 0., + }) + getCacheItem(t, ng1Client, false, utils.CacheDispatchers, "cgrates.org:dsp_test", nil) + + // Nothing happens when setting a different dispatcher profile that's using the same hosts as before. + setDispatcherProfile(t, setterClient, "dsp_test2", "host1;50", "host2;150") + getCacheItem(t, ng1Client, false, utils.CacheDispatcherRoutes, "account#dan.bogos:*core", nil) + getCacheItem(t, ng1Client, false, utils.CacheDispatcherProfiles, "cgrates.org:dsp_test2", nil) + getCacheItem(t, ng1Client, false, utils.CacheDispatchers, "cgrates.org:dsp_test2", nil) +} + +func checkStatus(t *testing.T, client *birpc.Client, dispatch bool, routeID, expNodeID string) { + t.Helper() + args := &utils.TenantWithAPIOpts{ + Tenant: "cgrates.org", + APIOpts: map[string]any{ + utils.OptsRouteID: routeID, + utils.MetaDispatchers: dispatch, + }, + } + var reply map[string]any + if err := client.Call(context.Background(), utils.CoreSv1Status, args, &reply); err != nil { + t.Errorf("CoreSv1.Status unexpected err: %v", err) + } else if nodeID := reply[utils.NodeID]; nodeID != expNodeID { + t.Errorf("CoreSv1.Status NodeID=%q, want %q", nodeID, expNodeID) + } +} + +func getCacheItem(t *testing.T, client *birpc.Client, dispatch bool, cacheID, itemID string, expItem any) { + t.Helper() + args := &utils.ArgsGetCacheItemWithAPIOpts{ + Tenant: "cgrates.org", + ArgsGetCacheItem: utils.ArgsGetCacheItem{ + CacheID: cacheID, + ItemID: itemID, + }, + } + if !dispatch { + args.APIOpts = map[string]any{ + utils.MetaDispatchers: dispatch, + } + } + var reply any + err := client.Call(context.Background(), utils.CacheSv1GetItem, args, &reply) + if expItem != nil { + if err != nil { + t.Fatalf("CacheSv1.GetItem unexpected err: %v", err) + } + if !reflect.DeepEqual(reply, expItem) { + t.Errorf("CacheSv1.GetItem = %s, want %s", utils.ToJSON(reply), utils.ToJSON(expItem)) + } + return + } + if err == nil || err.Error() != utils.ErrNotFound.Error() { + t.Errorf("CacheSv1.GetItem err=%v, want %v", err, utils.ErrNotFound) + } +} + +func clearCache(t *testing.T, client *birpc.Client) { + t.Helper() + var reply string + if err := client.Call(context.Background(), utils.CacheSv1Clear, + &utils.AttrCacheIDsWithAPIOpts{ + APIOpts: map[string]any{ + utils.MetaDispatchers: false, + }, + }, &reply); err != nil { + t.Fatalf("CacheSv1.Clear unexpected err: %v", err) + } +} + +func setDispatcherHost(t *testing.T, client *birpc.Client, id string, port int) { + t.Helper() + var reply string + if err := client.Call(context.Background(), utils.APIerSv1SetDispatcherHost, + &engine.DispatcherHostWithAPIOpts{ + DispatcherHost: &engine.DispatcherHost{ + Tenant: "cgrates.org", + RemoteHost: &config.RemoteHost{ + ID: id, + Address: fmt.Sprintf("127.0.0.1:%d", port), + Transport: "*json", + ConnectAttempts: 1, + Reconnects: 3, + ConnectTimeout: time.Second, + ReplyTimeout: 2 * time.Second, + }, + }, + APIOpts: map[string]any{ + utils.MetaDispatchers: false, + }, + }, &reply); err != nil { + t.Errorf("APIerSv1.SetDispatcherHost unexpected err: %v", err) + } +} + +func setDispatcherProfile(t *testing.T, client *birpc.Client, id string, hosts ...string) { + t.Helper() + hostPrfs := make(engine.DispatcherHostProfiles, 0, len(hosts)) + for _, host := range hosts { + host, weightStr, found := strings.Cut(host, ";") + if !found { + t.Fatal("hosts don't respect the 'host;weight' format") + } + weight, err := strconv.ParseFloat(weightStr, 64) + if err != nil { + t.Fatal(err) + } + hostPrfs = append(hostPrfs, &engine.DispatcherHostProfile{ + ID: host, + Weight: weight, + }) + } + + var reply string + if err := client.Call(context.Background(), utils.APIerSv1SetDispatcherProfile, &engine.DispatcherProfileWithAPIOpts{ + DispatcherProfile: &engine.DispatcherProfile{ + Tenant: "cgrates.org", + ID: id, + Strategy: "*weight", + Subsystems: []string{utils.MetaAny}, + Hosts: hostPrfs, + }, + APIOpts: map[string]any{ + utils.MetaDispatchers: false, + }, + }, &reply); err != nil { + t.Errorf("APIerSv1.SetDispatcherProfile unexpected err: %v", err) + } +}