diff --git a/apier/v1/smgenericv1.go b/apier/v1/smgenericv1.go index c95a6bd85..b40dd7cdf 100644 --- a/apier/v1/smgenericv1.go +++ b/apier/v1/smgenericv1.go @@ -5,6 +5,7 @@ import ( "github.com/cgrates/cgrates/sessionmanager" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) func NewSMGenericV1(sm *sessionmanager.SMGeneric) *SMGenericV1 { @@ -77,3 +78,26 @@ func (self *SMGenericV1) ProcessCdr(ev sessionmanager.SMGenericEvent, reply *str *reply = utils.OK return nil } + +// rpcclient.RpcClientConnection interface +func (self *SMGenericV1) Call(serviceMethod string, args interface{}, reply interface{}) error { + switch serviceMethod { + case "SMGenericV1.GetLcrSuppliers": + argsConverted, canConvert := args.(sessionmanager.SMGenericEvent) + if !canConvert { + return rpcclient.ErrWrongArgsType + } + replyConverted, canConvert := args.(*[]string) + if !canConvert { + return rpcclient.ErrWrongReplyType + } + return self.GetLcrSuppliers(argsConverted, replyConverted) + /*smg.handlers["SMGenericV1.GetLcrSuppliers"] = smg.GetLcrSuppliers + smg.handlers["SMGenericV1.SessionStart"] = smg.SessionStart + smg.handlers["SMGenericV1.SessionUpdate"] = smg.SessionUpdate + smg.handlers["SMGenericV1.SessionEnd"] = smg.SessionEnd + smg.handlers["SMGenericV1.ProcessCdr"] = smg.ProcessCdr + */ + } + return rpcclient.ErrUnsupporteServiceMethod +} diff --git a/cmd/cgr-console/cgr-console.go b/cmd/cgr-console/cgr-console.go index e7c672da5..d4723a4d7 100644 --- a/cmd/cgr-console/cgr-console.go +++ b/cmd/cgr-console/cgr-console.go @@ -109,7 +109,7 @@ func main() { return } var err error - client, err = rpcclient.NewRpcClient("tcp", *server, 3, 3, *rpc_encoding) + client, err = rpcclient.NewRpcClient("tcp", *server, 3, 3, *rpc_encoding, nil) if err != nil { flag.PrintDefaults() log.Fatal("Could not connect to server " + *server) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 1e667e75f..bda3b2047 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -115,7 +115,7 @@ func startCdrc(internalCdrSChan chan *engine.CdrServer, internalRaterChan chan * cdrsConn = resp internalRaterChan <- resp } else { - conn, err := rpcclient.NewRpcClient("tcp", cdrcCfg.Cdrs, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + conn, err := rpcclient.NewRpcClient("tcp", cdrcCfg.Cdrs, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) exitChan <- true @@ -147,7 +147,7 @@ func startSmGeneric(internalRaterChan chan *engine.Responder, server *utils.Serv raterConn = resp // Will overwrite here for the sake of keeping internally the new configuration format for ha connections internalRaterChan <- resp } else { - client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { //Connected so no need to reiterate utils.Logger.Crit(fmt.Sprintf(" Could not connect to Rater via RPC: %v", err)) exitChan <- true @@ -166,7 +166,7 @@ func startSmGeneric(internalRaterChan chan *engine.Responder, server *utils.Serv cdrsConn = resp internalRaterChan <- resp } else { - client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) exitChan <- true @@ -206,7 +206,7 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd raterConn = resp // Will overwrite here for the sake of keeping internally the new configuration format for ha connections internalRaterChan <- resp } else { - client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { //Connected so no need to reiterate utils.Logger.Crit(fmt.Sprintf(" Could not connect to rater via RPC: %v", err)) exitChan <- true @@ -225,7 +225,7 @@ func startSmFreeSWITCH(internalRaterChan chan *engine.Responder, cdrDb engine.Cd cdrsConn = resp internalRaterChan <- resp } else { - client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) exitChan <- true @@ -255,7 +255,7 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS raterConn = resp // Will overwrite here for the sake of keeping internally the new configuration format for ha connections internalRaterChan <- resp } else { - client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { //Connected so no need to reiterate utils.Logger.Crit(fmt.Sprintf(" Could not connect to rater via RPC: %v", err)) exitChan <- true @@ -274,7 +274,7 @@ func startSmKamailio(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS cdrsConn = resp internalRaterChan <- resp } else { - client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) exitChan <- true @@ -304,7 +304,7 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS raterConn = resp // Will overwrite here for the sake of keeping internally the new configuration format for ha connections internalRaterChan <- resp } else { - client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", raterCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { //Connected so no need to reiterate utils.Logger.Crit(fmt.Sprintf(" Could not connect to rater via RPC: %v", err)) exitChan <- true @@ -323,7 +323,7 @@ func startSmOpenSIPS(internalRaterChan chan *engine.Responder, cdrDb engine.CdrS cdrsConn = resp internalRaterChan <- resp } else { - client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", cdrsCfg.Server, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) exitChan <- true @@ -355,7 +355,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, raterConn = responder internalRaterChan <- responder // Put back the connection since there might be other entities waiting for it } else if len(cfg.CDRSRater) != 0 { - client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSRater, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSRater, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to rater: %s", err.Error())) exitChan <- true @@ -373,7 +373,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, if cfg.CDRSRater == cfg.CDRSPubSub { pubSubConn = &engine.ProxyPubSub{Client: client} } else { - client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSPubSub, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to pubsub server: %s", err.Error())) exitChan <- true @@ -392,7 +392,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, if cfg.CDRSRater == cfg.CDRSUsers { usersConn = &engine.ProxyUserService{Client: client} } else { - client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSUsers, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSUsers, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to users server: %s", err.Error())) exitChan <- true @@ -411,7 +411,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, if cfg.CDRSRater == cfg.CDRSAliases { aliasesConn = &engine.ProxyAliasService{Client: client} } else { - client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSAliases, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSAliases, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to aliases server: %s", err.Error())) exitChan <- true @@ -430,7 +430,7 @@ func startCDRS(internalCdrSChan chan *engine.CdrServer, logDb engine.LogStorage, if cfg.CDRSRater == cfg.CDRSStats { statsConn = &engine.ProxyStats{Client: client} } else { - client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSStats, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + client, err = rpcclient.NewRpcClient("tcp", cfg.CDRSStats, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB, nil) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to stats server: %s", err.Error())) exitChan <- true diff --git a/config/libconfig_json.go b/config/libconfig_json.go index fb0970f67..aa49d499a 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -239,6 +239,7 @@ type OsipsConnJsonCfg struct { type DiameterAgentJsonCfg struct { Enabled *bool // enables the diameter agent: Listen *string // address where to listen for diameter requests + Sm_generic *string // Connection towards generic SM Timezone *string // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> Request_processors *[]*DARequestProcessorJsnCfg } diff --git a/engine/aliases.go b/engine/aliases.go index faae65b7e..c6a49520f 100644 --- a/engine/aliases.go +++ b/engine/aliases.go @@ -338,7 +338,7 @@ type ProxyAliasService struct { } func NewProxyAliasService(addr string, attempts, reconnects int) (*ProxyAliasService, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB) + client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil) if err != nil { return nil, err } diff --git a/engine/cdrs_local_test.go b/engine/cdrs_local_test.go index 7e18e2830..4a710e855 100644 --- a/engine/cdrs_local_test.go +++ b/engine/cdrs_local_test.go @@ -87,7 +87,7 @@ func TestCdrsHttpCdrReplication(t *testing.T) { if !*testLocal { return } - cdrsMasterRpc, err := rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, "json") + cdrsMasterRpc, err := rpcclient.NewRpcClient("tcp", cdrsMasterCfg.RPCJSONListen, 1, 1, "json", nil) if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } @@ -104,7 +104,7 @@ func TestCdrsHttpCdrReplication(t *testing.T) { t.Error("Unexpected reply received: ", reply) } time.Sleep(time.Duration(waitRater) * time.Millisecond) - cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", 1, 1, "json") + cdrsSlaveRpc, err := rpcclient.NewRpcClient("tcp", "127.0.0.1:12012", 1, 1, "json", nil) if err != nil { t.Fatal("Could not connect to rater: ", err.Error()) } diff --git a/engine/pubsub.go b/engine/pubsub.go index 9ec56581c..61e609020 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -170,7 +170,7 @@ type ProxyPubSub struct { } func NewProxyPubSub(addr string, attempts, reconnects int) (*ProxyPubSub, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB) + client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil) if err != nil { return nil, err } diff --git a/engine/stats.go b/engine/stats.go index d417d73b7..4c9599463 100644 --- a/engine/stats.go +++ b/engine/stats.go @@ -291,7 +291,7 @@ type ProxyStats struct { } func NewProxyStats(addr string, attempts, reconnects int) (*ProxyStats, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB) + client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil) if err != nil { return nil, err } diff --git a/engine/users.go b/engine/users.go index ee4e241fb..69dcc2ef4 100644 --- a/engine/users.go +++ b/engine/users.go @@ -407,7 +407,7 @@ type ProxyUserService struct { } func NewProxyUserService(addr string, attempts, reconnects int) (*ProxyUserService, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB) + client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil) if err != nil { return nil, err } diff --git a/glide.yaml b/glide.yaml index 970ffec2d..b6a8c8ab1 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,17 +1,17 @@ package: github.com/cgrates/cgrates import: - package: github.com/ugorji/go - ref: 69aba3eabf3ef86516c9643ecb6d3785ca3b8436 + ref: 1a8bf87a90ddcdc7deaa0038f127ac62135fdd58 - package: github.com/jinzhu/gorm ref: 611e613459953787a01c2afc82835aa0ba01a045 - package: golang.org/x/net - ref: 042ba42fa6633b34205efc66ba5719cd3afd8d38 + ref: c764672d0ee39ffd83cfcb375804d3181302b62b - package: github.com/DisposaBoy/JsonConfigReader ref: 33a99fdf1d5ee1f79b5077e9c06f955ad356d5f4 - package: github.com/hoisie/redis ref: 788f01e396a99c96c8f56338383926f16841ebae - package: github.com/go-sql-driver/mysql - ref: 69e3ed7607d7c139386480824801584c947c67cf + ref: d512f204a577a4ab037a1816604c48c9c13210be - package: github.com/gorhill/cronexpr ref: a557574d6c024ed6e36acc8b610f5f211c91568a - package: github.com/lib/pq @@ -25,9 +25,9 @@ import: - package: gopkg.in/fsnotify.v1 ref: 7be54206639f256967dd82fa767397ba5f8f48f5 - package: github.com/peterh/liner - ref: b850cf8c6d0ee52309aad09ac610508c6c75e819 + ref: 32e535aff4145c12d1e154754ab144b49ab578e2 - package: github.com/cgrates/rpcclient - ref: f0f378951e30943373953b44b910cc65ba832da3 + ref: 028c43fc34d32dc9095c7605e2e455e0c7a5ea69 - package: github.com/cgrates/osipsdagram ref: 3d6beed663452471dec3ca194137a30d379d9e8f - package: github.com/cgrates/kamevapi diff --git a/history/proxy_scribe.go b/history/proxy_scribe.go index a1e528995..3b00ebc0a 100644 --- a/history/proxy_scribe.go +++ b/history/proxy_scribe.go @@ -28,7 +28,7 @@ type ProxyScribe struct { } func NewProxyScribe(addr string, attempts, reconnects int) (*ProxyScribe, error) { - client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB) + client, err := rpcclient.NewRpcClient("tcp", addr, attempts, reconnects, utils.GOB, nil) if err != nil { return nil, err }