diff --git a/analyzers/analyzers.go b/analyzers/analyzers.go index b48f120ae..7b5d8a6d8 100755 --- a/analyzers/analyzers.go +++ b/analyzers/analyzers.go @@ -19,19 +19,19 @@ along with this program. If not, see package analyzers import ( + "encoding/json" "fmt" "os" "strconv" + "strings" "time" "github.com/blevesearch/bleve" - // import the bleve packages in order to register the indextype and storagetype - "github.com/blevesearch/bleve/document" - _ "github.com/blevesearch/bleve/index/scorch" - _ "github.com/blevesearch/bleve/index/store/boltdb" - _ "github.com/blevesearch/bleve/index/store/goleveldb" - _ "github.com/blevesearch/bleve/index/store/moss" - _ "github.com/blevesearch/bleve/index/upsidedown" + "github.com/blevesearch/bleve/index/scorch" + "github.com/blevesearch/bleve/index/store/boltdb" + "github.com/blevesearch/bleve/index/store/goleveldb" + "github.com/blevesearch/bleve/index/store/moss" + "github.com/blevesearch/bleve/index/upsidedown" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) @@ -50,24 +50,65 @@ type AnalyzerService struct { } func (aS *AnalyzerService) initDB() (err error) { - fmt.Println(aS.cfg.AnalyzerSCfg().DBPath) if _, err = os.Stat(aS.cfg.AnalyzerSCfg().DBPath); err == nil { - fmt.Println("exista") aS.db, err = bleve.Open(aS.cfg.AnalyzerSCfg().DBPath) } else if os.IsNotExist(err) { - fmt.Println("nu exista") - aS.db, err = bleve.NewUsing(aS.cfg.AnalyzerSCfg().DBPath, bleve.NewIndexMapping(), - aS.cfg.AnalyzerSCfg().IndexType, aS.cfg.AnalyzerSCfg().StoreType, nil) + var indxType, storeType string + switch aS.cfg.AnalyzerSCfg().IndexType { + case utils.MetaScorch: + indxType, storeType = scorch.Name, scorch.Name + case utils.MetaBoltdb: + indxType, storeType = upsidedown.Name, boltdb.Name + case utils.MetaLeveldb: + indxType, storeType = upsidedown.Name, goleveldb.Name + case utils.MetaMoss: + indxType, storeType = upsidedown.Name, moss.Name + } + + aS.db, err = bleve.NewUsing(aS.cfg.AnalyzerSCfg().DBPath, + bleve.NewIndexMapping(), indxType, storeType, nil) + } + return +} + +func (aS *AnalyzerService) clenaUp() (err error) { + fmt.Println("clean") + t2 := bleve.NewDateRangeQuery(time.Time{}, time.Now().Add(-aS.cfg.AnalyzerSCfg().TTL)) + t2.SetField("RequestStartTime") + searchReq := bleve.NewSearchRequest(t2) + var res *bleve.SearchResult + if res, err = aS.db.Search(searchReq); err != nil { + return + } + hasErr := false + for _, hit := range res.Hits { + if err = aS.db.Delete(hit.ID); err != nil { + hasErr = true + } + } + if hasErr { + err = utils.ErrPartiallyExecuted } return } // ListenAndServe will initialize the service -func (aS *AnalyzerService) ListenAndServe(exitChan chan bool) error { +func (aS *AnalyzerService) ListenAndServe(exitChan chan bool) (err error) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AnalyzerS)) - e := <-exitChan - exitChan <- e // put back for the others listening for shutdown request - return nil + if err = aS.clenaUp(); err != nil { // clean up the data at the system start + return + } + for { + select { + case e := <-exitChan: + exitChan <- e // put back for the others listening for shutdown request + return + case <-time.After(aS.cfg.AnalyzerSCfg().CleanupInterval): + if err = aS.clenaUp(); err != nil { + return + } + } + } } // Shutdown is called to shutdown the service @@ -81,6 +122,9 @@ func (aS *AnalyzerService) Shutdown() error { func (aS *AnalyzerService) logTrafic(id uint64, method string, params, result, err interface{}, info *extraInfo, sTime, eTime time.Time) error { + if strings.HasPrefix(method, utils.AnalyzerSv1) { + return nil + } var e interface{} switch val := err.(type) { default: @@ -92,35 +136,39 @@ func (aS *AnalyzerService) logTrafic(id uint64, method string, } return aS.db.Index(utils.ConcatenatedKey(method, strconv.FormatInt(sTime.Unix(), 10)), InfoRPC{ - Duration: eTime.Sub(sTime), - StartTime: sTime, - EndTime: eTime, + RequestDuration: eTime.Sub(sTime), + RequestStartTime: sTime, + // EndTime: eTime, - Encoding: info.enc, - From: info.from, - To: info.to, + RequestEncoding: info.enc, + RequestSource: info.from, + RequestDestination: info.to, - ID: id, - Method: method, - Params: params, - Result: result, - Error: e, + RequestID: id, + RequestMethod: method, + RequestParams: utils.ToJSON(params), + Reply: utils.ToJSON(result), + ReplyError: e, }) } -func (aS *AnalyzerService) V1Search(searchstr string, reply *[]*document.Document) error { +func (aS *AnalyzerService) V1Search(searchstr string, reply *[]map[string]interface{}) error { s := bleve.NewSearchRequest(bleve.NewQueryStringQuery(searchstr)) + s.Fields = []string{utils.Meta} // return all fields searchResults, err := aS.db.Search(s) if err != nil { return err } - rply := make([]*document.Document, searchResults.Hits.Len()) + rply := make([]map[string]interface{}, searchResults.Hits.Len()) for i, obj := range searchResults.Hits { - fmt.Println(obj.ID) - fmt.Println(obj.Index) - d, _ := aS.db.Document(obj.ID) - fmt.Println(d.Fields[0].Name()) - rply[i] = d + rply[i] = obj.Fields + // make sure that the result is corectly marshaled + rply[i]["Result"] = json.RawMessage(utils.IfaceAsString(obj.Fields["Result"])) + rply[i]["Params"] = json.RawMessage(utils.IfaceAsString(obj.Fields["Params"])) + // try to pretty print the duration + if dur, err := utils.IfaceAsDuration(rply[i]["Duration"]); err == nil { + rply[i]["Duration"] = dur.String() + } } *reply = rply return nil diff --git a/analyzers/utils.go b/analyzers/utils.go index 0c9d84cb6..b11265eb8 100644 --- a/analyzers/utils.go +++ b/analyzers/utils.go @@ -29,20 +29,21 @@ type extraInfo struct { } type InfoRPC struct { - Duration time.Duration - StartTime time.Time - EndTime time.Time + RequestDuration time.Duration + RequestStartTime time.Time + // EndTime time.Time - Encoding string - From string - To string + RequestEncoding string + RequestSource string + RequestDestination string - ID uint64 - Method string - Params interface{} - Result interface{} - Error interface{} + RequestID uint64 + RequestMethod string + RequestParams interface{} + Reply interface{} + ReplyError interface{} } + type rpcAPI struct { ID uint64 `json:"id"` Method string `json:"method"` diff --git a/apier/v1/analyzer.go b/apier/v1/analyzer.go index f2c62d990..fd0e6a468 100755 --- a/apier/v1/analyzer.go +++ b/apier/v1/analyzer.go @@ -19,7 +19,6 @@ along with this program. If not, see package v1 import ( - "github.com/blevesearch/bleve/document" "github.com/cgrates/cgrates/analyzers" "github.com/cgrates/cgrates/utils" ) @@ -46,6 +45,6 @@ func (aSv1 *AnalyzerSv1) Ping(ign *utils.CGREvent, reply *string) error { return nil } -func (aSv1 *AnalyzerSv1) Search(search string, reply *[]*document.Document) error { +func (aSv1 *AnalyzerSv1) Search(search string, reply *[]map[string]interface{}) error { return aSv1.aS.V1Search(search, reply) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 9447b4c20..fcc0b115e 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -570,6 +570,7 @@ func main() { anz := services.NewAnalyzerService(cfg, server, exitChan, internalAnalyzerSChan) if anz.ShouldRun() { if err := anz.Start(); err != nil { + fmt.Println(err) return } } diff --git a/config/analyzerscfg.go b/config/analyzerscfg.go index b3c24ef11..a777137fd 100755 --- a/config/analyzerscfg.go +++ b/config/analyzerscfg.go @@ -19,36 +19,53 @@ along with this program. If not, see package config import ( - "path" + "time" - "github.com/blevesearch/bleve/index/store/boltdb" - "github.com/blevesearch/bleve/index/upsidedown" "github.com/cgrates/cgrates/utils" ) -// AttributeSCfg is the configuration of attribute service +// AnalyzerSCfg is the configuration of analyzer service type AnalyzerSCfg struct { - Enabled bool - DBPath string - IndexType string - StoreType string + Enabled bool + DBPath string + IndexType string + TTL time.Duration + CleanupInterval time.Duration } -func (alS *AnalyzerSCfg) loadFromJsonCfg(jsnCfg *AnalyzerSJsonCfg) (err error) { +func (alS *AnalyzerSCfg) loadFromJSONCfg(jsnCfg *AnalyzerSJsonCfg) (err error) { if jsnCfg == nil { return } - alS.DBPath = path.Join("/home", "trial", "analize") - alS.IndexType = upsidedown.Name - alS.StoreType = boltdb.Name if jsnCfg.Enabled != nil { alS.Enabled = *jsnCfg.Enabled } + if jsnCfg.Db_path != nil { + alS.DBPath = *jsnCfg.Db_path + } + if jsnCfg.Index_type != nil { + alS.IndexType = *jsnCfg.Index_type + } + if jsnCfg.Ttl != nil { + if alS.TTL, err = time.ParseDuration(*jsnCfg.Ttl); err != nil { + return + } + } + if jsnCfg.Cleanup_interval != nil { + if alS.CleanupInterval, err = time.ParseDuration(*jsnCfg.Cleanup_interval); err != nil { + return + } + } return nil } func (alS *AnalyzerSCfg) AsMapInterface() map[string]interface{} { return map[string]interface{}{ utils.EnabledCfg: alS.Enabled, + + utils.DBPathCfg: alS.DBPath, + utils.IndexTypeCfg: alS.IndexType, + utils.TTLCfg: alS.TTL.String(), + utils.CleanupIntervalCfg: alS.CleanupInterval.String(), } } diff --git a/config/analyzerscfg_test.go b/config/analyzerscfg_test.go index 2a50e6bb4..15305b6c8 100644 --- a/config/analyzerscfg_test.go +++ b/config/analyzerscfg_test.go @@ -33,7 +33,7 @@ func TestAnalyzerSCfgloadFromJsonCfg(t *testing.T) { } if jsnCfg, err := NewDefaultCGRConfig(); err != nil { t.Error(err) - } else if err = jsnCfg.analyzerSCfg.loadFromJsonCfg(jsonCfg); err != nil { + } else if err = jsnCfg.analyzerSCfg.loadFromJSONCfg(jsonCfg); err != nil { t.Error(err) } else if !reflect.DeepEqual(jsnCfg.analyzerSCfg, expected) { t.Errorf("Expected %+v \n, received %+v", expected, jsnCfg.analyzerSCfg) diff --git a/config/config.go b/config/config.go index 9fe0b02a8..a996b4a58 100755 --- a/config/config.go +++ b/config/config.go @@ -734,7 +734,7 @@ func (cfg *CGRConfig) loadAnalyzerCgrCfg(jsnCfg *CgrJsonCfg) (err error) { if jsnAnalyzerCgrCfg, err = jsnCfg.AnalyzerCfgJson(); err != nil { return } - return cfg.analyzerSCfg.loadFromJsonCfg(jsnAnalyzerCgrCfg) + return cfg.analyzerSCfg.loadFromJSONCfg(jsnAnalyzerCgrCfg) } // loadAPIBanCgrCfg loads the Analyzer section of the configuration diff --git a/config/config_defaults.go b/config/config_defaults.go index f1496ce2f..e21c55546 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -905,7 +905,11 @@ const CGRATES_CFG_JSON = ` "analyzers":{ // AnalyzerS config - "enabled":false // starts AnalyzerS service: . + "enabled": false, // starts AnalyzerS service: . + "db_path": "/tmp/analyzers", // path to the folder where to store the information + "index_type": "*scorch", // the type of index for the storage: <*scorch|*boltdb|*leveldb|*mossdb> + "ttl": "10s", // time to wait before removing the API capture + "cleanup_interval": "1h", // the interval we clean the db }, diff --git a/config/configs.go b/config/configs.go index 87e79d86a..4319707b4 100644 --- a/config/configs.go +++ b/config/configs.go @@ -110,7 +110,7 @@ func handleConfigSFile(path string, w http.ResponseWriter) { func (cScfg *ConfigSCfg) AsMapInterface() (initialMP map[string]interface{}) { initialMP = map[string]interface{}{ utils.EnabledCfg: cScfg.Enabled, - utils.UrlCfg: cScfg.Url, + utils.URLCfg: cScfg.Url, utils.RootDirCfg: cScfg.RootDir, } return diff --git a/config/configs_test.go b/config/configs_test.go index 7223295c2..ab5e945dd 100644 --- a/config/configs_test.go +++ b/config/configs_test.go @@ -56,7 +56,7 @@ func TestConfigsAsMapInterface(t *testing.T) { }` eMap := map[string]interface{}{ utils.EnabledCfg: true, - utils.UrlCfg: "", + utils.URLCfg: "", utils.RootDirCfg: "/var/spool/cgrates/configs", } if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgsJSONStr); err != nil { @@ -72,7 +72,7 @@ func TestConfigsAsMapInterface2(t *testing.T) { }` eMap := map[string]interface{}{ utils.EnabledCfg: false, - utils.UrlCfg: "/configs/", + utils.URLCfg: "/configs/", utils.RootDirCfg: "/var/spool/cgrates/configs", } if cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgsJSONStr); err != nil { diff --git a/config/httpagntcfg.go b/config/httpagntcfg.go index 419d704a5..5d442654d 100644 --- a/config/httpagntcfg.go +++ b/config/httpagntcfg.go @@ -130,7 +130,7 @@ func (ca *HttpAgentCfg) loadFromJsonCfg(jsnCfg *HttpAgentJsonCfg, separator stri func (ca *HttpAgentCfg) AsMapInterface(separator string) (initialMP map[string]interface{}) { initialMP = map[string]interface{}{ utils.IDCfg: ca.ID, - utils.UrlCfg: ca.Url, + utils.URLCfg: ca.Url, utils.SessionSConnsCfg: ca.SessionSConns, utils.RequestPayloadCfg: ca.RequestPayload, utils.ReplyPayloadCfg: ca.ReplyPayload, diff --git a/config/httpagntcfg_test.go b/config/httpagntcfg_test.go index 3596a8b18..706879344 100644 --- a/config/httpagntcfg_test.go +++ b/config/httpagntcfg_test.go @@ -534,7 +534,7 @@ func TestHttpAgentCfgAsMapInterface(t *testing.T) { eMap := []map[string]interface{}{ { utils.IdCfg: "conecto1", - utils.UrlCfg: "/conecto", + utils.URLCfg: "/conecto", utils.SessionSConnsCfg: []string{"*localhost"}, utils.RequestPayloadCfg: "*url", utils.ReplyPayloadCfg: "*xml", diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 4af79d9f0..d856c25ed 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -602,7 +602,11 @@ type FcTemplateJsonCfg struct { // Analyzer service json config section type AnalyzerSJsonCfg struct { - Enabled *bool + Enabled *bool + Db_path *string + Index_type *string + Ttl *string + Cleanup_interval *string } type ApierJsonCfg struct { diff --git a/config/suretaxcfg.go b/config/suretaxcfg.go index a38783804..3c98afacd 100644 --- a/config/suretaxcfg.go +++ b/config/suretaxcfg.go @@ -177,7 +177,7 @@ func (self *SureTaxCfg) loadFromJsonCfg(jsnCfg *SureTaxJsonCfg) (err error) { func (st *SureTaxCfg) AsMapInterface(separator string) (initialMP map[string]interface{}) { initialMP = map[string]interface{}{ - utils.UrlCfg: st.Url, + utils.URLCfg: st.Url, utils.ClientNumberCfg: st.ClientNumber, utils.ValidationKeyCfg: st.ValidationKey, utils.BusinessUnitCfg: st.BusinessUnit, diff --git a/config/suretaxcfg_test.go b/config/suretaxcfg_test.go index 03adcd640..f941ed56b 100644 --- a/config/suretaxcfg_test.go +++ b/config/suretaxcfg_test.go @@ -328,7 +328,7 @@ func TestSureTaxCfgAsMapInterface(t *testing.T) { }, }` eMap := map[string]interface{}{ - utils.UrlCfg: utils.EmptyString, + utils.URLCfg: utils.EmptyString, utils.ClientNumberCfg: utils.EmptyString, utils.ValidationKeyCfg: utils.EmptyString, utils.BusinessUnitCfg: utils.EmptyString, diff --git a/engine/core.go b/engine/core.go index a4cbde8db..df15ebcd7 100644 --- a/engine/core.go +++ b/engine/core.go @@ -30,8 +30,7 @@ func NewCoreService() *CoreService { return &CoreService{} } -type CoreService struct { -} +type CoreService struct{} // ListenAndServe will initialize the service func (cS *CoreService) ListenAndServe(exitChan chan bool) (err error) { diff --git a/services/analyzers.go b/services/analyzers.go index a2e473554..dfca01888 100644 --- a/services/analyzers.go +++ b/services/analyzers.go @@ -62,7 +62,6 @@ func (anz *AnalyzerService) Start() (err error) { } if anz.anz, err = analyzers.NewAnalyzerService(anz.cfg); err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.AnalyzerS, err.Error())) - anz.exitChan <- true return } go func() { diff --git a/utils/consts.go b/utils/consts.go index 8d840ab20..62845e728 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -2170,6 +2170,11 @@ const ( RateStringIndexedFieldsCfg = "rate_string_indexed_fields" RatePrefixIndexedFieldsCfg = "rate_prefix_indexed_fields" RateSuffixIndexedFieldsCfg = "rate_suffix_indexed_fields" + + // AnalyzerSCfg + CleanupIntervalCfg = "cleanup_interval" + IndexTypeCfg = "index_type" + DBPathCfg = "db_path" ) // FC Template @@ -2196,7 +2201,7 @@ const ( // SureTax const ( RootDirCfg = "root_dir" - UrlCfg = "url" + URLCfg = "url" ClientNumberCfg = "client_number" ValidationKeyCfg = "validation_key" BusinessUnitCfg = "business_unit" @@ -2481,6 +2486,14 @@ const ( ProcessedOpt = "Processed" ) +// Analyzers cpmstants +const ( + MetaScorch = "*scorch" + MetaBoltdb = "*boltdb" + MetaLeveldb = "*leveldb" + MetaMoss = "*mossdb" +) + func buildCacheInstRevPrefixes() { CachePrefixToInstance = make(map[string]string) for k, v := range CacheInstanceToPrefix {