From dcf8c427a9b772aeb5b51918ba849966ae181ad7 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Tue, 14 Aug 2012 00:51:48 +0300 Subject: [PATCH] started postgres storage --- cmd/cgr-rater/cgr-rater.go | 45 ++++--------- cmd/cgr-rater/mediator.go | 4 +- sessionmanager/fssessionmanager.go | 8 +-- sessionmanager/loggerdb.go | 28 -------- sessionmanager/mongologger.go | 54 --------------- sessionmanager/postgreslogger.go | 64 ------------------ sessionmanager/session.go | 2 +- sessionmanager/sessionmanager.go | 6 +- timespans/storage_interface.go | 2 + timespans/storage_map.go | 15 +++++ timespans/storage_mongo.go | 30 ++++++++- timespans/storage_postgres.go | 102 +++++++++++++++++++++++++++++ timespans/storage_redis.go | 14 ++++ 13 files changed, 184 insertions(+), 190 deletions(-) delete mode 100644 sessionmanager/loggerdb.go delete mode 100644 sessionmanager/mongologger.go delete mode 100644 sessionmanager/postgreslogger.go create mode 100644 timespans/storage_postgres.go diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index 981a6cd80..246f4a37d 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -28,7 +28,6 @@ import ( "github.com/cgrates/cgrates/sessionmanager" "github.com/cgrates/cgrates/timespans" "io" - "labix.org/v2/mgo" "net" "net/http" "net/rpc" @@ -38,14 +37,13 @@ import ( ) const ( - DISABLED = "disabled" - INTERNAL = "internal" - JSON = "json" - GOB = "gob" - POSTGRES = "postgres" - MONGO = "mongo" - MONGO_COLLECTION = "cdr" - FS = "freeswitch" + DISABLED = "disabled" + INTERNAL = "internal" + JSON = "json" + GOB = "gob" + POSTGRES = "postgres" + MONGO = "mongo" + FS = "freeswitch" ) var ( @@ -179,7 +177,7 @@ func listenToHttpRequests() { http.ListenAndServe(stats_listen, nil) } -func startMediator(responder *timespans.Responder, loggerDb sessionmanager.LogDb) { +func startMediator(responder *timespans.Responder, loggerDb timespans.StorageGetter) { var connector sessionmanager.Connector if mediator_rater == INTERNAL { connector = responder @@ -201,7 +199,7 @@ func startMediator(responder *timespans.Responder, loggerDb sessionmanager.LogDb m.parseCSV() } -func startSessionManager(responder *timespans.Responder, loggerDb sessionmanager.LogDb) { +func startSessionManager(responder *timespans.Responder, loggerDb timespans.StorageGetter) { var connector sessionmanager.Connector if sm_rater == INTERNAL { connector = responder @@ -260,7 +258,7 @@ func main() { defer getter.Close() timespans.SetStorageGetter(getter) - var loggerDb sessionmanager.LogDb + var loggerDb timespans.StorageGetter if logging_db_type != DISABLED { switch logging_db_type { case POSTGRES: @@ -271,28 +269,9 @@ func main() { if db != nil { defer db.Close() } - loggerDb = &sessionmanager.PostgresLogger{db} + loggerDb = ×pans.PostgresLogger{db} case MONGO: - dial := fmt.Sprintf(logging_db_host) - if logging_db_user != "" && logging_db_password != "" { - dial = fmt.Sprintf("%s:%s@%s", logging_db_user, logging_db_password, dial) - } - if logging_db_port != "" { - dial += ":" + logging_db_port - } - session, err := mgo.Dial(dial) - if err != nil { - timespans.Logger.Err(fmt.Sprintf("Could not connect to logger database: %v", err)) - } - if session != nil { - defer session.Close() - - // Optional. Switch the session to a monotonic behavior. - session.SetMode(mgo.Monotonic, true) - - c := session.DB(logging_db_name).C(MONGO_COLLECTION) - loggerDb = &sessionmanager.MongoLogger{c} - } + loggerDb, err = timespans.NewMongoStorage(logging_db_host, logging_db_port, logging_db_name, logging_db_user, logging_db_password) } } diff --git a/cmd/cgr-rater/mediator.go b/cmd/cgr-rater/mediator.go index 6d4007a8a..eb949f129 100644 --- a/cmd/cgr-rater/mediator.go +++ b/cmd/cgr-rater/mediator.go @@ -31,7 +31,7 @@ import ( type Mediator struct { Connector sessionmanager.Connector - loggerDb sessionmanager.LogDb + loggerDb timespans.StorageGetter SkipDb bool } @@ -67,7 +67,7 @@ func (m *Mediator) parseCSV() { func (m *Mediator) GetCostsFromDB(record []string) (cc *timespans.CallCost, err error) { searchedUUID := record[10] - cc, err = m.loggerDb.GetLog(searchedUUID) + cc, err = m.loggerDb.GetCallCostLog(searchedUUID) if err != nil { cc, err = m.GetCostsFromRater(record) } diff --git a/sessionmanager/fssessionmanager.go b/sessionmanager/fssessionmanager.go index 310aab069..b32abedb9 100644 --- a/sessionmanager/fssessionmanager.go +++ b/sessionmanager/fssessionmanager.go @@ -34,13 +34,13 @@ type FSSessionManager struct { buf *bufio.Reader sessions []*Session sessionDelegate *SessionDelegate - loggerDB LogDb + loggerDB timespans.StorageGetter address, pass string delayFunc func() int } -func NewFSSessionManager(ldb LogDb) *FSSessionManager { - return &FSSessionManager{loggerDB: ldb} +func NewFSSessionManager(storage timespans.StorageGetter) *FSSessionManager { + return &FSSessionManager{loggerDB: storage} } // Connects to the freeswitch mod_event_socket server and starts @@ -176,7 +176,7 @@ func (sm *FSSessionManager) GetSessionDelegate() *SessionDelegate { return sm.sessionDelegate } -func (sm *FSSessionManager) GetDbLogger() LogDb { +func (sm *FSSessionManager) GetDbLogger() timespans.StorageGetter { return sm.loggerDB } diff --git a/sessionmanager/loggerdb.go b/sessionmanager/loggerdb.go deleted file mode 100644 index 789d0b66c..000000000 --- a/sessionmanager/loggerdb.go +++ /dev/null @@ -1,28 +0,0 @@ -/* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012 Radu Ioan Fericean - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package sessionmanager - -import ( - "github.com/cgrates/cgrates/timespans" -) - -type LogDb interface { - Log(uuid string, cc *timespans.CallCost) - GetLog(uuid string)(*timespans.CallCost, error) -} \ No newline at end of file diff --git a/sessionmanager/mongologger.go b/sessionmanager/mongologger.go deleted file mode 100644 index f0891c373..000000000 --- a/sessionmanager/mongologger.go +++ /dev/null @@ -1,54 +0,0 @@ -/* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012 Radu Ioan Fericean - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package sessionmanager - -import ( - "fmt" - "github.com/cgrates/cgrates/timespans" - "labix.org/v2/mgo" - "labix.org/v2/mgo/bson" -) - -type LogEntry struct { - Id string `bson:"_id,omitempty"` - CallCost *timespans.CallCost -} - -type MongoLogger struct { - Col *mgo.Collection -} - -func (ml *MongoLogger) Log(uuid string, cc *timespans.CallCost) { - if ml.Col == nil { - //timespans.Logger.Warning("Cannot write log to database.") - return - } - - err := ml.Col.Insert(&LogEntry{uuid, cc}) - if err != nil { - timespans.Logger.Err(fmt.Sprintf("failed to execute insert statement: %v", err)) - } -} - -func (ml *MongoLogger) GetLog(uuid string)(cc *timespans.CallCost, err error) { - result := new(LogEntry) - err = ml.Col.Find(bson.M{"_id": uuid}).One(result) - cc = result.CallCost - return -} \ No newline at end of file diff --git a/sessionmanager/postgreslogger.go b/sessionmanager/postgreslogger.go deleted file mode 100644 index a778f15f7..000000000 --- a/sessionmanager/postgreslogger.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012 Radu Ioan Fericean - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package sessionmanager - -import ( - "database/sql" - "encoding/json" - "fmt" - "github.com/cgrates/cgrates/timespans" -) - -type PostgresLogger struct { - Db *sql.DB -} - -func (psl *PostgresLogger) Log(uuid string, cc *timespans.CallCost) { - if psl.Db == nil { - //timespans.Logger.Warning("Cannot write log to database.") - return - } - tss, err := json.Marshal(cc.Timespans) - if err != nil { - timespans.Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) - } - _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdr VALUES ('%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", - uuid, - cc.Direction, - cc.Tenant, - cc.TOR, - cc.Subject, - cc.Account, - cc.Destination, - cc.Cost, - cc.ConnectFee, - tss)) - if err != nil { - timespans.Logger.Err(fmt.Sprintf("failed to execute insert statement: %v", err)) - } -} - -func (psl *PostgresLogger) GetLog(uuid string) (cc *timespans.CallCost, err error) { - row := psl.Db.QueryRow(fmt.Sprintf("SELECT * FROM cdr WHERE uuid='%s'", uuid)) - var uuid_found string - var timespansJson string - err = row.Scan(&uuid_found, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson) - err = json.Unmarshal([]byte(timespansJson), cc.Timespans) - return -} \ No newline at end of file diff --git a/sessionmanager/session.go b/sessionmanager/session.go index 99b76be67..bcb63511a 100644 --- a/sessionmanager/session.go +++ b/sessionmanager/session.go @@ -126,7 +126,7 @@ func (s *Session) SaveOperations() { firstCC.Merge(cc) } if s.sessionManager.GetDbLogger() != nil { - s.sessionManager.GetDbLogger().Log(s.uuid, firstCC) + s.sessionManager.GetDbLogger().LogCallCost(s.uuid, firstCC) } timespans.Logger.Debug(firstCC.String()) }() diff --git a/sessionmanager/sessionmanager.go b/sessionmanager/sessionmanager.go index 19ceae2f0..d22c35ab1 100644 --- a/sessionmanager/sessionmanager.go +++ b/sessionmanager/sessionmanager.go @@ -18,8 +18,12 @@ along with this program. If not, see package sessionmanager +import ( + "github.com/cgrates/cgrates/timespans" +) + type SessionManager interface { DisconnectSession(*Session) GetSessionDelegate() *SessionDelegate - GetDbLogger() LogDb + GetDbLogger() timespans.StorageGetter } diff --git a/timespans/storage_interface.go b/timespans/storage_interface.go index 71a003ef8..c571a7b35 100644 --- a/timespans/storage_interface.go +++ b/timespans/storage_interface.go @@ -47,6 +47,8 @@ type StorageGetter interface { GetActionTimings(string) ([]*ActionTiming, error) SetActionTimings(string, []*ActionTiming) error GetAllActionTimings() (map[string][]*ActionTiming, error) + LogCallCost(uuid string, cc *CallCost) error + GetCallCostLog(uuid string) (*CallCost, error) } type Marshaler interface { diff --git a/timespans/storage_map.go b/timespans/storage_map.go index 3ac195068..3c5a1efa4 100644 --- a/timespans/storage_map.go +++ b/timespans/storage_map.go @@ -143,3 +143,18 @@ func (ms *MapStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, err return } + +func (ms *MapStorage) LogCallCost(uuid string, cc *CallCost) error { + result, err := ms.ms.Marshal(cc) + ms.dict[uuid] = result + return err +} + +func (ms *MapStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) { + if values, ok := ms.dict[uuid]; ok { + err = ms.ms.Unmarshal(values, &cc) + } else { + return nil, errors.New("not found") + } + return +} diff --git a/timespans/storage_mongo.go b/timespans/storage_mongo.go index 4d9b09e96..56cef3047 100644 --- a/timespans/storage_mongo.go +++ b/timespans/storage_mongo.go @@ -30,10 +30,17 @@ type MongoStorage struct { db *mgo.Database } -func NewMongoStorage(address, db string) (StorageGetter, error) { - session, err := mgo.Dial(address) +func NewMongoStorage(host, port, db, user, pass string) (StorageGetter, error) { + dial := fmt.Sprintf(host) + if user != "" && pass != "" { + dial = fmt.Sprintf("%s:%s@%s", user, pass, dial) + } + if port != "" { + dial += ":" + port + } + session, err := mgo.Dial(dial) if err != nil { - Logger.Err("Could not contact mongo server") + Logger.Err(fmt.Sprintf("Could not connect to logger database: %v", err)) return nil, err } ndb := session.DB(db) @@ -96,6 +103,11 @@ type AtKeyValue struct { Value []*ActionTiming } +type LogEntry struct { + Id string `bson:"_id,omitempty"` + CallCost *CallCost +} + func (ms *MongoStorage) GetActivationPeriodsOrFallback(key string) ([]*ActivationPeriod, string, error) { result := new(ApKeyValue) err := ms.db.C("activationperiods").Find(bson.M{"key": key}).One(&result) @@ -158,3 +170,15 @@ func (ms *MongoStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, e } return } + +func (ms *MongoStorage) LogCallCost(uuid string, cc *CallCost) error { + return ms.db.C("cclog").Insert(&LogEntry{uuid, cc}) + +} + +func (ms *MongoStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) { + result := new(LogEntry) + err = ms.db.C("cclog").Find(bson.M{"_id": uuid}).One(result) + cc = result.CallCost + return +} diff --git a/timespans/storage_postgres.go b/timespans/storage_postgres.go new file mode 100644 index 000000000..3e62250f1 --- /dev/null +++ b/timespans/storage_postgres.go @@ -0,0 +1,102 @@ +/* +Rating system designed to be used in VoIP Carriers World +Copyright (C) 2012 Radu Ioan Fericean + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package timespans + +import ( + "database/sql" + "encoding/json" + "fmt" +) + +type PostgresStorage struct { + Db *sql.DB +} + +func (psl *PostgresStorage) Close() {} + +func (psl *PostgresStorage) Flush() (err error) { + return +} + +func (psl *PostgresStorage) GetActivationPeriodsOrFallback(string) (aps []*ActivationPeriod, fallback string, err error) { + return +} + +func (psl *PostgresStorage) SetActivationPeriodsOrFallback(key string, aps []*ActivationPeriod, fallback string) (err error) { + return +} + +func (psl *PostgresStorage) GetDestination(string) (d *Destination, err error) { + return +} + +func (psl *PostgresStorage) SetDestination(d *Destination) (err error) { + return +} + +func (psl *PostgresStorage) GetActions(string) (as []*Action, err error) { + return +} + +func (psl *PostgresStorage) SetActions(key string, as []*Action) (err error) { return } + +func (psl *PostgresStorage) GetUserBalance(string) (ub *UserBalance, err error) { return } + +func (psl *PostgresStorage) SetUserBalance(ub *UserBalance) (err error) { return } + +func (psl *PostgresStorage) GetActionTimings(key string) (ats []*ActionTiming, err error) { return } + +func (psl *PostgresStorage) SetActionTimings(key string, ats []*ActionTiming) (err error) { return } + +func (psl *PostgresStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, err error) { return } + +func (psl *PostgresStorage) LogCallCost(uuid string, cc *CallCost) (err error) { + if psl.Db == nil { + //timespans.Logger.Warning("Cannot write log to database.") + return + } + tss, err := json.Marshal(cc.Timespans) + if err != nil { + Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) + } + _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdr VALUES ('%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", + uuid, + cc.Direction, + cc.Tenant, + cc.TOR, + cc.Subject, + cc.Account, + cc.Destination, + cc.Cost, + cc.ConnectFee, + tss)) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute insert statement: %v", err)) + } + return +} + +func (psl *PostgresStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) { + row := psl.Db.QueryRow(fmt.Sprintf("SELECT * FROM cdr WHERE uuid='%s'", uuid)) + var uuid_found string + var timespansJson string + err = row.Scan(&uuid_found, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson) + err = json.Unmarshal([]byte(timespansJson), cc.Timespans) + return +} diff --git a/timespans/storage_redis.go b/timespans/storage_redis.go index 50424814d..9574e3984 100644 --- a/timespans/storage_redis.go +++ b/timespans/storage_redis.go @@ -147,3 +147,17 @@ func (rs *RedisStorage) GetAllActionTimings() (ats map[string][]*ActionTiming, e return } + +func (rs *RedisStorage) LogCallCost(uuid string, cc *CallCost) (err error) { + result, err := rs.ms.Marshal(cc) + return rs.db.Set(uuid, result) +} + +func (rs *RedisStorage) GetCallCostLog(uuid string) (cc *CallCost, err error) { + if values, err := rs.db.Get(uuid); err == nil { + err = rs.ms.Unmarshal(values, &cc) + } else { + return nil, err + } + return +}