mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
History server reconnects
This commit is contained in:
@@ -211,15 +211,50 @@ func startHistoryScribe() {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.HistoryServerEnabled {
|
||||
if cfg.HistoryListen != INTERNAL {
|
||||
rpc.RegisterName("Scribe", scribeServer)
|
||||
var serveFunc func(io.ReadWriteCloser)
|
||||
if cfg.RPCEncoding == JSON {
|
||||
serveFunc = jsonrpc.ServeConn
|
||||
} else {
|
||||
serveFunc = rpc.ServeConn
|
||||
}
|
||||
l, err := net.Listen("tcp", cfg.HistoryListen)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<History> Could not listen to %v: %v", cfg.HistoryListen, err))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
defer l.Close()
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<History> Accept error: %v", conn))
|
||||
continue
|
||||
}
|
||||
|
||||
engine.Logger.Info(fmt.Sprintf("<History> New incoming connection: %v", conn.RemoteAddr()))
|
||||
go serveFunc(conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var scribeAgent history.Scribe
|
||||
|
||||
if cfg.HistoryAgentEnabled {
|
||||
if cfg.HistoryServer != INTERNAL {
|
||||
if scribeAgent, err = history.NewProxyScribe(cfg.HistoryServer, cfg.RPCEncoding); err != nil {
|
||||
engine.Logger.Crit(err.Error())
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
if cfg.HistoryServer != INTERNAL { // Connect in iteration since there are chances of concurrency here
|
||||
for i := 0; i < 3; i++ { //ToDo: Make it globally configurable
|
||||
if scribeAgent, err = history.NewProxyScribe(cfg.HistoryServer, cfg.RPCEncoding); err == nil {
|
||||
break //Connected so no need to reiterate
|
||||
} else if (i==2 && err!= nil) {
|
||||
engine.Logger.Crit(err.Error())
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Duration(i/2) * time.Second)
|
||||
}
|
||||
} else {
|
||||
scribeAgent = scribeServer
|
||||
}
|
||||
@@ -230,35 +265,6 @@ func startHistoryScribe() {
|
||||
engine.SetHistoryScribe(scribeServer) // if it is nil so be it
|
||||
}
|
||||
|
||||
if cfg.HistoryServerEnabled {
|
||||
if cfg.HistoryListen != INTERNAL {
|
||||
rpc.RegisterName("Scribe", scribeServer)
|
||||
var serveFunc func(io.ReadWriteCloser)
|
||||
if cfg.RPCEncoding == JSON {
|
||||
serveFunc = jsonrpc.ServeConn
|
||||
} else {
|
||||
serveFunc = rpc.ServeConn
|
||||
}
|
||||
l, err := net.Listen("tcp", cfg.HistoryListen)
|
||||
if err != nil {
|
||||
engine.Logger.Crit(fmt.Sprintf("<History> Could not listen to %v: %v", cfg.HistoryListen, err))
|
||||
exitChan <- true
|
||||
return
|
||||
}
|
||||
defer l.Close()
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
engine.Logger.Err(fmt.Sprintf("<History> Accept error: %v", conn))
|
||||
continue
|
||||
}
|
||||
|
||||
engine.Logger.Info(fmt.Sprintf("<History> New incoming connection: %v", conn.RemoteAddr()))
|
||||
go serveFunc(conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user