Updated AnalyzerS to register API after obtaining the filterS

This commit is contained in:
Trial97
2020-11-18 16:05:58 +02:00
committed by Dan Christian Bogos
parent 6fd40f7296
commit 27fce31f8b
6 changed files with 36 additions and 63 deletions

View File

@@ -35,10 +35,9 @@ import (
)
// NewAnalyzerService initializes a AnalyzerService
func NewAnalyzerService(cfg *config.CGRConfig, filterS chan *engine.FilterS) (aS *AnalyzerService, err error) {
func NewAnalyzerService(cfg *config.CGRConfig) (aS *AnalyzerService, err error) {
aS = &AnalyzerService{
cfg: cfg,
filterSChan: filterS,
cfg: cfg,
}
err = aS.initDB()
return
@@ -49,11 +48,13 @@ type AnalyzerService struct {
db bleve.Index
cfg *config.CGRConfig
// because we do not use the filters only for API
// start the service without them
// and populate them on the first API call
filterSChan chan *engine.FilterS
filterS *engine.FilterS
filterS *engine.FilterS
}
// SetFilterS will set the filterS used in APIs
// this function is called before the API is registerd
func (aS *AnalyzerService) SetFilterS(fS *engine.FilterS) {
aS.filterS = fS
}
func (aS *AnalyzerService) initDB() (err error) {
@@ -146,12 +147,7 @@ func (aS *AnalyzerService) V1StringQuery(args *QueryArgs, reply *[]map[string]in
return err
}
rply := make([]map[string]interface{}, 0, searchResults.Hits.Len())
lCntFltrs := len(args.ContentFilters)
if lCntFltrs != 0 &&
aS.filterS == nil { // populate the filter on the first API that requeres them
aS.filterS = <-aS.filterSChan
aS.filterSChan <- aS.filterS
}
lenContentFltrs := len(args.ContentFilters)
for _, obj := range searchResults.Hits {
// make sure that the result is corectly marshaled
rep := json.RawMessage(utils.IfaceAsString(obj.Fields[utils.Reply]))
@@ -165,7 +161,7 @@ func (aS *AnalyzerService) V1StringQuery(args *QueryArgs, reply *[]map[string]in
if val, has := obj.Fields[utils.ReplyError]; !has || len(utils.IfaceAsString(val)) == 0 {
obj.Fields[utils.ReplyError] = nil
}
if lCntFltrs != 0 {
if lenContentFltrs != 0 {
dp, err := getDPFromSearchresult(req, rep, obj.Fields)
if err != nil {
return err

View File

@@ -46,7 +46,7 @@ func TestNewAnalyzerService(t *testing.T) {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg, nil)
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
@@ -81,7 +81,7 @@ func TestAnalyzerSLogTraffic(t *testing.T) {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg, nil)
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
@@ -137,7 +137,7 @@ func TestAnalyzersDeleteHits(t *testing.T) {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg, nil)
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
@@ -162,7 +162,7 @@ func TestAnalyzersListenAndServe(t *testing.T) {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg, nil)
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
@@ -172,7 +172,7 @@ func TestAnalyzersListenAndServe(t *testing.T) {
anz.ListenAndServe(make(chan struct{}))
cfg.AnalyzerSCfg().CleanupInterval = 1
anz, err = NewAnalyzerService(cfg, nil)
anz, err = NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
@@ -201,13 +201,12 @@ func TestAnalyzersV1Search(t *testing.T) {
t.Fatal(err)
}
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, true), cfg.CacheCfg(), nil)
fs := engine.NewFilterS(cfg, nil, dm)
fsChan := make(chan *engine.FilterS, 1)
fsChan <- fs
anz, err := NewAnalyzerService(cfg, fsChan)
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
anz.SetFilterS(engine.NewFilterS(cfg, nil, dm))
// generate trafic
t1 := time.Now()
if err = anz.logTrafic(0, utils.CoreSv1Ping,

View File

@@ -59,7 +59,7 @@ func TestNewServerCodec(t *testing.T) {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg, nil)
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}

View File

@@ -46,7 +46,7 @@ func TestNewAnalyzeConnector(t *testing.T) {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg, nil)
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}

View File

@@ -1,32 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
// TPReader is the data source for TPLoader
type TPReader interface {
// Read will read one record from data source
Read() (interface{}, error)
}
// TPLoader will read a record from TPReader and write it out to dataManager
type TPLoader struct {
srcType string // needed by Load for choosing destiantion
dataReader TPReader // provides data to load
dm *DataManager // writes data to load
}

View File

@@ -66,7 +66,7 @@ func (anz *AnalyzerService) Start() (err error) {
anz.Lock()
defer anz.Unlock()
if anz.anz, err = analyzers.NewAnalyzerService(anz.cfg, anz.filterSChan); err != nil {
if anz.anz, err = analyzers.NewAnalyzerService(anz.cfg); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error()))
return
}
@@ -80,10 +80,20 @@ func (anz *AnalyzerService) Start() (err error) {
}()
anz.server.SetAnalyzer(anz.anz)
anz.rpc = v1.NewAnalyzerSv1(anz.anz)
if !anz.cfg.DispatcherSCfg().Enabled {
anz.server.RpcRegister(anz.rpc)
}
anz.connChan <- anz.rpc
go func() {
var fS *engine.FilterS
select {
case <-anz.stopChan:
return
case fS = <-anz.filterSChan:
anz.filterSChan <- fS
anz.anz.SetFilterS(fS)
}
if !anz.cfg.DispatcherSCfg().Enabled {
anz.server.RpcRegister(anz.rpc)
}
anz.connChan <- anz.rpc
}()
return
}