diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go new file mode 100755 index 000000000..838cd6cae --- /dev/null +++ b/apier/v1/dispatcher.go @@ -0,0 +1,44 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package v1 + +import ( + "github.com/cgrates/cgrates/dispatcher" + "github.com/cgrates/cgrates/utils" +) + +func NewDispatcherSv1(dps *dispatcher.DispatcherService) *DispatcherSv1 { + return &DispatcherSv1{dpsS: dps} +} + +// Exports RPC from RLs +type DispatcherSv1 struct { + dpsS *dispatcher.DispatcherService +} + +// Call implements rpcclient.RpcClientConnection interface for internal RPC +func (dpsS *DispatcherSv1) Call(serviceMethod string, + args interface{}, reply interface{}) error { + return utils.APIerRPCCall(dpsS, serviceMethod, args, reply) +} + +func (dpsS *DispatcherSv1) Ping(ign string, reply *string) error { + *reply = utils.Pong + return nil +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 3261a8556..a75dec9cc 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -33,6 +33,7 @@ import ( "github.com/cgrates/cgrates/apier/v2" "github.com/cgrates/cgrates/cdrc" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/dispatcher" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/scheduler" @@ -696,7 +697,6 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS, dm *engine.DataManager, exitChan chan bool) { <-cacheS.GetPrecacheChannel(utils.CacheFilters) filterSChan <- engine.NewFilterS(cfg, internalStatSChan, dm) - } // loaderService will start and register APIs for LoaderService if enabled @@ -710,6 +710,31 @@ func loaderService(cacheS *engine.CacheS, cfg *config.CGRConfig, server.RpcRegister(v1.NewLoaderSv1(ldrS)) } +// startDispatcherService fires up the DispatcherS +func startDispatcherService(internalDispatcherSChan chan rpcclient.RpcClientConnection, + cacheS *engine.CacheS, dm *engine.DataManager, + server *utils.Server, exitChan chan bool) { + <-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles) + + dspS, err := dispatcher.NewDispatcherService(dm) + if err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error())) + exitChan <- true + return + } + go func() { + if err := dspS.ListenAndServe(exitChan); err != nil { + utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.DispatcherS, err.Error())) + } + dspS.Shutdown() + exitChan <- true + return + }() + dspSv1 := v1.NewDispatcherSv1(dspS) + server.RpcRegister(dspSv1) + internalDispatcherSChan <- dspSv1 +} + func startRpc(server *utils.Server, internalRaterChan, internalCdrSChan, internalCdrStatSChan, internalPubSubSChan, internalUserSChan, internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan chan rpcclient.RpcClientConnection) { @@ -900,6 +925,7 @@ func main() { internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1) internalSupplierSChan := make(chan rpcclient.RpcClientConnection, 1) filterSChan := make(chan *engine.FilterS, 1) + internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1) // Start ServiceManager srvManager := servmanager.NewServiceManager(cfg, dm, exitChan, cacheS) @@ -1008,11 +1034,8 @@ func main() { } if cfg.DispatcherSCfg().Enabled { - /* - go startDispatcherService(internalSupplierSChan, cacheS, - internalRsChan, internalStatSChan, - cfg, dm, server, exitChan, filterSChan) - */ + go startDispatcherService(internalDispatcherSChan, cacheS, + dm, server, exitChan) } go loaderService(cacheS, cfg, dm, server, exitChan) diff --git a/config/config.go b/config/config.go index 947933415..6fa7896b6 100755 --- a/config/config.go +++ b/config/config.go @@ -1439,3 +1439,7 @@ func (cfg *CGRConfig) CacheCfg() CacheConfig { func (cfg *CGRConfig) LoaderCfg() []*LoaderConfig { return cfg.loaderCfg } + +func (cfg *CGRConfig) DispatcherSCfg() *DispatcherSCfg { + return cfg.dispatcherSCfg +} diff --git a/console/dispatcher_ping.go b/console/dispatcher_ping.go new file mode 100755 index 000000000..ff4ec9140 --- /dev/null +++ b/console/dispatcher_ping.go @@ -0,0 +1,62 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package console + +import "github.com/cgrates/cgrates/utils" + +func init() { + c := &CmdDispatcherPing{ + name: "dispatcher_ping", + rpcMethod: utils.DispatcherSv1Ping, + } + commands[c.Name()] = c + c.CommandExecuter = &CommandExecuter{c} +} + +// Commander implementation +type CmdDispatcherPing struct { + name string + rpcMethod string + rpcParams *EmptyWrapper + *CommandExecuter +} + +func (self *CmdDispatcherPing) Name() string { + return self.name +} + +func (self *CmdDispatcherPing) RpcMethod() string { + return self.rpcMethod +} + +func (self *CmdDispatcherPing) RpcParams(reset bool) interface{} { + if reset || self.rpcParams == nil { + self.rpcParams = &EmptyWrapper{} + } + return self.rpcParams +} + +func (self *CmdDispatcherPing) PostprocessRpcParams() error { + return nil +} + +func (self *CmdDispatcherPing) RpcResult() interface{} { + var s string + return &s +} diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index 79d409330..400b4675d 100755 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -19,17 +19,21 @@ along with this program. If not, see package dispatcher import ( + "fmt" + + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) // NewDispatcherService initializes a DispatcherService -func NewDispatcherService() (dspS *DispatcherService, err error) { - dspS = &DispatcherService{} - return +func NewDispatcherService(dm *engine.DataManager) (*DispatcherService, error) { + return &DispatcherService{dm: dm}, nil + } // DispatcherService is the service handling dispatcher type DispatcherService struct { + dm *engine.DataManager } // ListenAndServe will initialize the service diff --git a/utils/consts.go b/utils/consts.go index 79f4c74d8..3b95c3678 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -614,7 +614,7 @@ const ( AttributeSv1Ping = "AttributeSv1.Ping" ) -//ThresholdS APIs +// ThresholdS APIs const ( ThresholdSv1ProcessEvent = "ThresholdSv1.ProcessEvent" ThresholdSv1GetThreshold = "ThresholdSv1.GetThreshold" @@ -622,7 +622,7 @@ const ( ThresholdSv1Ping = "ThresholdSv1.Ping" ) -//StatS APIs +// StatS APIs const ( StatSv1ProcessEvent = "StatSv1.ProcessEvent" StatSv1GetQueueIDs = "StatSv1.GetQueueIDs" @@ -630,7 +630,7 @@ const ( StatSv1Ping = "StatSv1.Ping" ) -//ResourceS APIs +// ResourceS APIs const ( ResourceSv1AuthorizeResources = "ResourceSv1.AuthorizeResources" ResourceSv1GetResourcesForEvent = "ResourceSv1.GetResourcesForEvent" @@ -639,7 +639,7 @@ const ( ResourceSv1Ping = "ResourceSv1.Ping" ) -//SessionS APIs +// SessionS APIs const ( SessionSv1AuthorizeEvent = "SessionSv1.AuthorizeEvent" SessionSv1AuthorizeEventWithDigest = "SessionSv1.AuthorizeEventWithDigest" @@ -657,6 +657,11 @@ const ( SessionSv1Ping = "SessionSv1.Ping" ) +// DispatcherS APIs +const ( + DispatcherSv1Ping = "DispatcherSv1.Ping" +) + // Cache const ( CacheSv1GetCacheStats = "CacheSv1.GetCacheStats"