Start adding resource conn to filterS

This commit is contained in:
TeoV
2019-02-20 10:14:18 +02:00
committed by Dan Christian Bogos
parent dcd645bbe5
commit b3c24c1d78
11 changed files with 214 additions and 13 deletions

View File

@@ -880,10 +880,10 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti
// startFilterService fires up the FilterS
func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
internalStatSChan, internalResourceSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig,
dm *engine.DataManager, exitChan chan bool) {
<-cacheS.GetPrecacheChannel(utils.CacheFilters)
filterSChan <- engine.NewFilterS(cfg, internalStatSChan, dm)
filterSChan <- engine.NewFilterS(cfg, internalStatSChan, internalResourceSChan, dm)
}
// loaderService will start and register APIs for LoaderService if enabled
@@ -1399,7 +1399,7 @@ func main() {
}
// Start FilterS
go startFilterService(filterSChan, cacheS, internalStatSChan, cfg, dm, exitChan)
go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, cfg, dm, exitChan)
if cfg.AttributeSCfg().Enabled {
go startAttributeService(internalAttributeSChan, cacheS,

View File

@@ -153,6 +153,7 @@ const CGRATES_CFG_JSON = `
"filters": { // Filters configuration (*new)
"stats_conns": [], // address where to reach the stat service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
"resources_conns": [], // address where to reach the resource service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234>
"indexed_selects":true, // enable profile matching exclusively on indexes
},

View File

@@ -754,6 +754,7 @@ func TestDfChargerServJsonCfg(t *testing.T) {
func TestDfFilterSJsonCfg(t *testing.T) {
eCfg := &FilterSJsonCfg{
Stats_conns: &[]*HaPoolJsonCfg{},
Resources_conns: &[]*HaPoolJsonCfg{},
Indexed_selects: utils.BoolPointer(true),
}
if cfg, err := dfCgrJsonCfg.FilterSJsonCfg(); err != nil {

View File

@@ -774,6 +774,7 @@ func TestCgrCfgJSONDefaultssteriskAgentCfg(t *testing.T) {
func TestCgrCfgJSONDefaultFiltersCfg(t *testing.T) {
eFiltersCfg := &FilterSCfg{
StatSConns: []*HaPoolConfig{},
ResourceSConns: []*HaPoolConfig{},
IndexedSelects: true,
}
if !reflect.DeepEqual(cgrCfg.filterSCfg, eFiltersCfg) {

View File

@@ -20,6 +20,7 @@ package config
type FilterSCfg struct {
StatSConns []*HaPoolConfig
ResourceSConns []*HaPoolConfig
IndexedSelects bool
}
@@ -34,6 +35,13 @@ func (fSCfg *FilterSCfg) loadFromJsonCfg(jsnCfg *FilterSJsonCfg) (err error) {
fSCfg.StatSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Resources_conns != nil {
fSCfg.ResourceSConns = make([]*HaPoolConfig, len(*jsnCfg.Resources_conns))
for idx, jsnHaCfg := range *jsnCfg.Resources_conns {
fSCfg.ResourceSConns[idx] = NewDfltHaPoolConfig()
fSCfg.ResourceSConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Indexed_selects != nil {
fSCfg.IndexedSelects = *jsnCfg.Indexed_selects
}

View File

@@ -94,6 +94,7 @@ type DbJsonCfg struct {
type FilterSJsonCfg struct {
Stats_conns *[]*HaPoolJsonCfg
Indexed_selects *bool
Resources_conns *[]*HaPoolJsonCfg
}
// Rater config section

View File

@@ -0,0 +1,59 @@
{
// CGRateS Configuration file
//
"general": {
"log_level": 7,
"node_id": "CGRFilterS",
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type: <redis|mongo>
"db_port": 6379, // data_db port to reach the database
"db_name": "10", // data_db database name to connect to
},
"stor_db": {
"db_password": "CGRateS.org",
},
"rals": {
"enabled": true,
},
"filters": {
"stats_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
],
"resources_conns": [
{"address": "127.0.0.1:2012", "transport":"*json"},
],
},
"resources": {
"enabled": true,
"store_interval": "1s",
},
"stats": {
"enabled": true,
"store_interval": "1s",
},
}

View File

@@ -22,6 +22,6 @@ cgrates.org,FLTR_QOS_SP1,*gte,*gs.*acd,10.0,2014-07-29T15:00:00Z
cgrates.org,FLTR_QOS_SP2,*gte,*gs.*acd,10.0,2014-07-29T15:00:00Z
cgrates.org,FLTR_QOS_SP2,*gte,*gs.*tcd,11.0,
cgrates.org,FLTR_SPP_QOS_2,*string,DistinctMatch,*qos_filtred2,2014-07-29T15:00:00Z
cgrates.org,FLTR_QOS_SP1_2,*gte,*sd.Cost,0.1,2014-07-29T15:00:00Z
cgrates.org,FLTR_QOS_SP2_2,*gte,*sd.Cost,0.2,2014-07-29T15:00:00Z
cgrates.org,FLTR_QOS_SP1_2,*gte,*vars.Cost,0.1,2014-07-29T15:00:00Z
cgrates.org,FLTR_QOS_SP2_2,*gte,*vars.Cost,0.2,2014-07-29T15:00:00Z
cgrates.org,FLTR_TEST,*string,Subject,TEST,2014-07-29T15:00:00Z
1 #Tenant[0] ID[1] FilterType[2] FilterFieldName[3] FilterFieldValues[4] ActivationInterval[5]
22 cgrates.org FLTR_QOS_SP2 *gte *gs.*acd 10.0 2014-07-29T15:00:00Z
23 cgrates.org FLTR_QOS_SP2 *gte *gs.*tcd 11.0
24 cgrates.org FLTR_SPP_QOS_2 *string DistinctMatch *qos_filtred2 2014-07-29T15:00:00Z
25 cgrates.org FLTR_QOS_SP1_2 *gte *sd.Cost *vars.Cost 0.1 2014-07-29T15:00:00Z
26 cgrates.org FLTR_QOS_SP2_2 *gte *sd.Cost *vars.Cost 0.2 2014-07-29T15:00:00Z
27 cgrates.org FLTR_TEST *string Subject TEST 2014-07-29T15:00:00Z

View File

@@ -65,9 +65,10 @@ const (
)
func NewFilterS(cfg *config.CGRConfig,
statSChan chan rpcclient.RpcClientConnection, dm *DataManager) *FilterS {
statSChan, resSChan chan rpcclient.RpcClientConnection, dm *DataManager) *FilterS {
return &FilterS{
statSChan: statSChan,
resSChan: resSChan,
dm: dm,
cfg: cfg,
}
@@ -76,11 +77,11 @@ func NewFilterS(cfg *config.CGRConfig,
// FilterS is a service used to take decisions in case of filters
// uses lazy connections where necessary to avoid deadlocks on service startup
type FilterS struct {
cfg *config.CGRConfig
statSChan chan rpcclient.RpcClientConnection // reference towards internal statS connection, used for lazy connect
statSConns rpcclient.RpcClientConnection
sSConnMux sync.RWMutex // make sure only one goroutine attempts connecting
dm *DataManager
cfg *config.CGRConfig
statSChan, resSChan chan rpcclient.RpcClientConnection // reference towards internal statS connection, used for lazy connect
statSConns, resSConns rpcclient.RpcClientConnection
sSConnMux, rSConnMux sync.RWMutex // make sure only one goroutine attempts connecting
dm *DataManager
}
// connStatS returns will connect towards StatS
@@ -99,6 +100,22 @@ func (fS *FilterS) connStatS() (err error) {
return
}
// connResourceS returns will connect towards ResourceS
func (fS *FilterS) connResourceS() (err error) {
fS.rSConnMux.Lock()
defer fS.rSConnMux.Unlock()
if fS.resSConns != nil { // connection was populated between locks
return
}
fS.resSConns, err = NewRPCPool(rpcclient.POOL_FIRST,
fS.cfg.TlsCfg().ClientKey, fS.cfg.TlsCfg().ClientCerificate,
fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts,
fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout,
fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().ResourceSConns,
fS.resSChan, fS.cfg.GeneralCfg().InternalTtl)
return
}
// Pass will check all filters wihin filterIDs and require them passing for dataProvider
// there should be at least one filter passing, ie: if filters are not active event will fail to pass
// receives the event as DataProvider so we can accept undecoded data (ie: HttpRequest)

View File

@@ -360,11 +360,13 @@ func (spS *SupplierService) populateSortingData(ev *utils.CGREvent, spl *Supplie
}
sortedSpl.globalStats = globalStats
}
//reas
//reds
//filter the supplier
if len(spl.FilterIDs) != 0 {
nM := config.NewNavigableMap(nil)
nM.Set([]string{"*req"}, ev.Event, false, false)
nM.Set([]string{"*sd"}, sortedSpl.SortingData, false, false)
nM.Set([]string{utils.MetaReq}, ev.Event, false, false)
nM.Set([]string{utils.MetaVars}, sortedSpl.SortingData, false, false)
nM.Set([]string{"*gs"}, metricForFilter, false, false)
if pass, err = spS.filterS.Pass(ev.Tenant, spl.FilterIDs,
nM); err != nil {

View File

@@ -0,0 +1,111 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package general_tests
import (
"net/rpc"
"net/rpc/jsonrpc"
"path"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
fltrCfgPath string
fltrCfg *config.CGRConfig
fltrRpc *rpc.Client
fltrConfDIR string //run tests for specific configuration
fltrDelay int
)
var sTestsFltr = []func(t *testing.T){
testV1FltrLoadConfig,
testV1FltrInitDataDb,
testV1FltrResetStorDb,
testV1FltrStartEngine,
testV1FltrRpcConn,
testV1FltrStopEngine,
}
// Test start here
func TestFltrIT(t *testing.T) {
fltrConfDIR = "filters"
for _, stest := range sTestsFltr {
t.Run(fltrConfDIR, stest)
}
}
func testV1FltrLoadConfig(t *testing.T) {
var err error
fltrCfgPath = path.Join(*dataDir, "conf", "samples", fltrConfDIR)
if fltrCfg, err = config.NewCGRConfigFromFolder(fltrCfgPath); err != nil {
t.Error(err)
}
fltrDelay = 1000
}
func testV1FltrInitDataDb(t *testing.T) {
if err := engine.InitDataDb(fltrCfg); err != nil {
t.Fatal(err)
}
}
func testV1FltrResetStorDb(t *testing.T) {
if err := engine.InitStorDb(fltrCfg); err != nil {
t.Fatal(err)
}
}
func testV1FltrStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(fltrCfgPath, fltrDelay); err != nil {
t.Fatal(err)
}
}
func testV1FltrRpcConn(t *testing.T) {
var err error
fltrRpc, err = jsonrpc.Dial("tcp", fltrCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal("Could not connect to rater: ", err.Error())
}
}
func testV1FltrLoadTarrifPlans(t *testing.T) {
var reply string
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "testit")}
if err := fltrRpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Error("Unexpected reply returned", reply)
}
time.Sleep(500 * time.Millisecond)
}
func testV1FltrStopEngine(t *testing.T) {
if err := engine.KillEngine(accDelay); err != nil {
t.Error(err)
}
}