From b9a39e233f7e9d7acef431d7086406f7c3d1487e Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Tue, 2 Apr 2024 12:18:01 +0300 Subject: [PATCH] Implement ErSv1.ProcessDir api cores.Server, analyzer object and internal conn channel specific to ERs are now part of the ERService struct and are passed to its constructor. --- cmd/cgr-engine/cgr-engine.go | 5 +++-- ers/ers.go | 32 +++++++++++++++++++++++++++++++ services/ers.go | 37 +++++++++++++++++++++++++++++++++--- services/ers_it_test.go | 6 ++++-- services/ers_test.go | 4 +++- utils/consts.go | 8 ++++++++ 6 files changed, 84 insertions(+), 8 deletions(-) diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index accd58096..7abf0615f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -462,6 +462,7 @@ func main() { internalAPIerSv2Chan := make(chan birpc.ClientConnector, 1) internalLoaderSChan := make(chan birpc.ClientConnector, 1) internalEEsChan := make(chan birpc.ClientConnector, 1) + internalERsChan := make(chan birpc.ClientConnector, 1) // initialize the connManager before creating the DMService // because we need to pass the connection to it @@ -625,10 +626,10 @@ func main() { ldrs := services.NewLoaderService(cfg, dmService, filterSChan, server, internalLoaderSChan, connManager, anz, srvDep) + erS := services.NewEventReaderService(cfg, filterSChan, shdChan, connManager, server, internalERsChan, anz, srvDep) srvManager.AddServices(gvService, attrS, chrS, tS, stS, reS, routeS, schS, rals, - apiSv1, apiSv2, cdrS, smg, coreS, - services.NewEventReaderService(cfg, filterSChan, shdChan, connManager, srvDep), + apiSv1, apiSv2, cdrS, smg, coreS, erS, services.NewDNSAgent(cfg, filterSChan, shdChan, connManager, srvDep), services.NewFreeswitchAgent(cfg, shdChan, connManager, srvDep), services.NewKamailioAgent(cfg, shdChan, connManager, srvDep), diff --git a/ers/ers.go b/ers/ers.go index d24e4bf60..28266dd99 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -21,6 +21,7 @@ package ers import ( "encoding/csv" "encoding/json" + "errors" "fmt" "os" "path" @@ -80,6 +81,37 @@ type ERService struct { partialCache *ltcache.Cache } +// V1RunReader processes files in the configured directory for the given reader. This function handles files +// based on the reader's type and configuration. Only available for readers that are not processing files +// automatically (RunDelay should equal 0). +// +// Note: This API is not safe to call concurrently for the same reader. Ensure the current files finish being +// processed before calling again. +func (erS *ERService) V1RunReader(ctx *context.Context, rdrID utils.StringWithAPIOpts, reply *string) error { + rdrCfg := erS.cfg.ERsCfg().ReaderCfg(rdrID.Arg) + er, has := erS.rdrs[rdrID.Arg] + if !has || rdrCfg == nil { + return utils.ErrNotFound + } + if rdrCfg.RunDelay != 0 { + return errors.New("readers with RunDelay different from 0 are not supported") + } + switch rdr := er.(type) { + case *CSVFileER: + processReaderDir(rdr.dir, utils.CSVSuffix, rdr.processFile) + case *XMLFileER: + processReaderDir(rdr.dir, utils.XMLSuffix, rdr.processFile) + case *FWVFileER: + processReaderDir(rdr.dir, utils.FWVSuffix, rdr.processFile) + case *JSONFileER: + processReaderDir(rdr.dir, utils.JSNSuffix, rdr.processFile) + default: + return errors.New("reader type does not yet support manual processing") + } + *reply = utils.OK + return nil +} + // ListenAndServe keeps the service alive func (erS *ERService) ListenAndServe(stopChan, cfgRldChan chan struct{}) error { for cfgIdx, rdrCfg := range erS.cfg.ERsCfg().Readers { diff --git a/services/ers.go b/services/ers.go index 99ca28a51..acd9856d6 100644 --- a/services/ers.go +++ b/services/ers.go @@ -20,9 +20,12 @@ package services import ( "fmt" + "strings" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/ers" "github.com/cgrates/cgrates/servmanager" @@ -30,15 +33,25 @@ import ( ) // NewEventReaderService returns the EventReader Service -func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, - shdChan *utils.SyncedChan, connMgr *engine.ConnManager, - srvDep map[string]*sync.WaitGroup) servmanager.Service { +func NewEventReaderService( + cfg *config.CGRConfig, + filterSChan chan *engine.FilterS, + shdChan *utils.SyncedChan, + connMgr *engine.ConnManager, + server *cores.Server, + intConn chan birpc.ClientConnector, + anz *AnalyzerService, + srvDep map[string]*sync.WaitGroup, +) servmanager.Service { return &EventReaderService{ rldChan: make(chan struct{}, 1), cfg: cfg, filterSChan: filterSChan, shdChan: shdChan, connMgr: connMgr, + server: server, + intConn: intConn, + anz: anz, srvDep: srvDep, } } @@ -54,6 +67,9 @@ type EventReaderService struct { rldChan chan struct{} stopChan chan struct{} connMgr *engine.ConnManager + server *cores.Server + intConn chan birpc.ClientConnector + anz *AnalyzerService srvDep map[string]*sync.WaitGroup } @@ -77,6 +93,21 @@ func (erS *EventReaderService) Start() (err error) { // build the service erS.ers = ers.NewERService(erS.cfg, filterS, erS.connMgr) go erS.listenAndServe(erS.ers, erS.stopChan, erS.rldChan) + + // Register ERs methods whose names start with "V1" under the "ErSv1" object name. + srv, err := birpc.NewServiceWithMethodsRename(erS.ers, utils.ErS, true, func(oldFn string) (newFn string) { + return strings.TrimPrefix(oldFn, "V1") + }) + if err != nil { + return err + } + engine.RegisterPingMethod(srv.Methods) + + if !erS.cfg.DispatcherSCfg().Enabled { + erS.server.RpcRegister(srv) + } + + erS.intConn <- erS.anz.GetInternalCodec(srv, utils.ERs) return } diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 1f905ba71..64a14bd01 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -67,7 +67,8 @@ func TestEventReaderSReload(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, shdChan, make(chan birpc.ClientConnector, 1), srvDep) db := NewDataDBService(cfg, nil, srvDep) sS := NewSessionService(cfg, db, server, make(chan birpc.ClientConnector, 1), shdChan, nil, anz, srvDep) - erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, srvDep) + intERsConn := make(chan birpc.ClientConnector, 1) + erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, server, intERsConn, anz, srvDep) engine.NewConnManager(cfg, nil) srvMngr.AddServices(erS, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) @@ -135,7 +136,8 @@ func TestEventReaderSReload2(t *testing.T) { filterSChan <- nil shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, srvDep) + server := cores.NewServer(nil) + erS := NewEventReaderService(cfg, filterSChan, shdChan, nil, server, nil, nil, srvDep) ers := ers.NewERService(cfg, nil, nil) runtime.Gosched() diff --git a/services/ers_test.go b/services/ers_test.go index 41cc385b9..2f6e85f6e 100644 --- a/services/ers_test.go +++ b/services/ers_test.go @@ -22,6 +22,7 @@ import ( "sync" "testing" + "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/ers" "github.com/cgrates/cgrates/config" @@ -37,7 +38,8 @@ func TestEventReaderSCoverage(t *testing.T) { filterSChan <- nil shdChan := utils.NewSyncedChan() srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewEventReaderService(cfg, filterSChan, shdChan, nil, srvDep) + server := cores.NewServer(nil) + srv := NewEventReaderService(cfg, filterSChan, shdChan, nil, server, nil, nil, srvDep) if srv.IsRunning() { t.Errorf("Expected service to be down") diff --git a/utils/consts.go b/utils/consts.go index eb84f7e43..8aa592306 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -999,6 +999,7 @@ const ( ConfigS = "ConfigS" DispatcherS = "DispatcherS" EeS = "EeS" + ErS = "ErS" FilterS = "FilterS" GuardianS = "GuardianS" LoaderS = "LoaderS" @@ -1800,6 +1801,13 @@ const ( EeSv1ProcessEvent = "EeSv1.ProcessEvent" ) +// ERs +const ( + ErSv1 = "ErSv1" + ErSv1Ping = "ErSv1.Ping" + ErSv1RunReader = "ErSv1.RunReader" +) + // cgr_ variables const ( CGRAccount = "cgr_account"