Updated the EEs with the new interface

This commit is contained in:
Trial97
2021-08-16 17:11:02 +03:00
committed by Dan Christian Bogos
parent 02973a711d
commit d2915a5737
30 changed files with 729 additions and 1038 deletions

View File

@@ -517,7 +517,7 @@ func main() {
utils.RegistrarC: new(sync.WaitGroup),
utils.DispatcherS: new(sync.WaitGroup),
utils.DNSAgent: new(sync.WaitGroup),
utils.EventExporterS: new(sync.WaitGroup),
utils.EEs: new(sync.WaitGroup),
utils.ERs: new(sync.WaitGroup),
utils.FreeSWITCHAgent: new(sync.WaitGroup),
utils.GlobalVarS: new(sync.WaitGroup),

View File

@@ -147,6 +147,7 @@ type EventExporterCfg struct {
AttributeSCtx string // context to use when querying AttributeS
Synchronous bool
Attempts int
FailedPostsDir string
ConcurrentRequests int
Fields []*FCTemplate
headerFields []*FCTemplate
@@ -208,6 +209,9 @@ func (eeC *EventExporterCfg) loadFromJSONCfg(jsnEec *EventExporterJsonCfg, msgTe
eeC.Opts[k] = v
}
}
if jsnEec.Failed_posts_dir != nil {
eeC.FailedPostsDir = *jsnEec.Failed_posts_dir
}
return
}
@@ -261,6 +265,7 @@ func (eeC EventExporterCfg) Clone() (cln *EventExporterCfg) {
contentFields: make([]*FCTemplate, len(eeC.contentFields)),
trailerFields: make([]*FCTemplate, len(eeC.trailerFields)),
Opts: make(map[string]interface{}),
FailedPostsDir: eeC.FailedPostsDir,
}
if eeC.Filters != nil {
@@ -306,6 +311,7 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
utils.SynchronousCfg: eeC.Synchronous,
utils.AttemptsCfg: eeC.Attempts,
utils.ConcurrentRequestsCfg: eeC.ConcurrentRequests,
utils.FailedPostsDirCfg: eeC.FailedPostsDir,
}
opts := make(map[string]interface{})
for k, v := range eeC.Opts {
@@ -337,6 +343,7 @@ type EventExporterJsonCfg struct {
Synchronous *bool
Attempts *int
Concurrent_requests *int
Failed_posts_dir *string
Fields *[]*FcTemplateJsonCfg
}
@@ -388,6 +395,9 @@ func diffEventExporterJsonCfg(d *EventExporterJsonCfg, v1, v2 *EventExporterCfg,
if flds != nil {
d.Fields = &flds
}
if v1.FailedPostsDir != v2.FailedPostsDir {
d.Failed_posts_dir = utils.StringPointer(v2.FailedPostsDir)
}
return d
}

View File

@@ -16,41 +16,46 @@ You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
package ees
import (
"fmt"
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/streadway/amqp"
)
// NewAMQPPoster creates a new amqp poster
// NewAMQPee creates a new amqp poster
// "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs"
func NewAMQPPoster(dialURL string, attempts int, opts map[string]interface{}) *AMQPPoster {
amqp := &AMQPPoster{
attempts: attempts,
dialURL: dialURL,
func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPee {
amqp := &AMQPee{
cfg: cfg,
dc: dc,
reqs: newConcReq(cfg.ConcurrentRequests),
}
amqp.parseOpts(opts)
amqp.parseOpts(cfg.Opts)
return amqp
}
// AMQPPoster used to post cdrs to amqp
type AMQPPoster struct {
dialURL string
// AMQPee used to post cdrs to amqp
type AMQPee struct {
queueID string // identifier of the CDR queue where we publish
exchange string
exchangeType string
routingKey string
attempts int
sync.Mutex // protect connection
conn *amqp.Connection
postChan *amqp.Channel
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
reqs *concReq
sync.RWMutex // protect connection
bytePreparing
}
func (pstr *AMQPPoster) parseOpts(dialURL map[string]interface{}) {
func (pstr *AMQPee) parseOpts(dialURL map[string]interface{}) {
pstr.queueID = utils.DefaultQueueID
pstr.routingKey = utils.DefaultQueueID
if vals, has := dialURL[utils.AMQPQueueID]; has {
@@ -68,85 +73,32 @@ func (pstr *AMQPPoster) parseOpts(dialURL map[string]interface{}) {
}
}
// Post is the method being called when we need to post anything in the queue
// the optional chn will permits channel caching
func (pstr *AMQPPoster) Post(content []byte, _ string) (err error) {
var chn *amqp.Channel
fib := utils.Fib()
func (pstr *AMQPee) Cfg() *config.EventExporterCfg { return pstr.cfg }
for i := 0; i < pstr.attempts; i++ {
if chn, err = pstr.newPostChannel(); err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<AMQPPoster> creating new post channel, err: %s", err.Error()))
return
}
for i := 0; i < pstr.attempts; i++ {
if err = chn.Publish(
pstr.exchange, // exchange
pstr.routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: utils.ContentJSON,
Body: content,
}); err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
return
}
if chn != nil {
chn.Close()
}
return
}
// Close closes the connections
func (pstr *AMQPPoster) Close() {
pstr.Lock()
if pstr.conn != nil {
pstr.conn.Close()
}
pstr.conn = nil
pstr.Unlock()
}
func (pstr *AMQPPoster) newPostChannel() (postChan *amqp.Channel, err error) {
func (pstr *AMQPee) Connect() (err error) {
pstr.Lock()
defer pstr.Unlock()
if pstr.conn == nil {
var conn *amqp.Connection
conn, err = amqp.Dial(pstr.dialURL)
if err == nil {
pstr.conn = conn
go func() { // monitor connection errors so we can restart
if err := <-pstr.conn.NotifyClose(make(chan *amqp.Error)); err != nil {
utils.Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error()))
pstr.Close()
}
}()
if pstr.conn, err = amqp.Dial(pstr.Cfg().ExportPath); err != nil {
return
}
go func() { // monitor connection errors so we can restart
if err := <-pstr.conn.NotifyClose(make(chan *amqp.Error)); err != nil {
utils.Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error()))
pstr.Close()
}
}()
}
pstr.Unlock()
if err != nil {
return nil, err
if pstr.postChan != nil {
return
}
if postChan, err = pstr.conn.Channel(); err != nil {
if pstr.postChan, err = pstr.conn.Channel(); err != nil {
return
}
if pstr.exchange != "" {
if err = postChan.ExchangeDeclare(
if err = pstr.postChan.ExchangeDeclare(
pstr.exchange, // name
pstr.exchangeType, // type
true, // durable
@@ -159,7 +111,7 @@ func (pstr *AMQPPoster) newPostChannel() (postChan *amqp.Channel, err error) {
}
}
if _, err = postChan.QueueDeclare(
if _, err = pstr.postChan.QueueDeclare(
pstr.queueID, // name
true, // durable
false, // auto-delete
@@ -171,7 +123,7 @@ func (pstr *AMQPPoster) newPostChannel() (postChan *amqp.Channel, err error) {
}
if pstr.exchange != "" {
if err = postChan.QueueBind(
if err = pstr.postChan.QueueBind(
pstr.queueID, // queue
pstr.routingKey, // key
pstr.exchange, // exchange
@@ -183,3 +135,37 @@ func (pstr *AMQPPoster) newPostChannel() (postChan *amqp.Channel, err error) {
}
return
}
func (pstr *AMQPee) ExportEvent(content interface{}, _ string) (err error) {
pstr.reqs.get()
pstr.RLock()
err = pstr.postChan.Publish(
pstr.exchange, // exchange
pstr.routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: utils.ContentJSON,
Body: content.([]byte),
})
pstr.RUnlock()
pstr.reqs.done()
return
}
func (pstr *AMQPee) Close() (err error) {
pstr.Lock()
if pstr.postChan != nil {
pstr.postChan.Close()
pstr.postChan = nil
}
if pstr.conn != nil {
err = pstr.conn.Close()
pstr.conn = nil
}
pstr.Unlock()
return
}
func (pstr *AMQPee) GetMetrics() *utils.SafeMapStorage { return pstr.dc }

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"encoding/json"
"fmt"
"strings"
"time"
@@ -28,54 +29,49 @@ import (
"github.com/cgrates/cgrates/utils"
)
type EventExporter interface {
ID() string // return the exporter identificator
ExportEvent(cgrEv *utils.CGREvent) (err error) // called on each event to be exported
OnEvicted(itmID string, value interface{}) // called when the exporter needs to terminate
GetMetrics() *utils.SafeMapStorage // called to get metrics
}
type exportedEvent interface {
Parse(func(path []string, val interface{}))
AsStringSlice() []string
AsMapStringSlice() map[string]interface{}
}
type EventExporter2 interface {
Cfg() *config.EventExporterCfg // return the config
Connect() error // called before exporting an event to make sure it is connected
ExportEvent(exportedEvent) (interface{}, error) // called on each event to be exported
Close() error // called when the exporter needs to terminate
GetMetrics() *utils.SafeMapStorage // called to get metrics
Cfg() *config.EventExporterCfg // return the config
Connect() error // called before exporting an event to make sure it is connected
ExportEvent(interface{}, string) error // called on each event to be exported
Close() error // called when the exporter needs to terminate
GetMetrics() *utils.SafeMapStorage // called to get metrics
PrepareMap(map[string]interface{}) (interface{}, error)
PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error)
}
// NewEventExporter produces exporters
func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (ee EventExporter, err error) {
func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (ee EventExporter2, err error) {
var dc *utils.SafeMapStorage
if dc, err = newEEMetrics(utils.FirstNonEmpty(
cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone,
cgrCfg.GeneralCfg().DefaultTimezone)); err != nil {
return
}
cfg := cgrCfg.EEsCfg().Exporters[cfgIdx]
switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type {
case utils.MetaFileCSV:
return NewFileCSVee(cgrCfg, cfgIdx, filterS, dc)
return NewFileCSVee(cfg, cgrCfg, filterS, dc)
case utils.MetaFileFWV:
return NewFileFWVee(cgrCfg, cfgIdx, filterS, dc)
return NewFileFWVee(cfg, cgrCfg, filterS, dc)
case utils.MetaHTTPPost:
return NewHTTPPostEe(cgrCfg, cfgIdx, filterS, dc)
return NewHTTPPostEE(cfg, cgrCfg, filterS, dc)
case utils.MetaHTTPjsonMap:
return NewHTTPjsonMapEE(cgrCfg, cfgIdx, filterS, dc)
case utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap,
return NewHTTPjsonMapEE(cfg, cgrCfg, filterS, dc)
case utils.MetaNatsjsonMap:
return NewNatsEE(cfg, cgrCfg.GeneralCfg().NodeID,
cgrCfg.GeneralCfg().ConnectTimeout, dc)
case utils.MetaAMQPjsonMap:
return NewAMQPee(cfg, dc), nil
case utils.MetaAMQPV1jsonMap,
utils.MetaSQSjsonMap, utils.MetaKafkajsonMap,
utils.MetaS3jsonMap, utils.MetaNatsjsonMap:
utils.MetaS3jsonMap:
return NewPosterJSONMapEE(cgrCfg, cfgIdx, filterS, dc)
case utils.MetaVirt:
return NewVirtualExporter(cgrCfg, cfgIdx, filterS, dc)
return NewVirtualEE(cfg, dc)
case utils.MetaElastic:
return NewElasticExporter(cgrCfg, cfgIdx, filterS, dc)
return NewElasticEE(cfg, dc)
case utils.MetaSQL:
return NewSQLEe(cgrCfg, cfgIdx, filterS, dc)
return NewSQLEe(cfg, dc)
default:
return nil, fmt.Errorf("unsupported exporter type: <%s>", cgrCfg.EEsCfg().Exporters[cfgIdx].Type)
}
@@ -224,44 +220,31 @@ func updateEEMetrics(dc *utils.SafeMapStorage, cgrID string, ev engine.MapEvent,
}
}
type expOrderedNavigableMap utils.OrderedNavigableMap
type bytePreparing struct{}
func (v *expOrderedNavigableMap) Parse(f func(path []string, val interface{})) {
nm := (*utils.OrderedNavigableMap)(v)
for el := nm.GetFirstElement(); el != nil; el = el.Next() {
nmIt, _ := nm.Field(el.Value)
f(el.Value, nmIt.Data)
}
func (eEe *bytePreparing) PrepareMap(mp map[string]interface{}) (interface{}, error) {
return json.Marshal(mp)
}
func (v *expOrderedNavigableMap) AsStringSlice() []string {
return (*utils.OrderedNavigableMap)(v).OrderedFieldsAsStrings()
}
func (v *expOrderedNavigableMap) AsMapStringSlice() (m map[string]interface{}) {
m = map[string]interface{}{}
nm := (*utils.OrderedNavigableMap)(v)
for el := nm.GetFirstElement(); el != nil; el = el.Next() {
func (eEe *bytePreparing) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) {
valMp := make(map[string]interface{})
for el := mp.GetFirstElement(); el != nil; el = el.Next() {
path := el.Value
nmIt, _ := nm.Field(path)
nmIt, _ := mp.Field(path)
path = path[:len(path)-1] // remove the last index
m[strings.Join(path, utils.NestingSep)] = nmIt.String()
valMp[strings.Join(path, utils.NestingSep)] = nmIt.String()
}
return
return json.Marshal(valMp)
}
type expMapStorage utils.MapStorage
type slicePreparing struct{}
func (v expMapStorage) Parse(f func(path []string, val interface{})) {
for k, val := range utils.MapStorage(v) {
f([]string{k}, val)
func (eEe *slicePreparing) PrepareMap(mp map[string]interface{}) (interface{}, error) {
csvRecord := make([]string, 0, len(mp))
for _, val := range mp {
csvRecord = append(csvRecord, utils.IfaceAsString(val))
}
return csvRecord, nil
}
func (v expMapStorage) AsStringSlice() (s []string) {
s = make([]string, 0, len(v))
for _, val := range utils.MapStorage(v) {
s = append(s, utils.IfaceAsString(val))
}
return
func (eEe *slicePreparing) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) {
return mp.OrderedFieldsAsStrings(), nil
}
func (v expMapStorage) AsMapStringSlice() map[string]interface{} { return v }

View File

@@ -116,11 +116,11 @@ func TestNewEventExporterCase3(t *testing.T) {
if err != nil {
t.Error(err)
}
eeExpect, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc)
eeExpect, err := NewHTTPPostEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}
newEE := ee.(*HTTPPost)
newEE := ee.(*HTTPPostEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
@@ -200,11 +200,11 @@ func TestNewEventExporterCase6(t *testing.T) {
if err != nil {
t.Error(err)
}
eeExpect, err := NewVirtualExporter(cgrCfg, 0, filterS, dc)
eeExpect, err := NewVirtualEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}
newEE := ee.(*VirtualEe)
newEE := ee.(*VirtualEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
if !reflect.DeepEqual(eeExpect, newEE) {
@@ -242,11 +242,11 @@ func TestNewEventExporterCase7(t *testing.T) {
if err != nil {
t.Error(err)
}
eeExpect, err := NewElasticExporter(cgrCfg, 0, filterS, dc)
eeExpect, err := NewElasticEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}
newEE := ee.(*ElasticEe)
newEE := ee.(*ElasticEE)
newEE.dc.MapStorage[utils.TimeNow] = nil
eeExpect.dc.MapStorage[utils.TimeNow] = nil
eeExpect.eClnt = newEE.eClnt

View File

@@ -61,7 +61,7 @@ type EventExporterS struct {
// ListenAndServe keeps the service alive
func (eeS *EventExporterS) ListenAndServe(stopChan, cfgRld chan struct{}) {
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>",
utils.CoreS, utils.EventExporterS))
utils.CoreS, utils.EEs))
for {
select {
case <-stopChan: // global exit
@@ -69,7 +69,7 @@ func (eeS *EventExporterS) ListenAndServe(stopChan, cfgRld chan struct{}) {
case rld := <-cfgRld: // configuration was reloaded, destroy the cache
cfgRld <- rld
utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.",
utils.EventExporterS))
utils.EEs))
eeS.setupCache(eeS.cfg.EEsCfg().Cache)
}
}
@@ -77,7 +77,7 @@ func (eeS *EventExporterS) ListenAndServe(stopChan, cfgRld chan struct{}) {
// Shutdown is called to shutdown the service
func (eeS *EventExporterS) Shutdown() {
utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterS))
utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EEs))
eeS.setupCache(nil) // cleanup exporters
}
@@ -176,11 +176,11 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
eeCache, hasCache := eeS.eesChs[eeCfg.Type]
eeS.eesMux.RUnlock()
var isCached bool
var ee EventExporter
var ee EventExporter2
if hasCache {
var x interface{}
if x, isCached = eeCache.Get(eeCfg.ID); isCached {
ee = x.(EventExporter)
ee = x.(EventExporter2)
}
}
if !isCached {
@@ -191,6 +191,11 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
eeCache.Set(eeCfg.ID, ee, nil)
}
}
metricMapLock.Lock()
metricsMap[ee.Cfg().ID] = utils.MapStorage{} // will return the ID for all processed exporters
metricMapLock.Unlock()
if eeCfg.Synchronous {
wg.Add(1) // wait for synchronous or file ones since these need to be done before continuing
}
@@ -201,22 +206,17 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
if hasVerbose && !eeCfg.Synchronous {
utils.Logger.Warning(
fmt.Sprintf("<%s> with id <%s>, running verbosed exporter with syncronous false",
utils.EventExporterS, ee.ID()))
utils.EEs, ee.Cfg().ID))
}
go func(evict, sync bool, ee EventExporter) {
if err := ee.ExportEvent(cgrEv.CGREvent); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> with id <%s>, error: <%s>",
utils.EventExporterS, ee.ID(), err.Error()))
go func(evict, sync bool, ee EventExporter2) {
if err := eeS.exportEventWithExporter(ee, cgrEv.CGREvent, evict); err != nil {
withErr = true
}
if evict {
ee.OnEvicted("", nil) // so we can close ie the file
}
if sync {
if hasVerbose {
metricMapLock.Lock()
metricsMap[ee.ID()] = ee.GetMetrics().MapStorage
metricsMap[ee.Cfg().ID] = ee.GetMetrics().ClonedMapStorage()
metricMapLock.Unlock()
}
wg.Done()
@@ -255,13 +255,18 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
}
func (eeS *EventExporterS) exportEventWithExporter(exp EventExporter2, ev *utils.CGREvent, oneTime bool) (err error) {
var eEv exportedEvent
if oneTime {
defer exp.Close()
}
var eEv interface{}
exp.GetMetrics().Lock()
exp.GetMetrics().MapStorage[utils.NumberOfEvents] = exp.GetMetrics().MapStorage[utils.NumberOfEvents].(int64) + 1
exp.GetMetrics().Unlock()
if len(exp.Cfg().ContentFields()) == 0 {
eEv = expMapStorage(ev.Event)
if eEv, err = exp.PrepareMap(ev.Event); err != nil {
return
}
} else {
expNM := utils.NewOrderedNavigableMap()
err = engine.NewExportRequest(map[string]utils.DataStorage{
@@ -272,21 +277,25 @@ func (eeS *EventExporterS) exportEventWithExporter(exp EventExporter2, ev *utils
}, utils.FirstNonEmpty(ev.Tenant, eeS.cfg.GeneralCfg().DefaultTenant),
eeS.filterS,
map[string]*utils.OrderedNavigableMap{utils.MetaExp: expNM}).SetFields(exp.Cfg().ContentFields())
eEv = (*expOrderedNavigableMap)(expNM)
if eEv, err = exp.PrepareOrderMap(expNM); err != nil {
return
}
}
key := utils.ConcatenatedKey(utils.FirstNonEmpty(engine.MapEvent(ev.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID()),
utils.FirstNonEmpty(engine.MapEvent(ev.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault))
exp = utils.NewOrderedNavigableMap()
err = engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: dc,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: cfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, cfg.GeneralCfg().DefaultTenant),
fltS,
map[string]*utils.OrderedNavigableMap{utils.MetaExp: r}).SetFields(fields)
return
if oneTime {
defer exp.Close()
return ExportWithAttempts(exp, eEv, key)
}
func ExportWithAttempts(exp EventExporter2, eEv interface{}, key string) (err error) {
if exp.Cfg().FailedPostsDir != utils.MetaNone {
defer func() {
if err != nil {
engine.AddFailedPost(exp.Cfg().FailedPostsDir, exp.Cfg().ExportPath,
exp.Cfg().Type, utils.EEs,
eEv, exp.Cfg().Opts)
}
}()
}
fib := utils.Fib()
@@ -299,11 +308,13 @@ func (eeS *EventExporterS) exportEventWithExporter(exp EventExporter2, ev *utils
}
}
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter <%s> could not connect because err: %s", utils.EEs, exp.Cfg().ID, err.Error()))
utils.Logger.Warning(
fmt.Sprintf("<%s> Exporter <%s> could not connect because err: <%s>",
utils.EEs, exp.Cfg().ID, err.Error()))
return
}
for i := 0; i < exp.Cfg().Attempts; i++ {
if err = exp.ExportEvent(ev); err == nil {
if err = exp.ExportEvent(eEv, key); err == nil {
break
}
if i+1 < exp.Cfg().Attempts {
@@ -311,7 +322,9 @@ func (eeS *EventExporterS) exportEventWithExporter(exp EventExporter2, ev *utils
}
}
if err != nil {
return
utils.Logger.Warning(
fmt.Sprintf("<%s> Exporter <%s> could not export because err: <%s>",
utils.EEs, exp.Cfg().ID, err.Error()))
}
return
}

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -27,149 +28,101 @@ import (
"github.com/elastic/go-elasticsearch/esapi"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
elasticsearch "github.com/elastic/go-elasticsearch"
)
func NewElasticExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc *utils.SafeMapStorage) (eEe *ElasticEe, err error) {
eEe = &ElasticEe{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (eEe *ElasticEE, err error) {
eEe = &ElasticEE{
cfg: cfg,
dc: dc,
reqs: newConcReq(cfg.ConcurrentRequests),
}
err = eEe.init()
err = eEe.prepareOpts()
return
}
// ElasticEe implements EventExporter interface for ElasticSearch export
type ElasticEe struct {
id string
eClnt *elasticsearch.Client
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
dc *utils.SafeMapStorage
opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
reqs *concReq
// ElasticEE implements EventExporter interface for ElasticSearch export
type ElasticEE struct {
cfg *config.EventExporterCfg
eClnt *elasticsearch.Client
dc *utils.SafeMapStorage
opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
reqs *concReq
bytePreparing
}
// init will create all the necessary dependencies, including opening the file
func (eEe *ElasticEe) init() (err error) {
// create the client
if eEe.eClnt, err = elasticsearch.NewClient(
elasticsearch.Config{
Addresses: strings.Split(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ExportPath, utils.InfieldSep),
}); err != nil {
return
}
func (eEe *ElasticEE) prepareOpts() (err error) {
//parse opts
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsIndex]; !has {
eEe.opts.Index = utils.CDRsTBL
} else {
eEe.opts.Index = utils.CDRsTBL
if val, has := eEe.Cfg().Opts[utils.ElsIndex]; has {
eEe.opts.Index = utils.IfaceAsString(val)
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsIfPrimaryTerm]; has {
if val, has := eEe.Cfg().Opts[utils.ElsIfPrimaryTerm]; has {
var intVal int64
if intVal, err = utils.IfaceAsTInt64(val); err != nil {
return
}
eEe.opts.IfPrimaryTerm = utils.IntPointer(int(intVal))
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsIfSeqNo]; has {
if val, has := eEe.Cfg().Opts[utils.ElsIfSeqNo]; has {
var intVal int64
if intVal, err = utils.IfaceAsTInt64(val); err != nil {
return
}
eEe.opts.IfSeqNo = utils.IntPointer(int(intVal))
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsOpType]; has {
if val, has := eEe.Cfg().Opts[utils.ElsOpType]; has {
eEe.opts.OpType = utils.IfaceAsString(val)
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsPipeline]; has {
if val, has := eEe.Cfg().Opts[utils.ElsPipeline]; has {
eEe.opts.Pipeline = utils.IfaceAsString(val)
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsRouting]; has {
if val, has := eEe.Cfg().Opts[utils.ElsRouting]; has {
eEe.opts.Routing = utils.IfaceAsString(val)
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsTimeout]; has {
if val, has := eEe.Cfg().Opts[utils.ElsTimeout]; has {
if eEe.opts.Timeout, err = utils.IfaceAsDuration(val); err != nil {
return
}
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsVersionLow]; has {
if val, has := eEe.Cfg().Opts[utils.ElsVersionLow]; has {
var intVal int64
if intVal, err = utils.IfaceAsTInt64(val); err != nil {
return
}
eEe.opts.Version = utils.IntPointer(int(intVal))
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsVersionType]; has {
if val, has := eEe.Cfg().Opts[utils.ElsVersionType]; has {
eEe.opts.VersionType = utils.IfaceAsString(val)
}
if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsWaitForActiveShards]; has {
if val, has := eEe.Cfg().Opts[utils.ElsWaitForActiveShards]; has {
eEe.opts.WaitForActiveShards = utils.IfaceAsString(val)
}
return
}
// ID returns the identificator of this exporter
func (eEe *ElasticEe) ID() string {
return eEe.id
}
// OnEvicted implements EventExporter, doing the cleanup before exit
func (eEe *ElasticEe) OnEvicted(_ string, _ interface{}) {
func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg }
func (eEe *ElasticEE) Connect() (err error) {
// create the client
if eEe.eClnt == nil {
eEe.eClnt, err = elasticsearch.NewClient(
elasticsearch.Config{Addresses: strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)},
)
}
return
}
// ExportEvent implements EventExporter
func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) {
eEe.reqs.get()
defer func() {
updateEEMetrics(eEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone,
eEe.cgrCfg.GeneralCfg().DefaultTimezone))
eEe.reqs.done()
}()
eEe.dc.Lock()
eEe.dc.MapStorage[utils.NumberOfEvents] = eEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
eEe.dc.Unlock()
valMp := make(map[string]interface{})
if len(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ContentFields()) == 0 {
valMp = cgrEv.Event
} else {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: eEe.dc,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: eEe.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, eEe.cgrCfg.GeneralCfg().DefaultTenant),
eEe.filterS, oNm)
if err = eeReq.SetFields(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ContentFields()); err != nil {
return
}
for el := eeReq.ExpData[utils.MetaExp].GetFirstElement(); el != nil; el = el.Next() {
path := el.Value
nmIt, _ := eeReq.ExpData[utils.MetaExp].Field(path)
path = path[:len(path)-1] // remove the last index
valMp[strings.Join(path, utils.NestingSep)] = nmIt.String()
}
}
// Set up the request object
cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID())
runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault)
defer eEe.reqs.done()
eReq := esapi.IndexRequest{
Index: eEe.opts.Index,
DocumentID: utils.ConcatenatedKey(cgrID, runID),
Body: strings.NewReader(utils.ToJSON(valMp)),
DocumentID: key,
Body: bytes.NewReader(ev.([]byte)),
Refresh: "true",
IfPrimaryTerm: eEe.opts.IfPrimaryTerm,
IfSeqNo: eEe.opts.IfSeqNo,
@@ -192,14 +145,16 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
var e map[string]interface{}
if err = json.NewDecoder(resp.Body).Decode(&e); err != nil {
return
} else {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%+v> when indexing document",
utils.EventExporterS, eEe.id, e))
}
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%+v> when indexing document",
utils.EEs, eEe.Cfg().ID, e))
}
return
}
func (eEe *ElasticEe) GetMetrics() *utils.SafeMapStorage {
return eEe.dc.Clone()
func (eEe *ElasticEE) Close() (_ error) {
eEe.eClnt = nil
return
}
func (eEe *ElasticEE) GetMetrics() *utils.SafeMapStorage { return eEe.dc }

View File

@@ -30,7 +30,7 @@ import (
)
func TestID(t *testing.T) {
ee := &ElasticEe{
ee := &ElasticEE{
id: "3",
}
if rcv := ee.ID(); !reflect.DeepEqual(rcv, "3") {
@@ -46,7 +46,7 @@ func TestGetMetrics(t *testing.T) {
if err != nil {
t.Error(err)
}
ee := &ElasticEe{
ee := &ElasticEE{
dc: dc,
}
@@ -57,7 +57,7 @@ func TestGetMetrics(t *testing.T) {
func TestInitClient(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
ee.cgrCfg.EEsCfg().Exporters[0].ExportPath = "/\x00"
@@ -69,7 +69,7 @@ func TestInitClient(t *testing.T) {
func TestInitCase1(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsIndex] = "test"
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
if err := ee.init(); err != nil {
@@ -84,7 +84,7 @@ func TestInitCase1(t *testing.T) {
func TestInitCase2(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsIfPrimaryTerm] = 20
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
if err := ee.init(); err != nil {
@@ -99,7 +99,7 @@ func TestInitCase2(t *testing.T) {
func TestInitCase2Err(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsIfPrimaryTerm] = "test"
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax"
@@ -111,7 +111,7 @@ func TestInitCase2Err(t *testing.T) {
func TestInitCase3(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsIfSeqNo] = 20
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
if err := ee.init(); err != nil {
@@ -126,7 +126,7 @@ func TestInitCase3(t *testing.T) {
func TestInitCase3Err(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsIfSeqNo] = "test"
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax"
@@ -138,7 +138,7 @@ func TestInitCase3Err(t *testing.T) {
func TestInitCase4(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsOpType] = "test"
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
if err := ee.init(); err != nil {
@@ -153,7 +153,7 @@ func TestInitCase4(t *testing.T) {
func TestInitCase5(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsPipeline] = "test"
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
if err := ee.init(); err != nil {
@@ -168,7 +168,7 @@ func TestInitCase5(t *testing.T) {
func TestInitCase6(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsRouting] = "test"
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
if err := ee.init(); err != nil {
@@ -183,7 +183,7 @@ func TestInitCase6(t *testing.T) {
func TestInitCase7(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsTimeout] = "test"
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
errExpect := "time: invalid duration \"test\""
@@ -195,7 +195,7 @@ func TestInitCase7(t *testing.T) {
func TestInitCase8(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsVersionLow] = 20
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
if err := ee.init(); err != nil {
@@ -210,7 +210,7 @@ func TestInitCase8(t *testing.T) {
func TestInitCase8Err(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsVersionLow] = "test"
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax"
@@ -222,7 +222,7 @@ func TestInitCase8Err(t *testing.T) {
func TestInitCase9(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsVersionType] = "test"
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
if err := ee.init(); err != nil {
@@ -237,7 +237,7 @@ func TestInitCase9(t *testing.T) {
func TestInitCase10(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsWaitForActiveShards] = "test"
ee := &ElasticEe{
ee := &ElasticEE{
cgrCfg: cgrCfg,
}
if err := ee.init(); err != nil {
@@ -273,7 +273,7 @@ func TestElasticExportEvent(t *testing.T) {
if err != nil {
t.Error(err)
}
eEe, err := NewElasticExporter(cgrCfg, 0, filterS, dc)
eEe, err := NewElasticEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}
@@ -324,7 +324,7 @@ func TestElasticExportEvent2(t *testing.T) {
if err != nil {
t.Error(err)
}
eEe, err := NewElasticExporter(cgrCfg, 0, filterS, dc)
eEe, err := NewElasticEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}
@@ -375,7 +375,7 @@ func TestElasticExportEvent3(t *testing.T) {
if err != nil {
t.Error(err)
}
eEe, err := NewElasticExporter(cgrCfg, 0, filterS, dc)
eEe, err := NewElasticEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}
@@ -416,7 +416,7 @@ func TestElasticExportEvent4(t *testing.T) {
if err != nil {
t.Error(err)
}
eEe, err := NewElasticExporter(cgrCfg, 0, filterS, dc)
eEe, err := NewElasticEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}
@@ -456,7 +456,7 @@ func TestElasticExportEvent5(t *testing.T) {
if err != nil {
t.Error(err)
}
eEe, err := NewElasticExporter(cgrCfg, 0, filterS, dc)
eEe, err := NewElasticEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}

View File

@@ -24,6 +24,7 @@ import (
"io"
"os"
"path"
"sync"
"github.com/cgrates/cgrates/engine"
@@ -31,15 +32,15 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
func NewFileCSVee(cfg *config.EventExporterCfg,
cgrCfg *config.CGRConfig, filterS *engine.FilterS,
dc *utils.SafeMapStorage) (fCsv *FileCSVee, err error) {
fCsv = &FileCSVee{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cfg: cfg,
dc: dc,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
}
err = fCsv.init()
return
@@ -47,21 +48,23 @@ func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
// FileCSVee implements EventExporter interface for .csv files
type FileCSVee struct {
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
file io.WriteCloser
csvWriter *csv.Writer
dc *utils.SafeMapStorage
reqs *concReq
sync.Mutex
slicePreparing
// for header and trailer composing
cgrCfg *config.CGRConfig
filterS *engine.FilterS
}
// init will create all the necessary dependencies, including opening the file
func (fCsv *FileCSVee) init() (err error) {
fCsv.Lock()
defer fCsv.Unlock()
// create the file
filePath := path.Join(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ExportPath,
fCsv.id+utils.Underline+utils.UUIDSha1Prefix()+utils.CSVSuffix)
filePath := path.Join(fCsv.Cfg().ExportPath,
fCsv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.CSVSuffix)
fCsv.dc.Lock()
fCsv.dc.MapStorage[utils.ExportPath] = filePath
fCsv.dc.Unlock()
@@ -70,109 +73,60 @@ func (fCsv *FileCSVee) init() (err error) {
}
fCsv.csvWriter = csv.NewWriter(fCsv.file)
fCsv.csvWriter.Comma = utils.CSVSep
if fieldSep, has := fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Opts[utils.CSVFieldSepOpt]; has {
if fieldSep, has := fCsv.Cfg().Opts[utils.CSVFieldSepOpt]; has {
fCsv.csvWriter.Comma = rune(utils.IfaceAsString(fieldSep)[0])
}
return fCsv.composeHeader()
}
// ID returns the identificator of this exporter
func (fCsv *FileCSVee) ID() string {
return fCsv.id
}
// OnEvicted implements EventExporter, doing the cleanup before exit
func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
// verify if we need to add the trailer
if err := fCsv.composeTrailer(); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when composed trailer",
utils.EventExporterS, fCsv.id, err.Error()))
}
fCsv.csvWriter.Flush()
if err := fCsv.file.Close(); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file",
utils.EventExporterS, fCsv.id, err.Error()))
}
}
// ExportEvent implements EventExporter
func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
fCsv.reqs.get()
defer func() {
updateEEMetrics(fCsv.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
fCsv.cgrCfg.GeneralCfg().DefaultTimezone))
fCsv.reqs.done()
}()
fCsv.dc.Lock()
fCsv.dc.MapStorage[utils.NumberOfEvents] = fCsv.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
fCsv.dc.Unlock()
var csvRecord []string
if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields()) == 0 {
csvRecord = make([]string, 0, len(cgrEv.Event))
for _, val := range cgrEv.Event {
csvRecord = append(csvRecord, utils.IfaceAsString(val))
}
} else {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: fCsv.dc,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: fCsv.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, fCsv.cgrCfg.GeneralCfg().DefaultTenant),
fCsv.filterS, oNm)
if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields()); err != nil {
return
}
csvRecord = eeReq.ExpData[utils.MetaExp].OrderedFieldsAsStrings()
}
return fCsv.csvWriter.Write(csvRecord)
}
// Compose and cache the header
func (fCsv *FileCSVee) composeHeader() (err error) {
if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields()) == 0 {
return
if len(fCsv.Cfg().HeaderFields()) != 0 {
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaHdr, fCsv.Cfg().HeaderFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil {
return
}
return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings())
}
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaHdr: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: fCsv.dc,
utils.MetaCfg: fCsv.cgrCfg.GetDataProvider(),
}, fCsv.cgrCfg.GeneralCfg().DefaultTenant,
fCsv.filterS, oNm)
if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields()); err != nil {
return
}
return fCsv.csvWriter.Write(eeReq.ExpData[utils.MetaHdr].OrderedFieldsAsStrings())
return
}
// Compose and cache the trailer
func (fCsv *FileCSVee) composeTrailer() (err error) {
if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields()) == 0 {
return
if len(fCsv.Cfg().TrailerFields()) != 0 {
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaTrl, fCsv.Cfg().TrailerFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil {
return
}
return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings())
}
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaTrl: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: fCsv.dc,
utils.MetaCfg: fCsv.cgrCfg.GetDataProvider(),
}, fCsv.cgrCfg.GeneralCfg().DefaultTenant,
fCsv.filterS, oNm)
if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields()); err != nil {
return
}
return fCsv.csvWriter.Write(eeReq.ExpData[utils.MetaTrl].OrderedFieldsAsStrings())
return
}
func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage {
return fCsv.dc.Clone()
func (fCsv *FileCSVee) Cfg() *config.EventExporterCfg { return fCsv.cfg }
func (fCsv *FileCSVee) Connect() (_ error) { return }
func (fCsv *FileCSVee) ExportEvent(ev interface{}, _ string) error {
fCsv.Lock() // make sure that only one event is writen in file at once
defer fCsv.Unlock()
return fCsv.csvWriter.Write(ev.([]string))
}
func (fCsv *FileCSVee) Close() (err error) {
fCsv.Lock()
defer fCsv.Unlock()
// verify if we need to add the trailer
if err = fCsv.composeTrailer(); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when composed trailer",
utils.EEs, fCsv.Cfg().ID, err.Error()))
}
fCsv.csvWriter.Flush()
if err = fCsv.file.Close(); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file",
utils.EEs, fCsv.Cfg().ID, err.Error()))
}
return
}
func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage { return fCsv.dc }

View File

@@ -23,20 +23,20 @@ import (
"io"
"os"
"path"
"sync"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fFwv *FileFWVee, err error) {
func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fFwv *FileFWVee, err error) {
fFwv = &FileFWVee{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cfg: cfg,
dc: dc,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
}
err = fFwv.init()
return
@@ -44,19 +44,21 @@ func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
// FileFWVee implements EventExporter interface for .fwv files
type FileFWVee struct {
id string
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
file io.WriteCloser
sync.Mutex
slicePreparing
// for header and trailer composing
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
file io.WriteCloser
dc *utils.SafeMapStorage
reqs *concReq
}
// init will create all the necessary dependencies, including opening the file
func (fFwv *FileFWVee) init() (err error) {
filePath := path.Join(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ExportPath,
fFwv.id+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix)
filePath := path.Join(fFwv.Cfg().ExportPath,
fFwv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix)
fFwv.dc.Lock()
fFwv.dc.MapStorage[utils.ExportPath] = filePath
fFwv.dc.Unlock()
@@ -67,85 +69,16 @@ func (fFwv *FileFWVee) init() (err error) {
return fFwv.composeHeader()
}
// ID returns the identificator of this exporter
func (fFwv *FileFWVee) ID() string {
return fFwv.id
}
// OnEvicted implements EventExporter, doing the cleanup before exit
func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) {
// verify if we need to add the trailer
if err := fFwv.composeTrailer(); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when composed trailer",
utils.EventExporterS, fFwv.id, err.Error()))
}
if err := fFwv.file.Close(); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file",
utils.EventExporterS, fFwv.id, err.Error()))
}
}
// ExportEvent implements EventExporter
func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
fFwv.reqs.get()
defer func() {
updateEEMetrics(fFwv.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,
fFwv.cgrCfg.GeneralCfg().DefaultTimezone))
fFwv.reqs.done()
}()
fFwv.dc.Lock()
fFwv.dc.MapStorage[utils.NumberOfEvents] = fFwv.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
fFwv.dc.Unlock()
var records []string
if len(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ContentFields()) == 0 {
records = make([]string, 0, len(cgrEv.Event))
for _, val := range cgrEv.Event {
records = append(records, utils.IfaceAsString(val))
}
} else {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: fFwv.dc,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: fFwv.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, fFwv.cgrCfg.GeneralCfg().DefaultTenant),
fFwv.filterS, oNm)
if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ContentFields()); err != nil {
return
}
records = eeReq.ExpData[utils.MetaExp].OrderedFieldsAsStrings()
}
for _, record := range records {
if _, err = io.WriteString(fFwv.file, record); err != nil {
return
}
}
_, err = io.WriteString(fFwv.file, "\n")
return
}
// Compose and cache the header
func (fFwv *FileFWVee) composeHeader() (err error) {
if len(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].HeaderFields()) == 0 {
if len(fFwv.Cfg().HeaderFields()) == 0 {
return
}
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaHdr: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: fFwv.dc,
utils.MetaCfg: fFwv.cgrCfg.GetDataProvider(),
}, fFwv.cgrCfg.GeneralCfg().DefaultTenant,
fFwv.filterS, oNm)
if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].HeaderFields()); err != nil {
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaHdr, fFwv.Cfg().HeaderFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil {
return
}
for _, record := range eeReq.ExpData[utils.MetaHdr].OrderedFieldsAsStrings() {
for _, record := range exp.OrderedFieldsAsStrings() {
if _, err = io.WriteString(fFwv.file, record); err != nil {
return
}
@@ -156,21 +89,14 @@ func (fFwv *FileFWVee) composeHeader() (err error) {
// Compose and cache the trailer
func (fFwv *FileFWVee) composeTrailer() (err error) {
if len(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].TrailerFields()) == 0 {
if len(fFwv.Cfg().TrailerFields()) == 0 {
return
}
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaTrl: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: fFwv.dc,
utils.MetaCfg: fFwv.cgrCfg.GetDataProvider(),
}, fFwv.cgrCfg.GeneralCfg().DefaultTenant,
fFwv.filterS, oNm)
if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].TrailerFields()); err != nil {
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaTrl, fFwv.Cfg().TrailerFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil {
return
}
for _, record := range eeReq.ExpData[utils.MetaTrl].OrderedFieldsAsStrings() {
for _, record := range exp.OrderedFieldsAsStrings() {
if _, err = io.WriteString(fFwv.file, record); err != nil {
return
}
@@ -179,6 +105,35 @@ func (fFwv *FileFWVee) composeTrailer() (err error) {
return
}
func (fFwv *FileFWVee) GetMetrics() *utils.SafeMapStorage {
return fFwv.dc.Clone()
func (fFwv *FileFWVee) Cfg() *config.EventExporterCfg { return fFwv.cfg }
func (fFwv *FileFWVee) Connect() (_ error) { return }
func (fFwv *FileFWVee) ExportEvent(records interface{}, _ string) (err error) {
fFwv.Lock() // make sure that only one event is writen in file at once
defer fFwv.Unlock()
for _, record := range records.([]string) {
if _, err = io.WriteString(fFwv.file, record); err != nil {
return
}
}
_, err = io.WriteString(fFwv.file, "\n")
return
}
func (fFwv *FileFWVee) Close() (err error) {
fFwv.Lock()
defer fFwv.Unlock()
// verify if we need to add the trailer
if err = fFwv.composeTrailer(); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when composed trailer",
utils.EEs, fFwv.Cfg().ID, err.Error()))
}
if err = fFwv.file.Close(); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file",
utils.EEs, fFwv.Cfg().ID, err.Error()))
}
return
}
func (fFwv *FileFWVee) GetMetrics() *utils.SafeMapStorage { return fFwv.dc }

View File

@@ -19,8 +19,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package ees
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"github.com/cgrates/cgrates/config"
@@ -28,126 +32,121 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewHTTPjsonMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS,
dc *utils.SafeMapStorage) (pstrJSON *HTTPjsonMapEE, err error) {
pstrJSON = &HTTPjsonMapEE{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
cfg: cfg,
dc: dc,
client: &http.Client{Transport: engine.GetHTTPPstrTransport(), Timeout: cgrCfg.GeneralCfg().ReplyTimeout},
reqs: newConcReq(cfg.ConcurrentRequests),
}
pstrJSON.pstr = engine.NewHTTPPoster(cgrCfg.GeneralCfg().ReplyTimeout,
cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type],
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts)
pstrJSON.hdr, err = pstrJSON.composeHeader(cgrCfg, filterS)
return
}
// HTTPjsonMapEE implements EventExporter interface for .csv files
type HTTPjsonMapEE struct {
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
pstr *engine.HTTPPoster
dc *utils.SafeMapStorage
reqs *concReq
}
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
client *http.Client
reqs *concReq
// ID returns the identificator of this exporter
func (httpEE *HTTPjsonMapEE) ID() string {
return httpEE.id
}
// OnEvicted implements EventExporter, doing the cleanup before exit
func (httpEE *HTTPjsonMapEE) OnEvicted(string, interface{}) {
}
// ExportEvent implements EventExporter
func (httpEE *HTTPjsonMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
httpEE.reqs.get()
defer func() {
updateEEMetrics(httpEE.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].Timezone,
httpEE.cgrCfg.GeneralCfg().DefaultTimezone))
httpEE.reqs.done()
}()
httpEE.dc.Lock()
httpEE.dc.MapStorage[utils.NumberOfEvents] = httpEE.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
httpEE.dc.Unlock()
valMp := make(map[string]interface{})
hdr := http.Header{}
if len(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].ContentFields()) == 0 {
valMp = cgrEv.Event
} else {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: httpEE.dc,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: httpEE.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, httpEE.cgrCfg.GeneralCfg().DefaultTenant),
httpEE.filterS, oNm)
if err = eeReq.SetFields(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].ContentFields()); err != nil {
return
}
for el := eeReq.ExpData[utils.MetaExp].GetFirstElement(); el != nil; el = el.Next() {
path := el.Value
nmIt, _ := eeReq.ExpData[utils.MetaExp].Field(path)
path = path[:len(path)-1] // remove the last index
valMp[strings.Join(path, utils.NestingSep)] = nmIt.String()
}
if hdr, err = httpEE.composeHeader(); err != nil {
return
}
}
var body []byte
if body, err = json.Marshal(valMp); err != nil {
return
}
if err = httpEE.pstr.PostValues(body, hdr); err != nil &&
httpEE.cgrCfg.GeneralCfg().FailedPostsDir != utils.MetaNone {
engine.AddFailedPost(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].ExportPath,
httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].Type, utils.EventExporterS,
&engine.HTTPPosterRequest{Header: hdr, Body: body},
httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].Opts)
}
return
}
func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage {
return httpEE.dc.Clone()
hdr http.Header
}
// Compose and cache the header
func (httpEE *HTTPjsonMapEE) composeHeader() (hdr http.Header, err error) {
func (httpEE *HTTPjsonMapEE) composeHeader(cgrCfg *config.CGRConfig, filterS *engine.FilterS) (hdr http.Header, err error) {
hdr = make(http.Header)
if len(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].HeaderFields()) == 0 {
if len(httpEE.Cfg().HeaderFields()) == 0 {
return
}
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaHdr: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: httpEE.dc,
utils.MetaCfg: httpEE.cgrCfg.GetDataProvider(),
}, httpEE.cgrCfg.GeneralCfg().DefaultTenant,
httpEE.filterS, oNm)
if err = eeReq.SetFields(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].HeaderFields()); err != nil {
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaHdr, httpEE.Cfg().HeaderFields(), httpEE.dc, cgrCfg, filterS); err != nil {
return
}
for el := eeReq.ExpData[utils.MetaHdr].GetFirstElement(); el != nil; el = el.Next() {
for el := exp.GetFirstElement(); el != nil; el = el.Next() {
path := el.Value
nmIt, _ := eeReq.ExpData[utils.MetaHdr].Field(path) //Safe to ignore error, since the path always exists
path = path[:len(path)-1] // remove the last index
nmIt, _ := exp.Field(path) //Safe to ignore error, since the path always exists
path = path[:len(path)-1] // remove the last index
hdr.Set(strings.Join(path, utils.NestingSep), nmIt.String())
}
return
}
func (httpEE *HTTPjsonMapEE) Cfg() *config.EventExporterCfg { return httpEE.cfg }
func (httpEE *HTTPjsonMapEE) Connect() (_ error) { return }
func (httpEE *HTTPjsonMapEE) ExportEvent(content interface{}, _ string) (err error) {
httpEE.reqs.get()
defer httpEE.reqs.done()
pReq := content.(httpPosterRequest)
var req *http.Request
if req, err = prepareRequest(httpEE.Cfg().ExportPath, utils.ContentJSON, pReq.Body, pReq.Header); err != nil {
return
}
_, err = sendHTTPReq(httpEE.client, req)
return
}
func (httpEE *HTTPjsonMapEE) Close() (_ error) { return }
func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage { return httpEE.dc }
func (httpEE *HTTPjsonMapEE) PrepareMap(mp map[string]interface{}) (interface{}, error) {
body, err := json.Marshal(mp)
return &httpPosterRequest{
Header: httpEE.hdr,
Body: body,
}, err
}
func (httpEE *HTTPjsonMapEE) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) {
valMp := make(map[string]interface{})
for el := mp.GetFirstElement(); el != nil; el = el.Next() {
path := el.Value
nmIt, _ := mp.Field(path)
path = path[:len(path)-1] // remove the last index
valMp[strings.Join(path, utils.NestingSep)] = nmIt.String()
}
body, err := json.Marshal(valMp)
return &httpPosterRequest{
Header: httpEE.hdr,
Body: body,
}, err
}
func prepareRequest(addr, cType string, content interface{}, hdr http.Header) (req *http.Request, err error) {
var body io.Reader
if cType == utils.ContentForm {
body = strings.NewReader(content.(url.Values).Encode())
} else {
body = bytes.NewBuffer(content.([]byte))
}
contentType := "application/x-www-form-urlencoded"
if cType == utils.ContentJSON {
contentType = "application/json"
}
hdr.Set("Content-Type", contentType)
if req, err = http.NewRequest(http.MethodPost, addr, body); err != nil {
return
}
req.Header = hdr
return
}
func sendHTTPReq(client *http.Client, req *http.Request) (respBody []byte, err error) {
var resp *http.Response
if resp, err = client.Do(req); err != nil {
return
}
respBody, err = io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return
}
if resp.StatusCode > 299 {
err = fmt.Errorf("unexpected status code received: <%d>", resp.StatusCode)
}
return
}

