From 520451be7eddd51d02a355dbaa1495f9129906b2 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 4 Jul 2024 11:25:32 +0300 Subject: [PATCH] Add precache support for exporters Additional changes: - removed unnecessary locking when initializing the exporter cache map, as the service itself is responsible for locking. - separated setupCache method into ClearExporterCache and SetupExporterCache methods. - removed idle ListenAndServe function that was only waiting for the stopChan to close. The reload case was unreachable due to the reload channel being created in Start instead of being passed down. - removed Shutdown method on EventExporterS and replaced it with the exported ClearExporterCache method as it provided the same functionality. --- ees/ees.go | 97 +++++++++++++------------ ees/ees_test.go | 111 ++++++++++++----------------- general_tests/kafka_ssl_it_test.go | 74 ++++++++++++------- services/ees.go | 46 ++++++------ services/ees_test.go | 2 - 5 files changed, 166 insertions(+), 164 deletions(-) diff --git a/ees/ees.go b/ees/ees.go index 990dfd8fb..2a5ff453a 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -38,17 +38,18 @@ func onCacheEvicted(_ string, value any) { ee.Close() } -// NewEventExporterS instantiates the EventExporterS +// NewEventExporterS initializes a new EventExporterS. func NewEventExporterS(cfg *config.CGRConfig, filterS *engine.FilterS, - connMgr *engine.ConnManager) (eeS *EventExporterS) { - eeS = &EventExporterS{ + connMgr *engine.ConnManager) (*EventExporterS, error) { + eeS := &EventExporterS{ cfg: cfg, filterS: filterS, connMgr: connMgr, - eesChs: make(map[string]*ltcache.Cache), } - eeS.setupCache(cfg.EEsNoLksCfg().Cache) - return + if err := eeS.SetupExporterCache(); err != nil { + return nil, fmt.Errorf("failed to set up exporter cache: %v", err) + } + return eeS, nil } // EventExporterS is managing the EventExporters @@ -57,29 +58,8 @@ type EventExporterS struct { filterS *engine.FilterS connMgr *engine.ConnManager - eesChs map[string]*ltcache.Cache // map[eeType]*ltcache.Cache - eesMux sync.RWMutex // protects the eesChs -} - -// ListenAndServe keeps the service alive -func (eeS *EventExporterS) ListenAndServe(stopChan, cfgRld chan struct{}) { - for { - select { - case <-stopChan: // global exit - return - case rld := <-cfgRld: // configuration was reloaded, destroy the cache - cfgRld <- rld - utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.", - utils.EEs)) - eeS.setupCache(eeS.cfg.EEsCfg().Cache) - } - } -} - -// Shutdown is called to shutdown the service -func (eeS *EventExporterS) Shutdown() { - utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EEs)) - eeS.setupCache(nil) // cleanup exporters + exporterCache map[string]*ltcache.Cache // map[eeType]*ltcache.Cache + mu sync.RWMutex // protects exporterCache } // Call implements birpc.ClientConnector interface for internal RPC @@ -87,21 +67,44 @@ func (eeS *EventExporterS) Call(ctx *context.Context, serviceMethod string, args return utils.RPCCall(eeS, serviceMethod, args, reply) } -// setupCache deals with cleanup and initialization of the cache of EventExporters -func (eeS *EventExporterS) setupCache(chCfgs map[string]*config.CacheParamCfg) { - eeS.eesMux.Lock() - for chID, ch := range eeS.eesChs { // cleanup +// ClearExporterCache clears the cache of EventExporters. +func (eeS *EventExporterS) ClearExporterCache() { + eeS.mu.Lock() + defer eeS.mu.Unlock() + for chID, ch := range eeS.exporterCache { ch.Clear() - delete(eeS.eesChs, chID) + delete(eeS.exporterCache, chID) } - for chID, chCfg := range chCfgs { // init - if chCfg.Limit == 0 { // cache is disabled, will not create - continue +} + +// SetupExporterCache initializes the cache for EventExporters. +func (eeS *EventExporterS) SetupExporterCache() error { + expCache := make(map[string]*ltcache.Cache) + eesCfg := eeS.cfg.EEsNoLksCfg() + + // Initialize cache. + for chID, chCfg := range eesCfg.Cache { + if chCfg.Limit == 0 { + continue // skip if caching is disabled + } + + expCache[chID] = ltcache.NewCache(chCfg.Limit, chCfg.TTL, chCfg.StaticTTL, onCacheEvicted) + + // Precache exporters if required. + if chCfg.Precache { + for _, expCfg := range eesCfg.Exporters { + if expCfg.Type == chID { + ee, err := NewEventExporter(expCfg, eeS.cfg, eeS.filterS, eeS.connMgr) + if err != nil { + return fmt.Errorf("precache: failed to init EventExporter %q: %v", expCfg.ID, err) + } + expCache[chID].Set(expCfg.ID, ee, nil) + } + } } - eeS.eesChs[chID] = ltcache.NewCache(chCfg.Limit, - chCfg.TTL, chCfg.StaticTTL, onCacheEvicted) } - eeS.eesMux.Unlock() + eeS.exporterCache = expCache + return nil } func (eeS *EventExporterS) attrSProcessEvent(cgrEv *utils.CGREvent, attrIDs []string, ctx string) (*utils.CGREvent, error) { @@ -167,7 +170,7 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG var metricMapLock sync.RWMutex metricsMap := make(map[string]utils.MapStorage) _, hasVerbose := cgrEv.APIOpts[utils.OptsEEsVerbose] - for cfgIdx, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters { + for _, eeCfg := range eeS.cfg.EEsNoLksCfg().Exporters { if eeCfg.Type == utils.MetaNone || // ignore *none type exporter (lenExpIDs != 0 && !expIDs.Has(eeCfg.ID)) { continue @@ -201,9 +204,9 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG } } - eeS.eesMux.RLock() - eeCache, hasCache := eeS.eesChs[eeCfg.Type] - eeS.eesMux.RUnlock() + eeS.mu.RLock() + eeCache, hasCache := eeS.exporterCache[eeCfg.Type] + eeS.mu.RUnlock() var isCached bool var ee EventExporter if hasCache { @@ -214,11 +217,11 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG } if !isCached { - if ee, err = NewEventExporter(eeS.cfg.EEsCfg().Exporters[cfgIdx], eeS.cfg, eeS.filterS, eeS.connMgr); err != nil { + if ee, err = NewEventExporter(eeCfg, eeS.cfg, eeS.filterS, eeS.connMgr); err != nil { return } if hasCache { - eeS.eesMux.Lock() + eeS.mu.Lock() if _, has := eeCache.Get(eeCfg.ID); !has { eeCache.Set(eeCfg.ID, ee, nil) } else { @@ -226,7 +229,7 @@ func (eeS *EventExporterS) V1ProcessEvent(ctx *context.Context, cgrEv *engine.CG // the meantime. Mark this instance to be closed after the export. hasCache = false } - eeS.eesMux.Unlock() + eeS.mu.Unlock() } } diff --git a/ees/ees_test.go b/ees/ees_test.go index 8a7034ab3..4ef91b27e 100644 --- a/ees/ees_test.go +++ b/ees/ees_test.go @@ -36,50 +36,15 @@ import ( "github.com/cgrates/rpcclient" ) -func TestListenAndServe(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - cfg.EEsCfg().Cache = make(map[string]*config.CacheParamCfg) - cfg.EEsCfg().Cache = map[string]*config.CacheParamCfg{ - utils.MetaFileCSV: { - Limit: -1, - TTL: 5 * time.Second, - }, - utils.MetaNone: { - Limit: 0, - }, - } - newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) - newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) - filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) - stopChan := make(chan struct{}, 1) - cfgRld := make(chan struct{}, 1) - cfgRld <- struct{}{} - go func() { - stopChan <- struct{}{} - }() - var err error - utils.Logger, err = utils.Newlogger(utils.MetaStdLog, utils.EmptyString) - if err != nil { - t.Error(err) - } - utils.Logger.SetLogLevel(6) - logBuf := new(bytes.Buffer) - log.SetOutput(logBuf) - eeS.ListenAndServe(stopChan, cfgRld) - logExpect := "[INFO] reloading configuration internals." - if rcv := logBuf.String(); !strings.Contains(rcv, logExpect) { - t.Errorf("Expected %q but received %q", logExpect, rcv) - } - logBuf.Reset() -} - func TestCall(t *testing.T) { cfg := config.NewDefaultCGRConfig() newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } errExpect := "UNSUPPORTED_SERVICE_METHOD" if err := eeS.Call(context.Background(), "test", 24532, 43643); err == nil || err.Error() != errExpect { t.Errorf("Expected %q but received %q", errExpect, err) @@ -127,7 +92,10 @@ func TestAttrSProcessEvent(t *testing.T) { connMgr := engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): clientConn, }) - eeS := NewEventExporterS(cfg, filterS, connMgr) + eeS, err := NewEventExporterS(cfg, filterS, connMgr) + if err != nil { + t.Fatal(err) + } // cgrEv := &utils.CGREvent{} exp := &utils.CGREvent{Event: map[string]any{"testcase": 1}} if rplyEv, err := eeS.attrSProcessEvent(cgrEv, []string{}, utils.EmptyString); err != nil { @@ -156,7 +124,10 @@ func TestAttrSProcessEvent2(t *testing.T) { connMgr := engine.NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAttributes): clientConn, }) - eeS := NewEventExporterS(cfg, filterS, connMgr) + eeS, err := NewEventExporterS(cfg, filterS, connMgr) + if err != nil { + t.Fatal(err) + } cgrEv := &utils.CGREvent{ APIOpts: make(map[string]any), } @@ -177,7 +148,10 @@ func TestV1ProcessEvent(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } cgrEv := &engine.CGREventWithEeIDs{ EeIDs: []string{"SQLExporterFull"}, CGREvent: &utils.CGREvent{ @@ -227,7 +201,10 @@ func TestV1ProcessEvent2(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } cgrEv := &engine.CGREventWithEeIDs{ EeIDs: []string{"SQLExporterFull"}, CGREvent: &utils.CGREvent{ @@ -267,7 +244,10 @@ func TestV1ProcessEvent3(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } cgrEv := &engine.CGREventWithEeIDs{ EeIDs: []string{"SQLExporterFull"}, CGREvent: &utils.CGREvent{ @@ -292,8 +272,11 @@ func TestV1ProcessEvent4(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) - eeS.eesChs = map[string]*ltcache.Cache{ + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } + eeS.exporterCache = map[string]*ltcache.Cache{ utils.MetaHTTPPost: ltcache.NewCache(1, time.Second, false, onCacheEvicted), } @@ -301,7 +284,7 @@ func TestV1ProcessEvent4(t *testing.T) { if err != nil { t.Error(err) } - eeS.eesChs[utils.MetaHTTPPost].Set("SQLExporterFull", newEeS, []string{"grp1"}) + eeS.exporterCache[utils.MetaHTTPPost].Set("SQLExporterFull", newEeS, []string{"grp1"}) cgrEv := &engine.CGREventWithEeIDs{ EeIDs: []string{"SQLExporterFull"}, CGREvent: &utils.CGREvent{ @@ -358,12 +341,15 @@ func TestV1ProcessEventMockMetrics(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) - eeS.eesChs = map[string]*ltcache.Cache{ + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } + eeS.exporterCache = map[string]*ltcache.Cache{ utils.MetaHTTPPost: ltcache.NewCache(1, time.Second, false, onCacheEvicted), } - eeS.eesChs[utils.MetaHTTPPost].Set("SQLExporterFull", mEe, []string{"grp1"}) + eeS.exporterCache[utils.MetaHTTPPost].Set("SQLExporterFull", mEe, []string{"grp1"}) cgrEv := &engine.CGREventWithEeIDs{ EeIDs: []string{"SQLExporterFull"}, CGREvent: &utils.CGREvent{ @@ -408,7 +394,10 @@ func TestV1ProcessEvent5(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } var rply map[string]map[string]any errExpect := "unsupported exporter type: " if err := eeS.V1ProcessEvent(context.Background(), cgrEv, &rply); err == nil || err.Error() != errExpect { @@ -423,7 +412,10 @@ func TestV1ProcessEvent6(t *testing.T) { newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) + eeS, err := NewEventExporterS(cfg, filterS, nil) + if err != nil { + t.Fatal(err) + } cgrEv := &engine.CGREventWithEeIDs{ EeIDs: []string{"SQLExporterFull"}, CGREvent: &utils.CGREvent{ @@ -459,21 +451,6 @@ func TestOnCacheEvicted(t *testing.T) { } bufLog.Reset() } -func TestShutdown(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - newIDb := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) - newDM := engine.NewDataManager(newIDb, cfg.CacheCfg(), nil) - filterS := engine.NewFilterS(cfg, nil, newDM) - eeS := NewEventExporterS(cfg, filterS, nil) - logBuf := new(bytes.Buffer) - log.SetOutput(logBuf) - eeS.Shutdown() - logExpect := "[INFO] shutdown " - if rcv := logBuf.String(); !strings.Contains(rcv, logExpect) { - t.Errorf("Expected %q but received %q", logExpect, rcv) - } - logBuf.Reset() -} func TestUpdateEEMetrics(t *testing.T) { dc, _ := newEEMetrics(utils.EmptyString) diff --git a/general_tests/kafka_ssl_it_test.go b/general_tests/kafka_ssl_it_test.go index 6e26620f4..6c5c0bf50 100644 --- a/general_tests/kafka_ssl_it_test.go +++ b/general_tests/kafka_ssl_it_test.go @@ -22,6 +22,7 @@ import ( "fmt" "net" "strconv" + "sync" "testing" "time" @@ -64,6 +65,9 @@ func TestKafkaSSL(t *testing.T) { "ees": { "enabled": true, + // "cache": { + // "*kafka_json_map": {"limit": -1, "ttl": "5s", "precache": false}, + // }, "exporters": [ { "id": "kafka_ssl", @@ -135,32 +139,52 @@ func TestKafkaSSL(t *testing.T) { // export event to cgrates-cdrs topic, then the reader will consume it and // export it to the 'processed' topic t.Run("export kafka event", func(t *testing.T) { + n := 1 + var wg sync.WaitGroup + wg.Add(n) + var reply map[string]map[string]interface{} - if err := client.Call(context.Background(), utils.EeSv1ProcessEvent, - &engine.CGREventWithEeIDs{ - EeIDs: []string{"kafka_ssl"}, - CGREvent: &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "KafkaEvent", - Event: map[string]interface{}{ - utils.ToR: utils.MetaVoice, - utils.OriginID: "abcdef", - utils.OriginHost: "192.168.1.1", - utils.RequestType: utils.MetaRated, - utils.Tenant: "cgrates.org", - utils.Category: "call", - utils.AccountField: "1001", - utils.Subject: "1001", - utils.Destination: "1002", - utils.SetupTime: time.Unix(1383813745, 0).UTC(), - utils.AnswerTime: time.Unix(1383813748, 0).UTC(), - utils.Usage: 10 * time.Second, - utils.RunID: utils.MetaDefault, - utils.Cost: 1.01, - }, - }, - }, &reply); err != nil { - t.Error(err) + for range n { + go func() { + defer wg.Done() + if err := client.Call(context.Background(), utils.EeSv1ProcessEvent, + &engine.CGREventWithEeIDs{ + EeIDs: []string{"kafka_ssl"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "KafkaEvent", + Event: map[string]interface{}{ + utils.ToR: utils.MetaVoice, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.MetaRated, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813748, 0).UTC(), + utils.Usage: 10 * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + }, + }, + }, &reply); err != nil { + t.Error(err) + } + }() + } + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Errorf("timed out waiting for %s replies", utils.EeSv1ProcessEvent) } }) diff --git a/services/ees.go b/services/ees.go index f83f595c6..4b48c27d1 100644 --- a/services/ees.go +++ b/services/ees.go @@ -33,16 +33,14 @@ import ( // NewEventExporterService constructs EventExporterService func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - connMgr *engine.ConnManager, server *cores.Server, - intConnChan chan birpc.ClientConnector, anz *AnalyzerService, - srvDep map[string]*sync.WaitGroup) servmanager.Service { + connMgr *engine.ConnManager, server *cores.Server, intConnChan chan birpc.ClientConnector, + anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &EventExporterService{ cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, server: server, intConnChan: intConnChan, - rldChan: make(chan struct{}), anz: anz, srvDep: srvDep, } @@ -50,15 +48,13 @@ func NewEventExporterService(cfg *config.CGRConfig, filterSChan chan *engine.Fil // EventExporterService is the service structure for EventExporterS type EventExporterService struct { - sync.RWMutex + mu sync.RWMutex cfg *config.CGRConfig filterSChan chan *engine.FilterS connMgr *engine.ConnManager server *cores.Server intConnChan chan birpc.ClientConnector - rldChan chan struct{} - stopChan chan struct{} eeS *ees.EventExporterS anz *AnalyzerService @@ -77,26 +73,28 @@ func (es *EventExporterService) ShouldRun() (should bool) { // IsRunning returns if the service is running func (es *EventExporterService) IsRunning() bool { - es.RLock() - defer es.RUnlock() + es.mu.RLock() + defer es.mu.RUnlock() return es.eeS != nil } // Reload handles the change of config -func (es *EventExporterService) Reload() (err error) { - es.rldChan <- struct{}{} - return // for the momment nothing to reload +func (es *EventExporterService) Reload() error { + es.mu.Lock() + defer es.mu.Unlock() + es.eeS.ClearExporterCache() + return es.eeS.SetupExporterCache() } // Shutdown stops the service -func (es *EventExporterService) Shutdown() (err error) { - es.Lock() - defer es.Unlock() - close(es.stopChan) - es.eeS.Shutdown() +func (es *EventExporterService) Shutdown() error { + es.mu.Lock() + defer es.mu.Unlock() + utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EEs)) + es.eeS.ClearExporterCache() es.eeS = nil <-es.intConnChan - return + return nil } // Start should handle the service start @@ -110,12 +108,14 @@ func (es *EventExporterService) Start() error { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EEs)) - es.Lock() - defer es.Unlock() + es.mu.Lock() + defer es.mu.Unlock() - es.eeS = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr) - es.stopChan = make(chan struct{}) - go es.eeS.ListenAndServe(es.stopChan, es.rldChan) + var err error + es.eeS, err = ees.NewEventExporterS(es.cfg, fltrS, es.connMgr) + if err != nil { + return err + } srv, err := engine.NewServiceWithName(es.eeS, utils.EeS, true) if err != nil { diff --git a/services/ees_test.go b/services/ees_test.go index 6b5654bd8..d14e25168 100644 --- a/services/ees_test.go +++ b/services/ees_test.go @@ -54,9 +54,7 @@ func TestEventExporterSCoverage(t *testing.T) { intConnChan: make(chan birpc.ClientConnector, 1), anz: anz, srvDep: srvDep, - rldChan: make(chan struct{}, 1), eeS: &ees.EventExporterS{}, - stopChan: make(chan struct{}, 1), } if !srv2.IsRunning() { t.Errorf("Expected service to be running")