Updated CDRs Server

This commit is contained in:
Trial97
2019-10-04 09:26:35 +03:00
committed by Dan Christian Bogos
parent 52d319cae5
commit f0dddcea09
4 changed files with 87 additions and 53 deletions

View File

@@ -686,20 +686,17 @@ func main() {
tS.GetIntenternalChan(), stS.GetIntenternalChan(), internalCacheSChan,
schS.GetIntenternalChan(), attrS.GetIntenternalChan(), internalDispatcherSChan,
schS, exitChan)
// cdrS := services.NewCDRServer(cfg, dm, cdrDb, filterSChan, server,
// chrS.GetIntenternalChan(),internalRALsv1Chan,
// attrS.GetIntenternalChan(), tS.GetIntenternalChan(),
// stS.GetIntenternalChan(), internalDispatcherSChan)
// schS.SetCdrsConns(cdrS.GetIntenternalChan())
cdrS := services.NewCDRServer(cfg, dm, cdrDb, filterSChan, server,
chrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(),
attrS.GetIntenternalChan(), tS.GetIntenternalChan(),
stS.GetIntenternalChan(), internalDispatcherSChan)
schS.SetCdrsConns(cdrS.GetIntenternalChan())
/*
apiv1, _ := srvManager.GetService(utils.ApierV1)
apiv2, _ := srvManager.GetService(utils.ApierV2)
resp, _ := srvManager.GetService(utils.ResponderS)
smg := services.NewSessionService()
grd := services.NewGuardianService()*/
srvManager.AddServices( /*chS, */ attrS, chrS, tS, stS, reS, supS, schS, rals,
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2()) /*, cdrS, smg, grd,
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS) /* smg, grd,
services.NewEventReaderService(),
services.NewDNSAgent(),
services.NewFreeswitchAgent(),
@@ -734,8 +731,8 @@ func main() {
engine.IntRPC.AddInternalRPCClient(utils.ApierV2, rals.GetAPIv2().GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) // server or from apier
// engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan())
// engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan())
// engine.IntRPC.AddInternalRPCClient(utils.GuardianSv1, internalGuardianSChan)
engine.IntRPC.AddInternalRPCClient(utils.LoaderSv1, internalLoaderSChan)
@@ -755,7 +752,7 @@ func main() {
initConfigSv1(internalConfigChan, server)
// Start CDRC components if necessary
// go startCdrcs(cdrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(), internalDispatcherSChan, filterSChan, exitChan)
go startCdrcs(cdrS.GetIntenternalChan(), rals.GetResponder().GetIntenternalChan(), internalDispatcherSChan, filterSChan, exitChan)
if cfg.DispatcherSCfg().Enabled {
go startDispatcherService(internalDispatcherSChan,

View File

@@ -24,6 +24,7 @@ import (
v1 "github.com/cgrates/cgrates/apier/v1"
v2 "github.com/cgrates/cgrates/apier/v2"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
@@ -31,15 +32,41 @@ import (
)
// NewCDRServer returns the CDR Server
func NewCDRServer() servmanager.Service {
func NewCDRServer(cfg *config.CGRConfig, dm *engine.DataManager,
cdrStorage engine.CdrStorage, filterSChan chan *engine.FilterS,
server *utils.Server, chrsChan, respChan, attrsChan, thsChan, stsChan,
dispatcherChan chan rpcclient.RpcClientConnection) servmanager.Service {
return &CDRServer{
connChan: make(chan rpcclient.RpcClientConnection, 1),
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
dm: dm,
cdrStorage: cdrStorage,
filterSChan: filterSChan,
server: server,
chrsChan: chrsChan,
respChan: respChan,
attrsChan: attrsChan,
thsChan: thsChan,
stsChan: stsChan,
dispatcherChan: dispatcherChan,
}
}
// CDRServer implements Service interface
type CDRServer struct {
sync.RWMutex
cfg *config.CGRConfig
dm *engine.DataManager
cdrStorage engine.CdrStorage
filterSChan chan *engine.FilterS
server *utils.Server
chrsChan chan rpcclient.RpcClientConnection
respChan chan rpcclient.RpcClientConnection
attrsChan chan rpcclient.RpcClientConnection
thsChan chan rpcclient.RpcClientConnection
stsChan chan rpcclient.RpcClientConnection
dispatcherChan chan rpcclient.RpcClientConnection
cdrS *engine.CDRServer
rpcv1 *v1.CDRsV1
rpcv2 *v2.CDRsV2
@@ -47,40 +74,43 @@ type CDRServer struct {
}
// Start should handle the sercive start
func (cdrS *CDRServer) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
func (cdrS *CDRServer) Start() (err error) {
if cdrS.IsRunning() {
return fmt.Errorf("service aleady running")
}
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.CDRs))
filterS := <-cdrS.filterSChan
cdrS.filterSChan <- filterS
var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn rpcclient.RpcClientConnection
chargerSConn, err = sp.NewConnection(utils.ChargerS, sp.GetConfig().CdrsCfg().ChargerSConns)
chargerSConn, err = NewConnection(cdrS.cfg, cdrS.chrsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().ChargerSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.ChargerS, err.Error()))
return
}
ralConn, err = sp.NewConnection(utils.ResponderS, sp.GetConfig().CdrsCfg().RaterConns)
ralConn, err = NewConnection(cdrS.cfg, cdrS.respChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().RaterConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.RALService, err.Error()))
return
}
attrSConn, err = sp.NewConnection(utils.AttributeS, sp.GetConfig().CdrsCfg().AttributeSConns)
attrSConn, err = NewConnection(cdrS.cfg, cdrS.attrsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().AttributeSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.AttributeS, err.Error()))
return
}
thresholdSConn, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().CdrsCfg().ThresholdSConns)
thresholdSConn, err = NewConnection(cdrS.cfg, cdrS.thsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().ThresholdSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.ThresholdS, err.Error()))
return
}
statsConn, err = sp.NewConnection(utils.StatS, sp.GetConfig().CdrsCfg().StatSConns)
statsConn, err = NewConnection(cdrS.cfg, cdrS.stsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().StatSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.StatS, err.Error()))
@@ -88,19 +118,18 @@ func (cdrS *CDRServer) Start(sp servmanager.ServiceProvider, waitCache bool) (er
}
cdrS.Lock()
defer cdrS.Unlock()
cdrS.cdrS = engine.NewCDRServer(sp.GetConfig(), sp.GetCDRStorage(), sp.GetDM(),
ralConn, attrSConn,
thresholdSConn, statsConn, chargerSConn, sp.GetFilterS())
cdrS.cdrS = engine.NewCDRServer(cdrS.cfg, cdrS.cdrStorage, cdrS.dm,
ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn, filterS)
utils.Logger.Info("Registering CDRS HTTP Handlers.")
cdrS.cdrS.RegisterHandlersToServer(sp.GetServer())
cdrS.cdrS.RegisterHandlersToServer(cdrS.server)
utils.Logger.Info("Registering CDRS RPC service.")
cdrS.rpcv1 = v1.NewCDRsV1(cdrS.cdrS)
cdrS.rpcv2 = &v2.CDRsV2{CDRsV1: *cdrS.rpcv1}
sp.GetServer().RpcRegister(cdrS.rpcv1)
sp.GetServer().RpcRegister(cdrS.rpcv2)
cdrS.server.RpcRegister(cdrS.rpcv1)
cdrS.server.RpcRegister(cdrS.rpcv2)
// Make the cdr server available for internal communication
sp.GetServer().RpcRegister(cdrS.cdrS) // register CdrServer for internal usage (TODO: refactor this)
cdrS.connChan <- cdrS.cdrS // Signal that cdrS is operational
cdrS.server.RpcRegister(cdrS.cdrS) // register CdrServer for internal usage (TODO: refactor this)
cdrS.connChan <- cdrS.cdrS // Signal that cdrS is operational
return
}
@@ -110,34 +139,34 @@ func (cdrS *CDRServer) GetIntenternalChan() (conn chan rpcclient.RpcClientConnec
}
// Reload handles the change of config
func (cdrS *CDRServer) Reload(sp servmanager.ServiceProvider) (err error) {
func (cdrS *CDRServer) Reload() (err error) {
var ralConn, attrSConn, thresholdSConn, statsConn, chargerSConn rpcclient.RpcClientConnection
chargerSConn, err = sp.NewConnection(utils.ChargerS, sp.GetConfig().CdrsCfg().ChargerSConns)
chargerSConn, err = NewConnection(cdrS.cfg, cdrS.chrsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().ChargerSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.ChargerS, err.Error()))
return
}
ralConn, err = sp.NewConnection(utils.ResponderS, sp.GetConfig().CdrsCfg().RaterConns)
ralConn, err = NewConnection(cdrS.cfg, cdrS.respChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().RaterConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.RALService, err.Error()))
return
}
attrSConn, err = sp.NewConnection(utils.AttributeS, sp.GetConfig().CdrsCfg().AttributeSConns)
attrSConn, err = NewConnection(cdrS.cfg, cdrS.attrsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().AttributeSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.AttributeS, err.Error()))
return
}
thresholdSConn, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().CdrsCfg().ThresholdSConns)
thresholdSConn, err = NewConnection(cdrS.cfg, cdrS.thsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().ThresholdSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.ThresholdS, err.Error()))
return
}
statsConn, err = sp.NewConnection(utils.StatS, sp.GetConfig().CdrsCfg().StatSConns)
statsConn, err = NewConnection(cdrS.cfg, cdrS.stsChan, cdrS.dispatcherChan, cdrS.cfg.CdrsCfg().StatSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.StatS, err.Error()))
@@ -164,11 +193,6 @@ func (cdrS *CDRServer) Shutdown() (err error) {
return
}
// GetRPCInterface returns the interface to register for server
func (cdrS *CDRServer) GetRPCInterface() interface{} {
return cdrS.cdrS
}
// IsRunning returns if the service is running
func (cdrS *CDRServer) IsRunning() bool {
cdrS.RLock()
@@ -180,3 +204,8 @@ func (cdrS *CDRServer) IsRunning() bool {
func (cdrS *CDRServer) ServiceName() string {
return utils.CDRServer
}
// ShouldRun returns if the service should be running
func (cdrS *CDRServer) ShouldRun() bool {
return cdrS.cfg.CdrsCfg().Enabled
}

View File

@@ -59,16 +59,24 @@ func TestCdrsReload(t *testing.T) {
cfg.ChargerSCfg().Enabled = true
cfg.RalsCfg().Enabled = true
internalChan := make(chan rpcclient.RpcClientConnection, 1)
internalChan <- nil
cacheSChan := make(chan rpcclient.RpcClientConnection, 1)
cacheSChan <- chS
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
/*loadStorage*/ nil /*filterSChan*/, nil,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
cdrS := NewCDRServer()
srvMngr.AddService(cdrS, NewRalService(srvMngr), NewChargerService(), &CacheService{connChan: cacheSChan}, NewSchedulerService())
chrS := NewChargerService(cfg, nil, chS, filterSChan, server, nil, nil)
schS := NewSchedulerService(cfg, nil, chS, server, nil)
tS := NewThresholdService(cfg, nil, chS, filterSChan, server)
ralS := NewRalService(cfg, nil, nil, nil, chS, filterSChan, server,
tS.GetIntenternalChan(), internalChan, cacheSChan, internalChan, internalChan,
internalChan, schS, engineShutdown)
cdrS := NewCDRServer(cfg, nil, nil, filterSChan, server, chrS.GetIntenternalChan(), ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil)
srvMngr.AddServices(cdrS, ralS, schS, chrS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -264,10 +264,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
}
if srvMngr.GetConfig().RalsCfg().Enabled {
go srvMngr.startService(utils.RALService)
}
if srvMngr.GetConfig().CdrsCfg().Enabled {
go srvMngr.startService(utils.CDRServer)
} /*
if srvMngr.GetConfig().CdrsCfg().Enabled {
go srvMngr.startService(utils.CDRServer)
}
if srvMngr.GetConfig().SessionSCfg().Enabled {
go srvMngr.startService(utils.SessionS)
}
@@ -359,15 +359,15 @@ func (srvMngr *ServiceManager) handleReload() {
case <-srvMngr.GetConfig().GetReloadChan(config.RALS_JSN):
if err = srvMngr.reloadService(utils.RALService); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.Apier):
if err = srvMngr.reloadService(utils.ApierV1); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN):
if err = srvMngr.reloadService(utils.CDRServer); err != nil {
return
} /*
case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN):
if err = srvMngr.reloadService(utils.CDRServer); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.Apier):
if err = srvMngr.reloadService(utils.ApierV1); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJson):
if err = srvMngr.reloadService(utils.SessionS); err != nil {
return