Adding content_fields to CDRS replication config

This commit is contained in:
DanB
2016-10-14 12:51:48 +02:00
parent 3425a8da53
commit 3c5c41d542
6 changed files with 44 additions and 18 deletions

View File

@@ -23,14 +23,15 @@ import (
"net/url"
)
type CdrReplicationCfg struct {
Transport string
Address string
Synchronous bool
Attempts int // Number of attempts if not success
CdrFilter utils.RSRFields // Only replicate if the filters here are matching
type CDRReplicationCfg struct {
Transport string
Address string
Synchronous bool
Attempts int // Number of attempts if not success
CdrFilter utils.RSRFields // Only replicate if the filters here are matching
ContentFields []*CfgCdrField
}
func (rplCfg CdrReplicationCfg) FallbackFileName() string {
func (rplCfg CDRReplicationCfg) FallbackFileName() string {
return fmt.Sprintf("cdr_%s_%s_%s.form", rplCfg.Transport, url.QueryEscape(rplCfg.Address), utils.GenUUID())
}

View File

@@ -241,7 +241,7 @@ type CGRConfig struct {
CDRSUserSConns []*HaPoolConfig // address where to reach the users service: <""|internal|x.y.z.y:1234>
CDRSAliaseSConns []*HaPoolConfig // address where to reach the aliases service: <""|internal|x.y.z.y:1234>
CDRSStatSConns []*HaPoolConfig // address where to reach the cdrstats service. Empty to disable stats gathering <""|internal|x.y.z.y:1234>
CDRSCdrReplication []*CdrReplicationCfg // Replicate raw CDRs to a number of servers
CDRSCdrReplication []*CDRReplicationCfg // Replicate raw CDRs to a number of servers
CDRStatsEnabled bool // Enable CDR Stats service
CDRStatsSaveInterval time.Duration // Save interval duration
CdreProfiles map[string]*CdreConfig
@@ -887,9 +887,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
}
}
if jsnCdrsCfg.Cdr_replication != nil {
self.CDRSCdrReplication = make([]*CdrReplicationCfg, len(*jsnCdrsCfg.Cdr_replication))
self.CDRSCdrReplication = make([]*CDRReplicationCfg, len(*jsnCdrsCfg.Cdr_replication))
for idx, rplJsonCfg := range *jsnCdrsCfg.Cdr_replication {
self.CDRSCdrReplication[idx] = new(CdrReplicationCfg)
self.CDRSCdrReplication[idx] = new(CDRReplicationCfg)
if rplJsonCfg.Transport != nil {
self.CDRSCdrReplication[idx].Transport = *rplJsonCfg.Transport
}

View File

@@ -141,7 +141,31 @@ const CGRATES_CFG_JSON = `
"users_conns": [], // address where to reach the user service, empty to disable user profile functionality: <""|*internal|x.y.z.y:1234>
"aliases_conns": [], // address where to reach the aliases service, empty to disable aliases functionality: <""|*internal|x.y.z.y:1234>
"cdrstats_conns": [], // address where to reach the cdrstats service, empty to disable stats functionality<""|*internal|x.y.z.y:1234>
"cdr_replication":[] // replicate the raw CDR to a number of servers
"cdr_replication":[
// {
// "transport": "*http_post", // mechanism to use when replicating
// "address": "http://127.0.0.1:12080/cdr_http", // address where to replicate
// "attempts": 1, // number of attempts for POST before failing on file
// "cdr_filter": "", // filter the CDRs being replicated
// "content_fields": [ // template of the replicated content fields
// {"tag": "CGRID", "type": "*composed", "value": "CGRID"},
// {"tag":"RunID", "type": "*composed", "value": "RunID"},
// {"tag":"TOR", "type": "*composed", "value": "ToR"},
// {"tag":"OriginID", "type": "*composed", "value": "OriginID"},
// {"tag":"RequestType", "type": "*composed", "value": "RequestType"},
// {"tag":"Direction", "type": "*composed", "value": "Direction"},
// {"tag":"Tenant", "type": "*composed", "value": "Tenant"},
// {"tag":"Category", "type": "*composed", "value": "Category"},
// {"tag":"Account", "type": "*composed", "value": "Account"},
// {"tag":"Subject", "type": "*composed", "value": "Subject"},
// {"tag":"Destination", "type": "*composed", "value": "Destination"},
// {"tag":"SetupTime", "type": "*composed", "value": "SetupTime", "layout": "2006-01-02T15:04:05Z07:00"},
// {"tag":"AnswerTime", "type": "*composed", "value": "AnswerTime", "layout": "2006-01-02T15:04:05Z07:00"},
// {"tag":"Usage", "type": "*composed", "value": "Usage"},
// {"tag":"Cost", "type": "*composed", "value": "Cost"},
// ],
// },
]
},

View File

@@ -98,11 +98,12 @@ type CdrsJsonCfg struct {
}
type CdrReplicationJsonCfg struct {
Transport *string
Address *string
Synchronous *bool
Attempts *int
Cdr_filter *string
Transport *string
Address *string
Synchronous *bool
Attempts *int
Cdr_filter *string
Content_fields *[]*CdrFieldJsonCfg
}
// Cdrstats config section

View File

@@ -475,7 +475,7 @@ func (self *CdrServer) replicateCdr(cdr *CDR) error {
if rplCfg.Synchronous {
errChan = make(chan error)
}
go func(body interface{}, rplCfg *config.CdrReplicationCfg, content string, errChan chan error) {
go func(body interface{}, rplCfg *config.CDRReplicationCfg, content string, errChan chan error) {
fallbackPath := path.Join(
self.cgrCfg.HttpFailedDir,
rplCfg.FallbackFileName())

View File

@@ -156,7 +156,7 @@ func TestCdrsFileFailover(t *testing.T) {
}
time.Sleep(time.Duration(1 * time.Second))
failoverContent := []byte(`Account=1001&AnswerTime=2013-12-07T08%3A42%3A26Z&Category=call&Destination=1002&Direction=%2Aout&DisconnectCause=&OriginHost=192.168.1.1&OriginID=httpjsonrpc1&PDD=0&RequestType=%2Apseudoprepaid&SetupTime=2013-12-07T08%3A42%3A24Z&Source=UNKNOWN&Subject=1001&Supplier=&Tenant=cgrates.org&ToR=%2Avoice&Usage=10&field_extr1=val_extr1&fieldextr2=valextr2`)
var rplCfg *config.CdrReplicationCfg
var rplCfg *config.CDRReplicationCfg
for _, rplCfg = range cdrsMasterCfg.CDRSCdrReplication {
if strings.HasSuffix(rplCfg.Address, "invalid") { // Find the config which shold generate the failoback
break