diff --git a/analyzers/analyzers.go b/analyzers/analyzers.go
index 7b5d8a6d8..35691f46a 100755
--- a/analyzers/analyzers.go
+++ b/analyzers/analyzers.go
@@ -27,11 +27,7 @@ import (
"time"
"github.com/blevesearch/bleve"
- "github.com/blevesearch/bleve/index/scorch"
- "github.com/blevesearch/bleve/index/store/boltdb"
- "github.com/blevesearch/bleve/index/store/goleveldb"
- "github.com/blevesearch/bleve/index/store/moss"
- "github.com/blevesearch/bleve/index/upsidedown"
+ "github.com/blevesearch/bleve/search"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -53,18 +49,7 @@ func (aS *AnalyzerService) initDB() (err error) {
if _, err = os.Stat(aS.cfg.AnalyzerSCfg().DBPath); err == nil {
aS.db, err = bleve.Open(aS.cfg.AnalyzerSCfg().DBPath)
} else if os.IsNotExist(err) {
- var indxType, storeType string
- switch aS.cfg.AnalyzerSCfg().IndexType {
- case utils.MetaScorch:
- indxType, storeType = scorch.Name, scorch.Name
- case utils.MetaBoltdb:
- indxType, storeType = upsidedown.Name, boltdb.Name
- case utils.MetaLeveldb:
- indxType, storeType = upsidedown.Name, goleveldb.Name
- case utils.MetaMoss:
- indxType, storeType = upsidedown.Name, moss.Name
- }
-
+ indxType, storeType := getIndex(aS.cfg.AnalyzerSCfg().IndexType)
aS.db, err = bleve.NewUsing(aS.cfg.AnalyzerSCfg().DBPath,
bleve.NewIndexMapping(), indxType, storeType, nil)
}
@@ -72,7 +57,6 @@ func (aS *AnalyzerService) initDB() (err error) {
}
func (aS *AnalyzerService) clenaUp() (err error) {
- fmt.Println("clean")
t2 := bleve.NewDateRangeQuery(time.Time{}, time.Now().Add(-aS.cfg.AnalyzerSCfg().TTL))
t2.SetField("RequestStartTime")
searchReq := bleve.NewSearchRequest(t2)
@@ -80,8 +64,13 @@ func (aS *AnalyzerService) clenaUp() (err error) {
if res, err = aS.db.Search(searchReq); err != nil {
return
}
+ return aS.deleteHits(res.Hits)
+}
+
+// extracted as function in order to test this
+func (aS *AnalyzerService) deleteHits(hits search.DocumentMatchCollection) (err error) {
hasErr := false
- for _, hit := range res.Hits {
+ for _, hit := range hits {
if err = aS.db.Delete(hit.ID); err != nil {
hasErr = true
}
@@ -125,33 +114,11 @@ func (aS *AnalyzerService) logTrafic(id uint64, method string,
if strings.HasPrefix(method, utils.AnalyzerSv1) {
return nil
}
- var e interface{}
- switch val := err.(type) {
- default:
- case nil:
- case string:
- e = val
- case error:
- e = val.Error()
- }
return aS.db.Index(utils.ConcatenatedKey(method, strconv.FormatInt(sTime.Unix(), 10)),
- InfoRPC{
- RequestDuration: eTime.Sub(sTime),
- RequestStartTime: sTime,
- // EndTime: eTime,
-
- RequestEncoding: info.enc,
- RequestSource: info.from,
- RequestDestination: info.to,
-
- RequestID: id,
- RequestMethod: method,
- RequestParams: utils.ToJSON(params),
- Reply: utils.ToJSON(result),
- ReplyError: e,
- })
+ NewInfoRPC(id, method, params, result, err, info, sTime, eTime))
}
+// V1Search returns a list of API that match the query
func (aS *AnalyzerService) V1Search(searchstr string, reply *[]map[string]interface{}) error {
s := bleve.NewSearchRequest(bleve.NewQueryStringQuery(searchstr))
s.Fields = []string{utils.Meta} // return all fields
diff --git a/analyzers/analyzers_test.go b/analyzers/analyzers_test.go
new file mode 100644
index 000000000..e8b9c7c80
--- /dev/null
+++ b/analyzers/analyzers_test.go
@@ -0,0 +1,182 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package analyzers
+
+import (
+ "os"
+ "path"
+ "runtime"
+ "testing"
+ "time"
+
+ "github.com/blevesearch/bleve"
+ "github.com/blevesearch/bleve/search"
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/utils"
+)
+
+func TestNewAnalyzerService(t *testing.T) {
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
+ if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
+ t.Fatal(err)
+ }
+ if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
+ t.Fatal(err)
+ }
+ anz, err := NewAnalyzerService(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // no need to DeepEqual
+ if err = anz.Shutdown(); err != nil {
+ t.Fatal(err)
+ }
+ if err = anz.initDB(); err != nil {
+ t.Fatal(err)
+ }
+ exitChan := make(chan bool, 1)
+ exitChan <- true
+ if err := anz.ListenAndServe(exitChan); err != nil {
+ t.Fatal(err)
+ }
+ anz.db.Close()
+}
+
+func TestAnalyzerSLogTraffic(t *testing.T) {
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
+ cfg.AnalyzerSCfg().TTL = 30 * time.Minute
+ if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
+ t.Fatal(err)
+ }
+ if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
+ t.Fatal(err)
+ }
+ anz, err := NewAnalyzerService(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ t1 := time.Now().Add(-time.Hour)
+ if err = anz.logTrafic(0, utils.AnalyzerSv1Ping, "status", "result", "error", &extraInfo{
+ enc: utils.MetaJSON,
+ from: "127.0.0.1:5565",
+ to: "127.0.0.1:2012",
+ }, t1, t1.Add(time.Second)); err != nil {
+ t.Fatal(err)
+ }
+ if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{
+ enc: utils.MetaJSON,
+ from: "127.0.0.1:5565",
+ to: "127.0.0.1:2012",
+ }, t1, t1.Add(time.Second)); err != nil {
+ t.Fatal(err)
+ }
+ t1 = time.Now().Add(-10 * time.Minute)
+ if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{
+ enc: utils.MetaJSON,
+ from: "127.0.0.1:5565",
+ to: "127.0.0.1:2012",
+ }, t1, t1.Add(time.Second)); err != nil {
+ t.Fatal(err)
+ }
+ if cnt, err := anz.db.DocCount(); err != nil {
+ t.Fatal(err)
+ } else if cnt != 2 {
+ t.Errorf("Expected only 2 documents received:%v", cnt)
+ }
+ if err = anz.clenaUp(); err != nil {
+ t.Fatal(err)
+ }
+ if cnt, err := anz.db.DocCount(); err != nil {
+ t.Fatal(err)
+ } else if cnt != 1 {
+ t.Errorf("Expected only one document received:%v", cnt)
+ }
+
+ if err = anz.db.Close(); err != nil {
+ t.Fatal(err)
+ }
+ if err = anz.clenaUp(); err != bleve.ErrorIndexClosed {
+ t.Errorf("Expected error: %v,received: %+v", bleve.ErrorIndexClosed, err)
+ }
+}
+
+func TestAnalyzersDeleteHits(t *testing.T) {
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
+ cfg.AnalyzerSCfg().TTL = 30 * time.Minute
+ if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
+ t.Fatal(err)
+ }
+ if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
+ t.Fatal(err)
+ }
+ anz, err := NewAnalyzerService(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err = anz.deleteHits(search.DocumentMatchCollection{&search.DocumentMatch{}}); err != utils.ErrPartiallyExecuted {
+ t.Errorf("Expected error: %v,received: %+v", utils.ErrPartiallyExecuted, err)
+ }
+}
+
+func TestAnalyzersListenAndServe(t *testing.T) {
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
+ cfg.AnalyzerSCfg().TTL = 30 * time.Minute
+ if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
+ t.Fatal(err)
+ }
+ if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
+ t.Fatal(err)
+ }
+ anz, err := NewAnalyzerService(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := anz.db.Close(); err != nil {
+ t.Fatal(err)
+ }
+ anz.ListenAndServe(make(chan bool))
+
+ cfg.AnalyzerSCfg().CleanupInterval = 1
+ anz, err = NewAnalyzerService(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ go func() {
+ time.Sleep(1)
+ runtime.Gosched()
+ anz.db.Close()
+ }()
+ anz.ListenAndServe(make(chan bool))
+}
diff --git a/analyzers/codec.go b/analyzers/codec.go
index 7998c68aa..eb8ce8fbc 100644
--- a/analyzers/codec.go
+++ b/analyzers/codec.go
@@ -25,7 +25,7 @@ import (
)
func (aS *AnalyzerService) NewServerCodec(sc rpc.ServerCodec, enc, from, to string) rpc.ServerCodec {
- return &AnalyzeServerCodec{
+ return &AnalyzerServerCodec{
sc: sc,
reqs: make(map[uint64]*rpcAPI),
aS: aS,
@@ -37,7 +37,7 @@ func (aS *AnalyzerService) NewServerCodec(sc rpc.ServerCodec, enc, from, to stri
}
}
-type AnalyzeServerCodec struct {
+type AnalyzerServerCodec struct {
sc rpc.ServerCodec
// keep the API in memory because the write is async
@@ -48,7 +48,7 @@ type AnalyzeServerCodec struct {
extrainfo *extraInfo
}
-func (c *AnalyzeServerCodec) ReadRequestHeader(r *rpc.Request) (err error) {
+func (c *AnalyzerServerCodec) ReadRequestHeader(r *rpc.Request) (err error) {
err = c.sc.ReadRequestHeader(r)
c.reqsLk.Lock()
c.reqIdx = r.Seq
@@ -61,14 +61,14 @@ func (c *AnalyzeServerCodec) ReadRequestHeader(r *rpc.Request) (err error) {
return
}
-func (c *AnalyzeServerCodec) ReadRequestBody(x interface{}) (err error) {
+func (c *AnalyzerServerCodec) ReadRequestBody(x interface{}) (err error) {
err = c.sc.ReadRequestBody(x)
c.reqsLk.Lock()
c.reqs[c.reqIdx].Params = x
c.reqsLk.Unlock()
return
}
-func (c *AnalyzeServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
+func (c *AnalyzerServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
c.reqsLk.Lock()
api := c.reqs[c.reqIdx]
delete(c.reqs, c.reqIdx)
@@ -76,4 +76,4 @@ func (c *AnalyzeServerCodec) WriteResponse(r *rpc.Response, x interface{}) error
go c.aS.logTrafic(api.ID, api.Method, api.Params, x, r.Error, c.extrainfo, api.StartTime, time.Now())
return c.sc.WriteResponse(r, x)
}
-func (c *AnalyzeServerCodec) Close() error { return c.sc.Close() }
+func (c *AnalyzerServerCodec) Close() error { return c.sc.Close() }
diff --git a/analyzers/codec_test.go b/analyzers/codec_test.go
new file mode 100644
index 000000000..988292676
--- /dev/null
+++ b/analyzers/codec_test.go
@@ -0,0 +1,100 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package analyzers
+
+import (
+ "net/rpc"
+ "os"
+ "path"
+ "reflect"
+ "runtime"
+ "testing"
+ "time"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/utils"
+)
+
+type mockServerCodec struct{}
+
+func (c *mockServerCodec) ReadRequestHeader(r *rpc.Request) (err error) {
+ r.Seq = 0
+ r.ServiceMethod = utils.CoreSv1Ping
+ return
+}
+
+func (c *mockServerCodec) ReadRequestBody(x interface{}) (err error) {
+ return
+}
+func (c *mockServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
+ return nil
+}
+func (c *mockServerCodec) Close() error { return nil }
+
+func TestNewServerCodec(t *testing.T) {
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
+ cfg.AnalyzerSCfg().TTL = 30 * time.Minute
+ if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
+ t.Fatal(err)
+ }
+ if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
+ t.Fatal(err)
+ }
+ anz, err := NewAnalyzerService(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ codec := anz.NewServerCodec(new(mockServerCodec), utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012")
+ r := new(rpc.Request)
+ expR := &rpc.Request{
+ Seq: 0,
+ ServiceMethod: utils.CoreSv1Ping,
+ }
+ if err = codec.ReadRequestHeader(r); err != nil {
+ t.Fatal(err)
+ } else if !reflect.DeepEqual(r, expR) {
+ t.Errorf("Expected: %v ,received:%v", expR, r)
+ }
+
+ if err = codec.ReadRequestBody("args"); err != nil {
+ t.Fatal(err)
+ }
+ if err = codec.WriteResponse(&rpc.Response{
+ Error: "error",
+ Seq: 0,
+ ServiceMethod: utils.CoreSv1Ping,
+ }, "reply"); err != nil {
+ t.Fatal(err)
+ }
+ if err = codec.Close(); err != nil {
+ t.Fatal(err)
+ }
+ time.Sleep(100 * time.Millisecond)
+ runtime.Gosched()
+ if cnt, err := anz.db.DocCount(); err != nil {
+ t.Fatal(err)
+ } else if cnt != 1 {
+ t.Errorf("Expected only one document received:%v", cnt)
+ }
+}
diff --git a/analyzers/connector.go b/analyzers/connector.go
index 7925fef9a..543141d2a 100644
--- a/analyzers/connector.go
+++ b/analyzers/connector.go
@@ -24,8 +24,8 @@ import (
"github.com/cgrates/rpcclient"
)
-func (aS *AnalyzerService) NewAnalyzeConnector(sc rpcclient.ClientConnector, enc, from, to string) rpcclient.ClientConnector {
- return &AnalyzeConnector{
+func (aS *AnalyzerService) NewAnalyzerConnector(sc rpcclient.ClientConnector, enc, from, to string) rpcclient.ClientConnector {
+ return &AnalyzerConnector{
conn: sc,
aS: aS,
extrainfo: &extraInfo{
@@ -36,14 +36,14 @@ func (aS *AnalyzerService) NewAnalyzeConnector(sc rpcclient.ClientConnector, enc
}
}
-type AnalyzeConnector struct {
+type AnalyzerConnector struct {
conn rpcclient.ClientConnector
aS *AnalyzerService
extrainfo *extraInfo
}
-func (c *AnalyzeConnector) Call(serviceMethod string, args interface{}, reply interface{}) (err error) {
+func (c *AnalyzerConnector) Call(serviceMethod string, args interface{}, reply interface{}) (err error) {
sTime := time.Now()
err = c.conn.Call(serviceMethod, args, reply)
go c.aS.logTrafic(0, serviceMethod, args, reply, err, c.extrainfo, sTime, time.Now())
diff --git a/analyzers/connector_test.go b/analyzers/connector_test.go
new file mode 100644
index 000000000..a412a42e7
--- /dev/null
+++ b/analyzers/connector_test.go
@@ -0,0 +1,65 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package analyzers
+
+import (
+ "errors"
+ "os"
+ "path"
+ "runtime"
+ "testing"
+ "time"
+
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/utils"
+)
+
+type mockConnector struct{}
+
+func (c *mockConnector) Call(_ string, _, _ interface{}) (err error) {
+ return errors.New("error")
+}
+func TestNewAnalyzeConnector(t *testing.T) {
+ cfg, err := config.NewDefaultCGRConfig()
+ if err != nil {
+ t.Fatal(err)
+ }
+ cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
+ if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
+ t.Fatal(err)
+ }
+ if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
+ t.Fatal(err)
+ }
+ anz, err := NewAnalyzerService(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ rpc := anz.NewAnalyzerConnector(new(mockConnector), utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012")
+ if err = rpc.Call(utils.CoreSv1Ping, "args", "reply"); err == nil || err.Error() != "error" {
+ t.Errorf("Expected 'error' received %v", err)
+ }
+ time.Sleep(100 * time.Millisecond)
+ runtime.Gosched()
+ if cnt, err := anz.db.DocCount(); err != nil {
+ t.Fatal(err)
+ } else if cnt != 1 {
+ t.Errorf("Expected only one document received:%v", cnt)
+ }
+}
diff --git a/analyzers/utils.go b/analyzers/utils.go
index b11265eb8..efe66c880 100644
--- a/analyzers/utils.go
+++ b/analyzers/utils.go
@@ -20,14 +20,46 @@ package analyzers
import (
"time"
+
+ "github.com/blevesearch/bleve/index/scorch"
+ "github.com/blevesearch/bleve/index/store/boltdb"
+ "github.com/blevesearch/bleve/index/store/goleveldb"
+ "github.com/blevesearch/bleve/index/store/moss"
+ "github.com/blevesearch/bleve/index/upsidedown"
+ "github.com/cgrates/cgrates/utils"
)
-type extraInfo struct {
- enc string
- from string
- to string
+// NewInfoRPC returns a structure to be indexed
+func NewInfoRPC(id uint64, method string,
+ params, result, err interface{},
+ info *extraInfo, sTime, eTime time.Time) *InfoRPC {
+ var e interface{}
+ switch val := err.(type) {
+ default:
+ case nil:
+ case string:
+ e = val
+ case error:
+ e = val.Error()
+ }
+ return &InfoRPC{
+ RequestDuration: eTime.Sub(sTime),
+ RequestStartTime: sTime,
+ // EndTime: eTime,
+
+ RequestEncoding: info.enc,
+ RequestSource: info.from,
+ RequestDestination: info.to,
+
+ RequestID: id,
+ RequestMethod: method,
+ RequestParams: utils.ToJSON(params),
+ Reply: utils.ToJSON(result),
+ ReplyError: e,
+ }
}
+// InfoRPC the structure to be indexed
type InfoRPC struct {
RequestDuration time.Duration
RequestStartTime time.Time
@@ -44,6 +76,12 @@ type InfoRPC struct {
ReplyError interface{}
}
+type extraInfo struct {
+ enc string
+ from string
+ to string
+}
+
type rpcAPI struct {
ID uint64 `json:"id"`
Method string `json:"method"`
@@ -51,3 +89,17 @@ type rpcAPI struct {
StartTime time.Time
}
+
+func getIndex(indx string) (indxType, storeType string) {
+ switch indx {
+ case utils.MetaScorch:
+ indxType, storeType = scorch.Name, scorch.Name
+ case utils.MetaBoltdb:
+ indxType, storeType = upsidedown.Name, boltdb.Name
+ case utils.MetaLeveldb:
+ indxType, storeType = upsidedown.Name, goleveldb.Name
+ case utils.MetaMoss:
+ indxType, storeType = upsidedown.Name, moss.Name
+ }
+ return
+}
diff --git a/analyzers/utils_test.go b/analyzers/utils_test.go
new file mode 100644
index 000000000..9165ed48c
--- /dev/null
+++ b/analyzers/utils_test.go
@@ -0,0 +1,104 @@
+/*
+Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
+Copyright (C) ITsysCOM GmbH
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package analyzers
+
+import (
+ "errors"
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/blevesearch/bleve/index/scorch"
+ "github.com/blevesearch/bleve/index/store/boltdb"
+ "github.com/blevesearch/bleve/index/store/goleveldb"
+ "github.com/blevesearch/bleve/index/store/moss"
+ "github.com/blevesearch/bleve/index/upsidedown"
+ "github.com/cgrates/cgrates/utils"
+)
+
+func TestGetIndex(t *testing.T) {
+ expIdxType, expStore := scorch.Name, scorch.Name
+ idxType, store := getIndex(utils.MetaScorch)
+ if idxType != expIdxType {
+ t.Errorf("Expected index: %q,received:%q", expIdxType, idxType)
+ }
+ if store != expStore {
+ t.Errorf("Expected index: %q,received:%q", expStore, store)
+ }
+
+ expIdxType, expStore = upsidedown.Name, boltdb.Name
+ idxType, store = getIndex(utils.MetaBoltdb)
+ if idxType != expIdxType {
+ t.Errorf("Expected index: %q,received:%q", expIdxType, idxType)
+ }
+ if store != expStore {
+ t.Errorf("Expected index: %q,received:%q", expStore, store)
+ }
+
+ expIdxType, expStore = upsidedown.Name, goleveldb.Name
+ idxType, store = getIndex(utils.MetaLeveldb)
+ if idxType != expIdxType {
+ t.Errorf("Expected index: %q,received:%q", expIdxType, idxType)
+ }
+ if store != expStore {
+ t.Errorf("Expected index: %q,received:%q", expStore, store)
+ }
+
+ expIdxType, expStore = upsidedown.Name, moss.Name
+ idxType, store = getIndex(utils.MetaMoss)
+ if idxType != expIdxType {
+ t.Errorf("Expected index: %q,received:%q", expIdxType, idxType)
+ }
+ if store != expStore {
+ t.Errorf("Expected index: %q,received:%q", expStore, store)
+ }
+}
+
+func TestNewInfoRPC(t *testing.T) {
+ t1 := time.Now()
+ expIdx := &InfoRPC{
+ RequestDuration: time.Second,
+ RequestStartTime: t1,
+ RequestEncoding: utils.MetaJSON,
+ RequestSource: "127.0.0.1:5565",
+ RequestDestination: "127.0.0.1:2012",
+ RequestID: 0,
+ RequestMethod: utils.CoreSv1Status,
+ RequestParams: `"status"`,
+ Reply: `"result"`,
+ ReplyError: "error",
+ }
+ idx := NewInfoRPC(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{
+ enc: utils.MetaJSON,
+ from: "127.0.0.1:5565",
+ to: "127.0.0.1:2012",
+ }, t1, t1.Add(time.Second))
+ if !reflect.DeepEqual(expIdx, idx) {
+ t.Errorf("Expected:%s, received:%s", utils.ToJSON(expIdx), utils.ToJSON(idx))
+ }
+ idx = NewInfoRPC(0, utils.CoreSv1Status, "status", "result", errors.New("error"), &extraInfo{
+ enc: utils.MetaJSON,
+ from: "127.0.0.1:5565",
+ to: "127.0.0.1:2012",
+ }, t1, t1.Add(time.Second))
+ if !reflect.DeepEqual(expIdx, idx) {
+ t.Errorf("Expected:%s, received:%s", utils.ToJSON(expIdx), utils.ToJSON(idx))
+ }
+
+}
diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index fcc0b115e..331b3c7d0 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -74,7 +74,8 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
// initCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns
func initCacheS(internalCacheSChan chan rpcclient.ClientConnector,
- server *utils.Server, dm *engine.DataManager, exitChan chan bool) (chS *engine.CacheS) {
+ server *utils.Server, dm *engine.DataManager, exitChan chan bool,
+ anz *services.AnalyzerService) (chS *engine.CacheS) {
chS = engine.NewCacheS(cfg, dm)
go func() {
if err := chS.Precache(); err != nil {
@@ -87,32 +88,73 @@ func initCacheS(internalCacheSChan chan rpcclient.ClientConnector,
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(chSv1)
}
- internalCacheSChan <- chS
+ var rpc rpcclient.ClientConnector = chS
+ if anz.IsRunning() {
+ rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.CacheS)
+ }
+ internalCacheSChan <- rpc
return
}
-func initGuardianSv1(internalGuardianSChan chan rpcclient.ClientConnector, server *utils.Server) {
+func initGuardianSv1(internalGuardianSChan chan rpcclient.ClientConnector, server *utils.Server,
+ anz *services.AnalyzerService) {
grdSv1 := v1.NewGuardianSv1()
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(grdSv1)
}
- internalGuardianSChan <- grdSv1
+ var rpc rpcclient.ClientConnector = grdSv1
+ if anz.IsRunning() {
+ rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.GuardianS)
+ }
+ internalGuardianSChan <- rpc
}
-func initCoreSv1(internalCoreSv1Chan chan rpcclient.ClientConnector, server *utils.Server) {
+func initCoreSv1(internalCoreSv1Chan chan rpcclient.ClientConnector, server *utils.Server,
+ anz *services.AnalyzerService) {
cSv1 := v1.NewCoreSv1(engine.NewCoreService())
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(cSv1)
}
- internalCoreSv1Chan <- cSv1
+ var rpc rpcclient.ClientConnector = cSv1
+ if anz.IsRunning() {
+ rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.CoreS)
+ }
+ internalCoreSv1Chan <- rpc
}
func initServiceManagerV1(internalServiceManagerChan chan rpcclient.ClientConnector,
- srvMngr *servmanager.ServiceManager, server *utils.Server) {
+ srvMngr *servmanager.ServiceManager, server *utils.Server,
+ anz *services.AnalyzerService) {
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(v1.NewServiceManagerV1(srvMngr))
}
- internalServiceManagerChan <- srvMngr
+ var rpc rpcclient.ClientConnector = srvMngr
+ if anz.IsRunning() {
+ rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.ServiceManager)
+ }
+ internalServiceManagerChan <- rpc
+}
+
+// initLogger will initialize syslog writter, needs to be called after config init
+func initLogger(cfg *config.CGRConfig) error {
+ sylogger := cfg.GeneralCfg().Logger
+ if *syslogger != "" { // Modify the log level if provided by command arguments
+ sylogger = *syslogger
+ }
+ return utils.Newlogger(sylogger, cfg.GeneralCfg().NodeID)
+}
+
+func initConfigSv1(internalConfigChan chan rpcclient.ClientConnector,
+ server *utils.Server, anz *services.AnalyzerService) {
+ cfgSv1 := v1.NewConfigSv1(cfg)
+ if !cfg.DispatcherSCfg().Enabled {
+ server.RpcRegister(cfgSv1)
+ }
+ var rpc rpcclient.ClientConnector = cfgSv1
+ if anz.IsRunning() {
+ rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.ConfigSv1)
+ }
+ internalConfigChan <- rpc
}
func startRPC(server *utils.Server, internalRaterChan,
@@ -238,24 +280,6 @@ func writePid() {
}
}
-// initLogger will initialize syslog writter, needs to be called after config init
-func initLogger(cfg *config.CGRConfig) error {
- sylogger := cfg.GeneralCfg().Logger
- if *syslogger != "" { // Modify the log level if provided by command arguments
- sylogger = *syslogger
- }
- return utils.Newlogger(sylogger, cfg.GeneralCfg().NodeID)
-}
-
-func initConfigSv1(internalConfigChan chan rpcclient.ClientConnector,
- server *utils.Server) {
- cfgSv1 := v1.NewConfigSv1(cfg)
- if !cfg.DispatcherSCfg().Enabled {
- server.RpcRegister(cfgSv1)
- }
- internalConfigChan <- cfgSv1
-}
-
func memProfFile(memProfPath string) bool {
f, err := os.Create(memProfPath)
if err != nil {
@@ -522,15 +546,24 @@ func main() {
// Define internal connections via channels
filterSChan := make(chan *engine.FilterS, 1)
+ // init AnalyzerS
+ anz := services.NewAnalyzerService(cfg, server, exitChan, internalAnalyzerSChan)
+ if anz.ShouldRun() {
+ if err := anz.Start(); err != nil {
+ fmt.Println(err)
+ return
+ }
+ }
+
// init CacheS
- cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan)
+ cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan, anz)
engine.SetCache(cacheS)
// init GuardianSv1
- initGuardianSv1(internalGuardianSChan, server)
+ initGuardianSv1(internalGuardianSChan, server, anz)
// init CoreSv1
- initCoreSv1(internalCoreSv1Chan, server)
+ initCoreSv1(internalCoreSv1Chan, server, anz)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, exitChan)
@@ -567,14 +600,6 @@ func main() {
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, exitChan,
internalLoaderSChan, connManager)
- anz := services.NewAnalyzerService(cfg, server, exitChan, internalAnalyzerSChan)
- if anz.ShouldRun() {
- if err := anz.Start(); err != nil {
- fmt.Println(err)
- return
- }
- }
-
srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, schS, rals,
rals.GetResponder(), apiSv1, apiSv2, cdrS, smg,
services.NewEventReaderService(cfg, filterSChan, exitChan, connManager),
@@ -597,7 +622,7 @@ func main() {
go startFilterService(filterSChan, cacheS, connManager,
cfg, dmService.GetDM(), exitChan)
- initServiceManagerV1(internalServeManagerChan, srvManager, server)
+ initServiceManagerV1(internalServeManagerChan, srvManager, server, anz)
// init internalRPCSet to share internal connections among the engine
engine.IntRPC = engine.NewRPCClientSet()
@@ -627,7 +652,7 @@ func main() {
engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan)
// engine.IntRPC.AddInternalRPCClient(utils.DispatcherHv1, internalDispatcherHChan)
- initConfigSv1(internalConfigChan, server)
+ initConfigSv1(internalConfigChan, server, anz)
if *preload != utils.EmptyString {
runPreload(ldrs, internalLoaderSChan, internalPreloadChan, exitChan)
diff --git a/config/analyzerscfg_test.go b/config/analyzerscfg_test.go
index 15305b6c8..b954bfb41 100644
--- a/config/analyzerscfg_test.go
+++ b/config/analyzerscfg_test.go
@@ -20,6 +20,7 @@ package config
import (
"reflect"
"testing"
+ "time"
"github.com/cgrates/cgrates/utils"
)
@@ -29,7 +30,11 @@ func TestAnalyzerSCfgloadFromJsonCfg(t *testing.T) {
Enabled: utils.BoolPointer(false),
}
expected := &AnalyzerSCfg{
- Enabled: false,
+ Enabled: false,
+ CleanupInterval: time.Hour,
+ DBPath: "/var/spool/cgrates/analyzers",
+ IndexType: utils.MetaScorch,
+ TTL: 24 * time.Hour,
}
if jsnCfg, err := NewDefaultCGRConfig(); err != nil {
t.Error(err)
@@ -46,7 +51,11 @@ func TestAnalyzerSCfgAsMapInterface(t *testing.T) {
}
}`
eMap := map[string]interface{}{
- utils.EnabledCfg: false,
+ utils.EnabledCfg: false,
+ utils.CleanupIntervalCfg: "1h0m0s",
+ utils.DBPathCfg: "/var/spool/cgrates/analyzers",
+ utils.IndexTypeCfg: utils.MetaScorch,
+ utils.TTLCfg: "24h0m0s",
}
if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil {
t.Error(err)
@@ -63,7 +72,11 @@ func TestAnalyzerSCfgAsMapInterface1(t *testing.T) {
}
}`
eMap := map[string]interface{}{
- utils.EnabledCfg: true,
+ utils.EnabledCfg: true,
+ utils.CleanupIntervalCfg: "1h0m0s",
+ utils.DBPathCfg: "/var/spool/cgrates/analyzers",
+ utils.IndexTypeCfg: utils.MetaScorch,
+ utils.TTLCfg: "24h0m0s",
}
if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil {
t.Error(err)
@@ -71,3 +84,22 @@ func TestAnalyzerSCfgAsMapInterface1(t *testing.T) {
t.Errorf("Expected: %+v , recived: %+v", eMap, rcv)
}
}
+
+func TestAnalyzerSCfgloadFromJsonCfgErr(t *testing.T) {
+ jsonCfg := &AnalyzerSJsonCfg{
+ Cleanup_interval: utils.StringPointer("24ha"),
+ }
+ if jsnCfg, err := NewDefaultCGRConfig(); err != nil {
+ t.Error(err)
+ } else if err = jsnCfg.analyzerSCfg.loadFromJSONCfg(jsonCfg); err == nil {
+ t.Errorf("Expected error received nil")
+ }
+ jsonCfg = &AnalyzerSJsonCfg{
+ Ttl: utils.StringPointer("24ha"),
+ }
+ if jsnCfg, err := NewDefaultCGRConfig(); err != nil {
+ t.Error(err)
+ } else if err = jsnCfg.analyzerSCfg.loadFromJSONCfg(jsonCfg); err == nil {
+ t.Errorf("Expected error received nil")
+ }
+}
diff --git a/config/config_defaults.go b/config/config_defaults.go
index e21c55546..26d62af89 100755
--- a/config/config_defaults.go
+++ b/config/config_defaults.go
@@ -904,12 +904,12 @@ const CGRATES_CFG_JSON = `
},
-"analyzers":{ // AnalyzerS config
- "enabled": false, // starts AnalyzerS service: .
- "db_path": "/tmp/analyzers", // path to the folder where to store the information
- "index_type": "*scorch", // the type of index for the storage: <*scorch|*boltdb|*leveldb|*mossdb>
- "ttl": "10s", // time to wait before removing the API capture
- "cleanup_interval": "1h", // the interval we clean the db
+"analyzers":{ // AnalyzerS config
+ "enabled": false, // starts AnalyzerS service: .
+ "db_path": "/var/spool/cgrates/analyzers", // path to the folder where to store the information
+ "index_type": "*scorch", // the type of index for the storage: <*scorch|*boltdb|*leveldb|*mossdb>
+ "ttl": "24h", // time to wait before removing the API capture
+ "cleanup_interval": "1h", // the interval we clean the db
},
diff --git a/config/config_json_test.go b/config/config_json_test.go
index 5b8156f48..bf67b85c9 100755
--- a/config/config_json_test.go
+++ b/config/config_json_test.go
@@ -1680,7 +1680,11 @@ func TestDfTlsCfg(t *testing.T) {
func TestDfAnalyzerCfg(t *testing.T) {
eCfg := &AnalyzerSJsonCfg{
- Enabled: utils.BoolPointer(false),
+ Enabled: utils.BoolPointer(false),
+ Cleanup_interval: utils.StringPointer("1h"),
+ Db_path: utils.StringPointer("/var/spool/cgrates/analyzers"),
+ Index_type: utils.StringPointer(utils.MetaScorch),
+ Ttl: utils.StringPointer("24h"),
}
if cfg, err := dfCgrJSONCfg.AnalyzerCfgJson(); err != nil {
t.Error(err)
diff --git a/config/config_test.go b/config/config_test.go
index 399fc0a0e..6eab499bf 100755
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -1863,7 +1863,11 @@ func TestCfgTlsCfg(t *testing.T) {
func TestCgrCfgJSONDefaultAnalyzerSCfg(t *testing.T) {
aSCfg := &AnalyzerSCfg{
- Enabled: false,
+ Enabled: false,
+ CleanupInterval: time.Hour,
+ DBPath: "/var/spool/cgrates/analyzers",
+ IndexType: utils.MetaScorch,
+ TTL: 24 * time.Hour,
}
if !reflect.DeepEqual(cgrCfg.analyzerSCfg, aSCfg) {
t.Errorf("received: %+v, expecting: %+v", cgrCfg.analyzerSCfg, aSCfg)
diff --git a/data/ansible/rpm_packages/cgrates.spec.j2 b/data/ansible/rpm_packages/cgrates.spec.j2
index e5c564366..e46ab7390 100644
--- a/data/ansible/rpm_packages/cgrates.spec.j2
+++ b/data/ansible/rpm_packages/cgrates.spec.j2
@@ -97,6 +97,7 @@ mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/csv
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/fwv
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/tpe
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/failed_posts
+mkdir -p $RPM_BUILD_ROOT%{_spooldir}/analyzers
mkdir -p $RPM_BUILD_ROOT%{_libdir}/history
mkdir -p $RPM_BUILD_ROOT%{_libdir}/cache_dump
mkdir -p $RPM_BUILD_ROOT/etc/logrotate.d
diff --git a/packages/debian/rules b/packages/debian/rules
index 9c3f60058..ce564e0eb 100755
--- a/packages/debian/rules
+++ b/packages/debian/rules
@@ -40,6 +40,7 @@ binary-arch: clean
mkdir -p $(PKGDIR)/var/spool/cgrates/cdre/fwv
mkdir -p $(PKGDIR)/var/spool/cgrates/tpe
mkdir -p $(PKGDIR)/var/spool/cgrates/failed_posts
+ mkdir -p $(PKGDIR)/var/spool/cgrates/analyzers
mkdir -p $(PKGDIR)/var/lib/cgrates/history
mkdir -p $(PKGDIR)/var/lib/cgrates/cache_dump
mkdir -p $(PKGDIR)/var/log/cgrates
diff --git a/packages/redhat_fedora/cgrates.spec b/packages/redhat_fedora/cgrates.spec
index bafe525d6..60c71c791 100644
--- a/packages/redhat_fedora/cgrates.spec
+++ b/packages/redhat_fedora/cgrates.spec
@@ -99,6 +99,7 @@ mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/csv
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/fwv
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/tpe
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/failed_posts
+mkdir -p $RPM_BUILD_ROOT%{_spooldir}/analyzers
mkdir -p $RPM_BUILD_ROOT%{_libdir}/history
mkdir -p $RPM_BUILD_ROOT%{_libdir}/cache_dump
mkdir -p $RPM_BUILD_ROOT/etc/logrotate.d
diff --git a/services/analyzers.go b/services/analyzers.go
index dfca01888..dd308e053 100644
--- a/services/analyzers.go
+++ b/services/analyzers.go
@@ -27,14 +27,18 @@ import (
"github.com/cgrates/cgrates/analyzers"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
- "github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
+var (
+ // used to build the connector for analyzers
+ intAnzConn = func(c rpcclient.ClientConnector, to string) rpcclient.ClientConnector { return c }
+)
+
// NewAnalyzerService returns the Analyzer Service
func NewAnalyzerService(cfg *config.CGRConfig, server *utils.Server, exitChan chan bool,
- internalAnalyzerSChan chan rpcclient.ClientConnector) servmanager.Service {
+ internalAnalyzerSChan chan rpcclient.ClientConnector) *AnalyzerService {
return &AnalyzerService{
connChan: internalAnalyzerSChan,
cfg: cfg,
@@ -72,6 +76,9 @@ func (anz *AnalyzerService) Start() (err error) {
anz.exitChan <- true
return
}()
+ intAnzConn = func(c rpcclient.ClientConnector, to string) rpcclient.ClientConnector {
+ return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to)
+ }
utils.AnalizerWraperFunc = func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec {
fromstr := ""
if from != nil {
@@ -124,3 +131,7 @@ func (anz *AnalyzerService) ServiceName() string {
func (anz *AnalyzerService) ShouldRun() bool {
return anz.cfg.AnalyzerSCfg().Enabled
}
+
+func (anz *AnalyzerService) GetAnalyzerS() *analyzers.AnalyzerService {
+ return anz.anz
+}
diff --git a/services/apierv1.go b/services/apierv1.go
index d2a7e0f09..6fcffdf85 100644
--- a/services/apierv1.go
+++ b/services/apierv1.go
@@ -119,7 +119,7 @@ func (apiService *APIerSv1Service) Start() (err error) {
}
//backwards compatible
- apiService.connChan <- apiService.api
+ apiService.connChan <- intAnzConn(apiService.api, utils.APIerSv1)
apiService.APIerSv1Chan <- apiService.api
return
diff --git a/services/apierv2.go b/services/apierv2.go
index 36bc899d4..f286584bb 100644
--- a/services/apierv2.go
+++ b/services/apierv2.go
@@ -73,7 +73,7 @@ func (api *APIerSv2Service) Start() (err error) {
api.server.RpcRegisterName(utils.ApierV2, api.api)
}
- api.connChan <- api.api
+ api.connChan <- intAnzConn(api.api, utils.APIerSv2)
return
}
diff --git a/services/attributes.go b/services/attributes.go
index 34cd2a94f..b583ba841 100644
--- a/services/attributes.go
+++ b/services/attributes.go
@@ -87,7 +87,7 @@ func (attrS *AttributeService) Start() (err error) {
if !attrS.cfg.DispatcherSCfg().Enabled {
attrS.server.RpcRegister(attrS.rpc)
}
- attrS.connChan <- attrS.rpc
+ attrS.connChan <- intAnzConn(attrS.rpc, utils.AttributeS)
return
}
diff --git a/services/cdrs.go b/services/cdrs.go
index fd8840091..7a9e04aee 100644
--- a/services/cdrs.go
+++ b/services/cdrs.go
@@ -107,7 +107,7 @@ func (cdrService *CDRServer) Start() (err error) {
// Make the cdr server available for internal communication
cdrService.server.RpcRegister(cdrService.cdrS) // register CdrServer for internal usage (TODO: refactor this)
}
- cdrService.connChan <- cdrService.cdrS // Signal that cdrS is operational
+ cdrService.connChan <- intAnzConn(cdrService.cdrS, utils.CDRServer) // Signal that cdrS is operational
return
}
diff --git a/services/chargers.go b/services/chargers.go
index f585afa91..09ce1838a 100644
--- a/services/chargers.go
+++ b/services/chargers.go
@@ -88,7 +88,7 @@ func (chrS *ChargerService) Start() (err error) {
if !chrS.cfg.DispatcherSCfg().Enabled {
chrS.server.RpcRegister(cSv1)
}
- chrS.connChan <- cSv1
+ chrS.connChan <- intAnzConn(cSv1, utils.ChargerS)
return
}
diff --git a/services/dispatcherh.go b/services/dispatcherh.go
index 1b00b1d98..587d76f7b 100644
--- a/services/dispatcherh.go
+++ b/services/dispatcherh.go
@@ -66,7 +66,7 @@ func (dspS *DispatcherHostsService) Start() (err error) {
dspS.dspS = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr)
go dspS.dspS.ListenAndServe()
- dspS.connChan <- dspS.dspS
+ dspS.connChan <- intAnzConn(dspS.dspS, utils.DispatcherH)
return
}
diff --git a/services/dispatchers.go b/services/dispatchers.go
index a6bb06b39..0896f1e1e 100644
--- a/services/dispatchers.go
+++ b/services/dispatchers.go
@@ -145,7 +145,7 @@ func (dspS *DispatcherService) Start() (err error) {
dspS.server.RpcRegisterName(utils.RateSv1,
v1.NewDispatcherRateSv1(dspS.dspS))
- dspS.connChan <- dspS.dspS
+ dspS.connChan <- intAnzConn(dspS.dspS, utils.DispatcherS)
return
}
diff --git a/services/ees.go b/services/ees.go
index 624c700d2..cffc10ff6 100644
--- a/services/ees.go
+++ b/services/ees.go
@@ -118,17 +118,17 @@ func (es *EventExporterService) Start() (err error) {
utils.EventExporterS, err))
return
}
-
- es.rpc = v1.NewEventExporterSv1(es.eeS)
- if !es.cfg.DispatcherSCfg().Enabled {
- es.server.RpcRegister(es.rpc)
- }
- es.intConnChan <- es.eeS
go func(eeS *ees.EventExporterS, exitChan chan bool, rldChan chan struct{}) {
if err := eeS.ListenAndServe(exitChan, rldChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.EventExporterS, err.Error()))
exitChan <- true
}
}(es.eeS, es.exitChan, es.rldChan)
+
+ es.rpc = v1.NewEventExporterSv1(es.eeS)
+ if !es.cfg.DispatcherSCfg().Enabled {
+ es.server.RpcRegister(es.rpc)
+ }
+ es.intConnChan <- intAnzConn(es.eeS, utils.EventExporterS)
return
}
diff --git a/services/loaders.go b/services/loaders.go
index 7a70d5424..319747096 100644
--- a/services/loaders.go
+++ b/services/loaders.go
@@ -85,7 +85,7 @@ func (ldrs *LoaderService) Start() (err error) {
if !ldrs.cfg.DispatcherSCfg().Enabled {
ldrs.server.RpcRegister(ldrs.rpc)
}
- ldrs.connChan <- ldrs.rpc
+ ldrs.connChan <- intAnzConn(ldrs.rpc, utils.LoaderS)
return
}
diff --git a/services/rals.go b/services/rals.go
index 06208fb79..d6cfcc5b6 100644
--- a/services/rals.go
+++ b/services/rals.go
@@ -91,7 +91,7 @@ func (rals *RalService) Start() (err error) {
rals.server.RpcRegister(rals.rals)
}
- rals.connChan <- rals.rals
+ rals.connChan <- intAnzConn(rals.rals, utils.RALService)
return
}
diff --git a/services/rates.go b/services/rates.go
index e832a2575..91ce26224 100644
--- a/services/rates.go
+++ b/services/rates.go
@@ -126,18 +126,18 @@ func (rs *RateService) Start() (err error) {
rs.rateS = rates.NewRateS(rs.cfg, fltrS, dm)
rs.Unlock()
- rs.rpc = v1.NewRateSv1(rs.rateS)
- if !rs.cfg.DispatcherSCfg().Enabled {
- rs.server.RpcRegister(rs.rpc)
- }
-
- rs.intConnChan <- rs.rpc
-
go func(rtS *rates.RateS, exitChan chan bool, rldChan chan struct{}) {
if err := rtS.ListenAndServe(exitChan, rldChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.EventExporterS, err.Error()))
exitChan <- true
}
}(rs.rateS, rs.exitChan, rs.rldChan)
+
+ rs.rpc = v1.NewRateSv1(rs.rateS)
+ if !rs.cfg.DispatcherSCfg().Enabled {
+ rs.server.RpcRegister(rs.rpc)
+ }
+
+ rs.intConnChan <- intAnzConn(rs.rpc, utils.RateS)
return
}
diff --git a/services/resources.go b/services/resources.go
index 4bef97e16..0f8c86012 100644
--- a/services/resources.go
+++ b/services/resources.go
@@ -90,7 +90,7 @@ func (reS *ResourceService) Start() (err error) {
if !reS.cfg.DispatcherSCfg().Enabled {
reS.server.RpcRegister(reS.rpc)
}
- reS.connChan <- reS.rpc
+ reS.connChan <- intAnzConn(reS.rpc, utils.ResourceS)
return
}
diff --git a/services/responders.go b/services/responders.go
index 7e2701ad5..ccebad82e 100644
--- a/services/responders.go
+++ b/services/responders.go
@@ -68,7 +68,7 @@ func (resp *ResponderService) Start() (err error) {
resp.server.RpcRegister(resp.resp)
}
- resp.connChan <- resp.resp // Rater done
+ resp.connChan <- intAnzConn(resp.resp, utils.ResponderS) // Rater done
return
}
diff --git a/services/routes.go b/services/routes.go
index 3e1412b5b..56235a074 100644
--- a/services/routes.go
+++ b/services/routes.go
@@ -91,7 +91,7 @@ func (routeS *RouteService) Start() (err error) {
if !routeS.cfg.DispatcherSCfg().Enabled {
routeS.server.RpcRegister(routeS.rpc)
}
- routeS.connChan <- routeS.rpc
+ routeS.connChan <- intAnzConn(routeS.rpc, utils.RouteS)
return
}
diff --git a/services/schedulers.go b/services/schedulers.go
index 2c6bc6917..706a731d1 100644
--- a/services/schedulers.go
+++ b/services/schedulers.go
@@ -84,7 +84,7 @@ func (schS *SchedulerService) Start() (err error) {
if !schS.cfg.DispatcherSCfg().Enabled {
schS.server.RpcRegister(schS.rpc)
}
- schS.connChan <- schS.rpc
+ schS.connChan <- intAnzConn(schS.rpc, utils.SchedulerS)
return
}
diff --git a/services/sessions.go b/services/sessions.go
index 6440a4564..7465510ee 100644
--- a/services/sessions.go
+++ b/services/sessions.go
@@ -82,7 +82,7 @@ func (smg *SessionService) Start() (err error) {
//start sync session in a separate gorutine
go smg.sm.ListenAndServe(smg.exitChan)
// Pass internal connection via BiRPCClient
- smg.connChan <- smg.sm
+ smg.connChan <- intAnzConn(smg.sm, utils.SessionS)
// Register RPC handler
smg.rpc = v1.NewSMGenericV1(smg.sm)
diff --git a/services/stats.go b/services/stats.go
index 4566ff236..31790bcc8 100644
--- a/services/stats.go
+++ b/services/stats.go
@@ -92,7 +92,7 @@ func (sts *StatService) Start() (err error) {
if !sts.cfg.DispatcherSCfg().Enabled {
sts.server.RpcRegister(sts.rpc)
}
- sts.connChan <- sts.rpc
+ sts.connChan <- intAnzConn(sts.rpc, utils.StatS)
return
}
diff --git a/services/thresholds.go b/services/thresholds.go
index 04f29d11b..cebb0bc1b 100644
--- a/services/thresholds.go
+++ b/services/thresholds.go
@@ -87,7 +87,7 @@ func (thrs *ThresholdService) Start() (err error) {
if !thrs.cfg.DispatcherSCfg().Enabled {
thrs.server.RpcRegister(thrs.rpc)
}
- thrs.connChan <- thrs.rpc
+ thrs.connChan <- intAnzConn(thrs.rpc, utils.ThresholdS)
return
}