From 3252fe791bfd89d7eea0c160286ffb1bd675b1ae Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 20 Feb 2019 15:15:45 +0200 Subject: [PATCH] Add integration test for filterS (type *stats) --- agents/agentreq_test.go | 16 +- cdrc/xml_test.go | 4 +- cmd/cgr-engine/cgr-engine.go | 50 ++--- cmd/cgr-engine/rater.go | 4 +- data/conf/samples/filters/cgrates.json | 11 +- engine/filters.go | 26 ++- engine/filters_test.go | 16 +- engine/libengine.go | 6 +- general_tests/filters_it_test.go | 265 ++++++++++++++++++++++++- 9 files changed, 339 insertions(+), 59 deletions(-) diff --git a/agents/agentreq_test.go b/agents/agentreq_test.go index 41a85a5ba..d16bcb9ae 100644 --- a/agents/agentreq_test.go +++ b/agents/agentreq_test.go @@ -40,7 +40,7 @@ func TestAgReqAsNavigableMap(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CGRID}, @@ -136,7 +136,7 @@ func TestAgReqMaxCost(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CapMaxUsage}, "120s", false, false) @@ -180,7 +180,7 @@ func TestAgReqParseFieldDiameter(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, "cgrates.org", "", filterS) @@ -230,7 +230,7 @@ func TestAgReqParseFieldRadius(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ @@ -270,7 +270,7 @@ Host: api.cgrates.org data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ @@ -341,7 +341,7 @@ func TestAgReqParseFieldHttpXml(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, dm) //pass the data provider to agent request agReq := newAgentRequest(dP, nil, nil, nil, "cgrates.org", "", filterS) tplFlds := []*config.FCTemplate{ @@ -370,7 +370,7 @@ func TestAgReqEmptyFilter(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", "", filterS) // populate request, emulating the way will be done in HTTPAgent agReq.CGRRequest.Set([]string{utils.CGRID}, @@ -413,7 +413,7 @@ func TestAgReqMetaExponent(t *testing.T) { data, _ := engine.NewMapStorage() dm := engine.NewDataManager(data) cfg, _ := config.NewDefaultCGRConfig() - filterS := engine.NewFilterS(cfg, nil, dm) + filterS := engine.NewFilterS(cfg, nil, nil, dm) agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", "", filterS) agReq.CGRRequest.Set([]string{"Value"}, "2", false, false) agReq.CGRRequest.Set([]string{"Exponent"}, "2", false, false) diff --git a/cdrc/xml_test.go b/cdrc/xml_test.go index 563124bfe..160bc8138 100644 --- a/cdrc/xml_test.go +++ b/cdrc/xml_test.go @@ -314,7 +314,7 @@ func TestXMLRPProcessWithNewFilters(t *testing.T) { } xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft), utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true, - cdrcCfgs, engine.NewFilterS(defaultCfg, nil, engine.NewDataManager(data))) + cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, engine.NewDataManager(data))) if err != nil { t.Error(err) } @@ -573,7 +573,7 @@ func TestXMLRPNestingSeparator(t *testing.T) { } xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(xmlContent), utils.HierarchyPath([]string{"File", "CDRs", "Call"}), "UTC", true, - cdrcCfgs, engine.NewFilterS(defaultCfg, nil, engine.NewDataManager(data))) + cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, engine.NewDataManager(data))) if err != nil { t.Error(err) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index b31a94e66..16053cd38 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -128,7 +128,7 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cdrcCfg.CdrsConns, internalCdrSChan, cfg.GeneralCfg().InternalTtl) + cdrcCfg.CdrsConns, internalCdrSChan, cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %s", err.Error())) exitChan <- true @@ -161,7 +161,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().ChargerSConns, - internalChargerSChan, cfg.GeneralCfg().InternalTtl) + internalChargerSChan, cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SessionS, utils.ChargerS, err.Error())) @@ -175,7 +175,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().RALsConns, - internalRaterChan, cfg.GeneralCfg().InternalTtl) + internalRaterChan, cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.SessionS, err.Error())) @@ -190,7 +190,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().ResSConns, internalResourceSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResourceS: %s", utils.SessionS, err.Error())) @@ -205,7 +205,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().ThreshSConns, internalThresholdSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.SessionS, err.Error())) @@ -220,7 +220,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().StatSConns, internalStatSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", utils.SessionS, err.Error())) @@ -235,7 +235,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().SupplSConns, internalSupplierSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SupplierS: %s", utils.SessionS, err.Error())) @@ -250,7 +250,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().AttrSConns, internalAttrSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to AttributeS: %s", utils.SessionS, err.Error())) @@ -265,7 +265,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().CDRsConns, internalCDRSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.SessionS, err.Error())) @@ -362,7 +362,7 @@ func startDiameterAgent(internalSsChan chan rpcclient.RpcClientConnection, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.DiameterAgentCfg().SessionSConns, internalSsChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.DiameterAgent, utils.SessionS, err.Error())) @@ -407,7 +407,7 @@ func startRadiusAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitCh cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.RadiusAgentCfg().SessionSConns, internalSMGChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to SMG: %s", err.Error())) exitChan <- true @@ -485,7 +485,7 @@ func startHTTPAgent(internalSMGChan chan rpcclient.RpcClientConnection, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, agntCfg.SessionSConns, internalSMGChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> could not connect to %s, error: %s", utils.HTTPAgent, utils.SessionS, err.Error())) @@ -518,7 +518,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.CdrsCfg().CDRSChargerSConns, internalChargerSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.ChargerS, err.Error())) @@ -533,7 +533,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.CdrsCfg().CDRSRaterConns, internalRaterChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to RAL: %s", err.Error())) exitChan <- true @@ -547,7 +547,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.CdrsCfg().CDRSAttributeSConns, internalAttributeSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to %s: %s", utils.AttributeS, err.Error())) @@ -562,7 +562,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.CdrsCfg().CDRSThresholdSConns, internalThresholdSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS: %s", err.Error())) exitChan <- true @@ -576,7 +576,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.CdrsCfg().CDRSStatSConns, internalStatSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS: %s", err.Error())) exitChan <- true @@ -660,7 +660,7 @@ func startChargerService(internalChargerSChan chan rpcclient.RpcClientConnection cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.ChargerSCfg().AttributeSConns, internalAttributeSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.ChargerS, utils.AttributeS, err.Error())) @@ -705,7 +705,7 @@ func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cac cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.ResourceSCfg().ThresholdSConns, internalThresholdSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS: %s", err.Error())) exitChan <- true @@ -752,7 +752,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cach cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.StatSCfg().ThresholdSConns, internalThresholdSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS: %s", err.Error())) exitChan <- true @@ -830,7 +830,7 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SupplierSCfg().AttributeSConns, internalAttrSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.SupplierS, utils.AttributeS, err.Error())) @@ -845,7 +845,7 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SupplierSCfg().StatSConns, internalStatSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", utils.SupplierS, err.Error())) @@ -918,7 +918,7 @@ func startDispatcherService(internalDispatcherSChan chan *dispatchers.Dispatcher cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.DispatcherSCfg().AttributeSConns, intAttrSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", utils.DispatcherS, utils.AttributeS, err.Error())) @@ -935,7 +935,7 @@ func startDispatcherService(internalDispatcherSChan chan *dispatchers.Dispatcher cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - haPoolCfg, nil, time.Duration(0)); err != nil { + haPoolCfg, nil, time.Duration(0), false); err != nil { utils.Logger.Crit( fmt.Sprintf("<%s> could not connect to connID: <%s>, err: <%s>", utils.DispatcherS, connID, err.Error())) @@ -1130,7 +1130,7 @@ func schedCDRsConns(internalCDRSChan chan rpcclient.RpcClientConnection, exitCha cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.SchedulerCfg().CDRsConns, internalCDRSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error())) exitChan <- true diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index a281aa334..edb772b97 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -66,7 +66,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.RalsCfg().RALsThresholdSConns, internalThdSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to ThresholdS, error: %s", err.Error())) exitChan <- true @@ -88,7 +88,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, cfg.RalsCfg().RALsStatSConns, internalStatSChan, - cfg.GeneralCfg().InternalTtl) + cfg.GeneralCfg().InternalTtl, false) if err != nil { utils.Logger.Crit(fmt.Sprintf(" Could not connect to StatS, error: %s", err.Error())) exitChan <- true diff --git a/data/conf/samples/filters/cgrates.json b/data/conf/samples/filters/cgrates.json index 4610844e9..f40a7859e 100644 --- a/data/conf/samples/filters/cgrates.json +++ b/data/conf/samples/filters/cgrates.json @@ -30,6 +30,9 @@ "rals": { "enabled": true, + "thresholds_conns": [ + {"address": "127.0.0.1:2012", "transport":"*json"}, + ], }, @@ -39,7 +42,8 @@ ], "resources_conns": [ {"address": "127.0.0.1:2012", "transport":"*json"}, - ], + ], + "indexed_selects":false, }, @@ -55,5 +59,10 @@ }, +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + } diff --git a/engine/filters.go b/engine/filters.go index 1074018f5..ea87fc081 100644 --- a/engine/filters.go +++ b/engine/filters.go @@ -65,13 +65,20 @@ const ( ) func NewFilterS(cfg *config.CGRConfig, - statSChan, resSChan chan rpcclient.RpcClientConnection, dm *DataManager) *FilterS { - return &FilterS{ + statSChan, resSChan chan rpcclient.RpcClientConnection, dm *DataManager) (fS *FilterS) { + fS = &FilterS{ statSChan: statSChan, resSChan: resSChan, dm: dm, cfg: cfg, } + if len(cfg.FilterSCfg().StatSConns) != 0 { + fS.connStatS() + } + if len(cfg.FilterSCfg().ResourceSConns) != 0 { + fS.connResourceS() + } + return } // FilterS is a service used to take decisions in case of filters @@ -96,7 +103,7 @@ func (fS *FilterS) connStatS() (err error) { fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts, fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout, fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().StatSConns, - fS.statSChan, fS.cfg.GeneralCfg().InternalTtl) + fS.statSChan, fS.cfg.GeneralCfg().InternalTtl, true) return } @@ -112,7 +119,7 @@ func (fS *FilterS) connResourceS() (err error) { fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts, fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout, fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().ResourceSConns, - fS.resSChan, fS.cfg.GeneralCfg().InternalTtl) + fS.resSChan, fS.cfg.GeneralCfg().InternalTtl, true) return } @@ -138,7 +145,7 @@ func (fS *FilterS) Pass(tenant string, filterIDs []string, continue } for _, fltr := range f.Rules { - if pass, err = fltr.Pass(ev, fS.statSConns); err != nil || !pass { + if pass, err = fltr.Pass(ev, fS.statSConns, tenant); err != nil || !pass { return pass, err } } @@ -272,7 +279,8 @@ func (rf *FilterRule) CompileValues() (err error) { } // Pass is the method which should be used from outside. -func (fltr *FilterRule) Pass(dP config.DataProvider, rpcClnt rpcclient.RpcClientConnection) (result bool, err error) { +func (fltr *FilterRule) Pass(dP config.DataProvider, + rpcClnt rpcclient.RpcClientConnection, tenant string) (result bool, err error) { if fltr.negative == nil { fltr.negative = utils.BoolPointer(strings.HasPrefix(fltr.Type, MetaNot)) } @@ -295,7 +303,7 @@ func (fltr *FilterRule) Pass(dP config.DataProvider, rpcClnt rpcclient.RpcClient case MetaRSR, MetaNotRSR: result, err = fltr.passRSR(dP) case MetaStatS, MetaNotStatS: - result, err = fltr.passStatS(dP, rpcClnt) + result, err = fltr.passStatS(dP, rpcClnt, tenant) case MetaLessThan, MetaLessOrEqual, MetaGreaterThan, MetaGreaterOrEqual, MetaNotLessThan, MetaNotLessOrEqual, MetaNotGreaterThan, MetaNotGreaterOrEqual: result, err = fltr.passGreaterThan(dP) @@ -436,13 +444,13 @@ func (fltr *FilterRule) passRSR(dP config.DataProvider) (bool, error) { } func (fltr *FilterRule) passStatS(dP config.DataProvider, - stats rpcclient.RpcClientConnection) (bool, error) { + stats rpcclient.RpcClientConnection, tenant string) (bool, error) { if stats == nil || reflect.ValueOf(stats).IsNil() { return false, errors.New("Missing StatS information") } for _, threshold := range fltr.statSThresholds { statValues := make(map[string]float64) - if err := stats.Call("StatSV1.GetFloatMetrics", threshold.QueueID, &statValues); err != nil { + if err := stats.Call(utils.StatSv1GetQueueFloatMetrics, &utils.TenantID{Tenant: tenant, ID: threshold.QueueID}, &statValues); err != nil { return false, err } val, hasIt := statValues[utils.Meta+threshold.ThresholdType[len(MetaMinCapPrefix):]] diff --git a/engine/filters_test.go b/engine/filters_test.go index 5511e3a2a..4af98d21d 100644 --- a/engine/filters_test.go +++ b/engine/filters_test.go @@ -51,14 +51,14 @@ func TestFilterPassString(t *testing.T) { //not rf = &FilterRule{Type: "*notstring", FieldName: "Category", Values: []string{"call"}} - if passes, err := rf.Pass(cd, nil); err != nil { + if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil { t.Error(err) } else if passes { t.Error("Filter passes") } rf = &FilterRule{Type: "*notstring", FieldName: "Category", Values: []string{"cal"}} - if passes, err := rf.Pass(cd, nil); err != nil { + if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil { t.Error(err) } else if !passes { t.Error("Not passes filter") @@ -97,7 +97,7 @@ func TestFilterPassEmpty(t *testing.T) { } //not rf = &FilterRule{Type: "*notempty", FieldName: "Category", Values: []string{}} - if passes, err := rf.Pass(cd, nil); err != nil { + if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil { t.Error(err) } else if passes { t.Error("Filter passes") @@ -136,7 +136,7 @@ func TestFilterPassExists(t *testing.T) { } //not rf = &FilterRule{Type: "*notexists", FieldName: "Category1", Values: []string{}} - if passes, err := rf.Pass(cd, nil); err != nil { + if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil { t.Error(err) } else if !passes { t.Error("Not passes filter") @@ -192,7 +192,7 @@ func TestFilterPassStringPrefix(t *testing.T) { } //not rf = &FilterRule{Type: "*notprefix", FieldName: "Category", Values: []string{"premium"}} - if passes, err := rf.Pass(cd, nil); err != nil { + if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil { t.Error(err) } else if !passes { t.Error("Not passes filter") @@ -248,7 +248,7 @@ func TestFilterPassStringSuffix(t *testing.T) { } //not rf = &FilterRule{Type: "*notsuffix", FieldName: "Destination", Values: []string{"963"}} - if passes, err := rf.Pass(cd, nil); err != nil { + if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil { t.Error(err) } else if passes { t.Error("Passes filter") @@ -298,7 +298,7 @@ func TestFilterPassRSRFields(t *testing.T) { if err != nil { t.Error(err) } - if passes, err := rf.Pass(cd, nil); err != nil { + if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil { t.Error(err) } else if passes { t.Error("Passing") @@ -341,7 +341,7 @@ func TestFilterPassDestinations(t *testing.T) { if err != nil { t.Error(err) } - if passes, err := rf.Pass(cd, nil); err != nil { + if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil { t.Error(err) } else if passes { t.Error("Passing") diff --git a/engine/libengine.go b/engine/libengine.go index 1c07c97e2..ca36b6e0b 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -30,7 +30,7 @@ import ( func NewRPCPool(dispatchStrategy string, key_path, cert_path, ca_path string, connAttempts, reconnects int, connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.HaPoolConfig, - internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration) (*rpcclient.RpcClientPool, error) { + internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration, lazyConnect bool) (*rpcclient.RpcClientPool, error) { var rpcClient *rpcclient.RpcClient var err error rpcPool := rpcclient.NewRpcClientPool(dispatchStrategy, replyTimeout) @@ -45,14 +45,14 @@ func NewRPCPool(dispatchStrategy string, key_path, cert_path, ca_path string, co return nil, errors.New("TTL triggered") } rpcClient, err = rpcclient.NewRpcClient("", "", rpcConnCfg.Tls, key_path, cert_path, ca_path, connAttempts, - reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn, false) + reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn, lazyConnect) } else if utils.IsSliceMember([]string{utils.MetaJSONrpc, utils.MetaGOBrpc, ""}, rpcConnCfg.Transport) { codec := utils.GOB if rpcConnCfg.Transport != "" { codec = rpcConnCfg.Transport[1:] // Transport contains always * before codec understood by rpcclient } rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, rpcConnCfg.Tls, key_path, cert_path, ca_path, - connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil, false) + connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil, lazyConnect) } else { return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport) } diff --git a/general_tests/filters_it_test.go b/general_tests/filters_it_test.go index aee82f1eb..6339bf2fa 100644 --- a/general_tests/filters_it_test.go +++ b/general_tests/filters_it_test.go @@ -24,6 +24,7 @@ import ( "net/rpc" "net/rpc/jsonrpc" "path" + "reflect" "testing" "time" @@ -46,7 +47,10 @@ var sTestsFltr = []func(t *testing.T){ testV1FltrResetStorDb, testV1FltrStartEngine, testV1FltrRpcConn, - + testV1FltrLoadTarrifPlans, + testV1FltrAddStats, + testV1FltrPupulateThreshold, + testV1FltrGetThresholdForEvent, testV1FltrStopEngine, } @@ -104,6 +108,265 @@ func testV1FltrLoadTarrifPlans(t *testing.T) { time.Sleep(500 * time.Millisecond) } +func testV1FltrAddStats(t *testing.T) { + var reply []string + expected := []string{"Stat_1"} + ev1 := utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.Account: "1001", + utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.Usage: time.Duration(11 * time.Second), + utils.COST: 10.0, + }, + } + if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) + } + + expected = []string{"Stat_1"} + ev1 = utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event2", + Event: map[string]interface{}{ + utils.Account: "1001", + utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.Usage: time.Duration(11 * time.Second), + utils.COST: 10.5, + }, + } + if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) + } + + expected = []string{"Stat_2"} + ev1 = utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event2", + Event: map[string]interface{}{ + utils.Account: "1002", + utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.Usage: time.Duration(5 * time.Second), + utils.COST: 12.5, + }, + } + if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) + } + + expected = []string{"Stat_2"} + ev1 = utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event2", + Event: map[string]interface{}{ + utils.Account: "1002", + utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.Usage: time.Duration(6 * time.Second), + utils.COST: 17.5, + }, + } + if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) + } + + expected = []string{"Stat_3"} + ev1 = utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event3", + Event: map[string]interface{}{ + utils.Account: "1003", + utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.Usage: time.Duration(11 * time.Second), + utils.COST: 12.5, + }, + } + if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) + } + + expected = []string{"Stat_1_1"} + ev1 = utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event3", + Event: map[string]interface{}{ + "Stat": "Stat1_1", + utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.Usage: time.Duration(11 * time.Second), + utils.COST: 12.5, + utils.PDD: time.Duration(12 * time.Second), + }, + } + if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) + } + + expected = []string{"Stat_1_1"} + ev1 = utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event3", + Event: map[string]interface{}{ + "Stat": "Stat1_1", + utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC), + utils.Usage: time.Duration(15 * time.Second), + utils.COST: 15.5, + utils.PDD: time.Duration(15 * time.Second), + }, + } + if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(reply, expected) { + t.Errorf("Expecting: %+v, received: %+v", expected, reply) + } +} + +func testV1FltrPupulateThreshold(t *testing.T) { + //Add a filter of type *stats and check if acd metric is minim 10 ( greater than 10) + //we expect that acd from Stat_1 to be 11 so the filter should pass (11 > 10) + filter := &engine.Filter{ + Tenant: "cgrates.org", + ID: "FLTR_TH_Stats1", + Rules: []*engine.FilterRule{ + { + Type: "*stats", + Values: []string{"Stat_1:*min_acd:10"}, + }, + }, + } + + var result string + if err := fltrRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + + // Add a disable and log action + attrsAA := &utils.AttrSetActions{ActionsId: "LOG", Actions: []*utils.TPAction{ + {Identifier: engine.LOG}, + }} + if err := fltrRpc.Call("ApierV2.SetActions", attrsAA, &result); err != nil && err.Error() != utils.ErrExists.Error() { + t.Error("Got error on ApierV2.SetActions: ", err.Error()) + } else if result != utils.OK { + t.Errorf("Calling ApierV2.SetActions received: %s", result) + } + time.Sleep(10 * time.Millisecond) + + //Add a threshold with filter from above and an inline filter for Account 1010 + tPrfl := &engine.ThresholdProfile{ + Tenant: "cgrates.org", + ID: "TH_Stats1", + FilterIDs: []string{"FLTR_TH_Stats1", "*string:Account:1010"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + }, + MaxHits: -1, + MinSleep: time.Duration(1 * time.Second), + Blocker: false, + Weight: 10.0, + ActionIDs: []string{"LOG"}, + Async: true, + } + if err := fltrRpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + var rcvTh *engine.ThresholdProfile + if err := fltrRpc.Call("ApierV1.GetThresholdProfile", + &utils.TenantID{Tenant: tPrfl.Tenant, ID: tPrfl.ID}, &rcvTh); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(tPrfl, rcvTh) { + t.Errorf("Expecting: %+v, received: %+v", tPrfl, rcvTh) + } +} + +func testV1FltrGetThresholdForEvent(t *testing.T) { + // check the event + tEv := utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.Account: "1010"}, + } + var ids []string + eIDs := []string{"TH_Stats1"} + if err := fltrRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &ids); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(ids, eIDs) { + t.Errorf("Expecting ids: %s, received: %s", eIDs, ids) + } +} + +func testV1FltrGetThresholdForEvent2(t *testing.T) { + //Add a filter of type *stats and check if acd metric is maximum 10 ( lower than 10) + //we expect that acd from Stat_1 to be 11 so the filter should not pass (11 > 10) + filter := &engine.Filter{ + Tenant: "cgrates.org", + ID: "FLTR_TH_Stats1", + Rules: []*engine.FilterRule{ + { + Type: "*stats", + Values: []string{"Stat_1:*max_acd:10"}, + }, + }, + } + + var result string + if err := fltrRpc.Call("ApierV1.SetFilter", filter, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + + //update the threshold with new filter + tPrfl := &engine.ThresholdProfile{ + Tenant: "cgrates.org", + ID: "TH_Stats1", + FilterIDs: []string{"FLTR_TH_Stats1", "*string:Account:1010"}, + ActivationInterval: &utils.ActivationInterval{ + ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC), + }, + MaxHits: -1, + MinSleep: time.Duration(1 * time.Second), + Blocker: false, + Weight: 10.0, + ActionIDs: []string{"LOG"}, + Async: true, + } + if err := fltrRpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } + + tEv := utils.CGREvent{ + Tenant: "cgrates.org", + ID: "event1", + Event: map[string]interface{}{ + utils.Account: "1010"}, + } + var ids []string + if err := fltrRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &ids); err == nil || + err.Error() != utils.ErrNotFound.Error() { + t.Error(err) + } +} + func testV1FltrStopEngine(t *testing.T) { if err := engine.KillEngine(accDelay); err != nil { t.Error(err)