mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-21 15:18:44 +05:00
StartEngine changes in libtest
This commit is contained in:
@@ -372,7 +372,7 @@ func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) {
|
||||
_, hasLastUsed := cdr.ExtraFields[utils.LastUsed]
|
||||
if utils.IsSliceMember([]string{utils.META_PREPAID, utils.PREPAID}, cdr.RequestType) && (cdr.Usage != 0 || hasLastUsed) { // ToDo: Get rid of PREPAID as soon as we don't want to support it backwards
|
||||
// Should be previously calculated and stored in DB
|
||||
delay := utils.Fib()
|
||||
fib := utils.Fib()
|
||||
var smCosts []*SMCost
|
||||
cgrID := cdr.CGRID
|
||||
if _, hasIT := cdr.ExtraFields[utils.OriginIDPrefix]; hasIT {
|
||||
@@ -384,7 +384,7 @@ func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) {
|
||||
break
|
||||
}
|
||||
if i != 3 {
|
||||
time.Sleep(delay())
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
}
|
||||
if len(smCosts) != 0 { // Cost retrieved from SMCost table
|
||||
|
||||
@@ -68,18 +68,22 @@ func StartEngine(cfgPath string, waitEngine int) (*exec.Cmd, error) {
|
||||
if err := engine.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cfg, err := config.NewCGRConfigFromFolder(cfgPath); err == nil {
|
||||
for {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
_, err2 := jsonrpc.Dial("tcp", cfg.RPCJSONListen)
|
||||
if err2 == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cfg, err := config.NewCGRConfigFromFolder(cfgPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
//time.Sleep(time.Duration(waitEngine) * time.Millisecond) // Give time to rater to fire up
|
||||
fib := utils.Fib()
|
||||
var connected bool
|
||||
for i := 0; i < 200; i++ {
|
||||
time.Sleep(time.Duration(fib()) * time.Millisecond)
|
||||
if _, err := jsonrpc.Dial("tcp", cfg.RPCJSONListen); err == nil {
|
||||
connected = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !connected {
|
||||
return nil, fmt.Errorf("engine did not open port <%d>", cfg.RPCJSONListen)
|
||||
}
|
||||
return engine, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -165,7 +165,7 @@ func (ps *PubSub) Publish(evt CgrEvent, reply *string) error {
|
||||
switch transport {
|
||||
case utils.META_HTTP_POST:
|
||||
go func() {
|
||||
delay := utils.Fib()
|
||||
fib := utils.Fib()
|
||||
for i := 0; i < 5; i++ { // Loop so we can increase the success rate on best effort
|
||||
if _, err := ps.pubFunc(address, ttlVerify, jsn); err == nil {
|
||||
break // Success, no need to reinterate
|
||||
@@ -173,7 +173,7 @@ func (ps *PubSub) Publish(evt CgrEvent, reply *string) error {
|
||||
utils.Logger.Warning(fmt.Sprintf("<PubSub> Failed calling url: [%s], error: [%s], event type: %s", address, err.Error(), evt["EventName"]))
|
||||
break
|
||||
}
|
||||
time.Sleep(delay())
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -32,21 +32,24 @@ import (
|
||||
|
||||
// One session handled by SM
|
||||
type SMGSession struct {
|
||||
mux sync.RWMutex // protects the SMGSession in places where is concurrently accessed
|
||||
stopDebit chan struct{} // Channel to communicate with debit loops when closing the session
|
||||
CGRID string // Unique identifier for this session
|
||||
RunID string // Keep a reference for the derived run
|
||||
Timezone string
|
||||
EventStart SMGenericEvent // Event which started the session
|
||||
CD *engine.CallDescriptor // initial CD used for debits, updated on each debit
|
||||
mux sync.RWMutex // protects the SMGSession in places where is concurrently accessed
|
||||
stopDebit chan struct{} // Channel to communicate with debit loops when closing the session
|
||||
clntConn rpcclient.RpcClientConnection // Reference towards client connection on SMG side so we can disconnect.
|
||||
rals rpcclient.RpcClientConnection // Connector to rals service
|
||||
cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service
|
||||
|
||||
CGRID string // Unique identifier for this session
|
||||
RunID string // Keep a reference for the derived run
|
||||
Timezone string
|
||||
EventStart SMGenericEvent // Event which started the session
|
||||
CD *engine.CallDescriptor // initial CD used for debits, updated on each debit
|
||||
|
||||
CallCosts []*engine.CallCost
|
||||
ExtraDuration time.Duration // keeps the current duration debited on top of what heas been asked
|
||||
LastUsage time.Duration // last requested Duration
|
||||
LastDebit time.Duration // last real debited duration
|
||||
TotalUsage time.Duration // sum of lastUsage
|
||||
clntConn rpcclient.RpcClientConnection // Reference towards client connection on SMG side so we can disconnect.
|
||||
rals rpcclient.RpcClientConnection // Connector to rals service
|
||||
cdrsrv rpcclient.RpcClientConnection // Connector to CDRS service
|
||||
ExtraDuration time.Duration // keeps the current duration debited on top of what heas been asked
|
||||
LastUsage time.Duration // last requested Duration
|
||||
LastDebit time.Duration // last real debited duration
|
||||
TotalUsage time.Duration // sum of lastUsage
|
||||
|
||||
}
|
||||
|
||||
// Called in case of automatic debits
|
||||
|
||||
@@ -362,11 +362,11 @@ func Unzip(src, dest string) error {
|
||||
}
|
||||
|
||||
// successive Fibonacci numbers.
|
||||
func Fib() func() time.Duration {
|
||||
func Fib() func() int {
|
||||
a, b := 0, 1
|
||||
return func() time.Duration {
|
||||
return func() int {
|
||||
a, b = b, a+b
|
||||
return time.Duration(a) * time.Second
|
||||
return a
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -150,7 +150,7 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac
|
||||
urlVals = content.(url.Values)
|
||||
body = []byte(urlVals.Encode())
|
||||
}
|
||||
delay := Fib()
|
||||
fib := Fib()
|
||||
bodyType := "application/x-www-form-urlencoded"
|
||||
if contentType == CONTENT_JSON {
|
||||
bodyType = "application/json"
|
||||
@@ -164,19 +164,19 @@ func (poster *HTTPPoster) Post(addr string, contentType string, content interfac
|
||||
}
|
||||
if err != nil {
|
||||
Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
|
||||
time.Sleep(delay())
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
continue
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
respBody, err = ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, error: <%s>", addr, err.Error()))
|
||||
time.Sleep(delay())
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode > 299 {
|
||||
Logger.Warning(fmt.Sprintf("<HTTPPoster> Posting to : <%s>, unexpected status code received: <%d>", addr, resp.StatusCode))
|
||||
time.Sleep(delay())
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
continue
|
||||
}
|
||||
return respBody, nil
|
||||
@@ -249,13 +249,13 @@ type AMQPPoster struct {
|
||||
// the optional chn will permits channel caching
|
||||
func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []byte, fallbackFileName string) (*amqp.Channel, error) {
|
||||
var err error
|
||||
delay := Fib()
|
||||
fib := Fib()
|
||||
if chn == nil {
|
||||
for i := 0; i < pstr.attempts; i++ {
|
||||
if chn, err = pstr.NewPostChannel(); err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(delay())
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
if err != nil && fallbackFileName != META_NONE {
|
||||
err = pstr.writeToFile(fallbackFileName, content)
|
||||
@@ -275,7 +275,7 @@ func (pstr *AMQPPoster) Post(chn *amqp.Channel, contentType string, content []by
|
||||
}); err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(delay())
|
||||
time.Sleep(time.Duration(fib()) * time.Second)
|
||||
}
|
||||
if err != nil && fallbackFileName != META_NONE {
|
||||
err = pstr.writeToFile(fallbackFileName, content)
|
||||
|
||||
Reference in New Issue
Block a user