Added tpes infrastructure and ping method

This commit is contained in:
porosnicuadrian
2022-03-02 10:55:29 +02:00
committed by Dan Christian Bogos
parent c7092cc0df
commit 4de3b13c12
13 changed files with 393 additions and 18 deletions

29
apis/tpes.go Normal file
View File

@@ -0,0 +1,29 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package apis
import (
"github.com/cgrates/cgrates/tpes"
)
func NewTpeSv1(tpes *tpes.TpeS) *TpeSv1 {
return &TpeSv1{tpes: tpes}
}
type TpeSv1 struct {
tpes *tpes.TpeS
ping
}

120
apis/tpes_it_test.go Normal file
View File

@@ -0,0 +1,120 @@
//go:build integration
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package apis
import (
"path"
"testing"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
tpesCfgPath string
tpesCfg *config.CGRConfig
tpeSRPC *birpc.Client
tpeSConfigDIR string //run tests for specific configuration
sTestTpes = []func(t *testing.T){
testTpeSInitCfg,
testTpeSInitDataDb,
testTpeSResetStorDb,
testTpeSStartEngine,
testTpeSRPCConn,
testTpeSPing,
testTpeSKillEngine,
}
)
func TestTpeSIT(t *testing.T) {
switch *dbType {
case utils.MetaInternal:
tpeSConfigDIR = "tutinternal"
case utils.MetaMongo:
tpeSConfigDIR = "tutmongo"
case utils.MetaMySQL:
tpeSConfigDIR = "tutmysql"
case utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("Unknown Database type")
}
for _, stest := range sTestTpes {
t.Run(tpeSConfigDIR, stest)
}
}
func testTpeSInitCfg(t *testing.T) {
var err error
tpesCfgPath = path.Join(*dataDir, "conf", "samples", tpeSConfigDIR)
tpesCfg, err = config.NewCGRConfigFromPath(context.Background(), tpesCfgPath)
if err != nil {
t.Error(err)
}
}
func testTpeSInitDataDb(t *testing.T) {
if err := engine.InitDataDB(tpesCfg); err != nil {
t.Fatal(err)
}
}
func testTpeSResetStorDb(t *testing.T) {
if err := engine.InitStorDB(tpesCfg); err != nil {
t.Fatal(err)
}
}
// Start CGR Engine
func testTpeSStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(tpesCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testTpeSPing(t *testing.T) {
var reply string
if err := tpeSRPC.Call(context.Background(), utils.TpeSv1Ping, &utils.CGREvent{}, &reply); err != nil {
t.Error(err)
} else if reply != utils.Pong {
t.Errorf("Unexpected reply returned: %s", reply)
}
}
func testTpeSRPCConn(t *testing.T) {
var err error
tpeSRPC, err = newRPCClient(tpesCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
}
//Kill the engine when it is about to be finished
func testTpeSKillEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}
}

View File

