Add test for replicate routeID through dispatchers

This commit is contained in:
TeoV
2020-04-14 16:28:27 +03:00
committed by Dan Christian Bogos
parent 6a31c38025
commit 0862d5e39b
25 changed files with 627 additions and 92 deletions

View File

@@ -122,6 +122,8 @@ type CacheSv1Interface interface {
RemoveGroup(args *utils.ArgsGetGroupWithArgDispatcher, rply *string) error
ReloadCache(attrs utils.AttrReloadCacheWithArgDispatcher, reply *string) error
LoadCache(args utils.AttrReloadCacheWithArgDispatcher, reply *string) error
ReplicateSet(args *utils.ArgCacheReplicateSet, reply *string) (err error)
ReplicateRemove(args *utils.ArgCacheReplicateRemove, reply *string) (err error)
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
}

View File

@@ -115,13 +115,13 @@ func (chSv1 *CacheSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string)
return nil
}
// V1ReplicateSet replicate an item
func (chSv1 *CacheSv1) V1ReplicateSet(args *utils.ArgCacheReplicateSet, reply *string) (err error) {
// ReplicateSet replicate an item
func (chSv1 *CacheSv1) ReplicateSet(args *utils.ArgCacheReplicateSet, reply *string) (err error) {
return chSv1.cacheS.V1ReplicateSet(args, reply)
}
// V1ReplicateSet replicate an item
func (chSv1 *CacheSv1) V1ReplicateRemove(args *utils.ArgCacheReplicateRemove, reply *string) (err error) {
// ReplicateRemove remove an item
func (chSv1 *CacheSv1) ReplicateRemove(args *utils.ArgCacheReplicateRemove, reply *string) (err error) {
return chSv1.cacheS.V1ReplicateRemove(args, reply)
}

View File

@@ -645,7 +645,17 @@ func (dS *DispatcherCacheSv1) LoadCache(args utils.AttrReloadCacheWithArgDispatc
return dS.dS.CacheSv1LoadCache(args, reply)
}
// Ping used to detreminate if component is active
// ReplicateSet replicate an item
func (dS *DispatcherCacheSv1) ReplicateSet(args *utils.ArgCacheReplicateSet, reply *string) (err error) {
return dS.dS.CacheSv1ReplicateSet(args, reply)
}
// ReplicateRemove remove an item
func (dS *DispatcherCacheSv1) ReplicateRemove(args *utils.ArgCacheReplicateRemove, reply *string) (err error) {
return dS.dS.CacheSv1ReplicateRemove(args, reply)
}
// Ping used to determinate if component is active
func (dS *DispatcherCacheSv1) Ping(args *utils.CGREventWithArgDispatcher, reply *string) error {
return dS.dS.CacheSv1Ping(args, reply)
}

View File

@@ -216,7 +216,8 @@ const CGRATES_CFG_JSON = `
"*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control charger filter indexes caching
"*dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher filter indexes caching
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher routes caching
"*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},
"*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher load ( in case of *load strategy )
"*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher interface
"*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false, "replicate": false}, // diameter messages caching
"*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false, "replicate": false}, // RPC responses caching
"*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false, "replicate": false}, // closed sessions cached for CDRs

View File

@@ -169,6 +169,9 @@ func TestCacheJsonCfg(t *testing.T) {
utils.CacheDispatcherLoads: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Replicate: utils.BoolPointer(false)},
utils.CacheDispatchers: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Replicate: utils.BoolPointer(false)},
utils.CacheDiameterMessages: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("3h"), Static_ttl: utils.BoolPointer(false),
Replicate: utils.BoolPointer(false)},

View File

@@ -695,6 +695,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) {
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheDispatcherLoads: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheDispatchers: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheDiameterMessages: &CacheParamCfg{Limit: -1,
TTL: time.Duration(3 * time.Hour), StaticTTL: false},
utils.CacheRPCResponses: &CacheParamCfg{Limit: 0,

View File

@@ -490,8 +490,14 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
// Cache check
for _, connID := range cfg.cacheCfg.ReplicationConns {
if _, has := cfg.rpcConns[connID]; !has {
if conn, has := cfg.rpcConns[connID]; !has {
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.CacheS, connID)
} else {
for _, rpc := range conn.Conns {
if rpc.Transport != utils.MetaGOB {
return fmt.Errorf("<%s> unsuported transport <%s> for connection with ID: <%s>", utils.CacheS, rpc.Transport, connID)
}
}
}
}
for cacheID := range cfg.cacheCfg.Partitions {

View File

@@ -0,0 +1,52 @@
{
"general": {
"node_id": "DispatcherEngine",
"log_level": 7,
"reconnects": 1,
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"stor_db": {
"db_type": "*internal",
},
"caches":{
"partitions": {
"*dispatcher_routes": {"limit": -1, "ttl": "1h", "replicate": true}
},
"replication_conns": ["cacheReplication"],
},
"rpc_conns": {
"cacheReplication": {
"conns": [{"address": "127.0.0.1:3013", "transport":"*gob"}],
},
},
"schedulers": {
"enabled": true,
},
"dispatchers":{
"enabled": true,
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}

View File

@@ -0,0 +1,51 @@
{
"general": {
"node_id": "DispatcherEngine2",
"log_level": 7,
"reconnects": 1,
},
"listen": {
"rpc_json": ":3012",
"rpc_gob": ":3013",
"http": ":3080",
},
"data_db": {
"db_type": "redis",
"db_port": 6379,
"db_name": "11",
},
"stor_db": {
"db_type": "*internal",
},
"caches":{
"partitions": {
"*dispatcher_routes": {"limit": -1, "ttl": "1h"}
},
},
"schedulers": {
"enabled": true,
},
"dispatchers":{
"enabled": true,
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}

View File

@@ -0,0 +1,53 @@
{
// CGRateS Configuration file
//
"general": {
"node_id": "Engine1",
"log_level": 7
},
"listen": {
"rpc_json": ":6012",
"rpc_gob": ":6013",
"http": ":6080",
},
"data_db": {
"db_type": "*internal",
},
"stor_db": {
"db_type": "*internal"
},
"rpc_conns": {
"conn1": {
"strategy": "*first",
"conns": [{"address": "127.0.0.1:6012", "transport":"*json"}],
},
},
"attributes": {
"enabled": true
},
"rals": {
"enabled": true,
},
"schedulers": {
"enabled": true,
},
"apiers": {
"enabled": true,
"caches_conns":["conn1"],
"scheduler_conns": ["*internal"],
},
}

View File

@@ -1,76 +1,74 @@
{
// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
// Copyright (C) ITsysCOM GmbH
//
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
"general": {
"node_id": "DispatcherS1",
"log_level": 7,
"reconnects": 1,
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"data_db": {
"db_type": "*internal",
},
"general": {
"node_id": "DispatcherS1",
"log_level": 7,
"reconnects": 1,
},
"stor_db": {
"db_type": "*internal",
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"data_db": {
"db_type": "*internal",
},
"stor_db": {
"db_type": "*internal",
},
"caches":{
"partitions": {
"*dispatcher_routes": {"limit": -1, "ttl": "2s"}
},
},
"schedulers": {
"enabled": true,
},
"attributes": {
"enabled": true
},
"caches":{
"partitions": {
"*dispatcher_routes": {"limit": -1, "ttl": "2s"}
},
},
"schedulers": {
"enabled": true,
},
"attributes": {
"enabled": true
},
"rals": {
"enabled": true,
},
"rals": {
"enabled": true,
},
"chargers": {
"enabled": true,
},
"chargers": {
"enabled": true,
},
"sessions": {
"enabled": true,
"attributes_conns": ["*localhost"],
"rals_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"chargers_conns": ["*localhost"],
"listen_bijson": ":3014",
},
"sessions": {
"enabled": true,
"attributes_conns": ["*localhost"],
"rals_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"chargers_conns": ["*localhost"],
"listen_bijson": ":3014",
},
"dispatchers":{
"enabled": true,
"attributes_conns": ["*internal"],
},
"dispatchers":{
"enabled": true,
"attributes_conns": ["*internal"],
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}

View File

@@ -0,0 +1,2 @@
#Tenant[0],ID[1],Address[2],Transport[3],TLS[4]
cgrates.org,Engine1,127.0.0.1:6012,*json,false
1 #Tenant[0] ID[1] Address[2] Transport[3] TLS[4]
2 cgrates.org Engine1 127.0.0.1:6012 *json false

View File

@@ -0,0 +1,2 @@
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,Engine1,*any,,,*weight,,Engine1,,20,false,,10
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
2 cgrates.org Engine1 *any *weight Engine1 20 false 10

View File

@@ -0,0 +1,3 @@
#Tenant[0],ID[1],Address[2],Transport[3],TLS[4]
cgrates.org,Self,*internal,,
cgrates.org,Engine1,127.0.0.1:6012,*json,false
1 #Tenant[0] ID[1] Address[2] Transport[3] TLS[4]
2 cgrates.org Self *internal
3 cgrates.org Engine1 127.0.0.1:6012 *json false

View File

@@ -0,0 +1,3 @@
#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30
cgrates.org,ExternalDispatcher,*attributes,,,*weight,,Engine1,,20,false,,10
1 #Tenant ID Subsystems FilterIDs ActivationInterval Strategy StrategyParameters ConnID ConnFilterIDs ConnWeight ConnBlocker ConnParameters Weight
2 cgrates.org InternalDispatcher *caches;*core *weight Self 20 false 30
3 cgrates.org ExternalDispatcher *attributes *weight Engine1 20 false 10

View File

@@ -355,3 +355,49 @@ func (dS *DispatcherService) CacheSv1LoadCache(args utils.AttrReloadCacheWithArg
return dS.Dispatch(&utils.CGREvent{Tenant: tnt}, utils.MetaCaches, routeID,
utils.CacheSv1LoadCache, args, reply)
}
// ReplicateRemove remove an item
func (dS *DispatcherService) CacheSv1ReplicateRemove(args *utils.ArgCacheReplicateRemove, reply *string) (err error) {
tnt := dS.cfg.GeneralCfg().DefaultTenant
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
if err = dS.authorize(utils.CacheSv1ReplicateRemove, tnt,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
var routeID *string
if args.ArgDispatcher != nil {
routeID = args.ArgDispatcher.RouteID
}
return dS.Dispatch(&utils.CGREvent{Tenant: tnt}, utils.MetaCaches, routeID,
utils.CacheSv1ReplicateRemove, args, reply)
}
// ReplicateSet replicate an item
func (dS *DispatcherService) CacheSv1ReplicateSet(args *utils.ArgCacheReplicateSet, reply *string) (err error) {
tnt := dS.cfg.GeneralCfg().DefaultTenant
if args.TenantArg.Tenant != utils.EmptyString {
tnt = args.TenantArg.Tenant
}
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
if err = dS.authorize(utils.CacheSv1ReplicateSet, tnt,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
var routeID *string
if args.ArgDispatcher != nil {
routeID = args.ArgDispatcher.RouteID
}
return dS.Dispatch(&utils.CGREvent{Tenant: tnt}, utils.MetaCaches, routeID,
utils.CacheSv1ReplicateSet, args, reply)
}

View File

@@ -58,7 +58,7 @@ func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
}
type testDispatcher struct {
CfgParh string
CfgPath string
Cfg *config.CGRConfig
RPC *rpc.Client
cmd *exec.Cmd
@@ -66,9 +66,9 @@ type testDispatcher struct {
func newTestEngine(t *testing.T, cfgPath string, initDataDB, intitStoreDB bool) (d *testDispatcher) {
d = new(testDispatcher)
d.CfgParh = cfgPath
d.CfgPath = cfgPath
var err error
d.Cfg, err = config.NewCGRConfigFromPath(d.CfgParh)
d.Cfg, err = config.NewCGRConfigFromPath(d.CfgPath)
if err != nil {
t.Fatalf("Error at config init :%v\n", err)
}
@@ -87,7 +87,7 @@ func newTestEngine(t *testing.T, cfgPath string, initDataDB, intitStoreDB bool)
func (d *testDispatcher) startEngine(t *testing.T) {
var err error
if d.cmd, err = engine.StartEngine(d.CfgParh, dspDelay); err != nil {
if d.cmd, err = engine.StartEngine(d.CfgPath, dspDelay); err != nil {
t.Fatalf("Error at engine start:%v\n", err)
}
@@ -122,7 +122,7 @@ func (d *testDispatcher) loadData(t *testing.T, path string) {
var reply string
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path}
if err := d.RPC.Call(utils.APIerSv1LoadTariffPlanFromFolder, attrs, &reply); err != nil {
t.Errorf("<%s>Error at loading data from folder :%v", d.CfgParh, err)
t.Errorf("<%s>Error at loading data from folder :%v", d.CfgPath, err)
}
}
@@ -133,7 +133,7 @@ func (d *testDispatcher) loadData2(t *testing.T, path string) {
if err != nil {
t.Error(err)
}
loader := exec.Command(loaderPath, "-config_path", d.CfgParh, "-path", path)
loader := exec.Command(loaderPath, "-config_path", d.CfgPath, "-path", path)
if err := loader.Start(); err != nil {
t.Error(err)

View File

@@ -252,8 +252,10 @@ func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeI
continue
}
if routeID != nil && *routeID != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH,
nil, true, utils.EmptyString)
if err = engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH,
nil, true, utils.EmptyString); err != nil {
return
}
}
break
}
@@ -348,11 +350,9 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *stri
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*routeID); ok && x != nil {
dH = x.(*engine.DispatcherHost)
lM.incrementLoad(dH.ID)
engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, ld.tntID, lM)
lM.incrementLoad(dH.ID, ld.tntID)
err = dH.Call(serviceMethod, args, reply)
lM.decrementLoad(dH.ID) // call ended
engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, ld.tntID, lM)
lM.decrementLoad(dH.ID, ld.tntID) // call ended
if !utils.IsNetworkError(err) {
return
}
@@ -363,11 +363,9 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *stri
err = utils.NewErrDispatcherS(err)
return
}
lM.incrementLoad(hostID)
engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, ld.tntID, lM)
lM.incrementLoad(hostID, ld.tntID)
err = dH.Call(serviceMethod, args, reply)
lM.decrementLoad(hostID) // call ended
engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, ld.tntID, lM)
lM.decrementLoad(hostID, ld.tntID) // call ended
if utils.IsNetworkError(err) {
continue
}
@@ -396,14 +394,16 @@ func (lM *LoadMetrics) getHosts(hostIDs []string) []string {
return hostIDs
}
func (lM *LoadMetrics) incrementLoad(hostID string) {
func (lM *LoadMetrics) incrementLoad(hostID, tntID string) {
lM.Lock()
lM.HostsLoad[hostID] += 1
engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM)
lM.Unlock()
}
func (lM *LoadMetrics) decrementLoad(hostID string) {
func (lM *LoadMetrics) decrementLoad(hostID, tntID string) {
lM.Lock()
lM.HostsLoad[hostID] -= 1
engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM)
lM.Unlock()
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"encoding/gob"
"fmt"
"sync"
"time"
@@ -32,6 +33,32 @@ var Cache *CacheS
func init() {
Cache = NewCacheS(config.CgrConfig(), nil)
//Threshold
gob.Register(new(Threshold))
gob.Register(new(ThresholdProfile))
gob.Register(new(ThresholdProfileWithArgDispatcher))
gob.Register(new(ThresholdWithArgDispatcher))
//Resource
gob.Register(new(Resource))
gob.Register(new(ResourceProfile))
gob.Register(new(ResourceProfileWithArgDispatcher))
gob.Register(new(ResourceWithArgDispatcher))
//Stats
gob.Register(new(StatQueue))
gob.Register(new(StatQueueProfile))
gob.Register(new(StatQueueProfileWithArgDispatcher))
gob.Register(new(StoredStatQueue))
gob.Register(new(StatQueueProfileWithArgDispatcher))
//Suppliers
gob.Register(new(SupplierProfile))
gob.Register(new(SupplierProfileWithArgDispatcher))
//Filters
gob.Register(new(Filter))
gob.Register(new(FilterWithArgDispatcher))
//Dispatcher
gob.Register(new(DispatcherHost))
gob.Register(new(DispatcherHostProfile))
gob.Register(new(DispatcherHostWithArgDispatcher))
}
//SetCache shared the cache from other subsystems

View File

@@ -152,7 +152,19 @@ func (dH *DispatcherHost) TenantID() string {
// GetRPCConnection builds or returns the cached connection
func (dH *DispatcherHost) Call(serviceMethod string, args interface{}, reply interface{}) error {
if dH.rpcConn == nil {
return utils.ErrNotConnected
var err error
// connect the rpcConn
cfg := config.CgrConfig()
if dH.rpcConn, err = NewRPCPool(
rpcclient.PoolFirst,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
dH.Conns, IntRPC.GetInternalChanel(), false); err != nil {
return err
}
}
return dH.rpcConn.Call(serviceMethod, args, reply)
}

View File

@@ -245,9 +245,6 @@ func TestDispatcherHostCall(t *testing.T) {
reply: utils.StringPointer(""),
}
var reply string
if err := dspHost.Call(utils.AttributeSv1Ping, &utils.CGREvent{}, &reply); err == nil || err.Error() != utils.ErrNotConnected.Error() {
t.Errorf("Expected: %s , received: %v", utils.ErrNotConnected.Error(), err)
}
dspHost.rpcConn = tRPC
if err := dspHost.Call(utils.AttributeSv1Ping, &utils.CGREvent{}, &reply); err != nil {
t.Error(err)

View File

@@ -521,6 +521,10 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
Items: 0,
Groups: 0,
},
utils.CacheDispatchers: {
Items: 0,
Groups: 0,
},
utils.CacheDestinations: {
Items: 0,
Groups: 0,

View File

@@ -0,0 +1,257 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package general_tests
import (
"net/rpc"
"os/exec"
"path"
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
dspEngine1Cfg *config.CGRConfig
dspEngine1CfgPath string
dspEngine1RPC *rpc.Client
dspEngine2Cfg *config.CGRConfig
dspEngine2CfgPath string
dspEngine2RPC *rpc.Client
engine1Cfg *config.CGRConfig
engine1CfgPath string
engine1RPC *rpc.Client
sTestsCacheRpl = []func(t *testing.T){
testCacheRplInitCfg,
testCacheRplInitDataDb,
testCacheRplStartEngine,
testCacheRplRpcConn,
testCacheRplAddData,
testCacheRplPing,
testCacheRplCheckReplication,
testCacheRplStopEngine,
}
)
func TestCacheReplications(t *testing.T) {
switch *dbType {
case utils.MetaInternal:
t.SkipNow()
case utils.MetaMySQL:
for _, stest := range sTestsCacheRpl {
t.Run("TestCacheReplications", stest)
}
case utils.MetaMongo:
t.SkipNow()
case utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("Unknown Database type")
}
}
func testCacheRplInitCfg(t *testing.T) {
var err error
dspEngine1CfgPath = path.Join(*dataDir, "conf", "samples", "cache_replicate", "dispatcher_engine")
dspEngine1Cfg, err = config.NewCGRConfigFromPath(dspEngine1CfgPath)
if err != nil {
t.Error(err)
}
dspEngine2CfgPath = path.Join(*dataDir, "conf", "samples", "cache_replicate", "dispatcher_engine2")
dspEngine2Cfg, err = config.NewCGRConfigFromPath(dspEngine2CfgPath)
if err != nil {
t.Error(err)
}
engine1CfgPath = path.Join(*dataDir, "conf", "samples", "cache_replicate", "engine1")
engine1Cfg, err = config.NewCGRConfigFromPath(engine1CfgPath)
if err != nil {
t.Error(err)
}
}
func testCacheRplInitDataDb(t *testing.T) {
if err := engine.InitDataDb(dspEngine1Cfg); err != nil {
t.Fatal(err)
}
if err := engine.InitDataDb(dspEngine2Cfg); err != nil {
t.Fatal(err)
}
}
func testCacheRplStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(dspEngine1CfgPath, *waitRater); err != nil {
t.Fatal(err)
}
if _, err := engine.StartEngine(dspEngine2CfgPath, *waitRater); err != nil {
t.Fatal(err)
}
if _, err := engine.StartEngine(engine1CfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testCacheRplRpcConn(t *testing.T) {
var err error
dspEngine1RPC, err = newRPCClient(dspEngine1Cfg.ListenCfg())
if err != nil {
t.Fatal(err)
}
dspEngine2RPC, err = rpc.Dial(utils.TCP, dspEngine2Cfg.ListenCfg().RPCGOBListen)
if err != nil {
t.Fatal(err)
}
engine1RPC, err = newRPCClient(engine1Cfg.ListenCfg())
if err != nil {
t.Fatal(err)
}
}
func testCacheRplAddData(t *testing.T) {
wchan := make(chan struct{}, 1)
go func() {
loaderPath, err := exec.LookPath("cgr-loader")
if err != nil {
t.Error(err)
}
loader := exec.Command(loaderPath, "-config_path", dspEngine1CfgPath, "-path",
path.Join(*dataDir, "tariffplans", "cache_replications", "dispatcher_engine"))
if err := loader.Start(); err != nil {
t.Error(err)
}
loader.Wait()
wchan <- struct{}{}
}()
select {
case <-wchan:
case <-time.After(2 * time.Second):
t.Errorf("cgr-loader failed: ")
}
go func() {
loaderPath, err := exec.LookPath("cgr-loader")
if err != nil {
t.Error(err)
}
loader := exec.Command(loaderPath, "-config_path", dspEngine2CfgPath, "-path",
path.Join(*dataDir, "tariffplans", "cache_replications", "dispatcher_engine2"))
if err := loader.Start(); err != nil {
t.Error(err)
}
loader.Wait()
wchan <- struct{}{}
}()
select {
case <-wchan:
case <-time.After(2 * time.Second):
t.Errorf("cgr-loader failed: ")
}
}
func testCacheRplPing(t *testing.T) {
var reply map[string]interface{}
ev := utils.TenantWithArgDispatcher{
TenantArg: &utils.TenantArg{
Tenant: "cgrates.org",
},
ArgDispatcher: &utils.ArgDispatcher{
RouteID: utils.StringPointer("testRoute123"),
},
}
if err := dspEngine1RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "Engine1" {
t.Errorf("Received: %s", utils.ToJSON(reply))
}
var rpl string
if err := dspEngine1RPC.Call(utils.AttributeSv1Ping, &utils.CGREventWithArgDispatcher{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
},
ArgDispatcher: &utils.ArgDispatcher{
RouteID: utils.StringPointer("testRoute123"),
},
}, &rpl); err != nil {
t.Error(err)
} else if rpl != utils.Pong {
t.Errorf("Received: %s", rpl)
}
}
func testCacheRplCheckReplication(t *testing.T) {
var reply map[string]interface{}
ev := utils.TenantWithArgDispatcher{
TenantArg: &utils.TenantArg{
Tenant: "cgrates.org",
},
}
if err := dspEngine2RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "DispatcherEngine2" {
t.Errorf("Received: %s", utils.ToJSON(reply))
}
var rcvKeys []string
expKeys := []string{"testRoute123:*core", "testRoute123:*attributes"}
argsAPI := utils.ArgsGetCacheItemIDsWithArgDispatcher{
TenantArg: utils.TenantArg{
Tenant: "cgrates.org",
},
ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{
CacheID: utils.CacheDispatcherRoutes,
},
}
if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil {
t.Error(err.Error())
}
if !reflect.DeepEqual(expKeys, rcvKeys) {
t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
}
var rpl string
if err := dspEngine2RPC.Call(utils.AttributeSv1Ping, &utils.CGREventWithArgDispatcher{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
},
ArgDispatcher: &utils.ArgDispatcher{
RouteID: utils.StringPointer("testRoute123"),
},
}, &rpl); err != nil {
t.Error(err)
} else if rpl != utils.Pong {
t.Errorf("Received: %s", rpl)
}
}
func testCacheRplStopEngine(t *testing.T) {
if err := engine.KillEngine(*waitRater); err != nil {
t.Error(err)
}
}

View File

@@ -1433,8 +1433,12 @@ type DPRArgs struct {
type ArgCacheReplicateSet struct {
CacheID, ItemID string
Value interface{}
*ArgDispatcher
TenantArg
}
type ArgCacheReplicateRemove struct {
CacheID, ItemID string
*ArgDispatcher
TenantArg
}

View File

@@ -65,7 +65,7 @@ var (
CacheResourceProfiles, CacheResources, CacheEventResources, CacheStatQueueProfiles,
CacheStatQueues, CacheThresholdProfiles, CacheThresholds, CacheFilters,
CacheSupplierProfiles, CacheAttributeProfiles, CacheChargerProfiles,
CacheDispatcherProfiles, CacheDispatcherHosts, CacheResourceFilterIndexes,
CacheDispatcherProfiles, CacheDispatcherHosts, CacheDispatchers, CacheResourceFilterIndexes,
CacheStatFilterIndexes, CacheThresholdFilterIndexes, CacheSupplierFilterIndexes,
CacheAttributeFilterIndexes, CacheChargerFilterIndexes, CacheDispatcherFilterIndexes,
CacheDispatcherRoutes, CacheDispatcherLoads, CacheDiameterMessages, CacheRPCResponses, CacheClosedSessions,