/* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see */ package dispatchers import ( "net/rpc" "reflect" "testing" "time" "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" "github.com/google/go-cmp/cmp" ) func TestLoadMetricsGetHosts(t *testing.T) { dhp := engine.DispatcherHostProfiles{ {ID: "DSP_1", Params: map[string]any{utils.MetaRatio: 1}}, {ID: "DSP_2", Params: map[string]any{utils.MetaRatio: 1}}, {ID: "DSP_3", Params: map[string]any{utils.MetaRatio: 1}}, {ID: "DSP_4", Params: map[string]any{utils.MetaRatio: 1}}, {ID: "DSP_5", Params: map[string]any{utils.MetaRatio: 1}}, } lm, err := newLoadMetrics(dhp, 1) if err != nil { t.Fatal(err) } hostsIDs := engine.DispatcherHostIDs(dhp.HostIDs()) // to prevent randomness we increment all loads exept the first one for _, hst := range hostsIDs[1:] { lm.incrementLoad(hst, utils.EmptyString) } // check only the first host because the rest may be in a random order // because they share the same cost if rply := lm.getHosts(dhp.Clone()); rply[0].ID != "DSP_1" { t.Errorf("Expected: %q ,received: %q", "DSP_1", rply[0].ID) } lm.incrementLoad(hostsIDs[0], utils.EmptyString) lm.decrementLoad(hostsIDs[1], utils.EmptyString) if rply := lm.getHosts(dhp.Clone()); rply[0].ID != "DSP_2" { t.Errorf("Expected: %q ,received: %q", "DSP_2", rply[0].ID) } for _, hst := range hostsIDs { lm.incrementLoad(hst, utils.EmptyString) } if rply := lm.getHosts(dhp.Clone()); rply[0].ID != "DSP_2" { t.Errorf("Expected: %q ,received: %q", "DSP_2", rply[0].ID) } } func TestNewSingleDispatcher(t *testing.T) { dhp := engine.DispatcherHostProfiles{ {ID: "DSP_1"}, {ID: "DSP_2"}, {ID: "DSP_3"}, {ID: "DSP_4"}, {ID: "DSP_5"}, } var exp Dispatcher = &singleResultDispatcher{hosts: dhp} if rply, err := newSingleDispatcher(dhp, map[string]any{}, utils.EmptyString, nil); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(exp, rply) { t.Errorf("Expected: singleResultDispatcher structure,received: %s", utils.ToJSON(rply)) } dhp = engine.DispatcherHostProfiles{ {ID: "DSP_1"}, {ID: "DSP_2"}, {ID: "DSP_3"}, {ID: "DSP_4"}, {ID: "DSP_5", Params: map[string]any{utils.MetaRatio: 1}}, } exp = &loadDispatcher{ hosts: dhp, tntID: "cgrates.org", defaultRatio: 1, } if rply, err := newSingleDispatcher(dhp, map[string]any{}, "cgrates.org", nil); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(exp, rply) { t.Errorf("Expected: loadDispatcher structure,received: %s", utils.ToJSON(rply)) } dhp = engine.DispatcherHostProfiles{ {ID: "DSP_1"}, {ID: "DSP_2"}, {ID: "DSP_3"}, {ID: "DSP_4"}, } exp = &loadDispatcher{ hosts: dhp, tntID: "cgrates.org", defaultRatio: 2, } if rply, err := newSingleDispatcher(dhp, map[string]any{utils.MetaDefaultRatio: 2}, "cgrates.org", nil); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(exp, rply) { t.Errorf("Expected: loadDispatcher structure,received: %s", utils.ToJSON(rply)) } exp = &loadDispatcher{ hosts: dhp, tntID: "cgrates.org", defaultRatio: 0, } if rply, err := newSingleDispatcher(dhp, map[string]any{utils.MetaDefaultRatio: 0}, "cgrates.org", nil); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(exp, rply) { t.Errorf("Expected: loadDispatcher structure,received: %s", utils.ToJSON(rply)) } if _, err := newSingleDispatcher(dhp, map[string]any{utils.MetaDefaultRatio: "A"}, "cgrates.org", nil); err == nil { t.Fatalf("Expected error received: %v", err) } } func TestNewLoadMetrics(t *testing.T) { dhp := engine.DispatcherHostProfiles{ {ID: "DSP_1", Params: map[string]any{utils.MetaRatio: 1}}, {ID: "DSP_2", Params: map[string]any{utils.MetaRatio: 0}}, {ID: "DSP_3"}, } exp := &LoadMetrics{ HostsLoad: map[string]int64{}, HostsRatio: map[string]int64{ "DSP_1": 1, "DSP_2": 0, "DSP_3": 2, }, } if lm, err := newLoadMetrics(dhp, 2); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(exp, lm) { t.Errorf("Expected: %s ,received: %s", utils.ToJSON(exp), utils.ToJSON(lm)) } dhp = engine.DispatcherHostProfiles{ {ID: "DSP_1", Params: map[string]any{utils.MetaRatio: "A"}}, } if _, err := newLoadMetrics(dhp, 2); err == nil { t.Errorf("Expected error received: %v", err) } } func TestLoadMetricsGetHosts2(t *testing.T) { dhp := engine.DispatcherHostProfiles{ {ID: "DSP_1", Params: map[string]any{utils.MetaRatio: 2}}, {ID: "DSP_2", Params: map[string]any{utils.MetaRatio: 3}}, {ID: "DSP_3", Params: map[string]any{utils.MetaRatio: 1}}, {ID: "DSP_4", Params: map[string]any{utils.MetaRatio: 5}}, {ID: "DSP_5", Params: map[string]any{utils.MetaRatio: 1}}, {ID: "DSP_6", Params: map[string]any{utils.MetaRatio: 0}}, } lm, err := newLoadMetrics(dhp, 1) if err != nil { t.Fatal(err) } hostsIDs := engine.DispatcherHostIDs(dhp.HostIDs()) exp := []string(hostsIDs.Clone())[:5] if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } for i := 0; i < 100; i++ { for _, dh := range dhp { for j := int64(0); j < lm.HostsRatio[dh.ID]; j++ { if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected for id<%s>: %+v ,received: %+v", dh.ID, exp, rply) } lm.incrementLoad(dh.ID, utils.EmptyString) } exp = append(exp[1:], exp[0]) } exp = []string{"DSP_1", "DSP_2", "DSP_3", "DSP_4", "DSP_5"} if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } lm.decrementLoad("DSP_4", utils.EmptyString) lm.decrementLoad("DSP_4", utils.EmptyString) lm.decrementLoad("DSP_2", utils.EmptyString) exp = []string{"DSP_2", "DSP_4", "DSP_1", "DSP_3", "DSP_5"} if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } lm.incrementLoad("DSP_2", utils.EmptyString) exp = []string{"DSP_4", "DSP_1", "DSP_2", "DSP_3", "DSP_5"} if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } lm.incrementLoad("DSP_4", utils.EmptyString) if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } lm.incrementLoad("DSP_4", utils.EmptyString) exp = []string{"DSP_1", "DSP_2", "DSP_3", "DSP_4", "DSP_5"} if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } } dhp = engine.DispatcherHostProfiles{ {ID: "DSP_1", Params: map[string]any{utils.MetaRatio: -1}}, {ID: "DSP_2", Params: map[string]any{utils.MetaRatio: 3}}, {ID: "DSP_3", Params: map[string]any{utils.MetaRatio: 1}}, {ID: "DSP_4", Params: map[string]any{utils.MetaRatio: 5}}, {ID: "DSP_5", Params: map[string]any{utils.MetaRatio: 1}}, {ID: "DSP_6", Params: map[string]any{utils.MetaRatio: 0}}, } lm, err = newLoadMetrics(dhp, 1) if err != nil { t.Fatal(err) } hostsIDs = engine.DispatcherHostIDs(dhp.HostIDs()) exp = []string(hostsIDs.Clone())[:5] if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } for i := 0; i < 100; i++ { if rply := lm.getHosts(dhp.Clone()); !reflect.DeepEqual(exp, rply.HostIDs()) { t.Errorf("Expected: %+v ,received: %+v", exp, rply) } lm.incrementLoad(exp[0], utils.EmptyString) } } func TestLibDispatcherNewDispatcherMetaWeight(t *testing.T) { pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, Strategy: utils.MetaWeight, } result, err := newDispatcher(pfl) if err != nil { t.Errorf("\nExpected , \nReceived <%+v>", err) } expected := &singleResultDispatcher{ hosts: engine.DispatcherHostProfiles{}, sorter: new(noSort), } if !reflect.DeepEqual(result.(*singleResultDispatcher).hosts, expected.hosts) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*singleResultDispatcher).hosts) } if !reflect.DeepEqual(result.(*singleResultDispatcher).sorter, expected.sorter) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", result.(*singleResultDispatcher).sorter, expected.sorter) } } func TestLibDispatcherNewDispatcherMetaWeightErr(t *testing.T) { pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, StrategyParams: map[string]any{ utils.MetaDefaultRatio: false, }, Strategy: utils.MetaWeight, } _, err := newDispatcher(pfl) expected := "cannot convert field: false to int" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherNewDispatcherMetaRandom(t *testing.T) { pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, Strategy: utils.MetaRandom, } result, err := newDispatcher(pfl) if err != nil { t.Errorf("\nExpected , \nReceived <%+v>", err) } expected := &singleResultDispatcher{ hosts: engine.DispatcherHostProfiles{}, sorter: new(randomSort), } if !reflect.DeepEqual(result.(*singleResultDispatcher).sorter, expected.sorter) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.sorter, result.(*singleResultDispatcher).sorter) } if !reflect.DeepEqual(result.(*singleResultDispatcher).hosts, expected.hosts) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*singleResultDispatcher).hosts) } } func TestLibDispatcherNewDispatcherMetaRandomErr(t *testing.T) { pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, StrategyParams: map[string]any{ utils.MetaDefaultRatio: false, }, Strategy: utils.MetaRandom, } _, err := newDispatcher(pfl) expected := "cannot convert field: false to int" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherNewDispatcherMetaRoundRobin(t *testing.T) { pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, Strategy: utils.MetaRoundRobin, } result, err := newDispatcher(pfl) if err != nil { t.Errorf("\nExpected , \nReceived <%+v>", err) } expected := &singleResultDispatcher{ hosts: engine.DispatcherHostProfiles{}, sorter: new(roundRobinSort), } if !reflect.DeepEqual(result.(*singleResultDispatcher).sorter, expected.sorter) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.sorter, result.(*singleResultDispatcher).sorter) } if !reflect.DeepEqual(result.(*singleResultDispatcher).hosts, expected.hosts) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*singleResultDispatcher).hosts) } } func TestLibDispatcherNewDispatcherMetaRoundRobinErr(t *testing.T) { pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, StrategyParams: map[string]any{ utils.MetaDefaultRatio: false, }, Strategy: utils.MetaRoundRobin, } _, err := newDispatcher(pfl) expected := "cannot convert field: false to int" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherNewDispatcherPoolBroadcast(t *testing.T) { pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, Strategy: rpcclient.PoolBroadcast, } result, err := newDispatcher(pfl) if err != nil { t.Errorf("\nExpected , \nReceived <%+v>", err) } expected := &broadcastDispatcher{ hosts: engine.DispatcherHostProfiles{}, strategy: pfl.Strategy, } if !reflect.DeepEqual(result.(*broadcastDispatcher).strategy, expected.strategy) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.strategy, result.(*broadcastDispatcher).strategy) } if !reflect.DeepEqual(result.(*broadcastDispatcher).hosts, expected.hosts) { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected.hosts, result.(*broadcastDispatcher).hosts) } } func TestLibDispatcherNewDispatcherError(t *testing.T) { pfl := &engine.DispatcherProfile{ Hosts: engine.DispatcherHostProfiles{}, Strategy: "badStrategy", } expected := "unsupported dispatch strategy: " _, err := newDispatcher(pfl) if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherSingleResultDispatcherDispatch(t *testing.T) { wgDsp := &singleResultDispatcher{sorter: new(noSort)} dataDB, dErr := engine.NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items) if dErr != nil { t.Error(dErr) } dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) err := wgDsp.Dispatch(dM, nil, nil, "", "", &DispatcherRoute{}, "", "", "") expected := "DSP_HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherSingleResultDispatcherDispatchRouteID(t *testing.T) { wgDsp := &singleResultDispatcher{sorter: new(roundRobinSort)} dataDB, dErr := engine.NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items) if dErr != nil { t.Error(dErr) } dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) err := wgDsp.Dispatch(dM, nil, nil, "", "routeID", &DispatcherRoute{}, "", "", "") expected := "DSP_HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherBroadcastDispatcherDispatch(t *testing.T) { wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} dataDB, dErr := engine.NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items) if dErr != nil { t.Error(dErr) } dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) err := wgDsp.Dispatch(dM, nil, nil, "", "", &DispatcherRoute{}, "", "", "") expected := "DSP_HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherBroadcastDispatcherDispatchRouteID(t *testing.T) { wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} dataDB, dErr := engine.NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items) if dErr != nil { t.Error(dErr) } dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) err := wgDsp.Dispatch(dM, nil, nil, "", "routeID", &DispatcherRoute{}, "", "", "") expected := "DSP_HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherLoadDispatcherDispatch(t *testing.T) { wgDsp := &loadDispatcher{sorter: new(randomSort)} dataDB, dErr := engine.NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items) if dErr != nil { t.Error(dErr) } dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) err := wgDsp.Dispatch(dM, nil, nil, "", "", &DispatcherRoute{}, "", "", "") expected := "DSP_HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherLoadDispatcherDispatchHostsID(t *testing.T) { wgDsp := &loadDispatcher{ hosts: engine.DispatcherHostProfiles{ {ID: "hostID1"}, {ID: "hostID2"}, }, sorter: new(noSort), } dataDB, dErr := engine.NewInternalDB(nil, nil, true, nil, config.CgrConfig().DataDbCfg().Items) if dErr != nil { t.Error(dErr) } dM := engine.NewDataManager(dataDB, config.CgrConfig().CacheCfg(), nil) err := wgDsp.Dispatch(dM, nil, nil, "", "routeID", &DispatcherRoute{}, "", "", "") expected := "DSP_HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherLoadStrategyDispatchCaseCallError2(t *testing.T) { wgDsp := &loadDispatcher{ hosts: engine.DispatcherHostProfiles{ { ID: "testID2", // FilterIDs: []string{"filterID"}, Weight: 4, Params: map[string]any{ utils.MetaRatio: 1, }, Blocker: false, }, }, defaultRatio: 1, sorter: new(noSort), } err := wgDsp.Dispatch(nil, nil, nil, "", "", &DispatcherRoute{}, "", "", "") expected := "NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() newCache := engine.NewCacheS(cfg, nil, nil) engine.Cache = newCache engine.Cache.SetWithoutReplicate(utils.CacheDispatcherLoads, "testID", false, nil, true, utils.NonTransactional) wgDsp := &loadDispatcher{ tntID: "testID", hosts: engine.DispatcherHostProfiles{ { ID: "testID", // FilterIDs: []string{"filterID"}, Weight: 4, Params: map[string]any{ utils.MetaRatio: 1, }, Blocker: false, }, }, defaultRatio: 1, sorter: new(noSort), } err := wgDsp.Dispatch(nil, nil, nil, "", "", &DispatcherRoute{}, "", "", "") expected := "cannot cast false to *LoadMetrics" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } engine.Cache = cacheInit } func TestLibDispatcherLoadStrategyDispatchCaseHostsCastError2(t *testing.T) { wgDsp := &loadDispatcher{ tntID: "testID", hosts: engine.DispatcherHostProfiles{ { ID: "testID", // FilterIDs: []string{"filterID"}, Weight: 4, Params: map[string]any{ utils.MetaRatio: false, }, Blocker: false, }, }, defaultRatio: 1, sorter: new(noSort), } err := wgDsp.Dispatch(nil, nil, nil, "", "", &DispatcherRoute{}, "", "", "") expected := "cannot convert field: false to int" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherSingleResultDispatcherCastError(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) newCache := engine.NewCacheS(cfg, dm, nil) engine.Cache = newCache value := &engine.DispatcherHost{ Tenant: "testTenant", RemoteHost: &config.RemoteHost{ ID: "testID", Address: "", Transport: "", TLS: false, }, } engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", value, nil, true, utils.NonTransactional) wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} err := wgDsp.Dispatch(nil, nil, nil, "", "testID", &DispatcherRoute{}, "", "", "") expected := "NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } engine.Cache = cacheInit } func TestLibDispatcherBroadcastDispatcherDispatchError1(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) newCache := engine.NewCacheS(cfg, dm, nil) engine.Cache = newCache value := &engine.DispatcherHost{ Tenant: "testTenant", RemoteHost: &config.RemoteHost{ ID: "testID", Address: "", Transport: "", TLS: false, }, } engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", value, nil, true, utils.NonTransactional) wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "") expected := "DISPATCHER_ERROR:NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } engine.Cache = cacheInit } func TestLibDispatcherBroadcastDispatcherDispatchError2(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) newCache := engine.NewCacheS(cfg, dm, nil) engine.Cache = newCache engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", nil, nil, true, utils.NonTransactional) wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "") expected := "DSP_HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } engine.Cache = cacheInit } func TestLibDispatcherBroadcastDispatcherDispatchError3(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) newCache := engine.NewCacheS(cfg, dm, nil) engine.Cache = newCache value := &engine.DispatcherHost{ Tenant: "testTenant", RemoteHost: &config.RemoteHost{ ID: "testID", Address: "", Transport: "", TLS: false, }, } engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) wgDsp := &broadcastDispatcher{hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "") if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } engine.Cache = cacheInit } func TestLibDispatcherLoadDispatcherCacheError(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) newCache := engine.NewCacheS(cfg, dm, nil) engine.Cache = newCache value := &engine.DispatcherHost{ Tenant: "testTenant", RemoteHost: &config.RemoteHost{ ID: "testID", Address: "", Transport: "", TLS: false, }, } engine.Cache.SetWithoutReplicate(utils.CacheDispatcherRoutes, "testID:*attributes", value, nil, true, utils.NonTransactional) wgDsp := &loadDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", &DispatcherRoute{}, "", "", "") expected := "DSP_HOST_NOT_FOUND" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } engine.Cache = cacheInit } func TestLibDispatcherLoadDispatcherCacheError7(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() cfg.CacheCfg().ReplicationConns = []string{"con"} cfg.CacheCfg().RemoteConns = []string{"con1"} cfg.CacheCfg().Partitions[utils.CacheDispatcherRoutes].Replicate = true cfg.CacheCfg().Partitions[utils.CacheDispatcherRoutes].Remote = true cfg.RPCConns()["con"] = &config.RPCConn{ Strategy: "", PoolSize: 0, Conns: []*config.RemoteHost{ { ID: "testID", Address: "", Transport: "", TLS: false, }, }, } cfg.RPCConns()["con1"] = &config.RPCConn{ Strategy: "*first", PoolSize: 0, Conns: []*config.RemoteHost{ { ID: "conn_internal", Address: "*internal", Transport: "", TLS: false, }, }, } //rpcCl := map[string]chan birpc.ClientConnector{} connMng := engine.NewConnManager(cfg, nil) dm := engine.NewDataManager(nil, nil, connMng) newCache := engine.NewCacheS(cfg, dm, nil) engine.Cache = newCache value := &engine.DispatcherHost{ Tenant: "testTenant", RemoteHost: &config.RemoteHost{ ID: "testID", Address: rpcclient.InternalRPC, Transport: utils.MetaInternal, TLS: false, }, } engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTENANT:testID", value, nil, true, utils.NonTransactional) wgDsp := &loadDispatcher{ tntID: "testTENANT", hosts: engine.DispatcherHostProfiles{ { ID: "testID", // FilterIDs: []string{"filterID1", "filterID2"}, Weight: 3, Params: map[string]any{ utils.MetaRatio: 1, }, Blocker: true, }, { ID: "testID2", // FilterIDs: []string{"filterID1", "filterID2"}, Weight: 3, Params: map[string]any{ utils.MetaRatio: 2, }, Blocker: true, }, }, defaultRatio: 0, sorter: new(noSort), } err := wgDsp.Dispatch(dm, nil, nil, "testTENANT", "testID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) expected := "UNSUPPORTED_SERVICE_METHOD" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } engine.Cache = cacheInit } type mockTypeConDispatch struct{} func (*mockTypeConDispatch) Call(ctx *context.Context, serviceMethod string, args, reply any) error { return rpc.ErrShutdown } func TestLibDispatcherLoadDispatcherCacheError5(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) newCache := engine.NewCacheS(cfg, dm, nil) engine.Cache = newCache value := &engine.DispatcherHost{ Tenant: "testTenant", RemoteHost: &config.RemoteHost{ ID: "testID", Address: rpcclient.InternalRPC, Transport: utils.MetaInternal, TLS: false, }, } tmp := engine.IntRPC engine.IntRPC = map[string]*rpcclient.RPCClient{} chanRPC := make(chan birpc.ClientConnector, 1) chanRPC <- new(mockTypeConDispatch) engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC) engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) wgDsp := &loadDispatcher{ tntID: "testTenant", hosts: engine.DispatcherHostProfiles{ { ID: "testID", Weight: 3, Params: map[string]any{ utils.MetaRatio: 1, }, Blocker: true, }, }, defaultRatio: 0, sorter: new(noSort), } err := wgDsp.Dispatch(nil, nil, nil, "testTenant", "testID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) if err == nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "connection is shut down", err) } engine.Cache = cacheInit engine.IntRPC = tmp } func TestLibDispatcherSingleResultDispatcherCase1(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) newCache := engine.NewCacheS(cfg, dm, nil) engine.Cache = newCache value := &engine.DispatcherHost{ Tenant: "testTenant", RemoteHost: &config.RemoteHost{ ID: "testID", Address: rpcclient.InternalRPC, Transport: utils.MetaInternal, TLS: false, }, } tmp := engine.IntRPC engine.IntRPC = map[string]*rpcclient.RPCClient{} chanRPC := make(chan birpc.ClientConnector, 1) chanRPC <- new(mockTypeConDispatch) engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC) engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} err := wgDsp.Dispatch(dm, nil, nil, "testTenant", "", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) if err == nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", "connection is shut down", err) } engine.Cache = cacheInit engine.IntRPC = tmp } type mockTypeConDispatch2 struct{} func (*mockTypeConDispatch2) Call(ctx *context.Context, serviceMethod string, args, reply any) error { return nil } func TestLibDispatcherSingleResultDispatcherCase2(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() dm := engine.NewDataManager(nil, nil, nil) newCache := engine.NewCacheS(cfg, dm, nil) engine.Cache = newCache value := &engine.DispatcherHost{ Tenant: "testTenant", RemoteHost: &config.RemoteHost{ ID: "testID", Address: rpcclient.InternalRPC, Transport: utils.MetaInternal, TLS: false, }, } tmp := engine.IntRPC engine.IntRPC = map[string]*rpcclient.RPCClient{} chanRPC := make(chan birpc.ClientConnector, 1) chanRPC <- new(mockTypeConDispatch2) engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC) engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} err := wgDsp.Dispatch(dm, nil, nil, "testTenant", "routeID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) if err != nil { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) } engine.Cache = cacheInit engine.IntRPC = tmp } func TestLibDispatcherSingleResultDispatcherCase3(t *testing.T) { cacheInit := engine.Cache cfg := config.NewDefaultCGRConfig() cfg.CacheCfg().ReplicationConns = []string{"con"} cfg.CacheCfg().Partitions[utils.CacheDispatcherRoutes].Replicate = true cfg.RPCConns()["con"] = &config.RPCConn{ Strategy: "", PoolSize: 0, Conns: []*config.RemoteHost{ { ID: "testID", Address: "", Transport: "", TLS: false, }, }, } rpcCl := map[string]chan birpc.ClientConnector{} connMng := engine.NewConnManager(cfg, rpcCl) dm := engine.NewDataManager(nil, nil, connMng) newCache := engine.NewCacheS(cfg, dm, nil) engine.Cache = newCache value := &engine.DispatcherHost{ Tenant: "testTenant", RemoteHost: &config.RemoteHost{ ID: "testID", Address: "", Transport: utils.MetaInternal, TLS: false, }, } tmp := engine.IntRPC engine.IntRPC = map[string]*rpcclient.RPCClient{} chanRPC := make(chan birpc.ClientConnector, 1) chanRPC <- new(mockTypeConDispatch2) engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, chanRPC) engine.Cache.SetWithoutReplicate(utils.CacheDispatcherHosts, "testTenant:testID", value, nil, true, utils.NonTransactional) wgDsp := &singleResultDispatcher{sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ID: "testID"}}} err := wgDsp.Dispatch(dm, nil, nil, "testTenant", "routeID", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) expected := "INTERNALLY_DISCONNECTED" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } engine.Cache = cacheInit engine.IntRPC = tmp } func TestLibDispatcherRandomSort(t *testing.T) { cfg := config.NewDefaultCGRConfig() flts := engine.NewFilterS(cfg, nil, nil) sorter := new(randomSort) hosts := engine.DispatcherHostProfiles{ {ID: "testID1"}, {ID: "testID2"}, } expHostIDs1 := engine.DispatcherHostIDs{"testID1", "testID2"} expHostIDs2 := engine.DispatcherHostIDs{"testID2", "testID1"} if hostIDs, err := sorter.Sort(flts, nil, "", hosts); err != nil { t.Error(err) } else if !reflect.DeepEqual(expHostIDs1, hostIDs) && !reflect.DeepEqual(expHostIDs2, hostIDs) { t.Errorf("Expected: %q or %q, received: %q", expHostIDs1, expHostIDs2, hostIDs) } } func TestLibDispatcherRoundRobinSort(t *testing.T) { cfg := config.NewDefaultCGRConfig() flts := engine.NewFilterS(cfg, nil, nil) sorter := new(roundRobinSort) hosts := engine.DispatcherHostProfiles{ {ID: "testID1"}, {ID: "testID2"}, } expHostIDs1 := engine.DispatcherHostIDs{"testID1", "testID2"} if hostIDs, err := sorter.Sort(flts, nil, "", hosts); err != nil { t.Error(err) } else if !reflect.DeepEqual(expHostIDs1, hostIDs) { t.Errorf("Expected: %q, received: %q", expHostIDs1, hostIDs) } expHostIDs2 := engine.DispatcherHostIDs{"testID2", "testID1"} if hostIDs, err := sorter.Sort(flts, nil, "", hosts); err != nil { t.Error(err) } else if !reflect.DeepEqual(expHostIDs2, hostIDs) { t.Errorf("Expected: %q, received: %q", expHostIDs2, hostIDs) } } func TestLibDispatcherLoadStrategyDispatchCaseCallError(t *testing.T) { wgDsp := &loadDispatcher{ hosts: engine.DispatcherHostProfiles{ { ID: "hostID", }, }, defaultRatio: 1, sorter: new(noSort), } err := wgDsp.Dispatch(nil, nil, nil, "cgrates.org", "", &DispatcherRoute{}, utils.AttributeSv1Ping, &utils.CGREvent{}, &wgDsp) expected := "NO_DATABASE_CONNECTION" if err == nil || err.Error() != expected { t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err) } } func TestLibDispatcherDispatchFilterError(t *testing.T) { cfg := config.NewDefaultCGRConfig() flts := engine.NewFilterS(cfg, nil, nil) data, err := engine.NewInternalDB(nil, nil, true, nil, cfg.DataDbCfg().Items) if err != nil { t.Error(err) } dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) var dsp Dispatcher = &singleResultDispatcher{ sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ ID: "testID", FilterIDs: []string{"*wrongType"}, }}, } expErrMsg := "inline parse error for string: <*wrongType>" if err := dsp.Dispatch(dm, flts, nil, "cgrates.org", "", &DispatcherRoute{}, "", "", ""); err == nil || err.Error() != expErrMsg { t.Errorf("Expected error: %s received: %v", expErrMsg, err) } dsp = &loadDispatcher{ sorter: new(noSort), hosts: engine.DispatcherHostProfiles{{ ID: "testID2", FilterIDs: []string{"*wrongType"}, }}, defaultRatio: 1, } if err := dsp.Dispatch(dm, flts, nil, "cgrates.org", "", &DispatcherRoute{}, "", "", ""); err == nil || err.Error() != expErrMsg { t.Errorf("Expected error: %s received: %v", expErrMsg, err) } dsp = &broadcastDispatcher{ hosts: engine.DispatcherHostProfiles{{ ID: "testID", FilterIDs: []string{"*wrongType"}, }}, } if err := dsp.Dispatch(dm, flts, nil, "cgrates.org", "", &DispatcherRoute{}, "", "", ""); err == nil || err.Error() != expErrMsg { t.Errorf("Expected error: %s received: %v", expErrMsg, err) } } func TestLibDispatcherNewInternalHost(t *testing.T) { tnt := "cgrates.org" want := &engine.DispatcherHost{ Tenant: tnt, RemoteHost: &config.RemoteHost{ ID: utils.MetaInternal, Address: utils.MetaInternal, ConnectAttempts: 1, Reconnects: 1, ConnectTimeout: time.Second, ReplyTimeout: time.Second, }, } got := newInternalHost(tnt) if diff := cmp.Diff(want, got, cmp.AllowUnexported(engine.DispatcherHost{})); diff != "" { t.Errorf("newInternalHost(%q) returned an unexpected value(-want +got): \n%s", tnt, diff) } }