diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 677afb7f7..647db97d0 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -69,7 +69,6 @@ var ( cfg *config.CGRConfig smRpc *v1.SessionManagerV1 - err error ) func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, exitChan chan bool) { @@ -132,8 +131,10 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne } } -func startSmGeneric(internalSMGChan chan *sessionmanager.SMGeneric, internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { +func startSmGeneric(internalSMGChan chan *sessionmanager.SMGeneric, internalRaterChan, + internalCDRSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS SMGeneric service.") + var err error var ralsConns, cdrsConn *rpcclient.RpcClientPool if len(cfg.SmGenericConfig.RALsConns) != 0 { ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, @@ -212,6 +213,7 @@ func startSMAsterisk(internalSMGChan chan *sessionmanager.SMGeneric, exitChan ch } func startDiameterAgent(internalSMGChan chan *sessionmanager.SMGeneric, internalPubSubSChan chan rpcclient.RpcClientConnection, exitChan chan bool) { + var err error utils.Logger.Info("Starting CGRateS DiameterAgent service") smgChan := make(chan rpcclient.RpcClientConnection, 1) // Use it to pass smg go func(internalSMGChan chan *sessionmanager.SMGeneric, smgChan chan rpcclient.RpcClientConnection) { @@ -253,6 +255,7 @@ func startDiameterAgent(internalSMGChan chan *sessionmanager.SMGeneric, internal } func startRadiusAgent(internalSMGChan chan *sessionmanager.SMGeneric, exitChan chan bool) { + var err error utils.Logger.Info("Starting CGRateS RadiusAgent service") smgChan := make(chan rpcclient.RpcClientConnection, 1) // Use it to pass smg go func(internalSMGChan chan *sessionmanager.SMGeneric, smgChan chan rpcclient.RpcClientConnection) { @@ -285,6 +288,7 @@ func startRadiusAgent(internalSMGChan chan *sessionmanager.SMGeneric, exitChan c } func startSmFreeSWITCH(internalRaterChan, internalCDRSChan, rlsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) { + var err error utils.Logger.Info("Starting CGRateS SMFreeSWITCH service") var ralsConn, cdrsConn, rlsConn *rpcclient.RpcClientPool if len(cfg.SmFsConfig.RALsConns) != 0 { @@ -323,6 +327,7 @@ func startSmFreeSWITCH(internalRaterChan, internalCDRSChan, rlsChan chan rpcclie } func startSmKamailio(internalRaterChan, internalCDRSChan, internalRsChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) { + var err error utils.Logger.Info("Starting CGRateS SMKamailio service.") var ralsConn, cdrsConn, rlSConn *rpcclient.RpcClientPool if len(cfg.SmKamConfig.RALsConns) != 0 { @@ -361,6 +366,7 @@ func startSmKamailio(internalRaterChan, internalCDRSChan, internalRsChan chan rp } func startSmOpenSIPS(internalRaterChan, internalCDRSChan chan rpcclient.RpcClientConnection, cdrDb engine.CdrStorage, exitChan chan bool) { + var err error utils.Logger.Info("Starting CGRateS SMOpenSIPS service.") var ralsConn, cdrsConn *rpcclient.RpcClientPool if len(cfg.SmOsipsConfig.RALsConns) != 0 { @@ -394,6 +400,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, internalRaterChan, internalPubSubSChan, internalAttributeSChan, internalUserSChan, internalAliaseSChan, internalCdrStatSChan, internalThresholdSChan, internalStatSChan chan rpcclient.RpcClientConnection, server *utils.Server, exitChan chan bool) { + var err error utils.Logger.Info("Starting CGRateS CDRS service.") var ralConn, pubSubConn, usersConn, attrSConn, aliasesConn, cdrstatsConn, thresholdSConn, statsConn *rpcclient.RpcClientPool if len(cfg.CDRSRaterConns) != 0 { // Conn pool towards RAL @@ -574,6 +581,7 @@ func startAttributeService(internalAttributeSChan chan rpcclient.RpcClientConnec func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { + var err error var thdSConn *rpcclient.RpcClientPool filterS := <-filterSChan filterSChan <- filterS @@ -610,6 +618,7 @@ func startResourceService(internalRsChan, internalThresholdSChan chan rpcclient. // startStatService fires up the StatS func startStatService(internalStatSChan, internalThresholdSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { + var err error var thdSConn *rpcclient.RpcClientPool filterS := <-filterSChan filterSChan <- filterS @@ -671,6 +680,7 @@ func startThresholdService(internalThresholdSChan chan rpcclient.RpcClientConnec // startSupplierService fires up the ThresholdS func startSupplierService(internalSupplierSChan, internalRsChan, internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dm *engine.DataManager, server *utils.Server, exitChan chan bool, filterSChan chan *engine.FilterS) { + var err error filterS := <-filterSChan filterSChan <- filterS var resourceSConn, statSConn *rpcclient.RpcClientPool @@ -823,6 +833,7 @@ func main() { return }() } + var err error // Init config cfg, err = config.NewCGRConfigFromFolder(*cfgDir) if err != nil { diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index c07a481ff..481047f97 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -128,6 +128,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, thdsTaskChan) go func() { defer close(thdsTaskChan) + var err error thdS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.RALsThresholdSConns, internalThdSChan, cfg.InternalTtl) if err != nil { @@ -144,6 +145,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, cdrstatTaskChan) go func() { defer close(cdrstatTaskChan) + var err error cdrStats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.RALsCDRStatSConns, internalCdrStatSChan, cfg.InternalTtl) if err != nil { @@ -160,6 +162,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, statsTaskChan) go func() { defer close(statsTaskChan) + var err error stats, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.RALsStatSConns, internalStatSChan, cfg.InternalTtl) if err != nil { @@ -209,6 +212,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, attrsTaskChan) go func() { defer close(attrsTaskChan) + var err error attrS, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.RALsAttributeSConns, internalAttributeSChan, cfg.InternalTtl) @@ -226,7 +230,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, aliasesTaskChan) go func() { defer close(aliasesTaskChan) - if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + if aliaseSCons, err := engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.RALsAliasSConns, internalAliaseSChan, cfg.InternalTtl); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to AliaseS, error: %s", err.Error())) exitChan <- true @@ -243,13 +248,15 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheDoneC waitTasks = append(waitTasks, usersTaskChan) go func() { defer close(usersTaskChan) - if usersConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + if usersConns, err := engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, cfg.RALsUserSConns, internalUserSChan, cfg.InternalTtl); err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect UserS, error: %s", err.Error())) exitChan <- true return + } else { + engine.SetUserService(usersConns) } - engine.SetUserService(usersConns) }() } // Wait for all connections to complete before going further diff --git a/utils/server.go b/utils/server.go index 0a2bc6341..9f3d5b9fd 100644 --- a/utils/server.go +++ b/utils/server.go @@ -28,6 +28,7 @@ import ( "net/rpc" "net/rpc/jsonrpc" "reflect" + "sync" "time" "github.com/cenk/rpc2" @@ -40,34 +41,51 @@ type Server struct { rpcEnabled bool httpEnabled bool birpcSrv *rpc2.Server + sync.RWMutex } func (s *Server) RpcRegister(rcvr interface{}) { rpc.Register(rcvr) + s.Lock() s.rpcEnabled = true + s.Unlock() } func (s *Server) RpcRegisterName(name string, rcvr interface{}) { rpc.RegisterName(name, rcvr) + s.Lock() s.rpcEnabled = true + s.Unlock() } func (s *Server) RegisterHttpFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { http.HandleFunc(pattern, handler) + s.Lock() s.httpEnabled = true + s.Unlock() } // Registers a new BiJsonRpc name func (s *Server) BiRPCRegisterName(method string, handlerFunc interface{}) { - if s.birpcSrv == nil { + s.RLock() + isNil := s.birpcSrv == nil + s.RUnlock() + if isNil { + s.Lock() s.birpcSrv = rpc2.NewServer() + s.Unlock() } s.birpcSrv.Handle(method, handlerFunc) } func (s *Server) BiRPCRegister(rcvr interface{}) { - if s.birpcSrv == nil { + s.RLock() + isNil := s.birpcSrv == nil + s.RUnlock() + if isNil { + s.Lock() s.birpcSrv = rpc2.NewServer() + s.Unlock() } rcvType := reflect.TypeOf(rcvr) for i := 0; i < rcvType.NumMethod(); i++ { @@ -79,7 +97,10 @@ func (s *Server) BiRPCRegister(rcvr interface{}) { } func (s *Server) ServeJSON(addr string) { - if !s.rpcEnabled { + s.RLock() + enabled := s.rpcEnabled + s.RUnlock() + if !enabled { return } lJSON, e := net.Listen("tcp", addr) @@ -111,7 +132,10 @@ func (s *Server) ServeJSON(addr string) { } func (s *Server) ServeGOB(addr string) { - if !s.rpcEnabled { + s.RLock() + enabled := s.rpcEnabled + s.RUnlock() + if !enabled { return } lGOB, e := net.Listen("tcp", addr) @@ -150,8 +174,16 @@ func handleRequest(w http.ResponseWriter, r *http.Request) { } func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, useBasicAuth bool, userList map[string]string) { - if s.rpcEnabled && jsonRPCURL != "" { + s.RLock() + enabled := s.rpcEnabled + s.RUnlock() + if !enabled { + return + } + if enabled && jsonRPCURL != "" { + s.Lock() s.httpEnabled = true + s.Unlock() Logger.Info(" enabling handler for JSON-RPC") if useBasicAuth { http.HandleFunc(jsonRPCURL, use(handleRequest, basicAuth(userList))) @@ -160,8 +192,10 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, useB } } - if s.rpcEnabled && wsRPCURL != "" { + if enabled && wsRPCURL != "" { + s.Lock() s.httpEnabled = true + s.Unlock() Logger.Info(" enabling handler for WebSocket connections") wsHandler := websocket.Handler(func(ws *websocket.Conn) { jsonrpc.ServeConn(ws) @@ -186,7 +220,10 @@ func (s *Server) ServeHTTP(addr string, jsonRPCURL string, wsRPCURL string, useB } func (s *Server) ServeBiJSON(addr string) { - if s.birpcSrv == nil { + s.RLock() + isNil := s.birpcSrv == nil + s.RUnlock() + if isNil { return } lBiJSON, e := net.Listen("tcp", addr)