Added tests for analyzers

Trial97
2020-10-27 15:54:22 +02:00
committed by Dan Christian Bogos
parent 6b60d49bf6
commit 81a4cdd3a1
35 changed files with 687 additions and 138 deletions


@@ -27,11 +27,7 @@ import (
"time"
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/index/scorch"
"github.com/blevesearch/bleve/index/store/boltdb"
"github.com/blevesearch/bleve/index/store/goleveldb"
"github.com/blevesearch/bleve/index/store/moss"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/blevesearch/bleve/search"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -53,18 +49,7 @@ func (aS *AnalyzerService) initDB() (err error) {
if _, err = os.Stat(aS.cfg.AnalyzerSCfg().DBPath); err == nil {
aS.db, err = bleve.Open(aS.cfg.AnalyzerSCfg().DBPath)
} else if os.IsNotExist(err) {
var indxType, storeType string
switch aS.cfg.AnalyzerSCfg().IndexType {
case utils.MetaScorch:
indxType, storeType = scorch.Name, scorch.Name
case utils.MetaBoltdb:
indxType, storeType = upsidedown.Name, boltdb.Name
case utils.MetaLeveldb:
indxType, storeType = upsidedown.Name, goleveldb.Name
case utils.MetaMoss:
indxType, storeType = upsidedown.Name, moss.Name
}
indxType, storeType := getIndex(aS.cfg.AnalyzerSCfg().IndexType)
aS.db, err = bleve.NewUsing(aS.cfg.AnalyzerSCfg().DBPath,
bleve.NewIndexMapping(), indxType, storeType, nil)
}
@@ -72,7 +57,6 @@ func (aS *AnalyzerService) initDB() (err error) {
}
func (aS *AnalyzerService) clenaUp() (err error) {
fmt.Println("clean")
t2 := bleve.NewDateRangeQuery(time.Time{}, time.Now().Add(-aS.cfg.AnalyzerSCfg().TTL))
t2.SetField("RequestStartTime")
searchReq := bleve.NewSearchRequest(t2)
@@ -80,8 +64,13 @@ func (aS *AnalyzerService) clenaUp() (err error) {
if res, err = aS.db.Search(searchReq); err != nil {
return
}
return aS.deleteHits(res.Hits)
}
// extracted as a separate function so it can be tested in isolation
func (aS *AnalyzerService) deleteHits(hits search.DocumentMatchCollection) (err error) {
hasErr := false
for _, hit := range res.Hits {
for _, hit := range hits {
if err = aS.db.Delete(hit.ID); err != nil {
hasErr = true
}
@@ -125,33 +114,11 @@ func (aS *AnalyzerService) logTrafic(id uint64, method string,
if strings.HasPrefix(method, utils.AnalyzerSv1) {
return nil
}
var e interface{}
switch val := err.(type) {
default:
case nil:
case string:
e = val
case error:
e = val.Error()
}
return aS.db.Index(utils.ConcatenatedKey(method, strconv.FormatInt(sTime.Unix(), 10)),
InfoRPC{
RequestDuration: eTime.Sub(sTime),
RequestStartTime: sTime,
// EndTime: eTime,
RequestEncoding: info.enc,
RequestSource: info.from,
RequestDestination: info.to,
RequestID: id,
RequestMethod: method,
RequestParams: utils.ToJSON(params),
Reply: utils.ToJSON(result),
ReplyError: e,
})
NewInfoRPC(id, method, params, result, err, info, sTime, eTime))
}
// V1Search returns a list of APIs that match the query
func (aS *AnalyzerService) V1Search(searchstr string, reply *[]map[string]interface{}) error {
s := bleve.NewSearchRequest(bleve.NewQueryStringQuery(searchstr))
s.Fields = []string{utils.Meta} // return all fields
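
The refactor above pulls the index-type selection out into getIndex and the hit deletion out into deleteHits so both can be unit tested; the new tests below cover them. For orientation, the open-or-create flow that initDB follows can be reproduced as a standalone sketch (not part of the commit; the path below is only an example):

// Minimal sketch of the open-or-create flow used by initDB, with the
// index/store pair that getIndex(utils.MetaScorch) resolves to.
package main

import (
	"fmt"
	"os"

	"github.com/blevesearch/bleve"
	"github.com/blevesearch/bleve/index/scorch"
)

func main() {
	dbPath := "/tmp/analyzers-example" // hypothetical path
	var idx bleve.Index
	var err error
	if _, err = os.Stat(dbPath); err == nil {
		idx, err = bleve.Open(dbPath) // reuse an existing index
	} else if os.IsNotExist(err) {
		idx, err = bleve.NewUsing(dbPath, bleve.NewIndexMapping(),
			scorch.Name, scorch.Name, nil) // create it with the scorch index and kv store
	}
	if err != nil {
		fmt.Println(err)
		return
	}
	defer idx.Close()
	cnt, _ := idx.DocCount()
	fmt.Println("documents:", cnt)
}

The same bleve.NewUsing call accepts whichever index/store pair getIndex resolves for *boltdb, *leveldb or *mossdb.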

analyzers/analyzers_test.go (new file, 182 lines added)

@@ -0,0 +1,182 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package analyzers
import (
"os"
"path"
"runtime"
"testing"
"time"
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/search"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
func TestNewAnalyzerService(t *testing.T) {
cfg, err := config.NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
}
cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
// no need to DeepEqual
if err = anz.Shutdown(); err != nil {
t.Fatal(err)
}
if err = anz.initDB(); err != nil {
t.Fatal(err)
}
exitChan := make(chan bool, 1)
exitChan <- true
if err := anz.ListenAndServe(exitChan); err != nil {
t.Fatal(err)
}
anz.db.Close()
}
func TestAnalyzerSLogTraffic(t *testing.T) {
cfg, err := config.NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
}
cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
cfg.AnalyzerSCfg().TTL = 30 * time.Minute
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
t1 := time.Now().Add(-time.Hour)
if err = anz.logTrafic(0, utils.AnalyzerSv1Ping, "status", "result", "error", &extraInfo{
enc: utils.MetaJSON,
from: "127.0.0.1:5565",
to: "127.0.0.1:2012",
}, t1, t1.Add(time.Second)); err != nil {
t.Fatal(err)
}
if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{
enc: utils.MetaJSON,
from: "127.0.0.1:5565",
to: "127.0.0.1:2012",
}, t1, t1.Add(time.Second)); err != nil {
t.Fatal(err)
}
t1 = time.Now().Add(-10 * time.Minute)
if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{
enc: utils.MetaJSON,
from: "127.0.0.1:5565",
to: "127.0.0.1:2012",
}, t1, t1.Add(time.Second)); err != nil {
t.Fatal(err)
}
if cnt, err := anz.db.DocCount(); err != nil {
t.Fatal(err)
} else if cnt != 2 {
t.Errorf("Expected only 2 documents received:%v", cnt)
}
if err = anz.clenaUp(); err != nil {
t.Fatal(err)
}
if cnt, err := anz.db.DocCount(); err != nil {
t.Fatal(err)
} else if cnt != 1 {
t.Errorf("Expected only one document received:%v", cnt)
}
if err = anz.db.Close(); err != nil {
t.Fatal(err)
}
if err = anz.clenaUp(); err != bleve.ErrorIndexClosed {
t.Errorf("Expected error: %v,received: %+v", bleve.ErrorIndexClosed, err)
}
}
func TestAnalyzersDeleteHits(t *testing.T) {
cfg, err := config.NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
}
cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
cfg.AnalyzerSCfg().TTL = 30 * time.Minute
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
if err = anz.deleteHits(search.DocumentMatchCollection{&search.DocumentMatch{}}); err != utils.ErrPartiallyExecuted {
t.Errorf("Expected error: %v,received: %+v", utils.ErrPartiallyExecuted, err)
}
}
func TestAnalyzersListenAndServe(t *testing.T) {
cfg, err := config.NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
}
cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
cfg.AnalyzerSCfg().TTL = 30 * time.Minute
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
if err := anz.db.Close(); err != nil {
t.Fatal(err)
}
anz.ListenAndServe(make(chan bool))
cfg.AnalyzerSCfg().CleanupInterval = 1
anz, err = NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
go func() {
time.Sleep(1)
runtime.Gosched()
anz.db.Close()
}()
anz.ListenAndServe(make(chan bool))
}


@@ -25,7 +25,7 @@ import (
)
func (aS *AnalyzerService) NewServerCodec(sc rpc.ServerCodec, enc, from, to string) rpc.ServerCodec {
return &AnalyzeServerCodec{
return &AnalyzerServerCodec{
sc: sc,
reqs: make(map[uint64]*rpcAPI),
aS: aS,
@@ -37,7 +37,7 @@ func (aS *AnalyzerService) NewServerCodec(sc rpc.ServerCodec, enc, from, to stri
}
}
type AnalyzeServerCodec struct {
type AnalyzerServerCodec struct {
sc rpc.ServerCodec
// keep the API in memory because the write is async
@@ -48,7 +48,7 @@ type AnalyzeServerCodec struct {
extrainfo *extraInfo
}
func (c *AnalyzeServerCodec) ReadRequestHeader(r *rpc.Request) (err error) {
func (c *AnalyzerServerCodec) ReadRequestHeader(r *rpc.Request) (err error) {
err = c.sc.ReadRequestHeader(r)
c.reqsLk.Lock()
c.reqIdx = r.Seq
@@ -61,14 +61,14 @@ func (c *AnalyzeServerCodec) ReadRequestHeader(r *rpc.Request) (err error) {
return
}
func (c *AnalyzeServerCodec) ReadRequestBody(x interface{}) (err error) {
func (c *AnalyzerServerCodec) ReadRequestBody(x interface{}) (err error) {
err = c.sc.ReadRequestBody(x)
c.reqsLk.Lock()
c.reqs[c.reqIdx].Params = x
c.reqsLk.Unlock()
return
}
func (c *AnalyzeServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
func (c *AnalyzerServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
c.reqsLk.Lock()
api := c.reqs[c.reqIdx]
delete(c.reqs, c.reqIdx)
@@ -76,4 +76,4 @@ func (c *AnalyzeServerCodec) WriteResponse(r *rpc.Response, x interface{}) error
go c.aS.logTrafic(api.ID, api.Method, api.Params, x, r.Error, c.extrainfo, api.StartTime, time.Now())
return c.sc.WriteResponse(r, x)
}
func (c *AnalyzeServerCodec) Close() error { return c.sc.Close() }
func (c *AnalyzerServerCodec) Close() error { return c.sc.Close() }
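
The rename above from AnalyzeServerCodec to AnalyzerServerCodec is mechanical, but this codec is what the new codec_test.go exercises through a mock rpc.ServerCodec. As a hedged sketch (not part of the commit; the DB path and listen address are assumptions), such a codec would be plugged into net/rpc roughly like this, which is also what the AnalizerWraperFunc hook set in the services wiring further below arranges:

package main

import (
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"

	"github.com/cgrates/cgrates/analyzers"
	"github.com/cgrates/cgrates/config"
	"github.com/cgrates/cgrates/utils"
)

func main() {
	cfg, err := config.NewDefaultCGRConfig()
	if err != nil {
		return
	}
	cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers-codec-example" // hypothetical path
	anz, err := analyzers.NewAnalyzerService(cfg)
	if err != nil {
		return
	}
	l, err := net.Listen("tcp", "127.0.0.1:2012") // hypothetical address
	if err != nil {
		return
	}
	for {
		conn, err := l.Accept()
		if err != nil {
			return
		}
		// wrap the plain JSON-RPC server codec so every request/response
		// pair is logged asynchronously through logTrafic
		codec := anz.NewServerCodec(jsonrpc.NewServerCodec(conn), utils.MetaJSON,
			conn.RemoteAddr().String(), l.Addr().String())
		go rpc.ServeCodec(codec)
	}
}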

analyzers/codec_test.go (new file, 100 lines added)

@@ -0,0 +1,100 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package analyzers
import (
"net/rpc"
"os"
"path"
"reflect"
"runtime"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
type mockServerCodec struct{}
func (c *mockServerCodec) ReadRequestHeader(r *rpc.Request) (err error) {
r.Seq = 0
r.ServiceMethod = utils.CoreSv1Ping
return
}
func (c *mockServerCodec) ReadRequestBody(x interface{}) (err error) {
return
}
func (c *mockServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
return nil
}
func (c *mockServerCodec) Close() error { return nil }
func TestNewServerCodec(t *testing.T) {
cfg, err := config.NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
}
cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
cfg.AnalyzerSCfg().TTL = 30 * time.Minute
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
codec := anz.NewServerCodec(new(mockServerCodec), utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012")
r := new(rpc.Request)
expR := &rpc.Request{
Seq: 0,
ServiceMethod: utils.CoreSv1Ping,
}
if err = codec.ReadRequestHeader(r); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(r, expR) {
t.Errorf("Expected: %v ,received:%v", expR, r)
}
if err = codec.ReadRequestBody("args"); err != nil {
t.Fatal(err)
}
if err = codec.WriteResponse(&rpc.Response{
Error: "error",
Seq: 0,
ServiceMethod: utils.CoreSv1Ping,
}, "reply"); err != nil {
t.Fatal(err)
}
if err = codec.Close(); err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
runtime.Gosched()
if cnt, err := anz.db.DocCount(); err != nil {
t.Fatal(err)
} else if cnt != 1 {
t.Errorf("Expected only one document received:%v", cnt)
}
}


@@ -24,8 +24,8 @@ import (
"github.com/cgrates/rpcclient"
)
func (aS *AnalyzerService) NewAnalyzeConnector(sc rpcclient.ClientConnector, enc, from, to string) rpcclient.ClientConnector {
return &AnalyzeConnector{
func (aS *AnalyzerService) NewAnalyzerConnector(sc rpcclient.ClientConnector, enc, from, to string) rpcclient.ClientConnector {
return &AnalyzerConnector{
conn: sc,
aS: aS,
extrainfo: &extraInfo{
@@ -36,14 +36,14 @@ func (aS *AnalyzerService) NewAnalyzeConnector(sc rpcclient.ClientConnector, enc
}
}
type AnalyzeConnector struct {
type AnalyzerConnector struct {
conn rpcclient.ClientConnector
aS *AnalyzerService
extrainfo *extraInfo
}
func (c *AnalyzeConnector) Call(serviceMethod string, args interface{}, reply interface{}) (err error) {
func (c *AnalyzerConnector) Call(serviceMethod string, args interface{}, reply interface{}) (err error) {
sTime := time.Now()
err = c.conn.Call(serviceMethod, args, reply)
go c.aS.logTrafic(0, serviceMethod, args, reply, err, c.extrainfo, sTime, time.Now())


@@ -0,0 +1,65 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package analyzers
import (
"errors"
"os"
"path"
"runtime"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
type mockConnector struct{}
func (c *mockConnector) Call(_ string, _, _ interface{}) (err error) {
return errors.New("error")
}
func TestNewAnalyzeConnector(t *testing.T) {
cfg, err := config.NewDefaultCGRConfig()
if err != nil {
t.Fatal(err)
}
cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
if err != nil {
t.Fatal(err)
}
rpc := anz.NewAnalyzerConnector(new(mockConnector), utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012")
if err = rpc.Call(utils.CoreSv1Ping, "args", "reply"); err == nil || err.Error() != "error" {
t.Errorf("Expected 'error' received %v", err)
}
time.Sleep(100 * time.Millisecond)
runtime.Gosched()
if cnt, err := anz.db.DocCount(); err != nil {
t.Fatal(err)
} else if cnt != 1 {
t.Errorf("Expected only one document received:%v", cnt)
}
}


@@ -20,14 +20,46 @@ package analyzers
import (
"time"
"github.com/blevesearch/bleve/index/scorch"
"github.com/blevesearch/bleve/index/store/boltdb"
"github.com/blevesearch/bleve/index/store/goleveldb"
"github.com/blevesearch/bleve/index/store/moss"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/cgrates/cgrates/utils"
)
type extraInfo struct {
enc string
from string
to string
// NewInfoRPC returns a structure to be indexed
func NewInfoRPC(id uint64, method string,
params, result, err interface{},
info *extraInfo, sTime, eTime time.Time) *InfoRPC {
var e interface{}
switch val := err.(type) {
default:
case nil:
case string:
e = val
case error:
e = val.Error()
}
return &InfoRPC{
RequestDuration: eTime.Sub(sTime),
RequestStartTime: sTime,
// EndTime: eTime,
RequestEncoding: info.enc,
RequestSource: info.from,
RequestDestination: info.to,
RequestID: id,
RequestMethod: method,
RequestParams: utils.ToJSON(params),
Reply: utils.ToJSON(result),
ReplyError: e,
}
}
// InfoRPC is the structure to be indexed
type InfoRPC struct {
RequestDuration time.Duration
RequestStartTime time.Time
@@ -44,6 +76,12 @@ type InfoRPC struct {
ReplyError interface{}
}
type extraInfo struct {
enc string
from string
to string
}
type rpcAPI struct {
ID uint64 `json:"id"`
Method string `json:"method"`
@@ -51,3 +89,17 @@ type rpcAPI struct {
StartTime time.Time
}
func getIndex(indx string) (indxType, storeType string) {
switch indx {
case utils.MetaScorch:
indxType, storeType = scorch.Name, scorch.Name
case utils.MetaBoltdb:
indxType, storeType = upsidedown.Name, boltdb.Name
case utils.MetaLeveldb:
indxType, storeType = upsidedown.Name, goleveldb.Name
case utils.MetaMoss:
indxType, storeType = upsidedown.Name, moss.Name
}
return
}

analyzers/utils_test.go (new file, 104 lines added)

@@ -0,0 +1,104 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package analyzers
import (
"errors"
"reflect"
"testing"
"time"
"github.com/blevesearch/bleve/index/scorch"
"github.com/blevesearch/bleve/index/store/boltdb"
"github.com/blevesearch/bleve/index/store/goleveldb"
"github.com/blevesearch/bleve/index/store/moss"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/cgrates/cgrates/utils"
)
func TestGetIndex(t *testing.T) {
expIdxType, expStore := scorch.Name, scorch.Name
idxType, store := getIndex(utils.MetaScorch)
if idxType != expIdxType {
t.Errorf("Expected index: %q,received:%q", expIdxType, idxType)
}
if store != expStore {
t.Errorf("Expected index: %q,received:%q", expStore, store)
}
expIdxType, expStore = upsidedown.Name, boltdb.Name
idxType, store = getIndex(utils.MetaBoltdb)
if idxType != expIdxType {
t.Errorf("Expected index: %q,received:%q", expIdxType, idxType)
}
if store != expStore {
t.Errorf("Expected index: %q,received:%q", expStore, store)
}
expIdxType, expStore = upsidedown.Name, goleveldb.Name
idxType, store = getIndex(utils.MetaLeveldb)
if idxType != expIdxType {
t.Errorf("Expected index: %q,received:%q", expIdxType, idxType)
}
if store != expStore {
t.Errorf("Expected index: %q,received:%q", expStore, store)
}
expIdxType, expStore = upsidedown.Name, moss.Name
idxType, store = getIndex(utils.MetaMoss)
if idxType != expIdxType {
t.Errorf("Expected index: %q,received:%q", expIdxType, idxType)
}
if store != expStore {
t.Errorf("Expected index: %q,received:%q", expStore, store)
}
}
func TestNewInfoRPC(t *testing.T) {
t1 := time.Now()
expIdx := &InfoRPC{
RequestDuration: time.Second,
RequestStartTime: t1,
RequestEncoding: utils.MetaJSON,
RequestSource: "127.0.0.1:5565",
RequestDestination: "127.0.0.1:2012",
RequestID: 0,
RequestMethod: utils.CoreSv1Status,
RequestParams: `"status"`,
Reply: `"result"`,
ReplyError: "error",
}
idx := NewInfoRPC(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{
enc: utils.MetaJSON,
from: "127.0.0.1:5565",
to: "127.0.0.1:2012",
}, t1, t1.Add(time.Second))
if !reflect.DeepEqual(expIdx, idx) {
t.Errorf("Expected:%s, received:%s", utils.ToJSON(expIdx), utils.ToJSON(idx))
}
idx = NewInfoRPC(0, utils.CoreSv1Status, "status", "result", errors.New("error"), &extraInfo{
enc: utils.MetaJSON,
from: "127.0.0.1:5565",
to: "127.0.0.1:2012",
}, t1, t1.Add(time.Second))
if !reflect.DeepEqual(expIdx, idx) {
t.Errorf("Expected:%s, received:%s", utils.ToJSON(expIdx), utils.ToJSON(idx))
}
}


@@ -74,7 +74,8 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
// initCacheS inits the CacheS and starts precaching as well as populating internal channel for RPC conns
func initCacheS(internalCacheSChan chan rpcclient.ClientConnector,
server *utils.Server, dm *engine.DataManager, exitChan chan bool) (chS *engine.CacheS) {
server *utils.Server, dm *engine.DataManager, exitChan chan bool,
anz *services.AnalyzerService) (chS *engine.CacheS) {
chS = engine.NewCacheS(cfg, dm)
go func() {
if err := chS.Precache(); err != nil {
@@ -87,32 +88,73 @@ func initCacheS(internalCacheSChan chan rpcclient.ClientConnector,
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(chSv1)
}
internalCacheSChan <- chS
var rpc rpcclient.ClientConnector = chS
if anz.IsRunning() {
rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.CacheS)
}
internalCacheSChan <- rpc
return
}
func initGuardianSv1(internalGuardianSChan chan rpcclient.ClientConnector, server *utils.Server) {
func initGuardianSv1(internalGuardianSChan chan rpcclient.ClientConnector, server *utils.Server,
anz *services.AnalyzerService) {
grdSv1 := v1.NewGuardianSv1()
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(grdSv1)
}
internalGuardianSChan <- grdSv1
var rpc rpcclient.ClientConnector = grdSv1
if anz.IsRunning() {
rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.GuardianS)
}
internalGuardianSChan <- rpc
}
func initCoreSv1(internalCoreSv1Chan chan rpcclient.ClientConnector, server *utils.Server) {
func initCoreSv1(internalCoreSv1Chan chan rpcclient.ClientConnector, server *utils.Server,
anz *services.AnalyzerService) {
cSv1 := v1.NewCoreSv1(engine.NewCoreService())
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(cSv1)
}
internalCoreSv1Chan <- cSv1
var rpc rpcclient.ClientConnector = cSv1
if anz.IsRunning() {
rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.CoreS)
}
internalCoreSv1Chan <- rpc
}
func initServiceManagerV1(internalServiceManagerChan chan rpcclient.ClientConnector,
srvMngr *servmanager.ServiceManager, server *utils.Server) {
srvMngr *servmanager.ServiceManager, server *utils.Server,
anz *services.AnalyzerService) {
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(v1.NewServiceManagerV1(srvMngr))
}
internalServiceManagerChan <- srvMngr
var rpc rpcclient.ClientConnector = srvMngr
if anz.IsRunning() {
rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.ServiceManager)
}
internalServiceManagerChan <- rpc
}
// initLogger will initialize the syslog writer; needs to be called after config init
func initLogger(cfg *config.CGRConfig) error {
sylogger := cfg.GeneralCfg().Logger
if *syslogger != "" { // Modify the log level if provided by command arguments
sylogger = *syslogger
}
return utils.Newlogger(sylogger, cfg.GeneralCfg().NodeID)
}
func initConfigSv1(internalConfigChan chan rpcclient.ClientConnector,
server *utils.Server, anz *services.AnalyzerService) {
cfgSv1 := v1.NewConfigSv1(cfg)
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(cfgSv1)
}
var rpc rpcclient.ClientConnector = cfgSv1
if anz.IsRunning() {
rpc = anz.GetAnalyzerS().NewAnalyzerConnector(rpc, utils.MetaInternal, utils.EmptyString, utils.ConfigSv1)
}
internalConfigChan <- rpc
}
func startRPC(server *utils.Server, internalRaterChan,
@@ -238,24 +280,6 @@ func writePid() {
}
}
// initLogger will initialize the syslog writer; needs to be called after config init
func initLogger(cfg *config.CGRConfig) error {
sylogger := cfg.GeneralCfg().Logger
if *syslogger != "" { // Modify the log level if provided by command arguments
sylogger = *syslogger
}
return utils.Newlogger(sylogger, cfg.GeneralCfg().NodeID)
}
func initConfigSv1(internalConfigChan chan rpcclient.ClientConnector,
server *utils.Server) {
cfgSv1 := v1.NewConfigSv1(cfg)
if !cfg.DispatcherSCfg().Enabled {
server.RpcRegister(cfgSv1)
}
internalConfigChan <- cfgSv1
}
func memProfFile(memProfPath string) bool {
f, err := os.Create(memProfPath)
if err != nil {
@@ -522,15 +546,24 @@ func main() {
// Define internal connections via channels
filterSChan := make(chan *engine.FilterS, 1)
// init AnalyzerS
anz := services.NewAnalyzerService(cfg, server, exitChan, internalAnalyzerSChan)
if anz.ShouldRun() {
if err := anz.Start(); err != nil {
fmt.Println(err)
return
}
}
// init CacheS
cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan)
cacheS := initCacheS(internalCacheSChan, server, dmService.GetDM(), exitChan, anz)
engine.SetCache(cacheS)
// init GuardianSv1
initGuardianSv1(internalGuardianSChan, server)
initGuardianSv1(internalGuardianSChan, server, anz)
// init CoreSv1
initCoreSv1(internalCoreSv1Chan, server)
initCoreSv1(internalCoreSv1Chan, server, anz)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, exitChan)
@@ -567,14 +600,6 @@ func main() {
ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, exitChan,
internalLoaderSChan, connManager)
anz := services.NewAnalyzerService(cfg, server, exitChan, internalAnalyzerSChan)
if anz.ShouldRun() {
if err := anz.Start(); err != nil {
fmt.Println(err)
return
}
}
srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, schS, rals,
rals.GetResponder(), apiSv1, apiSv2, cdrS, smg,
services.NewEventReaderService(cfg, filterSChan, exitChan, connManager),
@@ -597,7 +622,7 @@ func main() {
go startFilterService(filterSChan, cacheS, connManager,
cfg, dmService.GetDM(), exitChan)
initServiceManagerV1(internalServeManagerChan, srvManager, server)
initServiceManagerV1(internalServeManagerChan, srvManager, server, anz)
// init internalRPCSet to share internal connections among the engine
engine.IntRPC = engine.NewRPCClientSet()
@@ -627,7 +652,7 @@ func main() {
engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan)
// engine.IntRPC.AddInternalRPCClient(utils.DispatcherHv1, internalDispatcherHChan)
initConfigSv1(internalConfigChan, server)
initConfigSv1(internalConfigChan, server, anz)
if *preload != utils.EmptyString {
runPreload(ldrs, internalLoaderSChan, internalPreloadChan, exitChan)
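
Each init* helper in the engine bootstrap hunks above now repeats the same four-line pattern: declare the connector, wrap it through NewAnalyzerConnector when AnalyzerS is running, then publish it on the internal channel. A hypothetical consolidation (not part of the commit, shown only to make the pattern explicit) could look like this:

package main

import (
	"github.com/cgrates/cgrates/services"
	"github.com/cgrates/cgrates/utils"
	"github.com/cgrates/rpcclient"
)

// wrapWithAnalyzer mirrors the pattern repeated in the init* helpers above:
// wrap the internal connector only when the analyzer service is running.
func wrapWithAnalyzer(conn rpcclient.ClientConnector,
	anz *services.AnalyzerService, to string) rpcclient.ClientConnector {
	if anz.IsRunning() {
		return anz.GetAnalyzerS().NewAnalyzerConnector(conn,
			utils.MetaInternal, utils.EmptyString, to)
	}
	return conn
}

func main() {} // placeholder so the sketch builds on its own

Within the services package itself, the commit solves the same problem with the intAnzConn function variable shown further below.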


@@ -20,6 +20,7 @@ package config
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
)
@@ -29,7 +30,11 @@ func TestAnalyzerSCfgloadFromJsonCfg(t *testing.T) {
Enabled: utils.BoolPointer(false),
}
expected := &AnalyzerSCfg{
Enabled: false,
Enabled: false,
CleanupInterval: time.Hour,
DBPath: "/var/spool/cgrates/analyzers",
IndexType: utils.MetaScorch,
TTL: 24 * time.Hour,
}
if jsnCfg, err := NewDefaultCGRConfig(); err != nil {
t.Error(err)
@@ -46,7 +51,11 @@ func TestAnalyzerSCfgAsMapInterface(t *testing.T) {
}
}`
eMap := map[string]interface{}{
utils.EnabledCfg: false,
utils.EnabledCfg: false,
utils.CleanupIntervalCfg: "1h0m0s",
utils.DBPathCfg: "/var/spool/cgrates/analyzers",
utils.IndexTypeCfg: utils.MetaScorch,
utils.TTLCfg: "24h0m0s",
}
if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil {
t.Error(err)
@@ -63,7 +72,11 @@ func TestAnalyzerSCfgAsMapInterface1(t *testing.T) {
}
}`
eMap := map[string]interface{}{
utils.EnabledCfg: true,
utils.EnabledCfg: true,
utils.CleanupIntervalCfg: "1h0m0s",
utils.DBPathCfg: "/var/spool/cgrates/analyzers",
utils.IndexTypeCfg: utils.MetaScorch,
utils.TTLCfg: "24h0m0s",
}
if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSONStr); err != nil {
t.Error(err)
@@ -71,3 +84,22 @@ func TestAnalyzerSCfgAsMapInterface1(t *testing.T) {
t.Errorf("Expected: %+v , recived: %+v", eMap, rcv)
}
}
func TestAnalyzerSCfgloadFromJsonCfgErr(t *testing.T) {
jsonCfg := &AnalyzerSJsonCfg{
Cleanup_interval: utils.StringPointer("24ha"),
}
if jsnCfg, err := NewDefaultCGRConfig(); err != nil {
t.Error(err)
} else if err = jsnCfg.analyzerSCfg.loadFromJSONCfg(jsonCfg); err == nil {
t.Errorf("Expected error received nil")
}
jsonCfg = &AnalyzerSJsonCfg{
Ttl: utils.StringPointer("24ha"),
}
if jsnCfg, err := NewDefaultCGRConfig(); err != nil {
t.Error(err)
} else if err = jsnCfg.analyzerSCfg.loadFromJSONCfg(jsonCfg); err == nil {
t.Errorf("Expected error received nil")
}
}


@@ -904,12 +904,12 @@ const CGRATES_CFG_JSON = `
},
"analyzers":{ // AnalyzerS config
"enabled": false, // starts AnalyzerS service: <true|false>.
"db_path": "/tmp/analyzers", // path to the folder where to store the information
"index_type": "*scorch", // the type of index for the storage: <*scorch|*boltdb|*leveldb|*mossdb>
"ttl": "10s", // time to wait before removing the API capture
"cleanup_interval": "1h", // the interval we clean the db
"analyzers":{ // AnalyzerS config
"enabled": false, // starts AnalyzerS service: <true|false>.
"db_path": "/var/spool/cgrates/analyzers", // path to the folder where to store the information
"index_type": "*scorch", // the type of index for the storage: <*scorch|*boltdb|*leveldb|*mossdb>
"ttl": "24h", // time to wait before removing the API capture
"cleanup_interval": "1h", // the interval we clean the db
},


@@ -1680,7 +1680,11 @@ func TestDfTlsCfg(t *testing.T) {
func TestDfAnalyzerCfg(t *testing.T) {
eCfg := &AnalyzerSJsonCfg{
Enabled: utils.BoolPointer(false),
Enabled: utils.BoolPointer(false),
Cleanup_interval: utils.StringPointer("1h"),
Db_path: utils.StringPointer("/var/spool/cgrates/analyzers"),
Index_type: utils.StringPointer(utils.MetaScorch),
Ttl: utils.StringPointer("24h"),
}
if cfg, err := dfCgrJSONCfg.AnalyzerCfgJson(); err != nil {
t.Error(err)


@@ -1863,7 +1863,11 @@ func TestCfgTlsCfg(t *testing.T) {
func TestCgrCfgJSONDefaultAnalyzerSCfg(t *testing.T) {
aSCfg := &AnalyzerSCfg{
Enabled: false,
Enabled: false,
CleanupInterval: time.Hour,
DBPath: "/var/spool/cgrates/analyzers",
IndexType: utils.MetaScorch,
TTL: 24 * time.Hour,
}
if !reflect.DeepEqual(cgrCfg.analyzerSCfg, aSCfg) {
t.Errorf("received: %+v, expecting: %+v", cgrCfg.analyzerSCfg, aSCfg)


@@ -97,6 +97,7 @@ mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/csv
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/fwv
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/tpe
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/failed_posts
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/analyzers
mkdir -p $RPM_BUILD_ROOT%{_libdir}/history
mkdir -p $RPM_BUILD_ROOT%{_libdir}/cache_dump
mkdir -p $RPM_BUILD_ROOT/etc/logrotate.d


@@ -40,6 +40,7 @@ binary-arch: clean
mkdir -p $(PKGDIR)/var/spool/cgrates/cdre/fwv
mkdir -p $(PKGDIR)/var/spool/cgrates/tpe
mkdir -p $(PKGDIR)/var/spool/cgrates/failed_posts
mkdir -p $(PKGDIR)/var/spool/cgrates/analyzers
mkdir -p $(PKGDIR)/var/lib/cgrates/history
mkdir -p $(PKGDIR)/var/lib/cgrates/cache_dump
mkdir -p $(PKGDIR)/var/log/cgrates


@@ -99,6 +99,7 @@ mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/csv
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/cdre/fwv
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/tpe
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/failed_posts
mkdir -p $RPM_BUILD_ROOT%{_spooldir}/analyzers
mkdir -p $RPM_BUILD_ROOT%{_libdir}/history
mkdir -p $RPM_BUILD_ROOT%{_libdir}/cache_dump
mkdir -p $RPM_BUILD_ROOT/etc/logrotate.d


@@ -27,14 +27,18 @@ import (
"github.com/cgrates/cgrates/analyzers"
v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
var (
// used to build the connector for analyzers
intAnzConn = func(c rpcclient.ClientConnector, to string) rpcclient.ClientConnector { return c }
)
// NewAnalyzerService returns the Analyzer Service
func NewAnalyzerService(cfg *config.CGRConfig, server *utils.Server, exitChan chan bool,
internalAnalyzerSChan chan rpcclient.ClientConnector) servmanager.Service {
internalAnalyzerSChan chan rpcclient.ClientConnector) *AnalyzerService {
return &AnalyzerService{
connChan: internalAnalyzerSChan,
cfg: cfg,
@@ -72,6 +76,9 @@ func (anz *AnalyzerService) Start() (err error) {
anz.exitChan <- true
return
}()
intAnzConn = func(c rpcclient.ClientConnector, to string) rpcclient.ClientConnector {
return anz.anz.NewAnalyzerConnector(c, utils.MetaInternal, utils.EmptyString, to)
}
utils.AnalizerWraperFunc = func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec {
fromstr := ""
if from != nil {
@@ -124,3 +131,7 @@ func (anz *AnalyzerService) ServiceName() string {
func (anz *AnalyzerService) ShouldRun() bool {
return anz.cfg.AnalyzerSCfg().Enabled
}
func (anz *AnalyzerService) GetAnalyzerS() *analyzers.AnalyzerService {
return anz.anz
}


@@ -119,7 +119,7 @@ func (apiService *APIerSv1Service) Start() (err error) {
}
//backwards compatible
apiService.connChan <- apiService.api
apiService.connChan <- intAnzConn(apiService.api, utils.APIerSv1)
apiService.APIerSv1Chan <- apiService.api
return


@@ -73,7 +73,7 @@ func (api *APIerSv2Service) Start() (err error) {
api.server.RpcRegisterName(utils.ApierV2, api.api)
}
api.connChan <- api.api
api.connChan <- intAnzConn(api.api, utils.APIerSv2)
return
}


@@ -87,7 +87,7 @@ func (attrS *AttributeService) Start() (err error) {
if !attrS.cfg.DispatcherSCfg().Enabled {
attrS.server.RpcRegister(attrS.rpc)
}
attrS.connChan <- attrS.rpc
attrS.connChan <- intAnzConn(attrS.rpc, utils.AttributeS)
return
}


@@ -107,7 +107,7 @@ func (cdrService *CDRServer) Start() (err error) {
// Make the cdr server available for internal communication
cdrService.server.RpcRegister(cdrService.cdrS) // register CdrServer for internal usage (TODO: refactor this)
}
cdrService.connChan <- cdrService.cdrS // Signal that cdrS is operational
cdrService.connChan <- intAnzConn(cdrService.cdrS, utils.CDRServer) // Signal that cdrS is operational
return
}


@@ -88,7 +88,7 @@ func (chrS *ChargerService) Start() (err error) {
if !chrS.cfg.DispatcherSCfg().Enabled {
chrS.server.RpcRegister(cSv1)
}
chrS.connChan <- cSv1
chrS.connChan <- intAnzConn(cSv1, utils.ChargerS)
return
}


@@ -66,7 +66,7 @@ func (dspS *DispatcherHostsService) Start() (err error) {
dspS.dspS = dispatcherh.NewDispatcherHService(dspS.cfg, dspS.connMgr)
go dspS.dspS.ListenAndServe()
dspS.connChan <- dspS.dspS
dspS.connChan <- intAnzConn(dspS.dspS, utils.DispatcherH)
return
}


@@ -145,7 +145,7 @@ func (dspS *DispatcherService) Start() (err error) {
dspS.server.RpcRegisterName(utils.RateSv1,
v1.NewDispatcherRateSv1(dspS.dspS))
dspS.connChan <- dspS.dspS
dspS.connChan <- intAnzConn(dspS.dspS, utils.DispatcherS)
return
}


@@ -118,17 +118,17 @@ func (es *EventExporterService) Start() (err error) {
utils.EventExporterS, err))
return
}
es.rpc = v1.NewEventExporterSv1(es.eeS)
if !es.cfg.DispatcherSCfg().Enabled {
es.server.RpcRegister(es.rpc)
}
es.intConnChan <- es.eeS
go func(eeS *ees.EventExporterS, exitChan chan bool, rldChan chan struct{}) {
if err := eeS.ListenAndServe(exitChan, rldChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.EventExporterS, err.Error()))
exitChan <- true
}
}(es.eeS, es.exitChan, es.rldChan)
es.rpc = v1.NewEventExporterSv1(es.eeS)
if !es.cfg.DispatcherSCfg().Enabled {
es.server.RpcRegister(es.rpc)
}
es.intConnChan <- intAnzConn(es.eeS, utils.EventExporterS)
return
}


@@ -85,7 +85,7 @@ func (ldrs *LoaderService) Start() (err error) {
if !ldrs.cfg.DispatcherSCfg().Enabled {
ldrs.server.RpcRegister(ldrs.rpc)
}
ldrs.connChan <- ldrs.rpc
ldrs.connChan <- intAnzConn(ldrs.rpc, utils.LoaderS)
return
}


@@ -91,7 +91,7 @@ func (rals *RalService) Start() (err error) {
rals.server.RpcRegister(rals.rals)
}
rals.connChan <- rals.rals
rals.connChan <- intAnzConn(rals.rals, utils.RALService)
return
}


@@ -126,18 +126,18 @@ func (rs *RateService) Start() (err error) {
rs.rateS = rates.NewRateS(rs.cfg, fltrS, dm)
rs.Unlock()
rs.rpc = v1.NewRateSv1(rs.rateS)
if !rs.cfg.DispatcherSCfg().Enabled {
rs.server.RpcRegister(rs.rpc)
}
rs.intConnChan <- rs.rpc
go func(rtS *rates.RateS, exitChan chan bool, rldChan chan struct{}) {
if err := rtS.ListenAndServe(exitChan, rldChan); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.EventExporterS, err.Error()))
exitChan <- true
}
}(rs.rateS, rs.exitChan, rs.rldChan)
rs.rpc = v1.NewRateSv1(rs.rateS)
if !rs.cfg.DispatcherSCfg().Enabled {
rs.server.RpcRegister(rs.rpc)
}
rs.intConnChan <- intAnzConn(rs.rpc, utils.RateS)
return
}


@@ -90,7 +90,7 @@ func (reS *ResourceService) Start() (err error) {
if !reS.cfg.DispatcherSCfg().Enabled {
reS.server.RpcRegister(reS.rpc)
}
reS.connChan <- reS.rpc
reS.connChan <- intAnzConn(reS.rpc, utils.ResourceS)
return
}


@@ -68,7 +68,7 @@ func (resp *ResponderService) Start() (err error) {
resp.server.RpcRegister(resp.resp)
}
resp.connChan <- resp.resp // Rater done
resp.connChan <- intAnzConn(resp.resp, utils.ResponderS) // Rater done
return
}


@@ -91,7 +91,7 @@ func (routeS *RouteService) Start() (err error) {
if !routeS.cfg.DispatcherSCfg().Enabled {
routeS.server.RpcRegister(routeS.rpc)
}
routeS.connChan <- routeS.rpc
routeS.connChan <- intAnzConn(routeS.rpc, utils.RouteS)
return
}


@@ -84,7 +84,7 @@ func (schS *SchedulerService) Start() (err error) {
if !schS.cfg.DispatcherSCfg().Enabled {
schS.server.RpcRegister(schS.rpc)
}
schS.connChan <- schS.rpc
schS.connChan <- intAnzConn(schS.rpc, utils.SchedulerS)
return
}


@@ -82,7 +82,7 @@ func (smg *SessionService) Start() (err error) {
// start sync session in a separate goroutine
go smg.sm.ListenAndServe(smg.exitChan)
// Pass internal connection via BiRPCClient
smg.connChan <- smg.sm
smg.connChan <- intAnzConn(smg.sm, utils.SessionS)
// Register RPC handler
smg.rpc = v1.NewSMGenericV1(smg.sm)


@@ -92,7 +92,7 @@ func (sts *StatService) Start() (err error) {
if !sts.cfg.DispatcherSCfg().Enabled {
sts.server.RpcRegister(sts.rpc)
}
sts.connChan <- sts.rpc
sts.connChan <- intAnzConn(sts.rpc, utils.StatS)
return
}


@@ -87,7 +87,7 @@ func (thrs *ThresholdService) Start() (err error) {
if !thrs.cfg.DispatcherSCfg().Enabled {
thrs.server.RpcRegister(thrs.rpc)
}
thrs.connChan <- thrs.rpc
thrs.connChan <- intAnzConn(thrs.rpc, utils.ThresholdS)
return
}