Updated CGREngine structure

This commit is contained in:
Trial97
2021-09-07 19:32:59 +03:00
committed by Dan Christian Bogos
parent 5098d3e65f
commit cea082eb7f
23 changed files with 613 additions and 1101 deletions

View File

@@ -39,7 +39,6 @@ import (
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/registrarc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -323,126 +322,7 @@ func main() {
if err := cgrEngineFlags.Parse(os.Args[1:]); err != nil {
return
}
vers, err := utils.GetCGRVersion()
if err != nil {
fmt.Println(err)
return
}
goVers := runtime.Version()
if *version {
fmt.Println(vers)
return
}
if *pidFile != utils.EmptyString {
writePid()
}
if *singlecpu {
runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases
}
shdWg := new(sync.WaitGroup)
shdChan := utils.NewSyncedChan()
shdWg.Add(1)
go singnalHandler(shdWg, shdChan)
var cS *cores.CoreService
var stopMemProf chan struct{}
var memPrfDirForCores string
if *memProfDir != utils.EmptyString {
shdWg.Add(1)
stopMemProf = make(chan struct{})
memPrfDirForCores = *memProfDir
go cores.MemProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, shdWg, stopMemProf, shdChan)
defer func() {
if cS == nil {
close(stopMemProf)
}
}()
}
var cpuProfileFile io.Closer
if *cpuProfDir != utils.EmptyString {
cpuPath := path.Join(*cpuProfDir, utils.CpuPathCgr)
cpuProfileFile, err = cores.StartCPUProfiling(cpuPath)
if err != nil {
return
}
defer func() {
if cS != nil {
cS.StopCPUProfiling()
return
}
if cpuProfileFile != nil {
pprof.StopCPUProfile()
cpuProfileFile.Close()
}
}()
}
if *scheduledShutdown != utils.EmptyString {
shutdownDur, err := utils.ParseDurationWithNanosecs(*scheduledShutdown)
if err != nil {
log.Fatal(err)
}
shdWg.Add(1)
go func() { // Schedule shutdown
tm := time.NewTimer(shutdownDur)
select {
case <-tm.C:
shdChan.CloseOnce()
case <-shdChan.Done():
tm.Stop()
}
shdWg.Done()
}()
}
// Init config
cfg, err = config.NewCGRConfigFromPath(*cfgPath)
if err != nil {
log.Fatalf("Could not parse config: <%s>", err.Error())
return
}
if *checkConfig {
if err := cfg.CheckConfigSanity(); err != nil {
fmt.Println(err)
}
return
}
if *nodeID != utils.EmptyString {
cfg.GeneralCfg().NodeID = *nodeID
}
if cfg.ConfigDBCfg().Type != utils.MetaInternal {
d, err := engine.NewDataDBConn(cfg.ConfigDBCfg().Type,
cfg.ConfigDBCfg().Host, cfg.ConfigDBCfg().Port,
cfg.ConfigDBCfg().Name, cfg.ConfigDBCfg().User,
cfg.ConfigDBCfg().Password, cfg.GeneralCfg().DBDataEncoding,
cfg.ConfigDBCfg().Opts)
if err != nil { // Cannot configure getter database, show stopper
log.Fatalf("Could not configure configDB: %s exiting!", err)
return
}
if err = cfg.LoadFromDB(d); err != nil {
log.Fatalf("Could not parse config: <%s>", err.Error())
return
}
}
config.SetCgrConfig(cfg) // Share the config object
// init syslog
if utils.Logger, err = utils.Newlogger(utils.FirstNonEmpty(*syslogger,
cfg.GeneralCfg().Logger), cfg.GeneralCfg().NodeID); err != nil {
log.Fatalf("Could not initialize syslog connection, err: <%s>", err.Error())
return
}
lgLevel := cfg.GeneralCfg().LogLevel
if *logLevel != -1 { // Modify the log level if provided by command arguments
lgLevel = *logLevel
}
utils.Logger.SetLogLevel(lgLevel)
// init the concurrentRequests
cncReqsLimit := cfg.CoreSCfg().Caps
if utils.ConcurrentReqsLimit != 0 { // used as shared variable
@@ -453,8 +333,6 @@ func main() {
cncReqsStrategy = utils.ConcurrentReqsStrategy
}
caps := engine.NewCaps(cncReqsLimit, cncReqsStrategy)
utils.Logger.Info(fmt.Sprintf("<CoreS> starting version <%s><%s>", vers, goVers))
cfg.LazySanityCheck()
// init the channel here because we need to pass them to connManager
internalServeManagerChan := make(chan birpc.ClientConnector, 1)
@@ -539,115 +417,7 @@ func main() {
utils.ActionS: new(sync.WaitGroup),
utils.AccountS: new(sync.WaitGroup),
}
gvService := services.NewGlobalVarS(cfg, srvDep)
shdWg.Add(1)
if err = gvService.Start(); err != nil {
return
}
dmService := services.NewDataDBService(cfg, connManager, srvDep)
if dmService.ShouldRun() { // Some services can run without db, ie: ERs
shdWg.Add(1)
if err = dmService.Start(); err != nil {
return
}
}
storDBService := services.NewStorDBService(cfg, srvDep)
if storDBService.ShouldRun() { // Some services can run without db, ie: ERs
shdWg.Add(1)
if err = storDBService.Start(); err != nil {
return
}
}
// Rpc/http server
server := cores.NewServer(caps)
if len(cfg.HTTPCfg().RegistrarSURL) != 0 {
server.RegisterHTTPFunc(cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar)
}
if cfg.ConfigSCfg().Enabled {
server.RegisterHTTPFunc(cfg.ConfigSCfg().URL, config.HandlerConfigS)
}
if *httpPprofPath != utils.EmptyString {
server.RegisterProfiler(*httpPprofPath)
}
// Define internal connections via channels
filterSChan := make(chan *engine.FilterS, 1)
// init AnalyzerS
anz := services.NewAnalyzerService(cfg, server, filterSChan, shdChan, internalAnalyzerSChan, srvDep)
if anz.ShouldRun() {
shdWg.Add(1)
if err := anz.Start(); err != nil {
fmt.Println(err)
return
}
}
// init CoreSv1
coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, cpuProfileFile, memPrfDirForCores, shdWg, stopMemProf, shdChan, srvDep)
shdWg.Add(1)
if err := coreS.Start(); err != nil {
fmt.Println(err)
return
}
cS = coreS.GetCoreS()
// init CacheS
cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), shdChan, anz, coreS.GetCoreS().CapsStats)
engine.Cache = cacheS
// init GuardianSv1
initGuardianSv1(internalGuardianSChan, server, anz)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, shdChan, shdWg, connManager)
dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager, anz, srvDep)
attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz, dspS, srvDep)
dspH := services.NewRegistrarCService(cfg, server, connManager, anz, srvDep)
chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server,
internalChargerSChan, connManager, anz, srvDep)
tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan,
connManager, server, internalThresholdSChan, anz, srvDep)
stS := services.NewStatService(cfg, dmService, cacheS, filterSChan, server,
internalStatSChan, connManager, anz, srvDep)
reS := services.NewResourceService(cfg, dmService, cacheS, filterSChan, server,
internalResourceSChan, connManager, anz, srvDep)
routeS := services.NewRouteService(cfg, dmService, cacheS, filterSChan, server,
internalRouteSChan, connManager, anz, srvDep)
admS := services.NewAdminSv1Service(cfg, dmService, storDBService, filterSChan, server,
internalAdminSChan, connManager, anz, srvDep)
cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan,
connManager, anz, srvDep)
smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, shdChan, connManager, anz, srvDep)
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server,
internalLoaderSChan, connManager, anz, srvDep)
srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS,
admS, cdrS, smg, coreS,
services.NewEventReaderService(cfg, filterSChan, shdChan, connManager, srvDep),
services.NewDNSAgent(cfg, filterSChan, shdChan, connManager, srvDep),
services.NewFreeswitchAgent(cfg, shdChan, connManager, srvDep),
services.NewKamailioAgent(cfg, shdChan, connManager, srvDep),
services.NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload
services.NewRadiusAgent(cfg, filterSChan, shdChan, connManager, srvDep), // partial reload
services.NewDiameterAgent(cfg, filterSChan, shdChan, connManager, srvDep), // partial reload
services.NewHTTPAgent(cfg, filterSChan, server, connManager, srvDep), // no reload
ldrs, anz, dspS, dspH, dmService, storDBService,
services.NewEventExporterService(cfg, filterSChan,
connManager, server, internalEEsChan, anz, srvDep),
services.NewRateService(cfg, cacheS, filterSChan, dmService,
server, internalRateSChan, anz, srvDep),
services.NewSIPAgent(cfg, filterSChan, shdChan, connManager, srvDep),
services.NewActionService(cfg, dmService, cacheS, filterSChan, connManager, server, internalActionSChan, anz, srvDep),
services.NewAccountService(cfg, dmService, cacheS, filterSChan, connManager, server, internalAccountSChan, anz, srvDep),
)
srvManager.StartServices()
// Start FilterS
go startFilterService(filterSChan, cacheS, connManager,
@@ -655,31 +425,6 @@ func main() {
initServiceManagerV1(internalServeManagerChan, srvManager, server, anz)
// init internalRPCSet to share internal connections among the engine
engine.IntRPC = engine.NewRPCClientSet()
engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, internalAnalyzerSChan)
engine.IntRPC.AddInternalRPCClient(utils.AdminS, internalAdminSChan)
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, internalAttributeSChan)
engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan)
engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, internalCDRServerChan)
engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, internalCDRServerChan)
engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, internalChargerSChan)
engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan)
engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, internalLoaderSChan)
engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, internalResourceSChan)
engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, internalSessionSChan)
engine.IntRPC.AddInternalRPCClient(utils.StatSv1, internalStatSChan)
engine.IntRPC.AddInternalRPCClient(utils.RouteSv1, internalRouteSChan)
engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, internalThresholdSChan)
engine.IntRPC.AddInternalRPCClient(utils.ServiceManagerV1, internalServeManagerChan)
engine.IntRPC.AddInternalRPCClient(utils.ConfigSv1, internalConfigChan)
engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan)
engine.IntRPC.AddInternalRPCClient(utils.RateSv1, internalRateSChan)
engine.IntRPC.AddInternalRPCClient(utils.ActionSv1, internalActionSChan)
engine.IntRPC.AddInternalRPCClient(utils.EeSv1, internalEEsChan)
engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan)
engine.IntRPC.AddInternalRPCClient(utils.AccountSv1, internalAccountSChan)
initConfigSv1(internalConfigChan, server, anz)
if *preload != utils.EmptyString {
@@ -687,13 +432,13 @@ func main() {
}
// Serve rpc connections
go startRPC(server, internalAdminSChan, internalCDRServerChan,
internalResourceSChan, internalStatSChan,
internalAttributeSChan, internalChargerSChan, internalThresholdSChan,
internalRouteSChan, internalSessionSChan, internalAnalyzerSChan,
internalDispatcherSChan, internalLoaderSChan,
internalCacheSChan, internalEEsChan, internalRateSChan, internalActionSChan,
internalAccountSChan, shdChan)
// go startRPC(server, internalAdminSChan, internalCDRServerChan,
// internalResourceSChan, internalStatSChan,
// internalAttributeSChan, internalChargerSChan, internalThresholdSChan,
// internalRouteSChan, internalSessionSChan, internalAnalyzerSChan,
// internalDispatcherSChan, internalLoaderSChan,
// internalCacheSChan, internalEEsChan, internalRateSChan, internalActionSChan,
// internalAccountSChan, shdChan)
<-shdChan.Done()
shtdDone := make(chan struct{})

View File

@@ -29,13 +29,14 @@ import (
"sync"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU io.Closer, fileMem string, stopChan chan struct{},
shdWg *sync.WaitGroup, stopMemPrf chan struct{}, shdChan *utils.SyncedChan) *CoreService {
stopMemPrf chan struct{}, shdWg *sync.WaitGroup, shtDw context.CancelFunc) *CoreService {
var st *engine.CapsStats
if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 {
st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan)
@@ -43,7 +44,7 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU io.Closer,
return &CoreService{
shdWg: shdWg,
stopMemPrf: stopMemPrf,
shdChan: shdChan,
shtDw: shtDw,
cfg: cfg,
CapsStats: st,
fileCPU: fileCPU,
@@ -56,26 +57,27 @@ type CoreService struct {
CapsStats *engine.CapsStats
shdWg *sync.WaitGroup
stopMemPrf chan struct{}
shdChan *utils.SyncedChan
fileMEM string
fileCPU io.Closer
fileMx sync.Mutex
shtDw context.CancelFunc
}
func (cS *CoreService) ShutdownEngine() {
cS.shdChan.CloseOnce()
cS.shtDw()
}
// Shutdown is called to shutdown the service
func (cS *CoreService) Shutdown() {
utils.Logger.Info(fmt.Sprintf("<%s> shutdown initialized", utils.CoreS))
cS.StopChanMemProf()
cS.stopChanMemProf()
cS.StopCPUProfiling()
utils.Logger.Info(fmt.Sprintf("<%s> shutdown complete", utils.CoreS))
}
// StopChanMemProf will stop the MemoryProfiling Channel in order to create
// stopChanMemProf will stop the MemoryProfiling Channel in order to create
// the final MemoryProfiling when CoreS subsystem will stop.
func (cS *CoreService) StopChanMemProf() {
func (cS *CoreService) stopChanMemProf() {
if cS.stopMemPrf != nil {
MemProfFile(cS.fileMEM)
close(cS.stopMemPrf)
@@ -108,7 +110,7 @@ func MemProfFile(memProfPath string) bool {
return true
}
func MemProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg *sync.WaitGroup, stopChan chan struct{}, shdChan *utils.SyncedChan) {
func MemProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg *sync.WaitGroup, stopChan chan struct{}, shDw context.CancelFunc) {
tm := time.NewTimer(interval)
for i := 1; ; i++ {
select {
@@ -119,7 +121,7 @@ func MemProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg
case <-tm.C:
}
if !MemProfFile(path.Join(memProfDir, fmt.Sprintf("mem%v.prof", i))) {
shdChan.CloseOnce()
shDw()
shdWg.Done()
return
}
@@ -192,7 +194,7 @@ func (cS *CoreService) StartMemoryProfiling(args *utils.MemoryPrf) (err error) {
cS.shdWg.Add(1)
cS.stopMemPrf = make(chan struct{})
cS.fileMEM = args.DirPath
go MemProfiling(args.DirPath, args.Interval, args.NrFiles, cS.shdWg, cS.stopMemPrf, cS.shdChan)
go MemProfiling(args.DirPath, args.Interval, args.NrFiles, cS.shdWg, cS.stopMemPrf, cS.shtDw)
return
}
@@ -202,6 +204,6 @@ func (cS *CoreService) StopMemoryProfiling() (err error) {
return errors.New(" Memory Profiling is not started")
}
cS.fileMEM = path.Join(cS.fileMEM, utils.MemProfFileCgr)
cS.StopChanMemProf()
cS.stopChanMemProf()
return
}

View File

@@ -33,7 +33,7 @@ func TestNewCoreService(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
cfgDflt.CoreSCfg().CapsStatsInterval = time.Second
stopchan := make(chan struct{}, 1)
shdChan := utils.NewSyncedChan()
shdChan := func() {}
caps := engine.NewCaps(1, utils.MetaBusy)
sts := engine.NewCapsStats(cfgDflt.CoreSCfg().CapsStatsInterval, caps, stopchan)
stopMemChan := make(chan struct{})
@@ -42,9 +42,9 @@ func TestNewCoreService(t *testing.T) {
cfg: cfgDflt,
CapsStats: sts,
fileMEM: "/tmp",
shdChan: shdChan,
shtDw: shdChan,
}
rcv := NewCoreService(cfgDflt, caps, nil, "/tmp", stopMemChan, nil, stopchan, shdChan)
rcv := NewCoreService(cfgDflt, caps, nil, "/tmp", stopMemChan, stopchan, nil, shdChan)
if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expected %+v, received %+v", utils.ToJSON(expected), utils.ToJSON(rcv))
}
@@ -57,10 +57,7 @@ func TestCoreServiceStatus(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
cfgDflt.CoreSCfg().CapsStatsInterval = 1
caps := engine.NewCaps(1, utils.MetaBusy)
stopChan := make(chan struct{}, 1)
shdChan := utils.NewSyncedChan()
cores := NewCoreService(cfgDflt, caps, nil, "/tmp", nil, nil, stopChan, shdChan)
cores := NewCoreService(cfgDflt, caps, nil, "/tmp", nil, nil, nil, func() {})
args := &utils.TenantWithAPIOpts{
Tenant: "cgrates.org",
APIOpts: map[string]interface{}{},

View File

@@ -44,9 +44,8 @@ func NewServer(caps *engine.Caps) (s *Server) {
stopbiRPCServer: make(chan struct{}, 1),
caps: caps,
rpcStarted: utils.NewSyncedChan(),
rpcServer: birpc.NewServer(),
birpcSrv: birpc.NewBirpcServer(),
rpcServer: birpc.NewServer(),
birpcSrv: birpc.NewBirpcServer(),
}
s.httpServer = &http.Server{Handler: s.httpMux}
s.httpsServer = &http.Server{Handler: s.httpsMux}
@@ -63,7 +62,6 @@ type Server struct {
caps *engine.Caps
anz *analyzers.AnalyzerService
rpcStarted *utils.SyncedChan
rpcServer *birpc.Server
rpcJSONl net.Listener
rpcGOBl net.Listener
@@ -208,7 +206,7 @@ func (s *Server) ServeHTTP(shtdwnEngine context.CancelFunc, addr, jsonRPCURL, ws
}
// ServeBiRPC create a goroutine to listen and serve as BiRPC server
func (s *Server) ServeBiRPC2(addrJSON, addrGOB string, onConn, onDis func(birpc.ClientConnector)) (err error) {
func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn, onDis func(birpc.ClientConnector)) (err error) {
s.birpcSrv.OnConnect(onConn)
s.birpcSrv.OnDisconnect(onDis)
if addrJSON != utils.EmptyString {
@@ -320,12 +318,24 @@ func (s *Server) ServeHTTPS(shtdwnEngine context.CancelFunc,
}
func (s *Server) Stop() {
s.rpcJSONl.Close()
s.rpcGOBl.Close()
s.rpcJSONlTLS.Close()
s.rpcGOBlTLS.Close()
s.httpServer.Shutdown(context.Background())
s.httpsServer.Shutdown(context.Background())
if s.rpcJSONl != nil {
s.rpcJSONl.Close()
}
if s.rpcGOBl != nil {
s.rpcGOBl.Close()
}
if s.rpcJSONlTLS != nil {
s.rpcJSONlTLS.Close()
}
if s.rpcGOBlTLS != nil {
s.rpcGOBlTLS.Close()
}
if s.httpServer != nil {
s.httpServer.Shutdown(context.Background())
}
if s.httpsServer != nil {
s.httpsServer.Shutdown(context.Background())
}
s.StopBiRPC()
}

View File

@@ -52,34 +52,12 @@ var (
server *Server
sTestsServer = []func(t *testing.T){
testServeGOBPortFail,
testServeJSON,
testServeJSONFail,
testServeJSONFailRpcEnabled,
testServeGOB,
testServeHHTPPass,
testServeHHTPPassUseBasicAuth,
testServeHHTPEnableHttp,
testServeHHTPFail,
testServeHHTPFailEnableRpc,
testServeBiJSON,
testServeBiJSONEmptyBiRPCServer,
testServeBiJSONInvalidPort,
testServeBiGoB,
testServeBiGoBEmptyBiRPCServer,
testServeBiGoBInvalidPort,
testServeGOBTLS,
testServeJSONTls,
testServeCodecTLSErr,
testLoadTLSConfigErr,
testServeHTTPTLS,
testServeHTTPTLSWithBasicAuth,
testServeHTTPTLSError,
testServeHTTPTLSHttpNotEnabled,
testHandleRequest,
testBiRPCRegisterName,
testAcceptBiRPC,
testAcceptBiRPCError,
testRpcRegisterActions,
testWebSocket,
}
@@ -93,13 +71,13 @@ func TestServerIT(t *testing.T) {
}
}
type mockRegister string
type mockRegister struct{}
func (x *mockRegister) ForTest(ctx *context.Context, args, reply interface{}) error {
func (*mockRegister) ForTest(ctx *context.Context, args, reply interface{}) error {
return nil
}
func (robj *mockRegister) Ping(ctx *context.Context, in string, out *string) error {
func (*mockRegister) Ping(ctx *context.Context, in string, out *string) error {
*out = utils.Pong
return nil
}
@@ -117,8 +95,8 @@ func (mkL *mockListener) Accept() (net.Conn, error) {
return nil, utils.ErrDisconnected
}
func (mkL *mockListener) Close() error { return mkL.p1.Close() }
func (mkL *mockListener) Addr() net.Addr { return nil }
func (mkL *mockListener) Close() error { return mkL.p1.Close() }
func (*mockListener) Addr() net.Addr { return nil }
func testHandleRequest(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
@@ -126,8 +104,6 @@ func testHandleRequest(t *testing.T) {
caps := engine.NewCaps(0, utils.MetaBusy)
rcv := NewServer(caps)
rcv.rpcEnabled = true
req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:2080/json_rpc",
bytes.NewBuffer([]byte("1")))
if err != nil {
@@ -139,20 +115,19 @@ func testHandleRequest(t *testing.T) {
if w.Body.String() != utils.EmptyString {
t.Errorf("Expected: %q ,received: %q", utils.EmptyString, w.Body.String())
}
rcv.StopBiRPC()
}
func testServeJSON(t *testing.T) {
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
buff := new(bytes.Buffer)
log.SetOutput(buff)
go server.ServeJSON(":88845", shdChan)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defer server.Stop()
go server.ServeJSON(ctx, cancel, ":88845")
runtime.Gosched()
expected := "listen tcp: address 88845: invalid port"
@@ -160,121 +135,6 @@ func testServeJSON(t *testing.T) {
t.Errorf("Expected %+v, received %+v", expected, rcv)
}
shdChan.CloseOnce()
server.StopBiRPC()
}
func testServeJSONFail(t *testing.T) {
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
p1, p2 := net.Pipe()
l := &mockListener{
p1: p1,
}
go server.accept(l, utils.JSONCaps, newCapsJSONCodec, shdChan)
runtime.Gosched()
_, ok := <-shdChan.Done()
if ok {
t.Errorf("Expected to be close")
}
p2.Close()
runtime.Gosched()
shdChan.CloseOnce()
server.StopBiRPC()
}
func testServeJSONFailRpcEnabled(t *testing.T) {
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
server.rpcEnabled = false
go server.serveCodec(":9999", utils.JSONCaps, newCapsJSONCodec, shdChan)
runtime.Gosched()
shdChan.CloseOnce()
server.StopBiRPC()
}
func testServeGOB(t *testing.T) {
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
go server.ServeGOB(":27697", shdChan)
runtime.Gosched()
shdChan.CloseOnce()
server.StopBiRPC()
}
func testServeHHTPPass(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
go server.ServeHTTP(
":6555",
cfgDflt.HTTPCfg().JsonRPCURL,
cfgDflt.HTTPCfg().WSURL,
cfgDflt.HTTPCfg().UseBasicAuth,
cfgDflt.HTTPCfg().AuthUsers,
shdChan)
runtime.Gosched()
shdChan.CloseOnce()
server.StopBiRPC()
}
func testServeHHTPPassUseBasicAuth(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
go server.ServeHTTP(
":56432",
cfgDflt.HTTPCfg().JsonRPCURL,
cfgDflt.HTTPCfg().WSURL,
!cfgDflt.HTTPCfg().UseBasicAuth,
cfgDflt.HTTPCfg().AuthUsers,
shdChan)
runtime.Gosched()
shdChan.CloseOnce()
server.StopBiRPC()
}
func testServeHHTPEnableHttp(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
go server.ServeHTTP(
":45779",
utils.EmptyString,
utils.EmptyString,
!cfgDflt.HTTPCfg().UseBasicAuth,
cfgDflt.HTTPCfg().AuthUsers,
shdChan)
runtime.Gosched()
shdChan.CloseOnce()
server.StopBiRPC()
}
func testServeHHTPFail(t *testing.T) {
@@ -282,83 +142,30 @@ func testServeHHTPFail(t *testing.T) {
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
go server.ServeHTTP(
var closed bool
ch := make(chan struct{})
go server.ServeHTTP(func() {
closed = true
close(ch)
},
"invalid_port_format",
cfgDflt.HTTPCfg().JsonRPCURL,
cfgDflt.HTTPCfg().WSURL,
cfgDflt.HTTPCfg().UseBasicAuth,
cfgDflt.HTTPCfg().AuthUsers,
shdChan)
)
runtime.Gosched()
_, ok := <-shdChan.Done()
if ok {
select {
case <-ch:
case <-time.After(50 * time.Millisecond):
t.Fatal("Timeout")
}
if closed {
t.Errorf("Expected to be close")
}
server.StopBiRPC()
}
func testServeHHTPFailEnableRpc(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
server.rpcEnabled = false
go server.ServeHTTP(":1000",
cfgDflt.HTTPCfg().JsonRPCURL,
cfgDflt.HTTPCfg().WSURL,
cfgDflt.HTTPCfg().UseBasicAuth,
cfgDflt.HTTPCfg().AuthUsers,
shdChan)
shdChan.CloseOnce()
server.StopBiRPC()
}
func testServeBiJSON(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
server.birpcSrv = birpc.NewBirpcServer()
data := engine.NewInternalDB(nil, nil, true)
dm := engine.NewDataManager(data, cfgDflt.CacheCfg(), nil)
ss := sessions.NewSessionS(cfgDflt, dm, nil)
go func() {
if err := server.ServeBiRPC(":3434", "", ss.OnBiJSONConnect, ss.OnBiJSONDisconnect); err != nil {
t.Error(err)
}
}()
runtime.Gosched()
}
func testServeBiJSONEmptyBiRPCServer(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
data := engine.NewInternalDB(nil, nil, true)
dm := engine.NewDataManager(data, cfgDflt.CacheCfg(), nil)
ss := sessions.NewSessionS(cfgDflt, dm, nil)
expectedErr := "BiRPCServer should not be nil"
go func() {
if err := server.ServeBiRPC(":3430", "", ss.OnBiJSONConnect, ss.OnBiJSONDisconnect); err == nil || err.Error() != "BiRPCServer should not be nil" {
t.Errorf("Expected %+v, received %+v", expectedErr, err)
}
}()
runtime.Gosched()
server.Stop()
}
func testServeBiJSONInvalidPort(t *testing.T) {
@@ -366,7 +173,6 @@ func testServeBiJSONInvalidPort(t *testing.T) {
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
server.birpcSrv = birpc.NewBirpcServer()
data := engine.NewInternalDB(nil, nil, true)
dm := engine.NewDataManager(data, cfgDflt.CacheCfg(), nil)
@@ -382,47 +188,6 @@ func testServeBiJSONInvalidPort(t *testing.T) {
server.StopBiRPC()
}
func testServeBiGoB(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
server.birpcSrv = birpc.NewBirpcServer()
data := engine.NewInternalDB(nil, nil, true)
dm := engine.NewDataManager(data, cfgDflt.CacheCfg(), nil)
ss := sessions.NewSessionS(cfgDflt, dm, nil)
go func() {
if err := server.ServeBiRPC("", ":9343", ss.OnBiJSONConnect, ss.OnBiJSONDisconnect); err != nil {
t.Log(err)
}
}()
runtime.Gosched()
}
func testServeBiGoBEmptyBiRPCServer(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
data := engine.NewInternalDB(nil, nil, true)
dm := engine.NewDataManager(data, cfgDflt.CacheCfg(), nil)
ss := sessions.NewSessionS(cfgDflt, dm, nil)
expectedErr := "BiRPCServer should not be nil"
go func() {
if err := server.ServeBiRPC("", ":93430", ss.OnBiJSONConnect, ss.OnBiJSONDisconnect); err == nil || err.Error() != "BiRPCServer should not be nil" {
t.Errorf("Expected %+v, received %+v", expectedErr, err)
}
}()
runtime.Gosched()
}
func testServeBiGoBInvalidPort(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
@@ -444,121 +209,6 @@ func testServeBiGoBInvalidPort(t *testing.T) {
server.StopBiRPC()
}
func testServeGOBTLS(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
go server.ServeGOBTLS(
":34476",
"/usr/share/cgrates/tls/server.crt",
"/usr/share/cgrates/tls/server.key",
"/usr/share/cgrates/tls/ca.crt",
4,
cfgDflt.TLSCfg().ServerName,
shdChan,
)
runtime.Gosched()
server.StopBiRPC()
}
func testServeJSONTls(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
go server.ServeJSONTLS(
":64779",
"/usr/share/cgrates/tls/server.crt",
"/usr/share/cgrates/tls/server.key",
"/usr/share/cgrates/tls/ca.crt",
4,
cfgDflt.TLSCfg().ServerName,
shdChan,
)
runtime.Gosched()
}
func testServeGOBPortFail(t *testing.T) {
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
buff := new(bytes.Buffer)
log.SetOutput(buff)
go server.serveCodecTLS(
"34776",
utils.GOBCaps,
"/usr/share/cgrates/tls/server.crt",
"/usr/share/cgrates/tls/server.key",
"/usr/share/cgrates/tls/ca.crt",
4,
"Server_name",
newCapsGOBCodec,
shdChan,
)
runtime.Gosched()
select {
case <-time.After(10 * time.Second):
t.Fatal("timeout")
case <-shdChan.Done():
}
expected := "listen tcp: address 34776: missing port in address when listening"
if rcv := buff.String(); !strings.Contains(rcv, expected) {
t.Errorf("Expected %+v, received %+v", expected, rcv)
}
log.SetOutput(os.Stderr)
}
func testServeCodecTLSErr(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
//if rpc is not enabled, won t be able to serve
server.rpcEnabled = false
server.serveCodecTLS("13567",
utils.GOBCaps,
"/usr/share/cgrates/tls/server.crt",
"/usr/share/cgrates/tls/server.key",
"/usr/share/cgrates/tls/ca.crt",
4,
cfgDflt.TLSCfg().ServerName,
newCapsGOBCodec,
shdChan)
//unable to load TLS config when there is an inexisting server certificate file
server.rpcEnabled = true
server.serveCodecTLS("13567",
utils.GOBCaps,
"/usr/share/cgrates/tls/inexisting_cert",
"/usr/share/cgrates/tls/server.key",
"/usr/share/cgrates/tls/ca.crt",
4,
cfgDflt.TLSCfg().ServerName,
newCapsGOBCodec,
shdChan)
_, ok := <-shdChan.Done()
if ok {
t.Errorf("Expected to be close")
}
}
func testLoadTLSConfigErr(t *testing.T) {
flPath := "/tmp/testLoadTLSConfigErr1"
if err := os.MkdirAll(flPath, 0777); err != nil {
@@ -603,188 +253,12 @@ TEST
}
}
func testServeHTTPTLS(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
type mockListenError mockListener
shdChan := utils.NewSyncedChan()
//cannot serve HHTPTls when rpc is not enabled
server.rpcEnabled = false
server.ServeHTTPTLS(
"17789",
"/usr/share/cgrates/tls/server.crt",
"/usr/share/cgrates/tls/server.key",
"/usr/share/cgrates/tls/ca.crt",
cfgDflt.TLSCfg().ServerPolicy,
cfgDflt.TLSCfg().ServerName,
cfgDflt.HTTPCfg().JsonRPCURL,
cfgDflt.HTTPCfg().WSURL,
cfgDflt.HTTPCfg().UseBasicAuth,
cfgDflt.HTTPCfg().AuthUsers,
shdChan)
//Invalid port address
server.rpcEnabled = true
go server.ServeHTTPTLS(
"17789",
"/usr/share/cgrates/tls/server.crt",
"/usr/share/cgrates/tls/server.key",
"/usr/share/cgrates/tls/ca.crt",
cfgDflt.TLSCfg().ServerPolicy,
cfgDflt.TLSCfg().ServerName,
cfgDflt.HTTPCfg().JsonRPCURL,
cfgDflt.HTTPCfg().WSURL,
cfgDflt.HTTPCfg().UseBasicAuth,
cfgDflt.HTTPCfg().AuthUsers,
shdChan)
runtime.Gosched()
_, ok := <-shdChan.Done()
if ok {
t.Errorf("Expected to be close")
}
}
func testServeHTTPTLSWithBasicAuth(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
//Invalid port address
server.rpcEnabled = true
go server.ServeHTTPTLS(
"57235",
"/usr/share/cgrates/tls/server.crt",
"/usr/share/cgrates/tls/server.key",
"/usr/share/cgrates/tls/ca.crt",
cfgDflt.TLSCfg().ServerPolicy,
cfgDflt.TLSCfg().ServerName,
cfgDflt.HTTPCfg().JsonRPCURL,
cfgDflt.HTTPCfg().WSURL,
!cfgDflt.HTTPCfg().UseBasicAuth,
cfgDflt.HTTPCfg().AuthUsers,
shdChan)
runtime.Gosched()
_, ok := <-shdChan.Done()
if ok {
t.Errorf("Expected to be close")
}
}
func testServeHTTPTLSError(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
//Invalid port address
go server.ServeHTTPTLS(
"57235",
"/usr/share/cgrates/tls/inexisting_file",
"/usr/share/cgrates/tls/server.key",
"/usr/share/cgrates/tls/ca.crt",
cfgDflt.TLSCfg().ServerPolicy,
cfgDflt.TLSCfg().ServerName,
cfgDflt.HTTPCfg().JsonRPCURL,
cfgDflt.HTTPCfg().WSURL,
!cfgDflt.HTTPCfg().UseBasicAuth,
cfgDflt.HTTPCfg().AuthUsers,
shdChan)
runtime.Gosched()
_, ok := <-shdChan.Done()
if ok {
t.Errorf("Expected to be close")
}
}
func testServeHTTPTLSHttpNotEnabled(t *testing.T) {
cfgDflt := config.NewDefaultCGRConfig()
caps := engine.NewCaps(100, utils.MetaBusy)
server = NewServer(caps)
server.RpcRegister(new(mockRegister))
shdChan := utils.NewSyncedChan()
server.httpEnabled = false
go server.ServeHTTPTLS(
"17789",
"/usr/share/cgrates/tls/server.crt",
"/usr/share/cgrates/tls/server.key",
"/usr/share/cgrates/tls/ca.crt",
cfgDflt.TLSCfg().ServerPolicy,
cfgDflt.TLSCfg().ServerName,
utils.EmptyString,
utils.EmptyString,
cfgDflt.HTTPCfg().UseBasicAuth,
cfgDflt.HTTPCfg().AuthUsers,
shdChan)
shdChan.CloseOnce()
}
func testBiRPCRegisterName(t *testing.T) {
caps := engine.NewCaps(0, utils.MetaBusy)
server := NewServer(caps)
handler := struct{}{}
go server.BiRPCRegisterName(utils.APIerSv1Ping, handler)
runtime.Gosched()
server.StopBiRPC()
}
func testAcceptBiRPC(t *testing.T) {
caps := engine.NewCaps(0, utils.MetaBusy)
server := NewServer(caps)
server.RpcRegister(new(mockRegister))
server.birpcSrv = birpc.NewBirpcServer()
p1, p2 := net.Pipe()
l := &mockListener{
p1: p1,
}
go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, newCapsBiRPCJSONCodec)
rpc := jsonrpc.NewClient(p2)
var reply string
expected := "birpc: can't find method AttributeSv1.Ping"
if err := rpc.Call(context.TODO(), utils.AttributeSv1Ping, utils.CGREvent{}, &reply); err == nil || err.Error() != expected {
t.Errorf("Expected %+v, received %+v", expected, err)
}
p2.Close()
runtime.Gosched()
}
type mockListenError struct {
*mockListener
}
func (mK *mockListenError) Accept() (net.Conn, error) {
func (*mockListenError) Accept() (net.Conn, error) {
return nil, errors.New("use of closed network connection")
}
func testAcceptBiRPCError(t *testing.T) {
caps := engine.NewCaps(10, utils.MetaBusy)
server := NewServer(caps)
server.RpcRegister(new(mockRegister))
server.birpcSrv = birpc.NewBirpcServer()
//it will contain "use of closed network connection"
l := new(mockListenError)
go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, newCapsBiRPCJSONCodec)
runtime.Gosched()
}
func testRpcRegisterActions(t *testing.T) {
caps := engine.NewCaps(0, utils.MetaBusy)
server := NewServer(caps)

View File

@@ -45,6 +45,10 @@ func TestNewServer(t *testing.T) {
}
rcv := NewServer(caps)
rcv.stopbiRPCServer = nil
rcv.httpServer = nil
rcv.httpsServer = nil
rcv.rpcServer = nil
rcv.birpcSrv = nil
if !reflect.DeepEqual(expected, rcv) {
t.Errorf("Expected %+v, received %+v", expected, rcv)
}

View File

@@ -35,6 +35,7 @@ func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan birpc.Cli
cM = &ConnManager{
cfg: cfg,
rpcInternal: rpcInternal,
dynIntCh: NewRPCClientSet(rpcInternal),
connCache: ltcache.NewCache(-1, 0, true, nil),
}
SetConnManager(cM)
@@ -45,6 +46,7 @@ func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan birpc.Cli
type ConnManager struct {
cfg *config.CGRConfig
rpcInternal map[string]chan birpc.ClientConnector
dynIntCh RPCClientSet
connCache *ltcache.Cache
}
@@ -71,7 +73,7 @@ func (cM *ConnManager) getConn(ctx *context.Context, connID string) (conn birpc.
connCfg = cM.cfg.RPCConns()[connID]
for _, rpcConn := range connCfg.Conns {
if rpcConn.Address == utils.MetaInternal {
intChan = IntRPC.GetInternalChanel()
intChan = cM.dynIntCh.GetInternalChanel()
break
}
}
@@ -203,3 +205,13 @@ func (cM *ConnManager) Reload() {
Cache.Clear([]string{utils.CacheReplicationHosts})
cM.connCache.Clear()
}
func (cM *ConnManager) GetInternalChan() chan birpc.ClientConnector {
return cM.dynIntCh.GetInternalChanel()
}
func (cM *ConnManager) AddInternalConn(connName, apiPrefix string,
iConnCh chan birpc.ClientConnector) {
cM.rpcInternal[connName] = iConnCh
cM.dynIntCh[apiPrefix] = iConnCh
}

View File

@@ -91,30 +91,38 @@ func NewRPCConnection(ctx *context.Context, cfg *config.RemoteHost, keyPath, cer
return
}
// IntRPC is the global variable that is used to comunicate with all the subsystems internally
var IntRPC RPCClientSet
// NewRPCClientSet initilalizates the map of connections
func NewRPCClientSet() (s RPCClientSet) {
return make(RPCClientSet)
func NewRPCClientSet(m map[string]chan birpc.ClientConnector) (s RPCClientSet) {
s = make(RPCClientSet)
for k, v := range map[string]string{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzer): utils.AnalyzerSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS): utils.AdminSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): utils.AttributeSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): utils.CacheSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs): utils.CDRsV1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): utils.ChargerSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian): utils.GuardianSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders): utils.LoaderSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources): utils.ResourceSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS): utils.SessionSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats): utils.StatSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes): utils.RouteSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds): utils.ThresholdSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager): utils.ServiceManagerV1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig): utils.ConfigSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): utils.CoreSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): utils.EeSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): utils.RateSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): utils.DispatcherSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts): utils.AccountSv1,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions): utils.ActionSv1,
} {
s[v] = m[k]
}
return
}
// RPCClientSet is a RPC ClientConnector for the internal subsystems
type RPCClientSet map[string]*rpcclient.RPCClient
// AddInternalRPCClient creates and adds to the set a new rpc client using the provided configuration
func (s RPCClientSet) AddInternalRPCClient(name string, connChan chan birpc.ClientConnector) {
rpc, err := rpcclient.NewRPCClient(context.Background(), utils.EmptyString, utils.EmptyString, false,
utils.EmptyString, utils.EmptyString, utils.EmptyString,
config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects,
config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout,
rpcclient.InternalRPC, connChan, true, nil)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error adding %s to the set: %s", utils.InternalRPCSet, name, err.Error()))
return
}
s[name] = rpc
}
type RPCClientSet map[string]chan birpc.ClientConnector
// GetInternalChanel is used when RPCClientSet is passed as internal connection for RPCPool
func (s RPCClientSet) GetInternalChanel() chan birpc.ClientConnector {
@@ -129,9 +137,21 @@ func (s RPCClientSet) Call(ctx *context.Context, method string, args interface{}
if len(methodSplit) != 2 {
return rpcclient.ErrUnsupporteServiceMethod
}
conn, has := s[methodSplit[0]]
connCh, has := s[methodSplit[0]]
if !has {
return rpcclient.ErrUnsupporteServiceMethod
}
var conn birpc.ClientConnector
ctx2, cancel := context.WithTimeout(ctx, config.CgrConfig().GeneralCfg().ConnectTimeout)
select {
case conn = <-connCh:
connCh <- conn
cancel()
if conn == nil {
return rpcclient.ErrDisconnected
}
case <-ctx2.Done():
return ctx2.Err()
}
return conn.Call(ctx, method, args, reply)
}

View File

@@ -23,6 +23,7 @@ import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/analyzers"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
@@ -32,15 +33,16 @@ import (
// NewAnalyzerService returns the Analyzer Service
func NewAnalyzerService(cfg *config.CGRConfig, server *cores.Server,
filterSChan chan *engine.FilterS, shdChan *utils.SyncedChan,
filterSChan chan *engine.FilterS,
internalAnalyzerSChan chan birpc.ClientConnector,
srvDep map[string]*sync.WaitGroup) *AnalyzerService {
srvDep map[string]*sync.WaitGroup,
shtDwn context.CancelFunc) *AnalyzerService {
return &AnalyzerService{
connChan: internalAnalyzerSChan,
cfg: cfg,
server: server,
filterSChan: filterSChan,
shdChan: shdChan,
shtDwn: shtDwn,
srvDep: srvDep,
}
}
@@ -52,7 +54,7 @@ type AnalyzerService struct {
server *cores.Server
filterSChan chan *engine.FilterS
stopChan chan struct{}
shdChan *utils.SyncedChan
shtDwn context.CancelFunc
anz *analyzers.AnalyzerService
// rpc *v1.AnalyzerSv1
@@ -76,7 +78,7 @@ func (anz *AnalyzerService) Start() (err error) {
go func(a *analyzers.AnalyzerService) {
if err := a.ListenAndServe(anz.stopChan); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AnalyzerS, err.Error()))
anz.shdChan.CloseOnce()
anz.shtDwn()
}
return
}(anz.anz)

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/agents"
@@ -32,11 +33,12 @@ import (
// NewAsteriskAgent returns the Asterisk Agent
func NewAsteriskAgent(cfg *config.CGRConfig,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
shtDwn context.CancelFunc) servmanager.Service {
return &AsteriskAgent{
cfg: cfg,
shdChan: shdChan,
shtDwn: shtDwn,
connMgr: connMgr,
srvDep: srvDep,
}
@@ -46,7 +48,7 @@ func NewAsteriskAgent(cfg *config.CGRConfig,
type AsteriskAgent struct {
sync.RWMutex
cfg *config.CGRConfig
shdChan *utils.SyncedChan
shtDwn context.CancelFunc
stopChan chan struct{}
smas []*agents.AsteriskAgent
@@ -63,17 +65,17 @@ func (ast *AsteriskAgent) Start() (err error) {
ast.Lock()
defer ast.Unlock()
listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}, shdChan *utils.SyncedChan) {
listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}) {
if err := sma.ListenAndServe(stopChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err))
shdChan.CloseOnce()
ast.shtDwn()
}
}
ast.stopChan = make(chan struct{})
ast.smas = make([]*agents.AsteriskAgent, len(ast.cfg.AsteriskAgentCfg().AsteriskConns))
for connIdx := range ast.cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers
ast.smas[connIdx] = agents.NewAsteriskAgent(ast.cfg, connIdx, ast.connMgr)
go listenAndServe(ast.smas[connIdx], ast.stopChan, ast.shdChan)
go listenAndServe(ast.smas[connIdx], ast.stopChan)
}
return
}

