Add precache support for exporters

Additional changes:
- removed unnecessary locking when initializing the exporter cache map,
  as the service itself is responsible for locking.
- split the setupCache method into separate ClearExporterCache and
  SetupExporterCache methods.
- removed the idle ListenAndServe function, which did nothing but wait
  for stopChan to close; its reload case was unreachable because the
  reload channel was created in Start instead of being passed down.
- removed the Shutdown method on EventExporterS and replaced it with the
  exported ClearExporterCache method, which provided the same
  functionality.
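
The precache option itself is a per-partition flag in the ees cache config; partitions are keyed by exporter type, so enabling it instantiates every configured exporter of that type up front. A rough sketch of the config, modeled on the commented-out cache block in the Kafka integration test further down (partition name and values are illustrative only):

"ees": {
    "enabled": true,
    "cache": {
        "*kafka_json_map": {"limit": -1, "ttl": "5s", "precache": true}
    },
    "exporters": [
        // exporters of type *kafka_json_map are created and cached at startup
    ]
}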
Authored by ionutboangiu on 2024-07-04 11:25:32 +03:00; committed by Dan Christian Bogos
parent 4260852918, commit 520451be7e
5 changed files with 166 additions and 164 deletions
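
With setupCache split in two, rebuilding the cache after a configuration reload becomes an explicit two-step call, as the services-level Reload in the diff below now does. A minimal sketch of that flow (the wrapper function is hypothetical; only the two method names come from this commit):

// Illustrative only; reloadExporterCache is not part of this commit.
// It shows how the two new exported methods are meant to be used together.
func reloadExporterCache(eeS *ees.EventExporterS) error {
	eeS.ClearExporterCache()        // drop any cached exporter instances
	return eeS.SetupExporterCache() // rebuild the cache, precaching exporters where configured
}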

@@ -38,17 +38,18 @@ func onCacheEvicted(_ string, value any) {
ee.Close()
}
// NewEventExporterS instantiates the EventExporterS
// NewEventExporterS initializes a new EventExporterS.
func NewEventExporterS(cfg *config.CGRConfig, filterS *engine.FilterS,
connMgr *engine.ConnManager) (eeS *EventExporterS) {
eeS = &EventExporterS{
connMgr *engine.ConnManager) (*EventExporterS, error) {
eeS := &EventExporterS{
cfg: cfg,
filterS: filterS,
connMgr: connMgr,
eesChs: make(map[string]*ltcache.Cache),
}
eeS.setupCache(cfg.EEsNoLksCfg().Cache)
return
if err := eeS.SetupExporterCache(); err != nil {
return nil, fmt.Errorf("failed to set up exporter cache: %v", err)
}
return eeS, nil
}
// EventExporterS is managing the EventExporters
@@ -57,29 +58,8 @@ type EventExporterS struct {
filterS *engine.FilterS
connMgr *engine.ConnManager
eesChs map[string]*ltcache.Cache // map[eeType]*ltcache.Cache
eesMux sync.RWMutex // protects the eesChs
}
// ListenAndServe keeps the service alive
func (eeS *EventExporterS) ListenAndServe(stopChan, cfgRld chan struct{}) {
for {
select {
case <-stopChan: // global exit
return
case rld := <-cfgRld: // configuration was reloaded, destroy the cache
cfgRld <- rld
utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.",
utils.EEs))
eeS.setupCache(eeS.cfg.EEsCfg().Cache)
}
}
}
// Shutdown is called to shutdown the service
func (eeS *EventExporterS) Shutdown() {
utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EEs))
eeS.setupCache(nil) // cleanup exporters
exporterCache map[string]*ltcache.Cache // map[eeType]*ltcache.Cache
mu sync.RWMutex // protects exporterCache
}
// Call implements birpc.ClientConnector interface for internal RPC
@@ -87,21 +67,44 @@ func (eeS *EventExporterS) Call(ctx *context.Context, serviceMethod string, args
return utils.RPCCall(eeS, serviceMethod, args, reply)
}
// setupCache deals with cleanup and initialization of the cache of EventExporters
func (eeS *EventExporterS) setupCache(chCfgs map[string]*config.CacheParamCfg) {
eeS.eesMux.Lock()
for chID, ch := range eeS.eesChs { // cleanup
// ClearExporterCache clears the cache of EventExporters.
func (eeS *EventExporterS) ClearExporterCache() {
eeS.mu.Lock()
defer eeS.mu.Unlock()
for chID, ch := range eeS.exporterCache {
ch.Clear()
delete(eeS.eesChs, chID)
delete(eeS.exporterCache, chID)
}
for chID, chCfg := range chCfgs { // init
if chCfg.Limit == 0 { // cache is disabled, will not create
continue
}
// SetupExporterCache initializes the cache for EventExporters.
func (eeS *EventExporterS) SetupExporterCache() error {
expCache := make(map[string]*ltcache.Cache)
eesCfg := eeS.cfg.EEsNoLksCfg()
// Initialize cache.
for chID, chCfg := range eesCfg.Cache {
if chCfg.Limit == 0 {
continue // skip if caching is disabled
}
expCache[chID] = ltcache.NewCache(chCfg.Limit, chCfg.TTL, chCfg.StaticTTL, onCacheEvicted)
// Precache exporters if required.
if chCfg.Precache {
for _, expCfg := range eesCfg.Exporters {
if expCfg.Type == chID {
ee, err := NewEventExporter(expCfg, eeS.cfg, eeS.filterS, eeS.connMgr)
if err != nil {
return fmt.Errorf("precache: failed to init EventExporter %q: %v", expCfg.ID, err)
}
expCache[chID].Set(expCfg.ID, ee, nil)
}
}
}
eeS.eesChs[chID] = ltcache.NewCache(chCfg.Limit,
chCfg.TTL, chCfg.StaticTTL, onCacheEvicted)
}
eeS.eesMux.Unlock()
eeS.exporterCache = expCache
return nil
}
func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []string, ctx string) (*utils.CGREvent, error) {
@@ -167,7 +170,7 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG
var metricMapLock sync.RWMutex
metricsMap := make(map[string]utils.MapStorage)
_, hasVerbose := cgrEv.APIOpts[utils.OptsEEsVerbose]
for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters {
for _, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters {
if eeCfg.Type == utils.MetaNone || // ignore *none type exporter
(lenExpIDs != 0 && !expIDs.Has(eeCfg.ID)) {
continue
@@ -201,9 +204,9 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG
}
}
eeS.eesMux.RLock()
eeCache, hasCache := eeS.eesChs[eeCfg.Type]
eeS.eesMux.RUnlock()
eeS.mu.RLock()
eeCache, hasCache := eeS.exporterCache[eeCfg.Type]
eeS.mu.RUnlock()
var isCached bool
var ee EventExporter
if hasCache {
@@ -214,11 +217,11 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG
}
if !isCached {
if ee, err = NewEventExporter(eeS.cfg.EEsCfg().Exporters[cfgIdx], eeS.cfg, eeS.filterS, eeS.connMgr); err != nil {
if ee, err = NewEventExporter(eeCfg, eeS.cfg, eeS.filterS, eeS.connMgr); err != nil {
return
}
if hasCache {
eeS.eesMux.Lock()
eeS.mu.Lock()
if _, has := eeCache.Get(eeCfg.ID); !has {
eeCache.Set(eeCfg.ID, ee, nil)
} else {
@@ -226,7 +229,7 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG
// the meantime. Mark this instance to be closed after the export.
hasCache = false
}
eeS.eesMux.Unlock()
eeS.mu.Unlock()
}
}

@@ -36,50 +36,15 @@ import (
"github.com/cgrates/rpcclient"
)
func TestListenAndServe(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.EEsCfg().Cache = make(map[string]*config.CacheParamCfg)
cfg.EEsCfg().Cache = map[string]*config.CacheParamCfg{
utils.MetaFileCSV: {
Limit: -1,
TTL: 5 * time.Second,
},
utils.MetaNone: {
Limit: 0,
},
}
newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
eeS := NewEventExporterS(cfg, filterS, nil)
stopChan := make(chan struct{}, 1)
cfgRld := make(chan struct{}, 1)
cfgRld <- struct{}{}
go func() {
stopChan <- struct{}{}
}()
var err error
utils.Logger, err = utils.Newlogger(utils.MetaStdLog, utils.EmptyString)
if err != nil {
t.Error(err)
}
utils.Logger.SetLogLevel(6)
logBuf := new(bytes.Buffer)
log.SetOutput(logBuf)
eeS.ListenAndServe(stopChan, cfgRld)
logExpect := "[INFO] <EEs> reloading configuration internals."
if rcv := logBuf.String(); !strings.Contains(rcv, logExpect) {
t.Errorf("Expected %q but received %q", logExpect, rcv)
}
logBuf.Reset()
}
func TestCall(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
eeS := NewEventExporterS(cfg, filterS, nil)
eeS, err := NewEventExporterS(cfg, filterS, nil)
if err != nil {
t.Fatal(err)
}
errExpect := "UNSUPPORTED_SERVICE_METHOD"
if err := eeS.Call(context.Background(), "test", 24532, 43643); err == nil || err.Error() != errExpect {
t.Errorf("Expected %q but received %q", errExpect, err)
@@ -127,7 +92,10 @@ func TestAttrSProcessEvent(t *testing.T) {
connMgr := engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): clientConn,
})
eeS := NewEventExporterS(cfg, filterS, connMgr)
eeS, err := NewEventExporterS(cfg, filterS, connMgr)
if err != nil {
t.Fatal(err)
}
// cgrEv := &utils.CGREvent{}
exp := &utils.CGREvent{Event: map[string]any{"testcase": 1}}
if rplyEv, err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil {
@@ -156,7 +124,10 @@ func TestAttrSProcessEvent2(t *testing.T) {
connMgr := engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): clientConn,
})
eeS := NewEventExporterS(cfg, filterS, connMgr)
eeS, err := NewEventExporterS(cfg, filterS, connMgr)
if err != nil {
t.Fatal(err)
}
cgrEv := &utils.CGREvent{
APIOpts: make(map[string]any),
}
@@ -177,7 +148,10 @@ func TestV1ProcessEvent(t *testing.T) {
newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
eeS := NewEventExporterS(cfg, filterS, nil)
eeS, err := NewEventExporterS(cfg, filterS, nil)
if err != nil {
t.Fatal(err)
}
cgrEv := &engine.CGREventWithEeIDs{
EeIDs: []string{"SQLExporterFull"},
CGREvent: &utils.CGREvent{
@@ -227,7 +201,10 @@ func TestV1ProcessEvent2(t *testing.T) {
newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
eeS := NewEventExporterS(cfg, filterS, nil)
eeS, err := NewEventExporterS(cfg, filterS, nil)
if err != nil {
t.Fatal(err)
}
cgrEv := &engine.CGREventWithEeIDs{
EeIDs: []string{"SQLExporterFull"},
CGREvent: &utils.CGREvent{
@@ -267,7 +244,10 @@ func TestV1ProcessEvent3(t *testing.T) {
newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
eeS := NewEventExporterS(cfg, filterS, nil)
eeS, err := NewEventExporterS(cfg, filterS, nil)
if err != nil {
t.Fatal(err)
}
cgrEv := &engine.CGREventWithEeIDs{
EeIDs: []string{"SQLExporterFull"},
CGREvent: &utils.CGREvent{
@@ -292,8 +272,11 @@ func TestV1ProcessEvent4(t *testing.T) {
newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
eeS := NewEventExporterS(cfg, filterS, nil)
eeS.eesChs = map[string]*ltcache.Cache{
eeS, err := NewEventExporterS(cfg, filterS, nil)
if err != nil {
t.Fatal(err)
}
eeS.exporterCache = map[string]*ltcache.Cache{
utils.MetaHTTPPost: ltcache.NewCache(1,
time.Second, false, onCacheEvicted),
}
@@ -301,7 +284,7 @@ func TestV1ProcessEvent4(t *testing.T) {
if err != nil {
t.Error(err)
}
eeS.eesChs[utils.MetaHTTPPost].Set("SQLExporterFull", newEeS, []string{"grp1"})
eeS.exporterCache[utils.MetaHTTPPost].Set("SQLExporterFull", newEeS, []string{"grp1"})
cgrEv := &engine.CGREventWithEeIDs{
EeIDs: []string{"SQLExporterFull"},
CGREvent: &utils.CGREvent{
@@ -358,12 +341,15 @@ func TestV1ProcessEventMockMetrics(t *testing.T) {
newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
eeS := NewEventExporterS(cfg, filterS, nil)
eeS.eesChs = map[string]*ltcache.Cache{
eeS, err := NewEventExporterS(cfg, filterS, nil)
if err != nil {
t.Fatal(err)
}
eeS.exporterCache = map[string]*ltcache.Cache{
utils.MetaHTTPPost: ltcache.NewCache(1,
time.Second, false, onCacheEvicted),
}
eeS.eesChs[utils.MetaHTTPPost].Set("SQLExporterFull", mEe, []string{"grp1"})
eeS.exporterCache[utils.MetaHTTPPost].Set("SQLExporterFull", mEe, []string{"grp1"})
cgrEv := &engine.CGREventWithEeIDs{
EeIDs: []string{"SQLExporterFull"},
CGREvent: &utils.CGREvent{
@@ -408,7 +394,10 @@ func TestV1ProcessEvent5(t *testing.T) {
newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
eeS := NewEventExporterS(cfg, filterS, nil)
eeS, err := NewEventExporterS(cfg, filterS, nil)
if err != nil {
t.Fatal(err)
}
var rply map[string]map[string]any
errExpect := "unsupported exporter type: <invalid_type>"
if err := eeS.V1ProcessEvent(context.Background(), cgrEv, &rply); err == nil || err.Error() != errExpect {
@@ -423,7 +412,10 @@ func TestV1ProcessEvent6(t *testing.T) {
newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
eeS := NewEventExporterS(cfg, filterS, nil)
eeS, err := NewEventExporterS(cfg, filterS, nil)
if err != nil {
t.Fatal(err)
}
cgrEv := &engine.CGREventWithEeIDs{
EeIDs: []string{"SQLExporterFull"},
CGREvent: &utils.CGREvent{
@@ -459,21 +451,6 @@ func TestOnCacheEvicted(t *testing.T) {
}
bufLog.Reset()
}
func TestShutdown(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items)
newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil)
filterS := engine.NewFilterS(cfg, nil, newDM)
eeS := NewEventExporterS(cfg, filterS, nil)
logBuf := new(bytes.Buffer)
log.SetOutput(logBuf)
eeS.Shutdown()
logExpect := "[INFO] <CoreS> shutdown <EEs>"
if rcv := logBuf.String(); !strings.Contains(rcv, logExpect) {
t.Errorf("Expected %q but received %q", logExpect, rcv)
}
logBuf.Reset()
}
func TestUpdateEEMetrics(t *testing.T) {
dc, _ := newEEMetrics(utils.EmptyString)

@@ -22,6 +22,7 @@ import (
"fmt"
"net"
"strconv"
"sync"
"testing"
"time"
@@ -64,6 +65,9 @@ func TestKafkaSSL(t *testing.T) {
"ees": {
"enabled": true,
// "cache": {
// "*kafka_json_map": {"limit": -1, "ttl": "5s", "precache": false},
// },
"exporters": [
{
"id": "kafka_ssl",
@@ -135,32 +139,52 @@ func TestKafkaSSL(t *testing.T) {
// export event to cgrates-cdrs topic, then the reader will consume it and
// export it to the 'processed' topic
t.Run("export kafka event", func(t *testing.T) {
n := 1
var wg sync.WaitGroup
wg.Add(n)
var reply map[string]map[string]interface{}
if err := client.Call(context.Background(), utils.EeSv1ProcessEvent,
&engine.CGREventWithEeIDs{
EeIDs: []string{"kafka_ssl"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "KafkaEvent",
Event: map[string]interface{}{
utils.ToR: utils.MetaVoice,
utils.OriginID: "abcdef",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.MetaRated,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.AccountField: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813748, 0).UTC(),
utils.Usage: 10 * time.Second,
utils.RunID: utils.MetaDefault,
utils.Cost: 1.01,
},
},
}, &reply); err != nil {
t.Error(err)
for range n {
go func() {
defer wg.Done()
if err := client.Call(context.Background(), utils.EeSv1ProcessEvent,
&engine.CGREventWithEeIDs{
EeIDs: []string{"kafka_ssl"},
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "KafkaEvent",
Event: map[string]interface{}{
utils.ToR: utils.MetaVoice,
utils.OriginID: "abcdef",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.MetaRated,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.AccountField: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Unix(1383813745, 0).UTC(),
utils.AnswerTime: time.Unix(1383813748, 0).UTC(),
utils.Usage: 10 * time.Second,
utils.RunID: utils.MetaDefault,
utils.Cost: 1.01,
},
},
}, &reply); err != nil {
t.Error(err)
}
}()
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Errorf("timed out waiting for %s replies", utils.EeSv1ProcessEvent)
}
})

@@ -33,16 +33,14 @@ import (
// NewEventExporterService constructs EventExporterService
func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager, server *cores.Server,
intConnChan chan birpc.ClientConnector, anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
connMgr *engine.ConnManager, server *cores.Server, intConnChan chan birpc.ClientConnector,
anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &EventExporterService{
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
server: server,
intConnChan: intConnChan,
rldChan: make(chan struct{}),
anz: anz,
srvDep: srvDep,
}
@@ -50,15 +48,13 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil
// EventExporterService is the service structure for EventExporterS
type EventExporterService struct {
sync.RWMutex
mu sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
connMgr *engine.ConnManager
server *cores.Server
intConnChan chan birpc.ClientConnector
rldChan chan struct{}
stopChan chan struct{}
eeS *ees.EventExporterS
anz *AnalyzerService
@@ -77,26 +73,28 @@ func (es *EventExporterService) ShouldRun() (should bool) {
// IsRunning returns if the service is running
func (es *EventExporterService) IsRunning() bool {
es.RLock()
defer es.RUnlock()
es.mu.RLock()
defer es.mu.RUnlock()
return es.eeS != nil
}
// Reload handles the change of config
func (es *EventExporterService) Reload() (err error) {
es.rldChan <- struct{}{}
return // for the momment nothing to reload
func (es *EventExporterService) Reload() error {
es.mu.Lock()
defer es.mu.Unlock()
es.eeS.ClearExporterCache()
return es.eeS.SetupExporterCache()
}
// Shutdown stops the service
func (es *EventExporterService) Shutdown() (err error) {
es.Lock()
defer es.Unlock()
close(es.stopChan)
es.eeS.Shutdown()
func (es *EventExporterService) Shutdown() error {
es.mu.Lock()
defer es.mu.Unlock()
utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EEs))
es.eeS.ClearExporterCache()
es.eeS = nil
<-es.intConnChan
return
return nil
}
// Start should handle the service start
@@ -110,12 +108,14 @@ func (es *EventExporterService) Start() error {
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EEs))
es.Lock()
defer es.Unlock()
es.mu.Lock()
defer es.mu.Unlock()
es.eeS = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr)
es.stopChan = make(chan struct{})
go es.eeS.ListenAndServe(es.stopChan, es.rldChan)
var err error
es.eeS, err = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr)
if err != nil {
return err
}
srv, err := engine.NewServiceWithName(es.eeS, utils.EeS, true)
if err != nil {

@@ -54,9 +54,7 @@ func TestEventExporterSCoverage(t *testing.T) {
intConnChan: make(chan birpc.ClientConnector, 1),
anz: anz,
srvDep: srvDep,
rldChan: make(chan struct{}, 1),
eeS: &ees.EventExporterS{},
stopChan: make(chan struct{}, 1),
}
if !srv2.IsRunning() {
t.Errorf("Expected service to be running")