Implement ErSv1.RunReader api

This commit is contained in:
ionutboangiu
2024-09-23 18:58:28 +03:00
committed by Dan Christian Bogos
parent cf9cc5dcfc
commit 65e8128303
14 changed files with 179 additions and 79 deletions

View File

@@ -43,6 +43,7 @@ import (
"github.com/cgrates/cgrates/ees"
"github.com/cgrates/cgrates/efs"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/ers"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/loaders"
"github.com/cgrates/cgrates/rates"
@@ -81,6 +82,7 @@ func main() {
{"cores.go", "MetaCore", new(cores.CoreS), utils.EmptyString},
{"guardian.go", "MetaGuardian", guardian.Guardian, utils.GuardianS},
{"efs.go", "MetaEFs", new(efs.EfS), utils.EmptyString},
{"ers.go", "MetaERs", new(ers.ERService), utils.ErS},
{"tpes.go", "MetaTpes", new(tpes.TPeS), utils.EmptyString},
// {"servicemanager.go", "MetaServiceManager", new(servmanager.ServiceManager), utils.EmptyString},
} {

50
dispatchers/ers.go Normal file
View File

@@ -0,0 +1,50 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
// do not modify this code because it's generated
package dispatchers
import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
)
func (dS *DispatcherService) ErSv1Ping(ctx *context.Context, args *utils.CGREvent, reply *string) (err error) {
tnt := dS.cfg.GeneralCfg().DefaultTenant
if args != nil && len(args.Tenant) != 0 {
tnt = args.Tenant
}
ev := make(map[string]any)
if args != nil {
ev = args.Event
}
opts := make(map[string]any)
if args != nil {
opts = args.APIOpts
}
return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaERs, utils.ErSv1Ping, args, reply)
}
func (dS *DispatcherService) ErSv1RunReader(ctx *context.Context, args utils.StringWithAPIOpts, reply *string) (err error) {
tnt := dS.cfg.GeneralCfg().DefaultTenant
if len(args.Tenant) != 0 {
tnt = args.Tenant
}
ev := make(map[string]any)
opts := args.APIOpts
return dS.Dispatch(ctx, &utils.CGREvent{Tenant: tnt, Event: ev, APIOpts: opts}, utils.MetaERs, utils.ErSv1RunReader, args, reply)
}

View File

