diff --git a/data/scripts/generate_dispatchers/generator.go b/data/scripts/generate_dispatchers/generator.go index 250438ffe..632077755 100644 --- a/data/scripts/generate_dispatchers/generator.go +++ b/data/scripts/generate_dispatchers/generator.go @@ -43,6 +43,7 @@ import ( "github.com/cgrates/cgrates/ees" "github.com/cgrates/cgrates/efs" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/ers" "github.com/cgrates/cgrates/guardian" "github.com/cgrates/cgrates/loaders" "github.com/cgrates/cgrates/rates" @@ -81,6 +82,7 @@ func main() { {"cores.go", "MetaCore", new(cores.CoreS), utils.EmptyString}, {"guardian.go", "MetaGuardian", guardian.Guardian, utils.GuardianS}, {"efs.go", "MetaEFs", new(efs.EfS), utils.EmptyString}, + {"ers.go", "MetaERs", new(ers.ERService), utils.ErS}, {"tpes.go", "MetaTpes", new(tpes.TPeS), utils.EmptyString}, // {"servicemanager.go", "MetaServiceManager", new(servmanager.ServiceManager), utils.EmptyString}, } { diff --git a/dispatchers/ers.go b/dispatchers/ers.go new file mode 100644 index 000000000..2f4973e95 --- /dev/null +++ b/dispatchers/ers.go @@ -0,0 +1,50 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +// do not modify this code because it's generated +package dispatchers + +import ( + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/utils" +) + +func (dS *DispatcherService) ErSv1Ping(ctx *context.Context, args *utils.CGREvent, reply *string) (err error) { + tnt := dS.cfg.GeneralCfg().DefaultTenant + if args != nil && len(args.Tenant) != 0 { + tnt = args.Tenant + } + ev := make(map[string]any) + if args != nil { + ev = args.Event + } + opts := make(map[string]any) + if args != nil { + opts = args.APIOpts + } + return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaERs, utils.ErSv1Ping, args, reply) +} +func (dS *DispatcherService) ErSv1RunReader(ctx *context.Context, args utils.StringWithAPIOpts, reply *string) (err error) { + tnt := dS.cfg.GeneralCfg().DefaultTenant + if len(args.Tenant) != 0 { + tnt = args.Tenant + } + ev := make(map[string]any) + opts := args.APIOpts + return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaERs, utils.ErSv1RunReader, args, reply) +} diff --git a/engine/libengine.go b/engine/libengine.go index 1bf8cccdf..33a358084 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -149,8 +149,8 @@ func NewService(val any) (_ IntService, err error) { return NewServiceWithName(val, utils.EmptyString, false) } func NewServiceWithPing(val any, name, prefix string) (*birpc.Service, error) { - srv, err := birpc.NewServiceWithMethodsRename(val, name, true, func(funcName string) string { - return strings.TrimPrefix(funcName, prefix) + srv, err := birpc.NewServiceWithMethodsRename(val, name, true, func(oldFn string) (newFn string) { + return strings.TrimPrefix(oldFn, prefix) }) if err != nil { return nil, err diff --git a/ers/apis.go b/ers/apis.go new file mode 100644 index 000000000..eb92d5157 --- /dev/null +++ b/ers/apis.go @@ -0,0 +1,65 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "errors" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/utils" +) + +// V1RunReaderParams contains required parameters for an ErSv1.RunReader request. +type V1RunReaderParams struct { + Tenant string + ID string // unique identifier of the request + ReaderID string + APIOpts map[string]any +} + +// V1RunReader processes files in the configured directory for the given reader. This function handles files +// based on the reader's type and configuration. Only available for readers that are not processing files +// automatically (RunDelay should equal 0). +// +// Note: This API is not safe to call concurrently for the same reader. Ensure the current files finish being +// processed before calling again. +func (erS *ERService) V1RunReader(ctx *context.Context, params V1RunReaderParams, reply *string) error { + rdrCfg := erS.cfg.ERsCfg().ReaderCfg(params.ReaderID) + er, has := erS.rdrs[params.ReaderID] + if !has || rdrCfg == nil { + return utils.ErrNotFound + } + if rdrCfg.RunDelay != 0 { + return errors.New("readers with RunDelay different from 0 are not supported") + } + switch rdr := er.(type) { + case *CSVFileER: + processReaderDir(rdr.sourceDir, utils.CSVSuffix, rdr.processFile) + case *XMLFileER: + processReaderDir(rdr.sourceDir, utils.XMLSuffix, rdr.processFile) + case *FWVFileER: + processReaderDir(rdr.sourceDir, utils.FWVSuffix, rdr.processFile) + case *JSONFileER: + processReaderDir(rdr.sourceDir, utils.JSONSuffix, rdr.processFile) + default: + return errors.New("reader type does not yet support manual processing") + } + *reply = utils.OK + return nil +} diff --git a/ers/ers_it_test.go b/ers/ers_it_test.go index 2475959d6..1db2d7311 100644 --- a/ers/ers_it_test.go +++ b/ers/ers_it_test.go @@ -85,10 +85,7 @@ func TestERsAddReader(t *testing.T) { func TestERsListenAndServeErr(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ - { - ID: "", - Type: "", - }, + {}, } fltrS := &engine.FilterS{} srv := NewERService(cfg, fltrS, nil) @@ -102,21 +99,12 @@ func TestERsListenAndServeErr(t *testing.T) { func TestERsProcessEventErr(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ - { - ID: "", - Type: "", - }, + {}, } fltrS := &engine.FilterS{} srv := NewERService(cfg, fltrS, nil) - rdrCfg := &config.EventReaderCfg{ - ID: "", - Type: "", - } - cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - } + rdrCfg := &config.EventReaderCfg{} + cgrEvent := &utils.CGREvent{} err := srv.processEvent(cgrEvent, rdrCfg) if err == nil || err.Error() != "unsupported reqType: <>" { t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", "unsupported reqType: <>", err) @@ -126,10 +114,7 @@ func TestERsProcessEventErr(t *testing.T) { func TestERsCloseAllRdrs(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ - { - ID: "", - Type: "", - }, + {}, } fltrS := &engine.FilterS{} srv := NewERService(cfg, fltrS, nil) @@ -140,7 +125,6 @@ func TestERsListenAndServeRdrErr(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ { - ID: "", Type: utils.MetaNone, }, } @@ -161,7 +145,6 @@ func TestERsListenAndServeStopchan(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ { - ID: "", Type: utils.MetaNone, }, } @@ -181,7 +164,6 @@ func TestERsListenAndServeRdrEvents(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ { - ID: "", Type: utils.MetaNone, }, } @@ -192,15 +174,8 @@ func TestERsListenAndServeRdrEvents(t *testing.T) { srv.rdrErr = make(chan error, 1) srv.rdrEvents = make(chan *erEvent, 1) srv.rdrEvents <- &erEvent{ - cgrEvent: &utils.CGREvent{ - Tenant: "", - ID: "", - Event: nil, - APIOpts: nil, - }, - rdrCfg: &config.EventReaderCfg{ - ID: "", - }, + cgrEvent: &utils.CGREvent{}, + rdrCfg: &config.EventReaderCfg{}, } go func() { time.Sleep(10 * time.Millisecond) @@ -216,7 +191,6 @@ func TestERsListenAndServeCfgRldChan(t *testing.T) { cfg := config.NewDefaultCGRConfig() cfg.ERsCfg().Readers = []*config.EventReaderCfg{ { - ID: "", Type: utils.MetaNone, }, } @@ -349,14 +323,7 @@ func TestERsListenAndServeCfgRldChan5(t *testing.T) { fltrS := &engine.FilterS{} srv := NewERService(cfg, fltrS, nil) exp := &CSVFileER{ - cgrCfg: cfg, - cfgIdx: 0, - fltrS: nil, - sourceDir: "", - rdrEvents: nil, - rdrError: nil, - rdrExit: nil, - conReqs: nil, + cgrCfg: cfg, } var evRdr EventReader = exp srv.rdrs = map[string]EventReader{ @@ -436,12 +403,7 @@ func TestERsProcessEvent(t *testing.T) { }, }, } - cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - Event: nil, - APIOpts: nil, - } + cgrEvent := &utils.CGREvent{} err := srv.processEvent(cgrEvent, rdrCfg) if err == nil || err.Error() != "unsupported reqType: <>" { t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", "unsupported reqType: <>", err) @@ -464,12 +426,7 @@ func TestERsProcessEvent2(t *testing.T) { }, }, } - cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - Event: nil, - APIOpts: nil, - } + cgrEvent := &utils.CGREvent{} err := srv.processEvent(cgrEvent, rdrCfg) if err != nil { t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", nil, err) @@ -546,9 +503,6 @@ func TestERsProcessEvent5(t *testing.T) { }, } cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - Event: nil, APIOpts: map[string]any{ utils.OptsRoutesLimit: true, }, @@ -628,9 +582,6 @@ func TestERsProcessEvent8(t *testing.T) { }, } cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - Event: nil, APIOpts: map[string]any{ utils.OptsRoutesLimit: true, }, @@ -658,9 +609,6 @@ func TestERsProcessEvent9(t *testing.T) { }, } cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", - Event: nil, APIOpts: map[string]any{ utils.OptsRoutesLimit: true, }, @@ -689,8 +637,6 @@ func TestERsProcessEvent10(t *testing.T) { }, } cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", Event: map[string]any{ utils.Usage: time.Second, }, @@ -744,8 +690,6 @@ func TestERsProcessEvent11(t *testing.T) { }, } cgrEvent := &utils.CGREvent{ - Tenant: "", - ID: "", Event: map[string]any{ utils.Usage: 0, }, diff --git a/ers/filecsv.go b/ers/filecsv.go index 2263747f3..4e82856c0 100644 --- a/ers/filecsv.go +++ b/ers/filecsv.go @@ -63,7 +63,7 @@ type CSVFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - sourceDir string + sourceDir string // path to the directory monitored by the reader for new events rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error diff --git a/ers/filefwv.go b/ers/filefwv.go index c31cfa098..c8144069b 100644 --- a/ers/filefwv.go +++ b/ers/filefwv.go @@ -65,7 +65,7 @@ type FWVFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - sourceDir string + sourceDir string // path to the directory monitored by the reader for new events rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error diff --git a/ers/filejson.go b/ers/filejson.go index 14d50c47f..096b44b63 100644 --- a/ers/filejson.go +++ b/ers/filejson.go @@ -65,7 +65,7 @@ type JSONFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - sourceDir string + sourceDir string // path to the directory monitored by the reader for new events rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error diff --git a/ers/filexml.go b/ers/filexml.go index 01526ab82..15511812a 100644 --- a/ers/filexml.go +++ b/ers/filexml.go @@ -66,7 +66,7 @@ type XMLFileER struct { cgrCfg *config.CGRConfig cfgIdx int // index of config instance within ERsCfg.Readers fltrS *engine.FilterS - sourceDir string + sourceDir string // path to the directory monitored by the reader for new events rdrEvents chan *erEvent // channel to dispatch the events created to partialEvents chan *erEvent // channel to dispatch the partial events created to rdrError chan error diff --git a/services/cgr-engine.go b/services/cgr-engine.go index 8923ace40..07f6ddd9f 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -169,6 +169,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr iAccountSCh := make(chan birpc.ClientConnector, 1) iTpeSCh := make(chan birpc.ClientConnector, 1) iEFsCh := make(chan birpc.ClientConnector, 1) + iERsCh := make(chan birpc.ClientConnector, 1) // initialize the connManager before creating the DMService // because we need to pass the connection to it @@ -196,6 +197,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions), utils.ActionSv1, iActionSCh) cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes), utils.TPeSv1, iTpeSCh) cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs), utils.EfSv1, iEFsCh) + cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs), utils.ErSv1, iERsCh) cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep) cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, cgr.srvDep) @@ -244,7 +246,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server, iStatSCh, cgr.cM, cgr.anzS, cgr.srvDep), - NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), + NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.server, iERsCh, cgr.anzS, cgr.srvDep), NewDNSAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep), NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep), diff --git a/services/ers.go b/services/ers.go index 03216a9dd..83631de5d 100644 --- a/services/ers.go +++ b/services/ers.go @@ -22,8 +22,10 @@ import ( "fmt" "sync" + "github.com/cgrates/birpc" "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/ers" "github.com/cgrates/cgrates/servmanager" @@ -31,14 +33,22 @@ import ( ) // NewEventReaderService returns the EventReader Service -func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, +func NewEventReaderService( + cfg *config.CGRConfig, + filterSChan chan *engine.FilterS, connMgr *engine.ConnManager, + server *cores.Server, + intConn chan birpc.ClientConnector, + anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { return &EventReaderService{ rldChan: make(chan struct{}, 1), cfg: cfg, filterSChan: filterSChan, connMgr: connMgr, + server: server, + intConn: intConn, + anz: anz, srvDep: srvDep, } } @@ -53,6 +63,9 @@ type EventReaderService struct { rldChan chan struct{} stopChan chan struct{} connMgr *engine.ConnManager + server *cores.Server + intConn chan birpc.ClientConnector + anz *AnalyzerService srvDep map[string]*sync.WaitGroup } @@ -77,6 +90,15 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel // build the service erS.ers = ers.NewERService(erS.cfg, filterS, erS.connMgr) go erS.listenAndServe(erS.ers, erS.stopChan, erS.rldChan, shtDwn) + + srv, err := engine.NewServiceWithPing(erS.ers, utils.ErSv1, utils.V1Prfx) + if err != nil { + return err + } + if !erS.cfg.DispatcherSCfg().Enabled { + erS.server.RpcRegister(srv) + } + erS.intConn <- erS.anz.GetInternalCodec(srv, utils.ERs) return } @@ -99,9 +121,10 @@ func (erS *EventReaderService) Reload(*context.Context, context.CancelFunc) (err // Shutdown stops the service func (erS *EventReaderService) Shutdown() (err error) { erS.Lock() + defer erS.Unlock() close(erS.stopChan) erS.ers = nil - erS.Unlock() + erS.server.RpcUnregisterName(utils.ErSv1) return } diff --git a/services/ers_it_test.go b/services/ers_it_test.go index 233b2d7f6..e3b980a56 100644 --- a/services/ers_it_test.go +++ b/services/ers_it_test.go @@ -65,7 +65,8 @@ func TestEventReaderSReload(t *testing.T) { anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep) db := NewDataDBService(cfg, nil, srvDep) sS := NewSessionService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep) - erS := NewEventReaderService(cfg, filterSChan, nil, srvDep) + intERsConn := make(chan birpc.ClientConnector, 1) + erS := NewEventReaderService(cfg, filterSChan, nil, server, intERsConn, anz, srvDep) engine.NewConnManager(cfg) srvMngr.AddServices(erS, sS, NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db) @@ -127,7 +128,8 @@ func TestEventReaderSReload2(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - erS := NewEventReaderService(cfg, filterSChan, nil, srvDep) + server := cores.NewServer(nil) + erS := NewEventReaderService(cfg, filterSChan, nil, server, nil, nil, srvDep) ers := ers.NewERService(cfg, nil, nil) runtime.Gosched() diff --git a/services/ers_test.go b/services/ers_test.go index 5544d3ae8..050143c04 100644 --- a/services/ers_test.go +++ b/services/ers_test.go @@ -22,6 +22,7 @@ import ( "sync" "testing" + "github.com/cgrates/cgrates/cores" "github.com/cgrates/cgrates/ers" "github.com/cgrates/cgrates/config" @@ -36,7 +37,8 @@ func TestEventReaderSCoverage(t *testing.T) { filterSChan := make(chan *engine.FilterS, 1) filterSChan <- nil srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)} - srv := NewEventReaderService(cfg, filterSChan, nil, srvDep) + server := cores.NewServer(nil) + srv := NewEventReaderService(cfg, filterSChan, nil, server, nil, nil, srvDep) if srv.IsRunning() { t.Errorf("Expected service to be down") @@ -50,6 +52,7 @@ func TestEventReaderSCoverage(t *testing.T) { rldChan: make(chan struct{}, 1), stopChan: make(chan struct{}, 1), connMgr: nil, + server: server, srvDep: srvDep, } if !srv2.IsRunning() { diff --git a/utils/consts.go b/utils/consts.go index de786f1eb..404aabfce 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -384,6 +384,8 @@ const ( MetaUCH = "*uch" MetaGuardian = "*guardians" MetaEEs = "*ees" + MetaEFs = "*efs" + MetaERs = "*ers" MetaRates = "*rates" MetaRateSOverwrite = "*rateSOverwrite" MetaContinue = "*continue" @@ -931,7 +933,6 @@ const ( MetaThresholds = "*thresholds" MetaRoutes = "*routes" MetaAttributes = "*attributes" - MetaEFs = "*efs" MetaActionProfiles = "*action_profiles" MetaLoadIDs = "*load_ids" MetaNodeID = "*node_id" @@ -1354,6 +1355,14 @@ const ( EfSv1ReplayEvents = "EfSv1.ReplayEvents" ) +// ERs +const ( + ErS = "ErS" + ErSv1 = "ErSv1" + ErSv1Ping = "ErSv1.Ping" + ErSv1RunReader = "ErSv1.RunReader" +) + // ConfigSv1 APIs const ( ConfigS = "ConfigS"