From 2e61fcf5bb99bbdd52d1853a33abe44b898e6a0d Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 2 Oct 2015 18:13:29 +0200 Subject: [PATCH] HttpPoster implementation in CDR replication and actions with call_url, new configuration parameters added: http_failed_dir, replication attempts, fixes #201, Rated bug fix in CDRC, Cost imported via CDRC in templates now --- cdrc/cdrc.go | 6 ++- config/config.go | 7 +++ config/config_defaults.go | 27 +++++----- config/config_json_test.go | 1 + config/libconfig.go | 1 + config/libconfig_json.go | 4 +- .../cdrsreplicationmaster.json | 2 +- engine/action.go | 28 +++------- engine/cdrs.go | 6 ++- engine/responder.go | 1 - packages/jessie/rules | 1 + packages/squeeze/rules | 1 + utils/consts.go | 3 +- utils/httpclient.go | 54 ++++++++++++++++--- utils/httpclient_local_test.go | 29 +++++++++- 15 files changed, 120 insertions(+), 51 deletions(-) diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 82a87b038..21c5fb80b 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -64,7 +64,7 @@ func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal, timezone s cdr.Subject += fieldVal case utils.DESTINATION: cdr.Destination += fieldVal - case utils.RATED: + case utils.RATED_FLD: cdr.Rated, _ = strconv.ParseBool(fieldVal) case utils.SETUP_TIME: if cdr.SetupTime, err = utils.ParseTimeDetectLayout(fieldVal, timezone); err != nil { @@ -86,6 +86,10 @@ func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal, timezone s cdr.Supplier += fieldVal case utils.DISCONNECT_CAUSE: cdr.DisconnectCause += fieldVal + case utils.COST: + if cdr.Cost, err = strconv.ParseFloat(fieldVal, 64); err != nil { + return fmt.Errorf("Cannot parse cost field with value: %s, err: %s", fieldVal, err.Error()) + } default: // Extra fields will not match predefined so they all show up here cdr.ExtraFields[fieldId] += fieldVal } diff --git a/config/config.go b/config/config.go index 20f0fad19..900d7020a 100644 --- a/config/config.go +++ b/config/config.go @@ -199,6 +199,7 @@ type CGRConfig struct { RoundingDecimals int // Number of decimals to round end prices at HttpSkipTlsVerify bool // If enabled Http Client will accept any TLS certificate TpExportPath string // Path towards export folder for offline Tariff Plans + HttpFailedDir string // Directory path where we store failed http requests MaxCallDuration time.Duration // The maximum call duration (used by responder when querying DerivedCharging) // ToDo: export it in configuration file RaterEnabled bool // start standalone server (no balancer) RaterBalancer string // balancer address host:port @@ -570,6 +571,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { if jsnGeneralCfg.Tpexport_dir != nil { self.TpExportPath = *jsnGeneralCfg.Tpexport_dir } + if jsnGeneralCfg.Http_failed_dir != nil { + self.HttpFailedDir = *jsnGeneralCfg.Http_failed_dir + } if jsnGeneralCfg.Default_timezone != nil { self.DefaultTimezone = *jsnGeneralCfg.Default_timezone } @@ -664,6 +668,9 @@ func (self *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) error { if rplJsonCfg.Synchronous != nil { self.CDRSCdrReplication[idx].Synchronous = *rplJsonCfg.Synchronous } + if rplJsonCfg.Attempts != nil { + self.CDRSCdrReplication[idx].Attempts = *rplJsonCfg.Attempts + } if rplJsonCfg.Cdr_filter != nil { if self.CDRSCdrReplication[idx].CdrFilter, err = utils.ParseRSRFields(*rplJsonCfg.Cdr_filter, utils.INFIELD_SEP); err != nil { return err diff --git a/config/config_defaults.go b/config/config_defaults.go index ac3ca8c13..c6a1ede8a 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -28,19 +28,20 @@ const CGRATES_CFG_JSON = ` // This is what you get when you load CGRateS with an empty configuration file. "general": { - "http_skip_tls_verify": false, // if enabled Http Client will accept any TLS certificate - "rounding_decimals": 10, // system level precision for floats - "dbdata_encoding": "msgpack", // encoding used to store object data in strings: - "tpexport_dir": "/var/log/cgrates/tpe", // path towards export folder for offline Tariff Plans - "default_reqtype": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated> - "default_category": "call", // default Type of Record to consider when missing from requests - "default_tenant": "cgrates.org", // default Tenant to consider when missing from requests - "default_subject": "cgrates", // default rating Subject to consider when missing from requests - "default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> - "connect_attempts": 3, // initial server connect attempts - "response_cache_ttl": "3s", // the life span of a cached response - "reconnects": -1, // number of retries in case of connection lost - "internal_ttl": "2m", // maximum duration to wait for internal connections before giving up + "http_skip_tls_verify": false, // if enabled Http Client will accept any TLS certificate + "rounding_decimals": 10, // system level precision for floats + "dbdata_encoding": "msgpack", // encoding used to store object data in strings: + "tpexport_dir": "/var/log/cgrates/tpe", // path towards export folder for offline Tariff Plans + "http_failed_dir": "/var/log/cgrates/http_failed", // directory path where we store failed http requests + "default_reqtype": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated> + "default_category": "call", // default Type of Record to consider when missing from requests + "default_tenant": "cgrates.org", // default Tenant to consider when missing from requests + "default_subject": "cgrates", // default rating Subject to consider when missing from requests + "default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> + "connect_attempts": 3, // initial server connect attempts + "reconnects": -1, // number of retries in case of connection lost + "response_cache_ttl": "3s", // the life span of a cached response + "internal_ttl": "2m", // maximum duration to wait for internal connections before giving up }, diff --git a/config/config_json_test.go b/config/config_json_test.go index e8c01c817..fb027cc3d 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -42,6 +42,7 @@ func TestDfGeneralJsonCfg(t *testing.T) { Rounding_decimals: utils.IntPointer(10), Dbdata_encoding: utils.StringPointer("msgpack"), Tpexport_dir: utils.StringPointer("/var/log/cgrates/tpe"), + Http_failed_dir: utils.StringPointer("/var/log/cgrates/http_failed"), Default_reqtype: utils.StringPointer(utils.META_RATED), Default_category: utils.StringPointer("call"), Default_tenant: utils.StringPointer("cgrates.org"), diff --git a/config/libconfig.go b/config/libconfig.go index 23aebef03..9b964066f 100644 --- a/config/libconfig.go +++ b/config/libconfig.go @@ -26,5 +26,6 @@ type CdrReplicationCfg struct { Transport string Server string Synchronous bool + Attempts int // Number of attempts if not success CdrFilter utils.RSRFields // Only replicate if the filters here are matching } diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 4125dadb1..2965c7d43 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -24,13 +24,14 @@ type GeneralJsonCfg struct { Rounding_decimals *int Dbdata_encoding *string Tpexport_dir *string + Http_failed_dir *string Default_reqtype *string Default_category *string Default_tenant *string Default_subject *string Default_timezone *string - Reconnects *int Connect_attempts *int + Reconnects *int Response_cache_ttl *string Internal_ttl *string } @@ -93,6 +94,7 @@ type CdrReplicationJsonCfg struct { Transport *string Server *string Synchronous *bool + Attempts *int Cdr_filter *string } diff --git a/data/conf/samples/cdrsreplicationmaster/cdrsreplicationmaster.json b/data/conf/samples/cdrsreplicationmaster/cdrsreplicationmaster.json index 0e22c7cec..20b8605d6 100644 --- a/data/conf/samples/cdrsreplicationmaster/cdrsreplicationmaster.json +++ b/data/conf/samples/cdrsreplicationmaster/cdrsreplicationmaster.json @@ -12,7 +12,7 @@ "enabled": true, // start the CDR Server service: "store_cdrs": false, // store cdrs in storDb "cdr_replication":[ // replicate the rated CDR to a number of servers - {"transport": "*http_post", "server": "http://127.0.0.1:12080/cdr_http"}, + {"transport": "*http_post", "server": "http://127.0.0.1:12080/cdr_http", "attempts": 1}, //{"transport": "*http_post", "server": "http://127.0.0.1:8000/mycdr"}, ], diff --git a/engine/action.go b/engine/action.go index 622eccbd1..3d586991a 100644 --- a/engine/action.go +++ b/engine/action.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "net/smtp" + "path" "reflect" "sort" "strconv" @@ -427,7 +428,7 @@ func genericReset(ub *Account) error { return nil } -func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err error) { +func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) error { var o interface{} if ub != nil { o = ub @@ -435,12 +436,9 @@ func callUrl(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) (err if sq != nil { o = sq } - //jsn, err := json.Marshal(o) - //if err != nil { - // return err - //} cfg := config.CgrConfig() - _, err = utils.HttpJsonPost(a.ExtraParameters, cfg.HttpSkipTlsVerify, o) + fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID())) + _, err := utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, o, utils.CONTENT_JSON, 1, fallbackPath) return err } @@ -453,23 +451,9 @@ func callUrlAsync(ub *Account, sq *StatsQueueTriggered, a *Action, acs Actions) if sq != nil { o = sq } - //jsn, err := json.Marshal(o) - //if err != nil { - // return err - //} cfg := config.CgrConfig() - go func() { - for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort - if _, err := utils.HttpJsonPost(a.ExtraParameters, cfg.HttpSkipTlsVerify, o); err == nil { - break // Success, no need to reinterate - } else if i == 4 { // Last iteration, syslog the warning - utils.Logger.Warning(fmt.Sprintf(" WARNING: Failed calling url: [%s], error: [%s], triggered: %s", a.ExtraParameters, err.Error(), o)) - break - } - time.Sleep(time.Duration(i) * time.Minute) - } - - }() + fallbackPath := path.Join(cfg.HttpFailedDir, fmt.Sprintf("act_%s_%s_%s.json", a.ActionType, a.ExtraParameters, utils.GenUUID())) + go utils.HttpPoster(a.ExtraParameters, cfg.HttpSkipTlsVerify, o, utils.CONTENT_JSON, 3, fallbackPath) return nil } diff --git a/engine/cdrs.go b/engine/cdrs.go index d150945c9..58bf8b8ad 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -22,6 +22,7 @@ import ( "fmt" "io/ioutil" "net/http" + "path" "time" "github.com/cgrates/cgrates/config" @@ -362,10 +363,11 @@ func (self *CdrServer) replicateCdr(cdr *StoredCdr) error { } switch rplCfg.Transport { case utils.META_HTTP_POST: - httpClient := new(http.Client) errChan := make(chan error) go func(cdr *StoredCdr, rplCfg *config.CdrReplicationCfg, errChan chan error) { - if _, err := httpClient.PostForm(fmt.Sprintf("%s", rplCfg.Server), cdr.AsHttpForm()); err != nil { + utils.Logger.Debug(fmt.Sprintf("Replicating CDR: %+v, attempts: %d", cdr, rplCfg.Attempts)) + fallbackPath := path.Join(self.cgrCfg.HttpFailedDir, fmt.Sprintf("cdr_%s_%s_%s.form", rplCfg.Transport, rplCfg.Server, utils.GenUUID())) + if _, err := utils.HttpPoster(rplCfg.Server, self.cgrCfg.HttpSkipTlsVerify, cdr.AsHttpForm(), utils.CONTENT_FORM, rplCfg.Attempts, fallbackPath); err != nil { utils.Logger.Err(fmt.Sprintf(" Replicating CDR: %+v, got error: %s", cdr, err.Error())) errChan <- err } diff --git a/engine/responder.go b/engine/responder.go index 7200147c5..0b643e278 100644 --- a/engine/responder.go +++ b/engine/responder.go @@ -287,7 +287,6 @@ func (rs *Responder) GetDerivedMaxSessionTime(ev *StoredCdr, reply *float64) err }, ev, utils.EXTRA_FIELDS); err != nil && err != utils.ErrNotFound { return err } - // replace user profile fields if err := LoadUserProfile(ev, utils.EXTRA_FIELDS); err != nil { return err diff --git a/packages/jessie/rules b/packages/jessie/rules index d00e17c76..33646362d 100755 --- a/packages/jessie/rules +++ b/packages/jessie/rules @@ -39,6 +39,7 @@ binary-arch: clean mkdir -p $(PKGDIR)/var/log/cgrates/cdre/fwv mkdir -p $(PKGDIR)/var/log/cgrates/history mkdir -p $(PKGDIR)/var/log/cgrates/tpe + mkdir -p $(PKGDIR)/var/log/cgrates/http_failed dh_strip dh_compress dh_fixperms diff --git a/packages/squeeze/rules b/packages/squeeze/rules index 4487a1c95..745308151 100755 --- a/packages/squeeze/rules +++ b/packages/squeeze/rules @@ -39,6 +39,7 @@ binary-arch: clean mkdir -p $(PKGDIR)/var/log/cgrates/cdre/fwv mkdir -p $(PKGDIR)/var/log/cgrates/history mkdir -p $(PKGDIR)/var/log/cgrates/tpe + mkdir -p $(PKGDIR)/var/log/cgrates/http_failed dh_strip dh_compress dh_fixperms diff --git a/utils/consts.go b/utils/consts.go index c3fcca656..8cf7defe4 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -124,7 +124,8 @@ const ( RATED_SUBJECT = "RatedSubject" COST = "Cost" COST_DETAILS = "CostDetails" - RATED = "Rated" + RATED = "rated" + RATED_FLD = "Rated" DEFAULT_RUNID = "*default" META_DEFAULT = "*default" STATIC_VALUE_PREFIX = "^" diff --git a/utils/httpclient.go b/utils/httpclient.go index f9589177d..5e048ebd0 100644 --- a/utils/httpclient.go +++ b/utils/httpclient.go @@ -21,14 +21,33 @@ package utils import ( "bytes" "crypto/tls" + "encoding/gob" "encoding/json" "fmt" "io/ioutil" "net/http" + "net/url" "os" "time" ) +var ( + CONTENT_JSON = "json" + CONTENT_FORM = "form" + CONTENT_TEXT = "text" +) + +// Converts interface to []byte +func GetBytes(content interface{}) ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(content) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + // Post without automatic failover func HttpJsonPost(url string, skipTlsVerify bool, content interface{}) ([]byte, error) { body, err := json.Marshal(content) @@ -55,8 +74,20 @@ func HttpJsonPost(url string, skipTlsVerify bool, content interface{}) ([]byte, } // Post with built-in failover -func HttpJsonPoster(url string, skipTlsVerify bool, content interface{}, retries int, fallbackFilePath string) ([]byte, error) { - body, err := json.Marshal(content) +func HttpPoster(addr string, skipTlsVerify bool, content interface{}, contentType string, attempts int, fallbackFilePath string) ([]byte, error) { + var body []byte + var urlData url.Values + var err error + switch contentType { + case CONTENT_JSON: + body, err = json.Marshal(content) + case CONTENT_FORM: + urlData = content.(url.Values) + case CONTENT_TEXT: + body = content.([]byte) + default: + err = fmt.Errorf("Unsupported ContentType: %s", contentType) + } if err != nil { return nil, err } @@ -65,22 +96,31 @@ func HttpJsonPoster(url string, skipTlsVerify bool, content interface{}, retries } client := &http.Client{Transport: tr} delay := Fib() - for i := 0; i < retries; i++ { - resp, err := client.Post(url, "application/json", bytes.NewBuffer(body)) + bodyType := "application/x-www-form-urlencoded" + if contentType == CONTENT_JSON { + bodyType = "application/json" + } + for i := 0; i < attempts; i++ { + var resp *http.Response + if IsSliceMember([]string{CONTENT_JSON, CONTENT_TEXT}, contentType) { + resp, err = client.Post(addr, bodyType, bytes.NewBuffer(body)) + } else if contentType == CONTENT_FORM { + resp, err = client.PostForm(addr, urlData) + } if err != nil { - Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", url, err.Error())) + Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) time.Sleep(delay()) continue } defer resp.Body.Close() respBody, err := ioutil.ReadAll(resp.Body) if err != nil { - Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", url, err.Error())) + Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) time.Sleep(delay()) continue } if resp.StatusCode > 299 { - Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", url, resp.StatusCode)) + Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode)) time.Sleep(delay()) continue } diff --git a/utils/httpclient_local_test.go b/utils/httpclient_local_test.go index a8ee14206..6930ce41d 100644 --- a/utils/httpclient_local_test.go +++ b/utils/httpclient_local_test.go @@ -22,6 +22,7 @@ import ( "encoding/json" "flag" "io/ioutil" + "os" "reflect" "testing" ) @@ -38,8 +39,8 @@ func TestHttpJsonPoster(t *testing.T) { return } content := &TestContent{Var1: "Val1", Var2: "Val2"} - filePath := "/tmp/test_http_poster.cgr" - if _, err := HttpJsonPoster("http://localhost:8080/invalid", true, content, 3, filePath); err != nil { + filePath := "/tmp/cgr_test_http_poster.json" + if _, err := HttpPoster("http://localhost:8080/invalid", true, content, true, 3, filePath); err != nil { t.Error(err) } jsnContent, _ := json.Marshal(content) @@ -48,4 +49,28 @@ func TestHttpJsonPoster(t *testing.T) { } else if !reflect.DeepEqual(jsnContent, readBytes) { t.Errorf("Expecting: %q, received: %q", string(jsnContent), string(readBytes)) } + if err := os.Remove(filePath); err != nil { + t.Error("Failed removing file: ", filePath) + } +} + +func TestHttpBytesPoster(t *testing.T) { + if !*testLocal { + return + } + content := []byte(`Test + Test2 + `) + filePath := "/tmp/test_http_poster.http" + if _, err := HttpPoster("http://localhost:8080/invalid", true, content, false, 3, filePath); err != nil { + t.Error(err) + } + if readBytes, err := ioutil.ReadFile(filePath); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(content, readBytes) { + t.Errorf("Expecting: %q, received: %q", string(content), string(readBytes)) + } + if err := os.Remove(filePath); err != nil { + t.Error("Failed removing file: ", filePath) + } }