From 0650dd4fec17fad912a2fc8658601b3e0cc3ea66 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 11 Aug 2019 19:31:02 +0200 Subject: [PATCH] Adding ers package --- ers/ers.go | 123 ++++++++++++++++++++++++++++++++++++++++++++++++ ers/reader.go | 37 +++++++++++++++ utils/consts.go | 1 + 3 files changed, 161 insertions(+) create mode 100644 ers/ers.go create mode 100644 ers/reader.go diff --git a/ers/ers.go b/ers/ers.go new file mode 100644 index 000000000..4b094a2f4 --- /dev/null +++ b/ers/ers.go @@ -0,0 +1,123 @@ +/* +Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package cdrc + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewERService instantiates the ERService +func NewERService(cfg *config.CGRConfig, cdrS rpcclient.RpcClientConnection, + cfgRld chan struct{}) (erS *ERService, err error) { + erS = &ERService{ + rdrs: make(map[string][]EventReader), + cfgRld: cfgRld, + } + //for _, cfg := range cfg.CDRcProfiles() { + // if !cfg.Enabled { + // continue + // } + //} + if len(erS.rdrs) == 0 { + return nil, nil // no CDRC profiles enabled + } + return +} + +// ERService is managing the EventReaders +type ERService struct { + sync.RWMutex + cfg *config.CGRConfig + rdrs map[string][]EventReader // list of readers on specific paths map[path]reader + cfgRld chan struct{} // signal the need of config reloading - chan path / *any + sS rpcclient.RpcClientConnection +} + +// ListenAndServe loops keeps the service alive +func (erS *ERService) ListenAndServe(exitChan chan bool) error { + go erS.handleReloads() // start backup loop + e := <-exitChan + exitChan <- e // put back for the others listening for shutdown request + return nil +} + +// erCfgRef will be used to reference a specific reader +type erCfgRef struct { + path string + idx int +} + +func (erS *ERService) handleReloads() { + for { + <-erS.cfgRld + cfgIDs := make(map[string]*erCfgRef) // IDs which are configured in EventReader profiles + inUseIDs := make(map[string]*erCfgRef) // IDs which are running in ERService indexed on path + addIDs := make(map[string]struct{}) // IDs which need to be added to ERService + remIDs := make(map[string]struct{}) // IDs which need to be removed from ERService + // index config IDs + //for i, cgrCfg := range erS.cfg.CDRcProfiles() { + // cfgIDs[cgrCfg.ID] = &erCfgRef{path: cgrCfg.CDRInPath, idx: i} + //} + erS.Lock() + // index in use IDs + for path, rdrs := range erS.rdrs { + for i, rdr := range rdrs { + inUseIDs[rdr.ID()] = &erCfgRef{path: path, idx: i} + } + } + // find out removed ids + for id := range inUseIDs { + if _, has := cfgIDs[id]; !has { + remIDs[id] = struct{}{} + } + } + // find out added ids + for id := range cfgIDs { + if _, has := inUseIDs[id]; !has { + addIDs[id] = struct{}{} + } + } + for id := range remIDs { + ref := inUseIDs[id] + rdrSlc := erS.rdrs[ref.path] + // remove the ids + copy(rdrSlc[ref.idx:], rdrSlc[ref.idx+1:]) + rdrSlc[len(rdrSlc)-1] = nil // so it can be garbage collected + rdrSlc = rdrSlc[:len(rdrSlc)-1] + } + // add new ids: + for id := range addIDs { + cfgRef := cfgIDs[id] + if newRdr, err := NewEventReader(erS.cfg, cfgRef.idx); err != nil { + utils.Logger.Warning( + fmt.Sprintf( + "<%s> error reloading config with ID: <%s>, err: <%s>", + utils.ERs, id, err.Error())) + } else { + erS.rdrs[cfgRef.path] = append(erS.rdrs[cfgRef.path], newRdr) + } + + } + } +} diff --git a/ers/reader.go b/ers/reader.go new file mode 100644 index 000000000..affcda33c --- /dev/null +++ b/ers/reader.go @@ -0,0 +1,37 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package cdrc + +import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" +) + +type EventReader interface { + ID() string // configuration identifier + Init(args interface{}) error // init will initialize the Reader, ie: open the file to read or http connection + Read() (*utils.CGREvent, error) // Process a single record in the events file + Processed() int64 // number of records processed + Close() error // called when the reader should stop processing +} + +// NewEventReader instantiates the event reader based on configuration at index +func NewEventReader(cfg *config.CGRConfig, cfgIdx int) (er EventReader, err error) { + return +} diff --git a/utils/consts.go b/utils/consts.go index 0733d7692..a746f586c 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -563,6 +563,7 @@ const ( MetaRatingPlanCost = "*rating_plan_cost" RatingPlanIDs = "RatingPlanIDs" MetaAccount = "*account" + ERs = "ERs" ) // Migrator Action