diff --git a/apis/loaders.go b/apis/loaders.go index cbb522367..dc547ba86 100644 --- a/apis/loaders.go +++ b/apis/loaders.go @@ -37,3 +37,8 @@ func (ldrSv1 *LoaderSv1) Run(ctx *context.Context, args *loaders.ArgsProcessFold rply *string) error { return ldrSv1.ldrS.V1Run(ctx, args, rply) } + +func (ldrSv1 *LoaderSv1) ImportZip(ctx *context.Context, args *loaders.ArgsProcessZip, + rply *string) error { + return ldrSv1.ldrS.V1ImportZip(ctx, args, rply) +} diff --git a/guardian/guardian.go b/guardian/guardian.go index 7d78d388d..8423e124b 100644 --- a/guardian/guardian.go +++ b/guardian/guardian.go @@ -170,7 +170,7 @@ func (gl *GuardianLocker) Guard(ctx *context.Context, handler func(*context.Cont // GuardIDs aquires a lock for duration // returns the reference ID for the lock group aquired -func (gl *GuardianLocker) GuardIDs(refID string, timeout time.Duration, lkIDs ...string) (retRefID string) { +func (gl *GuardianLocker) GuardIDs(refID string, timeout time.Duration, lkIDs ...string) string { return gl.lockWithReference(refID, timeout, lkIDs...) } diff --git a/loaders/csvreader.go b/loaders/csvreader.go index 09b01d444..942bd043a 100644 --- a/loaders/csvreader.go +++ b/loaders/csvreader.go @@ -19,8 +19,8 @@ along with this program. If not, see package loaders import ( + "archive/zip" "encoding/csv" - "fmt" "io" "net/http" "net/url" @@ -33,28 +33,15 @@ import ( "github.com/cgrates/cgrates/utils" ) -type CSVReader interface { - Path() string - Read() ([]string, error) - Close() error -} - -func NewCSVReader(csvType, dPath, fn string, sep rune, nrFlds int) (CSVReader, error) { - switch csvType { - case utils.MetaFileCSV: - return NewFileCSV(path.Join(dPath, fn), sep, nrFlds) - case utils.MetaUrl: - return NewURLCSV(strings.TrimSuffix(dPath, utils.Slash)+utils.Slash+fn, sep, nrFlds) - case utils.MetaGoogleAPI: // TODO: Implement *gapi - return nil, nil - case utils.MetaString: - return NewStringCSV(fn, sep, nrFlds), nil - default: - return nil, fmt.Errorf("unsupported CSVReader type: <%q>", csvType) +func NewCSVReader(prv CSVProvider, dPath, fn string, sep rune, nrFlds int) (_ *CSVFile, err error) { + var file io.ReadCloser + if file, err = prv.Open(dPath, fn); err != nil { + return } + return NewCSVFile(file, path.Join(dPath, fn), sep, nrFlds), nil } -func NewCSVFile(rdr io.ReadCloser, path string, sep rune, nrFlds int) CSVReader { +func NewCSVFile(rdr io.ReadCloser, path string, sep rune, nrFlds int) *CSVFile { csvRrdr := csv.NewReader(rdr) csvRrdr.Comma = sep csvRrdr.Comment = utils.CommentChar @@ -67,19 +54,37 @@ func NewCSVFile(rdr io.ReadCloser, path string, sep rune, nrFlds int) CSVReader } } -func NewFileCSV(path string, sep rune, nrFlds int) (_ CSVReader, err error) { - var file io.ReadCloser - if file, err = os.Open(path); err != nil { - return - } - return NewCSVFile(file, path, sep, nrFlds), nil -} - -func NewStringCSV(data string, sep rune, nrFlds int) (_ CSVReader) { +func NewStringCSV(data string, sep rune, nrFlds int) *CSVFile { return NewCSVFile(io.NopCloser(strings.NewReader(data)), data, sep, nrFlds) } -func NewURLCSV(path string, sep rune, nrFlds int) (_ CSVReader, err error) { +type CSVFile struct { + path string // only for logging purposes + cls io.Closer // keep reference so we can close it when done + csvRdr *csv.Reader +} + +func (c *CSVFile) Path() string { return c.path } +func (c *CSVFile) Read() ([]string, error) { return c.csvRdr.Read() } +func (c *CSVFile) Close() error { return c.cls.Close() } + +type CSVProvider interface { + Open(dPath, fn string) (io.ReadCloser, error) + Type() string +} + +type fileProvider struct{} + +func (fileProvider) Open(dPath, fn string) (io.ReadCloser, error) { + return os.Open(path.Join(dPath, fn)) +} + +func (fileProvider) Type() string { return utils.MetaFileCSV } + +type urlProvider struct{} + +func (urlProvider) Open(dPath, fn string) (_ io.ReadCloser, err error) { + path := strings.TrimSuffix(dPath, utils.Slash) + utils.Slash + fn if _, err = url.ParseRequestURI(path); err != nil { return } @@ -95,15 +100,18 @@ func NewURLCSV(path string, sep rune, nrFlds int) (_ CSVReader, err error) { err = utils.ErrNotFound return } - return NewCSVFile(req.Body, path, sep, nrFlds), nil + return req.Body, nil } +func (urlProvider) Type() string { return utils.MetaUrl } -type CSVFile struct { - path string // only for logging purposes - cls io.Closer // keep reference so we can close it when done - csvRdr *csv.Reader +type zipProvider struct{ *zip.Reader } + +func (z zipProvider) Open(_, fn string) (io.ReadCloser, error) { return z.Reader.Open(fn) } +func (zipProvider) Type() string { return utils.MetaZip } + +type stringProvider struct{} + +func (stringProvider) Open(_, fn string) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader(fn)), nil } - -func (c *CSVFile) Path() string { return c.path } -func (c *CSVFile) Read() ([]string, error) { return c.csvRdr.Read() } -func (c *CSVFile) Close() error { return c.cls.Close() } +func (stringProvider) Type() string { return utils.MetaString } diff --git a/loaders/csvreader_test.go b/loaders/csvreader_test.go index 5f32c2c21..59b2330e0 100644 --- a/loaders/csvreader_test.go +++ b/loaders/csvreader_test.go @@ -39,14 +39,7 @@ import ( func TestNewCSVStringReader(t *testing.T) { data := `cgrates.org,ATTR_VARIABLE,,20,,*req.Category,*variable,~*req.ToR,` - expErrMsg := `unsupported CSVReader type: <"nonValidType">` - if _, err := NewCSVReader("nonValidType", utils.EmptyString, data, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg { - t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) - } - if np, err := NewCSVReader(utils.MetaGoogleAPI, utils.EmptyString, data, utils.CSVSep, -1); err != nil || np != nil { // for coverage (remove when implemented) - t.Error("This tipe should not be implemented yet") - } - csvR, err := NewCSVReader(utils.MetaString, utils.EmptyString, data, utils.CSVSep, -1) + csvR, err := NewCSVReader(stringProvider{}, utils.EmptyString, data, utils.CSVSep, -1) if err != nil { t.Fatal(err) } @@ -77,23 +70,29 @@ func TestNewCSVStringReader(t *testing.T) { if err := csvR.Close(); err != nil { t.Error(err) } + if tp := (stringProvider{}).Type(); tp != utils.MetaString { + t.Errorf("Expeceted: %q, received: %q", utils.MetaString, tp) + } + if tp := (zipProvider{}).Type(); tp != utils.MetaZip { + t.Errorf("Expeceted: %q, received: %q", utils.MetaZip, tp) + } } func TestNewCSVReaderErrors(t *testing.T) { path := "TestNewCSVReaderErrors" + strconv.Itoa(rand.Int()) + utils.CSVSuffix expErrMsg := fmt.Sprintf("open %s: no such file or directory", path) - if _, err := NewCSVReader(utils.MetaFileCSV, ".", path, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg { + if _, err := NewCSVReader(fileProvider{}, ".", path, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg { t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) } expErrMsg = fmt.Sprintf("parse %q: invalid URI for request", "./"+path) - if _, err := NewCSVReader(utils.MetaUrl, ".", path, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg { + if _, err := NewCSVReader(urlProvider{}, ".", path, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg { t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) } prePath := "http:/localhost:" + strconv.Itoa(rand.Int()) expErrMsg = fmt.Sprintf(`path:"%s/%s" is not reachable`, prePath, path) - if _, err := NewCSVReader(utils.MetaUrl, prePath, path, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg { + if _, err := NewCSVReader(urlProvider{}, prePath, path, utils.CSVSep, -1); err == nil || err.Error() != expErrMsg { t.Errorf("Expeceted: %q, received: %q", expErrMsg, err.Error()) } } @@ -106,17 +105,17 @@ func TestNewCSVURLReader(t *testing.T) { defer s.Close() runtime.Gosched() - if _, err := NewCSVReader(utils.MetaUrl, s.URL+"/notFound", utils.AttributesCsv, utils.CSVSep, -1); err != utils.ErrNotFound { + if _, err := NewCSVReader(urlProvider{}, s.URL+"/notFound", utils.AttributesCsv, utils.CSVSep, -1); err != utils.ErrNotFound { t.Errorf("Expeceted: %v, received: %v", utils.ErrNotFound, err) } - csvR, err := NewCSVReader(utils.MetaUrl, s.URL+"/ok", utils.AttributesCsv, utils.CSVSep, -1) + csvR, err := NewCSVReader(urlProvider{}, s.URL+"/ok", utils.AttributesCsv, utils.CSVSep, -1) if err != nil { t.Fatal(err) } - expPath := s.URL + "/ok/" + utils.AttributesCsv + expPath := path.Join(s.URL + "/ok/" + utils.AttributesCsv) if p := csvR.Path(); p != expPath { - t.Errorf("Expeceted: %+v, received: %+v", data, expPath) + t.Errorf("Expeceted: %+v, received: %+v", p, expPath) } if p, err := csvR.Read(); err != nil { @@ -128,6 +127,9 @@ func TestNewCSVURLReader(t *testing.T) { if err := csvR.Close(); err != nil { t.Error(err) } + if tp := (urlProvider{}).Type(); tp != utils.MetaUrl { + t.Errorf("Expeceted: %q, received: %q", utils.MetaUrl, tp) + } } func TestNewCSVFileReader(t *testing.T) { @@ -152,7 +154,7 @@ func TestNewCSVFileReader(t *testing.T) { t.Fatal(err) } - csvR, err := NewCSVReader(utils.MetaFileCSV, dir, utils.AttributesCsv, utils.CSVSep, -1) + csvR, err := NewCSVReader(fileProvider{}, dir, utils.AttributesCsv, utils.CSVSep, -1) if err != nil { t.Fatal(err) } diff --git a/loaders/loader.go b/loaders/loader.go index 8b86cceff..e963b562e 100644 --- a/loaders/loader.go +++ b/loaders/loader.go @@ -19,6 +19,7 @@ along with this program. If not, see package loaders import ( + "archive/zip" "fmt" "io" "os" @@ -164,7 +165,7 @@ func newLoader(cfg *config.CGRConfig, ldrCfg *config.LoaderSCfg, dm *engine.Data connMgr: connMgr, cacheConns: cacheConns, dataCache: dataCache, - Locker: newLocker(ldrCfg.GetLockFilePath()), + Locker: newLocker(ldrCfg.GetLockFilePath(), ldrCfg.ID), } } @@ -242,7 +243,7 @@ func (l *loader) process(ctx *context.Context, obj profile, lType, action, cachi return engine.CallCache(l.connMgr, ctx, l.cacheConns, caching, cacheArgs, cacheIDs, nil, false, l.ldrCfg.Tenant) } -func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*config.FCTemplate, lType, action, caching string, withIndex, partialRates bool) (err error) { +func (l *loader) processData(ctx *context.Context, csv *CSVFile, tmpls []*config.FCTemplate, lType, action, caching string, withIndex, partialRates bool) (err error) { newPrf := newProfileFunc(lType) obj := newPrf() var prevTntID string @@ -285,24 +286,16 @@ func (l *loader) processData(ctx *context.Context, csv CSVReader, tmpls []*confi return } -func (l *loader) processFile(ctx *context.Context, cfg *config.LoaderDataType, inPath, outPath, action, caching string, withIndex bool) (err error) { - csvType := utils.MetaFileCSV - switch { - // case strings.HasPrefix(inPath, gprefix): // uncomment this after *gapi is implemented - // csvType = utils.MetaGoogleAPI - // inPath = strings.TrimPrefix(inPath, gprefix) - case utils.IsURL(inPath): - csvType = utils.MetaUrl - } - var csv CSVReader - if csv, err = NewCSVReader(csvType, inPath, cfg.Filename, rune(l.ldrCfg.FieldSeparator[0]), 0); err != nil { +func (l *loader) processFile(ctx *context.Context, cfg *config.LoaderDataType, inPath, outPath, action, caching string, withIndex bool, prv CSVProvider) (err error) { + var csv *CSVFile + if csv, err = NewCSVReader(prv, inPath, cfg.Filename, rune(l.ldrCfg.FieldSeparator[0]), 0); err != nil { return } defer csv.Close() if err = l.processData(ctx, csv, cfg.Fields, cfg.Type, action, caching, withIndex, cfg.Flags.GetBool(utils.PartialRatesOpt)); err != nil || // encounterd error outPath == utils.EmptyString || // or no moving - csvType != utils.MetaFileCSV { // or the type can not be moved(e.g. url) + prv.Type() != utils.MetaFileCSV { // or the type can not be moved(e.g. url) return } return os.Rename(path.Join(inPath, cfg.Filename), path.Join(outPath, cfg.Filename)) @@ -330,7 +323,7 @@ func (l *loader) processIFile(_, fileName string) (err error) { return } defer l.Unlock() - return l.processFile(context.Background(), cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, utils.FirstNonEmpty(l.ldrCfg.Opts.Cache, l.cfg.GeneralCfg().DefaultCaching), l.ldrCfg.Opts.WithIndex) + return l.processFile(context.Background(), cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, utils.FirstNonEmpty(l.ldrCfg.Opts.Cache, l.cfg.GeneralCfg().DefaultCaching), l.ldrCfg.Opts.WithIndex, fileProvider{}) } func (l *loader) processFolder(ctx *context.Context, caching string, withIndex, stopOnError bool) (err error) { @@ -338,8 +331,16 @@ func (l *loader) processFolder(ctx *context.Context, caching string, withIndex, return } defer l.Unlock() + var csvType CSVProvider = fileProvider{} + switch { + // case strings.HasPrefix(inPath, gprefix): // uncomment this after *gapi is implemented + // csvType = utils.MetaGoogleAPI + // inPath = strings.TrimPrefix(inPath, gprefix) + case utils.IsURL(l.ldrCfg.TpInDir): + csvType = urlProvider{} + } for _, cfg := range l.ldrCfg.Data { - if err = l.processFile(ctx, cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, caching, withIndex); err != nil { + if err = l.processFile(ctx, cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, caching, withIndex, csvType); err != nil { if !stopOnError { utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s", utils.LoaderS, l.ldrCfg.ID, cfg.Type, err)) @@ -399,3 +400,23 @@ func (l *loader) ListenAndServe(stopChan chan struct{}) (err error) { } return } + +func (l *loader) processZip(ctx *context.Context, caching string, withIndex, stopOnError bool, zipR *zip.Reader) (err error) { + if err = l.Lock(); err != nil { + return + } + defer l.Unlock() + ziP := zipProvider{zipR} + for _, cfg := range l.ldrCfg.Data { + if err = l.processFile(ctx, cfg, l.ldrCfg.TpInDir, l.ldrCfg.TpOutDir, l.ldrCfg.Action, caching, withIndex, ziP); err != nil { + if !stopOnError { + utils.Logger.Warning(fmt.Sprintf("<%s-%s> loaderType: <%s> cannot open files, err: %s", + utils.LoaderS, l.ldrCfg.ID, cfg.Type, err)) + err = nil + continue + } + return + } + } + return +} diff --git a/loaders/loader_test.go b/loaders/loader_test.go index aa9c55463..ed7d81b88 100644 --- a/loaders/loader_test.go +++ b/loaders/loader_test.go @@ -19,7 +19,9 @@ along with this program. If not, see package loaders import ( + "archive/zip" "bytes" + "encoding/csv" "fmt" "net/http" "net/http/httptest" @@ -355,7 +357,7 @@ func TestLoaderProcess(t *testing.T) { filterS: fS, connMgr: cM, dataCache: cache, - Locker: newLocker(cfg.LoaderCfg()[0].GetLockFilePath()), + Locker: newLocker(cfg.LoaderCfg()[0].GetLockFilePath(), cfg.LoaderCfg()[0].ID), }); !reflect.DeepEqual(expLd, ld) { t.Errorf("Expeceted: %+v, received: %+v", expLd, ld) } @@ -683,11 +685,9 @@ cgrates.org,ID2`, utils.CSVSep, -1), fc, utils.MetaAttributes, utils.MetaStore, } } -type mockCSV struct{} +type mockReader struct{} -func (mockCSV) Path() (_ string) { return } -func (mockCSV) Read() ([]string, error) { return nil, utils.ErrNotFound } -func (mockCSV) Close() (_ error) { return } +func (mockReader) Read([]byte) (int, error) { return 0, utils.ErrNotFound } func TestLoaderProcessDataErrors(t *testing.T) { cfg := config.NewDefaultCGRConfig() @@ -725,7 +725,7 @@ cgrates.org,ID2`, utils.CSVSep, -1), fc, utils.MetaAttributes, "notSupported", u t.Errorf("Expeceted: %q, received: %v", expErrMsg, err) } - if err := ld.processData(context.Background(), mockCSV{}, fc, utils.MetaAttributes, "notSupported", utils.MetaNone, true, false); err != utils.ErrNotFound { + if err := ld.processData(context.Background(), &CSVFile{csvRdr: csv.NewReader(mockReader{})}, fc, utils.MetaAttributes, "notSupported", utils.MetaNone, true, false); err != utils.ErrNotFound { t.Errorf("Expeceted: %q, received: %v", utils.ErrNotFound, err) } } @@ -759,7 +759,7 @@ func TestLoaderProcessFileURL(t *testing.T) { Type: utils.MetaAttributes, Filename: utils.AttributesCsv, Fields: fc, - }, s.URL+"/ok", utils.EmptyString, utils.MetaStore, utils.MetaNone, true); err != nil { + }, s.URL+"/ok", utils.EmptyString, utils.MetaStore, utils.MetaNone, true, urlProvider{}); err != nil { t.Fatal(err) } if prf, err := dm.GetAttributeProfile(context.Background(), "cgrates.org", "ID", false, true, utils.NonTransactional); err != nil { @@ -772,7 +772,7 @@ func TestLoaderProcessFileURL(t *testing.T) { Type: utils.MetaAttributes, Filename: utils.AttributesCsv, Fields: fc, - }, s.URL+"/notFound", utils.EmptyString, utils.MetaStore, utils.MetaNone, true); err != utils.ErrNotFound { + }, s.URL+"/notFound", utils.EmptyString, utils.MetaStore, utils.MetaNone, true, urlProvider{}); err != utils.ErrNotFound { t.Errorf("Expeceted: %v, received: %v", utils.ErrNotFound, err) } @@ -978,6 +978,13 @@ func TestLoaderProcessFolder(t *testing.T) { if err := ld.processFolder(context.Background(), utils.MetaNone, true, true); err != utils.ErrExists { t.Fatal(err) } + + ld.Locker = nopLock{} + ld.ldrCfg.TpInDir = "http://localhost:0" + expErrMsg := `path:"http://localhost:0/Attributes.csv" is not reachable` + if err := ld.processFolder(context.Background(), utils.MetaNone, true, true); err == nil || err.Error() != expErrMsg { + t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) + } } func TestLoaderProcessFolderErrors(t *testing.T) { @@ -1071,7 +1078,6 @@ func TestLoaderProcessFolderErrors(t *testing.T) { buf.String(); !strings.Contains(rplyLog, expLog) { t.Errorf("Expected %+q, received %+q", expLog, rplyLog) } - } func TestLoaderMoveUnprocessedFilesErrors(t *testing.T) { @@ -1192,3 +1198,90 @@ func TestLoaderListenAndServeI(t *testing.T) { t.Errorf("Expected %+q, received %+q", expLog, rplyLog) } } + +func TestLoaderProcessZipErrors(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + cM := engine.NewConnManager(cfg) + dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM) + fS := engine.NewFilterS(cfg, cM, dm) + cache := map[string]*ltcache.Cache{} + for k, cfg := range cfg.LoaderCfg()[0].Cache { + cache[k] = ltcache.NewCache(cfg.Limit, cfg.TTL, cfg.StaticTTL, nil) + } + fc := []*config.FCTemplate{ + {Filters: []string{"*string"}}, + } + for _, f := range fc { + f.ComputePath() + } + + ld := newLoader(cfg, &config.LoaderSCfg{ + ID: "test", + Enabled: true, + TpInDir: utils.EmptyString, + TpOutDir: utils.EmptyString, + Data: []*config.LoaderDataType{ + { + Type: utils.MetaAttributes, + Filename: utils.AttributesCsv, + Fields: fc, + }, + }, + FieldSeparator: utils.FieldsSep, + Action: utils.MetaStore, + Opts: &config.LoaderSOptsCfg{ + WithIndex: true, + Cache: utils.MetaNone, + }, + }, dm, cache, fS, cM, nil) + bufz := new(bytes.Buffer) + w := zip.NewWriter(bufz) + f, err := w.Create(utils.AttributesCsv) + if err != nil { + t.Fatal(err) + } + if _, err := f.Write([]byte(`cgrates.org,ID`)); err != nil { + t.Fatal(err) + } + if err := w.Close(); err != nil { + t.Fatal(err) + } + r, err := zip.NewReader(bytes.NewReader(bufz.Bytes()), int64(bufz.Len())) + if err != nil { + t.Fatal(err) + } + + expErrMsg := "inline parse error for string: <*string>" + if err := ld.processZip(context.Background(), utils.MetaNone, true, true, r); err == nil || err.Error() != expErrMsg { + t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) + } + + if _, err := dm.GetAttributeProfile(context.Background(), "cgrates.org", "ID", false, true, utils.NonTransactional); err != utils.ErrNotFound { + t.Fatal(err) + } + + buf := bytes.NewBuffer([]byte{}) + log.SetOutput(buf) + lgr := utils.Logger + defer func() { utils.Logger = lgr; log.SetOutput(os.Stderr) }() + utils.Logger, _ = utils.Newlogger(utils.MetaStdLog, utils.EmptyString) + utils.Logger.SetLogLevel(7) + if err := ld.processZip(context.Background(), utils.MetaNone, true, false, r); err != nil { + t.Fatal(err) + } + + if _, err := dm.GetAttributeProfile(context.Background(), "cgrates.org", "ID", false, true, utils.NonTransactional); err != utils.ErrNotFound { + t.Fatal(err) + } + + if expLog, rplyLog := " loaderType: <*attributes> cannot open files, err: inline parse error for string: <*string>", + buf.String(); !strings.Contains(rplyLog, expLog) { + t.Errorf("Expected %+q, received %+q", expLog, rplyLog) + } + + ld.Locker = mockLock{} + if err := ld.processZip(context.Background(), utils.MetaNone, true, true, r); err != utils.ErrExists { + t.Fatal(err) + } + +} diff --git a/loaders/loaders.go b/loaders/loaders.go index 6ed6e3969..554e499c1 100644 --- a/loaders/loaders.go +++ b/loaders/loaders.go @@ -19,6 +19,8 @@ along with this program. If not, see package loaders import ( + "archive/zip" + "bytes" "errors" "fmt" "sync" @@ -119,6 +121,66 @@ func (ldrS *LoaderS) V1Run(ctx *context.Context, args *ArgsProcessFolder, return } +type ArgsProcessZip struct { + LoaderID string + Data []byte + APIOpts map[string]interface{} +} + +func (ldrS *LoaderS) V1ImportZip(ctx *context.Context, args *ArgsProcessZip, + rply *string) (err error) { + var zipR *zip.Reader + if zipR, err = zip.NewReader(bytes.NewReader(args.Data), int64(len(args.Data))); err != nil { + return + } + ldrS.RLock() + defer ldrS.RUnlock() + + if args.LoaderID == utils.EmptyString { + args.LoaderID = utils.MetaDefault + } + ldr, has := ldrS.ldrs[args.LoaderID] + if !has { + return fmt.Errorf("UNKNOWN_LOADER: %s", args.LoaderID) + } + var locked bool + if locked, err = ldr.Locked(); err != nil { + return utils.NewErrServerError(err) + } else if locked { + fl := ldr.ldrCfg.Opts.ForceLock + if val, has := args.APIOpts[utils.MetaForceLock]; has { + if fl, err = utils.IfaceAsBool(val); err != nil { + return + } + } + if !fl { + return errors.New("ANOTHER_LOADER_RUNNING") + } + if err := ldr.Unlock(); err != nil { + return utils.NewErrServerError(err) + } + } + wI := ldr.ldrCfg.Opts.WithIndex + if val, has := args.APIOpts[utils.MetaWithIndex]; has { + if wI, err = utils.IfaceAsBool(val); err != nil { + return + } + } + + soE := ldr.ldrCfg.Opts.StopOnError + if val, has := args.APIOpts[utils.MetaStopOnError]; has { + if soE, err = utils.IfaceAsBool(val); err != nil { + return + } + } + if err := ldr.processZip(context.Background(), utils.FirstNonEmpty(utils.IfaceAsString(args.APIOpts[utils.MetaCache]), ldr.ldrCfg.Opts.Cache, ldrS.cfg.GeneralCfg().DefaultCaching), + wI, soE, zipR); err != nil { + return utils.NewErrServerError(err) + } + *rply = utils.OK + return +} + // Reload recreates the loaders map thread safe func (ldrS *LoaderS) Reload(dm *engine.DataManager, filterS *engine.FilterS, connMgr *engine.ConnManager) { diff --git a/loaders/loaders_test.go b/loaders/loaders_test.go index 36bb97c2d..0b24cb860 100644 --- a/loaders/loaders_test.go +++ b/loaders/loaders_test.go @@ -19,6 +19,7 @@ along with this program. If not, see package loaders import ( + "archive/zip" "bytes" "os" "path" @@ -60,7 +61,7 @@ func TestNewLoaderService(t *testing.T) { connMgr: cM, dataCache: cache, cacheConns: cfg.LoaderCfg()[0].CacheSConns, - Locker: newLocker(cfg.LoaderCfg()[0].GetLockFilePath()), + Locker: newLocker(cfg.LoaderCfg()[0].GetLockFilePath(), cfg.LoaderCfg()[0].ID), }, }, }); !reflect.DeepEqual(exp, ld) { @@ -302,3 +303,199 @@ func TestLoaderServiceV1RunErrors(t *testing.T) { t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) } } + +func TestLoaderServiceV1ImportZip(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fc := []*config.FCTemplate{ + {Path: utils.Tenant, Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.0", utils.RSRConstSep)}, + {Path: utils.ID, Type: utils.MetaVariable, Value: config.NewRSRParsersMustCompile("~*req.1", utils.RSRConstSep)}, + } + for _, f := range fc { + f.ComputePath() + } + cfg.LoaderCfg()[0].Enabled = true + cfg.LoaderCfg()[0].Data = []*config.LoaderDataType{{ + Type: utils.MetaAttributes, + Filename: utils.AttributesCsv, + Fields: fc, + }} + cfg.LoaderCfg()[0].LockFilePath = utils.MetaMemory + cfg.LoaderCfg()[0].TpInDir = utils.EmptyString + cfg.LoaderCfg()[0].TpOutDir = utils.EmptyString + + buf := new(bytes.Buffer) + wr := zip.NewWriter(buf) + f, err := wr.Create(utils.AttributesCsv) + if err != nil { + t.Fatal(err) + } + if _, err := f.Write([]byte(`cgrates.org,ID`)); err != nil { + t.Fatal(err) + } + if err := wr.Close(); err != nil { + t.Fatal(err) + } + + cM := engine.NewConnManager(cfg) + dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM) + fS := engine.NewFilterS(cfg, cM, dm) + + ld := NewLoaderService(cfg, dm, fS, cM) + var rply string + if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{ + Data: buf.Bytes(), + APIOpts: map[string]interface{}{ + utils.MetaCache: utils.MetaNone, + utils.MetaWithIndex: true, + utils.MetaStopOnError: true, + utils.MetaForceLock: true, + }, + }, &rply); err != nil { + t.Fatal(err) + } else if rply != utils.OK { + t.Errorf("Expected: %q,received: %q", utils.OK, rply) + } + if prf, err := dm.GetAttributeProfile(context.Background(), "cgrates.org", "ID", false, true, utils.NonTransactional); err != nil { + t.Fatal(err) + } else if v := (&engine.AttributeProfile{Tenant: "cgrates.org", ID: "ID"}); !reflect.DeepEqual(v, prf) { + t.Errorf("Expeceted: %v, received: %v", utils.ToJSON(v), utils.ToJSON(prf)) + } +} + +func TestLoaderServiceV1ImportZipErrors(t *testing.T) { + cfg := config.NewDefaultCGRConfig() + fc := []*config.FCTemplate{ + {Filters: []string{"*string"}}, + } + for _, f := range fc { + f.ComputePath() + } + cfg.LoaderCfg()[0].Enabled = true + cfg.LoaderCfg()[0].Data = []*config.LoaderDataType{{ + Type: utils.MetaAttributes, + Filename: utils.AttributesCsv, + Fields: fc, + }} + cfg.LoaderCfg()[0].TpInDir = utils.EmptyString + cfg.LoaderCfg()[0].TpOutDir = utils.EmptyString + defer os.Remove(cfg.LoaderCfg()[0].LockFilePath) + buf := new(bytes.Buffer) + wr := zip.NewWriter(buf) + f, err := wr.Create(utils.AttributesCsv) + if err != nil { + t.Fatal(err) + } + if _, err := f.Write([]byte(`cgrates.org,ID`)); err != nil { + t.Fatal(err) + } + if err := wr.Close(); err != nil { + t.Fatal(err) + } + + cM := engine.NewConnManager(cfg) + dm := engine.NewDataManager(engine.NewInternalDB(nil, nil, cfg.DataDbCfg().Items), cfg.CacheCfg(), cM) + fS := engine.NewFilterS(cfg, cM, dm) + + ld := NewLoaderService(cfg, dm, fS, cM) + var rply string + + expErrMsg := "SERVER_ERROR: inline parse error for string: <*string>" + if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{ + Data: buf.Bytes(), + APIOpts: map[string]interface{}{ + utils.MetaCache: utils.MetaNone, + utils.MetaWithIndex: true, + utils.MetaStopOnError: true, + utils.MetaForceLock: true, + }, + }, &rply); err == nil || err.Error() != expErrMsg { + t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) + } + + expErrMsg = "zip: not a valid zip file" + if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{ + APIOpts: map[string]interface{}{ + utils.MetaCache: utils.MetaNone, + utils.MetaWithIndex: true, + utils.MetaStopOnError: true, + utils.MetaForceLock: true, + }, + }, &rply); err == nil || err.Error() != expErrMsg { + t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) + } + + expErrMsg = `strconv.ParseBool: parsing "notfloat": invalid syntax` + if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{ + Data: buf.Bytes(), + APIOpts: map[string]interface{}{ + utils.MetaCache: utils.MetaNone, + utils.MetaWithIndex: true, + utils.MetaStopOnError: "notfloat", + utils.MetaForceLock: true, + }, + }, &rply); err == nil || err.Error() != expErrMsg { + t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) + } + if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{ + Data: buf.Bytes(), + APIOpts: map[string]interface{}{ + utils.MetaCache: utils.MetaNone, + utils.MetaWithIndex: "notfloat", + utils.MetaStopOnError: "notfloat", + utils.MetaForceLock: true, + }, + }, &rply); err == nil || err.Error() != expErrMsg { + t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) + } + + ld.ldrs[utils.MetaDefault].Locker.Lock() + if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{ + Data: buf.Bytes(), + APIOpts: map[string]interface{}{ + utils.MetaCache: utils.MetaNone, + utils.MetaWithIndex: "notfloat", + utils.MetaStopOnError: "notfloat", + utils.MetaForceLock: "notfloat", + }, + }, &rply); err == nil || err.Error() != expErrMsg { + t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) + } + + expErrMsg = `ANOTHER_LOADER_RUNNING` + if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{ + Data: buf.Bytes(), + APIOpts: map[string]interface{}{ + utils.MetaCache: utils.MetaNone, + utils.MetaWithIndex: "notfloat", + utils.MetaStopOnError: "notfloat", + utils.MetaForceLock: false, + }, + }, &rply); err == nil || err.Error() != expErrMsg { + t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) + } + + ld.ldrs[utils.MetaDefault].Locker = mockLock{} + + expErrMsg = `SERVER_ERROR: EXISTS` + if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{ + Data: buf.Bytes(), + }, &rply); err == nil || err.Error() != expErrMsg { + t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) + } + + ld.ldrs[utils.MetaDefault].Locker = mockLock2{} + if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{ + Data: buf.Bytes(), APIOpts: map[string]interface{}{ + utils.MetaForceLock: true}}, &rply); err == nil || err.Error() != expErrMsg { + t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) + } + + cfg.LoaderCfg()[0].Enabled = false + ld.Reload(dm, fS, cM) + expErrMsg = `UNKNOWN_LOADER: *default` + if err := ld.V1ImportZip(context.Background(), &ArgsProcessZip{ + Data: buf.Bytes(), + }, &rply); err == nil || err.Error() != expErrMsg { + t.Errorf("Expeceted: %v, received: %v", expErrMsg, err) + } +} diff --git a/loaders/locker.go b/loaders/locker.go index c5eac978a..dc5efe3ba 100644 --- a/loaders/locker.go +++ b/loaders/locker.go @@ -21,6 +21,9 @@ package loaders import ( "io" "os" + + "github.com/cgrates/cgrates/guardian" + "github.com/cgrates/cgrates/utils" ) type Locker interface { @@ -30,11 +33,15 @@ type Locker interface { IsLockFile(string) bool } -func newLocker(path string) Locker { - if len(path) != 0 { +func newLocker(path, loaderID string) Locker { + switch path { + case utils.EmptyString: + return new(nopLock) + case utils.MetaMemory: + return &memoryLock{loaderID: loaderID} + default: return folderLock(path) } - return new(nopLock) } type folderLock string @@ -66,8 +73,24 @@ func (fl folderLock) IsLockFile(path string) bool { return path == string(fl) } type nopLock struct{} -// lockFolder will attempt to lock the folder by creating the lock file func (nopLock) Lock() (_ error) { return } func (nopLock) Unlock() (_ error) { return } func (nopLock) Locked() (_ bool, _ error) { return } func (nopLock) IsLockFile(string) (_ bool) { return } + +type memoryLock struct { + loaderID string + refID string +} + +func (m *memoryLock) Lock() (_ error) { + m.refID = guardian.Guardian.GuardIDs(m.refID, 0, utils.ConcatenatedKey(utils.LoaderS, m.loaderID)) + return +} +func (m *memoryLock) Unlock() (_ error) { + guardian.Guardian.UnguardIDs(m.refID) + m.refID = utils.EmptyString + return +} +func (m memoryLock) Locked() (bool, error) { return len(m.refID) != 0, nil } +func (m memoryLock) IsLockFile(string) (_ bool) { return } diff --git a/loaders/locker_test.go b/loaders/locker_test.go index 7a6f00078..1c0b6b609 100644 --- a/loaders/locker_test.go +++ b/loaders/locker_test.go @@ -28,7 +28,7 @@ import ( ) func TestNopLocker(t *testing.T) { - np := newLocker(utils.EmptyString) + np := newLocker(utils.EmptyString, utils.EmptyString) if err := np.Lock(); err != nil { t.Error(err) } @@ -56,7 +56,7 @@ func TestFolderLocker(t *testing.T) { } defer os.RemoveAll(dir) fp := path.Join(dir, ".lkr") - np := newLocker(fp) + np := newLocker(fp, utils.EmptyString) exp := folderLock(fp) if !reflect.DeepEqual(np, exp) { t.Errorf("Expeceted: %+v, received: %+v", exp, np) @@ -84,3 +84,30 @@ func TestFolderLocker(t *testing.T) { t.Error("Expected no lock") } } + +func TestMemoryLocker(t *testing.T) { + np := newLocker(utils.MetaMemory, "ID") + exp := &memoryLock{loaderID: "ID"} + if !reflect.DeepEqual(np, exp) { + t.Errorf("Expeceted: %+v, received: %+v", exp, np) + } + if err := np.Lock(); err != nil { + t.Error(err) + } + if lk, err := np.Locked(); err != nil { + t.Error(err) + } else if !lk { + t.Error("Expected lock") + } + if err := np.Unlock(); err != nil { + t.Error(err) + } + if np.IsLockFile(utils.EmptyString) { + t.Error("Expected to not be lock file") + } + if lk, err := np.Locked(); err != nil { + t.Error(err) + } else if lk { + t.Error("Expected no lock") + } +} diff --git a/utils/consts.go b/utils/consts.go index ad7c670cd..dcd064ce8 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -591,6 +591,7 @@ const ( MetaMultiply = "*multiply" MetaDivide = "*divide" MetaUrl = "*url" + MetaZip = "*zip" MetaXml = "*xml" MetaReq = "*req" MetaVars = "*vars" @@ -634,6 +635,7 @@ const ( IdxEnd = "]" IdxCombination = "][" + MetaMemory = "*memory" RemoteHost = "RemoteHost" Local = "local" TCP = "tcp"