@@ -203,6 +203,7 @@ func newCGRConfig(config []byte) (cfg *CGRConfig, err error) {
MaxItems: []*utils.DynamicIntPointerOpt{},
Usage: []*utils.DynamicDecimalBigOpt{},
}},
tpeSCfg: new(TpeSCfg),
sureTaxCfg: new(SureTaxCfg),
dispatcherSCfg: new(DispatcherSCfg),
registrarCCfg: &RegistrarCCfgs{
@@ -352,6 +353,7 @@ type CGRConfig struct {
apiBanCfg *APIBanCfg // APIBan config
coreSCfg *CoreSCfg // CoreS config
accountSCfg *AccountSCfg // AccountS config
tpeSCfg *TpeSCfg // TpeS config
configDBCfg *ConfigDBCfg // ConfigDB conifg
cacheDP utils.MapStorage
@@ -687,6 +689,13 @@ func (cfg *CGRConfig) AccountSCfg() *AccountSCfg {
return cfg.accountSCfg
}
// TpeSCfg reads the TpeS configuration
func (cfg *CGRConfig) TpeSCfg() *TpeSCfg {
cfg.Lock(TpeSJSON)
defer cfg.Unlock(TpeSJSON)
return cfg.tpeSCfg
}
// SIPAgentCfg reads the Apier configuration
func (cfg *CGRConfig) SIPAgentCfg() *SIPAgentCfg {
cfg.lks[SIPAgentJSON].Lock()
@@ -1037,6 +1046,7 @@ func (cfg *CGRConfig) Clone() (cln *CGRConfig) {
coreSCfg: cfg.coreSCfg.Clone(),
actionSCfg: cfg.actionSCfg.Clone(),
accountSCfg: cfg.accountSCfg.Clone(),
tpeSCfg: cfg.tpeSCfg.Clone(),
configDBCfg: cfg.configDBCfg.Clone(),
rldCh: make(chan string),
cacheDP: make(utils.MapStorage),

View File

@@ -1822,5 +1822,9 @@ const CGRATES_CFG_JSON = `
}
},
"tpes": {
"enabled": false,
},
}`

View File

@@ -45,6 +45,7 @@ const (
ResourceSJSON = "resources"
StatSJSON = "stats"
ThresholdSJSON = "thresholds"
TpeSJSON = "tpes"
RouteSJSON = "routes"
LoaderSJSON = "loaders"
SureTaxJSON = "suretax"
@@ -104,6 +105,7 @@ var (
AccountSJSON: utils.AccountS,
ActionSJSON: utils.ActionS,
CoreSJSON: utils.CoreS,
TpeSJSON: utils.TpeS,
RPCConnsJSON: RPCConnsJSON,
}
)
@@ -192,6 +194,7 @@ func newSections(cfg *CGRConfig) Sections {
cfg.apiBanCfg,
cfg.configDBCfg,
cfg.sureTaxCfg,
cfg.tpeSCfg,
}
}

69
config/tpes.go Normal file
View File

@@ -0,0 +1,69 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package config
import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
)
type TpeSCfg struct {
Enabled bool
}
type TpeSCfgJson struct {
Enabled *bool
}
func (TpeSCfg) SName() string { return TpeSJSON }
func (tp *TpeSCfg) Load(ctx *context.Context, db ConfigDB, _ *CGRConfig) (err error) {
jsn := new(TpeSCfgJson)
if err = db.GetSection(ctx, tp.SName(), jsn); err != nil {
return
}
if jsn.Enabled != nil {
tp.Enabled = *jsn.Enabled
}
return
}
func (tp TpeSCfg) AsMapInterface(string) interface{} {
return map[string]interface{}{
utils.EnabledCfg: tp.Enabled,
}
}
func (tp TpeSCfg) CloneSection() Section {
return tp.Clone()
}
func (tp TpeSCfg) Clone() (tpCln *TpeSCfg) {
return &TpeSCfg{
Enabled: tp.Enabled,
}
}
func diffTpeSCfgJson(d *TpeSCfgJson, v1, v2 *TpeSCfg) *TpeSCfgJson {
if d == nil {
d = new(TpeSCfgJson)
}
if v1.Enabled != v2.Enabled {
d.Enabled = utils.BoolPointer(v2.Enabled)
}
return d
}

View File

@@ -127,4 +127,8 @@
"accounts_conns": ["*internal"],
},
"tpes": {
"enabled": true
},
}

View File

@@ -140,6 +140,9 @@
"accounts_conns": ["*internal"],
},
"tpes": {
"enabled": true
},
}

View File

@@ -118,5 +118,8 @@
"accounts_conns": ["*internal"],
},
"tpes": {
"enabled": true
},
}

View File

@@ -79,6 +79,7 @@ func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.Wai
utils.ThresholdS: new(sync.WaitGroup),
utils.ActionS: new(sync.WaitGroup),
utils.AccountS: new(sync.WaitGroup),
utils.TpeS: new(sync.WaitGroup),
},
iFilterSCh: make(chan *engine.FilterS, 1),
}
@@ -159,6 +160,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
iRateSCh := make(chan birpc.ClientConnector, 1)
iActionSCh := make(chan birpc.ClientConnector, 1)
iAccountSCh := make(chan birpc.ClientConnector, 1)
iTpeSCh := make(chan birpc.ClientConnector, 1)
// initialize the connManager before creating the DMService
// because we need to pass the connection to it
@@ -184,6 +186,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers), utils.DispatcherSv1, cgr.iDispatcherSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts), utils.AccountSv1, iAccountSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions), utils.ActionSv1, iActionSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes), utils.TpeSv1, iTpeSCh)
cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep)
cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, cgr.srvDep)
@@ -247,6 +250,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
cgr.server, iRateSCh, cgr.anzS, cgr.srvDep),
NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iActionSCh, cgr.anzS, cgr.srvDep),
NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iAccountSCh, cgr.anzS, cgr.srvDep),
NewTpeService(cgr.cfg, cgr.cM, cgr.server, cgr.srvDep),
)
return

95
services/tpes.go Normal file
View File

