mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-23 16:18:44 +05:00
Finished cover tests for loaders
This commit is contained in:
committed by
Dan Christian Bogos
parent
bcdf7910d1
commit
f1bb562b34
@@ -152,10 +152,9 @@ func testAnalyzerSLoadTarrifPlans(t *testing.T) {
|
||||
// t.Error("Unexpected reply returned", reply)
|
||||
// }
|
||||
// time.Sleep(100 * time.Millisecond)
|
||||
args := &loaders.ArgsProcessFolder{
|
||||
Caching: utils.StringPointer(utils.MetaReload),
|
||||
}
|
||||
if err := anzRPC.Call(utils.LoaderSv1Load, args, &reply); err != nil {
|
||||
if err := anzRPC.Call(utils.LoaderSv1Run, &loaders.ArgsProcessFolder{
|
||||
APIOpts: map[string]interface{}{utils.MetaCache: utils.MetaReload},
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Error("Unexpected reply returned", reply)
|
||||
|
||||
@@ -112,10 +112,9 @@ func testAnzBiSRPCConn(t *testing.T) {
|
||||
func testAnalyzerSLoad(t *testing.T) {
|
||||
var reply string
|
||||
|
||||
args := &loaders.ArgsProcessFolder{
|
||||
Caching: utils.StringPointer(utils.MetaReload),
|
||||
}
|
||||
if err := anzBiRPC.Call(context.Background(), utils.LoaderSv1Load, args, &reply); err != nil {
|
||||
if err := anzBiRPC.Call(context.Background(), utils.LoaderSv1Run, &loaders.ArgsProcessFolder{
|
||||
APIOpts: map[string]interface{}{utils.MetaCache: utils.MetaReload},
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Error("Unexpected reply returned", reply)
|
||||
|
||||
@@ -109,10 +109,10 @@ func testIdxLoadRPCConn(t *testing.T) {
|
||||
|
||||
func testIdxLoadTariffPlan(t *testing.T) {
|
||||
var reply string
|
||||
if err := idxLoadBiRPC.Call(context.Background(), utils.LoaderSv1Load,
|
||||
if err := idxLoadBiRPC.Call(context.Background(), utils.LoaderSv1Run,
|
||||
&loaders.ArgsProcessFolder{
|
||||
// StopOnError: true,
|
||||
Caching: utils.StringPointer(utils.MetaReload), // after laod, we got CacheIDs and it will be called Cachesv1.Clear, so indexes will be removed
|
||||
APIOpts: map[string]interface{}{utils.MetaCache: utils.MetaReload}, // after laod, we got CacheIDs and it will be called Cachesv1.Clear, so indexes will be removed
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
|
||||
@@ -69,20 +69,21 @@ var (
|
||||
testLoadersGetStatQueueProfile,
|
||||
testLoadersGetThresholdProfile,
|
||||
|
||||
testLoadersRemove,
|
||||
testLoadersGetAccountAfterRemove,
|
||||
testLoadersGetActionProfileAfterRemove,
|
||||
testLoadersGetAttributeProfileAfterRemove,
|
||||
testLoadersGetChargerProfileAfterRemove,
|
||||
// testLoadersGetDispatcherProfileAfterRemove,
|
||||
// testLoadersGetDispatcherHostAfterRemove,
|
||||
testLoadersGetFilterAfterRemove,
|
||||
testLoadersGetRateProfileAfterRemove,
|
||||
testLoadersGetResourceProfileAfterRemove,
|
||||
testLoadersGetRouteProfileAfterRemove,
|
||||
testLoadersGetStatQueueProfileAfterRemove,
|
||||
testLoadersGetThresholdProfileAfterRemove,
|
||||
|
||||
/*
|
||||
testLoadersRemove,
|
||||
testLoadersGetAccountAfterRemove,
|
||||
testLoadersGetActionProfileAfterRemove,
|
||||
testLoadersGetAttributeProfileAfterRemove,
|
||||
testLoadersGetChargerProfileAfterRemove,
|
||||
// testLoadersGetDispatcherProfileAfterRemove,
|
||||
// testLoadersGetDispatcherHostAfterRemove,
|
||||
testLoadersGetFilterAfterRemove,
|
||||
testLoadersGetRateProfileAfterRemove,
|
||||
testLoadersGetResourceProfileAfterRemove,
|
||||
testLoadersGetRouteProfileAfterRemove,
|
||||
testLoadersGetStatQueueProfileAfterRemove,
|
||||
testLoadersGetThresholdProfileAfterRemove,
|
||||
*/
|
||||
testLoadersRemoveFolders,
|
||||
|
||||
testLoadersPing,
|
||||
@@ -262,10 +263,12 @@ cgrates.org,THD_ACNT_1001,FLTR_ACCOUNT_1001,10,-1,0,0,false,ACT_PRF,false`); err
|
||||
|
||||
func testLoadersLoad(t *testing.T) {
|
||||
var reply string
|
||||
if err := ldrRPC.Call(context.Background(), utils.LoaderSv1Load,
|
||||
if err := ldrRPC.Call(context.Background(), utils.LoaderSv1Run,
|
||||
&loaders.ArgsProcessFolder{
|
||||
StopOnError: true,
|
||||
Caching: utils.StringPointer(utils.MetaReload),
|
||||
APIOpts: map[string]interface{}{
|
||||
utils.MetaCache: utils.MetaReload,
|
||||
utils.MetaStopOnError: true,
|
||||
},
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
@@ -805,10 +808,12 @@ func testLoadersGetThresholdProfile(t *testing.T) {
|
||||
|
||||
func testLoadersRemove(t *testing.T) {
|
||||
var reply string
|
||||
if err := ldrRPC.Call(context.Background(), utils.LoaderSv1Remove,
|
||||
if err := ldrRPC.Call(context.Background(), utils.LoaderSv1Run, //Remove,
|
||||
&loaders.ArgsProcessFolder{
|
||||
StopOnError: true,
|
||||
Caching: utils.StringPointer(utils.MetaReload),
|
||||
APIOpts: map[string]interface{}{
|
||||
utils.MetaCache: utils.MetaReload,
|
||||
utils.MetaStopOnError: true,
|
||||
},
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
@@ -1100,6 +1105,7 @@ func TestLoadersLoad(t *testing.T) {
|
||||
FieldSeparator: ";",
|
||||
TpInDir: dirPathIn,
|
||||
TpOutDir: dirPathOut,
|
||||
Opts: &config.LoaderSOptsCfg{},
|
||||
}
|
||||
|
||||
cfg.LoaderCfg()[0] = loaderCfg
|
||||
@@ -1113,11 +1119,11 @@ func TestLoadersLoad(t *testing.T) {
|
||||
LoaderID: "LoaderID",
|
||||
}
|
||||
var reply string
|
||||
if err := lSv1.Load(context.Background(), args, &reply); err != nil {
|
||||
if err := lSv1.Run(context.Background(), args, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
if err := lSv1.Remove(context.Background(), args, &reply); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
// if err := lSv1.Remove(context.Background(), args, &reply); err != nil {
|
||||
// t.Error(err)
|
||||
// }
|
||||
}
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -139,10 +139,12 @@ func testLdPrMatchAcLoadTP(t *testing.T) {
|
||||
caching = utils.MetaNone
|
||||
}
|
||||
var reply string
|
||||
if err := testLdPrMatchAcRPC.Call(context.Background(), utils.LoaderSv1Load,
|
||||
if err := testLdPrMatchAcRPC.Call(context.Background(), utils.LoaderSv1Run,
|
||||
&loaders.ArgsProcessFolder{
|
||||
StopOnError: true,
|
||||
Caching: utils.StringPointer(caching),
|
||||
APIOpts: map[string]interface{}{
|
||||
utils.MetaCache: caching,
|
||||
utils.MetaStopOnError: true,
|
||||
},
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
|
||||
@@ -135,10 +135,12 @@ func testLdPrMatchRtRPCConn(t *testing.T) {
|
||||
|
||||
func testLdPrMatchRtLoadTP(t *testing.T) {
|
||||
var reply string
|
||||
if err := testLdPrMatchRtRPC.Call(context.Background(), utils.LoaderSv1Load,
|
||||
if err := testLdPrMatchRtRPC.Call(context.Background(), utils.LoaderSv1Run,
|
||||
&loaders.ArgsProcessFolder{
|
||||
StopOnError: true,
|
||||
Caching: utils.StringPointer(utils.MetaReload),
|
||||
APIOpts: map[string]interface{}{
|
||||
utils.MetaStopOnError: true,
|
||||
utils.MetaCache: utils.MetaReload,
|
||||
},
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
@@ -170,8 +172,12 @@ func testLdPrMatchRtCDRSProcessEvent(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expected := "OK"
|
||||
if !reflect.DeepEqual(utils.ToJSON(&expected), utils.ToJSON(&rply)) {
|
||||
t.Errorf("Expecting : %+v, received: %+v", utils.ToJSON(&expected), utils.ToJSON(&rply))
|
||||
if expected != rply {
|
||||
t.Errorf("Expecting : %q, received: %q", expected, rply)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
if testRPC1.Event == nil {
|
||||
t.Fatal("The rpc was not called")
|
||||
}
|
||||
costIntervalRatesID := testRPCrt1.Event.Event["*rateSCost"].(map[string]interface{})["CostIntervals"].([]interface{})[0].(map[string]interface{})["Increments"].([]interface{})[0].(map[string]interface{})["RateID"]
|
||||
expected2 := &utils.CGREventWithEeIDs{
|
||||
|
||||
@@ -138,10 +138,10 @@ func testV1RtsCaseFromFolder(t *testing.T) {
|
||||
caching = utils.MetaNone
|
||||
}
|
||||
var reply string
|
||||
if err := rtsCaseSv1BiRpc.Call(context.Background(), utils.LoaderSv1Load,
|
||||
if err := rtsCaseSv1BiRpc.Call(context.Background(), utils.LoaderSv1Run,
|
||||
&loaders.ArgsProcessFolder{
|
||||
// StopOnError: true,
|
||||
Caching: utils.StringPointer(caching),
|
||||
APIOpts: map[string]interface{}{utils.MetaCache: caching},
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
|
||||
@@ -137,10 +137,9 @@ func testSessVolDiscLoadersLoad(t *testing.T) {
|
||||
caching = utils.MetaNone
|
||||
}
|
||||
var reply string
|
||||
if err := tSessVolDiscBiRPC.Call(context.Background(), utils.LoaderSv1Load,
|
||||
if err := tSessVolDiscBiRPC.Call(context.Background(), utils.LoaderSv1Run,
|
||||
&loaders.ArgsProcessFolder{
|
||||
// StopOnError: true,
|
||||
Caching: utils.StringPointer(caching),
|
||||
APIOpts: map[string]interface{}{utils.MetaCache: caching},
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
|
||||
@@ -48,7 +48,7 @@ func NewCSVReader(csvType, dPath, fn string, sep rune, nrFlds int) (CSVReader, e
|
||||
case utils.MetaGoogleAPI: // TODO: Implement *gapi
|
||||
return nil, nil
|
||||
case utils.MetaString:
|
||||
return NewStringCSV(fn, sep, nrFlds)
|
||||
return NewStringCSV(fn, sep, nrFlds), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported CSVReader type: <%q>", csvType)
|
||||
}
|
||||
@@ -75,8 +75,8 @@ func NewFileCSV(path string, sep rune, nrFlds int) (_ CSVReader, err error) {
|
||||
return NewCSVFile(file, path, sep, nrFlds), nil
|
||||
}
|
||||
|
||||
func NewStringCSV(data string, sep rune, nrFlds int) (_ CSVReader, err error) {
|
||||
return NewCSVFile(io.NopCloser(strings.NewReader(data)), data, sep, nrFlds), nil
|
||||
func NewStringCSV(data string, sep rune, nrFlds int) (_ CSVReader) {
|
||||
return NewCSVFile(io.NopCloser(strings.NewReader(data)), data, sep, nrFlds)
|
||||
}
|
||||
|
||||
func NewURLCSV(path string, sep rune, nrFlds int) (_ CSVReader, err error) {
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
@@ -511,7 +510,7 @@ func newLoader(cfg *config.CGRConfig, ldrCfg *config.LoaderSCfg, dm *engine.Data
|
||||
connMgr: connMgr,
|
||||
cacheConns: cacheConns,
|
||||
dataCache: dataCache,
|
||||
Locker: newLocker(ldrCfg.LockFilePath),
|
||||
Locker: newLocker(ldrCfg.GetLockFilePath()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -576,13 +575,12 @@ func (l *loader) process(ctx *context.Context, tntID *utils.TenantID, lDataSet [
|
||||
cacheArgs[utils.CacheDispatcherProfiles] = []string{tntId}
|
||||
case utils.MetaDispatcherHosts:
|
||||
cacheArgs[utils.CacheDispatcherHosts] = []string{tntId}
|
||||
|
||||
case utils.MetaRateProfiles:
|
||||
cacheIDs = []string{utils.CacheRateProfilesFilterIndexes, utils.CacheRateFilterIndexes}
|
||||
cacheArgs[utils.CacheRateProfiles] = []string{tntId}
|
||||
case utils.MetaActionProfiles:
|
||||
cacheIDs = []string{utils.CacheActionProfiles, utils.CacheActionProfilesFilterIndexes}
|
||||
cacheArgs[utils.CacheRateProfiles] = []string{tntId}
|
||||
cacheArgs[utils.CacheActionProfiles] = []string{tntId}
|
||||
case utils.MetaAccounts:
|
||||
cacheIDs = []string{utils.CacheAccounts, utils.CacheAccountsFilterIndexes}
|
||||
|
||||
@@ -598,6 +596,7 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi
|
||||
var record []string
|
||||
if record, err = csv.Read(); err != nil {
|
||||
if err == io.EOF {
|
||||
err = nil
|
||||
break
|
||||
}
|
||||
utils.Logger.Warning(
|
||||
@@ -624,15 +623,18 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi
|
||||
}
|
||||
lData = append(lData, data)
|
||||
}
|
||||
return l.process(ctx, prevTntID, lData, lType, action, caching, withIndex, partialRates)
|
||||
if prevTntID != nil {
|
||||
err = l.process(ctx, prevTntID, lData, lType, action, caching, withIndex, partialRates)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (l *loader) processFile(ctx *context.Context, cfg *config.LoaderDataType, inPath, outPath, action, caching string, withIndex bool) (err error) {
|
||||
csvType := utils.MetaFileCSV
|
||||
switch {
|
||||
case strings.HasPrefix(inPath, gprefix):
|
||||
csvType = utils.MetaGoogleAPI
|
||||
inPath = strings.TrimPrefix(inPath, gprefix)
|
||||
// case strings.HasPrefix(inPath, gprefix): // uncomment this after *gapi is implemented
|
||||
// csvType = utils.MetaGoogleAPI
|
||||
// inPath = strings.TrimPrefix(inPath, gprefix)
|
||||
case utils.IsURL(inPath):
|
||||
csvType = utils.MetaUrl
|
||||
}
|
||||
@@ -656,13 +658,13 @@ func (l *loader) getCfg(fileName string) (cfg *config.LoaderDataType) {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *loader) processIFile(_, fileName string) (err error) {
|
||||
cfg := l.getCfg(fileName)
|
||||
if cfg == nil {
|
||||
if pathIn := path.Join(l.ldrCfg.TpInDir, fileName); l.IsLockFile(pathIn) && len(l.ldrCfg.TpOutDir) != 0 {
|
||||
if pathIn := path.Join(l.ldrCfg.TpInDir, fileName); !l.IsLockFile(pathIn) && len(l.ldrCfg.TpOutDir) != 0 {
|
||||
err = os.Rename(pathIn, path.Join(l.ldrCfg.TpOutDir, fileName))
|
||||
}
|
||||
return
|
||||
@@ -686,23 +688,29 @@ func (l *loader) processFolder(ctx *context.Context, caching string, withIndex,
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s",
|
||||
utils.LoaderS, l.ldrCfg.ID, cfg.Type, err))
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
if len(l.ldrCfg.TpOutDir) != 0 {
|
||||
var fs []os.DirEntry
|
||||
if fs, err = os.ReadDir(l.ldrCfg.TpInDir); err != nil {
|
||||
return
|
||||
}
|
||||
for _, f := range fs {
|
||||
if pathIn := path.Join(l.ldrCfg.TpInDir, f.Name()); !l.IsLockFile(pathIn) {
|
||||
if err = os.Rename(pathIn, path.Join(l.ldrCfg.TpOutDir, f.Name())); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
err = l.moveUnprocessedFiles()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (l *loader) moveUnprocessedFiles() (err error) {
|
||||
var fs []os.DirEntry
|
||||
if fs, err = os.ReadDir(l.ldrCfg.TpInDir); err != nil {
|
||||
return
|
||||
}
|
||||
for _, f := range fs {
|
||||
if pathIn := path.Join(l.ldrCfg.TpInDir, f.Name()); !l.IsLockFile(pathIn) {
|
||||
if err = os.Rename(pathIn, path.Join(l.ldrCfg.TpOutDir, f.Name())); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
305
loaders/loaders_test.go
Normal file
305
loaders/loaders_test.go
Normal file
@@ -0,0 +1,305 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package loaders
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"log"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/ltcache"
|
||||
)
|
||||
|
||||
func TestNewLoaderService(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
cfg.LoaderCfg()[0].Enabled = true
|
||||
cfg.LoaderCfg()[0].RunDelay = -1
|
||||
cfg.LoaderCfg()[0].TpInDir = "notAFolder"
|
||||
cM := engine.NewConnManager(cfg)
|
||||
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
|
||||
fS := engine.NewFilterS(cfg, cM, dm)
|
||||
cache := map[string]*ltcache.Cache{}
|
||||
for k, cfg := range cfg.LoaderCfg()[0].Cache {
|
||||
cache[k] = ltcache.NewCache(cfg.Limit, cfg.TTL, cfg.StaticTTL, nil)
|
||||
}
|
||||
ld := NewLoaderService(cfg, dm, "", fS, cM)
|
||||
if exp := (&LoaderService{
|
||||
cfg: cfg,
|
||||
cache: cache,
|
||||
ldrs: map[string]*loader{
|
||||
utils.MetaDefault: {
|
||||
cfg: cfg,
|
||||
ldrCfg: cfg.LoaderCfg()[0],
|
||||
dm: dm,
|
||||
filterS: fS,
|
||||
connMgr: cM,
|
||||
dataCache: cache,
|
||||
cacheConns: cfg.LoaderCfg()[0].CacheSConns,
|
||||
Locker: newLocker(cfg.LoaderCfg()[0].GetLockFilePath()),
|
||||
},
|
||||
},
|
||||
}); !reflect.DeepEqual(exp, ld) {
|
||||
t.Errorf("Expeceted: %v, received: %v", utils.ToJSON(exp), utils.ToJSON(ld))
|
||||
}
|
||||
if !ld.Enabled() {
|
||||
t.Error("Expected loader to be enabled")
|
||||
}
|
||||
|
||||
ld.ldrs[utils.MetaDefault].Locker = mockLock{}
|
||||
stop := make(chan struct{})
|
||||
close(stop)
|
||||
|
||||
buf := bytes.NewBuffer([]byte{})
|
||||
log.SetOutput(buf)
|
||||
lgr := utils.Logger
|
||||
defer func() { utils.Logger = lgr; log.SetOutput(os.Stderr) }()
|
||||
utils.Logger, _ = utils.Newlogger(utils.MetaStdLog, utils.EmptyString)
|
||||
utils.Logger.SetLogLevel(7)
|
||||
|
||||
ld.ListenAndServe(stop)
|
||||
runtime.Gosched()
|
||||
time.Sleep(time.Nanosecond)
|
||||
if expLog, rplyLog := "[ERROR] <LoaderS-*default> error: <no such file or directory>",
|
||||
buf.String(); !strings.Contains(rplyLog, expLog) {
|
||||
t.Errorf("Expected %+q, received %+q", expLog, rplyLog)
|
||||
}
|
||||
|
||||
cfg.LoaderCfg()[0].Enabled = false
|
||||
ld.Reload(dm, "", fS, cM)
|
||||
if ld.Enabled() {
|
||||
t.Error("Expected loader to not be enabled")
|
||||
}
|
||||
ld.ListenAndServe(stop)
|
||||
}
|
||||
|
||||
func TestLoaderServiceV1Run(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
fc := []*config.FCTemplate{
|
||||
{Path: utils.Tenant, Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.0", utils.RSRConstSep)},
|
||||
{Path: utils.ID, Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.1", utils.RSRConstSep)},
|
||||
}
|
||||
tmpIn, err := os.MkdirTemp(utils.EmptyString, "TestLoaderServiceV1RunIn")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tmpIn)
|
||||
for _, f := range fc {
|
||||
f.ComputePath()
|
||||
}
|
||||
cfg.LoaderCfg()[0].Enabled = true
|
||||
cfg.LoaderCfg()[0].Data = []*config.LoaderDataType{{
|
||||
Type: utils.MetaAttributes,
|
||||
Filename: utils.AttributesCsv,
|
||||
Fields: fc,
|
||||
}}
|
||||
cfg.LoaderCfg()[0].TpInDir = tmpIn
|
||||
cfg.LoaderCfg()[0].TpOutDir = utils.EmptyString
|
||||
|
||||
f, err := os.Create(path.Join(tmpIn, utils.AttributesCsv))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := f.WriteString(`cgrates.org,ID`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := f.Sync(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f, err = os.Create(cfg.LoaderCfg()[0].GetLockFilePath())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cM := engine.NewConnManager(cfg)
|
||||
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
|
||||
fS := engine.NewFilterS(cfg, cM, dm)
|
||||
|
||||
ld := NewLoaderService(cfg, dm, "", fS, cM)
|
||||
var rply string
|
||||
if err := ld.V1Run(context.Background(), &ArgsProcessFolder{
|
||||
APIOpts: map[string]interface{}{
|
||||
utils.MetaCache: utils.MetaNone,
|
||||
utils.MetaWithIndex: true,
|
||||
utils.MetaStopOnError: true,
|
||||
utils.MetaForceLock: true,
|
||||
},
|
||||
}, &rply); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if rply != utils.OK {
|
||||
t.Errorf("Expected: %q,received: %q", utils.OK, rply)
|
||||
}
|
||||
if prf, err := dm.GetAttributeProfile(context.Background(), "cgrates.org", "ID", false, true, utils.NonTransactional); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if v := (&engine.AttributeProfile{Tenant: "cgrates.org", ID: "ID", FilterIDs: []string{}, Attributes: []*engine.Attribute{}}); !reflect.DeepEqual(v, prf) {
|
||||
t.Errorf("Expeceted: %v, received: %v", utils.ToJSON(v), utils.ToJSON(prf))
|
||||
}
|
||||
}
|
||||
|
||||
type mockLock2 struct{}
|
||||
|
||||
// lockFolder will attempt to lock the folder by creating the lock file
|
||||
func (mockLock2) Lock() (_ error) { return }
|
||||
func (mockLock2) Unlock() (_ error) { return utils.ErrExists }
|
||||
func (mockLock2) Locked() (_ bool, _ error) { return true, nil }
|
||||
func (mockLock2) IsLockFile(string) (_ bool) { return }
|
||||
|
||||
func TestLoaderServiceV1RunErrors(t *testing.T) {
|
||||
cfg := config.NewDefaultCGRConfig()
|
||||
fc := []*config.FCTemplate{
|
||||
{Filters: []string{"*string"}},
|
||||
}
|
||||
tmpIn, err := os.MkdirTemp(utils.EmptyString, "TestLoaderProcessFolderErrorsIn")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(tmpIn)
|
||||
for _, f := range fc {
|
||||
f.ComputePath()
|
||||
}
|
||||
cfg.LoaderCfg()[0].Enabled = true
|
||||
cfg.LoaderCfg()[0].Data = []*config.LoaderDataType{{
|
||||
Type: utils.MetaAttributes,
|
||||
Filename: utils.AttributesCsv,
|
||||
Fields: fc,
|
||||
}}
|
||||
cfg.LoaderCfg()[0].TpInDir = tmpIn
|
||||
cfg.LoaderCfg()[0].TpOutDir = utils.EmptyString
|
||||
|
||||
f, err := os.Create(path.Join(tmpIn, utils.AttributesCsv))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := f.WriteString(`cgrates.org,ID`); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := f.Sync(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f, err = os.Create(cfg.LoaderCfg()[0].GetLockFilePath())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cM := engine.NewConnManager(cfg)
|
||||
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
|
||||
fS := engine.NewFilterS(cfg, cM, dm)
|
||||
|
||||
ld := NewLoaderService(cfg, dm, "", fS, cM)
|
||||
var rply string
|
||||
|
||||
expErrMsg := "SERVER_ERROR: inline parse error for string: <*string>"
|
||||
if err := ld.V1Run(context.Background(), &ArgsProcessFolder{
|
||||
APIOpts: map[string]interface{}{
|
||||
utils.MetaCache: utils.MetaNone,
|
||||
utils.MetaWithIndex: true,
|
||||
utils.MetaStopOnError: true,
|
||||
utils.MetaForceLock: true,
|
||||
},
|
||||
}, &rply); err == nil || err.Error() != expErrMsg {
|
||||
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
|
||||
}
|
||||
|
||||
expErrMsg = `strconv.ParseBool: parsing "notfloat": invalid syntax`
|
||||
if err := ld.V1Run(context.Background(), &ArgsProcessFolder{
|
||||
APIOpts: map[string]interface{}{
|
||||
utils.MetaCache: utils.MetaNone,
|
||||
utils.MetaWithIndex: true,
|
||||
utils.MetaStopOnError: "notfloat",
|
||||
utils.MetaForceLock: true,
|
||||
},
|
||||
}, &rply); err == nil || err.Error() != expErrMsg {
|
||||
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
|
||||
}
|
||||
if err := ld.V1Run(context.Background(), &ArgsProcessFolder{
|
||||
APIOpts: map[string]interface{}{
|
||||
utils.MetaCache: utils.MetaNone,
|
||||
utils.MetaWithIndex: "notfloat",
|
||||
utils.MetaStopOnError: "notfloat",
|
||||
utils.MetaForceLock: true,
|
||||
},
|
||||
}, &rply); err == nil || err.Error() != expErrMsg {
|
||||
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
|
||||
}
|
||||
|
||||
ld.ldrs[utils.MetaDefault].Locker.Lock()
|
||||
if err := ld.V1Run(context.Background(), &ArgsProcessFolder{
|
||||
APIOpts: map[string]interface{}{
|
||||
utils.MetaCache: utils.MetaNone,
|
||||
utils.MetaWithIndex: "notfloat",
|
||||
utils.MetaStopOnError: "notfloat",
|
||||
utils.MetaForceLock: "notfloat",
|
||||
},
|
||||
}, &rply); err == nil || err.Error() != expErrMsg {
|
||||
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
|
||||
}
|
||||
|
||||
expErrMsg = `ANOTHER_LOADER_RUNNING`
|
||||
if err := ld.V1Run(context.Background(), &ArgsProcessFolder{
|
||||
APIOpts: map[string]interface{}{
|
||||
utils.MetaCache: utils.MetaNone,
|
||||
utils.MetaWithIndex: "notfloat",
|
||||
utils.MetaStopOnError: "notfloat",
|
||||
utils.MetaForceLock: false,
|
||||
},
|
||||
}, &rply); err == nil || err.Error() != expErrMsg {
|
||||
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
|
||||
}
|
||||
|
||||
ld.ldrs[utils.MetaDefault].Locker = mockLock{}
|
||||
|
||||
expErrMsg = `SERVER_ERROR: EXISTS`
|
||||
if err := ld.V1Run(context.Background(), &ArgsProcessFolder{}, &rply); err == nil || err.Error() != expErrMsg {
|
||||
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
|
||||
}
|
||||
|
||||
ld.ldrs[utils.MetaDefault].Locker = mockLock2{}
|
||||
if err := ld.V1Run(context.Background(), &ArgsProcessFolder{APIOpts: map[string]interface{}{
|
||||
utils.MetaForceLock: true}}, &rply); err == nil || err.Error() != expErrMsg {
|
||||
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
|
||||
}
|
||||
|
||||
cfg.LoaderCfg()[0].Enabled = false
|
||||
ld.Reload(dm, "", fS, cM)
|
||||
expErrMsg = `UNKNOWN_LOADER: *default`
|
||||
if err := ld.V1Run(context.Background(), &ArgsProcessFolder{}, &rply); err == nil || err.Error() != expErrMsg {
|
||||
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user