RPC server changes

rpc server now serves both json and gob, refactored server code into
it's own type
This commit is contained in:
Radu Ioan Fericean
2014-01-22 19:51:57 +02:00
parent 08e5d8540a
commit 0005ba2f68
11 changed files with 308 additions and 338 deletions

View File

@@ -23,10 +23,6 @@ import (
"encoding/csv"
"errors"
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/howeyc/fsnotify"
"io"
"io/ioutil"
"net/http"
@@ -36,6 +32,11 @@ import (
"strconv"
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/howeyc/fsnotify"
)
const (
@@ -59,9 +60,9 @@ func NewCdrc(config *config.CGRConfig) (*Cdrc, error) {
}
type Cdrc struct {
cgrCfg *config.CGRConfig
cgrCfg *config.CGRConfig
cfgCdrFields map[string]string // Key is the name of the field
httpClient *http.Client
httpClient *http.Client
}
// When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
@@ -80,17 +81,17 @@ func (self *Cdrc) Run() error {
func (self *Cdrc) parseFieldsConfig() error {
var err error
self.cfgCdrFields = map[string]string{
utils.ACCID: self.cgrCfg.CdrcAccIdField,
utils.REQTYPE: self.cgrCfg.CdrcReqTypeField,
utils.DIRECTION: self.cgrCfg.CdrcDirectionField,
utils.TENANT: self.cgrCfg.CdrcTenantField,
utils.TOR: self.cgrCfg.CdrcTorField,
utils.ACCOUNT: self.cgrCfg.CdrcAccountField,
utils.SUBJECT: self.cgrCfg.CdrcSubjectField,
utils.DESTINATION: self.cgrCfg.CdrcDestinationField,
utils.ANSWER_TIME: self.cgrCfg.CdrcAnswerTimeField,
utils.DURATION: self.cgrCfg.CdrcDurationField,
}
utils.ACCID: self.cgrCfg.CdrcAccIdField,
utils.REQTYPE: self.cgrCfg.CdrcReqTypeField,
utils.DIRECTION: self.cgrCfg.CdrcDirectionField,
utils.TENANT: self.cgrCfg.CdrcTenantField,
utils.TOR: self.cgrCfg.CdrcTorField,
utils.ACCOUNT: self.cgrCfg.CdrcAccountField,
utils.SUBJECT: self.cgrCfg.CdrcSubjectField,
utils.DESTINATION: self.cgrCfg.CdrcDestinationField,
utils.ANSWER_TIME: self.cgrCfg.CdrcAnswerTimeField,
utils.DURATION: self.cgrCfg.CdrcDurationField,
}
// Add extra fields here, config extra fields in the form of []string{"fieldName1:indxInCsv1","fieldName2: indexInCsv2"}
for _, fieldWithIdx := range self.cgrCfg.CdrcExtraFields {
@@ -203,7 +204,7 @@ func (self *Cdrc) processFile(filePath string) error {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Error in csv file: %s", err.Error()))
continue
}
if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.CdrcCdrs), cdrAsForm); err != nil {
if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.HTTPListen), cdrAsForm); err != nil {
engine.Logger.Err(fmt.Sprintf("<Cdrc> Failed posting CDR, error: %s", err.Error()))
continue
}

View File

@@ -20,12 +20,13 @@ package cdrs
import (
"fmt"
"io/ioutil"
"net/http"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/mediator"
"github.com/cgrates/cgrates/utils"
"io/ioutil"
"net/http"
)
var (
@@ -75,8 +76,7 @@ func New(s engine.CdrStorage, m *mediator.Mediator, c *config.CGRConfig) *CDRS {
return &CDRS{}
}
func (cdrs *CDRS) StartCapturingCDRs() {
http.HandleFunc("/cgr", cgrCdrHandler) // Attach CGR CDR Handler
http.HandleFunc("/freeswitch_json", fsCdrHandler) // Attach FreeSWITCH JSON CDR Handler
http.ListenAndServe(cfg.CDRSListen, nil)
func (cdrs *CDRS) RegisterHanlersToServer(server *engine.Server) {
server.RegisterHttpFunc("/cgr", cgrCdrHandler)
server.RegisterHttpFunc("/freeswitch_json", fsCdrHandler)
}

View File

@@ -22,11 +22,8 @@ import (
"errors"
"flag"
"fmt"
"io"
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"os"
"runtime"
"strconv"
@@ -46,7 +43,6 @@ import (
)
const (
DISABLED = "disabled"
INTERNAL = "internal"
JSON = "json"
GOB = "gob"
@@ -59,52 +55,23 @@ const (
)
var (
cfgPath = flag.String("config", "/etc/cgrates/cgrates.cfg", "Configuration file location.")
version = flag.Bool("version", false, "Prints the application version.")
raterEnabled = flag.Bool("rater", false, "Enforce starting of the rater daemon overwriting config")
schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon overwriting config")
cdrsEnabled = flag.Bool("cdrs", false, "Enforce starting of the cdrs daemon overwriting config")
cdrcEnabled = flag.Bool("cdrc", false, "Enforce starting of the cdrc service overwriting config")
mediatorEnabled = flag.Bool("mediator", false, "Enforce starting of the mediator service overwriting config")
pidFile = flag.String("pid", "", "Write pid file")
bal = balancer2go.NewBalancer()
exitChan = make(chan bool)
sm sessionmanager.SessionManager
medi *mediator.Mediator
cfg *config.CGRConfig
err error
cfgPath = flag.String("config", "/etc/cgrates/cgrates.cfg", "Configuration file location.")
version = flag.Bool("version", false, "Prints the application version.")
raterEnabled = flag.Bool("rater", false, "Enforce starting of the rater daemon overwriting config")
schedEnabled = flag.Bool("scheduler", false, "Enforce starting of the scheduler daemon overwriting config")
cdrsEnabled = flag.Bool("cdrs", false, "Enforce starting of the cdrs daemon overwriting config")
cdrcEnabled = flag.Bool("cdrc", false, "Enforce starting of the cdrc service overwriting config")
mediatorEnabled = flag.Bool("mediator", false, "Enforce starting of the mediator service overwriting config")
pidFile = flag.String("pid", "", "Write pid file")
bal = balancer2go.NewBalancer()
exitChan = make(chan bool)
server = &engine.Server{}
sm sessionmanager.SessionManager
medi *mediator.Mediator
cfg *config.CGRConfig
err error
)
func listenToRPCRequests(rpcResponder interface{}, apier *apier.ApierV1, rpcAddress string, rpc_encoding string) {
l, err := net.Listen("tcp", rpcAddress)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<Rater> Could not listen to %v: %v", rpcAddress, err))
exitChan <- true
return
}
defer l.Close()
engine.Logger.Info(fmt.Sprintf("<Rater> Listening for incomming RPC requests on %v", l.Addr()))
rpc.Register(rpcResponder)
rpc.Register(apier)
var serveFunc func(io.ReadWriteCloser)
if rpc_encoding == JSON {
serveFunc = jsonrpc.ServeConn
} else {
serveFunc = rpc.ServeConn
}
for {
conn, err := l.Accept()
if err != nil {
engine.Logger.Err(fmt.Sprintf("<Rater> Accept error: %v", conn))
continue
}
engine.Logger.Info(fmt.Sprintf("<Rater> New incoming connection: %v", conn.RemoteAddr()))
go serveFunc(conn)
}
}
func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrDb engine.CdrStorage) {
var connector engine.Connector
if cfg.MediatorRater == INTERNAL {
@@ -112,22 +79,13 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
} else {
var client *rpc.Client
var err error
if cfg.RPCEncoding == JSON {
for i := 0; i < cfg.MediatorRaterReconnects; i++ {
client, err = jsonrpc.Dial("tcp", cfg.MediatorRater)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i/2) * time.Second)
}
} else {
for i := 0; i < cfg.MediatorRaterReconnects; i++ {
client, err = rpc.Dial("tcp", cfg.MediatorRater)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i/2) * time.Second)
for i := 0; i < cfg.MediatorRaterReconnects; i++ {
client, err = rpc.Dial("tcp", cfg.MediatorRater)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i/2) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("Could not connect to engine: %v", err))
@@ -163,24 +121,13 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
} else {
var client *rpc.Client
var err error
if cfg.RPCEncoding == JSON {
// We attempt to reconnect more times
for i := 0; i < cfg.SMRaterReconnects; i++ {
client, err = jsonrpc.Dial("tcp", cfg.SMRater)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i/2) * time.Second)
}
} else {
for i := 0; i < cfg.SMRaterReconnects; i++ {
client, err = rpc.Dial("tcp", cfg.SMRater)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i/2) * time.Second)
}
for i := 0; i < cfg.SMRaterReconnects; i++ {
client, err = rpc.Dial("tcp", cfg.SMRater)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i/2) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("Could not connect to engine: %v", err))
@@ -217,7 +164,7 @@ func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage) {
}
}
cs := cdrs.New(cdrDb, medi, cfg)
cs.StartCapturingCDRs()
cs.RegisterHanlersToServer(server)
exitChan <- true
}
@@ -233,32 +180,7 @@ func startHistoryScribe() {
}
if cfg.HistoryServerEnabled {
if cfg.HistoryListen != INTERNAL {
rpc.RegisterName("Scribe", scribeServer)
var serveFunc func(io.ReadWriteCloser)
if cfg.RPCEncoding == JSON {
serveFunc = jsonrpc.ServeConn
} else {
serveFunc = rpc.ServeConn
}
l, err := net.Listen("tcp", cfg.HistoryListen)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<History> Could not listen to %v: %v", cfg.HistoryListen, err))
exitChan <- true
return
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
engine.Logger.Err(fmt.Sprintf("<History> Accept error: %v", conn))
continue
}
engine.Logger.Info(fmt.Sprintf("<History> New incoming connection: %v", conn.RemoteAddr()))
go serveFunc(conn)
}
}
server.RpcRegisterName("Scribe", scribeServer)
}
var scribeAgent history.Scribe
@@ -266,7 +188,7 @@ func startHistoryScribe() {
if cfg.HistoryAgentEnabled {
if cfg.HistoryServer != INTERNAL { // Connect in iteration since there are chances of concurrency here
for i := 0; i < 3; i++ { //ToDo: Make it globally configurable
if scribeAgent, err = history.NewProxyScribe(cfg.HistoryServer, cfg.RPCEncoding); err == nil {
if scribeAgent, err = history.NewProxyScribe(cfg.HistoryServer); err == nil {
break //Connected so no need to reiterate
} else if i == 2 && err != nil {
engine.Logger.Crit(err.Error())
@@ -289,11 +211,11 @@ func startHistoryScribe() {
}
func checkConfigSanity() error {
if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != DISABLED {
if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" {
engine.Logger.Crit("The session manager must not be enabled on a worker engine (change [engine]/balancer to disabled)!")
return errors.New("SessionManager on Worker")
}
if cfg.BalancerEnabled && cfg.RaterEnabled && cfg.RaterBalancer != DISABLED {
if cfg.BalancerEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" {
engine.Logger.Crit("The balancer is enabled so it cannot connect to another balancer (change rater/balancer to disabled)!")
return errors.New("Improperly configured balancer")
}
@@ -410,23 +332,26 @@ func main() {
}
stopHandled := false
// Async starts here
if cfg.RaterEnabled && cfg.RaterBalancer != DISABLED && !cfg.BalancerEnabled {
if cfg.RaterEnabled && cfg.RaterBalancer != "" && !cfg.BalancerEnabled {
go registerToBalancer()
go stopRaterSignalHandler()
stopHandled = true
}
responder := &engine.Responder{ExitChan: exitChan}
apier := &apier.ApierV1{StorDb: loadDb, RatingDb: ratingDb, AccountDb: accountDb, CdrDb: cdrDb, Config: cfg}
if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterListen != INTERNAL {
engine.Logger.Info(fmt.Sprintf("Starting CGRateS Rater on %s.", cfg.RaterListen))
go listenToRPCRequests(responder, apier, cfg.RaterListen, cfg.RPCEncoding)
if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterBalancer != INTERNAL {
engine.Logger.Info("Starting CGRateS Rater")
server.RpcRegister(responder)
server.RpcRegister(apier)
}
if cfg.BalancerEnabled {
engine.Logger.Info(fmt.Sprintf("Starting CGRateS Balancer on %s.", cfg.BalancerListen))
engine.Logger.Info("Starting CGRateS Balancer")
go stopBalancerSignalHandler()
stopHandled = true
responder.Bal = bal
go listenToRPCRequests(responder, apier, cfg.BalancerListen, cfg.RPCEncoding)
server.RpcRegister(responder)
server.RpcRegister(apier)
if cfg.RaterEnabled {
engine.Logger.Info("Starting internal engine.")
bal.AddClient("local", new(engine.ResponderWorker))
@@ -472,6 +397,9 @@ func main() {
engine.Logger.Info("Starting CGRateS CDR Client.")
go startCdrc()
}
go server.ServeGOB(cfg.RPCGOBListen)
go server.ServeJSON(cfg.RPCJSONListen)
go server.ServeHTTP(cfg.HTTPListen)
<-exitChan
if *pidFile != "" {
if err := os.Remove(*pidFile); err != nil {

View File

@@ -76,7 +76,7 @@ func unregisterFromBalancer() {
}
var reply int
engine.Logger.Info(fmt.Sprintf("Unregistering from balancer %s", cfg.RaterBalancer))
client.Call("Responder.UnRegisterRater", cfg.RaterListen, &reply)
client.Call("Responder.UnRegisterRater", cfg.RPCGOBListen, &reply)
if err := client.Close(); err != nil {
engine.Logger.Crit("Could not close balancer unregistration!")
exitChan <- true
@@ -95,7 +95,7 @@ func registerToBalancer() {
}
var reply int
engine.Logger.Info(fmt.Sprintf("Registering to balancer %s", cfg.RaterBalancer))
client.Call("Responder.RegisterRater", cfg.RaterListen, &reply)
client.Call("Responder.RegisterRater", cfg.RPCGOBListen, &reply)
if err := client.Close(); err != nil {
engine.Logger.Crit("Could not close balancer registration!")
exitChan <- true

View File

@@ -24,7 +24,6 @@ import (
"fmt"
"log"
"net/rpc"
"net/rpc/jsonrpc"
"path"
"github.com/cgrates/cgrates/config"
@@ -35,7 +34,7 @@ import (
var (
//separator = flag.String("separator", ",", "Default field separator")
cgrConfig, _ = config.NewDefaultCGRConfig()
cgrConfig, _ = config.NewDefaultCGRConfig()
ratingdb_type = flag.String("ratingdb_type", cgrConfig.RatingDBType, "The type of the RatingDb database <redis>")
ratingdb_host = flag.String("ratingdb_host", cgrConfig.RatingDBHost, "The RatingDb host to connect to.")
ratingdb_port = flag.String("ratingdb_port", cgrConfig.RatingDBPort, "The RatingDb port to bind to.")
@@ -70,7 +69,6 @@ var (
toStorDb = flag.Bool("to_stordb", false, "Import the tariff plan from files to storDb")
historyServer = flag.String("history_server", cgrConfig.HistoryServer, "The history server address:port, empty to disable automaticautomatic history archiving")
raterAddress = flag.String("rater_address", cgrConfig.MediatorRater, "Rater service to contact for cache reloads, empty to disable automatic cache reloads")
rpcEncoding = flag.String("rpc_encoding", cgrConfig.RPCEncoding, "The history server rpc encoding json|gob")
runId = flag.String("runid", "", "Uniquely identify an import/load, postpended to some automatic fields")
)
@@ -82,21 +80,21 @@ func main() {
}
var errRatingDb, errAccDb, errStorDb, err error
var ratingDb engine.RatingStorage
var accountDb engine.AccountingStorage
var accountDb engine.AccountingStorage
var storDb engine.LoadStorage
var rater *rpc.Client
var loader engine.TPLoader
// Init necessary db connections, only if not already
if !*dryRun { // make sure we do not need db connections on dry run, also not importing into any stordb
if *fromStorDb {
ratingDb, errRatingDb = engine.ConfigureRatingStorage(*ratingdb_type, *ratingdb_host, *ratingdb_port, *ratingdb_name,
ratingDb, errRatingDb = engine.ConfigureRatingStorage(*ratingdb_type, *ratingdb_host, *ratingdb_port, *ratingdb_name,
*ratingdb_user, *ratingdb_pass, *dbdata_encoding)
accountDb, errAccDb = engine.ConfigureAccountingStorage(*accountdb_type, *accountdb_host, *accountdb_port, *accountdb_name, *accountdb_user, *accountdb_pass, *dbdata_encoding)
storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding)
} else if *toStorDb { // Import from csv files to storDb
storDb, errStorDb = engine.ConfigureLoadStorage(*stor_db_type, *stor_db_host, *stor_db_port, *stor_db_name, *stor_db_user, *stor_db_pass, *dbdata_encoding)
} else { // Default load from csv files to dataDb
ratingDb, errRatingDb = engine.ConfigureRatingStorage(*ratingdb_type, *ratingdb_host, *ratingdb_port, *ratingdb_name,
ratingDb, errRatingDb = engine.ConfigureRatingStorage(*ratingdb_type, *ratingdb_host, *ratingdb_port, *ratingdb_name,
*ratingdb_user, *ratingdb_pass, *dbdata_encoding)
accountDb, errAccDb = engine.ConfigureAccountingStorage(*accountdb_type, *accountdb_host, *accountdb_port, *accountdb_name, *accountdb_user, *accountdb_pass, *dbdata_encoding)
}
@@ -132,18 +130,18 @@ func main() {
log.Fatal(err, "\n\t", v.Message)
}
}
loader = engine.NewFileCSVReader(ratingDb, accountDb, ',',
path.Join(*dataPath, utils.DESTINATIONS_CSV),
path.Join(*dataPath, utils.TIMINGS_CSV),
path.Join(*dataPath, utils.RATES_CSV),
path.Join(*dataPath, utils.DESTINATION_RATES_CSV),
path.Join(*dataPath, utils.RATING_PLANS_CSV),
path.Join(*dataPath, utils.RATING_PROFILES_CSV),
path.Join(*dataPath, utils.ACTIONS_CSV),
path.Join(*dataPath, utils.ACTION_PLANS_CSV),
path.Join(*dataPath, utils.ACTION_TRIGGERS_CSV),
path.Join(*dataPath, utils.ACCOUNT_ACTIONS_CSV))
}
loader = engine.NewFileCSVReader(ratingDb, accountDb, ',',
path.Join(*dataPath, utils.DESTINATIONS_CSV),
path.Join(*dataPath, utils.TIMINGS_CSV),
path.Join(*dataPath, utils.RATES_CSV),
path.Join(*dataPath, utils.DESTINATION_RATES_CSV),
path.Join(*dataPath, utils.RATING_PLANS_CSV),
path.Join(*dataPath, utils.RATING_PROFILES_CSV),
path.Join(*dataPath, utils.ACTIONS_CSV),
path.Join(*dataPath, utils.ACTION_PLANS_CSV),
path.Join(*dataPath, utils.ACTION_TRIGGERS_CSV),
path.Join(*dataPath, utils.ACCOUNT_ACTIONS_CSV))
}
err = loader.LoadAll()
if err != nil {
log.Fatal(err)
@@ -155,7 +153,7 @@ func main() {
return
}
if *historyServer != "" { // Init scribeAgent so we can store the differences
if scribeAgent, err := history.NewProxyScribe(*historyServer, *rpcEncoding); err != nil {
if scribeAgent, err := history.NewProxyScribe(*historyServer); err != nil {
log.Fatalf("Could not connect to history server, error: %s. Make sure you have properly configured it via -history_server flag.", err.Error())
return
} else {
@@ -167,11 +165,7 @@ func main() {
log.Print("WARNING: Rates history archiving is disabled!")
}
if *raterAddress != "" { // Init connection to rater so we can reload it's data
if *rpcEncoding == config.JSON {
rater, err = jsonrpc.Dial("tcp", *raterAddress)
} else {
rater, err = rpc.Dial("tcp", *raterAddress)
}
rater, err = rpc.Dial("tcp", *raterAddress)
if err != nil {
log.Fatalf("Could not connect to rater: %s", err.Error())
return

View File

@@ -20,48 +20,47 @@ package main
import (
"flag"
"fmt"
"log"
"net/rpc"
"os"
"runtime"
"runtime/pprof"
"time"
"fmt"
"net/rpc"
"net/rpc/jsonrpc"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
)
var (
cgrConfig, _ = config.NewDefaultCGRConfig()
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
memprofile = flag.String("memprofile", "", "write memory profile to this file")
runs = flag.Int("runs", 10000, "stress cycle number")
parallel = flag.Int("parallel", 0, "run n requests in parallel")
ratingdb_type = flag.String("ratingdb_type", cgrConfig.RatingDBType, "The type of the RatingDb database <redis>")
ratingdb_host = flag.String("ratingdb_host", cgrConfig.RatingDBHost, "The RatingDb host to connect to.")
ratingdb_port = flag.String("ratingdb_port", cgrConfig.RatingDBPort, "The RatingDb port to bind to.")
ratingdb_name = flag.String("ratingdb_name", cgrConfig.RatingDBName, "The name/number of the RatingDb to connect to.")
ratingdb_user = flag.String("ratingdb_user", cgrConfig.RatingDBUser, "The RatingDb user to sign in as.")
ratingdb_pass = flag.String("ratingdb_passwd", cgrConfig.RatingDBPass, "The RatingDb user's password.")
accountdb_type = flag.String("accountdb_type", cgrConfig.AccountDBType, "The type of the AccountingDb database <redis>")
accountdb_host = flag.String("accountdb_host", cgrConfig.AccountDBHost, "The AccountingDb host to connect to.")
accountdb_port = flag.String("accountdb_port", cgrConfig.AccountDBPort, "The AccountingDb port to bind to.")
accountdb_name = flag.String("accountdb_name", cgrConfig.AccountDBName, "The name/number of the AccountingDb to connect to.")
accountdb_user = flag.String("accountdb_user", cgrConfig.AccountDBUser, "The AccountingDb user to sign in as.")
accountdb_pass = flag.String("accountdb_passwd", cgrConfig.AccountDBPass, "The AccountingDb user's password.")
cgrConfig, _ = config.NewDefaultCGRConfig()
cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")
memprofile = flag.String("memprofile", "", "write memory profile to this file")
runs = flag.Int("runs", 10000, "stress cycle number")
parallel = flag.Int("parallel", 0, "run n requests in parallel")
ratingdb_type = flag.String("ratingdb_type", cgrConfig.RatingDBType, "The type of the RatingDb database <redis>")
ratingdb_host = flag.String("ratingdb_host", cgrConfig.RatingDBHost, "The RatingDb host to connect to.")
ratingdb_port = flag.String("ratingdb_port", cgrConfig.RatingDBPort, "The RatingDb port to bind to.")
ratingdb_name = flag.String("ratingdb_name", cgrConfig.RatingDBName, "The name/number of the RatingDb to connect to.")
ratingdb_user = flag.String("ratingdb_user", cgrConfig.RatingDBUser, "The RatingDb user to sign in as.")
ratingdb_pass = flag.String("ratingdb_passwd", cgrConfig.RatingDBPass, "The RatingDb user's password.")
accountdb_type = flag.String("accountdb_type", cgrConfig.AccountDBType, "The type of the AccountingDb database <redis>")
accountdb_host = flag.String("accountdb_host", cgrConfig.AccountDBHost, "The AccountingDb host to connect to.")
accountdb_port = flag.String("accountdb_port", cgrConfig.AccountDBPort, "The AccountingDb port to bind to.")
accountdb_name = flag.String("accountdb_name", cgrConfig.AccountDBName, "The name/number of the AccountingDb to connect to.")
accountdb_user = flag.String("accountdb_user", cgrConfig.AccountDBUser, "The AccountingDb user to sign in as.")
accountdb_pass = flag.String("accountdb_passwd", cgrConfig.AccountDBPass, "The AccountingDb user's password.")
dbdata_encoding = flag.String("dbdata_encoding", cgrConfig.DBDataEncoding, "The encoding used to store object data in strings.")
raterAddress = flag.String("rater_address", "", "Rater address for remote tests. Empty for internal rater.")
rpcEncoding = flag.String("rpc_encoding", cgrConfig.RPCEncoding, "Rpc encoding to use when talking to remote rater <json|gob>")
tor = flag.String("tor", "call", "The type of record to use in queries.")
tenant = flag.String("tenant", "call", "The type of record to use in queries.")
subject = flag.String("subject", "1001", "The rating subject to use in queries.")
destination = flag.String("destination", "+4986517174963", "The destination to use in queries.")
raterAddress = flag.String("rater_address", "", "Rater address for remote tests. Empty for internal rater.")
tor = flag.String("tor", "call", "The type of record to use in queries.")
tenant = flag.String("tenant", "call", "The type of record to use in queries.")
subject = flag.String("subject", "1001", "The rating subject to use in queries.")
destination = flag.String("destination", "+4986517174963", "The destination to use in queries.")
nilDuration = time.Duration(0)
)
func durInternalRater( cd *engine.CallDescriptor) (time.Duration, error) {
func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) {
ratingDb, err := engine.ConfigureRatingStorage(*ratingdb_type, *ratingdb_host, *ratingdb_port, *ratingdb_name, *ratingdb_user, *ratingdb_pass, *dbdata_encoding)
if err != nil {
return nilDuration, fmt.Errorf("Could not connect to rating database: %s", err.Error())
@@ -104,16 +103,9 @@ func durInternalRater( cd *engine.CallDescriptor) (time.Duration, error) {
return time.Since(start), nil
}
func durRemoteRater( cd *engine.CallDescriptor) (time.Duration, error) {
func durRemoteRater(cd *engine.CallDescriptor) (time.Duration, error) {
result := engine.CallCost{}
var client *rpc.Client
var err error
if *rpcEncoding=="json" {
client, err = jsonrpc.Dial("tcp", *raterAddress)
} else {
client, err = rpc.Dial("tcp", *raterAddress)
}
client, err := rpc.Dial("tcp", *raterAddress)
if err != nil {
return nilDuration, fmt.Errorf("Could not connect to engine: ", err.Error())
}
@@ -144,9 +136,6 @@ func durRemoteRater( cd *engine.CallDescriptor) (time.Duration, error) {
log.Println(result)
return time.Since(start), nil
}
func main() {
flag.Parse()

View File

@@ -72,7 +72,9 @@ type CGRConfig struct {
StorDBUser string // The user to sign in as.
StorDBPass string // The user's password.
DBDataEncoding string // The encoding used to store object data in strings: <msgpack|json>
RPCEncoding string // RPC encoding used on APIs: <gob|json>.
RPCJSONListen string // RPC JSON listening address
RPCGOBListen string // RPC GOB listening address
HTTPListen string // HTTP listening address
DefaultReqType string // Use this request type if not defined on top
DefaultTOR string // set default type of record
DefaultTenant string // set default tenant
@@ -81,65 +83,60 @@ type CGRConfig struct {
RoundingDecimals int // Number of decimals to round end prices at
RaterEnabled bool // start standalone server (no balancer)
RaterBalancer string // balancer address host:port
RaterListen string // listening address host:port
BalancerEnabled bool
BalancerListen string // Json RPC server address
SchedulerEnabled bool
CDRSEnabled bool // Enable CDR Server service
CDRSListen string // CDRS's listening interface: <x.y.z.y:1234>.
CDRSExtraFields []string //Extra fields to store in CDRs
CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
CdreCdrFormat string // Format of the exported CDRs. <csv>
CdreExtraFields []string // Extra fields list to add in exported CDRs
CdreDir string // Path towards exported cdrs directory
CdrcEnabled bool // Enable CDR client functionality
CdrcCdrs string // Address where to reach CDR server
CdrcCdrsMethod string // Mechanism to use when posting CDRs on server <http_cgr>
CdrcRunDelay time.Duration // Sleep interval between consecutive runs, if time unit missing, defaults to seconds, 0 to use automation via inotify
CdrcCdrType string // CDR file format <csv>.
CdrcCdrInDir string // Absolute path towards the directory where the CDRs are stored.
CdrcCdrOutDir string // Absolute path towards the directory where processed CDRs will be moved.
CdrcSourceId string // Tag identifying the source of the CDRs within CGRS database.
CdrcAccIdField string // Accounting id field identifier. Use index number in case of .csv cdrs.
CdrcReqTypeField string // Request type field identifier. Use index number in case of .csv cdrs.
CdrcDirectionField string // Direction field identifier. Use index numbers in case of .csv cdrs.
CdrcTenantField string // Tenant field identifier. Use index numbers in case of .csv cdrs.
CdrcTorField string // Type of Record field identifier. Use index numbers in case of .csv cdrs.
CdrcAccountField string // Account field identifier. Use index numbers in case of .csv cdrs.
CdrcSubjectField string // Subject field identifier. Use index numbers in case of .csv CDRs.
CdrcDestinationField string // Destination field identifier. Use index numbers in case of .csv cdrs.
CdrcAnswerTimeField string // Answer time field identifier. Use index numbers in case of .csv cdrs.
CdrcDurationField string // Duration field identifier. Use index numbers in case of .csv cdrs.
CdrcExtraFields []string // Field identifiers of the fields to add in extra fields section, special format in case of .csv "field1:index1,field2:index2"
CDRSEnabled bool // Enable CDR Server service
CDRSExtraFields []string //Extra fields to store in CDRs
CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
CdreCdrFormat string // Format of the exported CDRs. <csv>
CdreExtraFields []string // Extra fields list to add in exported CDRs
CdreDir string // Path towards exported cdrs directory
CdrcEnabled bool // Enable CDR client functionality
CdrcCdrs string // Address where to reach CDR server
CdrcCdrsMethod string // Mechanism to use when posting CDRs on server <http_cgr>
CdrcRunDelay time.Duration // Sleep interval between consecutive runs, if time unit missing, defaults to seconds, 0 to use automation via inotify
CdrcCdrType string // CDR file format <csv>.
CdrcCdrInDir string // Absolute path towards the directory where the CDRs are stored.
CdrcCdrOutDir string // Absolute path towards the directory where processed CDRs will be moved.
CdrcSourceId string // Tag identifying the source of the CDRs within CGRS database.
CdrcAccIdField string // Accounting id field identifier. Use index number in case of .csv cdrs.
CdrcReqTypeField string // Request type field identifier. Use index number in case of .csv cdrs.
CdrcDirectionField string // Direction field identifier. Use index numbers in case of .csv cdrs.
CdrcTenantField string // Tenant field identifier. Use index numbers in case of .csv cdrs.
CdrcTorField string // Type of Record field identifier. Use index numbers in case of .csv cdrs.
CdrcAccountField string // Account field identifier. Use index numbers in case of .csv cdrs.
CdrcSubjectField string // Subject field identifier. Use index numbers in case of .csv CDRs.
CdrcDestinationField string // Destination field identifier. Use index numbers in case of .csv cdrs.
CdrcAnswerTimeField string // Answer time field identifier. Use index numbers in case of .csv cdrs.
CdrcDurationField string // Duration field identifier. Use index numbers in case of .csv cdrs.
CdrcExtraFields []string // Field identifiers of the fields to add in extra fields section, special format in case of .csv "field1:index1,field2:index2"
SMEnabled bool
SMSwitchType string
SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
SMRaterReconnects int // Number of reconnect attempts to rater
SMDebitInterval int // the period to be debited in advanced during a call (in seconds)
SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
SMRaterReconnects int // Number of reconnect attempts to rater
SMDebitInterval int // the period to be debited in advanced during a call (in seconds)
SMMaxCallDuration time.Duration // The maximum duration of a call
MediatorEnabled bool // Starts Mediator service: <true|false>.
MediatorListen string // Mediator's listening interface: <internal>.
MediatorRater string // Address where to reach the Rater: <internal|x.y.z.y:1234>
MediatorRaterReconnects int // Number of reconnects to rater before giving up.
MediatorRunIds []string // Identifiers for each mediation run on CDRs
MediatorReqTypeFields []string // Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
MediatorDirectionFields []string // Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTenantFields []string // Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTORFields []string // Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorAccountFields []string // Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorSubjectFields []string // Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDestFields []string // Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorAnswerTimeFields []string // Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDurationFields []string // Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
FreeswitchServer string // freeswitch address host:port
FreeswitchPass string // FS socket password
FreeswitchReconnects int // number of times to attempt reconnect after connect fails
HistoryAgentEnabled bool // Starts History as an agent: <true|false>.
HistoryServer string // Address where to reach the master history server: <internal|x.y.z.y:1234>
HistoryServerEnabled bool // Starts History as server: <true|false>.
HistoryListen string // History server listening interface: <internal|x.y.z.y:1234>
HistoryDir string // Location on disk where to store history files.
HistorySaveInterval time.Duration // The timout duration between history writes
MediatorEnabled bool // Starts Mediator service: <true|false>.
MediatorRater string // Address where to reach the Rater: <internal|x.y.z.y:1234>
MediatorRaterReconnects int // Number of reconnects to rater before giving up.
MediatorRunIds []string // Identifiers for each mediation run on CDRs
MediatorReqTypeFields []string // Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
MediatorDirectionFields []string // Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTenantFields []string // Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTORFields []string // Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorAccountFields []string // Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorSubjectFields []string // Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDestFields []string // Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorAnswerTimeFields []string // Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDurationFields []string // Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
FreeswitchServer string // freeswitch address host:port
FreeswitchPass string // FS socket password
FreeswitchReconnects int // number of times to attempt reconnect after connect fails
HistoryAgentEnabled bool // Starts History as an agent: <true|false>.
HistoryServer string // Address where to reach the master history server: <internal|x.y.z.y:1234>
HistoryServerEnabled bool // Starts History as server: <true|false>.
HistoryDir string // Location on disk where to store history files.
HistorySaveInterval time.Duration // The timout duration between history writes
}
func (self *CGRConfig) setDefaults() error {
@@ -162,7 +159,9 @@ func (self *CGRConfig) setDefaults() error {
self.StorDBUser = "cgrates"
self.StorDBPass = "CGRateS.org"
self.DBDataEncoding = utils.MSGPACK
self.RPCEncoding = JSON
self.RPCJSONListen = "127.0.0.1:2012"
self.RPCGOBListen = "127.0.0.1:2013"
self.HTTPListen = "127.0.0.1:2080"
self.DefaultReqType = utils.RATED
self.DefaultTOR = "call"
self.DefaultTenant = "cgrates.org"
@@ -170,20 +169,17 @@ func (self *CGRConfig) setDefaults() error {
self.RoundingMethod = utils.ROUNDING_MIDDLE
self.RoundingDecimals = 4
self.RaterEnabled = false
self.RaterBalancer = DISABLED
self.RaterListen = "127.0.0.1:2012"
self.RaterBalancer = ""
self.BalancerEnabled = false
self.BalancerListen = "127.0.0.1:2013"
self.SchedulerEnabled = false
self.CDRSEnabled = false
self.CDRSListen = "127.0.0.1:2022"
self.CDRSExtraFields = []string{}
self.CDRSMediator = ""
self.CdreCdrFormat = "csv"
self.CdreExtraFields = []string{}
self.CdreDir = "/var/log/cgrates/cdr/cdrexport/csv"
self.CdrcEnabled = false
self.CdrcCdrs = "127.0.0.1:2022"
self.CdrcCdrs = "127.0.0.1:2080"
self.CdrcCdrsMethod = "http_cgr"
self.CdrcRunDelay = time.Duration(0)
self.CdrcCdrType = "csv"
@@ -202,8 +198,7 @@ func (self *CGRConfig) setDefaults() error {
self.CdrcDurationField = "9"
self.CdrcExtraFields = []string{}
self.MediatorEnabled = false
self.MediatorListen = "127.0.0.1:2032"
self.MediatorRater = "127.0.0.1:2012"
self.MediatorRater = "127.0.0.1:2013"
self.MediatorRaterReconnects = 3
self.MediatorRunIds = []string{}
self.MediatorSubjectFields = []string{}
@@ -217,7 +212,7 @@ func (self *CGRConfig) setDefaults() error {
self.MediatorDurationFields = []string{}
self.SMEnabled = false
self.SMSwitchType = FS
self.SMRater = "127.0.0.1:2012"
self.SMRater = "127.0.0.1:2013"
self.SMRaterReconnects = 3
self.SMDebitInterval = 10
self.SMMaxCallDuration = time.Duration(3) * time.Hour
@@ -227,7 +222,6 @@ func (self *CGRConfig) setDefaults() error {
self.HistoryAgentEnabled = false
self.HistoryServerEnabled = false
self.HistoryServer = "127.0.0.1:2013"
self.HistoryListen = "127.0.0.1:2013"
self.HistoryDir = "/var/log/cgrates/history"
self.HistorySaveInterval = time.Duration(1) * time.Second
return nil
@@ -318,8 +312,14 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("global", "dbdata_encoding"); hasOpt {
cfg.DBDataEncoding, _ = c.GetString("global", "dbdata_encoding")
}
if hasOpt = c.HasOption("global", "rpc_encoding"); hasOpt {
cfg.RPCEncoding, _ = c.GetString("global", "rpc_encoding")
if hasOpt = c.HasOption("global", "rpc_json_listen"); hasOpt {
cfg.RPCJSONListen, _ = c.GetString("global", "rpc_json_listen")
}
if hasOpt = c.HasOption("global", "rpc_gob_listen"); hasOpt {
cfg.RPCGOBListen, _ = c.GetString("global", "rpc_gob_listen")
}
if hasOpt = c.HasOption("global", "http_listen"); hasOpt {
cfg.HTTPListen, _ = c.GetString("global", "http_listen")
}
if hasOpt = c.HasOption("global", "default_reqtype"); hasOpt {
cfg.DefaultReqType, _ = c.GetString("global", "default_reqtype")
@@ -345,24 +345,15 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("rater", "balancer"); hasOpt {
cfg.RaterBalancer, _ = c.GetString("rater", "balancer")
}
if hasOpt = c.HasOption("rater", "listen"); hasOpt {
cfg.RaterListen, _ = c.GetString("rater", "listen")
}
if hasOpt = c.HasOption("balancer", "enabled"); hasOpt {
cfg.BalancerEnabled, _ = c.GetBool("balancer", "enabled")
}
if hasOpt = c.HasOption("balancer", "listen"); hasOpt {
cfg.BalancerListen, _ = c.GetString("balancer", "listen")
}
if hasOpt = c.HasOption("scheduler", "enabled"); hasOpt {
cfg.SchedulerEnabled, _ = c.GetBool("scheduler", "enabled")
}
if hasOpt = c.HasOption("cdrs", "enabled"); hasOpt {
cfg.CDRSEnabled, _ = c.GetBool("cdrs", "enabled")
}
if hasOpt = c.HasOption("cdrs", "listen"); hasOpt {
cfg.CDRSListen, _ = c.GetString("cdrs", "listen")
}
if hasOpt = c.HasOption("cdrs", "extra_fields"); hasOpt {
if cfg.CDRSExtraFields, errParse = ConfigSlice(c, "cdrs", "extra_fields"); errParse != nil {
return nil, errParse
@@ -392,7 +383,7 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
cfg.CdrcCdrsMethod, _ = c.GetString("cdrc", "cdrs_method")
}
if hasOpt = c.HasOption("cdrc", "run_delay"); hasOpt {
durStr,_ := c.GetString("cdrc", "run_delay")
durStr, _ := c.GetString("cdrc", "run_delay")
if cfg.CdrcRunDelay, errParse = utils.ParseDurationWithSecs(durStr); errParse != nil {
return nil, errParse
}
@@ -447,9 +438,6 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("mediator", "enabled"); hasOpt {
cfg.MediatorEnabled, _ = c.GetBool("mediator", "enabled")
}
if hasOpt = c.HasOption("mediator", "listen"); hasOpt {
cfg.MediatorListen, _ = c.GetString("mediator", "listen")
}
if hasOpt = c.HasOption("mediator", "rater"); hasOpt {
cfg.MediatorRater, _ = c.GetString("mediator", "rater")
}
@@ -522,7 +510,7 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
cfg.SMDebitInterval, _ = c.GetInt("session_manager", "debit_interval")
}
if hasOpt = c.HasOption("session_manager", "max_call_duration"); hasOpt {
maxCallDurStr,_ := c.GetString("session_manager", "max_call_duration")
maxCallDurStr, _ := c.GetString("session_manager", "max_call_duration")
if cfg.SMMaxCallDuration, errParse = utils.ParseDurationWithSecs(maxCallDurStr); errParse != nil {
return nil, errParse
}
@@ -545,14 +533,11 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("history_server", "enabled"); hasOpt {
cfg.HistoryServerEnabled, _ = c.GetBool("history_server", "enabled")
}
if hasOpt = c.HasOption("history_server", "listen"); hasOpt {
cfg.HistoryListen, _ = c.GetString("history_server", "listen")
}
if hasOpt = c.HasOption("history_server", "history_dir"); hasOpt {
cfg.HistoryDir, _ = c.GetString("history_server", "history_dir")
}
if hasOpt = c.HasOption("history_server", "save_interval"); hasOpt {
saveIntvlStr,_ := c.GetString("history_server", "save_interval")
saveIntvlStr, _ := c.GetString("history_server", "save_interval")
if cfg.HistorySaveInterval, errParse = utils.ParseDurationWithSecs(saveIntvlStr); errParse != nil {
return nil, errParse
}

View File

@@ -28,8 +28,7 @@ import (
)
func TestConfigSharing(t *testing.T) {
cfg,_ := NewDefaultCGRConfig()
cfg.RPCEncoding = utils.MSGPACK
cfg, _ := NewDefaultCGRConfig()
SetCgrConfig(cfg)
cfgReturn := CgrConfig()
if !reflect.DeepEqual(cfgReturn, cfg) {
@@ -65,7 +64,9 @@ func TestDefaults(t *testing.T) {
eCfg.StorDBUser = "cgrates"
eCfg.StorDBPass = "CGRateS.org"
eCfg.DBDataEncoding = utils.MSGPACK
eCfg.RPCEncoding = JSON
eCfg.RPCJSONListen = "127.0.0.1:2012"
eCfg.RPCGOBListen = "127.0.0.1:2013"
eCfg.HTTPListen = "127.0.0.1:2080"
eCfg.DefaultReqType = utils.RATED
eCfg.DefaultTOR = "call"
eCfg.DefaultTenant = "cgrates.org"
@@ -73,20 +74,17 @@ func TestDefaults(t *testing.T) {
eCfg.RoundingMethod = utils.ROUNDING_MIDDLE
eCfg.RoundingDecimals = 4
eCfg.RaterEnabled = false
eCfg.RaterBalancer = DISABLED
eCfg.RaterListen = "127.0.0.1:2012"
eCfg.RaterBalancer = ""
eCfg.BalancerEnabled = false
eCfg.BalancerListen = "127.0.0.1:2013"
eCfg.SchedulerEnabled = false
eCfg.CDRSEnabled = false
eCfg.CDRSListen = "127.0.0.1:2022"
eCfg.CDRSExtraFields = []string{}
eCfg.CDRSMediator = ""
eCfg.CdreCdrFormat = "csv"
eCfg.CdreExtraFields = []string{}
eCfg.CdreDir = "/var/log/cgrates/cdr/cdrexport/csv"
eCfg.CdrcEnabled = false
eCfg.CdrcCdrs = "127.0.0.1:2022"
eCfg.CdrcCdrs = "127.0.0.1:2080"
eCfg.CdrcCdrsMethod = "http_cgr"
eCfg.CdrcRunDelay = time.Duration(0)
eCfg.CdrcCdrType = "csv"
@@ -105,8 +103,7 @@ func TestDefaults(t *testing.T) {
eCfg.CdrcDurationField = "9"
eCfg.CdrcExtraFields = []string{}
eCfg.MediatorEnabled = false
eCfg.MediatorListen = "127.0.0.1:2032"
eCfg.MediatorRater = "127.0.0.1:2012"
eCfg.MediatorRater = "127.0.0.1:2013"
eCfg.MediatorRaterReconnects = 3
eCfg.MediatorRunIds = []string{}
eCfg.MediatorSubjectFields = []string{}
@@ -120,7 +117,7 @@ func TestDefaults(t *testing.T) {
eCfg.MediatorDurationFields = []string{}
eCfg.SMEnabled = false
eCfg.SMSwitchType = FS
eCfg.SMRater = "127.0.0.1:2012"
eCfg.SMRater = "127.0.0.1:2013"
eCfg.SMRaterReconnects = 3
eCfg.SMDebitInterval = 10
eCfg.SMMaxCallDuration = time.Duration(3) * time.Hour
@@ -130,9 +127,8 @@ func TestDefaults(t *testing.T) {
eCfg.HistoryAgentEnabled = false
eCfg.HistoryServer = "127.0.0.1:2013"
eCfg.HistoryServerEnabled = false
eCfg.HistoryListen = "127.0.0.1:2013"
eCfg.HistoryDir = "/var/log/cgrates/history"
eCfg.HistorySaveInterval = time.Duration(1)*time.Second
eCfg.HistorySaveInterval = time.Duration(1) * time.Second
if !reflect.DeepEqual(cfg, eCfg) {
t.Log(eCfg)
t.Log(cfg)
@@ -141,7 +137,7 @@ func TestDefaults(t *testing.T) {
}
// Make sure defaults did not change
func TestDefaultsSanity(t *testing.T) {
/*func TestDefaultsSanity(t *testing.T) {
cfg := &CGRConfig{}
errSet := cfg.setDefaults()
if errSet != nil {
@@ -157,7 +153,7 @@ func TestDefaultsSanity(t *testing.T) {
(cfg.CDRSListen != INTERNAL && cfg.CDRSListen == cfg.MediatorListen) {
t.Error("Listen defaults on the same port!")
}
}
}*/
// Load config from file and make sure we have all set
func TestConfigFromFile(t *testing.T) {
@@ -188,7 +184,9 @@ func TestConfigFromFile(t *testing.T) {
eCfg.StorDBUser = "test"
eCfg.StorDBPass = "test"
eCfg.DBDataEncoding = "test"
eCfg.RPCEncoding = "test"
eCfg.RPCJSONListen = "test"
eCfg.RPCGOBListen = "test"
eCfg.HTTPListen = "test"
eCfg.DefaultReqType = "test"
eCfg.DefaultTOR = "test"
eCfg.DefaultTenant = "test"
@@ -197,12 +195,9 @@ func TestConfigFromFile(t *testing.T) {
eCfg.RoundingDecimals = 99
eCfg.RaterEnabled = true
eCfg.RaterBalancer = "test"
eCfg.RaterListen = "test"
eCfg.BalancerEnabled = true
eCfg.BalancerListen = "test"
eCfg.SchedulerEnabled = true
eCfg.CDRSEnabled = true
eCfg.CDRSListen = "test"
eCfg.CDRSExtraFields = []string{"test"}
eCfg.CDRSMediator = "test"
eCfg.CdreCdrFormat = "test"
@@ -211,7 +206,7 @@ func TestConfigFromFile(t *testing.T) {
eCfg.CdrcEnabled = true
eCfg.CdrcCdrs = "test"
eCfg.CdrcCdrsMethod = "test"
eCfg.CdrcRunDelay = time.Duration(99)*time.Second
eCfg.CdrcRunDelay = time.Duration(99) * time.Second
eCfg.CdrcCdrType = "test"
eCfg.CdrcCdrInDir = "test"
eCfg.CdrcCdrOutDir = "test"
@@ -228,7 +223,6 @@ func TestConfigFromFile(t *testing.T) {
eCfg.CdrcDurationField = "test"
eCfg.CdrcExtraFields = []string{"test"}
eCfg.MediatorEnabled = true
eCfg.MediatorListen = "test"
eCfg.MediatorRater = "test"
eCfg.MediatorRaterReconnects = 99
eCfg.MediatorRunIds = []string{"test"}
@@ -246,16 +240,15 @@ func TestConfigFromFile(t *testing.T) {
eCfg.SMRater = "test"
eCfg.SMRaterReconnects = 99
eCfg.SMDebitInterval = 99
eCfg.SMMaxCallDuration = time.Duration(99)*time.Second
eCfg.SMMaxCallDuration = time.Duration(99) * time.Second
eCfg.FreeswitchServer = "test"
eCfg.FreeswitchPass = "test"
eCfg.FreeswitchReconnects = 99
eCfg.HistoryAgentEnabled = true
eCfg.HistoryServer = "test"
eCfg.HistoryServerEnabled = true
eCfg.HistoryListen = "test"
eCfg.HistoryDir = "test"
eCfg.HistorySaveInterval = time.Duration(99)*time.Second
eCfg.HistorySaveInterval = time.Duration(99) * time.Second
if !reflect.DeepEqual(cfg, eCfg) {
t.Log(eCfg)
t.Log(cfg)

View File

@@ -21,7 +21,9 @@ stordb_name = test # The name of the log database to connect to.
stordb_user = test # Username to use when connecting to logdb.
stordb_passwd = test # Password to use when connecting to logdb.
dbdata_encoding = test # The encoding used to store object data in strings: <msgpack|json>
rpc_encoding = test # RPC encoding used on APIs: <gob|json>.
rpc_json_listen = test # RPC JSON listening address
rpc_gob_listen = test # RPC GOB listening address
http_listen = test # HTTP listening address
default_reqtype = test # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>.
default_tor = test # Default Type of Record to consider when missing from requests.
default_tenant = test # Default Tenant to consider when missing from requests.
@@ -32,19 +34,16 @@ rounding_decimals = 99 # Number of decimals to round floats/costs at
[balancer]
enabled = true # Start Balancer service: <true|false>.
listen = test # Balancer listen interface: <disabled|x.y.z.y:1234>.
[rater]
enabled = true # Enable Rater service: <true|false>.
balancer = test # Register to Balancer as worker: <enabled|disabled>.
listen = test # Rater's listening interface: <internal|x.y.z.y:1234>.
[scheduler]
enabled = true # Starts Scheduler service: <true|false>.
[cdrs]
enabled = true # Start the CDR Server service: <true|false>.
listen=test # CDRS's listening interface: <x.y.z.y:1234>.
extra_fields = test # Extra fields to store in CDRs
mediator = test # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
@@ -76,7 +75,6 @@ extra_fields = test # Field identifiers of the fields to add in extra fields s
[mediator]
enabled = true # Starts Mediator service: <true|false>.
listen=test # Mediator's listening interface: <internal>.
rater = test # Address where to reach the Rater: <internal|x.y.z.y:1234>
rater_reconnects = 99 # Number of reconnects to rater before giving up.
run_ids = test # Identifiers for each mediation run on CDRs
@@ -105,7 +103,6 @@ reconnects = 99 # Number of attempts on connect failure.
[history_server]
enabled = true # Starts History service: <true|false>.
listen = test # Listening addres for history server: <internal|x.y.z.y:1234>
history_dir = test # Location on disk where to store history files.
save_interval = 99 # Timeout duration between saves

96
engine/server.go Normal file
View File

@@ -0,0 +1,96 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2013 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"net/rpc/jsonrpc"
)
type Server struct {
rpcEnabled bool
httpEnabled bool
}
func (s *Server) RpcRegister(rcvr interface{}) {
rpc.Register(rcvr)
s.rpcEnabled = true
}
func (s *Server) RpcRegisterName(name string, rcvr interface{}) {
rpc.RegisterName(name, rcvr)
s.rpcEnabled = true
}
func (s *Server) RegisterHttpFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
http.HandleFunc(pattern, handler)
s.httpEnabled = true
}
func (s *Server) ServeJSON(addr string) {
if !s.rpcEnabled {
return
}
lJSON, e := net.Listen("tcp", addr)
if e != nil {
log.Fatal("listen error:", e)
}
for {
conn, err := lJSON.Accept()
if err != nil {
Logger.Err(fmt.Sprintf("<History> Accept error: %v", conn))
continue
}
Logger.Info(fmt.Sprintf("<History> New incoming connection: %v", conn.RemoteAddr()))
go jsonrpc.ServeConn(conn)
}
}
func (s *Server) ServeGOB(addr string) {
if !s.rpcEnabled {
return
}
lGOB, e := net.Listen("tcp", addr)
if e != nil {
log.Fatal("listen error:", e)
}
for {
conn, err := lGOB.Accept()
if err != nil {
Logger.Err(fmt.Sprintf("<History> Accept error: %v", conn))
continue
}
Logger.Info(fmt.Sprintf("<History> New incoming connection: %v", conn.RemoteAddr()))
go rpc.ServeConn(conn)
}
}
func (s *Server) ServeHTTP(addr string) {
if !s.httpEnabled {
return
}
http.ListenAndServe(addr, nil)
}

View File

@@ -18,11 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package history
import (
"errors"
"net/rpc"
"net/rpc/jsonrpc"
)
import "net/rpc"
const (
JSON = "json"
@@ -33,17 +29,8 @@ type ProxyScribe struct {
Client *rpc.Client
}
func NewProxyScribe(addr, encoding string) (*ProxyScribe, error) {
var client *rpc.Client
var err error
switch encoding {
case GOB:
client, err = rpc.Dial("tcp", addr)
case JSON:
client, err = jsonrpc.Dial("tcp", addr)
default:
err = errors.New("Hystory proxy scribe: Unknown encoding " + encoding)
}
func NewProxyScribe(addr string) (*ProxyScribe, error) {
client, err := rpc.Dial("tcp", addr)
if err != nil {
return nil, err