From 3c5c41d542f9e6caa319cb03815f0f9e3598d5ac Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 14 Oct 2016 12:51:48 +0200 Subject: [PATCH] Adding content_fields to CDRS replication config --- .../{libconfig.go => cdrsreplicationcfg.go} | 15 ++++++----- config/config.go | 6 ++--- config/config_defaults.go | 26 ++++++++++++++++++- config/libconfig_json.go | 11 ++++---- engine/cdrs.go | 2 +- general_tests/cdrs_replication_it_test.go | 2 +- 6 files changed, 44 insertions(+), 18 deletions(-) rename config/{libconfig.go => cdrsreplicationcfg.go} (73%) diff --git a/config/libconfig.go b/config/cdrsreplicationcfg.go similarity index 73% rename from config/libconfig.go rename to config/cdrsreplicationcfg.go index 02e4ab870..e77704bd0 100644 --- a/config/libconfig.go +++ b/config/cdrsreplicationcfg.go @@ -23,14 +23,15 @@ import ( "net/url" ) -type CdrReplicationCfg struct { - Transport string - Address string - Synchronous bool - Attempts int // Number of attempts if not success - CdrFilter utils.RSRFields // Only replicate if the filters here are matching +type CDRReplicationCfg struct { + Transport string + Address string + Synchronous bool + Attempts int // Number of attempts if not success + CdrFilter utils.RSRFields // Only replicate if the filters here are matching + ContentFields []*CfgCdrField } -func (rplCfg CdrReplicationCfg) FallbackFileName() string { +func (rplCfg CDRReplicationCfg) FallbackFileName() string { return fmt.Sprintf("cdr_%s_%s_%s.form", rplCfg.Transport, url.QueryEscape(rplCfg.Address), utils.GenUUID()) } diff --git a/config/config.go b/config/config.go index d0f40be7f..ac6527bdc 100644 --- a/config/config.go +++ b/config/config.go @@ -241,7 +241,7 @@ type CGRConfig struct { CDRSUserSConns []*HaPoolConfig // address where to reach the users service: <""|internal|x.y.z.y:1234> CDRSAliaseSConns []*HaPoolConfig // address where to reach the aliases service: <""|internal|x.y.z.y:1234> CDRSStatSConns []*HaPoolConfig // address where to reach the cdrstats service. Empty to disable stats gathering <""|internal|x.y.z.y:1234> - CDRSCdrReplication []*CdrReplicationCfg // Replicate raw CDRs to a number of servers + CDRSCdrReplication []*CDRReplicationCfg // Replicate raw CDRs to a number of servers CDRStatsEnabled bool // Enable CDR Stats service CDRStatsSaveInterval time.Duration // Save interval duration CdreProfiles map[string]*CdreConfig @@ -887,9 +887,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { } } if jsnCdrsCfg.Cdr_replication != nil { - self.CDRSCdrReplication = make([]*CdrReplicationCfg, len(*jsnCdrsCfg.Cdr_replication)) + self.CDRSCdrReplication = make([]*CDRReplicationCfg, len(*jsnCdrsCfg.Cdr_replication)) for idx, rplJsonCfg := range *jsnCdrsCfg.Cdr_replication { - self.CDRSCdrReplication[idx] = new(CdrReplicationCfg) + self.CDRSCdrReplication[idx] = new(CDRReplicationCfg) if rplJsonCfg.Transport != nil { self.CDRSCdrReplication[idx].Transport = *rplJsonCfg.Transport } diff --git a/config/config_defaults.go b/config/config_defaults.go index 0d68158df..b80659391 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -141,7 +141,31 @@ const CGRATES_CFG_JSON = ` "users_conns": [], // address where to reach the user service, empty to disable user profile functionality: <""|*internal|x.y.z.y:1234> "aliases_conns": [], // address where to reach the aliases service, empty to disable aliases functionality: <""|*internal|x.y.z.y:1234> "cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality<""|*internal|x.y.z.y:1234> - "cdr_replication":[] // replicate the raw CDR to a number of servers + "cdr_replication":[ +// { +// "transport": "*http_post", // mechanism to use when replicating +// "address": "http://127.0.0.1:12080/cdr_http", // address where to replicate +// "attempts": 1, // number of attempts for POST before failing on file +// "cdr_filter": "", // filter the CDRs being replicated +// "content_fields": [ // template of the replicated content fields +// {"tag": "CGRID", "type": "*composed", "value": "CGRID"}, +// {"tag":"RunID", "type": "*composed", "value": "RunID"}, +// {"tag":"TOR", "type": "*composed", "value": "ToR"}, +// {"tag":"OriginID", "type": "*composed", "value": "OriginID"}, +// {"tag":"RequestType", "type": "*composed", "value": "RequestType"}, +// {"tag":"Direction", "type": "*composed", "value": "Direction"}, +// {"tag":"Tenant", "type": "*composed", "value": "Tenant"}, +// {"tag":"Category", "type": "*composed", "value": "Category"}, +// {"tag":"Account", "type": "*composed", "value": "Account"}, +// {"tag":"Subject", "type": "*composed", "value": "Subject"}, +// {"tag":"Destination", "type": "*composed", "value": "Destination"}, +// {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"}, +// {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"}, +// {"tag":"Usage", "type": "*composed", "value": "Usage"}, +// {"tag":"Cost", "type": "*composed", "value": "Cost"}, +// ], +// }, + ] }, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 759e46da4..ee4234fec 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -98,11 +98,12 @@ type CdrsJsonCfg struct { } type CdrReplicationJsonCfg struct { - Transport *string - Address *string - Synchronous *bool - Attempts *int - Cdr_filter *string + Transport *string + Address *string + Synchronous *bool + Attempts *int + Cdr_filter *string + Content_fields *[]*CdrFieldJsonCfg } // Cdrstats config section diff --git a/engine/cdrs.go b/engine/cdrs.go index f446ad326..e9d3f9397 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -475,7 +475,7 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error { if rplCfg.Synchronous { errChan = make(chan error) } - go func(body interface{}, rplCfg *config.CdrReplicationCfg, content string, errChan chan error) { + go func(body interface{}, rplCfg *config.CDRReplicationCfg, content string, errChan chan error) { fallbackPath := path.Join( self.cgrCfg.HttpFailedDir, rplCfg.FallbackFileName()) diff --git a/general_tests/cdrs_replication_it_test.go b/general_tests/cdrs_replication_it_test.go index 576b8a11f..6134b99cd 100644 --- a/general_tests/cdrs_replication_it_test.go +++ b/general_tests/cdrs_replication_it_test.go @@ -156,7 +156,7 @@ func TestCdrsFileFailover(t *testing.T) { } time.Sleep(time.Duration(1 * time.Second)) failoverContent := []byte(`Account=1001&AnswerTime=2013-12-07T08%3A42%3A26Z&Category=call&Destination=1002&Direction=%2Aout&DisconnectCause=&OriginHost=192.168.1.1&OriginID=httpjsonrpc1&PDD=0&RequestType=%2Apseudoprepaid&SetupTime=2013-12-07T08%3A42%3A24Z&Source=UNKNOWN&Subject=1001&Supplier=&Tenant=cgrates.org&ToR=%2Avoice&Usage=10&field_extr1=val_extr1&fieldextr2=valextr2`) - var rplCfg *config.CdrReplicationCfg + var rplCfg *config.CDRReplicationCfg for _, rplCfg = range cdrsMasterCfg.CDRSCdrReplication { if strings.HasSuffix(rplCfg.Address, "invalid") { // Find the config which shold generate the failoback break