mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-15 05:09:54 +05:00
CDRS with online replication capabilities
This commit is contained in:
@@ -133,31 +133,42 @@
|
||||
// "users_conns": [], // address where to reach the user service, empty to disable user profile functionality: <""|*internal|x.y.z.y:1234>
|
||||
// "aliases_conns": [], // address where to reach the aliases service, empty to disable aliases functionality: <""|*internal|x.y.z.y:1234>
|
||||
// "cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality<""|*internal|x.y.z.y:1234>
|
||||
// "cdr_replication":[
|
||||
// // { // sample replication, not configured by default
|
||||
// // "transport": "*amqp_json_map", // mechanism to use when replicating
|
||||
// // "address": "http://127.0.0.1:12080/cdr_json_map", // address where to replicate
|
||||
// // "attempts": 1, // number of attempts for POST before failing on file
|
||||
// // "cdr_filter": "", // filter the CDRs being replicated
|
||||
// // "content_fields": [ // template of the replicated content fields
|
||||
// // {"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"},
|
||||
// // {"tag":"RunID", "type": "*composed", "value": "RunID", "field_id": "RunID"},
|
||||
// // {"tag":"TOR", "type": "*composed", "value": "ToR", "field_id": "ToR"},
|
||||
// // {"tag":"OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"},
|
||||
// // {"tag":"RequestType", "type": "*composed", "value": "RequestType", "field_id": "RequestType"},
|
||||
// // {"tag":"Direction", "type": "*composed", "value": "Direction", "field_id": "Direction"},
|
||||
// // {"tag":"Tenant", "type": "*composed", "value": "Tenant", "field_id": "Tenant"},
|
||||
// // {"tag":"Category", "type": "*composed", "value": "Category", "field_id": "Category"},
|
||||
// // {"tag":"Account", "type": "*composed", "value": "Account", "field_id": "Account"},
|
||||
// // {"tag":"Subject", "type": "*composed", "value": "Subject", "field_id": "Subject"},
|
||||
// // {"tag":"Destination", "type": "*composed", "value": "Destination", "field_id": "Destination"},
|
||||
// // {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "SetupTime"},
|
||||
// // {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "AnswerTime"},
|
||||
// // {"tag":"Usage", "type": "*composed", "value": "Usage", "field_id": "Usage"},
|
||||
// // {"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"},
|
||||
// // ],
|
||||
// // },
|
||||
// ]
|
||||
// "online_cdr_exports":[], // list of CDRE profiles to use for real-time CDR exports
|
||||
// },
|
||||
|
||||
|
||||
// "cdre": {
|
||||
// "*default": {
|
||||
// "export_format": "*file_csv", // exported CDRs format <*file_csv|*file_fwv|*http_post|*http_json_cdr|*http_json_map|*amqp_json_cdr|*amqp_json_map>
|
||||
// "export_path": "/var/spool/cgrates/cdre", // path where the exported CDRs will be placed
|
||||
// "cdr_filter": "", // filter CDRs exported by this template
|
||||
// "synchronous": false, // block processing until export has a result
|
||||
// "attempts": 1, // Number of attempts if not success
|
||||
// "field_separator": ",", // used field separator in some export formats, eg: *file_csv
|
||||
// "usage_multiply_factor": {
|
||||
// "*any": 1 // multiply usage based on ToR field or *any for all
|
||||
// },
|
||||
// "cost_multiply_factor": 1, // multiply cost before export, eg: add VAT
|
||||
// "header_fields": [], // template of the exported header fields
|
||||
// "content_fields": [ // template of the exported content fields
|
||||
// {"tag": "CGRID", "type": "*composed", "value": "CGRID"},
|
||||
// {"tag":"RunID", "type": "*composed", "value": "RunID"},
|
||||
// {"tag":"TOR", "type": "*composed", "value": "ToR"},
|
||||
// {"tag":"OriginID", "type": "*composed", "value": "OriginID"},
|
||||
// {"tag":"RequestType", "type": "*composed", "value": "RequestType"},
|
||||
// {"tag":"Direction", "type": "*composed", "value": "Direction"},
|
||||
// {"tag":"Tenant", "type": "*composed", "value": "Tenant"},
|
||||
// {"tag":"Category", "type": "*composed", "value": "Category"},
|
||||
// {"tag":"Account", "type": "*composed", "value": "Account"},
|
||||
// {"tag":"Subject", "type": "*composed", "value": "Subject"},
|
||||
// {"tag":"Destination", "type": "*composed", "value": "Destination"},
|
||||
// {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
|
||||
// {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
|
||||
// {"tag":"Usage", "type": "*composed", "value": "Usage"},
|
||||
// {"tag":"Cost", "type": "*composed", "value": "Cost", "rounding_decimals": 4},
|
||||
// ],
|
||||
// "trailer_fields": [], // template of the exported trailer fields
|
||||
// },
|
||||
// },
|
||||
|
||||
|
||||
@@ -227,39 +238,6 @@
|
||||
// ],
|
||||
|
||||
|
||||
// "cdre": {
|
||||
// "*default": {
|
||||
// "cdr_format": "csv", // exported CDRs format <csv>
|
||||
// "field_separator": ",",
|
||||
// "data_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from KBytes to Bytes)
|
||||
// "sms_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from SMS unit to call duration in some billing systems)
|
||||
// "mms_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from MMS unit to call duration in some billing systems)
|
||||
// "generic_usage_multiply_factor": 1, // multiply data usage before export (eg: convert from GENERIC unit to call duration in some billing systems)
|
||||
// "cost_multiply_factor": 1, // multiply cost before export, eg: add VAT
|
||||
// "export_directory": "/var/spool/cgrates/cdre", // path where the exported CDRs will be placed
|
||||
// "header_fields": [], // template of the exported header fields
|
||||
// "content_fields": [ // template of the exported content fields
|
||||
// {"tag": "CGRID", "type": "*composed", "value": "CGRID"},
|
||||
// {"tag":"RunID", "type": "*composed", "value": "RunID"},
|
||||
// {"tag":"TOR", "type": "*composed", "value": "ToR"},
|
||||
// {"tag":"OriginID", "type": "*composed", "value": "OriginID"},
|
||||
// {"tag":"RequestType", "type": "*composed", "value": "RequestType"},
|
||||
// {"tag":"Direction", "type": "*composed", "value": "Direction"},
|
||||
// {"tag":"Tenant", "type": "*composed", "value": "Tenant"},
|
||||
// {"tag":"Category", "type": "*composed", "value": "Category"},
|
||||
// {"tag":"Account", "type": "*composed", "value": "Account"},
|
||||
// {"tag":"Subject", "type": "*composed", "value": "Subject"},
|
||||
// {"tag":"Destination", "type": "*composed", "value": "Destination"},
|
||||
// {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
|
||||
// {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
|
||||
// {"tag":"Usage", "type": "*composed", "value": "Usage"},
|
||||
// {"tag":"Cost", "type": "*composed", "value": "Cost", "rounding_decimals": 4},
|
||||
// ],
|
||||
// "trailer_fields": [], // template of the exported trailer fields
|
||||
// },
|
||||
// },
|
||||
|
||||
|
||||
// "sm_generic": {
|
||||
// "enabled": false, // starts SessionManager service: <true|false>
|
||||
// "listen_bijson": "127.0.0.1:2014", // address where to listen for bidirectional JSON-RPC requests
|
||||
|
||||
@@ -16,73 +16,72 @@
|
||||
"cdrs": {
|
||||
"enabled": true, // start the CDR Server service: <true|false>
|
||||
"store_cdrs": false, // store cdrs in storDb
|
||||
"cdr_replication":[ // replicate the rated CDR to a number of servers
|
||||
{
|
||||
"transport": "*http_post",
|
||||
"address": "http://127.0.0.1:12080/cdr_http",
|
||||
"attempts": 1,
|
||||
"cdr_filter": "RunID(*default)",
|
||||
"content_fields": [
|
||||
{"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"},
|
||||
{"tag":"RunID", "type": "*composed", "value": "RunID", "field_id": "RunID"},
|
||||
{"tag":"TOR", "type": "*composed", "value": "ToR", "field_id": "ToR"},
|
||||
{"tag":"OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"},
|
||||
{"tag":"OriginHost", "type": "*composed", "value": "OriginHost", "field_id": "OriginHost"},
|
||||
{"tag":"RequestType", "type": "*composed", "value": "RequestType", "field_id": "RequestType"},
|
||||
{"tag":"Direction", "type": "*composed", "value": "Direction", "field_id": "Direction"},
|
||||
{"tag":"Tenant", "type": "*composed", "value": "Tenant", "field_id": "Tenant"},
|
||||
{"tag":"Category", "type": "*composed", "value": "Category", "field_id": "Category"},
|
||||
{"tag":"Account", "type": "*composed", "value": "Account", "field_id": "Account"},
|
||||
{"tag":"Subject", "type": "*composed", "value": "Subject", "field_id": "Subject"},
|
||||
{"tag":"Destination", "type": "*composed", "value": "Destination", "field_id": "Destination"},
|
||||
{"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "SetupTime"},
|
||||
{"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "AnswerTime"},
|
||||
{"tag":"Usage", "type": "*composed", "value": "Usage", "field_id": "Usage"},
|
||||
{"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"transport": "*amqp_json_map",
|
||||
"address": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs",
|
||||
"attempts": 1,
|
||||
"cdr_filter": "",
|
||||
"content_fields": [
|
||||
{"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"},
|
||||
{"tag":"RunID", "type": "*composed", "value": "RunID", "field_id": "RunID"},
|
||||
{"tag":"TOR", "type": "*composed", "value": "ToR", "field_id": "ToR"},
|
||||
{"tag":"OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"},
|
||||
{"tag":"OriginHost", "type": "*composed", "value": "OriginHost", "field_id": "OriginHost"},
|
||||
{"tag":"RequestType", "type": "*composed", "value": "RequestType", "field_id": "RequestType"},
|
||||
{"tag":"Direction", "type": "*composed", "value": "Direction", "field_id": "Direction"},
|
||||
{"tag":"Tenant", "type": "*composed", "value": "Tenant", "field_id": "Tenant"},
|
||||
{"tag":"Category", "type": "*composed", "value": "Category", "field_id": "Category"},
|
||||
{"tag":"Account", "type": "*composed", "value": "Account", "field_id": "Account"},
|
||||
{"tag":"Subject", "type": "*composed", "value": "Subject", "field_id": "Subject"},
|
||||
{"tag":"Destination", "type": "*composed", "value": "Destination", "field_id": "Destination"},
|
||||
{"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "SetupTime"},
|
||||
{"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "AnswerTime"},
|
||||
{"tag":"Usage", "type": "*composed", "value": "Usage", "field_id": "Usage"},
|
||||
{"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"transport": "*http_post",
|
||||
"address": "http://127.0.0.1:12080/invalid",
|
||||
"cdr_filter": "OriginID(httpjsonrpc1)",
|
||||
"attempts": 1,
|
||||
"content_fields": [
|
||||
{"tag": "OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"transport": "*amqp_json_map",
|
||||
"address": "amqp://guest:guest@localhost:25672/?queue_id=cgrates_cdrs",
|
||||
"attempts": 1,
|
||||
"content_fields": [
|
||||
{"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"},
|
||||
],
|
||||
},
|
||||
],
|
||||
"online_cdr_exports": ["http_localhost", "amqp_localhost", "http_test_file", "amqp_test_file"],
|
||||
},
|
||||
|
||||
|
||||
"cdre": {
|
||||
"http_localhost": {
|
||||
"export_format": "*http_post",
|
||||
"export_path": "http://127.0.0.1:12080/cdr_http",
|
||||
"cdr_filter": "RunID(*default)",
|
||||
"content_fields": [ // template of the exported content fields
|
||||
{"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"},
|
||||
{"tag":"RunID", "type": "*composed", "value": "RunID", "field_id": "RunID"},
|
||||
{"tag":"TOR", "type": "*composed", "value": "ToR", "field_id": "ToR"},
|
||||
{"tag":"OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"},
|
||||
{"tag":"OriginHost", "type": "*composed", "value": "OriginHost", "field_id": "OriginHost"},
|
||||
{"tag":"RequestType", "type": "*composed", "value": "RequestType", "field_id": "RequestType"},
|
||||
{"tag":"Direction", "type": "*composed", "value": "Direction", "field_id": "Direction"},
|
||||
{"tag":"Tenant", "type": "*composed", "value": "Tenant", "field_id": "Tenant"},
|
||||
{"tag":"Category", "type": "*composed", "value": "Category", "field_id": "Category"},
|
||||
{"tag":"Account", "type": "*composed", "value": "Account", "field_id": "Account"},
|
||||
{"tag":"Subject", "type": "*composed", "value": "Subject", "field_id": "Subject"},
|
||||
{"tag":"Destination", "type": "*composed", "value": "Destination", "field_id": "Destination"},
|
||||
{"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "SetupTime"},
|
||||
{"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "AnswerTime"},
|
||||
{"tag":"Usage", "type": "*composed", "value": "Usage", "field_id": "Usage"},
|
||||
{"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"},
|
||||
],
|
||||
},
|
||||
"amqp_localhost": {
|
||||
"export_format": "*amqp_json_map",
|
||||
"export_path": "amqp://guest:guest@localhost:5672/?queue_id=cgrates_cdrs",
|
||||
"cdr_filter": "RunID(*default)",
|
||||
"content_fields": [ // template of the exported content fields
|
||||
{"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"},
|
||||
{"tag":"RunID", "type": "*composed", "value": "RunID", "field_id": "RunID"},
|
||||
{"tag":"TOR", "type": "*composed", "value": "ToR", "field_id": "ToR"},
|
||||
{"tag":"OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"},
|
||||
{"tag":"OriginHost", "type": "*composed", "value": "OriginHost", "field_id": "OriginHost"},
|
||||
{"tag":"RequestType", "type": "*composed", "value": "RequestType", "field_id": "RequestType"},
|
||||
{"tag":"Direction", "type": "*composed", "value": "Direction", "field_id": "Direction"},
|
||||
{"tag":"Tenant", "type": "*composed", "value": "Tenant", "field_id": "Tenant"},
|
||||
{"tag":"Category", "type": "*composed", "value": "Category", "field_id": "Category"},
|
||||
{"tag":"Account", "type": "*composed", "value": "Account", "field_id": "Account"},
|
||||
{"tag":"Subject", "type": "*composed", "value": "Subject", "field_id": "Subject"},
|
||||
{"tag":"Destination", "type": "*composed", "value": "Destination", "field_id": "Destination"},
|
||||
{"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "SetupTime"},
|
||||
{"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00", "field_id": "AnswerTime"},
|
||||
{"tag":"Usage", "type": "*composed", "value": "Usage", "field_id": "Usage"},
|
||||
{"tag":"Cost", "type": "*composed", "value": "Cost", "field_id": "Cost"},
|
||||
],
|
||||
},
|
||||
"http_test_file": {
|
||||
"export_format": "*http_post",
|
||||
"export_path": "http://127.0.0.1:12080/invalid",
|
||||
"cdr_filter": "OriginID(httpjsonrpc1)",
|
||||
"content_fields": [
|
||||
{"tag": "OriginID", "type": "*composed", "value": "OriginID", "field_id": "OriginID"},
|
||||
],
|
||||
},
|
||||
"amqp_test_file": {
|
||||
"export_format": "*amqp_json_map",
|
||||
"export_path": "amqp://guest:guest@localhost:25672/?queue_id=cgrates_cdrs",
|
||||
"content_fields": [
|
||||
{"tag": "CGRID", "type": "*composed", "value": "CGRID", "field_id": "CGRID"},
|
||||
],
|
||||
},
|
||||
},
|
||||
|
||||
}
|
||||
105
engine/cdrs.go
105
engine/cdrs.go
@@ -192,7 +192,7 @@ func (self *CdrServer) processCdr(cdr *CDR) (err error) {
|
||||
go self.stats.Call("CDRStatsV1.AppendCDR", cdr, &out)
|
||||
}
|
||||
if len(self.cgrCfg.CDRSOnlineCDRExports) != 0 { // Replicate raw CDR
|
||||
self.replicateCdr(cdr)
|
||||
self.replicateCDRs([]*CDR{cdr})
|
||||
}
|
||||
|
||||
if self.rals != nil && !cdr.Rated { // CDRs not rated will be processed by Rating
|
||||
@@ -280,9 +280,7 @@ func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, stats, rep
|
||||
}
|
||||
}
|
||||
if replicate {
|
||||
for _, ratedCDR := range ratedCDRs {
|
||||
self.replicateCdr(ratedCDR)
|
||||
}
|
||||
self.replicateCDRs(ratedCDRs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -448,91 +446,22 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
|
||||
return cc, nil
|
||||
}
|
||||
|
||||
func (self *CdrServer) replicateCdr(cdr *CDR) error {
|
||||
return nil
|
||||
/*
|
||||
for _, rplCfg := range self.cgrCfg.CDRSOnlineCDRExports {
|
||||
passesFilters := true
|
||||
for _, cdfFltr := range rplCfg.CdrFilter {
|
||||
if !cdfFltr.FilterPasses(cdr.FieldAsString(cdfFltr)) {
|
||||
passesFilters = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !passesFilters { // Not passes filters, ignore this replication
|
||||
continue
|
||||
}
|
||||
var body interface{}
|
||||
var content = ""
|
||||
switch rplCfg.Transport {
|
||||
case utils.MetaHTTPjsonCDR, utils.MetaAMQPjsonCDR:
|
||||
jsn, err := json.Marshal(cdr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
body = jsn
|
||||
case utils.MetaHTTPjsonMap, utils.MetaAMQPjsonMap:
|
||||
expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
jsn, err := json.Marshal(expMp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
body = jsn
|
||||
case utils.META_HTTP_POST:
|
||||
expMp, err := cdr.AsExportMap(rplCfg.ContentFields, self.cgrCfg.HttpSkipTlsVerify, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vals := url.Values{}
|
||||
for fld, val := range expMp {
|
||||
vals.Set(fld, val)
|
||||
}
|
||||
body = vals
|
||||
}
|
||||
var errChan chan error
|
||||
if rplCfg.Synchronous {
|
||||
errChan = make(chan error)
|
||||
}
|
||||
go func(body interface{}, rplCfg *config.CDRReplicationCfg, content string, errChan chan error) {
|
||||
var err error
|
||||
fallbackPath := utils.META_NONE
|
||||
if rplCfg.FallbackFileName() != utils.META_NONE {
|
||||
fallbackPath = path.Join(self.cgrCfg.FailedPostsDir, rplCfg.FallbackFileName())
|
||||
}
|
||||
switch rplCfg.Transport {
|
||||
case utils.MetaHTTPjsonCDR, utils.MetaHTTPjsonMap, utils.MetaHTTPjson, utils.META_HTTP_POST:
|
||||
_, err = self.httpPoster.Post(rplCfg.Address, utils.PosterTransportContentTypes[rplCfg.Transport], body, rplCfg.Attempts, fallbackPath)
|
||||
case utils.MetaAMQPjsonCDR, utils.MetaAMQPjsonMap:
|
||||
var amqpPoster *utils.AMQPPoster
|
||||
amqpPoster, err = utils.AMQPPostersCache.GetAMQPPoster(rplCfg.Address, rplCfg.Attempts, self.cgrCfg.FailedPostsDir)
|
||||
if err == nil { // error will be checked bellow
|
||||
var chn *amqp.Channel
|
||||
chn, err = amqpPoster.Post(
|
||||
nil, utils.PosterTransportContentTypes[rplCfg.Transport], body.([]byte), rplCfg.FallbackFileName())
|
||||
if chn != nil {
|
||||
chn.Close()
|
||||
}
|
||||
}
|
||||
default:
|
||||
err = fmt.Errorf("unsupported replication transport: %s", rplCfg.Transport)
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf(
|
||||
"<CDRReplicator> Replicating CDR: %+v, transport: %s, got error: %s", cdr, rplCfg.Transport, err.Error()))
|
||||
}
|
||||
if rplCfg.Synchronous {
|
||||
errChan <- err
|
||||
}
|
||||
}(body, rplCfg, content, errChan)
|
||||
if rplCfg.Synchronous { // Synchronize here
|
||||
<-errChan
|
||||
}
|
||||
func (self *CdrServer) replicateCDRs(cdrs []*CDR) (err error) {
|
||||
for _, exportID := range self.cgrCfg.CDRSOnlineCDRExports {
|
||||
expTpl := self.cgrCfg.CdreProfiles[exportID] // not checking for existence of profile since this should be done in a higher layer
|
||||
var cdre *CDRExporter
|
||||
if cdre, err = NewCDRExporter(cdrs, expTpl, expTpl.ExportFormat, expTpl.ExportPath, self.cgrCfg.FailedPostsDir, "CDRSReplication",
|
||||
expTpl.Synchronous, expTpl.Attempts, expTpl.FieldSeparator, expTpl.UsageMultiplyFactor,
|
||||
expTpl.CostMultiplyFactor, self.cgrCfg.RoundingDecimals, self.cgrCfg.HttpSkipTlsVerify, self.httpPoster); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Building CDRExporter for online exports got error: <%s>", err.Error()))
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
*/
|
||||
if err = cdre.ExportCDRs(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRS> Replicating CDR: %+v, got error: <%s>", err.Error()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Called by rate/re-rate API, FixMe: deprecate it once new APIer structure is operational
|
||||
|
||||
@@ -226,10 +226,15 @@ func TestCdrsAMQPReplication(t *testing.T) {
|
||||
func TestCdrsHTTPPosterFileFailover(t *testing.T) {
|
||||
time.Sleep(time.Duration(2 * time.Second))
|
||||
failoverContent := []byte(`OriginID=httpjsonrpc1`)
|
||||
var rplCfg *config.CDRReplicationCfg
|
||||
filesInDir, _ := ioutil.ReadDir(cdrsMasterCfg.FailedPostsDir)
|
||||
if len(filesInDir) == 0 {
|
||||
t.Fatalf("No files in directory: %s", cdrsMasterCfg.FailedPostsDir)
|
||||
}
|
||||
var foundFile bool
|
||||
for _, rplCfg = range cdrsMasterCfg.CDRSCdrReplication {
|
||||
if strings.HasSuffix(rplCfg.Address, "invalid") { // Find the config which shold generate the failoback
|
||||
var fileName string
|
||||
for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config
|
||||
fileName = file.Name()
|
||||
if strings.Index(fileName, utils.FormSuffix) != -1 {
|
||||
foundFile = true
|
||||
break
|
||||
}
|
||||
@@ -237,49 +242,25 @@ func TestCdrsHTTPPosterFileFailover(t *testing.T) {
|
||||
if !foundFile {
|
||||
t.Fatal("Could not find the file in folder")
|
||||
}
|
||||
filesInDir, _ := ioutil.ReadDir(cdrsMasterCfg.FailedPostsDir)
|
||||
if len(filesInDir) == 0 {
|
||||
t.Fatalf("No files in directory: %s", cdrsMasterCfg.FailedPostsDir)
|
||||
}
|
||||
var fileName string
|
||||
for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config
|
||||
fileName = file.Name()
|
||||
if strings.Index(fileName, ".txt") != -1 {
|
||||
break
|
||||
}
|
||||
}
|
||||
filePath := path.Join(cdrsMasterCfg.FailedPostsDir, fileName)
|
||||
if readBytes, err := ioutil.ReadFile(filePath); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(failoverContent, readBytes) { // Checking just the prefix should do since some content is dynamic
|
||||
t.Errorf("Expecting: %q, received: %q", string(failoverContent), string(readBytes))
|
||||
}
|
||||
/*
|
||||
if err := os.Remove(filePath); err != nil {
|
||||
t.Error("Failed removing file: ", filePath)
|
||||
}
|
||||
*/
|
||||
if err := os.Remove(filePath); err != nil {
|
||||
t.Error("Failed removing file: ", filePath)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCdrsAMQPPosterFileFailover(t *testing.T) {
|
||||
time.Sleep(time.Duration(10 * time.Second))
|
||||
failoverContent := []byte(`{"CGRID":"57548d485d61ebcba55afbe5d939c82a8e9ff670"}`)
|
||||
var rplCfg *config.CDRReplicationCfg
|
||||
var foundFile bool
|
||||
for _, rplCfg = range cdrsMasterCfg.CDRSCdrReplication {
|
||||
if rplCfg.Address == "amqp://guest:guest@localhost:25672/?queue_id=cgrates_cdrs" { // Find the config which shold generate the failoback
|
||||
foundFile = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !foundFile {
|
||||
t.Fatal("Could not find the file in folder")
|
||||
}
|
||||
filesInDir, _ := ioutil.ReadDir(cdrsMasterCfg.FailedPostsDir)
|
||||
if len(filesInDir) == 0 {
|
||||
t.Fatalf("No files in directory: %s", cdrsMasterCfg.FailedPostsDir)
|
||||
}
|
||||
foundFile = false
|
||||
var foundFile bool
|
||||
var fileName string
|
||||
for _, file := range filesInDir { // First file in directory is the one we need, harder to find it's name out of config
|
||||
fileName = file.Name()
|
||||
|
||||
Reference in New Issue
Block a user