From 08d7daf7fa7f05adb7bfb781ff071f416fef2bf9 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 19 Aug 2019 21:14:03 +0200 Subject: [PATCH] Basic ERS CSV reader --- ers/csv.go | 52 +++++++++++++++++++++++++++++++++++++++++++++++++++ ers/ers.go | 24 ++++++++++++------------ ers/reader.go | 6 +++--- 3 files changed, 67 insertions(+), 15 deletions(-) create mode 100644 ers/csv.go diff --git a/ers/csv.go b/ers/csv.go new file mode 100644 index 000000000..a53c8f1cb --- /dev/null +++ b/ers/csv.go @@ -0,0 +1,52 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ers + +import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +func NewCSVFileER(cfg *config.CGRConfig, cfgIdx int) (er EventReader, err error) { + return +} + +// CSVer implements EventReader interface for .csv files +type CSVFileER struct { +} + +func (csv *CSVFileER) ID() (id string) { + return +} + +func (csv *CSVFileER) Init(args interface{}) (err error) { + return +} + +func (csv *CSVFileER) Read() (ev *utils.CGREvent, err error) { + return +} + +func (csv *CSVFileER) Processed() (nrItms int64) { + return +} + +func (csv *CSVFileER) Close() (err error) { + return +} diff --git a/ers/ers.go b/ers/ers.go index 4b094a2f4..44a85c450 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package cdrc +package ers import ( "fmt" @@ -34,13 +34,12 @@ func NewERService(cfg *config.CGRConfig, cdrS rpcclient.RpcClientConnection, rdrs: make(map[string][]EventReader), cfgRld: cfgRld, } - //for _, cfg := range cfg.CDRcProfiles() { - // if !cfg.Enabled { - // continue - // } - //} - if len(erS.rdrs) == 0 { - return nil, nil // no CDRC profiles enabled + for _, rdrCfg := range cfg.ERsCfg().Readers { + if rdr, err := NewEventReader(rdrCfg); err != nil { + return nil, err + } else { + erS.rdrs[rdrCfg.SourcePath] = append(erS.rdrs[rdrCfg.SourcePath], rdr) + } } return } @@ -76,9 +75,9 @@ func (erS *ERService) handleReloads() { addIDs := make(map[string]struct{}) // IDs which need to be added to ERService remIDs := make(map[string]struct{}) // IDs which need to be removed from ERService // index config IDs - //for i, cgrCfg := range erS.cfg.CDRcProfiles() { - // cfgIDs[cgrCfg.ID] = &erCfgRef{path: cgrCfg.CDRInPath, idx: i} - //} + for i, rdrCfg := range erS.cfg.ERsCfg().Readers { + cfgIDs[rdrCfg.ID] = &erCfgRef{path: rdrCfg.SourcePath, idx: i} + } erS.Lock() // index in use IDs for path, rdrs := range erS.rdrs { @@ -109,7 +108,7 @@ func (erS *ERService) handleReloads() { // add new ids: for id := range addIDs { cfgRef := cfgIDs[id] - if newRdr, err := NewEventReader(erS.cfg, cfgRef.idx); err != nil { + if newRdr, err := NewEventReader(erS.cfg.ERsCfg().Readers[cfgRef.idx]); err != nil { utils.Logger.Warning( fmt.Sprintf( "<%s> error reloading config with ID: <%s>, err: <%s>", @@ -119,5 +118,6 @@ func (erS *ERService) handleReloads() { } } + erS.Unlock() } } diff --git a/ers/reader.go b/ers/reader.go index affcda33c..331059cab 100644 --- a/ers/reader.go +++ b/ers/reader.go @@ -16,7 +16,7 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package cdrc +package ers import ( "github.com/cgrates/cgrates/config" @@ -28,10 +28,10 @@ type EventReader interface { Init(args interface{}) error // init will initialize the Reader, ie: open the file to read or http connection Read() (*utils.CGREvent, error) // Process a single record in the events file Processed() int64 // number of records processed - Close() error // called when the reader should stop processing + Close() error // called when the reader should release resources } // NewEventReader instantiates the event reader based on configuration at index -func NewEventReader(cfg *config.CGRConfig, cfgIdx int) (er EventReader, err error) { +func NewEventReader(rdrCfg *config.EventReaderCfg) (er EventReader, err error) { return }