Finish implementation for fwv in EventExporter

This commit is contained in:
TeoV
2020-06-02 17:01:29 +03:00
committed by Dan Christian Bogos
parent 31387e2fe2
commit 578d7030bf
9 changed files with 298 additions and 2 deletions

View File

@@ -138,6 +138,86 @@
{"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage"},
{"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"},
],
},
{
"id": "FwvExporter",
"type": "*file_fwv",
"export_path": "/tmp/testFWV",
"tenant": "cgrates.org",
"flags": ["*attributes"],
"attribute_context": "customContext",
"attempts": 1,
"field_separator": ",",
"filters": ["*string:~*req.ExporterUsed:FWVExporter"],
"fields":[
{"tag": "TypeOfRecord", "path": "*hdr.TypeOfRecord", "type": "*constant",
"value": "10", "width": 2},
{"tag": "Filler1", "path": "*hdr.Filler1", "type": "*filler", "width": 3},
{"tag": "DistributorCode", "path": "*hdr.DistributorCode",
"type": "*constant", "value": "VOI","width": 3},
{"tag": "FileSeqNr", "path": "*hdr.FileSeqNr", "type": "*variable",
"value": "~*dc.ExportID","width": 5,"strip": "*right","padding": "*zeroleft"},
{"tag": "FileCreationTime", "path": "*hdr.FileCreationTime",
"type": "*variable","value":"~*dc.TimeNow{*time_string:020106150400}",
"width": 12 },
{"tag": "FileVersion", "path": "*hdr.FileVersion", "type": "*constant",
"value": "01","width": 2},
{"tag": "Filler2", "path": "*hdr.Filler2", "type": "*filler",
"width": 105},
{"tag": "TypeOfRecord", "path": "*exp.TypeOfRecord", "type": "*constant",
"value": "20","width": 2},
{"tag": "Account", "path": "*exp.Account", "type": "*variable",
"value": "~*req.Account","width": 12,"strip": "*left","padding": "*right"},
{"tag": "Subject", "path": "*exp.Subject", "type": "*variable",
"value": "~*req.Subject","width": 5,"strip": "*left","padding": "*right"},
{"tag": "CLI", "path": "*exp.CLI", "type": "*constant",
"value": "cli","width": 15,"strip": "*xright","padding": "*right"},
{"tag": "Destination", "path": "*exp.Destination", "type": "*variable",
"value": "~*req.Destination","width": 24,"strip": "*xright","padding": "*right"},
{"tag": "ToR", "path": "*exp.ToR", "type": "*constant", "value": "02","width": 2},
{"tag": "SubtypeTOR", "path": "*exp.SubtypeTOR", "type": "*constant", "value": "11",
"width": 4, "padding": "*right"},
{"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable",
"value": "~*req.SetupTime{*time_string:020106150400}", "width": 12, "padding": "*right","strip": "*right"},
{"tag": "Duration", "path": "*exp.Duration", "type": "*variable", "value": "~*req.Usage",
"width": 6, "strip": "*right","padding": "*right","layout": "seconds"},
{"tag": "DataVolume", "path": "*exp.DataVolume", "type": "*filler","width": 6},
{"tag": "TaxCode", "path": "*exp.TaxCode", "type": "*constant","value":"1","width": 1},
{"tag": "OperatorCode", "path": "*exp.OperatorCode", "type": "*constant","value":"opercode",
"width": 2, "strip": "*right", "padding": "*right"},
{"tag": "ProductId", "path": "*exp.ProductId", "type": "*variable","value":"~*req.ProductId",
"width": 5, "strip": "*right", "padding": "*right"},
{"tag": "NetworkId", "path": "*exp.NetworkId", "type": "*constant","value":"3", "width": 1},
{"tag": "CallId", "path": "*exp.CallId", "type": "*variable","value":"~*req.OriginID",
"width": 16, "padding": "*right"},
{"tag": "Filler1", "path": "*exp.Filler1", "type": "*filler", "width": 8},
{"tag": "Filler2", "path": "*exp.Filler2", "type": "*filler", "width": 8},
{"tag": "TerminationCode", "path": "*exp.TerminationCode", "type": "*variable",
"value":"~*req.Operator;~*req.Product", "width": 5,"strip": "*right","padding": "*right"},
{"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:5}",
"width": 9, "padding": "*zeroleft"},
{"tag": "DestinationPrivacy", "path": "*exp.DestinationPrivacy", "type": "*masked_destination",
"width": 1},
{"tag": "TypeOfRecord", "path": "*trl.TypeOfRecord", "type": "*constant",
"value": "90", "width": 2},
{"tag": "Filler1", "path": "*trl.Filler1", "type": "*filler", "width": 3},
{"tag": "DistributorCode", "path": "*trl.DistributorCode",
"type": "*constant", "value": "VOI","width": 3},
{"tag": "FileSeqNr", "path": "*trl.FileSeqNr", "type": "*variable",
"value": "~*dc.ExportID","width": 5,"strip": "*right","padding": "*zeroleft"},
{"tag": "NumberOfRecords", "path": "*trl.NumberOfRecords",
"type": "*variable", "value": "~*dc.NumberOfEvents","width": 6,"padding": "*zeroleft"},
{"tag": "CdrsDuration", "path": "*trl.CdrsDuration", "type": "*variable",
"value": "~*dc.TotalDuration","width": 8,"padding":"*zeroleft","layout": "seconds"},
{"tag": "FirstCdrTime", "path": "*trl.FirstCdrTime", "type": "*variable",
"value": "~*dc.FirstEventATime{*time_string:020106150400}", "width": 12},
{"tag": "LastCdrTime", "path": "*hdr.LastCdrTime", "type": "*variable",
"value": "~*dc.LastEventATime{*time_string:020106150400}", "width": 12,},
{"tag": "Filler2", "path": "*trl.Filler2", "type": "*filler",
"width": 93}
],
}
]
},

