diff --git a/ees/ees.go b/ees/ees.go index 990dfd8fb..2a5ff453a 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -38,17 +38,18 @@ func onCacheEvicted(_ string, value any) { ee.Close() } -// NewEventExporterS instantiates the EventExporterS +// NewEventExporterS initializes a new EventExporterS. func NewEventExporterS(cfg *config.CGRConfig, filterS *engine.FilterS, - connMgr *engine.ConnManager) (eeS *EventExporterS) { - eeS = &EventExporterS{ + connMgr *engine.ConnManager) (*EventExporterS, error) { + eeS := &EventExporterS{ cfg: cfg, filterS: filterS, connMgr: connMgr, - eesChs: make(map[string]*ltcache.Cache), } - eeS.setupCache(cfg.EEsNoLksCfg().Cache) - return + if err := eeS.SetupExporterCache(); err != nil { + return nil, fmt.Errorf("failed to set up exporter cache: %v", err) + } + return eeS, nil } // EventExporterS is managing the EventExporters @@ -57,29 +58,8 @@ type EventExporterS struct { filterS *engine.FilterS connMgr *engine.ConnManager - eesChs map[string]*ltcache.Cache // map[eeType]*ltcache.Cache - eesMux sync.RWMutex // protects the eesChs -} - -// ListenAndServe keeps the service alive -func (eeS *EventExporterS) ListenAndServe(stopChan, cfgRld chan struct{}) { - for { - select { - case <-stopChan: // global exit - return - case rld := <-cfgRld: // configuration was reloaded, destroy the cache - cfgRld <- rld - utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.", - utils.EEs)) - eeS.setupCache(eeS.cfg.EEsCfg().Cache) - } - } -} - -// Shutdown is called to shutdown the service -func (eeS *EventExporterS) Shutdown() { - utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EEs)) - eeS.setupCache(nil) // cleanup exporters + exporterCache map[string]*ltcache.Cache // map[eeType]*ltcache.Cache + mu sync.RWMutex // protects exporterCache } // Call implements birpc.ClientConnector interface for internal RPC @@ -87,21 +67,44 @@ func (eeS *EventExporterS) Call(ctx *context.Context, serviceMethod string, args return utils.RPCCall(eeS, serviceMethod, args, reply) } -// setupCache deals with cleanup and initialization of the cache of EventExporters -func (eeS *EventExporterS) setupCache(chCfgs map[string]*config.CacheParamCfg) { - eeS.eesMux.Lock() - for chID, ch := range eeS.eesChs { // cleanup +// ClearExporterCache clears the cache of EventExporters. +func (eeS *EventExporterS) ClearExporterCache() { + eeS.mu.Lock() + defer eeS.mu.Unlock() + for chID, ch := range eeS.exporterCache { ch.Clear() - delete(eeS.eesChs, chID) + delete(eeS.exporterCache, chID) } - for chID, chCfg := range chCfgs { // init - if chCfg.Limit == 0 { // cache is disabled, will not create - continue +} + +// SetupExporterCache initializes the cache for EventExporters. +func (eeS *EventExporterS) SetupExporterCache() error { + expCache := make(map[string]*ltcache.Cache) + eesCfg := eeS.cfg.EEsNoLksCfg() + + // Initialize cache. + for chID, chCfg := range eesCfg.Cache { + if chCfg.Limit == 0 { + continue // skip if caching is disabled + } + + expCache[chID] = ltcache.NewCache(chCfg.Limit, chCfg.TTL, chCfg.StaticTTL, onCacheEvicted) + + // Precache exporters if required. + if chCfg.Precache { + for _, expCfg := range eesCfg.Exporters { + if expCfg.Type == chID { + ee, err := NewEventExporter(expCfg, eeS.cfg, eeS.filterS, eeS.connMgr) + if err != nil { + return fmt.Errorf("precache: failed to init EventExporter %q: %v", expCfg.ID, err) + } + expCache[chID].Set(expCfg.ID, ee, nil) + } + } } - eeS.eesChs[chID] = ltcache.NewCache(chCfg.Limit, - chCfg.TTL, chCfg.StaticTTL, onCacheEvicted) } - eeS.eesMux.Unlock() + eeS.exporterCache = expCache + return nil } func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []string, ctx string) (*utils.CGREvent, error) { @@ -167,7 +170,7 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG var metricMapLock sync.RWMutex metricsMap := make(map[string]utils.MapStorage) _, hasVerbose := cgrEv.APIOpts[utils.OptsEEsVerbose] - for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters { + for _, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters { if eeCfg.Type == utils.MetaNone || // ignore *none type exporter (lenExpIDs != 0 && !expIDs.Has(eeCfg.ID)) { continue @@ -201,9 +204,9 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG } } - eeS.eesMux.RLock() - eeCache, hasCache := eeS.eesChs[eeCfg.Type] - eeS.eesMux.RUnlock() + eeS.mu.RLock() + eeCache, hasCache := eeS.exporterCache[eeCfg.Type] + eeS.mu.RUnlock() var isCached bool var ee EventExporter if hasCache { @@ -214,11 +217,11 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG } if !isCached { - if ee, err = NewEventExporter(eeS.cfg.EEsCfg().Exporters[cfgIdx], eeS.cfg, eeS.filterS, eeS.connMgr); err != nil { + if ee, err = NewEventExporter(eeCfg, eeS.cfg, eeS.filterS, eeS.connMgr); err != nil { return } if hasCache { - eeS.eesMux.Lock() + eeS.mu.Lock() if _, has := eeCache.Get(eeCfg.ID); !has { eeCache.Set(eeCfg.ID, ee, nil) } else { @@ -226,7 +229,7 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG // the meantime. Mark this instance to be closed after the export. hasCache = false } - eeS.eesMux.Unlock() + eeS.mu.Unlock() } } diff --git a/ees/ees_test.go b/ees/ees_test.go index 8a7034ab3..4ef91b27e 100644 --- a/ees/ees_test.go +++ b/ees/ees_test.go @@ -36,50 +36,15 @@ import ( "github.com/cgrates/rpcclient" ) -func TestListenAndServe(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - cfg.EEsCfg().Cache = make(map[string]*config.CacheParamCfg) - cfg.EEsCfg().Cache = map[string]*config.CacheParamCfg{ - utils.MetaFileCSV: { - Limit: -1, - TTL: 5 * time.Second, - }, - utils.MetaNone: { - Limit: 0, - }, - } - newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) - newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) - filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) - stopChan := make(chan struct{}, 1) - cfgRld := make(chan struct{}, 1) - cfgRld <- struct{}{} - go func() { - stopChan <- struct{}{} - }() - var err error - utils.Logger, err = utils.Newlogger(utils.MetaStdLog, utils.EmptyString) - if err != nil { - t.Error(err) - } - utils.Logger.SetLogLevel(6) - logBuf := new(bytes.Buffer) - log.SetOutput(logBuf) - eeS.ListenAndServe(stopChan, cfgRld) - logExpect := "[INFO] reloading configuration internals." - if rcv := logBuf.String(); !strings.Contains(rcv, logExpect) { - t.Errorf("Expected %q but received %q", logExpect, rcv) - } - logBuf.Reset() -} - func TestCall(t *testing.T) { cfg := config.NewDefaultCGRConfig() newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } errExpect := "UNSUPPORTED_SERVICE_METHOD" if err := eeS.Call(context.Background(), "test", 24532, 43643); err == nil || err.Error() != errExpect { t.Errorf("Expected %q but received %q", errExpect, err) @@ -127,7 +92,10 @@ func TestAttrSProcessEvent(t *testing.T) { connMgr := engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): clientConn, }) - eeS := NewEventExporterS(cfg, filterS, connMgr) + eeS, err := NewEventExporterS(cfg, filterS, connMgr) + if err != nil { + t.Fatal(err) + } // cgrEv := &utils.CGREvent{} exp := &utils.CGREvent{Event: map[string]any{"testcase": 1}} if rplyEv, err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil { @@ -156,7 +124,10 @@ func TestAttrSProcessEvent2(t *testing.T) { connMgr := engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): clientConn, }) - eeS := NewEventExporterS(cfg, filterS, connMgr) + eeS, err := NewEventExporterS(cfg, filterS, connMgr) + if err != nil { + t.Fatal(err) + } cgrEv := &utils.CGREvent{ APIOpts: make(map[string]any), } @@ -177,7 +148,10 @@ func TestV1ProcessEvent(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } cgrEv := &engine.CGREventWithEeIDs{ EeIDs: []string{"SQLExporterFull"}, CGREvent: &utils.CGREvent{ @@ -227,7 +201,10 @@ func TestV1ProcessEvent2(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } cgrEv := &engine.CGREventWithEeIDs{ EeIDs: []string{"SQLExporterFull"}, CGREvent: &utils.CGREvent{ @@ -267,7 +244,10 @@ func TestV1ProcessEvent3(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } cgrEv := &engine.CGREventWithEeIDs{ EeIDs: []string{"SQLExporterFull"}, CGREvent: &utils.CGREvent{ @@ -292,8 +272,11 @@ func TestV1ProcessEvent4(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) - eeS.eesChs = map[string]*ltcache.Cache{ + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } + eeS.exporterCache = map[string]*ltcache.Cache{ utils.MetaHTTPPost: ltcache.NewCache(1, time.Second, false, onCacheEvicted), } @@ -301,7 +284,7 @@ func TestV1ProcessEvent4(t *testing.T) { if err != nil { t.Error(err) } - eeS.eesChs[utils.MetaHTTPPost].Set("SQLExporterFull", newEeS, []string{"grp1"}) + eeS.exporterCache[utils.MetaHTTPPost].Set("SQLExporterFull", newEeS, []string{"grp1"}) cgrEv := &engine.CGREventWithEeIDs{ EeIDs: []string{"SQLExporterFull"}, CGREvent: &utils.CGREvent{ @@ -358,12 +341,15 @@ func TestV1ProcessEventMockMetrics(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) - eeS.eesChs = map[string]*ltcache.Cache{ + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } + eeS.exporterCache = map[string]*ltcache.Cache{ utils.MetaHTTPPost: ltcache.NewCache(1, time.Second, false, onCacheEvicted), } - eeS.eesChs[utils.MetaHTTPPost].Set("SQLExporterFull", mEe, []string{"grp1"}) + eeS.exporterCache[utils.MetaHTTPPost].Set("SQLExporterFull", mEe, []string{"grp1"}) cgrEv := &engine.CGREventWithEeIDs{ EeIDs: []string{"SQLExporterFull"}, CGREvent: &utils.CGREvent{ @@ -408,7 +394,10 @@ func TestV1ProcessEvent5(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } var rply map[string]map[string]any errExpect := "unsupported exporter type: " if err := eeS.V1ProcessEvent(context.Background(), cgrEv, &rply); err == nil || err.Error() != errExpect { @@ -423,7 +412,10 @@ func TestV1ProcessEvent6(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } cgrEv := &engine.CGREventWithEeIDs{ EeIDs: []string{"SQLExporterFull"}, CGREvent: &utils.CGREvent{ @@ -459,21 +451,6 @@ func TestOnCacheEvicted(t *testing.T) { } bufLog.Reset() } -func TestShutdown(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) - newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) - filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) - logBuf := new(bytes.Buffer) - log.SetOutput(logBuf) - eeS.Shutdown() - logExpect := "[INFO] shutdown " - if rcv := logBuf.String(); !strings.Contains(rcv, logExpect) { - t.Errorf("Expected %q but received %q", logExpect, rcv) - } - logBuf.Reset() -} func TestUpdateEEMetrics(t *testing.T) { dc, _ := newEEMetrics(utils.EmptyString) diff --git a/general_tests/kafka_ssl_it_test.go b/general_tests/kafka_ssl_it_test.go index 6e26620f4..6c5c0bf50 100644 --- a/general_tests/kafka_ssl_it_test.go +++ b/general_tests/kafka_ssl_it_test.go @@ -22,6 +22,7 @@ import ( "fmt" "net" "strconv" + "sync" "testing" "time" @@ -64,6 +65,9 @@ func TestKafkaSSL(t *testing.T) { "ees": { "enabled": true, + // "cache": { + // "*kafka_json_map": {"limit": -1, "ttl": "5s", "precache": false}, + // }, "exporters": [ { "id": "kafka_ssl", @@ -135,32 +139,52 @@ func TestKafkaSSL(t *testing.T) { // export event to cgrates-cdrs topic, then the reader will consume it and // export it to the 'processed' topic t.Run("export kafka event", func(t *testing.T) { + n := 1 + var wg sync.WaitGroup + wg.Add(n) + var reply map[string]map[string]interface{} - if err := client.Call(context.Background(), utils.EeSv1ProcessEvent, - &engine.CGREventWithEeIDs{ - EeIDs: []string{"kafka_ssl"}, - CGREvent: &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "KafkaEvent", - Event: map[string]interface{}{ - utils.ToR: utils.MetaVoice, - utils.OriginID: "abcdef", - utils.OriginHost: "192.168.1.1", - utils.RequestType: utils.MetaRated, - utils.Tenant: "cgrates.org", - utils.Category: "call", - utils.AccountField: "1001", - utils.Subject: "1001", - utils.Destination: "1002", - utils.SetupTime: time.Unix(1383813745, 0).UTC(), - utils.AnswerTime: time.Unix(1383813748, 0).UTC(), - utils.Usage: 10 * time.Second, - utils.RunID: utils.MetaDefault, - utils.Cost: 1.01, - }, - }, - }, &reply); err != nil { - t.Error(err) + for range n { + go func() { + defer wg.Done() + if err := client.Call(context.Background(), utils.EeSv1ProcessEvent, + &engine.CGREventWithEeIDs{ + EeIDs: []string{"kafka_ssl"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "KafkaEvent", + Event: map[string]interface{}{ + utils.ToR: utils.MetaVoice, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813748, 0).UTC(), + utils.Usage: 10 * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + }, + }, + }, &reply); err != nil { + t.Error(err) + } + }() + } + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Errorf("timed out waiting for %s replies", utils.EeSv1ProcessEvent) } }) diff --git a/services/ees.go b/services/ees.go index f83f595c6..4b48c27d1 100644 --- a/services/ees.go +++ b/services/ees.go @@ -33,16 +33,14 @@ import ( // NewEventExporterService constructs EventExporterService func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - connMgr *engine.ConnManager, server *cores.Server, - intConnChan chan birpc.ClientConnector, anz *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + connMgr *engine.ConnManager, server *cores.Server, intConnChan chan birpc.ClientConnector, + anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &EventExporterService{ cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, server: server, intConnChan: intConnChan, - rldChan: make(chan struct{}), anz: anz, srvDep: srvDep, } @@ -50,15 +48,13 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil // EventExporterService is the service structure for EventExporterS type EventExporterService struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS connMgr *engine.ConnManager server *cores.Server intConnChan chan birpc.ClientConnector - rldChan chan struct{} - stopChan chan struct{} eeS *ees.EventExporterS anz *AnalyzerService @@ -77,26 +73,28 @@ func (es *EventExporterService) ShouldRun() (should bool) { // IsRunning returns if the service is running func (es *EventExporterService) IsRunning() bool { - es.RLock() - defer es.RUnlock() + es.mu.RLock() + defer es.mu.RUnlock() return es.eeS != nil } // Reload handles the change of config -func (es *EventExporterService) Reload() (err error) { - es.rldChan <- struct{}{} - return // for the momment nothing to reload +func (es *EventExporterService) Reload() error { + es.mu.Lock() + defer es.mu.Unlock() + es.eeS.ClearExporterCache() + return es.eeS.SetupExporterCache() } // Shutdown stops the service -func (es *EventExporterService) Shutdown() (err error) { - es.Lock() - defer es.Unlock() - close(es.stopChan) - es.eeS.Shutdown() +func (es *EventExporterService) Shutdown() error { + es.mu.Lock() + defer es.mu.Unlock() + utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EEs)) + es.eeS.ClearExporterCache() es.eeS = nil <-es.intConnChan - return + return nil } // Start should handle the service start @@ -110,12 +108,14 @@ func (es *EventExporterService) Start() error { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EEs)) - es.Lock() - defer es.Unlock() + es.mu.Lock() + defer es.mu.Unlock() - es.eeS = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr) - es.stopChan = make(chan struct{}) - go es.eeS.ListenAndServe(es.stopChan, es.rldChan) + var err error + es.eeS, err = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr) + if err != nil { + return err + } srv, err := engine.NewServiceWithName(es.eeS, utils.EeS, true) if err != nil { diff --git a/services/ees_test.go b/services/ees_test.go index 6b5654bd8..d14e25168 100644 --- a/services/ees_test.go +++ b/services/ees_test.go @@ -54,9 +54,7 @@ func TestEventExporterSCoverage(t *testing.T) { intConnChan: make(chan birpc.ClientConnector, 1), anz: anz, srvDep: srvDep, - rldChan: make(chan struct{}, 1), eeS: &ees.EventExporterS{}, - stopChan: make(chan struct{}, 1), } if !srv2.IsRunning() { t.Errorf("Expected service to be running")