Add integration test for filterS (type *stats)

This commit is contained in:
TeoV
2019-02-20 15:15:45 +02:00
committed by Dan Christian Bogos
parent b3c24c1d78
commit 3252fe791b
9 changed files with 339 additions and 59 deletions

View File

@@ -40,7 +40,7 @@ func TestAgReqAsNavigableMap(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", "", filterS)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.CGRID},
@@ -136,7 +136,7 @@ func TestAgReqMaxCost(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", "", filterS)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.CapMaxUsage}, "120s", false, false)
@@ -180,7 +180,7 @@ func TestAgReqParseFieldDiameter(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, "cgrates.org", "", filterS)
@@ -230,7 +230,7 @@ func TestAgReqParseFieldRadius(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, "cgrates.org", "", filterS)
tplFlds := []*config.FCTemplate{
@@ -270,7 +270,7 @@ Host: api.cgrates.org
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, "cgrates.org", "", filterS)
tplFlds := []*config.FCTemplate{
@@ -341,7 +341,7 @@ func TestAgReqParseFieldHttpXml(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, dm)
//pass the data provider to agent request
agReq := newAgentRequest(dP, nil, nil, nil, "cgrates.org", "", filterS)
tplFlds := []*config.FCTemplate{
@@ -370,7 +370,7 @@ func TestAgReqEmptyFilter(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", "", filterS)
// populate request, emulating the way will be done in HTTPAgent
agReq.CGRRequest.Set([]string{utils.CGRID},
@@ -413,7 +413,7 @@ func TestAgReqMetaExponent(t *testing.T) {
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
cfg, _ := config.NewDefaultCGRConfig()
filterS := engine.NewFilterS(cfg, nil, dm)
filterS := engine.NewFilterS(cfg, nil, nil, dm)
agReq := newAgentRequest(nil, nil, nil, nil, "cgrates.org", "", filterS)
agReq.CGRRequest.Set([]string{"Value"}, "2", false, false)
agReq.CGRRequest.Set([]string{"Exponent"}, "2", false, false)

View File

@@ -314,7 +314,7 @@ func TestXMLRPProcessWithNewFilters(t *testing.T) {
}
xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft),
utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true,
cdrcCfgs, engine.NewFilterS(defaultCfg, nil, engine.NewDataManager(data)))
cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, engine.NewDataManager(data)))
if err != nil {
t.Error(err)
}
@@ -573,7 +573,7 @@ func TestXMLRPNestingSeparator(t *testing.T) {
}
xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(xmlContent),
utils.HierarchyPath([]string{"File", "CDRs", "Call"}), "UTC", true,
cdrcCfgs, engine.NewFilterS(defaultCfg, nil, engine.NewDataManager(data)))
cdrcCfgs, engine.NewFilterS(defaultCfg, nil, nil, engine.NewDataManager(data)))
if err != nil {
t.Error(err)
}

View File

@@ -128,7 +128,7 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cdrcCfg.CdrsConns, internalCdrSChan, cfg.GeneralCfg().InternalTtl)
cdrcCfg.CdrsConns, internalCdrSChan, cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRC> Could not connect to CDRS via RPC: %s", err.Error()))
exitChan <- true
@@ -161,7 +161,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts,
cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout,
cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().ChargerSConns,
internalChargerSChan, cfg.GeneralCfg().InternalTtl)
internalChargerSChan, cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SessionS, utils.ChargerS, err.Error()))
@@ -175,7 +175,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
cfg.TlsCfg().CaCertificate, cfg.GeneralCfg().ConnectAttempts,
cfg.GeneralCfg().Reconnects, cfg.GeneralCfg().ConnectTimeout,
cfg.GeneralCfg().ReplyTimeout, cfg.SessionSCfg().RALsConns,
internalRaterChan, cfg.GeneralCfg().InternalTtl)
internalRaterChan, cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s",
utils.SessionS, err.Error()))
@@ -190,7 +190,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SessionSCfg().ResSConns, internalResourceSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResourceS: %s",
utils.SessionS, err.Error()))
@@ -205,7 +205,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SessionSCfg().ThreshSConns, internalThresholdSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s",
utils.SessionS, err.Error()))
@@ -220,7 +220,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SessionSCfg().StatSConns, internalStatSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
utils.SessionS, err.Error()))
@@ -235,7 +235,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SessionSCfg().SupplSConns, internalSupplierSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SupplierS: %s",
utils.SessionS, err.Error()))
@@ -250,7 +250,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SessionSCfg().AttrSConns, internalAttrSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to AttributeS: %s",
utils.SessionS, err.Error()))
@@ -265,7 +265,7 @@ func startSessionS(internalSMGChan, internalRaterChan, internalResourceSChan, in
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SessionSCfg().CDRsConns, internalCDRSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s",
utils.SessionS, err.Error()))
@@ -362,7 +362,7 @@ func startDiameterAgent(internalSsChan chan rpcclient.RpcClientConnection,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DiameterAgentCfg().SessionSConns, internalSsChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.DiameterAgent, utils.SessionS, err.Error()))
@@ -407,7 +407,7 @@ func startRadiusAgent(internalSMGChan chan rpcclient.RpcClientConnection, exitCh
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.RadiusAgentCfg().SessionSConns, internalSMGChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<RadiusAgent> Could not connect to SMG: %s", err.Error()))
exitChan <- true
@@ -485,7 +485,7 @@ func startHTTPAgent(internalSMGChan chan rpcclient.RpcClientConnection,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
agntCfg.SessionSConns, internalSMGChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> could not connect to %s, error: %s",
utils.HTTPAgent, utils.SessionS, err.Error()))
@@ -518,7 +518,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.CdrsCfg().CDRSChargerSConns, internalChargerSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.ChargerS, err.Error()))
@@ -533,7 +533,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.CdrsCfg().CDRSRaterConns, internalRaterChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to RAL: %s", err.Error()))
exitChan <- true
@@ -547,7 +547,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.CdrsCfg().CDRSAttributeSConns, internalAttributeSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to %s: %s",
utils.AttributeS, err.Error()))
@@ -562,7 +562,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.CdrsCfg().CDRSThresholdSConns, internalThresholdSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to ThresholdS: %s", err.Error()))
exitChan <- true
@@ -576,7 +576,7 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.CdrsCfg().CDRSStatSConns, internalStatSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<CDRS> Could not connect to StatS: %s", err.Error()))
exitChan <- true
@@ -660,7 +660,7 @@ func startChargerService(internalChargerSChan chan rpcclient.RpcClientConnection
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.ChargerSCfg().AttributeSConns, internalAttributeSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.ChargerS, utils.AttributeS, err.Error()))
@@ -705,7 +705,7 @@ func startResourceService(internalRsChan chan rpcclient.RpcClientConnection, cac
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.ResourceSCfg().ThresholdSConns, internalThresholdSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<ResourceS> Could not connect to ThresholdS: %s", err.Error()))
exitChan <- true
@@ -752,7 +752,7 @@ func startStatService(internalStatSChan chan rpcclient.RpcClientConnection, cach
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.StatSCfg().ThresholdSConns, internalThresholdSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<StatS> Could not connect to ThresholdS: %s", err.Error()))
exitChan <- true
@@ -830,7 +830,7 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SupplierSCfg().AttributeSConns, internalAttrSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.SupplierS, utils.AttributeS, err.Error()))
@@ -845,7 +845,7 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SupplierSCfg().StatSConns, internalStatSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s",
utils.SupplierS, err.Error()))
@@ -918,7 +918,7 @@ func startDispatcherService(internalDispatcherSChan chan *dispatchers.Dispatcher
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().AttributeSConns, intAttrSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.DispatcherS, utils.AttributeS, err.Error()))
@@ -935,7 +935,7 @@ func startDispatcherService(internalDispatcherSChan chan *dispatchers.Dispatcher
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
haPoolCfg, nil, time.Duration(0)); err != nil {
haPoolCfg, nil, time.Duration(0), false); err != nil {
utils.Logger.Crit(
fmt.Sprintf("<%s> could not connect to connID: <%s>, err: <%s>",
utils.DispatcherS, connID, err.Error()))
@@ -1130,7 +1130,7 @@ func schedCDRsConns(internalCDRSChan chan rpcclient.RpcClientConnection, exitCha
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.SchedulerCfg().CDRsConns, internalCDRSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to CDRServer: %s", utils.SchedulerS, err.Error()))
exitChan <- true

View File

@@ -66,7 +66,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.RalsCfg().RALsThresholdSConns, internalThdSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to ThresholdS, error: %s", err.Error()))
exitChan <- true
@@ -88,7 +88,7 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.RalsCfg().RALsStatSConns, internalStatSChan,
cfg.GeneralCfg().InternalTtl)
cfg.GeneralCfg().InternalTtl, false)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<RALs> Could not connect to StatS, error: %s", err.Error()))
exitChan <- true

