TPExporter

This commit is contained in:
porosnicuadrian
2022-03-10 17:04:35 +02:00
committed by Dan Christian Bogos
parent 2ef3689f44
commit 1e202feb5b
18 changed files with 347 additions and 74 deletions

View File

@@ -44,7 +44,6 @@ var (
sTestsAlsPrf = []func(t *testing.T){
testAttributeSInitCfg,
testAttributeSInitDataDb,
testAttributeSStartEngine,
testAttributeSRPCConn,
testGetAttributeProfileBeforeSet,

View File

@@ -1577,7 +1577,7 @@ func TestLoadersLoad(t *testing.T) {
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
ldrS := loaders.NewLoaders(cfg, dm, fltrs, nil)
ldrS := loaders.NewLoaderS(cfg, dm, fltrs, nil)
lSv1 := NewLoaderSv1(ldrS)
args := &loaders.ArgsProcessFolder{

View File

@@ -32,7 +32,7 @@ func TestLoadersNewLoaderSv1(t *testing.T) {
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
ldrS := loaders.NewLoaders(cfg, dm, fltrs, nil)
ldrS := loaders.NewLoaderS(cfg, dm, fltrs, nil)
exp := &LoaderSv1{
ldrS: ldrS,

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package apis
import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/tpes"
)
@@ -30,3 +31,7 @@ type TPeSv1 struct {
tpes *tpes.TPeS
ping
}
func (tpE *TPeSv1) ExportTariffPlan(ctx *context.Context, args *tpes.ArgsExportTP, reply *[]byte) error {
return tpE.tpes.V1ExportTariffPlan(ctx, args, reply)
}

View File

@@ -29,6 +29,7 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/tpes"
"github.com/cgrates/cgrates/utils"
)
@@ -39,17 +40,17 @@ var (
tpeSConfigDIR string //run tests for specific configuration
sTestTpes = []func(t *testing.T){
testTpeSInitCfg,
testTpeSInitDataDb,
testTpeSStartEngine,
testTpeSRPCConn,
testTpeSPing,
testTpeSKillEngine,
testTPeSInitCfg,
testTPeSInitDataDb,
testTPeSStartEngine,
testTPeSRPCConn,
testTPeSPing,
testTPeSExportTariffPlan,
testTPeSKillEngine,
}
)
func TestTpeSIT(t *testing.T) {
func TestTPeSIT(t *testing.T) {
switch *dbType {
case utils.MetaInternal:
tpeSConfigDIR = "tutinternal"
@@ -67,7 +68,7 @@ func TestTpeSIT(t *testing.T) {
}
}
func testTpeSInitCfg(t *testing.T) {
func testTPeSInitCfg(t *testing.T) {
var err error
tpesCfgPath = path.Join(*dataDir, "conf", "samples", tpeSConfigDIR)
tpesCfg, err = config.NewCGRConfigFromPath(context.Background(), tpesCfgPath)
@@ -76,29 +77,161 @@ func testTpeSInitCfg(t *testing.T) {
}
}
func testTpeSInitDataDb(t *testing.T) {
func testTPeSInitDataDb(t *testing.T) {
if err := engine.InitDataDB(tpesCfg); err != nil {
t.Fatal(err)
}
}
// Start CGR Engine
func testTpeSStartEngine(t *testing.T) {
func testTPeSStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(tpesCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testTpeSPing(t *testing.T) {
func testTPeSPing(t *testing.T) {
var reply string
if err := tpeSRPC.Call(context.Background(), utils.TpeSv1Ping, &utils.CGREvent{}, &reply); err != nil {
if err := tpeSRPC.Call(context.Background(), utils.TPeSv1Ping, &utils.CGREvent{}, &reply); err != nil {
t.Error(err)
} else if reply != utils.Pong {
t.Errorf("Unexpected reply returned: %s", reply)
}
}
func testTpeSRPCConn(t *testing.T) {
func testTPeSExportTariffPlan(t *testing.T) {
attrPrf := &engine.APIAttributeProfileWithAPIOpts{
APIAttributeProfile: &engine.APIAttributeProfile{
Tenant: utils.CGRateSorg,
ID: "TEST_ATTRIBUTES_IT_TEST",
FilterIDs: []string{"*string:~*req.Account:1002", "*exists:~*opts.*usage:"},
Attributes: []*engine.ExternalAttribute{
{
Path: utils.AccountField,
Type: utils.MetaConstant,
Value: "1002",
},
{
Path: "*tenant",
Type: utils.MetaConstant,
Value: "cgrates.itsyscom",
},
},
Weights: utils.DynamicWeights{
{
Weight: 20,
},
},
},
}
var reply string
if err := tpeSRPC.Call(context.Background(), utils.AdminSv1SetAttributeProfile,
attrPrf, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Error(err)
}
attrPrf1 := &engine.APIAttributeProfileWithAPIOpts{
APIAttributeProfile: &engine.APIAttributeProfile{
Tenant: utils.CGRateSorg,
ID: "TEST_ATTRIBUTES_IT_TEST_SECOND",
FilterIDs: []string{"*string:~*opts.*context:*sessions", "*exists:~*opts.*usage:"},
Attributes: []*engine.ExternalAttribute{
{
Path: "*tenant",
Type: utils.MetaConstant,
Value: "cgrates.itsyscom",
},
},
},
}
var reply1 string
if err := tpeSRPC.Call(context.Background(), utils.AdminSv1SetAttributeProfile,
attrPrf1, &reply1); err != nil {
t.Error(err)
} else if reply1 != utils.OK {
t.Error(err)
}
rsPrf1 := &engine.ResourceProfileWithAPIOpts{
ResourceProfile: &engine.ResourceProfile{
Tenant: "cgrates.org",
ID: "ResGroup1",
FilterIDs: []string{"*string:~*req.Account:1001"},
Limit: 10,
AllocationMessage: "Approved",
Weights: utils.DynamicWeights{
{
Weight: 20,
}},
ThresholdIDs: []string{utils.MetaNone},
},
}
var replystr string
if err := tpeSRPC.Call(context.Background(), utils.AdminSv1SetResourceProfile,
rsPrf1, &replystr); err != nil {
t.Error(err)
} else if replystr != utils.OK {
t.Error("Unexpected reply returned", replystr)
}
rsPrf2 := &engine.ResourceProfileWithAPIOpts{
ResourceProfile: &engine.ResourceProfile{
Tenant: "cgrates.org",
ID: "ResGroup2",
FilterIDs: []string{"*string:~*req.Account:1001"},
Limit: 10,
AllocationMessage: "Approved",
Weights: utils.DynamicWeights{
{
Weight: 10,
}},
ThresholdIDs: []string{utils.MetaNone},
},
}
if err := tpeSRPC.Call(context.Background(), utils.AdminSv1SetResourceProfile,
rsPrf2, &replystr); err != nil {
t.Error(err)
} else if replystr != utils.OK {
t.Error("Unexpected reply returned", replystr)
}
var replyBts []byte
if err := tpeSRPC.Call(context.Background(), utils.TPeSv1ExportTariffPlan, &tpes.ArgsExportTP{
Tenant: "cgrates.org",
ExportItems: map[string][]string{
utils.MetaAttributes: {"TEST_ATTRIBUTES_IT_TEST", "TEST_ATTRIBUTES_IT_TEST_SECOND"},
utils.MetaResources: {"ResGroup1", "ResGroup2"},
},
}, &replyBts); err != nil {
t.Error(err)
} /*
t.Errorf("received: %v", string(replyBts))
rdr, err := zip.NewReader(bytes.NewReader(replyBts), int64(len(reply)))
if err != nil {
t.Error(err)
}
csvRply := make([][]string, 6)
for _, f := range rdr.File {
rc, err := f.Open()
if err != nil {
t.Fatal(err)
}
info := csv.NewReader(rc)
//info.FieldsPerRecord = -1
csvRply, err = info.ReadAll()
if err != nil {
t.Error(err)
}
rc.Close()
}
t.Errorf("received: %v", utils.ToJSON(csvRply))
*/
}
func testTPeSRPCConn(t *testing.T) {
var err error
tpeSRPC, err = newRPCClient(tpesCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
@@ -107,7 +240,7 @@ func testTpeSRPCConn(t *testing.T) {
}
//Kill the engine when it is about to be finished
func testTpeSKillEngine(t *testing.T) {
func testTPeSKillEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}

View File

@@ -17,5 +17,9 @@
"admins": {
"enabled": true,
},
"tpes": {
"enabled": true,
}
}

View File

@@ -21,10 +21,6 @@
"db_port": 27017,
},
"rals": {
"enabled": true,
"thresholds_conns": ["*internal"],

View File

@@ -41,7 +41,6 @@ var (
reloadTests = []func(t *testing.T){
testReloadITCreateCdrDirs,
testReloadITInitConfig,
testReloadITInitCdrDb,
testReloadITResetDataDb,
testReloadITStartEngine,
testReloadITRpcConn,

View File

@@ -32,7 +32,7 @@ import (
"github.com/cgrates/ltcache"
)
func NewLoaders(cfg *config.CGRConfig, dm *engine.DataManager,
func NewLoaderS(cfg *config.CGRConfig, dm *engine.DataManager,
filterS *engine.FilterS,
connMgr *engine.ConnManager) (ldrS *LoaderS) {
ldrS = &LoaderS{cfg: cfg, cache: make(map[string]*ltcache.Cache)}

View File

@@ -48,7 +48,7 @@ func TestNewLoaderService(t *testing.T) {
for k, cfg := range cfg.LoaderCfg()[0].Cache {
cache[k] = ltcache.NewCache(cfg.Limit, cfg.TTL, cfg.StaticTTL, nil)
}
ld := NewLoaders(cfg, dm, fS, cM)
ld := NewLoaderS(cfg, dm, fS, cM)
if exp := (&LoaderS{
cfg: cfg,
cache: cache,
@@ -146,7 +146,7 @@ func TestLoaderServiceV1Run(t *testing.T) {
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
fS := engine.NewFilterS(cfg, cM, dm)
ld := NewLoaders(cfg, dm, fS, cM)
ld := NewLoaderS(cfg, dm, fS, cM)
var rply string
if err := ld.V1Run(context.Background(), &ArgsProcessFolder{
APIOpts: map[string]interface{}{
@@ -222,7 +222,7 @@ func TestLoaderServiceV1RunErrors(t *testing.T) {
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
fS := engine.NewFilterS(cfg, cM, dm)
ld := NewLoaders(cfg, dm, fS, cM)
ld := NewLoaderS(cfg, dm, fS, cM)
var rply string
expErrMsg := "SERVER_ERROR: inline parse error for string: <*string>"
@@ -340,7 +340,7 @@ func TestLoaderServiceV1ImportZip(t *testing.T) {
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
fS := engine.NewFilterS(cfg, cM, dm)
ld := NewLoaders(cfg, dm, fS, cM)
ld := NewLoaderS(cfg, dm, fS, cM)
var rply string
if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{
Data: buf.Bytes(),
@@ -396,7 +396,7 @@ func TestLoaderServiceV1ImportZipErrors(t *testing.T) {
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
fS := engine.NewFilterS(cfg, cM, dm)
ld := NewLoaders(cfg, dm, fS, cM)
ld := NewLoaderS(cfg, dm, fS, cM)
var rply string
expErrMsg := "SERVER_ERROR: inline parse error for string: <*string>"

View File

@@ -184,7 +184,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaDispatchers), utils.DispatcherSv1, cgr.iDispatcherSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaAccounts), utils.AccountSv1, iAccountSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions), utils.ActionSv1, iActionSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes), utils.TpeSv1, iTpeSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes), utils.TPeSv1, iTpeSCh)
cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep)
cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, cgr.srvDep)

View File

@@ -83,7 +83,7 @@ func (ldrs *LoaderService) Start(ctx *context.Context, _ context.CancelFunc) (er
ldrs.Lock()
defer ldrs.Unlock()
ldrs.ldrs = loaders.NewLoaders(ldrs.cfg, datadb, filterS, ldrs.connMgr)
ldrs.ldrs = loaders.NewLoaderS(ldrs.cfg, datadb, filterS, ldrs.connMgr)
if !ldrs.ldrs.Enabled() {
return

View File

@@ -62,7 +62,7 @@ func TestLoaderSCoverage(t *testing.T) {
if srv.IsRunning() {
t.Errorf("Expected service to be down")
}
srv.ldrs = loaders.NewLoaders(cfg, &engine.DataManager{},
srv.ldrs = loaders.NewLoaderS(cfg, &engine.DataManager{},
&engine.FilterS{}, nil)
if !srv.IsRunning() {
t.Errorf("Expected service to be running")

View File

@@ -42,9 +42,11 @@ func newTPAttributes(dm *engine.DataManager) *TPAttributes {
// exportItems for TPAttributes will implement the imethod for tpExporter interface
func (tpAttr TPAttributes) exportItems(ctx *context.Context, tnt string, itmIDs []string) (expContent []byte, err error) {
//attrBts := make(map[string][]byte)
expContent = make([]byte, len(itmIDs))
for _, attrID := range itmIDs {
attrPrf, err := tpAttr.dm.GetAttributeProfile(ctx, tnt, attrID, true, true, utils.NonTransactional)
var attrPrf *engine.AttributeProfile
attrPrf, err = tpAttr.dm.GetAttributeProfile(ctx, tnt, attrID, true, true, utils.NonTransactional)
if err != nil {
if err.Error() == utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> cannot find AttributeProfile with id: <%v>", utils.TPeS, attrID))
@@ -52,29 +54,28 @@ func (tpAttr TPAttributes) exportItems(ctx *context.Context, tnt string, itmIDs
}
return nil, err
}
attrMdl := engine.APItoModelTPAttribute(engine.AttributeProfileToAPI(attrPrf))
if err := writeOut(utils.AttributesCsv, attrMdl); err != nil {
return nil, err
if len(attrMdl) == 0 {
return
}
// for every profile, convert it into model to be writable in csv format
buff := new(bytes.Buffer) // the info will be stored into a buffer
csvWriter := csv.NewWriter(buff)
csvWriter.Comma = utils.CSVSep
for _, tpItem := range attrMdl {
// transform every record into a []string
record, err := engine.CsvDump(tpItem)
if err != nil {
return nil, err
}
// record is a line of a csv file
if err := csvWriter.Write(record); err != nil {
return nil, err
}
}
csvWriter.Flush()
// append our bytes stored in buffer for every profile
expContent = append(expContent, buff.Bytes()...)
}
return
}
func writeOut(fileName string, tpData engine.AttributeMdls) error {
buff := new(bytes.Buffer)
csvWriter := csv.NewWriter(buff)
for _, tpItem := range tpData {
record, err := engine.CsvDump(tpItem)
if err != nil {
return err
}
if err := csvWriter.Write(record); err != nil {
return err
}
}
return nil
}

76
tpes/tpe_resources.go Normal file
View File

@@ -0,0 +1,76 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package tpes
import (
"bytes"
"encoding/csv"
"fmt"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
type TPResources struct {
dm *engine.DataManager
}
// newTPResources is the constructor for TPResources
func newTPResources(dm *engine.DataManager) *TPResources {
return &TPResources{
dm: dm,
}
}
// exportItems for TPResources will implement the imethod for tpExporter interface
func (tpAttr TPResources) exportItems(ctx *context.Context, tnt string, itmIDs []string) (expContent []byte, err error) {
expContent = make([]byte, len(itmIDs))
for _, attrID := range itmIDs {
resPrf, err := tpAttr.dm.GetResourceProfile(ctx, tnt, attrID, true, true, utils.NonTransactional)
if err != nil {
if err.Error() == utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> cannot find ResourceProfile with id: <%v>", utils.TPeS, attrID))
continue
}
return nil, err
}
resMdl := engine.APItoModelResource(engine.ResourceProfileToAPI(resPrf))
// for every profile, convert it into model to be writable in csv format
buff := new(bytes.Buffer) // the info will be stored into a buffer
csvWriter := csv.NewWriter(buff)
csvWriter.Comma = utils.CSVSep
for _, tpItem := range resMdl {
record, err := engine.CsvDump(tpItem)
if err != nil {
return nil, err
}
// record is a line of a csv file
if err := csvWriter.Write(record); err != nil {
return nil, err
}
}
csvWriter.Flush()
// append our bytes stored in buffer for every profile
expContent = append(expContent, buff.Bytes()...)
}
return
}

View File

@@ -19,7 +19,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package tpes
import (
"archive/zip"
"bytes"
"fmt"
"io"
"time"
"github.com/cgrates/birpc/context"
@@ -61,7 +65,7 @@ type ArgsExportTP struct {
}
// V1ExportTariffPlan is the API executed to export tariff plan items
func (tpE *TPeS) V1ExportTariffPlan(ctx *context.Context, args *ArgsExportTP, reply map[string][]byte) (err error) {
func (tpE *TPeS) V1ExportTariffPlan(ctx *context.Context, args *ArgsExportTP, reply *[]byte) (err error) {
if args.Tenant == utils.EmptyString {
args.Tenant = tpE.cfg.GeneralCfg().DefaultTenant
}
@@ -71,13 +75,31 @@ func (tpE *TPeS) V1ExportTariffPlan(ctx *context.Context, args *ArgsExportTP, re
}
}
expotedItems := make(map[string][]byte, len(tpE.exps))
// code to export to zip comes here
buff := new(bytes.Buffer)
zBuff := zip.NewWriter(buff)
for expType, expItms := range args.ExportItems {
if expotedItems[expType], err = tpE.exps[expType].exportItems(ctx, args.Tenant, expItms); err != nil {
var wrtr io.Writer
//here we will create all the header for each subsystem type for the csv
if wrtr, err = zBuff.CreateHeader(&zip.FileHeader{
Method: zip.Deflate, // to be compressed
Name: getFileName(expType),
Modified: time.Now(),
}); err != nil {
return
}
var expBts []byte
// expBts will containt the bytes with all profiles in CSV format
if expBts, err = tpE.exps[expType].exportItems(ctx, args.Tenant, expItms); err != nil {
return utils.NewErrServerError(err)
}
// write all the bytes into our zip
if _, err = wrtr.Write(expBts); err != nil {
return
}
}
reply = expotedItems
if err = zBuff.Close(); err != nil {
return err
}
*reply = buff.Bytes()
return
}

View File

@@ -26,17 +26,18 @@ import (
var tpExporterTypes = utils.NewStringSet([]string{
utils.MetaAttributes,
/* utils.MetaResources,
utils.MetaFilters,
utils.MetaStats,
utils.MetaThresholds,
utils.MetaRoutes,
utils.MetaChargers,
utils.MetaDispatchers,
utils.MetaDispatcherHosts,
utils.MetaRateProfiles,
utils.MetaActions,
utils.MetaAccounts */})
utils.MetaResources,
/*
utils.MetaFilters,
utils.MetaStats,
utils.MetaThresholds,
utils.MetaRoutes,
utils.MetaChargers,
utils.MetaDispatchers,
utils.MetaDispatcherHosts,
utils.MetaRateProfiles,
utils.MetaActions,
utils.MetaAccounts */})
// tpExporter is the interface implementing exports of tariff plan items
type tpExporter interface {
@@ -48,7 +49,43 @@ func newTPExporter(expType string, dm *engine.DataManager) (tpE tpExporter, err
switch expType {
case utils.MetaAttributes:
return newTPAttributes(dm), nil
case utils.MetaResources:
return newTPResources(dm), nil
default:
return nil, utils.ErrPrefix(utils.ErrUnsupportedTPExporterType, expType)
}
}
func getFileName(subsystem string) string {
switch subsystem {
case utils.MetaAttributes:
return utils.AttributesCsv
case utils.MetaResources:
return utils.ResourcesCsv
default:
return utils.EmptyString
}
}
/*
func writeOut(tpData []interface{}) (csvBts []byte, err error) {
if len(tpData) == 0 {
return
}
buff := new(bytes.Buffer)
csvWriter := csv.NewWriter(buff)
csvWriter.Comma = utils.CSVSep
for _, tpItem := range tpData {
record, err := engine.CsvDump(tpItem)
if err != nil {
return nil, err
}
if err := csvWriter.Write(record); err != nil {
return nil, err
}
}
csvWriter.Flush()
return buff.Bytes(), nil
}
*/

View File

@@ -1251,10 +1251,11 @@ const (
ServiceManagerV1Ping = "ServiceManagerV1.Ping"
)
// TpeSv1 APIs
// TPeSv1 APIs
const (
TpeSv1 = "TpeSv1"
TpeSv1Ping = "TpeSv1.Ping"
TPeSv1 = "TPeSv1"
TPeSv1Ping = "TPeSv1.Ping"
TPeSv1ExportTariffPlan = "TPeSv1.ExportTariffPlan"
)
// ConfigSv1 APIs