Updated ees service tests

This commit is contained in:
Trial97
2020-05-29 16:50:49 +03:00
committed by Dan Christian Bogos
parent 336e755d53
commit deb57e190c
8 changed files with 126 additions and 21 deletions

View File

@@ -1516,6 +1516,8 @@ func (cfg *CGRConfig) reloadSections(sections ...string) (err error) {
case AnalyzerCfgJson:
case ApierS:
cfg.rldChans[ApierS] <- struct{}{}
case EEsJson:
cfg.rldChans[EEsJson] <- struct{}{}
case SIPAgentJson:
cfg.rldChans[SIPAgentJson] <- struct{}{}
}

View File

@@ -27,19 +27,19 @@ import (
)
// NewSliceDP constructs a utils.DataProvider
func NewSliceDP(record []string, headers map[string]int) (dP utils.DataProvider) {
func NewSliceDP(record []string, indxAls map[string]int) (dP utils.DataProvider) {
return &SliceDP{
req: record,
cache: utils.MapStorage{},
hdrs: headers,
req: record,
cache: utils.MapStorage{},
idxAls: indxAls,
}
}
// SliceDP implements engine.utils.DataProvider so we can pass it to filters
type SliceDP struct {
req []string
cache utils.MapStorage
hdrs map[string]int
req []string
cache utils.MapStorage
idxAls map[string]int // aliases for indexes
}
// String is part of engine.utils.DataProvider interface
@@ -88,10 +88,11 @@ func (cP *SliceDP) RemoteHost() net.Addr {
return utils.LocalAddr()
}
// getIndex returns the index from index alias map or if not found try to convert it to int
func (cP *SliceDP) getIndex(idx string) (fieldIdx int, err error) {
if cP.hdrs != nil {
if cP.idxAls != nil {
var has bool
if fieldIdx, has = cP.hdrs[idx]; has {
if fieldIdx, has = cP.idxAls[idx]; has {
return
}
}

View File

@@ -136,7 +136,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) {
if len(rdr.Config().FieldSep) > 0 {
csvReader.Comma = rune(rdr.Config().FieldSep[0])
}
var headers map[string]int
var indxAls map[string]int
rowNr := 0 // This counts the rows in the file, not really number of CDRs
evsPosted := 0
timeStart := time.Now()
@@ -153,16 +153,16 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) {
strings.HasPrefix(record[0], rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].HeaderDefineChar) {
record[0] = strings.TrimPrefix(record[0], rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].HeaderDefineChar)
// map the templates
headers = make(map[string]int)
indxAls = make(map[string]int)
for i, hdr := range record {
headers[hdr] = i
indxAls[hdr] = i
}
continue
}
rowNr++ // increment the rowNr after checking if it's not the end of file
agReq := agents.NewAgentRequest(
config.NewSliceDP(record, headers), reqVars,
config.NewSliceDP(record, indxAls), reqVars,
nil, nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,

View File

@@ -155,7 +155,7 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) {
csvReader.Comma = rune(rdr.Config().FieldSep[0])
}
csvReader.Comment = '#'
var headers map[string]int
var indxAls map[string]int
rowNr := 0 // This counts the rows in the file, not really number of CDRs
evsPosted := 0
timeStart := time.Now()
@@ -172,15 +172,15 @@ func (rdr *PartialCSVFileER) processFile(fPath, fName string) (err error) {
strings.HasPrefix(record[0], rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].HeaderDefineChar) {
record[0] = strings.TrimPrefix(record[0], rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx].HeaderDefineChar)
// map the templates
headers = make(map[string]int)
indxAls = make(map[string]int)
for i, hdr := range record {
headers[hdr] = i
indxAls[hdr] = i
}
continue
}
rowNr++ // increment the rowNr after checking if it's not the end of file
agReq := agents.NewAgentRequest(
config.NewSliceDP(record, headers), reqVars,
config.NewSliceDP(record, indxAls), reqVars,
nil, nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,

View File

@@ -49,12 +49,11 @@ func (rS *RateS) ListenAndServe(exitChan chan bool, cfgRld chan struct{}) (err e
case e := <-exitChan: // global exit
rS.Shutdown()
exitChan <- e // put back for the others listening for shutdown request
break
return
case rld := <-cfgRld: // configuration was reloaded
cfgRld <- rld
}
}
return
}
// Shutdown is called to shutdown the service

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package services
import (
"fmt"
"sync"
v1 "github.com/cgrates/cgrates/apier/v1"
@@ -87,6 +88,7 @@ func (es *EventExporterService) Reload() (err error) {
// Shutdown stops the service
func (es *EventExporterService) Shutdown() (err error) {
es.Lock()
defer es.Unlock()
if err = es.eeS.Shutdown(); err != nil {
return
@@ -105,6 +107,8 @@ func (es *EventExporterService) Start() (err error) {
fltrS := <-es.filterSChan
es.filterSChan <- fltrS
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EventExporterS))
es.Lock()
es.eeS = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr)
es.Unlock()
@@ -113,5 +117,11 @@ func (es *EventExporterService) Start() (err error) {
es.server.RpcRegister(es.rpc)
}
es.intConnChan <- es.eeS
return es.eeS.ListenAndServe(es.exitChan, es.rldChan)
go func(eeS *ees.EventExporterS, exitChan chan bool, rldChan chan struct{}) {
if err := eeS.ListenAndServe(exitChan, rldChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.EventExporterS, err.Error()))
exitChan <- true
}
}(es.eeS, es.exitChan, es.rldChan)
return
}

