Updated loaders

Trial97
2021-11-01 13:56:51 +02:00
committed by Dan Christian Bogos
parent 876955f9e2
commit 7d0106e9c2
10 changed files with 169 additions and 323 deletions

View File

@@ -38,12 +38,10 @@ import (
)
var (
ldrDirPathIn string
ldrDirPathOut string
ldrCfgPath string
ldrCfg *config.CGRConfig
ldrRPC *birpc.Client
ldrConfigDIR string //run tests for specific configuration
ldrCfgPath string
ldrCfg *config.CGRConfig
ldrRPC *birpc.Client
ldrConfigDIR string //run tests for specific configuration
sTestsLdr = []func(t *testing.T){
testLoadersRemoveFolders,
@@ -72,7 +70,6 @@ var (
testLoadersGetThresholdProfile,
testLoadersRemove,
testLoadersGetAccountAfterRemove,
testLoadersGetActionProfileAfterRemove,
testLoadersGetAttributeProfileAfterRemove,
@@ -165,253 +162,101 @@ func testLoadersPing(t *testing.T) {
}
func testLoadersCreateFolders(t *testing.T) {
ldrDirPathIn = "/tmp/TestLoadersIT/in"
err := os.MkdirAll(ldrDirPathIn, 0755)
if err != nil {
t.Error(err)
}
ldrDirPathOut = "/tmp/TestLoadersIT/out"
err = os.MkdirAll(ldrDirPathOut, 0755)
if err != nil {
if err := os.MkdirAll("/tmp/TestLoadersIT/in", 0755); err != nil {
t.Error(err)
}
}
func testLoadersRemoveFolders(t *testing.T) {
err := os.RemoveAll("/tmp/TestLoadersIT")
if err != nil {
if err := os.RemoveAll("/tmp/TestLoadersIT/in"); err != nil {
t.Error(err)
}
}
func testLoadersWriteCSVs(t *testing.T) {
writeFile := func(fileName, data string) error {
csvAccounts, err := os.Create(path.Join(ldrCfg.LoaderCfg()[0].TpInDir, fileName))
if err != nil {
return err
}
defer csvAccounts.Close()
_, err = csvAccounts.WriteString(data)
if err != nil {
return err
}
return csvAccounts.Sync()
}
// Create and populate Accounts.csv
csvAccounts, err := os.Create(ldrDirPathIn + "/Accounts.csv")
if err != nil {
t.Error(err)
}
defer csvAccounts.Close()
data := `#Tenant,ID,FilterIDs,Weights,Opts,BalanceID,BalanceFilterIDs,BalanceWeights,BalanceType,BalanceUnits,BalanceUnitFactors,BalanceOpts,BalanceCostIncrements,BalanceAttributeIDs,BalanceRateProfileIDs,ThresholdIDs
cgrates.org,ACC_PRF,,;20,,MonetaryBalance,,;10,*concrete,14,fltr1&fltr2;100;fltr3;200,,fltr1&fltr2;1.3;2.3;3.3,attr1;attr2,,*none`
_, err = csvAccounts.WriteString(data)
if err != nil {
t.Error(err)
}
err = csvAccounts.Sync()
if err != nil {
t.Error(err)
if err := writeFile(utils.AccountsCsv, `#Tenant,ID,FilterIDs,Weights,Opts,BalanceID,BalanceFilterIDs,BalanceWeights,BalanceType,BalanceUnits,BalanceUnitFactors,BalanceOpts,BalanceCostIncrements,BalanceAttributeIDs,BalanceRateProfileIDs,ThresholdIDs
cgrates.org,ACC_PRF,,;20,,MonetaryBalance,,;10,*concrete,14,fltr1&fltr2;100;fltr3;200,,fltr1&fltr2;1.3;2.3;3.3,attr1;attr2,,*none`); err != nil {
t.Fatal(err)
}
// Create and populate Actions.csv
csvActions, err := os.Create(ldrDirPathIn + "/Actions.csv")
if err != nil {
t.Error(err)
}
defer csvActions.Close()
data = `#Tenant,ID,FilterIDs,Weight,Schedule,TargetType,TargetIDs,ActionID,ActionFilterIDs,ActionBlocker,ActionTTL,ActionType,ActionOpts,ActionPath,ActionValue
cgrates.org,ACT_PRF,,10,*asap,*accounts,1001,TOPUP,,false,0s,*add_balance,,*balance.TestBalance.Units,10`
_, err = csvActions.WriteString(data)
if err != nil {
t.Error(err)
}
err = csvActions.Sync()
if err != nil {
t.Error(err)
// Create and populate ActionProfiles.csv
if err := writeFile(utils.ActionsCsv, `#Tenant,ID,FilterIDs,Weight,Schedule,TargetType,TargetIDs,ActionID,ActionFilterIDs,ActionBlocker,ActionTTL,ActionType,ActionOpts,ActionPath,ActionValue
cgrates.org,ACT_PRF,,10,*asap,*accounts,1001,TOPUP,,false,0s,*add_balance,,*balance.TestBalance.Units,10`); err != nil {
t.Fatal(err)
}
// Create and populate Attributes.csv
csvAttributes, err := os.Create(ldrDirPathIn + "/Attributes.csv")
if err != nil {
t.Error(err)
}
defer csvAttributes.Close()
data = `#Tenant,ID,FilterIDs,Weight,AttributeFilterIDs,Path,Type,Value,Blocker
cgrates.org,ATTR_ACNT_1001,FLTR_ACCOUNT_1001,10,,*req.OfficeGroup,*constant,Marketing,false`
_, err = csvAttributes.WriteString(data)
if err != nil {
t.Error(err)
}
err = csvAttributes.Sync()
if err != nil {
t.Error(err)
if err := writeFile(utils.AttributesCsv, `#Tenant,ID,FilterIDs,Weight,AttributeFilterIDs,Path,Type,Value,Blocker
cgrates.org,ATTR_ACNT_1001,FLTR_ACCOUNT_1001,10,,*req.OfficeGroup,*constant,Marketing,false`); err != nil {
t.Fatal(err)
}
// Create and populate Chargers.csv
csvChargers, err := os.Create(ldrDirPathIn + "/Chargers.csv")
if err != nil {
t.Error(err)
}
defer csvChargers.Close()
data = `#Tenant,ID,FilterIDs,Weight,RunID,AttributeIDs
cgrates.org,Raw,FLTR_ACCOUNT_1001,20,*raw,*constant:*req.RequestType:*none`
_, err = csvChargers.WriteString(data)
if err != nil {
t.Error(err)
}
err = csvChargers.Sync()
if err != nil {
t.Error(err)
if err := writeFile(utils.ChargersCsv, `#Tenant,ID,FilterIDs,Weight,RunID,AttributeIDs
cgrates.org,Raw,FLTR_ACCOUNT_1001,20,*raw,*constant:*req.RequestType:*none`); err != nil {
t.Fatal(err)
}
// Create and populate DispatcherProfiles.csv
csvDispatcherProfiles, err := os.Create(ldrDirPathIn + "/DispatcherProfiles.csv")
if err != nil {
t.Error(err)
}
defer csvDispatcherProfiles.Close()
data = `#Tenant,ID,FilterIDs,Weight,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters
cgrates.org,DSP1,FLTR_ACCOUNT_1001,10,*weight,,ALL,,20,false,`
_, err = csvDispatcherProfiles.WriteString(data)
if err != nil {
t.Error(err)
}
err = csvDispatcherProfiles.Sync()
if err != nil {
t.Error(err)
if err := writeFile(utils.DispatcherProfilesCsv, `#Tenant,ID,FilterIDs,Weight,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters
cgrates.org,DSP1,FLTR_ACCOUNT_1001,10,*weight,,ALL,,20,false,`); err != nil {
t.Fatal(err)
}
// Create and populate DispatcherHosts.csv
csvDispatcherHosts, err := os.Create(ldrDirPathIn + "/DispatcherHosts.csv")
if err != nil {
t.Error(err)
}
defer csvDispatcherHosts.Close()
data = `#Tenant[0],ID[1],Address[2],Transport[3],TLS[4]
cgrates.org,DSPHOST1,*internal,,`
_, err = csvDispatcherHosts.WriteString(data)
if err != nil {
t.Error(err)
}
err = csvDispatcherHosts.Sync()
if err != nil {
t.Error(err)
if err := writeFile(utils.DispatcherHostsCsv, `#Tenant[0],ID[1],Address[2],Transport[3],ConnectAttempts[4],Reconnects[5],ConnectTimeout[6],ReplyTimeout[7],Tls[8],ClientKey[9],ClientCertificate[10],CaCertificate[11]
cgrates.org,DSPHOST1,*internal,,1,3,"1m","2m",false,,,`); err != nil {
t.Fatal(err)
}
// Create and populate Filters.csv
csvFilters, err := os.Create(ldrDirPathIn + "/Filters.csv")
if err != nil {
t.Error(err)
}
defer csvFilters.Close()
data = `#Tenant[0],ID[1],Type[2],Path[3],Values[4]
cgrates.org,FLTR_ACCOUNT_1001,*string,~*req.Account,1001`
_, err = csvFilters.WriteString(data)
if err != nil {
t.Error(err)
}
err = csvFilters.Sync()
if err != nil {
t.Error(err)
if err := writeFile(utils.FiltersCsv, `#Tenant[0],ID[1],Type[2],Path[3],Values[4]
cgrates.org,FLTR_ACCOUNT_1001,*string,~*req.Account,1001`); err != nil {
t.Fatal(err)
}
// Create and populate Rates.csv
csvRateProfiles, err := os.Create(ldrDirPathIn + "/Rates.csv")
if err != nil {
t.Error(err)
}
defer csvRateProfiles.Close()
data = `#Tenant,ID,FilterIDs,Weights,MinCost,MaxCost,MaxCostStrategy,RateID,RateFilterIDs,RateActivationStart,RateWeights,RateBlocker,RateIntervalStart,RateFixedFee,RateRecurrentFee,RateUnit,RateIncrement
cgrates.org,RP1,FLTR_ACCOUNT_1001,;0,0.1,0.6,*free,RT_WEEK,FLTR_ACCOUNT_1001,"* * * * 1-5",;0,false,0s,,0.12,1m,1m`
_, err = csvRateProfiles.WriteString(data)
if err != nil {
t.Error(err)
}
err = csvRateProfiles.Sync()
if err != nil {
t.Error(err)
// Create and populate RateProfiles.csv
if err := writeFile(utils.RatesCsv, `#Tenant,ID,FilterIDs,Weights,MinCost,MaxCost,MaxCostStrategy,RateID,RateFilterIDs,RateActivationStart,RateWeights,RateBlocker,RateIntervalStart,RateFixedFee,RateRecurrentFee,RateUnit,RateIncrement
cgrates.org,RP1,FLTR_ACCOUNT_1001,;0,0.1,0.6,*free,RT_WEEK,FLTR_ACCOUNT_1001,"* * * * 1-5",;0,false,0s,,0.12,1m,1m`); err != nil {
t.Fatal(err)
}
// Create and populate Resources.csv
csvResources, err := os.Create(ldrDirPathIn + "/Resources.csv")
if err != nil {
t.Error(err)
}
defer csvResources.Close()
data = `#Tenant[0],Id[1],FilterIDs[2],Weight[3],TTL[4],Limit[5],AllocationMessage[6],Blocker[7],Stored[8],ThresholdIDs[9]
cgrates.org,RES_ACNT_1001,FLTR_ACCOUNT_1001,10,1h,1,,false,false,`
_, err = csvResources.WriteString(data)
if err != nil {
t.Error(err)
}
err = csvResources.Sync()
if err != nil {
t.Error(err)
if err := writeFile(utils.ResourcesCsv, `#Tenant[0],Id[1],FilterIDs[2],Weight[3],TTL[4],Limit[5],AllocationMessage[6],Blocker[7],Stored[8],ThresholdIDs[9]
cgrates.org,RES_ACNT_1001,FLTR_ACCOUNT_1001,10,1h,1,,false,false,`); err != nil {
t.Fatal(err)
}
// Create and populate Routes.csv
csvRoutes, err := os.Create(ldrDirPathIn + "/Routes.csv")
if err != nil {
t.Error(err)
}
defer csvRoutes.Close()
data = `#Tenant,ID,FilterIDs,Weights,Sorting,SortingParameters,RouteID,RouteFilterIDs,RouteAccountIDs,RouteRateProfileIDs,RouteResourceIDs,RouteStatIDs,RouteWeights,RouteBlocker,RouteParameters
cgrates.org,ROUTE_ACNT_1001,FLTR_ACCOUNT_1001,;10,*weight,,route1,,,,,,;20,,`
_, err = csvRoutes.WriteString(data)
if err != nil {
t.Error(err)
}
err = csvRoutes.Sync()
if err != nil {
t.Error(err)
if err := writeFile(utils.RoutesCsv, `#Tenant,ID,FilterIDs,Weights,Sorting,SortingParameters,RouteID,RouteFilterIDs,RouteAccountIDs,RouteRateProfileIDs,RouteResourceIDs,RouteStatIDs,RouteWeights,RouteBlocker,RouteParameters
cgrates.org,ROUTE_ACNT_1001,FLTR_ACCOUNT_1001,;10,*weight,,route1,,,,,,;20,,`); err != nil {
t.Fatal(err)
}
// Create and populate Stats.csv
csvStats, err := os.Create(ldrDirPathIn + "/Stats.csv")
if err != nil {
t.Error(err)
}
defer csvStats.Close()
data = `#Tenant[0],Id[1],FilterIDs[2],Weight[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],ThresholdIDs[11]
cgrates.org,Stat_1,FLTR_ACCOUNT_1001,30,100,10s,0,*acd;*tcd;*asr,,false,true,*none`
_, err = csvStats.WriteString(data)
if err != nil {
t.Error(err)
}
err = csvStats.Sync()
if err != nil {
t.Error(err)
if err := writeFile(utils.StatsCsv, `#Tenant[0],Id[1],FilterIDs[2],Weight[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],ThresholdIDs[11]
cgrates.org,Stat_1,FLTR_ACCOUNT_1001,30,100,10s,0,*acd;*tcd;*asr,,false,true,*none`); err != nil {
t.Fatal(err)
}
// Create and populate Thresholds.csv
csvThresholds, err := os.Create(ldrDirPathIn + "/Thresholds.csv")
if err != nil {
t.Error(err)
}
defer csvThresholds.Close()
data = `#Tenant[0],Id[1],FilterIDs[2],Weight[3],MaxHits[4],MinHits[5],MinSleep[6],Blocker[7],ActionProfileIDs[8],Async[9]
cgrates.org,THD_ACNT_1001,FLTR_ACCOUNT_1001,10,-1,0,0,false,ACT_PRF,false`
_, err = csvThresholds.WriteString(data)
if err != nil {
t.Error(err)
}
err = csvThresholds.Sync()
if err != nil {
t.Error(err)
if err := writeFile(utils.ThresholdsCsv, `#Tenant[0],Id[1],FilterIDs[2],Weight[3],MaxHits[4],MinHits[5],MinSleep[6],Blocker[7],ActionProfileIDs[8],Async[9]
cgrates.org,THD_ACNT_1001,FLTR_ACCOUNT_1001,10,-1,0,0,false,ACT_PRF,false`); err != nil {
t.Fatal(err)
}
}
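
Note: the repeated create/write/sync/close blocks above are collapsed into a single writeFile closure. A minimal, self-contained sketch of that pattern follows; the directory, file name and payload are illustrative stand-ins, not the test's own configuration.

package main

import (
	"log"
	"os"
	"path"
)

func main() {
	inDir := "/tmp/TestLoadersIT/in" // illustrative input directory

	// writeFile creates fileName inside inDir, writes the CSV payload and
	// flushes it to disk, returning the first error encountered.
	writeFile := func(fileName, data string) error {
		f, err := os.Create(path.Join(inDir, fileName))
		if err != nil {
			return err
		}
		defer f.Close()
		if _, err := f.WriteString(data); err != nil {
			return err
		}
		return f.Sync()
	}

	if err := os.MkdirAll(inDir, 0755); err != nil {
		log.Fatal(err)
	}
	if err := writeFile("Accounts.csv",
		"#Tenant,ID\ncgrates.org,ACC_PRF"); err != nil {
		log.Fatal(err)
	}
}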
@@ -1261,7 +1106,7 @@ func TestLoadersLoad(t *testing.T) {
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
ldrS := loaders.NewLoaderService(dm, cfg.LoaderCfg(), "", fltrs, nil)
ldrS := loaders.NewLoaderService(cfg, dm, "", fltrs, nil)
lSv1 := NewLoaderSv1(ldrS)
args := &loaders.ArgsProcessFolder{

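Note: NewLoaderService now takes the full *config.CGRConfig first and the DataManager second, instead of receiving only the []*config.LoaderSCfg slice. A sketch of the updated construction, assuming the cgrates packages as they exist at this commit; the return type is inferred from how the tests use the value.

// Sketch only, not a drop-in test.
package main

import (
	"github.com/cgrates/cgrates/config"
	"github.com/cgrates/cgrates/engine"
	"github.com/cgrates/cgrates/loaders"
)

func buildLoaderService() *loaders.LoaderService {
	cfg := config.NewDefaultCGRConfig()
	data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
	dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
	fltrs := engine.NewFilterS(cfg, nil, dm)
	// Previously: loaders.NewLoaderService(dm, cfg.LoaderCfg(), "", fltrs, nil)
	// Now the whole CGRConfig is passed and the loader configs are read from it.
	return loaders.NewLoaderService(cfg, dm, "", fltrs, nil)
}

func main() { _ = buildLoaderService() }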
View File

@@ -32,7 +32,7 @@ func TestLoadersNewLoaderSv1(t *testing.T) {
data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items)
dm := engine.NewDataManager(data, cfg.CacheCfg(), nil)
fltrs := engine.NewFilterS(cfg, nil, dm)
ldrS := loaders.NewLoaderService(dm, cfg.LoaderCfg(), "", fltrs, nil)
ldrS := loaders.NewLoaderService(cfg, dm, "", fltrs, nil)
exp := &LoaderSv1{
ldrS: ldrS,

View File

@@ -1168,6 +1168,8 @@ const CGRATES_CFG_JSON = `
"tp_out_dir": "/var/spool/cgrates/loader/out", // absolute path towards the directory where processed TPs will be moved
"data":[ // data profiles to load
{
"id": "filters1",
"flags": ["*partial"],
"type": "*filters", // data source type
"file_name": "Filters.csv", // file name in the tp_in_dir
"fields": [

View File

@@ -113,6 +113,7 @@ type LoaderSCfg struct {
// LoaderDataType the template for profile loading
type LoaderDataType struct {
ID string
Type string
Filename string
Flags utils.FlagsWithParams
@@ -123,6 +124,9 @@ func (lData *LoaderDataType) loadFromJSONCfg(jsnCfg *LoaderJsonDataType, msgTemp
if jsnCfg == nil {
return nil
}
if jsnCfg.Id != nil {
lData.ID = *jsnCfg.Id
}
if jsnCfg.Type != nil {
lData.Type = *jsnCfg.Type
}
@@ -149,6 +153,7 @@ func (l *LoaderSCfg) loadFromJSONCfg(jsnCfg *LoaderJsonCfg, msgTemplates map[str
if jsnCfg == nil {
return nil
}
l.WithIndex = true
if jsnCfg.ID != nil {
l.ID = *jsnCfg.ID
}
@@ -188,12 +193,17 @@ func (l *LoaderSCfg) loadFromJSONCfg(jsnCfg *LoaderJsonCfg, msgTemplates map[str
continue
}
var ldrDataType *LoaderDataType
var lType, id string
if jsnLoCfg.Type != nil {
for _, ldrDT := range l.Data {
if ldrDT.Type == *jsnLoCfg.Type {
ldrDataType = ldrDT
break
}
lType = *jsnLoCfg.Type
}
if jsnLoCfg.Id != nil {
id = *jsnLoCfg.Id
}
for _, ldrDT := range l.Data {
if ldrDT.Type == lType && id == ldrDT.ID {
ldrDataType = ldrDT
break
}
}
if ldrDataType == nil {
@@ -302,6 +312,7 @@ func (l LoaderSCfg) AsMapInterface(separator string) (initialMP map[string]inter
}
type LoaderJsonDataType struct {
Id *string
Type *string
File_name *string
Flags *[]string
@@ -327,7 +338,8 @@ func equalsLoaderDatasType(v1, v2 []*LoaderDataType) bool {
return false
}
for i := range v2 {
if v1[i].Type != v2[i].Type ||
if v1[i].ID != v2[i].ID ||
v1[i].Type != v2[i].Type ||
v1[i].Filename != v2[i].Filename ||
!utils.SliceStringEqual(v1[i].Flags.SliceFlags(), v2[i].Flags.SliceFlags()) ||
!fcTemplatesEqual(v1[i].Fields, v2[i].Fields) {
@@ -377,6 +389,7 @@ func diffLoaderJsonCfg(v1, v2 *LoaderSCfg, separator string) (d *LoaderJsonCfg)
var req []*FcTemplateJsonCfg
req = diffFcTemplateJsonCfg(req, nil, val2.Fields, separator)
data[i] = &LoaderJsonDataType{
Id: utils.StringPointer(val2.ID),
Type: utils.StringPointer(val2.Type),
File_name: utils.StringPointer(val2.Filename),
Flags: utils.SliceStringPointer(val2.Flags.SliceFlags()),

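Note: LoaderDataType and LoaderJsonDataType gain an ID field, and loadFromJSONCfg now reuses an existing data template only when both Type and ID match the JSON entry. A stand-alone sketch of that lookup with simplified, hypothetical types:

package main

import "fmt"

type dataTemplate struct {
	ID       string
	Type     string
	Filename string
}

// findTemplate mirrors the matching loop added in loadFromJSONCfg: compare on
// (Type, ID) instead of Type alone, so two entries of the same type can coexist.
func findTemplate(existing []*dataTemplate, lType, id string) *dataTemplate {
	for _, dt := range existing {
		if dt.Type == lType && dt.ID == id {
			return dt
		}
	}
	return nil
}

func main() {
	tpls := []*dataTemplate{
		{ID: "filters1", Type: "*filters", Filename: "Filters.csv"},
		{ID: "filters2", Type: "*filters", Filename: "ExtraFilters.csv"},
	}
	fmt.Println(findTemplate(tpls, "*filters", "filters2").Filename) // ExtraFilters.csv
}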
View File

@@ -32,41 +32,41 @@ import (
func TestDataUpdateFromCSVOneFile(t *testing.T) {
attrSFlds := []*config.FCTemplate{
{Tag: "TenantID",
{
Path: "Tenant",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep),
Mandatory: true},
{Tag: "ProfileID",
{
Path: "ID",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep),
Mandatory: true},
{Tag: "Contexts",
{
Path: "Contexts",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.2", utils.InfieldSep)},
{Tag: "FilterIDs",
{
Path: "FilterIDs",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.3", utils.InfieldSep)},
{Tag: "Weight",
{
Path: "Weight",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.4", utils.InfieldSep)},
{Tag: "Path",
{
Path: "Path",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.5", utils.InfieldSep)},
{Tag: "Initial",
{
Path: "Initial",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.6", utils.InfieldSep)},
{Tag: "Substitute",
{
Path: "Substitute",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.7", utils.InfieldSep)},
{Tag: "Append",
{
Path: "Append",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.8", utils.InfieldSep)},
@@ -116,41 +116,41 @@ func TestDataUpdateFromCSVOneFile(t *testing.T) {
func TestDataUpdateFromCSVOneFile2(t *testing.T) {
attrSFlds := []*config.FCTemplate{
{Tag: "TenantID",
{
Path: "Tenant",
Type: utils.MetaVariable,
Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep),
Mandatory: true},
{Tag: "ProfileID",
{
Path: "ID",
Type: utils.MetaVariable,
Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep),
Mandatory: true},
{Tag: "Contexts",
{
Path: "Contexts",
Type: utils.MetaVariable,
Value: config.NewRSRParsersMustCompile("~*req.2", utils.InfieldSep)},
{Tag: "FilterIDs",
{
Path: "FilterIDs",
Type: utils.MetaVariable,
Value: config.NewRSRParsersMustCompile("~*req.3", utils.InfieldSep)},
{Tag: "Weight",
{
Path: "Weight",
Type: utils.MetaVariable,
Value: config.NewRSRParsersMustCompile("~*req.4", utils.InfieldSep)},
{Tag: "Path",
{
Path: "Path",
Type: utils.MetaVariable,
Value: config.NewRSRParsersMustCompile("~*req.5", utils.InfieldSep)},
{Tag: "Initial",
{
Path: "Initial",
Type: utils.MetaVariable,
Value: config.NewRSRParsersMustCompile("~*req.6", utils.InfieldSep)},
{Tag: "Substitute",
{
Path: "Substitute",
Type: utils.MetaVariable,
Value: config.NewRSRParsersMustCompile("~*req.7", utils.InfieldSep)},
{Tag: "Append",
{
Path: "Append",
Type: utils.MetaVariable,
Value: config.NewRSRParsersMustCompile("~*req.8", utils.InfieldSep)},
@@ -200,37 +200,37 @@ func TestDataUpdateFromCSVOneFile2(t *testing.T) {
func TestDataUpdateFromCSVMultiFiles(t *testing.T) {
attrSFlds := []*config.FCTemplate{
{Tag: "TenantID",
{
Path: "Tenant",
Type: utils.MetaString,
Value: config.NewRSRParsersMustCompile("cgrates.org", utils.InfieldSep),
Mandatory: true},
{Tag: "ProfileID",
{
Path: "ID",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*file(File2.csv).1", utils.InfieldSep),
Mandatory: true},
{Tag: "Contexts",
{
Path: "Contexts",
Type: utils.MetaString,
Value: config.NewRSRParsersMustCompile("*any", utils.InfieldSep)},
{Tag: "Path",
{
Path: "Path",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*file(File1.csv).5", utils.InfieldSep)},
{Tag: "Initial",
{
Path: "Initial",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*file(File1.csv).6", utils.InfieldSep)},
{Tag: "Substitute",
{
Path: "Substitute",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*file(File1.csv).7", utils.InfieldSep)},
{Tag: "Append",
{
Path: "Append",
Type: utils.MetaString,
Value: config.NewRSRParsersMustCompile("true", utils.InfieldSep)},
{Tag: "Weight",
{
Path: "Weight",
Type: utils.MetaString,
Value: config.NewRSRParsersMustCompile("10", utils.InfieldSep)},
@@ -286,13 +286,13 @@ func TestGetRateIDsLoaderData(t *testing.T) {
func TestUpdateFromCsvWithFiltersError(t *testing.T) {
attrSFlds := []*config.FCTemplate{
{Tag: "TenantID",
{
Path: "Tenant",
Type: utils.MetaString,
Value: config.NewRSRParsersMustCompile("cgrates.org", utils.InfieldSep),
Filters: []string{"*string:~*req.Account:10"},
Mandatory: true},
{Tag: "ProfileID",
{
Path: "ID",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*file(File2.csv).1", utils.InfieldSep),
@@ -320,13 +320,13 @@ func TestUpdateFromCsvWithFiltersError(t *testing.T) {
func TestUpdateFromCsvWithFiltersContinue(t *testing.T) {
attrSFlds := []*config.FCTemplate{
{Tag: "TenantID",
{
Path: "Tenant",
Type: utils.MetaString,
Value: config.NewRSRParsersMustCompile("cgrates.org", utils.InfieldSep),
Filters: []string{`*string:~*req.2:10`},
Mandatory: true},
{Tag: "ProfileID",
{
Path: "ID",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*file(File2.csv).1", utils.InfieldSep),

View File

@@ -517,8 +517,10 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi
}
tntID := TenantIDFromMap(data)
if !prevTntID.Equal(tntID) {
if err = l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates); err != nil {
return
if prevTntID != nil {
if err = l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates); err != nil {
return
}
}
prevTntID = tntID
lData = make([]utils.MapStorage, 0, 1)
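Note: processData buffers rows for the current tenant/ID and flushes the buffer when the key changes; the added prevTntID != nil guard prevents a flush before any row has been buffered. A simplified, stand-alone sketch of that grouping logic, with stand-in types and names rather than the loader's own:

package main

import "fmt"

type row map[string]string

func key(r row) string { return r["Tenant"] + ":" + r["ID"] }

// processRows collects consecutive rows that share a key and hands each
// completed batch to process; the havePrev guard mirrors the prevTntID != nil
// check added by this commit, so no empty batch is flushed for the first row.
func processRows(rows []row, process func(k string, batch []row)) {
	var prevKey string
	havePrev := false
	batch := make([]row, 0, 1)
	for _, r := range rows {
		if k := key(r); !havePrev || k != prevKey {
			if havePrev {
				process(prevKey, batch)
			}
			prevKey, havePrev = k, true
			batch = make([]row, 0, 1)
		}
		batch = append(batch, r)
	}
	if havePrev {
		process(prevKey, batch) // flush the final batch
	}
}

func main() {
	rows := []row{
		{"Tenant": "cgrates.org", "ID": "ACC1"},
		{"Tenant": "cgrates.org", "ID": "ACC1"},
		{"Tenant": "cgrates.org", "ID": "ACC2"},
	}
	processRows(rows, func(k string, b []row) { fmt.Println(k, len(b)) })
	// Prints: cgrates.org:ACC1 2, then cgrates.org:ACC2 1
}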
@@ -587,7 +589,7 @@ func (l *loader) processFolder(ctx *context.Context, action, caching string, dry
}
return
}
switch l.ldrCfg.Action {
switch action {
case utils.MetaStore:
for i := range l.ldrCfg.Data {
if err = proces(i); err != nil {

View File

@@ -99,6 +99,7 @@ func (ldrS *LoaderService) process(ctx *context.Context, args *ArgsProcessFolder
rply *string) (err error) {
ldrS.RLock()
defer ldrS.RUnlock()
if args.LoaderID == utils.EmptyString {
args.LoaderID = utils.MetaDefault
}

View File

@@ -88,31 +88,35 @@ cgrates.org,NewRes1
data := engine.NewInternalDB(nil, nil, config.NewDefaultCGRConfig().DataDbCfg().Items)
dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil)
cfgLdr := config.NewDefaultCGRConfig().LoaderCfg()
cfgLdr[0] = &config.LoaderSCfg{
cfgLdr := config.NewDefaultCGRConfig()
cfgLdr.LoaderCfg()[0] = &config.LoaderSCfg{
ID: "testV1LoadResource",
Enabled: true,
FieldSeparator: utils.FieldsSep,
TpInDir: flPath,
TpOutDir: "/tmp",
LockFilePath: "res.lck",
Data: nil,
Data: []*config.LoaderDataType{{
Type: utils.MetaResources,
Filename: utils.ResourcesCsv,
Fields: []*config.FCTemplate{
{Tag: "Tenant",
Path: "Tenant",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep),
Mandatory: true},
{Tag: "ID",
Path: "ID",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep),
Mandatory: true},
},
}},
}
ldrs := NewLoaderService(dm, cfgLdr, "UTC", nil, nil)
ldrs.ldrs["testV1LoadResource"].dataTpls = map[string][]*config.FCTemplate{
utils.MetaResources: {
{Tag: "Tenant",
Path: "Tenant",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep),
Mandatory: true},
{Tag: "ID",
Path: "ID",
Type: utils.MetaComposed,
Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep),
Mandatory: true},
},
for _, v := range cfgLdr.LoaderCfg()[0].Data[0].Fields {
v.ComputePath()
}
ldrs := NewLoaderService(cfgLdr, dm, "UTC", nil, nil)
resCsv := `#Tenant[0],ID[1]
cgrates.org,NewRes1

View File

@@ -33,41 +33,30 @@ type Locker interface {
func newLocker(path string) Locker {
if path != utils.EmptyString {
return &folderLock{path: path}
return folderLock(path)
}
return new(nopLock)
}
type folderLock struct {
path string
file io.Closer
}
type folderLock string
// lockFolder will attempt to lock the folder by creating the lock file
func (fl *folderLock) Lock() (err error) {
// If the path is an empty string, we should not be locking
fl.file, err = os.OpenFile(fl.path,
func (fl folderLock) Lock() (err error) {
var file io.Closer
file, err = os.OpenFile(string(fl),
os.O_RDONLY|os.O_CREATE, 0644)
file.Close()
return
}
func (fl *folderLock) Unlock() (err error) {
// If the path is an empty string, we should not be locking
if fl.file != nil {
fl.file.Close()
err = os.Remove(fl.path)
}
return
func (fl folderLock) Unlock() (err error) {
return os.Remove(string(fl))
}
func (ldr folderLock) Locked() (lk bool, err error) {
// If the path is an empty string, we should not be locking
if lk = ldr.file != nil; lk {
return
}
if _, err = os.Stat(ldr.path); err != nil {
if lk = os.IsNotExist(err); lk {
err = nil
func (fl folderLock) Locked() (lk bool, err error) {
if _, err = os.Stat(string(fl)); err != nil {
if os.IsNotExist(err) {
lk, err = false, nil
}
return
}
@@ -75,18 +64,9 @@ func (ldr folderLock) Locked() (lk bool, err error) {
return
}
type nopLock struct {
}
type nopLock struct{}
// lockFolder will attempt to lock the folder by creating the lock file
func (fl *nopLock) Lock() (_ error) {
return
}
func (fl *nopLock) Unlock() (_ error) {
return
}
func (ldr nopLock) Locked() (_ bool, _ error) {
return
}
func (nopLock) Lock() (_ error) { return }
func (nopLock) Unlock() (_ error) { return }
func (nopLock) Locked() (_ bool, _ error) { return }
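
Note: folderLock is reduced to a plain string holding the lock-file path: Lock creates the file, Unlock removes it, and Locked reports whether it exists on disk. A stand-alone sketch of the same idea; fileLock is a hypothetical name and the error handling is slightly stricter than the diff's version.

package main

import (
	"fmt"
	"os"
)

type fileLock string

// Lock creates the lock file; its presence on disk is the lock state.
func (fl fileLock) Lock() error {
	f, err := os.OpenFile(string(fl), os.O_RDONLY|os.O_CREATE, 0644)
	if err != nil {
		return err
	}
	return f.Close()
}

// Unlock removes the lock file.
func (fl fileLock) Unlock() error { return os.Remove(string(fl)) }

// Locked reports whether the lock file currently exists.
func (fl fileLock) Locked() (bool, error) {
	if _, err := os.Stat(string(fl)); err != nil {
		if os.IsNotExist(err) {
			return false, nil
		}
		return false, err
	}
	return true, nil
}

func main() {
	lk := fileLock("/tmp/example.lck")
	_ = lk.Lock()
	ok, _ := lk.Locked()
	fmt.Println("locked:", ok) // true while the lock file exists
	_ = lk.Unlock()
}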

View File

@@ -41,6 +41,19 @@ func TestLoaderSCoverage(t *testing.T) {
internalLoaderSChan := make(chan birpc.ClientConnector, 1)
cM := engine.NewConnManager(cfg)
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
cfg.LoaderCfg()[0] = &config.LoaderSCfg{
ID: "test_id",
Enabled: true,
Tenant: "",
DryRun: false,
RunDelay: 0,
LockFilePath: "",
CacheSConns: nil,
FieldSeparator: "",
TpInDir: "",
TpOutDir: "",
Data: nil,
}
srv := NewLoaderService(cfg, db,
filterSChan, server, internalLoaderSChan,
cM, anz, srvDep)
@@ -50,21 +63,8 @@ func TestLoaderSCoverage(t *testing.T) {
if srv.IsRunning() {
t.Errorf("Expected service to be down")
}
srv.ldrs = loaders.NewLoaderService(&engine.DataManager{},
[]*config.LoaderSCfg{{
ID: "test_id",
Enabled: true,
Tenant: "",
DryRun: false,
RunDelay: 0,
LockFilePath: "",
CacheSConns: nil,
FieldSeparator: "",
TpInDir: "",
TpOutDir: "",
Data: nil,
}}, "",
&engine.FilterS{}, nil)
srv.ldrs = loaders.NewLoaderService(cfg, &engine.DataManager{},
"", &engine.FilterS{}, nil)
if !srv.IsRunning() {
t.Errorf("Expected service to be running")
}
@@ -72,8 +72,7 @@ func TestLoaderSCoverage(t *testing.T) {
if !reflect.DeepEqual(serviceName, utils.LoaderS) {
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.LoaderS, serviceName)
}
shouldRun := srv.ShouldRun()
if !reflect.DeepEqual(shouldRun, false) {
if shouldRun := srv.ShouldRun(); !shouldRun {
t.Errorf("\nExpecting <false>,\n Received <%+v>", shouldRun)
}
if !reflect.DeepEqual(srv.GetLoaderS(), srv.ldrs) {