From 27fce31f8b9979759d4f8fc79283d74b1e1c3bbb Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 18 Nov 2020 16:05:58 +0200 Subject: [PATCH] Updated AnalyzerS to register API after obtaining the filterS --- analyzers/analyzers.go | 26 +++++++++++--------------- analyzers/analyzers_test.go | 17 ++++++++--------- analyzers/codec_test.go | 2 +- analyzers/connector_test.go | 2 +- engine/tploader.go | 32 -------------------------------- services/analyzers.go | 20 +++++++++++++++----- 6 files changed, 36 insertions(+), 63 deletions(-) delete mode 100644 engine/tploader.go diff --git a/analyzers/analyzers.go b/analyzers/analyzers.go index 19714f9f1..8db5a5887 100755 --- a/analyzers/analyzers.go +++ b/analyzers/analyzers.go @@ -35,10 +35,9 @@ import ( ) // NewAnalyzerService initializes a AnalyzerService -func NewAnalyzerService(cfg *config.CGRConfig, filterS chan *engine.FilterS) (aS *AnalyzerService, err error) { +func NewAnalyzerService(cfg *config.CGRConfig) (aS *AnalyzerService, err error) { aS = &AnalyzerService{ - cfg: cfg, - filterSChan: filterS, + cfg: cfg, } err = aS.initDB() return @@ -49,11 +48,13 @@ type AnalyzerService struct { db bleve.Index cfg *config.CGRConfig - // because we do not use the filters only for API - // start the service without them - // and populate them on the first API call - filterSChan chan *engine.FilterS - filterS *engine.FilterS + filterS *engine.FilterS +} + +// SetFilterS will set the filterS used in APIs +// this function is called before the API is registerd +func (aS *AnalyzerService) SetFilterS(fS *engine.FilterS) { + aS.filterS = fS } func (aS *AnalyzerService) initDB() (err error) { @@ -146,12 +147,7 @@ func (aS *AnalyzerService) V1StringQuery(args *QueryArgs, reply *[]map[string]in return err } rply := make([]map[string]interface{}, 0, searchResults.Hits.Len()) - lCntFltrs := len(args.ContentFilters) - if lCntFltrs != 0 && - aS.filterS == nil { // populate the filter on the first API that requeres them - aS.filterS = <-aS.filterSChan - aS.filterSChan <- aS.filterS - } + lenContentFltrs := len(args.ContentFilters) for _, obj := range searchResults.Hits { // make sure that the result is corectly marshaled rep := json.RawMessage(utils.IfaceAsString(obj.Fields[utils.Reply])) @@ -165,7 +161,7 @@ func (aS *AnalyzerService) V1StringQuery(args *QueryArgs, reply *[]map[string]in if val, has := obj.Fields[utils.ReplyError]; !has || len(utils.IfaceAsString(val)) == 0 { obj.Fields[utils.ReplyError] = nil } - if lCntFltrs != 0 { + if lenContentFltrs != 0 { dp, err := getDPFromSearchresult(req, rep, obj.Fields) if err != nil { return err diff --git a/analyzers/analyzers_test.go b/analyzers/analyzers_test.go index 70c30cdb8..35e5a710e 100644 --- a/analyzers/analyzers_test.go +++ b/analyzers/analyzers_test.go @@ -46,7 +46,7 @@ func TestNewAnalyzerService(t *testing.T) { if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg, nil) + anz, err := NewAnalyzerService(cfg) if err != nil { t.Fatal(err) } @@ -81,7 +81,7 @@ func TestAnalyzerSLogTraffic(t *testing.T) { if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg, nil) + anz, err := NewAnalyzerService(cfg) if err != nil { t.Fatal(err) } @@ -137,7 +137,7 @@ func TestAnalyzersDeleteHits(t *testing.T) { if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg, nil) + anz, err := NewAnalyzerService(cfg) if err != nil { t.Fatal(err) } @@ -162,7 +162,7 @@ func TestAnalyzersListenAndServe(t *testing.T) { if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg, nil) + anz, err := NewAnalyzerService(cfg) if err != nil { t.Fatal(err) } @@ -172,7 +172,7 @@ func TestAnalyzersListenAndServe(t *testing.T) { anz.ListenAndServe(make(chan struct{})) cfg.AnalyzerSCfg().CleanupInterval = 1 - anz, err = NewAnalyzerService(cfg, nil) + anz, err = NewAnalyzerService(cfg) if err != nil { t.Fatal(err) } @@ -201,13 +201,12 @@ func TestAnalyzersV1Search(t *testing.T) { t.Fatal(err) } dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true), cfg.CacheCfg(), nil) - fs := engine.NewFilterS(cfg, nil, dm) - fsChan := make(chan *engine.FilterS, 1) - fsChan <- fs - anz, err := NewAnalyzerService(cfg, fsChan) + anz, err := NewAnalyzerService(cfg) + if err != nil { t.Fatal(err) } + anz.SetFilterS(engine.NewFilterS(cfg, nil, dm)) // generate trafic t1 := time.Now() if err = anz.logTrafic(0, utils.CoreSv1Ping, diff --git a/analyzers/codec_test.go b/analyzers/codec_test.go index dc9ffa640..eae5a1fd8 100644 --- a/analyzers/codec_test.go +++ b/analyzers/codec_test.go @@ -59,7 +59,7 @@ func TestNewServerCodec(t *testing.T) { if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg, nil) + anz, err := NewAnalyzerService(cfg) if err != nil { t.Fatal(err) } diff --git a/analyzers/connector_test.go b/analyzers/connector_test.go index a538314ee..7c09c17d0 100644 --- a/analyzers/connector_test.go +++ b/analyzers/connector_test.go @@ -46,7 +46,7 @@ func TestNewAnalyzeConnector(t *testing.T) { if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg, nil) + anz, err := NewAnalyzerService(cfg) if err != nil { t.Fatal(err) } diff --git a/engine/tploader.go b/engine/tploader.go deleted file mode 100644 index 313de18b8..000000000 --- a/engine/tploader.go +++ /dev/null @@ -1,32 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package engine - -// TPReader is the data source for TPLoader -type TPReader interface { - // Read will read one record from data source - Read() (interface{}, error) -} - -// TPLoader will read a record from TPReader and write it out to dataManager -type TPLoader struct { - srcType string // needed by Load for choosing destiantion - dataReader TPReader // provides data to load - dm *DataManager // writes data to load -} diff --git a/services/analyzers.go b/services/analyzers.go index 1042549b3..1a3c3b528 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -66,7 +66,7 @@ func (anz *AnalyzerService) Start() (err error) { anz.Lock() defer anz.Unlock() - if anz.anz, err = analyzers.NewAnalyzerService(anz.cfg, anz.filterSChan); err != nil { + if anz.anz, err = analyzers.NewAnalyzerService(anz.cfg); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error())) return } @@ -80,10 +80,20 @@ func (anz *AnalyzerService) Start() (err error) { }() anz.server.SetAnalyzer(anz.anz) anz.rpc = v1.NewAnalyzerSv1(anz.anz) - if !anz.cfg.DispatcherSCfg().Enabled { - anz.server.RpcRegister(anz.rpc) - } - anz.connChan <- anz.rpc + go func() { + var fS *engine.FilterS + select { + case <-anz.stopChan: + return + case fS = <-anz.filterSChan: + anz.filterSChan <- fS + anz.anz.SetFilterS(fS) + } + if !anz.cfg.DispatcherSCfg().Enabled { + anz.server.RpcRegister(anz.rpc) + } + anz.connChan <- anz.rpc + }() return }