From cea082eb7f94d3c689d9e2dd76b9cd991f5b8d0f Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 7 Sep 2021 19:32:59 +0300 Subject: [PATCH] Updated CGREngine structure --- cmd/cgr-engine/cgr-engine.go | 269 +--------------- cores/core.go | 24 +- cores/core_test.go | 11 +- cores/server.go | 32 +- cores/server_it_test.go | 576 ++--------------------------------- cores/server_test.go | 4 + engine/connmanager.go | 14 +- engine/libengine.go | 64 ++-- services/analyzers.go | 12 +- services/asteriskagent.go | 16 +- services/cgr-engine.go | 309 ++++++++++++++++++- services/cores.go | 12 +- services/diameteragent.go | 12 +- services/dnsagent.go | 12 +- services/ers.go | 12 +- services/freeswitchagent.go | 25 +- services/kamailioagent.go | 31 +- services/libcgr-engine.go | 132 +++----- services/radiusagent.go | 12 +- services/sessions.go | 15 +- services/sipagent.go | 31 +- servmanager/servmanager.go | 86 +++--- utils/consts.go | 3 - 23 files changed, 613 insertions(+), 1101 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 2224abaf1..d5540cf19 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -39,7 +39,6 @@ import ( "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/loaders" - "github.com/cgrates/cgrates/registrarc" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -323,126 +322,7 @@ func main() { if err := cgrEngineFlags.Parse(os.Args[1:]); err != nil { return } - vers, err := utils.GetCGRVersion() - if err != nil { - fmt.Println(err) - return - } - goVers := runtime.Version() - if *version { - fmt.Println(vers) - return - } - if *pidFile != utils.EmptyString { - writePid() - } - if *singlecpu { - runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases - } - shdWg := new(sync.WaitGroup) - shdChan := utils.NewSyncedChan() - - shdWg.Add(1) - go singnalHandler(shdWg, shdChan) - - var cS *cores.CoreService - var stopMemProf chan struct{} - var memPrfDirForCores string - if *memProfDir != utils.EmptyString { - shdWg.Add(1) - stopMemProf = make(chan struct{}) - memPrfDirForCores = *memProfDir - go cores.MemProfiling(*memProfDir, *memProfInterval, *memProfNrFiles, shdWg, stopMemProf, shdChan) - defer func() { - if cS == nil { - close(stopMemProf) - } - }() - } - - var cpuProfileFile io.Closer - if *cpuProfDir != utils.EmptyString { - cpuPath := path.Join(*cpuProfDir, utils.CpuPathCgr) - cpuProfileFile, err = cores.StartCPUProfiling(cpuPath) - if err != nil { - return - } - defer func() { - if cS != nil { - cS.StopCPUProfiling() - return - } - if cpuProfileFile != nil { - pprof.StopCPUProfile() - cpuProfileFile.Close() - } - }() - } - - if *scheduledShutdown != utils.EmptyString { - shutdownDur, err := utils.ParseDurationWithNanosecs(*scheduledShutdown) - if err != nil { - log.Fatal(err) - } - shdWg.Add(1) - go func() { // Schedule shutdown - tm := time.NewTimer(shutdownDur) - select { - case <-tm.C: - shdChan.CloseOnce() - case <-shdChan.Done(): - tm.Stop() - } - shdWg.Done() - }() - } - - // Init config - cfg, err = config.NewCGRConfigFromPath(*cfgPath) - if err != nil { - log.Fatalf("Could not parse config: <%s>", err.Error()) - return - } - if *checkConfig { - if err := cfg.CheckConfigSanity(); err != nil { - fmt.Println(err) - } - return - } - - if *nodeID != utils.EmptyString { - cfg.GeneralCfg().NodeID = *nodeID - } - - if cfg.ConfigDBCfg().Type != utils.MetaInternal { - d, err := engine.NewDataDBConn(cfg.ConfigDBCfg().Type, - cfg.ConfigDBCfg().Host, cfg.ConfigDBCfg().Port, - cfg.ConfigDBCfg().Name, cfg.ConfigDBCfg().User, - cfg.ConfigDBCfg().Password, cfg.GeneralCfg().DBDataEncoding, - cfg.ConfigDBCfg().Opts) - if err != nil { // Cannot configure getter database, show stopper - log.Fatalf("Could not configure configDB: %s exiting!", err) - return - } - if err = cfg.LoadFromDB(d); err != nil { - log.Fatalf("Could not parse config: <%s>", err.Error()) - return - } - } - config.SetCgrConfig(cfg) // Share the config object - - // init syslog - if utils.Logger, err = utils.Newlogger(utils.FirstNonEmpty(*syslogger, - cfg.GeneralCfg().Logger), cfg.GeneralCfg().NodeID); err != nil { - log.Fatalf("Could not initialize syslog connection, err: <%s>", err.Error()) - return - } - lgLevel := cfg.GeneralCfg().LogLevel - if *logLevel != -1 { // Modify the log level if provided by command arguments - lgLevel = *logLevel - } - utils.Logger.SetLogLevel(lgLevel) // init the concurrentRequests cncReqsLimit := cfg.CoreSCfg().Caps if utils.ConcurrentReqsLimit != 0 { // used as shared variable @@ -453,8 +333,6 @@ func main() { cncReqsStrategy = utils.ConcurrentReqsStrategy } caps := engine.NewCaps(cncReqsLimit, cncReqsStrategy) - utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, goVers)) - cfg.LazySanityCheck() // init the channel here because we need to pass them to connManager internalServeManagerChan := make(chan birpc.ClientConnector, 1) @@ -539,115 +417,7 @@ func main() { utils.ActionS: new(sync.WaitGroup), utils.AccountS: new(sync.WaitGroup), } - gvService := services.NewGlobalVarS(cfg, srvDep) - shdWg.Add(1) - if err = gvService.Start(); err != nil { - return - } - dmService := services.NewDataDBService(cfg, connManager, srvDep) - if dmService.ShouldRun() { // Some services can run without db, ie: ERs - shdWg.Add(1) - if err = dmService.Start(); err != nil { - return - } - } - storDBService := services.NewStorDBService(cfg, srvDep) - if storDBService.ShouldRun() { // Some services can run without db, ie: ERs - shdWg.Add(1) - if err = storDBService.Start(); err != nil { - return - } - } - - // Rpc/http server - server := cores.NewServer(caps) - if len(cfg.HTTPCfg().RegistrarSURL) != 0 { - server.RegisterHTTPFunc(cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar) - } - if cfg.ConfigSCfg().Enabled { - server.RegisterHTTPFunc(cfg.ConfigSCfg().URL, config.HandlerConfigS) - } - if *httpPprofPath != utils.EmptyString { - server.RegisterProfiler(*httpPprofPath) - } - - // Define internal connections via channels - filterSChan := make(chan *engine.FilterS, 1) - - // init AnalyzerS - anz := services.NewAnalyzerService(cfg, server, filterSChan, shdChan, internalAnalyzerSChan, srvDep) - if anz.ShouldRun() { - shdWg.Add(1) - if err := anz.Start(); err != nil { - fmt.Println(err) - return - } - } - - // init CoreSv1 - - coreS := services.NewCoreService(cfg, caps, server, internalCoreSv1Chan, anz, cpuProfileFile, memPrfDirForCores, shdWg, stopMemProf, shdChan, srvDep) - shdWg.Add(1) - if err := coreS.Start(); err != nil { - fmt.Println(err) - return - } - cS = coreS.GetCoreS() - - // init CacheS - cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), shdChan, anz, coreS.GetCoreS().CapsStats) - engine.Cache = cacheS - - // init GuardianSv1 - initGuardianSv1(internalGuardianSChan, server, anz) - - // Start ServiceManager - srvManager := servmanager.NewServiceManager(cfg, shdChan, shdWg, connManager) - dspS := services.NewDispatcherService(cfg, dmService, cacheS, filterSChan, server, internalDispatcherSChan, connManager, anz, srvDep) - attrS := services.NewAttributeService(cfg, dmService, cacheS, filterSChan, server, internalAttributeSChan, anz, dspS, srvDep) - dspH := services.NewRegistrarCService(cfg, server, connManager, anz, srvDep) - chrS := services.NewChargerService(cfg, dmService, cacheS, filterSChan, server, - internalChargerSChan, connManager, anz, srvDep) - tS := services.NewThresholdService(cfg, dmService, cacheS, filterSChan, - connManager, server, internalThresholdSChan, anz, srvDep) - stS := services.NewStatService(cfg, dmService, cacheS, filterSChan, server, - internalStatSChan, connManager, anz, srvDep) - reS := services.NewResourceService(cfg, dmService, cacheS, filterSChan, server, - internalResourceSChan, connManager, anz, srvDep) - routeS := services.NewRouteService(cfg, dmService, cacheS, filterSChan, server, - internalRouteSChan, connManager, anz, srvDep) - - admS := services.NewAdminSv1Service(cfg, dmService, storDBService, filterSChan, server, - internalAdminSChan, connManager, anz, srvDep) - - cdrS := services.NewCDRServer(cfg, dmService, storDBService, filterSChan, server, internalCDRServerChan, - connManager, anz, srvDep) - - smg := services.NewSessionService(cfg, dmService, server, internalSessionSChan, shdChan, connManager, anz, srvDep) - - ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, - internalLoaderSChan, connManager, anz, srvDep) - - srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, - admS, cdrS, smg, coreS, - services.NewEventReaderService(cfg, filterSChan, shdChan, connManager, srvDep), - services.NewDNSAgent(cfg, filterSChan, shdChan, connManager, srvDep), - services.NewFreeswitchAgent(cfg, shdChan, connManager, srvDep), - services.NewKamailioAgent(cfg, shdChan, connManager, srvDep), - services.NewAsteriskAgent(cfg, shdChan, connManager, srvDep), // partial reload - services.NewRadiusAgent(cfg, filterSChan, shdChan, connManager, srvDep), // partial reload - services.NewDiameterAgent(cfg, filterSChan, shdChan, connManager, srvDep), // partial reload - services.NewHTTPAgent(cfg, filterSChan, server, connManager, srvDep), // no reload - ldrs, anz, dspS, dspH, dmService, storDBService, - services.NewEventExporterService(cfg, filterSChan, - connManager, server, internalEEsChan, anz, srvDep), - services.NewRateService(cfg, cacheS, filterSChan, dmService, - server, internalRateSChan, anz, srvDep), - services.NewSIPAgent(cfg, filterSChan, shdChan, connManager, srvDep), - services.NewActionService(cfg, dmService, cacheS, filterSChan, connManager, server, internalActionSChan, anz, srvDep), - services.NewAccountService(cfg, dmService, cacheS, filterSChan, connManager, server, internalAccountSChan, anz, srvDep), - ) srvManager.StartServices() // Start FilterS go startFilterService(filterSChan, cacheS, connManager, @@ -655,31 +425,6 @@ func main() { initServiceManagerV1(internalServeManagerChan, srvManager, server, anz) - // init internalRPCSet to share internal connections among the engine - engine.IntRPC = engine.NewRPCClientSet() - engine.IntRPC.AddInternalRPCClient(utils.AnalyzerSv1, internalAnalyzerSChan) - engine.IntRPC.AddInternalRPCClient(utils.AdminS, internalAdminSChan) - engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, internalAttributeSChan) - engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) - engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, internalCDRServerChan) - engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, internalCDRServerChan) - engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, internalChargerSChan) - engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan) - engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, internalLoaderSChan) - engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, internalResourceSChan) - engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, internalSessionSChan) - engine.IntRPC.AddInternalRPCClient(utils.StatSv1, internalStatSChan) - engine.IntRPC.AddInternalRPCClient(utils.RouteSv1, internalRouteSChan) - engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, internalThresholdSChan) - engine.IntRPC.AddInternalRPCClient(utils.ServiceManagerV1, internalServeManagerChan) - engine.IntRPC.AddInternalRPCClient(utils.ConfigSv1, internalConfigChan) - engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan) - engine.IntRPC.AddInternalRPCClient(utils.RateSv1, internalRateSChan) - engine.IntRPC.AddInternalRPCClient(utils.ActionSv1, internalActionSChan) - engine.IntRPC.AddInternalRPCClient(utils.EeSv1, internalEEsChan) - engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan) - engine.IntRPC.AddInternalRPCClient(utils.AccountSv1, internalAccountSChan) - initConfigSv1(internalConfigChan, server, anz) if *preload != utils.EmptyString { @@ -687,13 +432,13 @@ func main() { } // Serve rpc connections - go startRPC(server, internalAdminSChan, internalCDRServerChan, - internalResourceSChan, internalStatSChan, - internalAttributeSChan, internalChargerSChan, internalThresholdSChan, - internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, - internalDispatcherSChan, internalLoaderSChan, - internalCacheSChan, internalEEsChan, internalRateSChan, internalActionSChan, - internalAccountSChan, shdChan) + // go startRPC(server, internalAdminSChan, internalCDRServerChan, + // internalResourceSChan, internalStatSChan, + // internalAttributeSChan, internalChargerSChan, internalThresholdSChan, + // internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, + // internalDispatcherSChan, internalLoaderSChan, + // internalCacheSChan, internalEEsChan, internalRateSChan, internalActionSChan, + // internalAccountSChan, shdChan) <-shdChan.Done() shtdDone := make(chan struct{}) diff --git a/cores/core.go b/cores/core.go index 6f8b15deb..05f65f21e 100644 --- a/cores/core.go +++ b/cores/core.go @@ -29,13 +29,14 @@ import ( "sync" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU io.Closer, fileMem string, stopChan chan struct{}, - shdWg *sync.WaitGroup, stopMemPrf chan struct{}, shdChan *utils.SyncedChan) *CoreService { + stopMemPrf chan struct{}, shdWg *sync.WaitGroup, shtDw context.CancelFunc) *CoreService { var st *engine.CapsStats if caps.IsLimited() && cfg.CoreSCfg().CapsStatsInterval != 0 { st = engine.NewCapsStats(cfg.CoreSCfg().CapsStatsInterval, caps, stopChan) @@ -43,7 +44,7 @@ func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, fileCPU io.Closer, return &CoreService{ shdWg: shdWg, stopMemPrf: stopMemPrf, - shdChan: shdChan, + shtDw: shtDw, cfg: cfg, CapsStats: st, fileCPU: fileCPU, @@ -56,26 +57,27 @@ type CoreService struct { CapsStats *engine.CapsStats shdWg *sync.WaitGroup stopMemPrf chan struct{} - shdChan *utils.SyncedChan fileMEM string fileCPU io.Closer fileMx sync.Mutex + shtDw context.CancelFunc } func (cS *CoreService) ShutdownEngine() { - cS.shdChan.CloseOnce() + cS.shtDw() } // Shutdown is called to shutdown the service func (cS *CoreService) Shutdown() { utils.Logger.Info(fmt.Sprintf("<%s> shutdown initialized", utils.CoreS)) - cS.StopChanMemProf() + cS.stopChanMemProf() + cS.StopCPUProfiling() utils.Logger.Info(fmt.Sprintf("<%s> shutdown complete", utils.CoreS)) } -// StopChanMemProf will stop the MemoryProfiling Channel in order to create +// stopChanMemProf will stop the MemoryProfiling Channel in order to create // the final MemoryProfiling when CoreS subsystem will stop. -func (cS *CoreService) StopChanMemProf() { +func (cS *CoreService) stopChanMemProf() { if cS.stopMemPrf != nil { MemProfFile(cS.fileMEM) close(cS.stopMemPrf) @@ -108,7 +110,7 @@ func MemProfFile(memProfPath string) bool { return true } -func MemProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg *sync.WaitGroup, stopChan chan struct{}, shdChan *utils.SyncedChan) { +func MemProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg *sync.WaitGroup, stopChan chan struct{}, shDw context.CancelFunc) { tm := time.NewTimer(interval) for i := 1; ; i++ { select { @@ -119,7 +121,7 @@ func MemProfiling(memProfDir string, interval time.Duration, nrFiles int, shdWg case <-tm.C: } if !MemProfFile(path.Join(memProfDir, fmt.Sprintf("mem%v.prof", i))) { - shdChan.CloseOnce() + shDw() shdWg.Done() return } @@ -192,7 +194,7 @@ func (cS *CoreService) StartMemoryProfiling(args *utils.MemoryPrf) (err error) { cS.shdWg.Add(1) cS.stopMemPrf = make(chan struct{}) cS.fileMEM = args.DirPath - go MemProfiling(args.DirPath, args.Interval, args.NrFiles, cS.shdWg, cS.stopMemPrf, cS.shdChan) + go MemProfiling(args.DirPath, args.Interval, args.NrFiles, cS.shdWg, cS.stopMemPrf, cS.shtDw) return } @@ -202,6 +204,6 @@ func (cS *CoreService) StopMemoryProfiling() (err error) { return errors.New(" Memory Profiling is not started") } cS.fileMEM = path.Join(cS.fileMEM, utils.MemProfFileCgr) - cS.StopChanMemProf() + cS.stopChanMemProf() return } diff --git a/cores/core_test.go b/cores/core_test.go index b8e4f05c2..214cc6f7b 100644 --- a/cores/core_test.go +++ b/cores/core_test.go @@ -33,7 +33,7 @@ func TestNewCoreService(t *testing.T) { cfgDflt := config.NewDefaultCGRConfig() cfgDflt.CoreSCfg().CapsStatsInterval = time.Second stopchan := make(chan struct{}, 1) - shdChan := utils.NewSyncedChan() + shdChan := func() {} caps := engine.NewCaps(1, utils.MetaBusy) sts := engine.NewCapsStats(cfgDflt.CoreSCfg().CapsStatsInterval, caps, stopchan) stopMemChan := make(chan struct{}) @@ -42,9 +42,9 @@ func TestNewCoreService(t *testing.T) { cfg: cfgDflt, CapsStats: sts, fileMEM: "/tmp", - shdChan: shdChan, + shtDw: shdChan, } - rcv := NewCoreService(cfgDflt, caps, nil, "/tmp", stopMemChan, nil, stopchan, shdChan) + rcv := NewCoreService(cfgDflt, caps, nil, "/tmp", stopMemChan, stopchan, nil, shdChan) if !reflect.DeepEqual(expected, rcv) { t.Errorf("Expected %+v, received %+v", utils.ToJSON(expected), utils.ToJSON(rcv)) } @@ -57,10 +57,7 @@ func TestCoreServiceStatus(t *testing.T) { cfgDflt := config.NewDefaultCGRConfig() cfgDflt.CoreSCfg().CapsStatsInterval = 1 caps := engine.NewCaps(1, utils.MetaBusy) - stopChan := make(chan struct{}, 1) - shdChan := utils.NewSyncedChan() - - cores := NewCoreService(cfgDflt, caps, nil, "/tmp", nil, nil, stopChan, shdChan) + cores := NewCoreService(cfgDflt, caps, nil, "/tmp", nil, nil, nil, func() {}) args := &utils.TenantWithAPIOpts{ Tenant: "cgrates.org", APIOpts: map[string]interface{}{}, diff --git a/cores/server.go b/cores/server.go index a13f665d5..08e01947f 100644 --- a/cores/server.go +++ b/cores/server.go @@ -44,9 +44,8 @@ func NewServer(caps *engine.Caps) (s *Server) { stopbiRPCServer: make(chan struct{}, 1), caps: caps, - rpcStarted: utils.NewSyncedChan(), - rpcServer: birpc.NewServer(), - birpcSrv: birpc.NewBirpcServer(), + rpcServer: birpc.NewServer(), + birpcSrv: birpc.NewBirpcServer(), } s.httpServer = &http.Server{Handler: s.httpMux} s.httpsServer = &http.Server{Handler: s.httpsMux} @@ -63,7 +62,6 @@ type Server struct { caps *engine.Caps anz *analyzers.AnalyzerService - rpcStarted *utils.SyncedChan rpcServer *birpc.Server rpcJSONl net.Listener rpcGOBl net.Listener @@ -208,7 +206,7 @@ func (s *Server) ServeHTTP(shtdwnEngine context.CancelFunc, addr, jsonRPCURL, ws } // ServeBiRPC create a goroutine to listen and serve as BiRPC server -func (s *Server) ServeBiRPC2(addrJSON, addrGOB string, onConn, onDis func(birpc.ClientConnector)) (err error) { +func (s *Server) ServeBiRPC(addrJSON, addrGOB string, onConn, onDis func(birpc.ClientConnector)) (err error) { s.birpcSrv.OnConnect(onConn) s.birpcSrv.OnDisconnect(onDis) if addrJSON != utils.EmptyString { @@ -320,12 +318,24 @@ func (s *Server) ServeHTTPS(shtdwnEngine context.CancelFunc, } func (s *Server) Stop() { - s.rpcJSONl.Close() - s.rpcGOBl.Close() - s.rpcJSONlTLS.Close() - s.rpcGOBlTLS.Close() - s.httpServer.Shutdown(context.Background()) - s.httpsServer.Shutdown(context.Background()) + if s.rpcJSONl != nil { + s.rpcJSONl.Close() + } + if s.rpcGOBl != nil { + s.rpcGOBl.Close() + } + if s.rpcJSONlTLS != nil { + s.rpcJSONlTLS.Close() + } + if s.rpcGOBlTLS != nil { + s.rpcGOBlTLS.Close() + } + if s.httpServer != nil { + s.httpServer.Shutdown(context.Background()) + } + if s.httpsServer != nil { + s.httpsServer.Shutdown(context.Background()) + } s.StopBiRPC() } diff --git a/cores/server_it_test.go b/cores/server_it_test.go index e2a8402f6..2f8525cc2 100644 --- a/cores/server_it_test.go +++ b/cores/server_it_test.go @@ -52,34 +52,12 @@ var ( server *Server sTestsServer = []func(t *testing.T){ - testServeGOBPortFail, testServeJSON, - testServeJSONFail, - testServeJSONFailRpcEnabled, - testServeGOB, - testServeHHTPPass, - testServeHHTPPassUseBasicAuth, - testServeHHTPEnableHttp, testServeHHTPFail, - testServeHHTPFailEnableRpc, - testServeBiJSON, - testServeBiJSONEmptyBiRPCServer, testServeBiJSONInvalidPort, - testServeBiGoB, - testServeBiGoBEmptyBiRPCServer, testServeBiGoBInvalidPort, - testServeGOBTLS, - testServeJSONTls, - testServeCodecTLSErr, testLoadTLSConfigErr, - testServeHTTPTLS, - testServeHTTPTLSWithBasicAuth, - testServeHTTPTLSError, - testServeHTTPTLSHttpNotEnabled, testHandleRequest, - testBiRPCRegisterName, - testAcceptBiRPC, - testAcceptBiRPCError, testRpcRegisterActions, testWebSocket, } @@ -93,13 +71,13 @@ func TestServerIT(t *testing.T) { } } -type mockRegister string +type mockRegister struct{} -func (x *mockRegister) ForTest(ctx *context.Context, args, reply interface{}) error { +func (*mockRegister) ForTest(ctx *context.Context, args, reply interface{}) error { return nil } -func (robj *mockRegister) Ping(ctx *context.Context, in string, out *string) error { +func (*mockRegister) Ping(ctx *context.Context, in string, out *string) error { *out = utils.Pong return nil } @@ -117,8 +95,8 @@ func (mkL *mockListener) Accept() (net.Conn, error) { return nil, utils.ErrDisconnected } -func (mkL *mockListener) Close() error { return mkL.p1.Close() } -func (mkL *mockListener) Addr() net.Addr { return nil } +func (mkL *mockListener) Close() error { return mkL.p1.Close() } +func (*mockListener) Addr() net.Addr { return nil } func testHandleRequest(t *testing.T) { cfgDflt := config.NewDefaultCGRConfig() @@ -126,8 +104,6 @@ func testHandleRequest(t *testing.T) { caps := engine.NewCaps(0, utils.MetaBusy) rcv := NewServer(caps) - rcv.rpcEnabled = true - req, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer([]byte("1"))) if err != nil { @@ -139,20 +115,19 @@ func testHandleRequest(t *testing.T) { if w.Body.String() != utils.EmptyString { t.Errorf("Expected: %q ,received: %q", utils.EmptyString, w.Body.String()) } - - rcv.StopBiRPC() } func testServeJSON(t *testing.T) { caps := engine.NewCaps(100, utils.MetaBusy) server = NewServer(caps) server.RpcRegister(new(mockRegister)) - shdChan := utils.NewSyncedChan() buff := new(bytes.Buffer) log.SetOutput(buff) - - go server.ServeJSON(":88845", shdChan) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + defer server.Stop() + go server.ServeJSON(ctx, cancel, ":88845") runtime.Gosched() expected := "listen tcp: address 88845: invalid port" @@ -160,121 +135,6 @@ func testServeJSON(t *testing.T) { t.Errorf("Expected %+v, received %+v", expected, rcv) } - shdChan.CloseOnce() - server.StopBiRPC() -} - -func testServeJSONFail(t *testing.T) { - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - shdChan := utils.NewSyncedChan() - - p1, p2 := net.Pipe() - l := &mockListener{ - p1: p1, - } - go server.accept(l, utils.JSONCaps, newCapsJSONCodec, shdChan) - runtime.Gosched() - _, ok := <-shdChan.Done() - if ok { - t.Errorf("Expected to be close") - } - p2.Close() - runtime.Gosched() - - shdChan.CloseOnce() - server.StopBiRPC() -} - -func testServeJSONFailRpcEnabled(t *testing.T) { - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - shdChan := utils.NewSyncedChan() - server.rpcEnabled = false - - go server.serveCodec(":9999", utils.JSONCaps, newCapsJSONCodec, shdChan) - runtime.Gosched() - - shdChan.CloseOnce() - server.StopBiRPC() -} - -func testServeGOB(t *testing.T) { - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - shdChan := utils.NewSyncedChan() - - go server.ServeGOB(":27697", shdChan) - runtime.Gosched() - - shdChan.CloseOnce() - server.StopBiRPC() -} - -func testServeHHTPPass(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - shdChan := utils.NewSyncedChan() - - go server.ServeHTTP( - ":6555", - cfgDflt.HTTPCfg().JsonRPCURL, - cfgDflt.HTTPCfg().WSURL, - cfgDflt.HTTPCfg().UseBasicAuth, - cfgDflt.HTTPCfg().AuthUsers, - shdChan) - - runtime.Gosched() - - shdChan.CloseOnce() - server.StopBiRPC() -} - -func testServeHHTPPassUseBasicAuth(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - shdChan := utils.NewSyncedChan() - - go server.ServeHTTP( - ":56432", - cfgDflt.HTTPCfg().JsonRPCURL, - cfgDflt.HTTPCfg().WSURL, - !cfgDflt.HTTPCfg().UseBasicAuth, - cfgDflt.HTTPCfg().AuthUsers, - shdChan) - - runtime.Gosched() - - shdChan.CloseOnce() - server.StopBiRPC() -} - -func testServeHHTPEnableHttp(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - shdChan := utils.NewSyncedChan() - - go server.ServeHTTP( - ":45779", - utils.EmptyString, - utils.EmptyString, - !cfgDflt.HTTPCfg().UseBasicAuth, - cfgDflt.HTTPCfg().AuthUsers, - shdChan) - - runtime.Gosched() - - shdChan.CloseOnce() - server.StopBiRPC() } func testServeHHTPFail(t *testing.T) { @@ -282,83 +142,30 @@ func testServeHHTPFail(t *testing.T) { caps := engine.NewCaps(100, utils.MetaBusy) server = NewServer(caps) server.RpcRegister(new(mockRegister)) - shdChan := utils.NewSyncedChan() - - go server.ServeHTTP( + var closed bool + ch := make(chan struct{}) + go server.ServeHTTP(func() { + closed = true + close(ch) + }, "invalid_port_format", cfgDflt.HTTPCfg().JsonRPCURL, cfgDflt.HTTPCfg().WSURL, cfgDflt.HTTPCfg().UseBasicAuth, cfgDflt.HTTPCfg().AuthUsers, - shdChan) + ) runtime.Gosched() - _, ok := <-shdChan.Done() - if ok { + select { + case <-ch: + case <-time.After(50 * time.Millisecond): + t.Fatal("Timeout") + } + if closed { t.Errorf("Expected to be close") } - server.StopBiRPC() -} - -func testServeHHTPFailEnableRpc(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - shdChan := utils.NewSyncedChan() - server.rpcEnabled = false - - go server.ServeHTTP(":1000", - cfgDflt.HTTPCfg().JsonRPCURL, - cfgDflt.HTTPCfg().WSURL, - cfgDflt.HTTPCfg().UseBasicAuth, - cfgDflt.HTTPCfg().AuthUsers, - shdChan) - - shdChan.CloseOnce() - server.StopBiRPC() -} - -func testServeBiJSON(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - server.birpcSrv = birpc.NewBirpcServer() - - data := engine.NewInternalDB(nil, nil, true) - dm := engine.NewDataManager(data, cfgDflt.CacheCfg(), nil) - - ss := sessions.NewSessionS(cfgDflt, dm, nil) - - go func() { - if err := server.ServeBiRPC(":3434", "", ss.OnBiJSONConnect, ss.OnBiJSONDisconnect); err != nil { - t.Error(err) - } - }() - runtime.Gosched() -} - -func testServeBiJSONEmptyBiRPCServer(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - - data := engine.NewInternalDB(nil, nil, true) - dm := engine.NewDataManager(data, cfgDflt.CacheCfg(), nil) - - ss := sessions.NewSessionS(cfgDflt, dm, nil) - - expectedErr := "BiRPCServer should not be nil" - go func() { - if err := server.ServeBiRPC(":3430", "", ss.OnBiJSONConnect, ss.OnBiJSONDisconnect); err == nil || err.Error() != "BiRPCServer should not be nil" { - t.Errorf("Expected %+v, received %+v", expectedErr, err) - } - }() - - runtime.Gosched() + server.Stop() } func testServeBiJSONInvalidPort(t *testing.T) { @@ -366,7 +173,6 @@ func testServeBiJSONInvalidPort(t *testing.T) { caps := engine.NewCaps(100, utils.MetaBusy) server = NewServer(caps) server.RpcRegister(new(mockRegister)) - server.birpcSrv = birpc.NewBirpcServer() data := engine.NewInternalDB(nil, nil, true) dm := engine.NewDataManager(data, cfgDflt.CacheCfg(), nil) @@ -382,47 +188,6 @@ func testServeBiJSONInvalidPort(t *testing.T) { server.StopBiRPC() } -func testServeBiGoB(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - server.birpcSrv = birpc.NewBirpcServer() - - data := engine.NewInternalDB(nil, nil, true) - dm := engine.NewDataManager(data, cfgDflt.CacheCfg(), nil) - - ss := sessions.NewSessionS(cfgDflt, dm, nil) - - go func() { - if err := server.ServeBiRPC("", ":9343", ss.OnBiJSONConnect, ss.OnBiJSONDisconnect); err != nil { - t.Log(err) - } - }() - runtime.Gosched() -} - -func testServeBiGoBEmptyBiRPCServer(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - - data := engine.NewInternalDB(nil, nil, true) - dm := engine.NewDataManager(data, cfgDflt.CacheCfg(), nil) - - ss := sessions.NewSessionS(cfgDflt, dm, nil) - - expectedErr := "BiRPCServer should not be nil" - go func() { - if err := server.ServeBiRPC("", ":93430", ss.OnBiJSONConnect, ss.OnBiJSONDisconnect); err == nil || err.Error() != "BiRPCServer should not be nil" { - t.Errorf("Expected %+v, received %+v", expectedErr, err) - } - }() - - runtime.Gosched() -} - func testServeBiGoBInvalidPort(t *testing.T) { cfgDflt := config.NewDefaultCGRConfig() caps := engine.NewCaps(100, utils.MetaBusy) @@ -444,121 +209,6 @@ func testServeBiGoBInvalidPort(t *testing.T) { server.StopBiRPC() } -func testServeGOBTLS(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - - shdChan := utils.NewSyncedChan() - - go server.ServeGOBTLS( - ":34476", - "/usr/share/cgrates/tls/server.crt", - "/usr/share/cgrates/tls/server.key", - "/usr/share/cgrates/tls/ca.crt", - 4, - cfgDflt.TLSCfg().ServerName, - shdChan, - ) - runtime.Gosched() - - server.StopBiRPC() -} - -func testServeJSONTls(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - - shdChan := utils.NewSyncedChan() - - go server.ServeJSONTLS( - ":64779", - "/usr/share/cgrates/tls/server.crt", - "/usr/share/cgrates/tls/server.key", - "/usr/share/cgrates/tls/ca.crt", - 4, - cfgDflt.TLSCfg().ServerName, - shdChan, - ) - runtime.Gosched() -} - -func testServeGOBPortFail(t *testing.T) { - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - - shdChan := utils.NewSyncedChan() - - buff := new(bytes.Buffer) - log.SetOutput(buff) - - go server.serveCodecTLS( - "34776", - utils.GOBCaps, - "/usr/share/cgrates/tls/server.crt", - "/usr/share/cgrates/tls/server.key", - "/usr/share/cgrates/tls/ca.crt", - 4, - "Server_name", - newCapsGOBCodec, - shdChan, - ) - runtime.Gosched() - select { - case <-time.After(10 * time.Second): - t.Fatal("timeout") - case <-shdChan.Done(): - } - expected := "listen tcp: address 34776: missing port in address when listening" - if rcv := buff.String(); !strings.Contains(rcv, expected) { - t.Errorf("Expected %+v, received %+v", expected, rcv) - } - - log.SetOutput(os.Stderr) -} - -func testServeCodecTLSErr(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - - shdChan := utils.NewSyncedChan() - - //if rpc is not enabled, won t be able to serve - server.rpcEnabled = false - server.serveCodecTLS("13567", - utils.GOBCaps, - "/usr/share/cgrates/tls/server.crt", - "/usr/share/cgrates/tls/server.key", - "/usr/share/cgrates/tls/ca.crt", - 4, - cfgDflt.TLSCfg().ServerName, - newCapsGOBCodec, - shdChan) - - //unable to load TLS config when there is an inexisting server certificate file - server.rpcEnabled = true - server.serveCodecTLS("13567", - utils.GOBCaps, - "/usr/share/cgrates/tls/inexisting_cert", - "/usr/share/cgrates/tls/server.key", - "/usr/share/cgrates/tls/ca.crt", - 4, - cfgDflt.TLSCfg().ServerName, - newCapsGOBCodec, - shdChan) - - _, ok := <-shdChan.Done() - if ok { - t.Errorf("Expected to be close") - } -} - func testLoadTLSConfigErr(t *testing.T) { flPath := "/tmp/testLoadTLSConfigErr1" if err := os.MkdirAll(flPath, 0777); err != nil { @@ -603,188 +253,12 @@ TEST } } -func testServeHTTPTLS(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) +type mockListenError mockListener - shdChan := utils.NewSyncedChan() - - //cannot serve HHTPTls when rpc is not enabled - server.rpcEnabled = false - server.ServeHTTPTLS( - "17789", - "/usr/share/cgrates/tls/server.crt", - "/usr/share/cgrates/tls/server.key", - "/usr/share/cgrates/tls/ca.crt", - cfgDflt.TLSCfg().ServerPolicy, - cfgDflt.TLSCfg().ServerName, - cfgDflt.HTTPCfg().JsonRPCURL, - cfgDflt.HTTPCfg().WSURL, - cfgDflt.HTTPCfg().UseBasicAuth, - cfgDflt.HTTPCfg().AuthUsers, - shdChan) - - //Invalid port address - server.rpcEnabled = true - go server.ServeHTTPTLS( - "17789", - "/usr/share/cgrates/tls/server.crt", - "/usr/share/cgrates/tls/server.key", - "/usr/share/cgrates/tls/ca.crt", - cfgDflt.TLSCfg().ServerPolicy, - cfgDflt.TLSCfg().ServerName, - cfgDflt.HTTPCfg().JsonRPCURL, - cfgDflt.HTTPCfg().WSURL, - cfgDflt.HTTPCfg().UseBasicAuth, - cfgDflt.HTTPCfg().AuthUsers, - shdChan) - runtime.Gosched() - - _, ok := <-shdChan.Done() - if ok { - t.Errorf("Expected to be close") - } -} - -func testServeHTTPTLSWithBasicAuth(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - - shdChan := utils.NewSyncedChan() - - //Invalid port address - server.rpcEnabled = true - go server.ServeHTTPTLS( - "57235", - "/usr/share/cgrates/tls/server.crt", - "/usr/share/cgrates/tls/server.key", - "/usr/share/cgrates/tls/ca.crt", - cfgDflt.TLSCfg().ServerPolicy, - cfgDflt.TLSCfg().ServerName, - cfgDflt.HTTPCfg().JsonRPCURL, - cfgDflt.HTTPCfg().WSURL, - !cfgDflt.HTTPCfg().UseBasicAuth, - cfgDflt.HTTPCfg().AuthUsers, - shdChan) - runtime.Gosched() - - _, ok := <-shdChan.Done() - if ok { - t.Errorf("Expected to be close") - } -} - -func testServeHTTPTLSError(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - - shdChan := utils.NewSyncedChan() - - //Invalid port address - go server.ServeHTTPTLS( - "57235", - "/usr/share/cgrates/tls/inexisting_file", - "/usr/share/cgrates/tls/server.key", - "/usr/share/cgrates/tls/ca.crt", - cfgDflt.TLSCfg().ServerPolicy, - cfgDflt.TLSCfg().ServerName, - cfgDflt.HTTPCfg().JsonRPCURL, - cfgDflt.HTTPCfg().WSURL, - !cfgDflt.HTTPCfg().UseBasicAuth, - cfgDflt.HTTPCfg().AuthUsers, - shdChan) - runtime.Gosched() - - _, ok := <-shdChan.Done() - if ok { - t.Errorf("Expected to be close") - } -} - -func testServeHTTPTLSHttpNotEnabled(t *testing.T) { - cfgDflt := config.NewDefaultCGRConfig() - caps := engine.NewCaps(100, utils.MetaBusy) - server = NewServer(caps) - server.RpcRegister(new(mockRegister)) - - shdChan := utils.NewSyncedChan() - - server.httpEnabled = false - go server.ServeHTTPTLS( - "17789", - "/usr/share/cgrates/tls/server.crt", - "/usr/share/cgrates/tls/server.key", - "/usr/share/cgrates/tls/ca.crt", - cfgDflt.TLSCfg().ServerPolicy, - cfgDflt.TLSCfg().ServerName, - utils.EmptyString, - utils.EmptyString, - cfgDflt.HTTPCfg().UseBasicAuth, - cfgDflt.HTTPCfg().AuthUsers, - shdChan) - - shdChan.CloseOnce() -} - -func testBiRPCRegisterName(t *testing.T) { - caps := engine.NewCaps(0, utils.MetaBusy) - server := NewServer(caps) - - handler := struct{}{} - go server.BiRPCRegisterName(utils.APIerSv1Ping, handler) - runtime.Gosched() - - server.StopBiRPC() -} - -func testAcceptBiRPC(t *testing.T) { - caps := engine.NewCaps(0, utils.MetaBusy) - server := NewServer(caps) - server.RpcRegister(new(mockRegister)) - server.birpcSrv = birpc.NewBirpcServer() - - p1, p2 := net.Pipe() - l := &mockListener{ - p1: p1, - } - go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, newCapsBiRPCJSONCodec) - rpc := jsonrpc.NewClient(p2) - var reply string - expected := "birpc: can't find method AttributeSv1.Ping" - if err := rpc.Call(context.TODO(), utils.AttributeSv1Ping, utils.CGREvent{}, &reply); err == nil || err.Error() != expected { - t.Errorf("Expected %+v, received %+v", expected, err) - } - - p2.Close() - runtime.Gosched() -} - -type mockListenError struct { - *mockListener -} - -func (mK *mockListenError) Accept() (net.Conn, error) { +func (*mockListenError) Accept() (net.Conn, error) { return nil, errors.New("use of closed network connection") } -func testAcceptBiRPCError(t *testing.T) { - caps := engine.NewCaps(10, utils.MetaBusy) - server := NewServer(caps) - server.RpcRegister(new(mockRegister)) - server.birpcSrv = birpc.NewBirpcServer() - - //it will contain "use of closed network connection" - l := new(mockListenError) - go server.acceptBiRPC(server.birpcSrv, l, utils.JSONCaps, newCapsBiRPCJSONCodec) - runtime.Gosched() -} - func testRpcRegisterActions(t *testing.T) { caps := engine.NewCaps(0, utils.MetaBusy) server := NewServer(caps) diff --git a/cores/server_test.go b/cores/server_test.go index 40ff6dae2..87e3e8b8b 100644 --- a/cores/server_test.go +++ b/cores/server_test.go @@ -45,6 +45,10 @@ func TestNewServer(t *testing.T) { } rcv := NewServer(caps) rcv.stopbiRPCServer = nil + rcv.httpServer = nil + rcv.httpsServer = nil + rcv.rpcServer = nil + rcv.birpcSrv = nil if !reflect.DeepEqual(expected, rcv) { t.Errorf("Expected %+v, received %+v", expected, rcv) } diff --git a/engine/connmanager.go b/engine/connmanager.go index 7fe614392..5c7b2ad8f 100644 --- a/engine/connmanager.go +++ b/engine/connmanager.go @@ -35,6 +35,7 @@ func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan birpc.Cli cM = &ConnManager{ cfg: cfg, rpcInternal: rpcInternal, + dynIntCh: NewRPCClientSet(rpcInternal), connCache: ltcache.NewCache(-1, 0, true, nil), } SetConnManager(cM) @@ -45,6 +46,7 @@ func NewConnManager(cfg *config.CGRConfig, rpcInternal map[string]chan birpc.Cli type ConnManager struct { cfg *config.CGRConfig rpcInternal map[string]chan birpc.ClientConnector + dynIntCh RPCClientSet connCache *ltcache.Cache } @@ -71,7 +73,7 @@ func (cM *ConnManager) getConn(ctx *context.Context, connID string) (conn birpc. connCfg = cM.cfg.RPCConns()[connID] for _, rpcConn := range connCfg.Conns { if rpcConn.Address == utils.MetaInternal { - intChan = IntRPC.GetInternalChanel() + intChan = cM.dynIntCh.GetInternalChanel() break } } @@ -203,3 +205,13 @@ func (cM *ConnManager) Reload() { Cache.Clear([]string{utils.CacheReplicationHosts}) cM.connCache.Clear() } + +func (cM *ConnManager) GetInternalChan() chan birpc.ClientConnector { + return cM.dynIntCh.GetInternalChanel() +} + +func (cM *ConnManager) AddInternalConn(connName, apiPrefix string, + iConnCh chan birpc.ClientConnector) { + cM.rpcInternal[connName] = iConnCh + cM.dynIntCh[apiPrefix] = iConnCh +} diff --git a/engine/libengine.go b/engine/libengine.go index 753cd02d2..393e0244d 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -91,30 +91,38 @@ func NewRPCConnection(ctx *context.Context, cfg *config.RemoteHost, keyPath, cer return } -// IntRPC is the global variable that is used to comunicate with all the subsystems internally -var IntRPC RPCClientSet - -// NewRPCClientSet initilalizates the map of connections -func NewRPCClientSet() (s RPCClientSet) { - return make(RPCClientSet) +func NewRPCClientSet(m map[string]chan birpc.ClientConnector) (s RPCClientSet) { + s = make(RPCClientSet) + for k, v := range map[string]string{ + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzer): utils.AnalyzerSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS): utils.AdminSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): utils.AttributeSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): utils.CacheSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs): utils.CDRsV1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): utils.ChargerSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian): utils.GuardianSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders): utils.LoaderSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources): utils.ResourceSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS): utils.SessionSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats): utils.StatSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes): utils.RouteSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds): utils.ThresholdSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager): utils.ServiceManagerV1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig): utils.ConfigSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): utils.CoreSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): utils.EeSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): utils.RateSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): utils.DispatcherSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts): utils.AccountSv1, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions): utils.ActionSv1, + } { + s[v] = m[k] + } + return } // RPCClientSet is a RPC ClientConnector for the internal subsystems -type RPCClientSet map[string]*rpcclient.RPCClient - -// AddInternalRPCClient creates and adds to the set a new rpc client using the provided configuration -func (s RPCClientSet) AddInternalRPCClient(name string, connChan chan birpc.ClientConnector) { - rpc, err := rpcclient.NewRPCClient(context.Background(), utils.EmptyString, utils.EmptyString, false, - utils.EmptyString, utils.EmptyString, utils.EmptyString, - config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects, - config.CgrConfig().GeneralCfg().ConnectTimeout, config.CgrConfig().GeneralCfg().ReplyTimeout, - rpcclient.InternalRPC, connChan, true, nil) - if err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> Error adding %s to the set: %s", utils.InternalRPCSet, name, err.Error())) - return - } - s[name] = rpc -} +type RPCClientSet map[string]chan birpc.ClientConnector // GetInternalChanel is used when RPCClientSet is passed as internal connection for RPCPool func (s RPCClientSet) GetInternalChanel() chan birpc.ClientConnector { @@ -129,9 +137,21 @@ func (s RPCClientSet) Call(ctx *context.Context, method string, args interface{} if len(methodSplit) != 2 { return rpcclient.ErrUnsupporteServiceMethod } - conn, has := s[methodSplit[0]] + connCh, has := s[methodSplit[0]] if !has { return rpcclient.ErrUnsupporteServiceMethod } + var conn birpc.ClientConnector + ctx2, cancel := context.WithTimeout(ctx, config.CgrConfig().GeneralCfg().ConnectTimeout) + select { + case conn = <-connCh: + connCh <- conn + cancel() + if conn == nil { + return rpcclient.ErrDisconnected + } + case <-ctx2.Done(): + return ctx2.Err() + } return conn.Call(ctx, method, args, reply) } diff --git a/services/analyzers.go b/services/analyzers.go index f6efa5da1..d58ca36f8 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -23,6 +23,7 @@ import ( "sync" "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/analyzers" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" @@ -32,15 +33,16 @@ import ( // NewAnalyzerService returns the Analyzer Service func NewAnalyzerService(cfg *config.CGRConfig, server *cores.Server, - filterSChan chan *engine.FilterS, shdChan *utils.SyncedChan, + filterSChan chan *engine.FilterS, internalAnalyzerSChan chan birpc.ClientConnector, - srvDep map[string]*sync.WaitGroup) *AnalyzerService { + srvDep map[string]*sync.WaitGroup, + shtDwn context.CancelFunc) *AnalyzerService { return &AnalyzerService{ connChan: internalAnalyzerSChan, cfg: cfg, server: server, filterSChan: filterSChan, - shdChan: shdChan, + shtDwn: shtDwn, srvDep: srvDep, } } @@ -52,7 +54,7 @@ type AnalyzerService struct { server *cores.Server filterSChan chan *engine.FilterS stopChan chan struct{} - shdChan *utils.SyncedChan + shtDwn context.CancelFunc anz *analyzers.AnalyzerService // rpc *v1.AnalyzerSv1 @@ -76,7 +78,7 @@ func (anz *AnalyzerService) Start() (err error) { go func(a *analyzers.AnalyzerService) { if err := a.ListenAndServe(anz.stopChan); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.AnalyzerS, err.Error())) - anz.shdChan.CloseOnce() + anz.shtDwn() } return }(anz.anz) diff --git a/services/asteriskagent.go b/services/asteriskagent.go index 7ae03feca..f401df211 100644 --- a/services/asteriskagent.go +++ b/services/asteriskagent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/agents" @@ -32,11 +33,12 @@ import ( // NewAsteriskAgent returns the Asterisk Agent func NewAsteriskAgent(cfg *config.CGRConfig, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + connMgr *engine.ConnManager, + srvDep map[string]*sync.WaitGroup, + shtDwn context.CancelFunc) servmanager.Service { return &AsteriskAgent{ cfg: cfg, - shdChan: shdChan, + shtDwn: shtDwn, connMgr: connMgr, srvDep: srvDep, } @@ -46,7 +48,7 @@ func NewAsteriskAgent(cfg *config.CGRConfig, type AsteriskAgent struct { sync.RWMutex cfg *config.CGRConfig - shdChan *utils.SyncedChan + shtDwn context.CancelFunc stopChan chan struct{} smas []*agents.AsteriskAgent @@ -63,17 +65,17 @@ func (ast *AsteriskAgent) Start() (err error) { ast.Lock() defer ast.Unlock() - listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}, shdChan *utils.SyncedChan) { + listenAndServe := func(sma *agents.AsteriskAgent, stopChan chan struct{}) { if err := sma.ListenAndServe(stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> runtime error: %s!", utils.AsteriskAgent, err)) - shdChan.CloseOnce() + ast.shtDwn() } } ast.stopChan = make(chan struct{}) ast.smas = make([]*agents.AsteriskAgent, len(ast.cfg.AsteriskAgentCfg().AsteriskConns)) for connIdx := range ast.cfg.AsteriskAgentCfg().AsteriskConns { // Instantiate connections towards asterisk servers ast.smas[connIdx] = agents.NewAsteriskAgent(ast.cfg, connIdx, ast.connMgr) - go listenAndServe(ast.smas[connIdx], ast.stopChan, ast.shdChan) + go listenAndServe(ast.smas[connIdx], ast.stopChan) } return } diff --git a/services/cgr-engine.go b/services/cgr-engine.go index b33ce7873..0ff17ba31 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -20,13 +20,23 @@ package services import ( "fmt" + "io" + "log" + "path" + "runtime" + "runtime/pprof" "sync" + "time" "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/registrarc" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) type CGREngine struct { @@ -34,18 +44,21 @@ type CGREngine struct { srvManager *servmanager.ServiceManager srvDep map[string]*sync.WaitGroup - cmConns map[string]chan birpc.ClientConnector + shdWg sync.WaitGroup + cM *engine.ConnManager + server *cores.Server + + cS *cores.CoreService } func (cgr *CGREngine) AddService(service servmanager.Service, connName, apiPrefix string, iConnCh chan birpc.ClientConnector) { cgr.srvManager.AddServices(service) - cgr.cmConns[utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources)] = iConnCh cgr.srvDep[service.ServiceName()] = new(sync.WaitGroup) - engine.IntRPC.AddInternalRPCClient(apiPrefix, iConnCh) + cgr.cM.AddInternalConn(connName, apiPrefix, iConnCh) } -func (cgr *CGREngine) InitConfigFromPath(path string, nodeID string) (err error) { +func (cgr *CGREngine) InitConfigFromPath(path, nodeID string, lgLevel int) (err error) { // Init config if cgr.cfg, err = config.NewCGRConfigFromPath(path); err != nil { err = fmt.Errorf("could not parse config: <%s>", err) @@ -69,6 +82,294 @@ func (cgr *CGREngine) InitConfigFromPath(path string, nodeID string) (err error) if nodeID != utils.EmptyString { cgr.cfg.GeneralCfg().NodeID = nodeID } + if lgLevel != -1 { // Modify the log level if provided by command arguments + cgr.cfg.GeneralCfg().LogLevel = lgLevel + } config.SetCgrConfig(cgr.cfg) // Share the config object return } + +func (cgr *CGREngine) InitServices(ctx *context.Context, shtDwn context.CancelFunc, pprofPath string, cpuPrfFl io.Closer, memPrfDir string, memPrfStop chan struct{}) (err error) { + iFilterSCh := make(chan *engine.FilterS, 1) + // init the channel here because we need to pass them to connManager + iServeManagerCh := make(chan birpc.ClientConnector, 1) + iConfigCh := make(chan birpc.ClientConnector, 1) + iCoreSv1Ch := make(chan birpc.ClientConnector, 1) + iCacheSCh := make(chan birpc.ClientConnector, 1) + iGuardianSCh := make(chan birpc.ClientConnector, 1) + iAnalyzerSCh := make(chan birpc.ClientConnector, 1) + iCDRServerCh := make(chan birpc.ClientConnector, 1) + iAttributeSCh := make(chan birpc.ClientConnector, 1) + iDispatcherSCh := make(chan birpc.ClientConnector, 1) + iSessionSCh := make(chan birpc.ClientConnector, 1) + iChargerSCh := make(chan birpc.ClientConnector, 1) + iThresholdSCh := make(chan birpc.ClientConnector, 1) + iStatSCh := make(chan birpc.ClientConnector, 1) + iResourceSCh := make(chan birpc.ClientConnector, 1) + iRouteSCh := make(chan birpc.ClientConnector, 1) + iAdminSCh := make(chan birpc.ClientConnector, 1) + iLoaderSCh := make(chan birpc.ClientConnector, 1) + iEEsCh := make(chan birpc.ClientConnector, 1) + iRateSCh := make(chan birpc.ClientConnector, 1) + iActionSCh := make(chan birpc.ClientConnector, 1) + iAccountSCh := make(chan birpc.ClientConnector, 1) + + cgr.srvDep = map[string]*sync.WaitGroup{ + utils.AnalyzerS: new(sync.WaitGroup), + utils.AdminS: new(sync.WaitGroup), + utils.AsteriskAgent: new(sync.WaitGroup), + utils.AttributeS: new(sync.WaitGroup), + utils.CDRServer: new(sync.WaitGroup), + utils.ChargerS: new(sync.WaitGroup), + utils.CoreS: new(sync.WaitGroup), + utils.DataDB: new(sync.WaitGroup), + utils.DiameterAgent: new(sync.WaitGroup), + utils.RegistrarC: new(sync.WaitGroup), + utils.DispatcherS: new(sync.WaitGroup), + utils.DNSAgent: new(sync.WaitGroup), + utils.EEs: new(sync.WaitGroup), + utils.ERs: new(sync.WaitGroup), + utils.FreeSWITCHAgent: new(sync.WaitGroup), + utils.GlobalVarS: new(sync.WaitGroup), + utils.HTTPAgent: new(sync.WaitGroup), + utils.KamailioAgent: new(sync.WaitGroup), + utils.LoaderS: new(sync.WaitGroup), + utils.RadiusAgent: new(sync.WaitGroup), + utils.RateS: new(sync.WaitGroup), + utils.ResourceS: new(sync.WaitGroup), + utils.RouteS: new(sync.WaitGroup), + utils.SchedulerS: new(sync.WaitGroup), + utils.SessionS: new(sync.WaitGroup), + utils.SIPAgent: new(sync.WaitGroup), + utils.StatS: new(sync.WaitGroup), + utils.StorDB: new(sync.WaitGroup), + utils.ThresholdS: new(sync.WaitGroup), + utils.ActionS: new(sync.WaitGroup), + utils.AccountS: new(sync.WaitGroup), + } + + cncReqsLimit := cgr.cfg.CoreSCfg().Caps + if utils.ConcurrentReqsLimit != 0 { // used as shared variable + cncReqsLimit = utils.ConcurrentReqsLimit + } + cncReqsStrategy := cgr.cfg.CoreSCfg().CapsStrategy + if len(utils.ConcurrentReqsStrategy) != 0 { + cncReqsStrategy = utils.ConcurrentReqsStrategy + } + caps := engine.NewCaps(cncReqsLimit, cncReqsStrategy) + + // initialize the connManager before creating the DMService + // because we need to pass the connection to it + cgr.cM = engine.NewConnManager(cgr.cfg, map[string]chan birpc.ClientConnector{ + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzer): iAnalyzerSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS): iAdminSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): iAttributeSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): iCacheSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs): iCDRServerCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers): iChargerSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaGuardian): iGuardianSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders): iLoaderSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources): iResourceSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS): iSessionSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats): iStatSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes): iRouteSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds): iThresholdSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager): iServeManagerCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig): iConfigCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore): iCoreSv1Ch, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): iEEsCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): iRateSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): iDispatcherSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts): iAccountSCh, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions): iActionSCh, + utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS): iSessionSCh, + }) + gvService := NewGlobalVarS(cgr.cfg, cgr.srvDep) + cgr.shdWg.Add(1) + if err = gvService.Start(); err != nil { + return + } + dmService := NewDataDBService(cgr.cfg, cgr.cM, cgr.srvDep) + if dmService.ShouldRun() { // Some services can run without db, ie: ERs + cgr.shdWg.Add(1) + if err = dmService.Start(); err != nil { + return + } + } + + storDBService := NewStorDBService(cgr.cfg, cgr.srvDep) + if storDBService.ShouldRun() { // Some services can run without db, ie: ERs + cgr.shdWg.Add(1) + if err = storDBService.Start(); err != nil { + return + } + } + + // Rpc/http server + cgr.server = cores.NewServer(caps) + if len(cgr.cfg.HTTPCfg().RegistrarSURL) != 0 { + cgr.server.RegisterHTTPFunc(cgr.cfg.HTTPCfg().RegistrarSURL, registrarc.Registrar) + } + if cgr.cfg.ConfigSCfg().Enabled { + cgr.server.RegisterHTTPFunc(cgr.cfg.ConfigSCfg().URL, config.HandlerConfigS) + } + if pprofPath != utils.EmptyString { + cgr.server.RegisterProfiler(pprofPath) + } + + // init AnalyzerS + anz := NewAnalyzerService(cgr.cfg, cgr.server, iFilterSCh, iAnalyzerSCh, cgr.srvDep, shtDwn) + if anz.ShouldRun() { + cgr.shdWg.Add(1) + if err = anz.Start(); err != nil { + return + } + } + + // init CoreSv1 + coreS := NewCoreService(cgr.cfg, caps, cgr.server, iCoreSv1Ch, anz, cpuPrfFl, memPrfDir, memPrfStop, &cgr.shdWg, cgr.srvDep, shtDwn) + cgr.shdWg.Add(1) + if err = coreS.Start(); err != nil { + return + } + cgr.cS = coreS.GetCoreS() + + // init CacheS + cacheS := cgrInitCacheS(ctx, shtDwn, iCacheSCh, cgr.server, cgr.cfg, dmService.GetDM(), anz, coreS.GetCoreS().CapsStats) + engine.Cache = cacheS + + // init GuardianSv1 + cgrInitGuardianSv1(iGuardianSCh, cgr.server, anz) + + // Start ServiceManager + cgr.srvManager = servmanager.NewServiceManager(cgr.cfg, &cgr.shdWg, cgr.cM) + dspS := NewDispatcherService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.server, iDispatcherSCh, cgr.cM, anz, cgr.srvDep) + attrS := NewAttributeService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.server, iAttributeSCh, anz, dspS, cgr.srvDep) + dspH := NewRegistrarCService(cgr.cfg, cgr.server, cgr.cM, anz, cgr.srvDep) + chrS := NewChargerService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.server, + iChargerSCh, cgr.cM, anz, cgr.srvDep) + tS := NewThresholdService(cgr.cfg, dmService, cacheS, iFilterSCh, + cgr.cM, cgr.server, iThresholdSCh, anz, cgr.srvDep) + stS := NewStatService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.server, + iStatSCh, cgr.cM, anz, cgr.srvDep) + reS := NewResourceService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.server, + iResourceSCh, cgr.cM, anz, cgr.srvDep) + routeS := NewRouteService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.server, + iRouteSCh, cgr.cM, anz, cgr.srvDep) + + admS := NewAdminSv1Service(cgr.cfg, dmService, storDBService, iFilterSCh, cgr.server, + iAdminSCh, cgr.cM, anz, cgr.srvDep) + + cdrS := NewCDRServer(cgr.cfg, dmService, storDBService, iFilterSCh, cgr.server, iCDRServerCh, + cgr.cM, anz, cgr.srvDep) + + smg := NewSessionService(cgr.cfg, dmService, cgr.server, iSessionSCh, cgr.cM, anz, cgr.srvDep, shtDwn) + + ldrs := NewLoaderService(cgr.cfg, dmService, iFilterSCh, cgr.server, + iLoaderSCh, cgr.cM, anz, cgr.srvDep) + + cgr.srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, + admS, cdrS, smg, coreS, + NewEventReaderService(cgr.cfg, iFilterSCh, cgr.cM, cgr.srvDep, shtDwn), + NewDNSAgent(cgr.cfg, iFilterSCh, cgr.cM, cgr.srvDep, shtDwn), + NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep, shtDwn), + NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep, shtDwn), + NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep, shtDwn), // partial reload + NewRadiusAgent(cgr.cfg, iFilterSCh, cgr.cM, cgr.srvDep, shtDwn), // partial reload + NewDiameterAgent(cgr.cfg, iFilterSCh, cgr.cM, cgr.srvDep, shtDwn), // partial reload + NewHTTPAgent(cgr.cfg, iFilterSCh, cgr.server, cgr.cM, cgr.srvDep), // no reload + ldrs, anz, dspS, dspH, dmService, storDBService, + NewEventExporterService(cgr.cfg, iFilterSCh, + cgr.cM, cgr.server, iEEsCh, anz, cgr.srvDep), + NewRateService(cgr.cfg, cacheS, iFilterSCh, dmService, + cgr.server, iRateSCh, anz, cgr.srvDep), + NewSIPAgent(cgr.cfg, iFilterSCh, cgr.cM, cgr.srvDep, shtDwn), + NewActionService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.cM, cgr.server, iActionSCh, anz, cgr.srvDep), + NewAccountService(cgr.cfg, dmService, cacheS, iFilterSCh, cgr.cM, cgr.server, iAccountSCh, anz, cgr.srvDep), + ) + return +} + +func (cgr *CGREngine) Start(ctx *context.Context, shtDw context.CancelFunc, flags *CGREngineFlags) (err error) { + var vers string + goVers := runtime.Version() + if vers, err = utils.GetCGRVersion(); err != nil { + return + } + if *flags.Version { + fmt.Println(vers) + return + } + if *flags.PidFile != utils.EmptyString { + cgrWritePid(*flags.PidFile) + } + if *flags.Singlecpu { + runtime.GOMAXPROCS(1) // Having multiple cpus may slow down computing due to CPU management, to be reviewed in future Go releases + } + cgr.shdWg.Add(1) + go cgrSingnalHandler(ctx, shtDw, cgr.cfg, &cgr.shdWg) + + var stopMemProf chan struct{} + if *flags.MemPrfDir != utils.EmptyString { + cgr.shdWg.Add(1) + stopMemProf = make(chan struct{}) + go cores.MemProfiling(*flags.MemPrfDir, *flags.MemPrfInterval, *flags.MemPrfNoF, &cgr.shdWg, stopMemProf, shtDw) + defer func() { //here + if cgr.cS == nil { + close(stopMemProf) + } + }() + } + + var cpuProfileFile io.Closer + if *flags.CpuPrfDir != utils.EmptyString { + cpuProfileFile, err = cores.StartCPUProfiling(path.Join(*flags.CpuPrfDir, utils.CpuPathCgr)) + if err != nil { + return + } + defer func() { //here + if cgr.cS == nil { + pprof.StopCPUProfile() + cpuProfileFile.Close() + } + }() + } + + if *flags.ScheduledShutDown != utils.EmptyString { + var shtDwDur time.Duration + if shtDwDur, err = utils.ParseDurationWithNanosecs(*flags.ScheduledShutDown); err != nil { + return + } + cgr.shdWg.Add(1) + go func() { // Schedule shutdown + tm := time.NewTimer(shtDwDur) + select { + case <-tm.C: + shtDw() + case <-ctx.Done(): + tm.Stop() + } + cgr.shdWg.Done() + }() + } + + // Init config + if err = cgr.InitConfigFromPath(*flags.CfgPath, *flags.NodeID, *flags.LogLevel); err != nil { + return + } + if *flags.CheckConfig { + return + } + + // init syslog + if utils.Logger, err = utils.Newlogger(utils.FirstNonEmpty(*flags.SysLogger, + cgr.cfg.GeneralCfg().Logger), cgr.cfg.GeneralCfg().NodeID); err != nil { + log.Fatalf("Could not initialize syslog connection, err: <%s>", err) + return + } + utils.Logger.SetLogLevel(cgr.cfg.GeneralCfg().LogLevel) + utils.Logger.Info(fmt.Sprintf(" starting version <%s><%s>", vers, goVers)) + cgr.cfg.LazySanityCheck() + + return +} diff --git a/services/cores.go b/services/cores.go index 09440bccc..37ee7c992 100644 --- a/services/cores.go +++ b/services/cores.go @@ -24,6 +24,7 @@ import ( "sync" "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/apis" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" @@ -34,10 +35,11 @@ import ( // NewCoreService returns the Core Service func NewCoreService(cfg *config.CGRConfig, caps *engine.Caps, server *cores.Server, internalCoreSChan chan birpc.ClientConnector, anz *AnalyzerService, - fileCpu io.Closer, fileMEM string, shdWg *sync.WaitGroup, stopMemPrf chan struct{}, - shdChan *utils.SyncedChan, srvDep map[string]*sync.WaitGroup) *CoreService { + fileCpu io.Closer, fileMEM string, stopMemPrf chan struct{}, + shdWg *sync.WaitGroup, srvDep map[string]*sync.WaitGroup, + shtDw context.CancelFunc) *CoreService { return &CoreService{ - shdChan: shdChan, + shtDw: shtDw, shdWg: shdWg, stopMemPrf: stopMemPrf, connChan: internalCoreSChan, @@ -60,7 +62,7 @@ type CoreService struct { stopChan chan struct{} shdWg *sync.WaitGroup stopMemPrf chan struct{} - shdChan *utils.SyncedChan + shtDw context.CancelFunc fileCpu io.Closer fileMem string cS *cores.CoreService @@ -80,7 +82,7 @@ func (cS *CoreService) Start() (err error) { defer cS.Unlock() utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CoreS)) cS.stopChan = make(chan struct{}) - cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.fileMem, cS.stopChan, cS.shdWg, cS.stopMemPrf, cS.shdChan) + cS.cS = cores.NewCoreService(cS.cfg, cS.caps, cS.fileCpu, cS.fileMem, cS.stopChan, cS.stopMemPrf, cS.shdWg, cS.shtDw) cS.rpc = apis.NewCoreSv1(cS.cS) srv, _ := birpc.NewService(cS.rpc, utils.EmptyString, false) if !cS.cfg.DispatcherSCfg().Enabled { diff --git a/services/diameteragent.go b/services/diameteragent.go index 689bcaded..1376a2182 100644 --- a/services/diameteragent.go +++ b/services/diameteragent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -31,12 +32,13 @@ import ( // NewDiameterAgent returns the Diameter Agent func NewDiameterAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + connMgr *engine.ConnManager, + srvDep map[string]*sync.WaitGroup, + shtDwn context.CancelFunc) servmanager.Service { return &DiameterAgent{ cfg: cfg, filterSChan: filterSChan, - shdChan: shdChan, + shtDwn: shtDwn, connMgr: connMgr, srvDep: srvDep, } @@ -47,8 +49,8 @@ type DiameterAgent struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - shdChan *utils.SyncedChan stopChan chan struct{} + shtDwn context.CancelFunc da *agents.DiameterAgent connMgr *engine.ConnManager @@ -87,7 +89,7 @@ func (da *DiameterAgent) start(filterS *engine.FilterS) (err error) { if err = d.ListenAndServe(da.stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.DiameterAgent, err)) - da.shdChan.CloseOnce() + da.shtDwn() } }(da.da) return diff --git a/services/dnsagent.go b/services/dnsagent.go index 6a9e1185f..793fbe26d 100644 --- a/services/dnsagent.go +++ b/services/dnsagent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -31,12 +32,13 @@ import ( // NewDNSAgent returns the DNS Agent func NewDNSAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + connMgr *engine.ConnManager, + srvDep map[string]*sync.WaitGroup, + shtDwn context.CancelFunc) servmanager.Service { return &DNSAgent{ cfg: cfg, filterSChan: filterSChan, - shdChan: shdChan, + shtDwn: shtDwn, connMgr: connMgr, srvDep: srvDep, } @@ -47,7 +49,7 @@ type DNSAgent struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - shdChan *utils.SyncedChan + shtDwn context.CancelFunc dns *agents.DNSAgent connMgr *engine.ConnManager @@ -98,7 +100,7 @@ func (dns *DNSAgent) Reload() (err error) { func (dns *DNSAgent) listenAndServe() (err error) { if err = dns.dns.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.DNSAgent, err.Error())) - dns.shdChan.CloseOnce() // stop the engine here + dns.shtDwn() // stop the engine here } return } diff --git a/services/ers.go b/services/ers.go index 7f182293b..9035fb16a 100644 --- a/services/ers.go +++ b/services/ers.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/ers" @@ -31,13 +32,14 @@ import ( // NewEventReaderService returns the EventReader Service func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + connMgr *engine.ConnManager, + srvDep map[string]*sync.WaitGroup, + shtDwn context.CancelFunc) servmanager.Service { return &EventReaderService{ rldChan: make(chan struct{}, 1), cfg: cfg, filterSChan: filterSChan, - shdChan: shdChan, + shtDwn: shtDwn, connMgr: connMgr, srvDep: srvDep, } @@ -48,7 +50,7 @@ type EventReaderService struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - shdChan *utils.SyncedChan + shtDwn context.CancelFunc ers *ers.ERService rldChan chan struct{} @@ -83,7 +85,7 @@ func (erS *EventReaderService) Start() (err error) { func (erS *EventReaderService) listenAndServe(ers *ers.ERService, stopChan chan struct{}, rldChan chan struct{}) (err error) { if err = ers.ListenAndServe(stopChan, rldChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.ERs, err.Error())) - erS.shdChan.CloseOnce() + erS.shtDwn() } return } diff --git a/services/freeswitchagent.go b/services/freeswitchagent.go index fb731f7f3..c1a5706e4 100644 --- a/services/freeswitchagent.go +++ b/services/freeswitchagent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/agents" @@ -32,11 +33,12 @@ import ( // NewFreeswitchAgent returns the Freeswitch Agent func NewFreeswitchAgent(cfg *config.CGRConfig, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + connMgr *engine.ConnManager, + srvDep map[string]*sync.WaitGroup, + shtDwn context.CancelFunc) servmanager.Service { return &FreeswitchAgent{ cfg: cfg, - shdChan: shdChan, + shtDwn: shtDwn, connMgr: connMgr, srvDep: srvDep, } @@ -45,8 +47,8 @@ func NewFreeswitchAgent(cfg *config.CGRConfig, // FreeswitchAgent implements Agent interface type FreeswitchAgent struct { sync.RWMutex - cfg *config.CGRConfig - shdChan *utils.SyncedChan + cfg *config.CGRConfig + shtDwn context.CancelFunc fS *agents.FSsessions connMgr *engine.ConnManager @@ -64,12 +66,7 @@ func (fS *FreeswitchAgent) Start() (err error) { fS.fS = agents.NewFSsessions(fS.cfg.FsAgentCfg(), fS.cfg.GeneralCfg().DefaultTimezone, fS.connMgr) - go func(f *agents.FSsessions) { - if err := f.Connect(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) - fS.shdChan.CloseOnce() // stop the engine here - } - }(fS.fS) + go fS.connect(fS.fS) return } @@ -81,14 +78,14 @@ func (fS *FreeswitchAgent) Reload() (err error) { return } fS.fS.Reload() - go fS.reload(fS.fS) + go fS.connect(fS.fS) return } -func (fS *FreeswitchAgent) reload(f *agents.FSsessions) (err error) { +func (fS *FreeswitchAgent) connect(f *agents.FSsessions) (err error) { if err := fS.fS.Connect(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.FreeSWITCHAgent, err)) - fS.shdChan.CloseOnce() // stop the engine here + fS.shtDwn() // stop the engine here } return } diff --git a/services/kamailioagent.go b/services/kamailioagent.go index 1330ac032..b241eb1c8 100644 --- a/services/kamailioagent.go +++ b/services/kamailioagent.go @@ -23,6 +23,7 @@ import ( "strings" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/agents" @@ -33,11 +34,12 @@ import ( // NewKamailioAgent returns the Kamailio Agent func NewKamailioAgent(cfg *config.CGRConfig, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + connMgr *engine.ConnManager, + srvDep map[string]*sync.WaitGroup, + shtDwn context.CancelFunc) servmanager.Service { return &KamailioAgent{ cfg: cfg, - shdChan: shdChan, + shtDwn: shtDwn, connMgr: connMgr, srvDep: srvDep, } @@ -46,8 +48,8 @@ func NewKamailioAgent(cfg *config.CGRConfig, // KamailioAgent implements Agent interface type KamailioAgent struct { sync.RWMutex - cfg *config.CGRConfig - shdChan *utils.SyncedChan + cfg *config.CGRConfig + shtDwn context.CancelFunc kam *agents.KamailioAgent connMgr *engine.ConnManager @@ -66,13 +68,7 @@ func (kam *KamailioAgent) Start() (err error) { kam.kam = agents.NewKamailioAgent(kam.cfg.KamAgentCfg(), kam.connMgr, utils.FirstNonEmpty(kam.cfg.KamAgentCfg().Timezone, kam.cfg.GeneralCfg().DefaultTimezone)) - go func(k *agents.KamailioAgent) { - if err = k.Connect(); err != nil && - !strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log - utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) - kam.shdChan.CloseOnce() - } - }(kam.kam) + go kam.connect(kam.kam) return } @@ -84,17 +80,16 @@ func (kam *KamailioAgent) Reload() (err error) { return } kam.kam.Reload() - go kam.reload(kam.kam) + go kam.connect(kam.kam) return } -func (kam *KamailioAgent) reload(k *agents.KamailioAgent) (err error) { +func (kam *KamailioAgent) connect(k *agents.KamailioAgent) (err error) { if err = k.Connect(); err != nil { - if strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log - return + if !strings.Contains(err.Error(), "use of closed network connection") { // if closed by us do not log + utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) + kam.shtDwn() } - utils.Logger.Err(fmt.Sprintf("<%s> error: %s", utils.KamailioAgent, err)) - kam.shdChan.CloseOnce() } return } diff --git a/services/libcgr-engine.go b/services/libcgr-engine.go index fea49c696..c4d8a435c 100644 --- a/services/libcgr-engine.go +++ b/services/libcgr-engine.go @@ -44,42 +44,42 @@ func NewCGREngineFlags() *CGREngineFlags { fs := flag.NewFlagSet(utils.CgrEngine, flag.ContinueOnError) return &CGREngineFlags{ FlagSet: fs, - Cfgpath: fs.String(utils.CfgPathCgr, utils.ConfigPath, "Configuration directory path."), + CfgPath: fs.String(utils.CfgPathCgr, utils.ConfigPath, "Configuration directory path."), Version: fs.Bool(utils.VersionCgr, false, "Prints the application version."), - Checkconfig: fs.Bool(utils.CheckCfgCgr, false, "Verify the config without starting the engine"), - Pidfile: fs.String(utils.PidCgr, utils.EmptyString, "Write pid file"), - Httppprofpath: fs.String(utils.HttpPrfPthCgr, utils.EmptyString, "http address used for program profiling"), - Cpuprofdir: fs.String(utils.CpuProfDirCgr, utils.EmptyString, "write cpu profile to files"), - Memprofdir: fs.String(utils.MemProfDirCgr, utils.EmptyString, "write memory profile to file"), - Memprofinterval: fs.Duration(utils.MemProfIntervalCgr, 5*time.Second, "Time between memory profile saves"), - Memprofnrfiles: fs.Int(utils.MemProfNrFilesCgr, 1, "Number of memory profile to write"), - Scheduledshutdown: fs.String(utils.ScheduledShutdownCgr, utils.EmptyString, "shutdown the engine after this duration"), + PidFile: fs.String(utils.PidCgr, utils.EmptyString, "Write pid file"), + HttPrfPath: fs.String(utils.HttpPrfPthCgr, utils.EmptyString, "http address used for program profiling"), + CpuPrfDir: fs.String(utils.CpuProfDirCgr, utils.EmptyString, "write cpu profile to files"), + MemPrfDir: fs.String(utils.MemProfDirCgr, utils.EmptyString, "write memory profile to file"), + MemPrfInterval: fs.Duration(utils.MemProfIntervalCgr, 5*time.Second, "Time between memory profile saves"), + MemPrfNoF: fs.Int(utils.MemProfNrFilesCgr, 1, "Number of memory profile to write"), + ScheduledShutDown: fs.String(utils.ScheduledShutdownCgr, utils.EmptyString, "shutdown the engine after this duration"), Singlecpu: fs.Bool(utils.SingleCpuCgr, false, "Run on single CPU core"), - Syslogger: fs.String(utils.LoggerCfg, utils.EmptyString, "logger <*syslog|*stdout>"), - Nodeid: fs.String(utils.NodeIDCfg, utils.EmptyString, "The node ID of the engine"), - Loglevel: fs.Int(utils.LogLevelCfg, -1, "Log level (0-emergency to 7-debug)"), + SysLogger: fs.String(utils.LoggerCfg, utils.EmptyString, "logger <*syslog|*stdout>"), + NodeID: fs.String(utils.NodeIDCfg, utils.EmptyString, "The node ID of the engine"), + LogLevel: fs.Int(utils.LogLevelCfg, -1, "Log level (0-emergency to 7-debug)"), Preload: fs.String(utils.PreloadCgr, utils.EmptyString, "LoaderIDs used to load the data before the engine starts"), + CheckConfig: fs.Bool(utils.CheckCfgCgr, false, "Verify the config without starting the engine"), } } type CGREngineFlags struct { *flag.FlagSet - Cfgpath *string + CfgPath *string Version *bool - Checkconfig *bool - Pidfile *string - Httppprofpath *string - Cpuprofdir *string - Memprofdir *string - Memprofinterval *time.Duration - Memprofnrfiles *int - Scheduledshutdown *string + PidFile *string + HttPrfPath *string + CpuPrfDir *string + MemPrfDir *string + MemPrfInterval *time.Duration + MemPrfNoF *int + ScheduledShutDown *string Singlecpu *bool - Syslogger *string - Nodeid *string - Loglevel *int + SysLogger *string + NodeID *string + LogLevel *int Preload *string + CheckConfig *bool } func cgrSingnalHandler(ctx *context.Context, shutdown context.CancelFunc, @@ -231,18 +231,17 @@ func cgrInitConfigSv1(iConfigCh chan birpc.ClientConnector, iConfigCh <- rpc } -func startRPC(server *cores.Server, internalAdminSChan, - internalCdrSChan, internalRsChan, internalStatSChan, +func cgrStartRPC(ctx *context.Context, shtdwnEngine context.CancelFunc, + cfg *config.CGRConfig, server *cores.Server, + internalAdminSChan, internalCdrSChan, internalRsChan, internalStatSChan, internalAttrSChan, internalChargerSChan, internalThdSChan, internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan, - internalLoaderSChan, internalCacheSChan, - internalEEsChan, internalRateSChan, internalActionSChan, - internalAccountSChan chan birpc.ClientConnector, - shdChan *utils.SyncedChan) { + internalLoaderSChan, internalCacheSChan, internalEEsChan, internalRateSChan, + internalActionSChan, internalAccountSChan chan birpc.ClientConnector) { if !cfg.DispatcherSCfg().Enabled { select { // Any of the rpc methods will unlock listening to rpc requests - // case cdrs := <-internalCdrSChan: - // internalCdrSChan <- cdrs + case cdrs := <-internalCdrSChan: + internalCdrSChan <- cdrs case smg := <-internalSessionSChan: internalSessionSChan <- smg case rls := <-internalRsChan: @@ -259,85 +258,30 @@ func startRPC(server *cores.Server, internalAdminSChan, internalThdSChan <- thS case rtS := <-internalRouteSChan: internalRouteSChan <- rtS - // case analyzerS := <-internalAnalyzerSChan: - // internalAnalyzerSChan <- analyzerS + case analyzerS := <-internalAnalyzerSChan: + internalAnalyzerSChan <- analyzerS case loaderS := <-internalLoaderSChan: internalLoaderSChan <- loaderS case chS := <-internalCacheSChan: // added in order to start the RPC before precaching is done internalCacheSChan <- chS - // case eeS := <-internalEEsChan: - // internalEEsChan <- eeS + case eeS := <-internalEEsChan: + internalEEsChan <- eeS case rateS := <-internalRateSChan: internalRateSChan <- rateS case actionS := <-internalActionSChan: internalActionSChan <- actionS case accountS := <-internalAccountSChan: internalAccountSChan <- accountS - case <-shdChan.Done(): + case <-ctx.Done(): return } } else { select { case dispatcherS := <-internalDispatcherSChan: internalDispatcherSChan <- dispatcherS - case <-shdChan.Done(): + case <-ctx.Done(): return } } - - go server.ServeJSON(cfg.ListenCfg().RPCJSONListen, shdChan) - go server.ServeGOB(cfg.ListenCfg().RPCGOBListen, shdChan) - go server.ServeHTTP( - cfg.ListenCfg().HTTPListen, - cfg.HTTPCfg().JsonRPCURL, - cfg.HTTPCfg().WSURL, - cfg.HTTPCfg().UseBasicAuth, - cfg.HTTPCfg().AuthUsers, - shdChan, - ) - if (len(cfg.ListenCfg().RPCGOBTLSListen) != 0 || - len(cfg.ListenCfg().RPCJSONTLSListen) != 0 || - len(cfg.ListenCfg().HTTPTLSListen) != 0) && - (len(cfg.TLSCfg().ServerCerificate) == 0 || - len(cfg.TLSCfg().ServerKey) == 0) { - utils.Logger.Warning("WARNING: missing TLS certificate/key file!") - return - } - if cfg.ListenCfg().RPCGOBTLSListen != utils.EmptyString { - go server.ServeGOBTLS( - cfg.ListenCfg().RPCGOBTLSListen, - cfg.TLSCfg().ServerCerificate, - cfg.TLSCfg().ServerKey, - cfg.TLSCfg().CaCertificate, - cfg.TLSCfg().ServerPolicy, - cfg.TLSCfg().ServerName, - shdChan, - ) - } - if cfg.ListenCfg().RPCJSONTLSListen != utils.EmptyString { - go server.ServeJSONTLS( - cfg.ListenCfg().RPCJSONTLSListen, - cfg.TLSCfg().ServerCerificate, - cfg.TLSCfg().ServerKey, - cfg.TLSCfg().CaCertificate, - cfg.TLSCfg().ServerPolicy, - cfg.TLSCfg().ServerName, - shdChan, - ) - } - if cfg.ListenCfg().HTTPTLSListen != utils.EmptyString { - go server.ServeHTTPTLS( - cfg.ListenCfg().HTTPTLSListen, - cfg.TLSCfg().ServerCerificate, - cfg.TLSCfg().ServerKey, - cfg.TLSCfg().CaCertificate, - cfg.TLSCfg().ServerPolicy, - cfg.TLSCfg().ServerName, - cfg.HTTPCfg().JsonRPCURL, - cfg.HTTPCfg().WSURL, - cfg.HTTPCfg().UseBasicAuth, - cfg.HTTPCfg().AuthUsers, - shdChan, - ) - } + server.StartServer(ctx, shtdwnEngine, cfg) } diff --git a/services/radiusagent.go b/services/radiusagent.go index d16690a0b..2c631de08 100644 --- a/services/radiusagent.go +++ b/services/radiusagent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -31,12 +32,13 @@ import ( // NewRadiusAgent returns the Radius Agent func NewRadiusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + connMgr *engine.ConnManager, + srvDep map[string]*sync.WaitGroup, + shtDwn context.CancelFunc) servmanager.Service { return &RadiusAgent{ cfg: cfg, filterSChan: filterSChan, - shdChan: shdChan, + shtDwn: shtDwn, connMgr: connMgr, srvDep: srvDep, } @@ -47,7 +49,7 @@ type RadiusAgent struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - shdChan *utils.SyncedChan + shtDwn context.CancelFunc stopChan chan struct{} rad *agents.RadiusAgent @@ -89,7 +91,7 @@ func (rad *RadiusAgent) Start() (err error) { func (rad *RadiusAgent) listenAndServe(r *agents.RadiusAgent) (err error) { if err = r.ListenAndServe(rad.stopChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.RadiusAgent, err.Error())) - rad.shdChan.CloseOnce() + rad.shtDwn() } return } diff --git a/services/sessions.go b/services/sessions.go index fb1019e30..61b3d7429 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -20,9 +20,11 @@ package services import ( "fmt" - "github.com/cgrates/cgrates/apis" "sync" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/apis" + "github.com/cgrates/birpc" "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" @@ -36,14 +38,15 @@ import ( // NewSessionService returns the Session Service func NewSessionService(cfg *config.CGRConfig, dm *DataDBService, server *cores.Server, internalChan chan birpc.ClientConnector, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, - anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { + connMgr *engine.ConnManager, anz *AnalyzerService, + srvDep map[string]*sync.WaitGroup, + shtDwn context.CancelFunc) servmanager.Service { return &SessionService{ connChan: internalChan, cfg: cfg, dm: dm, server: server, - shdChan: shdChan, + shtDwn: shtDwn, connMgr: connMgr, anz: anz, srvDep: srvDep, @@ -56,7 +59,7 @@ type SessionService struct { cfg *config.CGRConfig dm *DataDBService server *cores.Server - shdChan *utils.SyncedChan + shtDwn context.CancelFunc stopChan chan struct{} sm *sessions.SessionS @@ -116,7 +119,7 @@ func (smg *SessionService) start() (err error) { smg.Lock() smg.bircpEnabled = false smg.Unlock() - smg.shdChan.CloseOnce() + smg.shtDwn() } return } diff --git a/services/sipagent.go b/services/sipagent.go index 9c16ffbd7..e351ec16c 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -22,6 +22,7 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -31,12 +32,13 @@ import ( // NewSIPAgent returns the sip Agent func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + connMgr *engine.ConnManager, + srvDep map[string]*sync.WaitGroup, + shtDwn context.CancelFunc) servmanager.Service { return &SIPAgent{ cfg: cfg, filterSChan: filterSChan, - shdChan: shdChan, + shtDwn: shtDwn, connMgr: connMgr, srvDep: srvDep, } @@ -47,7 +49,7 @@ type SIPAgent struct { sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS - shdChan *utils.SyncedChan + shtDwn context.CancelFunc sip *agents.SIPAgent connMgr *engine.ConnManager @@ -74,15 +76,17 @@ func (sip *SIPAgent) Start() (err error) { utils.SIPAgent, err)) return } - go func() { - if err = sip.sip.ListenAndServe(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) - sip.shdChan.CloseOnce() // stop the engine here - } - }() + go sip.listenAndServe() return } +func (sip *SIPAgent) listenAndServe() { + if err := sip.sip.ListenAndServe(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) + sip.shtDwn() // stop the engine here + } +} + // Reload handles the change of config func (sip *SIPAgent) Reload() (err error) { if sip.oldListen == sip.cfg.SIPAgentCfg().Listen { @@ -93,12 +97,7 @@ func (sip *SIPAgent) Reload() (err error) { sip.oldListen = sip.cfg.SIPAgentCfg().Listen sip.sip.InitStopChan() sip.Unlock() - go func() { - if err := sip.sip.ListenAndServe(); err != nil { - utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) - sip.shdChan.CloseOnce() // stop the engine here - } - }() + go sip.listenAndServe() return } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 2ff8cd6a7..c8cf91490 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -22,21 +22,20 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) // NewServiceManager returns a service manager -func NewServiceManager(cfg *config.CGRConfig, shdChan *utils.SyncedChan, shdWg *sync.WaitGroup, connMgr *engine.ConnManager) *ServiceManager { - sm := &ServiceManager{ +func NewServiceManager(cfg *config.CGRConfig, shdWg *sync.WaitGroup, connMgr *engine.ConnManager) *ServiceManager { + return &ServiceManager{ cfg: cfg, subsystems: make(map[string]Service), - shdChan: shdChan, shdWg: shdWg, connMgr: connMgr, } - return sm } // ServiceManager handles service management ran by the engine @@ -45,7 +44,6 @@ type ServiceManager struct { cfg *config.CGRConfig subsystems map[string]Service - shdChan *utils.SyncedChan shdWg *sync.WaitGroup connMgr *engine.ConnManager } @@ -58,8 +56,8 @@ func (srvMngr *ServiceManager) GetConfig() *config.CGRConfig { } // StartServices starts all enabled services -func (srvMngr *ServiceManager) StartServices() (err error) { - go srvMngr.handleReload() +func (srvMngr *ServiceManager) StartServices(ctx *context.Context, shtDwn context.CancelFunc) (err error) { + go srvMngr.handleReload(ctx, shtDwn) for _, service := range srvMngr.subsystems { if service.ShouldRun() && !service.IsRunning() { srvMngr.shdWg.Add(1) @@ -67,7 +65,7 @@ func (srvMngr *ServiceManager) StartServices() (err error) { if err := srv.Start(); err != nil && err != utils.ErrServiceAlreadyRunning { // in case the service was started in another gorutine utils.Logger.Err(fmt.Sprintf("<%s> failed to start %s because: %s", utils.ServiceManager, srv.ServiceName(), err)) - srvMngr.shdChan.CloseOnce() + shtDwn() } }(service) } @@ -87,100 +85,100 @@ func (srvMngr *ServiceManager) AddServices(services ...Service) { srvMngr.Unlock() } -func (srvMngr *ServiceManager) handleReload() { +func (srvMngr *ServiceManager) handleReload(ctx *context.Context, shtDwn context.CancelFunc) { for { select { - case <-srvMngr.shdChan.Done(): + case <-ctx.Done(): srvMngr.ShutdownServices() return case <-srvMngr.GetConfig().GetReloadChan(config.AttributeSJSON): - go srvMngr.reloadService(utils.AttributeS) + go srvMngr.reloadService(utils.AttributeS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.ChargerSJSON): - go srvMngr.reloadService(utils.ChargerS) + go srvMngr.reloadService(utils.ChargerS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.ThresholdSJSON): - go srvMngr.reloadService(utils.ThresholdS) + go srvMngr.reloadService(utils.ThresholdS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.StatSJSON): - go srvMngr.reloadService(utils.StatS) + go srvMngr.reloadService(utils.StatS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.ResourceSJSON): - go srvMngr.reloadService(utils.ResourceS) + go srvMngr.reloadService(utils.ResourceS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.RouteSJSON): - go srvMngr.reloadService(utils.RouteS) + go srvMngr.reloadService(utils.RouteS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.AdminSJSON): - go srvMngr.reloadService(utils.AdminS) + go srvMngr.reloadService(utils.AdminS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.CDRsJSON): - go srvMngr.reloadService(utils.CDRServer) + go srvMngr.reloadService(utils.CDRServer, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJSON): - go srvMngr.reloadService(utils.SessionS) + go srvMngr.reloadService(utils.SessionS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.ERsJSON): - go srvMngr.reloadService(utils.ERs) + go srvMngr.reloadService(utils.ERs, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.DNSAgentJSON): - go srvMngr.reloadService(utils.DNSAgent) + go srvMngr.reloadService(utils.DNSAgent, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.FreeSWITCHAgentJSON): - go srvMngr.reloadService(utils.FreeSWITCHAgent) + go srvMngr.reloadService(utils.FreeSWITCHAgent, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.KamailioAgentJSON): - go srvMngr.reloadService(utils.KamailioAgent) + go srvMngr.reloadService(utils.KamailioAgent, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.AsteriskAgentJSON): - go srvMngr.reloadService(utils.AsteriskAgent) + go srvMngr.reloadService(utils.AsteriskAgent, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.RadiusAgentJSON): - go srvMngr.reloadService(utils.RadiusAgent) + go srvMngr.reloadService(utils.RadiusAgent, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.DiameterAgentJSON): - go srvMngr.reloadService(utils.DiameterAgent) + go srvMngr.reloadService(utils.DiameterAgent, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.HTTPAgentJSON): - go srvMngr.reloadService(utils.HTTPAgent) + go srvMngr.reloadService(utils.HTTPAgent, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.LoaderSJSON): - go srvMngr.reloadService(utils.LoaderS) + go srvMngr.reloadService(utils.LoaderS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.AnalyzerSJSON): - go srvMngr.reloadService(utils.AnalyzerS) + go srvMngr.reloadService(utils.AnalyzerS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.DispatcherSJSON): - go srvMngr.reloadService(utils.DispatcherS) + go srvMngr.reloadService(utils.DispatcherS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.DataDBJSON): - go srvMngr.reloadService(utils.DataDB) + go srvMngr.reloadService(utils.DataDB, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.StorDBJSON): - go srvMngr.reloadService(utils.StorDB) + go srvMngr.reloadService(utils.StorDB, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.EEsJSON): - go srvMngr.reloadService(utils.EEs) + go srvMngr.reloadService(utils.EEs, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.RateSJSON): - go srvMngr.reloadService(utils.RateS) + go srvMngr.reloadService(utils.RateS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.RPCConnsJSON): go srvMngr.connMgr.Reload() case <-srvMngr.GetConfig().GetReloadChan(config.SIPAgentJSON): - go srvMngr.reloadService(utils.SIPAgent) + go srvMngr.reloadService(utils.SIPAgent, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.RegistrarCJSON): - go srvMngr.reloadService(utils.RegistrarC) + go srvMngr.reloadService(utils.RegistrarC, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.HTTPJSON): - go srvMngr.reloadService(utils.GlobalVarS) + go srvMngr.reloadService(utils.GlobalVarS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.AccountSJSON): - go srvMngr.reloadService(utils.AccountS) + go srvMngr.reloadService(utils.AccountS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.ActionSJSON): - go srvMngr.reloadService(utils.ActionS) + go srvMngr.reloadService(utils.ActionS, shtDwn) case <-srvMngr.GetConfig().GetReloadChan(config.CoreSJSON): - go srvMngr.reloadService(utils.CoreS) + go srvMngr.reloadService(utils.CoreS, shtDwn) } // handle RPC server } } -func (srvMngr *ServiceManager) reloadService(srviceName string) (err error) { +func (srvMngr *ServiceManager) reloadService(srviceName string, shtDwn context.CancelFunc) (err error) { srv := srvMngr.GetService(srviceName) if srv.ShouldRun() { if srv.IsRunning() { if err = srv.Reload(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to reload <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err)) - srvMngr.shdChan.CloseOnce() + shtDwn() return // stop if we encounter an error } } else { srvMngr.shdWg.Add(1) if err = srv.Start(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to start <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err)) - srvMngr.shdChan.CloseOnce() + shtDwn() return // stop if we encounter an error } } } else if srv.IsRunning() { if err = srv.Shutdown(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> failed to stop service <%s> err <%s>", utils.ServiceManager, srv.ServiceName(), err)) - srvMngr.shdChan.CloseOnce() + shtDwn() } srvMngr.shdWg.Done() } diff --git a/utils/consts.go b/utils/consts.go index 3b51620b2..0f53c343a 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1435,9 +1435,6 @@ const ( CDRsV1StoreSessionCost = "CDRsV1.StoreSessionCost" CDRsV1ProcessEvent = "CDRsV1.ProcessEvent" CDRsV1Ping = "CDRsV1.Ping" - CDRsV2 = "CDRsV2" - CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost" - CDRsV2ProcessEvent = "CDRsV2.ProcessEvent" ) // EEs