diff --git a/apier/v1/apier.go b/apier/v1/apier.go index 0d0c8381e..a1e7771f2 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1912,7 +1912,7 @@ func (apierSv1 *APIerSv1) ExportCDRs(args *utils.ArgExportCDRs, reply *map[strin if err := apierSv1.ConnMgr.Call(apierSv1.Config.ApierCfg().EEsConns, nil, utils.EeSv1ProcessEvent, argCdr, &rplyCdr); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> error: <%s> processing event: <%s> with <%s>", - utils.ApierS, err.Error(), utils.ToJSON(cdr.AsCGREvent()), utils.EventExporterS)) + utils.ApierS, err.Error(), utils.ToJSON(cdr.AsCGREvent()), utils.EEs)) withErros = true } } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index df015b4e4..b0dc9caf3 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -501,7 +501,7 @@ func main() { utils.RegistrarC: new(sync.WaitGroup), utils.DispatcherS: new(sync.WaitGroup), utils.DNSAgent: new(sync.WaitGroup), - utils.EventExporterS: new(sync.WaitGroup), + utils.EEs: new(sync.WaitGroup), utils.ERs: new(sync.WaitGroup), utils.FreeSWITCHAgent: new(sync.WaitGroup), utils.GlobalVarS: new(sync.WaitGroup), diff --git a/config/eescfg.go b/config/eescfg.go index ceb6e8548..870c92cbc 100644 --- a/config/eescfg.go +++ b/config/eescfg.go @@ -164,6 +164,7 @@ type EventExporterCfg struct { AttributeSCtx string // context to use when querying AttributeS Synchronous bool Attempts int + FailedPostsDir string ConcurrentRequests int Fields []*FCTemplate headerFields []*FCTemplate @@ -231,6 +232,9 @@ func (eeC *EventExporterCfg) loadFromJSONCfg(jsnEec *EventExporterJsonCfg, msgTe eeC.Opts[k] = v } } + if jsnEec.Failed_posts_dir != nil { + eeC.FailedPostsDir = *jsnEec.Failed_posts_dir + } return } @@ -284,6 +288,7 @@ func (eeC EventExporterCfg) Clone() (cln *EventExporterCfg) { contentFields: make([]*FCTemplate, len(eeC.contentFields)), trailerFields: make([]*FCTemplate, len(eeC.trailerFields)), Opts: make(map[string]interface{}), + FailedPostsDir: eeC.FailedPostsDir, } if eeC.Filters != nil { @@ -335,6 +340,7 @@ func (eeC *EventExporterCfg) AsMapInterface(separator string) (initialMP map[str utils.SynchronousCfg: eeC.Synchronous, utils.AttemptsCfg: eeC.Attempts, utils.ConcurrentRequestsCfg: eeC.ConcurrentRequests, + utils.FailedPostsDirCfg: eeC.FailedPostsDir, } opts := make(map[string]interface{}) for k, v := range eeC.Opts { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 86ff44305..98735f6f6 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -200,6 +200,7 @@ type EventExporterJsonCfg struct { Attribute_context *string Synchronous *bool Attempts *int + Failed_posts_dir *string Concurrent_requests *int Fields *[]*FcTemplateJsonCfg } diff --git a/engine/pstr_amqp.go b/ees/amqp.go similarity index 52% rename from engine/pstr_amqp.go rename to ees/amqp.go index 1020b057c..22c974406 100644 --- a/engine/pstr_amqp.go +++ b/ees/amqp.go @@ -16,41 +16,46 @@ You should have received a copy of the GNU General Public License along with this program. If not, see */ -package engine +package ees import ( "fmt" "sync" - "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/streadway/amqp" ) -// NewAMQPPoster creates a new amqp poster +// NewAMQPee creates a new amqp poster // "amqp://guest:guest@localhost:5672/?queueID=cgrates_cdrs" -func NewAMQPPoster(dialURL string, attempts int, opts map[string]interface{}) *AMQPPoster { - amqp := &AMQPPoster{ - attempts: attempts, - dialURL: dialURL, +func NewAMQPee(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) *AMQPee { + amqp := &AMQPee{ + cfg: cfg, + dc: dc, + reqs: newConcReq(cfg.ConcurrentRequests), } - amqp.parseOpts(opts) + amqp.parseOpts(cfg.Opts) return amqp } -// AMQPPoster used to post cdrs to amqp -type AMQPPoster struct { - dialURL string +// AMQPee used to post cdrs to amqp +type AMQPee struct { queueID string // identifier of the CDR queue where we publish exchange string exchangeType string routingKey string - attempts int - sync.Mutex // protect connection conn *amqp.Connection + postChan *amqp.Channel + + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage + reqs *concReq + sync.RWMutex // protect connection + bytePreparing } -func (pstr *AMQPPoster) parseOpts(dialURL map[string]interface{}) { +func (pstr *AMQPee) parseOpts(dialURL map[string]interface{}) { pstr.queueID = utils.DefaultQueueID pstr.routingKey = utils.DefaultQueueID if vals, has := dialURL[utils.AMQPQueueID]; has { @@ -68,85 +73,32 @@ func (pstr *AMQPPoster) parseOpts(dialURL map[string]interface{}) { } } -// Post is the method being called when we need to post anything in the queue -// the optional chn will permits channel caching -func (pstr *AMQPPoster) Post(content []byte, _ string) (err error) { - var chn *amqp.Channel - fib := utils.Fib() +func (pstr *AMQPee) Cfg() *config.EventExporterCfg { return pstr.cfg } - for i := 0; i < pstr.attempts; i++ { - if chn, err = pstr.newPostChannel(); err == nil { - break - } - if i+1 < pstr.attempts { - time.Sleep(time.Duration(fib()) * time.Second) - } - } - if err != nil { - utils.Logger.Warning(fmt.Sprintf(" creating new post channel, err: %s", err.Error())) - return - } - for i := 0; i < pstr.attempts; i++ { - if err = chn.Publish( - pstr.exchange, // exchange - pstr.routingKey, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - DeliveryMode: amqp.Persistent, - ContentType: utils.ContentJSON, - Body: content, - }); err == nil { - break - } - if i+1 < pstr.attempts { - time.Sleep(time.Duration(fib()) * time.Second) - } - } - if err != nil { - return - } - if chn != nil { - chn.Close() - } - return -} - -// Close closes the connections -func (pstr *AMQPPoster) Close() { - pstr.Lock() - if pstr.conn != nil { - pstr.conn.Close() - } - pstr.conn = nil - pstr.Unlock() -} - -func (pstr *AMQPPoster) newPostChannel() (postChan *amqp.Channel, err error) { +func (pstr *AMQPee) Connect() (err error) { pstr.Lock() + defer pstr.Unlock() if pstr.conn == nil { - var conn *amqp.Connection - conn, err = amqp.Dial(pstr.dialURL) - if err == nil { - pstr.conn = conn - go func() { // monitor connection errors so we can restart - if err := <-pstr.conn.NotifyClose(make(chan *amqp.Error)); err != nil { - utils.Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error())) - pstr.Close() - } - }() + if pstr.conn, err = amqp.Dial(pstr.Cfg().ExportPath); err != nil { + return } + go func() { // monitor connection errors so we can restart + if err := <-pstr.conn.NotifyClose(make(chan *amqp.Error)); err != nil { + utils.Logger.Err(fmt.Sprintf("Connection error received: %s", err.Error())) + pstr.Close() + } + }() } - pstr.Unlock() - if err != nil { - return nil, err + if pstr.postChan != nil { + return } - if postChan, err = pstr.conn.Channel(); err != nil { + + if pstr.postChan, err = pstr.conn.Channel(); err != nil { return } if pstr.exchange != "" { - if err = postChan.ExchangeDeclare( + if err = pstr.postChan.ExchangeDeclare( pstr.exchange, // name pstr.exchangeType, // type true, // durable @@ -159,7 +111,7 @@ func (pstr *AMQPPoster) newPostChannel() (postChan *amqp.Channel, err error) { } } - if _, err = postChan.QueueDeclare( + if _, err = pstr.postChan.QueueDeclare( pstr.queueID, // name true, // durable false, // auto-delete @@ -171,7 +123,7 @@ func (pstr *AMQPPoster) newPostChannel() (postChan *amqp.Channel, err error) { } if pstr.exchange != "" { - if err = postChan.QueueBind( + if err = pstr.postChan.QueueBind( pstr.queueID, // queue pstr.routingKey, // key pstr.exchange, // exchange @@ -183,3 +135,37 @@ func (pstr *AMQPPoster) newPostChannel() (postChan *amqp.Channel, err error) { } return } + +func (pstr *AMQPee) ExportEvent(content interface{}, _ string) (err error) { + pstr.reqs.get() + pstr.RLock() + err = pstr.postChan.Publish( + pstr.exchange, // exchange + pstr.routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: utils.ContentJSON, + Body: content.([]byte), + }) + pstr.RUnlock() + pstr.reqs.done() + return +} + +func (pstr *AMQPee) Close() (err error) { + pstr.Lock() + if pstr.postChan != nil { + pstr.postChan.Close() + pstr.postChan = nil + } + if pstr.conn != nil { + err = pstr.conn.Close() + pstr.conn = nil + } + pstr.Unlock() + return +} + +func (pstr *AMQPee) GetMetrics() *utils.SafeMapStorage { return pstr.dc } diff --git a/ees/ee.go b/ees/ee.go index ad989c8ae..bbb445707 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -19,6 +19,7 @@ along with this program. If not, see package ees import ( + "encoding/json" "fmt" "strings" "time" @@ -28,54 +29,49 @@ import ( "github.com/cgrates/cgrates/utils" ) -type EventExporter interface { - ID() string // return the exporter identificator - ExportEvent(cgrEv *utils.CGREvent) (err error) // called on each event to be exported - OnEvicted(itmID string, value interface{}) // called when the exporter needs to terminate - GetMetrics() *utils.SafeMapStorage // called to get metrics -} - -type exportedEvent interface { - Parse(func(path []string, val interface{})) - AsStringSlice() []string - AsMapStringSlice() map[string]interface{} -} - type EventExporter2 interface { - Cfg() *config.EventExporterCfg // return the config - Connect() error // called before exporting an event to make sure it is connected - ExportEvent(exportedEvent) (interface{}, error) // called on each event to be exported - Close() error // called when the exporter needs to terminate - GetMetrics() *utils.SafeMapStorage // called to get metrics + Cfg() *config.EventExporterCfg // return the config + Connect() error // called before exporting an event to make sure it is connected + ExportEvent(interface{}, string) error // called on each event to be exported + Close() error // called when the exporter needs to terminate + GetMetrics() *utils.SafeMapStorage // called to get metrics + PrepareMap(map[string]interface{}) (interface{}, error) + PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error) } // NewEventExporter produces exporters -func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (ee EventExporter, err error) { +func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (ee EventExporter2, err error) { var dc *utils.SafeMapStorage if dc, err = newEEMetrics(utils.FirstNonEmpty( cgrCfg.EEsCfg().Exporters[cfgIdx].Timezone, cgrCfg.GeneralCfg().DefaultTimezone)); err != nil { return } + cfg := cgrCfg.EEsCfg().Exporters[cfgIdx] switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type { case utils.MetaFileCSV: - return NewFileCSVee(cgrCfg, cfgIdx, filterS, dc) + return NewFileCSVee(cfg, cgrCfg, filterS, dc) case utils.MetaFileFWV: - return NewFileFWVee(cgrCfg, cfgIdx, filterS, dc) + return NewFileFWVee(cfg, cgrCfg, filterS, dc) case utils.MetaHTTPPost: - return NewHTTPPostEe(cgrCfg, cfgIdx, filterS, dc) + return NewHTTPPostEE(cfg, cgrCfg, filterS, dc) case utils.MetaHTTPjsonMap: - return NewHTTPjsonMapEE(cgrCfg, cfgIdx, filterS, dc) - case utils.MetaAMQPjsonMap, utils.MetaAMQPV1jsonMap, + return NewHTTPjsonMapEE(cfg, cgrCfg, filterS, dc) + case utils.MetaNatsjsonMap: + return NewNatsEE(cfg, cgrCfg.GeneralCfg().NodeID, + cgrCfg.GeneralCfg().ConnectTimeout, dc) + case utils.MetaAMQPjsonMap: + return NewAMQPee(cfg, dc), nil + case utils.MetaAMQPV1jsonMap, utils.MetaSQSjsonMap, utils.MetaKafkajsonMap, - utils.MetaS3jsonMap, utils.MetaNatsjsonMap: + utils.MetaS3jsonMap: return NewPosterJSONMapEE(cgrCfg, cfgIdx, filterS, dc) case utils.MetaVirt: - return NewVirtualExporter(cgrCfg, cfgIdx, filterS, dc) + return NewVirtualEE(cfg, dc) case utils.MetaElastic: - return NewElasticExporter(cgrCfg, cfgIdx, filterS, dc) + return NewElasticEE(cfg, dc) case utils.MetaSQL: - return NewSQLEe(cgrCfg, cfgIdx, filterS, dc) + return NewSQLEe(cfg, dc) default: return nil, fmt.Errorf("unsupported exporter type: <%s>", cgrCfg.EEsCfg().Exporters[cfgIdx].Type) } @@ -224,44 +220,31 @@ func updateEEMetrics(dc *utils.SafeMapStorage, cgrID string, ev engine.MapEvent, } } -type expOrderedNavigableMap utils.OrderedNavigableMap +type bytePreparing struct{} -func (v *expOrderedNavigableMap) Parse(f func(path []string, val interface{})) { - nm := (*utils.OrderedNavigableMap)(v) - for el := nm.GetFirstElement(); el != nil; el = el.Next() { - nmIt, _ := nm.Field(el.Value) - f(el.Value, nmIt.Data) - } +func (eEe *bytePreparing) PrepareMap(mp map[string]interface{}) (interface{}, error) { + return json.Marshal(mp) } - -func (v *expOrderedNavigableMap) AsStringSlice() []string { - return (*utils.OrderedNavigableMap)(v).OrderedFieldsAsStrings() -} -func (v *expOrderedNavigableMap) AsMapStringSlice() (m map[string]interface{}) { - m = map[string]interface{}{} - nm := (*utils.OrderedNavigableMap)(v) - for el := nm.GetFirstElement(); el != nil; el = el.Next() { +func (eEe *bytePreparing) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) { + valMp := make(map[string]interface{}) + for el := mp.GetFirstElement(); el != nil; el = el.Next() { path := el.Value - nmIt, _ := nm.Field(path) + nmIt, _ := mp.Field(path) path = path[:len(path)-1] // remove the last index - m[strings.Join(path, utils.NestingSep)] = nmIt.String() + valMp[strings.Join(path, utils.NestingSep)] = nmIt.String() } - return + return json.Marshal(valMp) } -type expMapStorage utils.MapStorage +type slicePreparing struct{} -func (v expMapStorage) Parse(f func(path []string, val interface{})) { - for k, val := range utils.MapStorage(v) { - f([]string{k}, val) +func (eEe *slicePreparing) PrepareMap(mp map[string]interface{}) (interface{}, error) { + csvRecord := make([]string, 0, len(mp)) + for _, val := range mp { + csvRecord = append(csvRecord, utils.IfaceAsString(val)) } + return csvRecord, nil } - -func (v expMapStorage) AsStringSlice() (s []string) { - s = make([]string, 0, len(v)) - for _, val := range utils.MapStorage(v) { - s = append(s, utils.IfaceAsString(val)) - } - return +func (eEe *slicePreparing) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) { + return mp.OrderedFieldsAsStrings(), nil } -func (v expMapStorage) AsMapStringSlice() map[string]interface{} { return v } diff --git a/ees/ee_test.go b/ees/ee_test.go index 14bda88f2..e1e1f8af1 100644 --- a/ees/ee_test.go +++ b/ees/ee_test.go @@ -104,11 +104,11 @@ func TestNewEventExporterCase3(t *testing.T) { "Local", utils.EmptyString, )) - eeExpect, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc) + eeExpect, err := NewHTTPPostEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } - newEE := ee.(*HTTPPost) + newEE := ee.(*HTTPPostEE) newEE.dc.MapStorage[utils.TimeNow] = nil eeExpect.dc.MapStorage[utils.TimeNow] = nil if !reflect.DeepEqual(eeExpect, newEE) { @@ -182,11 +182,11 @@ func TestNewEventExporterCase6(t *testing.T) { if err != nil { t.Error(err) } - eeExpect, err := NewVirtualExporter(cgrCfg, 0, filterS, dc) + eeExpect, err := NewVirtualEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } - newEE := ee.(*VirtualEe) + newEE := ee.(*VirtualEE) newEE.dc.MapStorage[utils.TimeNow] = nil eeExpect.dc.MapStorage[utils.TimeNow] = nil if !reflect.DeepEqual(eeExpect, newEE) { @@ -224,11 +224,11 @@ func TestNewEventExporterCase7(t *testing.T) { if err != nil { t.Error(err) } - eeExpect, err := NewElasticExporter(cgrCfg, 0, filterS, dc) + eeExpect, err := NewElasticEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } - newEE := ee.(*ElasticEe) + newEE := ee.(*ElasticEE) newEE.dc.MapStorage[utils.TimeNow] = nil eeExpect.dc.MapStorage[utils.TimeNow] = nil eeExpect.eClnt = newEE.eClnt diff --git a/ees/ees.go b/ees/ees.go index 7d21c1279..413c2b7ef 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -61,7 +61,7 @@ type EventExporterS struct { // ListenAndServe keeps the service alive func (eeS *EventExporterS) ListenAndServe(stopChan, cfgRld chan struct{}) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s>", - utils.CoreS, utils.EventExporterS)) + utils.CoreS, utils.EEs)) for { select { case <-stopChan: // global exit @@ -69,7 +69,7 @@ func (eeS *EventExporterS) ListenAndServe(stopChan, cfgRld chan struct{}) { case rld := <-cfgRld: // configuration was reloaded, destroy the cache cfgRld <- rld utils.Logger.Info(fmt.Sprintf("<%s> reloading configuration internals.", - utils.EventExporterS)) + utils.EEs)) eeS.setupCache(eeS.cfg.EEsCfg().Cache) } } @@ -77,7 +77,7 @@ func (eeS *EventExporterS) ListenAndServe(stopChan, cfgRld chan struct{}) { // Shutdown is called to shutdown the service func (eeS *EventExporterS) Shutdown() { - utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EventExporterS)) + utils.Logger.Info(fmt.Sprintf("<%s> shutdown <%s>", utils.CoreS, utils.EEs)) eeS.setupCache(nil) // cleanup exporters } @@ -183,11 +183,11 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply * eeCache, hasCache := eeS.eesChs[eeCfg.Type] eeS.eesMux.RUnlock() var isCached bool - var ee EventExporter + var ee EventExporter2 if hasCache { var x interface{} if x, isCached = eeCache.Get(eeCfg.ID); isCached { - ee = x.(EventExporter) + ee = x.(EventExporter2) } } @@ -201,7 +201,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply * } metricMapLock.Lock() - metricsMap[ee.ID()] = utils.MapStorage{} // will return the ID for all processed exporters + metricsMap[ee.Cfg().ID] = utils.MapStorage{} // will return the ID for all processed exporters metricMapLock.Unlock() if eeCfg.Synchronous { @@ -212,22 +212,17 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply * if hasVerbose && !eeCfg.Synchronous { utils.Logger.Warning( fmt.Sprintf("<%s> with id <%s>, running verbosed exporter with syncronous false", - utils.EventExporterS, ee.ID())) + utils.EEs, ee.Cfg().ID)) } - go func(evict, sync bool, ee EventExporter) { - if err := ee.ExportEvent(cgrEv.CGREvent); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> with id <%s>, error: <%s>", - utils.EventExporterS, ee.ID(), err.Error())) + go func(evict, sync bool, ee EventExporter2) { + if err := eeS.exportEventWithExporter(ee, cgrEv.CGREvent, evict); err != nil { + withErr = true } - if evict { - ee.OnEvicted("", nil) // so we can close ie the file - } if sync { if hasVerbose { metricMapLock.Lock() - metricsMap[ee.ID()] = ee.GetMetrics().MapStorage + metricsMap[ee.Cfg().ID] = ee.GetMetrics().ClonedMapStorage() metricMapLock.Unlock() } wg.Done() @@ -266,13 +261,18 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply * } func (eeS *EventExporterS) exportEventWithExporter(exp EventExporter2, ev *utils.CGREvent, oneTime bool) (err error) { - var eEv exportedEvent + if oneTime { + defer exp.Close() + } + var eEv interface{} exp.GetMetrics().Lock() exp.GetMetrics().MapStorage[utils.NumberOfEvents] = exp.GetMetrics().MapStorage[utils.NumberOfEvents].(int64) + 1 exp.GetMetrics().Unlock() if len(exp.Cfg().ContentFields()) == 0 { - eEv = expMapStorage(ev.Event) + if eEv, err = exp.PrepareMap(ev.Event); err != nil { + return + } } else { expNM := utils.NewOrderedNavigableMap() err = engine.NewExportRequest(map[string]utils.DataStorage{ @@ -283,21 +283,25 @@ func (eeS *EventExporterS) exportEventWithExporter(exp EventExporter2, ev *utils }, utils.FirstNonEmpty(ev.Tenant, eeS.cfg.GeneralCfg().DefaultTenant), eeS.filterS, map[string]*utils.OrderedNavigableMap{utils.MetaExp: expNM}).SetFields(exp.Cfg().ContentFields()) - eEv = (*expOrderedNavigableMap)(expNM) + if eEv, err = exp.PrepareOrderMap(expNM); err != nil { + return + } } + key := utils.ConcatenatedKey(utils.FirstNonEmpty(engine.MapEvent(ev.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID()), + utils.FirstNonEmpty(engine.MapEvent(ev.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault)) - exp = utils.NewOrderedNavigableMap() - err = engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaReq: utils.MapStorage(cgrEv.Event), - utils.MetaDC: dc, - utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), - utils.MetaCfg: cfg.GetDataProvider(), - }, utils.FirstNonEmpty(cgrEv.Tenant, cfg.GeneralCfg().DefaultTenant), - fltS, - map[string]*utils.OrderedNavigableMap{utils.MetaExp: r}).SetFields(fields) - return - if oneTime { - defer exp.Close() + return ExportWithAttempts(exp, eEv, key) +} + +func ExportWithAttempts(exp EventExporter2, eEv interface{}, key string) (err error) { + if exp.Cfg().FailedPostsDir != utils.MetaNone { + defer func() { + if err != nil { + engine.AddFailedPost(exp.Cfg().FailedPostsDir, exp.Cfg().ExportPath, + exp.Cfg().Type, utils.EEs, + eEv, exp.Cfg().Opts) + } + }() } fib := utils.Fib() @@ -310,11 +314,13 @@ func (eeS *EventExporterS) exportEventWithExporter(exp EventExporter2, ev *utils } } if err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> Exporter <%s> could not connect because err: %s", utils.EEs, exp.Cfg().ID, err.Error())) + utils.Logger.Warning( + fmt.Sprintf("<%s> Exporter <%s> could not connect because err: <%s>", + utils.EEs, exp.Cfg().ID, err.Error())) return } for i := 0; i < exp.Cfg().Attempts; i++ { - if err = exp.ExportEvent(ev); err == nil { + if err = exp.ExportEvent(eEv, key); err == nil { break } if i+1 < exp.Cfg().Attempts { @@ -322,7 +328,9 @@ func (eeS *EventExporterS) exportEventWithExporter(exp EventExporter2, ev *utils } } if err != nil { - return + utils.Logger.Warning( + fmt.Sprintf("<%s> Exporter <%s> could not export because err: <%s>", + utils.EEs, exp.Cfg().ID, err.Error())) } return } diff --git a/ees/elastic.go b/ees/elastic.go index 8bf98c495..38380d7f7 100644 --- a/ees/elastic.go +++ b/ees/elastic.go @@ -19,6 +19,7 @@ along with this program. If not, see package ees import ( + "bytes" "context" "encoding/json" "fmt" @@ -27,147 +28,101 @@ import ( "github.com/elastic/go-elasticsearch/esapi" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" elasticsearch "github.com/elastic/go-elasticsearch" ) -func NewElasticExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, - dc *utils.SafeMapStorage) (eEe *ElasticEe, err error) { - eEe = &ElasticEe{ - id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, - cgrCfg: cgrCfg, - cfgIdx: cfgIdx, - filterS: filterS, - dc: dc, - reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), +func NewElasticEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (eEe *ElasticEE, err error) { + eEe = &ElasticEE{ + cfg: cfg, + dc: dc, + reqs: newConcReq(cfg.ConcurrentRequests), } - err = eEe.init() + err = eEe.prepareOpts() return } -// ElasticEe implements EventExporter interface for ElasticSearch export -type ElasticEe struct { - id string - eClnt *elasticsearch.Client - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - filterS *engine.FilterS - dc *utils.SafeMapStorage - opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap - reqs *concReq +// ElasticEE implements EventExporter interface for ElasticSearch export +type ElasticEE struct { + cfg *config.EventExporterCfg + eClnt *elasticsearch.Client + dc *utils.SafeMapStorage + opts esapi.IndexRequest // this variable is used only for storing the options from OptsMap + reqs *concReq + bytePreparing } // init will create all the necessary dependencies, including opening the file -func (eEe *ElasticEe) init() (err error) { - // create the client - if eEe.eClnt, err = elasticsearch.NewClient( - elasticsearch.Config{ - Addresses: strings.Split(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ExportPath, utils.InfieldSep), - }); err != nil { - return - } +func (eEe *ElasticEE) prepareOpts() (err error) { //parse opts - if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsIndex]; !has { - eEe.opts.Index = utils.CDRsTBL - } else { + eEe.opts.Index = utils.CDRsTBL + if val, has := eEe.Cfg().Opts[utils.ElsIndex]; has { eEe.opts.Index = utils.IfaceAsString(val) } - if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsIfPrimaryTerm]; has { + if val, has := eEe.Cfg().Opts[utils.ElsIfPrimaryTerm]; has { var intVal int64 if intVal, err = utils.IfaceAsTInt64(val); err != nil { return } eEe.opts.IfPrimaryTerm = utils.IntPointer(int(intVal)) } - if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsIfSeqNo]; has { + if val, has := eEe.Cfg().Opts[utils.ElsIfSeqNo]; has { var intVal int64 if intVal, err = utils.IfaceAsTInt64(val); err != nil { return } eEe.opts.IfSeqNo = utils.IntPointer(int(intVal)) } - if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsOpType]; has { + if val, has := eEe.Cfg().Opts[utils.ElsOpType]; has { eEe.opts.OpType = utils.IfaceAsString(val) } - if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsPipeline]; has { + if val, has := eEe.Cfg().Opts[utils.ElsPipeline]; has { eEe.opts.Pipeline = utils.IfaceAsString(val) } - if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsRouting]; has { + if val, has := eEe.Cfg().Opts[utils.ElsRouting]; has { eEe.opts.Routing = utils.IfaceAsString(val) } - if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsTimeout]; has { + if val, has := eEe.Cfg().Opts[utils.ElsTimeout]; has { if eEe.opts.Timeout, err = utils.IfaceAsDuration(val); err != nil { return } } - if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsVersionLow]; has { + if val, has := eEe.Cfg().Opts[utils.ElsVersionLow]; has { var intVal int64 if intVal, err = utils.IfaceAsTInt64(val); err != nil { return } eEe.opts.Version = utils.IntPointer(int(intVal)) } - if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsVersionType]; has { + if val, has := eEe.Cfg().Opts[utils.ElsVersionType]; has { eEe.opts.VersionType = utils.IfaceAsString(val) } - if val, has := eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Opts[utils.ElsWaitForActiveShards]; has { + if val, has := eEe.Cfg().Opts[utils.ElsWaitForActiveShards]; has { eEe.opts.WaitForActiveShards = utils.IfaceAsString(val) } return } -// ID returns the identificator of this exporter -func (eEe *ElasticEe) ID() string { - return eEe.id +func (eEe *ElasticEE) Cfg() *config.EventExporterCfg { return eEe.cfg } + +func (eEe *ElasticEE) Connect() (err error) { + // create the client + if eEe.eClnt == nil { + eEe.eClnt, err = elasticsearch.NewClient( + elasticsearch.Config{Addresses: strings.Split(eEe.Cfg().ExportPath, utils.InfieldSep)}, + ) + } + return } -// OnEvicted implements EventExporter, doing the cleanup before exit -func (eEe *ElasticEe) OnEvicted(_ string, _ interface{}) {} - // ExportEvent implements EventExporter -func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { +func (eEe *ElasticEE) ExportEvent(ev interface{}, key string) (err error) { eEe.reqs.get() - defer func() { - updateEEMetrics(eEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].Timezone, - eEe.cgrCfg.GeneralCfg().DefaultTimezone)) - eEe.reqs.done() - }() - eEe.dc.Lock() - eEe.dc.MapStorage[utils.NumberOfEvents] = eEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1 - eEe.dc.Unlock() - valMp := make(map[string]interface{}) - if len(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ContentFields()) == 0 { - valMp = cgrEv.Event - } else { - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaExp: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaReq: utils.MapStorage(cgrEv.Event), - utils.MetaDC: eEe.dc, - utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), - utils.MetaCfg: eEe.cgrCfg.GetDataProvider(), - }, utils.FirstNonEmpty(cgrEv.Tenant, eEe.cgrCfg.GeneralCfg().DefaultTenant), - eEe.filterS, oNm) - if err = eeReq.SetFields(eEe.cgrCfg.EEsCfg().Exporters[eEe.cfgIdx].ContentFields()); err != nil { - return - } - for el := eeReq.ExpData[utils.MetaExp].GetFirstElement(); el != nil; el = el.Next() { - path := el.Value - nmIt, _ := eeReq.ExpData[utils.MetaExp].Field(path) - path = path[:len(path)-1] // remove the last index - valMp[strings.Join(path, utils.NestingSep)] = nmIt.String() - } - } - - // Set up the request object - cgrID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.CGRID), utils.GenUUID()) - runID := utils.FirstNonEmpty(engine.MapEvent(cgrEv.Event).GetStringIgnoreErrors(utils.RunID), utils.MetaDefault) + defer eEe.reqs.done() eReq := esapi.IndexRequest{ Index: eEe.opts.Index, - DocumentID: utils.ConcatenatedKey(cgrID, runID), - Body: strings.NewReader(utils.ToJSON(valMp)), + DocumentID: key, + Body: bytes.NewReader(ev.([]byte)), Refresh: "true", IfPrimaryTerm: eEe.opts.IfPrimaryTerm, IfSeqNo: eEe.opts.IfSeqNo, @@ -190,14 +145,16 @@ func (eEe *ElasticEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { var e map[string]interface{} if err = json.NewDecoder(resp.Body).Decode(&e); err != nil { return - } else { - utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%+v> when indexing document", - utils.EventExporterS, eEe.id, e)) } + utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%+v> when indexing document", + utils.EEs, eEe.Cfg().ID, e)) } return } -func (eEe *ElasticEe) GetMetrics() *utils.SafeMapStorage { - return eEe.dc.Clone() +func (eEe *ElasticEE) Close() (_ error) { + eEe.eClnt = nil + return } + +func (eEe *ElasticEE) GetMetrics() *utils.SafeMapStorage { return eEe.dc } diff --git a/ees/elastic_test.go b/ees/elastic_test.go index 8f888a331..ed1a52c95 100644 --- a/ees/elastic_test.go +++ b/ees/elastic_test.go @@ -30,7 +30,7 @@ import ( ) func TestID(t *testing.T) { - ee := &ElasticEe{ + ee := &ElasticEE{ id: "3", } if rcv := ee.ID(); !reflect.DeepEqual(rcv, "3") { @@ -46,7 +46,7 @@ func TestGetMetrics(t *testing.T) { if err != nil { t.Error(err) } - ee := &ElasticEe{ + ee := &ElasticEE{ dc: dc, } @@ -57,7 +57,7 @@ func TestGetMetrics(t *testing.T) { func TestInitClient(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } ee.cgrCfg.EEsCfg().Exporters[0].ExportPath = "/\x00" @@ -69,7 +69,7 @@ func TestInitClient(t *testing.T) { func TestInitCase1(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsIndex] = "test" - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } if err := ee.init(); err != nil { @@ -84,7 +84,7 @@ func TestInitCase1(t *testing.T) { func TestInitCase2(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsIfPrimaryTerm] = 20 - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } if err := ee.init(); err != nil { @@ -99,7 +99,7 @@ func TestInitCase2(t *testing.T) { func TestInitCase2Err(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsIfPrimaryTerm] = "test" - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" @@ -111,7 +111,7 @@ func TestInitCase2Err(t *testing.T) { func TestInitCase3(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsIfSeqNo] = 20 - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } if err := ee.init(); err != nil { @@ -126,7 +126,7 @@ func TestInitCase3(t *testing.T) { func TestInitCase3Err(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsIfSeqNo] = "test" - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" @@ -138,7 +138,7 @@ func TestInitCase3Err(t *testing.T) { func TestInitCase4(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsOpType] = "test" - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } if err := ee.init(); err != nil { @@ -153,7 +153,7 @@ func TestInitCase4(t *testing.T) { func TestInitCase5(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsPipeline] = "test" - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } if err := ee.init(); err != nil { @@ -168,7 +168,7 @@ func TestInitCase5(t *testing.T) { func TestInitCase6(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsRouting] = "test" - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } if err := ee.init(); err != nil { @@ -183,7 +183,7 @@ func TestInitCase6(t *testing.T) { func TestInitCase7(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsTimeout] = "test" - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } errExpect := "time: invalid duration \"test\"" @@ -195,7 +195,7 @@ func TestInitCase7(t *testing.T) { func TestInitCase8(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsVersionLow] = 20 - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } if err := ee.init(); err != nil { @@ -210,7 +210,7 @@ func TestInitCase8(t *testing.T) { func TestInitCase8Err(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsVersionLow] = "test" - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } errExpect := "strconv.ParseInt: parsing \"test\": invalid syntax" @@ -222,7 +222,7 @@ func TestInitCase8Err(t *testing.T) { func TestInitCase9(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsVersionType] = "test" - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } if err := ee.init(); err != nil { @@ -237,7 +237,7 @@ func TestInitCase9(t *testing.T) { func TestInitCase10(t *testing.T) { cgrCfg := config.NewDefaultCGRConfig() cgrCfg.EEsCfg().Exporters[0].Opts[utils.ElsWaitForActiveShards] = "test" - ee := &ElasticEe{ + ee := &ElasticEE{ cgrCfg: cgrCfg, } if err := ee.init(); err != nil { @@ -273,7 +273,7 @@ func TestElasticExportEvent(t *testing.T) { if err != nil { t.Error(err) } - eEe, err := NewElasticExporter(cgrCfg, 0, filterS, dc) + eEe, err := NewElasticEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } @@ -324,7 +324,7 @@ func TestElasticExportEvent2(t *testing.T) { if err != nil { t.Error(err) } - eEe, err := NewElasticExporter(cgrCfg, 0, filterS, dc) + eEe, err := NewElasticEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } @@ -375,7 +375,7 @@ func TestElasticExportEvent3(t *testing.T) { if err != nil { t.Error(err) } - eEe, err := NewElasticExporter(cgrCfg, 0, filterS, dc) + eEe, err := NewElasticEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } @@ -416,7 +416,7 @@ func TestElasticExportEvent4(t *testing.T) { if err != nil { t.Error(err) } - eEe, err := NewElasticExporter(cgrCfg, 0, filterS, dc) + eEe, err := NewElasticEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } @@ -456,7 +456,7 @@ func TestElasticExportEvent5(t *testing.T) { if err != nil { t.Error(err) } - eEe, err := NewElasticExporter(cgrCfg, 0, filterS, dc) + eEe, err := NewElasticEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } diff --git a/ees/filecsv.go b/ees/filecsv.go index 897237126..5b14b88a1 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -24,6 +24,7 @@ import ( "io" "os" "path" + "sync" "github.com/cgrates/cgrates/engine" @@ -31,15 +32,15 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, +func NewFileCSVee(cfg *config.EventExporterCfg, + cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fCsv *FileCSVee, err error) { fCsv = &FileCSVee{ - id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cfg: cfg, + dc: dc, + cgrCfg: cgrCfg, - cfgIdx: cfgIdx, filterS: filterS, - dc: dc, - reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), } err = fCsv.init() return @@ -47,21 +48,23 @@ func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, // FileCSVee implements EventExporter interface for .csv files type FileCSVee struct { - id string - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - filterS *engine.FilterS + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage file io.WriteCloser csvWriter *csv.Writer - dc *utils.SafeMapStorage - reqs *concReq + sync.Mutex + slicePreparing + // for header and trailer composing + cgrCfg *config.CGRConfig + filterS *engine.FilterS } -// init will create all the necessary dependencies, including opening the file func (fCsv *FileCSVee) init() (err error) { + fCsv.Lock() + defer fCsv.Unlock() // create the file - filePath := path.Join(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ExportPath, - fCsv.id+utils.Underline+utils.UUIDSha1Prefix()+utils.CSVSuffix) + filePath := path.Join(fCsv.Cfg().ExportPath, + fCsv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.CSVSuffix) fCsv.dc.Lock() fCsv.dc.MapStorage[utils.ExportPath] = filePath fCsv.dc.Unlock() @@ -70,109 +73,60 @@ func (fCsv *FileCSVee) init() (err error) { } fCsv.csvWriter = csv.NewWriter(fCsv.file) fCsv.csvWriter.Comma = utils.CSVSep - if fieldSep, has := fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Opts[utils.CSVFieldSepOpt]; has { + if fieldSep, has := fCsv.Cfg().Opts[utils.CSVFieldSepOpt]; has { fCsv.csvWriter.Comma = rune(utils.IfaceAsString(fieldSep)[0]) } return fCsv.composeHeader() } -// ID returns the identificator of this exporter -func (fCsv *FileCSVee) ID() string { - return fCsv.id -} - -// OnEvicted implements EventExporter, doing the cleanup before exit -func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) { - // verify if we need to add the trailer - if err := fCsv.composeTrailer(); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when composed trailer", - utils.EventExporterS, fCsv.id, err.Error())) - } - fCsv.csvWriter.Flush() - if err := fCsv.file.Close(); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file", - utils.EventExporterS, fCsv.id, err.Error())) - } -} - -// ExportEvent implements EventExporter -func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { - fCsv.reqs.get() - defer func() { - updateEEMetrics(fCsv.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].Timezone, - fCsv.cgrCfg.GeneralCfg().DefaultTimezone)) - fCsv.reqs.done() - }() - fCsv.dc.Lock() - fCsv.dc.MapStorage[utils.NumberOfEvents] = fCsv.dc.MapStorage[utils.NumberOfEvents].(int64) + 1 - fCsv.dc.Unlock() - - var csvRecord []string - if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields()) == 0 { - csvRecord = make([]string, 0, len(cgrEv.Event)) - for _, val := range cgrEv.Event { - csvRecord = append(csvRecord, utils.IfaceAsString(val)) - } - } else { - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaExp: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaReq: utils.MapStorage(cgrEv.Event), - utils.MetaDC: fCsv.dc, - utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), - utils.MetaCfg: fCsv.cgrCfg.GetDataProvider(), - }, utils.FirstNonEmpty(cgrEv.Tenant, fCsv.cgrCfg.GeneralCfg().DefaultTenant), - fCsv.filterS, oNm) - - if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields()); err != nil { - return - } - csvRecord = eeReq.ExpData[utils.MetaExp].OrderedFieldsAsStrings() - } - - return fCsv.csvWriter.Write(csvRecord) -} - // Compose and cache the header func (fCsv *FileCSVee) composeHeader() (err error) { - if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields()) == 0 { - return + if len(fCsv.Cfg().HeaderFields()) != 0 { + var exp *utils.OrderedNavigableMap + if exp, err = composeHeaderTrailer(utils.MetaHdr, fCsv.Cfg().HeaderFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil { + return + } + return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings()) } - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaHdr: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaDC: fCsv.dc, - utils.MetaCfg: fCsv.cgrCfg.GetDataProvider(), - }, fCsv.cgrCfg.GeneralCfg().DefaultTenant, - fCsv.filterS, oNm) - if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields()); err != nil { - return - } - return fCsv.csvWriter.Write(eeReq.ExpData[utils.MetaHdr].OrderedFieldsAsStrings()) + return } // Compose and cache the trailer func (fCsv *FileCSVee) composeTrailer() (err error) { - if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields()) == 0 { - return + if len(fCsv.Cfg().TrailerFields()) != 0 { + var exp *utils.OrderedNavigableMap + if exp, err = composeHeaderTrailer(utils.MetaTrl, fCsv.Cfg().TrailerFields(), fCsv.dc, fCsv.cgrCfg, fCsv.filterS); err != nil { + return + } + return fCsv.csvWriter.Write(exp.OrderedFieldsAsStrings()) } - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaTrl: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaDC: fCsv.dc, - utils.MetaCfg: fCsv.cgrCfg.GetDataProvider(), - }, fCsv.cgrCfg.GeneralCfg().DefaultTenant, - fCsv.filterS, oNm) - if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields()); err != nil { - return - } - - return fCsv.csvWriter.Write(eeReq.ExpData[utils.MetaTrl].OrderedFieldsAsStrings()) + return } -func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage { - return fCsv.dc.Clone() +func (fCsv *FileCSVee) Cfg() *config.EventExporterCfg { return fCsv.cfg } + +func (fCsv *FileCSVee) Connect() (_ error) { return } + +func (fCsv *FileCSVee) ExportEvent(ev interface{}, _ string) error { + fCsv.Lock() // make sure that only one event is writen in file at once + defer fCsv.Unlock() + return fCsv.csvWriter.Write(ev.([]string)) } + +func (fCsv *FileCSVee) Close() (err error) { + fCsv.Lock() + defer fCsv.Unlock() + // verify if we need to add the trailer + if err = fCsv.composeTrailer(); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when composed trailer", + utils.EEs, fCsv.Cfg().ID, err.Error())) + } + fCsv.csvWriter.Flush() + if err = fCsv.file.Close(); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file", + utils.EEs, fCsv.Cfg().ID, err.Error())) + } + return +} + +func (fCsv *FileCSVee) GetMetrics() *utils.SafeMapStorage { return fCsv.dc } diff --git a/ees/filefwv.go b/ees/filefwv.go index fb3c453cf..157051551 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -23,20 +23,20 @@ import ( "io" "os" "path" + "sync" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fFwv *FileFWVee, err error) { +func NewFileFWVee(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage) (fFwv *FileFWVee, err error) { fFwv = &FileFWVee{ - id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cfg: cfg, + dc: dc, + cgrCfg: cgrCfg, - cfgIdx: cfgIdx, filterS: filterS, - dc: dc, - reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), } err = fFwv.init() return @@ -44,19 +44,21 @@ func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, // FileFWVee implements EventExporter interface for .fwv files type FileFWVee struct { - id string + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage + file io.WriteCloser + sync.Mutex + slicePreparing + + // for header and trailer composing cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers filterS *engine.FilterS - file io.WriteCloser - dc *utils.SafeMapStorage - reqs *concReq } // init will create all the necessary dependencies, including opening the file func (fFwv *FileFWVee) init() (err error) { - filePath := path.Join(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ExportPath, - fFwv.id+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix) + filePath := path.Join(fFwv.Cfg().ExportPath, + fFwv.Cfg().ID+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix) fFwv.dc.Lock() fFwv.dc.MapStorage[utils.ExportPath] = filePath fFwv.dc.Unlock() @@ -67,85 +69,16 @@ func (fFwv *FileFWVee) init() (err error) { return fFwv.composeHeader() } -// ID returns the identificator of this exporter -func (fFwv *FileFWVee) ID() string { - return fFwv.id -} - -// OnEvicted implements EventExporter, doing the cleanup before exit -func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) { - // verify if we need to add the trailer - if err := fFwv.composeTrailer(); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when composed trailer", - utils.EventExporterS, fFwv.id, err.Error())) - } - if err := fFwv.file.Close(); err != nil { - utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file", - utils.EventExporterS, fFwv.id, err.Error())) - } -} - -// ExportEvent implements EventExporter -func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { - fFwv.reqs.get() - defer func() { - updateEEMetrics(fFwv.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].Timezone, - fFwv.cgrCfg.GeneralCfg().DefaultTimezone)) - fFwv.reqs.done() - }() - fFwv.dc.Lock() - fFwv.dc.MapStorage[utils.NumberOfEvents] = fFwv.dc.MapStorage[utils.NumberOfEvents].(int64) + 1 - fFwv.dc.Unlock() - var records []string - if len(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ContentFields()) == 0 { - records = make([]string, 0, len(cgrEv.Event)) - for _, val := range cgrEv.Event { - records = append(records, utils.IfaceAsString(val)) - } - } else { - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaExp: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaReq: utils.MapStorage(cgrEv.Event), - utils.MetaDC: fFwv.dc, - utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), - utils.MetaCfg: fFwv.cgrCfg.GetDataProvider(), - }, utils.FirstNonEmpty(cgrEv.Tenant, fFwv.cgrCfg.GeneralCfg().DefaultTenant), - fFwv.filterS, oNm) - - if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ContentFields()); err != nil { - return - } - records = eeReq.ExpData[utils.MetaExp].OrderedFieldsAsStrings() - } - - for _, record := range records { - if _, err = io.WriteString(fFwv.file, record); err != nil { - return - } - } - _, err = io.WriteString(fFwv.file, "\n") - return -} - // Compose and cache the header func (fFwv *FileFWVee) composeHeader() (err error) { - if len(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].HeaderFields()) == 0 { + if len(fFwv.Cfg().HeaderFields()) == 0 { return } - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaHdr: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaDC: fFwv.dc, - utils.MetaCfg: fFwv.cgrCfg.GetDataProvider(), - }, fFwv.cgrCfg.GeneralCfg().DefaultTenant, - fFwv.filterS, oNm) - if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].HeaderFields()); err != nil { + var exp *utils.OrderedNavigableMap + if exp, err = composeHeaderTrailer(utils.MetaHdr, fFwv.Cfg().HeaderFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil { return } - for _, record := range eeReq.ExpData[utils.MetaHdr].OrderedFieldsAsStrings() { + for _, record := range exp.OrderedFieldsAsStrings() { if _, err = io.WriteString(fFwv.file, record); err != nil { return } @@ -156,21 +89,14 @@ func (fFwv *FileFWVee) composeHeader() (err error) { // Compose and cache the trailer func (fFwv *FileFWVee) composeTrailer() (err error) { - if len(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].TrailerFields()) == 0 { + if len(fFwv.Cfg().TrailerFields()) == 0 { return } - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaTrl: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaDC: fFwv.dc, - utils.MetaCfg: fFwv.cgrCfg.GetDataProvider(), - }, fFwv.cgrCfg.GeneralCfg().DefaultTenant, - fFwv.filterS, oNm) - if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].TrailerFields()); err != nil { + var exp *utils.OrderedNavigableMap + if exp, err = composeHeaderTrailer(utils.MetaTrl, fFwv.Cfg().TrailerFields(), fFwv.dc, fFwv.cgrCfg, fFwv.filterS); err != nil { return } - for _, record := range eeReq.ExpData[utils.MetaTrl].OrderedFieldsAsStrings() { + for _, record := range exp.OrderedFieldsAsStrings() { if _, err = io.WriteString(fFwv.file, record); err != nil { return } @@ -179,6 +105,35 @@ func (fFwv *FileFWVee) composeTrailer() (err error) { return } -func (fFwv *FileFWVee) GetMetrics() *utils.SafeMapStorage { - return fFwv.dc.Clone() +func (fFwv *FileFWVee) Cfg() *config.EventExporterCfg { return fFwv.cfg } + +func (fFwv *FileFWVee) Connect() (_ error) { return } + +func (fFwv *FileFWVee) ExportEvent(records interface{}, _ string) (err error) { + fFwv.Lock() // make sure that only one event is writen in file at once + defer fFwv.Unlock() + for _, record := range records.([]string) { + if _, err = io.WriteString(fFwv.file, record); err != nil { + return + } + } + _, err = io.WriteString(fFwv.file, "\n") + return } + +func (fFwv *FileFWVee) Close() (err error) { + fFwv.Lock() + defer fFwv.Unlock() + // verify if we need to add the trailer + if err = fFwv.composeTrailer(); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when composed trailer", + utils.EEs, fFwv.Cfg().ID, err.Error())) + } + if err = fFwv.file.Close(); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file", + utils.EEs, fFwv.Cfg().ID, err.Error())) + } + return +} + +func (fFwv *FileFWVee) GetMetrics() *utils.SafeMapStorage { return fFwv.dc } diff --git a/ees/httpjsonmap.go b/ees/httpjsonmap.go index 9638180f2..555a7b91c 100644 --- a/ees/httpjsonmap.go +++ b/ees/httpjsonmap.go @@ -19,8 +19,12 @@ along with this program. If not, see package ees import ( + "bytes" "encoding/json" + "fmt" + "io" "net/http" + "net/url" "strings" "github.com/cgrates/cgrates/config" @@ -28,125 +32,121 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewHTTPjsonMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, +func NewHTTPjsonMapEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, dc *utils.SafeMapStorage) (pstrJSON *HTTPjsonMapEE, err error) { pstrJSON = &HTTPjsonMapEE{ - id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, - cgrCfg: cgrCfg, - cfgIdx: cfgIdx, - filterS: filterS, - dc: dc, - reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), + cfg: cfg, + dc: dc, + client: &http.Client{Transport: engine.GetHTTPPstrTransport(), Timeout: cgrCfg.GeneralCfg().ReplyTimeout}, + reqs: newConcReq(cfg.ConcurrentRequests), } - - pstrJSON.pstr, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().ReplyTimeout, - cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, - utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], - cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts) + pstrJSON.hdr, err = pstrJSON.composeHeader(cgrCfg, filterS) return } // HTTPjsonMapEE implements EventExporter interface for .csv files type HTTPjsonMapEE struct { - id string - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - filterS *engine.FilterS - pstr *engine.HTTPPoster - dc *utils.SafeMapStorage - reqs *concReq -} + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage + client *http.Client + reqs *concReq -// ID returns the identificator of this exporter -func (httpEE *HTTPjsonMapEE) ID() string { - return httpEE.id -} - -// OnEvicted implements EventExporter, doing the cleanup before exit -func (httpEE *HTTPjsonMapEE) OnEvicted(string, interface{}) {} - -// ExportEvent implements EventExporter -func (httpEE *HTTPjsonMapEE) ExportEvent(cgrEv *utils.CGREvent) (err error) { - httpEE.reqs.get() - defer func() { - updateEEMetrics(httpEE.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].Timezone, - httpEE.cgrCfg.GeneralCfg().DefaultTimezone)) - httpEE.reqs.done() - }() - httpEE.dc.Lock() - httpEE.dc.MapStorage[utils.NumberOfEvents] = httpEE.dc.MapStorage[utils.NumberOfEvents].(int64) + 1 - httpEE.dc.Unlock() - - valMp := make(map[string]interface{}) - hdr := http.Header{} - if len(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].ContentFields()) == 0 { - valMp = cgrEv.Event - } else { - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaExp: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaReq: utils.MapStorage(cgrEv.Event), - utils.MetaDC: httpEE.dc, - utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), - utils.MetaCfg: httpEE.cgrCfg.GetDataProvider(), - }, utils.FirstNonEmpty(cgrEv.Tenant, httpEE.cgrCfg.GeneralCfg().DefaultTenant), - httpEE.filterS, oNm) - - if err = eeReq.SetFields(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].ContentFields()); err != nil { - return - } - for el := eeReq.ExpData[utils.MetaExp].GetFirstElement(); el != nil; el = el.Next() { - path := el.Value - nmIt, _ := eeReq.ExpData[utils.MetaExp].Field(path) - path = path[:len(path)-1] // remove the last index - valMp[strings.Join(path, utils.NestingSep)] = nmIt.String() - } - if hdr, err = httpEE.composeHeader(); err != nil { - return - } - } - - var body []byte - if body, err = json.Marshal(valMp); err != nil { - return - } - if err = httpEE.pstr.PostValues(body, hdr); err != nil && - httpEE.cgrCfg.GeneralCfg().FailedPostsDir != utils.MetaNone { - engine.AddFailedPost(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].ExportPath, - httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].Type, utils.EventExporterS, - &engine.HTTPPosterRequest{Header: hdr, Body: body}, - httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].Opts) - } - return -} - -func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage { - return httpEE.dc.Clone() + hdr http.Header } // Compose and cache the header -func (httpEE *HTTPjsonMapEE) composeHeader() (hdr http.Header, err error) { +func (httpEE *HTTPjsonMapEE) composeHeader(cgrCfg *config.CGRConfig, filterS *engine.FilterS) (hdr http.Header, err error) { hdr = make(http.Header) - if len(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].HeaderFields()) == 0 { + if len(httpEE.Cfg().HeaderFields()) == 0 { return } - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaHdr: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaDC: httpEE.dc, - utils.MetaCfg: httpEE.cgrCfg.GetDataProvider(), - }, httpEE.cgrCfg.GeneralCfg().DefaultTenant, - httpEE.filterS, oNm) - if err = eeReq.SetFields(httpEE.cgrCfg.EEsCfg().Exporters[httpEE.cfgIdx].HeaderFields()); err != nil { + var exp *utils.OrderedNavigableMap + if exp, err = composeHeaderTrailer(utils.MetaHdr, httpEE.Cfg().HeaderFields(), httpEE.dc, cgrCfg, filterS); err != nil { return } - for el := eeReq.ExpData[utils.MetaHdr].GetFirstElement(); el != nil; el = el.Next() { + for el := exp.GetFirstElement(); el != nil; el = el.Next() { path := el.Value - nmIt, _ := eeReq.ExpData[utils.MetaHdr].Field(path) //Safe to ignore error, since the path always exists - path = path[:len(path)-1] // remove the last index + nmIt, _ := exp.Field(path) //Safe to ignore error, since the path always exists + path = path[:len(path)-1] // remove the last index hdr.Set(strings.Join(path, utils.NestingSep), nmIt.String()) } return } + +func (httpEE *HTTPjsonMapEE) Cfg() *config.EventExporterCfg { return httpEE.cfg } + +func (httpEE *HTTPjsonMapEE) Connect() (_ error) { return } + +func (httpEE *HTTPjsonMapEE) ExportEvent(content interface{}, _ string) (err error) { + httpEE.reqs.get() + defer httpEE.reqs.done() + pReq := content.(httpPosterRequest) + var req *http.Request + if req, err = prepareRequest(httpEE.Cfg().ExportPath, utils.ContentJSON, pReq.Body, pReq.Header); err != nil { + return + } + _, err = sendHTTPReq(httpEE.client, req) + return +} + +func (httpEE *HTTPjsonMapEE) Close() (_ error) { return } + +func (httpEE *HTTPjsonMapEE) GetMetrics() *utils.SafeMapStorage { return httpEE.dc } + +func (httpEE *HTTPjsonMapEE) PrepareMap(mp map[string]interface{}) (interface{}, error) { + body, err := json.Marshal(mp) + return &httpPosterRequest{ + Header: httpEE.hdr, + Body: body, + }, err +} + +func (httpEE *HTTPjsonMapEE) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) { + valMp := make(map[string]interface{}) + for el := mp.GetFirstElement(); el != nil; el = el.Next() { + path := el.Value + nmIt, _ := mp.Field(path) + path = path[:len(path)-1] // remove the last index + valMp[strings.Join(path, utils.NestingSep)] = nmIt.String() + } + body, err := json.Marshal(valMp) + return &httpPosterRequest{ + Header: httpEE.hdr, + Body: body, + }, err +} + +func prepareRequest(addr, cType string, content interface{}, hdr http.Header) (req *http.Request, err error) { + var body io.Reader + if cType == utils.ContentForm { + body = strings.NewReader(content.(url.Values).Encode()) + } else { + body = bytes.NewBuffer(content.([]byte)) + } + contentType := "application/x-www-form-urlencoded" + if cType == utils.ContentJSON { + contentType = "application/json" + } + hdr.Set("Content-Type", contentType) + if req, err = http.NewRequest(http.MethodPost, addr, body); err != nil { + return + } + req.Header = hdr + return +} + +func sendHTTPReq(client *http.Client, req *http.Request) (respBody []byte, err error) { + var resp *http.Response + if resp, err = client.Do(req); err != nil { + return + } + respBody, err = io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return + } + if resp.StatusCode > 299 { + err = fmt.Errorf("unexpected status code received: <%d>", resp.StatusCode) + } + return +} diff --git a/ees/httppost.go b/ees/httppost.go index 15c3e0563..3b0369bc9 100644 --- a/ees/httppost.go +++ b/ees/httppost.go @@ -28,123 +28,92 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewHTTPPostEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, - dc *utils.SafeMapStorage) (httpPost *HTTPPost, err error) { - httpPost = &HTTPPost{ - id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, - cgrCfg: cgrCfg, - cfgIdx: cfgIdx, - filterS: filterS, - dc: dc, - reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), +func NewHTTPPostEE(cfg *config.EventExporterCfg, cgrCfg *config.CGRConfig, filterS *engine.FilterS, + dc *utils.SafeMapStorage) (httpPost *HTTPPostEE, err error) { + httpPost = &HTTPPostEE{ + cfg: cfg, + dc: dc, + client: &http.Client{Transport: engine.GetHTTPPstrTransport(), Timeout: cgrCfg.GeneralCfg().ReplyTimeout}, + reqs: newConcReq(cfg.ConcurrentRequests), } - httpPost.httpPoster, err = engine.NewHTTPPoster(cgrCfg.GeneralCfg().ReplyTimeout, - cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, - utils.PosterTransportContentTypes[cgrCfg.EEsCfg().Exporters[cfgIdx].Type], - cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts) + httpPost.hdr, err = httpPost.composeHeader(cgrCfg, filterS) return } // FileCSVee implements EventExporter interface for .csv files -type HTTPPost struct { - id string - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - filterS *engine.FilterS - httpPoster *engine.HTTPPoster - dc *utils.SafeMapStorage - reqs *concReq +type HTTPPostEE struct { + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage + client *http.Client + reqs *concReq + + hdr http.Header } - -// ID returns the identificator of this exporter -func (httpPost *HTTPPost) ID() string { - return httpPost.id -} - -// OnEvicted implements EventExporter, doing the cleanup before exit -func (httpPost *HTTPPost) OnEvicted(_ string, _ interface{}) {} - -// ExportEvent implements EventExporter -func (httpPost *HTTPPost) ExportEvent(cgrEv *utils.CGREvent) (err error) { - httpPost.reqs.get() - defer func() { - updateEEMetrics(httpPost.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Timezone, - httpPost.cgrCfg.GeneralCfg().DefaultTimezone)) - httpPost.reqs.done() - }() - httpPost.dc.Lock() - httpPost.dc.MapStorage[utils.NumberOfEvents] = httpPost.dc.MapStorage[utils.NumberOfEvents].(int64) + 1 - httpPost.dc.Unlock() - - urlVals := url.Values{} - hdr := http.Header{} - if len(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ContentFields()) == 0 { - for k, v := range cgrEv.Event { - urlVals.Set(k, utils.IfaceAsString(v)) - } - } else { - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaExp: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaReq: utils.MapStorage(cgrEv.Event), - utils.MetaDC: httpPost.dc, - utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), - utils.MetaCfg: httpPost.cgrCfg.GetDataProvider(), - }, utils.FirstNonEmpty(cgrEv.Tenant, httpPost.cgrCfg.GeneralCfg().DefaultTenant), - httpPost.filterS, oNm) - if err = eeReq.SetFields(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ContentFields()); err != nil { - return - } - for el := eeReq.ExpData[utils.MetaExp].GetFirstElement(); el != nil; el = el.Next() { - path := el.Value - nmIt, _ := eeReq.ExpData[utils.MetaExp].Field(path) - path = path[:len(path)-1] // remove the last index - urlVals.Set(strings.Join(path, utils.NestingSep), nmIt.String()) - } - if hdr, err = httpPost.composeHeader(); err != nil { - return - } - } - - if err = httpPost.httpPoster.PostValues(urlVals, hdr); err != nil && - httpPost.cgrCfg.GeneralCfg().FailedPostsDir != utils.MetaNone { - engine.AddFailedPost(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].ExportPath, - httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Type, utils.EventExporterS, - &engine.HTTPPosterRequest{ - Header: hdr, - Body: urlVals, - }, httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].Opts) - } - return -} - -func (httpPost *HTTPPost) GetMetrics() *utils.SafeMapStorage { - return httpPost.dc.Clone() +type httpPosterRequest struct { + Header http.Header + Body interface{} } // Compose and cache the header -func (httpPost *HTTPPost) composeHeader() (hdr http.Header, err error) { +func (httpPost *HTTPPostEE) composeHeader(cgrCfg *config.CGRConfig, filterS *engine.FilterS) (hdr http.Header, err error) { hdr = make(http.Header) - if len(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].HeaderFields()) == 0 { + if len(httpPost.Cfg().HeaderFields()) == 0 { return } - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaHdr: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaDC: httpPost.dc, - utils.MetaCfg: httpPost.cgrCfg.GetDataProvider(), - }, httpPost.cgrCfg.GeneralCfg().DefaultTenant, - httpPost.filterS, oNm) - if err = eeReq.SetFields(httpPost.cgrCfg.EEsCfg().Exporters[httpPost.cfgIdx].HeaderFields()); err != nil { + var exp *utils.OrderedNavigableMap + if exp, err = composeHeaderTrailer(utils.MetaHdr, httpPost.Cfg().HeaderFields(), httpPost.dc, cgrCfg, filterS); err != nil { return } - for el := eeReq.ExpData[utils.MetaHdr].GetFirstElement(); el != nil; el = el.Next() { + for el := exp.GetFirstElement(); el != nil; el = el.Next() { path := el.Value - nmIt, _ := eeReq.ExpData[utils.MetaHdr].Field(path) - path = path[:len(path)-1] // remove the last index + nmIt, _ := exp.Field(path) //Safe to ignore error, since the path always exists + path = path[:len(path)-1] // remove the last index hdr.Set(strings.Join(path, utils.NestingSep), nmIt.String()) } return } + +func (httpPost *HTTPPostEE) Cfg() *config.EventExporterCfg { return httpPost.cfg } + +func (httpPost *HTTPPostEE) Connect() (_ error) { return } + +func (httpPost *HTTPPostEE) ExportEvent(content interface{}, _ string) (err error) { + httpPost.reqs.get() + defer httpPost.reqs.done() + pReq := content.(*httpPosterRequest) + var req *http.Request + if req, err = prepareRequest(httpPost.Cfg().ExportPath, utils.ContentForm, pReq.Body, pReq.Header); err != nil { + return + } + _, err = sendHTTPReq(httpPost.client, req) + return +} + +func (httpPost *HTTPPostEE) Close() (_ error) { return } + +func (httpPost *HTTPPostEE) GetMetrics() *utils.SafeMapStorage { return httpPost.dc } + +func (httpPost *HTTPPostEE) PrepareMap(mp map[string]interface{}) (interface{}, error) { + urlVals := url.Values{} + for k, v := range mp { + urlVals.Set(k, utils.IfaceAsString(v)) + } + return &httpPosterRequest{ + Header: httpPost.hdr, + Body: urlVals, + }, nil +} + +func (httpPost *HTTPPostEE) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) { + urlVals := url.Values{} + for el := mp.GetFirstElement(); el != nil; el = el.Next() { + path := el.Value + nmIt, _ := mp.Field(path) + path = path[:len(path)-1] // remove the last index + urlVals.Set(strings.Join(path, utils.NestingSep), nmIt.String()) + } + return &httpPosterRequest{ + Header: httpPost.hdr, + Body: urlVals, + }, nil +} diff --git a/ees/httppost_test.go b/ees/httppost_test.go index 3a7735a0e..2df819b68 100644 --- a/ees/httppost_test.go +++ b/ees/httppost_test.go @@ -33,7 +33,7 @@ import ( ) func TestHttpPostID(t *testing.T) { - httpPost := &HTTPPost{ + httpPost := &HTTPPostEE{ id: "3", } if rcv := httpPost.ID(); !reflect.DeepEqual(rcv, "3") { @@ -49,7 +49,7 @@ func TestHttpPostGetMetrics(t *testing.T) { if err != nil { t.Error(err) } - httpPost := &HTTPPost{ + httpPost := &HTTPPostEE{ dc: dc, } @@ -72,7 +72,7 @@ func TestHttpPostExportEvent(t *testing.T) { if err != nil { t.Error(err) } - httpPost, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc) + httpPost, err := NewHTTPPostEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } @@ -117,7 +117,7 @@ func TestHttpPostExportEvent2(t *testing.T) { })) defer srv.Close() cgrCfg.EEsCfg().Exporters[0].ExportPath = srv.URL + "/" - httpPost, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc) + httpPost, err := NewHTTPPostEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } @@ -161,7 +161,7 @@ func TestHttpPostExportEvent3(t *testing.T) { if err != nil { t.Error(err) } - httpPost, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc) + httpPost, err := NewHTTPPostEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } @@ -208,7 +208,7 @@ func TestHttpPostExportEvent4(t *testing.T) { if err != nil { t.Error(err) } - httpPost, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc) + httpPost, err := NewHTTPPostEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } @@ -258,7 +258,7 @@ func TestHttpPostComposeHeader(t *testing.T) { if err != nil { t.Error(err) } - httpPost, err := NewHTTPPostEe(cgrCfg, 0, filterS, dc) + httpPost, err := NewHTTPPostEE(cgrCfg, 0, filterS, dc) if err != nil { t.Error(err) } @@ -345,7 +345,7 @@ func TestHttpPostSync(t *testing.T) { cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath = ts.URL - exp, err := NewHTTPPostEe(cgrCfg, cfgIdx, new(engine.FilterS), dc) + exp, err := NewHTTPPostEE(cgrCfg, cfgIdx, new(engine.FilterS), dc) if err != nil { t.Error(err) } @@ -406,7 +406,7 @@ func TestHttpPostSyncLimit(t *testing.T) { cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath = ts.URL - exp, err := NewHTTPPostEe(cgrCfg, cfgIdx, new(engine.FilterS), dc) + exp, err := NewHTTPPostEE(cgrCfg, cfgIdx, new(engine.FilterS), dc) if err != nil { t.Error(err) } diff --git a/engine/pstr_nats.go b/ees/nats.go similarity index 65% rename from engine/pstr_nats.go rename to ees/nats.go index 15f08c544..fca1097da 100644 --- a/engine/pstr_nats.go +++ b/ees/nats.go @@ -1,5 +1,5 @@ /* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments Copyright (C) ITsysCOM GmbH This program is free software: you can redistribute it and/or modify @@ -15,7 +15,8 @@ GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see */ -package engine + +package ees import ( "crypto/tls" @@ -25,82 +26,41 @@ import ( "sync" "time" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/nats-io/nats.go" ) -// NewNatsPoster creates a kafka poster -func NewNatsPoster(dialURL string, attempts int, opts map[string]interface{}, nodeID string, connTimeout time.Duration) (natsPstr *NatsPoster, err error) { - natsPstr = &NatsPoster{ - dialURL: dialURL, - subject: utils.DefaultQueueID, - attempts: attempts, +// NewNatsEE creates a kafka poster +func NewNatsEE(cfg *config.EventExporterCfg, nodeID string, connTimeout time.Duration, dc *utils.SafeMapStorage) (natsPstr *NatsEE, err error) { + natsPstr = &NatsEE{ + cfg: cfg, + dc: dc, + subject: utils.DefaultQueueID, + reqs: newConcReq(cfg.ConcurrentRequests), } - err = natsPstr.parseOpt(opts, nodeID, connTimeout) + err = natsPstr.parseOpt(cfg.Opts, nodeID, connTimeout) return } -// NatsPoster is a kafka poster -type NatsPoster struct { - dialURL string - subject string // identifier of the CDR queue where we publish - attempts int - jetStream bool - opts []nats.Option - jsOpts []nats.JSOpt - sync.Mutex // protect writer +// NatsEE is a kafka poster +type NatsEE struct { + subject string // identifier of the CDR queue where we publish + jetStream bool + opts []nats.Option + jsOpts []nats.JSOpt poster *nats.Conn posterJS nats.JetStreamContext + + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage + reqs *concReq + sync.RWMutex // protect writer + bytePreparing } -// Post is the method being called when we need to post anything in the queue -// the optional chn will permits channel caching -func (pstr *NatsPoster) Post(content []byte, _ string) (err error) { - fib := utils.Fib() - for i := 0; i < pstr.attempts; i++ { - if err = pstr.newPostWriter(); err == nil { - break - } - if i+1 < pstr.attempts { - time.Sleep(time.Duration(fib()) * time.Second) - } - } - if err != nil { - utils.Logger.Warning(fmt.Sprintf(" connecting to nats server, err: %s", err.Error())) - return - } - for i := 0; i < pstr.attempts; i++ { - pstr.Lock() - - if pstr.jetStream { - _, err = pstr.posterJS.Publish(pstr.subject, content) - } else { - err = pstr.poster.Publish(pstr.subject, content) - } - pstr.Unlock() - - if err == nil { - break - } - if i+1 < pstr.attempts { - time.Sleep(time.Duration(fib()) * time.Second) - } - } - return -} - -// Close closes the kafka writer -func (pstr *NatsPoster) Close() { - pstr.Lock() - if pstr.poster != nil { - pstr.poster.Drain() - } - pstr.poster = nil - pstr.Unlock() -} - -func (pstr *NatsPoster) parseOpt(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (err error) { +func (pstr *NatsEE) parseOpt(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (err error) { if useJetStreamVal, has := opts[utils.NatsJetStream]; has { if pstr.jetStream, err = utils.IfaceAsBool(useJetStreamVal); err != nil { return @@ -123,11 +83,13 @@ func (pstr *NatsPoster) parseOpt(opts map[string]interface{}, nodeID string, con return } -func (pstr *NatsPoster) newPostWriter() (err error) { +func (pstr *NatsEE) Cfg() *config.EventExporterCfg { return pstr.cfg } + +func (pstr *NatsEE) Connect() (err error) { pstr.Lock() defer pstr.Unlock() if pstr.poster == nil { - if pstr.poster, err = nats.Connect(pstr.dialURL, pstr.opts...); err != nil { + if pstr.poster, err = nats.Connect(pstr.Cfg().ExportPath, pstr.opts...); err != nil { return } if pstr.jetStream { @@ -137,6 +99,31 @@ func (pstr *NatsPoster) newPostWriter() (err error) { return } +func (pstr *NatsEE) ExportEvent(content interface{}, _ string) (err error) { + pstr.reqs.get() + pstr.RLock() + if pstr.jetStream { + _, err = pstr.posterJS.Publish(pstr.subject, content.([]byte)) + } else { + err = pstr.poster.Publish(pstr.subject, content.([]byte)) + } + pstr.RUnlock() + pstr.reqs.done() + return +} + +func (pstr *NatsEE) Close() (err error) { + pstr.Lock() + if pstr.poster != nil { + err = pstr.poster.Drain() + pstr.poster = nil + } + pstr.Unlock() + return +} + +func (pstr *NatsEE) GetMetrics() *utils.SafeMapStorage { return pstr.dc } + func GetNatsOpts(opts map[string]interface{}, nodeID string, connTimeout time.Duration) (nop []nats.Option, err error) { nop = make([]nats.Option, 0, 7) nop = append(nop, nats.Name(utils.CGRateSLwr+nodeID), diff --git a/ees/posterjsonmap.go b/ees/posterjsonmap.go index 028892622..a736d1f62 100644 --- a/ees/posterjsonmap.go +++ b/ees/posterjsonmap.go @@ -38,9 +38,6 @@ func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Fi reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), } switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type { - case utils.MetaAMQPjsonMap: - pstrJSON.poster = engine.NewAMQPPoster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, - cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts) case utils.MetaAMQPV1jsonMap: pstrJSON.poster = engine.NewAMQPv1Poster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts) @@ -53,10 +50,6 @@ func NewPosterJSONMapEE(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Fi case utils.MetaS3jsonMap: pstrJSON.poster = engine.NewS3Poster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts) - case utils.MetaNatsjsonMap: - pstrJSON.poster, err = engine.NewNatsPoster(cgrCfg.EEsCfg().Exporters[cfgIdx].ExportPath, - cgrCfg.EEsCfg().Exporters[cfgIdx].Attempts, cgrCfg.EEsCfg().Exporters[cfgIdx].Opts, - cgrCfg.GeneralCfg().NodeID, cgrCfg.GeneralCfg().ConnectTimeout) } return } diff --git a/ees/sql.go b/ees/sql.go index 854125076..7e37c3154 100644 --- a/ees/sql.go +++ b/ees/sql.go @@ -29,63 +29,57 @@ import ( "gorm.io/gorm" "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -func NewSQLEe(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, +func NewSQLEe(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (sqlEe *SQLEe, err error) { sqlEe = &SQLEe{ - id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, - cgrCfg: cgrCfg, - cfgIdx: cfgIdx, - filterS: filterS, - dc: dc, - reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), + cfg: cfg, + dc: dc, + reqs: newConcReq(cfg.ConcurrentRequests), } - - dialect, err := sqlEe.NewSQLEeUrl(cgrCfg) - if err != nil { - return - } - sqlEe.db, sqlEe.sqldb, err = openDB(cgrCfg, cfgIdx, dialect) + err = sqlEe.initDialector() return } // SQLEe implements EventExporter interface for SQL type SQLEe struct { - id string - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - filterS *engine.FilterS - db *gorm.DB - sqldb *sql.DB + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage + db *gorm.DB + sqldb *sql.DB + reqs *concReq + dialect gorm.Dialector tableName string - - dc *utils.SafeMapStorage - reqs *concReq + colNames []string } -func (sqlEe *SQLEe) NewSQLEeUrl(cgrCfg *config.CGRConfig) (dialect gorm.Dialector, err error) { +type sqlPosterRequest struct { + Querry string + Values []interface{} +} + +func (sqlEe *SQLEe) initDialector() (err error) { var u *url.URL // var err error - if u, err = url.Parse(strings.TrimPrefix(cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].ExportPath, utils.Meta)); err != nil { + if u, err = url.Parse(strings.TrimPrefix(sqlEe.Cfg().ExportPath, utils.Meta)); err != nil { return } password, _ := u.User.Password() dbname := utils.SQLDefaultDBName - if vals, has := cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Opts[utils.SQLDBNameOpt]; has { + if vals, has := sqlEe.Cfg().Opts[utils.SQLDBNameOpt]; has { dbname = utils.IfaceAsString(vals) } ssl := utils.SQLDefaultSSLMode - if vals, has := cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Opts[utils.SSLModeCfg]; has { + if vals, has := sqlEe.Cfg().Opts[utils.SSLModeCfg]; has { ssl = utils.IfaceAsString(vals) } // tableName is mandatory in opts - if iface, has := cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Opts[utils.SQLTableNameOpt]; !has { - return nil, utils.NewErrMandatoryIeMissing(utils.SQLTableNameOpt) + if iface, has := sqlEe.Cfg().Opts[utils.SQLTableNameOpt]; !has { + return utils.NewErrMandatoryIeMissing(utils.SQLTableNameOpt) } else { sqlEe.tableName = utils.IfaceAsString(iface) } @@ -93,18 +87,17 @@ func (sqlEe *SQLEe) NewSQLEeUrl(cgrCfg *config.CGRConfig) (dialect gorm.Dialecto // var dialect gorm.Dialector switch u.Scheme { case utils.MySQL: - dialect = mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", + sqlEe.dialect = mysql.Open(fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", u.User.Username(), password, u.Hostname(), u.Port(), dbname)) case utils.Postgres: - dialect = postgres.Open(fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", u.Hostname(), u.Port(), dbname, u.User.Username(), password, ssl)) + sqlEe.dialect = postgres.Open(fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", u.Hostname(), u.Port(), dbname, u.User.Username(), password, ssl)) default: - return nil, fmt.Errorf("db type <%s> not supported", u.Scheme) + return fmt.Errorf("db type <%s> not supported", u.Scheme) } return } -func openDB(cgrCfg *config.CGRConfig, cfgIdx int, dialect gorm.Dialector) (db *gorm.DB, sqlDB *sql.DB, err error) { - +func openDB(dialect gorm.Dialector, opts map[string]interface{}) (db *gorm.DB, sqlDB *sql.DB, err error) { if db, err = gorm.Open(dialect, &gorm.Config{AllowGlobalUpdate: true}); err != nil { return } @@ -112,21 +105,21 @@ func openDB(cgrCfg *config.CGRConfig, cfgIdx int, dialect gorm.Dialector) (db *g return } - if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxIdleConnsCfg]; has { + if iface, has := opts[utils.SQLMaxIdleConnsCfg]; has { val, err := utils.IfaceAsTInt64(iface) if err != nil { return nil, nil, err } sqlDB.SetMaxIdleConns(int(val)) } - if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxOpenConns]; has { + if iface, has := opts[utils.SQLMaxOpenConns]; has { val, err := utils.IfaceAsTInt64(iface) if err != nil { return nil, nil, err } sqlDB.SetMaxOpenConns(int(val)) } - if iface, has := cgrCfg.EEsCfg().Exporters[cfgIdx].Opts[utils.SQLMaxConnLifetime]; has { + if iface, has := opts[utils.SQLMaxConnLifetime]; has { val, err := utils.IfaceAsDuration(iface) if err != nil { return nil, nil, err @@ -137,71 +130,56 @@ func openDB(cgrCfg *config.CGRConfig, cfgIdx int, dialect gorm.Dialector) (db *g return } -// ID returns the identificator of this exporter -func (sqlEe *SQLEe) ID() string { - return sqlEe.id +func (sqlEe *SQLEe) Cfg() *config.EventExporterCfg { return sqlEe.cfg } + +func (sqlEe *SQLEe) Connect() (err error) { + if sqlEe.db == nil || sqlEe.sqldb == nil { + sqlEe.db, sqlEe.sqldb, err = openDB(sqlEe.dialect, sqlEe.Cfg().Opts) + } + return } -// OnEvicted implements EventExporter, doing the cleanup before exit -func (sqlEe *SQLEe) OnEvicted(_ string, _ interface{}) { - sqlEe.sqldb.Close() -} - -// ExportEvent implements EventExporter -func (sqlEe *SQLEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { +func (sqlEe *SQLEe) ExportEvent(req interface{}, _ string) error { sqlEe.reqs.get() - defer func() { - updateEEMetrics(sqlEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].Timezone, - sqlEe.cgrCfg.GeneralCfg().DefaultTimezone)) - sqlEe.reqs.done() - }() - sqlEe.dc.Lock() - sqlEe.dc.MapStorage[utils.NumberOfEvents] = sqlEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1 - sqlEe.dc.Unlock() + defer sqlEe.reqs.done() + sReq := req.(*sqlPosterRequest) + return sqlEe.db.Table(sqlEe.tableName).Exec(sReq.Querry, sReq.Values...).Error +} +func (sqlEe *SQLEe) Close() error { return sqlEe.sqldb.Close() } + +func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { return sqlEe.dc } + +func (sqlEe *SQLEe) PrepareMap(map[string]interface{}) (interface{}, error) { return nil, nil } + +func (sqlEe *SQLEe) PrepareOrderMap(mp *utils.OrderedNavigableMap) (interface{}, error) { var vals []interface{} var colNames []string - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaExp: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaReq: utils.MapStorage(cgrEv.Event), - utils.MetaDC: sqlEe.dc, - utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), - utils.MetaCfg: sqlEe.cgrCfg.GetDataProvider(), - }, utils.FirstNonEmpty(cgrEv.Tenant, sqlEe.cgrCfg.GeneralCfg().DefaultTenant), - sqlEe.filterS, oNm) - if err = eeReq.SetFields(sqlEe.cgrCfg.EEsCfg().Exporters[sqlEe.cfgIdx].ContentFields()); err != nil { - return - } - - for el := eeReq.ExpData[utils.MetaExp].GetFirstElement(); el != nil; el = el.Next() { - nmIt, _ := eeReq.ExpData[utils.MetaExp].Field(el.Value) + for el := mp.GetFirstElement(); el != nil; el = el.Next() { + nmIt, _ := mp.Field(el.Value) pathWithoutIndex := strings.Join(el.Value[:len(el.Value)-1], utils.NestingSep) // remove the index path.index if pathWithoutIndex != utils.MetaRow { colNames = append(colNames, pathWithoutIndex) } vals = append(vals, nmIt.Data) } - sqlValues := make([]string, len(vals)) for i := range vals { sqlValues[i] = "?" } - var sqlQuery string if len(colNames) != len(vals) { - sqlQuery = fmt.Sprintf("INSERT INTO %s VALUES (%s); ", sqlEe.tableName, strings.Join(sqlValues, ",")) + sqlQuery = fmt.Sprintf("INSERT INTO %s VALUES (%s); ", + sqlEe.tableName, + strings.Join(sqlValues, ",")) } else { - colNamesStr := "(" + strings.Join(colNames, ", ") + ")" - sqlQuery = fmt.Sprintf("INSERT INTO %s %s VALUES (%s); ", sqlEe.tableName, colNamesStr, strings.Join(sqlValues, ",")) + sqlQuery = fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s); ", + sqlEe.tableName, + strings.Join(colNames, ", "), + strings.Join(sqlValues, ",")) } - - sqlEe.db.Table(sqlEe.tableName).Exec(sqlQuery, vals...) - - return -} - -func (sqlEe *SQLEe) GetMetrics() *utils.SafeMapStorage { - return sqlEe.dc.Clone() + return &sqlPosterRequest{ + Querry: sqlQuery, + Values: vals, + }, nil } diff --git a/ees/virtualee.go b/ees/virtualee.go index b214b5696..6260d7ecc 100644 --- a/ees/virtualee.go +++ b/ees/virtualee.go @@ -20,76 +20,29 @@ package ees import ( "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) -func NewVirtualExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, - dc *utils.SafeMapStorage) (vEe *VirtualEe, err error) { - vEe = &VirtualEe{ - id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, - cgrCfg: cgrCfg, - cfgIdx: cfgIdx, - filterS: filterS, - dc: dc, - reqs: newConcReq(cgrCfg.EEsCfg().Exporters[cfgIdx].ConcurrentRequests), +func NewVirtualEE(cfg *config.EventExporterCfg, dc *utils.SafeMapStorage) (vEe *VirtualEE, err error) { + vEe = &VirtualEE{ + cfg: cfg, + dc: dc, } - err = vEe.init() return } -// VirtualEe implements EventExporter interface for .csv files -type VirtualEe struct { - id string - cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers - filterS *engine.FilterS - dc *utils.SafeMapStorage - reqs *concReq +// VirtualEE implements EventExporter interface for .csv files +type VirtualEE struct { + cfg *config.EventExporterCfg + dc *utils.SafeMapStorage } -// init will create all the necessary dependencies, including opening the file -func (vEe *VirtualEe) init() (err error) { - return -} - -// ID returns the identificator of this exporter -func (vEe *VirtualEe) ID() string { - return vEe.id -} - -// OnEvicted implements EventExporter, doing the cleanup before exit -func (vEe *VirtualEe) OnEvicted(_ string, _ interface{}) {} - -// ExportEvent implements EventExporter -func (vEe *VirtualEe) ExportEvent(cgrEv *utils.CGREvent) (err error) { - vEe.reqs.get() - defer func() { - updateEEMetrics(vEe.dc, cgrEv.ID, cgrEv.Event, err != nil, utils.FirstNonEmpty(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].Timezone, - vEe.cgrCfg.GeneralCfg().DefaultTimezone)) - vEe.reqs.done() - }() - vEe.dc.Lock() - vEe.dc.MapStorage[utils.NumberOfEvents] = vEe.dc.MapStorage[utils.NumberOfEvents].(int64) + 1 - vEe.dc.Unlock() - - oNm := map[string]*utils.OrderedNavigableMap{ - utils.MetaExp: utils.NewOrderedNavigableMap(), - } - eeReq := engine.NewExportRequest(map[string]utils.DataStorage{ - utils.MetaReq: utils.MapStorage(cgrEv.Event), - utils.MetaDC: vEe.dc, - utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), - utils.MetaCfg: vEe.cgrCfg.GetDataProvider(), - }, utils.FirstNonEmpty(cgrEv.Tenant, vEe.cgrCfg.GeneralCfg().DefaultTenant), - vEe.filterS, oNm) - if err = eeReq.SetFields(vEe.cgrCfg.EEsCfg().Exporters[vEe.cfgIdx].ContentFields()); err != nil { - return - } - - return -} - -func (vEe *VirtualEe) GetMetrics() *utils.SafeMapStorage { - return vEe.dc.Clone() +func (vEe *VirtualEE) Cfg() *config.EventExporterCfg { return vEe.cfg } +func (vEe *VirtualEE) Connect() error { return nil } +func (vEe *VirtualEE) ExportEvent(interface{}, string) error { return nil } +func (vEe *VirtualEE) Close() error { return nil } +func (vEe *VirtualEE) GetMetrics() *utils.SafeMapStorage { return vEe.dc } +func (vEe *VirtualEE) PrepareMap(map[string]interface{}) (interface{}, error) { return nil, nil } +func (vEe *VirtualEE) PrepareOrderMap(*utils.OrderedNavigableMap) (interface{}, error) { + return nil, nil } diff --git a/ees/virtualee_test.go b/ees/virtualee_test.go index 060ef8e31..ae458819c 100644 --- a/ees/virtualee_test.go +++ b/ees/virtualee_test.go @@ -28,7 +28,7 @@ import ( ) func TestVirtualEeID(t *testing.T) { - vEe := &VirtualEe{ + vEe := &VirtualEE{ id: "3", } if rcv := vEe.ID(); !reflect.DeepEqual(rcv, "3") { @@ -44,7 +44,7 @@ func TestVirtualEeGetMetrics(t *testing.T) { if err != nil { t.Error(err) } - vEe := &VirtualEe{ + vEe := &VirtualEE{ dc: dc, } @@ -65,7 +65,7 @@ func TestVirtualEeExportEvent(t *testing.T) { if err != nil { t.Error(err) } - vEe := &VirtualEe{ + vEe := &VirtualEE{ id: "string", cgrCfg: cgrCfg, cfgIdx: 0, diff --git a/engine/globalvars.go b/engine/globalvars.go index d2a19f379..bd44bd625 100644 --- a/engine/globalvars.go +++ b/engine/globalvars.go @@ -59,6 +59,11 @@ func SetHTTPPstrTransport(pstrTransport *http.Transport) { httpPstrTransport = pstrTransport } +// GetHTTPPstrTransport gets the http transport to be used by the HTTP Poster +func GetHTTPPstrTransport() *http.Transport { + return httpPstrTransport +} + // NewHTTPTransport will create a new transport for HTTP client func NewHTTPTransport(opts map[string]interface{}) (trsp *http.Transport, err error) { trsp = &http.Transport{ diff --git a/engine/libcdre.go b/engine/libcdre.go index bd7a64c38..008137763 100644 --- a/engine/libcdre.go +++ b/engine/libcdre.go @@ -49,15 +49,15 @@ func writeFailedPosts(itmID string, value interface{}) { if !canConvert { return } - filePath := path.Join(config.CgrConfig().GeneralCfg().FailedPostsDir, expEv.FileName()) + filePath := expEv.FilePath() if err := expEv.WriteToFile(filePath); err != nil { utils.Logger.Warning(fmt.Sprintf("<%s> Failed to write file <%s> because <%s>", utils.CDRs, filePath, err)) } } -func AddFailedPost(expPath, format, module string, ev interface{}, opts map[string]interface{}) { - key := utils.ConcatenatedKey(expPath, format, module) +func AddFailedPost(failedPostsDir, expPath, format, module string, ev interface{}, opts map[string]interface{}) { + key := utils.ConcatenatedKey(failedPostsDir, expPath, format, module) // also in case of amqp,amqpv1,s3,sqs and kafka also separe them after queue id if qID := utils.FirstNonEmpty( utils.IfaceAsString(opts[utils.AMQPQueueID]), @@ -74,10 +74,11 @@ func AddFailedPost(expPath, format, module string, ev interface{}, opts map[stri } if failedPost == nil { failedPost = &ExportEvents{ - Path: expPath, - Format: format, - Opts: opts, - module: module, + Path: expPath, + Format: format, + Opts: opts, + module: module, + failedPostsDir: failedPostsDir, } } failedPost.AddEvent(ev) @@ -105,17 +106,18 @@ func NewExportEventsFromFile(filePath string) (expEv *ExportEvents, err error) { // ExportEvents used to save the failed post to file type ExportEvents struct { - lk sync.RWMutex - Path string - Opts map[string]interface{} - Format string - Events []interface{} - module string + lk sync.RWMutex + Path string + Opts map[string]interface{} + Format string + Events []interface{} + failedPostsDir string + module string } -// FileName returns the file name it should use for saving the failed events -func (expEv *ExportEvents) FileName() string { - return expEv.module + utils.PipeSep + utils.UUIDSha1Prefix() + utils.GOBSuffix +// FilePath returns the file path it should use for saving the failed events +func (expEv *ExportEvents) FilePath() string { + return path.Join(expEv.failedPostsDir, expEv.module+utils.PipeSep+utils.UUIDSha1Prefix()+utils.GOBSuffix) } // SetModule sets the module for this event diff --git a/engine/libcdre_test.go b/engine/libcdre_test.go index c027d4eee..8bd7f7137 100644 --- a/engine/libcdre_test.go +++ b/engine/libcdre_test.go @@ -37,7 +37,7 @@ func TestSetFldPostCacheTTL(t *testing.T) { func TestAddFldPost(t *testing.T) { SetFailedPostCacheTTL(5 * time.Second) - AddFailedPost("path1", "format1", "module1", "1", make(map[string]interface{})) + AddFailedPost("", "path1", "format1", "module1", "1", make(map[string]interface{})) x, ok := failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1")) if !ok { t.Error("Error reading from cache") @@ -60,8 +60,8 @@ func TestAddFldPost(t *testing.T) { if !reflect.DeepEqual(eOut, failedPost) { t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(eOut), utils.ToJSON(failedPost)) } - AddFailedPost("path1", "format1", "module1", "2", make(map[string]interface{})) - AddFailedPost("path2", "format2", "module2", "3", map[string]interface{}{utils.SQSQueueID: "qID"}) + AddFailedPost("", "path1", "format1", "module1", "2", make(map[string]interface{})) + AddFailedPost("", "path2", "format2", "module2", "3", map[string]interface{}{utils.SQSQueueID: "qID"}) x, ok = failedPostCache.Get(utils.ConcatenatedKey("path1", "format1", "module1")) if !ok { t.Error("Error reading from cache") @@ -106,9 +106,9 @@ func TestAddFldPost(t *testing.T) { } } -func TestFileName(t *testing.T) { +func TestFilePath(t *testing.T) { exportEvent := &ExportEvents{} - rcv := exportEvent.FileName() + rcv := exportEvent.FilePath() if rcv[0] != '|' { t.Errorf("Expecting: '|', received: %+v", rcv[0]) } else if rcv[8:] != ".gob" { @@ -117,7 +117,7 @@ func TestFileName(t *testing.T) { exportEvent = &ExportEvents{ module: "module", } - rcv = exportEvent.FileName() + rcv = exportEvent.FilePath() if rcv[:7] != "module|" { t.Errorf("Expecting: 'module|', received: %+v", rcv[:7]) } else if rcv[14:] != ".gob" { diff --git a/engine/pstr_http.go b/engine/pstr_http.go index 831d7a889..8504fa7fb 100644 --- a/engine/pstr_http.go +++ b/engine/pstr_http.go @@ -35,24 +35,6 @@ type HTTPPosterRequest struct { Body interface{} } -// HTTPPostJSON posts without automatic failover -func HTTPPostJSON(url string, content []byte) (respBody []byte, err error) { - client := &http.Client{Transport: httpPstrTransport} - var resp *http.Response - if resp, err = client.Post(url, "application/json", bytes.NewBuffer(content)); err != nil { - return - } - respBody, err = io.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - return - } - if resp.StatusCode > 299 { - err = fmt.Errorf("Unexpected status code received: %d", resp.StatusCode) - } - return -} - // NewHTTPPoster return a new HTTP poster func NewHTTPPoster(replyTimeout time.Duration, addr, contentType string, attempts int) (httposter *HTTPPoster, err error) { diff --git a/engine/z_cdr_it_test.go b/engine/z_cdr_it_test.go deleted file mode 100644 index 3d6c4e003..000000000 --- a/engine/z_cdr_it_test.go +++ /dev/null @@ -1,44 +0,0 @@ -// +build integration - -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ -package engine - -import ( - "encoding/json" - "testing" - "time" - - "github.com/cgrates/cgrates/utils" -) - -// Sample HttpJsonPost, more for usage purposes -func TestHttpJsonPost(t *testing.T) { - cdrOut := &ExternalCDR{CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()), OrderID: 123, - ToR: utils.MetaVoice, OriginID: "dsafdsaf", OriginHost: "192.168.1.1", - Source: utils.UnitTest, RequestType: utils.MetaRated, Tenant: "cgrates.org", - Category: "call", Account: "account1", Subject: "tgooiscs0014", Destination: "1002", - SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String(), AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String(), - RunID: utils.MetaDefault, - Usage: "0.00000001", ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, Cost: 1.01, - } - jsn, _ := json.Marshal(cdrOut) - if _, err := HTTPPostJSON("http://localhost:8000", jsn); err == nil { - t.Error(err) - } -} diff --git a/ers/amqp.go b/ers/amqp.go index adde1cd72..a0846f039 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -25,6 +25,7 @@ import ( "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/ees" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/streadway/amqp" @@ -78,7 +79,7 @@ type AMQPER struct { conn *amqp.Connection channel *amqp.Channel - poster engine.Poster + poster *ees.AMQPee } // Config returns the curent configuration @@ -165,7 +166,7 @@ func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) { utils.ERs, msg.MessageId, err.Error())) } if rdr.poster != nil { // post it - if err := rdr.poster.Post(msg.Body, utils.EmptyString); err != nil { + if err := ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, msg.MessageId, err.Error())) @@ -253,6 +254,9 @@ func (rdr *AMQPER) createPoster() { len(rdr.Config().ProcessedPath) == 0 { return } - rdr.poster = engine.NewAMQPPoster(utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), - rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt) + rdr.poster = ees.NewAMQPee(&config.EventExporterCfg{ + ExportPath: utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts, + Opts: processedOpt, + }, nil) } diff --git a/ers/nats.go b/ers/nats.go index e710d00c8..17cee93b1 100644 --- a/ers/nats.go +++ b/ers/nats.go @@ -25,6 +25,7 @@ import ( "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/ees" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/nats-io/nats.go" @@ -78,7 +79,7 @@ type NatsER struct { opts []nats.Option jsOpts []nats.JSOpt - poster *engine.NatsPoster + poster *ees.NatsEE } // Config returns the curent configuration @@ -133,7 +134,7 @@ func (rdr *NatsER) Serve() (err error) { utils.ERs, string(msg.Data), err.Error())) } if rdr.poster != nil { // post it - if err := rdr.poster.Post(msg.Data, utils.EmptyString); err != nil { + if err := ees.ExportWithAttempts(rdr.poster, msg.Data, utils.EmptyString); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> writing message %s error: %s", utils.ERs, string(msg.Data), err.Error())) @@ -185,11 +186,13 @@ func (rdr *NatsER) createPoster() (err error) { len(rdr.Config().ProcessedPath) == 0 { return } - rdr.poster, err = engine.NewNatsPoster(utils.FirstNonEmpty( - rdr.Config().ProcessedPath, rdr.Config().SourcePath), - rdr.cgrCfg.GeneralCfg().PosterAttempts, - processedOpt, rdr.cgrCfg.GeneralCfg().NodeID, - rdr.cgrCfg.GeneralCfg().ConnectTimeout) + rdr.poster, err = ees.NewNatsEE(&config.EventExporterCfg{ + ExportPath: utils.FirstNonEmpty( + rdr.Config().ProcessedPath, rdr.Config().SourcePath), + Opts: processedOpt, + Attempts: rdr.cgrCfg.GeneralCfg().PosterAttempts, + }, rdr.cgrCfg.GeneralCfg().NodeID, + rdr.cgrCfg.GeneralCfg().ConnectTimeout, nil) return } @@ -213,7 +216,7 @@ func (rdr *NatsER) processOpts() (err error) { rdr.jsOpts = []nats.JSOpt{nats.MaxWait(maxWait)} } } - rdr.opts, err = engine.GetNatsOpts(rdr.Config().Opts, + rdr.opts, err = ees.GetNatsOpts(rdr.Config().Opts, rdr.cgrCfg.GeneralCfg().NodeID, rdr.cgrCfg.GeneralCfg().ConnectTimeout) return diff --git a/ers/nats_it_test.go b/ers/nats_it_test.go index e7876f9df..2a2349fab 100644 --- a/ers/nats_it_test.go +++ b/ers/nats_it_test.go @@ -31,6 +31,7 @@ import ( "time" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/ees" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/nats-io/nats.go" @@ -79,7 +80,7 @@ func testCheckNatsJetStream(t *testing.T, cfg *config.CGRConfig) { t.Fatal(err) } - nop, err := engine.GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) + nop, err := ees.GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) if err != nil { t.Fatal(err) } @@ -166,7 +167,7 @@ func testCheckNatsNormal(t *testing.T, cfg *config.CGRConfig) { t.Fatal(err) } - nop, err := engine.GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) + nop, err := ees.GetNatsOpts(rdr.Config().Opts, "testExp", time.Second) if err != nil { t.Fatal(err) } diff --git a/services/ees.go b/services/ees.go index 1d2ae605c..20ffbd8af 100644 --- a/services/ees.go +++ b/services/ees.go @@ -69,7 +69,7 @@ type EventExporterService struct { // ServiceName returns the service name func (es *EventExporterService) ServiceName() string { - return utils.EventExporterS + return utils.EEs } // ShouldRun returns if the service should be running @@ -110,7 +110,7 @@ func (es *EventExporterService) Start() (err error) { fltrS := <-es.filterSChan es.filterSChan <- fltrS - utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EventExporterS)) + utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.EEs)) es.Lock() defer es.Unlock() @@ -123,6 +123,6 @@ func (es *EventExporterService) Start() (err error) { if !es.cfg.DispatcherSCfg().Enabled { es.server.RpcRegister(es.rpc) } - es.intConnChan <- es.anz.GetInternalCodec(es.eeS, utils.EventExporterS) + es.intConnChan <- es.anz.GetInternalCodec(es.eeS, utils.EEs) return } diff --git a/services/ees_test.go b/services/ees_test.go index 126ffbd1a..57c7bbd5b 100644 --- a/services/ees_test.go +++ b/services/ees_test.go @@ -62,8 +62,8 @@ func TestEventExporterSCoverage(t *testing.T) { t.Errorf("Expected service to be running") } serviceName := srv2.ServiceName() - if serviceName != utils.EventExporterS { - t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.EventExporterS, serviceName) + if serviceName != utils.EEs { + t.Errorf("\nExpecting <%+v>,\n Received <%+v>", utils.EEs, serviceName) } shouldRun := srv2.ShouldRun() if shouldRun != false { diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index 740d9873b..e224aacaa 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -238,7 +238,7 @@ func (srvMngr *ServiceManager) handleReload() { case <-srvMngr.GetConfig().GetReloadChan(config.STORDB_JSN): go srvMngr.reloadService(utils.StorDB) case <-srvMngr.GetConfig().GetReloadChan(config.EEsJson): - go srvMngr.reloadService(utils.EventExporterS) + go srvMngr.reloadService(utils.EEs) case <-srvMngr.GetConfig().GetReloadChan(config.RPCConnsJsonName): go srvMngr.connMgr.Reload() case <-srvMngr.GetConfig().GetReloadChan(config.SIPAgentJson): diff --git a/utils/consts.go b/utils/consts.go index 393528c17..38991ced6 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -802,7 +802,6 @@ const ( Count = "Count" ProfileID = "ProfileID" SortedRoutes = "SortedRoutes" - EventExporterS = "EventExporterS" MetaMonthly = "*monthly" MetaYearly = "*yearly" MetaDaily = "*daily" diff --git a/utils/safemapstorage.go b/utils/safemapstorage.go index 07ab7449c..905451f4a 100644 --- a/utils/safemapstorage.go +++ b/utils/safemapstorage.go @@ -75,3 +75,9 @@ func (ms *SafeMapStorage) Clone() (msClone *SafeMapStorage) { defer ms.RUnlock() return &SafeMapStorage{MapStorage: ms.MapStorage.Clone()} } + +func (ms *SafeMapStorage) ClonedMapStorage() (msClone MapStorage) { + ms.RLock() + defer ms.RUnlock() + return ms.MapStorage.Clone() +}