93
services/ees_it_test.go Normal file
View File

@@ -0,0 +1,93 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"os"
"path"
"testing"
"time"
"github.com/cgrates/rpcclient"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
func TestEventExporterSReload(t *testing.T) {
for _, dir := range []string{"/tmp/testCSV", "/tmp/testComposedCSV"} {
if err := os.RemoveAll(dir); err != nil {
t.Fatal("Error removing folder: ", dir, err)
}
if err := os.MkdirAll(dir, 0755); err != nil {
t.Fatal("Error creating folder: ", dir, err)
}
}
cfg, err := config.NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
}
utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
utils.Logger.SetLogLevel(7)
cfg.AttributeSCfg().Enabled = true
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
engineShutdown := make(chan bool, 1)
server := utils.NewServer()
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
db := NewDataDBService(cfg, nil)
chS := engine.NewCacheS(cfg, nil)
close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles))
close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes))
attrS := NewAttributeService(cfg, db,
chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1),
)
ees := NewEventExporterService(cfg, filterSChan, engine.NewConnManager(cfg, nil), server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
srvMngr.AddServices(ees, attrS,
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
if err = srvMngr.StartServices(); err != nil {
t.Error(err)
}
if ees.IsRunning() {
t.Errorf("Expected service to be down")
}
var reply string
if err := cfg.V1ReloadConfigFromPath(&config.ConfigReloadWithArgDispatcher{
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "ees"),
Section: config.EEsJson,
}, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Expecting OK ,received %s", reply)
}
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
if !ees.IsRunning() {
t.Errorf("Expected service to be running")
}
cfg.EEsCfg().Enabled = false
cfg.GetReloadChan(config.EEsJson) <- struct{}{}
time.Sleep(10 * time.Millisecond)
if ees.IsRunning() {
t.Errorf("Expected service to be down")
}
engineShutdown <- true
}

View File

@@ -292,7 +292,7 @@ func (srvMngr *ServiceManager) handleReload() {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.EEsJson):
if err = srvMngr.reloadService(config.EEsJson); err != nil {
if err = srvMngr.reloadService(utils.EventExporterS); err != nil {
return
}
case <-srvMngr.GetConfig().GetReloadChan(config.RateSJson):