HttpPoster implementation in CDR replication and actions with call_url, new configuration parameters added: http_failed_dir, replication attempts, fixes #201, Rated bug fix in CDRC, Cost imported via CDRC in templates now

This commit is contained in:
DanB
2015-10-02 18:13:29 +02:00
parent f75afcea33
commit 2e61fcf5bb
15 changed files with 120 additions and 51 deletions

View File

@@ -64,7 +64,7 @@ func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal, timezone s
cdr.Subject += fieldVal
case utils.DESTINATION:
cdr.Destination += fieldVal
case utils.RATED:
case utils.RATED_FLD:
cdr.Rated, _ = strconv.ParseBool(fieldVal)
case utils.SETUP_TIME:
if cdr.SetupTime, err = utils.ParseTimeDetectLayout(fieldVal, timezone); err != nil {
@@ -86,6 +86,10 @@ func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal, timezone s
cdr.Supplier += fieldVal
case utils.DISCONNECT_CAUSE:
cdr.DisconnectCause += fieldVal
case utils.COST:
if cdr.Cost, err = strconv.ParseFloat(fieldVal, 64); err != nil {
return fmt.Errorf("Cannot parse cost field with value: %s, err: %s", fieldVal, err.Error())
}
default: // Extra fields will not match predefined so they all show up here
cdr.ExtraFields[fieldId] += fieldVal
}

View File

