Updated analyzers

This commit is contained in:
Trial97
2020-10-23 17:14:33 +03:00
committed by Dan Christian Bogos
parent 0810803475
commit 08d4f1fc21
14 changed files with 323 additions and 150 deletions

View File

@@ -20,17 +20,46 @@ package analyzers
import (
"fmt"
"os"
"strconv"
"time"
"github.com/blevesearch/bleve"
// import the bleve packages in order to register the indextype and storagetype
"github.com/blevesearch/bleve/document"
_ "github.com/blevesearch/bleve/index/scorch"
_ "github.com/blevesearch/bleve/index/store/boltdb"
_ "github.com/blevesearch/bleve/index/store/goleveldb"
_ "github.com/blevesearch/bleve/index/store/moss"
_ "github.com/blevesearch/bleve/index/upsidedown"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
// NewAnalyzerService initializes a AnalyzerService
func NewAnalyzerService() (*AnalyzerService, error) {
return &AnalyzerService{}, nil
func NewAnalyzerService(cfg *config.CGRConfig) (aS *AnalyzerService, err error) {
aS = &AnalyzerService{cfg: cfg}
err = aS.initDB()
return
}
// AnalyzerService is the service handling analyzer
type AnalyzerService struct {
db bleve.Index
cfg *config.CGRConfig
}
func (aS *AnalyzerService) initDB() (err error) {
fmt.Println(aS.cfg.AnalyzerSCfg().DBPath)
if _, err = os.Stat(aS.cfg.AnalyzerSCfg().DBPath); err == nil {
fmt.Println("exista")
aS.db, err = bleve.Open(aS.cfg.AnalyzerSCfg().DBPath)
} else if os.IsNotExist(err) {
fmt.Println("nu exista")
aS.db, err = bleve.NewUsing(aS.cfg.AnalyzerSCfg().DBPath, bleve.NewIndexMapping(),
aS.cfg.AnalyzerSCfg().IndexType, aS.cfg.AnalyzerSCfg().StoreType, nil)
}
return
}
// ListenAndServe will initialize the service
@@ -44,6 +73,55 @@ func (aS *AnalyzerService) ListenAndServe(exitChan chan bool) error {
// Shutdown is called to shutdown the service
func (aS *AnalyzerService) Shutdown() error {
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.AnalyzerS))
aS.db.Close()
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.AnalyzerS))
return nil
}
func (aS *AnalyzerService) logTrafic(id uint64, method string,
params, result, err interface{},
info *extraInfo, sTime, eTime time.Time) error {
var e interface{}
switch val := err.(type) {
default:
case nil:
case string:
e = val
case error:
e = val.Error()
}
return aS.db.Index(utils.ConcatenatedKey(method, strconv.FormatInt(sTime.Unix(), 10)),
InfoRPC{
Duration: eTime.Sub(sTime),
StartTime: sTime,
EndTime: eTime,
Encoding: info.enc,
From: info.from,
To: info.to,
ID: id,
Method: method,
Params: params,
Result: result,
Error: e,
})
}
func (aS *AnalyzerService) V1Search(searchstr string, reply *[]*document.Document) error {
s := bleve.NewSearchRequest(bleve.NewQueryStringQuery(searchstr))
searchResults, err := aS.db.Search(s)
if err != nil {
return err
}
rply := make([]*document.Document, searchResults.Hits.Len())
for i, obj := range searchResults.Hits {
fmt.Println(obj.ID)
fmt.Println(obj.Index)
d, _ := aS.db.Document(obj.ID)
fmt.Println(d.Fields[0].Name())
rply[i] = d
}
*reply = rply
return nil
}

View File

