mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-21 15:18:44 +05:00
Make FilterS to use connections through connManager and update general_tests package
This commit is contained in:
@@ -19,87 +19,28 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
func NewFilterS(cfg *config.CGRConfig,
|
||||
statSChan, resSChan, ralSChan chan rpcclient.ClientConnector, dm *DataManager) (fS *FilterS) {
|
||||
func NewFilterS(cfg *config.CGRConfig, connMgr *ConnManager, dm *DataManager) (fS *FilterS) {
|
||||
fS = &FilterS{
|
||||
dm: dm,
|
||||
cfg: cfg,
|
||||
}
|
||||
if len(cfg.FilterSCfg().StatSConns) != 0 {
|
||||
fS.connStatS(statSChan)
|
||||
}
|
||||
if len(cfg.FilterSCfg().ResourceSConns) != 0 {
|
||||
fS.connResourceS(resSChan)
|
||||
}
|
||||
if len(cfg.FilterSCfg().RALsConns) != 0 {
|
||||
fS.connRALs(ralSChan)
|
||||
dm: dm,
|
||||
cfg: cfg,
|
||||
connMgr: connMgr,
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// FilterS is a service used to take decisions in case of filters
|
||||
// uses lazy connections where necessary to avoid deadlocks on service startup
|
||||
type FilterS struct {
|
||||
cfg *config.CGRConfig
|
||||
statSConns, resSConns, ralSConns rpcclient.ClientConnector
|
||||
sSConnMux, rSConnMux, ralSConnMux sync.RWMutex // make sure only one goroutine attempts connecting
|
||||
dm *DataManager
|
||||
}
|
||||
|
||||
// connStatS returns will connect towards StatS
|
||||
func (fS *FilterS) connStatS(statSChan chan rpcclient.ClientConnector) (err error) {
|
||||
fS.sSConnMux.Lock()
|
||||
defer fS.sSConnMux.Unlock()
|
||||
if fS.statSConns != nil { // connection was populated between locks
|
||||
return
|
||||
}
|
||||
fS.statSConns, err = NewRPCPool(rpcclient.PoolFirst,
|
||||
fS.cfg.TlsCfg().ClientKey, fS.cfg.TlsCfg().ClientCerificate,
|
||||
fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts,
|
||||
fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout,
|
||||
fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().StatSConns,
|
||||
statSChan, true)
|
||||
return
|
||||
}
|
||||
|
||||
// connResourceS returns will connect towards ResourceS
|
||||
func (fS *FilterS) connResourceS(resSChan chan rpcclient.ClientConnector) (err error) {
|
||||
fS.rSConnMux.Lock()
|
||||
defer fS.rSConnMux.Unlock()
|
||||
if fS.resSConns != nil { // connection was populated between locks
|
||||
return
|
||||
}
|
||||
fS.resSConns, err = NewRPCPool(rpcclient.PoolFirst,
|
||||
fS.cfg.TlsCfg().ClientKey, fS.cfg.TlsCfg().ClientCerificate,
|
||||
fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts,
|
||||
fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout,
|
||||
fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().ResourceSConns,
|
||||
resSChan, true)
|
||||
return
|
||||
}
|
||||
|
||||
// connRALs returns will connect towards RALs
|
||||
func (fS *FilterS) connRALs(ralSChan chan rpcclient.ClientConnector) (err error) {
|
||||
fS.ralSConnMux.Lock()
|
||||
defer fS.ralSConnMux.Unlock()
|
||||
if fS.ralSConns != nil { // connection was populated between locks
|
||||
return
|
||||
}
|
||||
fS.ralSConns, err = NewRPCPool(rpcclient.PoolFirst,
|
||||
fS.cfg.TlsCfg().ClientKey, fS.cfg.TlsCfg().ClientCerificate,
|
||||
fS.cfg.TlsCfg().CaCertificate, fS.cfg.GeneralCfg().ConnectAttempts,
|
||||
fS.cfg.GeneralCfg().Reconnects, fS.cfg.GeneralCfg().ConnectTimeout,
|
||||
fS.cfg.GeneralCfg().ReplyTimeout, fS.cfg.FilterSCfg().RALsConns,
|
||||
ralSChan, true)
|
||||
return
|
||||
cfg *config.CGRConfig
|
||||
dm *DataManager
|
||||
connMgr *ConnManager
|
||||
}
|
||||
|
||||
// Pass will check all filters wihin filterIDs and require them passing for dataProvider
|
||||
@@ -497,7 +438,7 @@ func (fS *FilterS) getFieldNameDataProvider(initialDP config.DataProvider,
|
||||
return nil, fmt.Errorf("invalid fieldname <%s>", fieldName)
|
||||
}
|
||||
var account *Account
|
||||
if err = fS.ralSConns.Call(utils.ApierV2GetAccount,
|
||||
if err = fS.connMgr.Call(fS.cfg.FilterSCfg().RALsConns, nil, utils.ApierV2GetAccount,
|
||||
&utils.AttrGetAccount{Tenant: tenant, Account: splitFldName[1]}, &account); err != nil {
|
||||
return
|
||||
}
|
||||
@@ -510,7 +451,7 @@ func (fS *FilterS) getFieldNameDataProvider(initialDP config.DataProvider,
|
||||
return nil, fmt.Errorf("invalid fieldname <%s>", fieldName)
|
||||
}
|
||||
var reply *Resource
|
||||
if err := fS.resSConns.Call(utils.ResourceSv1GetResource,
|
||||
if err := fS.connMgr.Call(fS.cfg.FilterSCfg().ResourceSConns, nil, utils.ResourceSv1GetResource,
|
||||
&utils.TenantID{Tenant: tenant, ID: splitFldName[1]}, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -523,7 +464,7 @@ func (fS *FilterS) getFieldNameDataProvider(initialDP config.DataProvider,
|
||||
}
|
||||
var statValues map[string]float64
|
||||
|
||||
if err := fS.statSConns.Call(utils.StatSv1GetQueueFloatMetrics,
|
||||
if err := fS.connMgr.Call(fS.cfg.FilterSCfg().StatSConns, nil, utils.StatSv1GetQueueFloatMetrics,
|
||||
&utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: tenant, ID: splitFldName[1]}},
|
||||
&statValues); err != nil {
|
||||
return nil, err
|
||||
@@ -572,7 +513,7 @@ func (fS *FilterS) getFieldValueDataProvider(initialDP config.DataProvider,
|
||||
return nil, fmt.Errorf("invalid fieldname <%s>", fieldValue)
|
||||
}
|
||||
var account *Account
|
||||
if err = fS.ralSConns.Call(utils.ApierV2GetAccount,
|
||||
if err = fS.connMgr.Call(fS.cfg.FilterSCfg().RALsConns, nil, utils.ApierV2GetAccount,
|
||||
&utils.AttrGetAccount{Tenant: tenant, Account: splitFldName[1]}, &account); err != nil {
|
||||
return
|
||||
}
|
||||
@@ -585,7 +526,7 @@ func (fS *FilterS) getFieldValueDataProvider(initialDP config.DataProvider,
|
||||
return nil, fmt.Errorf("invalid fieldname <%s>", fieldValue)
|
||||
}
|
||||
var reply *Resource
|
||||
if err := fS.resSConns.Call(utils.ResourceSv1GetResource,
|
||||
if err := fS.connMgr.Call(fS.cfg.FilterSCfg().ResourceSConns, nil, utils.ResourceSv1GetResource,
|
||||
&utils.TenantID{Tenant: tenant, ID: splitFldName[1]}, &reply); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -598,7 +539,7 @@ func (fS *FilterS) getFieldValueDataProvider(initialDP config.DataProvider,
|
||||
}
|
||||
var statValues map[string]float64
|
||||
|
||||
if err := fS.statSConns.Call(utils.StatSv1GetQueueFloatMetrics,
|
||||
if err := fS.connMgr.Call(fS.cfg.FilterSCfg().StatSConns, nil, utils.StatSv1GetQueueFloatMetrics,
|
||||
&utils.TenantIDWithArgDispatcher{TenantID: &utils.TenantID{Tenant: tenant, ID: splitFldName[1]}},
|
||||
&statValues); err != nil {
|
||||
return nil, err
|
||||
|
||||
Reference in New Issue
Block a user