View File

@@ -30,6 +30,9 @@
"rals": {
"enabled": true,
"thresholds_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
],
},
@@ -39,7 +42,8 @@
],
"resources_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
],
],
"indexed_selects":false,
},
@@ -55,5 +59,10 @@
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
}

View File

@@ -65,13 +65,20 @@ const (
)
func NewFilterS(cfg *config.CGRConfig,
statSChan, resSChan chan rpcclient.RpcClientConnection, dm *DataManager) *FilterS {
return &FilterS{
statSChan, resSChan chan rpcclient.RpcClientConnection, dm *DataManager) (fS *FilterS) {
fS = &FilterS{
statSChan: statSChan,
resSChan: resSChan,
dm: dm,
cfg: cfg,
}
if len(cfg.FilterSCfg().StatSConns) != 0 {
fS.connStatS()
}
if len(cfg.FilterSCfg().ResourceSConns) != 0 {
fS.connResourceS()
}
return
}
// FilterS is a service used to take decisions in case of filters
@@ -96,7 +103,7 @@ func (fS *FilterS) connStatS() (err error) {
fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts,
fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout,
fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().StatSConns,
fS.statSChan, fS.cfg.GeneralCfg().InternalTtl)
fS.statSChan, fS.cfg.GeneralCfg().InternalTtl, true)
return
}
@@ -112,7 +119,7 @@ func (fS *FilterS) connResourceS() (err error) {
fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts,
fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout,
fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().ResourceSConns,
fS.resSChan, fS.cfg.GeneralCfg().InternalTtl)
fS.resSChan, fS.cfg.GeneralCfg().InternalTtl, true)
return
}
@@ -138,7 +145,7 @@ func (fS *FilterS) Pass(tenant string, filterIDs []string,
continue
}
for _, fltr := range f.Rules {
if pass, err = fltr.Pass(ev, fS.statSConns); err != nil || !pass {
if pass, err = fltr.Pass(ev, fS.statSConns, tenant); err != nil || !pass {
return pass, err
}
}
@@ -272,7 +279,8 @@ func (rf *FilterRule) CompileValues() (err error) {
}
// Pass is the method which should be used from outside.
func (fltr *FilterRule) Pass(dP config.DataProvider, rpcClnt rpcclient.RpcClientConnection) (result bool, err error) {
func (fltr *FilterRule) Pass(dP config.DataProvider,
rpcClnt rpcclient.RpcClientConnection, tenant string) (result bool, err error) {
if fltr.negative == nil {
fltr.negative = utils.BoolPointer(strings.HasPrefix(fltr.Type, MetaNot))
}
@@ -295,7 +303,7 @@ func (fltr *FilterRule) Pass(dP config.DataProvider, rpcClnt rpcclient.RpcClient
case MetaRSR, MetaNotRSR:
result, err = fltr.passRSR(dP)
case MetaStatS, MetaNotStatS:
result, err = fltr.passStatS(dP, rpcClnt)
result, err = fltr.passStatS(dP, rpcClnt, tenant)
case MetaLessThan, MetaLessOrEqual, MetaGreaterThan, MetaGreaterOrEqual,
MetaNotLessThan, MetaNotLessOrEqual, MetaNotGreaterThan, MetaNotGreaterOrEqual:
result, err = fltr.passGreaterThan(dP)
@@ -436,13 +444,13 @@ func (fltr *FilterRule) passRSR(dP config.DataProvider) (bool, error) {
}
func (fltr *FilterRule) passStatS(dP config.DataProvider,
stats rpcclient.RpcClientConnection) (bool, error) {
stats rpcclient.RpcClientConnection, tenant string) (bool, error) {
if stats == nil || reflect.ValueOf(stats).IsNil() {
return false, errors.New("Missing StatS information")
}
for _, threshold := range fltr.statSThresholds {
statValues := make(map[string]float64)
if err := stats.Call("StatSV1.GetFloatMetrics", threshold.QueueID, &statValues); err != nil {
if err := stats.Call(utils.StatSv1GetQueueFloatMetrics, &utils.TenantID{Tenant: tenant, ID: threshold.QueueID}, &statValues); err != nil {
return false, err
}
val, hasIt := statValues[utils.Meta+threshold.ThresholdType[len(MetaMinCapPrefix):]]

View File

@@ -51,14 +51,14 @@ func TestFilterPassString(t *testing.T) {
//not
rf = &FilterRule{Type: "*notstring",
FieldName: "Category", Values: []string{"call"}}
if passes, err := rf.Pass(cd, nil); err != nil {
if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil {
t.Error(err)
} else if passes {
t.Error("Filter passes")
}
rf = &FilterRule{Type: "*notstring",
FieldName: "Category", Values: []string{"cal"}}
if passes, err := rf.Pass(cd, nil); err != nil {
if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil {
t.Error(err)
} else if !passes {
t.Error("Not passes filter")
@@ -97,7 +97,7 @@ func TestFilterPassEmpty(t *testing.T) {
}
//not
rf = &FilterRule{Type: "*notempty", FieldName: "Category", Values: []string{}}
if passes, err := rf.Pass(cd, nil); err != nil {
if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil {
t.Error(err)
} else if passes {
t.Error("Filter passes")
@@ -136,7 +136,7 @@ func TestFilterPassExists(t *testing.T) {
}
//not
rf = &FilterRule{Type: "*notexists", FieldName: "Category1", Values: []string{}}
if passes, err := rf.Pass(cd, nil); err != nil {
if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil {
t.Error(err)
} else if !passes {
t.Error("Not passes filter")
@@ -192,7 +192,7 @@ func TestFilterPassStringPrefix(t *testing.T) {
}
//not
rf = &FilterRule{Type: "*notprefix", FieldName: "Category", Values: []string{"premium"}}
if passes, err := rf.Pass(cd, nil); err != nil {
if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil {
t.Error(err)
} else if !passes {
t.Error("Not passes filter")
@@ -248,7 +248,7 @@ func TestFilterPassStringSuffix(t *testing.T) {
}
//not
rf = &FilterRule{Type: "*notsuffix", FieldName: "Destination", Values: []string{"963"}}
if passes, err := rf.Pass(cd, nil); err != nil {
if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil {
t.Error(err)
} else if passes {
t.Error("Passes filter")
@@ -298,7 +298,7 @@ func TestFilterPassRSRFields(t *testing.T) {
if err != nil {
t.Error(err)
}
if passes, err := rf.Pass(cd, nil); err != nil {
if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil {
t.Error(err)
} else if passes {
t.Error("Passing")
@@ -341,7 +341,7 @@ func TestFilterPassDestinations(t *testing.T) {
if err != nil {
t.Error(err)
}
if passes, err := rf.Pass(cd, nil); err != nil {
if passes, err := rf.Pass(cd, nil, "cgrates.org"); err != nil {
t.Error(err)
} else if passes {
t.Error("Passing")

View File

@@ -30,7 +30,7 @@ import (
func NewRPCPool(dispatchStrategy string, key_path, cert_path, ca_path string, connAttempts, reconnects int,
connectTimeout, replyTimeout time.Duration, rpcConnCfgs []*config.HaPoolConfig,
internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration) (*rpcclient.RpcClientPool, error) {
internalConnChan chan rpcclient.RpcClientConnection, ttl time.Duration, lazyConnect bool) (*rpcclient.RpcClientPool, error) {
var rpcClient *rpcclient.RpcClient
var err error
rpcPool := rpcclient.NewRpcClientPool(dispatchStrategy, replyTimeout)
@@ -45,14 +45,14 @@ func NewRPCPool(dispatchStrategy string, key_path, cert_path, ca_path string, co
return nil, errors.New("TTL triggered")
}
rpcClient, err = rpcclient.NewRpcClient("", "", rpcConnCfg.Tls, key_path, cert_path, ca_path, connAttempts,
reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn, false)
reconnects, connectTimeout, replyTimeout, rpcclient.INTERNAL_RPC, internalConn, lazyConnect)
} else if utils.IsSliceMember([]string{utils.MetaJSONrpc, utils.MetaGOBrpc, ""}, rpcConnCfg.Transport) {
codec := utils.GOB
if rpcConnCfg.Transport != "" {
codec = rpcConnCfg.Transport[1:] // Transport contains always * before codec understood by rpcclient
}
rpcClient, err = rpcclient.NewRpcClient("tcp", rpcConnCfg.Address, rpcConnCfg.Tls, key_path, cert_path, ca_path,
connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil, false)
connAttempts, reconnects, connectTimeout, replyTimeout, codec, nil, lazyConnect)
} else {
return nil, fmt.Errorf("Unsupported transport: <%s>", rpcConnCfg.Transport)
}

View File

@@ -24,6 +24,7 @@ import (
"net/rpc"
"net/rpc/jsonrpc"
"path"
"reflect"
"testing"
"time"
@@ -46,7 +47,10 @@ var sTestsFltr = []func(t *testing.T){
testV1FltrResetStorDb,
testV1FltrStartEngine,
testV1FltrRpcConn,
testV1FltrLoadTarrifPlans,
testV1FltrAddStats,
testV1FltrPupulateThreshold,
testV1FltrGetThresholdForEvent,
testV1FltrStopEngine,
}
@@ -104,6 +108,265 @@ func testV1FltrLoadTarrifPlans(t *testing.T) {
time.Sleep(500 * time.Millisecond)
}
func testV1FltrAddStats(t *testing.T) {
var reply []string
expected := []string{"Stat_1"}
ev1 := utils.CGREvent{
Tenant: "cgrates.org",
ID: "event1",
Event: map[string]interface{}{
utils.Account: "1001",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
utils.Usage: time.Duration(11 * time.Second),
utils.COST: 10.0,
},
}
if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(reply, expected) {
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
}
expected = []string{"Stat_1"}
ev1 = utils.CGREvent{
Tenant: "cgrates.org",
ID: "event2",
Event: map[string]interface{}{
utils.Account: "1001",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
utils.Usage: time.Duration(11 * time.Second),
utils.COST: 10.5,
},
}
if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(reply, expected) {
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
}
expected = []string{"Stat_2"}
ev1 = utils.CGREvent{
Tenant: "cgrates.org",
ID: "event2",
Event: map[string]interface{}{
utils.Account: "1002",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
utils.Usage: time.Duration(5 * time.Second),
utils.COST: 12.5,
},
}
if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(reply, expected) {
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
}
expected = []string{"Stat_2"}
ev1 = utils.CGREvent{
Tenant: "cgrates.org",
ID: "event2",
Event: map[string]interface{}{
utils.Account: "1002",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
utils.Usage: time.Duration(6 * time.Second),
utils.COST: 17.5,
},
}
if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(reply, expected) {
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
}
expected = []string{"Stat_3"}
ev1 = utils.CGREvent{
Tenant: "cgrates.org",
ID: "event3",
Event: map[string]interface{}{
utils.Account: "1003",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
utils.Usage: time.Duration(11 * time.Second),
utils.COST: 12.5,
},
}
if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(reply, expected) {
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
}
expected = []string{"Stat_1_1"}
ev1 = utils.CGREvent{
Tenant: "cgrates.org",
ID: "event3",
Event: map[string]interface{}{
"Stat": "Stat1_1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
utils.Usage: time.Duration(11 * time.Second),
utils.COST: 12.5,
utils.PDD: time.Duration(12 * time.Second),
},
}
if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(reply, expected) {
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
}
expected = []string{"Stat_1_1"}
ev1 = utils.CGREvent{
Tenant: "cgrates.org",
ID: "event3",
Event: map[string]interface{}{
"Stat": "Stat1_1",
utils.AnswerTime: time.Date(2014, 7, 14, 14, 25, 0, 0, time.UTC),
utils.Usage: time.Duration(15 * time.Second),
utils.COST: 15.5,
utils.PDD: time.Duration(15 * time.Second),
},
}
if err := fltrRpc.Call(utils.StatSv1ProcessEvent, &ev1, &reply); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(reply, expected) {
t.Errorf("Expecting: %+v, received: %+v", expected, reply)
}
}
func testV1FltrPupulateThreshold(t *testing.T) {
//Add a filter of type *stats and check if acd metric is minim 10 ( greater than 10)
//we expect that acd from Stat_1 to be 11 so the filter should pass (11 > 10)
filter := &engine.Filter{
Tenant: "cgrates.org",
ID: "FLTR_TH_Stats1",
Rules: []*engine.FilterRule{
{
Type: "*stats",
Values: []string{"Stat_1:*min_acd:10"},
},
},
}
var result string
if err := fltrRpc.Call("ApierV1.SetFilter", filter, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
// Add a disable and log action
attrsAA := &utils.AttrSetActions{ActionsId: "LOG", Actions: []*utils.TPAction{
{Identifier: engine.LOG},
}}
if err := fltrRpc.Call("ApierV2.SetActions", attrsAA, &result); err != nil && err.Error() != utils.ErrExists.Error() {
t.Error("Got error on ApierV2.SetActions: ", err.Error())
} else if result != utils.OK {
t.Errorf("Calling ApierV2.SetActions received: %s", result)
}
time.Sleep(10 * time.Millisecond)
//Add a threshold with filter from above and an inline filter for Account 1010
tPrfl := &engine.ThresholdProfile{
Tenant: "cgrates.org",
ID: "TH_Stats1",
FilterIDs: []string{"FLTR_TH_Stats1", "*string:Account:1010"},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC),
ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC),
},
MaxHits: -1,
MinSleep: time.Duration(1 * time.Second),
Blocker: false,
Weight: 10.0,
ActionIDs: []string{"LOG"},
Async: true,
}
if err := fltrRpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
var rcvTh *engine.ThresholdProfile
if err := fltrRpc.Call("ApierV1.GetThresholdProfile",
&utils.TenantID{Tenant: tPrfl.Tenant, ID: tPrfl.ID}, &rcvTh); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(tPrfl, rcvTh) {
t.Errorf("Expecting: %+v, received: %+v", tPrfl, rcvTh)
}
}
func testV1FltrGetThresholdForEvent(t *testing.T) {
// check the event
tEv := utils.CGREvent{
Tenant: "cgrates.org",
ID: "event1",
Event: map[string]interface{}{
utils.Account: "1010"},
}
var ids []string
eIDs := []string{"TH_Stats1"}
if err := fltrRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &ids); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(ids, eIDs) {
t.Errorf("Expecting ids: %s, received: %s", eIDs, ids)
}
}
func testV1FltrGetThresholdForEvent2(t *testing.T) {
//Add a filter of type *stats and check if acd metric is maximum 10 ( lower than 10)
//we expect that acd from Stat_1 to be 11 so the filter should not pass (11 > 10)
filter := &engine.Filter{
Tenant: "cgrates.org",
ID: "FLTR_TH_Stats1",
Rules: []*engine.FilterRule{
{
Type: "*stats",
Values: []string{"Stat_1:*max_acd:10"},
},
},
}
var result string
if err := fltrRpc.Call("ApierV1.SetFilter", filter, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
//update the threshold with new filter
tPrfl := &engine.ThresholdProfile{
Tenant: "cgrates.org",
ID: "TH_Stats1",
FilterIDs: []string{"FLTR_TH_Stats1", "*string:Account:1010"},
ActivationInterval: &utils.ActivationInterval{
ActivationTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC),
ExpiryTime: time.Date(2014, 7, 14, 14, 35, 0, 0, time.UTC),
},
MaxHits: -1,
MinSleep: time.Duration(1 * time.Second),
Blocker: false,
Weight: 10.0,
ActionIDs: []string{"LOG"},
Async: true,
}
if err := fltrRpc.Call("ApierV1.SetThresholdProfile", tPrfl, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
tEv := utils.CGREvent{
Tenant: "cgrates.org",
ID: "event1",
Event: map[string]interface{}{
utils.Account: "1010"},
}
var ids []string
if err := fltrRpc.Call(utils.ThresholdSv1ProcessEvent, tEv, &ids); err == nil ||
err.Error() != utils.ErrNotFound.Error() {
t.Error(err)
}
}
func testV1FltrStopEngine(t *testing.T) {
if err := engine.KillEngine(accDelay); err != nil {
t.Error(err)