From b3c24c1d78130e0d022aa2e5825e786339b97336 Mon Sep 17 00:00:00 2001 From: TeoV Date: Wed, 20 Feb 2019 10:14:18 +0200 Subject: [PATCH] Start adding resource conn to filterS --- cmd/cgr-engine/cgr-engine.go | 6 +- config/config_defaults.go | 1 + config/config_json_test.go | 1 + config/config_test.go | 1 + config/filterscfg.go | 8 ++ config/libconfig_json.go | 1 + data/conf/samples/filters/cgrates.json | 59 +++++++++++++ data/tariffplans/testit/Filters.csv | 4 +- engine/filters.go | 29 +++++-- engine/suppliers.go | 6 +- general_tests/filters_it_test.go | 111 +++++++++++++++++++++++++ 11 files changed, 214 insertions(+), 13 deletions(-) create mode 100644 data/conf/samples/filters/cgrates.json create mode 100644 general_tests/filters_it_test.go diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 758adaabd..b31a94e66 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -880,10 +880,10 @@ func startSupplierService(internalSupplierSChan chan rpcclient.RpcClientConnecti // startFilterService fires up the FilterS func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, - internalStatSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, + internalStatSChan, internalResourceSChan chan rpcclient.RpcClientConnection, cfg *config.CGRConfig, dm *engine.DataManager, exitChan chan bool) { <-cacheS.GetPrecacheChannel(utils.CacheFilters) - filterSChan <- engine.NewFilterS(cfg, internalStatSChan, dm) + filterSChan <- engine.NewFilterS(cfg, internalStatSChan, internalResourceSChan, dm) } // loaderService will start and register APIs for LoaderService if enabled @@ -1399,7 +1399,7 @@ func main() { } // Start FilterS - go startFilterService(filterSChan, cacheS, internalStatSChan, cfg, dm, exitChan) + go startFilterService(filterSChan, cacheS, internalStatSChan, internalRsChan, cfg, dm, exitChan) if cfg.AttributeSCfg().Enabled { go startAttributeService(internalAttributeSChan, cacheS, diff --git a/config/config_defaults.go b/config/config_defaults.go index f4e61ec1d..fda1b2ce2 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -153,6 +153,7 @@ const CGRATES_CFG_JSON = ` "filters": { // Filters configuration (*new) "stats_conns": [], // address where to reach the stat service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> + "resources_conns": [], // address where to reach the resource service, empty to disable stats functionality: <""|*internal|x.y.z.y:1234> "indexed_selects":true, // enable profile matching exclusively on indexes }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 961020489..4254cdc4d 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -754,6 +754,7 @@ func TestDfChargerServJsonCfg(t *testing.T) { func TestDfFilterSJsonCfg(t *testing.T) { eCfg := &FilterSJsonCfg{ Stats_conns: &[]*HaPoolJsonCfg{}, + Resources_conns: &[]*HaPoolJsonCfg{}, Indexed_selects: utils.BoolPointer(true), } if cfg, err := dfCgrJsonCfg.FilterSJsonCfg(); err != nil { diff --git a/config/config_test.go b/config/config_test.go index 4920dd099..47b4781e6 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -774,6 +774,7 @@ func TestCgrCfgJSONDefaultssteriskAgentCfg(t *testing.T) { func TestCgrCfgJSONDefaultFiltersCfg(t *testing.T) { eFiltersCfg := &FilterSCfg{ StatSConns: []*HaPoolConfig{}, + ResourceSConns: []*HaPoolConfig{}, IndexedSelects: true, } if !reflect.DeepEqual(cgrCfg.filterSCfg, eFiltersCfg) { diff --git a/config/filterscfg.go b/config/filterscfg.go index 50a602676..c34e614a2 100644 --- a/config/filterscfg.go +++ b/config/filterscfg.go @@ -20,6 +20,7 @@ package config type FilterSCfg struct { StatSConns []*HaPoolConfig + ResourceSConns []*HaPoolConfig IndexedSelects bool } @@ -34,6 +35,13 @@ func (fSCfg *FilterSCfg) loadFromJsonCfg(jsnCfg *FilterSJsonCfg) (err error) { fSCfg.StatSConns[idx].loadFromJsonCfg(jsnHaCfg) } } + if jsnCfg.Resources_conns != nil { + fSCfg.ResourceSConns = make([]*HaPoolConfig, len(*jsnCfg.Resources_conns)) + for idx, jsnHaCfg := range *jsnCfg.Resources_conns { + fSCfg.ResourceSConns[idx] = NewDfltHaPoolConfig() + fSCfg.ResourceSConns[idx].loadFromJsonCfg(jsnHaCfg) + } + } if jsnCfg.Indexed_selects != nil { fSCfg.IndexedSelects = *jsnCfg.Indexed_selects } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 6f0575029..3364db6d5 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -94,6 +94,7 @@ type DbJsonCfg struct { type FilterSJsonCfg struct { Stats_conns *[]*HaPoolJsonCfg Indexed_selects *bool + Resources_conns *[]*HaPoolJsonCfg } // Rater config section diff --git a/data/conf/samples/filters/cgrates.json b/data/conf/samples/filters/cgrates.json new file mode 100644 index 000000000..4610844e9 --- /dev/null +++ b/data/conf/samples/filters/cgrates.json @@ -0,0 +1,59 @@ +{ +// CGRateS Configuration file +// + + +"general": { + "log_level": 7, + "node_id": "CGRFilterS", +}, + + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + + +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "redis", // data_db type: + "db_port": 6379, // data_db port to reach the database + "db_name": "10", // data_db database name to connect to +}, + + +"stor_db": { + "db_password": "CGRateS.org", +}, + + +"rals": { + "enabled": true, +}, + + +"filters": { + "stats_conns": [ + {"address": "127.0.0.1:2012", "transport":"*json"}, + ], + "resources_conns": [ + {"address": "127.0.0.1:2012", "transport":"*json"}, + ], +}, + + +"resources": { + "enabled": true, + "store_interval": "1s", +}, + + +"stats": { + "enabled": true, + "store_interval": "1s", +}, + + + +} diff --git a/data/tariffplans/testit/Filters.csv b/data/tariffplans/testit/Filters.csv index eccdeef98..cc40ffb9d 100644 --- a/data/tariffplans/testit/Filters.csv +++ b/data/tariffplans/testit/Filters.csv @@ -22,6 +22,6 @@ cgrates.org,FLTR_QOS_SP1,*gte,*gs.*acd,10.0,2014-07-29T15:00:00Z cgrates.org,FLTR_QOS_SP2,*gte,*gs.*acd,10.0,2014-07-29T15:00:00Z cgrates.org,FLTR_QOS_SP2,*gte,*gs.*tcd,11.0, cgrates.org,FLTR_SPP_QOS_2,*string,DistinctMatch,*qos_filtred2,2014-07-29T15:00:00Z -cgrates.org,FLTR_QOS_SP1_2,*gte,*sd.Cost,0.1,2014-07-29T15:00:00Z -cgrates.org,FLTR_QOS_SP2_2,*gte,*sd.Cost,0.2,2014-07-29T15:00:00Z +cgrates.org,FLTR_QOS_SP1_2,*gte,*vars.Cost,0.1,2014-07-29T15:00:00Z +cgrates.org,FLTR_QOS_SP2_2,*gte,*vars.Cost,0.2,2014-07-29T15:00:00Z cgrates.org,FLTR_TEST,*string,Subject,TEST,2014-07-29T15:00:00Z \ No newline at end of file diff --git a/engine/filters.go b/engine/filters.go index eb7376174..1074018f5 100644 --- a/engine/filters.go +++ b/engine/filters.go @@ -65,9 +65,10 @@ const ( ) func NewFilterS(cfg *config.CGRConfig, - statSChan chan rpcclient.RpcClientConnection, dm *DataManager) *FilterS { + statSChan, resSChan chan rpcclient.RpcClientConnection, dm *DataManager) *FilterS { return &FilterS{ statSChan: statSChan, + resSChan: resSChan, dm: dm, cfg: cfg, } @@ -76,11 +77,11 @@ func NewFilterS(cfg *config.CGRConfig, // FilterS is a service used to take decisions in case of filters // uses lazy connections where necessary to avoid deadlocks on service startup type FilterS struct { - cfg *config.CGRConfig - statSChan chan rpcclient.RpcClientConnection // reference towards internal statS connection, used for lazy connect - statSConns rpcclient.RpcClientConnection - sSConnMux sync.RWMutex // make sure only one goroutine attempts connecting - dm *DataManager + cfg *config.CGRConfig + statSChan, resSChan chan rpcclient.RpcClientConnection // reference towards internal statS connection, used for lazy connect + statSConns, resSConns rpcclient.RpcClientConnection + sSConnMux, rSConnMux sync.RWMutex // make sure only one goroutine attempts connecting + dm *DataManager } // connStatS returns will connect towards StatS @@ -99,6 +100,22 @@ func (fS *FilterS) connStatS() (err error) { return } +// connResourceS returns will connect towards ResourceS +func (fS *FilterS) connResourceS() (err error) { + fS.rSConnMux.Lock() + defer fS.rSConnMux.Unlock() + if fS.resSConns != nil { // connection was populated between locks + return + } + fS.resSConns, err = NewRPCPool(rpcclient.POOL_FIRST, + fS.cfg.TlsCfg().ClientKey, fS.cfg.TlsCfg().ClientCerificate, + fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts, + fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout, + fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().ResourceSConns, + fS.resSChan, fS.cfg.GeneralCfg().InternalTtl) + return +} + // Pass will check all filters wihin filterIDs and require them passing for dataProvider // there should be at least one filter passing, ie: if filters are not active event will fail to pass // receives the event as DataProvider so we can accept undecoded data (ie: HttpRequest) diff --git a/engine/suppliers.go b/engine/suppliers.go index 1396d2933..59bb4011e 100644 --- a/engine/suppliers.go +++ b/engine/suppliers.go @@ -360,11 +360,13 @@ func (spS *SupplierService) populateSortingData(ev *utils.CGREvent, spl *Supplie } sortedSpl.globalStats = globalStats } + //reas + //reds //filter the supplier if len(spl.FilterIDs) != 0 { nM := config.NewNavigableMap(nil) - nM.Set([]string{"*req"}, ev.Event, false, false) - nM.Set([]string{"*sd"}, sortedSpl.SortingData, false, false) + nM.Set([]string{utils.MetaReq}, ev.Event, false, false) + nM.Set([]string{utils.MetaVars}, sortedSpl.SortingData, false, false) nM.Set([]string{"*gs"}, metricForFilter, false, false) if pass, err = spS.filterS.Pass(ev.Tenant, spl.FilterIDs, nM); err != nil { diff --git a/general_tests/filters_it_test.go b/general_tests/filters_it_test.go new file mode 100644 index 000000000..aee82f1eb --- /dev/null +++ b/general_tests/filters_it_test.go @@ -0,0 +1,111 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package general_tests + +import ( + "net/rpc" + "net/rpc/jsonrpc" + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + fltrCfgPath string + fltrCfg *config.CGRConfig + fltrRpc *rpc.Client + fltrConfDIR string //run tests for specific configuration + fltrDelay int +) + +var sTestsFltr = []func(t *testing.T){ + testV1FltrLoadConfig, + testV1FltrInitDataDb, + testV1FltrResetStorDb, + testV1FltrStartEngine, + testV1FltrRpcConn, + + testV1FltrStopEngine, +} + +// Test start here +func TestFltrIT(t *testing.T) { + fltrConfDIR = "filters" + for _, stest := range sTestsFltr { + t.Run(fltrConfDIR, stest) + } +} + +func testV1FltrLoadConfig(t *testing.T) { + var err error + fltrCfgPath = path.Join(*dataDir, "conf", "samples", fltrConfDIR) + if fltrCfg, err = config.NewCGRConfigFromFolder(fltrCfgPath); err != nil { + t.Error(err) + } + fltrDelay = 1000 +} + +func testV1FltrInitDataDb(t *testing.T) { + if err := engine.InitDataDb(fltrCfg); err != nil { + t.Fatal(err) + } +} + +func testV1FltrResetStorDb(t *testing.T) { + if err := engine.InitStorDb(fltrCfg); err != nil { + t.Fatal(err) + } +} + +func testV1FltrStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(fltrCfgPath, fltrDelay); err != nil { + t.Fatal(err) + } +} + +func testV1FltrRpcConn(t *testing.T) { + var err error + fltrRpc, err = jsonrpc.Dial("tcp", fltrCfg.ListenCfg().RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal("Could not connect to rater: ", err.Error()) + } +} + +func testV1FltrLoadTarrifPlans(t *testing.T) { + var reply string + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "testit")} + if err := fltrRpc.Call("ApierV1.LoadTariffPlanFromFolder", attrs, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Error("Unexpected reply returned", reply) + } + time.Sleep(500 * time.Millisecond) +} + +func testV1FltrStopEngine(t *testing.T) { + if err := engine.KillEngine(accDelay); err != nil { + t.Error(err) + } +}