View File

@@ -28,124 +28,92 @@ import (
"github.com/cgrates/cgrates/utils"
)
func NewHTTPPostEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc *utils.SafeMapStorage) (httpPost *HTTPPost, err error) {
httpPost = &HTTPPost{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS,
dc *utils.SafeMapStorage) (httpPost *HTTPPostEE, err error) {
httpPost = &HTTPPostEE{
cfg: cfg,
dc: dc,
client: &http.Client{Transport: engine.GetHTTPPstrTransport(), Timeout: cgrCfg.GeneralCfg().ReplyTimeout},
reqs: newConcReq(cfg.ConcurrentRequests),
}
httpPost.httpPoster = engine.NewHTTPPoster(cgrCfg.GeneralCfg().ReplyTimeout,
cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type],
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts)
httpPost.hdr, err = httpPost.composeHeader(cgrCfg, filterS)
return
}
// FileCSVee implements EventExporter interface for .csv files
type HTTPPost struct {
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
httpPoster *engine.HTTPPoster
dc *utils.SafeMapStorage
reqs *concReq
type HTTPPostEE struct {
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
client *http.Client
reqs *concReq
hdr http.Header
}
// ID returns the identificator of this exporter
func (httpPost *HTTPPost) ID() string {
return httpPost.id
}
// OnEvicted implements EventExporter, doing the cleanup before exit
func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) {
}
// ExportEvent implements EventExporter
func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
httpPost.reqs.get()
defer func() {
updateEEMetrics(httpPost.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Timezone,
httpPost.cgrCfg.GeneralCfg().DefaultTimezone))
httpPost.reqs.done()
}()
httpPost.dc.Lock()
httpPost.dc.MapStorage[utils.NumberOfEvents] = httpPost.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
httpPost.dc.Unlock()
urlVals := url.Values{}
hdr := http.Header{}
if len(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ContentFields()) == 0 {
for k, v := range cgrEv.Event {
urlVals.Set(k, utils.IfaceAsString(v))
}
} else {
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: httpPost.dc,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: httpPost.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, httpPost.cgrCfg.GeneralCfg().DefaultTenant),
httpPost.filterS, oNm)
if err = eeReq.SetFields(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ContentFields()); err != nil {
return
}
for el := eeReq.ExpData[utils.MetaExp].GetFirstElement(); el != nil; el = el.Next() {
path := el.Value
nmIt, _ := eeReq.ExpData[utils.MetaExp].Field(path)
path = path[:len(path)-1] // remove the last index
urlVals.Set(strings.Join(path, utils.NestingSep), nmIt.String())
}
if hdr, err = httpPost.composeHeader(); err != nil {
return
}
}
if err = httpPost.httpPoster.PostValues(urlVals, hdr); err != nil &&
httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.MetaNone {
engine.AddFailedPost(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ExportPath,
httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS,
&engine.HTTPPosterRequest{
Header: hdr,
Body: urlVals,
}, httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Opts)
}
return
}
func (httpPost *HTTPPost) GetMetrics() *utils.SafeMapStorage {
return httpPost.dc.Clone()
type httpPosterRequest struct {
Header http.Header
Body interface{}
}
// Compose and cache the header
func (httpPost *HTTPPost) composeHeader() (hdr http.Header, err error) {
func (httpPost *HTTPPostEE) composeHeader(cgrCfg *config.CGRConfig, filterS *engine.FilterS) (hdr http.Header, err error) {
hdr = make(http.Header)
if len(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].HeaderFields()) == 0 {
if len(httpPost.Cfg().HeaderFields()) == 0 {
return
}
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaHdr: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: httpPost.dc,
utils.MetaCfg: httpPost.cgrCfg.GetDataProvider(),
}, httpPost.cgrCfg.GeneralCfg().DefaultTenant,
httpPost.filterS, oNm)
if err = eeReq.SetFields(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].HeaderFields()); err != nil {
var exp *utils.OrderedNavigableMap
if exp, err = composeHeaderTrailer(utils.MetaHdr, httpPost.Cfg().HeaderFields(), httpPost.dc, cgrCfg, filterS); err != nil {
return
}
for el := eeReq.ExpData[utils.MetaHdr].GetFirstElement(); el != nil; el = el.Next() {
for el := exp.GetFirstElement(); el != nil; el = el.Next() {
path := el.Value
nmIt, _ := eeReq.ExpData[utils.MetaHdr].Field(path)
path = path[:len(path)-1] // remove the last index
nmIt, _ := exp.Field(path) //Safe to ignore error, since the path always exists
path = path[:len(path)-1] // remove the last index
hdr.Set(strings.Join(path, utils.NestingSep), nmIt.String())
}
return
}
func (httpPost *HTTPPostEE) Cfg() *config.EventExporterCfg { return httpPost.cfg }
func (httpPost *HTTPPostEE) Connect() (_ error) { return }
func (httpPost *HTTPPostEE) ExportEvent(content interface{}, _ string) (err error) {
httpPost.reqs.get()
defer httpPost.reqs.done()
pReq := content.(*httpPosterRequest)
var req *http.Request
if req, err = prepareRequest(httpPost.Cfg().ExportPath, utils.ContentForm, pReq.Body, pReq.Header); err != nil {
return
}
_, err = sendHTTPReq(httpPost.client, req)
return
}
func (httpPost *HTTPPostEE) Close() (_ error) { return }
func (httpPost *HTTPPostEE) GetMetrics() *utils.SafeMapStorage { return httpPost.dc }
func (httpPost *HTTPPostEE) PrepareMap(mp map[string]interface{}) (interface{}, error) {
urlVals := url.Values{}
for k, v := range mp {
urlVals.Set(k, utils.IfaceAsString(v))
}
return &httpPosterRequest{
Header: httpPost.hdr,
Body: urlVals,
}, nil
}
func (httpPost *HTTPPostEE) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) {
urlVals := url.Values{}
for el := mp.GetFirstElement(); el != nil; el = el.Next() {
path := el.Value
nmIt, _ := mp.Field(path)
path = path[:len(path)-1] // remove the last index
urlVals.Set(strings.Join(path, utils.NestingSep), nmIt.String())
}
return &httpPosterRequest{
Header: httpPost.hdr,
Body: urlVals,
}, nil
}

