diff --git a/apier/v1/api_interfaces.go b/apier/v1/api_interfaces.go
index 5a4d7f503..b1e03a6b4 100644
--- a/apier/v1/api_interfaces.go
+++ b/apier/v1/api_interfaces.go
@@ -122,6 +122,8 @@ type CacheSv1Interface interface {
RemoveGroup(args *utils.ArgsGetGroupWithArgDispatcher, rply *string) error
ReloadCache(attrs utils.AttrReloadCacheWithArgDispatcher, reply *string) error
LoadCache(args utils.AttrReloadCacheWithArgDispatcher, reply *string) error
+ ReplicateSet(args *utils.ArgCacheReplicateSet, reply *string) (err error)
+ ReplicateRemove(args *utils.ArgCacheReplicateRemove, reply *string) (err error)
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
}
diff --git a/apier/v1/caches.go b/apier/v1/caches.go
index 8d8964a05..21f03ce34 100644
--- a/apier/v1/caches.go
+++ b/apier/v1/caches.go
@@ -115,13 +115,13 @@ func (chSv1 *CacheSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string)
return nil
}
-// V1ReplicateSet replicate an item
-func (chSv1 *CacheSv1) V1ReplicateSet(args *utils.ArgCacheReplicateSet, reply *string) (err error) {
+// ReplicateSet replicate an item
+func (chSv1 *CacheSv1) ReplicateSet(args *utils.ArgCacheReplicateSet, reply *string) (err error) {
return chSv1.cacheS.V1ReplicateSet(args, reply)
}
-// V1ReplicateSet replicate an item
-func (chSv1 *CacheSv1) V1ReplicateRemove(args *utils.ArgCacheReplicateRemove, reply *string) (err error) {
+// ReplicateRemove remove an item
+func (chSv1 *CacheSv1) ReplicateRemove(args *utils.ArgCacheReplicateRemove, reply *string) (err error) {
return chSv1.cacheS.V1ReplicateRemove(args, reply)
}
diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go
index 0f2fc52a0..675e5a86f 100755
--- a/apier/v1/dispatcher.go
+++ b/apier/v1/dispatcher.go
@@ -645,7 +645,17 @@ func (dS *DispatcherCacheSv1) LoadCache(args utils.AttrReloadCacheWithArgDispatc
return dS.dS.CacheSv1LoadCache(args, reply)
}
-// Ping used to detreminate if component is active
+// ReplicateSet replicate an item
+func (dS *DispatcherCacheSv1) ReplicateSet(args *utils.ArgCacheReplicateSet, reply *string) (err error) {
+ return dS.dS.CacheSv1ReplicateSet(args, reply)
+}
+
+// ReplicateRemove remove an item
+func (dS *DispatcherCacheSv1) ReplicateRemove(args *utils.ArgCacheReplicateRemove, reply *string) (err error) {
+ return dS.dS.CacheSv1ReplicateRemove(args, reply)
+}
+
+// Ping used to determinate if component is active
func (dS *DispatcherCacheSv1) Ping(args *utils.CGREventWithArgDispatcher, reply *string) error {
return dS.dS.CacheSv1Ping(args, reply)
}
diff --git a/config/config_defaults.go b/config/config_defaults.go
index 9b4c5058f..fbebf0fd2 100755
--- a/config/config_defaults.go
+++ b/config/config_defaults.go
@@ -216,7 +216,8 @@ const CGRATES_CFG_JSON = `
"*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control charger filter indexes caching
"*dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher filter indexes caching
"*dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher routes caching
- "*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false},
+ "*dispatcher_loads": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher load ( in case of *load strategy )
+ "*dispatchers": {"limit": -1, "ttl": "", "static_ttl": false, "replicate": false}, // control dispatcher interface
"*diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false, "replicate": false}, // diameter messages caching
"*rpc_responses": {"limit": 0, "ttl": "2s", "static_ttl": false, "replicate": false}, // RPC responses caching
"*closed_sessions": {"limit": -1, "ttl": "10s", "static_ttl": false, "replicate": false}, // closed sessions cached for CDRs
diff --git a/config/config_json_test.go b/config/config_json_test.go
index ef4b0e25f..fa691c760 100755
--- a/config/config_json_test.go
+++ b/config/config_json_test.go
@@ -169,6 +169,9 @@ func TestCacheJsonCfg(t *testing.T) {
utils.CacheDispatcherLoads: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
Replicate: utils.BoolPointer(false)},
+ utils.CacheDispatchers: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
+ Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false),
+ Replicate: utils.BoolPointer(false)},
utils.CacheDiameterMessages: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("3h"), Static_ttl: utils.BoolPointer(false),
Replicate: utils.BoolPointer(false)},
diff --git a/config/config_test.go b/config/config_test.go
index d5b04b3fe..3212687eb 100755
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -695,6 +695,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) {
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheDispatcherLoads: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
+ utils.CacheDispatchers: &CacheParamCfg{Limit: -1,
+ TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheDiameterMessages: &CacheParamCfg{Limit: -1,
TTL: time.Duration(3 * time.Hour), StaticTTL: false},
utils.CacheRPCResponses: &CacheParamCfg{Limit: 0,
diff --git a/config/configsanity.go b/config/configsanity.go
index ecba62bfe..0efb4290a 100644
--- a/config/configsanity.go
+++ b/config/configsanity.go
@@ -490,8 +490,14 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
// Cache check
for _, connID := range cfg.cacheCfg.ReplicationConns {
- if _, has := cfg.rpcConns[connID]; !has {
+ if conn, has := cfg.rpcConns[connID]; !has {
return fmt.Errorf("<%s> connection with id: <%s> not defined", utils.CacheS, connID)
+ } else {
+ for _, rpc := range conn.Conns {
+ if rpc.Transport != utils.MetaGOB {
+ return fmt.Errorf("<%s> unsuported transport <%s> for connection with ID: <%s>", utils.CacheS, rpc.Transport, connID)
+ }
+ }
}
}
for cacheID := range cfg.cacheCfg.Partitions {
diff --git a/data/conf/samples/cache_replicate/dispatcher_engine/cgrates.json b/data/conf/samples/cache_replicate/dispatcher_engine/cgrates.json
new file mode 100644
index 000000000..70925f2fe
--- /dev/null
+++ b/data/conf/samples/cache_replicate/dispatcher_engine/cgrates.json
@@ -0,0 +1,52 @@
+{
+
+"general": {
+ "node_id": "DispatcherEngine",
+ "log_level": 7,
+ "reconnects": 1,
+},
+
+
+"listen": {
+ "rpc_json": ":2012",
+ "rpc_gob": ":2013",
+ "http": ":2080",
+},
+
+"stor_db": {
+ "db_type": "*internal",
+},
+
+
+"caches":{
+ "partitions": {
+ "*dispatcher_routes": {"limit": -1, "ttl": "1h", "replicate": true}
+ },
+ "replication_conns": ["cacheReplication"],
+},
+
+
+"rpc_conns": {
+ "cacheReplication": {
+ "conns": [{"address": "127.0.0.1:3013", "transport":"*gob"}],
+ },
+},
+
+
+"schedulers": {
+ "enabled": true,
+},
+
+
+"dispatchers":{
+ "enabled": true,
+},
+
+
+"apiers": {
+ "enabled": true,
+ "scheduler_conns": ["*internal"],
+},
+
+
+}
\ No newline at end of file
diff --git a/data/conf/samples/cache_replicate/dispatcher_engine2/cgrates.json b/data/conf/samples/cache_replicate/dispatcher_engine2/cgrates.json
new file mode 100644
index 000000000..53564b5be
--- /dev/null
+++ b/data/conf/samples/cache_replicate/dispatcher_engine2/cgrates.json
@@ -0,0 +1,51 @@
+{
+
+"general": {
+ "node_id": "DispatcherEngine2",
+ "log_level": 7,
+ "reconnects": 1,
+},
+
+
+"listen": {
+ "rpc_json": ":3012",
+ "rpc_gob": ":3013",
+ "http": ":3080",
+},
+
+
+"data_db": {
+ "db_type": "redis",
+ "db_port": 6379,
+ "db_name": "11",
+},
+
+
+"stor_db": {
+ "db_type": "*internal",
+},
+
+"caches":{
+ "partitions": {
+ "*dispatcher_routes": {"limit": -1, "ttl": "1h"}
+ },
+},
+
+
+"schedulers": {
+ "enabled": true,
+},
+
+
+"dispatchers":{
+ "enabled": true,
+},
+
+
+"apiers": {
+ "enabled": true,
+ "scheduler_conns": ["*internal"],
+},
+
+
+}
\ No newline at end of file
diff --git a/data/conf/samples/cache_replicate/engine1/cgrates.json b/data/conf/samples/cache_replicate/engine1/cgrates.json
new file mode 100644
index 000000000..8293a6448
--- /dev/null
+++ b/data/conf/samples/cache_replicate/engine1/cgrates.json
@@ -0,0 +1,53 @@
+{
+// CGRateS Configuration file
+//
+
+
+"general": {
+ "node_id": "Engine1",
+ "log_level": 7
+},
+
+
+"listen": {
+ "rpc_json": ":6012",
+ "rpc_gob": ":6013",
+ "http": ":6080",
+},
+
+"data_db": {
+ "db_type": "*internal",
+},
+
+"stor_db": {
+ "db_type": "*internal"
+},
+
+
+"rpc_conns": {
+ "conn1": {
+ "strategy": "*first",
+ "conns": [{"address": "127.0.0.1:6012", "transport":"*json"}],
+ },
+},
+
+"attributes": {
+ "enabled": true
+},
+
+"rals": {
+ "enabled": true,
+},
+
+"schedulers": {
+ "enabled": true,
+},
+
+
+"apiers": {
+ "enabled": true,
+ "caches_conns":["conn1"],
+ "scheduler_conns": ["*internal"],
+},
+
+}
diff --git a/data/conf/samples/dispatchers/dispatchers_internal/cgrates.json b/data/conf/samples/dispatchers/dispatchers_internal/cgrates.json
index 0df802204..7baa8bb2e 100644
--- a/data/conf/samples/dispatchers/dispatchers_internal/cgrates.json
+++ b/data/conf/samples/dispatchers/dispatchers_internal/cgrates.json
@@ -1,76 +1,74 @@
{
- // Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
- // Copyright (C) ITsysCOM GmbH
- //
- // This file contains the default configuration hardcoded into CGRateS.
- // This is what you get when you load CGRateS with an empty configuration file.
-
- "general": {
- "node_id": "DispatcherS1",
- "log_level": 7,
- "reconnects": 1,
- },
-
-
- "listen": {
- "rpc_json": ":2012",
- "rpc_gob": ":2013",
- "http": ":2080",
- },
-
-
- "data_db": {
- "db_type": "*internal",
- },
+"general": {
+ "node_id": "DispatcherS1",
+ "log_level": 7,
+ "reconnects": 1,
+},
- "stor_db": {
- "db_type": "*internal",
+"listen": {
+ "rpc_json": ":2012",
+ "rpc_gob": ":2013",
+ "http": ":2080",
+},
+
+
+"data_db": {
+ "db_type": "*internal",
+},
+
+
+"stor_db": {
+ "db_type": "*internal",
+},
+
+"caches":{
+ "partitions": {
+ "*dispatcher_routes": {"limit": -1, "ttl": "2s"}
},
+},
+
+
+"schedulers": {
+ "enabled": true,
+},
+
+
+"attributes": {
+ "enabled": true
+},
- "caches":{
- "partitions": {
- "*dispatcher_routes": {"limit": -1, "ttl": "2s"}
- },
- },
-
- "schedulers": {
- "enabled": true,
- },
-
- "attributes": {
- "enabled": true
- },
-
- "rals": {
- "enabled": true,
- },
+"rals": {
+ "enabled": true,
+},
- "chargers": {
- "enabled": true,
- },
+"chargers": {
+ "enabled": true,
+},
- "sessions": {
- "enabled": true,
- "attributes_conns": ["*localhost"],
- "rals_conns": ["*localhost"],
- "resources_conns": ["*localhost"],
- "chargers_conns": ["*localhost"],
- "listen_bijson": ":3014",
- },
+"sessions": {
+ "enabled": true,
+ "attributes_conns": ["*localhost"],
+ "rals_conns": ["*localhost"],
+ "resources_conns": ["*localhost"],
+ "chargers_conns": ["*localhost"],
+ "listen_bijson": ":3014",
+},
- "dispatchers":{
- "enabled": true,
- "attributes_conns": ["*internal"],
- },
+"dispatchers":{
+ "enabled": true,
+ "attributes_conns": ["*internal"],
+},
- "apiers": {
- "enabled": true,
- "scheduler_conns": ["*internal"],
- },
- }
\ No newline at end of file
+"apiers": {
+ "enabled": true,
+ "scheduler_conns": ["*internal"],
+},
+
+
+}
\ No newline at end of file
diff --git a/data/tariffplans/cache_replications/dispatcher_engine/DispatcherHosts.csv b/data/tariffplans/cache_replications/dispatcher_engine/DispatcherHosts.csv
new file mode 100644
index 000000000..ff8ed797f
--- /dev/null
+++ b/data/tariffplans/cache_replications/dispatcher_engine/DispatcherHosts.csv
@@ -0,0 +1,2 @@
+#Tenant[0],ID[1],Address[2],Transport[3],TLS[4]
+cgrates.org,Engine1,127.0.0.1:6012,*json,false
diff --git a/data/tariffplans/cache_replications/dispatcher_engine/DispatcherProfiles.csv b/data/tariffplans/cache_replications/dispatcher_engine/DispatcherProfiles.csv
new file mode 100644
index 000000000..623c9f9d7
--- /dev/null
+++ b/data/tariffplans/cache_replications/dispatcher_engine/DispatcherProfiles.csv
@@ -0,0 +1,2 @@
+#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
+cgrates.org,Engine1,*any,,,*weight,,Engine1,,20,false,,10
\ No newline at end of file
diff --git a/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherHosts.csv b/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherHosts.csv
new file mode 100644
index 000000000..a5b9e604b
--- /dev/null
+++ b/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherHosts.csv
@@ -0,0 +1,3 @@
+#Tenant[0],ID[1],Address[2],Transport[3],TLS[4]
+cgrates.org,Self,*internal,,
+cgrates.org,Engine1,127.0.0.1:6012,*json,false
diff --git a/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherProfiles.csv b/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherProfiles.csv
new file mode 100644
index 000000000..321c4af89
--- /dev/null
+++ b/data/tariffplans/cache_replications/dispatcher_engine2/DispatcherProfiles.csv
@@ -0,0 +1,3 @@
+#Tenant,ID,Subsystems,FilterIDs,ActivationInterval,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters,Weight
+cgrates.org,InternalDispatcher,*caches;*core,,,*weight,,Self,,20,false,,30
+cgrates.org,ExternalDispatcher,*attributes,,,*weight,,Engine1,,20,false,,10
\ No newline at end of file
diff --git a/dispatchers/caches.go b/dispatchers/caches.go
index cb751ac22..e585466e3 100644
--- a/dispatchers/caches.go
+++ b/dispatchers/caches.go
@@ -355,3 +355,49 @@ func (dS *DispatcherService) CacheSv1LoadCache(args utils.AttrReloadCacheWithArg
return dS.Dispatch(&utils.CGREvent{Tenant: tnt}, utils.MetaCaches, routeID,
utils.CacheSv1LoadCache, args, reply)
}
+
+// ReplicateRemove remove an item
+func (dS *DispatcherService) CacheSv1ReplicateRemove(args *utils.ArgCacheReplicateRemove, reply *string) (err error) {
+ tnt := dS.cfg.GeneralCfg().DefaultTenant
+ if args.TenantArg.Tenant != utils.EmptyString {
+ tnt = args.TenantArg.Tenant
+ }
+ if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
+ if args.ArgDispatcher == nil {
+ return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
+ }
+ if err = dS.authorize(utils.CacheSv1ReplicateRemove, tnt,
+ args.APIKey, utils.TimePointer(time.Now())); err != nil {
+ return
+ }
+ }
+ var routeID *string
+ if args.ArgDispatcher != nil {
+ routeID = args.ArgDispatcher.RouteID
+ }
+ return dS.Dispatch(&utils.CGREvent{Tenant: tnt}, utils.MetaCaches, routeID,
+ utils.CacheSv1ReplicateRemove, args, reply)
+}
+
+// ReplicateSet replicate an item
+func (dS *DispatcherService) CacheSv1ReplicateSet(args *utils.ArgCacheReplicateSet, reply *string) (err error) {
+ tnt := dS.cfg.GeneralCfg().DefaultTenant
+ if args.TenantArg.Tenant != utils.EmptyString {
+ tnt = args.TenantArg.Tenant
+ }
+ if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
+ if args.ArgDispatcher == nil {
+ return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
+ }
+ if err = dS.authorize(utils.CacheSv1ReplicateSet, tnt,
+ args.APIKey, utils.TimePointer(time.Now())); err != nil {
+ return
+ }
+ }
+ var routeID *string
+ if args.ArgDispatcher != nil {
+ routeID = args.ArgDispatcher.RouteID
+ }
+ return dS.Dispatch(&utils.CGREvent{Tenant: tnt}, utils.MetaCaches, routeID,
+ utils.CacheSv1ReplicateSet, args, reply)
+}
diff --git a/dispatchers/lib_test.go b/dispatchers/lib_test.go
index 72785fa78..14590d7cf 100644
--- a/dispatchers/lib_test.go
+++ b/dispatchers/lib_test.go
@@ -58,7 +58,7 @@ func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
}
type testDispatcher struct {
- CfgParh string
+ CfgPath string
Cfg *config.CGRConfig
RPC *rpc.Client
cmd *exec.Cmd
@@ -66,9 +66,9 @@ type testDispatcher struct {
func newTestEngine(t *testing.T, cfgPath string, initDataDB, intitStoreDB bool) (d *testDispatcher) {
d = new(testDispatcher)
- d.CfgParh = cfgPath
+ d.CfgPath = cfgPath
var err error
- d.Cfg, err = config.NewCGRConfigFromPath(d.CfgParh)
+ d.Cfg, err = config.NewCGRConfigFromPath(d.CfgPath)
if err != nil {
t.Fatalf("Error at config init :%v\n", err)
}
@@ -87,7 +87,7 @@ func newTestEngine(t *testing.T, cfgPath string, initDataDB, intitStoreDB bool)
func (d *testDispatcher) startEngine(t *testing.T) {
var err error
- if d.cmd, err = engine.StartEngine(d.CfgParh, dspDelay); err != nil {
+ if d.cmd, err = engine.StartEngine(d.CfgPath, dspDelay); err != nil {
t.Fatalf("Error at engine start:%v\n", err)
}
@@ -122,7 +122,7 @@ func (d *testDispatcher) loadData(t *testing.T, path string) {
var reply string
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path}
if err := d.RPC.Call(utils.APIerSv1LoadTariffPlanFromFolder, attrs, &reply); err != nil {
- t.Errorf("<%s>Error at loading data from folder :%v", d.CfgParh, err)
+ t.Errorf("<%s>Error at loading data from folder :%v", d.CfgPath, err)
}
}
@@ -133,7 +133,7 @@ func (d *testDispatcher) loadData2(t *testing.T, path string) {
if err != nil {
t.Error(err)
}
- loader := exec.Command(loaderPath, "-config_path", d.CfgParh, "-path", path)
+ loader := exec.Command(loaderPath, "-config_path", d.CfgPath, "-path", path)
if err := loader.Start(); err != nil {
t.Error(err)
diff --git a/dispatchers/libdispatcher.go b/dispatchers/libdispatcher.go
index 317e4e673..8cfa4825e 100644
--- a/dispatchers/libdispatcher.go
+++ b/dispatchers/libdispatcher.go
@@ -252,8 +252,10 @@ func (_ *singleResultstrategyDispatcher) dispatch(dm *engine.DataManager, routeI
continue
}
if routeID != nil && *routeID != "" { // cache the discovered route
- engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH,
- nil, true, utils.EmptyString)
+ if err = engine.Cache.Set(utils.CacheDispatcherRoutes, *routeID, dH,
+ nil, true, utils.EmptyString); err != nil {
+ return
+ }
}
break
}
@@ -348,11 +350,9 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *stri
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*routeID); ok && x != nil {
dH = x.(*engine.DispatcherHost)
- lM.incrementLoad(dH.ID)
- engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, ld.tntID, lM)
+ lM.incrementLoad(dH.ID, ld.tntID)
err = dH.Call(serviceMethod, args, reply)
- lM.decrementLoad(dH.ID) // call ended
- engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, ld.tntID, lM)
+ lM.decrementLoad(dH.ID, ld.tntID) // call ended
if !utils.IsNetworkError(err) {
return
}
@@ -363,11 +363,9 @@ func (ld *loadStrategyDispatcher) dispatch(dm *engine.DataManager, routeID *stri
err = utils.NewErrDispatcherS(err)
return
}
- lM.incrementLoad(hostID)
- engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, ld.tntID, lM)
+ lM.incrementLoad(hostID, ld.tntID)
err = dH.Call(serviceMethod, args, reply)
- lM.decrementLoad(hostID) // call ended
- engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, ld.tntID, lM)
+ lM.decrementLoad(hostID, ld.tntID) // call ended
if utils.IsNetworkError(err) {
continue
}
@@ -396,14 +394,16 @@ func (lM *LoadMetrics) getHosts(hostIDs []string) []string {
return hostIDs
}
-func (lM *LoadMetrics) incrementLoad(hostID string) {
+func (lM *LoadMetrics) incrementLoad(hostID, tntID string) {
lM.Lock()
lM.HostsLoad[hostID] += 1
+ engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM)
lM.Unlock()
}
-func (lM *LoadMetrics) decrementLoad(hostID string) {
+func (lM *LoadMetrics) decrementLoad(hostID, tntID string) {
lM.Lock()
lM.HostsLoad[hostID] -= 1
+ engine.Cache.ReplicateSet(utils.CacheDispatcherLoads, tntID, lM)
lM.Unlock()
}
diff --git a/engine/caches.go b/engine/caches.go
index 670999d8d..5cd197f5c 100644
--- a/engine/caches.go
+++ b/engine/caches.go
@@ -19,6 +19,7 @@ along with this program. If not, see
package engine
import (
+ "encoding/gob"
"fmt"
"sync"
"time"
@@ -32,6 +33,32 @@ var Cache *CacheS
func init() {
Cache = NewCacheS(config.CgrConfig(), nil)
+ //Threshold
+ gob.Register(new(Threshold))
+ gob.Register(new(ThresholdProfile))
+ gob.Register(new(ThresholdProfileWithArgDispatcher))
+ gob.Register(new(ThresholdWithArgDispatcher))
+ //Resource
+ gob.Register(new(Resource))
+ gob.Register(new(ResourceProfile))
+ gob.Register(new(ResourceProfileWithArgDispatcher))
+ gob.Register(new(ResourceWithArgDispatcher))
+ //Stats
+ gob.Register(new(StatQueue))
+ gob.Register(new(StatQueueProfile))
+ gob.Register(new(StatQueueProfileWithArgDispatcher))
+ gob.Register(new(StoredStatQueue))
+ gob.Register(new(StatQueueProfileWithArgDispatcher))
+ //Suppliers
+ gob.Register(new(SupplierProfile))
+ gob.Register(new(SupplierProfileWithArgDispatcher))
+ //Filters
+ gob.Register(new(Filter))
+ gob.Register(new(FilterWithArgDispatcher))
+ //Dispatcher
+ gob.Register(new(DispatcherHost))
+ gob.Register(new(DispatcherHostProfile))
+ gob.Register(new(DispatcherHostWithArgDispatcher))
}
//SetCache shared the cache from other subsystems
diff --git a/engine/dispatcherprfl.go b/engine/dispatcherprfl.go
index bf9af81eb..b113f9a23 100644
--- a/engine/dispatcherprfl.go
+++ b/engine/dispatcherprfl.go
@@ -152,7 +152,19 @@ func (dH *DispatcherHost) TenantID() string {
// GetRPCConnection builds or returns the cached connection
func (dH *DispatcherHost) Call(serviceMethod string, args interface{}, reply interface{}) error {
if dH.rpcConn == nil {
- return utils.ErrNotConnected
+ var err error
+ // connect the rpcConn
+ cfg := config.CgrConfig()
+ if dH.rpcConn, err = NewRPCPool(
+ rpcclient.PoolFirst,
+ cfg.TlsCfg().ClientKey,
+ cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
+ cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
+ cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
+ dH.Conns, IntRPC.GetInternalChanel(), false); err != nil {
+ return err
+ }
+
}
return dH.rpcConn.Call(serviceMethod, args, reply)
}
diff --git a/engine/dispatcherprfl_test.go b/engine/dispatcherprfl_test.go
index bb146af46..b04b8f443 100644
--- a/engine/dispatcherprfl_test.go
+++ b/engine/dispatcherprfl_test.go
@@ -245,9 +245,6 @@ func TestDispatcherHostCall(t *testing.T) {
reply: utils.StringPointer(""),
}
var reply string
- if err := dspHost.Call(utils.AttributeSv1Ping, &utils.CGREvent{}, &reply); err == nil || err.Error() != utils.ErrNotConnected.Error() {
- t.Errorf("Expected: %s , received: %v", utils.ErrNotConnected.Error(), err)
- }
dspHost.rpcConn = tRPC
if err := dspHost.Call(utils.AttributeSv1Ping, &utils.CGREvent{}, &reply); err != nil {
t.Error(err)
diff --git a/engine/libtest.go b/engine/libtest.go
index 3b287de01..84336ffde 100644
--- a/engine/libtest.go
+++ b/engine/libtest.go
@@ -521,6 +521,10 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
Items: 0,
Groups: 0,
},
+ utils.CacheDispatchers: {
+ Items: 0,
+ Groups: 0,
+ },
utils.CacheDestinations: {
Items: 0,
Groups: 0,
diff --git a/general_tests/cacherpl_it_test.go b/general_tests/cacherpl_it_test.go
new file mode 100644
index 000000000..b544a0985
--- /dev/null
+++ b/general_tests/cacherpl_it_test.go
@@ -0,0 +1,257 @@
+// +build integration
+
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+package general_tests
+
+import (
+ "net/rpc"
+ "os/exec"
+ "path"
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/engine"
+ "github.com/cgrates/cgrates/utils"
+)
+
+var (
+ dspEngine1Cfg *config.CGRConfig
+ dspEngine1CfgPath string
+ dspEngine1RPC *rpc.Client
+ dspEngine2Cfg *config.CGRConfig
+ dspEngine2CfgPath string
+ dspEngine2RPC *rpc.Client
+ engine1Cfg *config.CGRConfig
+ engine1CfgPath string
+ engine1RPC *rpc.Client
+
+ sTestsCacheRpl = []func(t *testing.T){
+ testCacheRplInitCfg,
+ testCacheRplInitDataDb,
+ testCacheRplStartEngine,
+ testCacheRplRpcConn,
+ testCacheRplAddData,
+ testCacheRplPing,
+ testCacheRplCheckReplication,
+
+ testCacheRplStopEngine,
+ }
+)
+
+func TestCacheReplications(t *testing.T) {
+ switch *dbType {
+ case utils.MetaInternal:
+ t.SkipNow()
+ case utils.MetaMySQL:
+ for _, stest := range sTestsCacheRpl {
+ t.Run("TestCacheReplications", stest)
+ }
+ case utils.MetaMongo:
+ t.SkipNow()
+ case utils.MetaPostgres:
+ t.SkipNow()
+ default:
+ t.Fatal("Unknown Database type")
+ }
+
+}
+
+func testCacheRplInitCfg(t *testing.T) {
+ var err error
+ dspEngine1CfgPath = path.Join(*dataDir, "conf", "samples", "cache_replicate", "dispatcher_engine")
+ dspEngine1Cfg, err = config.NewCGRConfigFromPath(dspEngine1CfgPath)
+ if err != nil {
+ t.Error(err)
+ }
+
+ dspEngine2CfgPath = path.Join(*dataDir, "conf", "samples", "cache_replicate", "dispatcher_engine2")
+ dspEngine2Cfg, err = config.NewCGRConfigFromPath(dspEngine2CfgPath)
+ if err != nil {
+ t.Error(err)
+ }
+
+ engine1CfgPath = path.Join(*dataDir, "conf", "samples", "cache_replicate", "engine1")
+ engine1Cfg, err = config.NewCGRConfigFromPath(engine1CfgPath)
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func testCacheRplInitDataDb(t *testing.T) {
+ if err := engine.InitDataDb(dspEngine1Cfg); err != nil {
+ t.Fatal(err)
+ }
+ if err := engine.InitDataDb(dspEngine2Cfg); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testCacheRplStartEngine(t *testing.T) {
+ if _, err := engine.StopStartEngine(dspEngine1CfgPath, *waitRater); err != nil {
+ t.Fatal(err)
+ }
+ if _, err := engine.StartEngine(dspEngine2CfgPath, *waitRater); err != nil {
+ t.Fatal(err)
+ }
+ if _, err := engine.StartEngine(engine1CfgPath, *waitRater); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testCacheRplRpcConn(t *testing.T) {
+ var err error
+ dspEngine1RPC, err = newRPCClient(dspEngine1Cfg.ListenCfg())
+ if err != nil {
+ t.Fatal(err)
+ }
+ dspEngine2RPC, err = rpc.Dial(utils.TCP, dspEngine2Cfg.ListenCfg().RPCGOBListen)
+ if err != nil {
+ t.Fatal(err)
+ }
+ engine1RPC, err = newRPCClient(engine1Cfg.ListenCfg())
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testCacheRplAddData(t *testing.T) {
+ wchan := make(chan struct{}, 1)
+ go func() {
+ loaderPath, err := exec.LookPath("cgr-loader")
+ if err != nil {
+ t.Error(err)
+ }
+ loader := exec.Command(loaderPath, "-config_path", dspEngine1CfgPath, "-path",
+ path.Join(*dataDir, "tariffplans", "cache_replications", "dispatcher_engine"))
+
+ if err := loader.Start(); err != nil {
+ t.Error(err)
+ }
+ loader.Wait()
+ wchan <- struct{}{}
+ }()
+ select {
+ case <-wchan:
+ case <-time.After(2 * time.Second):
+ t.Errorf("cgr-loader failed: ")
+ }
+
+ go func() {
+ loaderPath, err := exec.LookPath("cgr-loader")
+ if err != nil {
+ t.Error(err)
+ }
+ loader := exec.Command(loaderPath, "-config_path", dspEngine2CfgPath, "-path",
+ path.Join(*dataDir, "tariffplans", "cache_replications", "dispatcher_engine2"))
+
+ if err := loader.Start(); err != nil {
+ t.Error(err)
+ }
+ loader.Wait()
+ wchan <- struct{}{}
+ }()
+ select {
+ case <-wchan:
+ case <-time.After(2 * time.Second):
+ t.Errorf("cgr-loader failed: ")
+ }
+}
+
+func testCacheRplPing(t *testing.T) {
+ var reply map[string]interface{}
+ ev := utils.TenantWithArgDispatcher{
+ TenantArg: &utils.TenantArg{
+ Tenant: "cgrates.org",
+ },
+ ArgDispatcher: &utils.ArgDispatcher{
+ RouteID: utils.StringPointer("testRoute123"),
+ },
+ }
+ if err := dspEngine1RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
+ t.Error(err)
+ } else if reply[utils.NodeID] != "Engine1" {
+ t.Errorf("Received: %s", utils.ToJSON(reply))
+ }
+
+ var rpl string
+ if err := dspEngine1RPC.Call(utils.AttributeSv1Ping, &utils.CGREventWithArgDispatcher{
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ },
+ ArgDispatcher: &utils.ArgDispatcher{
+ RouteID: utils.StringPointer("testRoute123"),
+ },
+ }, &rpl); err != nil {
+ t.Error(err)
+ } else if rpl != utils.Pong {
+ t.Errorf("Received: %s", rpl)
+ }
+}
+
+func testCacheRplCheckReplication(t *testing.T) {
+ var reply map[string]interface{}
+ ev := utils.TenantWithArgDispatcher{
+ TenantArg: &utils.TenantArg{
+ Tenant: "cgrates.org",
+ },
+ }
+ if err := dspEngine2RPC.Call(utils.CoreSv1Status, &ev, &reply); err != nil {
+ t.Error(err)
+ } else if reply[utils.NodeID] != "DispatcherEngine2" {
+ t.Errorf("Received: %s", utils.ToJSON(reply))
+ }
+ var rcvKeys []string
+ expKeys := []string{"testRoute123:*core", "testRoute123:*attributes"}
+ argsAPI := utils.ArgsGetCacheItemIDsWithArgDispatcher{
+ TenantArg: utils.TenantArg{
+ Tenant: "cgrates.org",
+ },
+ ArgsGetCacheItemIDs: utils.ArgsGetCacheItemIDs{
+ CacheID: utils.CacheDispatcherRoutes,
+ },
+ }
+ if err := dspEngine2RPC.Call(utils.CacheSv1GetItemIDs, argsAPI, &rcvKeys); err != nil {
+ t.Error(err.Error())
+ }
+ if !reflect.DeepEqual(expKeys, rcvKeys) {
+ t.Errorf("Expected: %+v, received: %+v", expKeys, rcvKeys)
+ }
+
+ var rpl string
+ if err := dspEngine2RPC.Call(utils.AttributeSv1Ping, &utils.CGREventWithArgDispatcher{
+ CGREvent: &utils.CGREvent{
+ Tenant: "cgrates.org",
+ },
+ ArgDispatcher: &utils.ArgDispatcher{
+ RouteID: utils.StringPointer("testRoute123"),
+ },
+ }, &rpl); err != nil {
+ t.Error(err)
+ } else if rpl != utils.Pong {
+ t.Errorf("Received: %s", rpl)
+ }
+}
+
+func testCacheRplStopEngine(t *testing.T) {
+ if err := engine.KillEngine(*waitRater); err != nil {
+ t.Error(err)
+ }
+}
diff --git a/utils/apitpdata.go b/utils/apitpdata.go
index cb2b3d2e1..72ec09534 100755
--- a/utils/apitpdata.go
+++ b/utils/apitpdata.go
@@ -1433,8 +1433,12 @@ type DPRArgs struct {
type ArgCacheReplicateSet struct {
CacheID, ItemID string
Value interface{}
+ *ArgDispatcher
+ TenantArg
}
type ArgCacheReplicateRemove struct {
CacheID, ItemID string
+ *ArgDispatcher
+ TenantArg
}
diff --git a/utils/consts.go b/utils/consts.go
index 509d26b3e..11e7beb50 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -65,7 +65,7 @@ var (
CacheResourceProfiles, CacheResources, CacheEventResources, CacheStatQueueProfiles,
CacheStatQueues, CacheThresholdProfiles, CacheThresholds, CacheFilters,
CacheSupplierProfiles, CacheAttributeProfiles, CacheChargerProfiles,
- CacheDispatcherProfiles, CacheDispatcherHosts, CacheResourceFilterIndexes,
+ CacheDispatcherProfiles, CacheDispatcherHosts, CacheDispatchers, CacheResourceFilterIndexes,
CacheStatFilterIndexes, CacheThresholdFilterIndexes, CacheSupplierFilterIndexes,
CacheAttributeFilterIndexes, CacheChargerFilterIndexes, CacheDispatcherFilterIndexes,
CacheDispatcherRoutes, CacheDispatcherLoads, CacheDiameterMessages, CacheRPCResponses, CacheClosedSessions,