From 81a4cdd3a142bd4cc8ceb21d27095239c4c907e3 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 27 Oct 2020 15:54:22 +0200 Subject: [PATCH] Added tests for analyzers --- analyzers/analyzers.go | 53 ++----- analyzers/analyzers_test.go | 182 ++++++++++++++++++++++ analyzers/codec.go | 12 +- analyzers/codec_test.go | 100 ++++++++++++ analyzers/connector.go | 8 +- analyzers/connector_test.go | 65 ++++++++ analyzers/utils.go | 60 ++++++- analyzers/utils_test.go | 104 +++++++++++++ cmd/cgr-engine/cgr-engine.go | 103 +++++++----- config/analyzerscfg_test.go | 38 ++++- config/config_defaults.go | 12 +- config/config_json_test.go | 6 +- config/config_test.go | 6 +- data/ansible/rpm_packages/cgrates.spec.j2 | 1 + packages/debian/rules | 1 + packages/redhat_fedora/cgrates.spec | 1 + services/analyzers.go | 15 +- services/apierv1.go | 2 +- services/apierv2.go | 2 +- services/attributes.go | 2 +- services/cdrs.go | 2 +- services/chargers.go | 2 +- services/dispatcherh.go | 2 +- services/dispatchers.go | 2 +- services/ees.go | 12 +- services/loaders.go | 2 +- services/rals.go | 2 +- services/rates.go | 14 +- services/resources.go | 2 +- services/responders.go | 2 +- services/routes.go | 2 +- services/schedulers.go | 2 +- services/sessions.go | 2 +- services/stats.go | 2 +- services/thresholds.go | 2 +- 35 files changed, 687 insertions(+), 138 deletions(-) create mode 100644 analyzers/analyzers_test.go create mode 100644 analyzers/codec_test.go create mode 100644 analyzers/connector_test.go create mode 100644 analyzers/utils_test.go diff --git a/analyzers/analyzers.go b/analyzers/analyzers.go index 7b5d8a6d8..35691f46a 100755 --- a/analyzers/analyzers.go +++ b/analyzers/analyzers.go @@ -27,11 +27,7 @@ import ( "time" "github.com/blevesearch/bleve" - "github.com/blevesearch/bleve/index/scorch" - "github.com/blevesearch/bleve/index/store/boltdb" - "github.com/blevesearch/bleve/index/store/goleveldb" - "github.com/blevesearch/bleve/index/store/moss" - "github.com/blevesearch/bleve/index/upsidedown" + "github.com/blevesearch/bleve/search" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -53,18 +49,7 @@ func (aS *AnalyzerService) initDB() (err error) { if _, err = os.Stat(aS.cfg.AnalyzerSCfg().DBPath); err == nil { aS.db, err = bleve.Open(aS.cfg.AnalyzerSCfg().DBPath) } else if os.IsNotExist(err) { - var indxType, storeType string - switch aS.cfg.AnalyzerSCfg().IndexType { - case utils.MetaScorch: - indxType, storeType = scorch.Name, scorch.Name - case utils.MetaBoltdb: - indxType, storeType = upsidedown.Name, boltdb.Name - case utils.MetaLeveldb: - indxType, storeType = upsidedown.Name, goleveldb.Name - case utils.MetaMoss: - indxType, storeType = upsidedown.Name, moss.Name - } - + indxType, storeType := getIndex(aS.cfg.AnalyzerSCfg().IndexType) aS.db, err = bleve.NewUsing(aS.cfg.AnalyzerSCfg().DBPath, bleve.NewIndexMapping(), indxType, storeType, nil) } @@ -72,7 +57,6 @@ func (aS *AnalyzerService) initDB() (err error) { } func (aS *AnalyzerService) clenaUp() (err error) { - fmt.Println("clean") t2 := bleve.NewDateRangeQuery(time.Time{}, time.Now().Add(-aS.cfg.AnalyzerSCfg().TTL)) t2.SetField("RequestStartTime") searchReq := bleve.NewSearchRequest(t2) @@ -80,8 +64,13 @@ func (aS *AnalyzerService) clenaUp() (err error) { if res, err = aS.db.Search(searchReq); err != nil { return } + return aS.deleteHits(res.Hits) +} + +// extracted as function in order to test this +func (aS *AnalyzerService) deleteHits(hits search.DocumentMatchCollection) (err error) { hasErr := false - for _, hit := range res.Hits { + for _, hit := range hits { if err = aS.db.Delete(hit.ID); err != nil { hasErr = true } @@ -125,33 +114,11 @@ func (aS *AnalyzerService) logTrafic(id uint64, method string, if strings.HasPrefix(method, utils.AnalyzerSv1) { return nil } - var e interface{} - switch val := err.(type) { - default: - case nil: - case string: - e = val - case error: - e = val.Error() - } return aS.db.Index(utils.ConcatenatedKey(method, strconv.FormatInt(sTime.Unix(), 10)), - InfoRPC{ - RequestDuration: eTime.Sub(sTime), - RequestStartTime: sTime, - // EndTime: eTime, - - RequestEncoding: info.enc, - RequestSource: info.from, - RequestDestination: info.to, - - RequestID: id, - RequestMethod: method, - RequestParams: utils.ToJSON(params), - Reply: utils.ToJSON(result), - ReplyError: e, - }) + NewInfoRPC(id, method, params, result, err, info, sTime, eTime)) } +// V1Search returns a list of API that match the query func (aS *AnalyzerService) V1Search(searchstr string, reply *[]map[string]interface{}) error { s := bleve.NewSearchRequest(bleve.NewQueryStringQuery(searchstr)) s.Fields = []string{utils.Meta} // return all fields diff --git a/analyzers/analyzers_test.go b/analyzers/analyzers_test.go new file mode 100644 index 000000000..e8b9c7c80 --- /dev/null +++ b/analyzers/analyzers_test.go @@ -0,0 +1,182 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package analyzers + +import ( + "os" + "path" + "runtime" + "testing" + "time" + + "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/search" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +func TestNewAnalyzerService(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers" + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } + if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + t.Fatal(err) + } + anz, err := NewAnalyzerService(cfg) + if err != nil { + t.Fatal(err) + } + // no need to DeepEqual + if err = anz.Shutdown(); err != nil { + t.Fatal(err) + } + if err = anz.initDB(); err != nil { + t.Fatal(err) + } + exitChan := make(chan bool, 1) + exitChan <- true + if err := anz.ListenAndServe(exitChan); err != nil { + t.Fatal(err) + } + anz.db.Close() +} + +func TestAnalyzerSLogTraffic(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers" + cfg.AnalyzerSCfg().TTL = 30 * time.Minute + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } + if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + t.Fatal(err) + } + anz, err := NewAnalyzerService(cfg) + if err != nil { + t.Fatal(err) + } + t1 := time.Now().Add(-time.Hour) + if err = anz.logTrafic(0, utils.AnalyzerSv1Ping, "status", "result", "error", &extraInfo{ + enc: utils.MetaJSON, + from: "127.0.0.1:5565", + to: "127.0.0.1:2012", + }, t1, t1.Add(time.Second)); err != nil { + t.Fatal(err) + } + if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{ + enc: utils.MetaJSON, + from: "127.0.0.1:5565", + to: "127.0.0.1:2012", + }, t1, t1.Add(time.Second)); err != nil { + t.Fatal(err) + } + t1 = time.Now().Add(-10 * time.Minute) + if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{ + enc: utils.MetaJSON, + from: "127.0.0.1:5565", + to: "127.0.0.1:2012", + }, t1, t1.Add(time.Second)); err != nil { + t.Fatal(err) + } + if cnt, err := anz.db.DocCount(); err != nil { + t.Fatal(err) + } else if cnt != 2 { + t.Errorf("Expected only 2 documents received:%v", cnt) + } + if err = anz.clenaUp(); err != nil { + t.Fatal(err) + } + if cnt, err := anz.db.DocCount(); err != nil { + t.Fatal(err) + } else if cnt != 1 { + t.Errorf("Expected only one document received:%v", cnt) + } + + if err = anz.db.Close(); err != nil { + t.Fatal(err) + } + if err = anz.clenaUp(); err != bleve.ErrorIndexClosed { + t.Errorf("Expected error: %v,received: %+v", bleve.ErrorIndexClosed, err) + } +} + +func TestAnalyzersDeleteHits(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers" + cfg.AnalyzerSCfg().TTL = 30 * time.Minute + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } + if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + t.Fatal(err) + } + anz, err := NewAnalyzerService(cfg) + if err != nil { + t.Fatal(err) + } + if err = anz.deleteHits(search.DocumentMatchCollection{&search.DocumentMatch{}}); err != utils.ErrPartiallyExecuted { + t.Errorf("Expected error: %v,received: %+v", utils.ErrPartiallyExecuted, err) + } +} + +func TestAnalyzersListenAndServe(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers" + cfg.AnalyzerSCfg().TTL = 30 * time.Minute + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } + if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + t.Fatal(err) + } + anz, err := NewAnalyzerService(cfg) + if err != nil { + t.Fatal(err) + } + if err := anz.db.Close(); err != nil { + t.Fatal(err) + } + anz.ListenAndServe(make(chan bool)) + + cfg.AnalyzerSCfg().CleanupInterval = 1 + anz, err = NewAnalyzerService(cfg) + if err != nil { + t.Fatal(err) + } + go func() { + time.Sleep(1) + runtime.Gosched() + anz.db.Close() + }() + anz.ListenAndServe(make(chan bool)) +} diff --git a/analyzers/codec.go b/analyzers/codec.go index 7998c68aa..eb8ce8fbc 100644 --- a/analyzers/codec.go +++ b/analyzers/codec.go @@ -25,7 +25,7 @@ import ( ) func (aS *AnalyzerService) NewServerCodec(sc rpc.ServerCodec, enc, from, to string) rpc.ServerCodec { - return &AnalyzeServerCodec{ + return &AnalyzerServerCodec{ sc: sc, reqs: make(map[uint64]*rpcAPI), aS: aS, @@ -37,7 +37,7 @@ func (aS *AnalyzerService) NewServerCodec(sc rpc.ServerCodec, enc, from, to stri } } -type AnalyzeServerCodec struct { +type AnalyzerServerCodec struct { sc rpc.ServerCodec // keep the API in memory because the write is async @@ -48,7 +48,7 @@ type AnalyzeServerCodec struct { extrainfo *extraInfo } -func (c *AnalyzeServerCodec) ReadRequestHeader(r *rpc.Request) (err error) { +func (c *AnalyzerServerCodec) ReadRequestHeader(r *rpc.Request) (err error) { err = c.sc.ReadRequestHeader(r) c.reqsLk.Lock() c.reqIdx = r.Seq @@ -61,14 +61,14 @@ func (c *AnalyzeServerCodec) ReadRequestHeader(r *rpc.Request) (err error) { return } -func (c *AnalyzeServerCodec) ReadRequestBody(x interface{}) (err error) { +func (c *AnalyzerServerCodec) ReadRequestBody(x interface{}) (err error) { err = c.sc.ReadRequestBody(x) c.reqsLk.Lock() c.reqs[c.reqIdx].Params = x c.reqsLk.Unlock() return } -func (c *AnalyzeServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { +func (c *AnalyzerServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { c.reqsLk.Lock() api := c.reqs[c.reqIdx] delete(c.reqs, c.reqIdx) @@ -76,4 +76,4 @@ func (c *AnalyzeServerCodec) WriteResponse(r *rpc.Response, x interface{}) error go c.aS.logTrafic(api.ID, api.Method, api.Params, x, r.Error, c.extrainfo, api.StartTime, time.Now()) return c.sc.WriteResponse(r, x) } -func (c *AnalyzeServerCodec) Close() error { return c.sc.Close() } +func (c *AnalyzerServerCodec) Close() error { return c.sc.Close() } diff --git a/analyzers/codec_test.go b/analyzers/codec_test.go new file mode 100644 index 000000000..988292676 --- /dev/null +++ b/analyzers/codec_test.go @@ -0,0 +1,100 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package analyzers + +import ( + "net/rpc" + "os" + "path" + "reflect" + "runtime" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +type mockServerCodec struct{} + +func (c *mockServerCodec) ReadRequestHeader(r *rpc.Request) (err error) { + r.Seq = 0 + r.ServiceMethod = utils.CoreSv1Ping + return +} + +func (c *mockServerCodec) ReadRequestBody(x interface{}) (err error) { + return +} +func (c *mockServerCodec) WriteResponse(r *rpc.Response, x interface{}) error { + return nil +} +func (c *mockServerCodec) Close() error { return nil } + +func TestNewServerCodec(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers" + cfg.AnalyzerSCfg().TTL = 30 * time.Minute + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } + if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + t.Fatal(err) + } + anz, err := NewAnalyzerService(cfg) + if err != nil { + t.Fatal(err) + } + + codec := anz.NewServerCodec(new(mockServerCodec), utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012") + r := new(rpc.Request) + expR := &rpc.Request{ + Seq: 0, + ServiceMethod: utils.CoreSv1Ping, + } + if err = codec.ReadRequestHeader(r); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(r, expR) { + t.Errorf("Expected: %v ,received:%v", expR, r) + } + + if err = codec.ReadRequestBody("args"); err != nil { + t.Fatal(err) + } + if err = codec.WriteResponse(&rpc.Response{ + Error: "error", + Seq: 0, + ServiceMethod: utils.CoreSv1Ping, + }, "reply"); err != nil { + t.Fatal(err) + } + if err = codec.Close(); err != nil { + t.Fatal(err) + } + time.Sleep(100 * time.Millisecond) + runtime.Gosched() + if cnt, err := anz.db.DocCount(); err != nil { + t.Fatal(err) + } else if cnt != 1 { + t.Errorf("Expected only one document received:%v", cnt) + } +} diff --git a/analyzers/connector.go b/analyzers/connector.go index 7925fef9a..543141d2a 100644 --- a/analyzers/connector.go +++ b/analyzers/connector.go @@ -24,8 +24,8 @@ import ( "github.com/cgrates/rpcclient" ) -func (aS *AnalyzerService) NewAnalyzeConnector(sc rpcclient.ClientConnector, enc, from, to string) rpcclient.ClientConnector { - return &AnalyzeConnector{ +func (aS *AnalyzerService) NewAnalyzerConnector(sc rpcclient.ClientConnector, enc, from, to string) rpcclient.ClientConnector { + return &AnalyzerConnector{ conn: sc, aS: aS, extrainfo: &extraInfo{ @@ -36,14 +36,14 @@ func (aS *AnalyzerService) NewAnalyzeConnector(sc rpcclient.ClientConnector, enc } } -type AnalyzeConnector struct { +type AnalyzerConnector struct { conn rpcclient.ClientConnector aS *AnalyzerService extrainfo *extraInfo } -func (c *AnalyzeConnector) Call(serviceMethod string, args interface{}, reply interface{}) (err error) { +func (c *AnalyzerConnector) Call(serviceMethod string, args interface{}, reply interface{}) (err error) { sTime := time.Now() err = c.conn.Call(serviceMethod, args, reply) go c.aS.logTrafic(0, serviceMethod, args, reply, err, c.extrainfo, sTime, time.Now()) diff --git a/analyzers/connector_test.go b/analyzers/connector_test.go new file mode 100644 index 000000000..a412a42e7 --- /dev/null +++ b/analyzers/connector_test.go @@ -0,0 +1,65 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package analyzers + +import ( + "errors" + "os" + "path" + "runtime" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +type mockConnector struct{} + +func (c *mockConnector) Call(_ string, _, _ interface{}) (err error) { + return errors.New("error") +} +func TestNewAnalyzeConnector(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers" + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } + if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + t.Fatal(err) + } + anz, err := NewAnalyzerService(cfg) + if err != nil { + t.Fatal(err) + } + rpc := anz.NewAnalyzerConnector(new(mockConnector), utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012") + if err = rpc.Call(utils.CoreSv1Ping, "args", "reply"); err == nil || err.Error() != "error" { + t.Errorf("Expected 'error' received %v", err) + } + time.Sleep(100 * time.Millisecond) + runtime.Gosched() + if cnt, err := anz.db.DocCount(); err != nil { + t.Fatal(err) + } else if cnt != 1 { + t.Errorf("Expected only one document received:%v", cnt) + } +} diff --git a/analyzers/utils.go b/analyzers/utils.go index b11265eb8..efe66c880 100644 --- a/analyzers/utils.go +++ b/analyzers/utils.go @@ -20,14 +20,46 @@ package analyzers import ( "time" + + "github.com/blevesearch/bleve/index/scorch" + "github.com/blevesearch/bleve/index/store/boltdb" + "github.com/blevesearch/bleve/index/store/goleveldb" + "github.com/blevesearch/bleve/index/store/moss" + "github.com/blevesearch/bleve/index/upsidedown" + "github.com/cgrates/cgrates/utils" ) -type extraInfo struct { - enc string - from string - to string +// NewInfoRPC returns a structure to be indexed +func NewInfoRPC(id uint64, method string, + params, result, err interface{}, + info *extraInfo, sTime, eTime time.Time) *InfoRPC { + var e interface{} + switch val := err.(type) { + default: + case nil: + case string: + e = val + case error: + e = val.Error() + } + return &InfoRPC{ + RequestDuration: eTime.Sub(sTime), + RequestStartTime: sTime, + // EndTime: eTime, + + RequestEncoding: info.enc, + RequestSource: info.from, + RequestDestination: info.to, + + RequestID: id, + RequestMethod: method, + RequestParams: utils.ToJSON(params), + Reply: utils.ToJSON(result), + ReplyError: e, + } } +// InfoRPC the structure to be indexed type InfoRPC struct { RequestDuration time.Duration RequestStartTime time.Time @@ -44,6 +76,12 @@ type InfoRPC struct { ReplyError interface{} } +type extraInfo struct { + enc string + from string + to string +} + type rpcAPI struct { ID uint64 `json:"id"` Method string `json:"method"` @@ -51,3 +89,17 @@ type rpcAPI struct { StartTime time.Time } + +func getIndex(indx string) (indxType, storeType string) { + switch indx { + case utils.MetaScorch: + indxType, storeType = scorch.Name, scorch.Name + case utils.MetaBoltdb: + indxType, storeType = upsidedown.Name, boltdb.Name + case utils.MetaLeveldb: + indxType, storeType = upsidedown.Name, goleveldb.Name + case utils.MetaMoss: + indxType, storeType = upsidedown.Name, moss.Name + } + return +} diff --git a/analyzers/utils_test.go b/analyzers/utils_test.go new file mode 100644 index 000000000..9165ed48c --- /dev/null +++ b/analyzers/utils_test.go @@ -0,0 +1,104 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package analyzers + +import ( + "errors" + "reflect" + "testing" + "time" + + "github.com/blevesearch/bleve/index/scorch" + "github.com/blevesearch/bleve/index/store/boltdb" + "github.com/blevesearch/bleve/index/store/goleveldb" + "github.com/blevesearch/bleve/index/store/moss" + "github.com/blevesearch/bleve/index/upsidedown" + "github.com/cgrates/cgrates/utils" +) + +func TestGetIndex(t *testing.T) { + expIdxType, expStore := scorch.Name, scorch.Name + idxType, store := getIndex(utils.MetaScorch) + if idxType != expIdxType { + t.Errorf("Expected index: %q,received:%q", expIdxType, idxType) + } + if store != expStore { + t.Errorf("Expected index: %q,received:%q", expStore, store) + } + + expIdxType, expStore = upsidedown.Name, boltdb.Name + idxType, store = getIndex(utils.MetaBoltdb) + if idxType != expIdxType { + t.Errorf("Expected index: %q,received:%q", expIdxType, idxType) + } + if store != expStore { + t.Errorf("Expected index: %q,received:%q", expStore, store) + } + + expIdxType, expStore = upsidedown.Name, goleveldb.Name + idxType, store = getIndex(utils.MetaLeveldb) + if idxType != expIdxType { + t.Errorf("Expected index: %q,received:%q", expIdxType, idxType) + } + if store != expStore { + t.Errorf("Expected index: %q,received:%q", expStore, store) + } + + expIdxType, expStore = upsidedown.Name, moss.Name + idxType, store = getIndex(utils.MetaMoss) + if idxType != expIdxType { + t.Errorf("Expected index: %q,received:%q", expIdxType, idxType) + } + if store != expStore { + t.Errorf("Expected index: %q,received:%q", expStore, store) + } +} + +func TestNewInfoRPC(t *testing.T) { + t1 := time.Now() + expIdx := &InfoRPC{ + RequestDuration: time.Second, + RequestStartTime: t1, + RequestEncoding: utils.MetaJSON, + RequestSource: "127.0.0.1:5565", + RequestDestination: "127.0.0.1:2012", + RequestID: 0, + RequestMethod: utils.CoreSv1Status, + RequestParams: `"status"`, + Reply: `"result"`, + ReplyError: "error", + } + idx := NewInfoRPC(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{ + enc: utils.MetaJSON, + from: "127.0.0.1:5565", + to: "127.0.0.1:2012", + }, t1, t1.Add(time.Second)) + if !reflect.DeepEqual(expIdx, idx) { + t.Errorf("Expected:%s, received:%s", utils.ToJSON(expIdx), utils.ToJSON(idx)) + } + idx = NewInfoRPC(0, utils.CoreSv1Status, "status", "result", errors.New("error"), &extraInfo{ + enc: utils.MetaJSON, + from: "127.0.0.1:5565", + to: "127.0.0.1:2012", + }, t1, t1.Add(time.Second)) + if !reflect.DeepEqual(expIdx, idx) { + t.Errorf("Expected:%s, received:%s", utils.ToJSON(expIdx), utils.ToJSON(idx)) + } + +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index fcc0b115e..331b3c7d0 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -74,7 +74,8 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, // initCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns func initCacheS(internalCacheSChan chan rpcclient.ClientConnector, - server *utils.Server, dm *engine.DataManager, exitChan chan bool) (chS *engine.CacheS) { + server *utils.Server, dm *engine.DataManager, exitChan chan bool, + anz *services.AnalyzerService) (chS *engine.CacheS) { chS = engine.NewCacheS(cfg, dm) go func() { if err := chS.Precache(); err != nil { @@ -87,32 +88,73 @@ func initCacheS(internalCacheSChan chan rpcclient.ClientConnector, if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(chSv1) } - internalCacheSChan <- chS + var rpc rpcclient.ClientConnector = chS + if anz.IsRunning() { + rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.CacheS) + } + internalCacheSChan <- rpc return } -func initGuardianSv1(internalGuardianSChan chan rpcclient.ClientConnector, server *utils.Server) { +func initGuardianSv1(internalGuardianSChan chan rpcclient.ClientConnector, server *utils.Server, + anz *services.AnalyzerService) { grdSv1 := v1.NewGuardianSv1() if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(grdSv1) } - internalGuardianSChan <- grdSv1 + var rpc rpcclient.ClientConnector = grdSv1 + if anz.IsRunning() { + rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.GuardianS) + } + internalGuardianSChan <- rpc } -func initCoreSv1(internalCoreSv1Chan chan rpcclient.ClientConnector, server *utils.Server) { +func initCoreSv1(internalCoreSv1Chan chan rpcclient.ClientConnector, server *utils.Server, + anz *services.AnalyzerService) { cSv1 := v1.NewCoreSv1(engine.NewCoreService()) if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(cSv1) } - internalCoreSv1Chan <- cSv1 + var rpc rpcclient.ClientConnector = cSv1 + if anz.IsRunning() { + rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.CoreS) + } + internalCoreSv1Chan <- rpc } func initServiceManagerV1(internalServiceManagerChan chan rpcclient.ClientConnector, - srvMngr *servmanager.ServiceManager, server *utils.Server) { + srvMngr *servmanager.ServiceManager, server *utils.Server, + anz *services.AnalyzerService) { if !cfg.DispatcherSCfg().Enabled { server.RpcRegister(v1.NewServiceManagerV1(srvMngr)) } - internalServiceManagerChan <- srvMngr + var rpc rpcclient.ClientConnector = srvMngr + if anz.IsRunning() { + rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.ServiceManager) + } + internalServiceManagerChan <- rpc +} + +// initLogger will initialize syslog writter, needs to be called after config init +func initLogger(cfg *config.CGRConfig) error { + sylogger := cfg.GeneralCfg().Logger + if *syslogger != "" { // Modify the log level if provided by command arguments + sylogger = *syslogger + } + return utils.Newlogger(sylogger, cfg.GeneralCfg().NodeID) +} + +func initConfigSv1(internalConfigChan chan rpcclient.ClientConnector, + server *utils.Server, anz *services.AnalyzerService) { + cfgSv1 := v1.NewConfigSv1(cfg) + if !cfg.DispatcherSCfg().Enabled { + server.RpcRegister(cfgSv1) + } + var rpc rpcclient.ClientConnector = cfgSv1 + if anz.IsRunning() { + rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.ConfigSv1) + } + internalConfigChan <- rpc } func startRPC(server *utils.Server, internalRaterChan, @@ -238,24 +280,6 @@ func writePid() { } } -// initLogger will initialize syslog writter, needs to be called after config init -func initLogger(cfg *config.CGRConfig) error { - sylogger := cfg.GeneralCfg().Logger - if *syslogger != "" { // Modify the log level if provided by command arguments - sylogger = *syslogger - } - return utils.Newlogger(sylogger, cfg.GeneralCfg().NodeID) -} - -func initConfigSv1(internalConfigChan chan rpcclient.ClientConnector, - server *utils.Server) { - cfgSv1 := v1.NewConfigSv1(cfg) - if !cfg.DispatcherSCfg().Enabled { - server.RpcRegister(cfgSv1) - } - internalConfigChan <- cfgSv1 -} - func memProfFile(memProfPath string) bool { f, err := os.Create(memProfPath) if err != nil { @@ -522,15 +546,24 @@ func main() { // Define internal connections via channels filterSChan := make(chan *engine.FilterS, 1) + // init AnalyzerS + anz := services.NewAnalyzerService(cfg, server, exitChan, internalAnalyzerSChan) + if anz.ShouldRun() { + if err := anz.Start(); err != nil { + fmt.Println(err) + return + } + } + // init CacheS - cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan) + cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan, anz) engine.SetCache(cacheS) // init GuardianSv1 - initGuardianSv1(internalGuardianSChan, server) + initGuardianSv1(internalGuardianSChan, server, anz) // init CoreSv1 - initCoreSv1(internalCoreSv1Chan, server) + initCoreSv1(internalCoreSv1Chan, server, anz) // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, exitChan) @@ -567,14 +600,6 @@ func main() { ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, exitChan, internalLoaderSChan, connManager) - anz := services.NewAnalyzerService(cfg, server, exitChan, internalAnalyzerSChan) - if anz.ShouldRun() { - if err := anz.Start(); err != nil { - fmt.Println(err) - return - } - } - srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, schS, rals, rals.GetResponder(), apiSv1, apiSv2, cdrS, smg, services.NewEventReaderService(cfg, filterSChan, exitChan, connManager), @@ -597,7 +622,7 @@ func main() { go startFilterService(filterSChan, cacheS, connManager, cfg, dmService.GetDM(), exitChan) - initServiceManagerV1(internalServeManagerChan, srvManager, server) + initServiceManagerV1(internalServeManagerChan, srvManager, server, anz) // init internalRPCSet to share internal connections among the engine engine.IntRPC = engine.NewRPCClientSet() @@ -627,7 +652,7 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan) // engine.IntRPC.AddInternalRPCClient(utils.DispatcherHv1, internalDispatcherHChan) - initConfigSv1(internalConfigChan, server) + initConfigSv1(internalConfigChan, server, anz) if *preload != utils.EmptyString { runPreload(ldrs, internalLoaderSChan, internalPreloadChan, exitChan) diff --git a/config/analyzerscfg_test.go b/config/analyzerscfg_test.go index 15305b6c8..b954bfb41 100644 --- a/config/analyzerscfg_test.go +++ b/config/analyzerscfg_test.go @@ -20,6 +20,7 @@ package config import ( "reflect" "testing" + "time" "github.com/cgrates/cgrates/utils" ) @@ -29,7 +30,11 @@ func TestAnalyzerSCfgloadFromJsonCfg(t *testing.T) { Enabled: utils.BoolPointer(false), } expected := &AnalyzerSCfg{ - Enabled: false, + Enabled: false, + CleanupInterval: time.Hour, + DBPath: "/var/spool/cgrates/analyzers", + IndexType: utils.MetaScorch, + TTL: 24 * time.Hour, } if jsnCfg, err := NewDefaultCGRConfig(); err != nil { t.Error(err) @@ -46,7 +51,11 @@ func TestAnalyzerSCfgAsMapInterface(t *testing.T) { } }` eMap := map[string]interface{}{ - utils.EnabledCfg: false, + utils.EnabledCfg: false, + utils.CleanupIntervalCfg: "1h0m0s", + utils.DBPathCfg: "/var/spool/cgrates/analyzers", + utils.IndexTypeCfg: utils.MetaScorch, + utils.TTLCfg: "24h0m0s", } if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil { t.Error(err) @@ -63,7 +72,11 @@ func TestAnalyzerSCfgAsMapInterface1(t *testing.T) { } }` eMap := map[string]interface{}{ - utils.EnabledCfg: true, + utils.EnabledCfg: true, + utils.CleanupIntervalCfg: "1h0m0s", + utils.DBPathCfg: "/var/spool/cgrates/analyzers", + utils.IndexTypeCfg: utils.MetaScorch, + utils.TTLCfg: "24h0m0s", } if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil { t.Error(err) @@ -71,3 +84,22 @@ func TestAnalyzerSCfgAsMapInterface1(t *testing.T) { t.Errorf("Expected: %+v , recived: %+v", eMap, rcv) } } + +func TestAnalyzerSCfgloadFromJsonCfgErr(t *testing.T) { + jsonCfg := &AnalyzerSJsonCfg{ + Cleanup_interval: utils.StringPointer("24ha"), + } + if jsnCfg, err := NewDefaultCGRConfig(); err != nil { + t.Error(err) + } else if err = jsnCfg.analyzerSCfg.loadFromJSONCfg(jsonCfg); err == nil { + t.Errorf("Expected error received nil") + } + jsonCfg = &AnalyzerSJsonCfg{ + Ttl: utils.StringPointer("24ha"), + } + if jsnCfg, err := NewDefaultCGRConfig(); err != nil { + t.Error(err) + } else if err = jsnCfg.analyzerSCfg.loadFromJSONCfg(jsonCfg); err == nil { + t.Errorf("Expected error received nil") + } +} diff --git a/config/config_defaults.go b/config/config_defaults.go index e21c55546..26d62af89 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -904,12 +904,12 @@ const CGRATES_CFG_JSON = ` }, -"analyzers":{ // AnalyzerS config - "enabled": false, // starts AnalyzerS service: . - "db_path": "/tmp/analyzers", // path to the folder where to store the information - "index_type": "*scorch", // the type of index for the storage: <*scorch|*boltdb|*leveldb|*mossdb> - "ttl": "10s", // time to wait before removing the API capture - "cleanup_interval": "1h", // the interval we clean the db +"analyzers":{ // AnalyzerS config + "enabled": false, // starts AnalyzerS service: . + "db_path": "/var/spool/cgrates/analyzers", // path to the folder where to store the information + "index_type": "*scorch", // the type of index for the storage: <*scorch|*boltdb|*leveldb|*mossdb> + "ttl": "24h", // time to wait before removing the API capture + "cleanup_interval": "1h", // the interval we clean the db }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 5b8156f48..bf67b85c9 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -1680,7 +1680,11 @@ func TestDfTlsCfg(t *testing.T) { func TestDfAnalyzerCfg(t *testing.T) { eCfg := &AnalyzerSJsonCfg{ - Enabled: utils.BoolPointer(false), + Enabled: utils.BoolPointer(false), + Cleanup_interval: utils.StringPointer("1h"), + Db_path: utils.StringPointer("/var/spool/cgrates/analyzers"), + Index_type: utils.StringPointer(utils.MetaScorch), + Ttl: utils.StringPointer("24h"), } if cfg, err := dfCgrJSONCfg.AnalyzerCfgJson(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index 399fc0a0e..6eab499bf 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -1863,7 +1863,11 @@ func TestCfgTlsCfg(t *testing.T) { func TestCgrCfgJSONDefaultAnalyzerSCfg(t *testing.T) { aSCfg := &AnalyzerSCfg{ - Enabled: false, + Enabled: false, + CleanupInterval: time.Hour, + DBPath: "/var/spool/cgrates/analyzers", + IndexType: utils.MetaScorch, + TTL: 24 * time.Hour, } if !reflect.DeepEqual(cgrCfg.analyzerSCfg, aSCfg) { t.Errorf("received: %+v, expecting: %+v", cgrCfg.analyzerSCfg, aSCfg) diff --git a/data/ansible/rpm_packages/cgrates.spec.j2 b/data/ansible/rpm_packages/cgrates.spec.j2 index e5c564366..e46ab7390 100644 --- a/data/ansible/rpm_packages/cgrates.spec.j2 +++ b/data/ansible/rpm_packages/cgrates.spec.j2 @@ -97,6 +97,7 @@ mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/csv mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/fwv mkdir -p $RPM_BUILD_ROOT%{_spooldir}/tpe mkdir -p $RPM_BUILD_ROOT%{_spooldir}/failed_posts +mkdir -p $RPM_BUILD_ROOT%{_spooldir}/analyzers mkdir -p $RPM_BUILD_ROOT%{_libdir}/history mkdir -p $RPM_BUILD_ROOT%{_libdir}/cache_dump mkdir -p $RPM_BUILD_ROOT/etc/logrotate.d diff --git a/packages/debian/rules b/packages/debian/rules index 9c3f60058..ce564e0eb 100755 --- a/packages/debian/rules +++ b/packages/debian/rules @@ -40,6 +40,7 @@ binary-arch: clean mkdir -p $(PKGDIR)/var/spool/cgrates/cdre/fwv mkdir -p $(PKGDIR)/var/spool/cgrates/tpe mkdir -p $(PKGDIR)/var/spool/cgrates/failed_posts + mkdir -p $(PKGDIR)/var/spool/cgrates/analyzers mkdir -p $(PKGDIR)/var/lib/cgrates/history mkdir -p $(PKGDIR)/var/lib/cgrates/cache_dump mkdir -p $(PKGDIR)/var/log/cgrates diff --git a/packages/redhat_fedora/cgrates.spec b/packages/redhat_fedora/cgrates.spec index bafe525d6..60c71c791 100644 --- a/packages/redhat_fedora/cgrates.spec +++ b/packages/redhat_fedora/cgrates.spec @@ -99,6 +99,7 @@ mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/csv mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/fwv mkdir -p $RPM_BUILD_ROOT%{_spooldir}/tpe mkdir -p $RPM_BUILD_ROOT%{_spooldir}/failed_posts +mkdir -p $RPM_BUILD_ROOT%{_spooldir}/analyzers mkdir -p $RPM_BUILD_ROOT%{_libdir}/history mkdir -p $RPM_BUILD_ROOT%{_libdir}/cache_dump mkdir -p $RPM_BUILD_ROOT/etc/logrotate.d diff --git a/services/analyzers.go b/services/analyzers.go index dfca01888..dd308e053 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -27,14 +27,18 @@ import ( "github.com/cgrates/cgrates/analyzers" v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" ) +var ( + // used to build the connector for analyzers + intAnzConn = func(c rpcclient.ClientConnector, to string) rpcclient.ClientConnector { return c } +) + // NewAnalyzerService returns the Analyzer Service func NewAnalyzerService(cfg *config.CGRConfig, server *utils.Server, exitChan chan bool, - internalAnalyzerSChan chan rpcclient.ClientConnector) servmanager.Service { + internalAnalyzerSChan chan rpcclient.ClientConnector) *AnalyzerService { return &AnalyzerService{ connChan: internalAnalyzerSChan, cfg: cfg, @@ -72,6 +76,9 @@ func (anz *AnalyzerService) Start() (err error) { anz.exitChan <- true return }() + intAnzConn = func(c rpcclient.ClientConnector, to string) rpcclient.ClientConnector { + return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to) + } utils.AnalizerWraperFunc = func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec { fromstr := "" if from != nil { @@ -124,3 +131,7 @@ func (anz *AnalyzerService) ServiceName() string { func (anz *AnalyzerService) ShouldRun() bool { return anz.cfg.AnalyzerSCfg().Enabled } + +func (anz *AnalyzerService) GetAnalyzerS() *analyzers.AnalyzerService { + return anz.anz +} diff --git a/services/apierv1.go b/services/apierv1.go index d2a7e0f09..6fcffdf85 100644 --- a/services/apierv1.go +++ b/services/apierv1.go @@ -119,7 +119,7 @@ func (apiService *APIerSv1Service) Start() (err error) { } //backwards compatible - apiService.connChan <- apiService.api + apiService.connChan <- intAnzConn(apiService.api, utils.APIerSv1) apiService.APIerSv1Chan <- apiService.api return diff --git a/services/apierv2.go b/services/apierv2.go index 36bc899d4..f286584bb 100644 --- a/services/apierv2.go +++ b/services/apierv2.go @@ -73,7 +73,7 @@ func (api *APIerSv2Service) Start() (err error) { api.server.RpcRegisterName(utils.ApierV2, api.api) } - api.connChan <- api.api + api.connChan <- intAnzConn(api.api, utils.APIerSv2) return } diff --git a/services/attributes.go b/services/attributes.go index 34cd2a94f..b583ba841 100644 --- a/services/attributes.go +++ b/services/attributes.go @@ -87,7 +87,7 @@ func (attrS *AttributeService) Start() (err error) { if !attrS.cfg.DispatcherSCfg().Enabled { attrS.server.RpcRegister(attrS.rpc) } - attrS.connChan <- attrS.rpc + attrS.connChan <- intAnzConn(attrS.rpc, utils.AttributeS) return } diff --git a/services/cdrs.go b/services/cdrs.go index fd8840091..7a9e04aee 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -107,7 +107,7 @@ func (cdrService *CDRServer) Start() (err error) { // Make the cdr server available for internal communication cdrService.server.RpcRegister(cdrService.cdrS) // register CdrServer for internal usage (TODO: refactor this) } - cdrService.connChan <- cdrService.cdrS // Signal that cdrS is operational + cdrService.connChan <- intAnzConn(cdrService.cdrS, utils.CDRServer) // Signal that cdrS is operational return } diff --git a/services/chargers.go b/services/chargers.go index f585afa91..09ce1838a 100644 --- a/services/chargers.go +++ b/services/chargers.go @@ -88,7 +88,7 @@ func (chrS *ChargerService) Start() (err error) { if !chrS.cfg.DispatcherSCfg().Enabled { chrS.server.RpcRegister(cSv1) } - chrS.connChan <- cSv1 + chrS.connChan <- intAnzConn(cSv1, utils.ChargerS) return } diff --git a/services/dispatcherh.go b/services/dispatcherh.go index 1b00b1d98..587d76f7b 100644 --- a/services/dispatcherh.go +++ b/services/dispatcherh.go @@ -66,7 +66,7 @@ func (dspS *DispatcherHostsService) Start() (err error) { dspS.dspS = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr) go dspS.dspS.ListenAndServe() - dspS.connChan <- dspS.dspS + dspS.connChan <- intAnzConn(dspS.dspS, utils.DispatcherH) return } diff --git a/services/dispatchers.go b/services/dispatchers.go index a6bb06b39..0896f1e1e 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -145,7 +145,7 @@ func (dspS *DispatcherService) Start() (err error) { dspS.server.RpcRegisterName(utils.RateSv1, v1.NewDispatcherRateSv1(dspS.dspS)) - dspS.connChan <- dspS.dspS + dspS.connChan <- intAnzConn(dspS.dspS, utils.DispatcherS) return } diff --git a/services/ees.go b/services/ees.go index 624c700d2..cffc10ff6 100644 --- a/services/ees.go +++ b/services/ees.go @@ -118,17 +118,17 @@ func (es *EventExporterService) Start() (err error) { utils.EventExporterS, err)) return } - - es.rpc = v1.NewEventExporterSv1(es.eeS) - if !es.cfg.DispatcherSCfg().Enabled { - es.server.RpcRegister(es.rpc) - } - es.intConnChan <- es.eeS go func(eeS *ees.EventExporterS, exitChan chan bool, rldChan chan struct{}) { if err := eeS.ListenAndServe(exitChan, rldChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.EventExporterS, err.Error())) exitChan <- true } }(es.eeS, es.exitChan, es.rldChan) + + es.rpc = v1.NewEventExporterSv1(es.eeS) + if !es.cfg.DispatcherSCfg().Enabled { + es.server.RpcRegister(es.rpc) + } + es.intConnChan <- intAnzConn(es.eeS, utils.EventExporterS) return } diff --git a/services/loaders.go b/services/loaders.go index 7a70d5424..319747096 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -85,7 +85,7 @@ func (ldrs *LoaderService) Start() (err error) { if !ldrs.cfg.DispatcherSCfg().Enabled { ldrs.server.RpcRegister(ldrs.rpc) } - ldrs.connChan <- ldrs.rpc + ldrs.connChan <- intAnzConn(ldrs.rpc, utils.LoaderS) return } diff --git a/services/rals.go b/services/rals.go index 06208fb79..d6cfcc5b6 100644 --- a/services/rals.go +++ b/services/rals.go @@ -91,7 +91,7 @@ func (rals *RalService) Start() (err error) { rals.server.RpcRegister(rals.rals) } - rals.connChan <- rals.rals + rals.connChan <- intAnzConn(rals.rals, utils.RALService) return } diff --git a/services/rates.go b/services/rates.go index e832a2575..91ce26224 100644 --- a/services/rates.go +++ b/services/rates.go @@ -126,18 +126,18 @@ func (rs *RateService) Start() (err error) { rs.rateS = rates.NewRateS(rs.cfg, fltrS, dm) rs.Unlock() - rs.rpc = v1.NewRateSv1(rs.rateS) - if !rs.cfg.DispatcherSCfg().Enabled { - rs.server.RpcRegister(rs.rpc) - } - - rs.intConnChan <- rs.rpc - go func(rtS *rates.RateS, exitChan chan bool, rldChan chan struct{}) { if err := rtS.ListenAndServe(exitChan, rldChan); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.EventExporterS, err.Error())) exitChan <- true } }(rs.rateS, rs.exitChan, rs.rldChan) + + rs.rpc = v1.NewRateSv1(rs.rateS) + if !rs.cfg.DispatcherSCfg().Enabled { + rs.server.RpcRegister(rs.rpc) + } + + rs.intConnChan <- intAnzConn(rs.rpc, utils.RateS) return } diff --git a/services/resources.go b/services/resources.go index 4bef97e16..0f8c86012 100644 --- a/services/resources.go +++ b/services/resources.go @@ -90,7 +90,7 @@ func (reS *ResourceService) Start() (err error) { if !reS.cfg.DispatcherSCfg().Enabled { reS.server.RpcRegister(reS.rpc) } - reS.connChan <- reS.rpc + reS.connChan <- intAnzConn(reS.rpc, utils.ResourceS) return } diff --git a/services/responders.go b/services/responders.go index 7e2701ad5..ccebad82e 100644 --- a/services/responders.go +++ b/services/responders.go @@ -68,7 +68,7 @@ func (resp *ResponderService) Start() (err error) { resp.server.RpcRegister(resp.resp) } - resp.connChan <- resp.resp // Rater done + resp.connChan <- intAnzConn(resp.resp, utils.ResponderS) // Rater done return } diff --git a/services/routes.go b/services/routes.go index 3e1412b5b..56235a074 100644 --- a/services/routes.go +++ b/services/routes.go @@ -91,7 +91,7 @@ func (routeS *RouteService) Start() (err error) { if !routeS.cfg.DispatcherSCfg().Enabled { routeS.server.RpcRegister(routeS.rpc) } - routeS.connChan <- routeS.rpc + routeS.connChan <- intAnzConn(routeS.rpc, utils.RouteS) return } diff --git a/services/schedulers.go b/services/schedulers.go index 2c6bc6917..706a731d1 100644 --- a/services/schedulers.go +++ b/services/schedulers.go @@ -84,7 +84,7 @@ func (schS *SchedulerService) Start() (err error) { if !schS.cfg.DispatcherSCfg().Enabled { schS.server.RpcRegister(schS.rpc) } - schS.connChan <- schS.rpc + schS.connChan <- intAnzConn(schS.rpc, utils.SchedulerS) return } diff --git a/services/sessions.go b/services/sessions.go index 6440a4564..7465510ee 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -82,7 +82,7 @@ func (smg *SessionService) Start() (err error) { //start sync session in a separate gorutine go smg.sm.ListenAndServe(smg.exitChan) // Pass internal connection via BiRPCClient - smg.connChan <- smg.sm + smg.connChan <- intAnzConn(smg.sm, utils.SessionS) // Register RPC handler smg.rpc = v1.NewSMGenericV1(smg.sm) diff --git a/services/stats.go b/services/stats.go index 4566ff236..31790bcc8 100644 --- a/services/stats.go +++ b/services/stats.go @@ -92,7 +92,7 @@ func (sts *StatService) Start() (err error) { if !sts.cfg.DispatcherSCfg().Enabled { sts.server.RpcRegister(sts.rpc) } - sts.connChan <- sts.rpc + sts.connChan <- intAnzConn(sts.rpc, utils.StatS) return } diff --git a/services/thresholds.go b/services/thresholds.go index 04f29d11b..cebb0bc1b 100644 --- a/services/thresholds.go +++ b/services/thresholds.go @@ -87,7 +87,7 @@ func (thrs *ThresholdService) Start() (err error) { if !thrs.cfg.DispatcherSCfg().Enabled { thrs.server.RpcRegister(thrs.rpc) } - thrs.connChan <- thrs.rpc + thrs.connChan <- intAnzConn(thrs.rpc, utils.ThresholdS) return }