From bb08b933f76def7e1418c649d64660f82000a250 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Tue, 27 Oct 2020 17:40:40 +0200 Subject: [PATCH] Finished tests for analyzers --- analyzers/analyzers.go | 14 ++-- analyzers/analyzers_test.go | 140 ++++++++++++++++++++++++++++++++---- analyzers/codec.go | 22 +++--- analyzers/connector.go | 16 ++--- analyzers/utils.go | 14 ++-- analyzers/utils_test.go | 14 ++-- utils/consts.go | 7 +- 7 files changed, 165 insertions(+), 62 deletions(-) diff --git a/analyzers/analyzers.go b/analyzers/analyzers.go index 35691f46a..298390112 100755 --- a/analyzers/analyzers.go +++ b/analyzers/analyzers.go @@ -58,7 +58,7 @@ func (aS *AnalyzerService) initDB() (err error) { func (aS *AnalyzerService) clenaUp() (err error) { t2 := bleve.NewDateRangeQuery(time.Time{}, time.Now().Add(-aS.cfg.AnalyzerSCfg().TTL)) - t2.SetField("RequestStartTime") + t2.SetField(utils.RequestStartTime) searchReq := bleve.NewSearchRequest(t2) var res *bleve.SearchResult if res, err = aS.db.Search(searchReq); err != nil { @@ -110,12 +110,12 @@ func (aS *AnalyzerService) Shutdown() error { func (aS *AnalyzerService) logTrafic(id uint64, method string, params, result, err interface{}, - info *extraInfo, sTime, eTime time.Time) error { + enc, from, to string, sTime, eTime time.Time) error { if strings.HasPrefix(method, utils.AnalyzerSv1) { return nil } return aS.db.Index(utils.ConcatenatedKey(method, strconv.FormatInt(sTime.Unix(), 10)), - NewInfoRPC(id, method, params, result, err, info, sTime, eTime)) + NewInfoRPC(id, method, params, result, err, enc, from, to, sTime, eTime)) } // V1Search returns a list of API that match the query @@ -130,11 +130,11 @@ func (aS *AnalyzerService) V1Search(searchstr string, reply *[]map[string]interf for i, obj := range searchResults.Hits { rply[i] = obj.Fields // make sure that the result is corectly marshaled - rply[i]["Result"] = json.RawMessage(utils.IfaceAsString(obj.Fields["Result"])) - rply[i]["Params"] = json.RawMessage(utils.IfaceAsString(obj.Fields["Params"])) + rply[i][utils.Reply] = json.RawMessage(utils.IfaceAsString(obj.Fields[utils.Reply])) + rply[i][utils.RequestParams] = json.RawMessage(utils.IfaceAsString(obj.Fields[utils.RequestParams])) // try to pretty print the duration - if dur, err := utils.IfaceAsDuration(rply[i]["Duration"]); err == nil { - rply[i]["Duration"] = dur.String() + if dur, err := utils.IfaceAsDuration(rply[i][utils.RequestDuration]); err == nil { + rply[i][utils.RequestDuration] = dur.String() } } *reply = rply diff --git a/analyzers/analyzers_test.go b/analyzers/analyzers_test.go index e8b9c7c80..c243b5267 100644 --- a/analyzers/analyzers_test.go +++ b/analyzers/analyzers_test.go @@ -19,9 +19,12 @@ along with this program. If not, see package analyzers import ( + "encoding/json" "os" "path" + "reflect" "runtime" + "strconv" "testing" "time" @@ -80,26 +83,17 @@ func TestAnalyzerSLogTraffic(t *testing.T) { t.Fatal(err) } t1 := time.Now().Add(-time.Hour) - if err = anz.logTrafic(0, utils.AnalyzerSv1Ping, "status", "result", "error", &extraInfo{ - enc: utils.MetaJSON, - from: "127.0.0.1:5565", - to: "127.0.0.1:2012", - }, t1, t1.Add(time.Second)); err != nil { + if err = anz.logTrafic(0, utils.AnalyzerSv1Ping, "status", "result", "error", + utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012", t1, t1.Add(time.Second)); err != nil { t.Fatal(err) } - if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{ - enc: utils.MetaJSON, - from: "127.0.0.1:5565", - to: "127.0.0.1:2012", - }, t1, t1.Add(time.Second)); err != nil { + if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error", + utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012", t1, t1.Add(time.Second)); err != nil { t.Fatal(err) } t1 = time.Now().Add(-10 * time.Minute) - if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{ - enc: utils.MetaJSON, - from: "127.0.0.1:5565", - to: "127.0.0.1:2012", - }, t1, t1.Add(time.Second)); err != nil { + if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error", + utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012", t1, t1.Add(time.Second)); err != nil { t.Fatal(err) } if cnt, err := anz.db.DocCount(); err != nil { @@ -180,3 +174,119 @@ func TestAnalyzersListenAndServe(t *testing.T) { }() anz.ListenAndServe(make(chan bool)) } + +func TestAnalyzersV1Search(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers" + cfg.AnalyzerSCfg().TTL = 30 * time.Minute + if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil { + t.Fatal(err) + } + if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil { + t.Fatal(err) + } + anz, err := NewAnalyzerService(cfg) + if err != nil { + t.Fatal(err) + } + // generate trafic + t1 := time.Now() + if err = anz.logTrafic(0, utils.CoreSv1Ping, + &utils.CGREventWithOpts{ + Opts: map[string]interface{}{ + utils.EventSource: utils.MetaCDRs, + }, + }, utils.Pong, nil, utils.MetaJSON, "127.0.0.1:5565", + "127.0.0.1:2012", t1, t1.Add(time.Second)); err != nil { + t.Fatal(err) + } + + if err = anz.logTrafic(1, utils.CoreSv1Ping, + &utils.CGREventWithOpts{ + Opts: map[string]interface{}{ + utils.EventSource: utils.MetaAttributes, + }, + }, utils.Pong, nil, + + utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012", + t1.Add(time.Second), t1.Add(20*time.Second)); err != nil { + t.Fatal(err) + } + + if err = anz.logTrafic(2, utils.CoreSv1Ping, + &utils.CGREventWithOpts{ + Opts: map[string]interface{}{ + utils.EventSource: utils.MetaAttributes, + }, + }, utils.Pong, nil, + + utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012", + t1.Add(2*time.Second), t1.Add(10*time.Second)); err != nil { + t.Fatal(err) + } + + if err = anz.logTrafic(3, utils.CoreSv1Ping, + &utils.CGREventWithOpts{ + Opts: map[string]interface{}{ + utils.EventSource: utils.MetaAttributes, + }, + }, utils.Pong, nil, + + utils.MetaGOB, "127.0.0.1:5566", "127.0.0.1:2013", + t1.Add(-24*time.Hour), t1.Add(-23*time.Hour)); err != nil { + t.Fatal(err) + } + reply := []map[string]interface{}{} + if err = anz.V1Search(utils.CoreSv1Ping, &reply); err != nil { + t.Fatal(err) + } else if len(reply) != 4 { + t.Errorf("Expected 4 hits received: %v", len(reply)) + } + reply = []map[string]interface{}{} + if err = anz.V1Search("RequestMethod:"+utils.CoreSv1Ping, &reply); err != nil { + t.Fatal(err) + } else if len(reply) != 4 { + t.Errorf("Expected 4 hits received: %v", len(reply)) + } + + expRply := []map[string]interface{}{{ + "RequestDestination": "127.0.0.1:2013", + "RequestDuration": "1h0m0s", + "RequestEncoding": "*gob", + "RequestID": 3., + "RequestMethod": "CoreSv1.Ping", + "RequestParams": json.RawMessage(`{"Opts":{"EventSource":"*attributes"}}`), + "Reply": json.RawMessage(`"Pong"`), + "RequestSource": "127.0.0.1:5566", + "RequestStartTime": t1.Add(-24 * time.Hour).UTC().Format(time.RFC3339), + }} + reply = []map[string]interface{}{} + if err = anz.V1Search(utils.RequestDuration+":>="+strconv.FormatInt(int64(time.Hour), 10), &reply); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(expRply, reply) { + t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply)) + } + + reply = []map[string]interface{}{} + if err = anz.V1Search(utils.RequestStartTime+":<=\""+t1.Add(-23*time.Hour).UTC().Format(time.RFC3339)+"\"", &reply); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(expRply, reply) { + t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply)) + } + reply = []map[string]interface{}{} + if err = anz.V1Search("RequestEncoding:*gob", &reply); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(expRply, reply) { + t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply)) + } + + if err = anz.db.Close(); err != nil { + t.Fatal(err) + } + if err = anz.V1Search("RequestEncoding:*gob", &reply); err != bleve.ErrorIndexClosed { + t.Errorf("Expected error: %v,received: %+v", bleve.ErrorIndexClosed, err) + } +} diff --git a/analyzers/codec.go b/analyzers/codec.go index eb8ce8fbc..d0c3d1cff 100644 --- a/analyzers/codec.go +++ b/analyzers/codec.go @@ -29,11 +29,9 @@ func (aS *AnalyzerService) NewServerCodec(sc rpc.ServerCodec, enc, from, to stri sc: sc, reqs: make(map[uint64]*rpcAPI), aS: aS, - extrainfo: &extraInfo{ - enc: enc, - from: from, - to: to, - }, + enc: enc, + from: from, + to: to, } } @@ -41,11 +39,13 @@ type AnalyzerServerCodec struct { sc rpc.ServerCodec // keep the API in memory because the write is async - reqs map[uint64]*rpcAPI - reqIdx uint64 - reqsLk sync.RWMutex - aS *AnalyzerService - extrainfo *extraInfo + reqs map[uint64]*rpcAPI + reqIdx uint64 + reqsLk sync.RWMutex + aS *AnalyzerService + enc string + from string + to string } func (c *AnalyzerServerCodec) ReadRequestHeader(r *rpc.Request) (err error) { @@ -73,7 +73,7 @@ func (c *AnalyzerServerCodec) WriteResponse(r *rpc.Response, x interface{}) erro api := c.reqs[c.reqIdx] delete(c.reqs, c.reqIdx) c.reqsLk.Unlock() - go c.aS.logTrafic(api.ID, api.Method, api.Params, x, r.Error, c.extrainfo, api.StartTime, time.Now()) + go c.aS.logTrafic(api.ID, api.Method, api.Params, x, r.Error, c.enc, c.from, c.to, api.StartTime, time.Now()) return c.sc.WriteResponse(r, x) } func (c *AnalyzerServerCodec) Close() error { return c.sc.Close() } diff --git a/analyzers/connector.go b/analyzers/connector.go index 543141d2a..3aadc0e9f 100644 --- a/analyzers/connector.go +++ b/analyzers/connector.go @@ -28,24 +28,24 @@ func (aS *AnalyzerService) NewAnalyzerConnector(sc rpcclient.ClientConnector, en return &AnalyzerConnector{ conn: sc, aS: aS, - extrainfo: &extraInfo{ - enc: enc, - from: from, - to: to, - }, + enc: enc, + from: from, + to: to, } } type AnalyzerConnector struct { conn rpcclient.ClientConnector - aS *AnalyzerService - extrainfo *extraInfo + aS *AnalyzerService + enc string + from string + to string } func (c *AnalyzerConnector) Call(serviceMethod string, args interface{}, reply interface{}) (err error) { sTime := time.Now() err = c.conn.Call(serviceMethod, args, reply) - go c.aS.logTrafic(0, serviceMethod, args, reply, err, c.extrainfo, sTime, time.Now()) + go c.aS.logTrafic(0, serviceMethod, args, reply, err, c.enc, c.from, c.to, sTime, time.Now()) return } diff --git a/analyzers/utils.go b/analyzers/utils.go index efe66c880..ade3a7935 100644 --- a/analyzers/utils.go +++ b/analyzers/utils.go @@ -32,7 +32,7 @@ import ( // NewInfoRPC returns a structure to be indexed func NewInfoRPC(id uint64, method string, params, result, err interface{}, - info *extraInfo, sTime, eTime time.Time) *InfoRPC { + enc, from, to string, sTime, eTime time.Time) *InfoRPC { var e interface{} switch val := err.(type) { default: @@ -47,9 +47,9 @@ func NewInfoRPC(id uint64, method string, RequestStartTime: sTime, // EndTime: eTime, - RequestEncoding: info.enc, - RequestSource: info.from, - RequestDestination: info.to, + RequestEncoding: enc, + RequestSource: from, + RequestDestination: to, RequestID: id, RequestMethod: method, @@ -76,12 +76,6 @@ type InfoRPC struct { ReplyError interface{} } -type extraInfo struct { - enc string - from string - to string -} - type rpcAPI struct { ID uint64 `json:"id"` Method string `json:"method"` diff --git a/analyzers/utils_test.go b/analyzers/utils_test.go index 9165ed48c..275baf0fd 100644 --- a/analyzers/utils_test.go +++ b/analyzers/utils_test.go @@ -84,19 +84,13 @@ func TestNewInfoRPC(t *testing.T) { Reply: `"result"`, ReplyError: "error", } - idx := NewInfoRPC(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{ - enc: utils.MetaJSON, - from: "127.0.0.1:5565", - to: "127.0.0.1:2012", - }, t1, t1.Add(time.Second)) + idx := NewInfoRPC(0, utils.CoreSv1Status, "status", "result", "error", + utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012", t1, t1.Add(time.Second)) if !reflect.DeepEqual(expIdx, idx) { t.Errorf("Expected:%s, received:%s", utils.ToJSON(expIdx), utils.ToJSON(idx)) } - idx = NewInfoRPC(0, utils.CoreSv1Status, "status", "result", errors.New("error"), &extraInfo{ - enc: utils.MetaJSON, - from: "127.0.0.1:5565", - to: "127.0.0.1:2012", - }, t1, t1.Add(time.Second)) + idx = NewInfoRPC(0, utils.CoreSv1Status, "status", "result", errors.New("error"), + utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012", t1, t1.Add(time.Second)) if !reflect.DeepEqual(expIdx, idx) { t.Errorf("Expected:%s, received:%s", utils.ToJSON(expIdx), utils.ToJSON(idx)) } diff --git a/utils/consts.go b/utils/consts.go index 62845e728..bf16f6e5b 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -2486,12 +2486,17 @@ const ( ProcessedOpt = "Processed" ) -// Analyzers cpmstants +// Analyzers constants const ( MetaScorch = "*scorch" MetaBoltdb = "*boltdb" MetaLeveldb = "*leveldb" MetaMoss = "*mossdb" + + RequestStartTime = "RequestStartTime" + RequestDuration = "RequestDuration" + RequestParams = "RequestParams" + Reply = "Reply" ) func buildCacheInstRevPrefixes() {