Adding Transport option for RPC connections, support for both *json and *gob

This commit is contained in:
DanB
2016-06-18 14:25:03 +02:00
parent 4c42f21b03
commit 38487d4ba2
6 changed files with 40 additions and 26 deletions

View File

@@ -110,7 +110,7 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
break
}
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cdrsConn, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cdrcCfg.CdrsConns, internalCdrSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRC> Could not connect to CDRS via RPC: %s", err.Error()))
@@ -133,7 +133,7 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal
utils.Logger.Info("Starting CGRateS SMGeneric service.")
var ralsConns, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SmGenericConfig.RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmGenericConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMGeneric> Could not connect to RALs: %s", err.Error()))
@@ -142,7 +142,7 @@ func startSmGeneric(internalSMGChan chan rpcclient.RpcClientConnection, internal
}
}
if len(cfg.SmGenericConfig.CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmGenericConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMGeneric> Could not connect to RALs: %s", err.Error()))
@@ -173,7 +173,7 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC
utils.Logger.Info("Starting CGRateS DiameterAgent service.")
var smgConn, pubsubConn *rpcclient.RpcClientPool
if len(cfg.DiameterAgentCfg().SMGenericConns) != 0 {
smgConn, err = engine.NewRPCPool(rpcclient.POOL_BROADCAST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
smgConn, err = engine.NewRPCPool(rpcclient.POOL_BROADCAST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.DiameterAgentCfg().SMGenericConns, internalSMGChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to SMG: %s", err.Error()))
@@ -182,7 +182,7 @@ func startDiameterAgent(internalSMGChan, internalPubSubSChan chan rpcclient.RpcC
}
}
if len(cfg.DiameterAgentCfg().PubSubConns) != 0 {
pubsubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
pubsubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.DiameterAgentCfg().PubSubConns, internalPubSubSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<DiameterAgent> Could not connect to PubSubS: %s", err.Error()))
@@ -206,7 +206,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcCli
utils.Logger.Info("Starting CGRateS SMFreeSWITCH service.")
var ralsConn, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SmFsConfig.RALsConns) != 0 {
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmFsConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMFreeSWITCH> Could not connect to RAL: %s", err.Error()))
@@ -215,7 +215,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan chan rpcclient.RpcCli
}
}
if len(cfg.SmFsConfig.CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmFsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMFreeSWITCH> Could not connect to RAL: %s", err.Error()))
@@ -235,7 +235,7 @@ func startSmKamailio(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien
utils.Logger.Info("Starting CGRateS SMKamailio service.")
var ralsConn, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SmKamConfig.RALsConns) != 0 {
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmKamConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMKamailio> Could not connect to RAL: %s", err.Error()))
@@ -244,7 +244,7 @@ func startSmKamailio(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien
}
}
if len(cfg.SmKamConfig.CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmKamConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMKamailio> Could not connect to RAL: %s", err.Error()))
@@ -264,7 +264,7 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien
utils.Logger.Info("Starting CGRateS SMOpenSIPS service.")
var ralsConn, cdrsConn *rpcclient.RpcClientPool
if len(cfg.SmOsipsConfig.RALsConns) != 0 {
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
ralsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmOsipsConfig.RALsConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMOpenSIPS> Could not connect to RALs: %s", err.Error()))
@@ -273,7 +273,7 @@ func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClien
}
}
if len(cfg.SmOsipsConfig.CDRsConns) != 0 {
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cdrsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.SmOsipsConfig.CDRsConns, internalCDRSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SMOpenSIPS> Could not connect to CDRs: %s", err.Error()))
@@ -296,7 +296,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine
utils.Logger.Info("Starting CGRateS CDRS service.")
var ralConn, pubSubConn, usersConn, aliasesConn, statsConn *rpcclient.RpcClientPool
if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL
ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
ralConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.CDRSRaterConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to RAL: %s", err.Error()))
@@ -305,7 +305,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine
}
}
if len(cfg.CDRSPubSubSConns) != 0 { // Pubsub connection init
pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
pubSubConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.CDRSPubSubSConns, internalPubSubSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to PubSubSystem: %s", err.Error()))
@@ -314,7 +314,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine
}
}
if len(cfg.CDRSUserSConns) != 0 { // Users connection init
usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
usersConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.CDRSUserSConns, internalUserSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to UserS: %s", err.Error()))
@@ -323,7 +323,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine
}
}
if len(cfg.CDRSAliaseSConns) != 0 { // Aliases connection init
aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
aliasesConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.CDRSAliaseSConns, internalAliaseSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to AliaseS: %s", err.Error()))
@@ -333,7 +333,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, logDb engine
}
if len(cfg.CDRSStatSConns) != 0 { // Stats connection init
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
statsConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.CDRSStatSConns, internalCdrStatSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to StatS: %s", err.Error()))

View File

@@ -113,7 +113,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, cdrstatTaskChan)
go func() {
defer close(cdrstatTaskChan)
cdrStats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
cdrStats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsCDRStatSConns, internalCdrStatSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to CDRStatS, error: %s", err.Error()))
@@ -127,7 +127,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, histTaskChan)
go func() {
defer close(histTaskChan)
if historySConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
if historySConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsHistorySConns, internalHistorySChan, cfg.InternalTtl); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect HistoryS, error: %s", err.Error()))
exitChan <- true
@@ -142,7 +142,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, pubsubTaskChan)
go func() {
defer close(pubsubTaskChan)
if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
if pubSubSConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsPubSubSConns, internalPubSubSChan, cfg.InternalTtl); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to PubSubS: %s", err.Error()))
exitChan <- true
@@ -157,7 +157,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, aliasesTaskChan)
go func() {
defer close(aliasesTaskChan)
if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsAliasSConns, internalAliaseSChan, cfg.InternalTtl); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to AliaseS, error: %s", err.Error()))
exitChan <- true
@@ -173,7 +173,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC
waitTasks = append(waitTasks, usersTaskChan)
go func() {
defer close(usersTaskChan)
if usersConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, utils.GOB,
if usersConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.RALsUserSConns, internalUserSChan, cfg.InternalTtl); err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect UserS, error: %s", err.Error()))
exitChan <- true