View File

@@ -33,7 +33,7 @@ import (
)
func TestHttpPostID(t *testing.T) {
httpPost := &HTTPPost{
httpPost := &HTTPPostEE{
id: "3",
}
if rcv := httpPost.ID(); !reflect.DeepEqual(rcv, "3") {
@@ -49,7 +49,7 @@ func TestHttpPostGetMetrics(t *testing.T) {
if err != nil {
t.Error(err)
}
httpPost := &HTTPPost{
httpPost := &HTTPPostEE{
dc: dc,
}
@@ -72,7 +72,7 @@ func TestHttpPostExportEvent(t *testing.T) {
if err != nil {
t.Error(err)
}
httpPost, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc)
httpPost, err := NewHTTPPostEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}
@@ -117,7 +117,7 @@ func TestHttpPostExportEvent2(t *testing.T) {
}))
defer srv.Close()
cgrCfg.EEsCfg().Exporters[0].ExportPath = srv.URL + "/"
httpPost, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc)
httpPost, err := NewHTTPPostEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}
@@ -161,7 +161,7 @@ func TestHttpPostExportEvent3(t *testing.T) {
if err != nil {
t.Error(err)
}
httpPost, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc)
httpPost, err := NewHTTPPostEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}
@@ -208,7 +208,7 @@ func TestHttpPostExportEvent4(t *testing.T) {
if err != nil {
t.Error(err)
}
httpPost, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc)
httpPost, err := NewHTTPPostEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}
@@ -258,7 +258,7 @@ func TestHttpPostComposeHeader(t *testing.T) {
if err != nil {
t.Error(err)
}
httpPost, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc)
httpPost, err := NewHTTPPostEE(cgrCfg, 0, filterS, dc)
if err != nil {
t.Error(err)
}
@@ -345,7 +345,7 @@ func TestHttpPostSync(t *testing.T) {
cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath = ts.URL
exp, err := NewHTTPPostEe(cgrCfg, cfgIdx, new(engine.FilterS), dc)
exp, err := NewHTTPPostEE(cgrCfg, cfgIdx, new(engine.FilterS), dc)
if err != nil {
t.Error(err)
}
@@ -404,7 +404,7 @@ func TestHttpPostSyncLimit(t *testing.T) {
cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath = ts.URL
exp, err := NewHTTPPostEe(cgrCfg, cfgIdx, new(engine.FilterS), dc)
exp, err := NewHTTPPostEE(cgrCfg, cfgIdx, new(engine.FilterS), dc)
if err != nil {
t.Error(err)
}

View File

@@ -1,5 +1,5 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
@@ -15,7 +15,8 @@ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
package ees
import (
"crypto/tls"
@@ -25,82 +26,41 @@ import (
"sync"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/nats-io/nats.go"
)
// NewNatsPoster creates a kafka poster
func NewNatsPoster(dialURL string, attempts int, opts map[string]interface{}, nodeID string, connTimeout time.Duration) (natsPstr *NatsPoster, err error) {
natsPstr = &NatsPoster{
dialURL: dialURL,
subject: utils.DefaultQueueID,
attempts: attempts,
// NewNatsEE creates a kafka poster
func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.SafeMapStorage) (natsPstr *NatsEE, err error) {
natsPstr = &NatsEE{
cfg: cfg,
dc: dc,
subject: utils.DefaultQueueID,
reqs: newConcReq(cfg.ConcurrentRequests),
}
err = natsPstr.parseOpt(opts, nodeID, connTimeout)
err = natsPstr.parseOpt(cfg.Opts, nodeID, connTimeout)
return
}
// NatsPoster is a kafka poster
type NatsPoster struct {
dialURL string
subject string // identifier of the CDR queue where we publish
attempts int
jetStream bool
opts []nats.Option
jsOpts []nats.JSOpt
sync.Mutex // protect writer
// NatsEE is a kafka poster
type NatsEE struct {
subject string // identifier of the CDR queue where we publish
jetStream bool
opts []nats.Option
jsOpts []nats.JSOpt
poster *nats.Conn
posterJS nats.JetStreamContext
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
reqs *concReq
sync.RWMutex // protect writer
bytePreparing
}
// Post is the method being called when we need to post anything in the queue
// the optional chn will permits channel caching
func (pstr *NatsPoster) Post(content []byte, _ string) (err error) {
fib := utils.Fib()
for i := 0; i < pstr.attempts; i++ {
if err = pstr.newPostWriter(); err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<NatsPoster> connecting to nats server, err: %s", err.Error()))
return
}
for i := 0; i < pstr.attempts; i++ {
pstr.Lock()
if pstr.jetStream {
_, err = pstr.posterJS.Publish(pstr.subject, content)
} else {
err = pstr.poster.Publish(pstr.subject, content)
}
pstr.Unlock()
if err == nil {
break
}
if i+1 < pstr.attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
return
}
// Close closes the kafka writer
func (pstr *NatsPoster) Close() {
pstr.Lock()
if pstr.poster != nil {
pstr.poster.Drain()
}
pstr.poster = nil
pstr.Unlock()
}
func (pstr *NatsPoster) parseOpt(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (err error) {
func (pstr *NatsEE) parseOpt(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (err error) {
if useJetStreamVal, has := opts[utils.NatsJetStream]; has {
if pstr.jetStream, err = utils.IfaceAsBool(useJetStreamVal); err != nil {
return
@@ -123,11 +83,13 @@ func (pstr *NatsPoster) parseOpt(opts map[string]interface{}, nodeID string, con
return
}
func (pstr *NatsPoster) newPostWriter() (err error) {
func (pstr *NatsEE) Cfg() *config.EventExporterCfg { return pstr.cfg }
func (pstr *NatsEE) Connect() (err error) {
pstr.Lock()
defer pstr.Unlock()
if pstr.poster == nil {
if pstr.poster, err = nats.Connect(pstr.dialURL, pstr.opts...); err != nil {
if pstr.poster, err = nats.Connect(pstr.Cfg().ExportPath, pstr.opts...); err != nil {
return
}
if pstr.jetStream {
@@ -137,6 +99,31 @@ func (pstr *NatsPoster) newPostWriter() (err error) {
return
}
func (pstr *NatsEE) ExportEvent(content interface{}, _ string) (err error) {
pstr.reqs.get()
pstr.RLock()
if pstr.jetStream {
_, err = pstr.posterJS.Publish(pstr.subject, content.([]byte))
} else {
err = pstr.poster.Publish(pstr.subject, content.([]byte))
}
pstr.RUnlock()
pstr.reqs.done()
return
}
func (pstr *NatsEE) Close() (err error) {
pstr.Lock()
if pstr.poster != nil {
err = pstr.poster.Drain()
pstr.poster = nil
}
pstr.Unlock()
return
}
func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc }
func GetNatsOpts(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) {
nop = make([]nats.Option, 0, 7)
nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID),

View File

@@ -38,9 +38,6 @@ func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Fi
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
}
switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type {
case utils.MetaAMQPjsonMap:
pstrJSON.poster = engine.NewAMQPPoster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts)
case utils.MetaAMQPV1jsonMap:
pstrJSON.poster = engine.NewAMQPv1Poster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts)
@@ -53,10 +50,6 @@ func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Fi
case utils.MetaS3jsonMap:
pstrJSON.poster = engine.NewS3Poster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts)
case utils.MetaNatsjsonMap:
pstrJSON.poster, err = engine.NewNatsPoster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts,
cgrCfg.GeneralCfg().NodeID, cgrCfg.GeneralCfg().ConnectTimeout)
}
return
}

View File

@@ -29,63 +29,57 @@ import (
"gorm.io/gorm"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewSQLEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
func NewSQLEe(cfg *config.EventExporterCfg,
dc *utils.SafeMapStorage) (sqlEe *SQLEe, err error) {
sqlEe = &SQLEe{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
cfg: cfg,
dc: dc,
reqs: newConcReq(cfg.ConcurrentRequests),
}
dialect, err := sqlEe.NewSQLEeURL(cgrCfg)
if err != nil {
return
}
sqlEe.db, sqlEe.sqldb, err = openDB(cgrCfg, cfgIdx, dialect)
err = sqlEe.initDialector()
return
}
// SQLEe implements EventExporter interface for SQL
type SQLEe struct {
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
db *gorm.DB
sqldb *sql.DB
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
db *gorm.DB
sqldb *sql.DB
reqs *concReq
dialect gorm.Dialector
tableName string
dc *utils.SafeMapStorage
reqs *concReq
colNames []string
}
func (sqlEe *SQLEe) NewSQLEeURL(cgrCfg *config.CGRConfig) (dialect gorm.Dialector, err error) {
type sqlPosterRequest struct {
Querry string
Values []interface{}
}
func (sqlEe *SQLEe) initDialector() (err error) {
var u *url.URL
// var err error
if u, err = url.Parse(strings.TrimPrefix(cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].ExportPath, utils.Meta)); err != nil {
if u, err = url.Parse(strings.TrimPrefix(sqlEe.Cfg().ExportPath, utils.Meta)); err != nil {
return
}
password, _ := u.User.Password()
dbname := utils.SQLDefaultDBName
if vals, has := cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Opts[utils.SQLDBNameOpt]; has {
if vals, has := sqlEe.Cfg().Opts[utils.SQLDBNameOpt]; has {
dbname = utils.IfaceAsString(vals)
}
ssl := utils.SQLDefaultSSLMode
if vals, has := cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Opts[utils.SSLModeCfg]; has {
if vals, has := sqlEe.Cfg().Opts[utils.SSLModeCfg]; has {
ssl = utils.IfaceAsString(vals)
}
// tableName is mandatory in opts
if iface, has := cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Opts[utils.SQLTableNameOpt]; !has {
return nil, utils.NewErrMandatoryIeMissing(utils.SQLTableNameOpt)
if iface, has := sqlEe.Cfg().Opts[utils.SQLTableNameOpt]; !has {
return utils.NewErrMandatoryIeMissing(utils.SQLTableNameOpt)
} else {
sqlEe.tableName = utils.IfaceAsString(iface)
}
@@ -93,18 +87,17 @@ func (sqlEe *SQLEe) NewSQLEeURL(cgrCfg *config.CGRConfig) (dialect gorm.Dialecto
// var dialect gorm.Dialector
switch u.Scheme {
case utils.MySQL:
dialect = mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
sqlEe.dialect = mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
u.User.Username(), password, u.Hostname(), u.Port(), dbname))
case utils.Postgres:
dialect = postgres.Open(fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", u.Hostname(), u.Port(), dbname, u.User.Username(), password, ssl))
sqlEe.dialect = postgres.Open(fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", u.Hostname(), u.Port(), dbname, u.User.Username(), password, ssl))
default:
return nil, fmt.Errorf("db type <%s> not supported", u.Scheme)
return fmt.Errorf("db type <%s> not supported", u.Scheme)
}
return
}
func openDB(cgrCfg *config.CGRConfig, cfgIdx int, dialect gorm.Dialector) (db *gorm.DB, sqlDB *sql.DB, err error) {
func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, sqlDB *sql.DB, err error) {
if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil {
return
}
@@ -112,21 +105,21 @@ func openDB(cgrCfg *config.CGRConfig, cfgIdx int, dialect gorm.Dialector) (db *g
return
}
if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxIdleConnsCfg]; has {
if iface, has := opts[utils.SQLMaxIdleConnsCfg]; has {
val, err := utils.IfaceAsTInt64(iface)
if err != nil {
return nil, nil, err
}
sqlDB.SetMaxIdleConns(int(val))
}
if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxOpenConns]; has {
if iface, has := opts[utils.SQLMaxOpenConns]; has {
val, err := utils.IfaceAsTInt64(iface)
if err != nil {
return nil, nil, err
}
sqlDB.SetMaxOpenConns(int(val))
}
if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxConnLifetime]; has {
if iface, has := opts[utils.SQLMaxConnLifetime]; has {
val, err := utils.IfaceAsDuration(iface)
if err != nil {
return nil, nil, err
@@ -137,71 +130,56 @@ func openDB(cgrCfg *config.CGRConfig, cfgIdx int, dialect gorm.Dialector) (db *g
return
}
// ID returns the identificator of this exporter
func (sqlEe *SQLEe) ID() string {
return sqlEe.id
func (sqlEe *SQLEe) Cfg() *config.EventExporterCfg { return sqlEe.cfg }
func (sqlEe *SQLEe) Connect() (err error) {
if sqlEe.db == nil || sqlEe.sqldb == nil {
sqlEe.db, sqlEe.sqldb, err = openDB(sqlEe.dialect, sqlEe.Cfg().Opts)
}
return
}
// OnEvicted implements EventExporter, doing the cleanup before exit
func (sqlEe *SQLEe) OnEvicted(_ string, _ interface{}) {
sqlEe.sqldb.Close()
}
// ExportEvent implements EventExporter
func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
func (sqlEe *SQLEe) ExportEvent(req interface{}, _ string) error {
sqlEe.reqs.get()
defer func() {
updateEEMetrics(sqlEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone,
sqlEe.cgrCfg.GeneralCfg().DefaultTimezone))
sqlEe.reqs.done()
}()
sqlEe.dc.Lock()
sqlEe.dc.MapStorage[utils.NumberOfEvents] = sqlEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
sqlEe.dc.Unlock()
defer sqlEe.reqs.done()
sReq := req.(*sqlPosterRequest)
return sqlEe.db.Table(sqlEe.tableName).Exec(sReq.Querry, sReq.Values...).Error
}
func (sqlEe *SQLEe) Close() error { return sqlEe.sqldb.Close() }
func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc }
func (sqlEe *SQLEe) PrepareMap(map[string]interface{}) (interface{}, error) { return nil, nil }
func (sqlEe *SQLEe) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) {
var vals []interface{}
var colNames []string
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: sqlEe.dc,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: sqlEe.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, sqlEe.cgrCfg.GeneralCfg().DefaultTenant),
sqlEe.filterS, oNm)
if err = eeReq.SetFields(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].ContentFields()); err != nil {
return
}
for el := eeReq.ExpData[utils.MetaExp].GetFirstElement(); el != nil; el = el.Next() {
nmIt, _ := eeReq.ExpData[utils.MetaExp].Field(el.Value)
for el := mp.GetFirstElement(); el != nil; el = el.Next() {
nmIt, _ := mp.Field(el.Value)
pathWithoutIndex := strings.Join(el.Value[:len(el.Value)-1], utils.NestingSep) // remove the index path.index
if pathWithoutIndex != utils.MetaRow {
colNames = append(colNames, pathWithoutIndex)
}
vals = append(vals, nmIt.Data)
}
sqlValues := make([]string, len(vals))
for i := range vals {
sqlValues[i] = "?"
}
var sqlQuery string
if len(colNames) != len(vals) {
sqlQuery = fmt.Sprintf("INSERT INTO %s VALUES (%s); ", sqlEe.tableName, strings.Join(sqlValues, ","))
sqlQuery = fmt.Sprintf("INSERT INTO %s VALUES (%s); ",
sqlEe.tableName,
strings.Join(sqlValues, ","))
} else {
colNamesStr := "(" + strings.Join(colNames, ", ") + ")"
sqlQuery = fmt.Sprintf("INSERT INTO %s %s VALUES (%s); ", sqlEe.tableName, colNamesStr, strings.Join(sqlValues, ","))
sqlQuery = fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s); ",
sqlEe.tableName,
strings.Join(colNames, ", "),
strings.Join(sqlValues, ","))
}
sqlEe.db.Table(sqlEe.tableName).Exec(sqlQuery, vals...)
return
}
func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage {
return sqlEe.dc.Clone()
return &sqlPosterRequest{
Querry: sqlQuery,
Values: vals,
}, nil
}

View File

@@ -20,77 +20,29 @@ package ees
import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func NewVirtualExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc *utils.SafeMapStorage) (vEe *VirtualEe, err error) {
vEe = &VirtualEe{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (vEe *VirtualEE, err error) {
vEe = &VirtualEE{
cfg: cfg,
dc: dc,
}
err = vEe.init()
return
}
// VirtualEe implements EventExporter interface for .csv files
type VirtualEe struct {
id string
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
dc *utils.SafeMapStorage
reqs *concReq
// VirtualEE implements EventExporter interface for .csv files
type VirtualEE struct {
cfg *config.EventExporterCfg
dc *utils.SafeMapStorage
}
// init will create all the necessary dependencies, including opening the file
func (vEe *VirtualEe) init() (err error) {
return
}
// ID returns the identificator of this exporter
func (vEe *VirtualEe) ID() string {
return vEe.id
}
// OnEvicted implements EventExporter, doing the cleanup before exit
func (vEe *VirtualEe) OnEvicted(_ string, _ interface{}) {
}
// ExportEvent implements EventExporter
func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
vEe.reqs.get()
defer func() {
updateEEMetrics(vEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Timezone,
vEe.cgrCfg.GeneralCfg().DefaultTimezone))
vEe.reqs.done()
}()
vEe.dc.Lock()
vEe.dc.MapStorage[utils.NumberOfEvents] = vEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1
vEe.dc.Unlock()
oNm := map[string]*utils.OrderedNavigableMap{
utils.MetaExp: utils.NewOrderedNavigableMap(),
}
eeReq := engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: vEe.dc,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: vEe.cgrCfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, vEe.cgrCfg.GeneralCfg().DefaultTenant),
vEe.filterS, oNm)
if err = eeReq.SetFields(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].ContentFields()); err != nil {
return
}
return
}
func (vEe *VirtualEe) GetMetrics() *utils.SafeMapStorage {
return vEe.dc.Clone()
func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg }
func (vEe *VirtualEE) Connect() error { return nil }
func (vEe *VirtualEE) ExportEvent(interface{}, string) error { return nil }
func (vEe *VirtualEE) Close() error { return nil }
func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc }
func (vEe *VirtualEE) PrepareMap(map[string]interface{}) (interface{}, error) { return nil, nil }
func (vEe *VirtualEE) PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error) {
return nil, nil
}

View File

@@ -28,7 +28,7 @@ import (
)
func TestVirtualEeID(t *testing.T) {
vEe := &VirtualEe{
vEe := &VirtualEE{
id: "3",
}
if rcv := vEe.ID(); !reflect.DeepEqual(rcv, "3") {
@@ -44,7 +44,7 @@ func TestVirtualEeGetMetrics(t *testing.T) {
if err != nil {
t.Error(err)
}
vEe := &VirtualEe{
vEe := &VirtualEE{
dc: dc,
}
@@ -65,7 +65,7 @@ func TestVirtualEeExportEvent(t *testing.T) {
if err != nil {
t.Error(err)
}
vEe := &VirtualEe{
vEe := &VirtualEE{
id: "string",
cgrCfg: cgrCfg,
cfgIdx: 0,

View File

@@ -59,6 +59,11 @@ func SetHTTPPstrTransport(pstrTransport *http.Transport) {
httpPstrTransport = pstrTransport
}
// GetHTTPPstrTransport gets the http transport to be used by the HTTP Poster
func GetHTTPPstrTransport() *http.Transport {
return httpPstrTransport
}
// NewHTTPTransport will create a new transport for HTTP client
func NewHTTPTransport(opts map[string]interface{}) (trsp *http.Transport, err error) {
trsp = &http.Transport{

View File

@@ -50,15 +50,15 @@ func writeFailedPosts(itmID string, value interface{}) {
if !canConvert {
return
}
filePath := path.Join(config.CgrConfig().GeneralCfg().FailedPostsDir, expEv.FileName())
filePath := expEv.FilePath()
if err := expEv.WriteToFile(filePath); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Failed to write file <%s> because <%s>",
utils.CDRs, filePath, err))
}
}
func AddFailedPost(expPath, format, module string, ev interface{}, opts map[string]interface{}) {
key := utils.ConcatenatedKey(expPath, format, module)
func AddFailedPost(failedPostsDir, expPath, format, module string, ev interface{}, opts map[string]interface{}) {
key := utils.ConcatenatedKey(failedPostsDir, expPath, format, module)
// also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id
if qID := utils.FirstNonEmpty(
utils.IfaceAsString(opts[utils.AMQPQueueID]),
@@ -75,10 +75,11 @@ func AddFailedPost(expPath, format, module string, ev interface{}, opts map[stri
}
if failedPost == nil {
failedPost = &ExportEvents{
Path: expPath,
Format: format,
Opts: opts,
module: module,
Path: expPath,
Format: format,
Opts: opts,
module: module,
failedPostsDir: failedPostsDir,
}
}
failedPost.AddEvent(ev)
@@ -107,17 +108,18 @@ func NewExportEventsFromFile(filePath string) (expEv *ExportEvents, err error) {
// ExportEvents used to save the failed post to file
type ExportEvents struct {
lk sync.RWMutex
Path string
Opts map[string]interface{}
Format string
Events []interface{}
module string
lk sync.RWMutex
Path string
Opts map[string]interface{}
Format string
Events []interface{}
failedPostsDir string
module string
}
// FileName returns the file name it should use for saving the failed events
func (expEv *ExportEvents) FileName() string {
return expEv.module + utils.PipeSep + utils.UUIDSha1Prefix() + utils.GOBSuffix
// FilePath returns the file path it should use for saving the failed events
func (expEv *ExportEvents) FilePath() string {
return path.Join(expEv.failedPostsDir, expEv.module+utils.PipeSep+utils.UUIDSha1Prefix()+utils.GOBSuffix)
}
// SetModule sets the module for this event

View File

@@ -37,7 +37,7 @@ func TestSetFldPostCacheTTL(t *testing.T) {
func TestAddFldPost(t *testing.T) {
SetFailedPostCacheTTL(5 * time.Second)
AddFailedPost("path1", "format1", "module1", "1", make(map[string]interface{}))
AddFailedPost("", "path1", "format1", "module1", "1", make(map[string]interface{}))
x, ok := failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1"))
if !ok {
t.Error("Error reading from cache")
@@ -60,8 +60,8 @@ func TestAddFldPost(t *testing.T) {
if !reflect.DeepEqual(eOut, failedPost) {
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost))
}
AddFailedPost("path1", "format1", "module1", "2", make(map[string]interface{}))
AddFailedPost("path2", "format2", "module2", "3", map[string]interface{}{utils.SQSQueueID: "qID"})
AddFailedPost("", "path1", "format1", "module1", "2", make(map[string]interface{}))
AddFailedPost("", "path2", "format2", "module2", "3", map[string]interface{}{utils.SQSQueueID: "qID"})
x, ok = failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1"))
if !ok {
t.Error("Error reading from cache")
@@ -106,9 +106,9 @@ func TestAddFldPost(t *testing.T) {
}
}
func TestFileName(t *testing.T) {
func TestFilePath(t *testing.T) {
exportEvent := &ExportEvents{}
rcv := exportEvent.FileName()
rcv := exportEvent.FilePath()
if rcv[0] != '|' {
t.Errorf("Expecting: '|', received: %+v", rcv[0])
} else if rcv[8:] != ".gob" {
@@ -117,7 +117,7 @@ func TestFileName(t *testing.T) {
exportEvent = &ExportEvents{
module: "module",
}
rcv = exportEvent.FileName()
rcv = exportEvent.FilePath()
if rcv[:7] != "module|" {
t.Errorf("Expecting: 'module|', received: %+v", rcv[:7])
} else if rcv[14:] != ".gob" {

View File

@@ -35,24 +35,6 @@ type HTTPPosterRequest struct {
Body interface{}
}
// HTTPPostJSON posts without automatic failover
func HTTPPostJSON(url string, content []byte) (respBody []byte, err error) {
client := &http.Client{Transport: httpPstrTransport}
var resp *http.Response
if resp, err = client.Post(url, "application/json", bytes.NewBuffer(content)); err != nil {
return
}
respBody, err = io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return
}
if resp.StatusCode > 299 {
err = fmt.Errorf("Unexpected status code received: %d", resp.StatusCode)
}
return
}
// NewHTTPPoster return a new HTTP poster
func NewHTTPPoster(replyTimeout time.Duration, addr, contentType string,
attempts int) (httposter *HTTPPoster) {

View File

@@ -1,44 +0,0 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"encoding/json"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
)
// Sample HttpJsonPost, more for usage purposes
func TestHttpJsonPost(t *testing.T) {
cdrOut := &ExternalCDR{CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()), OrderID: 123,
ToR: utils.MetaVoice, OriginID: "dsafdsaf", OriginHost: "192.168.1.1",
Source: utils.UnitTest, RequestType: utils.MetaRated, Tenant: "cgrates.org",
Category: "call", Account: "account1", Subject: "tgooiscs0014", Destination: "1002",
SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String(), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String(),
RunID: utils.MetaDefault,
Usage: "0.00000001", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01,
}
jsn, _ := json.Marshal(cdrOut)
if _, err := HTTPPostJSON("http://localhost:8000", jsn); err == nil {
t.Error(err)
}
}

View File

@@ -26,6 +26,7 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/ees"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/streadway/amqp"
@@ -79,7 +80,7 @@ type AMQPER struct {
conn *amqp.Connection
channel *amqp.Channel
poster engine.Poster
poster *ees.AMQPee
}
// Config returns the curent configuration
@@ -166,7 +167,7 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) {
utils.ERs, msg.MessageId, err.Error()))
}
if rdr.poster != nil { // post it
if err := rdr.poster.Post(msg.Body, utils.EmptyString); err != nil {
if err := ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, msg.MessageId, err.Error()))
@@ -254,6 +255,9 @@ func (rdr *AMQPER) createPoster() {
len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster = engine.NewAMQPPoster(utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt)
rdr.poster = ees.NewAMQPee(&config.EventExporterCfg{
ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts,
Opts: processedOpt,
}, nil)
}

View File

@@ -26,6 +26,7 @@ import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/ees"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/nats-io/nats.go"
@@ -79,7 +80,7 @@ type NatsER struct {
opts []nats.Option
jsOpts []nats.JSOpt
poster *engine.NatsPoster
poster *ees.NatsEE
}
// Config returns the curent configuration
@@ -134,7 +135,7 @@ func (rdr *NatsER) Serve() (err error) {
utils.ERs, string(msg.Data), err.Error()))
}
if rdr.poster != nil { // post it
if err := rdr.poster.Post(msg.Data, utils.EmptyString); err != nil {
if err := ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> writing message %s error: %s",
utils.ERs, string(msg.Data), err.Error()))
@@ -186,11 +187,13 @@ func (rdr *NatsER) createPoster() (err error) {
len(rdr.Config().ProcessedPath) == 0 {
return
}
rdr.poster, err = engine.NewNatsPoster(utils.FirstNonEmpty(
rdr.Config().ProcessedPath, rdr.Config().SourcePath),
rdr.cgrCfg.GeneralCfg().PosterAttempts,
processedOpt, rdr.cgrCfg.GeneralCfg().NodeID,
rdr.cgrCfg.GeneralCfg().ConnectTimeout)
rdr.poster, err = ees.NewNatsEE(&config.EventExporterCfg{
ExportPath: utils.FirstNonEmpty(
rdr.Config().ProcessedPath, rdr.Config().SourcePath),
Opts: processedOpt,
Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts,
}, rdr.cgrCfg.GeneralCfg().NodeID,
rdr.cgrCfg.GeneralCfg().ConnectTimeout, nil)
return
}
@@ -214,7 +217,7 @@ func (rdr *NatsER) processOpts() (err error) {
rdr.jsOpts = []nats.JSOpt{nats.MaxWait(maxWait)}
}
}
rdr.opts, err = engine.GetNatsOpts(rdr.Config().Opts,
rdr.opts, err = ees.GetNatsOpts(rdr.Config().Opts,
rdr.cgrCfg.GeneralCfg().NodeID,
rdr.cgrCfg.GeneralCfg().ConnectTimeout)
return

View File

@@ -31,6 +31,7 @@ import (
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/ees"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/nats-io/nats.go"
@@ -78,7 +79,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) {
t.Fatal(err)
}
nop, err := engine.GetNatsOpts(rdr.Config().Opts, "testExp", time.Second)
nop, err := ees.GetNatsOpts(rdr.Config().Opts, "testExp", time.Second)
if err != nil {
t.Fatal(err)
}
@@ -165,7 +166,7 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) {
t.Fatal(err)
}
nop, err := engine.GetNatsOpts(rdr.Config().Opts, "testExp", time.Second)
nop, err := ees.GetNatsOpts(rdr.Config().Opts, "testExp", time.Second)
if err != nil {
t.Fatal(err)
}

View File

@@ -68,7 +68,7 @@ type EventExporterService struct {
// ServiceName returns the service name
func (es *EventExporterService) ServiceName() string {
return utils.EventExporterS
return utils.EEs
}
// ShouldRun returns if the service should be running
@@ -109,7 +109,7 @@ func (es *EventExporterService) Start() (err error) {
fltrS := <-es.filterSChan
es.filterSChan <- fltrS
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EventExporterS))
utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EEs))
es.Lock()
defer es.Unlock()

View File

@@ -61,8 +61,8 @@ func TestEventExporterSCoverage(t *testing.T) {
t.Errorf("Expected service to be running")
}
serviceName := srv2.ServiceName()
if serviceName != utils.EventExporterS {
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.EventExporterS, serviceName)
if serviceName != utils.EEs {
t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.EEs, serviceName)
}
shouldRun := srv2.ShouldRun()
if shouldRun != false {

View File

@@ -731,7 +731,6 @@ const (
Count = "Count"
ProfileID = "ProfileID"
SortedRoutes = "SortedRoutes"
EventExporterS = "EventExporterS"
MetaMonthly = "*monthly"
MetaYearly = "*yearly"
MetaDaily = "*daily"

View File

@@ -75,3 +75,9 @@ func (ms *SafeMapStorage) Clone() (msClone *SafeMapStorage) {
defer ms.RUnlock()
return &SafeMapStorage{MapStorage: ms.MapStorage.Clone()}
}
func (ms *SafeMapStorage) ClonedMapStorage() (msClone MapStorage) {
ms.RLock()
defer ms.RUnlock()
return ms.MapStorage.Clone()
}