Updated internal RPC

This commit is contained in:
Tripon Alexandru-Ionut
2019-04-08 14:44:15 +03:00
committed by Dan Christian Bogos
parent 6b487537b8
commit 0580c5ba39
5 changed files with 77 additions and 56 deletions

View File

@@ -302,7 +302,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
if !config.CgrConfig().DispatcherSCfg().Enabled {
server.RpcRegister(ssv1)
} else {
engine.IntRPC.AddConnection(utils.SessionSv1, ssv1)
engine.AddInternalRPCClient(utils.SessionSv1, ssv1)
}
// Register BiRpc handlers
if cfg.SessionSCfg().ListenBijson != "" {
@@ -681,7 +681,7 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec
if !config.CgrConfig().DispatcherSCfg().Enabled {
server.RpcRegister(aSv1)
} else {
engine.IntRPC.AddConnection(utils.AttributeSv1, aSv1)
engine.AddInternalRPCClient(utils.AttributeSv1, aSv1)
}
internalAttributeSChan <- aSv1
}
@@ -734,7 +734,7 @@ func startChargerService(internalChargerSChan chan rpcclient.RpcClientConnection
if !config.CgrConfig().DispatcherSCfg().Enabled {
server.RpcRegister(cSv1)
} else {
engine.IntRPC.AddConnection(utils.ChargerSv1, cSv1)
engine.AddInternalRPCClient(utils.ChargerSv1, cSv1)
}
internalChargerSChan <- cSv1
}
@@ -785,7 +785,7 @@ func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cac
if !config.CgrConfig().DispatcherSCfg().Enabled {
server.RpcRegister(rsV1)
} else {
engine.IntRPC.AddConnection(utils.ResourceSv1, rsV1)
engine.AddInternalRPCClient(utils.ResourceSv1, rsV1)
}
internalRsChan <- rsV1
}
@@ -836,7 +836,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cach
if !config.CgrConfig().DispatcherSCfg().Enabled {
server.RpcRegister(stsV1)
} else {
engine.IntRPC.AddConnection(utils.StatSv1, stsV1)
engine.AddInternalRPCClient(utils.StatSv1, stsV1)
}
internalStatSChan <- stsV1
}
@@ -871,7 +871,7 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec
if !config.CgrConfig().DispatcherSCfg().Enabled {
server.RpcRegister(tSv1)
} else {
engine.IntRPC.AddConnection(utils.ThresholdSv1, tSv1)
engine.AddInternalRPCClient(utils.ThresholdSv1, tSv1)
}
internalThresholdSChan <- tSv1
}
@@ -956,7 +956,7 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti
if !config.CgrConfig().DispatcherSCfg().Enabled {
server.RpcRegister(splV1)
} else {
engine.IntRPC.AddConnection(utils.SupplierSv1, splV1)
engine.AddInternalRPCClient(utils.SupplierSv1, splV1)
}
internalSupplierSChan <- splV1
}
@@ -1109,7 +1109,7 @@ func initCacheS(internalCacheSChan chan rpcclient.RpcClientConnection,
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(v1.NewCacheSv1(chS))
} else {
engine.IntRPC.AddConnection(utils.CacheSv1, v1.NewCacheSv1(chS))
engine.AddInternalRPCClient(utils.CacheSv1, v1.NewCacheSv1(chS))
}
internalCacheSChan <- chS
return
@@ -1119,7 +1119,7 @@ func initGuardianSv1(server *utils.Server) {
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(v1.NewGuardianSv1())
} else {
engine.IntRPC.AddConnection(utils.GuardianSv1, v1.NewGuardianSv1())
engine.AddInternalRPCClient(utils.GuardianSv1, v1.NewGuardianSv1())
}
}
@@ -1129,7 +1129,7 @@ func initSchedulerS(internalCacheSChan chan rpcclient.RpcClientConnection,
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(v1.NewSchedulerSv1(schdS))
} else {
engine.IntRPC.AddConnection(utils.SchedulerSv1, v1.NewSchedulerSv1(schdS))
engine.AddInternalRPCClient(utils.SchedulerSv1, v1.NewSchedulerSv1(schdS))
}
internalCacheSChan <- schdS
}
@@ -1427,6 +1427,9 @@ func main() {
engine.SetRpSubjectPrefixMatching(cfg.RalsCfg().RpSubjectPrefixMatching)
stopHandled := false
// initializate internal RPC conections for dispatcher
engine.InitInternalRPC()
// Rpc/http server
server := utils.NewServer()

View File

@@ -191,10 +191,10 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(responder)
} else {
engine.IntRPC.AddConnection(utils.Responder, responder)
engine.AddInternalRPCClient(utils.Responder, responder)
}
server.RpcRegister(apierRpcV1)
server.RpcRegister(apierRpcV2)
server.RpcRegister(apierRpcV1) // ToDo: Add apierv1 to dispatcher
server.RpcRegister(apierRpcV2) // ToDo: Add apierv2 to dispatcher
utils.RegisterRpcParams("", &v1.CDRsV1{})
utils.RegisterRpcParams("", &v2.CDRsV2{})

View File

