mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-14 20:59:53 +05:00
Adding re-connecting rpc client
This commit is contained in:
@@ -23,7 +23,6 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/rpc"
|
||||
"os"
|
||||
//"runtime"
|
||||
"strconv"
|
||||
@@ -40,6 +39,7 @@ import (
|
||||
"github.com/cgrates/cgrates/scheduler"
|
||||
"github.com/cgrates/cgrates/sessionmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -93,11 +93,11 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
|
||||
<-cacheChan // Cache needs to come up before we are ready
|
||||
connector = responder
|
||||
} else {
|
||||
var client *rpc.Client
|
||||
var client *rpcclient.RpcClient
|
||||
var err error
|
||||
|
||||
for i := 0; i < cfg.MediatorRaterReconnects; i++ {
|
||||
client, err = rpc.Dial("tcp", cfg.MediatorRater)
|
||||
client, err = rpcclient.NewRpcClient("tcp", cfg.MediatorRater, 0, 3, utils.GOB)
|
||||
if err == nil { //Connected so no need to reiterate
|
||||
break
|
||||
}
|
||||
@@ -146,11 +146,11 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
|
||||
<-cacheChan // Wait for the cache to init before start doing queries
|
||||
connector = responder
|
||||
} else {
|
||||
var client *rpc.Client
|
||||
var client *rpcclient.RpcClient
|
||||
var err error
|
||||
|
||||
for i := 0; i < cfg.SMRaterReconnects; i++ {
|
||||
client, err = rpc.Dial("tcp", cfg.SMRater)
|
||||
client, err = rpcclient.NewRpcClient("tcp", cfg.SMRater, 0, 3, utils.GOB)
|
||||
if err == nil { //Connected so no need to reiterate
|
||||
break
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ import (
|
||||
"github.com/cgrates/cgrates/balancer2go"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
type Responder struct {
|
||||
@@ -289,7 +290,7 @@ type Connector interface {
|
||||
}
|
||||
|
||||
type RPCClientConnector struct {
|
||||
Client *rpc.Client
|
||||
Client *rpcclient.RpcClient
|
||||
}
|
||||
|
||||
func (rcc *RPCClientConnector) GetCost(cd CallDescriptor, cc *CallCost) error {
|
||||
|
||||
@@ -121,9 +121,12 @@ func (self *Mediator) RateCdr(storedCdr *utils.StoredCdr) error {
|
||||
}
|
||||
for _, dc := range dcs {
|
||||
runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP)
|
||||
engine.Logger.Debug(fmt.Sprintf("RunFiltersStr: %s", dc.RunFilters))
|
||||
matchingAllFilters := true
|
||||
for _, dcRunFilter := range runFilters {
|
||||
engine.Logger.Debug(fmt.Sprintf("Processing runFilter Id: %s with rules: %+v", dcRunFilter.Id, dcRunFilter.RSRules[0]))
|
||||
if fltrPass, _ := storedCdr.PassesFieldFilter(dcRunFilter); !fltrPass {
|
||||
engine.Logger.Debug(fmt.Sprintf("Not matching runFilter Id: %s with rules: %+v, cdr: %+v", dcRunFilter.Id, dcRunFilter.RSRules[0], storedCdr))
|
||||
matchingAllFilters = false
|
||||
break
|
||||
}
|
||||
@@ -148,7 +151,6 @@ func (self *Mediator) RateCdr(storedCdr *utils.StoredCdr) error {
|
||||
err.Error()) // Cannot fork CDR, important just runid and error
|
||||
continue
|
||||
}
|
||||
engine.Logger.Debug(fmt.Sprintf("Appending CdrRun: %+v\n", forkedCdr))
|
||||
cdrRuns = append(cdrRuns, forkedCdr)
|
||||
}
|
||||
for _, cdr := range cdrRuns {
|
||||
|
||||
@@ -8,3 +8,4 @@ go get -u -v github.com/go-sql-driver/mysql
|
||||
go get -u -v github.com/hoisie/redis
|
||||
go get -u -v github.com/howeyc/fsnotify
|
||||
go get -u -v github.com/cgrates/liner
|
||||
go get -u -v github.com/cgrates/rpcclient
|
||||
|
||||
Reference in New Issue
Block a user