diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index f729aff29..3363f7a84 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -700,19 +700,23 @@ func main() { attrS.GetIntenternalChan(), tS.GetIntenternalChan(), stS.GetIntenternalChan(), internalDispatcherSChan) schS.SetCdrsConns(cdrS.GetIntenternalChan()) - /* - smg := services.NewSessionService() - */ + + smg := services.NewSessionService(cfg, dm, server, cdrS.GetIntenternalChan(), + rals.GetResponder().GetIntenternalChan(), reS.GetIntenternalChan(), + tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(), + attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), internalDispatcherSChan, exitChan) + srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals, - rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS) /* smg, - services.NewEventReaderService(), - services.NewDNSAgent(), - services.NewFreeswitchAgent(), - services.NewKamailioAgent(), - services.NewAsteriskAgent(), // partial reload - services.NewRadiusAgent(), // partial reload - services.NewDiameterAgent(), // partial reload - services.NewHTTPAgent(), // no reload + rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg) + /* + services.NewEventReaderService(), + services.NewDNSAgent(), + services.NewFreeswitchAgent(), + services.NewKamailioAgent(), + services.NewAsteriskAgent(), // partial reload + services.NewRadiusAgent(), // partial reload + services.NewDiameterAgent(), // partial reload + services.NewHTTPAgent(), // no reload */ /* @@ -734,7 +738,7 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.ApierV1, rals.GetAPIv1().GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.ApierV2, rals.GetAPIv2().GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan()) - engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) // server or from apier + engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan()) @@ -743,7 +747,7 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, reS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.Responder, rals.GetResponder().GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, schS.GetIntenternalChan()) - // engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, internalSMGChan) // server or from apier + engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, smg.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.StatSv1, stS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.SupplierSv1, supS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, tS.GetIntenternalChan()) @@ -771,13 +775,11 @@ func main() { go startLoaderS(internalLoaderSChan, internalCacheSChan, cfg, dm, server, filterSChan, exitChan) // Serve rpc connections - /* - go startRpc(server, internalRaterChan, cdrS.GetIntenternalChan(), - reS.GetIntenternalChan(), stS.GetIntenternalChan(), - attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(), - supS.GetIntenternalChan(), internalSMGChan, internalAnalyzerSChan, - internalDispatcherSChan, internalLoaderSChan, rals.GetIntenternalChan(), internalCacheSChan, exitChan) - */ + go startRpc(server, rals.GetResponder().GetIntenternalChan(), cdrS.GetIntenternalChan(), + reS.GetIntenternalChan(), stS.GetIntenternalChan(), + attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(), + supS.GetIntenternalChan(), smg.GetIntenternalChan(), internalAnalyzerSChan, + internalDispatcherSChan, internalLoaderSChan, rals.GetIntenternalChan(), internalCacheSChan, exitChan) <-exitChan if *cpuProfDir != "" { // wait to end cpuProfiling diff --git a/services/sessions.go b/services/sessions.go index 5c44675f2..daee41a75 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -23,6 +23,8 @@ import ( "sync" v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" @@ -30,15 +32,45 @@ import ( ) // NewSessionService returns the Session Service -func NewSessionService() servmanager.Service { +func NewSessionService(cfg *config.CGRConfig, dm *engine.DataManager, + server *utils.Server, chrsChan, respChan, resChan, thsChan, stsChan, + supChan, attrsChan, cdrsChan, dispatcherChan chan rpcclient.RpcClientConnection, + exitChan chan bool) servmanager.Service { return &SessionService{ - connChan: make(chan rpcclient.RpcClientConnection, 1), + connChan: make(chan rpcclient.RpcClientConnection, 1), + cfg: cfg, + dm: dm, + server: server, + chrsChan: chrsChan, + respChan: respChan, + resChan: resChan, + thsChan: thsChan, + stsChan: stsChan, + supChan: supChan, + attrsChan: attrsChan, + cdrsChan: cdrsChan, + dispatcherChan: dispatcherChan, + exitChan: exitChan, } } // SessionService implements Service interface type SessionService struct { sync.RWMutex + cfg *config.CGRConfig + dm *engine.DataManager + server *utils.Server + chrsChan chan rpcclient.RpcClientConnection + respChan chan rpcclient.RpcClientConnection + resChan chan rpcclient.RpcClientConnection + thsChan chan rpcclient.RpcClientConnection + stsChan chan rpcclient.RpcClientConnection + supChan chan rpcclient.RpcClientConnection + attrsChan chan rpcclient.RpcClientConnection + cdrsChan chan rpcclient.RpcClientConnection + dispatcherChan chan rpcclient.RpcClientConnection + exitChan chan bool + sm *sessions.SessionS rpc *v1.SMGenericV1 rpcv1 *v1.SessionSv1 @@ -46,11 +78,10 @@ type SessionService struct { // in order to stop the bircp server if necesary bircpEnabled bool - server *utils.Server } // Start should handle the sercive start -func (smg *SessionService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { +func (smg *SessionService) Start() (err error) { if smg.IsRunning() { return fmt.Errorf("service aleady running") } @@ -59,69 +90,69 @@ func (smg *SessionService) Start(sp servmanager.ServiceProvider, waitCache bool) defer smg.Unlock() var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrConns, cdrsConn, chargerSConn rpcclient.RpcClientConnection - if chargerSConn, err = sp.NewConnection(utils.ChargerS, sp.GetConfig().SessionSCfg().ChargerSConns); err != nil { + if chargerSConn, err = NewConnection(smg.cfg, smg.chrsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().ChargerSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.ChargerS, err.Error())) return } - if ralsConns, err = sp.NewConnection(utils.ResponderS, sp.GetConfig().SessionSCfg().RALsConns); err != nil { + if ralsConns, err = NewConnection(smg.cfg, smg.respChan, smg.dispatcherChan, smg.cfg.SessionSCfg().RALsConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.ResponderS, err.Error())) return } - if resSConns, err = sp.NewConnection(utils.ResourceS, sp.GetConfig().SessionSCfg().ResSConns); err != nil { + if resSConns, err = NewConnection(smg.cfg, smg.resChan, smg.dispatcherChan, smg.cfg.SessionSCfg().ResSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.ResourceS, err.Error())) return } - if threshSConns, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().SessionSCfg().ThreshSConns); err != nil { + if threshSConns, err = NewConnection(smg.cfg, smg.thsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().ThreshSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.ThresholdS, err.Error())) return } - if statSConns, err = sp.NewConnection(utils.StatS, sp.GetConfig().SessionSCfg().StatSConns); err != nil { + if statSConns, err = NewConnection(smg.cfg, smg.stsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().StatSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.StatS, err.Error())) return } - if suplSConns, err = sp.NewConnection(utils.SupplierS, sp.GetConfig().SessionSCfg().SupplSConns); err != nil { + if suplSConns, err = NewConnection(smg.cfg, smg.supChan, smg.dispatcherChan, smg.cfg.SessionSCfg().SupplSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.SupplierS, err.Error())) return } - if attrConns, err = sp.NewConnection(utils.AttributeS, sp.GetConfig().SessionSCfg().AttrSConns); err != nil { + if attrConns, err = NewConnection(smg.cfg, smg.attrsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().AttrSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.AttributeS, err.Error())) return } - if cdrsConn, err = sp.NewConnection(utils.CDRServer, sp.GetConfig().SessionSCfg().CDRsConns); err != nil { + if cdrsConn, err = NewConnection(smg.cfg, smg.cdrsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().CDRsConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.CDRServer, err.Error())) return } - sReplConns, err := sessions.NewSReplConns(sp.GetConfig().SessionSCfg().SessionReplicationConns, - sp.GetConfig().GeneralCfg().Reconnects, sp.GetConfig().GeneralCfg().ConnectTimeout, - sp.GetConfig().GeneralCfg().ReplyTimeout) + sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().SessionReplicationConns, + smg.cfg.GeneralCfg().Reconnects, smg.cfg.GeneralCfg().ConnectTimeout, + smg.cfg.GeneralCfg().ReplyTimeout) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMGReplicationConnection error: <%s>", utils.SessionS, err.Error())) return } - smg.sm = sessions.NewSessionS(sp.GetConfig(), ralsConns, resSConns, threshSConns, + smg.sm = sessions.NewSessionS(smg.cfg, ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrConns, cdrsConn, chargerSConn, - sReplConns, sp.GetDM(), sp.GetConfig().GeneralCfg().DefaultTimezone) + sReplConns, smg.dm, smg.cfg.GeneralCfg().DefaultTimezone) //start sync session in a separate gorutine go func(sm *sessions.SessionS) { - if err = sm.ListenAndServe(sp.GetExitChan()); err != nil { + if err = sm.ListenAndServe(smg.exitChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SessionS, err)) } }(smg.sm) @@ -129,25 +160,24 @@ func (smg *SessionService) Start(sp servmanager.ServiceProvider, waitCache bool) smg.connChan <- smg.sm // Register RPC handler smg.rpc = v1.NewSMGenericV1(smg.sm) - sp.GetServer().RpcRegister(smg.rpc) + smg.server.RpcRegister(smg.rpc) smg.rpcv1 = v1.NewSessionSv1(smg.sm) // methods with multiple options - if !sp.GetConfig().DispatcherSCfg().Enabled { - sp.GetServer().RpcRegister(smg.rpcv1) + if !smg.cfg.DispatcherSCfg().Enabled { + smg.server.RpcRegister(smg.rpcv1) } // Register BiRpc handlers - if sp.GetConfig().SessionSCfg().ListenBijson != "" { + if smg.cfg.SessionSCfg().ListenBijson != "" { smg.bircpEnabled = true - smg.server = sp.GetServer() for method, handler := range smg.rpc.Handlers() { - sp.GetServer().BiRPCRegisterName(method, handler) + smg.server.BiRPCRegisterName(method, handler) } for method, handler := range smg.rpcv1.Handlers() { - sp.GetServer().BiRPCRegisterName(method, handler) + smg.server.BiRPCRegisterName(method, handler) } // run this in it's own gorutine go func() { - if err := sp.GetServer().ServeBiJSON(sp.GetConfig().SessionSCfg().ListenBijson, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil { + if err := smg.server.ServeBiJSON(smg.cfg.SessionSCfg().ListenBijson, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err)) smg.Lock() smg.bircpEnabled = false @@ -164,60 +194,60 @@ func (smg *SessionService) GetIntenternalChan() (conn chan rpcclient.RpcClientCo } // Reload handles the change of config -func (smg *SessionService) Reload(sp servmanager.ServiceProvider) (err error) { +func (smg *SessionService) Reload() (err error) { var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrConns, cdrsConn, chargerSConn rpcclient.RpcClientConnection - if chargerSConn, err = sp.NewConnection(utils.ChargerS, sp.GetConfig().SessionSCfg().ChargerSConns); err != nil { + if chargerSConn, err = NewConnection(smg.cfg, smg.chrsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().ChargerSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.ChargerS, err.Error())) return } - if ralsConns, err = sp.NewConnection(utils.ResponderS, sp.GetConfig().SessionSCfg().RALsConns); err != nil { + if ralsConns, err = NewConnection(smg.cfg, smg.respChan, smg.dispatcherChan, smg.cfg.SessionSCfg().RALsConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.ResponderS, err.Error())) return } - if resSConns, err = sp.NewConnection(utils.ResourceS, sp.GetConfig().SessionSCfg().ResSConns); err != nil { + if resSConns, err = NewConnection(smg.cfg, smg.resChan, smg.dispatcherChan, smg.cfg.SessionSCfg().ResSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.ResourceS, err.Error())) return } - if threshSConns, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().SessionSCfg().ThreshSConns); err != nil { + if threshSConns, err = NewConnection(smg.cfg, smg.thsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().ThreshSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.ThresholdS, err.Error())) return } - if statSConns, err = sp.NewConnection(utils.StatS, sp.GetConfig().SessionSCfg().StatSConns); err != nil { + if statSConns, err = NewConnection(smg.cfg, smg.stsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().StatSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.StatS, err.Error())) return } - if suplSConns, err = sp.NewConnection(utils.SupplierS, sp.GetConfig().SessionSCfg().SupplSConns); err != nil { + if suplSConns, err = NewConnection(smg.cfg, smg.supChan, smg.dispatcherChan, smg.cfg.SessionSCfg().SupplSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.SupplierS, err.Error())) return } - if attrConns, err = sp.NewConnection(utils.AttributeS, sp.GetConfig().SessionSCfg().AttrSConns); err != nil { + if attrConns, err = NewConnection(smg.cfg, smg.attrsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().AttrSConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.AttributeS, err.Error())) return } - if cdrsConn, err = sp.NewConnection(utils.CDRServer, sp.GetConfig().SessionSCfg().CDRsConns); err != nil { + if cdrsConn, err = NewConnection(smg.cfg, smg.cdrsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().CDRsConns); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.CDRServer, err.Error())) return } - sReplConns, err := sessions.NewSReplConns(sp.GetConfig().SessionSCfg().SessionReplicationConns, - sp.GetConfig().GeneralCfg().Reconnects, sp.GetConfig().GeneralCfg().ConnectTimeout, - sp.GetConfig().GeneralCfg().ReplyTimeout) + sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().SessionReplicationConns, + smg.cfg.GeneralCfg().Reconnects, smg.cfg.GeneralCfg().ConnectTimeout, + smg.cfg.GeneralCfg().ReplyTimeout) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMGReplicationConnection error: <%s>", utils.SessionS, err.Error())) @@ -255,11 +285,6 @@ func (smg *SessionService) Shutdown() (err error) { return } -// GetRPCInterface returns the interface to register for server -func (smg *SessionService) GetRPCInterface() interface{} { - return smg.sm -} - // IsRunning returns if the service is running func (smg *SessionService) IsRunning() bool { smg.RLock() @@ -271,3 +296,8 @@ func (smg *SessionService) IsRunning() bool { func (smg *SessionService) ServiceName() string { return utils.SessionS } + +// ShouldRun returns if the service should be running +func (smg *SessionService) ShouldRun() bool { + return smg.cfg.SessionSCfg().Enabled +} diff --git a/services/sessions_it_test.go b/services/sessions_it_test.go index 6d5dacf4e..cae8f1dc1 100644 --- a/services/sessions_it_test.go +++ b/services/sessions_it_test.go @@ -60,17 +60,25 @@ func TestSessionSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheSharedGroups)) close(chS.GetPrecacheChannel(utils.CacheTimings)) + internalChan := make(chan rpcclient.RpcClientConnection, 1) + internalChan <- nil cacheSChan := make(chan rpcclient.RpcClientConnection, 1) cacheSChan <- chS server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, - /*cdrStorage*/ nil, - /*loadStorage*/ nil, filterSChan, + /*cdrStorage*/ nil /*loadStorage*/, nil /*filterSChan*/, nil, server, nil, engineShutdown) srvMngr.SetCacheS(chS) - attrS := NewSessionService() - srvMngr.AddService(attrS, NewChargerService(), NewRalService(srvMngr), &CacheService{connChan: cacheSChan}, NewSchedulerService(), NewCDRServer()) + chrS := NewChargerService(cfg, nil, chS, filterSChan, server, nil, nil) + schS := NewSchedulerService(cfg, nil, chS, server, nil) + ralS := NewRalService(cfg, nil, nil, nil, chS, filterSChan, server, + /*tS*/ internalChan, internalChan, cacheSChan, internalChan, internalChan, + internalChan, schS, engineShutdown) + cdrS := NewCDRServer(cfg, nil, nil, filterSChan, server, chrS.GetIntenternalChan(), ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil) + attrS := NewSessionService(cfg, nil, server, chrS.GetIntenternalChan(), + ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil, nil, cdrS.GetIntenternalChan(), nil, engineShutdown) + srvMngr.AddServices(attrS, chrS, schS, ralS, cdrS) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 5bff97359..b90e686e5 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -267,10 +267,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.GetConfig().CdrsCfg().Enabled { go srvMngr.startService(utils.CDRServer) + } + if srvMngr.GetConfig().SessionSCfg().Enabled { + go srvMngr.startService(utils.SessionS) } /* - if srvMngr.GetConfig().SessionSCfg().Enabled { - go srvMngr.startService(utils.SessionS) - } if srvMngr.GetConfig().ERsCfg().Enabled { go srvMngr.startService(utils.ERs) } @@ -367,11 +367,11 @@ func (srvMngr *ServiceManager) handleReload() { case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN): if err = srvMngr.reloadService(utils.CDRServer); err != nil { return + } + case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJson): + if err = srvMngr.reloadService(utils.SessionS); err != nil { + return } /* - case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJson): - if err = srvMngr.reloadService(utils.SessionS); err != nil { - return - } case <-srvMngr.GetConfig().GetReloadChan(config.ERsJson): if err = srvMngr.reloadService(utils.ERs); err != nil { return