@@ -1333,7 +1333,7 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
dH.Conns, IntRPC.GetConnChan(), cfg.GeneralCfg().InternalTtl, false); err != nil {
dH.Conns, GetInternalRPCClientChanel(), cfg.GeneralCfg().InternalTtl, false); err != nil {
return nil, err
}
if cacheWrite {

View File

@@ -19,10 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/cgrates/cgrates/config"
@@ -39,15 +36,8 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA
atLestOneConnected := false // If one connected we don't longer return errors
for _, rpcConnCfg := range rpcConnCfgs {
if rpcConnCfg.Address == utils.MetaInternal {
var internalConn rpcclient.RpcClientConnection
select {
case internalConn = <-internalConnChan:
internalConnChan <- internalConn
case <-time.After(ttl):
return nil, errors.New("TTL triggered")
}
rpcClient, err = rpcclient.NewRpcClient("", "", rpcConnCfg.TLS, keyPath, certPath, caPath, connAttempts,
reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn, lazyConnect)
reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConnChan, lazyConnect)
} else if utils.IsSliceMember([]string{utils.MetaJSONrpc, utils.MetaGOBrpc, ""}, rpcConnCfg.Transport) {
codec := utils.GOB
if rpcConnCfg.Transport != "" {
@@ -69,42 +59,67 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA
return rpcPool, err
}
var IntRPC *InternalRPC
var IntRPC *rpcclient.RPCClientSet
func init() {
IntRPC = &InternalRPC{subsystems: make(map[string]rpcclient.RpcClientConnection)}
}
type InternalRPC struct {
sync.Mutex
subsystems map[string]rpcclient.RpcClientConnection
}
func (irpc *InternalRPC) AddConnection(name string, conn rpcclient.RpcClientConnection) {
if conn == nil {
func InitInternalRPC() {
if !config.CgrConfig().DispatcherSCfg().Enabled {
return
}
irpc.Lock()
irpc.subsystems[name] = conn
irpc.Unlock()
var subsystems []string
subsystems = append(subsystems, utils.CacheSv1)
subsystems = append(subsystems, utils.GuardianSv1)
subsystems = append(subsystems, utils.SchedulerSv1)
subsystems = append(subsystems, utils.LoaderSv1)
if config.CgrConfig().AttributeSCfg().Enabled {
subsystems = append(subsystems, utils.AttributeSv1)
}
if config.CgrConfig().RalsCfg().RALsEnabled {
subsystems = append(subsystems, utils.ApierV1)
subsystems = append(subsystems, utils.ApierV2)
subsystems = append(subsystems, utils.Responder)
}
if config.CgrConfig().CdrsCfg().CDRSEnabled {
subsystems = append(subsystems, utils.CDRsV1)
subsystems = append(subsystems, utils.CDRsV2)
}
if config.CgrConfig().AnalyzerSCfg().Enabled {
subsystems = append(subsystems, utils.AnalyzerSv1)
}
if config.CgrConfig().SessionSCfg().Enabled {
subsystems = append(subsystems, utils.SessionSv1)
}
if config.CgrConfig().ChargerSCfg().Enabled {
subsystems = append(subsystems, utils.ChargerSv1)
}
if config.CgrConfig().ResourceSCfg().Enabled {
subsystems = append(subsystems, utils.ResourceSv1)
}
if config.CgrConfig().StatSCfg().Enabled {
subsystems = append(subsystems, utils.StatSv1)
}
if config.CgrConfig().ThresholdSCfg().Enabled {
subsystems = append(subsystems, utils.ThresholdSv1)
}
if config.CgrConfig().SupplierSCfg().Enabled {
subsystems = append(subsystems, utils.SupplierSv1)
}
IntRPC = rpcclient.NewRPCClientSet(subsystems, config.CgrConfig().GeneralCfg().InternalTtl)
}
func (irpc *InternalRPC) Call(method string, args interface{}, reply interface{}) error {
methodSplit := strings.Split(method, ".")
if len(methodSplit) != 2 {
return rpcclient.ErrUnsupporteServiceMethod
func AddInternalRPCClient(name string, rpc rpcclient.RpcClientConnection) {
connChan := make(chan rpcclient.RpcClientConnection, 1)
connChan <- rpc
err := IntRPC.AddRPCConnection(name, "", "", false, "", "", "",
config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects,
config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout,
rpcclient.INTERNAL_RPC, connChan, true)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<InternalRCP> Error adding %s to the set: %v", name, err.Error()))
}
irpc.Lock()
defer irpc.Unlock()
conn, has := irpc.subsystems[methodSplit[0]]
if !has {
return rpcclient.ErrUnsupporteServiceMethod
}
return conn.Call(method, args, reply)
}
func (irpc *InternalRPC) GetConnChan() (connChan chan rpcclient.RpcClientConnection) {
connChan = make(chan rpcclient.RpcClientConnection, 1)
connChan <- irpc
return
func GetInternalRPCClientChanel() chan rpcclient.RpcClientConnection {
connChan := make(chan rpcclient.RpcClientConnection, 1)
connChan <- IntRPC
return connChan
}

View File

@@ -822,11 +822,13 @@ const (
// AnalyzerS APIs
const (
AnalyzerSv1 = "AnalyzerSv1"
AnalyzerSv1Ping = "AnalyzerSv1.Ping"
)
// LoaderS APIs
const (
LoaderSv1 = "LoaderSv1"
LoaderSv1Load = "LoaderSv1.Load"
LoaderSv1Ping = "LoaderSv1.Ping"
)
@@ -868,8 +870,9 @@ const (
CDRsV1ProcessExternalCDR = "CDRsV1.ProcessExternalCDR"
CDRsV1StoreSessionCost = "CDRsV1.StoreSessionCost"
CDRsV1ProcessEvent = "CDRsV1.ProcessEvent"
CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost"
CDRsV1Ping = "CDRsV1.Ping"
CDRsV2 = "CDRsV2"
CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost"
)
// Scheduler