View File

@@ -202,6 +202,9 @@ func (eeR *EventExporterRequest) ParseField(
return utils.EmptyString, fmt.Errorf("unsupported type: <%s>", cfgFld.Type)
case utils.META_NONE:
return
case utils.MetaTimeNow:
out = time.Now().Format(cfgFld.Layout)
isString = true
case utils.META_FILLER:
out, err = cfgFld.Value.ParseValue(utils.EmptyString)
cfgFld.Padding = utils.MetaRight
@@ -349,6 +352,17 @@ func (eeR *EventExporterRequest) ParseField(
return nil, err
}
out = strconv.Itoa(int(t.Unix()))
case utils.MetaMaskedDestination:
//check if we have destination in the event
if dst, err := eeR.req.FieldAsString([]string{utils.Destination}); err != nil {
return nil, fmt.Errorf("error <%s> getting destination for %s",
err, utils.ToJSON(cfgFld))
} else if len(cfgFld.MaskDestID) != 0 && engine.CachedDestHasPrefix(cfgFld.MaskDestID, dst) {
out = "1"
} else {
out = "0"
}
}
if err != nil &&

View File

@@ -221,6 +221,7 @@ func newEEMetrics() utils.MapStorage {
utils.LastExpOrderID: 0,
utils.FirstEventATime: time.Time{},
utils.LastEventATime: time.Time{},
utils.TimeNow: time.Now(),
utils.TotalDuration: time.Duration(0),
utils.TotalSMSUsage: time.Duration(0),
utils.TotalMMSUsage: time.Duration(0),

View File

@@ -55,7 +55,7 @@ var (
testCsvExportComposedEvent,
testCsvVerifyComposedExports,
testCsvStopCgrEngine,
//testCleanDirectory,
testCleanDirectory,
}
)

View File

@@ -32,6 +32,7 @@ import (
)
func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (fFwv *FileFWVee, err error) {
dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID
fFwv = &FileFWVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID,
cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc}
err = fFwv.init()

164
ees/filefwv_it_test.go Normal file
View File

