worked on mediator, updated config tests

This commit is contained in:
Radu Ioan Fericean
2012-07-27 17:44:18 +03:00
parent 98967149b7
commit 87f208811e
6 changed files with 120 additions and 79 deletions

View File

@@ -64,11 +64,14 @@ var (
mediator_enabled = false
mediator_cdr_file = "Master.csv" // Freeswitch Master CSV CDR file.
mediator_result_file = "out.csv" // Generated file containing CDR and price info.
mediator_rater = INTERNAL // address where to access rater. Can be internal, direct rater address or the address of a balancer
mediator_host = "localhost" // The host to connect to. Values that start with / are for UNIX domain sockets.
mediator_port = "5432" // The port to bind to.
mediator_db = "cgrates" // The name of the database to connect to.
mediator_user = "" // The user to sign in as.
mediator_password = "" // The user's password.
mediator_json = false // use JSON for RPC encoding
mediator_skipdb = false
stats_enabled = false
stats_listen = "127.0.0.1:8000" // Web server address (for stat reports)
@@ -107,11 +110,14 @@ func readConfig(configFn string) {
mediator_enabled, _ = c.GetBool("mediator", "enabled")
mediator_cdr_file, _ = c.GetString("mediator", "cdr_file")
mediator_result_file, _ = c.GetString("mediator", "result_file")
mediator_host, _ = c.GetString("mediator", "host")
mediator_port, _ = c.GetString("mediator", "port")
mediator_db, _ = c.GetString("mediator", "db")
mediator_user, _ = c.GetString("mediator", "user")
mediator_password, _ = c.GetString("mediator", "password")
mediator_rater, _ = c.GetString("mediator", "rater")
mediator_host, _ = c.GetString("mediator", "db_host")
mediator_port, _ = c.GetString("mediator", "db_port")
mediator_db, _ = c.GetString("mediator", "db_name")
mediator_user, _ = c.GetString("mediator", "db_user")
mediator_password, _ = c.GetString("mediator", "db_passwd")
mediator_json, _ = c.GetBool("mediator", "json")
mediator_skipdb, _ = c.GetBool("mediator", "skipdb")
stats_enabled, _ = c.GetBool("stats_server", "enabled")
stats_listen, _ = c.GetString("stats_server", "listen")
@@ -157,6 +163,33 @@ func listenToHttpRequests() {
http.ListenAndServe(stats_listen, nil)
}
func startMediator(responder *timespans.Responder) {
db, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", mediator_host, mediator_port, mediator_db, mediator_user, mediator_password))
//defer db.Close()
if err != nil {
timespans.Logger.Err(fmt.Sprintf("failed to open the database: %v", err))
}
var connector sessionmanager.Connector
if mediator_rater == INTERNAL {
connector = responder
} else {
var client *rpc.Client
var err error
if mediator_json {
client, err = jsonrpc.Dial("tcp", mediator_rater)
} else {
client, err = rpc.Dial("tcp", mediator_rater)
}
if err != nil {
timespans.Logger.Crit(fmt.Sprintf("Could not connect to rater: %v", err))
exitChan <- true
}
connector = &sessionmanager.RPCClientConnector{client}
}
m := &Mediator{connector, db, mediator_skipdb}
m.parseCSV()
}
func startSessionManager(responder *timespans.Responder) {
var connector sessionmanager.Connector
if sm_rater == INTERNAL {
@@ -240,30 +273,7 @@ func main() {
}
if mediator_enabled {
db, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", mediator_host, mediator_port, mediator_db, mediator_user, mediator_password))
//defer db.Close()
if err != nil {
timespans.Logger.Err(fmt.Sprintf("failed to open the database: %v", err))
}
var connector sessionmanager.Connector
if sm_rater == INTERNAL {
connector = responder
} else {
var client *rpc.Client
var err error
if sm_json {
client, err = jsonrpc.Dial("tcp", sm_rater)
} else {
client, err = rpc.Dial("tcp", sm_rater)
}
if err != nil {
timespans.Logger.Crit(fmt.Sprintf("Could not connect to rater: %v", err))
exitChan <- true
}
connector = &sessionmanager.RPCClientConnector{client}
}
m := &Mediator{connector, db}
_ = m
go startMediator(responder)
}
<-exitChan

View File

@@ -22,6 +22,7 @@ import (
"bufio"
"database/sql"
"encoding/csv"
"encoding/json"
"flag"
"fmt"
_ "github.com/bmizerany/pq"
@@ -35,6 +36,7 @@ import (
type Mediator struct {
Connector sessionmanager.Connector
Db *sql.DB
SkipDb bool
}
/*func readDbRecord(db *sql.DB, searchedUUID string) (cc *timespans.CallCost, timespansText string, err error) {
@@ -50,19 +52,32 @@ func (m *Mediator) parseCSV() {
}
csvReader := csv.NewReader(bufio.NewReader(file))
for record, err := csvReader.Read(); err == nil; record, err = csvReader.Read() {
uuid := record[10]
_ = uuid
t, _ := time.Parse("2012-05-21 17:48:20", record[5])
fmt.Println(t)
for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() {
//t, _ := time.Parse("2012-05-21 17:48:20", record[5])
var cc *timespans.CallCost
if !m.SkipDb {
cc, err = m.GetCostsFromDB(record)
} else {
cc, err = m.GetCostsFromRater(record)
}
if err != nil {
timespans.Logger.Err(fmt.Sprintf("Could not get the cost for mediator record (%v): %v", record, err))
}
fmt.Println(cc)
}
}
func (m *Mediator) GetCostsFromDB(searchedUUID string) (cc *timespans.CallCost, timespansText string, err error) {
func (m *Mediator) GetCostsFromDB(record []string) (cc *timespans.CallCost, err error) {
searchedUUID := record[10]
row := m.Db.QueryRow(fmt.Sprintf("SELECT * FROM callcosts WHERE uuid='%s'", searchedUUID))
var uuid string
cc = &timespans.CallCost{}
err = row.Scan(&uuid, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, &timespansText)
var timespansJson string
err = row.Scan(&uuid, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, &timespansJson)
err = json.Unmarshal([]byte(timespansJson), cc.Timespans)
if err != nil {
cc, err = m.GetCostsFromRater(record)
}
return
}

View File

@@ -29,56 +29,63 @@ func TestConfig(t *testing.T) {
redis_db != 1 ||
rater_enabled != true ||
rater_standalone != true ||
rater_balancer_server != "test" ||
rater_balancer != "test" ||
rater_listen != "test" ||
rater_json != true ||
balancer_enabled != true ||
balancer_standalone != true ||
balancer_listen_rater != "test" ||
balancer_listen_api != "test" ||
balancer_web_status_server != "test" ||
balancer_listen != "test" ||
balancer_json != true ||
scheduler_enabled != true ||
sm_enabled != true ||
sm_api_server != "test" ||
sm_rater != "test" ||
sm_freeswitch_server != "test" ||
sm_freeswitch_pass != "test" ||
sm_json != true ||
mediator_enabled != true ||
mediator_cdr_file != "test" ||
mediator_result_file != "test" ||
mediator_rater != "test" ||
mediator_host != "test" ||
mediator_port != "test" ||
mediator_db != "test" ||
mediator_user != "test" ||
mediator_password != "test" {
mediator_password != "test" ||
mediator_json != true ||
mediator_skipdb != true ||
stats_enabled != true ||
stats_listen != "test" {
t.Log(redis_server)
t.Log(redis_db)
t.Log(rater_enabled)
t.Log(rater_standalone)
t.Log(rater_balancer_server)
t.Log(rater_balancer)
t.Log(rater_listen)
t.Log(rater_json)
t.Log(balancer_enabled)
t.Log(balancer_standalone)
t.Log(balancer_listen_rater)
t.Log(balancer_listen_api)
t.Log(balancer_listen)
t.Log(balancer_json)
t.Log(scheduler_enabled)
t.Log(sm_enabled)
t.Log(sm_api_server)
t.Log(sm_rater)
t.Log(sm_freeswitch_server)
t.Log(sm_freeswitch_pass)
t.Log(sm_json)
t.Log(mediator_enabled)
t.Log(mediator_cdr_file)
t.Log(mediator_result_file)
t.Log(mediator_rater)
t.Log(mediator_host)
t.Log(mediator_port)
t.Log(mediator_db)
t.Log(mediator_user)
t.Log(mediator_password)
t.Log(stats_enabled)
t.Log(stats_listen)
t.Error("Config file read failed!")
}