Added TTL for analyzers

This commit is contained in:
Trial97
2020-10-26 18:20:57 +02:00
committed by Dan Christian Bogos
parent 08d4f1fc21
commit 6b60d49bf6
18 changed files with 159 additions and 74 deletions

View File

@@ -19,19 +19,19 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package analyzers
import (
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/blevesearch/bleve"
// import the bleve packages in order to register the indextype and storagetype
"github.com/blevesearch/bleve/document"
_ "github.com/blevesearch/bleve/index/scorch"
_ "github.com/blevesearch/bleve/index/store/boltdb"
_ "github.com/blevesearch/bleve/index/store/goleveldb"
_ "github.com/blevesearch/bleve/index/store/moss"
_ "github.com/blevesearch/bleve/index/upsidedown"
"github.com/blevesearch/bleve/index/scorch"
"github.com/blevesearch/bleve/index/store/boltdb"
"github.com/blevesearch/bleve/index/store/goleveldb"
"github.com/blevesearch/bleve/index/store/moss"
"github.com/blevesearch/bleve/index/upsidedown"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
@@ -50,24 +50,65 @@ type AnalyzerService struct {
}
func (aS *AnalyzerService) initDB() (err error) {
fmt.Println(aS.cfg.AnalyzerSCfg().DBPath)
if _, err = os.Stat(aS.cfg.AnalyzerSCfg().DBPath); err == nil {
fmt.Println("exista")
aS.db, err = bleve.Open(aS.cfg.AnalyzerSCfg().DBPath)
} else if os.IsNotExist(err) {
fmt.Println("nu exista")
aS.db, err = bleve.NewUsing(aS.cfg.AnalyzerSCfg().DBPath, bleve.NewIndexMapping(),
aS.cfg.AnalyzerSCfg().IndexType, aS.cfg.AnalyzerSCfg().StoreType, nil)
var indxType, storeType string
switch aS.cfg.AnalyzerSCfg().IndexType {
case utils.MetaScorch:
indxType, storeType = scorch.Name, scorch.Name
case utils.MetaBoltdb:
indxType, storeType = upsidedown.Name, boltdb.Name
case utils.MetaLeveldb:
indxType, storeType = upsidedown.Name, goleveldb.Name
case utils.MetaMoss:
indxType, storeType = upsidedown.Name, moss.Name
}
aS.db, err = bleve.NewUsing(aS.cfg.AnalyzerSCfg().DBPath,
bleve.NewIndexMapping(), indxType, storeType, nil)
}
return
}
func (aS *AnalyzerService) clenaUp() (err error) {
fmt.Println("clean")
t2 := bleve.NewDateRangeQuery(time.Time{}, time.Now().Add(-aS.cfg.AnalyzerSCfg().TTL))
t2.SetField("RequestStartTime")
searchReq := bleve.NewSearchRequest(t2)
var res *bleve.SearchResult
if res, err = aS.db.Search(searchReq); err != nil {
return
}
hasErr := false
for _, hit := range res.Hits {
if err = aS.db.Delete(hit.ID); err != nil {
hasErr = true
}
}
if hasErr {
err = utils.ErrPartiallyExecuted
}
return
}
// ListenAndServe will initialize the service
func (aS *AnalyzerService) ListenAndServe(exitChan chan bool) error {
func (aS *AnalyzerService) ListenAndServe(exitChan chan bool) (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AnalyzerS))
e := <-exitChan
exitChan <- e // put back for the others listening for shutdown request
return nil
if err = aS.clenaUp(); err != nil { // clean up the data at the system start
return
}
for {
select {
case e := <-exitChan:
exitChan <- e // put back for the others listening for shutdown request
return
case <-time.After(aS.cfg.AnalyzerSCfg().CleanupInterval):
if err = aS.clenaUp(); err != nil {
return
}
}
}
}
// Shutdown is called to shutdown the service
@@ -81,6 +122,9 @@ func (aS *AnalyzerService) Shutdown() error {
func (aS *AnalyzerService) logTrafic(id uint64, method string,
params, result, err interface{},
info *extraInfo, sTime, eTime time.Time) error {
if strings.HasPrefix(method, utils.AnalyzerSv1) {
return nil
}
var e interface{}
switch val := err.(type) {
default:
@@ -92,35 +136,39 @@ func (aS *AnalyzerService) logTrafic(id uint64, method string,
}
return aS.db.Index(utils.ConcatenatedKey(method, strconv.FormatInt(sTime.Unix(), 10)),
InfoRPC{
Duration: eTime.Sub(sTime),
StartTime: sTime,
EndTime: eTime,
RequestDuration: eTime.Sub(sTime),
RequestStartTime: sTime,
// EndTime: eTime,
Encoding: info.enc,
From: info.from,
To: info.to,
RequestEncoding: info.enc,
RequestSource: info.from,
RequestDestination: info.to,
ID: id,
Method: method,
Params: params,
Result: result,
Error: e,
RequestID: id,
RequestMethod: method,
RequestParams: utils.ToJSON(params),
Reply: utils.ToJSON(result),
ReplyError: e,
})
}
func (aS *AnalyzerService) V1Search(searchstr string, reply *[]*document.Document) error {
func (aS *AnalyzerService) V1Search(searchstr string, reply *[]map[string]interface{}) error {
s := bleve.NewSearchRequest(bleve.NewQueryStringQuery(searchstr))
s.Fields = []string{utils.Meta} // return all fields
searchResults, err := aS.db.Search(s)
if err != nil {
return err
}
rply := make([]*document.Document, searchResults.Hits.Len())
rply := make([]map[string]interface{}, searchResults.Hits.Len())
for i, obj := range searchResults.Hits {
fmt.Println(obj.ID)
fmt.Println(obj.Index)
d, _ := aS.db.Document(obj.ID)
fmt.Println(d.Fields[0].Name())
rply[i] = d
rply[i] = obj.Fields
// make sure that the result is corectly marshaled
rply[i]["Result"] = json.RawMessage(utils.IfaceAsString(obj.Fields["Result"]))
rply[i]["Params"] = json.RawMessage(utils.IfaceAsString(obj.Fields["Params"]))
// try to pretty print the duration
if dur, err := utils.IfaceAsDuration(rply[i]["Duration"]); err == nil {
rply[i]["Duration"] = dur.String()
}
}
*reply = rply
return nil

View File

@@ -29,20 +29,21 @@ type extraInfo struct {
}
type InfoRPC struct {
Duration time.Duration
StartTime time.Time
EndTime time.Time
RequestDuration time.Duration
RequestStartTime time.Time
// EndTime time.Time
Encoding string
From string
To string
RequestEncoding string
RequestSource string
RequestDestination string
ID uint64
Method string
Params interface{}
Result interface{}
Error interface{}
RequestID uint64
RequestMethod string
RequestParams interface{}
Reply interface{}
ReplyError interface{}
}
type rpcAPI struct {
ID uint64 `json:"id"`
Method string `json:"method"`