diff --git a/apier/v1/actionprofiles.go b/apier/v1/actionprofiles.go index 0bfbf539c..bcbcec66a 100644 --- a/apier/v1/actionprofiles.go +++ b/apier/v1/actionprofiles.go @@ -21,6 +21,8 @@ package v1 import ( "time" + "github.com/cgrates/cgrates/actions" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -139,3 +141,25 @@ func (apierSv1 *APIerSv1) RemoveActionProfile(arg *utils.TenantIDWithCache, repl *reply = utils.OK return nil } + +// NewActionSv1 initializes ActionSv1 +func NewActionSv1(aS *actions.ActionS) *ActionSv1 { + return &ActionSv1{aS: aS} +} + +// ActionSv1 exports RPC from RLs +type ActionSv1 struct { + aS *actions.ActionS +} + +// Call implements rpcclient.ClientConnector interface for internal RPC +func (aSv1 *ActionSv1) Call(serviceMethod string, + args interface{}, reply interface{}) error { + return utils.APIerRPCCall(aSv1, serviceMethod, args, reply) +} + +// Ping return pong if the service is active +func (aSv1 *ActionSv1) Ping(ign *utils.CGREvent, reply *string) error { + *reply = utils.Pong + return nil +} diff --git a/apier/v1/actionprofiles_it_test.go b/apier/v1/actionprofiles_it_test.go index d2025e254..e697e1dfe 100644 --- a/apier/v1/actionprofiles_it_test.go +++ b/apier/v1/actionprofiles_it_test.go @@ -48,6 +48,7 @@ var ( testActionSRPCConn, testActionSLoadFromFolder, testActionSGetActionProfile, + testActionSPing, testActionSKillEngine, } ) @@ -179,6 +180,15 @@ func testActionSGetActionProfile(t *testing.T) { } } +func testActionSPing(t *testing.T) { + var resp string + if err := actSRPC.Call(utils.ActionSv1Ping, new(utils.CGREvent), &resp); err != nil { + t.Error(err) + } else if resp != utils.Pong { + t.Error("Unexpected reply returned", resp) + } +} + func testActionSKillEngine(t *testing.T) { if err := engine.KillEngine(100); err != nil { t.Error(err) diff --git a/apier/v1/dispatcher.go b/apier/v1/dispatcher.go index ee4e902ae..8d9ee0af3 100755 --- a/apier/v1/dispatcher.go +++ b/apier/v1/dispatcher.go @@ -1299,3 +1299,17 @@ type DispatcherRateSv1 struct { func (dR *DispatcherRateSv1) Ping(args *utils.CGREventWithOpts, reply *string) error { return dR.dR.RateSv1Ping(args, reply) } + +func NewDispatcherActionSv1(dps *dispatchers.DispatcherService) *DispatcherActionSv1 { + return &DispatcherActionSv1{dR: dps} +} + +// Exports RPC from RLs +type DispatcherActionSv1 struct { + dR *dispatchers.DispatcherService +} + +// Ping implements RateSv1Ping +func (dR *DispatcherActionSv1) Ping(args *utils.CGREventWithOpts, reply *string) error { + return dR.dR.ActionSv1Ping(args, reply) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index d26975d0b..286642807 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -144,7 +144,7 @@ func startRPC(server *cores.Server, internalRaterChan, internalAttrSChan, internalChargerSChan, internalThdSChan, internalSuplSChan, internalSMGChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsv1Chan, internalCacheSChan, - internalEEsChan, internalRateSChan chan rpcclient.ClientConnector, + internalEEsChan, internalRateSChan, internalActionSChan chan rpcclient.ClientConnector, shdChan *utils.SyncedChan) { if !cfg.DispatcherSCfg().Enabled { select { // Any of the rpc methods will unlock listening to rpc requests @@ -178,6 +178,8 @@ func startRPC(server *cores.Server, internalRaterChan, internalEEsChan <- eeS case rateS := <-internalRateSChan: internalRateSChan <- rateS + case actionS := <-internalActionSChan: + internalActionSChan <- actionS case <-shdChan.Done(): return } @@ -495,6 +497,7 @@ func main() { internalLoaderSChan := make(chan rpcclient.ClientConnector, 1) internalEEsChan := make(chan rpcclient.ClientConnector, 1) internalRateSChan := make(chan rpcclient.ClientConnector, 1) + internalActionSChan := make(chan rpcclient.ClientConnector, 1) // initialize the connManager before creating the DMService // because we need to pass the connection to it @@ -520,6 +523,7 @@ func main() { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRALs): internalRALsChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEEs): internalEEsChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaRateS): internalRateSChan, + utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions): internalActionSChan, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers): internalDispatcherSChan, }) srvDep := map[string]*sync.WaitGroup{ @@ -700,6 +704,7 @@ func main() { engine.IntRPC.AddInternalRPCClient(utils.CoreSv1, internalCoreSv1Chan) engine.IntRPC.AddInternalRPCClient(utils.RALsV1, internalRALsChan) engine.IntRPC.AddInternalRPCClient(utils.RateSv1, internalRateSChan) + engine.IntRPC.AddInternalRPCClient(utils.ActionSv1, internalActionSChan) engine.IntRPC.AddInternalRPCClient(utils.EeSv1, internalEEsChan) engine.IntRPC.AddInternalRPCClient(utils.DispatcherSv1, internalDispatcherSChan) @@ -715,7 +720,7 @@ func main() { internalAttributeSChan, internalChargerSChan, internalThresholdSChan, internalRouteSChan, internalSessionSChan, internalAnalyzerSChan, internalDispatcherSChan, internalLoaderSChan, internalRALsChan, - internalCacheSChan, internalEEsChan, internalRateSChan, shdChan) + internalCacheSChan, internalEEsChan, internalRateSChan, internalActionSChan, shdChan) <-shdChan.Done() shtdDone := make(chan struct{}) diff --git a/data/conf/samples/tutinternal/cgrates.json b/data/conf/samples/tutinternal/cgrates.json index 57d8c07ce..2928b145b 100644 --- a/data/conf/samples/tutinternal/cgrates.json +++ b/data/conf/samples/tutinternal/cgrates.json @@ -5,44 +5,44 @@ "general": { "log_level": 7, - "reply_timeout": "50s", + "reply_timeout": "50s" }, "listen": { "rpc_json": ":2012", "rpc_gob": ":2013", - "http": ":2080", + "http": ":2080" }, "data_db": { - "db_type": "*internal", + "db_type": "*internal" }, "stor_db": { - "db_type": "*internal", + "db_type": "*internal" }, "rals": { "enabled": true, "thresholds_conns": ["*internal"], - "max_increments":3000000, + "max_increments":3000000 }, "schedulers": { "enabled": true, "cdrs_conns": ["*internal"], - "stats_conns": ["*localhost"], + "stats_conns": ["*localhost"] }, "cdrs": { "enabled": true, - "chargers_conns":["*internal"], + "chargers_conns":["*internal"] }, @@ -56,7 +56,7 @@ "chargers": { "enabled": true, - "attributes_conns": ["*internal"], + "attributes_conns": ["*internal"] }, @@ -70,12 +70,12 @@ "stats": { "enabled": true, "store_interval": "-1", - "thresholds_conns": ["*internal"], + "thresholds_conns": ["*internal"] }, "thresholds": { "enabled": true, - "store_interval": "-1", + "store_interval": "-1" }, @@ -84,7 +84,7 @@ "prefix_indexed_fields":["*req.Destination"], "stats_conns": ["*internal"], "resources_conns": ["*internal"], - "rals_conns": ["*internal"], + "rals_conns": ["*internal"] }, @@ -95,13 +95,13 @@ "attributes_conns": ["*internal"], "rals_conns": ["*internal"], "cdrs_conns": ["*internal"], - "chargers_conns": ["*internal"], + "chargers_conns": ["*internal"] }, "apiers": { "enabled": true, - "scheduler_conns": ["*internal"], + "scheduler_conns": ["*internal"] }, @@ -110,8 +110,13 @@ }, -"filters": { - "apiers_conns": ["*internal"], +"actions": { + "enabled": true }, + +"filters": { + "apiers_conns": ["*internal"] +} + } diff --git a/data/conf/samples/tutmongo/cgrates.json b/data/conf/samples/tutmongo/cgrates.json index c581f037e..6542a1189 100644 --- a/data/conf/samples/tutmongo/cgrates.json +++ b/data/conf/samples/tutmongo/cgrates.json @@ -120,6 +120,11 @@ }, +"actions": { + "enabled": true +}, + + "filters": { "apiers_conns": ["*internal"], }, diff --git a/data/conf/samples/tutmysql/cgrates.json b/data/conf/samples/tutmysql/cgrates.json index df473622f..267827a14 100644 --- a/data/conf/samples/tutmysql/cgrates.json +++ b/data/conf/samples/tutmysql/cgrates.json @@ -116,8 +116,14 @@ "enabled": true }, + +"actions": { + "enabled": true +}, + "filters": { "apiers_conns": ["*internal"], }, + } diff --git a/dispatchers/actions.go b/dispatchers/actions.go new file mode 100644 index 000000000..5f79d9776 --- /dev/null +++ b/dispatchers/actions.go @@ -0,0 +1,35 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package dispatchers + +import "github.com/cgrates/cgrates/utils" + +func (dS *DispatcherService) ActionSv1Ping(args *utils.CGREventWithOpts, rpl *string) (err error) { + if args == nil { + args = new(utils.CGREventWithOpts) + } + args.CGREvent.Tenant = utils.FirstNonEmpty(args.CGREvent.Tenant, dS.cfg.GeneralCfg().DefaultTenant) + if len(dS.cfg.DispatcherSCfg().AttributeSConns) != 0 { + if err = dS.authorize(utils.ActionSv1Ping, args.CGREvent.Tenant, + utils.IfaceAsString(args.Opts[utils.OptsAPIKey]), args.CGREvent.Time); err != nil { + return + } + } + return dS.Dispatch(args, utils.ActionS, utils.ActionSv1Ping, args, rpl) +} diff --git a/dispatchers/actions_it_test.go b/dispatchers/actions_it_test.go new file mode 100644 index 000000000..20b0034f7 --- /dev/null +++ b/dispatchers/actions_it_test.go @@ -0,0 +1,21 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package dispatchers diff --git a/services/actions.go b/services/actions.go new file mode 100644 index 000000000..ba8045e2b --- /dev/null +++ b/services/actions.go @@ -0,0 +1,128 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/actions" + + v1 "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewActionService returns the Action Service +func NewActionService(cfg *config.CGRConfig, dm *DataDBService, + cacheS *engine.CacheS, filterSChan chan *engine.FilterS, + server *cores.Server, internalChan chan rpcclient.ClientConnector, + anz *AnalyzerService) servmanager.Service { + return &ActionService{ + connChan: internalChan, + cfg: cfg, + dm: dm, + cacheS: cacheS, + filterSChan: filterSChan, + server: server, + anz: anz, + } +} + +// ActionService implements Service interface +type ActionService struct { + sync.RWMutex + cfg *config.CGRConfig + dm *DataDBService + cacheS *engine.CacheS + filterSChan chan *engine.FilterS + server *cores.Server + + acts *actions.ActionS + rpc *v1.ActionSv1 // useful on restart + connChan chan rpcclient.ClientConnector // publish the internal Subsystem when available + anz *AnalyzerService +} + +// Start should handle the sercive start +func (acts *ActionService) Start() (err error) { + if acts.IsRunning() { + return utils.ErrServiceAlreadyRunning + } + + <-acts.cacheS.GetPrecacheChannel(utils.CacheActionProfiles) + <-acts.cacheS.GetPrecacheChannel(utils.CacheActionProfilesFilterIndexes) + + filterS := <-acts.filterSChan + acts.filterSChan <- filterS + dbchan := acts.dm.GetDMChan() + datadb := <-dbchan + dbchan <- datadb + + acts.Lock() + defer acts.Unlock() + acts.acts = actions.NewActionS(acts.cfg, filterS, datadb) + + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.AttributeS)) + acts.rpc = v1.NewActionSv1(acts.acts) + if !acts.cfg.DispatcherSCfg().Enabled { + acts.server.RpcRegister(acts.rpc) + } + acts.connChan <- acts.anz.GetInternalCodec(acts.rpc, utils.AttributeS) + return +} + +// Reload handles the change of config +func (acts *ActionService) Reload() (err error) { + return // for the moment nothing to reload +} + +// Shutdown stops the service +func (acts *ActionService) Shutdown() (err error) { + acts.Lock() + defer acts.Unlock() + if err = acts.acts.Shutdown(); err != nil { + return + } + acts.acts = nil + acts.rpc = nil + <-acts.connChan + return +} + +// IsRunning returns if the service is running +func (acts *ActionService) IsRunning() bool { + acts.RLock() + defer acts.RUnlock() + return acts != nil && acts.acts != nil +} + +// ServiceName returns the service name +func (acts *ActionService) ServiceName() string { + return utils.ActionS +} + +// ShouldRun returns if the service should be running +func (acts *ActionService) ShouldRun() bool { + return acts.cfg.ActionSCfg().Enabled +} diff --git a/services/dispatchers.go b/services/dispatchers.go index 66ca78365..c90516191 100644 --- a/services/dispatchers.go +++ b/services/dispatchers.go @@ -152,6 +152,9 @@ func (dspS *DispatcherService) Start() (err error) { dspS.server.RpcRegisterName(utils.RateSv1, v1.NewDispatcherRateSv1(dspS.dspS)) + dspS.server.RpcRegisterName(utils.ActionSv1, + v1.NewDispatcherActionSv1(dspS.dspS)) + dspS.connChan <- dspS.anz.GetInternalCodec(dspS.dspS, utils.DispatcherS) return diff --git a/utils/consts.go b/utils/consts.go index 8313b40c0..c57e75942 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -1487,6 +1487,11 @@ const ( RateSv1Ping = "RateSv1.Ping" ) +const ( + ActionSv1 = "ActionSv1" + ActionSv1Ping = "ActionSv1.Ping" +) + const ( CoreS = "CoreS" CoreSv1 = "CoreSv1"