View File

@@ -20,13 +20,23 @@ package services
import (
"fmt"
"io"
"log"
"path"
"runtime"
"runtime/pprof"
"sync"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/registrarc"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
type CGREngine struct {
@@ -34,18 +44,21 @@ type CGREngine struct {
srvManager *servmanager.ServiceManager
srvDep map[string]*sync.WaitGroup
cmConns map[string]chan birpc.ClientConnector
shdWg sync.WaitGroup
cM *engine.ConnManager
server *cores.Server
cS *cores.CoreService
}
func (cgr *CGREngine) AddService(service servmanager.Service, connName, apiPrefix string,
iConnCh chan birpc.ClientConnector) {
cgr.srvManager.AddServices(service)
cgr.cmConns[utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources)] = iConnCh
cgr.srvDep[service.ServiceName()] = new(sync.WaitGroup)
engine.IntRPC.AddInternalRPCClient(apiPrefix, iConnCh)
cgr.cM.AddInternalConn(connName, apiPrefix, iConnCh)
}
func (cgr *CGREngine) InitConfigFromPath(path string, nodeID string) (err error) {
func (cgr *CGREngine) InitConfigFromPath(path, nodeID string, lgLevel int) (err error) {
// Init config
if cgr.cfg, err = config.NewCGRConfigFromPath(path); err != nil {
err = fmt.Errorf("could not parse config: <%s>", err)
@@ -69,6 +82,294 @@ func (cgr *CGREngine) InitConfigFromPath(path string, nodeID string) (err error)
if nodeID != utils.EmptyString {
cgr.cfg.GeneralCfg().NodeID = nodeID
}
if lgLevel != -1 { // Modify the log level if provided by command arguments
cgr.cfg.GeneralCfg().LogLevel = lgLevel
}
config.SetCgrConfig(cgr.cfg) // Share the config object
return
}
func (cgr *CGREngine) InitServices(ctx *context.Context, shtDwn context.CancelFunc, pprofPath string, cpuPrfFl io.Closer, memPrfDir string, memPrfStop chan struct{}) (err error) {
iFilterSCh := make(chan *engine.FilterS, 1)
// init the channel here because we need to pass them to connManager
iServeManagerCh := make(chan birpc.ClientConnector, 1)
iConfigCh := make(chan birpc.ClientConnector, 1)
iCoreSv1Ch := make(chan birpc.ClientConnector, 1)
iCacheSCh := make(chan birpc.ClientConnector, 1)
iGuardianSCh := make(chan birpc.ClientConnector, 1)
iAnalyzerSCh := make(chan birpc.ClientConnector, 1)
iCDRServerCh := make(chan birpc.ClientConnector, 1)
iAttributeSCh := make(chan birpc.ClientConnector, 1)
iDispatcherSCh := make(chan birpc.ClientConnector, 1)
iSessionSCh := make(chan birpc.ClientConnector, 1)
iChargerSCh := make(chan birpc.ClientConnector, 1)
iThresholdSCh := make(chan birpc.ClientConnector, 1)
iStatSCh := make(chan birpc.ClientConnector, 1)
iResourceSCh := make(chan birpc.ClientConnector, 1)
iRouteSCh := make(chan birpc.ClientConnector, 1)
iAdminSCh := make(chan birpc.ClientConnector, 1)
iLoaderSCh := make(chan birpc.ClientConnector, 1)
iEEsCh := make(chan birpc.ClientConnector, 1)
iRateSCh := make(chan birpc.ClientConnector, 1)
iActionSCh := make(chan birpc.ClientConnector, 1)
iAccountSCh := make(chan birpc.ClientConnector, 1)
cgr.srvDep = map[string]*sync.WaitGroup{
utils.AnalyzerS: new(sync.WaitGroup),
utils.AdminS: new(sync.WaitGroup),
utils.AsteriskAgent: new(sync.WaitGroup),
utils.AttributeS: new(sync.WaitGroup),
utils.CDRServer: new(sync.WaitGroup),
utils.ChargerS: new(sync.WaitGroup),
utils.CoreS: new(sync.WaitGroup),
utils.DataDB: new(sync.WaitGroup),
utils.DiameterAgent: new(sync.WaitGroup),
utils.RegistrarC: new(sync.WaitGroup),
utils.DispatcherS: new(sync.WaitGroup),
utils.DNSAgent: new(sync.WaitGroup),
utils.EEs: new(sync.WaitGroup),
utils.ERs: new(sync.WaitGroup),
utils.FreeSWITCHAgent: new(sync.WaitGroup),
utils.GlobalVarS: new(sync.WaitGroup),
utils.HTTPAgent: new(sync.WaitGroup),
utils.KamailioAgent: new(sync.WaitGroup),
utils.LoaderS: new(sync.WaitGroup),
utils.RadiusAgent: new(sync.WaitGroup),
utils.RateS: new(sync.WaitGroup),
utils.ResourceS: new(sync.WaitGroup),
utils.RouteS: new(sync.WaitGroup),
utils.SchedulerS: new(sync.WaitGroup),
utils.SessionS: new(sync.WaitGroup),
utils.SIPAgent: new(sync.WaitGroup),
utils.StatS: new(sync.WaitGroup),
utils.StorDB: new(sync.WaitGroup),
utils.ThresholdS: new(sync.WaitGroup),
utils.ActionS: new(sync.WaitGroup),
utils.AccountS: new(sync.WaitGroup),
}
cncReqsLimit := cgr.cfg.CoreSCfg().Caps
if utils.ConcurrentReqsLimit != 0 { // used as shared variable
cncReqsLimit = utils.ConcurrentReqsLimit
}
cncReqsStrategy := cgr.cfg.CoreSCfg().CapsStrategy
if len(utils.ConcurrentReqsStrategy) != 0 {
cncReqsStrategy = utils.ConcurrentReqsStrategy
}
caps := engine.NewCaps(cncReqsLimit, cncReqsStrategy)
// initialize the connManager before creating the DMService
// because we need to pass the connection to it
cgr.cM = engine.NewConnManager(cgr.cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzer): iAnalyzerSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS): iAdminSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): iAttributeSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): iCacheSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs): iCDRServerCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): iChargerSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian): iGuardianSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders): iLoaderSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources): iResourceSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS): iSessionSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats): iStatSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes): iRouteSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds): iThresholdSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager): iServeManagerCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig): iConfigCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): iCoreSv1Ch,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): iEEsCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): iRateSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): iDispatcherSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts): iAccountSCh,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions): iActionSCh,
utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): iSessionSCh,
})
gvService := NewGlobalVarS(cgr.cfg, cgr.srvDep)
cgr.shdWg.Add(1)
if err = gvService.Start(); err != nil {
return
}
dmService := NewDataDBService(cgr.cfg, cgr.cM, cgr.srvDep)
if dmService.ShouldRun() { // Some services can run without db, ie: ERs
cgr.shdWg.Add(1)
if err = dmService.Start(); err != nil {
return
}
}
storDBService := NewStorDBService(cgr.cfg, cgr.srvDep)
if storDBService.ShouldRun() { // Some services can run without db, ie: ERs
cgr.shdWg.Add(1)
if err = storDBService.Start(); err != nil {
return
}
}
// Rpc/http server
cgr.server = cores.NewServer(caps)
if len(cgr.cfg.HTTPCfg().RegistrarSURL) != 0 {
cgr.server.RegisterHTTPFunc(cgr.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar)
}
if cgr.cfg.ConfigSCfg().Enabled {
cgr.server.RegisterHTTPFunc(cgr.cfg.ConfigSCfg().URL, config.HandlerConfigS)
}
if pprofPath != utils.EmptyString {
cgr.server.RegisterProfiler(pprofPath)
}
// init AnalyzerS
anz := NewAnalyzerService(cgr.cfg, cgr.server, iFilterSCh, iAnalyzerSCh, cgr.srvDep, shtDwn)
if anz.ShouldRun() {
cgr.shdWg.Add(1)
if err = anz.Start(); err != nil {
return
}
}
// init CoreSv1
coreS := NewCoreService(cgr.cfg, caps, cgr.server, iCoreSv1Ch, anz, cpuPrfFl, memPrfDir, memPrfStop, &cgr.shdWg, cgr.srvDep, shtDwn)
cgr.shdWg.Add(1)
if err = coreS.Start(); err != nil {
return
}
cgr.cS = coreS.GetCoreS()
// init CacheS
cacheS := cgrInitCacheS(ctx, shtDwn, iCacheSCh, cgr.server, cgr.cfg, dmService.GetDM(), anz, coreS.GetCoreS().CapsStats)
engine.Cache = cacheS
// init GuardianSv1
cgrInitGuardianSv1(iGuardianSCh, cgr.server, anz)
// Start ServiceManager
cgr.srvManager = servmanager.NewServiceManager(cgr.cfg, &cgr.shdWg, cgr.cM)
dspS := NewDispatcherService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.server, iDispatcherSCh, cgr.cM, anz, cgr.srvDep)
attrS := NewAttributeService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.server, iAttributeSCh, anz, dspS, cgr.srvDep)
dspH := NewRegistrarCService(cgr.cfg, cgr.server, cgr.cM, anz, cgr.srvDep)
chrS := NewChargerService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.server,
iChargerSCh, cgr.cM, anz, cgr.srvDep)
tS := NewThresholdService(cgr.cfg, dmService, cacheS, iFilterSCh,
cgr.cM, cgr.server, iThresholdSCh, anz, cgr.srvDep)
stS := NewStatService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.server,
iStatSCh, cgr.cM, anz, cgr.srvDep)
reS := NewResourceService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.server,
iResourceSCh, cgr.cM, anz, cgr.srvDep)
routeS := NewRouteService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.server,
iRouteSCh, cgr.cM, anz, cgr.srvDep)
admS := NewAdminSv1Service(cgr.cfg, dmService, storDBService, iFilterSCh, cgr.server,
iAdminSCh, cgr.cM, anz, cgr.srvDep)
cdrS := NewCDRServer(cgr.cfg, dmService, storDBService, iFilterSCh, cgr.server, iCDRServerCh,
cgr.cM, anz, cgr.srvDep)
smg := NewSessionService(cgr.cfg, dmService, cgr.server, iSessionSCh, cgr.cM, anz, cgr.srvDep, shtDwn)
ldrs := NewLoaderService(cgr.cfg, dmService, iFilterSCh, cgr.server,
iLoaderSCh, cgr.cM, anz, cgr.srvDep)
cgr.srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS,
admS, cdrS, smg, coreS,
NewEventReaderService(cgr.cfg, iFilterSCh, cgr.cM, cgr.srvDep, shtDwn),
NewDNSAgent(cgr.cfg, iFilterSCh, cgr.cM, cgr.srvDep, shtDwn),
NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep, shtDwn),
NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep, shtDwn),
NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep, shtDwn), // partial reload
NewRadiusAgent(cgr.cfg, iFilterSCh, cgr.cM, cgr.srvDep, shtDwn), // partial reload
NewDiameterAgent(cgr.cfg, iFilterSCh, cgr.cM, cgr.srvDep, shtDwn), // partial reload
NewHTTPAgent(cgr.cfg, iFilterSCh, cgr.server, cgr.cM, cgr.srvDep), // no reload
ldrs, anz, dspS, dspH, dmService, storDBService,
NewEventExporterService(cgr.cfg, iFilterSCh,
cgr.cM, cgr.server, iEEsCh, anz, cgr.srvDep),
NewRateService(cgr.cfg, cacheS, iFilterSCh, dmService,
cgr.server, iRateSCh, anz, cgr.srvDep),
NewSIPAgent(cgr.cfg, iFilterSCh, cgr.cM, cgr.srvDep, shtDwn),
NewActionService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.cM, cgr.server, iActionSCh, anz, cgr.srvDep),
NewAccountService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.cM, cgr.server, iAccountSCh, anz, cgr.srvDep),
)
return
}
func (cgr *CGREngine) Start(ctx *context.Context, shtDw context.CancelFunc, flags *CGREngineFlags) (err error) {
var vers string
goVers := runtime.Version()
if vers, err = utils.GetCGRVersion(); err != nil {
return
}
if *flags.Version {
fmt.Println(vers)
return
}
if *flags.PidFile != utils.EmptyString {
cgrWritePid(*flags.PidFile)
}
if *flags.Singlecpu {
runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases
}
cgr.shdWg.Add(1)
go cgrSingnalHandler(ctx, shtDw, cgr.cfg, &cgr.shdWg)
var stopMemProf chan struct{}
if *flags.MemPrfDir != utils.EmptyString {
cgr.shdWg.Add(1)
stopMemProf = make(chan struct{})
go cores.MemProfiling(*flags.MemPrfDir, *flags.MemPrfInterval, *flags.MemPrfNoF, &cgr.shdWg, stopMemProf, shtDw)
defer func() { //here
if cgr.cS == nil {
close(stopMemProf)
}
}()
}
var cpuProfileFile io.Closer
if *flags.CpuPrfDir != utils.EmptyString {
cpuProfileFile, err = cores.StartCPUProfiling(path.Join(*flags.CpuPrfDir, utils.CpuPathCgr))
if err != nil {
return
}
defer func() { //here
if cgr.cS == nil {
pprof.StopCPUProfile()
cpuProfileFile.Close()
}
}()
}
if *flags.ScheduledShutDown != utils.EmptyString {
var shtDwDur time.Duration
if shtDwDur, err = utils.ParseDurationWithNanosecs(*flags.ScheduledShutDown); err != nil {
return
}
cgr.shdWg.Add(1)
go func() { // Schedule shutdown
tm := time.NewTimer(shtDwDur)
select {
case <-tm.C:
shtDw()
case <-ctx.Done():
tm.Stop()
}
cgr.shdWg.Done()
}()
}
// Init config
if err = cgr.InitConfigFromPath(*flags.CfgPath, *flags.NodeID, *flags.LogLevel); err != nil {
return
}
if *flags.CheckConfig {
return
}
// init syslog
if utils.Logger, err = utils.Newlogger(utils.FirstNonEmpty(*flags.SysLogger,
cgr.cfg.GeneralCfg().Logger), cgr.cfg.GeneralCfg().NodeID); err != nil {
log.Fatalf("Could not initialize syslog connection, err: <%s>", err)
return
}
utils.Logger.SetLogLevel(cgr.cfg.GeneralCfg().LogLevel)
utils.Logger.Info(fmt.Sprintf("<CoreS> starting version <%s><%s>", vers, goVers))
cgr.cfg.LazySanityCheck()
return
}

