mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Add infrastructure for service
This commit is contained in:
committed by
Dan Christian Bogos
parent
530ad1e541
commit
a717fc4f3d
@@ -21,6 +21,8 @@ package v1
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/actions"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
@@ -139,3 +141,25 @@ func (apierSv1 *APIerSv1) RemoveActionProfile(arg *utils.TenantIDWithCache, repl
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewActionSv1 initializes ActionSv1
|
||||
func NewActionSv1(aS *actions.ActionS) *ActionSv1 {
|
||||
return &ActionSv1{aS: aS}
|
||||
}
|
||||
|
||||
// ActionSv1 exports RPC from RLs
|
||||
type ActionSv1 struct {
|
||||
aS *actions.ActionS
|
||||
}
|
||||
|
||||
// Call implements rpcclient.ClientConnector interface for internal RPC
|
||||
func (aSv1 *ActionSv1) Call(serviceMethod string,
|
||||
args interface{}, reply interface{}) error {
|
||||
return utils.APIerRPCCall(aSv1, serviceMethod, args, reply)
|
||||
}
|
||||
|
||||
// Ping return pong if the service is active
|
||||
func (aSv1 *ActionSv1) Ping(ign *utils.CGREvent, reply *string) error {
|
||||
*reply = utils.Pong
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -48,6 +48,7 @@ var (
|
||||
testActionSRPCConn,
|
||||
testActionSLoadFromFolder,
|
||||
testActionSGetActionProfile,
|
||||
testActionSPing,
|
||||
testActionSKillEngine,
|
||||
}
|
||||
)
|
||||
@@ -179,6 +180,15 @@ func testActionSGetActionProfile(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testActionSPing(t *testing.T) {
|
||||
var resp string
|
||||
if err := actSRPC.Call(utils.ActionSv1Ping, new(utils.CGREvent), &resp); err != nil {
|
||||
t.Error(err)
|
||||
} else if resp != utils.Pong {
|
||||
t.Error("Unexpected reply returned", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func testActionSKillEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(100); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
@@ -1299,3 +1299,17 @@ type DispatcherRateSv1 struct {
|
||||
func (dR *DispatcherRateSv1) Ping(args *utils.CGREventWithOpts, reply *string) error {
|
||||
return dR.dR.RateSv1Ping(args, reply)
|
||||
}
|
||||
|
||||
func NewDispatcherActionSv1(dps *dispatchers.DispatcherService) *DispatcherActionSv1 {
|
||||
return &DispatcherActionSv1{dR: dps}
|
||||
}
|
||||
|
||||
// Exports RPC from RLs
|
||||
type DispatcherActionSv1 struct {
|
||||
dR *dispatchers.DispatcherService
|
||||
}
|
||||
|
||||
// Ping implements RateSv1Ping
|
||||
func (dR *DispatcherActionSv1) Ping(args *utils.CGREventWithOpts, reply *string) error {
|
||||
return dR.dR.ActionSv1Ping(args, reply)
|
||||
}
|
||||
|
||||
@@ -144,7 +144,7 @@ func startRPC(server *cores.Server, internalRaterChan,
|
||||
internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan,
|
||||
internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan,
|
||||
internalLoaderSChan, internalRALsv1Chan, internalCacheSChan,
|
||||
internalEEsChan, internalRateSChan chan rpcclient.ClientConnector,
|
||||
internalEEsChan, internalRateSChan, internalActionSChan chan rpcclient.ClientConnector,
|
||||
shdChan *utils.SyncedChan) {
|
||||
if !cfg.DispatcherSCfg().Enabled {
|
||||
select { // Any of the rpc methods will unlock listening to rpc requests
|
||||
@@ -178,6 +178,8 @@ func startRPC(server *cores.Server, internalRaterChan,
|
||||
internalEEsChan <- eeS
|
||||
case rateS := <-internalRateSChan:
|
||||
internalRateSChan <- rateS
|
||||
case actionS := <-internalActionSChan:
|
||||
internalActionSChan <- actionS
|
||||
case <-shdChan.Done():
|
||||
return
|
||||
}
|
||||
@@ -495,6 +497,7 @@ func main() {
|
||||
internalLoaderSChan := make(chan rpcclient.ClientConnector, 1)
|
||||
internalEEsChan := make(chan rpcclient.ClientConnector, 1)
|
||||
internalRateSChan := make(chan rpcclient.ClientConnector, 1)
|
||||
internalActionSChan := make(chan rpcclient.ClientConnector, 1)
|
||||
|
||||
// initialize the connManager before creating the DMService
|
||||
// because we need to pass the connection to it
|
||||
@@ -520,6 +523,7 @@ func main() {
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRALs): internalRALsChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): internalEEsChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): internalRateSChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions): internalActionSChan,
|
||||
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan,
|
||||
})
|
||||
srvDep := map[string]*sync.WaitGroup{
|
||||
@@ -700,6 +704,7 @@ func main() {
|
||||
engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.RALsV1, internalRALsChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.RateSv1, internalRateSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.ActionSv1, internalActionSChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.EeSv1, internalEEsChan)
|
||||
engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan)
|
||||
|
||||
@@ -715,7 +720,7 @@ func main() {
|
||||
internalAttributeSChan, internalChargerSChan, internalThresholdSChan,
|
||||
internalRouteSChan, internalSessionSChan, internalAnalyzerSChan,
|
||||
internalDispatcherSChan, internalLoaderSChan, internalRALsChan,
|
||||
internalCacheSChan, internalEEsChan, internalRateSChan, shdChan)
|
||||
internalCacheSChan, internalEEsChan, internalRateSChan, internalActionSChan, shdChan)
|
||||
|
||||
<-shdChan.Done()
|
||||
shtdDone := make(chan struct{})
|
||||
|
||||
@@ -5,44 +5,44 @@
|
||||
|
||||
"general": {
|
||||
"log_level": 7,
|
||||
"reply_timeout": "50s",
|
||||
"reply_timeout": "50s"
|
||||
},
|
||||
|
||||
|
||||
"listen": {
|
||||
"rpc_json": ":2012",
|
||||
"rpc_gob": ":2013",
|
||||
"http": ":2080",
|
||||
"http": ":2080"
|
||||
},
|
||||
|
||||
|
||||
"data_db": {
|
||||
"db_type": "*internal",
|
||||
"db_type": "*internal"
|
||||
},
|
||||
|
||||
|
||||
"stor_db": {
|
||||
"db_type": "*internal",
|
||||
"db_type": "*internal"
|
||||
},
|
||||
|
||||
|
||||
"rals": {
|
||||
"enabled": true,
|
||||
"thresholds_conns": ["*internal"],
|
||||
"max_increments":3000000,
|
||||
"max_increments":3000000
|
||||
},
|
||||
|
||||
|
||||
"schedulers": {
|
||||
"enabled": true,
|
||||
"cdrs_conns": ["*internal"],
|
||||
"stats_conns": ["*localhost"],
|
||||
"stats_conns": ["*localhost"]
|
||||
},
|
||||
|
||||
|
||||
"cdrs": {
|
||||
"enabled": true,
|
||||
"chargers_conns":["*internal"],
|
||||
"chargers_conns":["*internal"]
|
||||
},
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@
|
||||
|
||||
"chargers": {
|
||||
"enabled": true,
|
||||
"attributes_conns": ["*internal"],
|
||||
"attributes_conns": ["*internal"]
|
||||
},
|
||||
|
||||
|
||||
@@ -70,12 +70,12 @@
|
||||
"stats": {
|
||||
"enabled": true,
|
||||
"store_interval": "-1",
|
||||
"thresholds_conns": ["*internal"],
|
||||
"thresholds_conns": ["*internal"]
|
||||
},
|
||||
|
||||
"thresholds": {
|
||||
"enabled": true,
|
||||
"store_interval": "-1",
|
||||
"store_interval": "-1"
|
||||
},
|
||||
|
||||
|
||||
@@ -84,7 +84,7 @@
|
||||
"prefix_indexed_fields":["*req.Destination"],
|
||||
"stats_conns": ["*internal"],
|
||||
"resources_conns": ["*internal"],
|
||||
"rals_conns": ["*internal"],
|
||||
"rals_conns": ["*internal"]
|
||||
},
|
||||
|
||||
|
||||
@@ -95,13 +95,13 @@
|
||||
"attributes_conns": ["*internal"],
|
||||
"rals_conns": ["*internal"],
|
||||
"cdrs_conns": ["*internal"],
|
||||
"chargers_conns": ["*internal"],
|
||||
"chargers_conns": ["*internal"]
|
||||
},
|
||||
|
||||
|
||||
"apiers": {
|
||||
"enabled": true,
|
||||
"scheduler_conns": ["*internal"],
|
||||
"scheduler_conns": ["*internal"]
|
||||
},
|
||||
|
||||
|
||||
@@ -110,8 +110,13 @@
|
||||
},
|
||||
|
||||
|
||||
"filters": {
|
||||
"apiers_conns": ["*internal"],
|
||||
"actions": {
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
|
||||
"filters": {
|
||||
"apiers_conns": ["*internal"]
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -120,6 +120,11 @@
|
||||
},
|
||||
|
||||
|
||||
"actions": {
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
|
||||
"filters": {
|
||||
"apiers_conns": ["*internal"],
|
||||
},
|
||||
|
||||
@@ -116,8 +116,14 @@
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
|
||||
"actions": {
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
"filters": {
|
||||
"apiers_conns": ["*internal"],
|
||||
},
|
||||
|
||||
|
||||
}
|
||||
|
||||
35
dispatchers/actions.go
Normal file
35
dispatchers/actions.go
Normal file
@@ -0,0 +1,35 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package dispatchers
|
||||
|
||||
import "github.com/cgrates/cgrates/utils"
|
||||
|
||||
func (dS *DispatcherService) ActionSv1Ping(args *utils.CGREventWithOpts, rpl *string) (err error) {
|
||||
if args == nil {
|
||||
args = new(utils.CGREventWithOpts)
|
||||
}
|
||||
args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant)
|
||||
if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 {
|
||||
if err = dS.authorize(utils.ActionSv1Ping, args.CGREvent.Tenant,
|
||||
utils.IfaceAsString(args.Opts[utils.OptsAPIKey]), args.CGREvent.Time); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return dS.Dispatch(args, utils.ActionS, utils.ActionSv1Ping, args, rpl)
|
||||
}
|
||||
21
dispatchers/actions_it_test.go
Normal file
21
dispatchers/actions_it_test.go
Normal file
@@ -0,0 +1,21 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package dispatchers
|
||||
128
services/actions.go
Normal file
128
services/actions.go
Normal file
@@ -0,0 +1,128 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/actions"
|
||||
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/cores"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
// NewActionService returns the Action Service
|
||||
func NewActionService(cfg *config.CGRConfig, dm *DataDBService,
|
||||
cacheS *engine.CacheS, filterSChan chan *engine.FilterS,
|
||||
server *cores.Server, internalChan chan rpcclient.ClientConnector,
|
||||
anz *AnalyzerService) servmanager.Service {
|
||||
return &ActionService{
|
||||
connChan: internalChan,
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
cacheS: cacheS,
|
||||
filterSChan: filterSChan,
|
||||
server: server,
|
||||
anz: anz,
|
||||
}
|
||||
}
|
||||
|
||||
// ActionService implements Service interface
|
||||
type ActionService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
dm *DataDBService
|
||||
cacheS *engine.CacheS
|
||||
filterSChan chan *engine.FilterS
|
||||
server *cores.Server
|
||||
|
||||
acts *actions.ActionS
|
||||
rpc *v1.ActionSv1 // useful on restart
|
||||
connChan chan rpcclient.ClientConnector // publish the internal Subsystem when available
|
||||
anz *AnalyzerService
|
||||
}
|
||||
|
||||
// Start should handle the sercive start
|
||||
func (acts *ActionService) Start() (err error) {
|
||||
if acts.IsRunning() {
|
||||
return utils.ErrServiceAlreadyRunning
|
||||
}
|
||||
|
||||
<-acts.cacheS.GetPrecacheChannel(utils.CacheActionProfiles)
|
||||
<-acts.cacheS.GetPrecacheChannel(utils.CacheActionProfilesFilterIndexes)
|
||||
|
||||
filterS := <-acts.filterSChan
|
||||
acts.filterSChan <- filterS
|
||||
dbchan := acts.dm.GetDMChan()
|
||||
datadb := <-dbchan
|
||||
dbchan <- datadb
|
||||
|
||||
acts.Lock()
|
||||
defer acts.Unlock()
|
||||
acts.acts = actions.NewActionS(acts.cfg, filterS, datadb)
|
||||
|
||||
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AttributeS))
|
||||
acts.rpc = v1.NewActionSv1(acts.acts)
|
||||
if !acts.cfg.DispatcherSCfg().Enabled {
|
||||
acts.server.RpcRegister(acts.rpc)
|
||||
}
|
||||
acts.connChan <- acts.anz.GetInternalCodec(acts.rpc, utils.AttributeS)
|
||||
return
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (acts *ActionService) Reload() (err error) {
|
||||
return // for the moment nothing to reload
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (acts *ActionService) Shutdown() (err error) {
|
||||
acts.Lock()
|
||||
defer acts.Unlock()
|
||||
if err = acts.acts.Shutdown(); err != nil {
|
||||
return
|
||||
}
|
||||
acts.acts = nil
|
||||
acts.rpc = nil
|
||||
<-acts.connChan
|
||||
return
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (acts *ActionService) IsRunning() bool {
|
||||
acts.RLock()
|
||||
defer acts.RUnlock()
|
||||
return acts != nil && acts.acts != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (acts *ActionService) ServiceName() string {
|
||||
return utils.ActionS
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (acts *ActionService) ShouldRun() bool {
|
||||
return acts.cfg.ActionSCfg().Enabled
|
||||
}
|
||||
@@ -152,6 +152,9 @@ func (dspS *DispatcherService) Start() (err error) {
|
||||
dspS.server.RpcRegisterName(utils.RateSv1,
|
||||
v1.NewDispatcherRateSv1(dspS.dspS))
|
||||
|
||||
dspS.server.RpcRegisterName(utils.ActionSv1,
|
||||
v1.NewDispatcherActionSv1(dspS.dspS))
|
||||
|
||||
dspS.connChan <- dspS.anz.GetInternalCodec(dspS.dspS, utils.DispatcherS)
|
||||
|
||||
return
|
||||
|
||||
@@ -1487,6 +1487,11 @@ const (
|
||||
RateSv1Ping = "RateSv1.Ping"
|
||||
)
|
||||
|
||||
const (
|
||||
ActionSv1 = "ActionSv1"
|
||||
ActionSv1Ping = "ActionSv1.Ping"
|
||||
)
|
||||
|
||||
const (
|
||||
CoreS = "CoreS"
|
||||
CoreSv1 = "CoreSv1"
|
||||
|
||||
Reference in New Issue
Block a user