From 31387e2fe2dccbb6052d2e2bbd21608ed808f945 Mon Sep 17 00:00:00 2001 From: TeoV Date: Mon, 1 Jun 2020 16:55:36 +0300 Subject: [PATCH] Finish implementation for ees *file_csv format --- agents/agentreq_test.go | 41 ++++ config/config_defaults.go | 2 +- config/config_json_test.go | 9 +- config/config_test.go | 44 ++-- config/eescfg_test.go | 22 +- data/conf/samples/ees/cgrates.json | 11 +- ees/ee.go | 6 +- ees/eereq.go | 363 +++++++++++++++++++++++++++++ ees/ees.go | 21 +- ees/filecsv.go | 176 +++++--------- ees/filecsv_it_test.go | 16 +- ees/filefwv.go | 172 +++++--------- utils/consts.go | 18 ++ utils/orderednavigablemap.go | 2 +- 14 files changed, 612 insertions(+), 291 deletions(-) create mode 100644 ees/eereq.go diff --git a/agents/agentreq_test.go b/agents/agentreq_test.go index c03f8c206..b0c16c321 100644 --- a/agents/agentreq_test.go +++ b/agents/agentreq_test.go @@ -2039,6 +2039,47 @@ func TestAgReqDynamicPath(t *testing.T) { } } +func TestAgReqRoundingDecimals(t *testing.T) { + cfg, _ := config.NewDefaultCGRConfig() + data := engine.NewInternalDB(nil, nil, true, cfg.DataDbCfg().Items) + dm := engine.NewDataManager(data, config.CgrConfig().CacheCfg(), nil) + filterS := engine.NewFilterS(cfg, nil, dm) + agReq := NewAgentRequest(nil, nil, nil, nil, nil, nil, "cgrates.org", "", filterS, nil, nil) + // populate request, emulating the way will be done in HTTPAgent + agReq.CGRRequest.Set(&utils.FullPath{Path: utils.ToR, PathItems: utils.PathItems{{Field: utils.ToR}}}, utils.NewNMData(utils.VOICE)) + agReq.CGRRequest.Set(&utils.FullPath{Path: utils.Account, PathItems: utils.PathItems{{Field: utils.Account}}}, utils.NewNMData("1001")) + agReq.CGRRequest.Set(&utils.FullPath{Path: utils.Destination, PathItems: utils.PathItems{{Field: utils.Destination}}}, utils.NewNMData("1002")) + agReq.CGRRequest.Set(&utils.FullPath{Path: utils.AnswerTime, PathItems: utils.PathItems{{Field: utils.AnswerTime}}}, utils.NewNMData( + time.Date(2013, 12, 30, 15, 0, 1, 0, time.UTC))) + agReq.CGRRequest.Set(&utils.FullPath{Path: utils.RequestType, PathItems: utils.PathItems{{Field: utils.RequestType}}}, utils.NewNMData(utils.META_PREPAID)) + agReq.CGRRequest.Set(&utils.FullPath{Path: utils.Cost, PathItems: utils.PathItems{{Field: utils.Cost}}}, utils.NewNMData(12.12645)) + + agReq.CGRReply = &utils.NavigableMap2{} + + tplFlds := []*config.FCTemplate{ + {Tag: "Cost", + Path: utils.MetaCgrep + utils.NestingSep + utils.Cost, Type: utils.META_COMPOSED, + Value: config.NewRSRParsersMustCompile("~*cgreq.Cost{*round:3}", true, utils.INFIELD_SEP)}, + } + for _, v := range tplFlds { + v.ComputePath() + } + if err := agReq.SetFields(tplFlds); err != nil { + t.Error(err) + } + + if rcv, err := agReq.CGRReply.Field(utils.PathItems{{Field: utils.Cost}}); err != nil { + t.Error(err) + } else if sls, canCast := rcv.(*utils.NMSlice); !canCast { + t.Errorf("Cannot cast to &utils.NMSlice %+v", rcv) + } else if len(*sls) != 1 { + t.Errorf("expecting: %+v, \n received: %+v ", 1, len(*sls)) + } else if (*sls)[0].Interface() != "12.126" { + t.Errorf("expecting: %+v, \n received: %+v", + "12.126", (*sls)[0].Interface()) + } +} + /* $go test -bench=. -run=^$ -benchtime=10s -count=3 goos: linux diff --git a/config/config_defaults.go b/config/config_defaults.go index d0eec4654..0f8657f10 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -379,7 +379,7 @@ const CGRATES_CFG_JSON = ` {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, - {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost", "rounding_decimals": 4}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, ], }, ], diff --git a/config/config_json_test.go b/config/config_json_test.go index 9438e9103..0b4356a40 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -1956,11 +1956,10 @@ func TestDfEventExporterCfg(t *testing.T) { Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.Usage), }, { - Tag: utils.StringPointer(utils.Cost), - Path: utils.StringPointer("*exp.Cost"), - Type: utils.StringPointer(utils.MetaVariable), - Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.COST), - Rounding_decimals: utils.IntPointer(4), + Tag: utils.StringPointer(utils.Cost), + Path: utils.StringPointer("*exp.Cost"), + Type: utils.StringPointer(utils.MetaVariable), + Value: utils.StringPointer(utils.DynamicDataPrefix + utils.MetaReq + utils.NestingSep + utils.COST + "{*round:4}"), }, } eCfg := &EEsJsonCfg{ diff --git a/config/config_test.go b/config/config_test.go index 54e0ddd21..5a596d47d 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -1977,12 +1977,11 @@ func TestCgrCdfEventExporter(t *testing.T) { Layout: time.RFC3339, }, { - Tag: utils.Cost, - Path: "*exp.Cost", - Type: utils.MetaVariable, - Value: NewRSRParsersMustCompile("~*req.Cost", true, utils.INFIELD_SEP), - Layout: time.RFC3339, - RoundingDecimals: utils.IntPointer(4), + Tag: utils.Cost, + Path: "*exp.Cost", + Type: utils.MetaVariable, + Value: NewRSRParsersMustCompile("~*req.Cost{*round:4}", true, utils.INFIELD_SEP), + Layout: time.RFC3339, }, }, Fields: []*FCTemplate{ @@ -2078,12 +2077,11 @@ func TestCgrCdfEventExporter(t *testing.T) { Layout: time.RFC3339, }, { - Tag: utils.Cost, - Path: "*exp.Cost", - Type: utils.MetaVariable, - Value: NewRSRParsersMustCompile("~*req.Cost", true, utils.INFIELD_SEP), - Layout: time.RFC3339, - RoundingDecimals: utils.IntPointer(4), + Tag: utils.Cost, + Path: "*exp.Cost", + Type: utils.MetaVariable, + Value: NewRSRParsersMustCompile("~*req.Cost{*round:4}", true, utils.INFIELD_SEP), + Layout: time.RFC3339, }, }, headerFields: []*FCTemplate{}, @@ -2258,12 +2256,11 @@ func TestCgrCfgEventExporterDefault(t *testing.T) { Layout: time.RFC3339, }, { - Tag: utils.Cost, - Path: "*exp.Cost", - Type: utils.MetaVariable, - Value: NewRSRParsersMustCompile("~*req.Cost", true, utils.INFIELD_SEP), - Layout: time.RFC3339, - RoundingDecimals: utils.IntPointer(4), + Tag: utils.Cost, + Path: "*exp.Cost", + Type: utils.MetaVariable, + Value: NewRSRParsersMustCompile("~*req.Cost{*round:4}", true, utils.INFIELD_SEP), + Layout: time.RFC3339, }, }, Fields: []*FCTemplate{ @@ -2359,12 +2356,11 @@ func TestCgrCfgEventExporterDefault(t *testing.T) { Layout: time.RFC3339, }, { - Tag: utils.Cost, - Path: "*exp.Cost", - Type: utils.MetaVariable, - Value: NewRSRParsersMustCompile("~*req.Cost", true, utils.INFIELD_SEP), - Layout: time.RFC3339, - RoundingDecimals: utils.IntPointer(4), + Tag: utils.Cost, + Path: "*exp.Cost", + Type: utils.MetaVariable, + Value: NewRSRParsersMustCompile("~*req.Cost{*round:4}", true, utils.INFIELD_SEP), + Layout: time.RFC3339, }, }, headerFields: []*FCTemplate{}, diff --git a/config/eescfg_test.go b/config/eescfg_test.go index 5f82bf5e2..9b47daa04 100644 --- a/config/eescfg_test.go +++ b/config/eescfg_test.go @@ -256,12 +256,11 @@ func TestEventExporterSameID(t *testing.T) { Layout: time.RFC3339, }, { - Tag: utils.Cost, - Path: "*exp.Cost", - Type: utils.MetaVariable, - Value: NewRSRParsersMustCompile("~*req.Cost", true, utils.INFIELD_SEP), - Layout: time.RFC3339, - RoundingDecimals: utils.IntPointer(4), + Tag: utils.Cost, + Path: "*exp.Cost", + Type: utils.MetaVariable, + Value: NewRSRParsersMustCompile("~*req.Cost{*round:4}", true, utils.INFIELD_SEP), + Layout: time.RFC3339, }, }, contentFields: []*FCTemplate{ @@ -357,12 +356,11 @@ func TestEventExporterSameID(t *testing.T) { Layout: time.RFC3339, }, { - Tag: utils.Cost, - Path: "*exp.Cost", - Type: utils.MetaVariable, - Value: NewRSRParsersMustCompile("~*req.Cost", true, utils.INFIELD_SEP), - Layout: time.RFC3339, - RoundingDecimals: utils.IntPointer(4), + Tag: utils.Cost, + Path: "*exp.Cost", + Type: utils.MetaVariable, + Value: NewRSRParsersMustCompile("~*req.Cost{*round:4}", true, utils.INFIELD_SEP), + Layout: time.RFC3339, }, }, headerFields: []*FCTemplate{}, diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index c14ae3917..a9f7ff879 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -86,7 +86,7 @@ {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, - {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost", "rounding_decimals": 4}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, ], }, { @@ -100,6 +100,7 @@ "field_separator": ",", "filters": ["*string:~*req.ExporterUsed:CSVExporterComposed"], "fields":[ + {"tag": "Number", "path": "*hdr.Number", "type": "*constant", "value": "NumberOfEvent"}, {"tag": "CGRID", "path": "*hdr.CGRID", "type": "*constant", "value": "CGRID"}, {"tag": "RunID", "path": "*hdr.RunID", "type": "*constant", "value": "RunID"}, {"tag": "ToR", "path": "*hdr.ToR", "type": "*constant", "value": "ToR"}, @@ -115,6 +116,7 @@ {"tag": "Usage", "path": "*hdr.Usage", "type": "*constant", "value": "Usage"}, {"tag": "Cost", "path": "*hdr.Cost", "type": "*constant", "value": "Cost"}, + {"tag": "Number", "path": "*exp.Number", "type": "*variable", "value": "~*dc.NumberOfEvents"}, {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, @@ -129,7 +131,12 @@ {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, - {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost", "rounding_decimals": 4}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:4}"}, + + {"tag": "NumberOfEvents", "path": "*trl.NumberOfEvents", "type": "*variable", "value": "~*dc.NumberOfEvents"}, + {"tag": "TotalDuration", "path": "*trl.TotalDuration", "type": "*variable", "value": "~*dc.TotalDuration"}, + {"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage"}, + {"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"}, ], } ] diff --git a/ees/ee.go b/ees/ee.go index 8872d9550..7889df5aa 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -33,12 +33,12 @@ type EventExporter interface { } // NewEventExporter produces exporters -func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (ee EventExporter, err error) { +func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (ee EventExporter, err error) { switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type { case utils.MetaFileCSV: - return NewFileCSVee(cgrCfg, cfgIdx, filterS) + return NewFileCSVee(cgrCfg, cfgIdx, filterS, dc) case utils.MetaFileFWV: - return NewFileFWVee(cgrCfg, cfgIdx, filterS) + return NewFileFWVee(cgrCfg, cfgIdx, filterS, dc) default: return nil, fmt.Errorf("unsupported exporter type: <%s>", cgrCfg.EEsCfg().Exporters[cfgIdx].Type) } diff --git a/ees/eereq.go b/ees/eereq.go new file mode 100644 index 000000000..e9d2045e9 --- /dev/null +++ b/ees/eereq.go @@ -0,0 +1,363 @@ +/* +Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "fmt" + "math" + "net" + "strconv" + "strings" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +// NewEventExporterRequest returns a new EventExporterRequest +func NewEventExporterRequest(req utils.DataProvider, dc utils.MapStorage, + tnt, timezone string, filterS *engine.FilterS) (eeR *EventExporterRequest) { + eeR = &EventExporterRequest{ + req: req, + tmz: timezone, + tnt: tnt, + filterS: filterS, + cnt: utils.NewOrderedNavigableMap(), + hdr: utils.NewOrderedNavigableMap(), + trl: utils.NewOrderedNavigableMap(), + dc: dc, + } + eeR.dynamicProvider = utils.NewDynamicDataProvider(eeR) + return +} + +// EventExporterRequest represents data related to one request towards agent +// implements utils.DataProvider so we can pass it to filters +type EventExporterRequest struct { + req utils.DataProvider // request + eeDP utils.DataProvider // eventExporter DataProvider + tnt string + tmz string + cnt *utils.OrderedNavigableMap // Used in reply to access the request that was send + hdr *utils.OrderedNavigableMap // Used in reply to access the request that was send + trl *utils.OrderedNavigableMap // Used in reply to access the request that was send + dc utils.MapStorage + + filterS *engine.FilterS + dynamicProvider *utils.DynamicDataProvider +} + +// String implements utils.DataProvider +func (eeR *EventExporterRequest) String() string { + return utils.ToIJSON(eeR) +} + +// RemoteHost implements utils.DataProvider +func (eeR *EventExporterRequest) RemoteHost() net.Addr { + return eeR.req.RemoteHost() +} + +// FieldAsInterface implements utils.DataProvider +func (eeR *EventExporterRequest) FieldAsInterface(fldPath []string) (val interface{}, err error) { + switch fldPath[0] { + default: + return nil, fmt.Errorf("unsupported field prefix: <%s>", fldPath[0]) + case utils.MetaReq: + val, err = eeR.req.FieldAsInterface(fldPath[1:]) + case utils.MetaCache: + if cacheVal, ok := engine.Cache.Get(utils.CacheUCH, strings.Join(fldPath[1:], utils.NestingSep)); !ok { + err = utils.ErrNotFound + } else { + val = cacheVal + } + case utils.MetaDC: + val, err = eeR.dc.FieldAsInterface(fldPath[1:]) + } + if err != nil { + return + } + if nmItems, isNMItems := val.(*utils.NMSlice); isNMItems { // special handling of NMItems, take the last value out of it + val = (*nmItems)[len(*nmItems)-1].Interface() + } + return +} + +// Field implements utils.NMInterface +func (eeR *EventExporterRequest) Field(fldPath utils.PathItems) (val utils.NMInterface, err error) { + switch fldPath[0].Field { + default: + return nil, fmt.Errorf("unsupported field prefix: <%s>", fldPath[0]) + case utils.MetaExp: + val, err = eeR.cnt.Field(fldPath[1:]) + case utils.MetaHdr: + val, err = eeR.hdr.Field(fldPath[1:]) + case utils.MetaTrl: + val, err = eeR.trl.Field(fldPath[1:]) + } + return +} + +// FieldAsString implements utils.DataProvider +func (eeR *EventExporterRequest) FieldAsString(fldPath []string) (val string, err error) { + var iface interface{} + if iface, err = eeR.FieldAsInterface(fldPath); err != nil { + return + } + return utils.IfaceAsString(iface), nil +} + +//SetFields will populate fields of AgentRequest out of templates +func (eeR *EventExporterRequest) SetFields(tplFlds []*config.FCTemplate) (err error) { + for _, tplFld := range tplFlds { + if pass, err := eeR.filterS.Pass(eeR.tnt, + tplFld.Filters, eeR); err != nil { + return err + } else if !pass { + continue + } + var out interface{} + out, err = eeR.ParseField(tplFld) + if err != nil { + if err == utils.ErrNotFound { + if !tplFld.Mandatory { + err = nil + continue + } + err = utils.ErrPrefixNotFound(tplFld.Tag) + } + return + } + fullPath := &utils.FullPath{ + PathItems: tplFld.GetPathItems().Clone(), // need to clone so me do not modify the template + Path: tplFld.Path, + } + + nMItm := &config.NMItem{Data: out, Path: tplFld.GetPathSlice()[1:], Config: tplFld} + switch tplFld.Type { + case utils.META_COMPOSED: + err = utils.ComposeNavMapVal(eeR, fullPath, nMItm) + default: + _, err = eeR.Set(fullPath, &utils.NMSlice{nMItm}) + } + if err != nil { + return + } + + if tplFld.Blocker { // useful in case of processing errors first + break + } + } + return +} + +// Set implements utils.NMInterface +func (eeR *EventExporterRequest) Set(fullPath *utils.FullPath, nm utils.NMInterface) (added bool, err error) { + switch fullPath.PathItems[0].Field { + default: + return false, fmt.Errorf("unsupported field prefix: <%s> when set field", fullPath.PathItems[0].Field) + case utils.MetaExp: + return eeR.cnt.Set(&utils.FullPath{ + PathItems: fullPath.PathItems[1:], + Path: fullPath.Path[4:], + }, nm) + case utils.MetaHdr: + return eeR.hdr.Set(&utils.FullPath{ + PathItems: fullPath.PathItems[1:], + Path: fullPath.Path[4:], + }, nm) + case utils.MetaTrl: + return eeR.trl.Set(&utils.FullPath{ + PathItems: fullPath.PathItems[1:], + Path: fullPath.Path[4:], + }, nm) + case utils.MetaCache: + err = engine.Cache.Set(utils.CacheUCH, fullPath.Path[7:], nm, nil, true, utils.NonTransactional) + } + return false, err +} + +// ParseField outputs the value based on the template item +func (eeR *EventExporterRequest) ParseField( + cfgFld *config.FCTemplate) (out interface{}, err error) { + var isString bool + switch cfgFld.Type { + default: + return utils.EmptyString, fmt.Errorf("unsupported type: <%s>", cfgFld.Type) + case utils.META_NONE: + return + case utils.META_FILLER: + out, err = cfgFld.Value.ParseValue(utils.EmptyString) + cfgFld.Padding = utils.MetaRight + isString = true + case utils.META_CONSTANT: + out, err = cfgFld.Value.ParseValue(utils.EmptyString) + isString = true + case utils.MetaRemoteHost: + out = eeR.RemoteHost().String() + isString = true + case utils.MetaVariable, utils.META_COMPOSED, utils.MetaGroup: + out, err = cfgFld.Value.ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) + isString = true + case utils.META_USAGE_DIFFERENCE: + if len(cfgFld.Value) != 2 { + return nil, fmt.Errorf("invalid arguments <%s> to %s", + utils.ToJSON(cfgFld.Value), utils.META_USAGE_DIFFERENCE) + } + strVal1, err := cfgFld.Value[0].ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) + if err != nil { + return "", err + } + strVal2, err := cfgFld.Value[1].ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) + if err != nil { + return "", err + } + tEnd, err := utils.ParseTimeDetectLayout(strVal1, eeR.tmz) + if err != nil { + return "", err + } + tStart, err := utils.ParseTimeDetectLayout(strVal2, eeR.tmz) + if err != nil { + return "", err + } + out = tEnd.Sub(tStart).String() + isString = true + case utils.MetaCCUsage: + if len(cfgFld.Value) != 3 { + return nil, fmt.Errorf("invalid arguments <%s> to %s", + utils.ToJSON(cfgFld.Value), utils.MetaCCUsage) + } + strVal1, err := cfgFld.Value[0].ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) // ReqNr + if err != nil { + return "", err + } + reqNr, err := strconv.ParseInt(strVal1, 10, 64) + if err != nil { + return "", fmt.Errorf("invalid requestNumber <%s> to %s", + strVal1, utils.MetaCCUsage) + } + strVal2, err := cfgFld.Value[1].ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) // TotalUsage + if err != nil { + return "", err + } + usedCCTime, err := utils.ParseDurationWithNanosecs(strVal2) + if err != nil { + return "", fmt.Errorf("invalid usedCCTime <%s> to %s", + strVal2, utils.MetaCCUsage) + } + strVal3, err := cfgFld.Value[2].ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) // DebitInterval + if err != nil { + return "", err + } + debitItvl, err := utils.ParseDurationWithNanosecs(strVal3) + if err != nil { + return "", fmt.Errorf("invalid debitInterval <%s> to %s", + strVal3, utils.MetaCCUsage) + } + mltpl := reqNr - 1 // terminate will be ignored (init request should always be 0) + if mltpl < 0 { + mltpl = 0 + } + return usedCCTime + time.Duration(debitItvl.Nanoseconds()*mltpl), nil + case utils.MetaSum: + iFaceVals := make([]interface{}, len(cfgFld.Value)) + for i, val := range cfgFld.Value { + strVal, err := val.ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) + if err != nil { + return "", err + } + iFaceVals[i] = utils.StringToInterface(strVal) + } + out, err = utils.Sum(iFaceVals...) + case utils.MetaDifference: + iFaceVals := make([]interface{}, len(cfgFld.Value)) + for i, val := range cfgFld.Value { + strVal, err := val.ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) + if err != nil { + return "", err + } + iFaceVals[i] = utils.StringToInterface(strVal) + } + out, err = utils.Difference(iFaceVals...) + case utils.MetaMultiply: + iFaceVals := make([]interface{}, len(cfgFld.Value)) + for i, val := range cfgFld.Value { + strVal, err := val.ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) + if err != nil { + return "", err + } + iFaceVals[i] = utils.StringToInterface(strVal) + } + out, err = utils.Multiply(iFaceVals...) + case utils.MetaDivide: + iFaceVals := make([]interface{}, len(cfgFld.Value)) + for i, val := range cfgFld.Value { + strVal, err := val.ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) + if err != nil { + return "", err + } + iFaceVals[i] = utils.StringToInterface(strVal) + } + out, err = utils.Divide(iFaceVals...) + case utils.MetaValueExponent: + if len(cfgFld.Value) != 2 { + return nil, fmt.Errorf("invalid arguments <%s> to %s", + utils.ToJSON(cfgFld.Value), utils.MetaValueExponent) + } + strVal1, err := cfgFld.Value[0].ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) // String Value + if err != nil { + return "", err + } + val, err := strconv.ParseFloat(strVal1, 64) + if err != nil { + return "", fmt.Errorf("invalid value <%s> to %s", + strVal1, utils.MetaValueExponent) + } + strVal2, err := cfgFld.Value[1].ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) // String Exponent + if err != nil { + return "", err + } + exp, err := strconv.Atoi(strVal2) + if err != nil { + return "", err + } + out = strconv.FormatFloat(utils.Round(val*math.Pow10(exp), + config.CgrConfig().GeneralCfg().RoundingDecimals, utils.ROUNDING_MIDDLE), 'f', -1, 64) + case utils.MetaUnixTimestamp: + val, err := cfgFld.Value.ParseDataProvider(eeR.dynamicProvider, utils.NestingSep) + if err != nil { + return nil, err + } + t, err := utils.ParseTimeDetectLayout(val, cfgFld.Timezone) + if err != nil { + return nil, err + } + out = strconv.Itoa(int(t.Unix())) + } + + if err != nil && + !strings.HasPrefix(err.Error(), "Could not find") { + return + } + if isString { // format the string additionally with fmtFieldWidth + out, err = utils.FmtFieldWidth(cfgFld.Tag, out.(string), cfgFld.Width, + cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory) + } + return +} diff --git a/ees/ees.go b/ees/ees.go index e31dd9805..16b9ac7e7 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -21,6 +21,7 @@ package ees import ( "fmt" "sync" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -176,7 +177,7 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s } } if !isCached { - if ee, err = NewEventExporter(eeS.cfg, cfgIdx, eeS.filterS); err != nil { + if ee, err = NewEventExporter(eeS.cfg, cfgIdx, eeS.filterS, newEEMetrics()); err != nil { return } if hasCache { @@ -209,3 +210,21 @@ func (eeS *EventExporterS) V1ProcessEvent(cgrEv *utils.CGREventWithOpts, rply *s return } + +func newEEMetrics() utils.MapStorage { + return utils.MapStorage{ + utils.NumberOfEvents: 0, + utils.TotalCost: 0.0, + utils.PositiveExports: utils.StringSet{}, + utils.NegativeExports: utils.StringSet{}, + utils.FirstExpOrderID: 0, + utils.LastExpOrderID: 0, + utils.FirstEventATime: time.Time{}, + utils.LastEventATime: time.Time{}, + utils.TotalDuration: time.Duration(0), + utils.TotalSMSUsage: time.Duration(0), + utils.TotalMMSUsage: time.Duration(0), + utils.TotalGenericUsage: time.Duration(0), + utils.TotalDataUsage: time.Duration(0), + } +} diff --git a/ees/filecsv.go b/ees/filecsv.go index a24778bdc..4388b1176 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -23,7 +23,6 @@ import ( "fmt" "os" "path" - "strconv" "sync" "time" @@ -33,9 +32,11 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (fCsv *FileCSVee, err error) { +func NewFileCSVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, + dc utils.MapStorage) (fCsv *FileCSVee, err error) { + dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID fCsv = &FileCSVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, - cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS} + cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} err = fCsv.init() return } @@ -49,15 +50,7 @@ type FileCSVee struct { file *os.File csvWriter *csv.Writer sync.RWMutex - - firstEventATime, lastEventATime time.Time - numberOfEvents int - totalDuration, totalDataUsage, totalSmsUsage, - totalMmsUsage, totalGenericUsage time.Duration - totalCost float64 - firstExpOrderID, lastExpOrderID int64 - positiveExports utils.StringSet - negativeExports utils.StringSet + dc utils.MapStorage } // init will create all the necessary dependencies, including opening the file @@ -72,8 +65,6 @@ func (fCsv *FileCSVee) init() (err error) { if len(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].FieldSep) > 0 { fCsv.csvWriter.Comma = rune(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].FieldSep[0]) } - fCsv.positiveExports = utils.StringSet{} - fCsv.negativeExports = utils.StringSet{} return fCsv.composeHeader() } @@ -101,60 +92,64 @@ func (fCsv *FileCSVee) OnEvicted(_ string, _ interface{}) { func (fCsv *FileCSVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { fCsv.Lock() defer fCsv.Unlock() - fCsv.numberOfEvents++ + + fCsv.dc[utils.NumberOfEvents] = fCsv.dc[utils.NumberOfEvents].(int) + 1 + var csvRecord []string - navMp := utils.MapStorage{utils.MetaReq: cgrEv.Event} - for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields() { - if pass, err := fCsv.filterS.Pass(cgrEv.Tenant, cfgFld.Filters, - navMp); err != nil || !pass { - continue + req := utils.MapStorage{} + for k, v := range cgrEv.Event { + req[k] = v + } + eeReq := NewEventExporterRequest(req, fCsv.dc, cgrEv.Tenant, fCsv.cgrCfg.GeneralCfg().DefaultTimezone, + fCsv.filterS) + + if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].ContentFields()); err != nil { + fCsv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) + return + } + for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() { + var strVal string + if strVal, err = eeReq.cnt.FieldAsString(el.Value.Slice()); err != nil { + return } - val, err := cfgFld.Value.ParseDataProvider(navMp, utils.NestingSep) - if err != nil { - if err == utils.ErrNotFound { - err = utils.ErrPrefix(err, cfgFld.Value.GetRule()) - } - fCsv.negativeExports.Add(cgrEv.ID) - return err - } - csvRecord = append(csvRecord, val) + csvRecord = append(csvRecord, strVal) } if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, fCsv.cgrCfg.GeneralCfg().DefaultTimezone); err == nil { - if fCsv.firstEventATime.IsZero() || fCsv.firstEventATime.Before(aTime) { - fCsv.firstEventATime = aTime + if fCsv.dc[utils.FirstEventATime].(time.Time).IsZero() || fCsv.dc[utils.FirstEventATime].(time.Time).Before(aTime) { + fCsv.dc[utils.FirstEventATime] = aTime } - if aTime.After(fCsv.lastEventATime) { - fCsv.lastEventATime = aTime + if aTime.After(fCsv.dc[utils.LastEventATime].(time.Time)) { + fCsv.dc[utils.LastEventATime] = aTime } } if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil { - if fCsv.firstExpOrderID > oID || fCsv.firstExpOrderID == 0 { - fCsv.firstExpOrderID = oID + if fCsv.dc[utils.FirstExpOrderID].(int64) > oID || fCsv.dc[utils.FirstExpOrderID].(int64) == 0 { + fCsv.dc[utils.FirstExpOrderID] = oID } - if fCsv.lastExpOrderID < oID { - fCsv.lastExpOrderID = oID + if fCsv.dc[utils.LastExpOrderID].(int64) < oID { + fCsv.dc[utils.LastExpOrderID] = oID } } if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil { - fCsv.totalCost += cost + fCsv.dc[utils.TotalCost] = fCsv.dc[utils.TotalCost].(float64) + cost } if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil { if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil { switch tor { case utils.VOICE: - fCsv.totalDuration += usage + fCsv.dc[utils.TotalDuration] = fCsv.dc[utils.TotalDuration].(time.Duration) + usage case utils.SMS: - fCsv.totalSmsUsage += usage + fCsv.dc[utils.TotalSMSUsage] = fCsv.dc[utils.TotalSMSUsage].(time.Duration) + usage case utils.MMS: - fCsv.totalMmsUsage += usage + fCsv.dc[utils.TotalMMSUsage] = fCsv.dc[utils.TotalMMSUsage].(time.Duration) + usage case utils.GENERIC: - fCsv.totalGenericUsage += usage + fCsv.dc[utils.TotalGenericUsage] = fCsv.dc[utils.TotalGenericUsage].(time.Duration) + usage case utils.DATA: - fCsv.totalDataUsage += usage + fCsv.dc[utils.TotalDataUsage] = fCsv.dc[utils.TotalDataUsage].(time.Duration) + usage } } } - fCsv.positiveExports.Add(cgrEv.ID) + fCsv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) fCsv.csvWriter.Write(csvRecord) return } @@ -165,30 +160,17 @@ func (fCsv *FileCSVee) composeHeader() (err error) { return } var csvRecord []string - for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields() { - var outVal string - switch cfgFld.Type { - case utils.META_CONSTANT: - outVal, err = cfgFld.Value.ParseValue(utils.EmptyString) - if err != nil { - if err == utils.ErrNotFound { - err = utils.ErrPrefix(err, cfgFld.Value.GetRule()) - } - return err - } - case utils.MetaExportID: - outVal = fCsv.id - case utils.MetaTimeNow: - outVal = time.Now().String() - default: - return fmt.Errorf("unsupported type in header for field: <%+v>", utils.ToJSON(cfgFld)) + eeReq := NewEventExporterRequest(nil, fCsv.dc, fCsv.cgrCfg.GeneralCfg().DefaultTenant, fCsv.cgrCfg.GeneralCfg().DefaultTimezone, + fCsv.filterS) + if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields()); err != nil { + return + } + for el := eeReq.hdr.GetFirstElement(); el != nil; el = el.Next() { + var strVal string + if strVal, err = eeReq.hdr.FieldAsString(el.Value.Slice()); err != nil { + return } - fmtOut := outVal - if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width, - cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil { - return err - } - csvRecord = append(csvRecord, fmtOut) + csvRecord = append(csvRecord, strVal) } return fCsv.csvWriter.Write(csvRecord) } @@ -199,57 +181,17 @@ func (fCsv *FileCSVee) composeTrailer() (err error) { return } var csvRecord []string - for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields() { - var val string - switch cfgFld.Type { - case utils.META_CONSTANT: - val, err = cfgFld.Value.ParseValue(utils.EmptyString) - if err != nil { - if err == utils.ErrNotFound { - err = utils.ErrPrefix(err, cfgFld.Value.GetRule()) - } - return err - } - case utils.MetaExportID: - val = fCsv.id - case utils.MetaTimeNow: - val = time.Now().String() - case utils.MetaFirstEventATime: - val = fCsv.firstEventATime.Format(cfgFld.Layout) - case utils.MetaLastEventATime: - val = fCsv.lastEventATime.Format(cfgFld.Layout) - case utils.MetaEventNumber: - val = strconv.Itoa(fCsv.numberOfEvents) - case utils.MetaEventCost: - rounding := fCsv.cgrCfg.GeneralCfg().RoundingDecimals - if cfgFld.RoundingDecimals != nil { - rounding = *cfgFld.RoundingDecimals - } - val = strconv.FormatFloat(utils.Round(fCsv.totalCost, - rounding, utils.ROUNDING_MIDDLE), 'f', -1, 64) - case utils.MetaVoiceUsage: - val = fCsv.totalDuration.String() - case utils.MetaDataUsage: - val = strconv.Itoa(int(fCsv.totalDataUsage.Nanoseconds())) - case utils.MetaSMSUsage: - val = strconv.Itoa(int(fCsv.totalSmsUsage.Nanoseconds())) - case utils.MetaMMSUsage: - val = strconv.Itoa(int(fCsv.totalMmsUsage.Nanoseconds())) - case utils.MetaGenericUsage: - val = strconv.Itoa(int(fCsv.totalGenericUsage.Nanoseconds())) - case utils.MetaNegativeExports: - val = strconv.Itoa(len(fCsv.negativeExports.AsSlice())) - case utils.MetaPositiveExports: - val = strconv.Itoa(len(fCsv.positiveExports.AsSlice())) - default: - return fmt.Errorf("unsupported type in trailer for field: <%+v>", utils.ToJSON(cfgFld)) + eeReq := NewEventExporterRequest(nil, fCsv.dc, fCsv.cgrCfg.GeneralCfg().DefaultTenant, fCsv.cgrCfg.GeneralCfg().DefaultTimezone, + fCsv.filterS) + if err = eeReq.SetFields(fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields()); err != nil { + return + } + for el := eeReq.trl.GetFirstElement(); el != nil; el = el.Next() { + var strVal string + if strVal, err = eeReq.trl.FieldAsString(el.Value.Slice()); err != nil { + return } - fmtOut := val - if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, val, cfgFld.Width, - cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil { - return err - } - csvRecord = append(csvRecord, fmtOut) + csvRecord = append(csvRecord, strVal) } return fCsv.csvWriter.Write(csvRecord) } diff --git a/ees/filecsv_it_test.go b/ees/filecsv_it_test.go index 5a44f0ef6..564cb9567 100644 --- a/ees/filecsv_it_test.go +++ b/ees/filecsv_it_test.go @@ -53,9 +53,9 @@ var ( testCsvExportEvent, testCsvVerifyExports, testCsvExportComposedEvent, - //testCsvVerifyComposedExports, + testCsvVerifyComposedExports, testCsvStopCgrEngine, - testCleanDirectory, + //testCleanDirectory, } ) @@ -252,7 +252,7 @@ func testCsvExportComposedEvent(t *testing.T) { utils.AnswerTime: time.Unix(1383813746, 0).UTC(), utils.Usage: time.Duration(10) * time.Second, utils.RunID: utils.MetaDefault, - utils.Cost: 1.01, + utils.Cost: 1.016374, "ExporterUsed": "CSVExporterComposed", "ExtraFields": map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, @@ -281,7 +281,7 @@ func testCsvExportComposedEvent(t *testing.T) { utils.AnswerTime: time.Unix(1383813746, 0).UTC(), utils.Usage: time.Duration(1), utils.RunID: utils.MetaDefault, - utils.Cost: 0.15, + utils.Cost: 0.155462, "ExporterUsed": "CSVExporterComposed", "ExtraFields": map[string]string{"extra1": "val_extra1", "extra2": "val_extra2", "extra3": "val_extra3"}, @@ -316,10 +316,10 @@ func testCsvVerifyComposedExports(t *testing.T) { if len(files) != 1 { t.Errorf("Expected %+v, received: %+v", 1, len(files)) } - eCnt := "CGRID,RunID,ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage,Cost" + - "\n" + - "dbafe9c8614c785a65aabd116dd3959c3c56f7f6,*default,*voice,dsafdsaf,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10000000000,1.01\\n2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4,*default,*sms,sdf,wer,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,1,0.15" + - "\n" + eCnt := "NumberOfEvent,CGRID,RunID,ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage,Cost" + "\n" + + "1,dbafe9c8614c785a65aabd116dd3959c3c56f7f6,*default,*voice,dsafdsaf,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10000000000,1.0164" + "\n" + + "2,2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4,*default,*sms,sdfwer,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,1,0.1555" + "\n" + + "2,10s,1ns,1.1718" + "\n" if outContent1, err := ioutil.ReadFile(files[0]); err != nil { t.Error(err) } else if eCnt != string(outContent1) { diff --git a/ees/filefwv.go b/ees/filefwv.go index c4634bb42..76dc8dae5 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -23,7 +23,6 @@ import ( "io" "os" "path" - "strconv" "sync" "time" @@ -32,9 +31,9 @@ import ( "github.com/cgrates/cgrates/utils" ) -func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (fFwv *FileFWVee, err error) { +func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (fFwv *FileFWVee, err error) { fFwv = &FileFWVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, - cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS} + cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} err = fFwv.init() return } @@ -46,16 +45,8 @@ type FileFWVee struct { cfgIdx int // index of config instance within ERsCfg.Readers filterS *engine.FilterS file *os.File + dc utils.MapStorage sync.RWMutex - - firstEventATime, lastEventATime time.Time - numberOfEvents int - totalDuration, totalDataUsage, totalSmsUsage, - totalMmsUsage, totalGenericUsage time.Duration - totalCost float64 - firstExpOrderID, lastExpOrderID int64 - positiveExports utils.StringSet - negativeExports utils.StringSet } // init will create all the necessary dependencies, including opening the file @@ -65,8 +56,6 @@ func (fFwv *FileFWVee) init() (err error) { fFwv.id+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix)); err != nil { return } - fFwv.positiveExports = utils.StringSet{} - fFwv.negativeExports = utils.StringSet{} return fFwv.composeHeader() } @@ -93,60 +82,62 @@ func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) { func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { fFwv.Lock() defer fFwv.Unlock() - fFwv.numberOfEvents++ + fFwv.dc[utils.NumberOfEvents] = fFwv.dc[utils.NumberOfEvents].(int) + 1 var records []string - navMp := utils.MapStorage{utils.MetaReq: cgrEv.Event} - for _, cfgFld := range fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ContentFields() { - if pass, err := fFwv.filterS.Pass(cgrEv.Tenant, cfgFld.Filters, - navMp); err != nil || !pass { - continue + req := utils.MapStorage{} + for k, v := range cgrEv.Event { + req[k] = v + } + eeReq := NewEventExporterRequest(req, fFwv.dc, cgrEv.Tenant, fFwv.cgrCfg.GeneralCfg().DefaultTimezone, + fFwv.filterS) + + if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ContentFields()); err != nil { + fFwv.dc[utils.NegativeExports].(utils.StringSet).Add(cgrEv.ID) + return + } + for el := eeReq.cnt.GetFirstElement(); el != nil; el = el.Next() { + var strVal string + if strVal, err = eeReq.cnt.FieldAsString(el.Value.Slice()); err != nil { + return } - val, err := cfgFld.Value.ParseDataProvider(navMp, utils.NestingSep) - if err != nil { - if err == utils.ErrNotFound { - err = utils.ErrPrefix(err, cfgFld.Value.GetRule()) - } - fFwv.negativeExports.Add(cgrEv.ID) - return err - } - records = append(records, val) + records = append(records, strVal) } if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, fFwv.cgrCfg.GeneralCfg().DefaultTimezone); err == nil { - if fFwv.firstEventATime.IsZero() || fFwv.firstEventATime.Before(aTime) { - fFwv.firstEventATime = aTime + if fFwv.dc[utils.FirstEventATime].(time.Time).IsZero() || fFwv.dc[utils.FirstEventATime].(time.Time).Before(aTime) { + fFwv.dc[utils.FirstEventATime] = aTime } - if aTime.After(fFwv.lastEventATime) { - fFwv.lastEventATime = aTime + if aTime.After(fFwv.dc[utils.LastEventATime].(time.Time)) { + fFwv.dc[utils.LastEventATime] = aTime } } if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil { - if fFwv.firstExpOrderID > oID || fFwv.firstExpOrderID == 0 { - fFwv.firstExpOrderID = oID + if fFwv.dc[utils.FirstExpOrderID].(int64) > oID || fFwv.dc[utils.FirstExpOrderID].(int64) == 0 { + fFwv.dc[utils.FirstExpOrderID] = oID } - if fFwv.lastExpOrderID < oID { - fFwv.lastExpOrderID = oID + if fFwv.dc[utils.LastExpOrderID].(int64) < oID { + fFwv.dc[utils.LastExpOrderID] = oID } } if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil { - fFwv.totalCost += cost + fFwv.dc[utils.TotalCost] = fFwv.dc[utils.TotalCost].(float64) + cost } if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil { if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil { switch tor { case utils.VOICE: - fFwv.totalDuration += usage + fFwv.dc[utils.TotalDuration] = fFwv.dc[utils.TotalDuration].(time.Duration) + usage case utils.SMS: - fFwv.totalSmsUsage += usage + fFwv.dc[utils.TotalSMSUsage] = fFwv.dc[utils.TotalSMSUsage].(time.Duration) + usage case utils.MMS: - fFwv.totalMmsUsage += usage + fFwv.dc[utils.TotalMMSUsage] = fFwv.dc[utils.TotalMMSUsage].(time.Duration) + usage case utils.GENERIC: - fFwv.totalGenericUsage += usage + fFwv.dc[utils.TotalGenericUsage] = fFwv.dc[utils.TotalGenericUsage].(time.Duration) + usage case utils.DATA: - fFwv.totalDataUsage += usage + fFwv.dc[utils.TotalDataUsage] = fFwv.dc[utils.TotalDataUsage].(time.Duration) + usage } } } - fFwv.positiveExports.Add(cgrEv.ID) + fFwv.dc[utils.PositiveExports].(utils.StringSet).Add(cgrEv.ID) for _, record := range append(records, "\n") { if _, err = io.WriteString(fFwv.file, record); err != nil { return @@ -161,30 +152,17 @@ func (fFwv *FileFWVee) composeHeader() (err error) { return } var records []string - for _, cfgFld := range fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].HeaderFields() { - var outVal string - switch cfgFld.Type { - case utils.META_CONSTANT: - outVal, err = cfgFld.Value.ParseValue(utils.EmptyString) - if err != nil { - if err == utils.ErrNotFound { - err = utils.ErrPrefix(err, cfgFld.Value.GetRule()) - } - return err - } - case utils.MetaExportID: - outVal = fFwv.id - case utils.MetaTimeNow: - outVal = time.Now().String() - default: - return fmt.Errorf("unsupported type in header for field: <%+v>", utils.ToJSON(cfgFld)) + eeReq := NewEventExporterRequest(nil, fFwv.dc, fFwv.cgrCfg.GeneralCfg().DefaultTenant, fFwv.cgrCfg.GeneralCfg().DefaultTimezone, + fFwv.filterS) + if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].HeaderFields()); err != nil { + return + } + for el := eeReq.hdr.GetFirstElement(); el != nil; el = el.Next() { + var strVal string + if strVal, err = eeReq.hdr.FieldAsString(el.Value.Slice()); err != nil { + return } - fmtOut := outVal - if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width, - cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil { - return err - } - records = append(records, fmtOut) + records = append(records, strVal) } for _, record := range append(records, "\n") { if _, err = io.WriteString(fFwv.file, record); err != nil { @@ -200,57 +178,17 @@ func (fFwv *FileFWVee) composeTrailer() (err error) { return } var records []string - for _, cfgFld := range fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].TrailerFields() { - var val string - switch cfgFld.Type { - case utils.META_CONSTANT: - val, err = cfgFld.Value.ParseValue(utils.EmptyString) - if err != nil { - if err == utils.ErrNotFound { - err = utils.ErrPrefix(err, cfgFld.Value.GetRule()) - } - return err - } - case utils.MetaExportID: - val = fFwv.id - case utils.MetaTimeNow: - val = time.Now().String() - case utils.MetaFirstEventATime: - val = fFwv.firstEventATime.Format(cfgFld.Layout) - case utils.MetaLastEventATime: - val = fFwv.lastEventATime.Format(cfgFld.Layout) - case utils.MetaEventNumber: - val = strconv.Itoa(fFwv.numberOfEvents) - case utils.MetaEventCost: - rounding := fFwv.cgrCfg.GeneralCfg().RoundingDecimals - if cfgFld.RoundingDecimals != nil { - rounding = *cfgFld.RoundingDecimals - } - val = strconv.FormatFloat(utils.Round(fFwv.totalCost, - rounding, utils.ROUNDING_MIDDLE), 'f', -1, 64) - case utils.MetaVoiceUsage: - val = fFwv.totalDuration.String() - case utils.MetaDataUsage: - val = strconv.Itoa(int(fFwv.totalDataUsage.Nanoseconds())) - case utils.MetaSMSUsage: - val = strconv.Itoa(int(fFwv.totalSmsUsage.Nanoseconds())) - case utils.MetaMMSUsage: - val = strconv.Itoa(int(fFwv.totalMmsUsage.Nanoseconds())) - case utils.MetaGenericUsage: - val = strconv.Itoa(int(fFwv.totalGenericUsage.Nanoseconds())) - case utils.MetaNegativeExports: - val = strconv.Itoa(len(fFwv.negativeExports.AsSlice())) - case utils.MetaPositiveExports: - val = strconv.Itoa(len(fFwv.positiveExports.AsSlice())) - default: - return fmt.Errorf("unsupported type in trailer for field: <%+v>", utils.ToJSON(cfgFld)) + eeReq := NewEventExporterRequest(nil, fFwv.dc, fFwv.cgrCfg.GeneralCfg().DefaultTenant, fFwv.cgrCfg.GeneralCfg().DefaultTimezone, + fFwv.filterS) + if err = eeReq.SetFields(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].TrailerFields()); err != nil { + return + } + for el := eeReq.trl.GetFirstElement(); el != nil; el = el.Next() { + var strVal string + if strVal, err = eeReq.trl.FieldAsString(el.Value.Slice()); err != nil { + return } - fmtOut := val - if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, val, cfgFld.Width, - cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil { - return err - } - records = append(records, fmtOut) + records = append(records, strVal) } for _, record := range append(records, "\n") { if _, err = io.WriteString(fFwv.file, record); err != nil { diff --git a/utils/consts.go b/utils/consts.go index 9c903c643..4751a9a52 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -376,6 +376,7 @@ const ( MetaDispatcherHosts = "*dispatcher_hosts" MetaFilters = "*filters" MetaCDRs = "*cdrs" + MetaDC = "*dc" MetaCaches = "*caches" MetaCache = "*cache" MetaGuardian = "*guardians" @@ -2154,6 +2155,23 @@ const ( RoutesOffset = "RoutesOffset" ) +// EventExporter metrics +const ( + NumberOfEvents = "NumberOfEvents" + TotalCost = "TotalCost" + PositiveExports = "PositiveExports" + NegativeExports = "NegativeExports" + FirstExpOrderID = "FirstExpOrderID" + LastExpOrderID = "LastExpOrderID" + FirstEventATime = "FirstEventATime" + LastEventATime = "LastEventATime" + TotalDuration = "TotalDuration" + TotalDataUsage = "TotalDataUsage" + TotalSMSUsage = "TotalSMSUsage" + TotalMMSUsage = "TotalMMSUsage" + TotalGenericUsage = "TotalGenericUsage" +) + func buildCacheInstRevPrefixes() { CachePrefixToInstance = make(map[string]string) for k, v := range CacheInstanceToPrefix { diff --git a/utils/orderednavigablemap.go b/utils/orderednavigablemap.go index 5c5e018c1..ca7883285 100644 --- a/utils/orderednavigablemap.go +++ b/utils/orderednavigablemap.go @@ -143,7 +143,7 @@ func (onm OrderedNavigableMap) Len() int { return onm.nm.Len() } -// FieldAsString returns thevalue from path as string +// FieldAsString returns the value from path as string func (onm *OrderedNavigableMap) FieldAsString(fldPath []string) (str string, err error) { var val NMInterface val, err = onm.nm.Field(NewPathItems(fldPath))