Updated Session Service

This commit is contained in:
Trial97
2019-10-04 10:07:41 +03:00
committed by Dan Christian Bogos
parent 718a9a992b
commit 82771490a2
4 changed files with 115 additions and 75 deletions

View File

@@ -700,19 +700,23 @@ func main() {
attrS.GetIntenternalChan(), tS.GetIntenternalChan(),
stS.GetIntenternalChan(), internalDispatcherSChan)
schS.SetCdrsConns(cdrS.GetIntenternalChan())
/*
smg := services.NewSessionService()
*/
smg := services.NewSessionService(cfg, dm, server, cdrS.GetIntenternalChan(),
rals.GetResponder().GetIntenternalChan(), reS.GetIntenternalChan(),
tS.GetIntenternalChan(), stS.GetIntenternalChan(), supS.GetIntenternalChan(),
attrS.GetIntenternalChan(), cdrS.GetIntenternalChan(), internalDispatcherSChan, exitChan)
srvManager.AddServices(attrS, chrS, tS, stS, reS, supS, schS, rals,
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS) /* smg,
services.NewEventReaderService(),
services.NewDNSAgent(),
services.NewFreeswitchAgent(),
services.NewKamailioAgent(),
services.NewAsteriskAgent(), // partial reload
services.NewRadiusAgent(), // partial reload
services.NewDiameterAgent(), // partial reload
services.NewHTTPAgent(), // no reload
rals.GetResponder(), rals.GetAPIv1(), rals.GetAPIv2(), cdrS, smg)
/*
services.NewEventReaderService(),
services.NewDNSAgent(),
services.NewFreeswitchAgent(),
services.NewKamailioAgent(),
services.NewAsteriskAgent(), // partial reload
services.NewRadiusAgent(), // partial reload
services.NewDiameterAgent(), // partial reload
services.NewHTTPAgent(), // no reload
*/
/*
@@ -734,7 +738,7 @@ func main() {
engine.IntRPC.AddInternalRPCClient(utils.ApierV1, rals.GetAPIv1().GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ApierV2, rals.GetAPIv2().GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.AttributeSv1, attrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan) // server or from apier
engine.IntRPC.AddInternalRPCClient(utils.CacheSv1, internalCacheSChan)
engine.IntRPC.AddInternalRPCClient(utils.CDRsV1, cdrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.CDRsV2, cdrS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ChargerSv1, chrS.GetIntenternalChan())
@@ -743,7 +747,7 @@ func main() {
engine.IntRPC.AddInternalRPCClient(utils.ResourceSv1, reS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.Responder, rals.GetResponder().GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.SchedulerSv1, schS.GetIntenternalChan())
// engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, internalSMGChan) // server or from apier
engine.IntRPC.AddInternalRPCClient(utils.SessionSv1, smg.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.StatSv1, stS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.SupplierSv1, supS.GetIntenternalChan())
engine.IntRPC.AddInternalRPCClient(utils.ThresholdSv1, tS.GetIntenternalChan())
@@ -771,13 +775,11 @@ func main() {
go startLoaderS(internalLoaderSChan, internalCacheSChan, cfg, dm, server, filterSChan, exitChan)
// Serve rpc connections
/*
go startRpc(server, internalRaterChan, cdrS.GetIntenternalChan(),
reS.GetIntenternalChan(), stS.GetIntenternalChan(),
attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(),
supS.GetIntenternalChan(), internalSMGChan, internalAnalyzerSChan,
internalDispatcherSChan, internalLoaderSChan, rals.GetIntenternalChan(), internalCacheSChan, exitChan)
*/
go startRpc(server, rals.GetResponder().GetIntenternalChan(), cdrS.GetIntenternalChan(),
reS.GetIntenternalChan(), stS.GetIntenternalChan(),
attrS.GetIntenternalChan(), chrS.GetIntenternalChan(), tS.GetIntenternalChan(),
supS.GetIntenternalChan(), smg.GetIntenternalChan(), internalAnalyzerSChan,
internalDispatcherSChan, internalLoaderSChan, rals.GetIntenternalChan(), internalCacheSChan, exitChan)
<-exitChan
if *cpuProfDir != "" { // wait to end cpuProfiling

View File

@@ -23,6 +23,8 @@ import (
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
@@ -30,15 +32,45 @@ import (
)
// NewSessionService returns the Session Service
func NewSessionService() servmanager.Service {
func NewSessionService(cfg *config.CGRConfig, dm *engine.DataManager,
server *utils.Server, chrsChan, respChan, resChan, thsChan, stsChan,
supChan, attrsChan, cdrsChan, dispatcherChan chan rpcclient.RpcClientConnection,
exitChan chan bool) servmanager.Service {
return &SessionService{
connChan: make(chan rpcclient.RpcClientConnection, 1),
connChan: make(chan rpcclient.RpcClientConnection, 1),
cfg: cfg,
dm: dm,
server: server,
chrsChan: chrsChan,
respChan: respChan,
resChan: resChan,
thsChan: thsChan,
stsChan: stsChan,
supChan: supChan,
attrsChan: attrsChan,
cdrsChan: cdrsChan,
dispatcherChan: dispatcherChan,
exitChan: exitChan,
}
}
// SessionService implements Service interface
type SessionService struct {
sync.RWMutex
cfg *config.CGRConfig
dm *engine.DataManager
server *utils.Server
chrsChan chan rpcclient.RpcClientConnection
respChan chan rpcclient.RpcClientConnection
resChan chan rpcclient.RpcClientConnection
thsChan chan rpcclient.RpcClientConnection
stsChan chan rpcclient.RpcClientConnection
supChan chan rpcclient.RpcClientConnection
attrsChan chan rpcclient.RpcClientConnection
cdrsChan chan rpcclient.RpcClientConnection
dispatcherChan chan rpcclient.RpcClientConnection
exitChan chan bool
sm *sessions.SessionS
rpc *v1.SMGenericV1
rpcv1 *v1.SessionSv1
@@ -46,11 +78,10 @@ type SessionService struct {
// in order to stop the bircp server if necesary
bircpEnabled bool
server *utils.Server
}
// Start should handle the sercive start
func (smg *SessionService) Start(sp servmanager.ServiceProvider, waitCache bool) (err error) {
func (smg *SessionService) Start() (err error) {
if smg.IsRunning() {
return fmt.Errorf("service aleady running")
}
@@ -59,69 +90,69 @@ func (smg *SessionService) Start(sp servmanager.ServiceProvider, waitCache bool)
defer smg.Unlock()
var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrConns, cdrsConn, chargerSConn rpcclient.RpcClientConnection
if chargerSConn, err = sp.NewConnection(utils.ChargerS, sp.GetConfig().SessionSCfg().ChargerSConns); err != nil {
if chargerSConn, err = NewConnection(smg.cfg, smg.chrsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().ChargerSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ChargerS, err.Error()))
return
}
if ralsConns, err = sp.NewConnection(utils.ResponderS, sp.GetConfig().SessionSCfg().RALsConns); err != nil {
if ralsConns, err = NewConnection(smg.cfg, smg.respChan, smg.dispatcherChan, smg.cfg.SessionSCfg().RALsConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ResponderS, err.Error()))
return
}
if resSConns, err = sp.NewConnection(utils.ResourceS, sp.GetConfig().SessionSCfg().ResSConns); err != nil {
if resSConns, err = NewConnection(smg.cfg, smg.resChan, smg.dispatcherChan, smg.cfg.SessionSCfg().ResSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ResourceS, err.Error()))
return
}
if threshSConns, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().SessionSCfg().ThreshSConns); err != nil {
if threshSConns, err = NewConnection(smg.cfg, smg.thsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().ThreshSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ThresholdS, err.Error()))
return
}
if statSConns, err = sp.NewConnection(utils.StatS, sp.GetConfig().SessionSCfg().StatSConns); err != nil {
if statSConns, err = NewConnection(smg.cfg, smg.stsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().StatSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.StatS, err.Error()))
return
}
if suplSConns, err = sp.NewConnection(utils.SupplierS, sp.GetConfig().SessionSCfg().SupplSConns); err != nil {
if suplSConns, err = NewConnection(smg.cfg, smg.supChan, smg.dispatcherChan, smg.cfg.SessionSCfg().SupplSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.SupplierS, err.Error()))
return
}
if attrConns, err = sp.NewConnection(utils.AttributeS, sp.GetConfig().SessionSCfg().AttrSConns); err != nil {
if attrConns, err = NewConnection(smg.cfg, smg.attrsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().AttrSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.AttributeS, err.Error()))
return
}
if cdrsConn, err = sp.NewConnection(utils.CDRServer, sp.GetConfig().SessionSCfg().CDRsConns); err != nil {
if cdrsConn, err = NewConnection(smg.cfg, smg.cdrsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().CDRsConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.CDRServer, err.Error()))
return
}
sReplConns, err := sessions.NewSReplConns(sp.GetConfig().SessionSCfg().SessionReplicationConns,
sp.GetConfig().GeneralCfg().Reconnects, sp.GetConfig().GeneralCfg().ConnectTimeout,
sp.GetConfig().GeneralCfg().ReplyTimeout)
sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().SessionReplicationConns,
smg.cfg.GeneralCfg().Reconnects, smg.cfg.GeneralCfg().ConnectTimeout,
smg.cfg.GeneralCfg().ReplyTimeout)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMGReplicationConnection error: <%s>",
utils.SessionS, err.Error()))
return
}
smg.sm = sessions.NewSessionS(sp.GetConfig(), ralsConns, resSConns, threshSConns,
smg.sm = sessions.NewSessionS(smg.cfg, ralsConns, resSConns, threshSConns,
statSConns, suplSConns, attrConns, cdrsConn, chargerSConn,
sReplConns, sp.GetDM(), sp.GetConfig().GeneralCfg().DefaultTimezone)
sReplConns, smg.dm, smg.cfg.GeneralCfg().DefaultTimezone)
//start sync session in a separate gorutine
go func(sm *sessions.SessionS) {
if err = sm.ListenAndServe(sp.GetExitChan()); err != nil {
if err = sm.ListenAndServe(smg.exitChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", utils.SessionS, err))
}
}(smg.sm)
@@ -129,25 +160,24 @@ func (smg *SessionService) Start(sp servmanager.ServiceProvider, waitCache bool)
smg.connChan <- smg.sm
// Register RPC handler
smg.rpc = v1.NewSMGenericV1(smg.sm)
sp.GetServer().RpcRegister(smg.rpc)
smg.server.RpcRegister(smg.rpc)
smg.rpcv1 = v1.NewSessionSv1(smg.sm) // methods with multiple options
if !sp.GetConfig().DispatcherSCfg().Enabled {
sp.GetServer().RpcRegister(smg.rpcv1)
if !smg.cfg.DispatcherSCfg().Enabled {
smg.server.RpcRegister(smg.rpcv1)
}
// Register BiRpc handlers
if sp.GetConfig().SessionSCfg().ListenBijson != "" {
if smg.cfg.SessionSCfg().ListenBijson != "" {
smg.bircpEnabled = true
smg.server = sp.GetServer()
for method, handler := range smg.rpc.Handlers() {
sp.GetServer().BiRPCRegisterName(method, handler)
smg.server.BiRPCRegisterName(method, handler)
}
for method, handler := range smg.rpcv1.Handlers() {
sp.GetServer().BiRPCRegisterName(method, handler)
smg.server.BiRPCRegisterName(method, handler)
}
// run this in it's own gorutine
go func() {
if err := sp.GetServer().ServeBiJSON(sp.GetConfig().SessionSCfg().ListenBijson, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil {
if err := smg.server.ServeBiJSON(smg.cfg.SessionSCfg().ListenBijson, smg.sm.OnBiJSONConnect, smg.sm.OnBiJSONDisconnect); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> serve BiRPC error: %s!", utils.SessionS, err))
smg.Lock()
smg.bircpEnabled = false
@@ -164,60 +194,60 @@ func (smg *SessionService) GetIntenternalChan() (conn chan rpcclient.RpcClientCo
}
// Reload handles the change of config
func (smg *SessionService) Reload(sp servmanager.ServiceProvider) (err error) {
func (smg *SessionService) Reload() (err error) {
var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrConns, cdrsConn, chargerSConn rpcclient.RpcClientConnection
if chargerSConn, err = sp.NewConnection(utils.ChargerS, sp.GetConfig().SessionSCfg().ChargerSConns); err != nil {
if chargerSConn, err = NewConnection(smg.cfg, smg.chrsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().ChargerSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ChargerS, err.Error()))
return
}
if ralsConns, err = sp.NewConnection(utils.ResponderS, sp.GetConfig().SessionSCfg().RALsConns); err != nil {
if ralsConns, err = NewConnection(smg.cfg, smg.respChan, smg.dispatcherChan, smg.cfg.SessionSCfg().RALsConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ResponderS, err.Error()))
return
}
if resSConns, err = sp.NewConnection(utils.ResourceS, sp.GetConfig().SessionSCfg().ResSConns); err != nil {
if resSConns, err = NewConnection(smg.cfg, smg.resChan, smg.dispatcherChan, smg.cfg.SessionSCfg().ResSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ResourceS, err.Error()))
return
}
if threshSConns, err = sp.NewConnection(utils.ThresholdS, sp.GetConfig().SessionSCfg().ThreshSConns); err != nil {
if threshSConns, err = NewConnection(smg.cfg, smg.thsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().ThreshSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ThresholdS, err.Error()))
return
}
if statSConns, err = sp.NewConnection(utils.StatS, sp.GetConfig().SessionSCfg().StatSConns); err != nil {
if statSConns, err = NewConnection(smg.cfg, smg.stsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().StatSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.StatS, err.Error()))
return
}
if suplSConns, err = sp.NewConnection(utils.SupplierS, sp.GetConfig().SessionSCfg().SupplSConns); err != nil {
if suplSConns, err = NewConnection(smg.cfg, smg.supChan, smg.dispatcherChan, smg.cfg.SessionSCfg().SupplSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.SupplierS, err.Error()))
return
}
if attrConns, err = sp.NewConnection(utils.AttributeS, sp.GetConfig().SessionSCfg().AttrSConns); err != nil {
if attrConns, err = NewConnection(smg.cfg, smg.attrsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().AttrSConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.AttributeS, err.Error()))
return
}
if cdrsConn, err = sp.NewConnection(utils.CDRServer, sp.GetConfig().SessionSCfg().CDRsConns); err != nil {
if cdrsConn, err = NewConnection(smg.cfg, smg.cdrsChan, smg.dispatcherChan, smg.cfg.SessionSCfg().CDRsConns); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.CDRServer, err.Error()))
return
}
sReplConns, err := sessions.NewSReplConns(sp.GetConfig().SessionSCfg().SessionReplicationConns,
sp.GetConfig().GeneralCfg().Reconnects, sp.GetConfig().GeneralCfg().ConnectTimeout,
sp.GetConfig().GeneralCfg().ReplyTimeout)
sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().SessionReplicationConns,
smg.cfg.GeneralCfg().Reconnects, smg.cfg.GeneralCfg().ConnectTimeout,
smg.cfg.GeneralCfg().ReplyTimeout)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SMGReplicationConnection error: <%s>",
utils.SessionS, err.Error()))
@@ -255,11 +285,6 @@ func (smg *SessionService) Shutdown() (err error) {
return
}
// GetRPCInterface returns the interface to register for server
func (smg *SessionService) GetRPCInterface() interface{} {
return smg.sm
}
// IsRunning returns if the service is running
func (smg *SessionService) IsRunning() bool {
smg.RLock()
@@ -271,3 +296,8 @@ func (smg *SessionService) IsRunning() bool {
func (smg *SessionService) ServiceName() string {
return utils.SessionS
}
// ShouldRun returns if the service should be running
func (smg *SessionService) ShouldRun() bool {
return smg.cfg.SessionSCfg().Enabled
}

View File

@@ -60,17 +60,25 @@ func TestSessionSReload(t *testing.T) {
close(chS.GetPrecacheChannel(utils.CacheSharedGroups))
close(chS.GetPrecacheChannel(utils.CacheTimings))
internalChan := make(chan rpcclient.RpcClientConnection, 1)
internalChan <- nil
cacheSChan := make(chan rpcclient.RpcClientConnection, 1)
cacheSChan <- chS
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg /*dm*/, nil,
/*cdrStorage*/ nil,
/*loadStorage*/ nil, filterSChan,
/*cdrStorage*/ nil /*loadStorage*/, nil /*filterSChan*/, nil,
server, nil, engineShutdown)
srvMngr.SetCacheS(chS)
attrS := NewSessionService()
srvMngr.AddService(attrS, NewChargerService(), NewRalService(srvMngr), &CacheService{connChan: cacheSChan}, NewSchedulerService(), NewCDRServer())
chrS := NewChargerService(cfg, nil, chS, filterSChan, server, nil, nil)
schS := NewSchedulerService(cfg, nil, chS, server, nil)
ralS := NewRalService(cfg, nil, nil, nil, chS, filterSChan, server,
/*tS*/ internalChan, internalChan, cacheSChan, internalChan, internalChan,
internalChan, schS, engineShutdown)
cdrS := NewCDRServer(cfg, nil, nil, filterSChan, server, chrS.GetIntenternalChan(), ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil)
attrS := NewSessionService(cfg, nil, server, chrS.GetIntenternalChan(),
ralS.GetResponder().GetIntenternalChan(), nil, nil, nil, nil, nil, cdrS.GetIntenternalChan(), nil, engineShutdown)
srvMngr.AddServices(attrS, chrS, schS, ralS, cdrS)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}

View File

@@ -267,10 +267,10 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
}
if srvMngr.GetConfig().CdrsCfg().Enabled {
go srvMngr.startService(utils.CDRServer)
}
if srvMngr.GetConfig().SessionSCfg().Enabled {
go srvMngr.startService(utils.SessionS)
} /*
if srvMngr.GetConfig().SessionSCfg().Enabled {
go srvMngr.startService(utils.SessionS)
}
if srvMngr.GetConfig().ERsCfg().Enabled {
go srvMngr.startService(utils.ERs)
}
@@ -367,11 +367,11 @@ func (srvMngr *ServiceManager) handleReload() {
case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN):
if err = srvMngr.reloadService(utils.CDRServer); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJson):
if err = srvMngr.reloadService(utils.SessionS); err != nil {
return
} /*
case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJson):
if err = srvMngr.reloadService(utils.SessionS); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.ERsJson):
if err = srvMngr.reloadService(utils.ERs); err != nil {
return