Added dataManager and tenant in tpes

This commit is contained in:
porosnicuadrian
2022-03-08 13:58:53 +02:00
committed by Dan Christian Bogos
parent 0ecfc20283
commit bdc124be1f
10 changed files with 46 additions and 24 deletions

View File

@@ -1584,7 +1584,7 @@ func TestLoadersLoad(t *testing.T) {
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
ldrS := loaders.NewLoaderService(cfg, dm, fltrs, nil)
ldrS := loaders.NewLoaders(cfg, dm, fltrs, nil)
lSv1 := NewLoaderSv1(ldrS)
args := &loaders.ArgsProcessFolder{

View File

@@ -32,7 +32,7 @@ func TestLoadersNewLoaderSv1(t *testing.T) {
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
ldrS := loaders.NewLoaderService(cfg, dm, fltrs, nil)
ldrS := loaders.NewLoaders(cfg, dm, fltrs, nil)
exp := &LoaderSv1{
ldrS: ldrS,

View File

@@ -32,7 +32,7 @@ import (
"github.com/cgrates/ltcache"
)
func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager,
func NewLoaders(cfg *config.CGRConfig, dm *engine.DataManager,
filterS *engine.FilterS,
connMgr *engine.ConnManager) (ldrS *LoaderS) {
ldrS = &LoaderS{cfg: cfg, cache: make(map[string]*ltcache.Cache)}

View File

@@ -48,7 +48,7 @@ func TestNewLoaderService(t *testing.T) {
for k, cfg := range cfg.LoaderCfg()[0].Cache {
cache[k] = ltcache.NewCache(cfg.Limit, cfg.TTL, cfg.StaticTTL, nil)
}
ld := NewLoaderService(cfg, dm, fS, cM)
ld := NewLoaders(cfg, dm, fS, cM)
if exp := (&LoaderS{
cfg: cfg,
cache: cache,
@@ -146,7 +146,7 @@ func TestLoaderServiceV1Run(t *testing.T) {
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
fS := engine.NewFilterS(cfg, cM, dm)
ld := NewLoaderService(cfg, dm, fS, cM)
ld := NewLoaders(cfg, dm, fS, cM)
var rply string
if err := ld.V1Run(context.Background(), &ArgsProcessFolder{
APIOpts: map[string]interface{}{
@@ -222,7 +222,7 @@ func TestLoaderServiceV1RunErrors(t *testing.T) {
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
fS := engine.NewFilterS(cfg, cM, dm)
ld := NewLoaderService(cfg, dm, fS, cM)
ld := NewLoaders(cfg, dm, fS, cM)
var rply string
expErrMsg := "SERVER_ERROR: inline parse error for string: <*string>"
@@ -340,7 +340,7 @@ func TestLoaderServiceV1ImportZip(t *testing.T) {
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
fS := engine.NewFilterS(cfg, cM, dm)
ld := NewLoaderService(cfg, dm, fS, cM)
ld := NewLoaders(cfg, dm, fS, cM)
var rply string
if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{
Data: buf.Bytes(),
@@ -396,7 +396,7 @@ func TestLoaderServiceV1ImportZipErrors(t *testing.T) {
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
fS := engine.NewFilterS(cfg, cM, dm)
ld := NewLoaderService(cfg, dm, fS, cM)
ld := NewLoaders(cfg, dm, fS, cM)
var rply string
expErrMsg := "SERVER_ERROR: inline parse error for string: <*string>"

View File

@@ -250,7 +250,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
cgr.server, iRateSCh, cgr.anzS, cgr.srvDep),
NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iActionSCh, cgr.anzS, cgr.srvDep),
NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iAccountSCh, cgr.anzS, cgr.srvDep),
NewTPeService(cgr.cfg, cgr.cM, cgr.server, cgr.srvDep),
NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.server, cgr.srvDep),
)
return

View File

@@ -83,7 +83,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
ldrs.Lock()
defer ldrs.Unlock()
ldrs.ldrs = loaders.NewLoaderService(ldrs.cfg, datadb, filterS, ldrs.connMgr)
ldrs.ldrs = loaders.NewLoaders(ldrs.cfg, datadb, filterS, ldrs.connMgr)
if !ldrs.ldrs.Enabled() {
return

View File

@@ -62,7 +62,7 @@ func TestLoaderSCoverage(t *testing.T) {
if srv.IsRunning() {
t.Errorf("Expected service to be down")
}
srv.ldrs = loaders.NewLoaderService(cfg, &engine.DataManager{},
srv.ldrs = loaders.NewLoaders(cfg, &engine.DataManager{},
&engine.FilterS{}, nil)
if !srv.IsRunning() {
t.Errorf("Expected service to be running")

View File

@@ -33,11 +33,12 @@ import (
)
// NewTPeService is the constructor for the TpeService
func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager,
func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataDBService,
server *cores.Server, srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &TPeService{
cfg: cfg,
srvDep: srvDep,
dm: dm,
connMgr: connMgr,
server: server,
}
@@ -51,6 +52,7 @@ type TPeService struct {
server *cores.Server
connMgr *engine.ConnManager
tpes *tpes.TPeS
dm *DataDBService
srv *birpc.Service
stopChan chan struct{}
@@ -59,7 +61,12 @@ type TPeService struct {
// Start should handle the service start
func (tpSrv *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
tpSrv.tpes = tpes.NewTPeS(tpSrv.cfg, tpSrv.connMgr)
var datadb *engine.DataManager
if datadb, err = tpSrv.dm.WaitForDM(ctx); err != nil {
return
}
tpSrv.tpes = tpes.NewTPeS(tpSrv.cfg, datadb, tpSrv.connMgr)
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TPeS))
tpSrv.stopChan = make(chan struct{})
tpSrv.srv, _ = birpc.NewService(apis.NewTPeSv1(tpSrv.tpes), utils.EmptyString, false)

View File

@@ -20,25 +20,27 @@ package tpes
import (
"context"
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewTPeS(cfg *config.CGRConfig, cm *engine.ConnManager) (tpE *TPeS) {
func NewTPeS(cfg *config.CGRConfig, dm *engine.DataManager, cm *engine.ConnManager) (tpE *TPeS) {
tpE = &TPeS{
cfg: cfg,
connMgr: cm,
dm: dm,
exps: make(map[string]tpExporter),
}
/*
for expType := range tpExporterTypes {
if tpE[expType], err = newTPExporter(expType, dm); err != nil {
return nil, err
}
var err error
for expType := range tpExporterTypes {
if tpE.exps[expType], err = newTPExporter(expType, dm); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> cannot create exporter of type <%s>", utils.TPeS, expType))
}
*/
}
return
}
@@ -46,6 +48,7 @@ func NewTPeS(cfg *config.CGRConfig, cm *engine.ConnManager) (tpE *TPeS) {
type TPeS struct {
cfg *config.CGRConfig
connMgr *engine.ConnManager
dm *engine.DataManager
fltr *engine.FilterS
exps map[string]tpExporter
}

View File

@@ -23,18 +23,30 @@ import (
"github.com/cgrates/cgrates/utils"
)
var tpExporterTypes = utils.NewStringSet([]string{utils.MetaAttributes, utils.MetaResources, utils.MetaFilters, utils.MetaStats,
utils.MetaThresholds, utils.MetaRoutes, utils.MetaChargers, utils.MetaDispatchers, utils.MetaDispatcherHosts,
utils.MetaRateProfiles, utils.MetaActions, utils.MetaAccounts})
var tpExporterTypes = utils.NewStringSet([]string{
utils.MetaAttributes,
utils.MetaResources,
utils.MetaFilters,
utils.MetaStats,
utils.MetaThresholds,
utils.MetaRoutes,
utils.MetaChargers,
utils.MetaDispatchers,
utils.MetaDispatcherHosts,
utils.MetaRateProfiles,
utils.MetaActions,
utils.MetaAccounts})
// tpExporter is the interface implementing exports of tariff plan items
type tpExporter interface {
exportItems(itmIDs []string) (expContent []byte, err error)
exportItems(tnt string, itmIDs []string) (expContent []byte, err error)
}
// newTPExporter constructs tpExporters
func newTPExporter(expType string, dm *engine.DataManager) (tpE tpExporter, err error) {
switch expType {
case utils.MetaAttributes:
return //newTPAttributes()
default:
return nil, utils.ErrPrefix(utils.ErrUnsupportedTPExporterType, expType)
}