diff --git a/apis/loaders_it_test.go b/apis/loaders_it_test.go index 96631170a..faa8b51c9 100644 --- a/apis/loaders_it_test.go +++ b/apis/loaders_it_test.go @@ -1584,7 +1584,7 @@ func TestLoadersLoad(t *testing.T) { data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items) dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) fltrs := engine.NewFilterS(cfg, nil, dm) - ldrS := loaders.NewLoaderService(cfg, dm, fltrs, nil) + ldrS := loaders.NewLoaders(cfg, dm, fltrs, nil) lSv1 := NewLoaderSv1(ldrS) args := &loaders.ArgsProcessFolder{ diff --git a/apis/loaders_test.go b/apis/loaders_test.go index 92dc01603..995b5bc03 100644 --- a/apis/loaders_test.go +++ b/apis/loaders_test.go @@ -32,7 +32,7 @@ func TestLoadersNewLoaderSv1(t *testing.T) { data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items) dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) fltrs := engine.NewFilterS(cfg, nil, dm) - ldrS := loaders.NewLoaderService(cfg, dm, fltrs, nil) + ldrS := loaders.NewLoaders(cfg, dm, fltrs, nil) exp := &LoaderSv1{ ldrS: ldrS, diff --git a/loaders/loaders.go b/loaders/loaders.go index 554e499c1..ea8759802 100644 --- a/loaders/loaders.go +++ b/loaders/loaders.go @@ -32,7 +32,7 @@ import ( "github.com/cgrates/ltcache" ) -func NewLoaderService(cfg *config.CGRConfig, dm *engine.DataManager, +func NewLoaders(cfg *config.CGRConfig, dm *engine.DataManager, filterS *engine.FilterS, connMgr *engine.ConnManager) (ldrS *LoaderS) { ldrS = &LoaderS{cfg: cfg, cache: make(map[string]*ltcache.Cache)} diff --git a/loaders/loaders_test.go b/loaders/loaders_test.go index 0b24cb860..6471b836a 100644 --- a/loaders/loaders_test.go +++ b/loaders/loaders_test.go @@ -48,7 +48,7 @@ func TestNewLoaderService(t *testing.T) { for k, cfg := range cfg.LoaderCfg()[0].Cache { cache[k] = ltcache.NewCache(cfg.Limit, cfg.TTL, cfg.StaticTTL, nil) } - ld := NewLoaderService(cfg, dm, fS, cM) + ld := NewLoaders(cfg, dm, fS, cM) if exp := (&LoaderS{ cfg: cfg, cache: cache, @@ -146,7 +146,7 @@ func TestLoaderServiceV1Run(t *testing.T) { dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM) fS := engine.NewFilterS(cfg, cM, dm) - ld := NewLoaderService(cfg, dm, fS, cM) + ld := NewLoaders(cfg, dm, fS, cM) var rply string if err := ld.V1Run(context.Background(), &ArgsProcessFolder{ APIOpts: map[string]interface{}{ @@ -222,7 +222,7 @@ func TestLoaderServiceV1RunErrors(t *testing.T) { dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM) fS := engine.NewFilterS(cfg, cM, dm) - ld := NewLoaderService(cfg, dm, fS, cM) + ld := NewLoaders(cfg, dm, fS, cM) var rply string expErrMsg := "SERVER_ERROR: inline parse error for string: <*string>" @@ -340,7 +340,7 @@ func TestLoaderServiceV1ImportZip(t *testing.T) { dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM) fS := engine.NewFilterS(cfg, cM, dm) - ld := NewLoaderService(cfg, dm, fS, cM) + ld := NewLoaders(cfg, dm, fS, cM) var rply string if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{ Data: buf.Bytes(), @@ -396,7 +396,7 @@ func TestLoaderServiceV1ImportZipErrors(t *testing.T) { dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM) fS := engine.NewFilterS(cfg, cM, dm) - ld := NewLoaderService(cfg, dm, fS, cM) + ld := NewLoaders(cfg, dm, fS, cM) var rply string expErrMsg := "SERVER_ERROR: inline parse error for string: <*string>" diff --git a/services/cgr-engine.go b/services/cgr-engine.go index 35769c9b9..8ac2324ed 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -250,7 +250,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr cgr.server, iRateSCh, cgr.anzS, cgr.srvDep), NewActionService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iActionSCh, cgr.anzS, cgr.srvDep), NewAccountService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.cM, cgr.server, iAccountSCh, cgr.anzS, cgr.srvDep), - NewTPeService(cgr.cfg, cgr.cM, cgr.server, cgr.srvDep), + NewTPeService(cgr.cfg, cgr.cM, cgr.dmS, cgr.server, cgr.srvDep), ) return diff --git a/services/loaders.go b/services/loaders.go index a94b0a020..8d30a145e 100644 --- a/services/loaders.go +++ b/services/loaders.go @@ -83,7 +83,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er ldrs.Lock() defer ldrs.Unlock() - ldrs.ldrs = loaders.NewLoaderService(ldrs.cfg, datadb, filterS, ldrs.connMgr) + ldrs.ldrs = loaders.NewLoaders(ldrs.cfg, datadb, filterS, ldrs.connMgr) if !ldrs.ldrs.Enabled() { return diff --git a/services/loaders_test.go b/services/loaders_test.go index 622515505..2b1df24c4 100644 --- a/services/loaders_test.go +++ b/services/loaders_test.go @@ -62,7 +62,7 @@ func TestLoaderSCoverage(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - srv.ldrs = loaders.NewLoaderService(cfg, &engine.DataManager{}, + srv.ldrs = loaders.NewLoaders(cfg, &engine.DataManager{}, &engine.FilterS{}, nil) if !srv.IsRunning() { t.Errorf("Expected service to be running") diff --git a/services/tpes.go b/services/tpes.go index 5a33b47be..bd795c21d 100644 --- a/services/tpes.go +++ b/services/tpes.go @@ -33,11 +33,12 @@ import ( ) // NewTPeService is the constructor for the TpeService -func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, +func NewTPeService(cfg *config.CGRConfig, connMgr *engine.ConnManager, dm *DataDBService, server *cores.Server, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &TPeService{ cfg: cfg, srvDep: srvDep, + dm: dm, connMgr: connMgr, server: server, } @@ -51,6 +52,7 @@ type TPeService struct { server *cores.Server connMgr *engine.ConnManager tpes *tpes.TPeS + dm *DataDBService srv *birpc.Service stopChan chan struct{} @@ -59,7 +61,12 @@ type TPeService struct { // Start should handle the service start func (tpSrv *TPeService) Start(ctx *context.Context, _ context.CancelFunc) (err error) { - tpSrv.tpes = tpes.NewTPeS(tpSrv.cfg, tpSrv.connMgr) + var datadb *engine.DataManager + if datadb, err = tpSrv.dm.WaitForDM(ctx); err != nil { + return + } + + tpSrv.tpes = tpes.NewTPeS(tpSrv.cfg, datadb, tpSrv.connMgr) utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.TPeS)) tpSrv.stopChan = make(chan struct{}) tpSrv.srv, _ = birpc.NewService(apis.NewTPeSv1(tpSrv.tpes), utils.EmptyString, false) diff --git a/tpes/tpes.go b/tpes/tpes.go index 3515dac5b..affa9ef0d 100644 --- a/tpes/tpes.go +++ b/tpes/tpes.go @@ -20,25 +20,27 @@ package tpes import ( "context" + "fmt" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -func NewTPeS(cfg *config.CGRConfig, cm *engine.ConnManager) (tpE *TPeS) { +func NewTPeS(cfg *config.CGRConfig, dm *engine.DataManager, cm *engine.ConnManager) (tpE *TPeS) { tpE = &TPeS{ cfg: cfg, connMgr: cm, + dm: dm, exps: make(map[string]tpExporter), } - /* - for expType := range tpExporterTypes { - if tpE[expType], err = newTPExporter(expType, dm); err != nil { - return nil, err - } + + var err error + for expType := range tpExporterTypes { + if tpE.exps[expType], err = newTPExporter(expType, dm); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> cannot create exporter of type <%s>", utils.TPeS, expType)) } - */ + } return } @@ -46,6 +48,7 @@ func NewTPeS(cfg *config.CGRConfig, cm *engine.ConnManager) (tpE *TPeS) { type TPeS struct { cfg *config.CGRConfig connMgr *engine.ConnManager + dm *engine.DataManager fltr *engine.FilterS exps map[string]tpExporter } diff --git a/tpes/tpexporter.go b/tpes/tpexporter.go index 6ce11e96d..8d26adb58 100644 --- a/tpes/tpexporter.go +++ b/tpes/tpexporter.go @@ -23,18 +23,30 @@ import ( "github.com/cgrates/cgrates/utils" ) -var tpExporterTypes = utils.NewStringSet([]string{utils.MetaAttributes, utils.MetaResources, utils.MetaFilters, utils.MetaStats, - utils.MetaThresholds, utils.MetaRoutes, utils.MetaChargers, utils.MetaDispatchers, utils.MetaDispatcherHosts, - utils.MetaRateProfiles, utils.MetaActions, utils.MetaAccounts}) +var tpExporterTypes = utils.NewStringSet([]string{ + utils.MetaAttributes, + utils.MetaResources, + utils.MetaFilters, + utils.MetaStats, + utils.MetaThresholds, + utils.MetaRoutes, + utils.MetaChargers, + utils.MetaDispatchers, + utils.MetaDispatcherHosts, + utils.MetaRateProfiles, + utils.MetaActions, + utils.MetaAccounts}) // tpExporter is the interface implementing exports of tariff plan items type tpExporter interface { - exportItems(itmIDs []string) (expContent []byte, err error) + exportItems(tnt string, itmIDs []string) (expContent []byte, err error) } // newTPExporter constructs tpExporters func newTPExporter(expType string, dm *engine.DataManager) (tpE tpExporter, err error) { switch expType { + case utils.MetaAttributes: + return //newTPAttributes() default: return nil, utils.ErrPrefix(utils.ErrUnsupportedTPExporterType, expType) }