mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Finished tests for analyzers
This commit is contained in:
committed by
Dan Christian Bogos
parent
3ae4971967
commit
bb08b933f7
@@ -58,7 +58,7 @@ func (aS *AnalyzerService) initDB() (err error) {
|
||||
|
||||
func (aS *AnalyzerService) clenaUp() (err error) {
|
||||
t2 := bleve.NewDateRangeQuery(time.Time{}, time.Now().Add(-aS.cfg.AnalyzerSCfg().TTL))
|
||||
t2.SetField("RequestStartTime")
|
||||
t2.SetField(utils.RequestStartTime)
|
||||
searchReq := bleve.NewSearchRequest(t2)
|
||||
var res *bleve.SearchResult
|
||||
if res, err = aS.db.Search(searchReq); err != nil {
|
||||
@@ -110,12 +110,12 @@ func (aS *AnalyzerService) Shutdown() error {
|
||||
|
||||
func (aS *AnalyzerService) logTrafic(id uint64, method string,
|
||||
params, result, err interface{},
|
||||
info *extraInfo, sTime, eTime time.Time) error {
|
||||
enc, from, to string, sTime, eTime time.Time) error {
|
||||
if strings.HasPrefix(method, utils.AnalyzerSv1) {
|
||||
return nil
|
||||
}
|
||||
return aS.db.Index(utils.ConcatenatedKey(method, strconv.FormatInt(sTime.Unix(), 10)),
|
||||
NewInfoRPC(id, method, params, result, err, info, sTime, eTime))
|
||||
NewInfoRPC(id, method, params, result, err, enc, from, to, sTime, eTime))
|
||||
}
|
||||
|
||||
// V1Search returns a list of API that match the query
|
||||
@@ -130,11 +130,11 @@ func (aS *AnalyzerService) V1Search(searchstr string, reply *[]map[string]interf
|
||||
for i, obj := range searchResults.Hits {
|
||||
rply[i] = obj.Fields
|
||||
// make sure that the result is corectly marshaled
|
||||
rply[i]["Result"] = json.RawMessage(utils.IfaceAsString(obj.Fields["Result"]))
|
||||
rply[i]["Params"] = json.RawMessage(utils.IfaceAsString(obj.Fields["Params"]))
|
||||
rply[i][utils.Reply] = json.RawMessage(utils.IfaceAsString(obj.Fields[utils.Reply]))
|
||||
rply[i][utils.RequestParams] = json.RawMessage(utils.IfaceAsString(obj.Fields[utils.RequestParams]))
|
||||
// try to pretty print the duration
|
||||
if dur, err := utils.IfaceAsDuration(rply[i]["Duration"]); err == nil {
|
||||
rply[i]["Duration"] = dur.String()
|
||||
if dur, err := utils.IfaceAsDuration(rply[i][utils.RequestDuration]); err == nil {
|
||||
rply[i][utils.RequestDuration] = dur.String()
|
||||
}
|
||||
}
|
||||
*reply = rply
|
||||
|
||||
@@ -19,9 +19,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package analyzers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -80,26 +83,17 @@ func TestAnalyzerSLogTraffic(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t1 := time.Now().Add(-time.Hour)
|
||||
if err = anz.logTrafic(0, utils.AnalyzerSv1Ping, "status", "result", "error", &extraInfo{
|
||||
enc: utils.MetaJSON,
|
||||
from: "127.0.0.1:5565",
|
||||
to: "127.0.0.1:2012",
|
||||
}, t1, t1.Add(time.Second)); err != nil {
|
||||
if err = anz.logTrafic(0, utils.AnalyzerSv1Ping, "status", "result", "error",
|
||||
utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012", t1, t1.Add(time.Second)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{
|
||||
enc: utils.MetaJSON,
|
||||
from: "127.0.0.1:5565",
|
||||
to: "127.0.0.1:2012",
|
||||
}, t1, t1.Add(time.Second)); err != nil {
|
||||
if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error",
|
||||
utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012", t1, t1.Add(time.Second)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t1 = time.Now().Add(-10 * time.Minute)
|
||||
if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{
|
||||
enc: utils.MetaJSON,
|
||||
from: "127.0.0.1:5565",
|
||||
to: "127.0.0.1:2012",
|
||||
}, t1, t1.Add(time.Second)); err != nil {
|
||||
if err = anz.logTrafic(0, utils.CoreSv1Status, "status", "result", "error",
|
||||
utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012", t1, t1.Add(time.Second)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if cnt, err := anz.db.DocCount(); err != nil {
|
||||
@@ -180,3 +174,119 @@ func TestAnalyzersListenAndServe(t *testing.T) {
|
||||
}()
|
||||
anz.ListenAndServe(make(chan bool))
|
||||
}
|
||||
|
||||
func TestAnalyzersV1Search(t *testing.T) {
|
||||
cfg, err := config.NewDefaultCGRConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cfg.AnalyzerSCfg().DBPath = "/tmp/analyzers"
|
||||
cfg.AnalyzerSCfg().TTL = 30 * time.Minute
|
||||
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
anz, err := NewAnalyzerService(cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// generate trafic
|
||||
t1 := time.Now()
|
||||
if err = anz.logTrafic(0, utils.CoreSv1Ping,
|
||||
&utils.CGREventWithOpts{
|
||||
Opts: map[string]interface{}{
|
||||
utils.EventSource: utils.MetaCDRs,
|
||||
},
|
||||
}, utils.Pong, nil, utils.MetaJSON, "127.0.0.1:5565",
|
||||
"127.0.0.1:2012", t1, t1.Add(time.Second)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err = anz.logTrafic(1, utils.CoreSv1Ping,
|
||||
&utils.CGREventWithOpts{
|
||||
Opts: map[string]interface{}{
|
||||
utils.EventSource: utils.MetaAttributes,
|
||||
},
|
||||
}, utils.Pong, nil,
|
||||
|
||||
utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012",
|
||||
t1.Add(time.Second), t1.Add(20*time.Second)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err = anz.logTrafic(2, utils.CoreSv1Ping,
|
||||
&utils.CGREventWithOpts{
|
||||
Opts: map[string]interface{}{
|
||||
utils.EventSource: utils.MetaAttributes,
|
||||
},
|
||||
}, utils.Pong, nil,
|
||||
|
||||
utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012",
|
||||
t1.Add(2*time.Second), t1.Add(10*time.Second)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err = anz.logTrafic(3, utils.CoreSv1Ping,
|
||||
&utils.CGREventWithOpts{
|
||||
Opts: map[string]interface{}{
|
||||
utils.EventSource: utils.MetaAttributes,
|
||||
},
|
||||
}, utils.Pong, nil,
|
||||
|
||||
utils.MetaGOB, "127.0.0.1:5566", "127.0.0.1:2013",
|
||||
t1.Add(-24*time.Hour), t1.Add(-23*time.Hour)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
reply := []map[string]interface{}{}
|
||||
if err = anz.V1Search(utils.CoreSv1Ping, &reply); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if len(reply) != 4 {
|
||||
t.Errorf("Expected 4 hits received: %v", len(reply))
|
||||
}
|
||||
reply = []map[string]interface{}{}
|
||||
if err = anz.V1Search("RequestMethod:"+utils.CoreSv1Ping, &reply); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if len(reply) != 4 {
|
||||
t.Errorf("Expected 4 hits received: %v", len(reply))
|
||||
}
|
||||
|
||||
expRply := []map[string]interface{}{{
|
||||
"RequestDestination": "127.0.0.1:2013",
|
||||
"RequestDuration": "1h0m0s",
|
||||
"RequestEncoding": "*gob",
|
||||
"RequestID": 3.,
|
||||
"RequestMethod": "CoreSv1.Ping",
|
||||
"RequestParams": json.RawMessage(`{"Opts":{"EventSource":"*attributes"}}`),
|
||||
"Reply": json.RawMessage(`"Pong"`),
|
||||
"RequestSource": "127.0.0.1:5566",
|
||||
"RequestStartTime": t1.Add(-24 * time.Hour).UTC().Format(time.RFC3339),
|
||||
}}
|
||||
reply = []map[string]interface{}{}
|
||||
if err = anz.V1Search(utils.RequestDuration+":>="+strconv.FormatInt(int64(time.Hour), 10), &reply); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !reflect.DeepEqual(expRply, reply) {
|
||||
t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply))
|
||||
}
|
||||
|
||||
reply = []map[string]interface{}{}
|
||||
if err = anz.V1Search(utils.RequestStartTime+":<=\""+t1.Add(-23*time.Hour).UTC().Format(time.RFC3339)+"\"", &reply); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !reflect.DeepEqual(expRply, reply) {
|
||||
t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply))
|
||||
}
|
||||
reply = []map[string]interface{}{}
|
||||
if err = anz.V1Search("RequestEncoding:*gob", &reply); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if !reflect.DeepEqual(expRply, reply) {
|
||||
t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply))
|
||||
}
|
||||
|
||||
if err = anz.db.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err = anz.V1Search("RequestEncoding:*gob", &reply); err != bleve.ErrorIndexClosed {
|
||||
t.Errorf("Expected error: %v,received: %+v", bleve.ErrorIndexClosed, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,11 +29,9 @@ func (aS *AnalyzerService) NewServerCodec(sc rpc.ServerCodec, enc, from, to stri
|
||||
sc: sc,
|
||||
reqs: make(map[uint64]*rpcAPI),
|
||||
aS: aS,
|
||||
extrainfo: &extraInfo{
|
||||
enc: enc,
|
||||
from: from,
|
||||
to: to,
|
||||
},
|
||||
enc: enc,
|
||||
from: from,
|
||||
to: to,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,11 +39,13 @@ type AnalyzerServerCodec struct {
|
||||
sc rpc.ServerCodec
|
||||
|
||||
// keep the API in memory because the write is async
|
||||
reqs map[uint64]*rpcAPI
|
||||
reqIdx uint64
|
||||
reqsLk sync.RWMutex
|
||||
aS *AnalyzerService
|
||||
extrainfo *extraInfo
|
||||
reqs map[uint64]*rpcAPI
|
||||
reqIdx uint64
|
||||
reqsLk sync.RWMutex
|
||||
aS *AnalyzerService
|
||||
enc string
|
||||
from string
|
||||
to string
|
||||
}
|
||||
|
||||
func (c *AnalyzerServerCodec) ReadRequestHeader(r *rpc.Request) (err error) {
|
||||
@@ -73,7 +73,7 @@ func (c *AnalyzerServerCodec) WriteResponse(r *rpc.Response, x interface{}) erro
|
||||
api := c.reqs[c.reqIdx]
|
||||
delete(c.reqs, c.reqIdx)
|
||||
c.reqsLk.Unlock()
|
||||
go c.aS.logTrafic(api.ID, api.Method, api.Params, x, r.Error, c.extrainfo, api.StartTime, time.Now())
|
||||
go c.aS.logTrafic(api.ID, api.Method, api.Params, x, r.Error, c.enc, c.from, c.to, api.StartTime, time.Now())
|
||||
return c.sc.WriteResponse(r, x)
|
||||
}
|
||||
func (c *AnalyzerServerCodec) Close() error { return c.sc.Close() }
|
||||
|
||||
@@ -28,24 +28,24 @@ func (aS *AnalyzerService) NewAnalyzerConnector(sc rpcclient.ClientConnector, en
|
||||
return &AnalyzerConnector{
|
||||
conn: sc,
|
||||
aS: aS,
|
||||
extrainfo: &extraInfo{
|
||||
enc: enc,
|
||||
from: from,
|
||||
to: to,
|
||||
},
|
||||
enc: enc,
|
||||
from: from,
|
||||
to: to,
|
||||
}
|
||||
}
|
||||
|
||||
type AnalyzerConnector struct {
|
||||
conn rpcclient.ClientConnector
|
||||
|
||||
aS *AnalyzerService
|
||||
extrainfo *extraInfo
|
||||
aS *AnalyzerService
|
||||
enc string
|
||||
from string
|
||||
to string
|
||||
}
|
||||
|
||||
func (c *AnalyzerConnector) Call(serviceMethod string, args interface{}, reply interface{}) (err error) {
|
||||
sTime := time.Now()
|
||||
err = c.conn.Call(serviceMethod, args, reply)
|
||||
go c.aS.logTrafic(0, serviceMethod, args, reply, err, c.extrainfo, sTime, time.Now())
|
||||
go c.aS.logTrafic(0, serviceMethod, args, reply, err, c.enc, c.from, c.to, sTime, time.Now())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ import (
|
||||
// NewInfoRPC returns a structure to be indexed
|
||||
func NewInfoRPC(id uint64, method string,
|
||||
params, result, err interface{},
|
||||
info *extraInfo, sTime, eTime time.Time) *InfoRPC {
|
||||
enc, from, to string, sTime, eTime time.Time) *InfoRPC {
|
||||
var e interface{}
|
||||
switch val := err.(type) {
|
||||
default:
|
||||
@@ -47,9 +47,9 @@ func NewInfoRPC(id uint64, method string,
|
||||
RequestStartTime: sTime,
|
||||
// EndTime: eTime,
|
||||
|
||||
RequestEncoding: info.enc,
|
||||
RequestSource: info.from,
|
||||
RequestDestination: info.to,
|
||||
RequestEncoding: enc,
|
||||
RequestSource: from,
|
||||
RequestDestination: to,
|
||||
|
||||
RequestID: id,
|
||||
RequestMethod: method,
|
||||
@@ -76,12 +76,6 @@ type InfoRPC struct {
|
||||
ReplyError interface{}
|
||||
}
|
||||
|
||||
type extraInfo struct {
|
||||
enc string
|
||||
from string
|
||||
to string
|
||||
}
|
||||
|
||||
type rpcAPI struct {
|
||||
ID uint64 `json:"id"`
|
||||
Method string `json:"method"`
|
||||
|
||||
@@ -84,19 +84,13 @@ func TestNewInfoRPC(t *testing.T) {
|
||||
Reply: `"result"`,
|
||||
ReplyError: "error",
|
||||
}
|
||||
idx := NewInfoRPC(0, utils.CoreSv1Status, "status", "result", "error", &extraInfo{
|
||||
enc: utils.MetaJSON,
|
||||
from: "127.0.0.1:5565",
|
||||
to: "127.0.0.1:2012",
|
||||
}, t1, t1.Add(time.Second))
|
||||
idx := NewInfoRPC(0, utils.CoreSv1Status, "status", "result", "error",
|
||||
utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012", t1, t1.Add(time.Second))
|
||||
if !reflect.DeepEqual(expIdx, idx) {
|
||||
t.Errorf("Expected:%s, received:%s", utils.ToJSON(expIdx), utils.ToJSON(idx))
|
||||
}
|
||||
idx = NewInfoRPC(0, utils.CoreSv1Status, "status", "result", errors.New("error"), &extraInfo{
|
||||
enc: utils.MetaJSON,
|
||||
from: "127.0.0.1:5565",
|
||||
to: "127.0.0.1:2012",
|
||||
}, t1, t1.Add(time.Second))
|
||||
idx = NewInfoRPC(0, utils.CoreSv1Status, "status", "result", errors.New("error"),
|
||||
utils.MetaJSON, "127.0.0.1:5565", "127.0.0.1:2012", t1, t1.Add(time.Second))
|
||||
if !reflect.DeepEqual(expIdx, idx) {
|
||||
t.Errorf("Expected:%s, received:%s", utils.ToJSON(expIdx), utils.ToJSON(idx))
|
||||
}
|
||||
|
||||
@@ -2486,12 +2486,17 @@ const (
|
||||
ProcessedOpt = "Processed"
|
||||
)
|
||||
|
||||
// Analyzers cpmstants
|
||||
// Analyzers constants
|
||||
const (
|
||||
MetaScorch = "*scorch"
|
||||
MetaBoltdb = "*boltdb"
|
||||
MetaLeveldb = "*leveldb"
|
||||
MetaMoss = "*mossdb"
|
||||
|
||||
RequestStartTime = "RequestStartTime"
|
||||
RequestDuration = "RequestDuration"
|
||||
RequestParams = "RequestParams"
|
||||
Reply = "Reply"
|
||||
)
|
||||
|
||||
func buildCacheInstRevPrefixes() {
|
||||
|
||||
Reference in New Issue
Block a user