Started changing the EEs

This commit is contained in:
Trial97
2021-07-29 16:09:39 +03:00
committed by Dan Christian Bogos
parent 05f05b1356
commit c84e861e2a
2 changed files with 227 additions and 83 deletions

174
ees/ee.go
View File

@@ -20,6 +20,8 @@ package ees
import (
"fmt"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
@@ -33,6 +35,20 @@ type EventExporter interface {
GetMetrics() *utils.SafeMapStorage // called to get metrics
}
type exportedEvent interface {
Parse(func(path []string, val interface{}))
AsStringSlice() []string
AsMapStringSlice() map[string]interface{}
}
type EventExporter2 interface {
Cfg() *config.EventExporterCfg // return the config
Connect() error // called before exporting an event to make sure it is connected
ExportEvent(exportedEvent) (interface{}, error) // called on each event to be exported
Close() error // called when the exporter needs to terminate
GetMetrics() *utils.SafeMapStorage // called to get metrics
}
// NewEventExporter produces exporters
func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (ee EventExporter, err error) {
var dc *utils.SafeMapStorage
@@ -91,3 +107,161 @@ func (c *concReq) done() {
c.reqs <- struct{}{}
}
}
// composeHeaderTrailer will return the orderNM for *hdr or *trl
func composeHeaderTrailer(prfx string, fields []*config.FCTemplate, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) {
r = utils.NewOrderedNavigableMap()
err = engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaDC: dc,
utils.MetaCfg: cfg.GetDataProvider(),
}, cfg.GeneralCfg().DefaultTenant, fltS,
map[string]*utils.OrderedNavigableMap{prfx: r}).SetFields(fields)
return
}
func composeExp(fields []*config.FCTemplate, cgrEv *utils.CGREvent, dc utils.DataStorage, cfg *config.CGRConfig, fltS *engine.FilterS) (r *utils.OrderedNavigableMap, err error) {
r = utils.NewOrderedNavigableMap()
err = engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: dc,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: cfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, cfg.GeneralCfg().DefaultTenant),
fltS,
map[string]*utils.OrderedNavigableMap{utils.MetaExp: r}).SetFields(fields)
return
}
func newEEMetrics(location string) (*utils.SafeMapStorage, error) {
tNow := time.Now()
loc, err := time.LoadLocation(location)
if err != nil {
return nil, err
}
return &utils.SafeMapStorage{MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
utils.NegativeExports: utils.StringSet{},
utils.TimeNow: time.Date(tNow.Year(), tNow.Month(), tNow.Day(),
tNow.Hour(), tNow.Minute(), tNow.Second(), tNow.Nanosecond(), loc),
}}, nil
}
func updateEEMetrics(dc *utils.SafeMapStorage, cgrID string, ev engine.MapEvent, hasError bool, timezone string) {
dc.Lock()
defer dc.Unlock()
if hasError {
dc.MapStorage[utils.NegativeExports].(utils.StringSet).Add(cgrID)
} else {
dc.MapStorage[utils.PositiveExports].(utils.StringSet).Add(cgrID)
}
if aTime, err := ev.GetTime(utils.AnswerTime, timezone); err == nil {
if _, has := dc.MapStorage[utils.FirstEventATime]; !has {
dc.MapStorage[utils.FirstEventATime] = time.Time{}
}
if _, has := dc.MapStorage[utils.LastEventATime]; !has {
dc.MapStorage[utils.LastEventATime] = time.Time{}
}
if dc.MapStorage[utils.FirstEventATime].(time.Time).IsZero() ||
aTime.Before(dc.MapStorage[utils.FirstEventATime].(time.Time)) {
dc.MapStorage[utils.FirstEventATime] = aTime
}
if aTime.After(dc.MapStorage[utils.LastEventATime].(time.Time)) {
dc.MapStorage[utils.LastEventATime] = aTime
}
}
if oID, err := ev.GetTInt64(utils.OrderID); err == nil {
if _, has := dc.MapStorage[utils.FirstExpOrderID]; !has {
dc.MapStorage[utils.FirstExpOrderID] = int64(0)
}
if _, has := dc.MapStorage[utils.LastExpOrderID]; !has {
dc.MapStorage[utils.LastExpOrderID] = int64(0)
}
if dc.MapStorage[utils.FirstExpOrderID].(int64) == 0 ||
dc.MapStorage[utils.FirstExpOrderID].(int64) > oID {
dc.MapStorage[utils.FirstExpOrderID] = oID
}
if dc.MapStorage[utils.LastExpOrderID].(int64) < oID {
dc.MapStorage[utils.LastExpOrderID] = oID
}
}
if cost, err := ev.GetFloat64(utils.Cost); err == nil {
if _, has := dc.MapStorage[utils.TotalCost]; !has {
dc.MapStorage[utils.TotalCost] = float64(0.0)
}
dc.MapStorage[utils.TotalCost] = dc.MapStorage[utils.TotalCost].(float64) + cost
}
if tor, err := ev.GetString(utils.ToR); err == nil {
if usage, err := ev.GetDuration(utils.Usage); err == nil {
switch tor {
case utils.MetaVoice:
if _, has := dc.MapStorage[utils.TotalDuration]; !has {
dc.MapStorage[utils.TotalDuration] = time.Duration(0)
}
dc.MapStorage[utils.TotalDuration] = dc.MapStorage[utils.TotalDuration].(time.Duration) + usage
case utils.MetaSMS:
if _, has := dc.MapStorage[utils.TotalSMSUsage]; !has {
dc.MapStorage[utils.TotalSMSUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalSMSUsage] = dc.MapStorage[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MetaMMS:
if _, has := dc.MapStorage[utils.TotalMMSUsage]; !has {
dc.MapStorage[utils.TotalMMSUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalMMSUsage] = dc.MapStorage[utils.TotalMMSUsage].(time.Duration) + usage
case utils.MetaGeneric:
if _, has := dc.MapStorage[utils.TotalGenericUsage]; !has {
dc.MapStorage[utils.TotalGenericUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalGenericUsage] = dc.MapStorage[utils.TotalGenericUsage].(time.Duration) + usage
case utils.MetaData:
if _, has := dc.MapStorage[utils.TotalDataUsage]; !has {
dc.MapStorage[utils.TotalDataUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalDataUsage] = dc.MapStorage[utils.TotalDataUsage].(time.Duration) + usage
}
}
}
}
type expOrderedNavigableMap utils.OrderedNavigableMap
func (v *expOrderedNavigableMap) Parse(f func(path []string, val interface{})) {
nm := (*utils.OrderedNavigableMap)(v)
for el := nm.GetFirstElement(); el != nil; el = el.Next() {
nmIt, _ := nm.Field(el.Value)
f(el.Value, nmIt.Data)
}
}
func (v *expOrderedNavigableMap) AsStringSlice() []string {
return (*utils.OrderedNavigableMap)(v).OrderedFieldsAsStrings()
}
func (v *expOrderedNavigableMap) AsMapStringSlice() (m map[string]interface{}) {
m = map[string]interface{}{}
nm := (*utils.OrderedNavigableMap)(v)
for el := nm.GetFirstElement(); el != nil; el = el.Next() {
path := el.Value
nmIt, _ := nm.Field(path)
path = path[:len(path)-1] // remove the last index
m[strings.Join(path, utils.NestingSep)] = nmIt.String()
}
return
}
type expMapStorage utils.MapStorage
func (v expMapStorage) Parse(f func(path []string, val interface{})) {
for k, val := range utils.MapStorage(v) {
f([]string{k}, val)
}
}
func (v expMapStorage) AsStringSlice() (s []string) {
s = make([]string, 0, len(v))
for _, val := range utils.MapStorage(v) {
s = append(s, utils.IfaceAsString(val))
}
return
}
func (v expMapStorage) AsMapStringSlice() map[string]interface{} { return v }

View File

@@ -30,9 +30,9 @@ import (
)
// onCacheEvicted is called by ltcache when evicting an item
func onCacheEvicted(itmID string, value interface{}) {
ee := value.(EventExporter)
ee.OnEvicted(itmID, value)
func onCacheEvicted(_ string, value interface{}) {
ee := value.(EventExporter2)
ee.Close()
}
// NewEventExporterS instantiates the EventExporterS
@@ -265,94 +265,64 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithEeIDs, rply *
return
}
func newEEMetrics(location string) (*utils.SafeMapStorage, error) {
tNow := time.Now()
loc, err := time.LoadLocation(location)
if err != nil {
return nil, err
}
return &utils.SafeMapStorage{MapStorage: utils.MapStorage{
utils.NumberOfEvents: int64(0),
utils.PositiveExports: utils.StringSet{},
utils.NegativeExports: utils.StringSet{},
utils.TimeNow: time.Date(tNow.Year(), tNow.Month(), tNow.Day(),
tNow.Hour(), tNow.Minute(), tNow.Second(), tNow.Nanosecond(), loc),
}}, nil
}
func (eeS *EventExporterS) exportEventWithExporter(exp EventExporter2, ev *utils.CGREvent, oneTime bool) (err error) {
var eEv exportedEvent
func updateEEMetrics(dc *utils.SafeMapStorage, cgrID string, ev engine.MapEvent, hasError bool, timezone string) {
dc.Lock()
defer dc.Unlock()
if hasError {
dc.MapStorage[utils.NegativeExports].(utils.StringSet).Add(cgrID)
exp.GetMetrics().Lock()
exp.GetMetrics().MapStorage[utils.NumberOfEvents] = exp.GetMetrics().MapStorage[utils.NumberOfEvents].(int64) + 1
exp.GetMetrics().Unlock()
if len(exp.Cfg().ContentFields()) == 0 {
eEv = expMapStorage(ev.Event)
} else {
dc.MapStorage[utils.PositiveExports].(utils.StringSet).Add(cgrID)
expNM := utils.NewOrderedNavigableMap()
err = engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(ev.Event),
utils.MetaDC: exp.GetMetrics(),
utils.MetaOpts: utils.MapStorage(ev.APIOpts),
utils.MetaCfg: eeS.cfg.GetDataProvider(),
}, utils.FirstNonEmpty(ev.Tenant, eeS.cfg.GeneralCfg().DefaultTenant),
eeS.filterS,
map[string]*utils.OrderedNavigableMap{utils.MetaExp: expNM}).SetFields(exp.Cfg().ContentFields())
eEv = (*expOrderedNavigableMap)(expNM)
}
if aTime, err := ev.GetTime(utils.AnswerTime, timezone); err == nil {
if _, has := dc.MapStorage[utils.FirstEventATime]; !has {
dc.MapStorage[utils.FirstEventATime] = time.Time{}
exp = utils.NewOrderedNavigableMap()
err = engine.NewExportRequest(map[string]utils.DataStorage{
utils.MetaReq: utils.MapStorage(cgrEv.Event),
utils.MetaDC: dc,
utils.MetaOpts: utils.MapStorage(cgrEv.APIOpts),
utils.MetaCfg: cfg.GetDataProvider(),
}, utils.FirstNonEmpty(cgrEv.Tenant, cfg.GeneralCfg().DefaultTenant),
fltS,
map[string]*utils.OrderedNavigableMap{utils.MetaExp: r}).SetFields(fields)
return
if oneTime {
defer exp.Close()
}
fib := utils.Fib()
for i := 0; i < exp.Cfg().Attempts; i++ {
if err = exp.Connect(); err == nil {
break
}
if _, has := dc.MapStorage[utils.LastEventATime]; !has {
dc.MapStorage[utils.LastEventATime] = time.Time{}
}
if dc.MapStorage[utils.FirstEventATime].(time.Time).IsZero() ||
aTime.Before(dc.MapStorage[utils.FirstEventATime].(time.Time)) {
dc.MapStorage[utils.FirstEventATime] = aTime
}
if aTime.After(dc.MapStorage[utils.LastEventATime].(time.Time)) {
dc.MapStorage[utils.LastEventATime] = aTime
if i+1 < exp.Cfg().Attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if oID, err := ev.GetTInt64(utils.OrderID); err == nil {
if _, has := dc.MapStorage[utils.FirstExpOrderID]; !has {
dc.MapStorage[utils.FirstExpOrderID] = int64(0)
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> Exporter <%s> could not connect because err: %s", utils.EEs, exp.Cfg().ID, err.Error()))
return
}
for i := 0; i < exp.Cfg().Attempts; i++ {
if err = exp.ExportEvent(ev); err == nil {
break
}
if _, has := dc.MapStorage[utils.LastExpOrderID]; !has {
dc.MapStorage[utils.LastExpOrderID] = int64(0)
}
if dc.MapStorage[utils.FirstExpOrderID].(int64) == 0 ||
dc.MapStorage[utils.FirstExpOrderID].(int64) > oID {
dc.MapStorage[utils.FirstExpOrderID] = oID
}
if dc.MapStorage[utils.LastExpOrderID].(int64) < oID {
dc.MapStorage[utils.LastExpOrderID] = oID
if i+1 < exp.Cfg().Attempts {
time.Sleep(time.Duration(fib()) * time.Second)
}
}
if cost, err := ev.GetFloat64(utils.Cost); err == nil {
if _, has := dc.MapStorage[utils.TotalCost]; !has {
dc.MapStorage[utils.TotalCost] = float64(0.0)
}
dc.MapStorage[utils.TotalCost] = dc.MapStorage[utils.TotalCost].(float64) + cost
}
if tor, err := ev.GetString(utils.ToR); err == nil {
if usage, err := ev.GetDuration(utils.Usage); err == nil {
switch tor {
case utils.MetaVoice:
if _, has := dc.MapStorage[utils.TotalDuration]; !has {
dc.MapStorage[utils.TotalDuration] = time.Duration(0)
}
dc.MapStorage[utils.TotalDuration] = dc.MapStorage[utils.TotalDuration].(time.Duration) + usage
case utils.MetaSMS:
if _, has := dc.MapStorage[utils.TotalSMSUsage]; !has {
dc.MapStorage[utils.TotalSMSUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalSMSUsage] = dc.MapStorage[utils.TotalSMSUsage].(time.Duration) + usage
case utils.MetaMMS:
if _, has := dc.MapStorage[utils.TotalMMSUsage]; !has {
dc.MapStorage[utils.TotalMMSUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalMMSUsage] = dc.MapStorage[utils.TotalMMSUsage].(time.Duration) + usage
case utils.MetaGeneric:
if _, has := dc.MapStorage[utils.TotalGenericUsage]; !has {
dc.MapStorage[utils.TotalGenericUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalGenericUsage] = dc.MapStorage[utils.TotalGenericUsage].(time.Duration) + usage
case utils.MetaData:
if _, has := dc.MapStorage[utils.TotalDataUsage]; !has {
dc.MapStorage[utils.TotalDataUsage] = time.Duration(0)
}
dc.MapStorage[utils.TotalDataUsage] = dc.MapStorage[utils.TotalDataUsage].(time.Duration) + usage
}
}
if err != nil {
return
}
return
}