@@ -0,0 +1,95 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/apis"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/tpes"
"github.com/cgrates/cgrates/utils"
)
// NewTpeService is the constructor for the TpeService
func NewTpeService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
server *cores.Server, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &TpeService{
cfg: cfg,
srvDep: srvDep,
connMgr: connMgr,
server: server,
}
}
// TypeService implements Service interface
type TpeService struct {
sync.RWMutex
cfg *config.CGRConfig
server *cores.Server
connMgr *engine.ConnManager
tpes *tpes.TpeS
srv *birpc.Service
stopChan chan struct{}
srvDep map[string]*sync.WaitGroup
}
// Start should handle the service start
func (tpSrv *TpeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
tpSrv.tpes = tpes.NewTpeS(tpSrv.cfg, tpSrv.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TpeS))
tpSrv.stopChan = make(chan struct{})
tpSrv.srv, _ = birpc.NewService(apis.NewTpeSv1(tpSrv.tpes), utils.EmptyString, false)
tpSrv.server.RpcRegister(tpSrv.srv)
return
}
// Reload handles the change of config
func (tpSrv *TpeService) Reload(*context.Context, context.CancelFunc) (err error) {
return
}
// Shutdown stops the service
func (tpSrv *TpeService) Shutdown() (err error) {
tpSrv.srv = nil
close(tpSrv.stopChan)
return
}
// IsRunning returns if the service is running
func (tpSrv *TpeService) IsRunning() bool {
tpSrv.Lock()
defer tpSrv.Unlock()
return tpSrv != nil && tpSrv.tpes != nil
}
// ServiceName returns the service name
func (tpSrv *TpeService) ServiceName() string {
return utils.TpeS
}
// ShouldRun returns if the service should be running
func (tpSrv *TpeService) ShouldRun() bool {
return tpSrv.cfg.TpeSCfg().Enabled
}

35
tpes/tpes.go Normal file
View File

@@ -0,0 +1,35 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package tpes
import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
)
func NewTpeS(cfg *config.CGRConfig, cm *engine.ConnManager) *TpeS {
return &TpeS{
cfg: cfg,
connMgr: cm,
}
}
// TpeS is managing the TariffPlanExporter
type TpeS struct {
cfg *config.CGRConfig
connMgr *engine.ConnManager
fltr *engine.FilterS
}

View File

@@ -370,6 +370,7 @@ const (
MetaServiceManager = "*servicemanager"
MetaChargers = "*chargers"
MetaConfig = "*config"
MetaTpes = "*tpes"
MetaDispatchers = "*dispatchers"
MetaDispatcherHosts = "*dispatcher_hosts"
MetaFilters = "*filters"
@@ -921,6 +922,7 @@ const (
RegistrarC = "RegistrarC"
LoaderS = "LoaderS"
ChargerS = "ChargerS"
TpeS = "TpeS"
CacheS = "CacheS"
AnalyzerS = "AnalyzerS"
CDRServer = "CDRServer"
@@ -1250,24 +1252,12 @@ const (
// APIerSv1GetLoadIDs = "APIerSv1.GetLoadIDs"
// APIerSv1GetLoadTimes = "APIerSv1.GetLoadTimes"
AdminSv1GetAttributeProfilesCount = "AdminSv1.GetAttributeProfilesCount"
// APIerSv1GetTPActionProfile = "APIerSv1.GetTPActionProfile"
// APIerSv1SetTPActionProfile = "APIerSv1.SetTPActionProfile"
// APIerSv1GetTPActionProfileIDs = "APIerSv1.GetTPActionProfileIDs"
// APIerSv1RemoveTPActionProfile = "APIerSv1.RemoveTPActionProfile"
// APIerSv1GetTPRateProfile = "APIerSv1.GetTPRateProfile"
// APIerSv1SetTPRateProfile = "APIerSv1.SetTPRateProfile"
// APIerSv1GetTPRateProfileIds = "APIerSv1.GetTPRateProfileIds"
// APIerSv1RemoveTPRateProfile = "APIerSv1.RemoveTPRateProfile"
AdminSv1SetAccount = "AdminSv1.SetAccount"
AdminSv1GetAccount = "AdminSv1.GetAccount"
AdminSv1GetAccounts = "AdminSv1.GetAccounts"
AdminSv1GetAccountIDs = "AdminSv1.GetAccountIDs"
AdminSv1RemoveAccount = "AdminSv1.RemoveAccount"
AdminSv1GetAccountsCount = "AdminSv1.GetAccountsCount"
// APIerSv1GetTPAccountIDs = "APIerSv1.GetTPAccountIDs"
// APIerSv1GetTPAccount = "APIerSv1.GetTPAccount"
// APIerSv1SetTPAccount = "APIerSv1.SetTPAccount"
// APIerSv1RemoveTPAccount = "APIerSv1.RemoveTPAccount"
AdminSv1SetAccount = "AdminSv1.SetAccount"
AdminSv1GetAccount = "AdminSv1.GetAccount"
AdminSv1GetAccounts = "AdminSv1.GetAccounts"
AdminSv1GetAccountIDs = "AdminSv1.GetAccountIDs"
AdminSv1RemoveAccount = "AdminSv1.RemoveAccount"
AdminSv1GetAccountsCount = "AdminSv1.GetAccountsCount"
)
// APIerSv1 TP APIs
@@ -1284,6 +1274,12 @@ const (
ServiceManagerV1Ping = "ServiceManagerV1.Ping"
)
// TpeSv1 APIs
const (
TpeSv1 = "TpeSv1"
TpeSv1Ping = "TpeSv1.Ping"
)
// ConfigSv1 APIs
const (
ConfigS = "ConfigS"