Updated service reload

This commit is contained in:
Trial97
2020-08-25 16:25:34 +03:00
committed by Dan Christian Bogos
parent 7668e228b2
commit 70b6506336
10 changed files with 71 additions and 124 deletions

View File

@@ -294,7 +294,7 @@ type CGRConfig struct {
routeSCfg *RouteSCfg // RouteS config
sureTaxCfg *SureTaxCfg // SureTax config
dispatcherSCfg *DispatcherSCfg // DispatcherS config
dispatcherHCfg *DispatcherHCfg // DispatcherS config
dispatcherHCfg *DispatcherHCfg // DispatcherH config
loaderCgrCfg *LoaderCgrCfg // LoaderCgr config
migratorCgrCfg *MigratorCgrCfg // MigratorCgr config
mailerCfg *MailerCfg // Mailer config
@@ -664,7 +664,7 @@ func (cfg *CGRConfig) loadDispatcherSCfg(jsnCfg *CgrJsonCfg) (err error) {
return cfg.dispatcherSCfg.loadFromJsonCfg(jsnDispatcherSCfg)
}
// loadDispatcherHCfg loads the DispatcherS section of the configuration
// loadDispatcherHCfg loads the DispatcherH section of the configuration
func (cfg *CGRConfig) loadDispatcherHCfg(jsnCfg *CgrJsonCfg) (err error) {
var jsnDispatcherHCfg *DispatcherHJsonCfg
if jsnDispatcherHCfg, err = jsnCfg.DispatcherHJsonCfg(); err != nil {
@@ -894,7 +894,7 @@ func (cfg *CGRConfig) DispatcherSCfg() *DispatcherSCfg {
return cfg.dispatcherSCfg
}
// DispatcherHCfg returns the config for DispatcherS
// DispatcherHCfg returns the config for DispatcherH
func (cfg *CGRConfig) DispatcherHCfg() *DispatcherHCfg {
cfg.lks[DispatcherSJson].Lock()
defer cfg.lks[DispatcherSJson].Unlock()

View File

@@ -87,7 +87,7 @@ func TestApiersReload(t *testing.T) {
} else if reply != utils.OK {
t.Errorf("Expecting OK ,received %s", reply)
}
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
time.Sleep(100 * time.Millisecond) //need to switch to gorutine
if !apiSv1.IsRunning() {
t.Errorf("Expected service to be running")
}
@@ -102,7 +102,7 @@ func TestApiersReload(t *testing.T) {
}
cfg.ApierCfg().Enabled = false
cfg.GetReloadChan(config.ApierS) <- struct{}{}
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
if apiSv1.IsRunning() {
t.Errorf("Expected service to be down")
}

View File

@@ -164,7 +164,7 @@ func (apiService *APIerSv1Service) ShouldRun() bool {
return apiService.cfg.ApierCfg().Enabled
}
// GetDMChan returns the DataManager chanel
// GetAPIerSv1Chan returns the DataManager chanel
func (apiService *APIerSv1Service) GetAPIerSv1Chan() chan *v1.APIerSv1 {
apiService.RLock()
defer apiService.RUnlock()

View File

@@ -49,8 +49,9 @@ func TestAttributeSReload(t *testing.T) {
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg, nil)
attrRPC := make(chan rpcclient.ClientConnector, 1)
attrS := NewAttributeService(cfg, db,
chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1),
chS, filterSChan, server, attrRPC,
)
engine.NewConnManager(cfg, nil)
srvMngr.AddServices(attrS,
@@ -74,7 +75,12 @@ func TestAttributeSReload(t *testing.T) {
} else if reply != utils.OK {
t.Errorf("Expecting OK ,received %s", reply)
}
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
select {
case d := <-attrRPC:
attrRPC <- d
case <-time.After(time.Second):
t.Fatal("It took to long to reload the cache")
}
if !attrS.IsRunning() {
t.Errorf("Expected service to be running")
}

View File

@@ -69,8 +69,9 @@ func TestCdrsReload(t *testing.T) {
make(chan rpcclient.ClientConnector, 1),
make(chan rpcclient.ClientConnector, 1),
engineShutdown, nil)
cdrsRPC := make(chan rpcclient.ClientConnector, 1)
cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server,
make(chan rpcclient.ClientConnector, 1),
cdrsRPC,
nil)
srvMngr.AddServices(cdrS, ralS, schS, chrS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown,
@@ -97,7 +98,12 @@ func TestCdrsReload(t *testing.T) {
} else if reply != utils.OK {
t.Errorf("Expecting OK ,received %s", reply)
}
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
select {
case d := <-cdrsRPC:
cdrsRPC <- d
case <-time.After(time.Second):
t.Fatal("It took to long to reload the cache")
}
if !cdrS.IsRunning() {
t.Errorf("Expected service to be running")
}

View File

@@ -162,7 +162,7 @@ func (db *DataDBService) needsConnectionReload() bool {
// GetDMChan returns the DataManager chanel
func (db *DataDBService) GetDMChan() chan *engine.DataManager {
db.RLock()
defer db.RUnlock()
db.Lock()
defer db.Unlock()
return db.dbchan
}

View File

@@ -176,7 +176,6 @@ func (srvMngr *ServiceManager) AddServices(services ...Service) {
}
func (srvMngr *ServiceManager) handleReload() {
var err error
for {
select {
case ext := <-srvMngr.engineShutdown:
@@ -192,122 +191,66 @@ func (srvMngr *ServiceManager) handleReload() {
}
return
case <-srvMngr.GetConfig().GetReloadChan(config.ATTRIBUTE_JSN):
if err = srvMngr.reloadService(utils.AttributeS); err != nil {
return
}
go srvMngr.reloadService(utils.AttributeS)
case <-srvMngr.GetConfig().GetReloadChan(config.ChargerSCfgJson):
if err = srvMngr.reloadService(utils.ChargerS); err != nil {
return
}
go srvMngr.reloadService(utils.ChargerS)
case <-srvMngr.GetConfig().GetReloadChan(config.THRESHOLDS_JSON):
if err = srvMngr.reloadService(utils.ThresholdS); err != nil {
return
}
go srvMngr.reloadService(utils.ThresholdS)
case <-srvMngr.GetConfig().GetReloadChan(config.STATS_JSON):
if err = srvMngr.reloadService(utils.StatS); err != nil {
return
}
go srvMngr.reloadService(utils.StatS)
case <-srvMngr.GetConfig().GetReloadChan(config.RESOURCES_JSON):
if err = srvMngr.reloadService(utils.ResourceS); err != nil {
return
}
go srvMngr.reloadService(utils.ResourceS)
case <-srvMngr.GetConfig().GetReloadChan(config.RouteSJson):
if err = srvMngr.reloadService(utils.RouteS); err != nil {
return
}
go srvMngr.reloadService(utils.RouteS)
case <-srvMngr.GetConfig().GetReloadChan(config.SCHEDULER_JSN):
if err = srvMngr.reloadService(utils.SchedulerS); err != nil {
return
}
go srvMngr.reloadService(utils.SchedulerS)
case <-srvMngr.GetConfig().GetReloadChan(config.RALS_JSN):
if err = srvMngr.reloadService(utils.RALService); err != nil {
return
}
go srvMngr.reloadService(utils.RALService)
case <-srvMngr.GetConfig().GetReloadChan(config.ApierS):
if err = srvMngr.reloadService(utils.APIerSv1); err != nil {
return
}
if err = srvMngr.reloadService(utils.APIerSv2); err != nil {
return
}
go func() {
srvMngr.reloadService(utils.APIerSv1)
srvMngr.reloadService(utils.APIerSv2)
}()
case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN):
if err = srvMngr.reloadService(utils.CDRServer); err != nil {
return
}
go srvMngr.reloadService(utils.CDRServer)
case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJson):
if err = srvMngr.reloadService(utils.SessionS); err != nil {
return
}
go srvMngr.reloadService(utils.SessionS)
case <-srvMngr.GetConfig().GetReloadChan(config.ERsJson):
if err = srvMngr.reloadService(utils.ERs); err != nil {
return
}
go srvMngr.reloadService(utils.ERs)
case <-srvMngr.GetConfig().GetReloadChan(config.DNSAgentJson):
if err = srvMngr.reloadService(utils.DNSAgent); err != nil {
return
}
go srvMngr.reloadService(utils.DNSAgent)
case <-srvMngr.GetConfig().GetReloadChan(config.FreeSWITCHAgentJSN):
if err = srvMngr.reloadService(utils.FreeSWITCHAgent); err != nil {
return
}
go srvMngr.reloadService(utils.FreeSWITCHAgent)
case <-srvMngr.GetConfig().GetReloadChan(config.KamailioAgentJSN):
if err = srvMngr.reloadService(utils.KamailioAgent); err != nil {
return
}
go srvMngr.reloadService(utils.KamailioAgent)
case <-srvMngr.GetConfig().GetReloadChan(config.AsteriskAgentJSN):
if err = srvMngr.reloadService(utils.AsteriskAgent); err != nil {
return
}
go srvMngr.reloadService(utils.AsteriskAgent)
case <-srvMngr.GetConfig().GetReloadChan(config.RA_JSN):
if err = srvMngr.reloadService(utils.RadiusAgent); err != nil {
return
}
go srvMngr.reloadService(utils.RadiusAgent)
case <-srvMngr.GetConfig().GetReloadChan(config.DA_JSN):
if err = srvMngr.reloadService(utils.DiameterAgent); err != nil {
return
}
go srvMngr.reloadService(utils.DiameterAgent)
case <-srvMngr.GetConfig().GetReloadChan(config.HttpAgentJson):
if err = srvMngr.reloadService(utils.HTTPAgent); err != nil {
return
}
go srvMngr.reloadService(utils.HTTPAgent)
case <-srvMngr.GetConfig().GetReloadChan(config.LoaderJson):
if err = srvMngr.reloadService(utils.LoaderS); err != nil {
return
}
go srvMngr.reloadService(utils.LoaderS)
case <-srvMngr.GetConfig().GetReloadChan(config.AnalyzerCfgJson):
if err = srvMngr.reloadService(utils.AnalyzerS); err != nil {
return
}
go srvMngr.reloadService(utils.AnalyzerS)
case <-srvMngr.GetConfig().GetReloadChan(config.DispatcherSJson):
if err = srvMngr.reloadService(utils.DispatcherS); err != nil {
return
}
go srvMngr.reloadService(utils.DispatcherS)
case <-srvMngr.GetConfig().GetReloadChan(config.DATADB_JSN):
if err = srvMngr.reloadService(utils.DataDB); err != nil {
return
}
go srvMngr.reloadService(utils.DataDB)
case <-srvMngr.GetConfig().GetReloadChan(config.STORDB_JSN):
if err = srvMngr.reloadService(utils.StorDB); err != nil {
return
}
go srvMngr.reloadService(utils.StorDB)
case <-srvMngr.GetConfig().GetReloadChan(config.EEsJson):
if err = srvMngr.reloadService(utils.EventExporterS); err != nil {
return
}
go srvMngr.reloadService(utils.EventExporterS)
case <-srvMngr.GetConfig().GetReloadChan(config.RateSJson):
if err = srvMngr.reloadService(utils.RateS); err != nil {
return
}
go srvMngr.reloadService(utils.RateS)
case <-srvMngr.GetConfig().GetReloadChan(config.RPCConnsJsonName):
engine.Cache.Clear([]string{utils.CacheRPCConnections})
case <-srvMngr.GetConfig().GetReloadChan(config.SIPAgentJson):
if err = srvMngr.reloadService(utils.SIPAgent); err != nil {
return
}
go srvMngr.reloadService(utils.SIPAgent)
case <-srvMngr.GetConfig().GetReloadChan(config.DispatcherHJson):
if err = srvMngr.reloadService(utils.DispatcherH); err != nil {
return
}
go srvMngr.reloadService(utils.DispatcherH)
}
// handle RPC server
}

View File

@@ -182,9 +182,9 @@ func GetRoutePaginatorFromOpts(ev map[string]interface{}) (args Paginator, err e
return
}
//check if we have suppliersLimit in event and in case it has add it in args
limitIface, hasRoutesLimit := ev[RoutesLimit]
limitIface, hasRoutesLimit := ev[OptsRoutesLimit]
if hasRoutesLimit {
delete(ev, RoutesLimit)
delete(ev, OptsRoutesLimit)
var limit int64
if limit, err = IfaceAsInt64(limitIface); err != nil {
return
@@ -194,11 +194,11 @@ func GetRoutePaginatorFromOpts(ev map[string]interface{}) (args Paginator, err e
}
}
//check if we have offset in event and in case it has add it in args
offsetIface, hasRoutesOffset := ev[RoutesOffset]
offsetIface, hasRoutesOffset := ev[OptsRoutesOffset]
if !hasRoutesOffset {
return
}
delete(ev, RoutesOffset)
delete(ev, OptsRoutesOffset)
var offset int64
if offset, err = IfaceAsInt64(offsetIface); err != nil {
return

View File

@@ -303,8 +303,8 @@ func TestCGREventconsumeRoutePaginator(t *testing.T) {
}
//normal check
opts = map[string]interface{}{
RoutesLimit: 18,
RoutesOffset: 20,
OptsRoutesLimit: 18,
OptsRoutesOffset: 20,
}
eOut = Paginator{
@@ -316,9 +316,9 @@ func TestCGREventconsumeRoutePaginator(t *testing.T) {
t.Error(err)
}
//check if *routes_limit and *routes_offset was deleted
if _, has := opts[RoutesLimit]; has {
if _, has := opts[OptsRoutesLimit]; has {
t.Errorf("*routes_limit wasn't deleted")
} else if _, has := opts[RoutesOffset]; has {
} else if _, has := opts[OptsRoutesOffset]; has {
t.Errorf("*routes_offset wasn't deleted")
}
if !reflect.DeepEqual(eOut, rcv) {
@@ -326,7 +326,7 @@ func TestCGREventconsumeRoutePaginator(t *testing.T) {
}
//check without *routes_limit, but with *routes_offset
opts = map[string]interface{}{
RoutesOffset: 20,
OptsRoutesOffset: 20,
}
eOut = Paginator{
@@ -337,9 +337,9 @@ func TestCGREventconsumeRoutePaginator(t *testing.T) {
t.Error(err)
}
//check if *routes_limit and *routes_offset was deleted
if _, has := opts[RoutesLimit]; has {
if _, has := opts[OptsRoutesLimit]; has {
t.Errorf("*routes_limit wasn't deleted")
} else if _, has := opts[RoutesOffset]; has {
} else if _, has := opts[OptsRoutesOffset]; has {
t.Errorf("*routes_offset wasn't deleted")
}
if !reflect.DeepEqual(eOut, rcv) {
@@ -347,7 +347,7 @@ func TestCGREventconsumeRoutePaginator(t *testing.T) {
}
//check with notAnInt at *routes_limit
opts = map[string]interface{}{
RoutesLimit: "Not an int",
OptsRoutesLimit: "Not an int",
}
eOut = Paginator{}
rcv, err = GetRoutePaginatorFromOpts(opts)
@@ -359,7 +359,7 @@ func TestCGREventconsumeRoutePaginator(t *testing.T) {
}
//check with notAnInt at and *routes_offset
opts = map[string]interface{}{
RoutesOffset: "Not an int",
OptsRoutesOffset: "Not an int",
}
eOut = Paginator{}
rcv, err = GetRoutePaginatorFromOpts(opts)

View File

@@ -762,8 +762,6 @@ const (
Preference = "Preference"
Flags = "Flags"
Service = "Service"
MetaRoutesLimit = "*routes_limit"
MetaRoutesOffset = "*routes_offset"
ApierV = "ApierV"
MetaApier = "*apier"
MetaAnalyzer = "*analyzer"
@@ -2300,15 +2298,7 @@ var CGROptionsSet = NewStringSet([]string{OptsRatesStartTime, OptsSessionTTL, Op
OptsSessionTTLLastUsed, OptsSessionTTLLastUsage, OptsSessionTTLUsage, OptsDebitInterval, OptsStirATest,
OptsStirPayloadMaxDuration, OptsStirIdentity, OptsStirOriginatorTn, OptsStirOriginatorURI,
OptsStirDestinationTn, OptsStirDestinationURI, OptsStirPublicKeyPath, OptsStirPrivateKeyPath,
OptsAPIKey, OptsRouteID, OptsContext, OptsAttributesProcessRuns})
// SessionS ProccessEvent posible options
const (
// SupplierS
RoutesLimit = "RoutesLimit"
RoutesOffset = "RoutesOffset"
)
OptsAPIKey, OptsRouteID, OptsContext, OptsAttributesProcessRuns, OptsRoutesLimit, OptsRoutesOffset})
// EventExporter metrics
const (
@@ -2329,6 +2319,8 @@ const (
// Event Opts
const (
OptsRoutesLimit = "*routes_limit"
OptsRoutesOffset = "*routes_offset"
OptsRatesStartTime = "*ratesStartTime"
OptsSessionTTL = "*sessionTTL"
OptsSessionTTLMaxDelay = "*sessionTTLMaxDelay"