diff --git a/config/config.go b/config/config.go index dd3be7090..b0e3e9875 100755 --- a/config/config.go +++ b/config/config.go @@ -294,7 +294,7 @@ type CGRConfig struct { routeSCfg *RouteSCfg // RouteS config sureTaxCfg *SureTaxCfg // SureTax config dispatcherSCfg *DispatcherSCfg // DispatcherS config - dispatcherHCfg *DispatcherHCfg // DispatcherS config + dispatcherHCfg *DispatcherHCfg // DispatcherH config loaderCgrCfg *LoaderCgrCfg // LoaderCgr config migratorCgrCfg *MigratorCgrCfg // MigratorCgr config mailerCfg *MailerCfg // Mailer config @@ -664,7 +664,7 @@ func (cfg *CGRConfig) loadDispatcherSCfg(jsnCfg *CgrJsonCfg) (err error) { return cfg.dispatcherSCfg.loadFromJsonCfg(jsnDispatcherSCfg) } -// loadDispatcherHCfg loads the DispatcherS section of the configuration +// loadDispatcherHCfg loads the DispatcherH section of the configuration func (cfg *CGRConfig) loadDispatcherHCfg(jsnCfg *CgrJsonCfg) (err error) { var jsnDispatcherHCfg *DispatcherHJsonCfg if jsnDispatcherHCfg, err = jsnCfg.DispatcherHJsonCfg(); err != nil { @@ -894,7 +894,7 @@ func (cfg *CGRConfig) DispatcherSCfg() *DispatcherSCfg { return cfg.dispatcherSCfg } -// DispatcherHCfg returns the config for DispatcherS +// DispatcherHCfg returns the config for DispatcherH func (cfg *CGRConfig) DispatcherHCfg() *DispatcherHCfg { cfg.lks[DispatcherSJson].Lock() defer cfg.lks[DispatcherSJson].Unlock() diff --git a/services/apiers_it_test.go b/services/apiers_it_test.go index b81c44f2f..e21940d1f 100644 --- a/services/apiers_it_test.go +++ b/services/apiers_it_test.go @@ -87,7 +87,7 @@ func TestApiersReload(t *testing.T) { } else if reply != utils.OK { t.Errorf("Expecting OK ,received %s", reply) } - time.Sleep(10 * time.Millisecond) //need to switch to gorutine + time.Sleep(100 * time.Millisecond) //need to switch to gorutine if !apiSv1.IsRunning() { t.Errorf("Expected service to be running") } @@ -102,7 +102,7 @@ func TestApiersReload(t *testing.T) { } cfg.ApierCfg().Enabled = false cfg.GetReloadChan(config.ApierS) <- struct{}{} - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) if apiSv1.IsRunning() { t.Errorf("Expected service to be down") } diff --git a/services/apierv1.go b/services/apierv1.go index e5b285139..f4fe62cb4 100644 --- a/services/apierv1.go +++ b/services/apierv1.go @@ -164,7 +164,7 @@ func (apiService *APIerSv1Service) ShouldRun() bool { return apiService.cfg.ApierCfg().Enabled } -// GetDMChan returns the DataManager chanel +// GetAPIerSv1Chan returns the DataManager chanel func (apiService *APIerSv1Service) GetAPIerSv1Chan() chan *v1.APIerSv1 { apiService.RLock() defer apiService.RUnlock() diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index 9f9994156..8a16b9987 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -49,8 +49,9 @@ func TestAttributeSReload(t *testing.T) { server := utils.NewServer() srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) + attrRPC := make(chan rpcclient.ClientConnector, 1) attrS := NewAttributeService(cfg, db, - chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), + chS, filterSChan, server, attrRPC, ) engine.NewConnManager(cfg, nil) srvMngr.AddServices(attrS, @@ -74,7 +75,12 @@ func TestAttributeSReload(t *testing.T) { } else if reply != utils.OK { t.Errorf("Expecting OK ,received %s", reply) } - time.Sleep(10 * time.Millisecond) //need to switch to gorutine + select { + case d := <-attrRPC: + attrRPC <- d + case <-time.After(time.Second): + t.Fatal("It took to long to reload the cache") + } if !attrS.IsRunning() { t.Errorf("Expected service to be running") } diff --git a/services/cdrs_it_test.go b/services/cdrs_it_test.go index 378c1251b..1d1472811 100644 --- a/services/cdrs_it_test.go +++ b/services/cdrs_it_test.go @@ -69,8 +69,9 @@ func TestCdrsReload(t *testing.T) { make(chan rpcclient.ClientConnector, 1), make(chan rpcclient.ClientConnector, 1), engineShutdown, nil) + cdrsRPC := make(chan rpcclient.ClientConnector, 1) cdrS := NewCDRServer(cfg, db, stordb, filterSChan, server, - make(chan rpcclient.ClientConnector, 1), + cdrsRPC, nil) srvMngr.AddServices(cdrS, ralS, schS, chrS, NewLoaderService(cfg, db, filterSChan, server, engineShutdown, @@ -97,7 +98,12 @@ func TestCdrsReload(t *testing.T) { } else if reply != utils.OK { t.Errorf("Expecting OK ,received %s", reply) } - time.Sleep(10 * time.Millisecond) //need to switch to gorutine + select { + case d := <-cdrsRPC: + cdrsRPC <- d + case <-time.After(time.Second): + t.Fatal("It took to long to reload the cache") + } if !cdrS.IsRunning() { t.Errorf("Expected service to be running") } diff --git a/services/datadb.go b/services/datadb.go index 33a8a3aae..2c7ee2661 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -162,7 +162,7 @@ func (db *DataDBService) needsConnectionReload() bool { // GetDMChan returns the DataManager chanel func (db *DataDBService) GetDMChan() chan *engine.DataManager { - db.RLock() - defer db.RUnlock() + db.Lock() + defer db.Unlock() return db.dbchan } diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 63b3af503..ad9ce7f6f 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -176,7 +176,6 @@ func (srvMngr *ServiceManager) AddServices(services ...Service) { } func (srvMngr *ServiceManager) handleReload() { - var err error for { select { case ext := <-srvMngr.engineShutdown: @@ -192,122 +191,66 @@ func (srvMngr *ServiceManager) handleReload() { } return case <-srvMngr.GetConfig().GetReloadChan(config.ATTRIBUTE_JSN): - if err = srvMngr.reloadService(utils.AttributeS); err != nil { - return - } + go srvMngr.reloadService(utils.AttributeS) case <-srvMngr.GetConfig().GetReloadChan(config.ChargerSCfgJson): - if err = srvMngr.reloadService(utils.ChargerS); err != nil { - return - } + go srvMngr.reloadService(utils.ChargerS) case <-srvMngr.GetConfig().GetReloadChan(config.THRESHOLDS_JSON): - if err = srvMngr.reloadService(utils.ThresholdS); err != nil { - return - } + go srvMngr.reloadService(utils.ThresholdS) case <-srvMngr.GetConfig().GetReloadChan(config.STATS_JSON): - if err = srvMngr.reloadService(utils.StatS); err != nil { - return - } + go srvMngr.reloadService(utils.StatS) case <-srvMngr.GetConfig().GetReloadChan(config.RESOURCES_JSON): - if err = srvMngr.reloadService(utils.ResourceS); err != nil { - return - } + go srvMngr.reloadService(utils.ResourceS) case <-srvMngr.GetConfig().GetReloadChan(config.RouteSJson): - if err = srvMngr.reloadService(utils.RouteS); err != nil { - return - } + go srvMngr.reloadService(utils.RouteS) case <-srvMngr.GetConfig().GetReloadChan(config.SCHEDULER_JSN): - if err = srvMngr.reloadService(utils.SchedulerS); err != nil { - return - } + go srvMngr.reloadService(utils.SchedulerS) case <-srvMngr.GetConfig().GetReloadChan(config.RALS_JSN): - if err = srvMngr.reloadService(utils.RALService); err != nil { - return - } + go srvMngr.reloadService(utils.RALService) case <-srvMngr.GetConfig().GetReloadChan(config.ApierS): - if err = srvMngr.reloadService(utils.APIerSv1); err != nil { - return - } - if err = srvMngr.reloadService(utils.APIerSv2); err != nil { - return - } + go func() { + srvMngr.reloadService(utils.APIerSv1) + srvMngr.reloadService(utils.APIerSv2) + }() case <-srvMngr.GetConfig().GetReloadChan(config.CDRS_JSN): - if err = srvMngr.reloadService(utils.CDRServer); err != nil { - return - } + go srvMngr.reloadService(utils.CDRServer) case <-srvMngr.GetConfig().GetReloadChan(config.SessionSJson): - if err = srvMngr.reloadService(utils.SessionS); err != nil { - return - } + go srvMngr.reloadService(utils.SessionS) case <-srvMngr.GetConfig().GetReloadChan(config.ERsJson): - if err = srvMngr.reloadService(utils.ERs); err != nil { - return - } + go srvMngr.reloadService(utils.ERs) case <-srvMngr.GetConfig().GetReloadChan(config.DNSAgentJson): - if err = srvMngr.reloadService(utils.DNSAgent); err != nil { - return - } + go srvMngr.reloadService(utils.DNSAgent) case <-srvMngr.GetConfig().GetReloadChan(config.FreeSWITCHAgentJSN): - if err = srvMngr.reloadService(utils.FreeSWITCHAgent); err != nil { - return - } + go srvMngr.reloadService(utils.FreeSWITCHAgent) case <-srvMngr.GetConfig().GetReloadChan(config.KamailioAgentJSN): - if err = srvMngr.reloadService(utils.KamailioAgent); err != nil { - return - } + go srvMngr.reloadService(utils.KamailioAgent) case <-srvMngr.GetConfig().GetReloadChan(config.AsteriskAgentJSN): - if err = srvMngr.reloadService(utils.AsteriskAgent); err != nil { - return - } + go srvMngr.reloadService(utils.AsteriskAgent) case <-srvMngr.GetConfig().GetReloadChan(config.RA_JSN): - if err = srvMngr.reloadService(utils.RadiusAgent); err != nil { - return - } + go srvMngr.reloadService(utils.RadiusAgent) case <-srvMngr.GetConfig().GetReloadChan(config.DA_JSN): - if err = srvMngr.reloadService(utils.DiameterAgent); err != nil { - return - } + go srvMngr.reloadService(utils.DiameterAgent) case <-srvMngr.GetConfig().GetReloadChan(config.HttpAgentJson): - if err = srvMngr.reloadService(utils.HTTPAgent); err != nil { - return - } + go srvMngr.reloadService(utils.HTTPAgent) case <-srvMngr.GetConfig().GetReloadChan(config.LoaderJson): - if err = srvMngr.reloadService(utils.LoaderS); err != nil { - return - } + go srvMngr.reloadService(utils.LoaderS) case <-srvMngr.GetConfig().GetReloadChan(config.AnalyzerCfgJson): - if err = srvMngr.reloadService(utils.AnalyzerS); err != nil { - return - } + go srvMngr.reloadService(utils.AnalyzerS) case <-srvMngr.GetConfig().GetReloadChan(config.DispatcherSJson): - if err = srvMngr.reloadService(utils.DispatcherS); err != nil { - return - } + go srvMngr.reloadService(utils.DispatcherS) case <-srvMngr.GetConfig().GetReloadChan(config.DATADB_JSN): - if err = srvMngr.reloadService(utils.DataDB); err != nil { - return - } + go srvMngr.reloadService(utils.DataDB) case <-srvMngr.GetConfig().GetReloadChan(config.STORDB_JSN): - if err = srvMngr.reloadService(utils.StorDB); err != nil { - return - } + go srvMngr.reloadService(utils.StorDB) case <-srvMngr.GetConfig().GetReloadChan(config.EEsJson): - if err = srvMngr.reloadService(utils.EventExporterS); err != nil { - return - } + go srvMngr.reloadService(utils.EventExporterS) case <-srvMngr.GetConfig().GetReloadChan(config.RateSJson): - if err = srvMngr.reloadService(utils.RateS); err != nil { - return - } + go srvMngr.reloadService(utils.RateS) case <-srvMngr.GetConfig().GetReloadChan(config.RPCConnsJsonName): engine.Cache.Clear([]string{utils.CacheRPCConnections}) case <-srvMngr.GetConfig().GetReloadChan(config.SIPAgentJson): - if err = srvMngr.reloadService(utils.SIPAgent); err != nil { - return - } + go srvMngr.reloadService(utils.SIPAgent) case <-srvMngr.GetConfig().GetReloadChan(config.DispatcherHJson): - if err = srvMngr.reloadService(utils.DispatcherH); err != nil { - return - } + go srvMngr.reloadService(utils.DispatcherH) } // handle RPC server } diff --git a/utils/cgrevent.go b/utils/cgrevent.go index 6eb563310..1d9c083e0 100644 --- a/utils/cgrevent.go +++ b/utils/cgrevent.go @@ -182,9 +182,9 @@ func GetRoutePaginatorFromOpts(ev map[string]interface{}) (args Paginator, err e return } //check if we have suppliersLimit in event and in case it has add it in args - limitIface, hasRoutesLimit := ev[RoutesLimit] + limitIface, hasRoutesLimit := ev[OptsRoutesLimit] if hasRoutesLimit { - delete(ev, RoutesLimit) + delete(ev, OptsRoutesLimit) var limit int64 if limit, err = IfaceAsInt64(limitIface); err != nil { return @@ -194,11 +194,11 @@ func GetRoutePaginatorFromOpts(ev map[string]interface{}) (args Paginator, err e } } //check if we have offset in event and in case it has add it in args - offsetIface, hasRoutesOffset := ev[RoutesOffset] + offsetIface, hasRoutesOffset := ev[OptsRoutesOffset] if !hasRoutesOffset { return } - delete(ev, RoutesOffset) + delete(ev, OptsRoutesOffset) var offset int64 if offset, err = IfaceAsInt64(offsetIface); err != nil { return diff --git a/utils/cgrevent_test.go b/utils/cgrevent_test.go index de9031bbe..c3b5cb516 100644 --- a/utils/cgrevent_test.go +++ b/utils/cgrevent_test.go @@ -303,8 +303,8 @@ func TestCGREventconsumeRoutePaginator(t *testing.T) { } //normal check opts = map[string]interface{}{ - RoutesLimit: 18, - RoutesOffset: 20, + OptsRoutesLimit: 18, + OptsRoutesOffset: 20, } eOut = Paginator{ @@ -316,9 +316,9 @@ func TestCGREventconsumeRoutePaginator(t *testing.T) { t.Error(err) } //check if *routes_limit and *routes_offset was deleted - if _, has := opts[RoutesLimit]; has { + if _, has := opts[OptsRoutesLimit]; has { t.Errorf("*routes_limit wasn't deleted") - } else if _, has := opts[RoutesOffset]; has { + } else if _, has := opts[OptsRoutesOffset]; has { t.Errorf("*routes_offset wasn't deleted") } if !reflect.DeepEqual(eOut, rcv) { @@ -326,7 +326,7 @@ func TestCGREventconsumeRoutePaginator(t *testing.T) { } //check without *routes_limit, but with *routes_offset opts = map[string]interface{}{ - RoutesOffset: 20, + OptsRoutesOffset: 20, } eOut = Paginator{ @@ -337,9 +337,9 @@ func TestCGREventconsumeRoutePaginator(t *testing.T) { t.Error(err) } //check if *routes_limit and *routes_offset was deleted - if _, has := opts[RoutesLimit]; has { + if _, has := opts[OptsRoutesLimit]; has { t.Errorf("*routes_limit wasn't deleted") - } else if _, has := opts[RoutesOffset]; has { + } else if _, has := opts[OptsRoutesOffset]; has { t.Errorf("*routes_offset wasn't deleted") } if !reflect.DeepEqual(eOut, rcv) { @@ -347,7 +347,7 @@ func TestCGREventconsumeRoutePaginator(t *testing.T) { } //check with notAnInt at *routes_limit opts = map[string]interface{}{ - RoutesLimit: "Not an int", + OptsRoutesLimit: "Not an int", } eOut = Paginator{} rcv, err = GetRoutePaginatorFromOpts(opts) @@ -359,7 +359,7 @@ func TestCGREventconsumeRoutePaginator(t *testing.T) { } //check with notAnInt at and *routes_offset opts = map[string]interface{}{ - RoutesOffset: "Not an int", + OptsRoutesOffset: "Not an int", } eOut = Paginator{} rcv, err = GetRoutePaginatorFromOpts(opts) diff --git a/utils/consts.go b/utils/consts.go index 93b265fe6..e364524f8 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -762,8 +762,6 @@ const ( Preference = "Preference" Flags = "Flags" Service = "Service" - MetaRoutesLimit = "*routes_limit" - MetaRoutesOffset = "*routes_offset" ApierV = "ApierV" MetaApier = "*apier" MetaAnalyzer = "*analyzer" @@ -2300,15 +2298,7 @@ var CGROptionsSet = NewStringSet([]string{OptsRatesStartTime, OptsSessionTTL, Op OptsSessionTTLLastUsed, OptsSessionTTLLastUsage, OptsSessionTTLUsage, OptsDebitInterval, OptsStirATest, OptsStirPayloadMaxDuration, OptsStirIdentity, OptsStirOriginatorTn, OptsStirOriginatorURI, OptsStirDestinationTn, OptsStirDestinationURI, OptsStirPublicKeyPath, OptsStirPrivateKeyPath, - OptsAPIKey, OptsRouteID, OptsContext, OptsAttributesProcessRuns}) - -// SessionS ProccessEvent posible options -const ( - - // SupplierS - RoutesLimit = "RoutesLimit" - RoutesOffset = "RoutesOffset" -) + OptsAPIKey, OptsRouteID, OptsContext, OptsAttributesProcessRuns, OptsRoutesLimit, OptsRoutesOffset}) // EventExporter metrics const ( @@ -2329,6 +2319,8 @@ const ( // Event Opts const ( + OptsRoutesLimit = "*routes_limit" + OptsRoutesOffset = "*routes_offset" OptsRatesStartTime = "*ratesStartTime" OptsSessionTTL = "*sessionTTL" OptsSessionTTLMaxDelay = "*sessionTTLMaxDelay"