@@ -0,0 +1,164 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ees
import (
"io/ioutil"
"net/rpc"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
fwvConfigDir string
fwvCfgPath string
fwvCfg *config.CGRConfig
fwvRpc *rpc.Client
sTestsFwv = []func(t *testing.T){
testCreateDirectory,
testFwvLoadConfig,
testFwvResetDataDB,
testFwvResetStorDb,
testFwvStartEngine,
testFwvRPCConn,
testFwvExportEvent,
testFwvVerifyExports,
testFwvStopCgrEngine,
testCleanDirectory,
}
)
func TestFwvExport(t *testing.T) {
fwvConfigDir = "ees"
for _, stest := range sTestsFwv {
t.Run(fwvConfigDir, stest)
}
}
func testFwvLoadConfig(t *testing.T) {
var err error
fwvCfgPath = path.Join(*dataDir, "conf", "samples", fwvConfigDir)
if fwvCfg, err = config.NewCGRConfigFromPath(fwvCfgPath); err != nil {
t.Error(err)
}
}
func testFwvResetDataDB(t *testing.T) {
if err := engine.InitDataDb(fwvCfg); err != nil {
t.Fatal(err)
}
}
func testFwvResetStorDb(t *testing.T) {
if err := engine.InitStorDb(fwvCfg); err != nil {
t.Fatal(err)
}
}
func testFwvStartEngine(t *testing.T) {
if _, err := engine.StopStartEngine(fwvCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
func testFwvRPCConn(t *testing.T) {
var err error
fwvRpc, err = newRPCClient(fwvCfg.ListenCfg())
if err != nil {
t.Fatal(err)
}
}
func testFwvExportEvent(t *testing.T) {
event := &utils.CGREventWithOpts{
CGREvent: &utils.CGREvent{
Tenant: "cgrates.org",
ID: "Event",
Time: utils.TimePointer(time.Now()),
Event: map[string]interface{}{
utils.OrderID: 1,
utils.CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()),
utils.ToR: utils.VOICE,
utils.OriginID: "dsafdsaf",
utils.OriginHost: "192.168.1.1",
utils.RequestType: utils.META_RATED,
utils.Tenant: "cgrates.org",
utils.Category: "call",
utils.Account: "1001",
utils.Subject: "1001",
utils.Destination: "1002",
utils.SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC),
utils.AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC),
utils.Usage: time.Duration(10) * time.Second,
utils.RunID: utils.MetaDefault,
utils.Cost: 2.34567,
"ExporterUsed": "FWVExporter",
"ExtraFields": map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"},
},
},
}
var reply string
if err := fwvRpc.Call(utils.EventExporterSv1ProcessEvent, event, &reply); err != nil {
t.Error(err)
} else if reply != utils.OK {
t.Errorf("Expected %+v, received: %+v", utils.OK, reply)
}
time.Sleep(1 * time.Second)
}
func testFwvVerifyExports(t *testing.T) {
var files []string
err := filepath.Walk("/tmp/testFWV/", func(path string, info os.FileInfo, err error) error {
if strings.HasSuffix(path, utils.FWVSuffix) {
files = append(files, path)
}
return nil
})
if err != nil {
t.Error(err)
}
if len(files) != 1 {
t.Errorf("Expected %+v, received: %+v", 1, len(files))
}
eHdr := "10 VOIFwvEx02062016520001 010101000000\n"
eCnt := "201001 1001 cli 1002 0211 071113084200100000 1op3dsafdsaf 002.345670\n"
eTrl := "90 VOIFwvEx0000010000010s071113084200 \n"
if outContent1, err := ioutil.ReadFile(files[0]); err != nil {
t.Error(err)
} else if len(eHdr+eTrl+eCnt) != len(outContent1) {
t.Errorf("Expecting: <%+v>, received: <%+v>", len(eHdr+eTrl+eCnt), len(outContent1))
}
}
func testFwvStopCgrEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}
}

View File

@@ -47,7 +47,7 @@ func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) {
}
}
var exportPath = []string{"/tmp/testCSV", "/tmp/testComposedCSV"}
var exportPath = []string{"/tmp/testCSV", "/tmp/testComposedCSV", "/tmp/testFWV"}
func testCreateDirectory(t *testing.T) {
for _, dir := range exportPath {

View File

@@ -644,6 +644,7 @@ const (
ResourceUsage = "ResourceUsage"
MetaDuration = "*duration"
MetaLibPhoneNumber = "*libphonenumber"
MetaTimeString = "*time_string"
MetaIP2Hex = "*ip2hex"
MetaSIPURIMethod = "*sipuri_method"
MetaSIPURIHost = "*sipuri_host"
@@ -706,6 +707,7 @@ const (
FieldSeparator = "FieldSeparator"
ExportPath = "ExportPath"
ExportID = "ExportID"
TimeNow = "TimeNow"
ExportFileName = "ExportFileName"
GroupID = "GroupID"
ThresholdType = "ThresholdType"

View File

@@ -87,6 +87,11 @@ func NewDataConverter(params string) (conv DataConverter, err error) {
return NewPhoneNumberConverter("")
}
return NewPhoneNumberConverter(params[len(MetaLibPhoneNumber)+1:])
case strings.HasPrefix(params, MetaTimeString):
if len(params) == len(MetaTimeString) { // no extra params, defaults implied
return NewTimeStringConverter(time.RFC3339)
}
return NewTimeStringConverter(params[len(MetaTimeString)+1:])
default:
return nil, fmt.Errorf("unsupported converter definition: <%s>", params)
}
@@ -334,3 +339,32 @@ func (*SIPURIMethodConverter) Convert(in interface{}) (out interface{}, err erro
val := IfaceAsString(in)
return sipingo.MethodFrom(val), nil
}
func NewTimeStringConverter(params string) (hdlr DataConverter, err error) {
tS := new(TimeStringConverter)
var paramsSplt []string
if params != EmptyString {
paramsSplt = strings.Split(params, InInFieldSep)
}
switch len(paramsSplt) {
case 1:
tS.Layout = paramsSplt[0]
default:
return nil, fmt.Errorf("invalid %s converter parameters: <%s>",
MetaTimeString, params)
}
return tS, nil
}
type TimeStringConverter struct {
Layout string
}
func (tS *TimeStringConverter) Convert(in interface{}) (
out interface{}, err error) {
tm, err := ParseTimeDetectLayout(in.(string), EmptyString)
if err != nil {
return nil, err
}
return tm.Format(tS.Layout), nil
}