Files
cgrates/analyzers/analyzers.go
2025-10-13 09:57:41 +02:00

191 lines
5.5 KiB
Go

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
*/
package analyzers
import (
"encoding/json"
"math"
"math/rand"
"os"
"path"
"strconv"
"strings"
"time"
"github.com/cgrates/birpc/context"
"github.com/blevesearch/bleve/v2"
"github.com/blevesearch/bleve/v2/search"
"github.com/blevesearch/bleve/v2/search/query"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// NewAnalyzerS initializes a AnalyzerService
func NewAnalyzerS(cfg *config.CGRConfig) (aS *AnalyzerS, err error) {
aS = &AnalyzerS{
cfg: cfg,
}
err = aS.initDB()
return
}
// AnalyzerS is the service handling analyzer
type AnalyzerS struct {
db bleve.Index
cfg *config.CGRConfig
fltrS *engine.FilterS
}
// SetFilterS will set the filterS used in APIs
// this function is called before the API is registerd
func (aS *AnalyzerS) SetFilterS(fS *engine.FilterS) {
aS.fltrS = fS
}
func (aS *AnalyzerS) initDB() (err error) {
if aS.cfg.AnalyzerSCfg().IndexType == utils.MetaInternal {
aS.db, err = bleve.NewMemOnly(bleve.NewIndexMapping())
return
}
dbPath := path.Join(aS.cfg.AnalyzerSCfg().DBPath, utils.AnzDBDir)
if _, err = os.Stat(dbPath); err == nil {
aS.db, err = bleve.Open(dbPath)
} else if os.IsNotExist(err) {
indxType, storeType := getIndex(aS.cfg.AnalyzerSCfg().IndexType)
aS.db, err = bleve.NewUsing(dbPath,
bleve.NewIndexMapping(), indxType, storeType, nil)
}
return
}
func (aS *AnalyzerS) clenaUp() (err error) {
t2 := bleve.NewDateRangeQuery(time.Time{}, time.Now().Add(-aS.cfg.AnalyzerSCfg().TTL))
t2.SetField(utils.RequestStartTime)
searchReq := bleve.NewSearchRequest(t2)
var res *bleve.SearchResult
if res, err = aS.db.Search(searchReq); err != nil {
return
}
return aS.deleteHits(res.Hits)
}
// extracted as function in order to test this
func (aS *AnalyzerS) deleteHits(hits search.DocumentMatchCollection) (err error) {
hasErr := false
for _, hit := range hits {
if err = aS.db.Delete(hit.ID); err != nil {
hasErr = true
}
}
if hasErr {
err = utils.ErrPartiallyExecuted
}
return
}
// ListenAndServe will initialize the service
func (aS *AnalyzerS) ListenAndServe(ctx *context.Context) (err error) {
if err = aS.clenaUp(); err != nil { // clean up the data at the system start
return
}
for {
select {
case <-ctx.Done():
return
case <-time.After(aS.cfg.AnalyzerSCfg().CleanupInterval):
if err = aS.clenaUp(); err != nil {
return
}
}
}
}
// Shutdown is called to shutdown the service
func (aS *AnalyzerS) Shutdown() error {
return aS.db.Close()
}
func (aS *AnalyzerS) logTrafic(id uint64, method string,
params, result, err any,
enc, from, to string, sTime, eTime time.Time) error {
if strings.HasPrefix(method, utils.AnalyzerSv1) {
return nil
}
return aS.db.Index(utils.ConcatenatedKey(enc, from, to, method, strconv.FormatInt(rand.Int63n(100000000000000), 16)),
NewInfoRPC(id, method, params, result, err, enc, from, to, sTime, eTime))
}
// QueryArgs the structure that we use to filter the API calls
type QueryArgs struct {
// a string based on the query language(https://blevesearch.com/docs/Query-String-Query/) that we send to bleve
HeaderFilters string
// a list of filters that we use to filter the call similar to how we filter the events
ContentFilters []string
}
// V1StringQuery returns a list of API that match the query
func (aS *AnalyzerS) V1StringQuery(ctx *context.Context, args *QueryArgs, reply *[]map[string]any) error {
var q query.Query
if args.HeaderFilters == utils.EmptyString {
q = bleve.NewMatchAllQuery()
} else {
q = bleve.NewQueryStringQuery(args.HeaderFilters)
}
s := bleve.NewSearchRequestOptions(q, math.MaxInt, 0, false)
s.Fields = []string{utils.Meta} // return all fields
searchResults, err := aS.db.SearchInContext(ctx, s)
if err != nil {
return err
}
rply := make([]map[string]any, 0, searchResults.Hits.Len())
lenContentFltrs := len(args.ContentFilters)
for _, obj := range searchResults.Hits {
// make sure that the result is corectly marshaled
rep := json.RawMessage(utils.IfaceAsString(obj.Fields[utils.Reply]))
req := json.RawMessage(utils.IfaceAsString(obj.Fields[utils.RequestParams]))
obj.Fields[utils.Reply] = rep
obj.Fields[utils.RequestParams] = req
// try to pretty print the duration
if dur, err := utils.IfaceAsDuration(obj.Fields[utils.RequestDuration]); err == nil {
obj.Fields[utils.RequestDuration] = dur.String()
}
if val, has := obj.Fields[utils.ReplyError]; !has || len(utils.IfaceAsString(val)) == 0 {
obj.Fields[utils.ReplyError] = nil
}
if lenContentFltrs != 0 {
dp, err := getDPFromSearchresult(req, rep, obj.Fields)
if err != nil {
return err
}
if pass, err := aS.fltrS.Pass(ctx, aS.cfg.GeneralCfg().DefaultTenant,
args.ContentFilters, dp); err != nil {
return err
} else if !pass {
continue
}
}
rply = append(rply, obj.Fields)
}
*reply = rply
return nil
}