View File

@@ -24,6 +24,7 @@ import (
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
@@ -34,10 +35,11 @@ import (
// NewCoreService returns the Core Service
func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *cores.Server,
internalCoreSChan chan birpc.ClientConnector, anz *AnalyzerService,
fileCpu io.Closer, fileMEM string, shdWg *sync.WaitGroup, stopMemPrf chan struct{},
shdChan *utils.SyncedChan, srvDep map[string]*sync.WaitGroup) *CoreService {
fileCpu io.Closer, fileMEM string, stopMemPrf chan struct{},
shdWg *sync.WaitGroup, srvDep map[string]*sync.WaitGroup,
shtDw context.CancelFunc) *CoreService {
return &CoreService{
shdChan: shdChan,
shtDw: shtDw,
shdWg: shdWg,
stopMemPrf: stopMemPrf,
connChan: internalCoreSChan,
@@ -60,7 +62,7 @@ type CoreService struct {
stopChan chan struct{}
shdWg *sync.WaitGroup
stopMemPrf chan struct{}
shdChan *utils.SyncedChan
shtDw context.CancelFunc
fileCpu io.Closer
fileMem string
cS *cores.CoreService
@@ -80,7 +82,7 @@ func (cS *CoreService) Start() (err error) {
defer cS.Unlock()
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS))
cS.stopChan = make(chan struct{})
cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.fileMem, cS.stopChan, cS.shdWg, cS.stopMemPrf, cS.shdChan)
cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.fileMem, cS.stopChan, cS.stopMemPrf, cS.shdWg, cS.shtDw)
cS.rpc = apis.NewCoreSv1(cS.cS)
srv, _ := birpc.NewService(cS.rpc, utils.EmptyString, false)
if !cS.cfg.DispatcherSCfg().Enabled {

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -31,12 +32,13 @@ import (
// NewDiameterAgent returns the Diameter Agent
func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
shtDwn context.CancelFunc) servmanager.Service {
return &DiameterAgent{
cfg: cfg,
filterSChan: filterSChan,
shdChan: shdChan,
shtDwn: shtDwn,
connMgr: connMgr,
srvDep: srvDep,
}
@@ -47,8 +49,8 @@ type DiameterAgent struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
shdChan *utils.SyncedChan
stopChan chan struct{}
shtDwn context.CancelFunc
da *agents.DiameterAgent
connMgr *engine.ConnManager
@@ -87,7 +89,7 @@ func (da *DiameterAgent) start(filterS *engine.FilterS) (err error) {
if err = d.ListenAndServe(da.stopChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!",
utils.DiameterAgent, err))
da.shdChan.CloseOnce()
da.shtDwn()
}
}(da.da)
return

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -31,12 +32,13 @@ import (
// NewDNSAgent returns the DNS Agent
func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
shtDwn context.CancelFunc) servmanager.Service {
return &DNSAgent{
cfg: cfg,
filterSChan: filterSChan,
shdChan: shdChan,
shtDwn: shtDwn,
connMgr: connMgr,
srvDep: srvDep,
}
@@ -47,7 +49,7 @@ type DNSAgent struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
shdChan *utils.SyncedChan
shtDwn context.CancelFunc
dns *agents.DNSAgent
connMgr *engine.ConnManager
@@ -98,7 +100,7 @@ func (dns *DNSAgent) Reload() (err error) {
func (dns *DNSAgent) listenAndServe() (err error) {
if err = dns.dns.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error()))
dns.shdChan.CloseOnce() // stop the engine here
dns.shtDwn() // stop the engine here
}
return
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/ers"
@@ -31,13 +32,14 @@ import (
// NewEventReaderService returns the EventReader Service
func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
shtDwn context.CancelFunc) servmanager.Service {
return &EventReaderService{
rldChan: make(chan struct{}, 1),
cfg: cfg,
filterSChan: filterSChan,
shdChan: shdChan,
shtDwn: shtDwn,
connMgr: connMgr,
srvDep: srvDep,
}
@@ -48,7 +50,7 @@ type EventReaderService struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
shdChan *utils.SyncedChan
shtDwn context.CancelFunc
ers *ers.ERService
rldChan chan struct{}
@@ -83,7 +85,7 @@ func (erS *EventReaderService) Start() (err error) {
func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan chan struct{}, rldChan chan struct{}) (err error) {
if err = ers.ListenAndServe(stopChan, rldChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error()))
erS.shdChan.CloseOnce()
erS.shtDwn()
}
return
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/agents"
@@ -32,11 +33,12 @@ import (
// NewFreeswitchAgent returns the Freeswitch Agent
func NewFreeswitchAgent(cfg *config.CGRConfig,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
shtDwn context.CancelFunc) servmanager.Service {
return &FreeswitchAgent{
cfg: cfg,
shdChan: shdChan,
shtDwn: shtDwn,
connMgr: connMgr,
srvDep: srvDep,
}
@@ -45,8 +47,8 @@ func NewFreeswitchAgent(cfg *config.CGRConfig,
// FreeswitchAgent implements Agent interface
type FreeswitchAgent struct {
sync.RWMutex
cfg *config.CGRConfig
shdChan *utils.SyncedChan
cfg *config.CGRConfig
shtDwn context.CancelFunc
fS *agents.FSsessions
connMgr *engine.ConnManager
@@ -64,12 +66,7 @@ func (fS *FreeswitchAgent) Start() (err error) {
fS.fS = agents.NewFSsessions(fS.cfg.FsAgentCfg(), fS.cfg.GeneralCfg().DefaultTimezone, fS.connMgr)
go func(f *agents.FSsessions) {
if err := f.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err))
fS.shdChan.CloseOnce() // stop the engine here
}
}(fS.fS)
go fS.connect(fS.fS)
return
}
@@ -81,14 +78,14 @@ func (fS *FreeswitchAgent) Reload() (err error) {
return
}
fS.fS.Reload()
go fS.reload(fS.fS)
go fS.connect(fS.fS)
return
}
func (fS *FreeswitchAgent) reload(f *agents.FSsessions) (err error) {
func (fS *FreeswitchAgent) connect(f *agents.FSsessions) (err error) {
if err := fS.fS.Connect(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err))
fS.shdChan.CloseOnce() // stop the engine here
fS.shtDwn() // stop the engine here
}
return
}

