Implement ErSv1.ProcessDir api

cores.Server, analyzer object and internal conn channel specific to ERs
are now part of the ERService struct and are passed to its constructor.
This commit is contained in:
ionutboangiu
2024-04-02 12:18:01 +03:00
committed by Dan Christian Bogos
parent b924fc9e0c
commit b9a39e233f
6 changed files with 84 additions and 8 deletions

View File

@@ -21,6 +21,7 @@ package ers
import (
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"os"
"path"
@@ -80,6 +81,37 @@ type ERService struct {
partialCache *ltcache.Cache
}
// V1RunReader processes files in the configured directory for the given reader. This function handles files
// based on the reader's type and configuration. Only available for readers that are not processing files
// automatically (RunDelay should equal 0).
//
// Note: This API is not safe to call concurrently for the same reader. Ensure the current files finish being
// processed before calling again.
func (erS *ERService) V1RunReader(ctx *context.Context, rdrID utils.StringWithAPIOpts, reply *string) error {
rdrCfg := erS.cfg.ERsCfg().ReaderCfg(rdrID.Arg)
er, has := erS.rdrs[rdrID.Arg]
if !has || rdrCfg == nil {
return utils.ErrNotFound
}
if rdrCfg.RunDelay != 0 {
return errors.New("readers with RunDelay different from 0 are not supported")
}
switch rdr := er.(type) {
case *CSVFileER:
processReaderDir(rdr.dir, utils.CSVSuffix, rdr.processFile)
case *XMLFileER:
processReaderDir(rdr.dir, utils.XMLSuffix, rdr.processFile)
case *FWVFileER:
processReaderDir(rdr.dir, utils.FWVSuffix, rdr.processFile)
case *JSONFileER:
processReaderDir(rdr.dir, utils.JSNSuffix, rdr.processFile)
default:
return errors.New("reader type does not yet support manual processing")
}
*reply = utils.OK
return nil
}
// ListenAndServe keeps the service alive
func (erS *ERService) ListenAndServe(stopChan, cfgRldChan chan struct{}) error {
for cfgIdx, rdrCfg := range erS.cfg.ERsCfg().Readers {