diff --git a/analyzers/analyzers.go b/analyzers/analyzers.go index 85eb9ce84..54c9241d5 100755 --- a/analyzers/analyzers.go +++ b/analyzers/analyzers.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "os" + "path" "strconv" "strings" "time" @@ -29,12 +30,16 @@ import ( "github.com/blevesearch/bleve" "github.com/blevesearch/bleve/search" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) // NewAnalyzerService initializes a AnalyzerService -func NewAnalyzerService(cfg *config.CGRConfig) (aS *AnalyzerService, err error) { - aS = &AnalyzerService{cfg: cfg} +func NewAnalyzerService(cfg *config.CGRConfig, filterS chan *engine.FilterS) (aS *AnalyzerService, err error) { + aS = &AnalyzerService{ + cfg: cfg, + filterSChan: filterS, + } err = aS.initDB() return } @@ -43,14 +48,21 @@ func NewAnalyzerService(cfg *config.CGRConfig) (aS *AnalyzerService, err error) type AnalyzerService struct { db bleve.Index cfg *config.CGRConfig + + // because we do not use the filters only for API + // start the service without them + // and populate them on the first API call + filterSChan chan *engine.FilterS + filterS *engine.FilterS } func (aS *AnalyzerService) initDB() (err error) { - if _, err = os.Stat(aS.cfg.AnalyzerSCfg().DBPath); err == nil { - aS.db, err = bleve.Open(aS.cfg.AnalyzerSCfg().DBPath) + dbPath := path.Join(aS.cfg.AnalyzerSCfg().DBPath, "db") + if _, err = os.Stat(dbPath); err == nil { + aS.db, err = bleve.Open(dbPath) } else if os.IsNotExist(err) { indxType, storeType := getIndex(aS.cfg.AnalyzerSCfg().IndexType) - aS.db, err = bleve.NewUsing(aS.cfg.AnalyzerSCfg().DBPath, + aS.db, err = bleve.NewUsing(dbPath, bleve.NewIndexMapping(), indxType, storeType, nil) } return @@ -117,25 +129,59 @@ func (aS *AnalyzerService) logTrafic(id uint64, method string, NewInfoRPC(id, method, params, result, err, enc, from, to, sTime, eTime)) } -// V1StringQuery returns a list of API that match the query -func (aS *AnalyzerService) V1StringQuery(searchstr string, reply *[]map[string]interface{}) error { +// QueryArgs the structure that we use to filter the API calls +type QueryArgs struct { + // a string based on the query language(https://blevesearch.com/docs/Query-String-Query/) that we send to bleve + HeaderFilters string + // a list of filters that we use to filter the call similar to how we filter the events + ContentFilters []string +} - s := bleve.NewSearchRequest(bleve.NewQueryStringQuery(searchstr)) +// V1StringQuery returns a list of API that match the query +func (aS *AnalyzerService) V1StringQuery(args *QueryArgs, reply *[]map[string]interface{}) error { + s := bleve.NewSearchRequest(bleve.NewQueryStringQuery(args.HeaderFilters)) s.Fields = []string{utils.Meta} // return all fields searchResults, err := aS.db.Search(s) if err != nil { return err } - rply := make([]map[string]interface{}, searchResults.Hits.Len()) - for i, obj := range searchResults.Hits { - rply[i] = obj.Fields + rply := make([]map[string]interface{}, 0, searchResults.Hits.Len()) + lCntFltrs := len(args.ContentFilters) + if lCntFltrs != 0 && + aS.filterS == nil { // populate the filter on the first API that requeres them + aS.filterS = <-aS.filterSChan + aS.filterSChan <- aS.filterS + } + for _, obj := range searchResults.Hits { // make sure that the result is corectly marshaled - rply[i][utils.Reply] = json.RawMessage(utils.IfaceAsString(obj.Fields[utils.Reply])) - rply[i][utils.RequestParams] = json.RawMessage(utils.IfaceAsString(obj.Fields[utils.RequestParams])) + rep := json.RawMessage(utils.IfaceAsString(obj.Fields[utils.Reply])) + req := json.RawMessage(utils.IfaceAsString(obj.Fields[utils.RequestParams])) + obj.Fields[utils.Reply] = rep + obj.Fields[utils.RequestParams] = req // try to pretty print the duration - if dur, err := utils.IfaceAsDuration(rply[i][utils.RequestDuration]); err == nil { - rply[i][utils.RequestDuration] = dur.String() + if dur, err := utils.IfaceAsDuration(obj.Fields[utils.RequestDuration]); err == nil { + obj.Fields[utils.RequestDuration] = dur.String() } + if lCntFltrs != 0 { + repDP, err := unmarshalJSON(rep) + if err != nil { + return err + } + reqDP, err := unmarshalJSON(req) + if err != nil { + return err + } + if pass, err := aS.filterS.Pass(aS.cfg.GeneralCfg().DefaultTenant, + args.ContentFilters, utils.MapStorage{ + utils.MetaReq: reqDP, + utils.MetaRep: repDP, + }); err != nil { + return err + } else if !pass { + continue + } + } + rply = append(rply, obj.Fields) } *reply = rply return nil diff --git a/analyzers/analyzers_it_test.go b/analyzers/analyzers_it_test.go index 1b9e2e114..79d598dcc 100644 --- a/analyzers/analyzers_it_test.go +++ b/analyzers/analyzers_it_test.go @@ -82,18 +82,17 @@ func TestAnalyzerSIT(t *testing.T) { func testAnalyzerSInitCfg(t *testing.T) { var err error + if err := os.RemoveAll("/tmp/analyzers/"); err != nil { + t.Fatal(err) + } + if err = os.MkdirAll("/tmp/analyzers/", 0700); err != nil { + t.Fatal(err) + } anzCfgPath = path.Join(*dataDir, "conf", "samples", "analyzers") anzCfg, err = config.NewCGRConfigFromPath(anzCfgPath) if err != nil { t.Error(err) } - - if err := os.RemoveAll(anzCfg.AnalyzerSCfg().DBPath); err != nil { - t.Fatal(err) - } - if err = os.MkdirAll(path.Dir(anzCfg.AnalyzerSCfg().DBPath), 0700); err != nil { - t.Fatal(err) - } } func testAnalyzerSInitDataDb(t *testing.T) { @@ -199,7 +198,7 @@ func testAnalyzerSChargerSv1ProcessEvent(t *testing.T) { func testAnalyzerSV1Search(t *testing.T) { var result []map[string]interface{} - if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, `+RequestEncoding:\*internal +RequestMethod:AttributeSv1\.ProcessEvent`, &result); err != nil { + if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, &QueryArgs{HeaderFilters: `+RequestEncoding:\*internal +RequestMethod:AttributeSv1\.ProcessEvent`}, &result); err != nil { t.Error(err) } else if len(result) != 1 { t.Errorf("Unexpected result: %s", utils.ToJSON(result)) @@ -208,7 +207,7 @@ func testAnalyzerSV1Search(t *testing.T) { func testAnalyzerSV1Search2(t *testing.T) { var result []map[string]interface{} - if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, `+RequestEncoding:\*json +RequestMethod:ChargerSv1\.ProcessEvent`, &result); err != nil { + if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, &QueryArgs{HeaderFilters: `+RequestEncoding:\*json +RequestMethod:ChargerSv1\.ProcessEvent`}, &result); err != nil { t.Error(err) } else if len(result) != 1 { t.Errorf("Unexpected result: %s", utils.ToJSON(result)) diff --git a/analyzers/analyzers_test.go b/analyzers/analyzers_test.go index d8e18bb5b..399bea629 100644 --- a/analyzers/analyzers_test.go +++ b/analyzers/analyzers_test.go @@ -21,7 +21,6 @@ package analyzers import ( "encoding/json" "os" - "path" "reflect" "runtime" "strconv" @@ -43,10 +42,10 @@ func TestNewAnalyzerService(t *testing.T) { if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { t.Fatal(err) } - if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg) + anz, err := NewAnalyzerService(cfg, nil) if err != nil { t.Fatal(err) } @@ -78,10 +77,10 @@ func TestAnalyzerSLogTraffic(t *testing.T) { if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { t.Fatal(err) } - if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg) + anz, err := NewAnalyzerService(cfg, nil) if err != nil { t.Fatal(err) } @@ -134,10 +133,10 @@ func TestAnalyzersDeleteHits(t *testing.T) { if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { t.Fatal(err) } - if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg) + anz, err := NewAnalyzerService(cfg, nil) if err != nil { t.Fatal(err) } @@ -159,10 +158,10 @@ func TestAnalyzersListenAndServe(t *testing.T) { if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { t.Fatal(err) } - if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg) + anz, err := NewAnalyzerService(cfg, nil) if err != nil { t.Fatal(err) } @@ -172,7 +171,7 @@ func TestAnalyzersListenAndServe(t *testing.T) { anz.ListenAndServe(make(chan struct{})) cfg.AnalyzerSCfg().CleanupInterval = 1 - anz, err = NewAnalyzerService(cfg) + anz, err = NewAnalyzerService(cfg, nil) if err != nil { t.Fatal(err) } @@ -197,10 +196,10 @@ func TestAnalyzersV1Search(t *testing.T) { if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { t.Fatal(err) } - if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg) + anz, err := NewAnalyzerService(cfg, nil) if err != nil { t.Fatal(err) } @@ -252,13 +251,13 @@ func TestAnalyzersV1Search(t *testing.T) { t.Fatal(err) } reply := []map[string]interface{}{} - if err = anz.V1StringQuery(utils.CoreSv1Ping, &reply); err != nil { + if err = anz.V1StringQuery(&QueryArgs{HeaderFilters: utils.CoreSv1Ping}, &reply); err != nil { t.Fatal(err) } else if len(reply) != 4 { t.Errorf("Expected 4 hits received: %v", len(reply)) } reply = []map[string]interface{}{} - if err = anz.V1StringQuery("RequestMethod:"+utils.CoreSv1Ping, &reply); err != nil { + if err = anz.V1StringQuery(&QueryArgs{HeaderFilters: "RequestMethod:" + utils.CoreSv1Ping}, &reply); err != nil { t.Fatal(err) } else if len(reply) != 4 { t.Errorf("Expected 4 hits received: %v", len(reply)) @@ -276,20 +275,20 @@ func TestAnalyzersV1Search(t *testing.T) { "RequestStartTime": t1.Add(-24 * time.Hour).UTC().Format(time.RFC3339), }} reply = []map[string]interface{}{} - if err = anz.V1StringQuery(utils.RequestDuration+":>="+strconv.FormatInt(int64(time.Hour), 10), &reply); err != nil { + if err = anz.V1StringQuery(&QueryArgs{HeaderFilters: utils.RequestDuration + ":>=" + strconv.FormatInt(int64(time.Hour), 10)}, &reply); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(expRply, reply) { t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply)) } reply = []map[string]interface{}{} - if err = anz.V1StringQuery(utils.RequestStartTime+":<=\""+t1.Add(-23*time.Hour).UTC().Format(time.RFC3339)+"\"", &reply); err != nil { + if err = anz.V1StringQuery(&QueryArgs{HeaderFilters: utils.RequestStartTime + ":<=\"" + t1.Add(-23*time.Hour).UTC().Format(time.RFC3339) + "\""}, &reply); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(expRply, reply) { t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply)) } reply = []map[string]interface{}{} - if err = anz.V1StringQuery("RequestEncoding:*gob", &reply); err != nil { + if err = anz.V1StringQuery(&QueryArgs{HeaderFilters: "RequestEncoding:*gob"}, &reply); err != nil { t.Fatal(err) } else if !reflect.DeepEqual(expRply, reply) { t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply)) @@ -298,7 +297,7 @@ func TestAnalyzersV1Search(t *testing.T) { if err = anz.db.Close(); err != nil { t.Fatal(err) } - if err = anz.V1StringQuery("RequestEncoding:*gob", &reply); err != bleve.ErrorIndexClosed { + if err = anz.V1StringQuery(&QueryArgs{HeaderFilters: "RequestEncoding:*gob"}, &reply); err != bleve.ErrorIndexClosed { t.Errorf("Expected error: %v,received: %+v", bleve.ErrorIndexClosed, err) } if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { diff --git a/analyzers/codec_test.go b/analyzers/codec_test.go index ad2e075b3..dc9ffa640 100644 --- a/analyzers/codec_test.go +++ b/analyzers/codec_test.go @@ -21,7 +21,6 @@ package analyzers import ( "net/rpc" "os" - "path" "reflect" "runtime" "testing" @@ -57,10 +56,10 @@ func TestNewServerCodec(t *testing.T) { if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { t.Fatal(err) } - if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg) + anz, err := NewAnalyzerService(cfg, nil) if err != nil { t.Fatal(err) } diff --git a/analyzers/connector_test.go b/analyzers/connector_test.go index 149c29a05..a538314ee 100644 --- a/analyzers/connector_test.go +++ b/analyzers/connector_test.go @@ -21,7 +21,6 @@ package analyzers import ( "errors" "os" - "path" "runtime" "testing" "time" @@ -44,10 +43,10 @@ func TestNewAnalyzeConnector(t *testing.T) { if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { t.Fatal(err) } - if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil { t.Fatal(err) } - anz, err := NewAnalyzerService(cfg) + anz, err := NewAnalyzerService(cfg, nil) if err != nil { t.Fatal(err) } diff --git a/analyzers/libanalyzers.go b/analyzers/libanalyzers.go index ade3a7935..fd0d558e4 100644 --- a/analyzers/libanalyzers.go +++ b/analyzers/libanalyzers.go @@ -19,6 +19,8 @@ along with this program. If not, see package analyzers import ( + "encoding/json" + "strconv" "time" "github.com/blevesearch/bleve/index/scorch" @@ -97,3 +99,33 @@ func getIndex(indx string) (indxType, storeType string) { } return } + +// unmarshalJSON will transform the message in a map[string]interface{} of []interface{} +// depending of the first character +// used for filters purposes so the nil is replaced with empty map +func unmarshalJSON(jsn json.RawMessage) (interface{}, error) { + switch { + case string(jsn) == "null" || + len(jsn) == 0: // nil or empty response + // by default consider nil as an empty map for filtering purposes + return map[string]interface{}{}, nil + case string(jsn) == "true": // booleans + return true, nil + case string(jsn) == "false": + return false, nil + case jsn[0] == '"': // string + return string(jsn[1 : len(jsn)-1]), nil + case jsn[0] >= '0' && jsn[0] <= '9': // float64 + return strconv.ParseFloat(string(jsn), 64) + case jsn[0] == '[': // slice + var val []interface{} + err := json.Unmarshal(jsn, &val) + return val, err + case jsn[0] == '{': // map + var val map[string]interface{} + err := json.Unmarshal(jsn, &val) + return val, err + default: + return nil, new(json.SyntaxError) + } +} diff --git a/analyzers/libanalyzers_test.go b/analyzers/libanalyzers_test.go index 275baf0fd..dd6f9dc94 100644 --- a/analyzers/libanalyzers_test.go +++ b/analyzers/libanalyzers_test.go @@ -19,6 +19,7 @@ along with this program. If not, see package analyzers import ( + "encoding/json" "errors" "reflect" "testing" @@ -96,3 +97,62 @@ func TestNewInfoRPC(t *testing.T) { } } + +func TestUnmarshalJSON(t *testing.T) { + expErr := new(json.SyntaxError) + if _, err := unmarshalJSON(json.RawMessage(`a`)); errors.Is(err, expErr) { + t.Errorf("Expected error: %s,received %+v", expErr, err) + } + var exp interface{} = true + if val, err := unmarshalJSON(json.RawMessage(`true`)); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(val, exp) { + t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val)) + } + + exp = false + if val, err := unmarshalJSON(json.RawMessage(`false`)); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(val, exp) { + t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val)) + } + + exp = "string" + if val, err := unmarshalJSON(json.RawMessage(`"string"`)); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(val, exp) { + t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val)) + } + + exp = 94. + if val, err := unmarshalJSON(json.RawMessage(`94`)); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(val, exp) { + t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val)) + } + + exp = []interface{}{"1", "2", "3"} + if val, err := unmarshalJSON(json.RawMessage(`["1","2","3"]`)); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(val, exp) { + t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val)) + } + exp = map[string]interface{}{"1": "A", "2": "B", "3": "C"} + if val, err := unmarshalJSON(json.RawMessage(`{"1":"A","2":"B","3":"C"}`)); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(val, exp) { + t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val)) + } + + exp = map[string]interface{}{} + if val, err := unmarshalJSON(json.RawMessage(`null`)); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(val, exp) { + t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val)) + } + if val, err := unmarshalJSON(json.RawMessage(``)); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(val, exp) { + t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val)) + } +} diff --git a/apier/v1/analyzer.go b/apier/v1/analyzer.go index 8abca5a37..d72cf3b42 100755 --- a/apier/v1/analyzer.go +++ b/apier/v1/analyzer.go @@ -46,6 +46,6 @@ func (aSv1 *AnalyzerSv1) Ping(ign *utils.CGREvent, reply *string) error { } // StringQuery returns a list of API that match the query -func (aSv1 *AnalyzerSv1) StringQuery(search string, reply *[]map[string]interface{}) error { +func (aSv1 *AnalyzerSv1) StringQuery(search *analyzers.QueryArgs, reply *[]map[string]interface{}) error { return aSv1.aS.V1StringQuery(search, reply) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index b33190164..1b79a9e88 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -339,7 +339,8 @@ func runPreload(loader *services.LoaderService, internalLoaderSChan chan rpcclie return } - internalLoaderSChan <- <-internalLoaderSChan + ldrs := <-internalLoaderSChan + internalLoaderSChan <- ldrs var reply string for _, loaderID := range strings.Split(*preload, utils.FIELDS_SEP) { @@ -532,7 +533,7 @@ func main() { filterSChan := make(chan *engine.FilterS, 1) // init AnalyzerS - anz := services.NewAnalyzerService(cfg, server, exitChan, internalAnalyzerSChan) + anz := services.NewAnalyzerService(cfg, server, filterSChan, exitChan, internalAnalyzerSChan) if anz.ShouldRun() { if err := anz.Start(); err != nil { fmt.Println(err) diff --git a/config/configsanity.go b/config/configsanity.go index 83ac884be..a21c02429 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -21,7 +21,6 @@ package config import ( "fmt" "os" - "path" "strings" "github.com/cgrates/cgrates/utils" @@ -744,9 +743,8 @@ func (cfg *CGRConfig) checkConfigSanity() error { } } if cfg.analyzerSCfg.Enabled { - dir := path.Dir(cfg.analyzerSCfg.DBPath) // only the base path is mandatory to exist - if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { - return fmt.Errorf("<%s> nonexistent DB folder: %q", utils.AnalyzerS, dir) + if _, err := os.Stat(cfg.analyzerSCfg.DBPath); err != nil && os.IsNotExist(err) { + return fmt.Errorf("<%s> nonexistent DB folder: %q", utils.AnalyzerS, cfg.analyzerSCfg.DBPath) } if !utils.AnzIndexType.Has(cfg.analyzerSCfg.IndexType) { return fmt.Errorf("<%s> unsuported index type: %q", utils.AnalyzerS, cfg.analyzerSCfg.IndexType) diff --git a/config/configsanity_test.go b/config/configsanity_test.go index e30a1927a..5bc0a4d12 100644 --- a/config/configsanity_test.go +++ b/config/configsanity_test.go @@ -1155,11 +1155,11 @@ func TestConfigSanityAnalyzer(t *testing.T) { } cfg.analyzerSCfg.DBPath = "/inexistent/Path" - expected = " nonexistent DB folder: \"/inexistent\"" + expected = " nonexistent DB folder: \"/inexistent/Path\"" if err := cfg.checkConfigSanity(); err == nil || err.Error() != expected { t.Errorf("Expecting: %+q received: %+q", expected, err) } - cfg.analyzerSCfg.DBPath = utils.EmptyString + cfg.analyzerSCfg.DBPath = "/" cfg.analyzerSCfg.IndexType = utils.MetaScorch expected = " the TTL needs to be bigger than 0" diff --git a/data/ansible/rpm_packages/cgrates.spec.j2 b/data/ansible/rpm_packages/cgrates.spec.j2 index e5c564366..e46ab7390 100644 --- a/data/ansible/rpm_packages/cgrates.spec.j2 +++ b/data/ansible/rpm_packages/cgrates.spec.j2 @@ -97,6 +97,7 @@ mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/csv mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/fwv mkdir -p $RPM_BUILD_ROOT%{_spooldir}/tpe mkdir -p $RPM_BUILD_ROOT%{_spooldir}/failed_posts +mkdir -p $RPM_BUILD_ROOT%{_spooldir}/analyzers mkdir -p $RPM_BUILD_ROOT%{_libdir}/history mkdir -p $RPM_BUILD_ROOT%{_libdir}/cache_dump mkdir -p $RPM_BUILD_ROOT/etc/logrotate.d diff --git a/packages/debian/rules b/packages/debian/rules index 9c3f60058..ce564e0eb 100755 --- a/packages/debian/rules +++ b/packages/debian/rules @@ -40,6 +40,7 @@ binary-arch: clean mkdir -p $(PKGDIR)/var/spool/cgrates/cdre/fwv mkdir -p $(PKGDIR)/var/spool/cgrates/tpe mkdir -p $(PKGDIR)/var/spool/cgrates/failed_posts + mkdir -p $(PKGDIR)/var/spool/cgrates/analyzers mkdir -p $(PKGDIR)/var/lib/cgrates/history mkdir -p $(PKGDIR)/var/lib/cgrates/cache_dump mkdir -p $(PKGDIR)/var/log/cgrates diff --git a/packages/redhat_fedora/cgrates.spec b/packages/redhat_fedora/cgrates.spec index bafe525d6..60c71c791 100644 --- a/packages/redhat_fedora/cgrates.spec +++ b/packages/redhat_fedora/cgrates.spec @@ -99,6 +99,7 @@ mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/csv mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/fwv mkdir -p $RPM_BUILD_ROOT%{_spooldir}/tpe mkdir -p $RPM_BUILD_ROOT%{_spooldir}/failed_posts +mkdir -p $RPM_BUILD_ROOT%{_spooldir}/analyzers mkdir -p $RPM_BUILD_ROOT%{_libdir}/history mkdir -p $RPM_BUILD_ROOT%{_libdir}/cache_dump mkdir -p $RPM_BUILD_ROOT/etc/logrotate.d diff --git a/rates/librates.go b/rates/librates.go index bc7478c3a..9cead1deb 100644 --- a/rates/librates.go +++ b/rates/librates.go @@ -172,7 +172,7 @@ func orderRatesOnIntervals(aRts []*engine.Rate, sTime time.Time, usage time.Dura } } } else { // only first rate is considered for units - ordRts = []*orderedRate{&orderedRate{time.Duration(0), rtIdx[sortedATimes[0]].winner().rt}} + ordRts = []*orderedRate{{time.Duration(0), rtIdx[sortedATimes[0]].winner().rt}} } return } @@ -226,7 +226,7 @@ func computeRateSIntervals(rts []*orderedRate, intervalStart, usage time.Duratio intIncrm := int64(iRt.Increment) cmpFactor := intUsage / intIncrm if intUsage%intIncrm != 0 { - cmpFactor += 1 // int division has used math.Floor, need Ceil + cmpFactor++ // int division has used math.Floor, need Ceil } rIcrm := &engine.RateSIncrement{ UsageStart: iRtUsageSIdx, diff --git a/rates/librates_test.go b/rates/librates_test.go index 2aecce9f2..4a7d4fbda 100644 --- a/rates/librates_test.go +++ b/rates/librates_test.go @@ -1599,14 +1599,12 @@ func TestComputeRateSIntervals(t *testing.T) { ID: "RATE1", IntervalRates: []*engine.IntervalRate{ { - IntervalStart: time.Duration(0), Unit: time.Duration(1 * time.Minute), Increment: time.Duration(1 * time.Second), Value: 0.20, }, { - IntervalStart: time.Duration(2 * time.Minute), Unit: time.Duration(1 * time.Minute), Increment: time.Duration(1 * time.Second), diff --git a/rates/rates.go b/rates/rates.go index 6a37ffb62..fde907c18 100644 --- a/rates/rates.go +++ b/rates/rates.go @@ -49,7 +49,7 @@ func (rS *RateS) ListenAndServe(stopChan, cfgRld chan struct{}) { utils.CoreS, utils.RateS)) for { select { - case <-stopChan: // global exit + case <-stopChan: return case rld := <-cfgRld: // configuration was reloaded cfgRld <- rld @@ -170,6 +170,28 @@ func (rS *RateS) rateProfileCostForEvent(rtPfl *engine.RateProfile, args *utils. return } +// ArgsCostForEvent arguments used for proccess event +type ArgsCostForEvent struct { + RateProfileIDs []string + *utils.CGREventWithOpts +} + +// StartTime returns the event time used to check active rate profiles +func (args *ArgsCostForEvent) StartTime(tmz string) (sTime time.Time, err error) { + if tIface, has := args.Opts[utils.OptsRatesStartTime]; has { + return utils.IfaceAsTime(tIface, tmz) + } + return time.Now(), nil +} + +// Usage returns the event time used to check active rate profiles +func (args *ArgsCostForEvent) Usage() (usage time.Duration, err error) { + if uIface, has := args.Opts[utils.OptsRatesUsage]; has { + return utils.IfaceAsDuration(uIface) + } + return time.Duration(time.Minute), nil +} + // V1CostForEvent will be called to calculate the cost for an event func (rS *RateS) V1CostForEvent(args *utils.ArgsCostForEvent, rpCost *engine.RateProfileCost) (err error) { rPfIDs := make([]string, len(args.RateProfileIDs)) diff --git a/services/analyzers.go b/services/analyzers.go index 79b3aaf07..1042549b3 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -26,28 +26,32 @@ import ( v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) // NewAnalyzerService returns the Analyzer Service -func NewAnalyzerService(cfg *config.CGRConfig, server *cores.Server, exitChan chan<- struct{}, +func NewAnalyzerService(cfg *config.CGRConfig, server *cores.Server, + filterSChan chan *engine.FilterS, exitChan chan<- struct{}, internalAnalyzerSChan chan rpcclient.ClientConnector) *AnalyzerService { return &AnalyzerService{ - connChan: internalAnalyzerSChan, - cfg: cfg, - server: server, - exitChan: exitChan, + connChan: internalAnalyzerSChan, + cfg: cfg, + server: server, + filterSChan: filterSChan, + exitChan: exitChan, } } // AnalyzerService implements Service interface type AnalyzerService struct { sync.RWMutex - cfg *config.CGRConfig - server *cores.Server - stopChan chan struct{} - exitChan chan<- struct{} + cfg *config.CGRConfig + server *cores.Server + filterSChan chan *engine.FilterS + stopChan chan struct{} + exitChan chan<- struct{} anz *analyzers.AnalyzerService rpc *v1.AnalyzerSv1 @@ -59,7 +63,10 @@ func (anz *AnalyzerService) Start() (err error) { if anz.IsRunning() { return utils.ErrServiceAlreadyRunning } - if anz.anz, err = analyzers.NewAnalyzerService(anz.cfg); err != nil { + + anz.Lock() + defer anz.Unlock() + if anz.anz, err = analyzers.NewAnalyzerService(anz.cfg, anz.filterSChan); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error())) return } diff --git a/services/apiers_it_test.go b/services/apiers_it_test.go index 1fadbc470..d3d1475cd 100644 --- a/services/apiers_it_test.go +++ b/services/apiers_it_test.go @@ -55,7 +55,7 @@ func TestApiersReload(t *testing.T) { db := NewDataDBService(cfg, nil) cfg.StorDbCfg().Type = utils.INTERNAL stordb := NewStorDBService(cfg) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) apiSv1 := NewAPIerSv1Service(cfg, db, stordb, filterSChan, server, schS, new(ResponderService), diff --git a/services/asteriskagent_it_test.go b/services/asteriskagent_it_test.go index 7c915c54b..1c2622c3b 100644 --- a/services/asteriskagent_it_test.go +++ b/services/asteriskagent_it_test.go @@ -51,7 +51,7 @@ func TestAsteriskAgentReload(t *testing.T) { server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, nil, anz) srv := NewAsteriskAgent(cfg, engineShutdown, nil) diff --git a/services/attributes_it_test.go b/services/attributes_it_test.go index 2c7863bcb..de3ad62e3 100644 --- a/services/attributes_it_test.go +++ b/services/attributes_it_test.go @@ -51,7 +51,7 @@ func TestAttributeSReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) attrRPC := make(chan rpcclient.ClientConnector, 1) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) attrS := NewAttributeService(cfg, db, chS, filterSChan, server, attrRPC, anz) diff --git a/services/cdrs_it_test.go b/services/cdrs_it_test.go index 26b3beac6..51a3ae200 100644 --- a/services/cdrs_it_test.go +++ b/services/cdrs_it_test.go @@ -64,7 +64,7 @@ func TestCdrsReload(t *testing.T) { db := NewDataDBService(cfg, nil) cfg.StorDbCfg().Type = utils.INTERNAL stordb := NewStorDBService(cfg) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) ralS := NewRalService(cfg, chS, server, diff --git a/services/chargers_it_test.go b/services/chargers_it_test.go index aceb03475..74d9574fb 100644 --- a/services/chargers_it_test.go +++ b/services/chargers_it_test.go @@ -52,7 +52,7 @@ func TestChargerSReload(t *testing.T) { server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) diff --git a/services/datadb_it_test.go b/services/datadb_it_test.go index 8509c401f..0f335b55f 100644 --- a/services/datadb_it_test.go +++ b/services/datadb_it_test.go @@ -52,7 +52,7 @@ func TestDataDBReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) cM := engine.NewConnManager(cfg, nil) db := NewDataDBService(cfg, cM) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) srvMngr.AddServices(NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz), NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) diff --git a/services/diameteragent_it_test.go b/services/diameteragent_it_test.go index 650f99da9..537d9e22d 100644 --- a/services/diameteragent_it_test.go +++ b/services/diameteragent_it_test.go @@ -51,7 +51,7 @@ func TestDiameterAgentReload(t *testing.T) { server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, nil, anz) srv := NewDiameterAgent(cfg, filterSChan, engineShutdown, nil) diff --git a/services/dispatchers_it_test.go b/services/dispatchers_it_test.go index 824b977b9..cae47baa8 100644 --- a/services/dispatchers_it_test.go +++ b/services/dispatchers_it_test.go @@ -53,7 +53,7 @@ func TestDispatcherSReload(t *testing.T) { server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) srv := NewDispatcherService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) diff --git a/services/dnsagent_it_test.go b/services/dnsagent_it_test.go index 0d763ef95..bd1062a3b 100644 --- a/services/dnsagent_it_test.go +++ b/services/dnsagent_it_test.go @@ -51,7 +51,7 @@ func TestDNSAgentReload(t *testing.T) { server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, nil, anz) srv := NewDNSAgent(cfg, filterSChan, engineShutdown, nil) diff --git a/services/ees_it_test.go b/services/ees_it_test.go index 2375d4667..28ae0b15e 100644 --- a/services/ees_it_test.go +++ b/services/ees_it_test.go @@ -60,7 +60,7 @@ func TestEventExporterSReload(t *testing.T) { chS := engine.NewCacheS(cfg, nil) close(chS.GetPrecacheChannel(utils.CacheAttributeProfiles)) close(chS.GetPrecacheChannel(utils.CacheAttributeFilterIndexes)) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) attrS := NewAttributeService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 422050461..d7ce735f1 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -55,7 +55,7 @@ func TestEventReaderSReload(t *testing.T) { engineShutdown := make(chan struct{}, 1) server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) db := NewDataDBService(cfg, nil) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, nil, anz) attrS := NewEventReaderService(cfg, filterSChan, engineShutdown, nil) diff --git a/services/freeswitchagent_it_test.go b/services/freeswitchagent_it_test.go index a7ffd8bf9..bef870ebd 100644 --- a/services/freeswitchagent_it_test.go +++ b/services/freeswitchagent_it_test.go @@ -51,7 +51,7 @@ func TestFreeSwitchAgentReload(t *testing.T) { server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, nil, anz) srv := NewFreeswitchAgent(cfg, engineShutdown, nil) diff --git a/services/kamailioagent_it_test.go b/services/kamailioagent_it_test.go index cd12c760b..f22ccc6b2 100644 --- a/services/kamailioagent_it_test.go +++ b/services/kamailioagent_it_test.go @@ -51,7 +51,7 @@ func TestKamailioAgentReload(t *testing.T) { server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, nil, anz) srv := NewKamailioAgent(cfg, engineShutdown, nil) diff --git a/services/radiusagent_it_test.go b/services/radiusagent_it_test.go index ed85a9289..82fead045 100644 --- a/services/radiusagent_it_test.go +++ b/services/radiusagent_it_test.go @@ -51,7 +51,7 @@ func TestRadiusAgentReload(t *testing.T) { server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, nil, anz) srv := NewRadiusAgent(cfg, filterSChan, engineShutdown, nil) diff --git a/services/rals_it_test.go b/services/rals_it_test.go index 09610b0c2..0819b44e4 100644 --- a/services/rals_it_test.go +++ b/services/rals_it_test.go @@ -63,7 +63,7 @@ func TestRalsReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) cfg.StorDbCfg().Type = utils.INTERNAL - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) stordb := NewStorDBService(cfg) schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) diff --git a/services/rates_it_test.go b/services/rates_it_test.go index 83cc26537..115068c8e 100644 --- a/services/rates_it_test.go +++ b/services/rates_it_test.go @@ -50,7 +50,7 @@ func TestRateSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheRateProfiles)) close(chS.GetPrecacheChannel(utils.CacheRateProfilesFilterIndexes)) close(chS.GetPrecacheChannel(utils.CacheRateFilterIndexes)) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) rS := NewRateService(cfg, chS, filterSChan, db, server, make(chan rpcclient.ClientConnector, 1), anz) srvMngr.AddServices(rS, NewLoaderService(cfg, db, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz), db) diff --git a/services/resources_it_test.go b/services/resources_it_test.go index f6a3631b1..53471d780 100644 --- a/services/resources_it_test.go +++ b/services/resources_it_test.go @@ -54,7 +54,7 @@ func TestResourceSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheResourceFilterIndexes)) server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) db := NewDataDBService(cfg, nil) tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) reS := NewResourceService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) diff --git a/services/routes_it_test.go b/services/routes_it_test.go index a821887cf..f6f52eb55 100644 --- a/services/routes_it_test.go +++ b/services/routes_it_test.go @@ -53,7 +53,7 @@ func TestSupplierSReload(t *testing.T) { server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) sts := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) supS := NewRouteService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) diff --git a/services/schedulers_it_test.go b/services/schedulers_it_test.go index 592e4ed63..d7ef7f77b 100644 --- a/services/schedulers_it_test.go +++ b/services/schedulers_it_test.go @@ -47,7 +47,7 @@ func TestSchedulerSReload(t *testing.T) { server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) engine.NewConnManager(cfg, nil) srvMngr.AddServices(schS, diff --git a/services/sessions_it_test.go b/services/sessions_it_test.go index dc9d28a58..d63dbe8dd 100644 --- a/services/sessions_it_test.go +++ b/services/sessions_it_test.go @@ -70,7 +70,7 @@ func TestSessionSReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) cfg.StorDbCfg().Type = utils.INTERNAL - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) stordb := NewStorDBService(cfg) chrS := NewChargerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) diff --git a/services/sipagent_it_test.go b/services/sipagent_it_test.go index f3094e608..1341d80d8 100644 --- a/services/sipagent_it_test.go +++ b/services/sipagent_it_test.go @@ -51,7 +51,7 @@ func TestSIPAgentReload(t *testing.T) { server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) sS := NewSessionService(cfg, db, server, make(chan rpcclient.ClientConnector, 1), engineShutdown, nil, nil, anz) srv := NewSIPAgent(cfg, filterSChan, engineShutdown, nil) diff --git a/services/stats_it_test.go b/services/stats_it_test.go index ff5fc19ce..22417cd35 100644 --- a/services/stats_it_test.go +++ b/services/stats_it_test.go @@ -54,7 +54,7 @@ func TestStatSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheStatFilterIndexes)) server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) db := NewDataDBService(cfg, nil) tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) sS := NewStatService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil, anz) diff --git a/services/thresholds_it_test.go b/services/thresholds_it_test.go index 766895cce..14842433b 100644 --- a/services/thresholds_it_test.go +++ b/services/thresholds_it_test.go @@ -49,7 +49,7 @@ func TestThresholdSReload(t *testing.T) { close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) server := cores.NewServer(nil) srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) - anz := NewAnalyzerService(cfg, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) + anz := NewAnalyzerService(cfg, server, filterSChan, engineShutdown, make(chan rpcclient.ClientConnector, 1)) db := NewDataDBService(cfg, nil) tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), anz) engine.NewConnManager(cfg, nil)