diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 131f81750..a65858419 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -974,6 +974,22 @@ func startDispatcherService(internalDispatcherSChan, fltrS := <-filterSChan filterSChan <- fltrS var err error + var attrSConn *rpcclient.RpcClientPool + if len(cfg.DispatcherSCfg().AttributeSConns) != 0 { // AttributeS connection init + attrSConn, err = engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.TlsCfg().ClientKey, + cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, + cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + cfg.DispatcherSCfg().AttributeSConns, intAttrSChan, + cfg.GeneralCfg().InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to %s: %s", + utils.DispatcherS, utils.AttributeS, err.Error())) + exitChan <- true + return + } + } conns := make(map[string]*rpcclient.RpcClientPool) for connID, haPoolCfg := range cfg.DispatcherSCfg().Conns { var connPool *rpcclient.RpcClientPool @@ -993,7 +1009,7 @@ func startDispatcherService(internalDispatcherSChan, conns[connID] = connPool } - dspS, err := dispatchers.NewDispatcherService(dm, cfg, fltrS, conns) + dspS, err := dispatchers.NewDispatcherService(dm, cfg, fltrS, attrSConn, conns) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error())) exitChan <- true diff --git a/config/config_defaults.go b/config/config_defaults.go index 8c33c4ecb..eb21e9b9d 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -149,6 +149,7 @@ const CGRATES_CFG_JSON = ` "attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control attribute filter indexes caching "charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control charger filter indexes caching "dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher filter indexes caching + "dispatcher_routes": {"limit": -1, "ttl": "", "static_ttl": false}, // control dispatcher routes caching "diameter_messages": {"limit": -1, "ttl": "3h", "static_ttl": false}, // diameter messages caching }, diff --git a/config/config_json_test.go b/config/config_json_test.go index a85275cdf..e8219cc17 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -159,6 +159,8 @@ func TestCacheJsonCfg(t *testing.T) { Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)}, utils.CacheDispatcherFilterIndexes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)}, + utils.CacheDispatcherRoutes: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), + Ttl: utils.StringPointer(""), Static_ttl: utils.BoolPointer(false)}, utils.CacheDiameterMessages: &CacheParamJsonCfg{Limit: utils.IntPointer(-1), Ttl: utils.StringPointer("3h"), Static_ttl: utils.BoolPointer(false)}, } diff --git a/config/config_test.go b/config/config_test.go index 708387daa..5d2423bf5 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -727,6 +727,8 @@ func TestCgrCfgJSONDefaultsCacheCFG(t *testing.T) { TTL: time.Duration(0), StaticTTL: false, Precache: false}, utils.CacheDispatcherFilterIndexes: &CacheParamCfg{Limit: -1, TTL: time.Duration(0), StaticTTL: false, Precache: false}, + utils.CacheDispatcherRoutes: &CacheParamCfg{Limit: -1, + TTL: time.Duration(0), StaticTTL: false, Precache: false}, utils.CacheDiameterMessages: &CacheParamCfg{Limit: -1, TTL: time.Duration(3 * time.Hour), StaticTTL: false}, } diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index b3a71797d..c8107b42e 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -30,9 +30,10 @@ import ( // NewDispatcherService constructs a DispatcherService func NewDispatcherService(dm *engine.DataManager, cfg *config.CGRConfig, fltrS *engine.FilterS, + attrS *rpcclient.RpcClientPool, conns map[string]*rpcclient.RpcClientPool) (*DispatcherService, error) { return &DispatcherService{dm: dm, cfg: cfg, - fltrS: fltrS, conns: conns}, nil + fltrS: fltrS, attrS: attrS, conns: conns}, nil } // DispatcherService is the service handling dispatching towards internal components @@ -41,6 +42,7 @@ type DispatcherService struct { dm *engine.DataManager cfg *config.CGRConfig fltrS *engine.FilterS + attrS *rpcclient.RpcClientPool // used for API auth conns map[string]*rpcclient.RpcClientPool // available connections, accessed based on connID } @@ -143,6 +145,18 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, if errDsp != nil { return utils.NewErrDispatcherS(errDsp) } + var connID string + if ev.DispatcherRoute != nil && + *ev.DispatcherRoute != "" { + // use previously discovered route + if x, ok := engine.Cache.Get(utils.CacheDispatcherRoutes, + *ev.DispatcherRoute); ok && x != nil { + connID = x.(string) + if err = dS.conns[connID].Call(serviceMethod, args, reply); !utils.IsNetworkError(err) { + return + } + } + } for i := 0; i < d.MaxConns(); i++ { connID := d.NextConnID() conn, has := dS.conns[connID] @@ -153,6 +167,11 @@ func (dS *DispatcherService) Dispatch(ev *utils.CGREvent, subsys string, if err = conn.Call(serviceMethod, args, reply); !utils.IsNetworkError(err) { break } + if ev.DispatcherRoute != nil && + *ev.DispatcherRoute != "" { // cache the discovered route + engine.Cache.Set(utils.CacheDispatcherRoutes, *ev.DispatcherRoute, connID, + nil, true, utils.EmptyString) + } } return } diff --git a/utils/cgrevent.go b/utils/cgrevent.go index 1c2aecb12..c1349b5ee 100644 --- a/utils/cgrevent.go +++ b/utils/cgrevent.go @@ -27,11 +27,12 @@ import ( // CGREvent is a generic event processed by CGR services type CGREvent struct { - Tenant string - ID string - Context *string // attach the event to a context - Time *time.Time // event time - Event map[string]interface{} + Tenant string + ID string + Context *string // attach the event to a context + Time *time.Time // event time + DispatcherRoute *string // route over previous computed path + Event map[string]interface{} } func (ev *CGREvent) HasField(fldName string) (has bool) { diff --git a/utils/consts.go b/utils/consts.go index 7b9268781..85951b485 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -934,6 +934,7 @@ const ( CacheChargerProfiles = "charger_profiles" CacheDispatcherProfiles = "dispatcher_profiles" CacheDispatchers = "dispatchers" + CacheDispatcherRoutes = "dispatcher_routes" CacheResourceFilterIndexes = "resource_filter_indexes" CacheStatFilterIndexes = "stat_filter_indexes" CacheThresholdFilterIndexes = "threshold_filter_indexes"