mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 20:59:53 +05:00
Issue #234:
Added http_post and http_text on Cdrs Replication
Added as default 1 attemps in cdrs_replication (Old json version
doesn't have this option)
Added url scape on servername when file fallback is created. If not
can't be created
This commit is contained in:
@@ -676,6 +676,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
|
||||
}
|
||||
if rplJsonCfg.Attempts != nil {
|
||||
self.CDRSCdrReplication[idx].Attempts = *rplJsonCfg.Attempts
|
||||
} else {
|
||||
// Use 1 as default. If not the cdr will never send
|
||||
self.CDRSCdrReplication[idx].Attempts = 1
|
||||
}
|
||||
if rplJsonCfg.Cdr_filter != nil {
|
||||
if self.CDRSCdrReplication[idx].CdrFilter, err = utils.ParseRSRFields(*rplJsonCfg.Cdr_filter, utils.INFIELD_SEP); err != nil {
|
||||
|
||||
@@ -19,9 +19,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package config
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"fmt"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
type CdrReplicationCfg struct {
|
||||
@@ -32,6 +33,15 @@ type CdrReplicationCfg struct {
|
||||
CdrFilter utils.RSRFields // Only replicate if the filters here are matching
|
||||
}
|
||||
|
||||
func (rplCfg CdrReplicationCfg) GetFallbackFileName() string {
|
||||
serverName := url.QueryEscape(rplCfg.Server)
|
||||
|
||||
result := fmt.Sprintf("cdr_%s_%s_%s.form",
|
||||
rplCfg.Transport,
|
||||
serverName, utils.GenUUID())
|
||||
return result
|
||||
}
|
||||
|
||||
type SureTaxCfg struct {
|
||||
Url string
|
||||
ClientNumber string
|
||||
|
||||
@@ -366,20 +366,40 @@ func (self *CdrServer) replicateCdr(cdr *StoredCdr) error {
|
||||
if !passesFilters { // Not passes filters, ignore this replication
|
||||
continue
|
||||
}
|
||||
var body interface{}
|
||||
var content = ""
|
||||
switch rplCfg.Transport {
|
||||
case utils.META_HTTP_POST:
|
||||
errChan := make(chan error)
|
||||
go func(cdr *StoredCdr, rplCfg *config.CdrReplicationCfg, errChan chan error) {
|
||||
fallbackPath := path.Join(self.cgrCfg.HttpFailedDir, fmt.Sprintf("cdr_%s_%s_%s.form", rplCfg.Transport, rplCfg.Server, utils.GenUUID()))
|
||||
if _, err := utils.HttpPoster(rplCfg.Server, self.cgrCfg.HttpSkipTlsVerify, cdr.AsHttpForm(), utils.CONTENT_FORM, rplCfg.Attempts, fallbackPath); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<CDRReplicator> Replicating CDR: %+v, got error: %s", cdr, err.Error()))
|
||||
errChan <- err
|
||||
}
|
||||
errChan <- nil
|
||||
}(cdr, rplCfg, errChan)
|
||||
if rplCfg.Synchronous { // Synchronize here
|
||||
<-errChan
|
||||
content = utils.CONTENT_FORM
|
||||
body = cdr.AsHttpForm()
|
||||
case utils.META_HTTP_TEXT:
|
||||
content = utils.CONTENT_TEXT
|
||||
body = cdr
|
||||
case utils.META_HTTP_JSON:
|
||||
content = utils.CONTENT_JSON
|
||||
body = cdr
|
||||
}
|
||||
|
||||
errChan := make(chan error)
|
||||
go func(body interface{}, rplCfg *config.CdrReplicationCfg, content string, errChan chan error) {
|
||||
|
||||
fallbackPath := path.Join(
|
||||
self.cgrCfg.HttpFailedDir,
|
||||
rplCfg.GetFallbackFileName())
|
||||
|
||||
_, err := utils.HttpPoster(
|
||||
rplCfg.Server, self.cgrCfg.HttpSkipTlsVerify, body,
|
||||
content, rplCfg.Attempts, fallbackPath)
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf(
|
||||
"<CDRReplicator> Replicating CDR: %+v, got error: %s", cdr, err.Error()))
|
||||
errChan <- err
|
||||
}
|
||||
errChan <- nil
|
||||
|
||||
}(body, rplCfg, content, errChan)
|
||||
if rplCfg.Synchronous { // Synchronize here
|
||||
<-errChan
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -205,6 +205,8 @@ const (
|
||||
METATAG = "metatag"
|
||||
HTTP_POST = "http_post"
|
||||
META_HTTP_POST = "*http_post"
|
||||
META_HTTP_JSON = "*http_json"
|
||||
META_HTTP_TEXT = "*http_text"
|
||||
META_HTTP_JSONRPC = "*http_jsonrpc"
|
||||
NANO_MULTIPLIER = 1000000000
|
||||
CGR_AUTHORIZE = "CGR_AUTHORIZE"
|
||||
|
||||
Reference in New Issue
Block a user