From c84e861e2a21d8418792f025f7d7edf623b2bf4d Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 29 Jul 2021 16:09:39 +0300 Subject: [PATCH] Started changing the EEs --- ees/ee.go | 174 +++++++++++++++++++++++++++++++++++++++++++++++++++++ ees/ees.go | 136 ++++++++++++++++------------------------- 2 files changed, 227 insertions(+), 83 deletions(-) diff --git a/ees/ee.go b/ees/ee.go index 75c41a5cb..ad989c8ae 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -20,6 +20,8 @@ package ees import ( "fmt" + "strings" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -33,6 +35,20 @@ type EventExporter interface { GetMetrics() *utils.SafeMapStorage // called to get metrics } +type exportedEvent interface { + Parse(func(path []string, val interface{})) + AsStringSlice() []string + AsMapStringSlice() map[string]interface{} +} + +type EventExporter2 interface { + Cfg() *config.EventExporterCfg // return the config + Connect() error // called before exporting an event to make sure it is connected + ExportEvent(exportedEvent) (interface{}, error) // called on each event to be exported + Close() error // called when the exporter needs to terminate + GetMetrics() *utils.SafeMapStorage // called to get metrics +} + // NewEventExporter produces exporters func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (ee EventExporter, err error) { var dc *utils.SafeMapStorage @@ -91,3 +107,161 @@ func (c *concReq) done() { c.reqs <- struct{}{} } } + +// composeHeaderTrailer will return the orderNM for *hdr or *trl +func composeHeaderTrailer(prfx string, fields []*config.FCTemplate, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) { + r = utils.NewOrderedNavigableMap() + err = engine.NewExportRequest(map[string]utils.DataStorage{ + utils.MetaDC: dc, + utils.MetaCfg: cfg.GetDataProvider(), + }, cfg.GeneralCfg().DefaultTenant, fltS, + map[string]*utils.OrderedNavigableMap{prfx: r}).SetFields(fields) + return +} + +func composeExp(fields []*config.FCTemplate, cgrEv *utils.CGREvent, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) { + r = utils.NewOrderedNavigableMap() + err = engine.NewExportRequest(map[string]utils.DataStorage{ + utils.MetaReq: utils.MapStorage(cgrEv.Event), + utils.MetaDC: dc, + utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), + utils.MetaCfg: cfg.GetDataProvider(), + }, utils.FirstNonEmpty(cgrEv.Tenant, cfg.GeneralCfg().DefaultTenant), + fltS, + map[string]*utils.OrderedNavigableMap{utils.MetaExp: r}).SetFields(fields) + return +} + +func newEEMetrics(location string) (*utils.SafeMapStorage, error) { + tNow := time.Now() + loc, err := time.LoadLocation(location) + if err != nil { + return nil, err + } + return &utils.SafeMapStorage{MapStorage: utils.MapStorage{ + utils.NumberOfEvents: int64(0), + utils.PositiveExports: utils.StringSet{}, + utils.NegativeExports: utils.StringSet{}, + utils.TimeNow: time.Date(tNow.Year(), tNow.Month(), tNow.Day(), + tNow.Hour(), tNow.Minute(), tNow.Second(), tNow.Nanosecond(), loc), + }}, nil +} + +func updateEEMetrics(dc *utils.SafeMapStorage, cgrID string, ev engine.MapEvent, hasError bool, timezone string) { + dc.Lock() + defer dc.Unlock() + if hasError { + dc.MapStorage[utils.NegativeExports].(utils.StringSet).Add(cgrID) + } else { + dc.MapStorage[utils.PositiveExports].(utils.StringSet).Add(cgrID) + } + if aTime, err := ev.GetTime(utils.AnswerTime, timezone); err == nil { + if _, has := dc.MapStorage[utils.FirstEventATime]; !has { + dc.MapStorage[utils.FirstEventATime] = time.Time{} + } + if _, has := dc.MapStorage[utils.LastEventATime]; !has { + dc.MapStorage[utils.LastEventATime] = time.Time{} + } + if dc.MapStorage[utils.FirstEventATime].(time.Time).IsZero() || + aTime.Before(dc.MapStorage[utils.FirstEventATime].(time.Time)) { + dc.MapStorage[utils.FirstEventATime] = aTime + } + if aTime.After(dc.MapStorage[utils.LastEventATime].(time.Time)) { + dc.MapStorage[utils.LastEventATime] = aTime + } + } + if oID, err := ev.GetTInt64(utils.OrderID); err == nil { + if _, has := dc.MapStorage[utils.FirstExpOrderID]; !has { + dc.MapStorage[utils.FirstExpOrderID] = int64(0) + } + if _, has := dc.MapStorage[utils.LastExpOrderID]; !has { + dc.MapStorage[utils.LastExpOrderID] = int64(0) + } + if dc.MapStorage[utils.FirstExpOrderID].(int64) == 0 || + dc.MapStorage[utils.FirstExpOrderID].(int64) > oID { + dc.MapStorage[utils.FirstExpOrderID] = oID + } + if dc.MapStorage[utils.LastExpOrderID].(int64) < oID { + dc.MapStorage[utils.LastExpOrderID] = oID + } + } + if cost, err := ev.GetFloat64(utils.Cost); err == nil { + if _, has := dc.MapStorage[utils.TotalCost]; !has { + dc.MapStorage[utils.TotalCost] = float64(0.0) + } + dc.MapStorage[utils.TotalCost] = dc.MapStorage[utils.TotalCost].(float64) + cost + } + if tor, err := ev.GetString(utils.ToR); err == nil { + if usage, err := ev.GetDuration(utils.Usage); err == nil { + switch tor { + case utils.MetaVoice: + if _, has := dc.MapStorage[utils.TotalDuration]; !has { + dc.MapStorage[utils.TotalDuration] = time.Duration(0) + } + dc.MapStorage[utils.TotalDuration] = dc.MapStorage[utils.TotalDuration].(time.Duration) + usage + case utils.MetaSMS: + if _, has := dc.MapStorage[utils.TotalSMSUsage]; !has { + dc.MapStorage[utils.TotalSMSUsage] = time.Duration(0) + } + dc.MapStorage[utils.TotalSMSUsage] = dc.MapStorage[utils.TotalSMSUsage].(time.Duration) + usage + case utils.MetaMMS: + if _, has := dc.MapStorage[utils.TotalMMSUsage]; !has { + dc.MapStorage[utils.TotalMMSUsage] = time.Duration(0) + } + dc.MapStorage[utils.TotalMMSUsage] = dc.MapStorage[utils.TotalMMSUsage].(time.Duration) + usage + case utils.MetaGeneric: + if _, has := dc.MapStorage[utils.TotalGenericUsage]; !has { + dc.MapStorage[utils.TotalGenericUsage] = time.Duration(0) + } + dc.MapStorage[utils.TotalGenericUsage] = dc.MapStorage[utils.TotalGenericUsage].(time.Duration) + usage + case utils.MetaData: + if _, has := dc.MapStorage[utils.TotalDataUsage]; !has { + dc.MapStorage[utils.TotalDataUsage] = time.Duration(0) + } + dc.MapStorage[utils.TotalDataUsage] = dc.MapStorage[utils.TotalDataUsage].(time.Duration) + usage + } + } + } +} + +type expOrderedNavigableMap utils.OrderedNavigableMap + +func (v *expOrderedNavigableMap) Parse(f func(path []string, val interface{})) { + nm := (*utils.OrderedNavigableMap)(v) + for el := nm.GetFirstElement(); el != nil; el = el.Next() { + nmIt, _ := nm.Field(el.Value) + f(el.Value, nmIt.Data) + } +} + +func (v *expOrderedNavigableMap) AsStringSlice() []string { + return (*utils.OrderedNavigableMap)(v).OrderedFieldsAsStrings() +} +func (v *expOrderedNavigableMap) AsMapStringSlice() (m map[string]interface{}) { + m = map[string]interface{}{} + nm := (*utils.OrderedNavigableMap)(v) + for el := nm.GetFirstElement(); el != nil; el = el.Next() { + path := el.Value + nmIt, _ := nm.Field(path) + path = path[:len(path)-1] // remove the last index + m[strings.Join(path, utils.NestingSep)] = nmIt.String() + } + return +} + +type expMapStorage utils.MapStorage + +func (v expMapStorage) Parse(f func(path []string, val interface{})) { + for k, val := range utils.MapStorage(v) { + f([]string{k}, val) + } +} + +func (v expMapStorage) AsStringSlice() (s []string) { + s = make([]string, 0, len(v)) + for _, val := range utils.MapStorage(v) { + s = append(s, utils.IfaceAsString(val)) + } + return +} +func (v expMapStorage) AsMapStringSlice() map[string]interface{} { return v } diff --git a/ees/ees.go b/ees/ees.go index d8b9854f6..7d21c1279 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -30,9 +30,9 @@ import ( ) // onCacheEvicted is called by ltcache when evicting an item -func onCacheEvicted(itmID string, value interface{}) { - ee := value.(EventExporter) - ee.OnEvicted(itmID, value) +func onCacheEvicted(_ string, value interface{}) { + ee := value.(EventExporter2) + ee.Close() } // NewEventExporterS instantiates the EventExporterS @@ -265,94 +265,64 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply * return } -func newEEMetrics(location string) (*utils.SafeMapStorage, error) { - tNow := time.Now() - loc, err := time.LoadLocation(location) - if err != nil { - return nil, err - } - return &utils.SafeMapStorage{MapStorage: utils.MapStorage{ - utils.NumberOfEvents: int64(0), - utils.PositiveExports: utils.StringSet{}, - utils.NegativeExports: utils.StringSet{}, - utils.TimeNow: time.Date(tNow.Year(), tNow.Month(), tNow.Day(), - tNow.Hour(), tNow.Minute(), tNow.Second(), tNow.Nanosecond(), loc), - }}, nil -} +func (eeS *EventExporterS) exportEventWithExporter(exp EventExporter2, ev *utils.CGREvent, oneTime bool) (err error) { + var eEv exportedEvent -func updateEEMetrics(dc *utils.SafeMapStorage, cgrID string, ev engine.MapEvent, hasError bool, timezone string) { - dc.Lock() - defer dc.Unlock() - if hasError { - dc.MapStorage[utils.NegativeExports].(utils.StringSet).Add(cgrID) + exp.GetMetrics().Lock() + exp.GetMetrics().MapStorage[utils.NumberOfEvents] = exp.GetMetrics().MapStorage[utils.NumberOfEvents].(int64) + 1 + exp.GetMetrics().Unlock() + if len(exp.Cfg().ContentFields()) == 0 { + eEv = expMapStorage(ev.Event) } else { - dc.MapStorage[utils.PositiveExports].(utils.StringSet).Add(cgrID) + expNM := utils.NewOrderedNavigableMap() + err = engine.NewExportRequest(map[string]utils.DataStorage{ + utils.MetaReq: utils.MapStorage(ev.Event), + utils.MetaDC: exp.GetMetrics(), + utils.MetaOpts: utils.MapStorage(ev.APIOpts), + utils.MetaCfg: eeS.cfg.GetDataProvider(), + }, utils.FirstNonEmpty(ev.Tenant, eeS.cfg.GeneralCfg().DefaultTenant), + eeS.filterS, + map[string]*utils.OrderedNavigableMap{utils.MetaExp: expNM}).SetFields(exp.Cfg().ContentFields()) + eEv = (*expOrderedNavigableMap)(expNM) } - if aTime, err := ev.GetTime(utils.AnswerTime, timezone); err == nil { - if _, has := dc.MapStorage[utils.FirstEventATime]; !has { - dc.MapStorage[utils.FirstEventATime] = time.Time{} + + exp = utils.NewOrderedNavigableMap() + err = engine.NewExportRequest(map[string]utils.DataStorage{ + utils.MetaReq: utils.MapStorage(cgrEv.Event), + utils.MetaDC: dc, + utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts), + utils.MetaCfg: cfg.GetDataProvider(), + }, utils.FirstNonEmpty(cgrEv.Tenant, cfg.GeneralCfg().DefaultTenant), + fltS, + map[string]*utils.OrderedNavigableMap{utils.MetaExp: r}).SetFields(fields) + return + if oneTime { + defer exp.Close() + } + fib := utils.Fib() + + for i := 0; i < exp.Cfg().Attempts; i++ { + if err = exp.Connect(); err == nil { + break } - if _, has := dc.MapStorage[utils.LastEventATime]; !has { - dc.MapStorage[utils.LastEventATime] = time.Time{} - } - if dc.MapStorage[utils.FirstEventATime].(time.Time).IsZero() || - aTime.Before(dc.MapStorage[utils.FirstEventATime].(time.Time)) { - dc.MapStorage[utils.FirstEventATime] = aTime - } - if aTime.After(dc.MapStorage[utils.LastEventATime].(time.Time)) { - dc.MapStorage[utils.LastEventATime] = aTime + if i+1 < exp.Cfg().Attempts { + time.Sleep(time.Duration(fib()) * time.Second) } } - if oID, err := ev.GetTInt64(utils.OrderID); err == nil { - if _, has := dc.MapStorage[utils.FirstExpOrderID]; !has { - dc.MapStorage[utils.FirstExpOrderID] = int64(0) + if err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Exporter <%s> could not connect because err: %s", utils.EEs, exp.Cfg().ID, err.Error())) + return + } + for i := 0; i < exp.Cfg().Attempts; i++ { + if err = exp.ExportEvent(ev); err == nil { + break } - if _, has := dc.MapStorage[utils.LastExpOrderID]; !has { - dc.MapStorage[utils.LastExpOrderID] = int64(0) - } - if dc.MapStorage[utils.FirstExpOrderID].(int64) == 0 || - dc.MapStorage[utils.FirstExpOrderID].(int64) > oID { - dc.MapStorage[utils.FirstExpOrderID] = oID - } - if dc.MapStorage[utils.LastExpOrderID].(int64) < oID { - dc.MapStorage[utils.LastExpOrderID] = oID + if i+1 < exp.Cfg().Attempts { + time.Sleep(time.Duration(fib()) * time.Second) } } - if cost, err := ev.GetFloat64(utils.Cost); err == nil { - if _, has := dc.MapStorage[utils.TotalCost]; !has { - dc.MapStorage[utils.TotalCost] = float64(0.0) - } - dc.MapStorage[utils.TotalCost] = dc.MapStorage[utils.TotalCost].(float64) + cost - } - if tor, err := ev.GetString(utils.ToR); err == nil { - if usage, err := ev.GetDuration(utils.Usage); err == nil { - switch tor { - case utils.MetaVoice: - if _, has := dc.MapStorage[utils.TotalDuration]; !has { - dc.MapStorage[utils.TotalDuration] = time.Duration(0) - } - dc.MapStorage[utils.TotalDuration] = dc.MapStorage[utils.TotalDuration].(time.Duration) + usage - case utils.MetaSMS: - if _, has := dc.MapStorage[utils.TotalSMSUsage]; !has { - dc.MapStorage[utils.TotalSMSUsage] = time.Duration(0) - } - dc.MapStorage[utils.TotalSMSUsage] = dc.MapStorage[utils.TotalSMSUsage].(time.Duration) + usage - case utils.MetaMMS: - if _, has := dc.MapStorage[utils.TotalMMSUsage]; !has { - dc.MapStorage[utils.TotalMMSUsage] = time.Duration(0) - } - dc.MapStorage[utils.TotalMMSUsage] = dc.MapStorage[utils.TotalMMSUsage].(time.Duration) + usage - case utils.MetaGeneric: - if _, has := dc.MapStorage[utils.TotalGenericUsage]; !has { - dc.MapStorage[utils.TotalGenericUsage] = time.Duration(0) - } - dc.MapStorage[utils.TotalGenericUsage] = dc.MapStorage[utils.TotalGenericUsage].(time.Duration) + usage - case utils.MetaData: - if _, has := dc.MapStorage[utils.TotalDataUsage]; !has { - dc.MapStorage[utils.TotalDataUsage] = time.Duration(0) - } - dc.MapStorage[utils.TotalDataUsage] = dc.MapStorage[utils.TotalDataUsage].(time.Duration) + usage - } - } + if err != nil { + return } + return }