Added loaders importZip API

This commit is contained in:
Trial97
2021-12-16 16:05:03 +02:00
committed by Dan Christian Bogos
parent 2601ff6163
commit b699b4a43e
11 changed files with 528 additions and 88 deletions

View File

@@ -37,3 +37,8 @@ func (ldrSv1 *LoaderSv1) Run(ctx *context.Context, args *loaders.ArgsProcessFold
rply *string) error {
return ldrSv1.ldrS.V1Run(ctx, args, rply)
}
func (ldrSv1 *LoaderSv1) ImportZip(ctx *context.Context, args *loaders.ArgsProcessZip,
rply *string) error {
return ldrSv1.ldrS.V1ImportZip(ctx, args, rply)
}

View File

@@ -170,7 +170,7 @@ func (gl *GuardianLocker) Guard(ctx *context.Context, handler func(*context.Cont
// GuardIDs aquires a lock for duration
// returns the reference ID for the lock group aquired
func (gl *GuardianLocker) GuardIDs(refID string, timeout time.Duration, lkIDs ...string) (retRefID string) {
func (gl *GuardianLocker) GuardIDs(refID string, timeout time.Duration, lkIDs ...string) string {
return gl.lockWithReference(refID, timeout, lkIDs...)
}

View File

@@ -19,8 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package loaders
import (
"archive/zip"
"encoding/csv"
"fmt"
"io"
"net/http"
"net/url"
@@ -33,28 +33,15 @@ import (
"github.com/cgrates/cgrates/utils"
)
type CSVReader interface {
Path() string
Read() ([]string, error)
Close() error
}
func NewCSVReader(csvType, dPath, fn string, sep rune, nrFlds int) (CSVReader, error) {
switch csvType {
case utils.MetaFileCSV:
return NewFileCSV(path.Join(dPath, fn), sep, nrFlds)
case utils.MetaUrl:
return NewURLCSV(strings.TrimSuffix(dPath, utils.Slash)+utils.Slash+fn, sep, nrFlds)
case utils.MetaGoogleAPI: // TODO: Implement *gapi
return nil, nil
case utils.MetaString:
return NewStringCSV(fn, sep, nrFlds), nil
default:
return nil, fmt.Errorf("unsupported CSVReader type: <%q>", csvType)
func NewCSVReader(prv CSVProvider, dPath, fn string, sep rune, nrFlds int) (_ *CSVFile, err error) {
var file io.ReadCloser
if file, err = prv.Open(dPath, fn); err != nil {
return
}
return NewCSVFile(file, path.Join(dPath, fn), sep, nrFlds), nil
}
func NewCSVFile(rdr io.ReadCloser, path string, sep rune, nrFlds int) CSVReader {
func NewCSVFile(rdr io.ReadCloser, path string, sep rune, nrFlds int) *CSVFile {
csvRrdr := csv.NewReader(rdr)
csvRrdr.Comma = sep
csvRrdr.Comment = utils.CommentChar
@@ -67,19 +54,37 @@ func NewCSVFile(rdr io.ReadCloser, path string, sep rune, nrFlds int) CSVReader
}
}
func NewFileCSV(path string, sep rune, nrFlds int) (_ CSVReader, err error) {
var file io.ReadCloser
if file, err = os.Open(path); err != nil {
return
}
return NewCSVFile(file, path, sep, nrFlds), nil
}
func NewStringCSV(data string, sep rune, nrFlds int) (_ CSVReader) {
func NewStringCSV(data string, sep rune, nrFlds int) *CSVFile {
return NewCSVFile(io.NopCloser(strings.NewReader(data)), data, sep, nrFlds)
}
func NewURLCSV(path string, sep rune, nrFlds int) (_ CSVReader, err error) {
type CSVFile struct {
path string // only for logging purposes
cls io.Closer // keep reference so we can close it when done
csvRdr *csv.Reader
}
func (c *CSVFile) Path() string { return c.path }
func (c *CSVFile) Read() ([]string, error) { return c.csvRdr.Read() }
func (c *CSVFile) Close() error { return c.cls.Close() }
type CSVProvider interface {
Open(dPath, fn string) (io.ReadCloser, error)
Type() string
}
type fileProvider struct{}
func (fileProvider) Open(dPath, fn string) (io.ReadCloser, error) {
return os.Open(path.Join(dPath, fn))
}
func (fileProvider) Type() string { return utils.MetaFileCSV }
type urlProvider struct{}
func (urlProvider) Open(dPath, fn string) (_ io.ReadCloser, err error) {
path := strings.TrimSuffix(dPath, utils.Slash) + utils.Slash + fn
if _, err = url.ParseRequestURI(path); err != nil {
return
}
@@ -95,15 +100,18 @@ func NewURLCSV(path string, sep rune, nrFlds int) (_ CSVReader, err error) {
err = utils.ErrNotFound
return
}
return NewCSVFile(req.Body, path, sep, nrFlds), nil
return req.Body, nil
}
func (urlProvider) Type() string { return utils.MetaUrl }
type CSVFile struct {
path string // only for logging purposes
cls io.Closer // keep reference so we can close it when done
csvRdr *csv.Reader
type zipProvider struct{ *zip.Reader }
func (z zipProvider) Open(_, fn string) (io.ReadCloser, error) { return z.Reader.Open(fn) }
func (zipProvider) Type() string { return utils.MetaZip }
type stringProvider struct{}
func (stringProvider) Open(_, fn string) (io.ReadCloser, error) {
return io.NopCloser(strings.NewReader(fn)), nil
}
func (c *CSVFile) Path() string { return c.path }
func (c *CSVFile) Read() ([]string, error) { return c.csvRdr.Read() }
func (c *CSVFile) Close() error { return c.cls.Close() }
func (stringProvider) Type() string { return utils.MetaString }

View File

@@ -39,14 +39,7 @@ import (
func TestNewCSVStringReader(t *testing.T) {
data := `cgrates.org,ATTR_VARIABLE,,20,,*req.Category,*variable,~*req.ToR,`
expErrMsg := `unsupported CSVReader type: <"nonValidType">`
if _, err := NewCSVReader("nonValidType", utils.EmptyString, data, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
if np, err := NewCSVReader(utils.MetaGoogleAPI, utils.EmptyString, data, utils.CSVSep, -1); err != nil || np != nil { // for coverage (remove when implemented)
t.Error("This tipe should not be implemented yet")
}
csvR, err := NewCSVReader(utils.MetaString, utils.EmptyString, data, utils.CSVSep, -1)
csvR, err := NewCSVReader(stringProvider{}, utils.EmptyString, data, utils.CSVSep, -1)
if err != nil {
t.Fatal(err)
}
@@ -77,23 +70,29 @@ func TestNewCSVStringReader(t *testing.T) {
if err := csvR.Close(); err != nil {
t.Error(err)
}
if tp := (stringProvider{}).Type(); tp != utils.MetaString {
t.Errorf("Expeceted: %q, received: %q", utils.MetaString, tp)
}
if tp := (zipProvider{}).Type(); tp != utils.MetaZip {
t.Errorf("Expeceted: %q, received: %q", utils.MetaZip, tp)
}
}
func TestNewCSVReaderErrors(t *testing.T) {
path := "TestNewCSVReaderErrors" + strconv.Itoa(rand.Int()) + utils.CSVSuffix
expErrMsg := fmt.Sprintf("open %s: no such file or directory", path)
if _, err := NewCSVReader(utils.MetaFileCSV, ".", path, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg {
if _, err := NewCSVReader(fileProvider{}, ".", path, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
expErrMsg = fmt.Sprintf("parse %q: invalid URI for request", "./"+path)
if _, err := NewCSVReader(utils.MetaUrl, ".", path, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg {
if _, err := NewCSVReader(urlProvider{}, ".", path, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
prePath := "http:/localhost:" + strconv.Itoa(rand.Int())
expErrMsg = fmt.Sprintf(`path:"%s/%s" is not reachable`, prePath, path)
if _, err := NewCSVReader(utils.MetaUrl, prePath, path, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg {
if _, err := NewCSVReader(urlProvider{}, prePath, path, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %q, received: %q", expErrMsg, err.Error())
}
}
@@ -106,17 +105,17 @@ func TestNewCSVURLReader(t *testing.T) {
defer s.Close()
runtime.Gosched()
if _, err := NewCSVReader(utils.MetaUrl, s.URL+"/notFound", utils.AttributesCsv, utils.CSVSep, -1); err != utils.ErrNotFound {
if _, err := NewCSVReader(urlProvider{}, s.URL+"/notFound", utils.AttributesCsv, utils.CSVSep, -1); err != utils.ErrNotFound {
t.Errorf("Expeceted: %v, received: %v", utils.ErrNotFound, err)
}
csvR, err := NewCSVReader(utils.MetaUrl, s.URL+"/ok", utils.AttributesCsv, utils.CSVSep, -1)
csvR, err := NewCSVReader(urlProvider{}, s.URL+"/ok", utils.AttributesCsv, utils.CSVSep, -1)
if err != nil {
t.Fatal(err)
}
expPath := s.URL + "/ok/" + utils.AttributesCsv
expPath := path.Join(s.URL + "/ok/" + utils.AttributesCsv)
if p := csvR.Path(); p != expPath {
t.Errorf("Expeceted: %+v, received: %+v", data, expPath)
t.Errorf("Expeceted: %+v, received: %+v", p, expPath)
}
if p, err := csvR.Read(); err != nil {
@@ -128,6 +127,9 @@ func TestNewCSVURLReader(t *testing.T) {
if err := csvR.Close(); err != nil {
t.Error(err)
}
if tp := (urlProvider{}).Type(); tp != utils.MetaUrl {
t.Errorf("Expeceted: %q, received: %q", utils.MetaUrl, tp)
}
}
func TestNewCSVFileReader(t *testing.T) {
@@ -152,7 +154,7 @@ func TestNewCSVFileReader(t *testing.T) {
t.Fatal(err)
}
csvR, err := NewCSVReader(utils.MetaFileCSV, dir, utils.AttributesCsv, utils.CSVSep, -1)
csvR, err := NewCSVReader(fileProvider{}, dir, utils.AttributesCsv, utils.CSVSep, -1)
if err != nil {
t.Fatal(err)
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package loaders
import (
"archive/zip"
"fmt"
"io"
"os"
@@ -164,7 +165,7 @@ func newLoader(cfg *config.CGRConfig, ldrCfg *config.LoaderSCfg, dm *engine.Data
connMgr: connMgr,
cacheConns: cacheConns,
dataCache: dataCache,
Locker: newLocker(ldrCfg.GetLockFilePath()),
Locker: newLocker(ldrCfg.GetLockFilePath(), ldrCfg.ID),
}
}
@@ -242,7 +243,7 @@ func (l *loader) process(ctx *context.Context, obj profile, lType, action, cachi
return engine.CallCache(l.connMgr, ctx, l.cacheConns, caching, cacheArgs, cacheIDs, nil, false, l.ldrCfg.Tenant)
}
func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*config.FCTemplate, lType, action, caching string, withIndex, partialRates bool) (err error) {
func (l *loader) processData(ctx *context.Context, csv *CSVFile, tmpls []*config.FCTemplate, lType, action, caching string, withIndex, partialRates bool) (err error) {
newPrf := newProfileFunc(lType)
obj := newPrf()
var prevTntID string
@@ -285,24 +286,16 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi
return
}
func (l *loader) processFile(ctx *context.Context, cfg *config.LoaderDataType, inPath, outPath, action, caching string, withIndex bool) (err error) {
csvType := utils.MetaFileCSV
switch {
// case strings.HasPrefix(inPath, gprefix): // uncomment this after *gapi is implemented
// csvType = utils.MetaGoogleAPI
// inPath = strings.TrimPrefix(inPath, gprefix)
case utils.IsURL(inPath):
csvType = utils.MetaUrl
}
var csv CSVReader
if csv, err = NewCSVReader(csvType, inPath, cfg.Filename, rune(l.ldrCfg.FieldSeparator[0]), 0); err != nil {
func (l *loader) processFile(ctx *context.Context, cfg *config.LoaderDataType, inPath, outPath, action, caching string, withIndex bool, prv CSVProvider) (err error) {
var csv *CSVFile
if csv, err = NewCSVReader(prv, inPath, cfg.Filename, rune(l.ldrCfg.FieldSeparator[0]), 0); err != nil {
return
}
defer csv.Close()
if err = l.processData(ctx, csv, cfg.Fields, cfg.Type, action, caching,
withIndex, cfg.Flags.GetBool(utils.PartialRatesOpt)); err != nil || // encounterd error
outPath == utils.EmptyString || // or no moving
csvType != utils.MetaFileCSV { // or the type can not be moved(e.g. url)
prv.Type() != utils.MetaFileCSV { // or the type can not be moved(e.g. url)
return
}
return os.Rename(path.Join(inPath, cfg.Filename), path.Join(outPath, cfg.Filename))
@@ -330,7 +323,7 @@ func (l *loader) processIFile(_, fileName string) (err error) {
return
}
defer l.Unlock()
return l.processFile(context.Background(), cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, utils.FirstNonEmpty(l.ldrCfg.Opts.Cache, l.cfg.GeneralCfg().DefaultCaching), l.ldrCfg.Opts.WithIndex)
return l.processFile(context.Background(), cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, utils.FirstNonEmpty(l.ldrCfg.Opts.Cache, l.cfg.GeneralCfg().DefaultCaching), l.ldrCfg.Opts.WithIndex, fileProvider{})
}
func (l *loader) processFolder(ctx *context.Context, caching string, withIndex, stopOnError bool) (err error) {
@@ -338,8 +331,16 @@ func (l *loader) processFolder(ctx *context.Context, caching string, withIndex,
return
}
defer l.Unlock()
var csvType CSVProvider = fileProvider{}
switch {
// case strings.HasPrefix(inPath, gprefix): // uncomment this after *gapi is implemented
// csvType = utils.MetaGoogleAPI
// inPath = strings.TrimPrefix(inPath, gprefix)
case utils.IsURL(l.ldrCfg.TpInDir):
csvType = urlProvider{}
}
for _, cfg := range l.ldrCfg.Data {
if err = l.processFile(ctx, cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, caching, withIndex); err != nil {
if err = l.processFile(ctx, cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, caching, withIndex, csvType); err != nil {
if !stopOnError {
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, l.ldrCfg.ID, cfg.Type, err))
@@ -399,3 +400,23 @@ func (l *loader) ListenAndServe(stopChan chan struct{}) (err error) {
}
return
}
func (l *loader) processZip(ctx *context.Context, caching string, withIndex, stopOnError bool, zipR *zip.Reader) (err error) {
if err = l.Lock(); err != nil {
return
}
defer l.Unlock()
ziP := zipProvider{zipR}
for _, cfg := range l.ldrCfg.Data {
if err = l.processFile(ctx, cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, caching, withIndex, ziP); err != nil {
if !stopOnError {
utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s",
utils.LoaderS, l.ldrCfg.ID, cfg.Type, err))
err = nil
continue
}
return
}
}
return
}

View File

@@ -19,7 +19,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package loaders
import (
"archive/zip"
"bytes"
"encoding/csv"
"fmt"
"net/http"
"net/http/httptest"
@@ -355,7 +357,7 @@ func TestLoaderProcess(t *testing.T) {
filterS: fS,
connMgr: cM,
dataCache: cache,
Locker: newLocker(cfg.LoaderCfg()[0].GetLockFilePath()),
Locker: newLocker(cfg.LoaderCfg()[0].GetLockFilePath(), cfg.LoaderCfg()[0].ID),
}); !reflect.DeepEqual(expLd, ld) {
t.Errorf("Expeceted: %+v, received: %+v", expLd, ld)
}
@@ -683,11 +685,9 @@ cgrates.org,ID2`, utils.CSVSep, -1), fc, utils.MetaAttributes, utils.MetaStore,
}
}
type mockCSV struct{}
type mockReader struct{}
func (mockCSV) Path() (_ string) { return }
func (mockCSV) Read() ([]string, error) { return nil, utils.ErrNotFound }
func (mockCSV) Close() (_ error) { return }
func (mockReader) Read([]byte) (int, error) { return 0, utils.ErrNotFound }
func TestLoaderProcessDataErrors(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
@@ -725,7 +725,7 @@ cgrates.org,ID2`, utils.CSVSep, -1), fc, utils.MetaAttributes, "notSupported", u
t.Errorf("Expeceted: %q, received: %v", expErrMsg, err)
}
if err := ld.processData(context.Background(), mockCSV{}, fc, utils.MetaAttributes, "notSupported", utils.MetaNone, true, false); err != utils.ErrNotFound {
if err := ld.processData(context.Background(), &CSVFile{csvRdr: csv.NewReader(mockReader{})}, fc, utils.MetaAttributes, "notSupported", utils.MetaNone, true, false); err != utils.ErrNotFound {
t.Errorf("Expeceted: %q, received: %v", utils.ErrNotFound, err)
}
}
@@ -759,7 +759,7 @@ func TestLoaderProcessFileURL(t *testing.T) {
Type: utils.MetaAttributes,
Filename: utils.AttributesCsv,
Fields: fc,
}, s.URL+"/ok", utils.EmptyString, utils.MetaStore, utils.MetaNone, true); err != nil {
}, s.URL+"/ok", utils.EmptyString, utils.MetaStore, utils.MetaNone, true, urlProvider{}); err != nil {
t.Fatal(err)
}
if prf, err := dm.GetAttributeProfile(context.Background(), "cgrates.org", "ID", false, true, utils.NonTransactional); err != nil {
@@ -772,7 +772,7 @@ func TestLoaderProcessFileURL(t *testing.T) {
Type: utils.MetaAttributes,
Filename: utils.AttributesCsv,
Fields: fc,
}, s.URL+"/notFound", utils.EmptyString, utils.MetaStore, utils.MetaNone, true); err != utils.ErrNotFound {
}, s.URL+"/notFound", utils.EmptyString, utils.MetaStore, utils.MetaNone, true, urlProvider{}); err != utils.ErrNotFound {
t.Errorf("Expeceted: %v, received: %v", utils.ErrNotFound, err)
}
@@ -978,6 +978,13 @@ func TestLoaderProcessFolder(t *testing.T) {
if err := ld.processFolder(context.Background(), utils.MetaNone, true, true); err != utils.ErrExists {
t.Fatal(err)
}
ld.Locker = nopLock{}
ld.ldrCfg.TpInDir = "http://localhost:0"
expErrMsg := `path:"http://localhost:0/Attributes.csv" is not reachable`
if err := ld.processFolder(context.Background(), utils.MetaNone, true, true); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
}
func TestLoaderProcessFolderErrors(t *testing.T) {
@@ -1071,7 +1078,6 @@ func TestLoaderProcessFolderErrors(t *testing.T) {
buf.String(); !strings.Contains(rplyLog, expLog) {
t.Errorf("Expected %+q, received %+q", expLog, rplyLog)
}
}
func TestLoaderMoveUnprocessedFilesErrors(t *testing.T) {
@@ -1192,3 +1198,90 @@ func TestLoaderListenAndServeI(t *testing.T) {
t.Errorf("Expected %+q, received %+q", expLog, rplyLog)
}
}
func TestLoaderProcessZipErrors(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cM := engine.NewConnManager(cfg)
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
fS := engine.NewFilterS(cfg, cM, dm)
cache := map[string]*ltcache.Cache{}
for k, cfg := range cfg.LoaderCfg()[0].Cache {
cache[k] = ltcache.NewCache(cfg.Limit, cfg.TTL, cfg.StaticTTL, nil)
}
fc := []*config.FCTemplate{
{Filters: []string{"*string"}},
}
for _, f := range fc {
f.ComputePath()
}
ld := newLoader(cfg, &config.LoaderSCfg{
ID: "test",
Enabled: true,
TpInDir: utils.EmptyString,
TpOutDir: utils.EmptyString,
Data: []*config.LoaderDataType{
{
Type: utils.MetaAttributes,
Filename: utils.AttributesCsv,
Fields: fc,
},
},
FieldSeparator: utils.FieldsSep,
Action: utils.MetaStore,
Opts: &config.LoaderSOptsCfg{
WithIndex: true,
Cache: utils.MetaNone,
},
}, dm, cache, fS, cM, nil)
bufz := new(bytes.Buffer)
w := zip.NewWriter(bufz)
f, err := w.Create(utils.AttributesCsv)
if err != nil {
t.Fatal(err)
}
if _, err := f.Write([]byte(`cgrates.org,ID`)); err != nil {
t.Fatal(err)
}
if err := w.Close(); err != nil {
t.Fatal(err)
}
r, err := zip.NewReader(bytes.NewReader(bufz.Bytes()), int64(bufz.Len()))
if err != nil {
t.Fatal(err)
}
expErrMsg := "inline parse error for string: <*string>"
if err := ld.processZip(context.Background(), utils.MetaNone, true, true, r); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
if _, err := dm.GetAttributeProfile(context.Background(), "cgrates.org", "ID", false, true, utils.NonTransactional); err != utils.ErrNotFound {
t.Fatal(err)
}
buf := bytes.NewBuffer([]byte{})
log.SetOutput(buf)
lgr := utils.Logger
defer func() { utils.Logger = lgr; log.SetOutput(os.Stderr) }()
utils.Logger, _ = utils.Newlogger(utils.MetaStdLog, utils.EmptyString)
utils.Logger.SetLogLevel(7)
if err := ld.processZip(context.Background(), utils.MetaNone, true, false, r); err != nil {
t.Fatal(err)
}
if _, err := dm.GetAttributeProfile(context.Background(), "cgrates.org", "ID", false, true, utils.NonTransactional); err != utils.ErrNotFound {
t.Fatal(err)
}
if expLog, rplyLog := "<LoaderS-test> loaderType: <*attributes> cannot open files, err: inline parse error for string: <*string>",
buf.String(); !strings.Contains(rplyLog, expLog) {
t.Errorf("Expected %+q, received %+q", expLog, rplyLog)
}
ld.Locker = mockLock{}
if err := ld.processZip(context.Background(), utils.MetaNone, true, true, r); err != utils.ErrExists {
t.Fatal(err)
}
}

View File

@@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package loaders
import (
"archive/zip"
"bytes"
"errors"
"fmt"
"sync"
@@ -119,6 +121,66 @@ func (ldrS *LoaderS) V1Run(ctx *context.Context, args *ArgsProcessFolder,
return
}
type ArgsProcessZip struct {
LoaderID string
Data []byte
APIOpts map[string]interface{}
}
func (ldrS *LoaderS) V1ImportZip(ctx *context.Context, args *ArgsProcessZip,
rply *string) (err error) {
var zipR *zip.Reader
if zipR, err = zip.NewReader(bytes.NewReader(args.Data), int64(len(args.Data))); err != nil {
return
}
ldrS.RLock()
defer ldrS.RUnlock()
if args.LoaderID == utils.EmptyString {
args.LoaderID = utils.MetaDefault
}
ldr, has := ldrS.ldrs[args.LoaderID]
if !has {
return fmt.Errorf("UNKNOWN_LOADER: %s", args.LoaderID)
}
var locked bool
if locked, err = ldr.Locked(); err != nil {
return utils.NewErrServerError(err)
} else if locked {
fl := ldr.ldrCfg.Opts.ForceLock
if val, has := args.APIOpts[utils.MetaForceLock]; has {
if fl, err = utils.IfaceAsBool(val); err != nil {
return
}
}
if !fl {
return errors.New("ANOTHER_LOADER_RUNNING")
}
if err := ldr.Unlock(); err != nil {
return utils.NewErrServerError(err)
}
}
wI := ldr.ldrCfg.Opts.WithIndex
if val, has := args.APIOpts[utils.MetaWithIndex]; has {
if wI, err = utils.IfaceAsBool(val); err != nil {
return
}
}
soE := ldr.ldrCfg.Opts.StopOnError
if val, has := args.APIOpts[utils.MetaStopOnError]; has {
if soE, err = utils.IfaceAsBool(val); err != nil {
return
}
}
if err := ldr.processZip(context.Background(), utils.FirstNonEmpty(utils.IfaceAsString(args.APIOpts[utils.MetaCache]), ldr.ldrCfg.Opts.Cache, ldrS.cfg.GeneralCfg().DefaultCaching),
wI, soE, zipR); err != nil {
return utils.NewErrServerError(err)
}
*rply = utils.OK
return
}
// Reload recreates the loaders map thread safe
func (ldrS *LoaderS) Reload(dm *engine.DataManager,
filterS *engine.FilterS, connMgr *engine.ConnManager) {

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package loaders
import (
"archive/zip"
"bytes"
"os"
"path"
@@ -60,7 +61,7 @@ func TestNewLoaderService(t *testing.T) {
connMgr: cM,
dataCache: cache,
cacheConns: cfg.LoaderCfg()[0].CacheSConns,
Locker: newLocker(cfg.LoaderCfg()[0].GetLockFilePath()),
Locker: newLocker(cfg.LoaderCfg()[0].GetLockFilePath(), cfg.LoaderCfg()[0].ID),
},
},
}); !reflect.DeepEqual(exp, ld) {
@@ -302,3 +303,199 @@ func TestLoaderServiceV1RunErrors(t *testing.T) {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
}
func TestLoaderServiceV1ImportZip(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fc := []*config.FCTemplate{
{Path: utils.Tenant, Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.0", utils.RSRConstSep)},
{Path: utils.ID, Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.1", utils.RSRConstSep)},
}
for _, f := range fc {
f.ComputePath()
}
cfg.LoaderCfg()[0].Enabled = true
cfg.LoaderCfg()[0].Data = []*config.LoaderDataType{{
Type: utils.MetaAttributes,
Filename: utils.AttributesCsv,
Fields: fc,
}}
cfg.LoaderCfg()[0].LockFilePath = utils.MetaMemory
cfg.LoaderCfg()[0].TpInDir = utils.EmptyString
cfg.LoaderCfg()[0].TpOutDir = utils.EmptyString
buf := new(bytes.Buffer)
wr := zip.NewWriter(buf)
f, err := wr.Create(utils.AttributesCsv)
if err != nil {
t.Fatal(err)
}
if _, err := f.Write([]byte(`cgrates.org,ID`)); err != nil {
t.Fatal(err)
}
if err := wr.Close(); err != nil {
t.Fatal(err)
}
cM := engine.NewConnManager(cfg)
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
fS := engine.NewFilterS(cfg, cM, dm)
ld := NewLoaderService(cfg, dm, fS, cM)
var rply string
if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{
Data: buf.Bytes(),
APIOpts: map[string]interface{}{
utils.MetaCache: utils.MetaNone,
utils.MetaWithIndex: true,
utils.MetaStopOnError: true,
utils.MetaForceLock: true,
},
}, &rply); err != nil {
t.Fatal(err)
} else if rply != utils.OK {
t.Errorf("Expected: %q,received: %q", utils.OK, rply)
}
if prf, err := dm.GetAttributeProfile(context.Background(), "cgrates.org", "ID", false, true, utils.NonTransactional); err != nil {
t.Fatal(err)
} else if v := (&engine.AttributeProfile{Tenant: "cgrates.org", ID: "ID"}); !reflect.DeepEqual(v, prf) {
t.Errorf("Expeceted: %v, received: %v", utils.ToJSON(v), utils.ToJSON(prf))
}
}
func TestLoaderServiceV1ImportZipErrors(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
fc := []*config.FCTemplate{
{Filters: []string{"*string"}},
}
for _, f := range fc {
f.ComputePath()
}
cfg.LoaderCfg()[0].Enabled = true
cfg.LoaderCfg()[0].Data = []*config.LoaderDataType{{
Type: utils.MetaAttributes,
Filename: utils.AttributesCsv,
Fields: fc,
}}
cfg.LoaderCfg()[0].TpInDir = utils.EmptyString
cfg.LoaderCfg()[0].TpOutDir = utils.EmptyString
defer os.Remove(cfg.LoaderCfg()[0].LockFilePath)
buf := new(bytes.Buffer)
wr := zip.NewWriter(buf)
f, err := wr.Create(utils.AttributesCsv)
if err != nil {
t.Fatal(err)
}
if _, err := f.Write([]byte(`cgrates.org,ID`)); err != nil {
t.Fatal(err)
}
if err := wr.Close(); err != nil {
t.Fatal(err)
}
cM := engine.NewConnManager(cfg)
dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM)
fS := engine.NewFilterS(cfg, cM, dm)
ld := NewLoaderService(cfg, dm, fS, cM)
var rply string
expErrMsg := "SERVER_ERROR: inline parse error for string: <*string>"
if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{
Data: buf.Bytes(),
APIOpts: map[string]interface{}{
utils.MetaCache: utils.MetaNone,
utils.MetaWithIndex: true,
utils.MetaStopOnError: true,
utils.MetaForceLock: true,
},
}, &rply); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
expErrMsg = "zip: not a valid zip file"
if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{
APIOpts: map[string]interface{}{
utils.MetaCache: utils.MetaNone,
utils.MetaWithIndex: true,
utils.MetaStopOnError: true,
utils.MetaForceLock: true,
},
}, &rply); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
expErrMsg = `strconv.ParseBool: parsing "notfloat": invalid syntax`
if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{
Data: buf.Bytes(),
APIOpts: map[string]interface{}{
utils.MetaCache: utils.MetaNone,
utils.MetaWithIndex: true,
utils.MetaStopOnError: "notfloat",
utils.MetaForceLock: true,
},
}, &rply); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{
Data: buf.Bytes(),
APIOpts: map[string]interface{}{
utils.MetaCache: utils.MetaNone,
utils.MetaWithIndex: "notfloat",
utils.MetaStopOnError: "notfloat",
utils.MetaForceLock: true,
},
}, &rply); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
ld.ldrs[utils.MetaDefault].Locker.Lock()
if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{
Data: buf.Bytes(),
APIOpts: map[string]interface{}{
utils.MetaCache: utils.MetaNone,
utils.MetaWithIndex: "notfloat",
utils.MetaStopOnError: "notfloat",
utils.MetaForceLock: "notfloat",
},
}, &rply); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
expErrMsg = `ANOTHER_LOADER_RUNNING`
if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{
Data: buf.Bytes(),
APIOpts: map[string]interface{}{
utils.MetaCache: utils.MetaNone,
utils.MetaWithIndex: "notfloat",
utils.MetaStopOnError: "notfloat",
utils.MetaForceLock: false,
},
}, &rply); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
ld.ldrs[utils.MetaDefault].Locker = mockLock{}
expErrMsg = `SERVER_ERROR: EXISTS`
if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{
Data: buf.Bytes(),
}, &rply); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
ld.ldrs[utils.MetaDefault].Locker = mockLock2{}
if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{
Data: buf.Bytes(), APIOpts: map[string]interface{}{
utils.MetaForceLock: true}}, &rply); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
cfg.LoaderCfg()[0].Enabled = false
ld.Reload(dm, fS, cM)
expErrMsg = `UNKNOWN_LOADER: *default`
if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{
Data: buf.Bytes(),
}, &rply); err == nil || err.Error() != expErrMsg {
t.Errorf("Expeceted: %v, received: %v", expErrMsg, err)
}
}

View File

@@ -21,6 +21,9 @@ package loaders
import (
"io"
"os"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
type Locker interface {
@@ -30,11 +33,15 @@ type Locker interface {
IsLockFile(string) bool
}
func newLocker(path string) Locker {
if len(path) != 0 {
func newLocker(path, loaderID string) Locker {
switch path {
case utils.EmptyString:
return new(nopLock)
case utils.MetaMemory:
return &memoryLock{loaderID: loaderID}
default:
return folderLock(path)
}
return new(nopLock)
}
type folderLock string
@@ -66,8 +73,24 @@ func (fl folderLock) IsLockFile(path string) bool { return path == string(fl) }
type nopLock struct{}
// lockFolder will attempt to lock the folder by creating the lock file
func (nopLock) Lock() (_ error) { return }
func (nopLock) Unlock() (_ error) { return }
func (nopLock) Locked() (_ bool, _ error) { return }
func (nopLock) IsLockFile(string) (_ bool) { return }
type memoryLock struct {
loaderID string
refID string
}
func (m *memoryLock) Lock() (_ error) {
m.refID = guardian.Guardian.GuardIDs(m.refID, 0, utils.ConcatenatedKey(utils.LoaderS, m.loaderID))
return
}
func (m *memoryLock) Unlock() (_ error) {
guardian.Guardian.UnguardIDs(m.refID)
m.refID = utils.EmptyString
return
}
func (m memoryLock) Locked() (bool, error) { return len(m.refID) != 0, nil }
func (m memoryLock) IsLockFile(string) (_ bool) { return }

View File

@@ -28,7 +28,7 @@ import (
)
func TestNopLocker(t *testing.T) {
np := newLocker(utils.EmptyString)
np := newLocker(utils.EmptyString, utils.EmptyString)
if err := np.Lock(); err != nil {
t.Error(err)
}
@@ -56,7 +56,7 @@ func TestFolderLocker(t *testing.T) {
}
defer os.RemoveAll(dir)
fp := path.Join(dir, ".lkr")
np := newLocker(fp)
np := newLocker(fp, utils.EmptyString)
exp := folderLock(fp)
if !reflect.DeepEqual(np, exp) {
t.Errorf("Expeceted: %+v, received: %+v", exp, np)
@@ -84,3 +84,30 @@ func TestFolderLocker(t *testing.T) {
t.Error("Expected no lock")
}
}
func TestMemoryLocker(t *testing.T) {
np := newLocker(utils.MetaMemory, "ID")
exp := &memoryLock{loaderID: "ID"}
if !reflect.DeepEqual(np, exp) {
t.Errorf("Expeceted: %+v, received: %+v", exp, np)
}
if err := np.Lock(); err != nil {
t.Error(err)
}
if lk, err := np.Locked(); err != nil {
t.Error(err)
} else if !lk {
t.Error("Expected lock")
}
if err := np.Unlock(); err != nil {
t.Error(err)
}
if np.IsLockFile(utils.EmptyString) {
t.Error("Expected to not be lock file")
}
if lk, err := np.Locked(); err != nil {
t.Error(err)
} else if lk {
t.Error("Expected no lock")
}
}

View File

@@ -591,6 +591,7 @@ const (
MetaMultiply = "*multiply"
MetaDivide = "*divide"
MetaUrl = "*url"
MetaZip = "*zip"
MetaXml = "*xml"
MetaReq = "*req"
MetaVars = "*vars"
@@ -634,6 +635,7 @@ const (
IdxEnd = "]"
IdxCombination = "]["
MetaMemory = "*memory"
RemoteHost = "RemoteHost"
Local = "local"
TCP = "tcp"