@@ -199,6 +199,7 @@ type CGRConfig struct {
RoundingDecimals int // Number of decimals to round end prices at
HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate
TpExportPath string // Path towards export folder for offline Tariff Plans
HttpFailedDir string // Directory path where we store failed http requests
MaxCallDuration time.Duration // The maximum call duration (used by responder when querying DerivedCharging) // ToDo: export it in configuration file
RaterEnabled bool // start standalone server (no balancer)
RaterBalancer string // balancer address host:port
@@ -570,6 +571,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
if jsnGeneralCfg.Tpexport_dir != nil {
self.TpExportPath = *jsnGeneralCfg.Tpexport_dir
}
if jsnGeneralCfg.Http_failed_dir != nil {
self.HttpFailedDir = *jsnGeneralCfg.Http_failed_dir
}
if jsnGeneralCfg.Default_timezone != nil {
self.DefaultTimezone = *jsnGeneralCfg.Default_timezone
}
@@ -664,6 +668,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error {
if rplJsonCfg.Synchronous != nil {
self.CDRSCdrReplication[idx].Synchronous = *rplJsonCfg.Synchronous
}
if rplJsonCfg.Attempts != nil {
self.CDRSCdrReplication[idx].Attempts = *rplJsonCfg.Attempts
}
if rplJsonCfg.Cdr_filter != nil {
if self.CDRSCdrReplication[idx].CdrFilter, err = utils.ParseRSRFields(*rplJsonCfg.Cdr_filter, utils.INFIELD_SEP); err != nil {
return err

View File

@@ -28,19 +28,20 @@ const CGRATES_CFG_JSON = `
// This is what you get when you load CGRateS with an empty configuration file.
"general": {
"http_skip_tls_verify": false, // if enabled Http Client will accept any TLS certificate
"rounding_decimals": 10, // system level precision for floats
"dbdata_encoding": "msgpack", // encoding used to store object data in strings: <msgpack|json>
"tpexport_dir": "/var/log/cgrates/tpe", // path towards export folder for offline Tariff Plans
"default_reqtype": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated>
"default_category": "call", // default Type of Record to consider when missing from requests
"default_tenant": "cgrates.org", // default Tenant to consider when missing from requests
"default_subject": "cgrates", // default rating Subject to consider when missing from requests
"default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"connect_attempts": 3, // initial server connect attempts
"response_cache_ttl": "3s", // the life span of a cached response
"reconnects": -1, // number of retries in case of connection lost
"internal_ttl": "2m", // maximum duration to wait for internal connections before giving up
"http_skip_tls_verify": false, // if enabled Http Client will accept any TLS certificate
"rounding_decimals": 10, // system level precision for floats
"dbdata_encoding": "msgpack", // encoding used to store object data in strings: <msgpack|json>
"tpexport_dir": "/var/log/cgrates/tpe", // path towards export folder for offline Tariff Plans
"http_failed_dir": "/var/log/cgrates/http_failed", // directory path where we store failed http requests
"default_reqtype": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated>
"default_category": "call", // default Type of Record to consider when missing from requests
"default_tenant": "cgrates.org", // default Tenant to consider when missing from requests
"default_subject": "cgrates", // default rating Subject to consider when missing from requests
"default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"connect_attempts": 3, // initial server connect attempts
"reconnects": -1, // number of retries in case of connection lost
"response_cache_ttl": "3s", // the life span of a cached response
"internal_ttl": "2m", // maximum duration to wait for internal connections before giving up
},

View File

@@ -42,6 +42,7 @@ func TestDfGeneralJsonCfg(t *testing.T) {
Rounding_decimals: utils.IntPointer(10),
Dbdata_encoding: utils.StringPointer("msgpack"),
Tpexport_dir: utils.StringPointer("/var/log/cgrates/tpe"),
Http_failed_dir: utils.StringPointer("/var/log/cgrates/http_failed"),
Default_reqtype: utils.StringPointer(utils.META_RATED),
Default_category: utils.StringPointer("call"),
Default_tenant: utils.StringPointer("cgrates.org"),

View File

@@ -26,5 +26,6 @@ type CdrReplicationCfg struct {
Transport string
Server string
Synchronous bool
Attempts int // Number of attempts if not success
CdrFilter utils.RSRFields // Only replicate if the filters here are matching
}

View File

@@ -24,13 +24,14 @@ type GeneralJsonCfg struct {
Rounding_decimals *int
Dbdata_encoding *string
Tpexport_dir *string
Http_failed_dir *string
Default_reqtype *string
Default_category *string
Default_tenant *string
Default_subject *string
Default_timezone *string
Reconnects *int
Connect_attempts *int
Reconnects *int
Response_cache_ttl *string
Internal_ttl *string
}
@@ -93,6 +94,7 @@ type CdrReplicationJsonCfg struct {
Transport *string
Server *string
Synchronous *bool
Attempts *int
Cdr_filter *string
}

View File

@@ -12,7 +12,7 @@
"enabled": true, // start the CDR Server service: <true|false>
"store_cdrs": false, // store cdrs in storDb
"cdr_replication":[ // replicate the rated CDR to a number of servers
{"transport": "*http_post", "server": "http://127.0.0.1:12080/cdr_http"},
{"transport": "*http_post", "server": "http://127.0.0.1:12080/cdr_http", "attempts": 1},
//{"transport": "*http_post", "server": "http://127.0.0.1:8000/mycdr"},
],

View File

@@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"net/smtp"
"path"
"reflect"
"sort"
"strconv"
@@ -427,7 +428,7 @@ func genericReset(ub *Account) error {
return nil
}
func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) {
func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error {
var o interface{}
if ub != nil {
o = ub
@@ -435,12 +436,9 @@ func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err
if sq != nil {
o = sq
}
//jsn, err := json.Marshal(o)
//if err != nil {
// return err
//}
cfg := config.CgrConfig()
_, err = utils.HttpJsonPost(a.ExtraParameters, cfg.HttpSkipTlsVerify, o)
fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID()))
_, err := utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, o, utils.CONTENT_JSON, 1, fallbackPath)
return err
}
@@ -453,23 +451,9 @@ func callUrlAsync(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions)
if sq != nil {
o = sq
}
//jsn, err := json.Marshal(o)
//if err != nil {
// return err
//}
cfg := config.CgrConfig()
go func() {
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
if _, err := utils.HttpJsonPost(a.ExtraParameters, cfg.HttpSkipTlsVerify, o); err == nil {
break // Success, no need to reinterate
} else if i == 4 { // Last iteration, syslog the warning
utils.Logger.Warning(fmt.Sprintf("<Triggers> WARNING: Failed calling url: [%s], error: [%s], triggered: %s", a.ExtraParameters, err.Error(), o))
break
}
time.Sleep(time.Duration(i) * time.Minute)
}
}()
fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID()))
go utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, o, utils.CONTENT_JSON, 3, fallbackPath)
return nil
}

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"path"
"time"
"github.com/cgrates/cgrates/config"
@@ -362,10 +363,11 @@ func (self *CdrServer) replicateCdr(cdr *StoredCdr) error {
}
switch rplCfg.Transport {
case utils.META_HTTP_POST:
httpClient := new(http.Client)
errChan := make(chan error)
go func(cdr *StoredCdr, rplCfg *config.CdrReplicationCfg, errChan chan error) {
if _, err := httpClient.PostForm(fmt.Sprintf("%s", rplCfg.Server), cdr.AsHttpForm()); err != nil {
utils.Logger.Debug(fmt.Sprintf("Replicating CDR: %+v, attempts: %d", cdr, rplCfg.Attempts))
fallbackPath := path.Join(self.cgrCfg.HttpFailedDir, fmt.Sprintf("cdr_%s_%s_%s.form", rplCfg.Transport, rplCfg.Server, utils.GenUUID()))
if _, err := utils.HttpPoster(rplCfg.Server, self.cgrCfg.HttpSkipTlsVerify, cdr.AsHttpForm(), utils.CONTENT_FORM, rplCfg.Attempts, fallbackPath); err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRReplicator> Replicating CDR: %+v, got error: %s", cdr, err.Error()))
errChan <- err
}

View File

@@ -287,7 +287,6 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err
}, ev, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound {
return err
}
// replace user profile fields
if err := LoadUserProfile(ev, utils.EXTRA_FIELDS); err != nil {
return err

View File

@@ -39,6 +39,7 @@ binary-arch: clean
mkdir -p $(PKGDIR)/var/log/cgrates/cdre/fwv
mkdir -p $(PKGDIR)/var/log/cgrates/history
mkdir -p $(PKGDIR)/var/log/cgrates/tpe
mkdir -p $(PKGDIR)/var/log/cgrates/http_failed
dh_strip
dh_compress
dh_fixperms

View File

@@ -39,6 +39,7 @@ binary-arch: clean
mkdir -p $(PKGDIR)/var/log/cgrates/cdre/fwv
mkdir -p $(PKGDIR)/var/log/cgrates/history
mkdir -p $(PKGDIR)/var/log/cgrates/tpe
mkdir -p $(PKGDIR)/var/log/cgrates/http_failed
dh_strip
dh_compress
dh_fixperms

View File

@@ -124,7 +124,8 @@ const (
RATED_SUBJECT = "RatedSubject"
COST = "Cost"
COST_DETAILS = "CostDetails"
RATED = "Rated"
RATED = "rated"
RATED_FLD = "Rated"
DEFAULT_RUNID = "*default"
META_DEFAULT = "*default"
STATIC_VALUE_PREFIX = "^"

View File

@@ -21,14 +21,33 @@ package utils
import (
"bytes"
"crypto/tls"
"encoding/gob"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"time"
)
var (
CONTENT_JSON = "json"
CONTENT_FORM = "form"
CONTENT_TEXT = "text"
)
// Converts interface to []byte
func GetBytes(content interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(content)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Post without automatic failover
func HttpJsonPost(url string, skipTlsVerify bool, content interface{}) ([]byte, error) {
body, err := json.Marshal(content)
@@ -55,8 +74,20 @@ func HttpJsonPost(url string, skipTlsVerify bool, content interface{}) ([]byte,
}
// Post with built-in failover
func HttpJsonPoster(url string, skipTlsVerify bool, content interface{}, retries int, fallbackFilePath string) ([]byte, error) {
body, err := json.Marshal(content)
func HttpPoster(addr string, skipTlsVerify bool, content interface{}, contentType string, attempts int, fallbackFilePath string) ([]byte, error) {
var body []byte
var urlData url.Values
var err error
switch contentType {
case CONTENT_JSON:
body, err = json.Marshal(content)
case CONTENT_FORM:
urlData = content.(url.Values)
case CONTENT_TEXT:
body = content.([]byte)
default:
err = fmt.Errorf("Unsupported ContentType: %s", contentType)
}
if err != nil {
return nil, err
}
@@ -65,22 +96,31 @@ func HttpJsonPoster(url string, skipTlsVerify bool, content interface{}, retries
}
client := &http.Client{Transport: tr}
delay := Fib()
for i := 0; i < retries; i++ {
resp, err := client.Post(url, "application/json", bytes.NewBuffer(body))
bodyType := "application/x-www-form-urlencoded"
if contentType == CONTENT_JSON {
bodyType = "application/json"
}
for i := 0; i < attempts; i++ {
var resp *http.Response
if IsSliceMember([]string{CONTENT_JSON, CONTENT_TEXT}, contentType) {
resp, err = client.Post(addr, bodyType, bytes.NewBuffer(body))
} else if contentType == CONTENT_FORM {
resp, err = client.PostForm(addr, urlData)
}
if err != nil {
Logger.Warning(fmt.Sprintf("<HttpPoster> Posting to : <%s>, error: <%s>", url, err.Error()))
Logger.Warning(fmt.Sprintf("<HttpPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
time.Sleep(delay())
continue
}
defer resp.Body.Close()
respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
Logger.Warning(fmt.Sprintf("<HttpPoster> Posting to : <%s>, error: <%s>", url, err.Error()))
Logger.Warning(fmt.Sprintf("<HttpPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
time.Sleep(delay())
continue
}
if resp.StatusCode > 299 {
Logger.Warning(fmt.Sprintf("<HttpPoster> Posting to : <%s>, unexpected status code received: <%d>", url, resp.StatusCode))
Logger.Warning(fmt.Sprintf("<HttpPoster> Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode))
time.Sleep(delay())
continue
}

View File

@@ -22,6 +22,7 @@ import (
"encoding/json"
"flag"
"io/ioutil"
"os"
"reflect"
"testing"
)
@@ -38,8 +39,8 @@ func TestHttpJsonPoster(t *testing.T) {
return
}
content := &TestContent{Var1: "Val1", Var2: "Val2"}
filePath := "/tmp/test_http_poster.cgr"
if _, err := HttpJsonPoster("http://localhost:8080/invalid", true, content, 3, filePath); err != nil {
filePath := "/tmp/cgr_test_http_poster.json"
if _, err := HttpPoster("http://localhost:8080/invalid", true, content, true, 3, filePath); err != nil {
t.Error(err)
}
jsnContent, _ := json.Marshal(content)
@@ -48,4 +49,28 @@ func TestHttpJsonPoster(t *testing.T) {
} else if !reflect.DeepEqual(jsnContent, readBytes) {
t.Errorf("Expecting: %q, received: %q", string(jsnContent), string(readBytes))
}
if err := os.Remove(filePath); err != nil {
t.Error("Failed removing file: ", filePath)
}
}
func TestHttpBytesPoster(t *testing.T) {
if !*testLocal {
return
}
content := []byte(`Test
Test2
`)
filePath := "/tmp/test_http_poster.http"
if _, err := HttpPoster("http://localhost:8080/invalid", true, content, false, 3, filePath); err != nil {
t.Error(err)
}
if readBytes, err := ioutil.ReadFile(filePath); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(content, readBytes) {
t.Errorf("Expecting: %q, received: %q", string(content), string(readBytes))
}
if err := os.Remove(filePath); err != nil {
t.Error("Failed removing file: ", filePath)
}
}