Add CoreService ( Allow status command to be executed without activating RALs ) fixes #1565

This commit is contained in:
TeoV
2019-06-04 13:05:22 +03:00
committed by Dan Christian Bogos
parent f8b0ff7f54
commit cbc9c73d3e
12 changed files with 216 additions and 74 deletions

43
apier/v1/core.go Normal file
View File

@@ -0,0 +1,43 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewCoreSv1(cS *engine.CoreService) *CoreSv1 {
return &CoreSv1{cS: cS}
}
// Exports RPC from RLs
type CoreSv1 struct {
cS *engine.CoreService
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
func (cS *CoreSv1) Call(serviceMethod string,
args interface{}, reply interface{}) error {
return utils.APIerRPCCall(cS, serviceMethod, args, reply)
}
func (cS *CoreSv1) Status(arg *utils.TenantWithArgDispatcher, reply *map[string]interface{}) error {
return cS.cS.Status(arg, reply)
}

View File

@@ -501,10 +501,6 @@ type DispatcherResponder struct {
dS *dispatchers.DispatcherService
}
func (dS *DispatcherResponder) Status(args *utils.TenantWithArgDispatcher, reply *map[string]interface{}) error {
return dS.dS.ResponderStatus(args, reply)
}
func (dS *DispatcherResponder) GetCost(args *engine.CallDescriptorWithArgDispatcher, reply *engine.CallCost) error {
return dS.dS.ResponderGetCost(args, reply)
}
@@ -769,3 +765,16 @@ type DispatcherConfigSv1 struct {
func (dS *DispatcherConfigSv1) GetJSONSection(args *config.StringWithArgDispatcher, reply *map[string]interface{}) (err error) {
return dS.dS.ConfigSv1GetJSONSection(args, reply)
}
func NewDispatcherCoreSv1(dps *dispatchers.DispatcherService) *DispatcherCoreSv1 {
return &DispatcherCoreSv1{dS: dps}
}
// Exports RPC from RLs
type DispatcherCoreSv1 struct {
dS *dispatchers.DispatcherService
}
func (dS *DispatcherCoreSv1) Status(args *utils.TenantWithArgDispatcher, reply *map[string]interface{}) error {
return dS.dS.CoreSv1Status(args, reply)
}

View File

@@ -1247,6 +1247,9 @@ func startDispatcherService(internalDispatcherSChan, internalAttributeSChan chan
server.RpcRegisterName(utils.ConfigSv1,
v1.NewDispatcherConfigSv1(dspS))
server.RpcRegisterName(utils.CoreSv1,
v1.NewDispatcherCoreSv1(dspS))
internalDispatcherSChan <- dspS
}
@@ -1301,6 +1304,14 @@ func initGuardianSv1(internalGuardianSChan chan rpcclient.RpcClientConnection, s
internalGuardianSChan <- grdSv1
}
func initCoreSv1(internalCoreSv1Chan chan rpcclient.RpcClientConnection, server *utils.Server) {
cSv1 := v1.NewCoreSv1(engine.NewCoreService())
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(cSv1)
}
internalCoreSv1Chan <- cSv1
}
func initSchedulerS(internalSchedSChan chan rpcclient.RpcClientConnection,
srvMngr *servmanager.ServiceManager, server *utils.Server) {
schdS := servmanager.NewSchedulerS(srvMngr)
@@ -1663,6 +1674,7 @@ func main() {
internalApierV2Chan := make(chan rpcclient.RpcClientConnection, 1)
internalServeManagerChan := make(chan rpcclient.RpcClientConnection, 1)
internalConfigChan := make(chan rpcclient.RpcClientConnection, 1)
internalCoreSv1Chan := make(chan rpcclient.RpcClientConnection, 1)
// init internalRPCSet
engine.IntRPC = engine.NewRPCClientSet()
@@ -1686,6 +1698,7 @@ func main() {
engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, internalThresholdSChan)
engine.IntRPC.AddInternalRPCClient(utils.ServiceManagerV1, internalServeManagerChan)
engine.IntRPC.AddInternalRPCClient(utils.ConfigSv1, internalConfigChan)
engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan)
}
// init CacheS
@@ -1694,6 +1707,9 @@ func main() {
// init GuardianSv1
initGuardianSv1(internalGuardianSChan, server)
// init CoreSv1
initCoreSv1(internalCoreSv1Chan, server)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dm, cacheS, exitChan)
initServiceManagerV1(internalServeManagerChan, srvManager, server)

View File

@@ -23,7 +23,7 @@ import "github.com/cgrates/cgrates/utils"
func init() {
c := &CmdStatus{
name: "status",
rpcMethod: utils.ResponderStatus,
rpcMethod: utils.CoreSv1Status,
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}

View File

@@ -10,7 +10,7 @@ cgrates.org,ATTR_API_SUP_AUTH,*auth,*string:~APIKey:sup12345,,,APIMethods,*const
cgrates.org,ATTR_API_STAT_AUTH,*auth,*string:~APIKey:stat12345,,,APIMethods,*constant,StatSv1.Ping&StatSv1.GetStatQueuesForEvent&StatSv1.GetQueueStringMetrics&StatSv1.ProcessEvent&StatSv1.GetQueueIDs&StatSv1.GetQueueFloatMetrics,false,20
cgrates.org,ATTR_API_RES_AUTH,*auth,*string:~APIKey:res12345,,,APIMethods,*constant,ResourceSv1.Ping&ResourceSv1.GetResourcesForEvent&ResourceSv1.AuthorizeResources&ResourceSv1.AllocateResources&ResourceSv1.ReleaseResources,false,20
cgrates.org,ATTR_API_SES_AUTH,*auth,*string:~APIKey:ses12345,,,APIMethods,*constant,SessionSv1.Ping&SessionSv1.AuthorizeEvent&SessionSv1.AuthorizeEventWithDigest&SessionSv1.InitiateSession&SessionSv1.InitiateSessionWithDigest&SessionSv1.UpdateSession&SessionSv1.SyncSessions&SessionSv1.TerminateSession&SessionSv1.ProcessCDR&SessionSv1.ProcessEvent&SessionSv1.GetActiveSessions&SessionSv1.GetActiveSessionsCount&SessionSv1.ForceDisconnect&SessionSv1.GetPassiveSessions&SessionSv1.GetPassiveSessionsCount&SessionSv1.ReplicateSessions&SessionSv1.SetPassiveSession,false,20
cgrates.org,ATTR_API_RSP_AUTH,*auth,*string:~APIKey:rsp12345,,,APIMethods,*constant,Responder.Status&Responder.GetTimeout&Responder.Shutdown&Responder.Ping,false,20
cgrates.org,ATTR_API_RSP_AUTH,*auth,*string:~APIKey:rsp12345,,,APIMethods,*constant,Responder.GetTimeout&Responder.Shutdown&Responder.Ping,false,20
cgrates.org,ATTR_API_CHC_AUTH,*auth,*string:~APIKey:chc12345,,,APIMethods,*constant,CacheSv1.Ping&CacheSv1.GetCacheStats&CacheSv1.LoadCache&CacheSv1.PrecacheStatus&CacheSv1.GetItemIDs&CacheSv1.HasItem&CacheSv1.GetItemExpiryTime&CacheSv1.ReloadCache&CacheSv1.RemoveItem&CacheSv1.FlushCache&CacheSv1.Clear,false,20
cgrates.org,ATTR_API_GRD_AUTH,*auth,*string:~APIKey:grd12345,,,APIMethods,*constant,GuardianSv1.Ping&GuardianSv1.RemoteLock&GuardianSv1.RemoteUnlock,false,20
cgrates.org,ATTR_API_SCHD_AUTH,*auth,*string:~APIKey:sched12345,,,APIMethods,*constant,SchedulerSv1.Ping,false,20
1 #Tenant ID Contexts FilterIDs ActivationInterval AttributeFilterIDs FieldName Type Value Blocker Weight
10 cgrates.org ATTR_API_STAT_AUTH *auth *string:~APIKey:stat12345 APIMethods *constant StatSv1.Ping&StatSv1.GetStatQueuesForEvent&StatSv1.GetQueueStringMetrics&StatSv1.ProcessEvent&StatSv1.GetQueueIDs&StatSv1.GetQueueFloatMetrics false 20
11 cgrates.org ATTR_API_RES_AUTH *auth *string:~APIKey:res12345 APIMethods *constant ResourceSv1.Ping&ResourceSv1.GetResourcesForEvent&ResourceSv1.AuthorizeResources&ResourceSv1.AllocateResources&ResourceSv1.ReleaseResources false 20
12 cgrates.org ATTR_API_SES_AUTH *auth *string:~APIKey:ses12345 APIMethods *constant SessionSv1.Ping&SessionSv1.AuthorizeEvent&SessionSv1.AuthorizeEventWithDigest&SessionSv1.InitiateSession&SessionSv1.InitiateSessionWithDigest&SessionSv1.UpdateSession&SessionSv1.SyncSessions&SessionSv1.TerminateSession&SessionSv1.ProcessCDR&SessionSv1.ProcessEvent&SessionSv1.GetActiveSessions&SessionSv1.GetActiveSessionsCount&SessionSv1.ForceDisconnect&SessionSv1.GetPassiveSessions&SessionSv1.GetPassiveSessionsCount&SessionSv1.ReplicateSessions&SessionSv1.SetPassiveSession false 20
13 cgrates.org ATTR_API_RSP_AUTH *auth *string:~APIKey:rsp12345 APIMethods *constant Responder.Status&Responder.GetTimeout&Responder.Shutdown&Responder.Ping Responder.GetTimeout&Responder.Shutdown&Responder.Ping false 20
14 cgrates.org ATTR_API_CHC_AUTH *auth *string:~APIKey:chc12345 APIMethods *constant CacheSv1.Ping&CacheSv1.GetCacheStats&CacheSv1.LoadCache&CacheSv1.PrecacheStatus&CacheSv1.GetItemIDs&CacheSv1.HasItem&CacheSv1.GetItemExpiryTime&CacheSv1.ReloadCache&CacheSv1.RemoveItem&CacheSv1.FlushCache&CacheSv1.Clear false 20
15 cgrates.org ATTR_API_GRD_AUTH *auth *string:~APIKey:grd12345 APIMethods *constant GuardianSv1.Ping&GuardianSv1.RemoteLock&GuardianSv1.RemoteUnlock false 20
16 cgrates.org ATTR_API_SCHD_AUTH *auth *string:~APIKey:sched12345 APIMethods *constant SchedulerSv1.Ping false 20

45
dispatchers/core.go Normal file
View File

@@ -0,0 +1,45 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package dispatchers
import (
"time"
"github.com/cgrates/cgrates/utils"
)
func (dS *DispatcherService) CoreSv1Status(args *utils.TenantWithArgDispatcher,
reply *map[string]interface{}) (err error) {
tnt := utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
if err = dS.authorize(utils.CoreSv1Status, tnt,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
var routeID *string
if args.ArgDispatcher != nil {
routeID = args.ArgDispatcher.RouteID
}
return dS.Dispatch(&utils.CGREvent{Tenant: tnt}, utils.MetaCore,
routeID, utils.CoreSv1Status, args, reply)
}

View File

@@ -46,26 +46,6 @@ func (dS *DispatcherService) ResponderPing(args *utils.CGREventWithArgDispatcher
routeID, utils.ResponderPing, args, reply)
}
func (dS *DispatcherService) ResponderStatus(args *utils.TenantWithArgDispatcher,
reply *map[string]interface{}) (err error) {
tnt := utils.FirstNonEmpty(args.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
if dS.attrS != nil {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing(utils.ArgDispatcherField)
}
if err = dS.authorize(utils.ResponderStatus, tnt,
args.APIKey, utils.TimePointer(time.Now())); err != nil {
return
}
}
var routeID *string
if args.ArgDispatcher != nil {
routeID = args.ArgDispatcher.RouteID
}
return dS.Dispatch(&utils.CGREvent{Tenant: tnt}, utils.MetaResponder,
routeID, utils.ResponderStatus, args, reply)
}
func (dS *DispatcherService) ResponderGetCost(args *engine.CallDescriptorWithArgDispatcher,
reply *engine.CallCost) (err error) {
if dS.attrS != nil {

64
engine/core.go Normal file
View File

@@ -0,0 +1,64 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"runtime"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
func NewCoreService() *CoreService {
return &CoreService{}
}
type CoreService struct {
}
// ListenAndServe will initialize the service
func (cS *CoreService) ListenAndServe(exitChan chan bool) (err error) {
utils.Logger.Info("Starting Core service")
e := <-exitChan
exitChan <- e // put back for the others listening for shutdown request
return
}
// Shutdown is called to shutdown the service
func (cS *CoreService) Shutdown() (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> shutdown initialized", utils.CoreS))
utils.Logger.Info(fmt.Sprintf("<%s> shutdown complete", utils.CoreS))
return
}
func (cS *CoreService) Status(arg *utils.TenantWithArgDispatcher, reply *map[string]interface{}) (err error) {
memstats := new(runtime.MemStats)
runtime.ReadMemStats(memstats)
response := make(map[string]interface{})
response[utils.NodeID] = config.CgrConfig().GeneralCfg().NodeID
response[utils.MemoryUsage] = utils.SizeFmt(float64(memstats.HeapAlloc), "")
response[utils.ActiveGoroutines] = runtime.NumGoroutine()
response[utils.Footprint] = utils.SizeFmt(float64(memstats.Sys), "")
response[utils.Version] = utils.GetCGRVersion()
response[utils.RunningSince] = utils.GetStartTime()
response[utils.GoVersion] = runtime.Version()
*reply = response
return
}

View File

@@ -20,7 +20,6 @@ package engine
import (
"reflect"
"runtime"
"strings"
"time"
@@ -241,26 +240,6 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptorWithArgDispatcher, rep
return
}
func (rs *Responder) Status(arg *utils.TenantWithArgDispatcher, reply *map[string]interface{}) (err error) {
// if arg != "" { // Introduce delay in answer, used in some automated tests
// if delay, err := utils.ParseDurationWithNanosecs(arg); err == nil {
// time.Sleep(delay)
// }
// }
memstats := new(runtime.MemStats)
runtime.ReadMemStats(memstats)
response := make(map[string]interface{})
response[utils.NodeID] = config.CgrConfig().GeneralCfg().NodeID
response[utils.MemoryUsage] = utils.SizeFmt(float64(memstats.HeapAlloc), "")
response[utils.ActiveGoroutines] = runtime.NumGoroutine()
response[utils.Footprint] = utils.SizeFmt(float64(memstats.Sys), "")
response[utils.Version] = utils.GetCGRVersion()
response[utils.RunningSince] = utils.GetStartTime()
response[utils.GoVersion] = runtime.Version()
*reply = response
return
}
func (rs *Responder) Shutdown(arg *utils.TenantWithArgDispatcher, reply *string) (err error) {
dm.DataDB().Close()
cdrStorage.Close()

View File

@@ -95,12 +95,12 @@ func TestRPCITLclRpcConnPoolFirst(t *testing.T) {
// Connect rpc client to rater
func TestRPCITLclStatusSecondEngine(t *testing.T) {
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
t.Error(err)
} else if status[utils.NodeID].(string) != node2 {
t.Errorf("Expecting:\n%s\nReceived:\n%s", node2, status[utils.NodeID].(string))
@@ -117,14 +117,14 @@ func TestRPCITLclStartFirstEngine(t *testing.T) {
// Connect rpc client to rater
func TestRPCITLclStatusFirstInitial(t *testing.T) {
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
} else if status[utils.NodeID].(string) == node2 {
t.Fatalf("Should receive ralID different than second one, got: %s", status[utils.NodeID].(string))
}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
t.Error(err)
} else if status[utils.NodeID].(string) != node1 {
t.Errorf("Expecting:\n%s\nReceived:\n%s", node1, status[utils.NodeID].(string))
@@ -138,14 +138,14 @@ func TestRPCITLclStatusFirstFailover(t *testing.T) {
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
} else if status[utils.NodeID].(string) == node1 {
t.Fatalf("Should receive ralID different than first one, got: %s", status[utils.NodeID].(string))
}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
t.Error(err)
} else if status[utils.NodeID].(string) != node2 {
t.Errorf("Expecting:\n%s\nReceived:\n%s", node2, status[utils.NodeID].(string))
@@ -157,12 +157,12 @@ func TestRPCITLclStatusFirstFailback(t *testing.T) {
t.Fatal(err)
}
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == node2 {
t.Error("Should receive new ID")
}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
t.Error(err)
} else if status[utils.NodeID].(string) != node1 {
t.Errorf("Expecting:\n%s\nReceived:\n%s", node2, status[utils.NodeID].(string))
@@ -179,7 +179,7 @@ func TestRPCITLclTDirectedRPC(t *testing.T) {
// func TestRPCITLclTimeout(t *testing.T) {
// var status map[string]interface{}
// if err := rpcPoolFirst.Call("Responder.Status", "10s", &status); err == nil {
// if err := rpcPoolFirst.Call(utils.CoreSv1Status, "10s", &status); err == nil {
// t.Error("Expecting timeout")
// } else if err.Error() != rpcclient.ErrReplyTimeout.Error() {
// t.Error(err)
@@ -195,12 +195,12 @@ func TestRPCITLclRpcConnPoolBcast(t *testing.T) {
func TestRPCITLclBcastStatusInitial(t *testing.T) {
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolBroadcast.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
}
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolBroadcast.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
@@ -213,12 +213,12 @@ func TestRPCITLclBcastStatusNoRals1(t *testing.T) {
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolBroadcast.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
}
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolBroadcast.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
@@ -231,7 +231,7 @@ func TestRPCITLclBcastStatusBcastNoRals(t *testing.T) {
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err == nil {
if err := rpcPoolBroadcast.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err == nil {
t.Error("Should get error")
}
}
@@ -241,12 +241,12 @@ func TestRPCITLclBcastStatusRALs2Up(t *testing.T) {
t.Fatal(err)
}
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolBroadcast.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
}
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolBroadcast.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
@@ -258,12 +258,12 @@ func TestRPCITLclStatusBcastRALs1Up(t *testing.T) {
t.Fatal(err)
}
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolBroadcast.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty InstanceID received")
}
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolBroadcast.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty InstanceID received")
@@ -331,12 +331,12 @@ func TestRPCITRmtStatusFirstInitial(t *testing.T) {
return
}
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
t.Error(err)
} else if status[utils.NodeID].(string) != node1 {
t.Errorf("Expecting:\n%s\nReceived:\n%s", node1, status[utils.NodeID].(string))
@@ -355,14 +355,14 @@ func TestRPCITRmtStatusFirstFailover(t *testing.T) {
}
fmt.Println("\n\nExecuting query ...")
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
} else if status[utils.NodeID].(string) == node1 {
t.Fatal("Did not failover")
}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
@@ -383,14 +383,14 @@ func TestRPCITRmtStatusFirstFailback(t *testing.T) {
}
fmt.Println("\n\nExecuting query ...")
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
} else if status[utils.NodeID].(string) == node2 {
t.Fatal("Did not do failback")
}
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
if err := rpcPoolFirst.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")

View File

@@ -395,10 +395,10 @@ func TestSessionSRplManualReplicate(t *testing.T) {
t.Errorf("Failed to kill process, error: %v", err.Error())
}
var status map[string]interface{}
if err := smgRplcMstrRPC.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err == nil { // master should not longer be reachable
if err := smgRplcMstrRPC.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err == nil { // master should not longer be reachable
t.Error(err, status)
}
if err := smgRplcSlvRPC.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil { // slave should be still operational
if err := smgRplcSlvRPC.Call(utils.CoreSv1Status, utils.TenantWithArgDispatcher{}, &status); err != nil { // slave should be still operational
t.Error(err)
}
// start master

View File

@@ -385,6 +385,7 @@ const (
MetaRALs = "*rals"
MetaStats = "*stats"
MetaResponder = "*responder"
MetaCore = "*core"
MetaThresholds = "*thresholds"
MetaSuppliers = "*suppliers"
MetaAttributes = "*attributes"
@@ -775,6 +776,12 @@ const (
ConfigSv1GetJSONSection = "ConfigSv1.GetJSONSection"
)
const (
CoreS = "CoreS"
CoreSv1 = "CoreSv1"
CoreSv1Status = "CoreSv1.Status"
)
// SupplierS APIs
const (
SupplierSv1GetSuppliers = "SupplierSv1.GetSuppliers"
@@ -881,7 +888,6 @@ const (
ResponderDebit = "Responder.Debit"
ResponderRefundIncrements = "Responder.RefundIncrements"
ResponderGetMaxSessionTime = "Responder.GetMaxSessionTime"
ResponderStatus = "Responder.Status"
ResponderMaxDebit = "Responder.MaxDebit"
ResponderRefundRounding = "Responder.RefundRounding"
ResponderGetCost = "Responder.GetCost"