diff --git a/config/config.go b/config/config.go index 251f1625b..f34189bad 100755 --- a/config/config.go +++ b/config/config.go @@ -1516,6 +1516,8 @@ func (cfg *CGRConfig) reloadSections(sections ...string) (err error) { case AnalyzerCfgJson: case ApierS: cfg.rldChans[ApierS] <- struct{}{} + case EEsJson: + cfg.rldChans[EEsJson] <- struct{}{} case SIPAgentJson: cfg.rldChans[SIPAgentJson] <- struct{}{} } diff --git a/config/slicedp.go b/config/slicedp.go index c39b982fb..04b36287a 100644 --- a/config/slicedp.go +++ b/config/slicedp.go @@ -27,19 +27,19 @@ import ( ) // NewSliceDP constructs a utils.DataProvider -func NewSliceDP(record []string, headers map[string]int) (dP utils.DataProvider) { +func NewSliceDP(record []string, indxAls map[string]int) (dP utils.DataProvider) { return &SliceDP{ - req: record, - cache: utils.MapStorage{}, - hdrs: headers, + req: record, + cache: utils.MapStorage{}, + idxAls: indxAls, } } // SliceDP implements engine.utils.DataProvider so we can pass it to filters type SliceDP struct { - req []string - cache utils.MapStorage - hdrs map[string]int + req []string + cache utils.MapStorage + idxAls map[string]int // aliases for indexes } // String is part of engine.utils.DataProvider interface @@ -88,10 +88,11 @@ func (cP *SliceDP) RemoteHost() net.Addr { return utils.LocalAddr() } +// getIndex returns the index from index alias map or if not found try to convert it to int func (cP *SliceDP) getIndex(idx string) (fieldIdx int, err error) { - if cP.hdrs != nil { + if cP.idxAls != nil { var has bool - if fieldIdx, has = cP.hdrs[idx]; has { + if fieldIdx, has = cP.idxAls[idx]; has { return } } diff --git a/ers/filecsv.go b/ers/filecsv.go index 206535520..7afdf710c 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -136,7 +136,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { if len(rdr.Config().FieldSep) > 0 { csvReader.Comma = rune(rdr.Config().FieldSep[0]) } - var headers map[string]int + var indxAls map[string]int rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() @@ -153,16 +153,16 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) { strings.HasPrefix(record[0], rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].HeaderDefineChar) { record[0] = strings.TrimPrefix(record[0], rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].HeaderDefineChar) // map the templates - headers = make(map[string]int) + indxAls = make(map[string]int) for i, hdr := range record { - headers[hdr] = i + indxAls[hdr] = i } continue } rowNr++ // increment the rowNr after checking if it's not the end of file agReq := agents.NewAgentRequest( - config.NewSliceDP(record, headers), reqVars, + config.NewSliceDP(record, indxAls), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/ers/partial_csv.go b/ers/partial_csv.go index d89a27e6b..a02811f98 100644 --- a/ers/partial_csv.go +++ b/ers/partial_csv.go @@ -155,7 +155,7 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { csvReader.Comma = rune(rdr.Config().FieldSep[0]) } csvReader.Comment = '#' - var headers map[string]int + var indxAls map[string]int rowNr := 0 // This counts the rows in the file, not really number of CDRs evsPosted := 0 timeStart := time.Now() @@ -172,15 +172,15 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) { strings.HasPrefix(record[0], rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].HeaderDefineChar) { record[0] = strings.TrimPrefix(record[0], rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].HeaderDefineChar) // map the templates - headers = make(map[string]int) + indxAls = make(map[string]int) for i, hdr := range record { - headers[hdr] = i + indxAls[hdr] = i } continue } rowNr++ // increment the rowNr after checking if it's not the end of file agReq := agents.NewAgentRequest( - config.NewSliceDP(record, headers), reqVars, + config.NewSliceDP(record, indxAls), reqVars, nil, nil, nil, rdr.Config().Tenant, rdr.cgrCfg.GeneralCfg().DefaultTenant, utils.FirstNonEmpty(rdr.Config().Timezone, diff --git a/rates/rates.go b/rates/rates.go index 84c5b95e0..90389db36 100644 --- a/rates/rates.go +++ b/rates/rates.go @@ -49,12 +49,11 @@ func (rS *RateS) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err e case e := <-exitChan: // global exit rS.Shutdown() exitChan <- e // put back for the others listening for shutdown request - break + return case rld := <-cfgRld: // configuration was reloaded cfgRld <- rld } } - return } // Shutdown is called to shutdown the service diff --git a/services/ees.go b/services/ees.go index d0b8ee7e2..60d4dc87c 100644 --- a/services/ees.go +++ b/services/ees.go @@ -19,6 +19,7 @@ along with this program. If not, see package services import ( + "fmt" "sync" v1 "github.com/cgrates/cgrates/apier/v1" @@ -87,6 +88,7 @@ func (es *EventExporterService) Reload() (err error) { // Shutdown stops the service func (es *EventExporterService) Shutdown() (err error) { es.Lock() + defer es.Unlock() if err = es.eeS.Shutdown(); err != nil { return @@ -105,6 +107,8 @@ func (es *EventExporterService) Start() (err error) { fltrS := <-es.filterSChan es.filterSChan <- fltrS + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EventExporterS)) + es.Lock() es.eeS = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr) es.Unlock() @@ -113,5 +117,11 @@ func (es *EventExporterService) Start() (err error) { es.server.RpcRegister(es.rpc) } es.intConnChan <- es.eeS - return es.eeS.ListenAndServe(es.exitChan, es.rldChan) + go func(eeS *ees.EventExporterS, exitChan chan bool, rldChan chan struct{}) { + if err := eeS.ListenAndServe(exitChan, rldChan); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.EventExporterS, err.Error())) + exitChan <- true + } + }(es.eeS, es.exitChan, es.rldChan) + return } diff --git a/services/ees_it_test.go b/services/ees_it_test.go new file mode 100644 index 000000000..0ed79bf26 --- /dev/null +++ b/services/ees_it_test.go @@ -0,0 +1,93 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package services + +import ( + "os" + "path" + "testing" + "time" + + "github.com/cgrates/rpcclient" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +func TestEventExporterSReload(t *testing.T) { + for _, dir := range []string{"/tmp/testCSV", "/tmp/testComposedCSV"} { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + cfg.AttributeSCfg().Enabled = true + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + engineShutdown := make(chan bool, 1) + server := utils.NewServer() + srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + db := NewDataDBService(cfg, nil) + chS := engine.NewCacheS(cfg, nil) + close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) + close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) + attrS := NewAttributeService(cfg, db, + chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), + ) + ees := NewEventExporterService(cfg, filterSChan, engine.NewConnManager(cfg, nil), server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + srvMngr.AddServices(ees, attrS, + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) + if err = srvMngr.StartServices(); err != nil { + t.Error(err) + } + if ees.IsRunning() { + t.Errorf("Expected service to be down") + } + var reply string + if err := cfg.V1ReloadConfigFromPath(&config.ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "ees"), + Section: config.EEsJson, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expecting OK ,received %s", reply) + } + time.Sleep(10 * time.Millisecond) //need to switch to gorutine + if !ees.IsRunning() { + t.Errorf("Expected service to be running") + } + cfg.EEsCfg().Enabled = false + cfg.GetReloadChan(config.EEsJson) <- struct{}{} + time.Sleep(10 * time.Millisecond) + if ees.IsRunning() { + t.Errorf("Expected service to be down") + } + engineShutdown <- true +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 52cbd73a1..dd2dfda7a 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -292,7 +292,7 @@ func (srvMngr *ServiceManager) handleReload() { return } case <-srvMngr.GetConfig().GetReloadChan(config.EEsJson): - if err = srvMngr.reloadService(config.EEsJson); err != nil { + if err = srvMngr.reloadService(utils.EventExporterS); err != nil { return } case <-srvMngr.GetConfig().GetReloadChan(config.RateSJson):