diff --git a/engine/cdrs.go b/engine/cdrs.go index a8f444a94..31a6092fe 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -372,7 +372,7 @@ func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) { _, hasLastUsed := cdr.ExtraFields[utils.LastUsed] if utils.IsSliceMember([]string{utils.META_PREPAID, utils.PREPAID}, cdr.RequestType) && (cdr.Usage != 0 || hasLastUsed) { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards // Should be previously calculated and stored in DB - delay := utils.Fib() + fib := utils.Fib() var smCosts []*SMCost cgrID := cdr.CGRID if _, hasIT := cdr.ExtraFields[utils.OriginIDPrefix]; hasIT { @@ -384,7 +384,7 @@ func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) { break } if i != 3 { - time.Sleep(delay()) + time.Sleep(time.Duration(fib()) * time.Second) } } if len(smCosts) != 0 { // Cost retrieved from SMCost table diff --git a/engine/libtest.go b/engine/libtest.go index 3d96172ec..064647608 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -68,18 +68,22 @@ func StartEngine(cfgPath string, waitEngine int) (*exec.Cmd, error) { if err := engine.Start(); err != nil { return nil, err } - if cfg, err := config.NewCGRConfigFromFolder(cfgPath); err == nil { - for { - time.Sleep(100 * time.Millisecond) - _, err2 := jsonrpc.Dial("tcp", cfg.RPCJSONListen) - if err2 == nil { - break - } - } - } else { + cfg, err := config.NewCGRConfigFromFolder(cfgPath) + if err != nil { return nil, err } - //time.Sleep(time.Duration(waitEngine) * time.Millisecond) // Give time to rater to fire up + fib := utils.Fib() + var connected bool + for i := 0; i < 200; i++ { + time.Sleep(time.Duration(fib()) * time.Millisecond) + if _, err := jsonrpc.Dial("tcp", cfg.RPCJSONListen); err == nil { + connected = true + break + } + } + if !connected { + return nil, fmt.Errorf("engine did not open port <%d>", cfg.RPCJSONListen) + } return engine, nil } diff --git a/engine/pubsub.go b/engine/pubsub.go index 0308a8b23..3ae0e74b7 100644 --- a/engine/pubsub.go +++ b/engine/pubsub.go @@ -165,7 +165,7 @@ func (ps *PubSub) Publish(evt CgrEvent, reply *string) error { switch transport { case utils.META_HTTP_POST: go func() { - delay := utils.Fib() + fib := utils.Fib() for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort if _, err := ps.pubFunc(address, ttlVerify, jsn); err == nil { break // Success, no need to reinterate @@ -173,7 +173,7 @@ func (ps *PubSub) Publish(evt CgrEvent, reply *string) error { utils.Logger.Warning(fmt.Sprintf(" Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), evt["EventName"])) break } - time.Sleep(delay()) + time.Sleep(time.Duration(fib()) * time.Second) } }() } diff --git a/sessionmanager/smg_session.go b/sessionmanager/smg_session.go index 79d3326f8..c00ab9465 100644 --- a/sessionmanager/smg_session.go +++ b/sessionmanager/smg_session.go @@ -32,21 +32,24 @@ import ( // One session handled by SM type SMGSession struct { - mux sync.RWMutex // protects the SMGSession in places where is concurrently accessed - stopDebit chan struct{} // Channel to communicate with debit loops when closing the session - CGRID string // Unique identifier for this session - RunID string // Keep a reference for the derived run - Timezone string - EventStart SMGenericEvent // Event which started the session - CD *engine.CallDescriptor // initial CD used for debits, updated on each debit + mux sync.RWMutex // protects the SMGSession in places where is concurrently accessed + stopDebit chan struct{} // Channel to communicate with debit loops when closing the session + clntConn rpcclient.RpcClientConnection // Reference towards client connection on SMG side so we can disconnect. + rals rpcclient.RpcClientConnection // Connector to rals service + cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service + + CGRID string // Unique identifier for this session + RunID string // Keep a reference for the derived run + Timezone string + EventStart SMGenericEvent // Event which started the session + CD *engine.CallDescriptor // initial CD used for debits, updated on each debit + CallCosts []*engine.CallCost - ExtraDuration time.Duration // keeps the current duration debited on top of what heas been asked - LastUsage time.Duration // last requested Duration - LastDebit time.Duration // last real debited duration - TotalUsage time.Duration // sum of lastUsage - clntConn rpcclient.RpcClientConnection // Reference towards client connection on SMG side so we can disconnect. - rals rpcclient.RpcClientConnection // Connector to rals service - cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service + ExtraDuration time.Duration // keeps the current duration debited on top of what heas been asked + LastUsage time.Duration // last requested Duration + LastDebit time.Duration // last real debited duration + TotalUsage time.Duration // sum of lastUsage + } // Called in case of automatic debits diff --git a/utils/coreutils.go b/utils/coreutils.go index de5c7d4bb..b628addcc 100644 --- a/utils/coreutils.go +++ b/utils/coreutils.go @@ -362,11 +362,11 @@ func Unzip(src, dest string) error { } // successive Fibonacci numbers. -func Fib() func() time.Duration { +func Fib() func() int { a, b := 0, 1 - return func() time.Duration { + return func() int { a, b = b, a+b - return time.Duration(a) * time.Second + return a } } diff --git a/utils/poster.go b/utils/poster.go index b25d40c72..d01036a1b 100644 --- a/utils/poster.go +++ b/utils/poster.go @@ -150,7 +150,7 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac urlVals = content.(url.Values) body = []byte(urlVals.Encode()) } - delay := Fib() + fib := Fib() bodyType := "application/x-www-form-urlencoded" if contentType == CONTENT_JSON { bodyType = "application/json" @@ -164,19 +164,19 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac } if err != nil { Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) - time.Sleep(delay()) + time.Sleep(time.Duration(fib()) * time.Second) continue } defer resp.Body.Close() respBody, err = ioutil.ReadAll(resp.Body) if err != nil { Logger.Warning(fmt.Sprintf(" Posting to : <%s>, error: <%s>", addr, err.Error())) - time.Sleep(delay()) + time.Sleep(time.Duration(fib()) * time.Second) continue } if resp.StatusCode > 299 { Logger.Warning(fmt.Sprintf(" Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode)) - time.Sleep(delay()) + time.Sleep(time.Duration(fib()) * time.Second) continue } return respBody, nil @@ -249,13 +249,13 @@ type AMQPPoster struct { // the optional chn will permits channel caching func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []byte, fallbackFileName string) (*amqp.Channel, error) { var err error - delay := Fib() + fib := Fib() if chn == nil { for i := 0; i < pstr.attempts; i++ { if chn, err = pstr.NewPostChannel(); err == nil { break } - time.Sleep(delay()) + time.Sleep(time.Duration(fib()) * time.Second) } if err != nil && fallbackFileName != META_NONE { err = pstr.writeToFile(fallbackFileName, content) @@ -275,7 +275,7 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by }); err == nil { break } - time.Sleep(delay()) + time.Sleep(time.Duration(fib()) * time.Second) } if err != nil && fallbackFileName != META_NONE { err = pstr.writeToFile(fallbackFileName, content)