diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index 838cd6cae..e703ceab6 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -20,25 +20,49 @@ package v1 import ( "github.com/cgrates/cgrates/dispatcher" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -func NewDispatcherSv1(dps *dispatcher.DispatcherService) *DispatcherSv1 { - return &DispatcherSv1{dpsS: dps} +func NewDispatcherThresholdSv1(dps *dispatcher.DispatcherService) *DispatcherThresholdSv1 { + return &DispatcherThresholdSv1{dS: dps} } // Exports RPC from RLs -type DispatcherSv1 struct { - dpsS *dispatcher.DispatcherService +type DispatcherThresholdSv1 struct { + dS *dispatcher.DispatcherService } -// Call implements rpcclient.RpcClientConnection interface for internal RPC -func (dpsS *DispatcherSv1) Call(serviceMethod string, - args interface{}, reply interface{}) error { - return utils.APIerRPCCall(dpsS, serviceMethod, args, reply) +// Ping implements ThresholdSv1Ping +func (dT *DispatcherThresholdSv1) Ping(ign string, reply *string) error { + return dT.dS.ThresholdSv1Ping(ign, reply) } -func (dpsS *DispatcherSv1) Ping(ign string, reply *string) error { - *reply = utils.Pong - return nil +// GetThresholdIDs implements ThresholdSv1GetThresholdIDs +func (dT *DispatcherThresholdSv1) GetThresholdIDs(tenant string, tIDs *[]string) error { + return dT.dS.ThresholdSv1GetThresholdIDs(tenant, tIDs) +} + +// GetThreshold implements ThresholdSv1GetThreshold +func (dT *DispatcherThresholdSv1) GetThreshold(tntID *utils.TenantID, t *engine.Threshold) error { + return dT.dS.ThresholdSv1GetThreshold(tntID, t) +} + +// ProcessEvent implements ThresholdSv1ProcessEvent +func (dT *DispatcherThresholdSv1) ProcessEvent(args *engine.ArgsProcessEvent, tIDs *[]string) error { + return dT.dS.ThresholdSv1ProcessEvent(args, tIDs) +} + +func NewDispatcherStatSv1(dps *dispatcher.DispatcherService) *DispatcherStatSv1 { + return &DispatcherStatSv1{dS: dps} +} + +// Exports RPC from RLs +type DispatcherStatSv1 struct { + dS *dispatcher.DispatcherService +} + +// Ping implements StatSv1Ping +func (dSts *DispatcherStatSv1) Ping(ign string, reply *string) error { + return dSts.dS.StatSv1Ping(ign, reply) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 32cd446b5..e87f9de91 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -711,12 +711,81 @@ func loaderService(cacheS *engine.CacheS, cfg *config.CGRConfig, } // startDispatcherService fires up the DispatcherS -func startDispatcherService(internalDispatcherSChan chan rpcclient.RpcClientConnection, +func startDispatcherService(internalDispatcherSChan, internalSMGChan, + internalRaterChan, internalResourceSChan, internalThresholdSChan, + internalStatSChan, internalSupplierSChan, internalAttrSChan chan rpcclient.RpcClientConnection, cacheS *engine.CacheS, dm *engine.DataManager, server *utils.Server, exitChan chan bool) { - <-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles) - - dspS, err := dispatcher.NewDispatcherService(dm) + utils.Logger.Info("Starting CGRateS Dispatcher service.") + var err error + var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns *rpcclient.RpcClientPool + if len(cfg.DispatcherSCfg().RALsConns) != 0 { + ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.DispatcherSCfg().RALsConns, internalRaterChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.DispatcherS, err.Error())) + exitChan <- true + return + } + } + if len(cfg.DispatcherSCfg().ResSConns) != 0 { + resSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, + cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.DispatcherSCfg().ResSConns, internalResourceSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResourceS: %s", utils.DispatcherS, err.Error())) + exitChan <- true + return + } + } + if len(cfg.DispatcherSCfg().ThreshSConns) != 0 { + threshSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.DispatcherSCfg().ThreshSConns, internalThresholdSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.DispatcherS, err.Error())) + exitChan <- true + return + } + } + if len(cfg.DispatcherSCfg().StatSConns) != 0 { + statSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.DispatcherSCfg().StatSConns, internalStatSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", utils.DispatcherS, err.Error())) + exitChan <- true + return + } + } + if len(cfg.DispatcherSCfg().SupplSConns) != 0 { + suplSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.DispatcherSCfg().SupplSConns, internalSupplierSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SupplierS: %s", utils.DispatcherS, err.Error())) + exitChan <- true + return + } + } + if len(cfg.DispatcherSCfg().AttrSConns) != 0 { + attrSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.DispatcherSCfg().AttrSConns, internalAttrSChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to AttributeS: %s", utils.DispatcherS, err.Error())) + exitChan <- true + return + } + } + if len(cfg.DispatcherSCfg().SessionSConns) != 0 { + sessionsSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout, + cfg.DispatcherSCfg().SessionSConns, internalSMGChan, cfg.InternalTtl) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SessionS: %s", utils.DispatcherS, err.Error())) + exitChan <- true + return + } + } + dspS, err := dispatcher.NewDispatcherService(dm, ralsConns, resSConns, threshSConns, statSConns, + suplSConns, attrSConns, sessionsSConns) if err != nil { utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error())) exitChan <- true @@ -730,14 +799,20 @@ func startDispatcherService(internalDispatcherSChan chan rpcclient.RpcClientConn exitChan <- true return }() - dspSv1 := v1.NewDispatcherSv1(dspS) - server.RpcRegister(dspSv1) - internalDispatcherSChan <- dspSv1 + if !cfg.ThresholdSCfg().Enabled && len(cfg.DispatcherSCfg().ThreshSConns) != 0 { + server.RpcRegisterName(utils.ThresholdSv1, + v1.NewDispatcherThresholdSv1(dspS)) + } + if !cfg.StatSCfg().Enabled && len(cfg.DispatcherSCfg().StatSConns) != 0 { + server.RpcRegisterName(utils.StatSv1, + v1.NewDispatcherStatSv1(dspS)) + } } func startRpc(server *utils.Server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalPubSubSChan, internalUserSChan, - internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan chan rpcclient.RpcClientConnection) { + internalAliaseSChan, internalRsChan, internalStatSChan, + internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection) { select { // Any of the rpc methods will unlock listening to rpc requests case resp := <-internalRaterChan: internalRaterChan <- resp @@ -757,6 +832,8 @@ func startRpc(server *utils.Server, internalRaterChan, internalRsChan <- rls case statS := <-internalStatSChan: internalStatSChan <- statS + case dispatcherS := <-internalDispatcherSChan: + internalDispatcherSChan <- dispatcherS } go server.ServeJSON(cfg.RPCJSONListen) go server.ServeGOB(cfg.RPCGOBListen) @@ -1034,7 +1111,9 @@ func main() { } if cfg.DispatcherSCfg().Enabled { - go startDispatcherService(internalDispatcherSChan, cacheS, + go startDispatcherService(internalDispatcherSChan, internalSMGChan, + internalRaterChan, internalRsChan, internalThresholdSChan, + internalStatSChan, internalSupplierSChan, internalAttributeSChan, cacheS, dm, server, exitChan) } @@ -1042,7 +1121,8 @@ func main() { // Serve rpc connections go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, - internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan) + internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, + internalStatSChan, internalSMGChan, internalDispatcherSChan) <-exitChan if *memprofile != "" { diff --git a/config/config.go b/config/config.go index eb52aae63..04fae41ab 100755 --- a/config/config.go +++ b/config/config.go @@ -668,40 +668,41 @@ func (self *CGRConfig) checkConfigSanity() error { } // DispaterS checks if self.dispatcherSCfg != nil && self.dispatcherSCfg.Enabled { - for _, connCfg := range self.dispatcherSCfg.RALsConns { - if connCfg.Address != utils.MetaInternal { - return errors.New("Only <*internal> connectivity allowed in DispatcherS for now") - } - if connCfg.Address == utils.MetaInternal && !self.RALsEnabled { - return errors.New("RALs not enabled but requested by DispatcherS component.") - } - } - - for _, connCfg := range self.dispatcherSCfg.ResSConns { - if connCfg.Address == utils.MetaInternal && !self.resourceSCfg.Enabled { - return errors.New("ResourceS not enabled but requested by DispatcherS component.") - } - } - for _, connCfg := range self.dispatcherSCfg.StatSConns { - if connCfg.Address == utils.MetaInternal && !self.resourceSCfg.Enabled { - return errors.New("StatS not enabled but requested by DispatherS component.") - } - } - for _, connCfg := range self.dispatcherSCfg.ThreshSConns { - if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled { - return errors.New("ThresholdS not enabled but requested by DispatherS component.") - } - } - for _, connCfg := range self.dispatcherSCfg.SupplSConns { - if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled { - return errors.New("SupplierS not enabled but requested by DispatherS component.") - } - } - for _, connCfg := range self.dispatcherSCfg.AttrSConns { - if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled { - return errors.New("AttributeS not enabled but requested by DispatherS component.") - } - } + // for _, connCfg := range self.dispatcherSCfg.RALsConns { + // if connCfg.Address == utils.MetaInternal && !self.RALsEnabled { + // return errors.New("RALs not enabled but requested by DispatcherS component.") + // } + // } + // for _, connCfg := range self.dispatcherSCfg.ResSConns { + // if connCfg.Address == utils.MetaInternal && !self.resourceSCfg.Enabled { + // return errors.New("ResourceS not enabled but requested by DispatcherS component.") + // } + // } + // for _, connCfg := range self.dispatcherSCfg.StatSConns { + // if connCfg.Address == utils.MetaInternal && !self.resourceSCfg.Enabled { + // return errors.New("StatS not enabled but requested by DispatherS component.") + // } + // } + // for _, connCfg := range self.dispatcherSCfg.ThreshSConns { + // if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled { + // return errors.New("ThresholdS not enabled but requested by DispatherS component.") + // } + // } + // for _, connCfg := range self.dispatcherSCfg.SupplSConns { + // if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled { + // return errors.New("SupplierS not enabled but requested by DispatherS component.") + // } + // } + // for _, connCfg := range self.dispatcherSCfg.AttrSConns { + // if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled { + // return errors.New("AttributeS not enabled but requested by DispatherS component.") + // } + // } + // for _, connCfg := range self.dispatcherSCfg.SessionSConns { + // if connCfg.Address == utils.MetaInternal && !self.sessionSCfg.Enabled { + // return errors.New("SessionS not enabled but requested by DispatherS component.") + // } + // } if !utils.IsSliceMember([]string{utils.MetaRandom, utils.MetaBalancer, utils.MetaOrdered, utils.MetaCircular}, self.dispatcherSCfg.DispatchingStrategy) { return fmt.Errorf("<%s> unsupported dispatching strategy %s", diff --git a/config/config_defaults.go b/config/config_defaults.go index 13d53333c..0e1fc8f46 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -658,18 +658,14 @@ const CGRATES_CFG_JSON = ` "dispatcher":{ "enabled": false, // starts DispatcherS service: . - "rals_conns": [ - {"address": "*internal"}, // address where to reach the RALs for dispatcherS <*internal> - ], + "rals_conns": [], // address where to reach the RALs for dispatcherS <*internal> "resources_conns": [], // address where to reach the ResourceS <""|*internal|127.0.0.1:2013> "thresholds_conns": [], // address where to reach the ThresholdS <""|*internal|127.0.0.1:2013> "stats_conns": [], // address where to reach the StatS <""|*internal|127.0.0.1:2013> "suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013> "attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013> - "sessions_conns": [ - {"address": "*internal"} // connection towards SessionService - ], - "dispatching_strategy":"*random" // strategy for dispatching <*random|*balancer|*ordered|*circular> + "sessions_conns": [], // connection towards SessionService + "dispatching_strategy":"*random", // strategy for dispatching <*random|*balancer|*ordered|*circular> }, diff --git a/config/config_json_test.go b/config/config_json_test.go index 9b8ee53fa..27657722d 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -1232,19 +1232,32 @@ func TestDfHttpJsonCfg(t *testing.T) { } } -// Will be activated after finish config struct -/* func TestDfDispatcherSJsonCfg(t *testing.T) { eCfg := &DispatcherSJsonCfg{ - Enabled: utils.BoolPointer(true), + Enabled: utils.BoolPointer(false), + Rals_conns: &[]*HaPoolJsonCfg{ + &HaPoolJsonCfg{ + Address: utils.StringPointer(utils.MetaInternal), + }, + }, + Resources_conns: &[]*HaPoolJsonCfg{}, + Thresholds_conns: &[]*HaPoolJsonCfg{}, + Stats_conns: &[]*HaPoolJsonCfg{}, + Suppliers_conns: &[]*HaPoolJsonCfg{}, + Attributes_conns: &[]*HaPoolJsonCfg{}, + Sessions_conns: &[]*HaPoolJsonCfg{ + &HaPoolJsonCfg{ + Address: utils.StringPointer(utils.MetaInternal), + }, + }, + Dispatching_strategy: utils.StringPointer(utils.MetaRandom), } if cfg, err := dfCgrJsonCfg.DispatcherSJsonCfg(); err != nil { t.Error(err) } else if !reflect.DeepEqual(eCfg, cfg) { - t.Errorf("expecting: %+v, received: %+v", eCfg, cfg) + t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eCfg), utils.ToJSON(cfg)) } } -*/ func TestDfLoaderCfg(t *testing.T) { eCfg := &LoaderCfgJson{ diff --git a/config/config_test.go b/config/config_test.go index c101bac85..f02ab0948 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -1332,16 +1332,31 @@ func TestCgrLoaderCfgITDefaults(t *testing.T) { } } -/* Will be activated after finish dispatcher config func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) { eDspSCfg := &DispatcherSCfg{ - Enabled: true, + Enabled: false, + RALsConns: []*HaPoolConfig{ + &HaPoolConfig{ + Address: utils.MetaInternal, + }, + }, + ResSConns: []*HaPoolConfig{}, + ThreshSConns: []*HaPoolConfig{}, + StatSConns: []*HaPoolConfig{}, + SupplSConns: []*HaPoolConfig{}, + AttrSConns: []*HaPoolConfig{}, + SessionSConns: []*HaPoolConfig{ + &HaPoolConfig{ + Address: utils.MetaInternal, + }, + }, + DispatchingStrategy: utils.MetaRandom, } if !reflect.DeepEqual(cgrCfg.dispatcherSCfg, eDspSCfg) { t.Errorf("received: %+v, expecting: %+v", cgrCfg.dispatcherSCfg, eDspSCfg) } } -*/ + func TestCgrLoaderCfgDefault(t *testing.T) { eLdrCfg := &LoaderCgrCfg{ TpID: "", diff --git a/data/conf/samples/dispatcher/cgrates.json b/data/conf/samples/dispatcher/cgrates.json new file mode 100755 index 000000000..7b6d4ea5b --- /dev/null +++ b/data/conf/samples/dispatcher/cgrates.json @@ -0,0 +1,43 @@ +{ +// CGRateS Configuration file +// + + +"general": { + "log_level": 7, +}, + + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "redis", // data_db type: + "db_port": 6379, // data_db port to reach the database + "db_name": "10", // data_db database name to connect to + +}, + +"stor_db": { + "db_password": "CGRateS.org", +}, + + +"rals": { + "enabled": true, +}, + + +"dispatcher":{ + "enabled": true, + "thresholds_conns": [ + {"address": "192.168.56.204:2012", "transport": "*json"} + ], + "dispatching_strategy":"*random", +}, + + +} diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index 0f6fbc331..e885ad521 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -309,7 +309,7 @@ "migrator":{ "out_stordb_password": "CGRateS.org", -} +}, } diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index 400b4675d..02914fb5c 100755 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -20,33 +20,119 @@ package dispatcher import ( "fmt" + "reflect" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" ) // NewDispatcherService initializes a DispatcherService -func NewDispatcherService(dm *engine.DataManager) (*DispatcherService, error) { - return &DispatcherService{dm: dm}, nil - +func NewDispatcherService(dm *engine.DataManager, rals, resS, thdS, + statS, splS, attrS, sessionS rpcclient.RpcClientConnection) (*DispatcherService, error) { + if rals != nil && reflect.ValueOf(rals).IsNil() { + rals = nil + } + if resS != nil && reflect.ValueOf(resS).IsNil() { + resS = nil + } + if thdS != nil && reflect.ValueOf(thdS).IsNil() { + thdS = nil + } + if statS != nil && reflect.ValueOf(statS).IsNil() { + statS = nil + } + if splS != nil && reflect.ValueOf(splS).IsNil() { + splS = nil + } + if attrS != nil && reflect.ValueOf(attrS).IsNil() { + attrS = nil + } + if sessionS != nil && reflect.ValueOf(sessionS).IsNil() { + sessionS = nil + } + return &DispatcherService{dm: dm, + rals: rals, + resS: resS, + thdS: thdS, + statS: statS, + splS: splS, + attrS: attrS, + sessionS: sessionS}, nil } // DispatcherService is the service handling dispatcher type DispatcherService struct { - dm *engine.DataManager + dm *engine.DataManager + rals rpcclient.RpcClientConnection // RALs connections + resS rpcclient.RpcClientConnection // ResourceS connections + thdS rpcclient.RpcClientConnection // ThresholdS connections + statS rpcclient.RpcClientConnection // StatS connections + splS rpcclient.RpcClientConnection // SupplierS connections + attrS rpcclient.RpcClientConnection // AttributeS connections + sessionS rpcclient.RpcClientConnection // SessionS server connections } // ListenAndServe will initialize the service -func (spS *DispatcherService) ListenAndServe(exitChan chan bool) error { - utils.Logger.Info("Starting Dispatcher Service") +func (dS *DispatcherService) ListenAndServe(exitChan chan bool) error { e := <-exitChan exitChan <- e // put back for the others listening for shutdown request return nil } // Shutdown is called to shutdown the service -func (spS *DispatcherService) Shutdown() error { +func (dS *DispatcherService) Shutdown() error { utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherS)) utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherS)) return nil } + +func (dS *DispatcherService) ThresholdSv1Ping(ign string, reply *string) error { + if dS.thdS != nil { + if err := dS.thdS.Call(utils.ThresholdSv1Ping, ign, reply); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" error: %s ThresholdS.", err.Error())) + } + } + return nil +} + +func (dS *DispatcherService) ThresholdSv1GetThresholdIDs(tenant string, tIDs *[]string) error { + if dS.thdS != nil { + if err := dS.thdS.Call(utils.ThresholdSv1GetThresholdIDs, tenant, tIDs); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" error: %s ThresholdS.", err.Error())) + } + } + return nil +} + +func (dS *DispatcherService) ThresholdSv1GetThreshold(tntID *utils.TenantID, t *engine.Threshold) error { + if dS.thdS != nil { + if err := dS.thdS.Call(utils.ThresholdSv1GetThreshold, tntID, t); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" error: %s ThresholdS.", err.Error())) + } + } + return nil +} + +func (dS *DispatcherService) ThresholdSv1ProcessEvent(args *engine.ArgsProcessEvent, tIDs *[]string) error { + if dS.thdS != nil { + if err := dS.thdS.Call(utils.ThresholdSv1ProcessEvent, args, tIDs); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" error: %s ThresholdS.", err.Error())) + } + } + return nil +} + +func (dS *DispatcherService) StatSv1Ping(ign string, reply *string) error { + if dS.statS != nil { + if err := dS.statS.Call(utils.StatSv1Ping, ign, reply); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" error: %s StatS.", err.Error())) + } + } + return nil +} diff --git a/utils/consts.go b/utils/consts.go index 4e07cdc5c..3e1f95a4e 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -629,6 +629,8 @@ const ( MetaBalancer = "*balancer" MetaOrdered = "*ordered" MetaCircular = "*circular" + ThresholdSv1 = "ThresholdSv1" + StatSv1 = "StatSv1" ) // MetaFilterIndexesAPIs