diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index bf9ac3705..fe0a9b315 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -686,20 +686,17 @@ func main() { tS.GetIntenternalChan(), stS.GetIntenternalChan(), internalCacheSChan, schS.GetIntenternalChan(), attrS.GetIntenternalChan(), internalDispatcherSChan, schS, exitChan) - // cdrS := services.NewCDRServer(cfg, dm, cdrDb, filterSChan, server, - // chrS.GetIntenternalChan(),internalRALsv1Chan, - // attrS.GetIntenternalChan(), tS.GetIntenternalChan(), - // stS.GetIntenternalChan(), internalDispatcherSChan) - // schS.SetCdrsConns(cdrS.GetIntenternalChan()) + cdrS := services.NewCDRServer(cfg, dm, cdrDb, filterSChan, server, + chrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(), + attrS.GetIntenternalChan(), tS.GetIntenternalChan(), + stS.GetIntenternalChan(), internalDispatcherSChan) + schS.SetCdrsConns(cdrS.GetIntenternalChan()) /* - apiv1, _ := srvManager.GetService(utils.ApierV1) - apiv2, _ := srvManager.GetService(utils.ApierV2) - resp, _ := srvManager.GetService(utils.ResponderS) smg := services.NewSessionService() grd := services.NewGuardianService()*/ srvManager.AddServices( /*chS, */ attrS, chrS, tS, stS, reS, supS, schS, rals, - rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2()) /*, cdrS, smg, grd, + rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS) /* smg, grd, services.NewEventReaderService(), services.NewDNSAgent(), services.NewFreeswitchAgent(), @@ -734,8 +731,8 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.ApierV2, rals.GetAPIv2().GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) // server or from apier - // engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan()) - // engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan()) + engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan()) engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan()) // engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan) engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, internalLoaderSChan) @@ -755,7 +752,7 @@ func main() { initConfigSv1(internalConfigChan, server) // Start CDRC components if necessary - // go startCdrcs(cdrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(), internalDispatcherSChan, filterSChan, exitChan) + go startCdrcs(cdrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(), internalDispatcherSChan, filterSChan, exitChan) if cfg.DispatcherSCfg().Enabled { go startDispatcherService(internalDispatcherSChan, diff --git a/services/cdrs.go b/services/cdrs.go index f20ea23db..3b7ea0ed8 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -24,6 +24,7 @@ import ( v1 "github.com/cgrates/cgrates/apier/v1" v2 "github.com/cgrates/cgrates/apier/v2" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" @@ -31,15 +32,41 @@ import ( ) // NewCDRServer returns the CDR Server -func NewCDRServer() servmanager.Service { +func NewCDRServer(cfg *config.CGRConfig, dm *engine.DataManager, + cdrStorage engine.CdrStorage, filterSChan chan *engine.FilterS, + server *utils.Server, chrsChan, respChan, attrsChan, thsChan, stsChan, + dispatcherChan chan rpcclient.RpcClientConnection) servmanager.Service { return &CDRServer{ - connChan: make(chan rpcclient.RpcClientConnection, 1), + connChan: make(chan rpcclient.RpcClientConnection, 1), + cfg: cfg, + dm: dm, + cdrStorage: cdrStorage, + filterSChan: filterSChan, + server: server, + chrsChan: chrsChan, + respChan: respChan, + attrsChan: attrsChan, + thsChan: thsChan, + stsChan: stsChan, + dispatcherChan: dispatcherChan, } } // CDRServer implements Service interface type CDRServer struct { sync.RWMutex + cfg *config.CGRConfig + dm *engine.DataManager + cdrStorage engine.CdrStorage + filterSChan chan *engine.FilterS + server *utils.Server + chrsChan chan rpcclient.RpcClientConnection + respChan chan rpcclient.RpcClientConnection + attrsChan chan rpcclient.RpcClientConnection + thsChan chan rpcclient.RpcClientConnection + stsChan chan rpcclient.RpcClientConnection + dispatcherChan chan rpcclient.RpcClientConnection + cdrS *engine.CDRServer rpcv1 *v1.CDRsV1 rpcv2 *v2.CDRsV2 @@ -47,40 +74,43 @@ type CDRServer struct { } // Start should handle the sercive start -func (cdrS *CDRServer) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) { +func (cdrS *CDRServer) Start() (err error) { if cdrS.IsRunning() { return fmt.Errorf("service aleady running") } utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs)) + filterS := <-cdrS.filterSChan + cdrS.filterSChan <- filterS + var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn rpcclient.RpcClientConnection - chargerSConn, err = sp.NewConnection(utils.ChargerS, sp.GetConfig().CdrsCfg().ChargerSConns) + chargerSConn, err = NewConnection(cdrS.cfg, cdrS.chrsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().ChargerSConns) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.ChargerS, err.Error())) return } - ralConn, err = sp.NewConnection(utils.ResponderS, sp.GetConfig().CdrsCfg().RaterConns) + ralConn, err = NewConnection(cdrS.cfg, cdrS.respChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().RaterConns) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.RALService, err.Error())) return } - attrSConn, err = sp.NewConnection(utils.AttributeS, sp.GetConfig().CdrsCfg().AttributeSConns) + attrSConn, err = NewConnection(cdrS.cfg, cdrS.attrsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().AttributeSConns) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.AttributeS, err.Error())) return } - thresholdSConn, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().CdrsCfg().ThresholdSConns) + thresholdSConn, err = NewConnection(cdrS.cfg, cdrS.thsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().ThresholdSConns) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.ThresholdS, err.Error())) return } - statsConn, err = sp.NewConnection(utils.StatS, sp.GetConfig().CdrsCfg().StatSConns) + statsConn, err = NewConnection(cdrS.cfg, cdrS.stsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().StatSConns) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.StatS, err.Error())) @@ -88,19 +118,18 @@ func (cdrS *CDRServer) Start(sp servmanager.ServiceProvider, waitCache bool) (er } cdrS.Lock() defer cdrS.Unlock() - cdrS.cdrS = engine.NewCDRServer(sp.GetConfig(), sp.GetCDRStorage(), sp.GetDM(), - ralConn, attrSConn, - thresholdSConn, statsConn, chargerSConn, sp.GetFilterS()) + cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, cdrS.cdrStorage, cdrS.dm, + ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn, filterS) utils.Logger.Info("Registering CDRS HTTP Handlers.") - cdrS.cdrS.RegisterHandlersToServer(sp.GetServer()) + cdrS.cdrS.RegisterHandlersToServer(cdrS.server) utils.Logger.Info("Registering CDRS RPC service.") cdrS.rpcv1 = v1.NewCDRsV1(cdrS.cdrS) cdrS.rpcv2 = &v2.CDRsV2{CDRsV1: *cdrS.rpcv1} - sp.GetServer().RpcRegister(cdrS.rpcv1) - sp.GetServer().RpcRegister(cdrS.rpcv2) + cdrS.server.RpcRegister(cdrS.rpcv1) + cdrS.server.RpcRegister(cdrS.rpcv2) // Make the cdr server available for internal communication - sp.GetServer().RpcRegister(cdrS.cdrS) // register CdrServer for internal usage (TODO: refactor this) - cdrS.connChan <- cdrS.cdrS // Signal that cdrS is operational + cdrS.server.RpcRegister(cdrS.cdrS) // register CdrServer for internal usage (TODO: refactor this) + cdrS.connChan <- cdrS.cdrS // Signal that cdrS is operational return } @@ -110,34 +139,34 @@ func (cdrS *CDRServer) GetIntenternalChan() (conn chan rpcclient.RpcClientConnec } // Reload handles the change of config -func (cdrS *CDRServer) Reload(sp servmanager.ServiceProvider) (err error) { +func (cdrS *CDRServer) Reload() (err error) { var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn rpcclient.RpcClientConnection - chargerSConn, err = sp.NewConnection(utils.ChargerS, sp.GetConfig().CdrsCfg().ChargerSConns) + chargerSConn, err = NewConnection(cdrS.cfg, cdrS.chrsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().ChargerSConns) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.ChargerS, err.Error())) return } - ralConn, err = sp.NewConnection(utils.ResponderS, sp.GetConfig().CdrsCfg().RaterConns) + ralConn, err = NewConnection(cdrS.cfg, cdrS.respChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().RaterConns) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.RALService, err.Error())) return } - attrSConn, err = sp.NewConnection(utils.AttributeS, sp.GetConfig().CdrsCfg().AttributeSConns) + attrSConn, err = NewConnection(cdrS.cfg, cdrS.attrsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().AttributeSConns) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.AttributeS, err.Error())) return } - thresholdSConn, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().CdrsCfg().ThresholdSConns) + thresholdSConn, err = NewConnection(cdrS.cfg, cdrS.thsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().ThresholdSConns) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.ThresholdS, err.Error())) return } - statsConn, err = sp.NewConnection(utils.StatS, sp.GetConfig().CdrsCfg().StatSConns) + statsConn, err = NewConnection(cdrS.cfg, cdrS.stsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().StatSConns) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.StatS, err.Error())) @@ -164,11 +193,6 @@ func (cdrS *CDRServer) Shutdown() (err error) { return } -// GetRPCInterface returns the interface to register for server -func (cdrS *CDRServer) GetRPCInterface() interface{} { - return cdrS.cdrS -} - // IsRunning returns if the service is running func (cdrS *CDRServer) IsRunning() bool { cdrS.RLock() @@ -180,3 +204,8 @@ func (cdrS *CDRServer) IsRunning() bool { func (cdrS *CDRServer) ServiceName() string { return utils.CDRServer } + +// ShouldRun returns if the service should be running +func (cdrS *CDRServer) ShouldRun() bool { + return cdrS.cfg.CdrsCfg().Enabled +} diff --git a/services/cdrs_it_test.go b/services/cdrs_it_test.go index dc1e5f334..aec2e4a26 100644 --- a/services/cdrs_it_test.go +++ b/services/cdrs_it_test.go @@ -59,16 +59,24 @@ func TestCdrsReload(t *testing.T) { cfg.ChargerSCfg().Enabled = true cfg.RalsCfg().Enabled = true + internalChan := make(chan rpcclient.RpcClientConnection, 1) + internalChan <- nil cacheSChan := make(chan rpcclient.RpcClientConnection, 1) cacheSChan <- chS server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil, /*cdrStorage*/ nil, - /*loadStorage*/ nil, filterSChan, + /*loadStorage*/ nil /*filterSChan*/, nil, server, nil, engineShutdown) srvMngr.SetCacheS(chS) - cdrS := NewCDRServer() - srvMngr.AddService(cdrS, NewRalService(srvMngr), NewChargerService(), &CacheService{connChan: cacheSChan}, NewSchedulerService()) + chrS := NewChargerService(cfg, nil, chS, filterSChan, server, nil, nil) + schS := NewSchedulerService(cfg, nil, chS, server, nil) + tS := NewThresholdService(cfg, nil, chS, filterSChan, server) + ralS := NewRalService(cfg, nil, nil, nil, chS, filterSChan, server, + tS.GetIntenternalChan(), internalChan, cacheSChan, internalChan, internalChan, + internalChan, schS, engineShutdown) + cdrS := NewCDRServer(cfg, nil, nil, filterSChan, server, chrS.GetIntenternalChan(), ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil) + srvMngr.AddServices(cdrS, ralS, schS, chrS) if err = srvMngr.StartServices(); err != nil { t.Error(err) } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 8d0237122..5bff97359 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -264,10 +264,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) { } if srvMngr.GetConfig().RalsCfg().Enabled { go srvMngr.startService(utils.RALService) + } + if srvMngr.GetConfig().CdrsCfg().Enabled { + go srvMngr.startService(utils.CDRServer) } /* - if srvMngr.GetConfig().CdrsCfg().Enabled { - go srvMngr.startService(utils.CDRServer) - } if srvMngr.GetConfig().SessionSCfg().Enabled { go srvMngr.startService(utils.SessionS) } @@ -359,15 +359,15 @@ func (srvMngr *ServiceManager) handleReload() { case <-srvMngr.GetConfig().GetReloadChan(config.RALS_JSN): if err = srvMngr.reloadService(utils.RALService); err != nil { return + } + case <-srvMngr.GetConfig().GetReloadChan(config.Apier): + if err = srvMngr.reloadService(utils.ApierV1); err != nil { + return + } + case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN): + if err = srvMngr.reloadService(utils.CDRServer); err != nil { + return } /* - case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN): - if err = srvMngr.reloadService(utils.CDRServer); err != nil { - return - } - case <-srvMngr.GetConfig().GetReloadChan(config.Apier): - if err = srvMngr.reloadService(utils.ApierV1); err != nil { - return - } case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJson): if err = srvMngr.reloadService(utils.SessionS); err != nil { return