Updated dispatcher API methods

This commit is contained in:
Tripon Alexandru-Ionut
2019-04-17 14:40:18 +03:00
committed by Dan Christian Bogos
parent bbc34b1f77
commit 7093dd70d1
34 changed files with 348 additions and 113 deletions

View File

@@ -36,6 +36,7 @@ import (
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/servmanager"
@@ -718,7 +719,7 @@ func TestApierLoadAccountActions(t *testing.T) {
func TestApierReloadScheduler(t *testing.T) {
reply := ""
// Simple test that command is executed without errors
if err := rater.Call(utils.SchedulerSv1Reload, reply, &reply); err != nil {
if err := rater.Call(utils.SchedulerSv1Reload, dispatchers.StringWithApiKey{}, &reply); err != nil {
t.Error("Got error on SchedulerSv1.Reload: ", err.Error())
} else if reply != "OK" {
t.Error("Calling SchedulerSv1.Reload got reply: ", reply)
@@ -1621,7 +1622,7 @@ func TestApierReloadCache2(t *testing.T) {
func TestApierReloadScheduler2(t *testing.T) {
reply := ""
// Simple test that command is executed without errors
if err := rater.Call(utils.SchedulerSv1Reload, reply, &reply); err != nil {
if err := rater.Call(utils.SchedulerSv1Reload, dispatchers.StringWithApiKey{}, &reply); err != nil {
t.Error("Got error on SchedulerSv1.Reload: ", err.Error())
} else if reply != utils.OK {
t.Error("Calling SchedulerSv1.Reload got reply: ", reply)
@@ -1707,7 +1708,7 @@ func TestApierStartStopServiceStatus(t *testing.T) {
} else if reply != utils.RunningCaps {
t.Errorf("Received: <%s>", reply)
}
if err := rater.Call(utils.SchedulerSv1Reload, reply, &reply); err != nil {
if err := rater.Call(utils.SchedulerSv1Reload, dispatchers.StringWithApiKey{}, &reply); err != nil {
t.Error("Got error on SchedulerSv1.Reload: ", err.Error())
} else if reply != utils.OK {
t.Error("Calling SchedulerSv1.Reload got reply: ", reply)

View File

@@ -80,6 +80,10 @@ func (apier *ApierV1) RemoveCDRs(attrs utils.RPCCDRsFilter, reply *string) error
return nil
}
func NewCDRsV1(CDRs *engine.CDRServer) *CDRsV1 {
return &CDRsV1{CDRs: CDRs}
}
// Receive CDRs via RPC methods
type CDRsV1 struct {
CDRs *engine.CDRServer

View File

@@ -475,7 +475,7 @@ func (dS *DispatcherSessionSv1) GetPassiveSessionsCount(args *dispatchers.Filter
return dS.dS.SessionSv1GetPassiveSessionsCount(args, reply)
}
func (dS *DispatcherSessionSv1) ReplicateSessions(args *dispatchers.ArgsReplicateSessionsWithApiKey,
func (dS *DispatcherSessionSv1) ReplicateSessions(args dispatchers.ArgsReplicateSessionsWithApiKey,
reply *string) (err error) {
return dS.dS.SessionSv1ReplicateSessions(args, reply)
}
@@ -633,12 +633,12 @@ type DispatcherGuardianSv1 struct {
}
// RemoteLock will lock a key from remote
func (dS *DispatcherGuardianSv1) RemoteLock(attr *dispatchers.AttrRemoteLockWithApiKey, reply *string) (err error) {
func (dS *DispatcherGuardianSv1) RemoteLock(attr dispatchers.AttrRemoteLockWithApiKey, reply *string) (err error) {
return dS.dS.GuardianSv1RemoteLock(attr, reply)
}
// RemoteUnlock will unlock a key from remote based on reference ID
func (dS *DispatcherGuardianSv1) RemoteUnlock(attr *dispatchers.AttrRemoteUnlockWithApiKey, reply *[]string) (err error) {
func (dS *DispatcherGuardianSv1) RemoteUnlock(attr dispatchers.AttrRemoteUnlockWithApiKey, reply *[]string) (err error) {
return dS.dS.GuardianSv1RemoteUnlock(attr, reply)
}
@@ -657,7 +657,7 @@ type DispatcherSchedulerSv1 struct {
}
// Reload reloads scheduler instructions
func (dS *DispatcherSchedulerSv1) Reload(attr *dispatchers.StringkWithApiKey, reply *string) (err error) {
func (dS *DispatcherSchedulerSv1) Reload(attr *dispatchers.StringWithApiKey, reply *string) (err error) {
return dS.dS.SchedulerSv1Reload(attr, reply)
}

View File

@@ -0,0 +1,139 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"time"
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
)
type ThresholdSv1Interface interface {
GetThresholdIDs(tenant *utils.TenantWithArgDispatcher, tIDs *[]string) error
GetThresholdsForEvent(args *engine.ArgsProcessEvent, reply *engine.Thresholds) error
GetThreshold(tntID *utils.TenantIDWithArgDispatcher, t *engine.Threshold) error
ProcessEvent(args *engine.ArgsProcessEvent, tIDs *[]string) error
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
}
type StatSv1Interface interface {
GetQueueIDs(tenant *utils.TenantWithArgDispatcher, qIDs *[]string) error
ProcessEvent(args *engine.StatsArgsProcessEvent, reply *[]string) error
GetStatQueuesForEvent(args *engine.StatsArgsProcessEvent, reply *[]string) (err error)
GetQueueStringMetrics(args *utils.TenantIDWithArgDispatcher, reply *map[string]string) (err error)
GetQueueFloatMetrics(args *utils.TenantIDWithArgDispatcher, reply *map[string]float64) (err error)
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
}
type ResourceSv1Interface interface {
GetResourcesForEvent(args utils.ArgRSv1ResourceUsage, reply *engine.Resources) error
AuthorizeResources(args utils.ArgRSv1ResourceUsage, reply *string) error
AllocateResources(args utils.ArgRSv1ResourceUsage, reply *string) error
ReleaseResources(args utils.ArgRSv1ResourceUsage, reply *string) error
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
}
type SupplierSv1Interface interface {
GetSuppliers(args *engine.ArgsGetSuppliers, reply *engine.SortedSuppliers) error
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
}
type AttributeSv1Interface interface {
GetAttributeForEvent(args *engine.AttrArgsProcessEvent, reply *engine.AttributeProfile) (err error)
ProcessEvent(args *engine.AttrArgsProcessEvent, reply *engine.AttrSProcessEventReply) error
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
}
type ChargerSv1Interface interface {
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
GetChargersForEvent(cgrEv *utils.CGREventWithArgDispatcher, reply *engine.ChargerProfiles) error
ProcessEvent(args *utils.CGREventWithArgDispatcher, reply *[]*engine.ChrgSProcessEventReply) error
}
type SessionSv1Interface interface {
AuthorizeEvent(args *sessions.V1AuthorizeArgs, rply *sessions.V1AuthorizeReply) error
AuthorizeEventWithDigest(args *sessions.V1AuthorizeArgs, rply *sessions.V1AuthorizeReplyWithDigest) error
InitiateSession(args *sessions.V1InitSessionArgs, rply *sessions.V1InitSessionReply) error
InitiateSessionWithDigest(args *sessions.V1InitSessionArgs, rply *sessions.V1InitReplyWithDigest) error
UpdateSession(args *sessions.V1UpdateSessionArgs, rply *sessions.V1UpdateSessionReply) error
// SyncSessions(args *string, rply *string) error
TerminateSession(args *sessions.V1TerminateSessionArgs, rply *string) error
ProcessCDR(cgrEv *utils.CGREventWithArgDispatcher, rply *string) error
ProcessEvent(args *sessions.V1ProcessEventArgs, rply *sessions.V1ProcessEventReply) error
GetActiveSessions(args *dispatchers.FilterSessionWithApiKey, rply *[]*sessions.ActiveSession) error
GetActiveSessionsCount(args *dispatchers.FilterSessionWithApiKey, rply *int) error
ForceDisconnect(args *dispatchers.FilterSessionWithApiKey, rply *string) error
GetPassiveSessions(args *dispatchers.FilterSessionWithApiKey, rply *[]*sessions.ActiveSession) error
GetPassiveSessionsCount(args *dispatchers.FilterSessionWithApiKey, rply *int) error
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
ReplicateSessions(args dispatchers.ArgsReplicateSessionsWithApiKey, rply *string) error
SetPassiveSession(args *sessions.Session, reply *string) error
}
type ResponderInterface interface {
GetCost(arg *engine.CallDescriptorWithArgDispatcher, reply *engine.CallCost) (err error)
Debit(arg *engine.CallDescriptorWithArgDispatcher, reply *engine.CallCost) (err error)
MaxDebit(arg *engine.CallDescriptorWithArgDispatcher, reply *engine.CallCost) (err error)
RefundIncrements(arg *engine.CallDescriptorWithArgDispatcher, reply *engine.Account) (err error)
RefundRounding(arg *engine.CallDescriptorWithArgDispatcher, reply *float64) (err error)
GetMaxSessionTime(arg *engine.CallDescriptorWithArgDispatcher, reply *time.Duration) (err error)
Status(arg *utils.TenantWithArgDispatcher, reply *map[string]interface{}) (err error)
Shutdown(arg *utils.TenantWithArgDispatcher, reply *string) (err error)
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
}
type CacheSv1Interface interface {
GetItemIDs(args *dispatchers.ArgsGetCacheItemIDsWithApiKey, reply *[]string) error
HasItem(args *dispatchers.ArgsGetCacheItemWithApiKey, reply *bool) error
GetItemExpiryTime(args *dispatchers.ArgsGetCacheItemWithApiKey, reply *time.Time) error
RemoveItem(args *dispatchers.ArgsGetCacheItemWithApiKey, reply *string) error
Clear(cacheIDs *dispatchers.AttrCacheIDsWithApiKey, reply *string) error
FlushCache(args dispatchers.AttrReloadCacheWithApiKey, reply *string) error
GetCacheStats(cacheIDs *dispatchers.AttrCacheIDsWithApiKey, rply *map[string]*ltcache.CacheStats) error
PrecacheStatus(cacheIDs *dispatchers.AttrCacheIDsWithApiKey, rply *map[string]string) error
HasGroup(args *dispatchers.ArgsGetGroupWithApiKey, rply *bool) error
GetGroupItemIDs(args *dispatchers.ArgsGetGroupWithApiKey, rply *[]string) error
RemoveGroup(args *dispatchers.ArgsGetGroupWithApiKey, rply *string) error
ReloadCache(attrs dispatchers.AttrReloadCacheWithApiKey, reply *string) error
LoadCache(args dispatchers.AttrReloadCacheWithApiKey, reply *string) error
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
}
type GuardianSv1Interface interface {
RemoteLock(attr dispatchers.AttrRemoteLockWithApiKey, reply *string) (err error)
RemoteUnlock(refID dispatchers.AttrRemoteUnlockWithApiKey, reply *[]string) (err error)
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
}
type SchedulerSv1Interface interface {
Reload(arg *dispatchers.StringWithApiKey, reply *string) error
Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error
}
type CDRsV1Interface interface {
ProcessCDR(cdr *engine.CDRWithArgDispatcher, reply *string) error
ProcessEvent(arg *engine.ArgV1ProcessEvent, reply *string) error
ProcessExternalCDR(cdr *engine.ExternalCDRWithArgDispatcher, reply *string) error
RateCDRs(arg *engine.ArgRateCDRs, reply *string) error
StoreSessionCost(attr *engine.AttrCDRSStoreSMCost, reply *string) error
}

View File

@@ -0,0 +1,85 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"testing"
"github.com/cgrates/cgrates/engine"
)
func TestThresholdSv1Interface(t *testing.T) {
_ = ThresholdSv1Interface(NewDispatcherThresholdSv1(nil))
_ = ThresholdSv1Interface(NewThresholdSv1(nil))
}
func TestStatSv1Interface(t *testing.T) {
_ = StatSv1Interface(NewDispatcherStatSv1(nil))
_ = StatSv1Interface(NewStatSv1(nil))
}
func TestResourceSv1Interface(t *testing.T) {
_ = ResourceSv1Interface(NewDispatcherResourceSv1(nil))
_ = ResourceSv1Interface(NewResourceSv1(nil))
}
func TestSupplierSv1Interface(t *testing.T) {
_ = SupplierSv1Interface(NewDispatcherSupplierSv1(nil))
_ = SupplierSv1Interface(NewSupplierSv1(nil))
}
func TestAttributeSv1Interface(t *testing.T) {
_ = AttributeSv1Interface(NewDispatcherAttributeSv1(nil))
_ = AttributeSv1Interface(NewAttributeSv1(nil))
}
func TestChargerSv1Interface(t *testing.T) {
_ = ChargerSv1Interface(NewDispatcherChargerSv1(nil))
_ = ChargerSv1Interface(NewChargerSv1(nil))
}
func TestSessionSv1Interface(t *testing.T) {
_ = SessionSv1Interface(NewDispatcherSessionSv1(nil))
_ = SessionSv1Interface(NewSessionSv1(nil))
}
func TestResponderInterface(t *testing.T) {
_ = ResponderInterface(NewDispatcherResponder(nil))
_ = ResponderInterface(&engine.Responder{})
}
func TestCacheSv1Interface(t *testing.T) {
_ = CacheSv1Interface(NewDispatcherCacheSv1(nil))
// _ = CacheSv1Interface(NewCacheSv1(nil))
}
func TestGuardianSv1Interface(t *testing.T) {
_ = GuardianSv1Interface(NewDispatcherGuardianSv1(nil))
_ = GuardianSv1Interface(NewGuardianSv1())
}
func TestSchedulerSv1Interface(t *testing.T) {
_ = SchedulerSv1Interface(NewDispatcherSchedulerSv1(nil))
_ = SchedulerSv1Interface(NewSchedulerSv1(nil))
}
func TestCDRsV1Interface(t *testing.T) {
_ = CDRsV1Interface(NewDispatcherSCDRsV1(nil))
_ = CDRsV1Interface(NewCDRsV1(nil))
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
@@ -30,14 +31,14 @@ func NewGuardianSv1() *GuardianSv1 {
type GuardianSv1 struct{}
// RemoteLock will lock a key from remote
func (self *GuardianSv1) RemoteLock(attr utils.AttrRemoteLock, reply *string) (err error) {
func (self *GuardianSv1) RemoteLock(attr dispatchers.AttrRemoteLockWithApiKey, reply *string) (err error) {
*reply = guardian.Guardian.GuardIDs(attr.ReferenceID, attr.Timeout, attr.LockIDs...)
return
}
// RemoteUnlock will unlock a key from remote based on reference ID
func (self *GuardianSv1) RemoteUnlock(refID string, reply *[]string) (err error) {
*reply = guardian.Guardian.UnguardIDs(refID)
func (self *GuardianSv1) RemoteUnlock(refID dispatchers.AttrRemoteUnlockWithApiKey, reply *[]string) (err error) {
*reply = guardian.Guardian.UnguardIDs(refID.RefID)
return
}

View File

@@ -27,6 +27,7 @@ import (
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -72,7 +73,7 @@ func TestGuardianSIT(t *testing.T) {
t.Error(err)
}
var unlockReply []string
if err = guardianRPC.Call(utils.GuardianSv1RemoteUnlock, reply, &unlockReply); err != nil {
if err = guardianRPC.Call(utils.GuardianSv1RemoteUnlock, dispatchers.AttrRemoteUnlockWithApiKey{RefID: reply}, &unlockReply); err != nil {
t.Error(err)
}
if !reflect.DeepEqual(args.LockIDs, unlockReply) {

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
@@ -33,8 +34,8 @@ type SchedulerSv1 struct {
}
// Reload reloads scheduler instructions
func (schdSv1 *SchedulerSv1) Reload(arg string, reply *string) error {
return schdSv1.schdS.V1Reload(arg, reply)
func (schdSv1 *SchedulerSv1) Reload(arg *dispatchers.StringWithApiKey, reply *string) error {
return schdSv1.schdS.V1Reload(arg.Arg, reply)
}
func (schdSv1 *SchedulerSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error {

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
@@ -76,38 +77,38 @@ func (ssv1 *SessionSv1) ProcessEvent(args *sessions.V1ProcessEventArgs,
return ssv1.Ss.BiRPCv1ProcessEvent(nil, args, rply)
}
func (ssv1 *SessionSv1) GetActiveSessions(args map[string]string,
func (ssv1 *SessionSv1) GetActiveSessions(args *dispatchers.FilterSessionWithApiKey,
rply *[]*sessions.ActiveSession) error {
return ssv1.Ss.BiRPCv1GetActiveSessions(nil, args, rply)
return ssv1.Ss.BiRPCv1GetActiveSessions(nil, args.Filters, rply)
}
func (ssv1 *SessionSv1) GetActiveSessionsCount(args map[string]string,
func (ssv1 *SessionSv1) GetActiveSessionsCount(args *dispatchers.FilterSessionWithApiKey,
rply *int) error {
return ssv1.Ss.BiRPCv1GetActiveSessionsCount(nil, args, rply)
return ssv1.Ss.BiRPCv1GetActiveSessionsCount(nil, args.Filters, rply)
}
func (ssv1 *SessionSv1) ForceDisconnect(args map[string]string,
func (ssv1 *SessionSv1) ForceDisconnect(args *dispatchers.FilterSessionWithApiKey,
rply *string) error {
return ssv1.Ss.BiRPCv1ForceDisconnect(nil, args, rply)
return ssv1.Ss.BiRPCv1ForceDisconnect(nil, args.Filters, rply)
}
func (ssv1 *SessionSv1) GetPassiveSessions(args map[string]string,
func (ssv1 *SessionSv1) GetPassiveSessions(args *dispatchers.FilterSessionWithApiKey,
rply *[]*sessions.ActiveSession) error {
return ssv1.Ss.BiRPCv1GetPassiveSessions(nil, args, rply)
return ssv1.Ss.BiRPCv1GetPassiveSessions(nil, args.Filters, rply)
}
func (ssv1 *SessionSv1) GetPassiveSessionsCount(args map[string]string,
func (ssv1 *SessionSv1) GetPassiveSessionsCount(args *dispatchers.FilterSessionWithApiKey,
rply *int) error {
return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(nil, args, rply)
return ssv1.Ss.BiRPCv1GetPassiveSessionsCount(nil, args.Filters, rply)
}
func (ssv1 *SessionSv1) Ping(ign *utils.CGREvent, reply *string) error {
func (ssv1 *SessionSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error {
*reply = utils.Pong
return nil
}
func (ssv1 *SessionSv1) ReplicateSessions(args sessions.ArgsReplicateSessions, rply *string) error {
return ssv1.Ss.BiRPCv1ReplicateSessions(nil, args, rply)
func (ssv1 *SessionSv1) ReplicateSessions(args dispatchers.ArgsReplicateSessionsWithApiKey, rply *string) error {
return ssv1.Ss.BiRPCv1ReplicateSessions(nil, args.ArgsReplicateSessions, rply)
}
func (ssv1 *SessionSv1) SetPassiveSession(args *sessions.Session,

View File

@@ -125,7 +125,7 @@ func (ssv1 *SessionSv1) BiRPCv1RegisterInternalBiJSONConn(clnt *rpc2.Client, arg
return ssv1.Ss.BiRPCv1RegisterInternalBiJSONConn(clnt, args, rply)
}
func (ssv1 *SessionSv1) BiRPCPing(clnt *rpc2.Client, ign *utils.CGREvent, reply *string) error {
func (ssv1 *SessionSv1) BiRPCPing(clnt *rpc2.Client, ign *utils.CGREventWithArgDispatcher, reply *string) error {
return ssv1.Ping(ign, reply)
}

View File

@@ -166,7 +166,7 @@ func (stsv1 *StatSv1) Call(serviceMethod string, args interface{}, reply interfa
}
// GetQueueIDs returns list of queueIDs registered for a tenant
func (stsv1 *StatSv1) GetQueueIDs(tenant *utils.TenantArg, qIDs *[]string) error {
func (stsv1 *StatSv1) GetQueueIDs(tenant *utils.TenantWithArgDispatcher, qIDs *[]string) error {
return stsv1.sS.V1GetQueueIDs(tenant.Tenant, qIDs)
}
@@ -181,13 +181,13 @@ func (stsv1 *StatSv1) GetStatQueuesForEvent(args *engine.StatsArgsProcessEvent,
}
// GetStringMetrics returns the string metrics for a Queue
func (stsv1 *StatSv1) GetQueueStringMetrics(args *utils.TenantID, reply *map[string]string) (err error) {
return stsv1.sS.V1GetQueueStringMetrics(args, reply)
func (stsv1 *StatSv1) GetQueueStringMetrics(args *utils.TenantIDWithArgDispatcher, reply *map[string]string) (err error) {
return stsv1.sS.V1GetQueueStringMetrics(args.TenantID, reply)
}
// GetQueueFloatMetrics returns the float metrics for a Queue
func (stsv1 *StatSv1) GetQueueFloatMetrics(args *utils.TenantID, reply *map[string]float64) (err error) {
return stsv1.sS.V1GetQueueFloatMetrics(args, reply)
func (stsv1 *StatSv1) GetQueueFloatMetrics(args *utils.TenantIDWithArgDispatcher, reply *map[string]float64) (err error) {
return stsv1.sS.V1GetQueueFloatMetrics(args.TenantID, reply)
}
func (stSv1 *StatSv1) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error {

View File

@@ -41,7 +41,7 @@ func (tSv1 *ThresholdSv1) Call(serviceMethod string, args interface{}, reply int
}
// GetThresholdIDs returns list of threshold IDs registered for a tenant
func (tSv1 *ThresholdSv1) GetThresholdIDs(tenant *utils.TenantArg, tIDs *[]string) error {
func (tSv1 *ThresholdSv1) GetThresholdIDs(tenant *utils.TenantWithArgDispatcher, tIDs *[]string) error {
return tSv1.tS.V1GetThresholdIDs(tenant.Tenant, tIDs)
}
@@ -51,8 +51,8 @@ func (tSv1 *ThresholdSv1) GetThresholdsForEvent(args *engine.ArgsProcessEvent, r
}
// GetThreshold queries a Threshold
func (tSv1 *ThresholdSv1) GetThreshold(tntID *utils.TenantID, t *engine.Threshold) error {
return tSv1.tS.V1GetThreshold(tntID, t)
func (tSv1 *ThresholdSv1) GetThreshold(tntID *utils.TenantIDWithArgDispatcher, t *engine.Threshold) error {
return tSv1.tS.V1GetThreshold(tntID.TenantID, t)
}
// ProcessEvent will process an Event

View File

@@ -793,9 +793,9 @@ func startCDRS(internalCdrSChan, internalRaterChan, internalAttributeSChan, inte
utils.Logger.Info("Registering CDRS HTTP Handlers.")
cdrServer.RegisterHandlersToServer(server)
utils.Logger.Info("Registering CDRS RPC service.")
cdrSrv := v1.CDRsV1{CDRs: cdrServer}
server.RpcRegister(&cdrSrv)
server.RpcRegister(&v2.CDRsV2{CDRsV1: cdrSrv})
cdrSrv := v1.NewCDRsV1(cdrServer)
server.RpcRegister(cdrSrv)
server.RpcRegister(&v2.CDRsV2{CDRsV1: *cdrSrv})
// Make the cdr server available for internal communication
server.RpcRegister(cdrServer) // register CdrServer for internal usage (TODO: refactor this)
internalCdrSChan <- cdrServer // Signal that cdrS is operational

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package console
import (
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
@@ -50,13 +51,13 @@ func (self *CmdActiveSessions) RpcMethod() string {
func (self *CmdActiveSessions) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &map[string]string{}
self.rpcParams = &dispatchers.FilterSessionWithApiKey{}
}
return self.rpcParams
}
func (self *CmdActiveSessions) PostprocessRpcParams() error {
param := self.rpcParams.(*map[string]string)
param := self.rpcParams.(*dispatchers.FilterSessionWithApiKey)
self.rpcParams = param
return nil
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package console
import (
"github.com/cgrates/cgrates/dispatchers"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
)
@@ -50,13 +51,13 @@ func (self *CmdPassiveSessions) RpcMethod() string {
func (self *CmdPassiveSessions) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &map[string]string{}
self.rpcParams = &dispatchers.FilterSessionWithApiKey{}
}
return self.rpcParams
}
func (self *CmdPassiveSessions) PostprocessRpcParams() error {
param := self.rpcParams.(*map[string]string)
param := self.rpcParams.(*dispatchers.FilterSessionWithApiKey)
self.rpcParams = param
return nil
}

View File

@@ -32,7 +32,7 @@ func init() {
type CmdStatus struct {
name string
rpcMethod string
rpcParams *EmptyWrapper
rpcParams *utils.TenantWithArgDispatcher
*CommandExecuter
}
@@ -46,7 +46,7 @@ func (self *CmdStatus) RpcMethod() string {
func (self *CmdStatus) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &EmptyWrapper{}
self.rpcParams = &utils.TenantWithArgDispatcher{}
}
return self.rpcParams
}

View File

@@ -27,7 +27,7 @@ func init() {
c := &CmdGetThreshold{
name: "threshold",
rpcMethod: "ApierV1.GetThresholdProfile",
rpcParams: &utils.TenantID{},
rpcParams: &utils.TenantIDWithArgDispatcher{},
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
@@ -36,7 +36,7 @@ func init() {
type CmdGetThreshold struct {
name string
rpcMethod string
rpcParams *utils.TenantID
rpcParams *utils.TenantIDWithArgDispatcher
*CommandExecuter
}
@@ -50,7 +50,7 @@ func (self *CmdGetThreshold) RpcMethod() string {
func (self *CmdGetThreshold) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &utils.TenantID{}
self.rpcParams = &utils.TenantIDWithArgDispatcher{}
}
return self.rpcParams
}

View File

@@ -39,7 +39,7 @@ func (dS *DispatcherService) CacheSv1Ping(args *utils.CGREventWithArgDispatcher,
}
}
return dS.Dispatch(args.CGREvent, utils.MetaCaches, args.RouteID,
utils.CacheSv1Ping, args.CGREvent, reply)
utils.CacheSv1Ping, args, reply)
}
// GetItemIDs returns the IDs for cacheID with given prefix

View File

@@ -39,7 +39,7 @@ func (dS *DispatcherService) CDRsV1Ping(args *utils.CGREventWithArgDispatcher,
}
}
return dS.Dispatch(args.CGREvent, utils.MetaCDRs, args.RouteID,
utils.CDRsV1Ping, args.CGREvent, reply)
utils.CDRsV1Ping, args, reply)
}
func (dS *DispatcherService) CDRsV1GetCDRs(args utils.RPCCDRsFilterWithArgDispatcher, reply *[]*engine.CDR) (err error) {

View File

@@ -38,11 +38,11 @@ func (dS *DispatcherService) GuardianSv1Ping(args *utils.CGREventWithArgDispatch
}
}
return dS.Dispatch(args.CGREvent, utils.MetaGuardian, args.RouteID,
utils.GuardianSv1Ping, args.CGREvent, reply)
utils.GuardianSv1Ping, args, reply)
}
// RemoteLock will lock a key from remote
func (dS *DispatcherService) GuardianSv1RemoteLock(args *AttrRemoteLockWithApiKey,
func (dS *DispatcherService) GuardianSv1RemoteLock(args AttrRemoteLockWithApiKey,
reply *string) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
@@ -59,7 +59,7 @@ func (dS *DispatcherService) GuardianSv1RemoteLock(args *AttrRemoteLockWithApiKe
}
// RemoteUnlock will unlock a key from remote based on reference ID
func (dS *DispatcherService) GuardianSv1RemoteUnlock(args *AttrRemoteUnlockWithApiKey,
func (dS *DispatcherService) GuardianSv1RemoteUnlock(args AttrRemoteUnlockWithApiKey,
reply *[]string) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
@@ -72,5 +72,5 @@ func (dS *DispatcherService) GuardianSv1RemoteUnlock(args *AttrRemoteUnlockWithA
}
}
return dS.Dispatch(&utils.CGREvent{Tenant: args.TenantArg.Tenant}, utils.MetaGuardian, args.RouteID,
utils.GuardianSv1RemoteUnlock, args.RefID, reply)
utils.GuardianSv1RemoteUnlock, args, reply)
}

View File

@@ -35,7 +35,7 @@ func (dS *DispatcherService) ResourceSv1Ping(args *utils.CGREventWithArgDispatch
}
}
return dS.Dispatch(args.CGREvent, utils.MetaResources, args.RouteID,
utils.ResourceSv1Ping, args.CGREvent, rpl)
utils.ResourceSv1Ping, args, rpl)
}
func (dS *DispatcherService) ResourceSv1GetResourcesForEvent(args utils.ArgRSv1ResourceUsage,

View File

@@ -39,7 +39,7 @@ func (dS *DispatcherService) ResponderPing(args *utils.CGREventWithArgDispatcher
}
}
return dS.Dispatch(args.CGREvent, utils.MetaResponder, args.RouteID,
utils.ResponderPing, args.CGREvent, reply)
utils.ResponderPing, args, reply)
}
func (dS *DispatcherService) ResponderStatus(args *utils.TenantWithArgDispatcher,
@@ -56,7 +56,7 @@ func (dS *DispatcherService) ResponderStatus(args *utils.TenantWithArgDispatcher
return dS.Dispatch(&utils.CGREvent{
Tenant: args.Tenant,
}, utils.MetaResponder, args.RouteID, utils.ResponderStatus,
"", reply)
args, reply)
}
func (dS *DispatcherService) ResponderGetCost(args *engine.CallDescriptorWithArgDispatcher,
@@ -163,7 +163,7 @@ func (dS *DispatcherService) ResponderShutdown(args *utils.TenantWithArgDispatch
return dS.Dispatch(&utils.CGREvent{
Tenant: args.Tenant,
}, utils.MetaResponder, args.RouteID, utils.ResponderShutdown,
"", reply)
args, reply)
}
func (dS *DispatcherService) ResponderGetTimeout(args *utils.TenantWithArgDispatcher,

View File

@@ -38,16 +38,16 @@ var sTestsDspRsp = []func(t *testing.T){
//Test start here
func TestDspResponderTMySQL(t *testing.T) {
testDsp(t, sTestsDspRsp, "TestDspAttributeS", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
testDsp(t, sTestsDspRsp, "TestDspResponder", "all", "all2", "dispatchers", "tutorial", "oldtutorial", "dispatchers")
}
func TestDspResponderMongo(t *testing.T) {
testDsp(t, sTestsDspRsp, "TestDspAttributeS", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
testDsp(t, sTestsDspRsp, "TestDspResponder", "all", "all2", "dispatchers_mongo", "tutorial", "oldtutorial", "dispatchers")
}
func testDspResponderStatus(t *testing.T) {
var reply map[string]interface{}
if err := allEngine.RCP.Call(utils.ResponderStatus, "", &reply); err != nil {
if err := allEngine.RCP.Call(utils.ResponderStatus, utils.TenantWithArgDispatcher{}, &reply); err != nil {
t.Error(err)
} else if reply[utils.NodeID] != "ALL" {
t.Errorf("Received: %s", reply)
@@ -104,7 +104,7 @@ func getNodeWithRoute(route string, t *testing.T) string {
} else if pingReply != utils.Pong {
t.Errorf("Received: %s", pingReply)
}
if err := dispEngine.RCP.Call(utils.ResponderStatus, &ev, &reply); err != nil {
if err := dispEngine.RCP.Call(utils.ResponderStatus, ev, &reply); err != nil {
t.Error(err)
}
if reply[utils.NodeID] == nil {
@@ -134,7 +134,7 @@ func testDspResponderShutdown(t *testing.T) {
APIKey: utils.StringPointer("rsp12345"),
},
}
if err := dispEngine.RCP.Call(utils.ResponderShutdown, &ev, &reply); err != nil {
if err := dispEngine.RCP.Call(utils.ResponderShutdown, ev, &reply); err != nil {
t.Error(err)
} else if reply != "Done!" {
t.Errorf("Received: %s", utils.ToJSON(reply))

View File

@@ -36,10 +36,10 @@ func (dS *DispatcherService) SchedulerSv1Ping(args *utils.CGREventWithArgDispatc
}
}
return dS.Dispatch(args.CGREvent, utils.MetaScheduler, args.RouteID,
utils.SchedulerSv1Ping, args.CGREvent, reply)
utils.SchedulerSv1Ping, args, reply)
}
func (dS *DispatcherService) SchedulerSv1Reload(args *StringkWithApiKey, reply *string) (err error) {
func (dS *DispatcherService) SchedulerSv1Reload(args *StringWithApiKey, reply *string) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")
}

View File

@@ -37,7 +37,7 @@ func (dS *DispatcherService) SessionSv1Ping(args *utils.CGREventWithArgDispatche
}
}
return dS.Dispatch(args.CGREvent, utils.MetaSessionS, args.RouteID,
utils.SessionSv1Ping, args.CGREvent, reply)
utils.SessionSv1Ping, args, reply)
}
func (dS *DispatcherService) SessionSv1AuthorizeEvent(args *sessions.V1AuthorizeArgs,
@@ -264,7 +264,7 @@ func (dS *DispatcherService) SessionSv1GetPassiveSessionsCount(args *FilterSessi
utils.SessionSv1GetPassiveSessionsCount, args.Filters, reply)
}
func (dS *DispatcherService) SessionSv1ReplicateSessions(args *ArgsReplicateSessionsWithApiKey,
func (dS *DispatcherService) SessionSv1ReplicateSessions(args ArgsReplicateSessionsWithApiKey,
reply *string) (err error) {
if args.ArgDispatcher == nil {
return utils.NewErrMandatoryIeMissing("ArgDispatcher")

View File

@@ -37,7 +37,7 @@ func (dS *DispatcherService) StatSv1Ping(args *utils.CGREventWithArgDispatcher,
}
}
return dS.Dispatch(args.CGREvent, utils.MetaStats, args.RouteID,
utils.StatSv1Ping, args.CGREvent, reply)
utils.StatSv1Ping, args, reply)
}
func (dS *DispatcherService) StatSv1GetStatQueuesForEvent(args *engine.StatsArgsProcessEvent,

View File

@@ -35,7 +35,7 @@ func (dS *DispatcherService) SupplierSv1Ping(args *utils.CGREventWithArgDispatch
}
}
return dS.Dispatch(args.CGREvent, utils.MetaSuppliers, args.RouteID,
utils.SupplierSv1Ping, args.CGREvent, reply)
utils.SupplierSv1Ping, args, reply)
}
func (dS *DispatcherService) SupplierSv1GetSuppliers(args *engine.ArgsGetSuppliers,

View File

@@ -37,7 +37,7 @@ func (dS *DispatcherService) ThresholdSv1Ping(args *utils.CGREventWithArgDispatc
}
}
return dS.Dispatch(args.CGREvent, utils.MetaThresholds, args.RouteID,
utils.ThresholdSv1Ping, args.CGREvent, reply)
utils.ThresholdSv1Ping, args, reply)
}
func (dS *DispatcherService) ThresholdSv1GetThresholdsForEvent(args *engine.ArgsProcessEvent,

View File

@@ -93,7 +93,7 @@ type AttrRemoteUnlockWithApiKey struct {
RefID string
}
type StringkWithApiKey struct {
type StringWithApiKey struct {
*utils.ArgDispatcher
utils.TenantArg
Arg string

View File

@@ -479,7 +479,7 @@ func (fltr *FilterRule) passStatS(dP config.DataProvider,
for _, statItem := range fltr.statItems {
statValues := make(map[string]float64)
if err := stats.Call(utils.StatSv1GetQueueFloatMetrics,
&utils.TenantID{Tenant: tenant, ID: statItem.ItemID}, &statValues); err != nil {
&utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: tenant, ID: statItem.ItemID}}, &statValues); err != nil {
return false, err
}
//convert statValues to map[string]interface{}

View File

@@ -241,12 +241,12 @@ func (rs *Responder) GetMaxSessionTime(arg *CallDescriptorWithArgDispatcher, rep
return
}
func (rs *Responder) Status(arg string, reply *map[string]interface{}) (err error) {
if arg != "" { // Introduce delay in answer, used in some automated tests
if delay, err := utils.ParseDurationWithNanosecs(arg); err == nil {
time.Sleep(delay)
}
}
func (rs *Responder) Status(arg *utils.TenantWithArgDispatcher, reply *map[string]interface{}) (err error) {
// if arg != "" { // Introduce delay in answer, used in some automated tests
// if delay, err := utils.ParseDurationWithNanosecs(arg); err == nil {
// time.Sleep(delay)
// }
// }
memstats := new(runtime.MemStats)
runtime.ReadMemStats(memstats)
response := make(map[string]interface{})
@@ -261,7 +261,7 @@ func (rs *Responder) Status(arg string, reply *map[string]interface{}) (err erro
return
}
func (rs *Responder) Shutdown(arg string, reply *string) (err error) {
func (rs *Responder) Shutdown(arg *utils.TenantWithArgDispatcher, reply *string) (err error) {
dm.DataDB().Close()
cdrStorage.Close()
defer func() { rs.ExitChan <- true }()
@@ -270,7 +270,7 @@ func (rs *Responder) Shutdown(arg string, reply *string) (err error) {
}
// Ping used to detreminate if component is active
func (chSv1 *Responder) Ping(ign *utils.CGREvent, reply *string) error {
func (chSv1 *Responder) Ping(ign *utils.CGREventWithArgDispatcher, reply *string) error {
*reply = utils.Pong
return nil
}

View File

@@ -260,7 +260,7 @@ func (spS *SupplierService) statMetrics(statIDs []string, tenant string) (stsMet
for _, statID := range statIDs {
var metrics map[string]float64
if err = spS.statS.Call(utils.StatSv1GetQueueFloatMetrics,
&utils.TenantID{Tenant: tenant, ID: statID}, &metrics); err != nil &&
&utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: tenant, ID: statID}}, &metrics); err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<SupplierS> error: %s getting statMetrics for stat : %s", err.Error(), statID))

View File

@@ -95,12 +95,12 @@ func TestRPCITLclRpcConnPoolFirst(t *testing.T) {
// Connect rpc client to rater
func TestRPCITLclStatusSecondEngine(t *testing.T) {
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil { // Make sure second time we land on the same instance
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
t.Error(err)
} else if status[utils.NodeID].(string) != node2 {
t.Errorf("Expecting:\n%s\nReceived:\n%s", node2, status[utils.NodeID].(string))
@@ -117,14 +117,14 @@ func TestRPCITLclStartFirstEngine(t *testing.T) {
// Connect rpc client to rater
func TestRPCITLclStatusFirstInitial(t *testing.T) {
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
} else if status[utils.NodeID].(string) == node2 {
t.Fatalf("Should receive ralID different than second one, got: %s", status[utils.NodeID].(string))
}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil { // Make sure second time we land on the same instance
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
t.Error(err)
} else if status[utils.NodeID].(string) != node1 {
t.Errorf("Expecting:\n%s\nReceived:\n%s", node1, status[utils.NodeID].(string))
@@ -138,14 +138,14 @@ func TestRPCITLclStatusFirstFailover(t *testing.T) {
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
} else if status[utils.NodeID].(string) == node1 {
t.Fatalf("Should receive ralID different than first one, got: %s", status[utils.NodeID].(string))
}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil { // Make sure second time we land on the same instance
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
t.Error(err)
} else if status[utils.NodeID].(string) != node2 {
t.Errorf("Expecting:\n%s\nReceived:\n%s", node2, status[utils.NodeID].(string))
@@ -157,12 +157,12 @@ func TestRPCITLclStatusFirstFailback(t *testing.T) {
t.Fatal(err)
}
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == node2 {
t.Error("Should receive new ID")
}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil { // Make sure second time we land on the same instance
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
t.Error(err)
} else if status[utils.NodeID].(string) != node1 {
t.Errorf("Expecting:\n%s\nReceived:\n%s", node2, status[utils.NodeID].(string))
@@ -177,14 +177,14 @@ func TestRPCITLclTDirectedRPC(t *testing.T) {
}
}
func TestRPCITLclTimeout(t *testing.T) {
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", "10s", &status); err == nil {
t.Error("Expecting timeout")
} else if err.Error() != rpcclient.ErrReplyTimeout.Error() {
t.Error(err)
}
}
// func TestRPCITLclTimeout(t *testing.T) {
// var status map[string]interface{}
// if err := rpcPoolFirst.Call("Responder.Status", "10s", &status); err == nil {
// t.Error("Expecting timeout")
// } else if err.Error() != rpcclient.ErrReplyTimeout.Error() {
// t.Error(err)
// }
// }
// Connect rpc client to rater
func TestRPCITLclRpcConnPoolBcast(t *testing.T) {
@@ -195,12 +195,12 @@ func TestRPCITLclRpcConnPoolBcast(t *testing.T) {
func TestRPCITLclBcastStatusInitial(t *testing.T) {
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
@@ -213,12 +213,12 @@ func TestRPCITLclBcastStatusNoRals1(t *testing.T) {
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
@@ -231,7 +231,7 @@ func TestRPCITLclBcastStatusBcastNoRals(t *testing.T) {
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond)
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err == nil {
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err == nil {
t.Error("Should get error")
}
}
@@ -241,12 +241,12 @@ func TestRPCITLclBcastStatusRALs2Up(t *testing.T) {
t.Fatal(err)
}
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
@@ -258,12 +258,12 @@ func TestRPCITLclStatusBcastRALs1Up(t *testing.T) {
t.Fatal(err)
}
var status map[string]interface{}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty InstanceID received")
}
if err := rpcPoolBroadcast.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolBroadcast.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty InstanceID received")
@@ -331,12 +331,12 @@ func TestRPCITRmtStatusFirstInitial(t *testing.T) {
return
}
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil { // Make sure second time we land on the same instance
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil { // Make sure second time we land on the same instance
t.Error(err)
} else if status[utils.NodeID].(string) != node1 {
t.Errorf("Expecting:\n%s\nReceived:\n%s", node1, status[utils.NodeID].(string))
@@ -355,14 +355,14 @@ func TestRPCITRmtStatusFirstFailover(t *testing.T) {
}
fmt.Println("\n\nExecuting query ...")
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
} else if status[utils.NodeID].(string) == node1 {
t.Fatal("Did not failover")
}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
@@ -383,14 +383,14 @@ func TestRPCITRmtStatusFirstFailback(t *testing.T) {
}
fmt.Println("\n\nExecuting query ...")
var status map[string]interface{}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")
} else if status[utils.NodeID].(string) == node2 {
t.Fatal("Did not do failback")
}
if err := rpcPoolFirst.Call("Responder.Status", "", &status); err != nil {
if err := rpcPoolFirst.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil {
t.Error(err)
} else if status[utils.NodeID].(string) == "" {
t.Error("Empty NodeID received")

View File

@@ -410,10 +410,10 @@ func TestSessionSRplManualReplicate(t *testing.T) {
t.Errorf("Failed to kill process, error: %v", err.Error())
}
var status map[string]interface{}
if err := smgRplcMstrRPC.Call("Responder.Status", "", &status); err == nil { // master should not longer be reachable
if err := smgRplcMstrRPC.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err == nil { // master should not longer be reachable
t.Error(err, status)
}
if err := smgRplcSlvRPC.Call("Responder.Status", "", &status); err != nil { // slave should be still operational
if err := smgRplcSlvRPC.Call("Responder.Status", utils.TenantWithArgDispatcher{}, &status); err != nil { // slave should be still operational
t.Error(err)
}
// start master