Update dispatcher

This commit is contained in:
TeoV
2018-05-25 09:36:12 -04:00
committed by Dan Christian Bogos
parent 32d9917fa9
commit f4ba447780
10 changed files with 338 additions and 78 deletions

View File

@@ -20,25 +20,49 @@ package v1
import (
"github.com/cgrates/cgrates/dispatcher"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewDispatcherSv1(dps *dispatcher.DispatcherService) *DispatcherSv1 {
return &DispatcherSv1{dpsS: dps}
func NewDispatcherThresholdSv1(dps *dispatcher.DispatcherService) *DispatcherThresholdSv1 {
return &DispatcherThresholdSv1{dS: dps}
}
// Exports RPC from RLs
type DispatcherSv1 struct {
dpsS *dispatcher.DispatcherService
type DispatcherThresholdSv1 struct {
dS *dispatcher.DispatcherService
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
func (dpsS *DispatcherSv1) Call(serviceMethod string,
args interface{}, reply interface{}) error {
return utils.APIerRPCCall(dpsS, serviceMethod, args, reply)
// Ping implements ThresholdSv1Ping
func (dT *DispatcherThresholdSv1) Ping(ign string, reply *string) error {
return dT.dS.ThresholdSv1Ping(ign, reply)
}
func (dpsS *DispatcherSv1) Ping(ign string, reply *string) error {
*reply = utils.Pong
return nil
// GetThresholdIDs implements ThresholdSv1GetThresholdIDs
func (dT *DispatcherThresholdSv1) GetThresholdIDs(tenant string, tIDs *[]string) error {
return dT.dS.ThresholdSv1GetThresholdIDs(tenant, tIDs)
}
// GetThreshold implements ThresholdSv1GetThreshold
func (dT *DispatcherThresholdSv1) GetThreshold(tntID *utils.TenantID, t *engine.Threshold) error {
return dT.dS.ThresholdSv1GetThreshold(tntID, t)
}
// ProcessEvent implements ThresholdSv1ProcessEvent
func (dT *DispatcherThresholdSv1) ProcessEvent(args *engine.ArgsProcessEvent, tIDs *[]string) error {
return dT.dS.ThresholdSv1ProcessEvent(args, tIDs)
}
func NewDispatcherStatSv1(dps *dispatcher.DispatcherService) *DispatcherStatSv1 {
return &DispatcherStatSv1{dS: dps}
}
// Exports RPC from RLs
type DispatcherStatSv1 struct {
dS *dispatcher.DispatcherService
}
// Ping implements StatSv1Ping
func (dSts *DispatcherStatSv1) Ping(ign string, reply *string) error {
return dSts.dS.StatSv1Ping(ign, reply)
}

View File

@@ -711,12 +711,81 @@ func loaderService(cacheS *engine.CacheS, cfg *config.CGRConfig,
}
// startDispatcherService fires up the DispatcherS
func startDispatcherService(internalDispatcherSChan chan rpcclient.RpcClientConnection,
func startDispatcherService(internalDispatcherSChan, internalSMGChan,
internalRaterChan, internalResourceSChan, internalThresholdSChan,
internalStatSChan, internalSupplierSChan, internalAttrSChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, dm *engine.DataManager,
server *utils.Server, exitChan chan bool) {
<-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles)
dspS, err := dispatcher.NewDispatcherService(dm)
utils.Logger.Info("Starting CGRateS Dispatcher service.")
var err error
var ralsConns, resSConns, threshSConns, statSConns, suplSConns, attrSConns, sessionsSConns *rpcclient.RpcClientPool
if len(cfg.DispatcherSCfg().RALsConns) != 0 {
ralsConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.DispatcherSCfg().RALsConns, internalRaterChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to RALs: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DispatcherSCfg().ResSConns) != 0 {
resSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST,
cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.DispatcherSCfg().ResSConns, internalResourceSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ResourceS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DispatcherSCfg().ThreshSConns) != 0 {
threshSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.DispatcherSCfg().ThreshSConns, internalThresholdSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to ThresholdS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DispatcherSCfg().StatSConns) != 0 {
statSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.DispatcherSCfg().StatSConns, internalStatSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to StatS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DispatcherSCfg().SupplSConns) != 0 {
suplSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.DispatcherSCfg().SupplSConns, internalSupplierSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SupplierS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DispatcherSCfg().AttrSConns) != 0 {
attrSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.DispatcherSCfg().AttrSConns, internalAttrSChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to AttributeS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
if len(cfg.DispatcherSCfg().SessionSConns) != 0 {
sessionsSConns, err = engine.NewRPCPool(rpcclient.POOL_FIRST, cfg.ConnectAttempts, cfg.Reconnects, cfg.ConnectTimeout, cfg.ReplyTimeout,
cfg.DispatcherSCfg().SessionSConns, internalSMGChan, cfg.InternalTtl)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not connect to SessionS: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
}
dspS, err := dispatcher.NewDispatcherService(dm, ralsConns, resSConns, threshSConns, statSConns,
suplSConns, attrSConns, sessionsSConns)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error()))
exitChan <- true
@@ -730,14 +799,20 @@ func startDispatcherService(internalDispatcherSChan chan rpcclient.RpcClientConn
exitChan <- true
return
}()
dspSv1 := v1.NewDispatcherSv1(dspS)
server.RpcRegister(dspSv1)
internalDispatcherSChan <- dspSv1
if !cfg.ThresholdSCfg().Enabled && len(cfg.DispatcherSCfg().ThreshSConns) != 0 {
server.RpcRegisterName(utils.ThresholdSv1,
v1.NewDispatcherThresholdSv1(dspS))
}
if !cfg.StatSCfg().Enabled && len(cfg.DispatcherSCfg().StatSConns) != 0 {
server.RpcRegisterName(utils.StatSv1,
v1.NewDispatcherStatSv1(dspS))
}
}
func startRpc(server *utils.Server, internalRaterChan,
internalCdrSChan, internalCdrStatSChan, internalPubSubSChan, internalUserSChan,
internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan chan rpcclient.RpcClientConnection) {
internalAliaseSChan, internalRsChan, internalStatSChan,
internalSMGChan, internalDispatcherSChan chan rpcclient.RpcClientConnection) {
select { // Any of the rpc methods will unlock listening to rpc requests
case resp := <-internalRaterChan:
internalRaterChan <- resp
@@ -757,6 +832,8 @@ func startRpc(server *utils.Server, internalRaterChan,
internalRsChan <- rls
case statS := <-internalStatSChan:
internalStatSChan <- statS
case dispatcherS := <-internalDispatcherSChan:
internalDispatcherSChan <- dispatcherS
}
go server.ServeJSON(cfg.RPCJSONListen)
go server.ServeGOB(cfg.RPCGOBListen)
@@ -1034,7 +1111,9 @@ func main() {
}
if cfg.DispatcherSCfg().Enabled {
go startDispatcherService(internalDispatcherSChan, cacheS,
go startDispatcherService(internalDispatcherSChan, internalSMGChan,
internalRaterChan, internalRsChan, internalThresholdSChan,
internalStatSChan, internalSupplierSChan, internalAttributeSChan, cacheS,
dm, server, exitChan)
}
@@ -1042,7 +1121,8 @@ func main() {
// Serve rpc connections
go startRpc(server, internalRaterChan, internalCdrSChan, internalCdrStatSChan,
internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan)
internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan,
internalStatSChan, internalSMGChan, internalDispatcherSChan)
<-exitChan
if *memprofile != "" {

View File

@@ -668,40 +668,41 @@ func (self *CGRConfig) checkConfigSanity() error {
}
// DispaterS checks
if self.dispatcherSCfg != nil && self.dispatcherSCfg.Enabled {
for _, connCfg := range self.dispatcherSCfg.RALsConns {
if connCfg.Address != utils.MetaInternal {
return errors.New("Only <*internal> connectivity allowed in DispatcherS for now")
}
if connCfg.Address == utils.MetaInternal && !self.RALsEnabled {
return errors.New("RALs not enabled but requested by DispatcherS component.")
}
}
for _, connCfg := range self.dispatcherSCfg.ResSConns {
if connCfg.Address == utils.MetaInternal && !self.resourceSCfg.Enabled {
return errors.New("ResourceS not enabled but requested by DispatcherS component.")
}
}
for _, connCfg := range self.dispatcherSCfg.StatSConns {
if connCfg.Address == utils.MetaInternal && !self.resourceSCfg.Enabled {
return errors.New("StatS not enabled but requested by DispatherS component.")
}
}
for _, connCfg := range self.dispatcherSCfg.ThreshSConns {
if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled {
return errors.New("ThresholdS not enabled but requested by DispatherS component.")
}
}
for _, connCfg := range self.dispatcherSCfg.SupplSConns {
if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled {
return errors.New("SupplierS not enabled but requested by DispatherS component.")
}
}
for _, connCfg := range self.dispatcherSCfg.AttrSConns {
if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled {
return errors.New("AttributeS not enabled but requested by DispatherS component.")
}
}
// for _, connCfg := range self.dispatcherSCfg.RALsConns {
// if connCfg.Address == utils.MetaInternal && !self.RALsEnabled {
// return errors.New("RALs not enabled but requested by DispatcherS component.")
// }
// }
// for _, connCfg := range self.dispatcherSCfg.ResSConns {
// if connCfg.Address == utils.MetaInternal && !self.resourceSCfg.Enabled {
// return errors.New("ResourceS not enabled but requested by DispatcherS component.")
// }
// }
// for _, connCfg := range self.dispatcherSCfg.StatSConns {
// if connCfg.Address == utils.MetaInternal && !self.resourceSCfg.Enabled {
// return errors.New("StatS not enabled but requested by DispatherS component.")
// }
// }
// for _, connCfg := range self.dispatcherSCfg.ThreshSConns {
// if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled {
// return errors.New("ThresholdS not enabled but requested by DispatherS component.")
// }
// }
// for _, connCfg := range self.dispatcherSCfg.SupplSConns {
// if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled {
// return errors.New("SupplierS not enabled but requested by DispatherS component.")
// }
// }
// for _, connCfg := range self.dispatcherSCfg.AttrSConns {
// if connCfg.Address == utils.MetaInternal && !self.thresholdSCfg.Enabled {
// return errors.New("AttributeS not enabled but requested by DispatherS component.")
// }
// }
// for _, connCfg := range self.dispatcherSCfg.SessionSConns {
// if connCfg.Address == utils.MetaInternal && !self.sessionSCfg.Enabled {
// return errors.New("SessionS not enabled but requested by DispatherS component.")
// }
// }
if !utils.IsSliceMember([]string{utils.MetaRandom, utils.MetaBalancer, utils.MetaOrdered,
utils.MetaCircular}, self.dispatcherSCfg.DispatchingStrategy) {
return fmt.Errorf("<%s> unsupported dispatching strategy %s",

View File

@@ -658,18 +658,14 @@ const CGRATES_CFG_JSON = `
"dispatcher":{
"enabled": false, // starts DispatcherS service: <true|false>.
"rals_conns": [
{"address": "*internal"}, // address where to reach the RALs for dispatcherS <*internal>
],
"rals_conns": [], // address where to reach the RALs for dispatcherS <*internal>
"resources_conns": [], // address where to reach the ResourceS <""|*internal|127.0.0.1:2013>
"thresholds_conns": [], // address where to reach the ThresholdS <""|*internal|127.0.0.1:2013>
"stats_conns": [], // address where to reach the StatS <""|*internal|127.0.0.1:2013>
"suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013>
"attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013>
"sessions_conns": [
{"address": "*internal"} // connection towards SessionService
],
"dispatching_strategy":"*random" // strategy for dispatching <*random|*balancer|*ordered|*circular>
"sessions_conns": [], // connection towards SessionService
"dispatching_strategy":"*random", // strategy for dispatching <*random|*balancer|*ordered|*circular>
},

View File

@@ -1232,19 +1232,32 @@ func TestDfHttpJsonCfg(t *testing.T) {
}
}
// Will be activated after finish config struct
/*
func TestDfDispatcherSJsonCfg(t *testing.T) {
eCfg := &DispatcherSJsonCfg{
Enabled: utils.BoolPointer(true),
Enabled: utils.BoolPointer(false),
Rals_conns: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Address: utils.StringPointer(utils.MetaInternal),
},
},
Resources_conns: &[]*HaPoolJsonCfg{},
Thresholds_conns: &[]*HaPoolJsonCfg{},
Stats_conns: &[]*HaPoolJsonCfg{},
Suppliers_conns: &[]*HaPoolJsonCfg{},
Attributes_conns: &[]*HaPoolJsonCfg{},
Sessions_conns: &[]*HaPoolJsonCfg{
&HaPoolJsonCfg{
Address: utils.StringPointer(utils.MetaInternal),
},
},
Dispatching_strategy: utils.StringPointer(utils.MetaRandom),
}
if cfg, err := dfCgrJsonCfg.DispatcherSJsonCfg(); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(eCfg, cfg) {
t.Errorf("expecting: %+v, received: %+v", eCfg, cfg)
t.Errorf("expecting: %+v, received: %+v", utils.ToJSON(eCfg), utils.ToJSON(cfg))
}
}
*/
func TestDfLoaderCfg(t *testing.T) {
eCfg := &LoaderCfgJson{

View File

@@ -1332,16 +1332,31 @@ func TestCgrLoaderCfgITDefaults(t *testing.T) {
}
}
/* Will be activated after finish dispatcher config
func TestCgrCfgJSONDefaultDispatcherSCfg(t *testing.T) {
eDspSCfg := &DispatcherSCfg{
Enabled: true,
Enabled: false,
RALsConns: []*HaPoolConfig{
&HaPoolConfig{
Address: utils.MetaInternal,
},
},
ResSConns: []*HaPoolConfig{},
ThreshSConns: []*HaPoolConfig{},
StatSConns: []*HaPoolConfig{},
SupplSConns: []*HaPoolConfig{},
AttrSConns: []*HaPoolConfig{},
SessionSConns: []*HaPoolConfig{
&HaPoolConfig{
Address: utils.MetaInternal,
},
},
DispatchingStrategy: utils.MetaRandom,
}
if !reflect.DeepEqual(cgrCfg.dispatcherSCfg, eDspSCfg) {
t.Errorf("received: %+v, expecting: %+v", cgrCfg.dispatcherSCfg, eDspSCfg)
}
}
*/
func TestCgrLoaderCfgDefault(t *testing.T) {
eLdrCfg := &LoaderCgrCfg{
TpID: "",

View File

@@ -0,0 +1,43 @@
{
// CGRateS Configuration file
//
"general": {
"log_level": 7,
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type: <redis|mongo>
"db_port": 6379, // data_db port to reach the database
"db_name": "10", // data_db database name to connect to
},
"stor_db": {
"db_password": "CGRateS.org",
},
"rals": {
"enabled": true,
},
"dispatcher":{
"enabled": true,
"thresholds_conns": [
{"address": "192.168.56.204:2012", "transport": "*json"}
],
"dispatching_strategy":"*random",
},
}

View File

@@ -309,7 +309,7 @@
"migrator":{
"out_stordb_password": "CGRateS.org",
}
},
}

View File

@@ -20,33 +20,119 @@ package dispatcher
import (
"fmt"
"reflect"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewDispatcherService initializes a DispatcherService
func NewDispatcherService(dm *engine.DataManager) (*DispatcherService, error) {
return &DispatcherService{dm: dm}, nil
func NewDispatcherService(dm *engine.DataManager, rals, resS, thdS,
statS, splS, attrS, sessionS rpcclient.RpcClientConnection) (*DispatcherService, error) {
if rals != nil && reflect.ValueOf(rals).IsNil() {
rals = nil
}
if resS != nil && reflect.ValueOf(resS).IsNil() {
resS = nil
}
if thdS != nil && reflect.ValueOf(thdS).IsNil() {
thdS = nil
}
if statS != nil && reflect.ValueOf(statS).IsNil() {
statS = nil
}
if splS != nil && reflect.ValueOf(splS).IsNil() {
splS = nil
}
if attrS != nil && reflect.ValueOf(attrS).IsNil() {
attrS = nil
}
if sessionS != nil && reflect.ValueOf(sessionS).IsNil() {
sessionS = nil
}
return &DispatcherService{dm: dm,
rals: rals,
resS: resS,
thdS: thdS,
statS: statS,
splS: splS,
attrS: attrS,
sessionS: sessionS}, nil
}
// DispatcherService is the service handling dispatcher
type DispatcherService struct {
dm *engine.DataManager
dm *engine.DataManager
rals rpcclient.RpcClientConnection // RALs connections
resS rpcclient.RpcClientConnection // ResourceS connections
thdS rpcclient.RpcClientConnection // ThresholdS connections
statS rpcclient.RpcClientConnection // StatS connections
splS rpcclient.RpcClientConnection // SupplierS connections
attrS rpcclient.RpcClientConnection // AttributeS connections
sessionS rpcclient.RpcClientConnection // SessionS server connections
}
// ListenAndServe will initialize the service
func (spS *DispatcherService) ListenAndServe(exitChan chan bool) error {
utils.Logger.Info("Starting Dispatcher Service")
func (dS *DispatcherService) ListenAndServe(exitChan chan bool) error {
e := <-exitChan
exitChan <- e // put back for the others listening for shutdown request
return nil
}
// Shutdown is called to shutdown the service
func (spS *DispatcherService) Shutdown() error {
func (dS *DispatcherService) Shutdown() error {
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown initialized", utils.DispatcherS))
utils.Logger.Info(fmt.Sprintf("<%s> service shutdown complete", utils.DispatcherS))
return nil
}
func (dS *DispatcherService) ThresholdSv1Ping(ign string, reply *string) error {
if dS.thdS != nil {
if err := dS.thdS.Call(utils.ThresholdSv1Ping, ign, reply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<DispatcherS> error: %s ThresholdS.", err.Error()))
}
}
return nil
}
func (dS *DispatcherService) ThresholdSv1GetThresholdIDs(tenant string, tIDs *[]string) error {
if dS.thdS != nil {
if err := dS.thdS.Call(utils.ThresholdSv1GetThresholdIDs, tenant, tIDs); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<DispatcherS> error: %s ThresholdS.", err.Error()))
}
}
return nil
}
func (dS *DispatcherService) ThresholdSv1GetThreshold(tntID *utils.TenantID, t *engine.Threshold) error {
if dS.thdS != nil {
if err := dS.thdS.Call(utils.ThresholdSv1GetThreshold, tntID, t); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<DispatcherS> error: %s ThresholdS.", err.Error()))
}
}
return nil
}
func (dS *DispatcherService) ThresholdSv1ProcessEvent(args *engine.ArgsProcessEvent, tIDs *[]string) error {
if dS.thdS != nil {
if err := dS.thdS.Call(utils.ThresholdSv1ProcessEvent, args, tIDs); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<DispatcherS> error: %s ThresholdS.", err.Error()))
}
}
return nil
}
func (dS *DispatcherService) StatSv1Ping(ign string, reply *string) error {
if dS.statS != nil {
if err := dS.statS.Call(utils.StatSv1Ping, ign, reply); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<DispatcherS> error: %s StatS.", err.Error()))
}
}
return nil
}

View File

@@ -629,6 +629,8 @@ const (
MetaBalancer = "*balancer"
MetaOrdered = "*ordered"
MetaCircular = "*circular"
ThresholdSv1 = "ThresholdSv1"
StatSv1 = "StatSv1"
)
// MetaFilterIndexesAPIs