View File

@@ -23,6 +23,7 @@ import (
"strings"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/agents"
@@ -33,11 +34,12 @@ import (
// NewKamailioAgent returns the Kamailio Agent
func NewKamailioAgent(cfg *config.CGRConfig,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
shtDwn context.CancelFunc) servmanager.Service {
return &KamailioAgent{
cfg: cfg,
shdChan: shdChan,
shtDwn: shtDwn,
connMgr: connMgr,
srvDep: srvDep,
}
@@ -46,8 +48,8 @@ func NewKamailioAgent(cfg *config.CGRConfig,
// KamailioAgent implements Agent interface
type KamailioAgent struct {
sync.RWMutex
cfg *config.CGRConfig
shdChan *utils.SyncedChan
cfg *config.CGRConfig
shtDwn context.CancelFunc
kam *agents.KamailioAgent
connMgr *engine.ConnManager
@@ -66,13 +68,7 @@ func (kam *KamailioAgent) Start() (err error) {
kam.kam = agents.NewKamailioAgent(kam.cfg.KamAgentCfg(), kam.connMgr,
utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone))
go func(k *agents.KamailioAgent) {
if err = k.Connect(); err != nil &&
!strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
kam.shdChan.CloseOnce()
}
}(kam.kam)
go kam.connect(kam.kam)
return
}
@@ -84,17 +80,16 @@ func (kam *KamailioAgent) Reload() (err error) {
return
}
kam.kam.Reload()
go kam.reload(kam.kam)
go kam.connect(kam.kam)
return
}
func (kam *KamailioAgent) reload(k *agents.KamailioAgent) (err error) {
func (kam *KamailioAgent) connect(k *agents.KamailioAgent) (err error) {
if err = k.Connect(); err != nil {
if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
return
if !strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log
utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
kam.shtDwn()
}
utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err))
kam.shdChan.CloseOnce()
}
return
}