View File

@@ -206,7 +206,8 @@ type SmFsJsonCfg struct {
// Represents one connection instance towards a rater/cdrs server
type HaPoolJsonCfg struct {
Address *string
Address *string
Transport *string
}
// Represents one connection instance towards FreeSWITCH

View File

@@ -35,7 +35,8 @@ func NewDfltHaPoolConfig() *HaPoolConfig {
// One connection to Rater
type HaPoolConfig struct {
Address string
Address string
Transport string
}
func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error {
@@ -45,6 +46,9 @@ func (self *HaPoolConfig) loadFromJsonCfg(jsnCfg *HaPoolJsonCfg) error {
if jsnCfg.Address != nil {
self.Address = *jsnCfg.Address
}
if jsnCfg.Transport != nil {
self.Transport = *jsnCfg.Transport
}
return nil
}

View File

@@ -20,6 +20,7 @@ package engine
import (
"errors"
"fmt"
"time"
"github.com/cgrates/cgrates/config"
@@ -27,7 +28,7 @@ import (
"github.com/cgrates/rpcclient"
)
func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, connectTimeout, replyTimeout time.Duration, codec string,
func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, connectTimeout, replyTimeout time.Duration,
rpcConnCfgs []*config.HaPoolConfig, internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration) (*rpcclient.RpcClientPool, error) {
var rpcClient *rpcclient.RpcClient
var err error
@@ -43,8 +44,14 @@ func NewRPCPool(dispatchStrategy string, connAttempts, reconnects int, connectTi
return nil, errors.New("TTL triggered")
}
rpcClient, err = rpcclient.NewRpcClient("", "", connAttempts, reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn)
} else {
} else if utils.IsSliceMember([]string{utils.MetaJSONrpc, utils.MetaGOBrpc, ""}, rpcConnCfg.Transport) {
codec := utils.GOB
if rpcConnCfg.Transport != "" {
codec = rpcConnCfg.Transport[1:] // Transport contains always * before codec understood by rpcclient
}
rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil)
} else {
return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport)
}
if err == nil {
atLestOneConnected = true

View File

@@ -30,7 +30,7 @@ var (
ErrAccountNotFound = errors.New("ACCOUNT_NOT_FOUND")
ErrAccountDisabled = errors.New("ACCOUNT_DISABLED")
ErrUserNotFound = errors.New("USER_NOT_FOUND")
ErrInsufficientCredit = errors.New("INSUFFICENT_CREDIT")
ErrInsufficientCredit = errors.New("INSUFFICIENT_CREDIT")
ErrNotConvertible = errors.New("NOT_CONVERTIBLE")
CdreCdrFormats = []string{CSV, DRYRUN, CDRE_FIXED_WIDTH}
@@ -291,4 +291,6 @@ const (
SessionTTLUsage = "SessionTTLUsage"
HandlerSubstractUsage = "*substract_usage"
XML = "xml"
MetaGOBrpc = "*gob"
MetaJSONrpc = "*json"
)