Active Dispatcher Service

This commit is contained in:
TeoV
2018-04-11 05:22:31 -04:00
committed by Dan Christian Bogos
parent a5f77d5a8b
commit ee90c1f0d1
6 changed files with 155 additions and 13 deletions

44
apier/v1/dispatcher.go Executable file
View File

@@ -0,0 +1,44 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v1
import (
"github.com/cgrates/cgrates/dispatcher"
"github.com/cgrates/cgrates/utils"
)
func NewDispatcherSv1(dps *dispatcher.DispatcherService) *DispatcherSv1 {
return &DispatcherSv1{dpsS: dps}
}
// Exports RPC from RLs
type DispatcherSv1 struct {
dpsS *dispatcher.DispatcherService
}
// Call implements rpcclient.RpcClientConnection interface for internal RPC
func (dpsS *DispatcherSv1) Call(serviceMethod string,
args interface{}, reply interface{}) error {
return utils.APIerRPCCall(dpsS, serviceMethod, args, reply)
}
func (dpsS *DispatcherSv1) Ping(ign string, reply *string) error {
*reply = utils.Pong
return nil
}

View File

@@ -33,6 +33,7 @@ import (
"github.com/cgrates/cgrates/apier/v2"
"github.com/cgrates/cgrates/cdrc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/dispatcher"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/scheduler"
@@ -696,7 +697,6 @@ func startFilterService(filterSChan chan *engine.FilterS, cacheS *engine.CacheS,
dm *engine.DataManager, exitChan chan bool) {
<-cacheS.GetPrecacheChannel(utils.CacheFilters)
filterSChan <- engine.NewFilterS(cfg, internalStatSChan, dm)
}
// loaderService will start and register APIs for LoaderService if enabled
@@ -710,6 +710,31 @@ func loaderService(cacheS *engine.CacheS, cfg *config.CGRConfig,
server.RpcRegister(v1.NewLoaderSv1(ldrS))
}
// startDispatcherService fires up the DispatcherS
func startDispatcherService(internalDispatcherSChan chan rpcclient.RpcClientConnection,
cacheS *engine.CacheS, dm *engine.DataManager,
server *utils.Server, exitChan chan bool) {
<-cacheS.GetPrecacheChannel(utils.CacheAttributeProfiles)
dspS, err := dispatcher.NewDispatcherService(dm)
if err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Could not init, error: %s", utils.DispatcherS, err.Error()))
exitChan <- true
return
}
go func() {
if err := dspS.ListenAndServe(exitChan); err != nil {
utils.Logger.Crit(fmt.Sprintf("<%s> Error: %s listening for packets", utils.DispatcherS, err.Error()))
}
dspS.Shutdown()
exitChan <- true
return
}()
dspSv1 := v1.NewDispatcherSv1(dspS)
server.RpcRegister(dspSv1)
internalDispatcherSChan <- dspSv1
}
func startRpc(server *utils.Server, internalRaterChan,
internalCdrSChan, internalCdrStatSChan, internalPubSubSChan, internalUserSChan,
internalAliaseSChan, internalRsChan, internalStatSChan, internalSMGChan chan rpcclient.RpcClientConnection) {
@@ -900,6 +925,7 @@ func main() {
internalThresholdSChan := make(chan rpcclient.RpcClientConnection, 1)
internalSupplierSChan := make(chan rpcclient.RpcClientConnection, 1)
filterSChan := make(chan *engine.FilterS, 1)
internalDispatcherSChan := make(chan rpcclient.RpcClientConnection, 1)
// Start ServiceManager
srvManager := servmanager.NewServiceManager(cfg, dm, exitChan, cacheS)
@@ -1008,11 +1034,8 @@ func main() {
}
if cfg.DispatcherSCfg().Enabled {
/*
go startDispatcherService(internalSupplierSChan, cacheS,
internalRsChan, internalStatSChan,
cfg, dm, server, exitChan, filterSChan)
*/
go startDispatcherService(internalDispatcherSChan, cacheS,
dm, server, exitChan)
}
go loaderService(cacheS, cfg, dm, server, exitChan)

View File

@@ -1439,3 +1439,7 @@ func (cfg *CGRConfig) CacheCfg() CacheConfig {
func (cfg *CGRConfig) LoaderCfg() []*LoaderConfig {
return cfg.loaderCfg
}
func (cfg *CGRConfig) DispatcherSCfg() *DispatcherSCfg {
return cfg.dispatcherSCfg
}

62
console/dispatcher_ping.go Executable file
View File

@@ -0,0 +1,62 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package console
import "github.com/cgrates/cgrates/utils"
func init() {
c := &CmdDispatcherPing{
name: "dispatcher_ping",
rpcMethod: utils.DispatcherSv1Ping,
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
}
// Commander implementation
type CmdDispatcherPing struct {
name string
rpcMethod string
rpcParams *EmptyWrapper
*CommandExecuter
}
func (self *CmdDispatcherPing) Name() string {
return self.name
}
func (self *CmdDispatcherPing) RpcMethod() string {
return self.rpcMethod
}
func (self *CmdDispatcherPing) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &EmptyWrapper{}
}
return self.rpcParams
}
func (self *CmdDispatcherPing) PostprocessRpcParams() error {
return nil
}
func (self *CmdDispatcherPing) RpcResult() interface{} {
var s string
return &s
}

View File

@@ -19,17 +19,21 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package dispatcher
import (
"fmt"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// NewDispatcherService initializes a DispatcherService
func NewDispatcherService() (dspS *DispatcherService, err error) {
dspS = &DispatcherService{}
return
func NewDispatcherService(dm *engine.DataManager) (*DispatcherService, error) {
return &DispatcherService{dm: dm}, nil
}
// DispatcherService is the service handling dispatcher
type DispatcherService struct {
dm *engine.DataManager
}
// ListenAndServe will initialize the service

View File

@@ -614,7 +614,7 @@ const (
AttributeSv1Ping = "AttributeSv1.Ping"
)
//ThresholdS APIs
// ThresholdS APIs
const (
ThresholdSv1ProcessEvent = "ThresholdSv1.ProcessEvent"
ThresholdSv1GetThreshold = "ThresholdSv1.GetThreshold"
@@ -622,7 +622,7 @@ const (
ThresholdSv1Ping = "ThresholdSv1.Ping"
)
//StatS APIs
// StatS APIs
const (
StatSv1ProcessEvent = "StatSv1.ProcessEvent"
StatSv1GetQueueIDs = "StatSv1.GetQueueIDs"
@@ -630,7 +630,7 @@ const (
StatSv1Ping = "StatSv1.Ping"
)
//ResourceS APIs
// ResourceS APIs
const (
ResourceSv1AuthorizeResources = "ResourceSv1.AuthorizeResources"
ResourceSv1GetResourcesForEvent = "ResourceSv1.GetResourcesForEvent"
@@ -639,7 +639,7 @@ const (
ResourceSv1Ping = "ResourceSv1.Ping"
)
//SessionS APIs
// SessionS APIs
const (
SessionSv1AuthorizeEvent = "SessionSv1.AuthorizeEvent"
SessionSv1AuthorizeEventWithDigest = "SessionSv1.AuthorizeEventWithDigest"
@@ -657,6 +657,11 @@ const (
SessionSv1Ping = "SessionSv1.Ping"
)
// DispatcherS APIs
const (
DispatcherSv1Ping = "DispatcherSv1.Ping"
)
// Cache
const (
CacheSv1GetCacheStats = "CacheSv1.GetCacheStats"