Added concurent requests for ees

This commit is contained in:
Trial97
2021-07-21 17:07:57 +03:00
committed by Dan Christian Bogos
parent f1796317a0
commit 24b09329be
20 changed files with 215 additions and 97 deletions

View File

@@ -491,6 +491,7 @@ const CGRATES_CFG_JSON = `
"id": "*default", // identifier of the EventReader profile
"type": "*none", // exporter type
"export_path": "/var/spool/cgrates/ees", // path where the exported events will be placed
"concurrent_requests": 0, // maximum simultaneous requests to process, 0 for unlimited
"opts": {
// CSV

View File

@@ -1879,18 +1879,19 @@ func TestDfEventExporterCfg(t *testing.T) {
},
Exporters: &[]*EventExporterJsonCfg{
{
Id: utils.StringPointer(utils.MetaDefault),
Type: utils.StringPointer(utils.MetaNone),
Export_path: utils.StringPointer("/var/spool/cgrates/ees"),
Attribute_context: utils.StringPointer(utils.EmptyString),
Timezone: utils.StringPointer(utils.EmptyString),
Filters: &[]string{},
Attribute_ids: &[]string{},
Flags: &[]string{},
Synchronous: utils.BoolPointer(false),
Attempts: utils.IntPointer(1),
Fields: &[]*FcTemplateJsonCfg{},
Opts: make(map[string]interface{}),
Id: utils.StringPointer(utils.MetaDefault),
Type: utils.StringPointer(utils.MetaNone),
Export_path: utils.StringPointer("/var/spool/cgrates/ees"),
Attribute_context: utils.StringPointer(utils.EmptyString),
Timezone: utils.StringPointer(utils.EmptyString),
Filters: &[]string{},
Attribute_ids: &[]string{},
Flags: &[]string{},
Synchronous: utils.BoolPointer(false),
Attempts: utils.IntPointer(1),
Fields: &[]*FcTemplateJsonCfg{},
Opts: make(map[string]interface{}),
Concurrent_requests: utils.IntPointer(0),
},
},
}

File diff suppressed because one or more lines are too long

View File

@@ -153,21 +153,22 @@ func (eeS *EEsCfg) AsMapInterface(separator string) (initialMP map[string]interf
// EventExporterCfg the config for a Event Exporter
type EventExporterCfg struct {
ID string
Type string
ExportPath string
Opts map[string]interface{}
Timezone string
Filters []string
Flags utils.FlagsWithParams
AttributeSIDs []string // selective AttributeS profiles
AttributeSCtx string // context to use when querying AttributeS
Synchronous bool
Attempts int
Fields []*FCTemplate
headerFields []*FCTemplate
contentFields []*FCTemplate
trailerFields []*FCTemplate
ID string
Type string
ExportPath string
Opts map[string]interface{}
Timezone string
Filters []string
Flags utils.FlagsWithParams
AttributeSIDs []string // selective AttributeS profiles
AttributeSCtx string // context to use when querying AttributeS
Synchronous bool
Attempts int
ConcurrentRequests int
Fields []*FCTemplate
headerFields []*FCTemplate
contentFields []*FCTemplate
trailerFields []*FCTemplate
}
func (eeC *EventExporterCfg) loadFromJSONCfg(jsnEec *EventExporterJsonCfg, msgTemplates map[string][]*FCTemplate, separator string) (err error) {
@@ -210,6 +211,9 @@ func (eeC *EventExporterCfg) loadFromJSONCfg(jsnEec *EventExporterJsonCfg, msgTe
if jsnEec.Attempts != nil {
eeC.Attempts = *jsnEec.Attempts
}
if jsnEec.Concurrent_requests != nil {
eeC.ConcurrentRequests = *jsnEec.Concurrent_requests
}
if jsnEec.Fields != nil {
eeC.Fields, err = FCTemplatesFromFCTemplatesJSONCfg(*jsnEec.Fields, separator)
if err != nil {
@@ -266,19 +270,20 @@ func (eeC *EventExporterCfg) TrailerFields() []*FCTemplate {
// Clone returns a deep copy of EventExporterCfg
func (eeC EventExporterCfg) Clone() (cln *EventExporterCfg) {
cln = &EventExporterCfg{
ID: eeC.ID,
Type: eeC.Type,
ExportPath: eeC.ExportPath,
Timezone: eeC.Timezone,
Flags: eeC.Flags.Clone(),
AttributeSCtx: eeC.AttributeSCtx,
Synchronous: eeC.Synchronous,
Attempts: eeC.Attempts,
Fields: make([]*FCTemplate, len(eeC.Fields)),
headerFields: make([]*FCTemplate, len(eeC.headerFields)),
contentFields: make([]*FCTemplate, len(eeC.contentFields)),
trailerFields: make([]*FCTemplate, len(eeC.trailerFields)),
Opts: make(map[string]interface{}),
ID: eeC.ID,
Type: eeC.Type,
ExportPath: eeC.ExportPath,
Timezone: eeC.Timezone,
Flags: eeC.Flags.Clone(),
AttributeSCtx: eeC.AttributeSCtx,
Synchronous: eeC.Synchronous,
Attempts: eeC.Attempts,
ConcurrentRequests: eeC.ConcurrentRequests,
Fields: make([]*FCTemplate, len(eeC.Fields)),
headerFields: make([]*FCTemplate, len(eeC.headerFields)),
contentFields: make([]*FCTemplate, len(eeC.contentFields)),
trailerFields: make([]*FCTemplate, len(eeC.trailerFields)),
Opts: make(map[string]interface{}),
}
if eeC.Filters != nil {
@@ -319,16 +324,17 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str
flgs = []string{}
}
initialMP = map[string]interface{}{
utils.IDCfg: eeC.ID,
utils.TypeCfg: eeC.Type,
utils.ExportPathCfg: eeC.ExportPath,
utils.TimezoneCfg: eeC.Timezone,
utils.FiltersCfg: eeC.Filters,
utils.FlagsCfg: flgs,
utils.AttributeContextCfg: eeC.AttributeSCtx,
utils.AttributeIDsCfg: eeC.AttributeSIDs,
utils.SynchronousCfg: eeC.Synchronous,
utils.AttemptsCfg: eeC.Attempts,
utils.IDCfg: eeC.ID,
utils.TypeCfg: eeC.Type,
utils.ExportPathCfg: eeC.ExportPath,
utils.TimezoneCfg: eeC.Timezone,
utils.FiltersCfg: eeC.Filters,
utils.FlagsCfg: flgs,
utils.AttributeContextCfg: eeC.AttributeSCtx,
utils.AttributeIDsCfg: eeC.AttributeSIDs,
utils.SynchronousCfg: eeC.Synchronous,
utils.AttemptsCfg: eeC.Attempts,
utils.ConcurrentRequestsCfg: eeC.ConcurrentRequests,
}
opts := make(map[string]interface{})
for k, v := range eeC.Opts {

View File

@@ -634,13 +634,14 @@ func TestEEsCfgAsMapInterface(t *testing.T) {
utils.OptsCfg: map[string]interface{}{
utils.KafkaGroupID: "test",
},
utils.TimezoneCfg: "UTC",
utils.FiltersCfg: []string{},
utils.FlagsCfg: []string{"randomFlag"},
utils.AttributeIDsCfg: []string{},
utils.AttributeContextCfg: utils.EmptyString,
utils.SynchronousCfg: false,
utils.AttemptsCfg: 1,
utils.TimezoneCfg: "UTC",
utils.FiltersCfg: []string{},
utils.FlagsCfg: []string{"randomFlag"},
utils.AttributeIDsCfg: []string{},
utils.AttributeContextCfg: utils.EmptyString,
utils.SynchronousCfg: false,
utils.AttemptsCfg: 1,
utils.ConcurrentRequestsCfg: 0,
utils.FieldsCfg: []map[string]interface{}{
{
utils.TagCfg: utils.CGRID,
@@ -667,7 +668,7 @@ func TestEEsCfgAsMapInterface(t *testing.T) {
eMap[utils.ExportersCfg].([]map[string]interface{})[0][utils.FieldsCfg] = nil
if !reflect.DeepEqual(rcv[utils.ExportersCfg].([]map[string]interface{})[1],
eMap[utils.ExportersCfg].([]map[string]interface{})[0]) {
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(eMap[utils.ExportersCfg].([]map[string]interface{})[1]),
t.Errorf("Expected %+v \n, received %+v", utils.ToJSON(eMap[utils.ExportersCfg].([]map[string]interface{})[0]),
utils.ToJSON(rcv[utils.ExportersCfg].([]map[string]interface{})[0]))
}
rcv[utils.ExportersCfg] = nil

View File

@@ -189,18 +189,19 @@ type EEsJsonCfg struct {
// EventExporterJsonCfg is the configuration of a single EventExporter
type EventExporterJsonCfg struct {
Id *string
Type *string
Export_path *string
Opts map[string]interface{}
Timezone *string
Filters *[]string
Flags *[]string
Attribute_ids *[]string
Attribute_context *string
Synchronous *bool
Attempts *int
Fields *[]*FcTemplateJsonCfg
Id *string
Type *string
Export_path *string
Opts map[string]interface{}
Timezone *string
Filters *[]string
Flags *[]string
Attribute_ids *[]string
Attribute_context *string
Synchronous *bool
Attempts *int
Concurrent_requests *int
Fields *[]*FcTemplateJsonCfg
}
// SessionSJsonCfg config section

View File

@@ -64,3 +64,30 @@ func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt
return nil, fmt.Errorf("unsupported exporter type: <%s>", cgrCfg.EEsCfg().Exporters[cfgIdx].Type)
}
}
func newConcReq(limit int) (c *concReq) {
c = &concReq{limit: limit}
if limit > 0 {
c.reqs = make(chan struct{}, limit)
for i := 0; i < limit; i++ {
c.reqs <- struct{}{}
}
}
return
}
type concReq struct {
reqs chan struct{}
limit int
}
func (c *concReq) get() {
if c.limit > 0 {
<-c.reqs
}
}
func (c *concReq) done() {
if c.limit > 0 {
c.reqs <- struct{}{}
}
}

View File

@@ -31,6 +31,7 @@ import (
func TestNewEventExporter(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaFileCSV
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg, 0, filterS)
errExpect := "open /var/spool/cgrates/ees/*default_"
@@ -63,6 +64,7 @@ func TestNewEventExporter(t *testing.T) {
func TestNewEventExporterCase2(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaFileFWV
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg, 0, filterS)
errExpect := "open /var/spool/cgrates/ees/*default_"
@@ -92,6 +94,7 @@ func TestNewEventExporterCase2(t *testing.T) {
func TestNewEventExporterCase3(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaHTTPPost
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg, 0, filterS)
if err != nil {
@@ -116,6 +119,7 @@ func TestNewEventExporterCase3(t *testing.T) {
func TestNewEventExporterCase4(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaHTTPjsonMap
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg, 0, filterS)
if err != nil {
@@ -140,6 +144,7 @@ func TestNewEventExporterCase4(t *testing.T) {
func TestNewEventExporterCase5(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaAMQPjsonMap
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg, 0, filterS)
if err != nil {
@@ -164,6 +169,7 @@ func TestNewEventExporterCase5(t *testing.T) {
func TestNewEventExporterCase6(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaVirt
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg, 0, filterS)
if err != nil {
@@ -191,6 +197,7 @@ func TestNewEventExporterCase6(t *testing.T) {
func TestNewEventExporterDefaultCase(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaNone
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
_, err := NewEventExporter(cgrCfg, 0, filterS)
errExpect := fmt.Sprintf("unsupported exporter type: <%s>", utils.MetaNone)
@@ -203,6 +210,7 @@ func TestNewEventExporterDefaultCase(t *testing.T) {
func TestNewEventExporterCase7(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaElastic
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
cgrCfg.EEsCfg().Exporters[0].ExportPath = "/invalid/path"
filterS := engine.NewFilterS(cgrCfg, nil, nil)
ee, err := NewEventExporter(cgrCfg, 0, filterS)
@@ -233,6 +241,7 @@ func TestNewEventExporterCase7(t *testing.T) {
func TestNewEventExporterCase8(t *testing.T) {
cgrCfg := config.NewDefaultCGRConfig()
cgrCfg.EEsCfg().Exporters[0].Type = utils.MetaSQL
cgrCfg.EEsCfg().Exporters[0].ConcurrentRequests = 0
filterS := engine.NewFilterS(cgrCfg, nil, nil)
_, err := NewEventExporter(cgrCfg, 0, filterS)
errExpect := "MANDATORY_IE_MISSING: [sqlTableName]"

View File

@@ -34,8 +34,14 @@ import (
func NewElasticExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc *utils.SafeMapStorage) (eEe *ElasticEe, err error) {
eEe = &ElasticEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
eEe = &ElasticEe{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
}
err = eEe.init()
return
}
@@ -49,6 +55,7 @@ type ElasticEe struct {
filterS *engine.FilterS
dc *utils.SafeMapStorage
opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap
reqs *concReq
}
// init will create all the necessary dependencies, including opening the file
@@ -120,9 +127,11 @@ func (eEe *ElasticEe) OnEvicted(_ string, _ interface{}) {}
// ExportEvent implements EventExporter
func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
eEe.reqs.get()
defer func() {
updateEEMetrics(eEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone,
eEe.cgrCfg.GeneralCfg().DefaultTimezone))
eEe.reqs.done()
}()
eEe.dc.Lock()
eEe.dc.MapStorage[utils.NumberOfEvents] = eEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1

View File

@@ -33,8 +33,14 @@ import (
func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc *utils.SafeMapStorage) (fCsv *FileCSVee, err error) {
fCsv = &FileCSVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
fCsv = &FileCSVee{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
}
err = fCsv.init()
return
}
@@ -48,6 +54,7 @@ type FileCSVee struct {
file io.WriteCloser
csvWriter *csv.Writer
dc *utils.SafeMapStorage
reqs *concReq
}
// init will create all the necessary dependencies, including opening the file
@@ -90,9 +97,11 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
fCsv.reqs.get()
defer func() {
updateEEMetrics(fCsv.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone,
fCsv.cgrCfg.GeneralCfg().DefaultTimezone))
fCsv.reqs.done()
}()
fCsv.dc.Lock()
fCsv.dc.MapStorage[utils.NumberOfEvents] = fCsv.dc.MapStorage[utils.NumberOfEvents].(int64) + 1

View File

@@ -213,6 +213,7 @@ func TestFileCsvExportEvent(t *testing.T) {
file: nopCloser{byteBuff},
csvWriter: csvNW,
dc: dc,
reqs: newConcReq(0),
}
cgrEv.Event = map[string]interface{}{
"test1": "value",

View File

@@ -30,8 +30,14 @@ import (
)
func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fFwv *FileFWVee, err error) {
fFwv = &FileFWVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
fFwv = &FileFWVee{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
}
err = fFwv.init()
return
}
@@ -44,6 +50,7 @@ type FileFWVee struct {
filterS *engine.FilterS
file io.WriteCloser
dc *utils.SafeMapStorage
reqs *concReq
}
// init will create all the necessary dependencies, including opening the file
@@ -80,9 +87,11 @@ func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) {
fFwv.reqs.get()
defer func() {
updateEEMetrics(fFwv.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone,
fFwv.cgrCfg.GeneralCfg().DefaultTimezone))
fFwv.reqs.done()
}()
fFwv.dc.Lock()
fFwv.dc.MapStorage[utils.NumberOfEvents] = fFwv.dc.MapStorage[utils.NumberOfEvents].(int64) + 1

View File

@@ -213,6 +213,7 @@ func TestFileFwvExportEvent(t *testing.T) {
filterS: filterS,
file: nopCloser{byteBuff},
dc: dc,
reqs: newConcReq(0),
}
cgrEv.Event = map[string]interface{}{
"test1": "value",
@@ -301,6 +302,7 @@ func TestFileFwvExportEventWriteError(t *testing.T) {
filterS: filterS,
file: nopCloserWrite{byteBuff},
dc: dc,
reqs: newConcReq(0),
}
cgrEv.Event = map[string]interface{}{
"test1": "value",
@@ -328,6 +330,7 @@ func TestFileFwvComposeHeaderWriteError(t *testing.T) {
filterS: filterS,
file: nopCloserWrite{byteBuff},
dc: &utils.SafeMapStorage{},
reqs: newConcReq(0),
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -361,6 +364,7 @@ func TestFileFwvComposeTrailerWriteError(t *testing.T) {
filterS: filterS,
file: nopCloserWrite{byteBuff},
dc: &utils.SafeMapStorage{},
reqs: newConcReq(0),
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -393,6 +397,7 @@ func TestFileFwvOnEvictedTrailer(t *testing.T) {
filterS: filterS,
file: nopCloserWrite{byteBuff},
dc: &utils.SafeMapStorage{},
reqs: newConcReq(0),
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{
@@ -432,6 +437,7 @@ func TestFileFwvOnEvictedClose(t *testing.T) {
filterS: filterS,
file: nopCloserError{byteBuff},
dc: &utils.SafeMapStorage{},
reqs: newConcReq(0),
}
cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Fields = []*config.FCTemplate{
{

View File

@@ -36,6 +36,7 @@ func NewHTTPjsonMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
}
pstrJSON.pstr, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().ReplyTimeout,
@@ -53,6 +54,7 @@ type HTTPjsonMapEE struct {
filterS *engine.FilterS
pstr *engine.HTTPPoster
dc *utils.SafeMapStorage
reqs *concReq
}
// ID returns the identificator of this exporter
@@ -65,9 +67,11 @@ func (httpEE *HTTPjsonMapEE) OnEvicted(string, interface{}) {}
// ExportEvent implements EventExporter
func (httpEE *HTTPjsonMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
httpEE.reqs.get()
defer func() {
updateEEMetrics(httpEE.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].Timezone,
httpEE.cgrCfg.GeneralCfg().DefaultTimezone))
httpEE.reqs.done()
}()
httpEE.dc.Lock()
httpEE.dc.MapStorage[utils.NumberOfEvents] = httpEE.dc.MapStorage[utils.NumberOfEvents].(int64) + 1

View File

@@ -30,8 +30,14 @@ import (
func NewHTTPPostEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc *utils.SafeMapStorage) (httpPost *HTTPPost, err error) {
httpPost = &HTTPPost{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
httpPost = &HTTPPost{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
}
httpPost.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().ReplyTimeout,
cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath,
utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type],
@@ -47,6 +53,7 @@ type HTTPPost struct {
filterS *engine.FilterS
httpPoster *engine.HTTPPoster
dc *utils.SafeMapStorage
reqs *concReq
}
// ID returns the identificator of this exporter
@@ -59,9 +66,11 @@ func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) {}
// ExportEvent implements EventExporter
func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) {
httpPost.reqs.get()
defer func() {
updateEEMetrics(httpPost.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Timezone,
httpPost.cgrCfg.GeneralCfg().DefaultTimezone))
httpPost.reqs.done()
}()
httpPost.dc.Lock()
httpPost.dc.MapStorage[utils.NumberOfEvents] = httpPost.dc.MapStorage[utils.NumberOfEvents].(int64) + 1

View File

@@ -35,6 +35,7 @@ func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Fi
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
}
switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type {
case utils.MetaAMQPjsonMap:
@@ -68,6 +69,7 @@ type PosterJSONMapEE struct {
filterS *engine.FilterS
poster engine.Poster
dc *utils.SafeMapStorage
reqs *concReq
}
// ID returns the identificator of this exporter
@@ -82,9 +84,11 @@ func (pstrEE *PosterJSONMapEE) OnEvicted(string, interface{}) {
// ExportEvent implements EventExporter
func (pstrEE *PosterJSONMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) {
pstrEE.reqs.get()
defer func() {
updateEEMetrics(pstrEE.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(pstrEE.cgrCfg.EEsCfg().Exporters[pstrEE.cfgIdx].Timezone,
pstrEE.cgrCfg.GeneralCfg().DefaultTimezone))
pstrEE.reqs.done()
}()
pstrEE.dc.Lock()
pstrEE.dc.MapStorage[utils.NumberOfEvents] = pstrEE.dc.MapStorage[utils.NumberOfEvents].(int64) + 1

View File

@@ -229,6 +229,7 @@ func TestPosterJsonMapExportEvent1(t *testing.T) {
filterS: filterS,
dc: dc,
poster: tstPstr,
reqs: newConcReq(0),
}
// pstrEE.poster = tstPstr
cgrEv.Event = map[string]interface{}{

View File

@@ -35,8 +35,14 @@ import (
func NewSQLEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc *utils.SafeMapStorage) (sqlEe *SQLEe, err error) {
sqlEe = &SQLEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
sqlEe = &SQLEe{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
}
dialect, err := sqlEe.NewSQLEeUrl(cgrCfg)
if err != nil {
@@ -57,7 +63,8 @@ type SQLEe struct {
tableName string
dc *utils.SafeMapStorage
dc *utils.SafeMapStorage
reqs *concReq
}
func (sqlEe *SQLEe) NewSQLEeUrl(cgrCfg *config.CGRConfig) (dialect gorm.Dialector, err error) {
@@ -142,9 +149,11 @@ func (sqlEe *SQLEe) OnEvicted(_ string, _ interface{}) {
// ExportEvent implements EventExporter
func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
sqlEe.reqs.get()
defer func() {
updateEEMetrics(sqlEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone,
sqlEe.cgrCfg.GeneralCfg().DefaultTimezone))
sqlEe.reqs.done()
}()
sqlEe.dc.Lock()
sqlEe.dc.MapStorage[utils.NumberOfEvents] = sqlEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1

View File

@@ -26,8 +26,14 @@ import (
func NewVirtualExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS,
dc *utils.SafeMapStorage) (vEe *VirtualEe, err error) {
vEe = &VirtualEe{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
vEe = &VirtualEe{
id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg,
cfgIdx: cfgIdx,
filterS: filterS,
dc: dc,
reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests),
}
err = vEe.init()
return
}
@@ -39,6 +45,7 @@ type VirtualEe struct {
cfgIdx int // index of config instance within ERsCfg.Readers
filterS *engine.FilterS
dc *utils.SafeMapStorage
reqs *concReq
}
// init will create all the necessary dependencies, including opening the file
@@ -56,9 +63,11 @@ func (vEe *VirtualEe) OnEvicted(_ string, _ interface{}) {}
// ExportEvent implements EventExporter
func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) {
vEe.reqs.get()
defer func() {
updateEEMetrics(vEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Timezone,
vEe.cgrCfg.GeneralCfg().DefaultTimezone))
vEe.reqs.done()
}()
vEe.dc.Lock()
vEe.dc.MapStorage[utils.NumberOfEvents] = vEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1

View File

@@ -71,6 +71,7 @@ func TestVirtualEeExportEvent(t *testing.T) {
cfgIdx: 0,
filterS: filterS,
dc: dc,
reqs: newConcReq(0),
}
cgrEv.Event = map[string]interface{}{
"test1": "value",