From 7d0106e9c2aaccd2bbea51e3bffc3470c9b2e31d Mon Sep 17 00:00:00 2001 From: Trial97 Date: Mon, 1 Nov 2021 13:56:51 +0200 Subject: [PATCH] Updated loaders --- apis/loaders_it_test.go | 271 ++++++++----------------------------- apis/loaders_test.go | 2 +- config/config_defaults.go | 2 + config/loaderscfg.go | 25 +++- loaders/libloader_test.go | 60 ++++---- loaders/loader.go | 8 +- loaders/loaders.go | 1 + loaders/loaders_it_test.go | 38 +++--- loaders/locker.go | 52 +++---- services/loaders_test.go | 33 +++-- 10 files changed, 169 insertions(+), 323 deletions(-) diff --git a/apis/loaders_it_test.go b/apis/loaders_it_test.go index 93f40034b..34363f6fc 100644 --- a/apis/loaders_it_test.go +++ b/apis/loaders_it_test.go @@ -38,12 +38,10 @@ import ( ) var ( - ldrDirPathIn string - ldrDirPathOut string - ldrCfgPath string - ldrCfg *config.CGRConfig - ldrRPC *birpc.Client - ldrConfigDIR string //run tests for specific configuration + ldrCfgPath string + ldrCfg *config.CGRConfig + ldrRPC *birpc.Client + ldrConfigDIR string //run tests for specific configuration sTestsLdr = []func(t *testing.T){ testLoadersRemoveFolders, @@ -72,7 +70,6 @@ var ( testLoadersGetThresholdProfile, testLoadersRemove, - testLoadersGetAccountAfterRemove, testLoadersGetActionProfileAfterRemove, testLoadersGetAttributeProfileAfterRemove, @@ -165,253 +162,101 @@ func testLoadersPing(t *testing.T) { } func testLoadersCreateFolders(t *testing.T) { - ldrDirPathIn = "/tmp/TestLoadersIT/in" - err := os.MkdirAll(ldrDirPathIn, 0755) - if err != nil { - t.Error(err) - } - - ldrDirPathOut = "/tmp/TestLoadersIT/out" - err = os.MkdirAll(ldrDirPathOut, 0755) - if err != nil { + if err := os.MkdirAll("/tmp/TestLoadersIT/in", 0755); err != nil { t.Error(err) } } func testLoadersRemoveFolders(t *testing.T) { - err := os.RemoveAll("/tmp/TestLoadersIT") - if err != nil { + if err := os.RemoveAll("/tmp/TestLoadersIT/in"); err != nil { t.Error(err) } } func testLoadersWriteCSVs(t *testing.T) { + writeFile := func(fileName, data string) error { + csvAccounts, err := os.Create(path.Join(ldrCfg.LoaderCfg()[0].TpInDir, fileName)) + if err != nil { + return err + } + defer csvAccounts.Close() + _, err = csvAccounts.WriteString(data) + if err != nil { + return err + + } + return csvAccounts.Sync() + } // Create and populate Accounts.csv - csvAccounts, err := os.Create(ldrDirPathIn + "/Accounts.csv") - if err != nil { - t.Error(err) - } - defer csvAccounts.Close() - - data := `#Tenant,ID,FilterIDs,Weights,Opts,BalanceID,BalanceFilterIDs,BalanceWeights,BalanceType,BalanceUnits,BalanceUnitFactors,BalanceOpts,BalanceCostIncrements,BalanceAttributeIDs,BalanceRateProfileIDs,ThresholdIDs -cgrates.org,ACC_PRF,,;20,,MonetaryBalance,,;10,*concrete,14,fltr1&fltr2;100;fltr3;200,,fltr1&fltr2;1.3;2.3;3.3,attr1;attr2,,*none` - - _, err = csvAccounts.WriteString(data) - if err != nil { - t.Error(err) - } - err = csvAccounts.Sync() - if err != nil { - t.Error(err) + if err := writeFile(utils.AccountsCsv, `#Tenant,ID,FilterIDs,Weights,Opts,BalanceID,BalanceFilterIDs,BalanceWeights,BalanceType,BalanceUnits,BalanceUnitFactors,BalanceOpts,BalanceCostIncrements,BalanceAttributeIDs,BalanceRateProfileIDs,ThresholdIDs +cgrates.org,ACC_PRF,,;20,,MonetaryBalance,,;10,*concrete,14,fltr1&fltr2;100;fltr3;200,,fltr1&fltr2;1.3;2.3;3.3,attr1;attr2,,*none`); err != nil { + t.Fatal(err) } - // Create and populate Actions.csv - csvActions, err := os.Create(ldrDirPathIn + "/Actions.csv") - if err != nil { - t.Error(err) - } - defer csvActions.Close() - - data = `#Tenant,ID,FilterIDs,Weight,Schedule,TargetType,TargetIDs,ActionID,ActionFilterIDs,ActionBlocker,ActionTTL,ActionType,ActionOpts,ActionPath,ActionValue -cgrates.org,ACT_PRF,,10,*asap,*accounts,1001,TOPUP,,false,0s,*add_balance,,*balance.TestBalance.Units,10` - - _, err = csvActions.WriteString(data) - if err != nil { - t.Error(err) - } - err = csvActions.Sync() - if err != nil { - t.Error(err) + // Create and populate ActionProfiles.csv + if err := writeFile(utils.ActionsCsv, `#Tenant,ID,FilterIDs,Weight,Schedule,TargetType,TargetIDs,ActionID,ActionFilterIDs,ActionBlocker,ActionTTL,ActionType,ActionOpts,ActionPath,ActionValue +cgrates.org,ACT_PRF,,10,*asap,*accounts,1001,TOPUP,,false,0s,*add_balance,,*balance.TestBalance.Units,10`); err != nil { + t.Fatal(err) } // Create and populate Attributes.csv - csvAttributes, err := os.Create(ldrDirPathIn + "/Attributes.csv") - if err != nil { - t.Error(err) - } - defer csvAttributes.Close() - - data = `#Tenant,ID,FilterIDs,Weight,AttributeFilterIDs,Path,Type,Value,Blocker -cgrates.org,ATTR_ACNT_1001,FLTR_ACCOUNT_1001,10,,*req.OfficeGroup,*constant,Marketing,false` - - _, err = csvAttributes.WriteString(data) - if err != nil { - t.Error(err) - } - err = csvAttributes.Sync() - if err != nil { - t.Error(err) + if err := writeFile(utils.AttributesCsv, `#Tenant,ID,FilterIDs,Weight,AttributeFilterIDs,Path,Type,Value,Blocker +cgrates.org,ATTR_ACNT_1001,FLTR_ACCOUNT_1001,10,,*req.OfficeGroup,*constant,Marketing,false`); err != nil { + t.Fatal(err) } // Create and populate Chargers.csv - csvChargers, err := os.Create(ldrDirPathIn + "/Chargers.csv") - if err != nil { - t.Error(err) - } - defer csvChargers.Close() - - data = `#Tenant,ID,FilterIDs,Weight,RunID,AttributeIDs -cgrates.org,Raw,FLTR_ACCOUNT_1001,20,*raw,*constant:*req.RequestType:*none` - - _, err = csvChargers.WriteString(data) - if err != nil { - t.Error(err) - } - err = csvChargers.Sync() - if err != nil { - t.Error(err) + if err := writeFile(utils.ChargersCsv, `#Tenant,ID,FilterIDs,Weight,RunID,AttributeIDs +cgrates.org,Raw,FLTR_ACCOUNT_1001,20,*raw,*constant:*req.RequestType:*none`); err != nil { + t.Fatal(err) } // Create and populate DispatcherProfiles.csv - csvDispatcherProfiles, err := os.Create(ldrDirPathIn + "/DispatcherProfiles.csv") - if err != nil { - t.Error(err) - } - defer csvDispatcherProfiles.Close() - - data = `#Tenant,ID,FilterIDs,Weight,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters -cgrates.org,DSP1,FLTR_ACCOUNT_1001,10,*weight,,ALL,,20,false,` - - _, err = csvDispatcherProfiles.WriteString(data) - if err != nil { - t.Error(err) - } - err = csvDispatcherProfiles.Sync() - if err != nil { - t.Error(err) + if err := writeFile(utils.DispatcherProfilesCsv, `#Tenant,ID,FilterIDs,Weight,Strategy,StrategyParameters,ConnID,ConnFilterIDs,ConnWeight,ConnBlocker,ConnParameters +cgrates.org,DSP1,FLTR_ACCOUNT_1001,10,*weight,,ALL,,20,false,`); err != nil { + t.Fatal(err) } // Create and populate DispatcherHosts.csv - csvDispatcherHosts, err := os.Create(ldrDirPathIn + "/DispatcherHosts.csv") - if err != nil { - t.Error(err) - } - defer csvDispatcherHosts.Close() - - data = `#Tenant[0],ID[1],Address[2],Transport[3],TLS[4] -cgrates.org,DSPHOST1,*internal,,` - - _, err = csvDispatcherHosts.WriteString(data) - if err != nil { - t.Error(err) - } - err = csvDispatcherHosts.Sync() - if err != nil { - t.Error(err) + if err := writeFile(utils.DispatcherHostsCsv, `#Tenant[0],ID[1],Address[2],Transport[3],ConnectAttempts[4],Reconnects[5],ConnectTimeout[6],ReplyTimeout[7],Tls[8],ClientKey[9],ClientCertificate[10],CaCertificate[11] +cgrates.org,DSPHOST1,*internal,,1,3,"1m","2m",false,,,`); err != nil { + t.Fatal(err) } // Create and populate Filters.csv - csvFilters, err := os.Create(ldrDirPathIn + "/Filters.csv") - if err != nil { - t.Error(err) - } - defer csvFilters.Close() - - data = `#Tenant[0],ID[1],Type[2],Path[3],Values[4] -cgrates.org,FLTR_ACCOUNT_1001,*string,~*req.Account,1001` - - _, err = csvFilters.WriteString(data) - if err != nil { - t.Error(err) - } - err = csvFilters.Sync() - if err != nil { - t.Error(err) + if err := writeFile(utils.FiltersCsv, `#Tenant[0],ID[1],Type[2],Path[3],Values[4] +cgrates.org,FLTR_ACCOUNT_1001,*string,~*req.Account,1001`); err != nil { + t.Fatal(err) } - // Create and populate Rates.csv - csvRateProfiles, err := os.Create(ldrDirPathIn + "/Rates.csv") - if err != nil { - t.Error(err) - } - defer csvRateProfiles.Close() - - data = `#Tenant,ID,FilterIDs,Weights,MinCost,MaxCost,MaxCostStrategy,RateID,RateFilterIDs,RateActivationStart,RateWeights,RateBlocker,RateIntervalStart,RateFixedFee,RateRecurrentFee,RateUnit,RateIncrement -cgrates.org,RP1,FLTR_ACCOUNT_1001,;0,0.1,0.6,*free,RT_WEEK,FLTR_ACCOUNT_1001,"* * * * 1-5",;0,false,0s,,0.12,1m,1m` - - _, err = csvRateProfiles.WriteString(data) - if err != nil { - t.Error(err) - } - err = csvRateProfiles.Sync() - if err != nil { - t.Error(err) + // Create and populate RateProfiles.csv + if err := writeFile(utils.RatesCsv, `#Tenant,ID,FilterIDs,Weights,MinCost,MaxCost,MaxCostStrategy,RateID,RateFilterIDs,RateActivationStart,RateWeights,RateBlocker,RateIntervalStart,RateFixedFee,RateRecurrentFee,RateUnit,RateIncrement +cgrates.org,RP1,FLTR_ACCOUNT_1001,;0,0.1,0.6,*free,RT_WEEK,FLTR_ACCOUNT_1001,"* * * * 1-5",;0,false,0s,,0.12,1m,1m`); err != nil { + t.Fatal(err) } // Create and populate Resources.csv - csvResources, err := os.Create(ldrDirPathIn + "/Resources.csv") - if err != nil { - t.Error(err) - } - defer csvResources.Close() - - data = `#Tenant[0],Id[1],FilterIDs[2],Weight[3],TTL[4],Limit[5],AllocationMessage[6],Blocker[7],Stored[8],ThresholdIDs[9] -cgrates.org,RES_ACNT_1001,FLTR_ACCOUNT_1001,10,1h,1,,false,false,` - - _, err = csvResources.WriteString(data) - if err != nil { - t.Error(err) - } - err = csvResources.Sync() - if err != nil { - t.Error(err) + if err := writeFile(utils.ResourcesCsv, `#Tenant[0],Id[1],FilterIDs[2],Weight[3],TTL[4],Limit[5],AllocationMessage[6],Blocker[7],Stored[8],ThresholdIDs[9] +cgrates.org,RES_ACNT_1001,FLTR_ACCOUNT_1001,10,1h,1,,false,false,`); err != nil { + t.Fatal(err) } // Create and populate Routes.csv - csvRoutes, err := os.Create(ldrDirPathIn + "/Routes.csv") - if err != nil { - t.Error(err) - } - defer csvRoutes.Close() - - data = `#Tenant,ID,FilterIDs,Weights,Sorting,SortingParameters,RouteID,RouteFilterIDs,RouteAccountIDs,RouteRateProfileIDs,RouteResourceIDs,RouteStatIDs,RouteWeights,RouteBlocker,RouteParameters -cgrates.org,ROUTE_ACNT_1001,FLTR_ACCOUNT_1001,;10,*weight,,route1,,,,,,;20,,` - - _, err = csvRoutes.WriteString(data) - if err != nil { - t.Error(err) - } - err = csvRoutes.Sync() - if err != nil { - t.Error(err) + if err := writeFile(utils.RoutesCsv, `#Tenant,ID,FilterIDs,Weights,Sorting,SortingParameters,RouteID,RouteFilterIDs,RouteAccountIDs,RouteRateProfileIDs,RouteResourceIDs,RouteStatIDs,RouteWeights,RouteBlocker,RouteParameters +cgrates.org,ROUTE_ACNT_1001,FLTR_ACCOUNT_1001,;10,*weight,,route1,,,,,,;20,,`); err != nil { + t.Fatal(err) } // Create and populate Stats.csv - csvStats, err := os.Create(ldrDirPathIn + "/Stats.csv") - if err != nil { - t.Error(err) - } - defer csvStats.Close() - - data = `#Tenant[0],Id[1],FilterIDs[2],Weight[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],ThresholdIDs[11] -cgrates.org,Stat_1,FLTR_ACCOUNT_1001,30,100,10s,0,*acd;*tcd;*asr,,false,true,*none` - - _, err = csvStats.WriteString(data) - if err != nil { - t.Error(err) - } - err = csvStats.Sync() - if err != nil { - t.Error(err) + if err := writeFile(utils.StatsCsv, `#Tenant[0],Id[1],FilterIDs[2],Weight[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],ThresholdIDs[11] +cgrates.org,Stat_1,FLTR_ACCOUNT_1001,30,100,10s,0,*acd;*tcd;*asr,,false,true,*none`); err != nil { + t.Fatal(err) } // Create and populate Thresholds.csv - csvThresholds, err := os.Create(ldrDirPathIn + "/Thresholds.csv") - if err != nil { - t.Error(err) - } - defer csvThresholds.Close() - - data = `#Tenant[0],Id[1],FilterIDs[2],Weight[3],MaxHits[4],MinHits[5],MinSleep[6],Blocker[7],ActionProfileIDs[8],Async[9] -cgrates.org,THD_ACNT_1001,FLTR_ACCOUNT_1001,10,-1,0,0,false,ACT_PRF,false` - - _, err = csvThresholds.WriteString(data) - if err != nil { - t.Error(err) - } - err = csvThresholds.Sync() - if err != nil { - t.Error(err) + if err := writeFile(utils.ThresholdsCsv, `#Tenant[0],Id[1],FilterIDs[2],Weight[3],MaxHits[4],MinHits[5],MinSleep[6],Blocker[7],ActionProfileIDs[8],Async[9] +cgrates.org,THD_ACNT_1001,FLTR_ACCOUNT_1001,10,-1,0,0,false,ACT_PRF,false`); err != nil { + t.Fatal(err) } } @@ -1261,7 +1106,7 @@ func TestLoadersLoad(t *testing.T) { data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items) dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) fltrs := engine.NewFilterS(cfg, nil, dm) - ldrS := loaders.NewLoaderService(dm, cfg.LoaderCfg(), "", fltrs, nil) + ldrS := loaders.NewLoaderService(cfg, dm, "", fltrs, nil) lSv1 := NewLoaderSv1(ldrS) args := &loaders.ArgsProcessFolder{ diff --git a/apis/loaders_test.go b/apis/loaders_test.go index abd3d77ac..a912a72ad 100644 --- a/apis/loaders_test.go +++ b/apis/loaders_test.go @@ -32,7 +32,7 @@ func TestLoadersNewLoaderSv1(t *testing.T) { data := engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items) dm := engine.NewDataManager(data, cfg.CacheCfg(), nil) fltrs := engine.NewFilterS(cfg, nil, dm) - ldrS := loaders.NewLoaderService(dm, cfg.LoaderCfg(), "", fltrs, nil) + ldrS := loaders.NewLoaderService(cfg, dm, "", fltrs, nil) exp := &LoaderSv1{ ldrS: ldrS, diff --git a/config/config_defaults.go b/config/config_defaults.go index 3453a8bec..390a1fc13 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -1168,6 +1168,8 @@ const CGRATES_CFG_JSON = ` "tp_out_dir": "/var/spool/cgrates/loader/out", // absolute path towards the directory where processed TPs will be moved "data":[ // data profiles to load { + "id": "filters1", + "flags": ["*partial"], "type": "*filters", // data source type "file_name": "Filters.csv", // file name in the tp_in_dir "fields": [ diff --git a/config/loaderscfg.go b/config/loaderscfg.go index 47d3ad99f..a7142c05a 100644 --- a/config/loaderscfg.go +++ b/config/loaderscfg.go @@ -113,6 +113,7 @@ type LoaderSCfg struct { // LoaderDataType the template for profile loading type LoaderDataType struct { + ID string Type string Filename string Flags utils.FlagsWithParams @@ -123,6 +124,9 @@ func (lData *LoaderDataType) loadFromJSONCfg(jsnCfg *LoaderJsonDataType, msgTemp if jsnCfg == nil { return nil } + if jsnCfg.Id != nil { + lData.ID = *jsnCfg.Id + } if jsnCfg.Type != nil { lData.Type = *jsnCfg.Type } @@ -149,6 +153,7 @@ func (l *LoaderSCfg) loadFromJSONCfg(jsnCfg *LoaderJsonCfg, msgTemplates map[str if jsnCfg == nil { return nil } + l.WithIndex = true if jsnCfg.ID != nil { l.ID = *jsnCfg.ID } @@ -188,12 +193,17 @@ func (l *LoaderSCfg) loadFromJSONCfg(jsnCfg *LoaderJsonCfg, msgTemplates map[str continue } var ldrDataType *LoaderDataType + var lType, id string if jsnLoCfg.Type != nil { - for _, ldrDT := range l.Data { - if ldrDT.Type == *jsnLoCfg.Type { - ldrDataType = ldrDT - break - } + lType = *jsnLoCfg.Type + } + if jsnLoCfg.Id != nil { + id = *jsnLoCfg.Id + } + for _, ldrDT := range l.Data { + if ldrDT.Type == lType && id == ldrDT.ID { + ldrDataType = ldrDT + break } } if ldrDataType == nil { @@ -302,6 +312,7 @@ func (l LoaderSCfg) AsMapInterface(separator string) (initialMP map[string]inter } type LoaderJsonDataType struct { + Id *string Type *string File_name *string Flags *[]string @@ -327,7 +338,8 @@ func equalsLoaderDatasType(v1, v2 []*LoaderDataType) bool { return false } for i := range v2 { - if v1[i].Type != v2[i].Type || + if v1[i].ID != v2[i].ID || + v1[i].Type != v2[i].Type || v1[i].Filename != v2[i].Filename || !utils.SliceStringEqual(v1[i].Flags.SliceFlags(), v2[i].Flags.SliceFlags()) || !fcTemplatesEqual(v1[i].Fields, v2[i].Fields) { @@ -377,6 +389,7 @@ func diffLoaderJsonCfg(v1, v2 *LoaderSCfg, separator string) (d *LoaderJsonCfg) var req []*FcTemplateJsonCfg req = diffFcTemplateJsonCfg(req, nil, val2.Fields, separator) data[i] = &LoaderJsonDataType{ + Id: utils.StringPointer(val2.ID), Type: utils.StringPointer(val2.Type), File_name: utils.StringPointer(val2.Filename), Flags: utils.SliceStringPointer(val2.Flags.SliceFlags()), diff --git a/loaders/libloader_test.go b/loaders/libloader_test.go index 4f996ed38..08899ebe1 100644 --- a/loaders/libloader_test.go +++ b/loaders/libloader_test.go @@ -32,41 +32,41 @@ import ( func TestDataUpdateFromCSVOneFile(t *testing.T) { attrSFlds := []*config.FCTemplate{ - {Tag: "TenantID", + { Path: "Tenant", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep), Mandatory: true}, - {Tag: "ProfileID", + { Path: "ID", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep), Mandatory: true}, - {Tag: "Contexts", + { Path: "Contexts", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*req.2", utils.InfieldSep)}, - {Tag: "FilterIDs", + { Path: "FilterIDs", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*req.3", utils.InfieldSep)}, - {Tag: "Weight", + { Path: "Weight", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*req.4", utils.InfieldSep)}, - {Tag: "Path", + { Path: "Path", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*req.5", utils.InfieldSep)}, - {Tag: "Initial", + { Path: "Initial", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*req.6", utils.InfieldSep)}, - {Tag: "Substitute", + { Path: "Substitute", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*req.7", utils.InfieldSep)}, - {Tag: "Append", + { Path: "Append", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*req.8", utils.InfieldSep)}, @@ -116,41 +116,41 @@ func TestDataUpdateFromCSVOneFile(t *testing.T) { func TestDataUpdateFromCSVOneFile2(t *testing.T) { attrSFlds := []*config.FCTemplate{ - {Tag: "TenantID", + { Path: "Tenant", Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep), Mandatory: true}, - {Tag: "ProfileID", + { Path: "ID", Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep), Mandatory: true}, - {Tag: "Contexts", + { Path: "Contexts", Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.2", utils.InfieldSep)}, - {Tag: "FilterIDs", + { Path: "FilterIDs", Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.3", utils.InfieldSep)}, - {Tag: "Weight", + { Path: "Weight", Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.4", utils.InfieldSep)}, - {Tag: "Path", + { Path: "Path", Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.5", utils.InfieldSep)}, - {Tag: "Initial", + { Path: "Initial", Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.6", utils.InfieldSep)}, - {Tag: "Substitute", + { Path: "Substitute", Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.7", utils.InfieldSep)}, - {Tag: "Append", + { Path: "Append", Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.8", utils.InfieldSep)}, @@ -200,37 +200,37 @@ func TestDataUpdateFromCSVOneFile2(t *testing.T) { func TestDataUpdateFromCSVMultiFiles(t *testing.T) { attrSFlds := []*config.FCTemplate{ - {Tag: "TenantID", + { Path: "Tenant", Type: utils.MetaString, Value: config.NewRSRParsersMustCompile("cgrates.org", utils.InfieldSep), Mandatory: true}, - {Tag: "ProfileID", + { Path: "ID", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*file(File2.csv).1", utils.InfieldSep), Mandatory: true}, - {Tag: "Contexts", + { Path: "Contexts", Type: utils.MetaString, Value: config.NewRSRParsersMustCompile("*any", utils.InfieldSep)}, - {Tag: "Path", + { Path: "Path", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*file(File1.csv).5", utils.InfieldSep)}, - {Tag: "Initial", + { Path: "Initial", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*file(File1.csv).6", utils.InfieldSep)}, - {Tag: "Substitute", + { Path: "Substitute", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*file(File1.csv).7", utils.InfieldSep)}, - {Tag: "Append", + { Path: "Append", Type: utils.MetaString, Value: config.NewRSRParsersMustCompile("true", utils.InfieldSep)}, - {Tag: "Weight", + { Path: "Weight", Type: utils.MetaString, Value: config.NewRSRParsersMustCompile("10", utils.InfieldSep)}, @@ -286,13 +286,13 @@ func TestGetRateIDsLoaderData(t *testing.T) { func TestUpdateFromCsvWithFiltersError(t *testing.T) { attrSFlds := []*config.FCTemplate{ - {Tag: "TenantID", + { Path: "Tenant", Type: utils.MetaString, Value: config.NewRSRParsersMustCompile("cgrates.org", utils.InfieldSep), Filters: []string{"*string:~*req.Account:10"}, Mandatory: true}, - {Tag: "ProfileID", + { Path: "ID", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*file(File2.csv).1", utils.InfieldSep), @@ -320,13 +320,13 @@ func TestUpdateFromCsvWithFiltersError(t *testing.T) { func TestUpdateFromCsvWithFiltersContinue(t *testing.T) { attrSFlds := []*config.FCTemplate{ - {Tag: "TenantID", + { Path: "Tenant", Type: utils.MetaString, Value: config.NewRSRParsersMustCompile("cgrates.org", utils.InfieldSep), Filters: []string{`*string:~*req.2:10`}, Mandatory: true}, - {Tag: "ProfileID", + { Path: "ID", Type: utils.MetaComposed, Value: config.NewRSRParsersMustCompile("~*file(File2.csv).1", utils.InfieldSep), diff --git a/loaders/loader.go b/loaders/loader.go index 0f24dfaf9..21279c0d1 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -517,8 +517,10 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi } tntID := TenantIDFromMap(data) if !prevTntID.Equal(tntID) { - if err = l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates); err != nil { - return + if prevTntID != nil { + if err = l.process(ctx, prevTntID, lData, lType, action, caching, dryRun, withIndex, partialRates); err != nil { + return + } } prevTntID = tntID lData = make([]utils.MapStorage, 0, 1) @@ -587,7 +589,7 @@ func (l *loader) processFolder(ctx *context.Context, action, caching string, dry } return } - switch l.ldrCfg.Action { + switch action { case utils.MetaStore: for i := range l.ldrCfg.Data { if err = proces(i); err != nil { diff --git a/loaders/loaders.go b/loaders/loaders.go index 70429198b..5aa9d388a 100644 --- a/loaders/loaders.go +++ b/loaders/loaders.go @@ -99,6 +99,7 @@ func (ldrS *LoaderService) process(ctx *context.Context, args *ArgsProcessFolder rply *string) (err error) { ldrS.RLock() defer ldrS.RUnlock() + if args.LoaderID == utils.EmptyString { args.LoaderID = utils.MetaDefault } diff --git a/loaders/loaders_it_test.go b/loaders/loaders_it_test.go index 06129bfba..d847103e9 100644 --- a/loaders/loaders_it_test.go +++ b/loaders/loaders_it_test.go @@ -88,31 +88,35 @@ cgrates.org,NewRes1 data := engine.NewInternalDB(nil, nil, config.NewDefaultCGRConfig().DataDbCfg().Items) dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) - cfgLdr := config.NewDefaultCGRConfig().LoaderCfg() - cfgLdr[0] = &config.LoaderSCfg{ + cfgLdr := config.NewDefaultCGRConfig() + cfgLdr.LoaderCfg()[0] = &config.LoaderSCfg{ ID: "testV1LoadResource", Enabled: true, FieldSeparator: utils.FieldsSep, TpInDir: flPath, TpOutDir: "/tmp", LockFilePath: "res.lck", - Data: nil, + Data: []*config.LoaderDataType{{ + Type: utils.MetaResources, + Filename: utils.ResourcesCsv, + Fields: []*config.FCTemplate{ + {Tag: "Tenant", + Path: "Tenant", + Type: utils.MetaComposed, + Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep), + Mandatory: true}, + {Tag: "ID", + Path: "ID", + Type: utils.MetaComposed, + Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep), + Mandatory: true}, + }, + }}, } - ldrs := NewLoaderService(dm, cfgLdr, "UTC", nil, nil) - ldrs.ldrs["testV1LoadResource"].dataTpls = map[string][]*config.FCTemplate{ - utils.MetaResources: { - {Tag: "Tenant", - Path: "Tenant", - Type: utils.MetaComposed, - Value: config.NewRSRParsersMustCompile("~*req.0", utils.InfieldSep), - Mandatory: true}, - {Tag: "ID", - Path: "ID", - Type: utils.MetaComposed, - Value: config.NewRSRParsersMustCompile("~*req.1", utils.InfieldSep), - Mandatory: true}, - }, + for _, v := range cfgLdr.LoaderCfg()[0].Data[0].Fields { + v.ComputePath() } + ldrs := NewLoaderService(cfgLdr, dm, "UTC", nil, nil) resCsv := `#Tenant[0],ID[1] cgrates.org,NewRes1 diff --git a/loaders/locker.go b/loaders/locker.go index c6c9510cb..056b0f5b0 100644 --- a/loaders/locker.go +++ b/loaders/locker.go @@ -33,41 +33,30 @@ type Locker interface { func newLocker(path string) Locker { if path != utils.EmptyString { - return &folderLock{path: path} + return folderLock(path) } return new(nopLock) } -type folderLock struct { - path string - file io.Closer -} +type folderLock string // lockFolder will attempt to lock the folder by creating the lock file -func (fl *folderLock) Lock() (err error) { - // If the path is an empty string, we should not be locking - fl.file, err = os.OpenFile(fl.path, +func (fl folderLock) Lock() (err error) { + var file io.Closer + file, err = os.OpenFile(string(fl), os.O_RDONLY|os.O_CREATE, 0644) + file.Close() return } -func (fl *folderLock) Unlock() (err error) { - // If the path is an empty string, we should not be locking - if fl.file != nil { - fl.file.Close() - err = os.Remove(fl.path) - } - return +func (fl folderLock) Unlock() (err error) { + return os.Remove(string(fl)) } -func (ldr folderLock) Locked() (lk bool, err error) { - // If the path is an empty string, we should not be locking - if lk = ldr.file != nil; lk { - return - } - if _, err = os.Stat(ldr.path); err != nil { - if lk = os.IsNotExist(err); lk { - err = nil +func (fl folderLock) Locked() (lk bool, err error) { + if _, err = os.Stat(string(fl)); err != nil { + if os.IsNotExist(err) { + lk, err = false, nil } return } @@ -75,18 +64,9 @@ func (ldr folderLock) Locked() (lk bool, err error) { return } -type nopLock struct { -} +type nopLock struct{} // lockFolder will attempt to lock the folder by creating the lock file -func (fl *nopLock) Lock() (_ error) { - return -} - -func (fl *nopLock) Unlock() (_ error) { - return -} - -func (ldr nopLock) Locked() (_ bool, _ error) { - return -} +func (nopLock) Lock() (_ error) { return } +func (nopLock) Unlock() (_ error) { return } +func (nopLock) Locked() (_ bool, _ error) { return } diff --git a/services/loaders_test.go b/services/loaders_test.go index c2dc668d6..ae0a63d94 100644 --- a/services/loaders_test.go +++ b/services/loaders_test.go @@ -41,6 +41,19 @@ func TestLoaderSCoverage(t *testing.T) { internalLoaderSChan := make(chan birpc.ClientConnector, 1) cM := engine.NewConnManager(cfg) anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) + cfg.LoaderCfg()[0] = &config.LoaderSCfg{ + ID: "test_id", + Enabled: true, + Tenant: "", + DryRun: false, + RunDelay: 0, + LockFilePath: "", + CacheSConns: nil, + FieldSeparator: "", + TpInDir: "", + TpOutDir: "", + Data: nil, + } srv := NewLoaderService(cfg, db, filterSChan, server, internalLoaderSChan, cM, anz, srvDep) @@ -50,21 +63,8 @@ func TestLoaderSCoverage(t *testing.T) { if srv.IsRunning() { t.Errorf("Expected service to be down") } - srv.ldrs = loaders.NewLoaderService(&engine.DataManager{}, - []*config.LoaderSCfg{{ - ID: "test_id", - Enabled: true, - Tenant: "", - DryRun: false, - RunDelay: 0, - LockFilePath: "", - CacheSConns: nil, - FieldSeparator: "", - TpInDir: "", - TpOutDir: "", - Data: nil, - }}, "", - &engine.FilterS{}, nil) + srv.ldrs = loaders.NewLoaderService(cfg, &engine.DataManager{}, + "", &engine.FilterS{}, nil) if !srv.IsRunning() { t.Errorf("Expected service to be running") } @@ -72,8 +72,7 @@ func TestLoaderSCoverage(t *testing.T) { if !reflect.DeepEqual(serviceName, utils.LoaderS) { t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.LoaderS, serviceName) } - shouldRun := srv.ShouldRun() - if !reflect.DeepEqual(shouldRun, false) { + if shouldRun := srv.ShouldRun(); !shouldRun { t.Errorf("\nExpecting ,\n Received <%+v>", shouldRun) } if !reflect.DeepEqual(srv.GetLoaderS(), srv.ldrs) {