DispatcherS with dedicated connection towards AttributeS for API authorization, routes caching via DispatcherRoute API key

This commit is contained in:
DanB
2019-02-02 13:57:21 +01:00
parent 1da4a8d462
commit d38d1d17fd
7 changed files with 49 additions and 7 deletions

View File

@@ -974,6 +974,22 @@ func startDispatcherService(internalDispatcherSChan,
fltrS := <-filterSChan
filterSChan <- fltrS
var err error
var attrSConn *rpcclient.RpcClientPool
if len(cfg.DispatcherSCfg().AttributeSConns) != 0 { // AttributeS connection init
attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.TlsCfg().ClientKey,
cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate,
cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects,
cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout,
cfg.DispatcherSCfg().AttributeSConns, intAttrSChan,
cfg.GeneralCfg().InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s",
utils.DispatcherS, utils.AttributeS, err.Error()))
exitChan <- true
return
}
}
conns := make(map[string]*rpcclient.RpcClientPool)
for connID, haPoolCfg := range cfg.DispatcherSCfg().Conns {
var connPool *rpcclient.RpcClientPool
@@ -993,7 +1009,7 @@ func startDispatcherService(internalDispatcherSChan,
conns[connID] = connPool
}
dspS, err := dispatchers.NewDispatcherService(dm, cfg, fltrS, conns)
dspS, err := dispatchers.NewDispatcherService(dm, cfg, fltrS, attrSConn, conns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error()))
exitChan <- true

View File

@@ -149,6 +149,7 @@ const CGRATES_CFG_JSON = `
"attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control attribute filter indexes caching
"charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control charger filter indexes caching
"dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher filter indexes caching
"dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher routes caching
"diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false}, // diameter messages caching
},

View File

@@ -159,6 +159,8 @@ func TestCacheJsonCfg(t *testing.T) {
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
utils.CacheDispatcherFilterIndexes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
utils.CacheDispatcherRoutes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)},
utils.CacheDiameterMessages: &CacheParamJsonCfg{Limit: utils.IntPointer(-1),
Ttl: utils.StringPointer("3h"), Static_ttl: utils.BoolPointer(false)},
}

View File

@@ -727,6 +727,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) {
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheDispatcherFilterIndexes: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheDispatcherRoutes: &CacheParamCfg{Limit: -1,
TTL: time.Duration(0), StaticTTL: false, Precache: false},
utils.CacheDiameterMessages: &CacheParamCfg{Limit: -1,
TTL: time.Duration(3 * time.Hour), StaticTTL: false},
}

View File

@@ -30,9 +30,10 @@ import (
// NewDispatcherService constructs a DispatcherService
func NewDispatcherService(dm *engine.DataManager,
cfg *config.CGRConfig, fltrS *engine.FilterS,
attrS *rpcclient.RpcClientPool,
conns map[string]*rpcclient.RpcClientPool) (*DispatcherService, error) {
return &DispatcherService{dm: dm, cfg: cfg,
fltrS: fltrS, conns: conns}, nil
fltrS: fltrS, attrS: attrS, conns: conns}, nil
}
// DispatcherService is the service handling dispatching towards internal components
@@ -41,6 +42,7 @@ type DispatcherService struct {
dm *engine.DataManager
cfg *config.CGRConfig
fltrS *engine.FilterS
attrS *rpcclient.RpcClientPool // used for API auth
conns map[string]*rpcclient.RpcClientPool // available connections, accessed based on connID
}
@@ -143,6 +145,18 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
if errDsp != nil {
return utils.NewErrDispatcherS(errDsp)
}
var connID string
if ev.DispatcherRoute != nil &&
*ev.DispatcherRoute != "" {
// use previously discovered route
if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes,
*ev.DispatcherRoute); ok && x != nil {
connID = x.(string)
if err = dS.conns[connID].Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
return
}
}
}
for i := 0; i < d.MaxConns(); i++ {
connID := d.NextConnID()
conn, has := dS.conns[connID]
@@ -153,6 +167,11 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string,
if err = conn.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) {
break
}
if ev.DispatcherRoute != nil &&
*ev.DispatcherRoute != "" { // cache the discovered route
engine.Cache.Set(utils.CacheDispatcherRoutes, *ev.DispatcherRoute, connID,
nil, true, utils.EmptyString)
}
}
return
}

View File

@@ -27,11 +27,12 @@ import (
// CGREvent is a generic event processed by CGR services
type CGREvent struct {
Tenant string
ID string
Context *string // attach the event to a context
Time *time.Time // event time
Event map[string]interface{}
Tenant string
ID string
Context *string // attach the event to a context
Time *time.Time // event time
DispatcherRoute *string // route over previous computed path
Event map[string]interface{}
}
func (ev *CGREvent) HasField(fldName string) (has bool) {

View File

@@ -934,6 +934,7 @@ const (
CacheChargerProfiles = "charger_profiles"
CacheDispatcherProfiles = "dispatcher_profiles"
CacheDispatchers = "dispatchers"
CacheDispatcherRoutes = "dispatcher_routes"
CacheResourceFilterIndexes = "resource_filter_indexes"
CacheStatFilterIndexes = "stat_filter_indexes"
CacheThresholdFilterIndexes = "threshold_filter_indexes"