@@ -20,34 +20,60 @@ package analyzers
import (
"net/rpc"
"sync"
"time"
)
func NewAnalyzeServerCodec(sc rpc.ServerCodec) rpc.ServerCodec {
return &AnalyzeServerCodec{sc: sc, req: new(RPCServerRequest)}
func (aS *AnalyzerService) NewServerCodec(sc rpc.ServerCodec, enc, from, to string) rpc.ServerCodec {
return &AnalyzeServerCodec{
sc: sc,
reqs: make(map[uint64]*rpcAPI),
aS: aS,
extrainfo: &extraInfo{
enc: enc,
from: from,
to: to,
},
}
}
type AnalyzeServerCodec struct {
sc rpc.ServerCodec
// keep the information about the header so we handle this when the body is readed
// the ReadRequestHeader and ReadRequestBody are called in pairs
req *RPCServerRequest
// keep the API in memory because the write is async
reqs map[uint64]*rpcAPI
reqIdx uint64
reqsLk sync.RWMutex
aS *AnalyzerService
extrainfo *extraInfo
}
func (c *AnalyzeServerCodec) ReadRequestHeader(r *rpc.Request) (err error) {
c.req.reset()
err = c.sc.ReadRequestHeader(r)
c.req.Method = r.ServiceMethod
c.req.ID = r.Seq
c.reqsLk.Lock()
c.reqIdx = r.Seq
c.reqs[c.reqIdx] = &rpcAPI{
ID: r.Seq,
Method: r.ServiceMethod,
StartTime: time.Now(),
}
c.reqsLk.Unlock()
return
}
func (c *AnalyzeServerCodec) ReadRequestBody(x interface{}) (err error) {
err = c.sc.ReadRequestBody(x)
go h.handleRequest(c.req.ID, c.req.Method, x)
c.reqsLk.Lock()
c.reqs[c.reqIdx].Params = x
c.reqsLk.Unlock()
return
}
func (c *AnalyzeServerCodec) WriteResponse(r *rpc.Response, x interface{}) error {
go h.handleResponse(r.Seq, x, r.Error)
c.reqsLk.Lock()
api := c.reqs[c.reqIdx]
delete(c.reqs, c.reqIdx)
c.reqsLk.Unlock()
go c.aS.logTrafic(api.ID, api.Method, api.Params, x, r.Error, c.extrainfo, api.StartTime, time.Now())
return c.sc.WriteResponse(r, x)
}
func (c *AnalyzeServerCodec) Close() error { return c.sc.Close() }

View File

@@ -19,28 +19,33 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package analyzers
import (
"sync"
"time"
"github.com/cgrates/rpcclient"
)
func NewAnalyzeConnector(sc rpcclient.ClientConnector) rpcclient.ClientConnector {
return &AnalyzeConnector{conn: sc}
func (aS *AnalyzerService) NewAnalyzeConnector(sc rpcclient.ClientConnector, enc, from, to string) rpcclient.ClientConnector {
return &AnalyzeConnector{
conn: sc,
aS: aS,
extrainfo: &extraInfo{
enc: enc,
from: from,
to: to,
},
}
}
type AnalyzeConnector struct {
conn rpcclient.ClientConnector
seq uint64
seqLk sync.Mutex
conn rpcclient.ClientConnector
aS *AnalyzerService
extrainfo *extraInfo
}
func (c *AnalyzeConnector) Call(serviceMethod string, args interface{}, reply interface{}) (err error) {
c.seqLk.Lock()
id := c.seq
c.seq++
c.seqLk.Unlock()
go h.handleRequest(id, serviceMethod, args)
sTime := time.Now()
err = c.conn.Call(serviceMethod, args, reply)
go h.handleResponse(id, reply, err)
go c.aS.logTrafic(0, serviceMethod, args, reply, err, c.extrainfo, sTime, time.Now())
return
}

View File

@@ -19,61 +19,34 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package analyzers
import (
"log"
"github.com/cgrates/cgrates/utils"
"time"
)
var h = new(Handler)
type Handler struct {
type extraInfo struct {
enc string
from string
to string
}
func (h *Handler) handleTrafic(x interface{}) {
log.Println(utils.ToJSON(x))
}
type InfoRPC struct {
Duration time.Duration
StartTime time.Time
EndTime time.Time
func (h *Handler) handleRequest(id uint64, method string, args interface{}) {
h.handleTrafic(&RPCServerRequest{
ID: id,
Method: method,
Params: []interface{}{args},
})
}
func (h *Handler) handleResponse(id uint64, x, err interface{}) {
var e interface{}
switch val := x.(type) {
default:
case nil:
case string:
e = val
case error:
e = val.Error()
}
h.handleTrafic(&RPCServerResponse{
ID: id,
Result: x,
Error: e,
})
}
Encoding string
From string
To string
type RPCServerRequest struct {
Method string `json:"method"`
Params []interface{} `json:"params"`
ID uint64 `json:"id"`
ID uint64
Method string
Params interface{}
Result interface{}
Error interface{}
}
func (r *RPCServerRequest) reset() {
r.Method = ""
r.Params = nil
r.ID = 0
}
type RPCServerResponse struct {
type rpcAPI struct {
ID uint64 `json:"id"`
Result interface{} `json:"result"`
Error interface{} `json:"error"`
Method string `json:"method"`
Params interface{} `json:"params"`
StartTime time.Time
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"github.com/blevesearch/bleve/document"
"github.com/cgrates/cgrates/analyzers"
"github.com/cgrates/cgrates/utils"
)
@@ -44,3 +45,7 @@ func (aSv1 *AnalyzerSv1) Ping(ign *utils.CGREvent, reply *string) error {
*reply = utils.Pong
return nil
}
func (aSv1 *AnalyzerSv1) Search(search string, reply *[]*document.Document) error {
return aSv1.aS.V1Search(search, reply)
}

View File

@@ -568,6 +568,11 @@ func main() {
internalLoaderSChan, connManager)
anz := services.NewAnalyzerService(cfg, server, exitChan, internalAnalyzerSChan)
if anz.ShouldRun() {
if err := anz.Start(); err != nil {
return
}
}
srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, schS, rals,
rals.GetResponder(), apiSv1, apiSv2, cdrS, smg,

View File

@@ -18,17 +18,29 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package config
import "github.com/cgrates/cgrates/utils"
import (
"path"
"github.com/blevesearch/bleve/index/store/boltdb"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/cgrates/cgrates/utils"
)
// AttributeSCfg is the configuration of attribute service
type AnalyzerSCfg struct {
Enabled bool
Enabled bool
DBPath string
IndexType string
StoreType string
}
func (alS *AnalyzerSCfg) loadFromJsonCfg(jsnCfg *AnalyzerSJsonCfg) (err error) {
if jsnCfg == nil {
return
}
alS.DBPath = path.Join("/home", "trial", "analize")
alS.IndexType = upsidedown.Name
alS.StoreType = boltdb.Name
if jsnCfg.Enabled != nil {
alS.Enabled = *jsnCfg.Enabled
}

View File

@@ -123,4 +123,9 @@
"apiers_conns": ["*internal"],
},
"analyzers":{ // AnalyzerS config
"enabled":true // starts AnalyzerS service: <true|false>.
},
}

View File

@@ -24,7 +24,6 @@ import (
"fmt"
"net"
"net/http"
"strings"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -173,7 +172,7 @@ func register(req *http.Request) (*json.RawMessage, error) {
return sReq.Id, err
}
var addr string
if addr, err = getRemoteIP(req); err != nil {
if addr, err = utils.GetRemoteIP(req); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to obtain the remote IP because: %s",
utils.DispatcherH, err))
return sReq.Id, err
@@ -195,27 +194,6 @@ func register(req *http.Request) (*json.RawMessage, error) {
return sReq.Id, nil
}
func getRemoteIP(r *http.Request) (ip string, err error) {
ip = r.Header.Get("X-REAL-IP")
if net.ParseIP(ip) != nil {
return
}
for _, ip = range strings.Split(r.Header.Get("X-FORWARDED-FOR"), utils.FIELDS_SEP) {
if net.ParseIP(ip) != nil {
return
}
}
if ip, _, err = net.SplitHostPort(r.RemoteAddr); err != nil {
return
}
if net.ParseIP(ip) != nil {
return
}
ip = utils.EmptyString
err = fmt.Errorf("no valid ip found")
return
}
func getConnPort(cfg *config.CGRConfig, transport string, tls bool) (port string, err error) {
var address string
var extraPath string

View File

@@ -128,48 +128,6 @@ func TestGetConnPort(t *testing.T) {
}
}
func TestGetRemoteIP(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(nil))
if err != nil {
t.Fatal(err)
}
req.RemoteAddr = "127.0.0.1:2356"
exp := "127.0.0.1"
if rply, err := getRemoteIP(req); err != nil {
t.Fatal(err)
} else if rply != exp {
t.Errorf("Expected: %q ,received: %q", exp, rply)
}
req.RemoteAddr = "notAnIP"
if _, err := getRemoteIP(req); err == nil {
t.Fatal("Expected error received nil")
}
req.RemoteAddr = "127.0.0:2012"
if _, err := getRemoteIP(req); err == nil {
t.Fatal("Expected error received nil")
}
req.Header.Set("X-FORWARDED-FOR", "127.0.0.2,127.0.0.3")
exp = "127.0.0.2"
if rply, err := getRemoteIP(req); err != nil {
t.Fatal(err)
} else if rply != exp {
t.Errorf("Expected: %q ,received: %q", exp, rply)
}
req.Header.Set("X-FORWARDED-FOR", "127.0.0.")
if _, err := getRemoteIP(req); err == nil {
t.Fatal("Expected error received nil")
}
req.Header.Set("X-REAL-IP", "127.0.0.4")
exp = "127.0.0.4"
if rply, err := getRemoteIP(req); err != nil {
t.Fatal(err)
} else if rply != exp {
t.Errorf("Expected: %q ,received: %q", exp, rply)
}
}
func TestRegister(t *testing.T) {
ra := &RegisterArgs{
Tenant: "cgrates.org",

View File

@@ -20,6 +20,8 @@ package services
import (
"fmt"
"net"
"net/rpc"
"sync"
"github.com/cgrates/cgrates/analyzers"
@@ -58,7 +60,7 @@ func (anz *AnalyzerService) Start() (err error) {
if anz.IsRunning() {
return utils.ErrServiceAlreadyRunning
}
if anz.anz, err = analyzers.NewAnalyzerService(); err != nil {
if anz.anz, err = analyzers.NewAnalyzerService(anz.cfg); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error()))
anz.exitChan <- true
return
@@ -71,6 +73,17 @@ func (anz *AnalyzerService) Start() (err error) {
anz.exitChan <- true
return
}()
utils.AnalizerWraperFunc = func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec {
fromstr := ""
if from != nil {
fromstr = from.String()
}
tostr := ""
if to != nil {
tostr = to.String()
}
return anz.anz.NewServerCodec(conn, enc, fromstr, tostr)
}
anz.rpc = v1.NewAnalyzerSv1(anz.anz)
if !anz.cfg.DispatcherSCfg().Enabled {
anz.server.RpcRegister(anz.rpc)

View File

@@ -19,12 +19,15 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package utils
import (
"io"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
var ConReqs *ConcReqs
var AnalizerWraperFunc = func(conn rpc.ServerCodec, enc string, from, to net.Addr) rpc.ServerCodec {
return conn
}
type ConcReqs struct {
limit int
@@ -69,12 +72,20 @@ func (cR *ConcReqs) Deallocate() {
return
}
func newConcReqsGOBCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
return newConcReqsServerCodec(newGobServerCodec(conn))
type conn interface {
Read(b []byte) (n int, err error)
Write(b []byte) (n int, err error)
Close() error
LocalAddr() net.Addr
RemoteAddr() net.Addr
}
func newConcReqsJSONCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
return newConcReqsServerCodec(jsonrpc.NewServerCodec(conn))
func newConcReqsGOBCodec(conn conn) rpc.ServerCodec {
return AnalizerWraperFunc(newConcReqsServerCodec(newGobServerCodec(conn)), MetaGOB, conn.RemoteAddr(), conn.LocalAddr())
}
func newConcReqsJSONCodec(conn conn) rpc.ServerCodec {
return AnalizerWraperFunc(newConcReqsServerCodec(jsonrpc.NewServerCodec(conn)), MetaJSON, conn.RemoteAddr(), conn.LocalAddr())
}
func newConcReqsServerCodec(sc rpc.ServerCodec) rpc.ServerCodec {

View File

@@ -214,7 +214,9 @@ func (s *Server) ServeGOB(addr string, exitChan chan bool) {
func handleRequest(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.Header().Set("Content-Type", "application/json")
res := NewRPCRequest(r.Body).Call()
rmtIP, _ := GetRemoteIP(r)
rmtAddr, _ := net.ResolveIPAddr(EmptyString, rmtIP)
res := newRPCRequest(r.Body, rmtAddr).Call()
io.Copy(w, res)
}
@@ -334,16 +336,22 @@ func (s *Server) StopBiRPC() {
// rpcRequest represents a RPC request.
// rpcRequest implements the io.ReadWriteCloser interface.
type rpcRequest struct {
r io.Reader // holds the JSON formated RPC request
rw io.ReadWriter // holds the JSON formated RPC response
done chan bool // signals then end of the RPC request
r io.Reader // holds the JSON formated RPC request
rw io.ReadWriter // holds the JSON formated RPC response
done chan bool // signals then end of the RPC request
remoteAddr net.Addr
}
// NewRPCRequest returns a new rpcRequest.
func NewRPCRequest(r io.Reader) *rpcRequest {
// newRPCRequest returns a new rpcRequest.
func newRPCRequest(r io.Reader, remoteAddr net.Addr) *rpcRequest {
var buf bytes.Buffer
done := make(chan bool)
return &rpcRequest{r, &buf, done}
return &rpcRequest{
r: r,
rw: &buf,
done: done,
remoteAddr: remoteAddr,
}
}
func (r *rpcRequest) Read(p []byte) (n int, err error) {
@@ -356,6 +364,13 @@ func (r *rpcRequest) Write(p []byte) (n int, err error) {
return
}
func (r *rpcRequest) LocalAddr() net.Addr {
return LocalAddr()
}
func (r *rpcRequest) RemoteAddr() net.Addr {
return r.remoteAddr
}
func (r *rpcRequest) Close() error {
//r.done <- true // seem to be called sometimes before the write command finishes!
return nil
@@ -549,3 +564,25 @@ func (s *Server) ServeHTTPTLS(addr, serverCrt, serverKey, caCert string, serverP
exitChan <- true
return
}
// GetRemoteIP returns the IP from http request
func GetRemoteIP(r *http.Request) (ip string, err error) {
ip = r.Header.Get("X-REAL-IP")
if net.ParseIP(ip) != nil {
return
}
for _, ip = range strings.Split(r.Header.Get("X-FORWARDED-FOR"), FIELDS_SEP) {
if net.ParseIP(ip) != nil {
return
}
}
if ip, _, err = net.SplitHostPort(r.RemoteAddr); err != nil {
return
}
if net.ParseIP(ip) != nil {
return
}
ip = EmptyString
err = fmt.Errorf("no valid ip found")
return
}

67
utils/server_test.go Normal file
View File

@@ -0,0 +1,67 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package utils
import (
"bytes"
"net/http"
"testing"
)
func TestGetRemoteIP(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:2080/json_rpc", bytes.NewBuffer(nil))
if err != nil {
t.Fatal(err)
}
req.RemoteAddr = "127.0.0.1:2356"
exp := "127.0.0.1"
if rply, err := GetRemoteIP(req); err != nil {
t.Fatal(err)
} else if rply != exp {
t.Errorf("Expected: %q ,received: %q", exp, rply)
}
req.RemoteAddr = "notAnIP"
if _, err := GetRemoteIP(req); err == nil {
t.Fatal("Expected error received nil")
}
req.RemoteAddr = "127.0.0:2012"
if _, err := GetRemoteIP(req); err == nil {
t.Fatal("Expected error received nil")
}
req.Header.Set("X-FORWARDED-FOR", "127.0.0.2,127.0.0.3")
exp = "127.0.0.2"
if rply, err := GetRemoteIP(req); err != nil {
t.Fatal(err)
} else if rply != exp {
t.Errorf("Expected: %q ,received: %q", exp, rply)
}
req.Header.Set("X-FORWARDED-FOR", "127.0.0.")
if _, err := GetRemoteIP(req); err == nil {
t.Fatal("Expected error received nil")
}
req.Header.Set("X-REAL-IP", "127.0.0.4")
exp = "127.0.0.4"
if rply, err := GetRemoteIP(req); err != nil {
t.Fatal(err)
} else if rply != exp {
t.Errorf("Expected: %q ,received: %q", exp, rply)
}
}