diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go
index ee8b859d7..2c29bbbcc 100644
--- a/cmd/cgr-rater/cgr-rater.go
+++ b/cmd/cgr-rater/cgr-rater.go
@@ -173,17 +173,7 @@ func listenToHttpRequests() {
http.ListenAndServe(stats_listen, nil)
}
-func startMediator(responder *timespans.Responder) {
- var db *sql.DB
- var err error
- if !mediator_skipdb {
- db, err = sql.Open(logging_db_type, fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", logging_db_host, logging_db_port, logging_db_db, logging_db_user, logging_db_password))
- //defer db.Close()
-
- if err != nil {
- timespans.Logger.Err(fmt.Sprintf("failed to open the database: %v", err))
- }
- }
+func startMediator(responder *timespans.Responder, db *sql.DB) {
var connector sessionmanager.Connector
if mediator_rater == INTERNAL {
connector = responder
@@ -205,7 +195,7 @@ func startMediator(responder *timespans.Responder) {
m.parseCSV()
}
-func startSessionManager(responder *timespans.Responder) {
+func startSessionManager(responder *timespans.Responder, db *sql.DB) {
var connector sessionmanager.Connector
if sm_rater == INTERNAL {
connector = responder
@@ -223,7 +213,7 @@ func startSessionManager(responder *timespans.Responder) {
}
connector = &sessionmanager.RPCClientConnector{client}
}
- sm := &sessionmanager.FSSessionManager{}
+ sm := sessionmanager.NewFSSessionManager(db)
sm.Connect(&sessionmanager.SessionDelegate{connector}, sm_freeswitch_server, sm_freeswitch_pass)
}
@@ -253,6 +243,18 @@ func main() {
defer getter.Close()
timespans.SetStorageGetter(getter)
+ db, err := sql.Open(logging_db_type, fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", logging_db_host, logging_db_port, logging_db_db, logging_db_user, logging_db_password))
+ if err != nil {
+ timespans.Logger.Err(fmt.Sprintf("Could not connect to logger database: %v", err))
+ }
+ if db != nil {
+ defer db.Close()
+ }
+
+ if err != nil {
+ timespans.Logger.Err(fmt.Sprintf("failed to open the database: %v", err))
+ }
+
if rater_enabled && rater_balancer != DISABLED && !balancer_enabled {
go registerToBalancer()
go stopRaterSingnalHandler()
@@ -283,11 +285,11 @@ func main() {
}
if sm_enabled {
- go startSessionManager(responder)
+ go startSessionManager(responder, db)
}
if mediator_enabled {
- go startMediator(responder)
+ go startMediator(responder, db)
}
<-exitChan
diff --git a/cmd/cgr-rater/scheduler.go b/cmd/cgr-rater/scheduler.go
index afadca763..78cb0142c 100644
--- a/cmd/cgr-rater/scheduler.go
+++ b/cmd/cgr-rater/scheduler.go
@@ -19,6 +19,7 @@ along with this program. If not, see
package main
import (
+ "fmt"
"github.com/cgrates/cgrates/timespans"
"log"
"sort"
@@ -68,7 +69,7 @@ func (s scheduler) loop() {
func loadActionTimings(storage timespans.StorageGetter) {
actionTimings, err := storage.GetAllActionTimings()
if err != nil {
- log.Fatalf("Cannot get action timings:", err)
+ timespans.Logger.Warning(fmt.Sprintf("Cannot get action timings: %v", err))
}
// recreate the queue
s.queue = timespans.ActionTimingPriotityList{}
diff --git a/cmd/stress/cgr-balancerstress/cgr-balancerstress.go b/cmd/stress/cgr-balancerstress/cgr-balancerstress.go
deleted file mode 100644
index 3ed049bef..000000000
--- a/cmd/stress/cgr-balancerstress/cgr-balancerstress.go
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-Rating system designed to be used in VoIP Carriers World
-Copyright (C) 2012 Radu Ioan Fericean
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package main
-
-import (
- "flag"
- "github.com/cgrates/cgrates/timespans"
- "log"
- "net/rpc"
- //"net/rpc/jsonrpc"
- "time"
-)
-
-var (
- balancer = flag.String("balancer", "localhost:2001", "balancer server address")
- runs = flag.Int("runs", 10000, "stress cycle number")
- parallel = flag.Bool("parallel", false, "run requests in parallel")
-)
-
-func main() {
- flag.Parse()
- t1 := time.Date(2012, time.February, 02, 17, 30, 0, 0, time.UTC)
- t2 := time.Date(2012, time.February, 02, 18, 30, 0, 0, time.UTC)
- cd := timespans.CallDescriptor{Direction: "OUT", TOR: "0", Tenant: "vdf", Subject: "rif", Destination: "0256", TimeStart: t1, TimeEnd: t2}
- result := timespans.CallCost{}
- client, err := rpc.Dial("tcp", *balancer)
- if err != nil {
- log.Fatalf("could not connect to balancer: %v", err)
- }
- start := time.Now()
- if *parallel {
- var divCall *rpc.Call
- for i := 0; i < *runs; i++ {
- divCall = client.Go("Responder.GetCost", cd, &result, nil)
- }
- <-divCall.Done
- } else {
- for j := 0; j < *runs; j++ {
- client.Call("Responder.GetCost", cd, &result)
- }
- }
- duration := time.Since(start)
- log.Println(result)
- client.Close()
- log.Printf("Elapsed: %v resulted: %v req/s.", duration, float64(*runs)/duration.Seconds())
-}
diff --git a/cmd/stress/cgr-raterstress/cgr-raterstress.go b/cmd/stress/cgr-raterstress/cgr-raterstress.go
index 880365ee1..5fdf7aae9 100644
--- a/cmd/stress/cgr-raterstress/cgr-raterstress.go
+++ b/cmd/stress/cgr-raterstress/cgr-raterstress.go
@@ -19,17 +19,20 @@ along with this program. If not, see
package main
import (
+ "flag"
"github.com/cgrates/cgrates/timespans"
"log"
"net/rpc"
+ //"net/rpc/jsonrpc"
"net/rpc/jsonrpc"
"time"
- "flag"
)
var (
- runs = flag.Int("runs", 10000, "stress cycle number")
- json = flag.Bool("json", false, "use JSON for RPC encoding")
+ balancer = flag.String("balancer", "localhost:2001", "balancer server address")
+ runs = flag.Int("runs", 10000, "stress cycle number")
+ parallel = flag.Bool("parallel", false, "run requests in parallel")
+ json = flag.Bool("json", false, "use JSON for RPC encoding")
)
func main() {
@@ -48,14 +51,20 @@ func main() {
if err != nil {
log.Panic("Could not connect to rater: ", err)
}
- i := 0
start := time.Now()
- for j := 0; j < *runs; j++ {
- client.Call("Responder.GetCost", cd, &result)
+ if *parallel {
+ var divCall *rpc.Call
+ for i := 0; i < *runs; i++ {
+ divCall = client.Go("Responder.GetCost", cd, &result, nil)
+ }
+ <-divCall.Done
+ } else {
+ for j := 0; j < *runs; j++ {
+ client.Call("Responder.GetCost", cd, &result)
+ }
}
duration := time.Since(start)
log.Println(result)
- log.Println(i)
- log.Printf("Elapsed: %v resulted: %v req/s.", duration, float64(*runs)/duration.Seconds())
client.Close()
+ log.Printf("Elapsed: %v resulted: %v req/s.", duration, float64(*runs)/duration.Seconds())
}
diff --git a/cmd/stress/cgr-balancerstress/cgr-balancerstress.py b/cmd/stress/cgr-raterstress/cgr-raterstress.py
similarity index 100%
rename from cmd/stress/cgr-balancerstress/cgr-balancerstress.py
rename to cmd/stress/cgr-raterstress/cgr-raterstress.py
diff --git a/cmd/stress/cgr-spansstress/cgr-spansstress.go b/cmd/stress/cgr-spansstress/cgr-spansstress.go
index 750971d15..c57a6494c 100644
--- a/cmd/stress/cgr-spansstress/cgr-spansstress.go
+++ b/cmd/stress/cgr-spansstress/cgr-spansstress.go
@@ -23,9 +23,9 @@ import (
"github.com/cgrates/cgrates/timespans"
"log"
"os"
+ "runtime"
"runtime/pprof"
"time"
- "runtime"
)
var (
@@ -59,7 +59,7 @@ func main() {
t2 := time.Date(2012, time.February, 02, 18, 30, 0, 0, time.UTC)
cd := timespans.CallDescriptor{Direction: "OUT", TOR: "0", Tenant: "vdf", Subject: "rif", Destination: "0256", TimeStart: t1, TimeEnd: t2}
- getter, err := timespans.NewRedisStorage("", 10)
+ getter, err := timespans.NewRedisStorage("", 10, "")
defer getter.Close()
timespans.SetStorageGetter(getter)
diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go
index 60cb3001a..5e944af2d 100644
--- a/sessionmanager/fssessionmanager.go
+++ b/sessionmanager/fssessionmanager.go
@@ -20,6 +20,7 @@ package sessionmanager
import (
"bufio"
+ "database/sql"
"fmt"
"log"
"net"
@@ -33,6 +34,11 @@ type FSSessionManager struct {
buf *bufio.Reader
sessions []*Session
sessionDelegate *SessionDelegate
+ postgresLogger *PostgresLogger
+}
+
+func NewFSSessionManager(db *sql.DB) *FSSessionManager {
+ return &FSSessionManager{postgresLogger: &PostgresLogger{db}}
}
// Connects to the freeswitch mod_event_socket server and starts
@@ -123,7 +129,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) {
s := sm.GetSession(ev.GetUUID())
if sm.sessionDelegate != nil {
sm.sessionDelegate.OnChannelHangupComplete(ev, s)
- s.SaveMOperations()
+ s.SaveOperations()
} else {
log.Print("HangupComplete")
}
@@ -142,3 +148,7 @@ func (sm *FSSessionManager) OnOther(ev Event) {
func (sm *FSSessionManager) GetSessionDelegate() *SessionDelegate {
return sm.sessionDelegate
}
+
+func (sm *FSSessionManager) GetDbLogger() *PostgresLogger {
+ return sm.postgresLogger
+}
diff --git a/sessionmanager/postgreslogger.go b/sessionmanager/postgreslogger.go
index e8ef66844..93243493d 100644
--- a/sessionmanager/postgreslogger.go
+++ b/sessionmanager/postgreslogger.go
@@ -31,19 +31,15 @@ type PostgresLogger struct {
db *sql.DB
}
-func NewPostgresLogger(dbName, user, pass string) *PostgresLogger {
- db, err := sql.Open("postgres", fmt.Sprintf("dbname=%s user=%s password=%s sslmode=disable", dbName, user, pass))
- if err != nil {
- log.Printf("Failed to open the database: %v", err)
- }
- return &PostgresLogger{db}
-}
-
func (psl *PostgresLogger) Close() {
psl.db.Close()
}
func (psl *PostgresLogger) Log(uuid string, cc *timespans.CallCost) {
+ if psl.db == nil {
+ timespans.Logger.Warning("Cannot write log to database.")
+ return
+ }
tss, err := json.Marshal(cc.Timespans)
if err != nil {
log.Printf("Error marshalling timespans to json: %v", err)
diff --git a/sessionmanager/session.go b/sessionmanager/session.go
index 1097752ec..ad53f6703 100644
--- a/sessionmanager/session.go
+++ b/sessionmanager/session.go
@@ -25,10 +25,6 @@ import (
"time"
)
-var (
- postgresLogger = NewPostgresLogger("gosqltest", "rif", "testus")
-)
-
// Session type holding the call information fields, a session delegate for specific
// actions and a channel to signal end of the debit loop.
type Session struct {
@@ -113,13 +109,13 @@ func (s *Session) String() string {
}
//
-func (s *Session) SaveMOperations() {
+func (s *Session) SaveOperations() {
go func() {
firstCC := s.CallCosts[0]
for _, cc := range s.CallCosts[1:] {
firstCC.Merge(cc)
}
- postgresLogger.Log(s.uuid, firstCC)
+ s.sessionManager.GetDbLogger().Log(s.uuid, firstCC)
log.Print(firstCC)
}()
}
diff --git a/sessionmanager/sessiondelegate.go b/sessionmanager/sessiondelegate.go
index cec163877..a8ddc641b 100644
--- a/sessionmanager/sessiondelegate.go
+++ b/sessionmanager/sessiondelegate.go
@@ -72,6 +72,9 @@ func (rsd *SessionDelegate) OnChannelAnswer(ev Event, s *Session) {
}
func (rsd *SessionDelegate) OnChannelHangupComplete(ev Event, s *Session) {
+ if len(s.CallCosts) == 0 {
+ return // why would we have 0 callcosts
+ }
lastCC := s.CallCosts[len(s.CallCosts)-1]
// put credit back
start := time.Now()
diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go
index a2aae17d2..e1d29fdbe 100644
--- a/sessionmanager/sessionmanager.go
+++ b/sessionmanager/sessionmanager.go
@@ -21,4 +21,5 @@ package sessionmanager
type SessionManager interface {
DisconnectSession(*Session)
GetSessionDelegate() *SessionDelegate
+ GetDbLogger() *PostgresLogger
}