diff --git a/cores/server.go b/cores/server.go index 6670e05f1..579c8dd7c 100644 --- a/cores/server.go +++ b/cores/server.go @@ -140,7 +140,8 @@ func (s *Server) BiRPCRegister(rcvr interface{}) { } } -func (s *Server) ServeJSON(addr string, shdChan *utils.SyncedChan) { +func (s *Server) serveCodec(addr, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) rpc.ServerCodec, + shdChan *utils.SyncedChan) { s.RLock() enabled := s.rpcEnabled s.RUnlock() @@ -148,19 +149,24 @@ func (s *Server) ServeJSON(addr string, shdChan *utils.SyncedChan) { return } - lJSON, e := net.Listen(utils.TCP, addr) + l, e := net.Listen(utils.TCP, addr) if e != nil { - log.Println("ServeJSON listen error:", e) + log.Printf("Serve%s listen error: %s", codecName, e) shdChan.CloseOnce() return } - utils.Logger.Info(fmt.Sprintf("Starting CGRateS JSON server at <%s>.", addr)) + utils.Logger.Info(fmt.Sprintf("Starting CGRateS %s server at <%s>.", codecName, addr)) + s.accept(l, codecName, newCodec, shdChan) +} + +func (s *Server) accept(l net.Listener, codecName string, newCodec func(conn conn, caps *engine.Caps, anz *analyzers.AnalyzerService) rpc.ServerCodec, + shdChan *utils.SyncedChan) { errCnt := 0 var lastErrorTime time.Time for { - conn, err := lJSON.Accept() + conn, err := l.Accept() if err != nil { - utils.Logger.Err(fmt.Sprintf(" JSON accept error: <%s>", err.Error())) + utils.Logger.Err(fmt.Sprintf(" %s accept error: <%s>", codecName, err.Error())) now := time.Now() if now.Sub(lastErrorTime) > 5*time.Second { errCnt = 0 // reset error count if last error was more than 5 seconds ago @@ -173,44 +179,16 @@ func (s *Server) ServeJSON(addr string, shdChan *utils.SyncedChan) { } continue } - go rpc.ServeCodec(newCapsJSONCodec(conn, s.caps, s.anz)) + go rpc.ServeCodec(newCodec(conn, s.caps, s.anz)) } } +func (s *Server) ServeJSON(addr string, shdChan *utils.SyncedChan) { + s.serveCodec(addr, utils.JSONCaps, newCapsJSONCodec, shdChan) +} + func (s *Server) ServeGOB(addr string, shdChan *utils.SyncedChan) { - s.RLock() - enabled := s.rpcEnabled - s.RUnlock() - if !enabled { - return - } - lGOB, e := net.Listen(utils.TCP, addr) - if e != nil { - log.Println("ServeGOB listen error:", e) - shdChan.CloseOnce() - return - } - utils.Logger.Info(fmt.Sprintf("Starting CGRateS GOB server at <%s>.", addr)) - errCnt := 0 - var lastErrorTime time.Time - for { - conn, err := lGOB.Accept() - if err != nil { - utils.Logger.Err(fmt.Sprintf(" GOB accept error: <%s>", err.Error())) - now := time.Now() - if now.Sub(lastErrorTime) > 5*time.Second { - errCnt = 0 // reset error count if last error was more than 5 seconds ago - } - lastErrorTime = time.Now() - errCnt++ - if errCnt > 50 { // Too many errors in short interval, network buffer failure most probably - shdChan.CloseOnce() - return - } - continue - } - go rpc.ServeCodec(newCapsGOBCodec(conn, s.caps, s.anz)) - } + s.serveCodec(addr, utils.GOBCaps, newCapsGOBCodec, shdChan) } func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) { @@ -425,7 +403,9 @@ func (s *Server) ServeGOBTLS(addr, serverCrt, serverKey, caCert string, if !enabled { return } + fmt.Println("am intrat aici") config, err := loadTLSConfig(serverCrt, serverKey, caCert, serverPolicy, serverName) + fmt.Println("am iesit de aici") if err != nil { shdChan.CloseOnce() return diff --git a/cores/server_it_test.go b/cores/server_it_test.go new file mode 100644 index 000000000..131a7b385 --- /dev/null +++ b/cores/server_it_test.go @@ -0,0 +1 @@ +package cores diff --git a/utils/consts.go b/utils/consts.go index e3aac801e..81c5e7668 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -299,6 +299,8 @@ const ( FilterValStart = "(" FilterValEnd = ")" JSON = "json" + JSONCaps = "JSON" + GOBCaps = "GOB" MsgPack = "msgpack" CSVLoad = "CSVLOAD" CGRID = "CGRID"