mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
ers.processReader implementation
This commit is contained in:
18
ers/ers.go
18
ers/ers.go
@@ -46,7 +46,9 @@ type ERService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
rdrs map[string]EventReader // map[rdrID]EventReader
|
||||
rdrPaths map[string]string // used for reloads in case of path changes
|
||||
stopLsn map[string]chan struct{} // map[rdrID] chan struct{}
|
||||
|
||||
filterS *engine.FilterS
|
||||
sS rpcclient.RpcClientConnection // connection towards SessionS
|
||||
exitChan chan bool
|
||||
@@ -70,12 +72,12 @@ func (erS *ERService) ListenAndServe(cfgRldChan chan struct{}) (err error) {
|
||||
|
||||
// addReader will add a new reader to the service
|
||||
func (erS *ERService) addReader(rdrCfg *config.EventReaderCfg) (err error) {
|
||||
erS.stopLsn[rdrCfg.ID] = make(chan struct{})
|
||||
var rdr EventReader
|
||||
if rdr, err = NewEventReader(rdrCfg); err != nil {
|
||||
if rdr, err = NewEventReader(rdrCfg, erS.stopLsn[rdrCfg.ID], erS.exitChan); err != nil {
|
||||
return
|
||||
}
|
||||
erS.rdrs[rdrCfg.ID] = rdr
|
||||
erS.stopLsn[rdrCfg.ID] = make(chan struct{})
|
||||
return rdr.Subscribe()
|
||||
}
|
||||
|
||||
@@ -102,6 +104,7 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) {
|
||||
return
|
||||
case <-cfgRldChan:
|
||||
cfgIDs := make(map[string]*config.EventReaderCfg)
|
||||
pathReloaded := make(map[string]struct{})
|
||||
// index config IDs
|
||||
for _, rdrCfg := range erS.cfg.ERsCfg().Readers {
|
||||
cfgIDs[rdrCfg.ID] = rdrCfg
|
||||
@@ -109,8 +112,11 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) {
|
||||
erS.Lock()
|
||||
// remove the necessary ids
|
||||
for id := range erS.rdrs {
|
||||
if _, has := cfgIDs[id]; has { // still present
|
||||
continue
|
||||
if newCfg, has := cfgIDs[id]; has { // still present
|
||||
if newCfg.SourcePath == erS.rdrPaths[id] {
|
||||
continue
|
||||
}
|
||||
pathReloaded[id] = struct{}{}
|
||||
}
|
||||
delete(erS.rdrs, id)
|
||||
close(erS.stopLsn[id])
|
||||
@@ -119,7 +125,9 @@ func (erS *ERService) handleReloads(cfgRldChan chan struct{}) {
|
||||
// add new ids
|
||||
for id, rdrCfg := range cfgIDs {
|
||||
if _, has := erS.rdrs[id]; has {
|
||||
continue
|
||||
if _, has := pathReloaded[id]; !has {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if err := erS.addReader(rdrCfg); err != nil {
|
||||
utils.Logger.Crit(
|
||||
|
||||
@@ -19,28 +19,48 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package ers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
func NewCSVFileER(cfg *config.EventReaderCfg) (er EventReader, err error) {
|
||||
return new(CSVFileER), nil
|
||||
func NewCSVFileER(cfg *config.EventReaderCfg,
|
||||
rdrExit chan struct{}, appExit chan bool) (er EventReader, err error) {
|
||||
srcPath := cfg.SourcePath
|
||||
if strings.HasSuffix(srcPath, utils.Slash) {
|
||||
srcPath = srcPath[:len(srcPath)-1]
|
||||
}
|
||||
return &CSVFileER{erCfg: cfg, rdrDir: srcPath,
|
||||
rdrExit: rdrExit, appExit: appExit}, nil
|
||||
}
|
||||
|
||||
// CSVFileER implements EventReader interface for .csv files
|
||||
type CSVFileER struct {
|
||||
sync.RWMutex
|
||||
erCfg *config.EventReaderCfg
|
||||
rdrDir string
|
||||
rdrExit chan struct{}
|
||||
appExit chan bool
|
||||
}
|
||||
|
||||
func (csv *CSVFileER) Config() (rdrCfg *config.EventReaderCfg) {
|
||||
return
|
||||
func (csv *CSVFileER) Config() *config.EventReaderCfg {
|
||||
return csv.erCfg
|
||||
}
|
||||
|
||||
func (csv *CSVFileER) Init(itmPath, itmID string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (csv *CSVFileER) Subscribe() (err error) {
|
||||
return
|
||||
func (csv *CSVFileER) Subscribe() error {
|
||||
go func() {
|
||||
if err := watchDir(csv.rdrDir, csv.processDir,
|
||||
utils.ERs, csv.rdrExit); err != nil {
|
||||
utils.Logger.Crit(
|
||||
fmt.Sprintf("<%s> watching directory <%s> got error: <%s>",
|
||||
utils.ERs, csv.rdrDir, err.Error()))
|
||||
csv.appExit <- true
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (csv *CSVFileER) Read() (ev *utils.CGREvent, err error) {
|
||||
@@ -54,3 +74,7 @@ func (csv *CSVFileER) Processed() (nrItms int64) {
|
||||
func (csv *CSVFileER) Close() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (csv *CSVFileER) processDir(itmPath, itmID string) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
163
ers/libers.go
163
ers/libers.go
@@ -22,7 +22,9 @@ import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/cgrates/cgrates/sessions"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
|
||||
@@ -58,3 +60,164 @@ func watchDir(dirPath string, f func(itmPath, itmID string) error,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processReader will process events from reader and publish them to SessionS
|
||||
func processReader(rdr EventReader, sS rpcclient.RpcClient, rdrExit chan struct{}) (err error) {
|
||||
for { // reads until no more events are produced or exit is signaled
|
||||
select {
|
||||
case <-rdrExit:
|
||||
return
|
||||
default:
|
||||
}
|
||||
rdrCfg := rdr.Config()
|
||||
cgrEv, err := rdr.Read()
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> processing reader <%s>, error: <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
continue
|
||||
} else if cgrEv == nil {
|
||||
break // no more events
|
||||
}
|
||||
// log the event created if requested by flags
|
||||
if rdrCfg.Flags.HasKey(utils.MetaLog) {
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> LOG, reader: <%s>, message: %s",
|
||||
utils.ERs, rdrCfg.ID, utils.ToIJSON(cgrEv)))
|
||||
}
|
||||
// find out reqType
|
||||
var reqType string
|
||||
for _, typ := range []string{
|
||||
utils.MetaDryRun, utils.MetaAuth,
|
||||
utils.MetaInitiate, utils.MetaUpdate,
|
||||
utils.MetaTerminate, utils.MetaMessage,
|
||||
utils.MetaCDRs, utils.MetaEvent, utils.META_NONE} {
|
||||
if rdrCfg.Flags.HasKey(typ) { // request type is identified through flags
|
||||
reqType = typ
|
||||
break
|
||||
}
|
||||
}
|
||||
// execute the action based on reqType
|
||||
cgrArgs := cgrEv.ConsumeArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaDispatchers),
|
||||
reqType == utils.MetaAuth ||
|
||||
reqType == utils.MetaMessage ||
|
||||
reqType == utils.MetaEvent)
|
||||
switch reqType {
|
||||
default:
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> processing reader <%s>, unsupported reqType: <%s>",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
continue
|
||||
case utils.META_NONE: // do nothing on CGRateS side
|
||||
case utils.MetaDryRun:
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> DRYRUN, reader: <%s>, CGREvent: <%s>",
|
||||
utils.DNSAgent, rdrCfg.ID, utils.ToJSON(cgrEv)))
|
||||
case utils.MetaAuth:
|
||||
authArgs := sessions.NewV1AuthorizeArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliers),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost),
|
||||
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator,
|
||||
)
|
||||
rply := new(sessions.V1AuthorizeReply)
|
||||
err = sS.Call(utils.SessionSv1AuthorizeEvent,
|
||||
authArgs, rply)
|
||||
case utils.MetaInitiate:
|
||||
initArgs := sessions.NewV1InitSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := new(sessions.V1InitSessionReply)
|
||||
err = sS.Call(utils.SessionSv1InitiateSession,
|
||||
initArgs, rply)
|
||||
case utils.MetaUpdate:
|
||||
updateArgs := sessions.NewV1UpdateSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := new(sessions.V1UpdateSessionReply)
|
||||
err = sS.Call(utils.SessionSv1UpdateSession,
|
||||
updateArgs, rply)
|
||||
case utils.MetaTerminate:
|
||||
terminateArgs := sessions.NewV1TerminateSessionArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
cgrEv, cgrArgs.ArgDispatcher)
|
||||
rply := utils.StringPointer("")
|
||||
err = sS.Call(utils.SessionSv1TerminateSession,
|
||||
terminateArgs, rply)
|
||||
case utils.MetaMessage:
|
||||
evArgs := sessions.NewV1ProcessMessageArgs(
|
||||
rdrCfg.Flags.HasKey(utils.MetaAttributes),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaAttributes),
|
||||
rdrCfg.Flags.HasKey(utils.MetaThresholds),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaThresholds),
|
||||
rdrCfg.Flags.HasKey(utils.MetaStats),
|
||||
rdrCfg.Flags.ParamsSlice(utils.MetaStats),
|
||||
rdrCfg.Flags.HasKey(utils.MetaResources),
|
||||
rdrCfg.Flags.HasKey(utils.MetaAccounts),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliers),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersIgnoreErrors),
|
||||
rdrCfg.Flags.HasKey(utils.MetaSuppliersEventCost),
|
||||
cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.SupplierPaginator)
|
||||
rply := new(sessions.V1ProcessMessageReply) // need it so rpcclient can clone
|
||||
err = sS.Call(utils.SessionSv1ProcessMessage,
|
||||
evArgs, rply)
|
||||
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
|
||||
cgrEv.Event[utils.Usage] = 0 // avoid further debits
|
||||
} else if rply.MaxUsage != nil {
|
||||
cgrEv.Event[utils.Usage] = *rply.MaxUsage // make sure the CDR reflects the debit
|
||||
}
|
||||
case utils.MetaEvent:
|
||||
evArgs := &sessions.V1ProcessEventArgs{
|
||||
Flags: rdrCfg.Flags.SliceFlags(),
|
||||
CGREvent: cgrEv,
|
||||
ArgDispatcher: cgrArgs.ArgDispatcher,
|
||||
Paginator: *cgrArgs.SupplierPaginator,
|
||||
}
|
||||
rply := new(sessions.V1ProcessEventReply)
|
||||
err = sS.Call(utils.SessionSv1ProcessEvent,
|
||||
evArgs, rply)
|
||||
case utils.MetaCDRs: // allow CDR processing
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
}
|
||||
// separate request so we can capture the Terminate/Event also here
|
||||
if rdrCfg.Flags.HasKey(utils.MetaCDRs) &&
|
||||
!rdrCfg.Flags.HasKey(utils.MetaDryRun) {
|
||||
rplyCDRs := utils.StringPointer("")
|
||||
if err = sS.Call(utils.SessionSv1ProcessCDR,
|
||||
&utils.CGREventWithArgDispatcher{CGREvent: cgrEv,
|
||||
ArgDispatcher: cgrArgs.ArgDispatcher}, &rplyCDRs); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reader <%s>, error: <%s> posting event",
|
||||
utils.ERs, rdrCfg.ID, err.Error()))
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -26,21 +26,20 @@ import (
|
||||
)
|
||||
|
||||
type EventReader interface {
|
||||
Config() *config.EventReaderCfg // reader configuration
|
||||
Init(itmPath, itmName string) error // init will initialize the Reader, ie: open the file to read or http connection
|
||||
Subscribe() error // subscribe the reader on the path
|
||||
Read() (*utils.CGREvent, error) // process a single record in the events file
|
||||
Processed() int64 // number of records processed
|
||||
Close() error // called when the reader should release resources
|
||||
Config() *config.EventReaderCfg
|
||||
Subscribe() error // subscribe the reader on the path
|
||||
Read() (*utils.CGREvent, error) // process a single record in the events file
|
||||
Processed() int64 // number of records processed
|
||||
}
|
||||
|
||||
// NewEventReader instantiates the event reader based on configuration at index
|
||||
func NewEventReader(rdrCfg *config.EventReaderCfg) (er EventReader, err error) {
|
||||
func NewEventReader(rdrCfg *config.EventReaderCfg,
|
||||
rdrExit chan struct{}, appExit chan bool) (er EventReader, err error) {
|
||||
switch rdrCfg.Type {
|
||||
default:
|
||||
err = fmt.Errorf("unsupported reader type: <%s>", rdrCfg.Type)
|
||||
case utils.MetaFileCSV:
|
||||
return NewCSVFileER(rdrCfg)
|
||||
return NewCSVFileER(rdrCfg, rdrExit, appExit)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user