From 347050a5b2770840e9466bcdbb838430e7cb0eb1 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 1 Feb 2019 13:17:41 +0100 Subject: [PATCH] DispatcherS connections initialization, string and prefix indexes configuration, cleanup old dispatcherCfg --- cmd/cgr-engine/cgr-engine.go | 51 +++++++++-------- config/config.go | 32 ++++------- config/config_defaults.go | 16 +----- config/config_json.go | 12 ---- config/config_json_test.go | 24 +------- config/config_test.go | 22 +------- config/dispatchercfg.go | 103 ++++++----------------------------- config/libconfig_json.go | 20 ++----- dispatchers/dispatchers.go | 27 +++++---- 9 files changed, 85 insertions(+), 222 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 1042d67d1..e27a3e0c7 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -965,31 +965,35 @@ func loaderService(cacheS *engine.CacheS, cfg *config.CGRConfig, } // startDispatcherService fires up the DispatcherS -func startDispatcherService(internalDispatcherSChan, internalRaterChan chan rpcclient.RpcClientConnection, - cacheS *engine.CacheS, dm *engine.DataManager, - server *utils.Server, exitChan chan bool) { +func startDispatcherService(internalDispatcherSChan, + intAttrSChan chan rpcclient.RpcClientConnection, + cfg *config.CGRConfig, + cacheS *engine.CacheS, filterSChan chan *engine.FilterS, + dm *engine.DataManager, server *utils.Server, exitChan chan bool) { utils.Logger.Info("Starting CGRateS Dispatcher service.") + fltrS := <-filterSChan + filterSChan <- fltrS var err error - //var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns, chargerSConns *rpcclient.RpcClientPool - - /* - if len(cfg.DispatcherSCfg().RALsConns) != 0 { - ralsConns, err = engine.NewRPCPool(cfg.DispatcherSCfg().DispatchingStrategy, - cfg.TlsCfg().ClientKey, - cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, - cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, - cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, - cfg.DispatcherSCfg().RALsConns, internalRaterChan, - cfg.GeneralCfg().InternalTtl) - if err != nil { - utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.DispatcherS, err.Error())) - exitChan <- true - return - } + conns := make(map[string]*rpcclient.RpcClientPool) + for connID, haPoolCfg := range cfg.DispatcherSCfg().Conns { + var connPool *rpcclient.RpcClientPool + if connPool, err = engine.NewRPCPool( + rpcclient.POOL_FIRST, + cfg.TlsCfg().ClientKey, + cfg.TlsCfg().ClientCerificate, cfg.TlsCfg().CaCertificate, + cfg.GeneralCfg().ConnectAttempts, cfg.GeneralCfg().Reconnects, + cfg.GeneralCfg().ConnectTimeout, cfg.GeneralCfg().ReplyTimeout, + haPoolCfg, nil, time.Duration(0)); err != nil { + utils.Logger.Crit( + fmt.Sprintf("<%s> could not connect to connID: <%s>, err: <%s>", + utils.DispatcherS, err.Error())) + exitChan <- true + return } - */ + conns[connID] = connPool + } - dspS, err := dispatchers.NewDispatcherService(dm, cfg) + dspS, err := dispatchers.NewDispatcherService(dm, cfg, fltrS, conns) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error())) exitChan <- true @@ -1508,9 +1512,10 @@ func main() { internalRsChan, internalStatSChan, cfg, dm, server, exitChan, filterSChan, internalAttributeSChan) } - if cfg.DispatcherCfg().Enabled { + if cfg.DispatcherSCfg().Enabled { go startDispatcherService(internalDispatcherSChan, - internalRaterChan, cacheS, dm, server, exitChan) + internalAttributeSChan, cfg, cacheS, filterSChan, + dm, server, exitChan) } if cfg.AnalyzerSCfg().Enabled { diff --git a/config/config.go b/config/config.go index f4f10c64c..4aaf58847 100755 --- a/config/config.go +++ b/config/config.go @@ -159,7 +159,6 @@ func NewDefaultCGRConfig() (*CGRConfig, error) { cfg.supplierSCfg = new(SupplierSCfg) cfg.sureTaxCfg = new(SureTaxCfg) cfg.dispatcherSCfg = new(DispatcherSCfg) - cfg.dispatcherCfg = new(DispatcherCfg) cfg.loaderCgrCfg = new(LoaderCgrCfg) cfg.migratorCgrCfg = new(MigratorCgrCfg) cfg.mailerCfg = new(MailerCfg) @@ -306,7 +305,6 @@ type CGRConfig struct { supplierSCfg *SupplierSCfg // SupplierS config sureTaxCfg *SureTaxCfg // SureTax config dispatcherSCfg *DispatcherSCfg // DispatcherS config - dispatcherCfg *DispatcherCfg // Dispatcher config loaderCgrCfg *LoaderCgrCfg // LoaderCgr config migratorCgrCfg *MigratorCgrCfg // MigratorCgr config mailerCfg *MailerCfg // Mailer config @@ -702,12 +700,18 @@ func (self *CGRConfig) checkConfigSanity() error { } } } - // DispaterS checks - if self.dispatcherCfg.Enabled { - if !utils.IsSliceMember([]string{utils.MetaFirst, utils.MetaRandom, utils.MetaNext, - utils.MetaBroadcast}, self.dispatcherCfg.DispatchingStrategy) { - return fmt.Errorf("<%s> unsupported dispatching strategy %s", - utils.DispatcherS, self.dispatcherCfg.DispatchingStrategy) + if self.dispatcherSCfg.Enabled { + if len(self.dispatcherSCfg.Conns) == 0 { + return fmt.Errorf("<%s> no connections defined", utils.DispatcherS) + } + for connID, haPool := range self.dispatcherSCfg.Conns { + for _, connCfg := range haPool { + if connCfg.Address == utils.MetaInternal { + return fmt.Errorf( + "<%s> connID: <%s> %s connections are not supported", + utils.DispatcherS, connID, utils.MetaInternal) + } + } } } // Scheduler check connection with CDR Server @@ -939,14 +943,6 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) { return err } - jsnDispatcherCfg, err := jsnCfg.DispatcherJsonCfg() - if err != nil { - return err - } - if self.dispatcherCfg.loadFromJsonCfg(jsnDispatcherCfg); err != nil { - return err - } - jsnDispatcherSCfg, err := jsnCfg.DispatcherSJsonCfg() if err != nil { return err @@ -1167,10 +1163,6 @@ func (cfg *CGRConfig) LoaderCfg() []*LoaderSCfg { return cfg.loaderCfg } -func (cfg *CGRConfig) DispatcherCfg() *DispatcherCfg { - return cfg.dispatcherCfg -} - func (cfg *CGRConfig) DispatcherSCfg() *DispatcherSCfg { return cfg.dispatcherSCfg } diff --git a/config/config_defaults.go b/config/config_defaults.go index ce8ab9758..e73196f5c 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -723,22 +723,10 @@ const CGRATES_CFG_JSON = ` }, -"dispatcher":{ - "enabled": false, // starts DispatcherS service: . - "rals_conns": [], // address where to reach the RALs for dispatcherS <*internal> - "resources_conns": [], // address where to reach the ResourceS <""|127.0.0.1:2013> - "thresholds_conns": [], // address where to reach the ThresholdS <""|127.0.0.1:2013> - "stats_conns": [], // address where to reach the StatS <""|127.0.0.1:2013> - "suppliers_conns": [], // address where to reach the SupplierS <""|127.0.0.1:2013> - "attributes_conns": [], // address where to reach the AttributeS <""|127.0.0.1:2013> - "sessions_conns": [], // connection towards SessionService - "chargers_conns": [], // address where to reach the ChargerS <""|127.0.0.1:2013> - "dispatching_strategy":"*first", // strategy for dispatching <*first|*random|*next|*broadcast> -}, - - "dispatchers":{ "enabled": false, // starts DispatcherS service: . + //"string_indexed_fields": [], // query indexes based on these fields for faster processing + "prefix_indexed_fields": [], // query indexes based on these fields for faster processing "conns": { "sessions_eu": [ {"address": "127.0.0.1:2012", "transport": "*json"}, diff --git a/config/config_json.go b/config/config_json.go index 6bdaa79bd..122a72ce1 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -453,18 +453,6 @@ func (self CgrJsonCfg) SureTaxJsonCfg() (*SureTaxJsonCfg, error) { return cfg, nil } -func (self CgrJsonCfg) DispatcherJsonCfg() (*DispatcherJsonCfg, error) { - rawCfg, hasKey := self[DispatcherJson] - if !hasKey { - return nil, nil - } - cfg := new(DispatcherJsonCfg) - if err := json.Unmarshal(*rawCfg, cfg); err != nil { - return nil, err - } - return cfg, nil -} - func (self CgrJsonCfg) DispatcherSJsonCfg() (*DispatcherSJsonCfg, error) { rawCfg, hasKey := self[DispatcherSJson] if !hasKey { diff --git a/config/config_json_test.go b/config/config_json_test.go index 2076fe594..edcb767f2 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -1379,29 +1379,11 @@ func TestDfHttpJsonCfg(t *testing.T) { } } -func TestDfDispatcherJsonCfg(t *testing.T) { - eCfg := &DispatcherJsonCfg{ - Enabled: utils.BoolPointer(false), - Rals_conns: &[]*HaPoolJsonCfg{}, - Resources_conns: &[]*HaPoolJsonCfg{}, - Thresholds_conns: &[]*HaPoolJsonCfg{}, - Stats_conns: &[]*HaPoolJsonCfg{}, - Suppliers_conns: &[]*HaPoolJsonCfg{}, - Attributes_conns: &[]*HaPoolJsonCfg{}, - Sessions_conns: &[]*HaPoolJsonCfg{}, - Chargers_conns: &[]*HaPoolJsonCfg{}, - Dispatching_strategy: utils.StringPointer(utils.MetaFirst), - } - if cfg, err := dfCgrJsonCfg.DispatcherJsonCfg(); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eCfg, cfg) { - t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eCfg), utils.ToJSON(cfg)) - } -} - func TestDfDispatcherSJsonCfg(t *testing.T) { eCfg := &DispatcherSJsonCfg{ - Enabled: utils.BoolPointer(false), + Enabled: utils.BoolPointer(false), + String_indexed_fields: nil, + Prefix_indexed_fields: &[]string{}, Conns: &map[string]*[]*HaPoolJsonCfg{ "sessions_eu": &[]*HaPoolJsonCfg{ {Address: utils.StringPointer("127.0.0.1:2012"), Transport: utils.StringPointer(utils.MetaJSONrpc)}, diff --git a/config/config_test.go b/config/config_test.go index a56112d59..61e742413 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -1453,27 +1453,11 @@ func TestCgrLoaderCfgITDefaults(t *testing.T) { } } -func TestCgrCfgJSONDefaultDispatcherCfg(t *testing.T) { - eDspSCfg := &DispatcherCfg{ - Enabled: false, - RALsConns: []*HaPoolConfig{}, - ResSConns: []*HaPoolConfig{}, - ThreshSConns: []*HaPoolConfig{}, - StatSConns: []*HaPoolConfig{}, - SupplSConns: []*HaPoolConfig{}, - AttrSConns: []*HaPoolConfig{}, - SessionSConns: []*HaPoolConfig{}, - ChargerSConns: []*HaPoolConfig{}, - DispatchingStrategy: utils.MetaFirst, - } - if !reflect.DeepEqual(cgrCfg.dispatcherCfg, eDspSCfg) { - t.Errorf("received: %+v, expecting: %+v", cgrCfg.dispatcherSCfg, eDspSCfg) - } -} - func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) { eDspSCfg := &DispatcherSCfg{ - Enabled: false, + Enabled: false, + StringIndexedFields: nil, + PrefixIndexedFields: &[]string{}, Conns: map[string][]*HaPoolConfig{ "sessions_eu": []*HaPoolConfig{ {Address: "127.0.0.1:2012", Transport: utils.MetaJSONrpc}, diff --git a/config/dispatchercfg.go b/config/dispatchercfg.go index 3ae63e115..60781408f 100755 --- a/config/dispatchercfg.go +++ b/config/dispatchercfg.go @@ -18,93 +18,12 @@ along with this program. If not, see package config -// DispatcherSCfg is the configuration of dispatcher service -type DispatcherCfg struct { - Enabled bool - RALsConns []*HaPoolConfig - ResSConns []*HaPoolConfig - ThreshSConns []*HaPoolConfig - StatSConns []*HaPoolConfig - SupplSConns []*HaPoolConfig - AttrSConns []*HaPoolConfig - SessionSConns []*HaPoolConfig - ChargerSConns []*HaPoolConfig - DispatchingStrategy string -} - -func (dps *DispatcherCfg) loadFromJsonCfg(jsnCfg *DispatcherJsonCfg) (err error) { - if jsnCfg == nil { - return nil - } - if jsnCfg.Enabled != nil { - dps.Enabled = *jsnCfg.Enabled - } - if jsnCfg.Rals_conns != nil { - dps.RALsConns = make([]*HaPoolConfig, len(*jsnCfg.Rals_conns)) - for idx, jsnHaCfg := range *jsnCfg.Rals_conns { - dps.RALsConns[idx] = NewDfltHaPoolConfig() - dps.RALsConns[idx].loadFromJsonCfg(jsnHaCfg) - } - } - if jsnCfg.Resources_conns != nil { - dps.ResSConns = make([]*HaPoolConfig, len(*jsnCfg.Resources_conns)) - for idx, jsnHaCfg := range *jsnCfg.Resources_conns { - dps.ResSConns[idx] = NewDfltHaPoolConfig() - dps.ResSConns[idx].loadFromJsonCfg(jsnHaCfg) - } - } - if jsnCfg.Thresholds_conns != nil { - dps.ThreshSConns = make([]*HaPoolConfig, len(*jsnCfg.Thresholds_conns)) - for idx, jsnHaCfg := range *jsnCfg.Thresholds_conns { - dps.ThreshSConns[idx] = NewDfltHaPoolConfig() - dps.ThreshSConns[idx].loadFromJsonCfg(jsnHaCfg) - } - } - if jsnCfg.Stats_conns != nil { - dps.StatSConns = make([]*HaPoolConfig, len(*jsnCfg.Stats_conns)) - for idx, jsnHaCfg := range *jsnCfg.Stats_conns { - dps.StatSConns[idx] = NewDfltHaPoolConfig() - dps.StatSConns[idx].loadFromJsonCfg(jsnHaCfg) - } - } - if jsnCfg.Suppliers_conns != nil { - dps.SupplSConns = make([]*HaPoolConfig, len(*jsnCfg.Suppliers_conns)) - for idx, jsnHaCfg := range *jsnCfg.Suppliers_conns { - dps.SupplSConns[idx] = NewDfltHaPoolConfig() - dps.SupplSConns[idx].loadFromJsonCfg(jsnHaCfg) - } - } - if jsnCfg.Attributes_conns != nil { - dps.AttrSConns = make([]*HaPoolConfig, len(*jsnCfg.Attributes_conns)) - for idx, jsnHaCfg := range *jsnCfg.Attributes_conns { - dps.AttrSConns[idx] = NewDfltHaPoolConfig() - dps.AttrSConns[idx].loadFromJsonCfg(jsnHaCfg) - } - } - if jsnCfg.Sessions_conns != nil { - dps.SessionSConns = make([]*HaPoolConfig, len(*jsnCfg.Sessions_conns)) - for idx, jsnHaCfg := range *jsnCfg.Sessions_conns { - dps.SessionSConns[idx] = NewDfltHaPoolConfig() - dps.SessionSConns[idx].loadFromJsonCfg(jsnHaCfg) - } - } - if jsnCfg.Chargers_conns != nil { - dps.ChargerSConns = make([]*HaPoolConfig, len(*jsnCfg.Chargers_conns)) - for idx, jsnHaCfg := range *jsnCfg.Chargers_conns { - dps.ChargerSConns[idx] = NewDfltHaPoolConfig() - dps.ChargerSConns[idx].loadFromJsonCfg(jsnHaCfg) - } - } - if jsnCfg.Dispatching_strategy != nil { - dps.DispatchingStrategy = *jsnCfg.Dispatching_strategy - } - return nil -} - // DispatcherSCfg is the configuration of dispatcher service type DispatcherSCfg struct { - Enabled bool - Conns map[string][]*HaPoolConfig + Enabled bool + StringIndexedFields *[]string + PrefixIndexedFields *[]string + Conns map[string][]*HaPoolConfig } func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err error) { @@ -114,6 +33,20 @@ func (dps *DispatcherSCfg) loadFromJsonCfg(jsnCfg *DispatcherSJsonCfg) (err erro if jsnCfg.Enabled != nil { dps.Enabled = *jsnCfg.Enabled } + if jsnCfg.String_indexed_fields != nil { + sif := make([]string, len(*jsnCfg.String_indexed_fields)) + for i, fID := range *jsnCfg.String_indexed_fields { + sif[i] = fID + } + dps.StringIndexedFields = &sif + } + if jsnCfg.Prefix_indexed_fields != nil { + pif := make([]string, len(*jsnCfg.Prefix_indexed_fields)) + for i, fID := range *jsnCfg.Prefix_indexed_fields { + pif[i] = fID + } + dps.PrefixIndexedFields = &pif + } if jsnCfg.Conns != nil { dps.Conns = make(map[string][]*HaPoolConfig, len(*jsnCfg.Conns)) for id, conns := range *jsnCfg.Conns { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 495b53e39..b31cfca45 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -534,23 +534,11 @@ type SureTaxJsonCfg struct { Tax_exemption_code_list *string } -// Dispatcher service config section -type DispatcherJsonCfg struct { - Enabled *bool - Rals_conns *[]*HaPoolJsonCfg - Resources_conns *[]*HaPoolJsonCfg - Thresholds_conns *[]*HaPoolJsonCfg - Stats_conns *[]*HaPoolJsonCfg - Suppliers_conns *[]*HaPoolJsonCfg - Attributes_conns *[]*HaPoolJsonCfg - Sessions_conns *[]*HaPoolJsonCfg - Chargers_conns *[]*HaPoolJsonCfg - Dispatching_strategy *string -} - type DispatcherSJsonCfg struct { - Enabled *bool - Conns *map[string]*[]*HaPoolJsonCfg + Enabled *bool + String_indexed_fields *[]string + Prefix_indexed_fields *[]string + Conns *map[string]*[]*HaPoolJsonCfg } type LoaderCfgJson struct { diff --git a/dispatchers/dispatchers.go b/dispatchers/dispatchers.go index 29f6d5ec9..b3a71797d 100755 --- a/dispatchers/dispatchers.go +++ b/dispatchers/dispatchers.go @@ -27,21 +27,21 @@ import ( "github.com/cgrates/rpcclient" ) -// NewDispatcherService initializes a DispatcherService +// NewDispatcherService constructs a DispatcherService func NewDispatcherService(dm *engine.DataManager, - cfg *config.CGRConfig) (*DispatcherService, error) { - return &DispatcherService{dm: dm, cfg: cfg}, nil + cfg *config.CGRConfig, fltrS *engine.FilterS, + conns map[string]*rpcclient.RpcClientPool) (*DispatcherService, error) { + return &DispatcherService{dm: dm, cfg: cfg, + fltrS: fltrS, conns: conns}, nil } // DispatcherService is the service handling dispatching towards internal components // designed to handle automatic partitioning and failover type DispatcherService struct { - dm *engine.DataManager - cfg *config.CGRConfig - filterS *engine.FilterS - stringIndexedFields *[]string - prefixIndexedFields *[]string - conns map[string]*rpcclient.RpcClientPool // available connections, accessed based on connID + dm *engine.DataManager + cfg *config.CGRConfig + fltrS *engine.FilterS + conns map[string]*rpcclient.RpcClientPool // available connections, accessed based on connID } // ListenAndServe will initialize the service @@ -69,8 +69,11 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, idxKeyPrfx = utils.ConcatenatedKey(ev.Tenant, subsys) } matchingPrfls := make(map[string]*engine.DispatcherProfile) - prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event, dS.stringIndexedFields, dS.prefixIndexedFields, - dS.dm, utils.CacheDispatcherFilterIndexes, idxKeyPrfx, dS.cfg.FilterSCfg().IndexedSelects) + prflIDs, err := engine.MatchingItemIDsForEvent(ev.Event, + dS.cfg.DispatcherSCfg().StringIndexedFields, + dS.cfg.DispatcherSCfg().PrefixIndexedFields, + dS.dm, utils.CacheDispatcherFilterIndexes, + idxKeyPrfx, dS.cfg.FilterSCfg().IndexedSelects) if err != nil { return nil, err } @@ -97,7 +100,7 @@ func (dS *DispatcherService) dispatcherForEvent(ev *utils.CGREvent, !prfl.ActivationInterval.IsActiveAtTime(*ev.Time) { // not active continue } - if pass, err := dS.filterS.Pass(ev.Tenant, prfl.FilterIDs, + if pass, err := dS.fltrS.Pass(ev.Tenant, prfl.FilterIDs, config.NewNavigableMap(ev.Event)); err != nil { return nil, err } else if !pass {