Updated AnalyzerSv1 API

This commit is contained in:
Trial97
2020-11-17 09:17:26 +02:00
committed by Dan Christian Bogos
parent 979a2818a7
commit 28ea7e8596
41 changed files with 258 additions and 95 deletions

View File

@@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"os"
"path"
"strconv"
"strings"
"time"
@@ -29,12 +30,16 @@ import (
"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/search"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// NewAnalyzerService initializes a AnalyzerService
func NewAnalyzerService(cfg *config.CGRConfig) (aS *AnalyzerService, err error) {
aS = &AnalyzerService{cfg: cfg}
func NewAnalyzerService(cfg *config.CGRConfig, filterS chan *engine.FilterS) (aS *AnalyzerService, err error) {
aS = &AnalyzerService{
cfg: cfg,
filterSChan: filterS,
}
err = aS.initDB()
return
}
@@ -43,14 +48,21 @@ func NewAnalyzerService(cfg *config.CGRConfig) (aS *AnalyzerService, err error)
type AnalyzerService struct {
db bleve.Index
cfg *config.CGRConfig
// because we do not use the filters only for API
// start the service without them
// and populate them on the first API call
filterSChan chan *engine.FilterS
filterS *engine.FilterS
}
func (aS *AnalyzerService) initDB() (err error) {
if _, err = os.Stat(aS.cfg.AnalyzerSCfg().DBPath); err == nil {
aS.db, err = bleve.Open(aS.cfg.AnalyzerSCfg().DBPath)
dbPath := path.Join(aS.cfg.AnalyzerSCfg().DBPath, "db")
if _, err = os.Stat(dbPath); err == nil {
aS.db, err = bleve.Open(dbPath)
} else if os.IsNotExist(err) {
indxType, storeType := getIndex(aS.cfg.AnalyzerSCfg().IndexType)
aS.db, err = bleve.NewUsing(aS.cfg.AnalyzerSCfg().DBPath,
aS.db, err = bleve.NewUsing(dbPath,
bleve.NewIndexMapping(), indxType, storeType, nil)
}
return
@@ -117,25 +129,59 @@ func (aS *AnalyzerService) logTrafic(id uint64, method string,
NewInfoRPC(id, method, params, result, err, enc, from, to, sTime, eTime))
}
// V1StringQuery returns a list of API that match the query
func (aS *AnalyzerService) V1StringQuery(searchstr string, reply *[]map[string]interface{}) error {
// QueryArgs the structure that we use to filter the API calls
type QueryArgs struct {
// a string based on the query language(https://blevesearch.com/docs/Query-String-Query/) that we send to bleve
HeaderFilters string
// a list of filters that we use to filter the call similar to how we filter the events
ContentFilters []string
}
s := bleve.NewSearchRequest(bleve.NewQueryStringQuery(searchstr))
// V1StringQuery returns a list of API that match the query
func (aS *AnalyzerService) V1StringQuery(args *QueryArgs, reply *[]map[string]interface{}) error {
s := bleve.NewSearchRequest(bleve.NewQueryStringQuery(args.HeaderFilters))
s.Fields = []string{utils.Meta} // return all fields
searchResults, err := aS.db.Search(s)
if err != nil {
return err
}
rply := make([]map[string]interface{}, searchResults.Hits.Len())
for i, obj := range searchResults.Hits {
rply[i] = obj.Fields
rply := make([]map[string]interface{}, 0, searchResults.Hits.Len())
lCntFltrs := len(args.ContentFilters)
if lCntFltrs != 0 &&
aS.filterS == nil { // populate the filter on the first API that requeres them
aS.filterS = <-aS.filterSChan
aS.filterSChan <- aS.filterS
}
for _, obj := range searchResults.Hits {
// make sure that the result is corectly marshaled
rply[i][utils.Reply] = json.RawMessage(utils.IfaceAsString(obj.Fields[utils.Reply]))
rply[i][utils.RequestParams] = json.RawMessage(utils.IfaceAsString(obj.Fields[utils.RequestParams]))
rep := json.RawMessage(utils.IfaceAsString(obj.Fields[utils.Reply]))
req := json.RawMessage(utils.IfaceAsString(obj.Fields[utils.RequestParams]))
obj.Fields[utils.Reply] = rep
obj.Fields[utils.RequestParams] = req
// try to pretty print the duration
if dur, err := utils.IfaceAsDuration(rply[i][utils.RequestDuration]); err == nil {
rply[i][utils.RequestDuration] = dur.String()
if dur, err := utils.IfaceAsDuration(obj.Fields[utils.RequestDuration]); err == nil {
obj.Fields[utils.RequestDuration] = dur.String()
}
if lCntFltrs != 0 {
repDP, err := unmarshalJSON(rep)
if err != nil {
return err
}
reqDP, err := unmarshalJSON(req)
if err != nil {
return err
}
if pass, err := aS.filterS.Pass(aS.cfg.GeneralCfg().DefaultTenant,
args.ContentFilters, utils.MapStorage{
utils.MetaReq: reqDP,
utils.MetaRep: repDP,
}); err != nil {
return err
} else if !pass {
continue
}
}
rply = append(rply, obj.Fields)
}
*reply = rply
return nil

View File

@@ -82,18 +82,17 @@ func TestAnalyzerSIT(t *testing.T) {
func testAnalyzerSInitCfg(t *testing.T) {
var err error
if err := os.RemoveAll("/tmp/analyzers/"); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll("/tmp/analyzers/", 0700); err != nil {
t.Fatal(err)
}
anzCfgPath = path.Join(*dataDir, "conf", "samples", "analyzers")
anzCfg, err = config.NewCGRConfigFromPath(anzCfgPath)
if err != nil {
t.Error(err)
}
if err := os.RemoveAll(anzCfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(anzCfg.AnalyzerSCfg().DBPath), 0700); err != nil {
t.Fatal(err)
}
}
func testAnalyzerSInitDataDb(t *testing.T) {
@@ -199,7 +198,7 @@ func testAnalyzerSChargerSv1ProcessEvent(t *testing.T) {
func testAnalyzerSV1Search(t *testing.T) {
var result []map[string]interface{}
if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, `+RequestEncoding:\*internal +RequestMethod:AttributeSv1\.ProcessEvent`, &result); err != nil {
if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, &QueryArgs{HeaderFilters: `+RequestEncoding:\*internal +RequestMethod:AttributeSv1\.ProcessEvent`}, &result); err != nil {
t.Error(err)
} else if len(result) != 1 {
t.Errorf("Unexpected result: %s", utils.ToJSON(result))
@@ -208,7 +207,7 @@ func testAnalyzerSV1Search(t *testing.T) {
func testAnalyzerSV1Search2(t *testing.T) {
var result []map[string]interface{}
if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, `+RequestEncoding:\*json +RequestMethod:ChargerSv1\.ProcessEvent`, &result); err != nil {
if err := anzRPC.Call(utils.AnalyzerSv1StringQuery, &QueryArgs{HeaderFilters: `+RequestEncoding:\*json +RequestMethod:ChargerSv1\.ProcessEvent`}, &result); err != nil {
t.Error(err)
} else if len(result) != 1 {
t.Errorf("Unexpected result: %s", utils.ToJSON(result))

View File

@@ -21,7 +21,6 @@ package analyzers
import (
"encoding/json"
"os"
"path"
"reflect"
"runtime"
"strconv"
@@ -43,10 +42,10 @@ func TestNewAnalyzerService(t *testing.T) {
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
anz, err := NewAnalyzerService(cfg, nil)
if err != nil {
t.Fatal(err)
}
@@ -78,10 +77,10 @@ func TestAnalyzerSLogTraffic(t *testing.T) {
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
anz, err := NewAnalyzerService(cfg, nil)
if err != nil {
t.Fatal(err)
}
@@ -134,10 +133,10 @@ func TestAnalyzersDeleteHits(t *testing.T) {
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
anz, err := NewAnalyzerService(cfg, nil)
if err != nil {
t.Fatal(err)
}
@@ -159,10 +158,10 @@ func TestAnalyzersListenAndServe(t *testing.T) {
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
anz, err := NewAnalyzerService(cfg, nil)
if err != nil {
t.Fatal(err)
}
@@ -172,7 +171,7 @@ func TestAnalyzersListenAndServe(t *testing.T) {
anz.ListenAndServe(make(chan struct{}))
cfg.AnalyzerSCfg().CleanupInterval = 1
anz, err = NewAnalyzerService(cfg)
anz, err = NewAnalyzerService(cfg, nil)
if err != nil {
t.Fatal(err)
}
@@ -197,10 +196,10 @@ func TestAnalyzersV1Search(t *testing.T) {
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
anz, err := NewAnalyzerService(cfg, nil)
if err != nil {
t.Fatal(err)
}
@@ -252,13 +251,13 @@ func TestAnalyzersV1Search(t *testing.T) {
t.Fatal(err)
}
reply := []map[string]interface{}{}
if err = anz.V1StringQuery(utils.CoreSv1Ping, &reply); err != nil {
if err = anz.V1StringQuery(&QueryArgs{HeaderFilters: utils.CoreSv1Ping}, &reply); err != nil {
t.Fatal(err)
} else if len(reply) != 4 {
t.Errorf("Expected 4 hits received: %v", len(reply))
}
reply = []map[string]interface{}{}
if err = anz.V1StringQuery("RequestMethod:"+utils.CoreSv1Ping, &reply); err != nil {
if err = anz.V1StringQuery(&QueryArgs{HeaderFilters: "RequestMethod:" + utils.CoreSv1Ping}, &reply); err != nil {
t.Fatal(err)
} else if len(reply) != 4 {
t.Errorf("Expected 4 hits received: %v", len(reply))
@@ -276,20 +275,20 @@ func TestAnalyzersV1Search(t *testing.T) {
"RequestStartTime": t1.Add(-24 * time.Hour).UTC().Format(time.RFC3339),
}}
reply = []map[string]interface{}{}
if err = anz.V1StringQuery(utils.RequestDuration+":>="+strconv.FormatInt(int64(time.Hour), 10), &reply); err != nil {
if err = anz.V1StringQuery(&QueryArgs{HeaderFilters: utils.RequestDuration + ":>=" + strconv.FormatInt(int64(time.Hour), 10)}, &reply); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(expRply, reply) {
t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply))
}
reply = []map[string]interface{}{}
if err = anz.V1StringQuery(utils.RequestStartTime+":<=\""+t1.Add(-23*time.Hour).UTC().Format(time.RFC3339)+"\"", &reply); err != nil {
if err = anz.V1StringQuery(&QueryArgs{HeaderFilters: utils.RequestStartTime + ":<=\"" + t1.Add(-23*time.Hour).UTC().Format(time.RFC3339) + "\""}, &reply); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(expRply, reply) {
t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply))
}
reply = []map[string]interface{}{}
if err = anz.V1StringQuery("RequestEncoding:*gob", &reply); err != nil {
if err = anz.V1StringQuery(&QueryArgs{HeaderFilters: "RequestEncoding:*gob"}, &reply); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(expRply, reply) {
t.Errorf("Expected %s received: %s", utils.ToJSON(expRply), utils.ToJSON(reply))
@@ -298,7 +297,7 @@ func TestAnalyzersV1Search(t *testing.T) {
if err = anz.db.Close(); err != nil {
t.Fatal(err)
}
if err = anz.V1StringQuery("RequestEncoding:*gob", &reply); err != bleve.ErrorIndexClosed {
if err = anz.V1StringQuery(&QueryArgs{HeaderFilters: "RequestEncoding:*gob"}, &reply); err != bleve.ErrorIndexClosed {
t.Errorf("Expected error: %v,received: %+v", bleve.ErrorIndexClosed, err)
}
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {

View File

@@ -21,7 +21,6 @@ package analyzers
import (
"net/rpc"
"os"
"path"
"reflect"
"runtime"
"testing"
@@ -57,10 +56,10 @@ func TestNewServerCodec(t *testing.T) {
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
anz, err := NewAnalyzerService(cfg, nil)
if err != nil {
t.Fatal(err)
}

View File

@@ -21,7 +21,6 @@ package analyzers
import (
"errors"
"os"
"path"
"runtime"
"testing"
"time"
@@ -44,10 +43,10 @@ func TestNewAnalyzeConnector(t *testing.T) {
if err := os.RemoveAll(cfg.AnalyzerSCfg().DBPath); err != nil {
t.Fatal(err)
}
if err = os.MkdirAll(path.Dir(cfg.AnalyzerSCfg().DBPath), 0700); err != nil {
if err = os.MkdirAll(cfg.AnalyzerSCfg().DBPath, 0700); err != nil {
t.Fatal(err)
}
anz, err := NewAnalyzerService(cfg)
anz, err := NewAnalyzerService(cfg, nil)
if err != nil {
t.Fatal(err)
}

View File

@@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package analyzers
import (
"encoding/json"
"strconv"
"time"
"github.com/blevesearch/bleve/index/scorch"
@@ -97,3 +99,33 @@ func getIndex(indx string) (indxType, storeType string) {
}
return
}
// unmarshalJSON will transform the message in a map[string]interface{} of []interface{}
// depending of the first character
// used for filters purposes so the nil is replaced with empty map
func unmarshalJSON(jsn json.RawMessage) (interface{}, error) {
switch {
case string(jsn) == "null" ||
len(jsn) == 0: // nil or empty response
// by default consider nil as an empty map for filtering purposes
return map[string]interface{}{}, nil
case string(jsn) == "true": // booleans
return true, nil
case string(jsn) == "false":
return false, nil
case jsn[0] == '"': // string
return string(jsn[1 : len(jsn)-1]), nil
case jsn[0] >= '0' && jsn[0] <= '9': // float64
return strconv.ParseFloat(string(jsn), 64)
case jsn[0] == '[': // slice
var val []interface{}
err := json.Unmarshal(jsn, &val)
return val, err
case jsn[0] == '{': // map
var val map[string]interface{}
err := json.Unmarshal(jsn, &val)
return val, err
default:
return nil, new(json.SyntaxError)
}
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package analyzers
import (
"encoding/json"
"errors"
"reflect"
"testing"
@@ -96,3 +97,62 @@ func TestNewInfoRPC(t *testing.T) {
}
}
func TestUnmarshalJSON(t *testing.T) {
expErr := new(json.SyntaxError)
if _, err := unmarshalJSON(json.RawMessage(`a`)); errors.Is(err, expErr) {
t.Errorf("Expected error: %s,received %+v", expErr, err)
}
var exp interface{} = true
if val, err := unmarshalJSON(json.RawMessage(`true`)); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(val, exp) {
t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val))
}
exp = false
if val, err := unmarshalJSON(json.RawMessage(`false`)); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(val, exp) {
t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val))
}
exp = "string"
if val, err := unmarshalJSON(json.RawMessage(`"string"`)); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(val, exp) {
t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val))
}
exp = 94.
if val, err := unmarshalJSON(json.RawMessage(`94`)); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(val, exp) {
t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val))
}
exp = []interface{}{"1", "2", "3"}
if val, err := unmarshalJSON(json.RawMessage(`["1","2","3"]`)); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(val, exp) {
t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val))
}
exp = map[string]interface{}{"1": "A", "2": "B", "3": "C"}
if val, err := unmarshalJSON(json.RawMessage(`{"1":"A","2":"B","3":"C"}`)); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(val, exp) {
t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val))
}
exp = map[string]interface{}{}
if val, err := unmarshalJSON(json.RawMessage(`null`)); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(val, exp) {
t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val))
}
if val, err := unmarshalJSON(json.RawMessage(``)); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(val, exp) {
t.Errorf("Expected: %s,received %s", utils.ToJSON(exp), utils.ToJSON(val))
}
}