@@ -149,8 +149,8 @@ func NewService(val any) (_ IntService, err error) {
return NewServiceWithName(val, utils.EmptyString, false)
}
func NewServiceWithPing(val any, name, prefix string) (*birpc.Service, error) {
srv, err := birpc.NewServiceWithMethodsRename(val, name, true, func(funcName string) string {
return strings.TrimPrefix(funcName, prefix)
srv, err := birpc.NewServiceWithMethodsRename(val, name, true, func(oldFn string) (newFn string) {
return strings.TrimPrefix(oldFn, prefix)
})
if err != nil {
return nil, err

65
ers/apis.go Normal file
View File

@@ -0,0 +1,65 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ers
import (
"errors"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
)
// V1RunReaderParams contains required parameters for an ErSv1.RunReader request.
type V1RunReaderParams struct {
Tenant string
ID string // unique identifier of the request
ReaderID string
APIOpts map[string]any
}
// V1RunReader processes files in the configured directory for the given reader. This function handles files
// based on the reader's type and configuration. Only available for readers that are not processing files
// automatically (RunDelay should equal 0).
//
// Note: This API is not safe to call concurrently for the same reader. Ensure the current files finish being
// processed before calling again.
func (erS *ERService) V1RunReader(ctx *context.Context, params V1RunReaderParams, reply *string) error {
rdrCfg := erS.cfg.ERsCfg().ReaderCfg(params.ReaderID)
er, has := erS.rdrs[params.ReaderID]
if !has || rdrCfg == nil {
return utils.ErrNotFound
}
if rdrCfg.RunDelay != 0 {
return errors.New("readers with RunDelay different from 0 are not supported")
}
switch rdr := er.(type) {
case *CSVFileER:
processReaderDir(rdr.sourceDir, utils.CSVSuffix, rdr.processFile)
case *XMLFileER:
processReaderDir(rdr.sourceDir, utils.XMLSuffix, rdr.processFile)
case *FWVFileER:
processReaderDir(rdr.sourceDir, utils.FWVSuffix, rdr.processFile)
case *JSONFileER:
processReaderDir(rdr.sourceDir, utils.JSONSuffix, rdr.processFile)
default:
return errors.New("reader type does not yet support manual processing")
}
*reply = utils.OK
return nil
}

View File

@@ -85,10 +85,7 @@ func TestERsAddReader(t *testing.T) {
func TestERsListenAndServeErr(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ERsCfg().Readers = []*config.EventReaderCfg{
{
ID: "",
Type: "",
},
{},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
@@ -102,21 +99,12 @@ func TestERsListenAndServeErr(t *testing.T) {
func TestERsProcessEventErr(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ERsCfg().Readers = []*config.EventReaderCfg{
{
ID: "",
Type: "",
},
{},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
rdrCfg := &config.EventReaderCfg{
ID: "",
Type: "",
}
cgrEvent := &utils.CGREvent{
Tenant: "",
ID: "",
}
rdrCfg := &config.EventReaderCfg{}
cgrEvent := &utils.CGREvent{}
err := srv.processEvent(cgrEvent, rdrCfg)
if err == nil || err.Error() != "unsupported reqType: <>" {
t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", "unsupported reqType: <>", err)
@@ -126,10 +114,7 @@ func TestERsProcessEventErr(t *testing.T) {
func TestERsCloseAllRdrs(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ERsCfg().Readers = []*config.EventReaderCfg{
{
ID: "",
Type: "",
},
{},
}
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
@@ -140,7 +125,6 @@ func TestERsListenAndServeRdrErr(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ERsCfg().Readers = []*config.EventReaderCfg{
{
ID: "",
Type: utils.MetaNone,
},
}
@@ -161,7 +145,6 @@ func TestERsListenAndServeStopchan(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ERsCfg().Readers = []*config.EventReaderCfg{
{
ID: "",
Type: utils.MetaNone,
},
}
@@ -181,7 +164,6 @@ func TestERsListenAndServeRdrEvents(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ERsCfg().Readers = []*config.EventReaderCfg{
{
ID: "",
Type: utils.MetaNone,
},
}
@@ -192,15 +174,8 @@ func TestERsListenAndServeRdrEvents(t *testing.T) {
srv.rdrErr = make(chan error, 1)
srv.rdrEvents = make(chan *erEvent, 1)
srv.rdrEvents <- &erEvent{
cgrEvent: &utils.CGREvent{
Tenant: "",
ID: "",
Event: nil,
APIOpts: nil,
},
rdrCfg: &config.EventReaderCfg{
ID: "",
},
cgrEvent: &utils.CGREvent{},
rdrCfg: &config.EventReaderCfg{},
}
go func() {
time.Sleep(10 * time.Millisecond)
@@ -216,7 +191,6 @@ func TestERsListenAndServeCfgRldChan(t *testing.T) {
cfg := config.NewDefaultCGRConfig()
cfg.ERsCfg().Readers = []*config.EventReaderCfg{
{
ID: "",
Type: utils.MetaNone,
},
}
@@ -349,14 +323,7 @@ func TestERsListenAndServeCfgRldChan5(t *testing.T) {
fltrS := &engine.FilterS{}
srv := NewERService(cfg, fltrS, nil)
exp := &CSVFileER{
cgrCfg: cfg,
cfgIdx: 0,
fltrS: nil,
sourceDir: "",
rdrEvents: nil,
rdrError: nil,
rdrExit: nil,
conReqs: nil,
cgrCfg: cfg,
}
var evRdr EventReader = exp
srv.rdrs = map[string]EventReader{
@@ -436,12 +403,7 @@ func TestERsProcessEvent(t *testing.T) {
},
},
}
cgrEvent := &utils.CGREvent{
Tenant: "",
ID: "",
Event: nil,
APIOpts: nil,
}
cgrEvent := &utils.CGREvent{}
err := srv.processEvent(cgrEvent, rdrCfg)
if err == nil || err.Error() != "unsupported reqType: <>" {
t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", "unsupported reqType: <>", err)
@@ -464,12 +426,7 @@ func TestERsProcessEvent2(t *testing.T) {
},
},
}
cgrEvent := &utils.CGREvent{
Tenant: "",
ID: "",
Event: nil,
APIOpts: nil,
}
cgrEvent := &utils.CGREvent{}
err := srv.processEvent(cgrEvent, rdrCfg)
if err != nil {
t.Fatalf("\nExpecting <%+v>,\n Received <%+v>", nil, err)
@@ -546,9 +503,6 @@ func TestERsProcessEvent5(t *testing.T) {
},
}
cgrEvent := &utils.CGREvent{
Tenant: "",
ID: "",
Event: nil,
APIOpts: map[string]any{
utils.OptsRoutesLimit: true,
},
@@ -628,9 +582,6 @@ func TestERsProcessEvent8(t *testing.T) {
},
}
cgrEvent := &utils.CGREvent{
Tenant: "",
ID: "",
Event: nil,
APIOpts: map[string]any{
utils.OptsRoutesLimit: true,
},
@@ -658,9 +609,6 @@ func TestERsProcessEvent9(t *testing.T) {
},
}
cgrEvent := &utils.CGREvent{
Tenant: "",
ID: "",
Event: nil,
APIOpts: map[string]any{
utils.OptsRoutesLimit: true,
},
@@ -689,8 +637,6 @@ func TestERsProcessEvent10(t *testing.T) {
},
}
cgrEvent := &utils.CGREvent{
Tenant: "",
ID: "",
Event: map[string]any{
utils.Usage: time.Second,
},
@@ -744,8 +690,6 @@ func TestERsProcessEvent11(t *testing.T) {
},
}
cgrEvent := &utils.CGREvent{
Tenant: "",
ID: "",
Event: map[string]any{
utils.Usage: 0,
},

View File

@@ -63,7 +63,7 @@ type CSVFileER struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
sourceDir string
sourceDir string // path to the directory monitored by the reader for new events
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
rdrError chan error

View File

@@ -65,7 +65,7 @@ type FWVFileER struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
sourceDir string
sourceDir string // path to the directory monitored by the reader for new events
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
rdrError chan error

View File

@@ -65,7 +65,7 @@ type JSONFileER struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
sourceDir string
sourceDir string // path to the directory monitored by the reader for new events
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
rdrError chan error

View File

@@ -66,7 +66,7 @@ type XMLFileER struct {
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
sourceDir string
sourceDir string // path to the directory monitored by the reader for new events
rdrEvents chan *erEvent // channel to dispatch the events created to
partialEvents chan *erEvent // channel to dispatch the partial events created to
rdrError chan error

View File

@@ -169,6 +169,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
iAccountSCh := make(chan birpc.ClientConnector, 1)
iTpeSCh := make(chan birpc.ClientConnector, 1)
iEFsCh := make(chan birpc.ClientConnector, 1)
iERsCh := make(chan birpc.ClientConnector, 1)
// initialize the connManager before creating the DMService
// because we need to pass the connection to it
@@ -196,6 +197,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaActions), utils.ActionSv1, iActionSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaTpes), utils.TPeSv1, iTpeSCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaEFs), utils.EfSv1, iEFsCh)
cgr.cM.AddInternalConn(utils.ConcatenatedKey(utils.MetaInternal, utils.MetaERs), utils.ErSv1, iERsCh)
cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep)
cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, cgr.srvDep)
@@ -244,7 +246,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
NewStatService(cgr.cfg, cgr.dmS, cgr.cacheS, cgr.iFilterSCh, cgr.server,
iStatSCh, cgr.cM, cgr.anzS, cgr.srvDep),
NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep),
NewEventReaderService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.server, iERsCh, cgr.anzS, cgr.srvDep),
NewDNSAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep),
NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep),
NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep),

View File

@@ -22,8 +22,10 @@ import (
"fmt"
"sync"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/ers"
"github.com/cgrates/cgrates/servmanager"
@@ -31,14 +33,22 @@ import (
)
// NewEventReaderService returns the EventReader Service
func NewEventReaderService(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
func NewEventReaderService(
cfg *config.CGRConfig,
filterSChan chan *engine.FilterS,
connMgr *engine.ConnManager,
server *cores.Server,
intConn chan birpc.ClientConnector,
anz *AnalyzerService,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &EventReaderService{
rldChan: make(chan struct{}, 1),
cfg: cfg,
filterSChan: filterSChan,
connMgr: connMgr,
server: server,
intConn: intConn,
anz: anz,
srvDep: srvDep,
}
}
@@ -53,6 +63,9 @@ type EventReaderService struct {
rldChan chan struct{}
stopChan chan struct{}
connMgr *engine.ConnManager
server *cores.Server
intConn chan birpc.ClientConnector
anz *AnalyzerService
srvDep map[string]*sync.WaitGroup
}
@@ -77,6 +90,15 @@ func (erS *EventReaderService) Start(ctx *context.Context, shtDwn context.Cancel
// build the service
erS.ers = ers.NewERService(erS.cfg, filterS, erS.connMgr)
go erS.listenAndServe(erS.ers, erS.stopChan, erS.rldChan, shtDwn)
srv, err := engine.NewServiceWithPing(erS.ers, utils.ErSv1, utils.V1Prfx)
if err != nil {
return err
}
if !erS.cfg.DispatcherSCfg().Enabled {
erS.server.RpcRegister(srv)
}
erS.intConn <- erS.anz.GetInternalCodec(srv, utils.ERs)
return
}
@@ -99,9 +121,10 @@ func (erS *EventReaderService) Reload(*context.Context, context.CancelFunc) (err
// Shutdown stops the service
func (erS *EventReaderService) Shutdown() (err error) {
erS.Lock()
defer erS.Unlock()
close(erS.stopChan)
erS.ers = nil
erS.Unlock()
erS.server.RpcUnregisterName(utils.ErSv1)
return
}

View File

@@ -65,7 +65,8 @@ func TestEventReaderSReload(t *testing.T) {
anz := NewAnalyzerService(cfg, server, filterSChan, make(chan birpc.ClientConnector, 1), srvDep)
db := NewDataDBService(cfg, nil, srvDep)
sS := NewSessionService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep)
erS := NewEventReaderService(cfg, filterSChan, nil, srvDep)
intERsConn := make(chan birpc.ClientConnector, 1)
erS := NewEventReaderService(cfg, filterSChan, nil, server, intERsConn, anz, srvDep)
engine.NewConnManager(cfg)
srvMngr.AddServices(erS, sS,
NewLoaderService(cfg, db, filterSChan, server, make(chan birpc.ClientConnector, 1), nil, anz, srvDep), db)
@@ -127,7 +128,8 @@ func TestEventReaderSReload2(t *testing.T) {
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
erS := NewEventReaderService(cfg, filterSChan, nil, srvDep)
server := cores.NewServer(nil)
erS := NewEventReaderService(cfg, filterSChan, nil, server, nil, nil, srvDep)
ers := ers.NewERService(cfg, nil, nil)
runtime.Gosched()

View File

@@ -22,6 +22,7 @@ import (
"sync"
"testing"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/ers"
"github.com/cgrates/cgrates/config"
@@ -36,7 +37,8 @@ func TestEventReaderSCoverage(t *testing.T) {
filterSChan := make(chan *engine.FilterS, 1)
filterSChan <- nil
srvDep := map[string]*sync.WaitGroup{utils.DataDB: new(sync.WaitGroup)}
srv := NewEventReaderService(cfg, filterSChan, nil, srvDep)
server := cores.NewServer(nil)
srv := NewEventReaderService(cfg, filterSChan, nil, server, nil, nil, srvDep)
if srv.IsRunning() {
t.Errorf("Expected service to be down")
@@ -50,6 +52,7 @@ func TestEventReaderSCoverage(t *testing.T) {
rldChan: make(chan struct{}, 1),
stopChan: make(chan struct{}, 1),
connMgr: nil,
server: server,
srvDep: srvDep,
}
if !srv2.IsRunning() {

View File

@@ -384,6 +384,8 @@ const (
MetaUCH = "*uch"
MetaGuardian = "*guardians"
MetaEEs = "*ees"
MetaEFs = "*efs"
MetaERs = "*ers"
MetaRates = "*rates"
MetaRateSOverwrite = "*rateSOverwrite"
MetaContinue = "*continue"
@@ -931,7 +933,6 @@ const (
MetaThresholds = "*thresholds"
MetaRoutes = "*routes"
MetaAttributes = "*attributes"
MetaEFs = "*efs"
MetaActionProfiles = "*action_profiles"
MetaLoadIDs = "*load_ids"
MetaNodeID = "*node_id"
@@ -1354,6 +1355,14 @@ const (
EfSv1ReplayEvents = "EfSv1.ReplayEvents"
)
// ERs
const (
ErS = "ErS"
ErSv1 = "ErSv1"
ErSv1Ping = "ErSv1.Ping"
ErSv1RunReader = "ErSv1.RunReader"
)
// ConfigSv1 APIs
const (
ConfigS = "ConfigS"