diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index a9f7ff879..83b1f0fe5 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -138,6 +138,86 @@ {"tag": "TotalSMSUsage", "path": "*trl.TotalSMSUsage", "type": "*variable", "value": "~*dc.TotalSMSUsage"}, {"tag": "TotalCost", "path": "*trl.TotalCost", "type": "*variable", "value": "~*dc.TotalCost{*round:4}"}, ], + }, + { + "id": "FwvExporter", + "type": "*file_fwv", + "export_path": "/tmp/testFWV", + "tenant": "cgrates.org", + "flags": ["*attributes"], + "attribute_context": "customContext", + "attempts": 1, + "field_separator": ",", + "filters": ["*string:~*req.ExporterUsed:FWVExporter"], + "fields":[ + {"tag": "TypeOfRecord", "path": "*hdr.TypeOfRecord", "type": "*constant", + "value": "10", "width": 2}, + {"tag": "Filler1", "path": "*hdr.Filler1", "type": "*filler", "width": 3}, + {"tag": "DistributorCode", "path": "*hdr.DistributorCode", + "type": "*constant", "value": "VOI","width": 3}, + {"tag": "FileSeqNr", "path": "*hdr.FileSeqNr", "type": "*variable", + "value": "~*dc.ExportID","width": 5,"strip": "*right","padding": "*zeroleft"}, + {"tag": "FileCreationTime", "path": "*hdr.FileCreationTime", + "type": "*variable","value":"~*dc.TimeNow{*time_string:020106150400}", + "width": 12 }, + {"tag": "FileVersion", "path": "*hdr.FileVersion", "type": "*constant", + "value": "01","width": 2}, + {"tag": "Filler2", "path": "*hdr.Filler2", "type": "*filler", + "width": 105}, + + {"tag": "TypeOfRecord", "path": "*exp.TypeOfRecord", "type": "*constant", + "value": "20","width": 2}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", + "value": "~*req.Account","width": 12,"strip": "*left","padding": "*right"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", + "value": "~*req.Subject","width": 5,"strip": "*left","padding": "*right"}, + {"tag": "CLI", "path": "*exp.CLI", "type": "*constant", + "value": "cli","width": 15,"strip": "*xright","padding": "*right"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", + "value": "~*req.Destination","width": 24,"strip": "*xright","padding": "*right"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*constant", "value": "02","width": 2}, + {"tag": "SubtypeTOR", "path": "*exp.SubtypeTOR", "type": "*constant", "value": "11", + "width": 4, "padding": "*right"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", + "value": "~*req.SetupTime{*time_string:020106150400}", "width": 12, "padding": "*right","strip": "*right"}, + {"tag": "Duration", "path": "*exp.Duration", "type": "*variable", "value": "~*req.Usage", + "width": 6, "strip": "*right","padding": "*right","layout": "seconds"}, + {"tag": "DataVolume", "path": "*exp.DataVolume", "type": "*filler","width": 6}, + {"tag": "TaxCode", "path": "*exp.TaxCode", "type": "*constant","value":"1","width": 1}, + {"tag": "OperatorCode", "path": "*exp.OperatorCode", "type": "*constant","value":"opercode", + "width": 2, "strip": "*right", "padding": "*right"}, + {"tag": "ProductId", "path": "*exp.ProductId", "type": "*variable","value":"~*req.ProductId", + "width": 5, "strip": "*right", "padding": "*right"}, + {"tag": "NetworkId", "path": "*exp.NetworkId", "type": "*constant","value":"3", "width": 1}, + {"tag": "CallId", "path": "*exp.CallId", "type": "*variable","value":"~*req.OriginID", + "width": 16, "padding": "*right"}, + {"tag": "Filler1", "path": "*exp.Filler1", "type": "*filler", "width": 8}, + {"tag": "Filler2", "path": "*exp.Filler2", "type": "*filler", "width": 8}, + {"tag": "TerminationCode", "path": "*exp.TerminationCode", "type": "*variable", + "value":"~*req.Operator;~*req.Product", "width": 5,"strip": "*right","padding": "*right"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost{*round:5}", + "width": 9, "padding": "*zeroleft"}, + {"tag": "DestinationPrivacy", "path": "*exp.DestinationPrivacy", "type": "*masked_destination", + "width": 1}, + + {"tag": "TypeOfRecord", "path": "*trl.TypeOfRecord", "type": "*constant", + "value": "90", "width": 2}, + {"tag": "Filler1", "path": "*trl.Filler1", "type": "*filler", "width": 3}, + {"tag": "DistributorCode", "path": "*trl.DistributorCode", + "type": "*constant", "value": "VOI","width": 3}, + {"tag": "FileSeqNr", "path": "*trl.FileSeqNr", "type": "*variable", + "value": "~*dc.ExportID","width": 5,"strip": "*right","padding": "*zeroleft"}, + {"tag": "NumberOfRecords", "path": "*trl.NumberOfRecords", + "type": "*variable", "value": "~*dc.NumberOfEvents","width": 6,"padding": "*zeroleft"}, + {"tag": "CdrsDuration", "path": "*trl.CdrsDuration", "type": "*variable", + "value": "~*dc.TotalDuration","width": 8,"padding":"*zeroleft","layout": "seconds"}, + {"tag": "FirstCdrTime", "path": "*trl.FirstCdrTime", "type": "*variable", + "value": "~*dc.FirstEventATime{*time_string:020106150400}", "width": 12}, + {"tag": "LastCdrTime", "path": "*hdr.LastCdrTime", "type": "*variable", + "value": "~*dc.LastEventATime{*time_string:020106150400}", "width": 12,}, + {"tag": "Filler2", "path": "*trl.Filler2", "type": "*filler", + "width": 93} + ], } ] }, diff --git a/ees/eereq.go b/ees/eereq.go index e9d2045e9..3a9eaa4fb 100644 --- a/ees/eereq.go +++ b/ees/eereq.go @@ -202,6 +202,9 @@ func (eeR *EventExporterRequest) ParseField( return utils.EmptyString, fmt.Errorf("unsupported type: <%s>", cfgFld.Type) case utils.META_NONE: return + case utils.MetaTimeNow: + out = time.Now().Format(cfgFld.Layout) + isString = true case utils.META_FILLER: out, err = cfgFld.Value.ParseValue(utils.EmptyString) cfgFld.Padding = utils.MetaRight @@ -349,6 +352,17 @@ func (eeR *EventExporterRequest) ParseField( return nil, err } out = strconv.Itoa(int(t.Unix())) + case utils.MetaMaskedDestination: + //check if we have destination in the event + if dst, err := eeR.req.FieldAsString([]string{utils.Destination}); err != nil { + return nil, fmt.Errorf("error <%s> getting destination for %s", + err, utils.ToJSON(cfgFld)) + } else if len(cfgFld.MaskDestID) != 0 && engine.CachedDestHasPrefix(cfgFld.MaskDestID, dst) { + out = "1" + } else { + out = "0" + } + } if err != nil && diff --git a/ees/ees.go b/ees/ees.go index 16b9ac7e7..6b7494922 100644 --- a/ees/ees.go +++ b/ees/ees.go @@ -221,6 +221,7 @@ func newEEMetrics() utils.MapStorage { utils.LastExpOrderID: 0, utils.FirstEventATime: time.Time{}, utils.LastEventATime: time.Time{}, + utils.TimeNow: time.Now(), utils.TotalDuration: time.Duration(0), utils.TotalSMSUsage: time.Duration(0), utils.TotalMMSUsage: time.Duration(0), diff --git a/ees/filecsv_it_test.go b/ees/filecsv_it_test.go index 564cb9567..9185ccfb0 100644 --- a/ees/filecsv_it_test.go +++ b/ees/filecsv_it_test.go @@ -55,7 +55,7 @@ var ( testCsvExportComposedEvent, testCsvVerifyComposedExports, testCsvStopCgrEngine, - //testCleanDirectory, + testCleanDirectory, } ) diff --git a/ees/filefwv.go b/ees/filefwv.go index 76dc8dae5..87339b364 100644 --- a/ees/filefwv.go +++ b/ees/filefwv.go @@ -32,6 +32,7 @@ import ( ) func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS, dc utils.MapStorage) (fFwv *FileFWVee, err error) { + dc[utils.ExportID] = cgrCfg.EEsCfg().Exporters[cfgIdx].ID fFwv = &FileFWVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS, dc: dc} err = fFwv.init() diff --git a/ees/filefwv_it_test.go b/ees/filefwv_it_test.go new file mode 100644 index 000000000..c99c6ab0f --- /dev/null +++ b/ees/filefwv_it_test.go @@ -0,0 +1,164 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "io/ioutil" + "net/rpc" + "os" + "path" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + fwvConfigDir string + fwvCfgPath string + fwvCfg *config.CGRConfig + fwvRpc *rpc.Client + + sTestsFwv = []func(t *testing.T){ + testCreateDirectory, + testFwvLoadConfig, + testFwvResetDataDB, + testFwvResetStorDb, + testFwvStartEngine, + testFwvRPCConn, + testFwvExportEvent, + testFwvVerifyExports, + testFwvStopCgrEngine, + testCleanDirectory, + } +) + +func TestFwvExport(t *testing.T) { + fwvConfigDir = "ees" + for _, stest := range sTestsFwv { + t.Run(fwvConfigDir, stest) + } +} + +func testFwvLoadConfig(t *testing.T) { + var err error + fwvCfgPath = path.Join(*dataDir, "conf", "samples", fwvConfigDir) + if fwvCfg, err = config.NewCGRConfigFromPath(fwvCfgPath); err != nil { + t.Error(err) + } +} + +func testFwvResetDataDB(t *testing.T) { + if err := engine.InitDataDb(fwvCfg); err != nil { + t.Fatal(err) + } +} + +func testFwvResetStorDb(t *testing.T) { + if err := engine.InitStorDb(fwvCfg); err != nil { + t.Fatal(err) + } +} + +func testFwvStartEngine(t *testing.T) { + if _, err := engine.StopStartEngine(fwvCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +func testFwvRPCConn(t *testing.T) { + var err error + fwvRpc, err = newRPCClient(fwvCfg.ListenCfg()) + if err != nil { + t.Fatal(err) + } +} + +func testFwvExportEvent(t *testing.T) { + event := &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "Event", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.OrderID: 1, + utils.CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC).String()), + utils.ToR: utils.VOICE, + utils.OriginID: "dsafdsaf", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), + utils.AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + utils.Usage: time.Duration(10) * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 2.34567, + "ExporterUsed": "FWVExporter", + "ExtraFields": map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + }, + }, + } + var reply string + if err := fwvRpc.Call(utils.EventExporterSv1ProcessEvent, event, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected %+v, received: %+v", utils.OK, reply) + } + time.Sleep(1 * time.Second) +} + +func testFwvVerifyExports(t *testing.T) { + var files []string + err := filepath.Walk("/tmp/testFWV/", func(path string, info os.FileInfo, err error) error { + if strings.HasSuffix(path, utils.FWVSuffix) { + files = append(files, path) + } + return nil + }) + if err != nil { + t.Error(err) + } + if len(files) != 1 { + t.Errorf("Expected %+v, received: %+v", 1, len(files)) + } + eHdr := "10 VOIFwvEx02062016520001 010101000000\n" + eCnt := "201001 1001 cli 1002 0211 071113084200100000 1op3dsafdsaf 002.345670\n" + eTrl := "90 VOIFwvEx0000010000010s071113084200 \n" + if outContent1, err := ioutil.ReadFile(files[0]); err != nil { + t.Error(err) + } else if len(eHdr+eTrl+eCnt) != len(outContent1) { + t.Errorf("Expecting: <%+v>, received: <%+v>", len(eHdr+eTrl+eCnt), len(outContent1)) + } +} + +func testFwvStopCgrEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} diff --git a/ees/lib_test.go b/ees/lib_test.go index 74ec817c3..07701c099 100644 --- a/ees/lib_test.go +++ b/ees/lib_test.go @@ -47,7 +47,7 @@ func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) { } } -var exportPath = []string{"/tmp/testCSV", "/tmp/testComposedCSV"} +var exportPath = []string{"/tmp/testCSV", "/tmp/testComposedCSV", "/tmp/testFWV"} func testCreateDirectory(t *testing.T) { for _, dir := range exportPath { diff --git a/utils/consts.go b/utils/consts.go index 4751a9a52..f8e2acd8e 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -644,6 +644,7 @@ const ( ResourceUsage = "ResourceUsage" MetaDuration = "*duration" MetaLibPhoneNumber = "*libphonenumber" + MetaTimeString = "*time_string" MetaIP2Hex = "*ip2hex" MetaSIPURIMethod = "*sipuri_method" MetaSIPURIHost = "*sipuri_host" @@ -706,6 +707,7 @@ const ( FieldSeparator = "FieldSeparator" ExportPath = "ExportPath" ExportID = "ExportID" + TimeNow = "TimeNow" ExportFileName = "ExportFileName" GroupID = "GroupID" ThresholdType = "ThresholdType" diff --git a/utils/dataconverter.go b/utils/dataconverter.go index 943003b94..19cde7bf6 100644 --- a/utils/dataconverter.go +++ b/utils/dataconverter.go @@ -87,6 +87,11 @@ func NewDataConverter(params string) (conv DataConverter, err error) { return NewPhoneNumberConverter("") } return NewPhoneNumberConverter(params[len(MetaLibPhoneNumber)+1:]) + case strings.HasPrefix(params, MetaTimeString): + if len(params) == len(MetaTimeString) { // no extra params, defaults implied + return NewTimeStringConverter(time.RFC3339) + } + return NewTimeStringConverter(params[len(MetaTimeString)+1:]) default: return nil, fmt.Errorf("unsupported converter definition: <%s>", params) } @@ -334,3 +339,32 @@ func (*SIPURIMethodConverter) Convert(in interface{}) (out interface{}, err erro val := IfaceAsString(in) return sipingo.MethodFrom(val), nil } + +func NewTimeStringConverter(params string) (hdlr DataConverter, err error) { + tS := new(TimeStringConverter) + var paramsSplt []string + if params != EmptyString { + paramsSplt = strings.Split(params, InInFieldSep) + } + switch len(paramsSplt) { + case 1: + tS.Layout = paramsSplt[0] + default: + return nil, fmt.Errorf("invalid %s converter parameters: <%s>", + MetaTimeString, params) + } + return tS, nil +} + +type TimeStringConverter struct { + Layout string +} + +func (tS *TimeStringConverter) Convert(in interface{}) ( + out interface{}, err error) { + tm, err := ParseTimeDetectLayout(in.(string), EmptyString) + if err != nil { + return nil, err + } + return tm.Format(tS.Layout), nil +}