View File

@@ -44,42 +44,42 @@ func NewCGREngineFlags() *CGREngineFlags {
fs := flag.NewFlagSet(utils.CgrEngine, flag.ContinueOnError)
return &CGREngineFlags{
FlagSet: fs,
Cfgpath: fs.String(utils.CfgPathCgr, utils.ConfigPath, "Configuration directory path."),
CfgPath: fs.String(utils.CfgPathCgr, utils.ConfigPath, "Configuration directory path."),
Version: fs.Bool(utils.VersionCgr, false, "Prints the application version."),
Checkconfig: fs.Bool(utils.CheckCfgCgr, false, "Verify the config without starting the engine"),
Pidfile: fs.String(utils.PidCgr, utils.EmptyString, "Write pid file"),
Httppprofpath: fs.String(utils.HttpPrfPthCgr, utils.EmptyString, "http address used for program profiling"),
Cpuprofdir: fs.String(utils.CpuProfDirCgr, utils.EmptyString, "write cpu profile to files"),
Memprofdir: fs.String(utils.MemProfDirCgr, utils.EmptyString, "write memory profile to file"),
Memprofinterval: fs.Duration(utils.MemProfIntervalCgr, 5*time.Second, "Time between memory profile saves"),
Memprofnrfiles: fs.Int(utils.MemProfNrFilesCgr, 1, "Number of memory profile to write"),
Scheduledshutdown: fs.String(utils.ScheduledShutdownCgr, utils.EmptyString, "shutdown the engine after this duration"),
PidFile: fs.String(utils.PidCgr, utils.EmptyString, "Write pid file"),
HttPrfPath: fs.String(utils.HttpPrfPthCgr, utils.EmptyString, "http address used for program profiling"),
CpuPrfDir: fs.String(utils.CpuProfDirCgr, utils.EmptyString, "write cpu profile to files"),
MemPrfDir: fs.String(utils.MemProfDirCgr, utils.EmptyString, "write memory profile to file"),
MemPrfInterval: fs.Duration(utils.MemProfIntervalCgr, 5*time.Second, "Time between memory profile saves"),
MemPrfNoF: fs.Int(utils.MemProfNrFilesCgr, 1, "Number of memory profile to write"),
ScheduledShutDown: fs.String(utils.ScheduledShutdownCgr, utils.EmptyString, "shutdown the engine after this duration"),
Singlecpu: fs.Bool(utils.SingleCpuCgr, false, "Run on single CPU core"),
Syslogger: fs.String(utils.LoggerCfg, utils.EmptyString, "logger <*syslog|*stdout>"),
Nodeid: fs.String(utils.NodeIDCfg, utils.EmptyString, "The node ID of the engine"),
Loglevel: fs.Int(utils.LogLevelCfg, -1, "Log level (0-emergency to 7-debug)"),
SysLogger: fs.String(utils.LoggerCfg, utils.EmptyString, "logger <*syslog|*stdout>"),
NodeID: fs.String(utils.NodeIDCfg, utils.EmptyString, "The node ID of the engine"),
LogLevel: fs.Int(utils.LogLevelCfg, -1, "Log level (0-emergency to 7-debug)"),
Preload: fs.String(utils.PreloadCgr, utils.EmptyString, "LoaderIDs used to load the data before the engine starts"),
CheckConfig: fs.Bool(utils.CheckCfgCgr, false, "Verify the config without starting the engine"),
}
}
type CGREngineFlags struct {
*flag.FlagSet
Cfgpath *string
CfgPath *string
Version *bool
Checkconfig *bool
Pidfile *string
Httppprofpath *string
Cpuprofdir *string
Memprofdir *string
Memprofinterval *time.Duration
Memprofnrfiles *int
Scheduledshutdown *string
PidFile *string
HttPrfPath *string
CpuPrfDir *string
MemPrfDir *string
MemPrfInterval *time.Duration
MemPrfNoF *int
ScheduledShutDown *string
Singlecpu *bool
Syslogger *string
Nodeid *string
Loglevel *int
SysLogger *string
NodeID *string
LogLevel *int
Preload *string
CheckConfig *bool
}
func cgrSingnalHandler(ctx *context.Context, shutdown context.CancelFunc,
@@ -231,18 +231,17 @@ func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector,
iConfigCh <- rpc
}
func startRPC(server *cores.Server, internalAdminSChan,
internalCdrSChan, internalRsChan, internalStatSChan,
func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc,
cfg *config.CGRConfig, server *cores.Server,
internalAdminSChan, internalCdrSChan, internalRsChan, internalStatSChan,
internalAttrSChan, internalChargerSChan, internalThdSChan, internalRouteSChan,
internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan,
internalLoaderSChan, internalCacheSChan,
internalEEsChan, internalRateSChan, internalActionSChan,
internalAccountSChan chan birpc.ClientConnector,
shdChan *utils.SyncedChan) {
internalLoaderSChan, internalCacheSChan, internalEEsChan, internalRateSChan,
internalActionSChan, internalAccountSChan chan birpc.ClientConnector) {
if !cfg.DispatcherSCfg().Enabled {
select { // Any of the rpc methods will unlock listening to rpc requests
// case cdrs := <-internalCdrSChan:
// internalCdrSChan <- cdrs
case cdrs := <-internalCdrSChan:
internalCdrSChan <- cdrs
case smg := <-internalSessionSChan:
internalSessionSChan <- smg
case rls := <-internalRsChan:
@@ -259,85 +258,30 @@ func startRPC(server *cores.Server, internalAdminSChan,
internalThdSChan <- thS
case rtS := <-internalRouteSChan:
internalRouteSChan <- rtS
// case analyzerS := <-internalAnalyzerSChan:
// internalAnalyzerSChan <- analyzerS
case analyzerS := <-internalAnalyzerSChan:
internalAnalyzerSChan <- analyzerS
case loaderS := <-internalLoaderSChan:
internalLoaderSChan <- loaderS
case chS := <-internalCacheSChan: // added in order to start the RPC before precaching is done
internalCacheSChan <- chS
// case eeS := <-internalEEsChan:
// internalEEsChan <- eeS
case eeS := <-internalEEsChan:
internalEEsChan <- eeS
case rateS := <-internalRateSChan:
internalRateSChan <- rateS
case actionS := <-internalActionSChan:
internalActionSChan <- actionS
case accountS := <-internalAccountSChan:
internalAccountSChan <- accountS
case <-shdChan.Done():
case <-ctx.Done():
return
}
} else {
select {
case dispatcherS := <-internalDispatcherSChan:
internalDispatcherSChan <- dispatcherS
case <-shdChan.Done():
case <-ctx.Done():
return
}
}
go server.ServeJSON(cfg.ListenCfg().RPCJSONListen, shdChan)
go server.ServeGOB(cfg.ListenCfg().RPCGOBListen, shdChan)
go server.ServeHTTP(
cfg.ListenCfg().HTTPListen,
cfg.HTTPCfg().JsonRPCURL,
cfg.HTTPCfg().WSURL,
cfg.HTTPCfg().UseBasicAuth,
cfg.HTTPCfg().AuthUsers,
shdChan,
)
if (len(cfg.ListenCfg().RPCGOBTLSListen) != 0 ||
len(cfg.ListenCfg().RPCJSONTLSListen) != 0 ||
len(cfg.ListenCfg().HTTPTLSListen) != 0) &&
(len(cfg.TLSCfg().ServerCerificate) == 0 ||
len(cfg.TLSCfg().ServerKey) == 0) {
utils.Logger.Warning("WARNING: missing TLS certificate/key file!")
return
}
if cfg.ListenCfg().RPCGOBTLSListen != utils.EmptyString {
go server.ServeGOBTLS(
cfg.ListenCfg().RPCGOBTLSListen,
cfg.TLSCfg().ServerCerificate,
cfg.TLSCfg().ServerKey,
cfg.TLSCfg().CaCertificate,
cfg.TLSCfg().ServerPolicy,
cfg.TLSCfg().ServerName,
shdChan,
)
}
if cfg.ListenCfg().RPCJSONTLSListen != utils.EmptyString {
go server.ServeJSONTLS(
cfg.ListenCfg().RPCJSONTLSListen,
cfg.TLSCfg().ServerCerificate,
cfg.TLSCfg().ServerKey,
cfg.TLSCfg().CaCertificate,
cfg.TLSCfg().ServerPolicy,
cfg.TLSCfg().ServerName,
shdChan,
)
}
if cfg.ListenCfg().HTTPTLSListen != utils.EmptyString {
go server.ServeHTTPTLS(
cfg.ListenCfg().HTTPTLSListen,
cfg.TLSCfg().ServerCerificate,
cfg.TLSCfg().ServerKey,
cfg.TLSCfg().CaCertificate,
cfg.TLSCfg().ServerPolicy,
cfg.TLSCfg().ServerName,
cfg.HTTPCfg().JsonRPCURL,
cfg.HTTPCfg().WSURL,
cfg.HTTPCfg().UseBasicAuth,
cfg.HTTPCfg().AuthUsers,
shdChan,
)
}
server.StartServer(ctx, shtdwnEngine, cfg)
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -31,12 +32,13 @@ import (
// NewRadiusAgent returns the Radius Agent
func NewRadiusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
shtDwn context.CancelFunc) servmanager.Service {
return &RadiusAgent{
cfg: cfg,
filterSChan: filterSChan,
shdChan: shdChan,
shtDwn: shtDwn,
connMgr: connMgr,
srvDep: srvDep,
}
@@ -47,7 +49,7 @@ type RadiusAgent struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
shdChan *utils.SyncedChan
shtDwn context.CancelFunc
stopChan chan struct{}
rad *agents.RadiusAgent
@@ -89,7 +91,7 @@ func (rad *RadiusAgent) Start() (err error) {
func (rad *RadiusAgent) listenAndServe(r *agents.RadiusAgent) (err error) {
if err = r.ListenAndServe(rad.stopChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error()))
rad.shdChan.CloseOnce()
rad.shtDwn()
}
return
}

View File

@@ -20,9 +20,11 @@ package services
import (
"fmt"
"github.com/cgrates/cgrates/apis"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/birpc"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
@@ -36,14 +38,15 @@ import (
// NewSessionService returns the Session Service
func NewSessionService(cfg *config.CGRConfig, dm *DataDBService,
server *cores.Server, internalChan chan birpc.ClientConnector,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
connMgr *engine.ConnManager, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup,
shtDwn context.CancelFunc) servmanager.Service {
return &SessionService{
connChan: internalChan,
cfg: cfg,
dm: dm,
server: server,
shdChan: shdChan,
shtDwn: shtDwn,
connMgr: connMgr,
anz: anz,
srvDep: srvDep,
@@ -56,7 +59,7 @@ type SessionService struct {
cfg *config.CGRConfig
dm *DataDBService
server *cores.Server
shdChan *utils.SyncedChan
shtDwn context.CancelFunc
stopChan chan struct{}
sm *sessions.SessionS
@@ -116,7 +119,7 @@ func (smg *SessionService) start() (err error) {
smg.Lock()
smg.bircpEnabled = false
smg.Unlock()
smg.shdChan.CloseOnce()
smg.shtDwn()
}
return
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -31,12 +32,13 @@ import (
// NewSIPAgent returns the sip Agent
func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
shdChan *utils.SyncedChan, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup,
shtDwn context.CancelFunc) servmanager.Service {
return &SIPAgent{
cfg: cfg,
filterSChan: filterSChan,
shdChan: shdChan,
shtDwn: shtDwn,
connMgr: connMgr,
srvDep: srvDep,
}
@@ -47,7 +49,7 @@ type SIPAgent struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
shdChan *utils.SyncedChan
shtDwn context.CancelFunc
sip *agents.SIPAgent
connMgr *engine.ConnManager
@@ -74,15 +76,17 @@ func (sip *SIPAgent) Start() (err error) {
utils.SIPAgent, err))
return
}
go func() {
if err = sip.sip.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error()))
sip.shdChan.CloseOnce() // stop the engine here
}
}()
go sip.listenAndServe()
return
}
func (sip *SIPAgent) listenAndServe() {
if err := sip.sip.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error()))
sip.shtDwn() // stop the engine here
}
}
// Reload handles the change of config
func (sip *SIPAgent) Reload() (err error) {
if sip.oldListen == sip.cfg.SIPAgentCfg().Listen {
@@ -93,12 +97,7 @@ func (sip *SIPAgent) Reload() (err error) {
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
sip.sip.InitStopChan()
sip.Unlock()
go func() {
if err := sip.sip.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error()))
sip.shdChan.CloseOnce() // stop the engine here
}
}()
go sip.listenAndServe()
return
}

