diff --git a/config/config.go b/config/config.go index 88d44d966..1a7a1bf49 100755 --- a/config/config.go +++ b/config/config.go @@ -318,7 +318,7 @@ var possibleReaderTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.MetaKafkajsonMap, utils.MetaFileXML, utils.MetaSQL, utils.MetaFileFWV, utils.MetaPartialCSV, utils.MetaFlatstore, utils.MetaJSON, utils.META_NONE}) -var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.META_NONE}) +var possibleExporterTypes = utils.NewStringSet([]string{utils.MetaFileCSV, utils.META_NONE, utils.MetaFileFWV}) func (cfg *CGRConfig) LazySanityCheck() { for _, cdrePrfl := range cfg.cdrsCfg.OnlineCDRExports { diff --git a/config/configsanity.go b/config/configsanity.go index 32dbe5f69..b3cc8ffae 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -452,6 +452,12 @@ func (cfg *CGRConfig) checkConfigSanity() error { if exp.FieldSep == utils.EmptyString { return fmt.Errorf("<%s> empty FieldSep for exporter with ID: %s", utils.EEs, exp.ID) } + case utils.MetaFileFWV: + for _, dir := range []string{exp.ExportPath} { + if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) { + return fmt.Errorf("<%s> nonexistent folder: %s for exporter with ID: %s", utils.EEs, dir, exp.ID) + } + } } } } diff --git a/data/conf/samples/ees/cgrates.json b/data/conf/samples/ees/cgrates.json index 51dfec1a9..c14ae3917 100644 --- a/data/conf/samples/ees/cgrates.json +++ b/data/conf/samples/ees/cgrates.json @@ -59,17 +59,19 @@ "attributes_conns":["*internal"], "cache": { "*file_csv": {"limit": -1, "ttl": "500ms", "static_ttl": false}, + "*file_fwv": {"limit": -1, "ttl": "500ms", "static_ttl": false} }, "exporters": [ { "id": "CSVExporter", "type": "*file_csv", - "export_path": "/tmp/testExport", + "export_path": "/tmp/testCSV", "tenant": "cgrates.org", "flags": ["*attributes"], "attribute_context": "customContext", "attempts": 1, "field_separator": ",", + "filters": ["*string:~*req.ExporterUsed:CSVExporter"], "fields":[ {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, @@ -87,6 +89,49 @@ {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost", "rounding_decimals": 4}, ], }, + { + "id": "CSVExporterComposed", + "type": "*file_csv", + "export_path": "/tmp/testComposedCSV", + "tenant": "cgrates.org", + "flags": ["*attributes"], + "attribute_context": "customContext", + "attempts": 1, + "field_separator": ",", + "filters": ["*string:~*req.ExporterUsed:CSVExporterComposed"], + "fields":[ + {"tag": "CGRID", "path": "*hdr.CGRID", "type": "*constant", "value": "CGRID"}, + {"tag": "RunID", "path": "*hdr.RunID", "type": "*constant", "value": "RunID"}, + {"tag": "ToR", "path": "*hdr.ToR", "type": "*constant", "value": "ToR"}, + {"tag": "OriginID", "path": "*hdr.OriginID", "type": "*constant", "value": "OriginID"}, + {"tag": "RequestType", "path": "*hdr.RequestType", "type": "*constant", "value": "RequestType"}, + {"tag": "Tenant", "path": "*hdr.Tenant", "type": "*constant", "value": "Tenant"}, + {"tag": "Category", "path": "*hdr.Category", "type": "*constant", "value": "Category"}, + {"tag": "Account", "path": "*hdr.Account", "type": "*constant", "value": "Account"}, + {"tag": "Subject", "path": "*hdr.Subject", "type": "*constant", "value": "Subject"}, + {"tag": "Destination", "path": "*hdr.Destination", "type": "*constant", "value": "Destination"}, + {"tag": "SetupTime", "path": "*hdr.SetupTime", "type": "*constant", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "AnswerTime", "path": "*hdr.AnswerTime", "type": "*constant", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "Usage", "path": "*hdr.Usage", "type": "*constant", "value": "Usage"}, + {"tag": "Cost", "path": "*hdr.Cost", "type": "*constant", "value": "Cost"}, + + {"tag": "CGRID", "path": "*exp.CGRID", "type": "*variable", "value": "~*req.CGRID"}, + {"tag": "RunID", "path": "*exp.RunID", "type": "*variable", "value": "~*req.RunID"}, + {"tag": "ToR", "path": "*exp.ToR", "type": "*variable", "value": "~*req.ToR"}, + {"tag": "OriginID1", "path": "*exp.OriginID", "type": "*composed", "value": "~*req.ComposedOriginID1"}, + {"tag": "OriginID2", "path": "*exp.OriginID", "type": "*composed", "value": "~*req.ComposedOriginID2"}, + {"tag": "RequestType", "path": "*exp.RequestType", "type": "*variable", "value": "~*req.RequestType"}, + {"tag": "Tenant", "path": "*exp.Tenant", "type": "*variable", "value": "~*req.Tenant"}, + {"tag": "Category", "path": "*exp.Category", "type": "*variable", "value": "~*req.Category"}, + {"tag": "Account", "path": "*exp.Account", "type": "*variable", "value": "~*req.Account"}, + {"tag": "Subject", "path": "*exp.Subject", "type": "*variable", "value": "~*req.Subject"}, + {"tag": "Destination", "path": "*exp.Destination", "type": "*variable", "value": "~*req.Destination"}, + {"tag": "SetupTime", "path": "*exp.SetupTime", "type": "*variable", "value": "~*req.SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "AnswerTime", "path": "*exp.AnswerTime", "type": "*variable", "value": "~*req.AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, + {"tag": "Usage", "path": "*exp.Usage", "type": "*variable", "value": "~*req.Usage"}, + {"tag": "Cost", "path": "*exp.Cost", "type": "*variable", "value": "~*req.Cost", "rounding_decimals": 4}, + ], + } ] }, diff --git a/ees/ee.go b/ees/ee.go index 8324d88b0..8872d9550 100644 --- a/ees/ee.go +++ b/ees/ee.go @@ -37,6 +37,8 @@ func NewEventExporter(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.Filt switch cgrCfg.EEsCfg().Exporters[cfgIdx].Type { case utils.MetaFileCSV: return NewFileCSVee(cgrCfg, cfgIdx, filterS) + case utils.MetaFileFWV: + return NewFileFWVee(cgrCfg, cfgIdx, filterS) default: return nil, fmt.Errorf("unsupported exporter type: <%s>", cgrCfg.EEsCfg().Exporters[cfgIdx].Type) } diff --git a/ees/filecsv.go b/ees/filecsv.go index a550ce0b8..a24778bdc 100644 --- a/ees/filecsv.go +++ b/ees/filecsv.go @@ -166,14 +166,29 @@ func (fCsv *FileCSVee) composeHeader() (err error) { } var csvRecord []string for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].HeaderFields() { - val, err := cfgFld.Value.ParseValue(utils.EmptyString) - if err != nil { - if err == utils.ErrNotFound { - err = utils.ErrPrefix(err, cfgFld.Value.GetRule()) + var outVal string + switch cfgFld.Type { + case utils.META_CONSTANT: + outVal, err = cfgFld.Value.ParseValue(utils.EmptyString) + if err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, cfgFld.Value.GetRule()) + } + return err } + case utils.MetaExportID: + outVal = fCsv.id + case utils.MetaTimeNow: + outVal = time.Now().String() + default: + return fmt.Errorf("unsupported type in header for field: <%+v>", utils.ToJSON(cfgFld)) + } + fmtOut := outVal + if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width, + cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil { return err } - csvRecord = append(csvRecord, val) + csvRecord = append(csvRecord, fmtOut) } return fCsv.csvWriter.Write(csvRecord) } @@ -185,39 +200,56 @@ func (fCsv *FileCSVee) composeTrailer() (err error) { } var csvRecord []string for _, cfgFld := range fCsv.cgrCfg.EEsCfg().Exporters[fCsv.cfgIdx].TrailerFields() { + var val string switch cfgFld.Type { + case utils.META_CONSTANT: + val, err = cfgFld.Value.ParseValue(utils.EmptyString) + if err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, cfgFld.Value.GetRule()) + } + return err + } case utils.MetaExportID: - csvRecord = append(csvRecord, fCsv.id) + val = fCsv.id case utils.MetaTimeNow: - csvRecord = append(csvRecord, time.Now().String()) + val = time.Now().String() case utils.MetaFirstEventATime: - csvRecord = append(csvRecord, fCsv.firstEventATime.Format(cfgFld.Layout)) + val = fCsv.firstEventATime.Format(cfgFld.Layout) case utils.MetaLastEventATime: - csvRecord = append(csvRecord, fCsv.lastEventATime.Format(cfgFld.Layout)) + val = fCsv.lastEventATime.Format(cfgFld.Layout) case utils.MetaEventNumber: - csvRecord = append(csvRecord, strconv.Itoa(fCsv.numberOfEvents)) + val = strconv.Itoa(fCsv.numberOfEvents) case utils.MetaEventCost: rounding := fCsv.cgrCfg.GeneralCfg().RoundingDecimals if cfgFld.RoundingDecimals != nil { rounding = *cfgFld.RoundingDecimals } - csvRecord = append(csvRecord, strconv.FormatFloat(utils.Round(fCsv.totalCost, - rounding, utils.ROUNDING_MIDDLE), 'f', -1, 64)) + val = strconv.FormatFloat(utils.Round(fCsv.totalCost, + rounding, utils.ROUNDING_MIDDLE), 'f', -1, 64) case utils.MetaVoiceUsage: - csvRecord = append(csvRecord, fCsv.totalDuration.String()) + val = fCsv.totalDuration.String() case utils.MetaDataUsage: - csvRecord = append(csvRecord, strconv.Itoa(int(fCsv.totalDataUsage.Nanoseconds()))) + val = strconv.Itoa(int(fCsv.totalDataUsage.Nanoseconds())) case utils.MetaSMSUsage: - csvRecord = append(csvRecord, strconv.Itoa(int(fCsv.totalSmsUsage.Nanoseconds()))) + val = strconv.Itoa(int(fCsv.totalSmsUsage.Nanoseconds())) case utils.MetaMMSUsage: - csvRecord = append(csvRecord, strconv.Itoa(int(fCsv.totalMmsUsage.Nanoseconds()))) + val = strconv.Itoa(int(fCsv.totalMmsUsage.Nanoseconds())) case utils.MetaGenericUsage: - csvRecord = append(csvRecord, strconv.Itoa(int(fCsv.totalGenericUsage.Nanoseconds()))) + val = strconv.Itoa(int(fCsv.totalGenericUsage.Nanoseconds())) case utils.MetaNegativeExports: - csvRecord = append(csvRecord, strconv.Itoa(len(fCsv.negativeExports.AsSlice()))) + val = strconv.Itoa(len(fCsv.negativeExports.AsSlice())) case utils.MetaPositiveExports: - csvRecord = append(csvRecord, strconv.Itoa(len(fCsv.positiveExports.AsSlice()))) + val = strconv.Itoa(len(fCsv.positiveExports.AsSlice())) + default: + return fmt.Errorf("unsupported type in trailer for field: <%+v>", utils.ToJSON(cfgFld)) } + fmtOut := val + if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, val, cfgFld.Width, + cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil { + return err + } + csvRecord = append(csvRecord, fmtOut) } return fCsv.csvWriter.Write(csvRecord) } diff --git a/ees/filecsv_it_test.go b/ees/filecsv_it_test.go index ad7a00e14..5a44f0ef6 100644 --- a/ees/filecsv_it_test.go +++ b/ees/filecsv_it_test.go @@ -44,7 +44,7 @@ var ( csvRpc *rpc.Client sTestsCsv = []func(t *testing.T){ - testCsvCreateDirectory, + testCreateDirectory, testCsvLoadConfig, testCsvResetDataDB, testCsvResetStorDb, @@ -52,8 +52,10 @@ var ( testCsvRPCConn, testCsvExportEvent, testCsvVerifyExports, + testCsvExportComposedEvent, + //testCsvVerifyComposedExports, testCsvStopCgrEngine, - testCsvCleanDirectory, + testCleanDirectory, } ) @@ -64,17 +66,6 @@ func TestCsvExport(t *testing.T) { } } -func testCsvCreateDirectory(t *testing.T) { - for _, dir := range []string{"/tmp/testExport"} { - if err := os.RemoveAll(dir); err != nil { - t.Fatal("Error removing folder: ", dir, err) - } - if err := os.MkdirAll(dir, os.ModePerm); err != nil { - t.Fatal("Error creating folder: ", dir, err) - } - } -} - func testCsvLoadConfig(t *testing.T) { var err error csvCfgPath = path.Join(*dataDir, "conf", "samples", csvConfigDir) @@ -111,88 +102,85 @@ func testCsvRPCConn(t *testing.T) { func testCsvExportEvent(t *testing.T) { eventVoice := &utils.CGREventWithOpts{ - CGREventWithArgDispatcher: &utils.CGREventWithArgDispatcher{ - CGREvent: &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "voiceEvent", - Time: utils.TimePointer(time.Now()), - Event: map[string]interface{}{ - utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), - utils.ToR: utils.VOICE, - utils.OriginID: "dsafdsaf", - utils.OriginHost: "192.168.1.1", - utils.RequestType: utils.META_RATED, - utils.Tenant: "cgrates.org", - utils.Category: "call", - utils.Account: "1001", - utils.Subject: "1001", - utils.Destination: "1002", - utils.SetupTime: time.Unix(1383813745, 0).UTC(), - utils.AnswerTime: time.Unix(1383813746, 0).UTC(), - utils.Usage: time.Duration(10) * time.Second, - utils.RunID: utils.MetaDefault, - utils.Cost: 1.01, - "ExtraFields": map[string]string{"extra1": "val_extra1", - "extra2": "val_extra2", "extra3": "val_extra3"}, - }, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "voiceEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.VOICE, + utils.OriginID: "dsafdsaf", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: time.Duration(10) * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + "ExporterUsed": "CSVExporter", + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, }, }, } eventData := &utils.CGREventWithOpts{ - CGREventWithArgDispatcher: &utils.CGREventWithArgDispatcher{ - CGREvent: &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "dataEvent", - Time: utils.TimePointer(time.Now()), - Event: map[string]interface{}{ - utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()), - utils.ToR: utils.DATA, - utils.OriginID: "abcdef", - utils.OriginHost: "192.168.1.1", - utils.RequestType: utils.META_RATED, - utils.Tenant: "AnotherTenant", - utils.Category: "call", //for data CDR use different Tenant - utils.Account: "1001", - utils.Subject: "1001", - utils.Destination: "1002", - utils.SetupTime: time.Unix(1383813745, 0).UTC(), - utils.AnswerTime: time.Unix(1383813746, 0).UTC(), - utils.Usage: time.Duration(10) * time.Nanosecond, - utils.RunID: utils.MetaDefault, - utils.Cost: 0.012, - "ExtraFields": map[string]string{"extra1": "val_extra1", - "extra2": "val_extra2", "extra3": "val_extra3"}, - }, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "dataEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("abcdef", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.DATA, + utils.OriginID: "abcdef", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "AnotherTenant", + utils.Category: "call", //for data CDR use different Tenant + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: time.Duration(10) * time.Nanosecond, + utils.RunID: utils.MetaDefault, + utils.Cost: 0.012, + "ExporterUsed": "CSVExporter", + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, }, }, } eventSMS := &utils.CGREventWithOpts{ - CGREventWithArgDispatcher: &utils.CGREventWithArgDispatcher{ - CGREvent: &utils.CGREvent{ - Tenant: "cgrates.org", - ID: "SMSEvent", - Time: utils.TimePointer(time.Now()), - Event: map[string]interface{}{ - utils.CGRID: utils.Sha1("sdfwer", time.Unix(1383813745, 0).UTC().String()), - utils.ToR: utils.SMS, - utils.OriginID: "sdfwer", - utils.OriginHost: "192.168.1.1", - utils.RequestType: utils.META_RATED, - utils.Tenant: "cgrates.org", - utils.Category: "call", - utils.Account: "1001", - utils.Subject: "1001", - utils.Destination: "1002", - utils.SetupTime: time.Unix(1383813745, 0).UTC(), - utils.AnswerTime: time.Unix(1383813746, 0).UTC(), - utils.Usage: time.Duration(1), - utils.RunID: utils.MetaDefault, - utils.Cost: 0.15, - "ExtraFields": map[string]string{"extra1": "val_extra1", - "extra2": "val_extra2", "extra3": "val_extra3"}, - }, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "SMSEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("sdfwer", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.SMS, + utils.OriginID: "sdfwer", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: time.Duration(1), + utils.RunID: utils.MetaDefault, + utils.Cost: 0.15, + "ExporterUsed": "CSVExporter", + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, }, }, } @@ -217,7 +205,7 @@ func testCsvExportEvent(t *testing.T) { func testCsvVerifyExports(t *testing.T) { var files []string - err := filepath.Walk("/tmp/testExport/", func(path string, info os.FileInfo, err error) error { + err := filepath.Walk("/tmp/testCSV/", func(path string, info os.FileInfo, err error) error { if strings.HasSuffix(path, utils.CSVSuffix) { files = append(files, path) } @@ -229,7 +217,109 @@ func testCsvVerifyExports(t *testing.T) { if len(files) != 1 { t.Errorf("Expected %+v, received: %+v", 1, len(files)) } - eCnt := "dbafe9c8614c785a65aabd116dd3959c3c56f7f6,*default,*voice,dsafdsaf,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10000000000,1.01\nea1f1968cc207859672c332364fc7614c86b04c5,*default,*data,abcdef,*rated,AnotherTenant,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10,0.012\n2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4,*default,*sms,sdfwer,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,1,0.15\n" + eCnt := "dbafe9c8614c785a65aabd116dd3959c3c56f7f6,*default,*voice,dsafdsaf,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10000000000,1.01" + + "\n" + + "ea1f1968cc207859672c332364fc7614c86b04c5,*default,*data,abcdef,*rated,AnotherTenant,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10,0.012" + + "\n" + + "2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4,*default,*sms,sdfwer,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,1,0.15" + + "\n" + if outContent1, err := ioutil.ReadFile(files[0]); err != nil { + t.Error(err) + } else if eCnt != string(outContent1) { + t.Errorf("Expecting: \n<%q>, \nreceived: \n<%q>", eCnt, string(outContent1)) + } +} + +func testCsvExportComposedEvent(t *testing.T) { + eventVoice := &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "voiceEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("dsafdsaf", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.VOICE, + "ComposedOriginID1": "dsaf", + "ComposedOriginID2": "dsaf", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: time.Duration(10) * time.Second, + utils.RunID: utils.MetaDefault, + utils.Cost: 1.01, + "ExporterUsed": "CSVExporterComposed", + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + } + + eventSMS := &utils.CGREventWithOpts{ + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "SMSEvent", + Time: utils.TimePointer(time.Now()), + Event: map[string]interface{}{ + utils.CGRID: utils.Sha1("sdfwer", time.Unix(1383813745, 0).UTC().String()), + utils.ToR: utils.SMS, + "ComposedOriginID1": "sdf", + "ComposedOriginID2": "wer", + utils.OriginHost: "192.168.1.1", + utils.RequestType: utils.META_RATED, + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.Account: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Unix(1383813745, 0).UTC(), + utils.AnswerTime: time.Unix(1383813746, 0).UTC(), + utils.Usage: time.Duration(1), + utils.RunID: utils.MetaDefault, + utils.Cost: 0.15, + "ExporterUsed": "CSVExporterComposed", + "ExtraFields": map[string]string{"extra1": "val_extra1", + "extra2": "val_extra2", "extra3": "val_extra3"}, + }, + }, + } + var reply string + if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventVoice, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected %+v, received: %+v", utils.OK, reply) + } + if err := csvRpc.Call(utils.EventExporterSv1ProcessEvent, eventSMS, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expected %+v, received: %+v", utils.OK, reply) + } + time.Sleep(1 * time.Second) +} + +func testCsvVerifyComposedExports(t *testing.T) { + var files []string + err := filepath.Walk("/tmp/testComposedCSV/", func(path string, info os.FileInfo, err error) error { + if strings.HasSuffix(path, utils.CSVSuffix) { + files = append(files, path) + } + return nil + }) + if err != nil { + t.Error(err) + } + if len(files) != 1 { + t.Errorf("Expected %+v, received: %+v", 1, len(files)) + } + eCnt := "CGRID,RunID,ToR,OriginID,RequestType,Tenant,Category,Account,Subject,Destination,SetupTime,AnswerTime,Usage,Cost" + + "\n" + + "dbafe9c8614c785a65aabd116dd3959c3c56f7f6,*default,*voice,dsafdsaf,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,10000000000,1.01\\n2478e9f18ebcd3c684f3c14596b8bfeab2b0d6d4,*default,*sms,sdf,wer,*rated,cgrates.org,call,1001,1001,1002,2013-11-07T08:42:25Z,2013-11-07T08:42:26Z,1,0.15" + + "\n" if outContent1, err := ioutil.ReadFile(files[0]); err != nil { t.Error(err) } else if eCnt != string(outContent1) { @@ -242,11 +332,3 @@ func testCsvStopCgrEngine(t *testing.T) { t.Error(err) } } - -func testCsvCleanDirectory(t *testing.T) { - for _, dir := range []string{"/tmp/testExport"} { - if err := os.RemoveAll(dir); err != nil { - t.Fatal("Error removing folder: ", dir, err) - } - } -} diff --git a/ees/filefwv.go b/ees/filefwv.go new file mode 100644 index 000000000..c4634bb42 --- /dev/null +++ b/ees/filefwv.go @@ -0,0 +1,261 @@ +/* +Real-time Online/Offline Charging System (OerS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package ees + +import ( + "fmt" + "io" + "os" + "path" + "strconv" + "sync" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func NewFileFWVee(cgrCfg *config.CGRConfig, cfgIdx int, filterS *engine.FilterS) (fFwv *FileFWVee, err error) { + fFwv = &FileFWVee{id: cgrCfg.EEsCfg().Exporters[cfgIdx].ID, + cgrCfg: cgrCfg, cfgIdx: cfgIdx, filterS: filterS} + err = fFwv.init() + return +} + +// FileFWVee implements EventExporter interface for .fwv files +type FileFWVee struct { + id string + cgrCfg *config.CGRConfig + cfgIdx int // index of config instance within ERsCfg.Readers + filterS *engine.FilterS + file *os.File + sync.RWMutex + + firstEventATime, lastEventATime time.Time + numberOfEvents int + totalDuration, totalDataUsage, totalSmsUsage, + totalMmsUsage, totalGenericUsage time.Duration + totalCost float64 + firstExpOrderID, lastExpOrderID int64 + positiveExports utils.StringSet + negativeExports utils.StringSet +} + +// init will create all the necessary dependencies, including opening the file +func (fFwv *FileFWVee) init() (err error) { + // create the file + if fFwv.file, err = os.Create(path.Join(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ExportPath, + fFwv.id+utils.Underline+utils.UUIDSha1Prefix()+utils.FWVSuffix)); err != nil { + return + } + fFwv.positiveExports = utils.StringSet{} + fFwv.negativeExports = utils.StringSet{} + return fFwv.composeHeader() +} + +// ID returns the identificator of this exporter +func (fFwv *FileFWVee) ID() string { + return fFwv.id +} + +// OnEvicted implements EventExporter, doing the cleanup before exit +func (fFwv *FileFWVee) OnEvicted(_ string, _ interface{}) { + // verify if we need to add the trailer + if err := fFwv.composeTrailer(); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when composed trailer", + utils.EventExporterS, fFwv.id, err.Error())) + } + if err := fFwv.file.Close(); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> Exporter with id: <%s> received error: <%s> when closing the file", + utils.EventExporterS, fFwv.id, err.Error())) + } + return +} + +// ExportEvent implements EventExporter +func (fFwv *FileFWVee) ExportEvent(cgrEv *utils.CGREvent) (err error) { + fFwv.Lock() + defer fFwv.Unlock() + fFwv.numberOfEvents++ + var records []string + navMp := utils.MapStorage{utils.MetaReq: cgrEv.Event} + for _, cfgFld := range fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].ContentFields() { + if pass, err := fFwv.filterS.Pass(cgrEv.Tenant, cfgFld.Filters, + navMp); err != nil || !pass { + continue + } + val, err := cfgFld.Value.ParseDataProvider(navMp, utils.NestingSep) + if err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, cfgFld.Value.GetRule()) + } + fFwv.negativeExports.Add(cgrEv.ID) + return err + } + records = append(records, val) + } + if aTime, err := cgrEv.FieldAsTime(utils.AnswerTime, fFwv.cgrCfg.GeneralCfg().DefaultTimezone); err == nil { + if fFwv.firstEventATime.IsZero() || fFwv.firstEventATime.Before(aTime) { + fFwv.firstEventATime = aTime + } + if aTime.After(fFwv.lastEventATime) { + fFwv.lastEventATime = aTime + } + } + if oID, err := cgrEv.FieldAsInt64(utils.OrderID); err == nil { + if fFwv.firstExpOrderID > oID || fFwv.firstExpOrderID == 0 { + fFwv.firstExpOrderID = oID + } + if fFwv.lastExpOrderID < oID { + fFwv.lastExpOrderID = oID + } + } + if cost, err := cgrEv.FieldAsFloat64(utils.Cost); err == nil { + fFwv.totalCost += cost + } + if tor, err := cgrEv.FieldAsString(utils.ToR); err == nil { + if usage, err := cgrEv.FieldAsDuration(utils.Usage); err == nil { + switch tor { + case utils.VOICE: + fFwv.totalDuration += usage + case utils.SMS: + fFwv.totalSmsUsage += usage + case utils.MMS: + fFwv.totalMmsUsage += usage + case utils.GENERIC: + fFwv.totalGenericUsage += usage + case utils.DATA: + fFwv.totalDataUsage += usage + } + } + } + fFwv.positiveExports.Add(cgrEv.ID) + for _, record := range append(records, "\n") { + if _, err = io.WriteString(fFwv.file, record); err != nil { + return + } + } + return +} + +// Compose and cache the header +func (fFwv *FileFWVee) composeHeader() (err error) { + if len(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].HeaderFields()) == 0 { + return + } + var records []string + for _, cfgFld := range fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].HeaderFields() { + var outVal string + switch cfgFld.Type { + case utils.META_CONSTANT: + outVal, err = cfgFld.Value.ParseValue(utils.EmptyString) + if err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, cfgFld.Value.GetRule()) + } + return err + } + case utils.MetaExportID: + outVal = fFwv.id + case utils.MetaTimeNow: + outVal = time.Now().String() + default: + return fmt.Errorf("unsupported type in header for field: <%+v>", utils.ToJSON(cfgFld)) + } + fmtOut := outVal + if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, outVal, cfgFld.Width, + cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil { + return err + } + records = append(records, fmtOut) + } + for _, record := range append(records, "\n") { + if _, err = io.WriteString(fFwv.file, record); err != nil { + return + } + } + return +} + +// Compose and cache the trailer +func (fFwv *FileFWVee) composeTrailer() (err error) { + if len(fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].TrailerFields()) == 0 { + return + } + var records []string + for _, cfgFld := range fFwv.cgrCfg.EEsCfg().Exporters[fFwv.cfgIdx].TrailerFields() { + var val string + switch cfgFld.Type { + case utils.META_CONSTANT: + val, err = cfgFld.Value.ParseValue(utils.EmptyString) + if err != nil { + if err == utils.ErrNotFound { + err = utils.ErrPrefix(err, cfgFld.Value.GetRule()) + } + return err + } + case utils.MetaExportID: + val = fFwv.id + case utils.MetaTimeNow: + val = time.Now().String() + case utils.MetaFirstEventATime: + val = fFwv.firstEventATime.Format(cfgFld.Layout) + case utils.MetaLastEventATime: + val = fFwv.lastEventATime.Format(cfgFld.Layout) + case utils.MetaEventNumber: + val = strconv.Itoa(fFwv.numberOfEvents) + case utils.MetaEventCost: + rounding := fFwv.cgrCfg.GeneralCfg().RoundingDecimals + if cfgFld.RoundingDecimals != nil { + rounding = *cfgFld.RoundingDecimals + } + val = strconv.FormatFloat(utils.Round(fFwv.totalCost, + rounding, utils.ROUNDING_MIDDLE), 'f', -1, 64) + case utils.MetaVoiceUsage: + val = fFwv.totalDuration.String() + case utils.MetaDataUsage: + val = strconv.Itoa(int(fFwv.totalDataUsage.Nanoseconds())) + case utils.MetaSMSUsage: + val = strconv.Itoa(int(fFwv.totalSmsUsage.Nanoseconds())) + case utils.MetaMMSUsage: + val = strconv.Itoa(int(fFwv.totalMmsUsage.Nanoseconds())) + case utils.MetaGenericUsage: + val = strconv.Itoa(int(fFwv.totalGenericUsage.Nanoseconds())) + case utils.MetaNegativeExports: + val = strconv.Itoa(len(fFwv.negativeExports.AsSlice())) + case utils.MetaPositiveExports: + val = strconv.Itoa(len(fFwv.positiveExports.AsSlice())) + default: + return fmt.Errorf("unsupported type in trailer for field: <%+v>", utils.ToJSON(cfgFld)) + } + fmtOut := val + if fmtOut, err = utils.FmtFieldWidth(cfgFld.Tag, val, cfgFld.Width, + cfgFld.Strip, cfgFld.Padding, cfgFld.Mandatory); err != nil { + return err + } + records = append(records, fmtOut) + } + for _, record := range append(records, "\n") { + if _, err = io.WriteString(fFwv.file, record); err != nil { + return + } + } + return +} diff --git a/ees/lib_test.go b/ees/lib_test.go index fe8d2730e..74ec817c3 100644 --- a/ees/lib_test.go +++ b/ees/lib_test.go @@ -23,6 +23,8 @@ import ( "flag" "net/rpc" "net/rpc/jsonrpc" + "os" + "testing" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -44,3 +46,24 @@ func newRPCClient(cfg *config.ListenCfg) (c *rpc.Client, err error) { return nil, errors.New("UNSUPPORTED_RPC") } } + +var exportPath = []string{"/tmp/testCSV", "/tmp/testComposedCSV"} + +func testCreateDirectory(t *testing.T) { + for _, dir := range exportPath { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + t.Fatal("Error creating folder: ", dir, err) + } + } +} + +func testCleanDirectory(t *testing.T) { + for _, dir := range exportPath { + if err := os.RemoveAll(dir); err != nil { + t.Fatal("Error removing folder: ", dir, err) + } + } +} diff --git a/engine/cdr_test.go b/engine/cdr_test.go index 3779abd33..27e2c978d 100644 --- a/engine/cdr_test.go +++ b/engine/cdr_test.go @@ -1275,3 +1275,99 @@ func TestCDReRoundingDecimals(t *testing.T) { //resetore roundingdecimals value config.CgrConfig().GeneralCfg().RoundingDecimals = 5 } + +func TestCDRcombimedCdrFieldVal(t *testing.T) { + cdr := &CDR{ + CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), + OrderID: 123, + ToR: utils.VOICE, + OriginID: "dsafdsaf", + OriginHost: "192.168.1.1", + Source: utils.UNIT_TEST, + RequestType: utils.META_RATED, + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "+4986517174963", + SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), + AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + RunID: utils.MetaDefault, + Usage: time.Duration(10) * time.Second, + Cost: 1.32165, + ExtraFields: map[string]string{"field_extr1": "val_extr1", "fieldextr2": "valextr2"}, + } + groupCDRs := []*CDR{ + &CDR{ // initialCDR + CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), + OrderID: 123, + ToR: utils.VOICE, + OriginID: "dsafdsaf", + OriginHost: "192.168.1.1", + Source: utils.UNIT_TEST, + RequestType: utils.META_RATED, + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "+4986517174963", + SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), + AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + RunID: utils.MetaDefault, + Usage: time.Duration(10) * time.Second, + Cost: 1.32165, + }, + &CDR{ + CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), + OrderID: 124, + ToR: utils.VOICE, + OriginID: "dsafdsaf", + OriginHost: "192.168.1.1", + Source: utils.UNIT_TEST, + RequestType: utils.META_RATED, + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "+4986517174963", + SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), + AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + RunID: "testRun", + Usage: time.Duration(10) * time.Second, + Cost: 1.32165, + }, + &CDR{ + CGRID: utils.Sha1("dsafdsaf", time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC).String()), + OrderID: 125, + ToR: utils.VOICE, + OriginID: "dsafdsaf", + OriginHost: "192.168.1.1", + Source: utils.UNIT_TEST, + RequestType: utils.META_RATED, + Tenant: "cgrates.org", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "+4986517174963", + SetupTime: time.Date(2013, 11, 7, 8, 42, 20, 0, time.UTC), + AnswerTime: time.Date(2013, 11, 7, 8, 42, 26, 0, time.UTC), + RunID: utils.MetaDefault, + Usage: time.Duration(10) * time.Second, + Cost: 1.32165, + }, + } + + tpFld := &config.FCTemplate{ + Tag: "TestCombiMed", + Type: utils.META_COMBIMED, + Filters: nil, + Value: config.NewRSRParsersMustCompile("~*req.RunID", true, utils.INFIELD_SEP), + } + + if out, err := cdr.combimedCdrFieldVal(tpFld, groupCDRs, nil); err != nil { + t.Error(err) + } else if out != "*default*default" { + t.Errorf("Expected : %+v, received: %+v", "*default*default", out) + } + +} diff --git a/engine/model_helpers_test.go b/engine/model_helpers_test.go index 4b86104a0..425569d32 100644 --- a/engine/model_helpers_test.go +++ b/engine/model_helpers_test.go @@ -2443,7 +2443,7 @@ func TestTPRoutesAsTPRouteProfile(t *testing.T) { FilterIDs: "FltrRoute", ActivationInterval: "2017-11-27T00:00:00Z", Sorting: "*weight", - SortingParameters: "srtPrm1;srtPrm2", + SortingParameters: "srtPrm1", RouteID: "route1", RouteFilterIDs: "", RouteAccountIDs: "", @@ -2484,7 +2484,7 @@ func TestTPRoutesAsTPRouteProfile(t *testing.T) { Tenant: "cgrates.org", ID: "RoutePrf", Sorting: "*weight", - SortingParameters: []string{"srtPrm1", "srtPrm2"}, + SortingParameters: []string{"srtPrm1"}, FilterIDs: []string{"FltrRoute"}, ActivationInterval: &utils.TPActivationInterval{ ActivationTime: "2017-11-27T00:00:00Z", @@ -2504,6 +2504,9 @@ func TestTPRoutesAsTPRouteProfile(t *testing.T) { }, } rcv := mdl.AsTPRouteProfile() + sort.Slice(rcv[0].Routes, func(i, j int) bool { + return strings.Compare(rcv[0].Routes[i].ID, rcv[0].Routes[j].ID) < 0 + }) if !reflect.DeepEqual(rcv, expPrf) { t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(expPrf), utils.ToJSON(rcv)) } @@ -2538,7 +2541,7 @@ func TestTPRoutesAsTPRouteProfile(t *testing.T) { FilterIDs: "FltrRoute", ActivationInterval: "2017-11-27T00:00:00Z", Sorting: "*weight", - SortingParameters: "srtPrm1;srtPrm2", + SortingParameters: "srtPrm1", RouteID: "route1", RouteFilterIDs: "", RouteAccountIDs: "", @@ -2558,26 +2561,29 @@ func TestTPRoutesAsTPRouteProfile(t *testing.T) { Tenant: "cgrates.org", ID: "RoutePrf", Sorting: "*weight", - SortingParameters: []string{"srtPrm1", "srtPrm2"}, + SortingParameters: []string{"srtPrm1"}, FilterIDs: []string{"FltrRoute"}, ActivationInterval: &utils.TPActivationInterval{ ActivationTime: "2017-11-27T00:00:00Z", ExpiryTime: "", }, Routes: []*utils.TPRoute{ - &utils.TPRoute{ - ID: "route2", - Weight: 20.0, - }, &utils.TPRoute{ ID: "route1", Weight: 10.0, }, + &utils.TPRoute{ + ID: "route2", + Weight: 20.0, + }, }, Weight: 10, }, } rcvRev := mdlReverse.AsTPRouteProfile() + sort.Slice(rcvRev[0].Routes, func(i, j int) bool { + return strings.Compare(rcvRev[0].Routes[i].ID, rcvRev[0].Routes[j].ID) < 0 + }) if !reflect.DeepEqual(rcvRev, expPrfRev) { t.Errorf("Expecting: %+v,\nReceived: %+v", utils.ToJSON(expPrfRev), utils.ToJSON(rcvRev)) }