From 21a94a849936e4cf6c196adfdad920d724d74fba Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 31 Jul 2012 13:05:25 +0300 Subject: [PATCH 1/4] using global logger database --- cmd/cgr-rater/cgr-rater.go | 32 ++++++++++++++++-------------- sessionmanager/fssessionmanager.go | 12 ++++++++++- sessionmanager/postgreslogger.go | 8 -------- sessionmanager/session.go | 8 ++------ sessionmanager/sessionmanager.go | 1 + 5 files changed, 31 insertions(+), 30 deletions(-) diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index ee8b859d7..2c29bbbcc 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -173,17 +173,7 @@ func listenToHttpRequests() { http.ListenAndServe(stats_listen, nil) } -func startMediator(responder *timespans.Responder) { - var db *sql.DB - var err error - if !mediator_skipdb { - db, err = sql.Open(logging_db_type, fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", logging_db_host, logging_db_port, logging_db_db, logging_db_user, logging_db_password)) - //defer db.Close() - - if err != nil { - timespans.Logger.Err(fmt.Sprintf("failed to open the database: %v", err)) - } - } +func startMediator(responder *timespans.Responder, db *sql.DB) { var connector sessionmanager.Connector if mediator_rater == INTERNAL { connector = responder @@ -205,7 +195,7 @@ func startMediator(responder *timespans.Responder) { m.parseCSV() } -func startSessionManager(responder *timespans.Responder) { +func startSessionManager(responder *timespans.Responder, db *sql.DB) { var connector sessionmanager.Connector if sm_rater == INTERNAL { connector = responder @@ -223,7 +213,7 @@ func startSessionManager(responder *timespans.Responder) { } connector = &sessionmanager.RPCClientConnector{client} } - sm := &sessionmanager.FSSessionManager{} + sm := sessionmanager.NewFSSessionManager(db) sm.Connect(&sessionmanager.SessionDelegate{connector}, sm_freeswitch_server, sm_freeswitch_pass) } @@ -253,6 +243,18 @@ func main() { defer getter.Close() timespans.SetStorageGetter(getter) + db, err := sql.Open(logging_db_type, fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", logging_db_host, logging_db_port, logging_db_db, logging_db_user, logging_db_password)) + if err != nil { + timespans.Logger.Err(fmt.Sprintf("Could not connect to logger database: %v", err)) + } + if db != nil { + defer db.Close() + } + + if err != nil { + timespans.Logger.Err(fmt.Sprintf("failed to open the database: %v", err)) + } + if rater_enabled && rater_balancer != DISABLED && !balancer_enabled { go registerToBalancer() go stopRaterSingnalHandler() @@ -283,11 +285,11 @@ func main() { } if sm_enabled { - go startSessionManager(responder) + go startSessionManager(responder, db) } if mediator_enabled { - go startMediator(responder) + go startMediator(responder, db) } <-exitChan diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 60cb3001a..4c01af504 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -20,6 +20,7 @@ package sessionmanager import ( "bufio" + "database/sql" "fmt" "log" "net" @@ -33,6 +34,11 @@ type FSSessionManager struct { buf *bufio.Reader sessions []*Session sessionDelegate *SessionDelegate + postgresLogger *PostgresLogger //NewPostgresLogger("gosqltest", "rif", "testus") +} + +func NewFSSessionManager(db *sql.DB) *FSSessionManager { + return &FSSessionManager{postgresLogger: &PostgresLogger{db}} } // Connects to the freeswitch mod_event_socket server and starts @@ -123,7 +129,7 @@ func (sm *FSSessionManager) OnChannelHangupComplete(ev Event) { s := sm.GetSession(ev.GetUUID()) if sm.sessionDelegate != nil { sm.sessionDelegate.OnChannelHangupComplete(ev, s) - s.SaveMOperations() + s.SaveOperations() } else { log.Print("HangupComplete") } @@ -142,3 +148,7 @@ func (sm *FSSessionManager) OnOther(ev Event) { func (sm *FSSessionManager) GetSessionDelegate() *SessionDelegate { return sm.sessionDelegate } + +func (sm *FSSessionManager) GetDbLogger() *PostgresLogger { + return sm.postgresLogger +} diff --git a/sessionmanager/postgreslogger.go b/sessionmanager/postgreslogger.go index e8ef66844..a577e8b3a 100644 --- a/sessionmanager/postgreslogger.go +++ b/sessionmanager/postgreslogger.go @@ -31,14 +31,6 @@ type PostgresLogger struct { db *sql.DB } -func NewPostgresLogger(dbName, user, pass string) *PostgresLogger { - db, err := sql.Open("postgres", fmt.Sprintf("dbname=%s user=%s password=%s sslmode=disable", dbName, user, pass)) - if err != nil { - log.Printf("Failed to open the database: %v", err) - } - return &PostgresLogger{db} -} - func (psl *PostgresLogger) Close() { psl.db.Close() } diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 1097752ec..ad53f6703 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -25,10 +25,6 @@ import ( "time" ) -var ( - postgresLogger = NewPostgresLogger("gosqltest", "rif", "testus") -) - // Session type holding the call information fields, a session delegate for specific // actions and a channel to signal end of the debit loop. type Session struct { @@ -113,13 +109,13 @@ func (s *Session) String() string { } // -func (s *Session) SaveMOperations() { +func (s *Session) SaveOperations() { go func() { firstCC := s.CallCosts[0] for _, cc := range s.CallCosts[1:] { firstCC.Merge(cc) } - postgresLogger.Log(s.uuid, firstCC) + s.sessionManager.GetDbLogger().Log(s.uuid, firstCC) log.Print(firstCC) }() } diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index a2aae17d2..e1d29fdbe 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -21,4 +21,5 @@ package sessionmanager type SessionManager interface { DisconnectSession(*Session) GetSessionDelegate() *SessionDelegate + GetDbLogger() *PostgresLogger } From 2a1c3276ffc629ce0ba0bf00967586c7e3371c52 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 31 Jul 2012 14:18:48 +0300 Subject: [PATCH 2/4] updated spanstress --- cmd/stress/cgr-spansstress/cgr-spansstress.go | 4 ++-- data/test.config | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cmd/stress/cgr-spansstress/cgr-spansstress.go b/cmd/stress/cgr-spansstress/cgr-spansstress.go index 750971d15..c57a6494c 100644 --- a/cmd/stress/cgr-spansstress/cgr-spansstress.go +++ b/cmd/stress/cgr-spansstress/cgr-spansstress.go @@ -23,9 +23,9 @@ import ( "github.com/cgrates/cgrates/timespans" "log" "os" + "runtime" "runtime/pprof" "time" - "runtime" ) var ( @@ -59,7 +59,7 @@ func main() { t2 := time.Date(2012, time.February, 02, 18, 30, 0, 0, time.UTC) cd := timespans.CallDescriptor{Direction: "OUT", TOR: "0", Tenant: "vdf", Subject: "rif", Destination: "0256", TimeStart: t1, TimeEnd: t2} - getter, err := timespans.NewRedisStorage("", 10) + getter, err := timespans.NewRedisStorage("", 10, "") defer getter.Close() timespans.SetStorageGetter(getter) diff --git a/data/test.config b/data/test.config index c47240289..7a50b3a3c 100644 --- a/data/test.config +++ b/data/test.config @@ -17,7 +17,6 @@ [global] redis_server = test #redis address host:port redis_db = 1 # redis database number -redis_user = test redis_pass = test db_type = test # db_host = test # The host to connect to. Values that start with / are for UNIX domain sockets. From 2ceb3307035e5ab91fd59bab3ddb2cb53227f0f2 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 31 Jul 2012 14:33:51 +0300 Subject: [PATCH 3/4] merged stress tools and warning for scheduler --- cmd/cgr-rater/scheduler.go | 3 +- .../cgr-balancerstress/cgr-balancerstress.go | 62 ------------------- cmd/stress/cgr-raterstress/cgr-raterstress.go | 26 +++++--- .../cgr-raterstress.py} | 0 4 files changed, 19 insertions(+), 72 deletions(-) delete mode 100644 cmd/stress/cgr-balancerstress/cgr-balancerstress.go rename cmd/stress/{cgr-balancerstress/cgr-balancerstress.py => cgr-raterstress/cgr-raterstress.py} (100%) diff --git a/cmd/cgr-rater/scheduler.go b/cmd/cgr-rater/scheduler.go index afadca763..78cb0142c 100644 --- a/cmd/cgr-rater/scheduler.go +++ b/cmd/cgr-rater/scheduler.go @@ -19,6 +19,7 @@ along with this program. If not, see package main import ( + "fmt" "github.com/cgrates/cgrates/timespans" "log" "sort" @@ -68,7 +69,7 @@ func (s scheduler) loop() { func loadActionTimings(storage timespans.StorageGetter) { actionTimings, err := storage.GetAllActionTimings() if err != nil { - log.Fatalf("Cannot get action timings:", err) + timespans.Logger.Warning(fmt.Sprintf("Cannot get action timings: %v", err)) } // recreate the queue s.queue = timespans.ActionTimingPriotityList{} diff --git a/cmd/stress/cgr-balancerstress/cgr-balancerstress.go b/cmd/stress/cgr-balancerstress/cgr-balancerstress.go deleted file mode 100644 index 3ed049bef..000000000 --- a/cmd/stress/cgr-balancerstress/cgr-balancerstress.go +++ /dev/null @@ -1,62 +0,0 @@ -/* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012 Radu Ioan Fericean - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package main - -import ( - "flag" - "github.com/cgrates/cgrates/timespans" - "log" - "net/rpc" - //"net/rpc/jsonrpc" - "time" -) - -var ( - balancer = flag.String("balancer", "localhost:2001", "balancer server address") - runs = flag.Int("runs", 10000, "stress cycle number") - parallel = flag.Bool("parallel", false, "run requests in parallel") -) - -func main() { - flag.Parse() - t1 := time.Date(2012, time.February, 02, 17, 30, 0, 0, time.UTC) - t2 := time.Date(2012, time.February, 02, 18, 30, 0, 0, time.UTC) - cd := timespans.CallDescriptor{Direction: "OUT", TOR: "0", Tenant: "vdf", Subject: "rif", Destination: "0256", TimeStart: t1, TimeEnd: t2} - result := timespans.CallCost{} - client, err := rpc.Dial("tcp", *balancer) - if err != nil { - log.Fatalf("could not connect to balancer: %v", err) - } - start := time.Now() - if *parallel { - var divCall *rpc.Call - for i := 0; i < *runs; i++ { - divCall = client.Go("Responder.GetCost", cd, &result, nil) - } - <-divCall.Done - } else { - for j := 0; j < *runs; j++ { - client.Call("Responder.GetCost", cd, &result) - } - } - duration := time.Since(start) - log.Println(result) - client.Close() - log.Printf("Elapsed: %v resulted: %v req/s.", duration, float64(*runs)/duration.Seconds()) -} diff --git a/cmd/stress/cgr-raterstress/cgr-raterstress.go b/cmd/stress/cgr-raterstress/cgr-raterstress.go index 880365ee1..340475790 100644 --- a/cmd/stress/cgr-raterstress/cgr-raterstress.go +++ b/cmd/stress/cgr-raterstress/cgr-raterstress.go @@ -19,17 +19,19 @@ along with this program. If not, see package main import ( + "flag" "github.com/cgrates/cgrates/timespans" "log" "net/rpc" - "net/rpc/jsonrpc" + //"net/rpc/jsonrpc" "time" - "flag" ) var ( - runs = flag.Int("runs", 10000, "stress cycle number") - json = flag.Bool("json", false, "use JSON for RPC encoding") + balancer = flag.String("balancer", "localhost:2001", "balancer server address") + runs = flag.Int("runs", 10000, "stress cycle number") + parallel = flag.Bool("parallel", false, "run requests in parallel") + json = flag.Bool("json", false, "use JSON for RPC encoding") ) func main() { @@ -48,14 +50,20 @@ func main() { if err != nil { log.Panic("Could not connect to rater: ", err) } - i := 0 start := time.Now() - for j := 0; j < *runs; j++ { - client.Call("Responder.GetCost", cd, &result) + if *parallel { + var divCall *rpc.Call + for i := 0; i < *runs; i++ { + divCall = client.Go("Responder.GetCost", cd, &result, nil) + } + <-divCall.Done + } else { + for j := 0; j < *runs; j++ { + client.Call("Responder.GetCost", cd, &result) + } } duration := time.Since(start) log.Println(result) - log.Println(i) - log.Printf("Elapsed: %v resulted: %v req/s.", duration, float64(*runs)/duration.Seconds()) client.Close() + log.Printf("Elapsed: %v resulted: %v req/s.", duration, float64(*runs)/duration.Seconds()) } diff --git a/cmd/stress/cgr-balancerstress/cgr-balancerstress.py b/cmd/stress/cgr-raterstress/cgr-raterstress.py similarity index 100% rename from cmd/stress/cgr-balancerstress/cgr-balancerstress.py rename to cmd/stress/cgr-raterstress/cgr-raterstress.py From eed45bdb0e2c1c7a9abf0135374dcd1cfbb53017 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 31 Jul 2012 14:51:59 +0300 Subject: [PATCH 4/4] return o zero call costs --- cmd/stress/cgr-raterstress/cgr-raterstress.go | 1 + sessionmanager/fssessionmanager.go | 2 +- sessionmanager/postgreslogger.go | 4 ++++ sessionmanager/sessiondelegate.go | 3 +++ 4 files changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/stress/cgr-raterstress/cgr-raterstress.go b/cmd/stress/cgr-raterstress/cgr-raterstress.go index 340475790..5fdf7aae9 100644 --- a/cmd/stress/cgr-raterstress/cgr-raterstress.go +++ b/cmd/stress/cgr-raterstress/cgr-raterstress.go @@ -24,6 +24,7 @@ import ( "log" "net/rpc" //"net/rpc/jsonrpc" + "net/rpc/jsonrpc" "time" ) diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 4c01af504..5e944af2d 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -34,7 +34,7 @@ type FSSessionManager struct { buf *bufio.Reader sessions []*Session sessionDelegate *SessionDelegate - postgresLogger *PostgresLogger //NewPostgresLogger("gosqltest", "rif", "testus") + postgresLogger *PostgresLogger } func NewFSSessionManager(db *sql.DB) *FSSessionManager { diff --git a/sessionmanager/postgreslogger.go b/sessionmanager/postgreslogger.go index a577e8b3a..93243493d 100644 --- a/sessionmanager/postgreslogger.go +++ b/sessionmanager/postgreslogger.go @@ -36,6 +36,10 @@ func (psl *PostgresLogger) Close() { } func (psl *PostgresLogger) Log(uuid string, cc *timespans.CallCost) { + if psl.db == nil { + timespans.Logger.Warning("Cannot write log to database.") + return + } tss, err := json.Marshal(cc.Timespans) if err != nil { log.Printf("Error marshalling timespans to json: %v", err) diff --git a/sessionmanager/sessiondelegate.go b/sessionmanager/sessiondelegate.go index cec163877..a8ddc641b 100644 --- a/sessionmanager/sessiondelegate.go +++ b/sessionmanager/sessiondelegate.go @@ -72,6 +72,9 @@ func (rsd *SessionDelegate) OnChannelAnswer(ev Event, s *Session) { } func (rsd *SessionDelegate) OnChannelHangupComplete(ev Event, s *Session) { + if len(s.CallCosts) == 0 { + return // why would we have 0 callcosts + } lastCC := s.CallCosts[len(s.CallCosts)-1] // put credit back start := time.Now()