mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
222 lines
7.1 KiB
Go
222 lines
7.1 KiB
Go
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
*/
|
|
|
|
package services
|
|
|
|
import (
	"fmt"
	"sync"

	"github.com/cgrates/birpc"
	"github.com/cgrates/cgrates/config"
	"github.com/cgrates/cgrates/engine"
	"github.com/cgrates/cgrates/servmanager"
	"github.com/cgrates/cgrates/utils"
	"github.com/cgrates/rpcclient"
)
|
|
|
|
// NewConnManagerService instantiates a new ConnManagerService.
|
|
func NewConnManagerService(cfg *config.CGRConfig) *ConnManagerService {
|
|
return &ConnManagerService{
|
|
cfg: cfg,
|
|
stateDeps: NewStateDependencies([]string{utils.StateServiceUP, utils.StateServiceDOWN}),
|
|
}
|
|
}
|
|
|
|
// ConnManagerService implements Service interface.
|
|
type ConnManagerService struct {
|
|
mu sync.RWMutex
|
|
cfg *config.CGRConfig
|
|
connMgr *engine.ConnManager
|
|
anz *AnalyzerService
|
|
stateDeps *StateDependencies // channel subscriptions for state changes
|
|
}
|
|
|
|
// Start handles the service start.
|
|
func (s *ConnManagerService) Start(_ *utils.SyncedChan, registry *servmanager.ServiceRegistry) error {
|
|
s.anz = registry.Lookup(utils.AnalyzerS).(*AnalyzerService)
|
|
if s.anz.ShouldRun() { // wait for AnalyzerS only if it should run
|
|
if _, err := WaitForServiceState(utils.StateServiceUP, utils.AnalyzerS, registry,
|
|
s.cfg.GeneralCfg().ConnectTimeout); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
s.connMgr = engine.NewConnManager(s.cfg)
|
|
return nil
|
|
}
|
|
|
|
// Reload handles the config changes.
|
|
func (s *ConnManagerService) Reload(_ *utils.SyncedChan, _ *servmanager.ServiceRegistry) error {
|
|
s.connMgr.Reload()
|
|
return nil
|
|
}
|
|
|
|
// Shutdown stops the service.
|
|
func (s *ConnManagerService) Shutdown(_ *servmanager.ServiceRegistry) error {
|
|
s.connMgr = nil
|
|
engine.SetConnManager(nil)
|
|
return nil
|
|
}
|
|
|
|
// ServiceName returns the service name
|
|
func (s *ConnManagerService) ServiceName() string {
|
|
return utils.ConnManager
|
|
}
|
|
|
|
// ShouldRun returns if the service should be running.
|
|
func (s *ConnManagerService) ShouldRun() bool {
|
|
return true
|
|
}
|
|
|
|
// StateChan returns signaling channel of specific state
|
|
func (s *ConnManagerService) StateChan(stateID string) chan struct{} {
|
|
return s.stateDeps.StateChan(stateID)
|
|
}
|
|
|
|
// ConnManager returns the ConnManager object.
|
|
func (s *ConnManagerService) ConnManager() *engine.ConnManager {
|
|
return s.connMgr
|
|
}
|
|
|
|
// AddInternalConn registers direct internal RPC access for a service.
|
|
// TODO: Add function to remove internal conns (useful for shutdown).
|
|
func (s *ConnManagerService) AddInternalConn(svcName string, receiver birpc.ClientConnector) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
route, exists := serviceMethods[svcName]
|
|
if !exists {
|
|
return
|
|
}
|
|
rpcIntChan := make(chan birpc.ClientConnector, 1)
|
|
s.connMgr.AddInternalConn(route.internalPath, route.receiver, rpcIntChan)
|
|
if route.biRPCPath != "" {
|
|
s.connMgr.AddInternalConn(route.biRPCPath, route.receiver, rpcIntChan)
|
|
}
|
|
rpcIntChan <- s.anz.GetInternalCodec(receiver, svcName)
|
|
}
|
|
|
|
// internalRoute describes how a service's methods are reached internally
// within the system.
type internalRoute struct {
	// receiver is the method receiver name (e.g. "ChargerSv1").
	receiver string
	// internalPath is the internal API path.
	internalPath string
	// biRPCPath is the bidirectional API path; left empty when the service
	// does not support one.
	biRPCPath string
}
|
|
|
|
var serviceMethods = map[string]internalRoute{
|
|
utils.AnalyzerS: {
|
|
receiver: utils.AnalyzerSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAnalyzerS),
|
|
},
|
|
utils.AdminS: {
|
|
receiver: utils.AdminSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAdminS),
|
|
},
|
|
utils.AttributeS: {
|
|
receiver: utils.AttributeSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes),
|
|
},
|
|
utils.CacheS: {
|
|
receiver: utils.CacheSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches),
|
|
},
|
|
utils.CDRServer: {
|
|
receiver: utils.CDRsV1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCDRs),
|
|
},
|
|
utils.ChargerS: {
|
|
receiver: utils.ChargerSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaChargers),
|
|
},
|
|
utils.LoaderS: {
|
|
receiver: utils.LoaderSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaLoaders),
|
|
},
|
|
utils.ResourceS: {
|
|
receiver: utils.ResourceSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaResources),
|
|
},
|
|
utils.IPs: {
|
|
receiver: utils.IPsV1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaIPs),
|
|
},
|
|
utils.SessionS: {
|
|
receiver: utils.SessionSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS),
|
|
biRPCPath: utils.ConcatenatedKey(rpcclient.BiRPCInternal, utils.MetaSessionS),
|
|
},
|
|
utils.StatS: {
|
|
receiver: utils.StatSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaStats),
|
|
},
|
|
utils.RankingS: {
|
|
receiver: utils.RankingSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRankings),
|
|
},
|
|
utils.TrendS: {
|
|
receiver: utils.TrendSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTrends),
|
|
},
|
|
utils.RouteS: {
|
|
receiver: utils.RouteSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRoutes),
|
|
},
|
|
utils.ThresholdS: {
|
|
receiver: utils.ThresholdSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaThresholds),
|
|
},
|
|
utils.ServiceManagerS: {
|
|
receiver: utils.ServiceManagerV1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaServiceManager),
|
|
},
|
|
utils.ConfigS: {
|
|
receiver: utils.ConfigSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaConfig),
|
|
},
|
|
utils.CoreS: {
|
|
receiver: utils.CoreSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCore),
|
|
},
|
|
utils.EEs: {
|
|
receiver: utils.EeSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs),
|
|
},
|
|
utils.RateS: {
|
|
receiver: utils.RateSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRates),
|
|
},
|
|
utils.AccountS: {
|
|
receiver: utils.AccountSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts),
|
|
},
|
|
utils.ActionS: {
|
|
receiver: utils.ActionSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions),
|
|
},
|
|
utils.TPeS: {
|
|
receiver: utils.TPeSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes),
|
|
},
|
|
utils.EFs: {
|
|
receiver: utils.EfSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs),
|
|
},
|
|
utils.ERs: {
|
|
receiver: utils.ErSv1,
|
|
internalPath: utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs),
|
|
},
|
|
}
|