/* Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see */ package analyzers import ( "encoding/json" "fmt" "os" "path" "strconv" "strings" "time" "github.com/blevesearch/bleve/v2" "github.com/blevesearch/bleve/v2/search" "github.com/blevesearch/bleve/v2/search/query" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) // NewAnalyzerService initializes a AnalyzerService func NewAnalyzerService(cfg *config.CGRConfig) (aS *AnalyzerService, err error) { aS = &AnalyzerService{ cfg: cfg, } err = aS.initDB() return } // AnalyzerService is the service handling analyzer type AnalyzerService struct { db bleve.Index cfg *config.CGRConfig filterS *engine.FilterS } // SetFilterS will set the filterS used in APIs // this function is called before the API is registerd func (aS *AnalyzerService) SetFilterS(fS *engine.FilterS) { aS.filterS = fS } func (aS *AnalyzerService) initDB() (err error) { if aS.cfg.AnalyzerSCfg().IndexType == utils.MetaInternal { aS.db, err = bleve.NewMemOnly(bleve.NewIndexMapping()) return } dbPath := path.Join(aS.cfg.AnalyzerSCfg().DBPath, utils.AnzDBDir) if _, err = os.Stat(dbPath); err == nil { aS.db, err = bleve.Open(dbPath) } else if os.IsNotExist(err) { indxType, storeType := getIndex(aS.cfg.AnalyzerSCfg().IndexType) aS.db, err = bleve.NewUsing(dbPath, bleve.NewIndexMapping(), indxType, storeType, nil) } return } func (aS *AnalyzerService) clenaUp() (err error) { t2 := bleve.NewDateRangeQuery(time.Time{}, time.Now().Add(-aS.cfg.AnalyzerSCfg().TTL)) t2.SetField(utils.RequestStartTime) searchReq := bleve.NewSearchRequest(t2) var res *bleve.SearchResult if res, err = aS.db.Search(searchReq); err != nil { return } return aS.deleteHits(res.Hits) } // extracted as function in order to test this func (aS *AnalyzerService) deleteHits(hits search.DocumentMatchCollection) (err error) { hasErr := false for _, hit := range hits { if err = aS.db.Delete(hit.ID); err != nil { hasErr = true } } if hasErr { err = utils.ErrPartiallyExecuted } return } // ListenAndServe will initialize the service func (aS *AnalyzerService) ListenAndServe(stopChan <-chan struct{}) (err error) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AnalyzerS)) if err = aS.clenaUp(); err != nil { // clean up the data at the system start return } for { select { case <-stopChan: return case <-time.After(aS.cfg.AnalyzerSCfg().CleanupInterval): if err = aS.clenaUp(); err != nil { return } } } } // Shutdown is called to shutdown the service func (aS *AnalyzerService) Shutdown() error { utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.AnalyzerS)) aS.db.Close() utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.AnalyzerS)) return nil } func (aS *AnalyzerService) logTrafic(id uint64, method string, params, result, err any, enc, from, to string, sTime, eTime time.Time) error { if strings.HasPrefix(method, utils.AnalyzerSv1) { return nil } return aS.db.Index(utils.ConcatenatedKey(enc, from, to, method, strconv.FormatInt(sTime.Unix(), 10)), NewInfoRPC(id, method, params, result, err, enc, from, to, sTime, eTime)) } // QueryArgs the structure that we use to filter the API calls type QueryArgs struct { // a string based on the query language(https://blevesearch.com/docs/Query-String-Query/) that we send to bleve HeaderFilters string // a list of filters that we use to filter the call similar to how we filter the events ContentFilters []string } // V1StringQuery returns a list of API that match the query func (aS *AnalyzerService) V1StringQuery(ctx *context.Context, args *QueryArgs, reply *[]map[string]any) error { var q query.Query if args.HeaderFilters == utils.EmptyString { q = bleve.NewMatchAllQuery() } else { q = bleve.NewQueryStringQuery(args.HeaderFilters) } s := bleve.NewSearchRequest(q) s.Fields = []string{utils.Meta} // return all fields searchResults, err := aS.db.Search(s) if err != nil { return err } rply := make([]map[string]any, 0, searchResults.Hits.Len()) lenContentFltrs := len(args.ContentFilters) for _, obj := range searchResults.Hits { // make sure that the result is corectly marshaled rep := json.RawMessage(utils.IfaceAsString(obj.Fields[utils.Reply])) req := json.RawMessage(utils.IfaceAsString(obj.Fields[utils.RequestParams])) obj.Fields[utils.Reply] = rep obj.Fields[utils.RequestParams] = req // try to pretty print the duration if dur, err := utils.IfaceAsDuration(obj.Fields[utils.RequestDuration]); err == nil { obj.Fields[utils.RequestDuration] = dur.String() } if val, has := obj.Fields[utils.ReplyError]; !has || len(utils.IfaceAsString(val)) == 0 { obj.Fields[utils.ReplyError] = nil } if lenContentFltrs != 0 { dp, err := getDPFromSearchresult(req, rep, obj.Fields) if err != nil { return err } if pass, err := aS.filterS.Pass(aS.cfg.GeneralCfg().DefaultTenant, args.ContentFilters, dp); err != nil { return err } else if !pass { continue } } rply = append(rply, obj.Fields) } *reply = rply return nil }