View File

@@ -22,21 +22,20 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// NewServiceManager returns a service manager
func NewServiceManager(cfg *config.CGRConfig, shdChan *utils.SyncedChan, shdWg *sync.WaitGroup, connMgr *engine.ConnManager) *ServiceManager {
sm := &ServiceManager{
func NewServiceManager(cfg *config.CGRConfig, shdWg *sync.WaitGroup, connMgr *engine.ConnManager) *ServiceManager {
return &ServiceManager{
cfg: cfg,
subsystems: make(map[string]Service),
shdChan: shdChan,
shdWg: shdWg,
connMgr: connMgr,
}
return sm
}
// ServiceManager handles service management ran by the engine
@@ -45,7 +44,6 @@ type ServiceManager struct {
cfg *config.CGRConfig
subsystems map[string]Service
shdChan *utils.SyncedChan
shdWg *sync.WaitGroup
connMgr *engine.ConnManager
}
@@ -58,8 +56,8 @@ func (srvMngr *ServiceManager) GetConfig() *config.CGRConfig {
}
// StartServices starts all enabled services
func (srvMngr *ServiceManager) StartServices() (err error) {
go srvMngr.handleReload()
func (srvMngr *ServiceManager) StartServices(ctx *context.Context, shtDwn context.CancelFunc) (err error) {
go srvMngr.handleReload(ctx, shtDwn)
for _, service := range srvMngr.subsystems {
if service.ShouldRun() && !service.IsRunning() {
srvMngr.shdWg.Add(1)
@@ -67,7 +65,7 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
if err := srv.Start(); err != nil &&
err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine
utils.Logger.Err(fmt.Sprintf("<%s> failed to start %s because: %s", utils.ServiceManager, srv.ServiceName(), err))
srvMngr.shdChan.CloseOnce()
shtDwn()
}
}(service)
}
@@ -87,100 +85,100 @@ func (srvMngr *ServiceManager) AddServices(services ...Service) {
srvMngr.Unlock()
}
func (srvMngr *ServiceManager) handleReload() {
func (srvMngr *ServiceManager) handleReload(ctx *context.Context, shtDwn context.CancelFunc) {
for {
select {
case <-srvMngr.shdChan.Done():
case <-ctx.Done():
srvMngr.ShutdownServices()
return
case <-srvMngr.GetConfig().GetReloadChan(config.AttributeSJSON):
go srvMngr.reloadService(utils.AttributeS)
go srvMngr.reloadService(utils.AttributeS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.ChargerSJSON):
go srvMngr.reloadService(utils.ChargerS)
go srvMngr.reloadService(utils.ChargerS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.ThresholdSJSON):
go srvMngr.reloadService(utils.ThresholdS)
go srvMngr.reloadService(utils.ThresholdS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.StatSJSON):
go srvMngr.reloadService(utils.StatS)
go srvMngr.reloadService(utils.StatS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.ResourceSJSON):
go srvMngr.reloadService(utils.ResourceS)
go srvMngr.reloadService(utils.ResourceS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.RouteSJSON):
go srvMngr.reloadService(utils.RouteS)
go srvMngr.reloadService(utils.RouteS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.AdminSJSON):
go srvMngr.reloadService(utils.AdminS)
go srvMngr.reloadService(utils.AdminS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.CDRsJSON):
go srvMngr.reloadService(utils.CDRServer)
go srvMngr.reloadService(utils.CDRServer, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJSON):
go srvMngr.reloadService(utils.SessionS)
go srvMngr.reloadService(utils.SessionS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.ERsJSON):
go srvMngr.reloadService(utils.ERs)
go srvMngr.reloadService(utils.ERs, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.DNSAgentJSON):
go srvMngr.reloadService(utils.DNSAgent)
go srvMngr.reloadService(utils.DNSAgent, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.FreeSWITCHAgentJSON):
go srvMngr.reloadService(utils.FreeSWITCHAgent)
go srvMngr.reloadService(utils.FreeSWITCHAgent, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.KamailioAgentJSON):
go srvMngr.reloadService(utils.KamailioAgent)
go srvMngr.reloadService(utils.KamailioAgent, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.AsteriskAgentJSON):
go srvMngr.reloadService(utils.AsteriskAgent)
go srvMngr.reloadService(utils.AsteriskAgent, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.RadiusAgentJSON):
go srvMngr.reloadService(utils.RadiusAgent)
go srvMngr.reloadService(utils.RadiusAgent, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.DiameterAgentJSON):
go srvMngr.reloadService(utils.DiameterAgent)
go srvMngr.reloadService(utils.DiameterAgent, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.HTTPAgentJSON):
go srvMngr.reloadService(utils.HTTPAgent)
go srvMngr.reloadService(utils.HTTPAgent, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.LoaderSJSON):
go srvMngr.reloadService(utils.LoaderS)
go srvMngr.reloadService(utils.LoaderS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.AnalyzerSJSON):
go srvMngr.reloadService(utils.AnalyzerS)
go srvMngr.reloadService(utils.AnalyzerS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.DispatcherSJSON):
go srvMngr.reloadService(utils.DispatcherS)
go srvMngr.reloadService(utils.DispatcherS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.DataDBJSON):
go srvMngr.reloadService(utils.DataDB)
go srvMngr.reloadService(utils.DataDB, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.StorDBJSON):
go srvMngr.reloadService(utils.StorDB)
go srvMngr.reloadService(utils.StorDB, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.EEsJSON):
go srvMngr.reloadService(utils.EEs)
go srvMngr.reloadService(utils.EEs, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.RateSJSON):
go srvMngr.reloadService(utils.RateS)
go srvMngr.reloadService(utils.RateS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.RPCConnsJSON):
go srvMngr.connMgr.Reload()
case <-srvMngr.GetConfig().GetReloadChan(config.SIPAgentJSON):
go srvMngr.reloadService(utils.SIPAgent)
go srvMngr.reloadService(utils.SIPAgent, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.RegistrarCJSON):
go srvMngr.reloadService(utils.RegistrarC)
go srvMngr.reloadService(utils.RegistrarC, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.HTTPJSON):
go srvMngr.reloadService(utils.GlobalVarS)
go srvMngr.reloadService(utils.GlobalVarS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.AccountSJSON):
go srvMngr.reloadService(utils.AccountS)
go srvMngr.reloadService(utils.AccountS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.ActionSJSON):
go srvMngr.reloadService(utils.ActionS)
go srvMngr.reloadService(utils.ActionS, shtDwn)
case <-srvMngr.GetConfig().GetReloadChan(config.CoreSJSON):
go srvMngr.reloadService(utils.CoreS)
go srvMngr.reloadService(utils.CoreS, shtDwn)
}
// handle RPC server
}
}
func (srvMngr *ServiceManager) reloadService(srviceName string) (err error) {
func (srvMngr *ServiceManager) reloadService(srviceName string, shtDwn context.CancelFunc) (err error) {
srv := srvMngr.GetService(srviceName)
if srv.ShouldRun() {
if srv.IsRunning() {
if err = srv.Reload(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err))
srvMngr.shdChan.CloseOnce()
shtDwn()
return // stop if we encounter an error
}
} else {
srvMngr.shdWg.Add(1)
if err = srv.Start(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err))
srvMngr.shdChan.CloseOnce()
shtDwn()
return // stop if we encounter an error
}
}
} else if srv.IsRunning() {
if err = srv.Shutdown(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> failed to stop service <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err))
srvMngr.shdChan.CloseOnce()
shtDwn()
}
srvMngr.shdWg.Done()
}

View File

@@ -1435,9 +1435,6 @@ const (
CDRsV1StoreSessionCost = "CDRsV1.StoreSessionCost"
CDRsV1ProcessEvent = "CDRsV1.ProcessEvent"
CDRsV1Ping = "CDRsV1.Ping"
CDRsV2 = "CDRsV2"
CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost"
CDRsV2ProcessEvent = "CDRsV2.ProcessEvent"
)
// EEs