Support for internal RPC within rpc client

This commit is contained in:
DanB
2015-11-14 12:28:39 +01:00
parent fd5ed5b4b2
commit b160557d63
11 changed files with 52 additions and 27 deletions

View File

@@ -5,6 +5,7 @@ import (
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func NewSMGenericV1(sm *sessionmanager.SMGeneric) *SMGenericV1 {
@@ -77,3 +78,26 @@ func (self *SMGenericV1) ProcessCdr(ev sessionmanager.SMGenericEvent, reply *str
*reply = utils.OK
return nil
}
// rpcclient.RpcClientConnection interface
func (self *SMGenericV1) Call(serviceMethod string, args interface{}, reply interface{}) error {
switch serviceMethod {
case "SMGenericV1.GetLcrSuppliers":
argsConverted, canConvert := args.(sessionmanager.SMGenericEvent)
if !canConvert {
return rpcclient.ErrWrongArgsType
}
replyConverted, canConvert := args.(*[]string)
if !canConvert {
return rpcclient.ErrWrongReplyType
}
return self.GetLcrSuppliers(argsConverted, replyConverted)
/*smg.handlers["SMGenericV1.GetLcrSuppliers"] = smg.GetLcrSuppliers
smg.handlers["SMGenericV1.SessionStart"] = smg.SessionStart
smg.handlers["SMGenericV1.SessionUpdate"] = smg.SessionUpdate
smg.handlers["SMGenericV1.SessionEnd"] = smg.SessionEnd
smg.handlers["SMGenericV1.ProcessCdr"] = smg.ProcessCdr
*/
}
return rpcclient.ErrUnsupporteServiceMethod
}

View File

@@ -109,7 +109,7 @@ func main() {
return
}
var err error
client, err = rpcclient.NewRpcClient("tcp", *server, 3, 3, *rpc_encoding)
client, err = rpcclient.NewRpcClient("tcp", *server, 3, 3, *rpc_encoding, nil)
if err != nil {
flag.PrintDefaults()
log.Fatal("Could not connect to server " + *server)

View File

@@ -115,7 +115,7 @@ func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan *
cdrsConn = resp
internalRaterChan <- resp
} else {
conn, err := rpcclient.NewRpcClient("tcp", cdrcCfg.Cdrs, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
conn, err := rpcclient.NewRpcClient("tcp", cdrcCfg.Cdrs, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRC> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
@@ -147,7 +147,7 @@ func startSmGeneric(internalRaterChan chan *engine.Responder, server *utils.Serv
raterConn = resp // Will overwrite here for the sake of keeping internally the new configuration format for ha connections
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil { //Connected so no need to reiterate
utils.Logger.Crit(fmt.Sprintf("<SM-Generic> Could not connect to Rater via RPC: %v", err))
exitChan <- true
@@ -166,7 +166,7 @@ func startSmGeneric(internalRaterChan chan *engine.Responder, server *utils.Serv
cdrsConn = resp
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-Generic> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
@@ -206,7 +206,7 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd
raterConn = resp // Will overwrite here for the sake of keeping internally the new configuration format for ha connections
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil { //Connected so no need to reiterate
utils.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to rater via RPC: %v", err))
exitChan <- true
@@ -225,7 +225,7 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd
cdrsConn = resp
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-FreeSWITCH> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
@@ -255,7 +255,7 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
raterConn = resp // Will overwrite here for the sake of keeping internally the new configuration format for ha connections
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil { //Connected so no need to reiterate
utils.Logger.Crit(fmt.Sprintf("<SM-Kamailio> Could not connect to rater via RPC: %v", err))
exitChan <- true
@@ -274,7 +274,7 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
cdrsConn = resp
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-Kamailio> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
@@ -304,7 +304,7 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
raterConn = resp // Will overwrite here for the sake of keeping internally the new configuration format for ha connections
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil { //Connected so no need to reiterate
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to rater via RPC: %v", err))
exitChan <- true
@@ -323,7 +323,7 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS
cdrsConn = resp
internalRaterChan <- resp
} else {
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
@@ -355,7 +355,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
raterConn = responder
internalRaterChan <- responder // Put back the connection since there might be other entities waiting for it
} else if len(cfg.CDRSRater) != 0 {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSRater, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSRater, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to rater: %s", err.Error()))
exitChan <- true
@@ -373,7 +373,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
if cfg.CDRSRater == cfg.CDRSPubSub {
pubSubConn = &engine.ProxyPubSub{Client: client}
} else {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to pubsub server: %s", err.Error()))
exitChan <- true
@@ -392,7 +392,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
if cfg.CDRSRater == cfg.CDRSUsers {
usersConn = &engine.ProxyUserService{Client: client}
} else {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSUsers, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSUsers, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to users server: %s", err.Error()))
exitChan <- true
@@ -411,7 +411,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
if cfg.CDRSRater == cfg.CDRSAliases {
aliasesConn = &engine.ProxyAliasService{Client: client}
} else {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSAliases, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSAliases, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to aliases server: %s", err.Error()))
exitChan <- true
@@ -430,7 +430,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage,
if cfg.CDRSRater == cfg.CDRSStats {
statsConn = &engine.ProxyStats{Client: client}
} else {
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSStats, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB)
client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSStats, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to stats server: %s", err.Error()))
exitChan <- true

View File

@@ -239,6 +239,7 @@ type OsipsConnJsonCfg struct {
type DiameterAgentJsonCfg struct {
Enabled *bool // enables the diameter agent: <true|false>
Listen *string // address where to listen for diameter requests <x.y.z.y:1234>
Sm_generic *string // Connection towards generic SM
Timezone *string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
Request_processors *[]*DARequestProcessorJsnCfg
}

View File

@@ -338,7 +338,7 @@ type ProxyAliasService struct {
}
func NewProxyAliasService(addr string, attempts, reconnects int) (*ProxyAliasService, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB)
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil)
if err != nil {
return nil, err
}

View File

@@ -87,7 +87,7 @@ func TestCdrsHttpCdrReplication(t *testing.T) {
if !*testLocal {
return
}
cdrsMasterRpc, err := rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, "json")
cdrsMasterRpc, err := rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, "json", nil)
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
@@ -104,7 +104,7 @@ func TestCdrsHttpCdrReplication(t *testing.T) {
t.Error("Unexpected reply received: ", reply)
}
time.Sleep(time.Duration(waitRater) * time.Millisecond)
cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", 1, 1, "json")
cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", 1, 1, "json", nil)
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}

View File

@@ -170,7 +170,7 @@ type ProxyPubSub struct {
}
func NewProxyPubSub(addr string, attempts, reconnects int) (*ProxyPubSub, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB)
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil)
if err != nil {
return nil, err
}

View File

@@ -291,7 +291,7 @@ type ProxyStats struct {
}
func NewProxyStats(addr string, attempts, reconnects int) (*ProxyStats, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB)
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil)
if err != nil {
return nil, err
}

View File

@@ -407,7 +407,7 @@ type ProxyUserService struct {
}
func NewProxyUserService(addr string, attempts, reconnects int) (*ProxyUserService, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB)
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil)
if err != nil {
return nil, err
}

View File

@@ -1,17 +1,17 @@
package: github.com/cgrates/cgrates
import:
- package: github.com/ugorji/go
ref: 69aba3eabf3ef86516c9643ecb6d3785ca3b8436
ref: 1a8bf87a90ddcdc7deaa0038f127ac62135fdd58
- package: github.com/jinzhu/gorm
ref: 611e613459953787a01c2afc82835aa0ba01a045
- package: golang.org/x/net
ref: 042ba42fa6633b34205efc66ba5719cd3afd8d38
ref: c764672d0ee39ffd83cfcb375804d3181302b62b
- package: github.com/DisposaBoy/JsonConfigReader
ref: 33a99fdf1d5ee1f79b5077e9c06f955ad356d5f4
- package: github.com/hoisie/redis
ref: 788f01e396a99c96c8f56338383926f16841ebae
- package: github.com/go-sql-driver/mysql
ref: 69e3ed7607d7c139386480824801584c947c67cf
ref: d512f204a577a4ab037a1816604c48c9c13210be
- package: github.com/gorhill/cronexpr
ref: a557574d6c024ed6e36acc8b610f5f211c91568a
- package: github.com/lib/pq
@@ -25,9 +25,9 @@ import:
- package: gopkg.in/fsnotify.v1
ref: 7be54206639f256967dd82fa767397ba5f8f48f5
- package: github.com/peterh/liner
ref: b850cf8c6d0ee52309aad09ac610508c6c75e819
ref: 32e535aff4145c12d1e154754ab144b49ab578e2
- package: github.com/cgrates/rpcclient
ref: f0f378951e30943373953b44b910cc65ba832da3
ref: 028c43fc34d32dc9095c7605e2e455e0c7a5ea69
- package: github.com/cgrates/osipsdagram
ref: 3d6beed663452471dec3ca194137a30d379d9e8f
- package: github.com/cgrates/kamevapi

View File

@@ -28,7 +28,7 @@ type ProxyScribe struct {
}
func NewProxyScribe(addr string, attempts, reconnects int) (*ProxyScribe, error) {
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB)
client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil)
if err != nil {
return nil, err
}