From 6345b78a445882452b581c098d42ddf5bf54f0a6 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sun, 26 May 2013 18:24:05 +0300 Subject: [PATCH 01/10] cdr server capturing events --- cdrs/cdrs.go | 9 ++++++--- cdrs/fscdr.go | 4 ++-- cmd/cgr-rater/cgr-rater.go | 3 +-- mediator/mediator.go | 27 +++++++++++++++++++++++++-- rater/calldesc.go | 1 + rater/cdr.go | 2 +- rater/storage_gosexy.go | 8 ++++++++ rater/storage_interface.go | 2 ++ rater/storage_map.go | 8 ++++++++ rater/storage_mongo.go | 8 ++++++++ rater/storage_mysql.go | 8 ++++++++ rater/storage_postgres.go | 8 ++++++++ rater/storage_redigo.go | 8 ++++++++ rater/storage_redis.go | 8 ++++++++ 14 files changed, 94 insertions(+), 10 deletions(-) diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index 0e83a4b8f..4e8193562 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -37,9 +37,12 @@ var ( func cdrHandler(w http.ResponseWriter, r *http.Request) { body, _ := ioutil.ReadAll(r.Body) if fsCdr, err := new(FSCdr).New(body); err == nil { - log.Printf("CDR: %v", fsCdr) - //storage.SetCdr(fsCdr) - //medi.MediateCdrFromDB(fsCdr.GetAccount(), storage) + storage.SetCdr(fsCdr) + if cfg.CDRSMediator == 'internal' { + medi.MediateCdrFromDB(fsCdr.GetAccount(), storage) + } else { + //TODO: use the connection to mediator + } } else { rater.Logger.Err(fmt.Sprintf("Could not create CDR entry: %v", err)) } diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go index df01dfac2..fefbec562 100644 --- a/cdrs/fscdr.go +++ b/cdrs/fscdr.go @@ -107,8 +107,8 @@ func (fsCdr FSCdr) GetExtraParameters() string { func (fsCdr FSCdr) GetFallbackSubj() string { return cfg.DefaultSubject } -func (fsCdr FSCdr) GetStartTime(field string) (t time.Time, err error) { - st, err := strconv.ParseInt(fsCdr[field], 0, 64) +func (fsCdr FSCdr) GetStartTime() (t time.Time, err error) { + st, err := strconv.ParseInt(fsCdr[START_TIME], 0, 64) t = time.Unix(0, st*1000) return } diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index fe1b847ce..d7aaadf8f 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -317,8 +317,7 @@ func main() { rater.Logger.Info("Starting CGRateS Mediator.") go startMediator(responder, loggerDb) } - - if cfg.CDRSListen!="" { + if cfg.CDRSListen != "" { rater.Logger.Info("Starting CGRateS CDR Server.") cs := cdrs.New(loggerDb, medi, cfg) go cs.StartCapturingCDRs() diff --git a/mediator/mediator.go b/mediator/mediator.go index 2b8c806f8..56a95cde6 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -223,6 +223,29 @@ func (m *Mediator) getCostsFromRater(record []string, runIdx int) (cc *rater.Cal return } -func (m *Mediator) MediateCdrFromDB(cdrID string, db rater.DataStorage) error { - return nil +/* Calculates price for the specified cdr and writes the new cdr with price to +the storage. If the cdr is nil then it will fetch it from the storage. */ +func (m *Mediator) MediateCdrFromDB(cdrID string, cdr rater.CDR, db rater.DataStorage) error { + cc := &rater.CallCost{} + startTime, err := cdr.GetStartTime() + if err != nil { + return err + } + endTime, err := cdr.GetEndTime() + if err != nil { + return err + } + cd := rater.CallDescriptor{ + Direction: cdr.GetDirection(), + Tenant: cdr.GetTenant(), + TOR: cdr.GetTOR(), + Subject: cdr.GetSubject(), + Account: cdr.GetAccount(), + Destination: cdr.GetDestination(), + TimeStart: startTime, + TimeEnd: endTime} + if err := m.connector.GetCost(cd, cc); err != nil { + return err + } + return db.SetMediatedCdr(cdr, cc) } diff --git a/rater/calldesc.go b/rater/calldesc.go index 62e7d1acd..ca5c2711a 100644 --- a/rater/calldesc.go +++ b/rater/calldesc.go @@ -33,6 +33,7 @@ func init() { Logger, err = syslog.New(syslog.LOG_INFO, "CGRateS") if err != nil { Logger = new(utils.StdLogger) + Logger.Err(fmt.Sprintf("Could not connect to syslog: %v", err)) } } diff --git a/rater/cdr.go b/rater/cdr.go index db2d2b106..8fa743d4a 100644 --- a/rater/cdr.go +++ b/rater/cdr.go @@ -34,7 +34,7 @@ type CDR interface { GetUUID() string GetTenant() string GetReqType() string - GetStartTime(string) (time.Time, error) + GetStartTime() (time.Time, error) GetEndTime() (time.Time, error) GetFallbackSubj() string GetExtraParameters() string diff --git a/rater/storage_gosexy.go b/rater/storage_gosexy.go index 7787ae754..6f704fd2a 100644 --- a/rater/storage_gosexy.go +++ b/rater/storage_gosexy.go @@ -224,3 +224,11 @@ func (rs *GosexyStorage) GetCdr(string) (CDR, error) { func (rs *GosexyStorage) SetCdr(CDR) error { return nil } + +func (rs *GosexyStorage) SetMediatedCdr(CDR, *CallCost) error { + return nil +} + +func (rs *GosexyStorage) GetMediatedCdr(string) (CDR, error) { + return nil, nil +} diff --git a/rater/storage_interface.go b/rater/storage_interface.go index d13d2c66b..200ac8b94 100644 --- a/rater/storage_interface.go +++ b/rater/storage_interface.go @@ -63,6 +63,8 @@ type DataStorage interface { GetAllActionTimings() (map[string][]*ActionTiming, error) GetCdr(string) (CDR, error) SetCdr(CDR) error + SetMediatedCdr(CDR, *CallCost) error + GetMediatedCdr(string) (CDR, error) //GetAllActionTimingsLogs() (map[string][]*ActionTiming, error) LogCallCost(uuid, source string, cc *CallCost) error LogError(uuid, source, errstr string) error diff --git a/rater/storage_map.go b/rater/storage_map.go index 5203b9253..7bf8760d2 100644 --- a/rater/storage_map.go +++ b/rater/storage_map.go @@ -189,3 +189,11 @@ func (ms *MapStorage) GetCdr(string) (CDR, error) { func (ms *MapStorage) SetCdr(CDR) error { return nil } + +func (ms *MapStorage) SetMediatedCdr(CDR, *CallCost) error { + return nil +} + +func (ms *MapStorage) GetMediatedCdr(string) (CDR, error) { + return nil, nil +} diff --git a/rater/storage_mongo.go b/rater/storage_mongo.go index 43e7726eb..da46e2416 100644 --- a/rater/storage_mongo.go +++ b/rater/storage_mongo.go @@ -214,3 +214,11 @@ func (ms *MongoStorage) GetCdr(string) (CDR, error) { func (ms *MongoStorage) SetCdr(CDR) error { return nil } + +func (ms *MongoStorage) SetMediatedCdr(CDR, *CallCost) error { + return nil +} + +func (ms *MongoStorage) GetMediatedCdr(string) (CDR, error) { + return nil, nil +} diff --git a/rater/storage_mysql.go b/rater/storage_mysql.go index a30822965..a3b157823 100644 --- a/rater/storage_mysql.go +++ b/rater/storage_mysql.go @@ -227,3 +227,11 @@ func (mys *MySQLStorage) GetCdr(string) (CDR, error) { func (mys *MySQLStorage) SetCdr(CDR) error { return nil } + +func (mys *MySQLStorage) SetMediatedCdr(CDR, *CallCost) error { + return nil +} + +func (mys *MySQLStorage) GetMediatedCdr(string) (CDR, error) { + return nil, nil +} diff --git a/rater/storage_postgres.go b/rater/storage_postgres.go index 54822171a..7691b3130 100644 --- a/rater/storage_postgres.go +++ b/rater/storage_postgres.go @@ -227,3 +227,11 @@ func (psl *PostgresStorage) GetCdr(string) (CDR, error) { func (psl *PostgresStorage) SetCdr(CDR) error { return nil } + +func (psl *PostgresStorage) SetMediatedCdr(CDR, *CallCost) error { + return nil +} + +func (psl *PostgresStorage) GetMediatedCdr(string) (CDR, error) { + return nil, nil +} diff --git a/rater/storage_redigo.go b/rater/storage_redigo.go index 25051761e..604266cd4 100644 --- a/rater/storage_redigo.go +++ b/rater/storage_redigo.go @@ -221,3 +221,11 @@ func (rs *RedigoStorage) GetCdr(string) (CDR, error) { func (rs *RedigoStorage) SetCdr(CDR) error { return nil } + +func (rs *RedigoStorage) SetMediatedCdr(CDR, *CallCost) error { + return nil +} + +func (rs *RedigoStorage) GetMediatedCdr(string) (CDR, error) { + return nil, nil +} diff --git a/rater/storage_redis.go b/rater/storage_redis.go index e503c74a4..17d43f9e3 100644 --- a/rater/storage_redis.go +++ b/rater/storage_redis.go @@ -247,3 +247,11 @@ func (rs *RedisStorage) GetCdr(string) (CDR, error) { func (rs *RedisStorage) SetCdr(CDR) error { return nil } + +func (rs *RedisStorage) SetMediatedCdr(CDR, *CallCost) error { + return nil +} + +func (rs *RedisStorage) GetMediatedCdr(string) (CDR, error) { + return nil, nil +} From cd8509084b6eb4b93d5bfbd9063200d412c4471a Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Sun, 26 May 2013 21:16:00 +0300 Subject: [PATCH 02/10] cdrs before testing --- cdrs/cdrs.go | 5 +- cdrs/fscdr.go | 23 +++ data/storage/postgres/create_rater_tables.sql | 94 ++++++++++ mediator/mediator.go | 4 +- rater/cdr.go | 5 + rater/storage_gosexy.go | 9 +- rater/storage_interface.go | 8 +- rater/storage_map.go | 9 +- rater/storage_mongo.go | 9 +- rater/storage_mysql.go | 163 ++++++------------ rater/storage_postgres.go | 159 ++++++----------- rater/storage_redigo.go | 9 +- rater/storage_redis.go | 10 +- 13 files changed, 238 insertions(+), 269 deletions(-) create mode 100644 data/storage/postgres/create_rater_tables.sql diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index 4e8193562..63c055b03 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -24,7 +24,6 @@ import ( "github.com/cgrates/cgrates/mediator" "github.com/cgrates/cgrates/rater" "io/ioutil" - "log" "net/http" ) @@ -38,8 +37,8 @@ func cdrHandler(w http.ResponseWriter, r *http.Request) { body, _ := ioutil.ReadAll(r.Body) if fsCdr, err := new(FSCdr).New(body); err == nil { storage.SetCdr(fsCdr) - if cfg.CDRSMediator == 'internal' { - medi.MediateCdrFromDB(fsCdr.GetAccount(), storage) + if cfg.CDRSMediator == "internal" { + medi.MediateCdrFromDB(fsCdr, storage) } else { //TODO: use the connection to mediator } diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go index fefbec562..370be6f9f 100644 --- a/cdrs/fscdr.go +++ b/cdrs/fscdr.go @@ -20,6 +20,7 @@ package cdrs import ( "encoding/json" + "errors" "github.com/cgrates/cgrates/rater" "github.com/cgrates/cgrates/utils" "strconv" @@ -44,6 +45,7 @@ const ( END_TIME = "end_stamp" USERNAME = "user_name" FS_IP = "sip_local_network_addr" + RATE = "rate" ) type FSCdr map[string]string @@ -65,6 +67,15 @@ func (fsCdr FSCdr) New(body []byte) (rater.CDR, error) { return nil, err } +func (fsCdr FSCdr) GetCgrId() string { + return "" +} +func (fsCdr FSCdr) GetAccId() string { + return "" +} +func (fsCdr FSCdr) GetCdrHost() string { + return "" +} func (fsCdr FSCdr) GetDirection() string { //TODO: implement direction return "OUT" @@ -118,3 +129,15 @@ func (fsCdr FSCdr) GetEndTime() (t time.Time, err error) { t = time.Unix(0, st*1000) return } + +func (fsCdr FSCdr) GetRate() (float64, error) { + rate, ok := fsCdr[RATE] + if !ok { + return -1, errors.New("Not found") + } + return strconv.ParseFloat(rate, 64) +} + +func (fsCdr FSCdr) SetRate(rate float64) { + fsCdr[RATE] = strconv.FormatFloat(rate, 'f', -1, 64) +} diff --git a/data/storage/postgres/create_rater_tables.sql b/data/storage/postgres/create_rater_tables.sql new file mode 100644 index 000000000..118e742a8 --- /dev/null +++ b/data/storage/postgres/create_rater_tables.sql @@ -0,0 +1,94 @@ +CREATE TABLE ratingprofile IF NOT EXISTS ( + id SERIAL PRIMARY KEY, + fallbackkey VARCHAR(512), +); +CREATE TABLE ratingdestinations IF NOT EXISTS ( + id SERIAL PRIMARY KEY, + ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE, + destination INTEGER REFERENCES destination(id) ON DELETE CASCADE +); +CREATE TABLE destination IF NOT EXISTS ( + id SERIAL PRIMARY KEY, + ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE, + name VARCHAR(512), + prefixes TEXT +); +CREATE TABLE activationprofile IF NOT EXISTS( + id SERIAL PRIMARY KEY, + destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, + activationtime TIMESTAMP +); +CREATE TABLE interval IF NOT EXISTS( + id SERIAL PRIMARY KEY, + activationprofile INTEGER REFERENCES activationprofile(id) ON DELETE CASCADE, + years TEXT, + months TEXT, + monthdays TEXT, + weekdays TEXT, + starttime TIMESTAMP, + endtime TIMESTAMP, + weight FLOAT8, + connectfee FLOAT8, + price FLOAT8, + pricedunits FLOAT8, + rateincrements FLOAT8 +); +CREATE TABLE minutebucket IF NOT EXISTS( + id SERIAL PRIMARY KEY, + destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, + seconds FLOAT8, + weight FLOAT8, + price FLOAT8, + percent FLOAT8 +); +CREATE TABLE unitcounter IF NOT EXISTS( + id SERIAL PRIMARY KEY, + direction TEXT, + balance TEXT, + units FLOAT8 +); +CREATE TABLE unitcounterbucket IF NOT EXISTS( + id SERIAL PRIMARY KEY, + unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE, + minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE +); +CREATE TABLE actiontrigger IF NOT EXISTS( + id SERIAL PRIMARY KEY, + destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, + actions INTEGER REFERENCES action(id) ON DELETE CASCADE, + balance TEXT, + direction TEXT, + thresholdvalue FLOAT8, + weight FLOAT8, + executed BOOL +); +CREATE TABLE balance IF NOT EXISTS( + id SERIAL PRIMARY KEY, + name TEXT; + value FLOAT8 +); +CREATE TABLE userbalance IF NOT EXISTS( + id SERIAL PRIMARY KEY, + unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE, + minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE + actiontriggers INTEGER REFERENCES actiontrigger(id) ON DELETE CASCADE, + balances INTEGER REFERENCES balance(id) ON DELETE CASCADE, + type TEXT +); +CREATE TABLE actiontiming IF NOT EXISTS( + id SERIAL PRIMARY KEY, + tag TEXT, + userbalances INTEGER REFERENCES userbalance(id) ON DELETE CASCADE, + timing INTEGER REFERENCES interval(id) ON DELETE CASCADE, + actions INTEGER REFERENCES action(id) ON DELETE CASCADE, + weight FLOAT8 +); +CREATE TABLE action IF NOT EXISTS( + id SERIAL PRIMARY KEY, + minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE, + actiontype TEXT, + balance TEXT, + direction TEXT, + units FLOAT8, + weight FLOAT8 +); \ No newline at end of file diff --git a/mediator/mediator.go b/mediator/mediator.go index 56a95cde6..935c29498 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -225,7 +225,7 @@ func (m *Mediator) getCostsFromRater(record []string, runIdx int) (cc *rater.Cal /* Calculates price for the specified cdr and writes the new cdr with price to the storage. If the cdr is nil then it will fetch it from the storage. */ -func (m *Mediator) MediateCdrFromDB(cdrID string, cdr rater.CDR, db rater.DataStorage) error { +func (m *Mediator) MediateCdrFromDB(cdr rater.CDR, db rater.DataStorage) error { cc := &rater.CallCost{} startTime, err := cdr.GetStartTime() if err != nil { @@ -247,5 +247,5 @@ func (m *Mediator) MediateCdrFromDB(cdrID string, cdr rater.CDR, db rater.DataSt if err := m.connector.GetCost(cd, cc); err != nil { return err } - return db.SetMediatedCdr(cdr, cc) + return db.SetRatedCdr(cdr, cc) } diff --git a/rater/cdr.go b/rater/cdr.go index 8fa743d4a..a355bf803 100644 --- a/rater/cdr.go +++ b/rater/cdr.go @@ -24,6 +24,9 @@ import ( type CDR interface { New([]byte) (CDR, error) + GetCgrId() string + GetAccId() string + GetCdrHost() string GetDirection() string GetOrigId() string GetSubject() string @@ -38,4 +41,6 @@ type CDR interface { GetEndTime() (time.Time, error) GetFallbackSubj() string GetExtraParameters() string + GetRate() (float64, error) + SetRate(float64) } diff --git a/rater/storage_gosexy.go b/rater/storage_gosexy.go index 6f704fd2a..982301388 100644 --- a/rater/storage_gosexy.go +++ b/rater/storage_gosexy.go @@ -218,17 +218,10 @@ func (rs *GosexyStorage) LogError(uuid, source, errstr string) (err error) { return } -func (rs *GosexyStorage) GetCdr(string) (CDR, error) { - return nil, nil -} func (rs *GosexyStorage) SetCdr(CDR) error { return nil } -func (rs *GosexyStorage) SetMediatedCdr(CDR, *CallCost) error { +func (rs *GosexyStorage) SetRatedCdr(CDR, *CallCost) error { return nil } - -func (rs *GosexyStorage) GetMediatedCdr(string) (CDR, error) { - return nil, nil -} diff --git a/rater/storage_interface.go b/rater/storage_interface.go index 200ac8b94..a257c7ba1 100644 --- a/rater/storage_interface.go +++ b/rater/storage_interface.go @@ -37,10 +37,12 @@ const ( LOG_ACTION_TIMMING_PREFIX = "ltm_" LOG_ACTION_TRIGGER_PREFIX = "ltr_" LOG_ERR = "ler_" + LOG_CDR = "cdr_" + LOG_MEDIATED_CDR = "mcd_" // sources SESSION_MANAGER_SOURCE = "SMR" MEDIATOR_SOURCE = "MED" - SCHED_SOURCE = "MED" + SCHED_SOURCE = "SCH" RATER_SOURCE = "RAT" ) @@ -61,10 +63,8 @@ type DataStorage interface { GetActionTimings(string) ([]*ActionTiming, error) SetActionTimings(string, []*ActionTiming) error GetAllActionTimings() (map[string][]*ActionTiming, error) - GetCdr(string) (CDR, error) SetCdr(CDR) error - SetMediatedCdr(CDR, *CallCost) error - GetMediatedCdr(string) (CDR, error) + SetRatedCdr(CDR, *CallCost) error //GetAllActionTimingsLogs() (map[string][]*ActionTiming, error) LogCallCost(uuid, source string, cc *CallCost) error LogError(uuid, source, errstr string) error diff --git a/rater/storage_map.go b/rater/storage_map.go index 7bf8760d2..a0a05d0fc 100644 --- a/rater/storage_map.go +++ b/rater/storage_map.go @@ -183,17 +183,10 @@ func (ms *MapStorage) LogError(uuid, source, errstr string) (err error) { return nil } -func (ms *MapStorage) GetCdr(string) (CDR, error) { - return nil, nil -} func (ms *MapStorage) SetCdr(CDR) error { return nil } -func (ms *MapStorage) SetMediatedCdr(CDR, *CallCost) error { +func (ms *MapStorage) SetRatedCdr(CDR, *CallCost) error { return nil } - -func (ms *MapStorage) GetMediatedCdr(string) (CDR, error) { - return nil, nil -} diff --git a/rater/storage_mongo.go b/rater/storage_mongo.go index da46e2416..7652601be 100644 --- a/rater/storage_mongo.go +++ b/rater/storage_mongo.go @@ -208,17 +208,10 @@ func (ms *MongoStorage) LogError(uuid, source, errstr string) (err error) { return ms.db.C("errlog").Insert(&LogErrEntry{uuid, errstr, source}) } -func (ms *MongoStorage) GetCdr(string) (CDR, error) { - return nil, nil -} func (ms *MongoStorage) SetCdr(CDR) error { return nil } -func (ms *MongoStorage) SetMediatedCdr(CDR, *CallCost) error { +func (ms *MongoStorage) SetRatedCdr(CDR, *CallCost) error { return nil } - -func (ms *MongoStorage) GetMediatedCdr(string) (CDR, error) { - return nil, nil -} diff --git a/rater/storage_mysql.go b/rater/storage_mysql.go index a3b157823..c198522a9 100644 --- a/rater/storage_mysql.go +++ b/rater/storage_mysql.go @@ -29,105 +29,6 @@ type MySQLStorage struct { Db *sql.DB } -var ( - mysql_schema = ` -CREATE TABLE ratingprofile IF NOT EXISTS ( - id SERIAL PRIMARY KEY, - fallbackkey VARCHAR(512), -); -CREATE TABLE ratingdestinations IF NOT EXISTS ( - id SERIAL PRIMARY KEY, - ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE -); -CREATE TABLE destination IF NOT EXISTS ( - id SERIAL PRIMARY KEY, - ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE, - name VARCHAR(512), - prefixes TEXT -); -CREATE TABLE activationprofile IF NOT EXISTS( - id SERIAL PRIMARY KEY, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, - activationtime TIMESTAMP -); -CREATE TABLE interval IF NOT EXISTS( - id SERIAL PRIMARY KEY, - activationprofile INTEGER REFERENCES activationprofile(id) ON DELETE CASCADE, - years TEXT, - months TEXT, - monthdays TEXT, - weekdays TEXT, - starttime TIMESTAMP, - endtime TIMESTAMP, - weight FLOAT8, - connectfee FLOAT8, - price FLOAT8, - pricedunits FLOAT8, - rateincrements FLOAT8 -); -CREATE TABLE minutebucket IF NOT EXISTS( - id SERIAL PRIMARY KEY, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, - seconds FLOAT8, - weight FLOAT8, - price FLOAT8, - percent FLOAT8 -); -CREATE TABLE unitcounter IF NOT EXISTS( - id SERIAL PRIMARY KEY, - direction TEXT, - balance TEXT, - units FLOAT8 -); -CREATE TABLE unitcounterbucket IF NOT EXISTS( - id SERIAL PRIMARY KEY, - unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE, - minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE -); -CREATE TABLE actiontrigger IF NOT EXISTS( - id SERIAL PRIMARY KEY, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, - actions INTEGER REFERENCES action(id) ON DELETE CASCADE, - balance TEXT, - direction TEXT, - thresholdvalue FLOAT8, - weight FLOAT8, - executed BOOL -); -CREATE TABLE balance IF NOT EXISTS( - id SERIAL PRIMARY KEY, - name TEXT; - value FLOAT8 -); -CREATE TABLE userbalance IF NOT EXISTS( - id SERIAL PRIMARY KEY, - unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE, - minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE - actiontriggers INTEGER REFERENCES actiontrigger(id) ON DELETE CASCADE, - balances INTEGER REFERENCES balance(id) ON DELETE CASCADE, - type TEXT -); -CREATE TABLE actiontiming IF NOT EXISTS( - id SERIAL PRIMARY KEY, - tag TEXT, - userbalances INTEGER REFERENCES userbalance(id) ON DELETE CASCADE, - timing INTEGER REFERENCES interval(id) ON DELETE CASCADE, - actions INTEGER REFERENCES action(id) ON DELETE CASCADE, - weight FLOAT8 -); -CREATE TABLE action IF NOT EXISTS( - id SERIAL PRIMARY KEY, - minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE, - actiontype TEXT, - balance TEXT, - direction TEXT, - units FLOAT8, - weight FLOAT8 -); -` -) - func NewMySQLStorage(host, port, name, user, password string) (DataStorage, error) { db, err := sql.Open("mysql", "cgrates:testus@tcp(192.168.0.17:3306)/cgrates?charset=utf8") if err != nil { @@ -186,7 +87,7 @@ func (mys *MySQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err err if err != nil { Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) } - _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdr VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", + _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO call_costs VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", uuid, source, cc.Direction, @@ -205,7 +106,7 @@ func (mys *MySQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err err } func (mys *MySQLStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { - row := mys.Db.QueryRow(fmt.Sprintf("SELECT * FROM cdr WHERE uuid='%s' AND source='%s'", uuid, source)) + row := mys.Db.QueryRow(fmt.Sprintf("SELECT * FROM call_costs WHERE uuid='%s' AND source='%s'", uuid, source)) var uuid_found string var timespansJson string err = row.Scan(&uuid_found, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson) @@ -221,17 +122,57 @@ func (mys *MySQLStorage) LogActionTiming(source string, at *ActionTiming, as []* } func (mys *MySQLStorage) LogError(uuid, source, errstr string) (err error) { return } -func (mys *MySQLStorage) GetCdr(string) (CDR, error) { - return nil, nil -} -func (mys *MySQLStorage) SetCdr(CDR) error { - return nil +func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) { + startTime, err := cdr.GetStartTime() + if err != nil { + return err + } + endTime, err := cdr.GetEndTime() + if err != nil { + return err + } + _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_primary VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", + cdr.GetCgrId(), + cdr.GetAccId(), + cdr.GetCdrHost(), + cdr.GetReqType(), + cdr.GetDirection(), + cdr.GetTenant(), + cdr.GetTOR(), + cdr.GetAccount(), + cdr.GetSubject(), + cdr.GetDestination(), + startTime, + endTime, //duration + )) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + } + _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s')", + cdr.GetCgrId(), + cdr.GetExtraParameters(), + )) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + } + + return } -func (mys *MySQLStorage) SetMediatedCdr(CDR, *CallCost) error { - return nil -} +func (mys *MySQLStorage) SetRatedCdr(cdr CDR, callcost *CallCost) (err error) { + rate, err := cdr.GetRate() + if err != nil { + return err + } + _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s', '%s', '%s')", + cdr.GetCgrId(), + rate, + "cgrcostid", + "cdrsrc", + )) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + } -func (mys *MySQLStorage) GetMediatedCdr(string) (CDR, error) { - return nil, nil + return } diff --git a/rater/storage_postgres.go b/rater/storage_postgres.go index 7691b3130..fc908a0af 100644 --- a/rater/storage_postgres.go +++ b/rater/storage_postgres.go @@ -29,105 +29,6 @@ type PostgresStorage struct { Db *sql.DB } -var ( - postgres_schema = ` -CREATE TABLE ratingprofile IF NOT EXISTS ( - id SERIAL PRIMARY KEY, - fallbackkey VARCHAR(512), -); -CREATE TABLE ratingdestinations IF NOT EXISTS ( - id SERIAL PRIMARY KEY, - ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE -); -CREATE TABLE destination IF NOT EXISTS ( - id SERIAL PRIMARY KEY, - ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE, - name VARCHAR(512), - prefixes TEXT -); -CREATE TABLE activationprofile IF NOT EXISTS( - id SERIAL PRIMARY KEY, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, - activationtime TIMESTAMP -); -CREATE TABLE interval IF NOT EXISTS( - id SERIAL PRIMARY KEY, - activationprofile INTEGER REFERENCES activationprofile(id) ON DELETE CASCADE, - years TEXT, - months TEXT, - monthdays TEXT, - weekdays TEXT, - starttime TIMESTAMP, - endtime TIMESTAMP, - weight FLOAT8, - connectfee FLOAT8, - price FLOAT8, - pricedunits FLOAT8, - rateincrements FLOAT8 -); -CREATE TABLE minutebucket IF NOT EXISTS( - id SERIAL PRIMARY KEY, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, - seconds FLOAT8, - weight FLOAT8, - price FLOAT8, - percent FLOAT8 -); -CREATE TABLE unitcounter IF NOT EXISTS( - id SERIAL PRIMARY KEY, - direction TEXT, - balance TEXT, - units FLOAT8 -); -CREATE TABLE unitcounterbucket IF NOT EXISTS( - id SERIAL PRIMARY KEY, - unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE, - minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE -); -CREATE TABLE actiontrigger IF NOT EXISTS( - id SERIAL PRIMARY KEY, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, - actions INTEGER REFERENCES action(id) ON DELETE CASCADE, - balance TEXT, - direction TEXT, - thresholdvalue FLOAT8, - weight FLOAT8, - executed BOOL -); -CREATE TABLE balance IF NOT EXISTS( - id SERIAL PRIMARY KEY, - name TEXT; - value FLOAT8 -); -CREATE TABLE userbalance IF NOT EXISTS( - id SERIAL PRIMARY KEY, - unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE, - minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE - actiontriggers INTEGER REFERENCES actiontrigger(id) ON DELETE CASCADE, - balances INTEGER REFERENCES balance(id) ON DELETE CASCADE, - type TEXT -); -CREATE TABLE actiontiming IF NOT EXISTS( - id SERIAL PRIMARY KEY, - tag TEXT, - userbalances INTEGER REFERENCES userbalance(id) ON DELETE CASCADE, - timing INTEGER REFERENCES interval(id) ON DELETE CASCADE, - actions INTEGER REFERENCES action(id) ON DELETE CASCADE, - weight FLOAT8 -); -CREATE TABLE action IF NOT EXISTS( - id SERIAL PRIMARY KEY, - minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE, - actiontype TEXT, - balance TEXT, - direction TEXT, - units FLOAT8, - weight FLOAT8 -); -` -) - func NewPostgresStorage(host, port, name, user, password string) (DataStorage, error) { db, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", host, port, name, user, password)) if err != nil { @@ -221,17 +122,57 @@ func (psl *PostgresStorage) LogActionTiming(source string, at *ActionTiming, as } func (psl *PostgresStorage) LogError(uuid, source, errstr string) (err error) { return } -func (psl *PostgresStorage) GetCdr(string) (CDR, error) { - return nil, nil -} -func (psl *PostgresStorage) SetCdr(CDR) error { - return nil +func (psl *PostgresStorage) SetCdr(cdr CDR) (err error) { + startTime, err := cdr.GetStartTime() + if err != nil { + return err + } + endTime, err := cdr.GetEndTime() + if err != nil { + return err + } + _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_primary VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", + cdr.GetCgrId(), + cdr.GetAccId(), + cdr.GetCdrHost(), + cdr.GetReqType(), + cdr.GetDirection(), + cdr.GetTenant(), + cdr.GetTOR(), + cdr.GetAccount(), + cdr.GetSubject(), + cdr.GetDestination(), + startTime, + endTime, //duration + )) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + } + _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s')", + cdr.GetCgrId(), + cdr.GetExtraParameters(), + )) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + } + + return } -func (psl *PostgresStorage) SetMediatedCdr(CDR, *CallCost) error { - return nil -} +func (psl *PostgresStorage) SetRatedCdr(cdr CDR, callcost *CallCost) (err error) { + rate, err := cdr.GetRate() + if err != nil { + return err + } + _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s', '%s', '%s')", + cdr.GetCgrId(), + rate, + "cgrcostid", + "cdrsrc", + )) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + } -func (psl *PostgresStorage) GetMediatedCdr(string) (CDR, error) { - return nil, nil + return } diff --git a/rater/storage_redigo.go b/rater/storage_redigo.go index 604266cd4..0fb0b2bc6 100644 --- a/rater/storage_redigo.go +++ b/rater/storage_redigo.go @@ -215,17 +215,10 @@ func (rs *RedigoStorage) LogError(uuid, source, errstr string) (err error) { return } -func (rs *RedigoStorage) GetCdr(string) (CDR, error) { - return nil, nil -} func (rs *RedigoStorage) SetCdr(CDR) error { return nil } -func (rs *RedigoStorage) SetMediatedCdr(CDR, *CallCost) error { +func (rs *RedigoStorage) SetRatedCdr(CDR, *CallCost) error { return nil } - -func (rs *RedigoStorage) GetMediatedCdr(string) (CDR, error) { - return nil, nil -} diff --git a/rater/storage_redis.go b/rater/storage_redis.go index 17d43f9e3..623c4909a 100644 --- a/rater/storage_redis.go +++ b/rater/storage_redis.go @@ -241,17 +241,11 @@ func (rs *RedisStorage) LogError(uuid, source, errstr string) (err error) { } return } -func (rs *RedisStorage) GetCdr(string) (CDR, error) { - return nil, nil -} + func (rs *RedisStorage) SetCdr(CDR) error { return nil } -func (rs *RedisStorage) SetMediatedCdr(CDR, *CallCost) error { +func (rs *RedisStorage) SetRatedCdr(CDR, *CallCost) error { return nil } - -func (rs *RedisStorage) GetMediatedCdr(string) (CDR, error) { - return nil, nil -} From af970630318908282eabee690897b7093d855007 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 27 May 2013 17:49:35 +0200 Subject: [PATCH 03/10] Adding missing values out of fscdr --- cdrs/fscdr.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go index 370be6f9f..6bea99479 100644 --- a/cdrs/fscdr.go +++ b/cdrs/fscdr.go @@ -25,6 +25,8 @@ import ( "github.com/cgrates/cgrates/utils" "strconv" "time" + "fmt" + "crypto/sha1" ) const ( @@ -68,18 +70,20 @@ func (fsCdr FSCdr) New(body []byte) (rater.CDR, error) { } func (fsCdr FSCdr) GetCgrId() string { - return "" + hasher := sha1.New() + hasher.Write([]byte( fsCdr[FS_IP] )) + hasher.Write([]byte( fsCdr[UUID] )) + return fmt.Sprintf("%x", hasher.Sum(nil)) } func (fsCdr FSCdr) GetAccId() string { - return "" + return fsCdr[UUID] } func (fsCdr FSCdr) GetCdrHost() string { - return "" + return fsCdr[FS_IP] } func (fsCdr FSCdr) GetDirection() string { - //TODO: implement direction + //TODO: implement direction, not related to FS_DIRECTION but traffic towards or from subject/account return "OUT" - //return fsCdr[DIRECTION] } func (fsCdr FSCdr) GetOrigId() string { return fsCdr[ORIG_ID] @@ -113,7 +117,7 @@ func (fsCdr FSCdr) GetReqType() string { return utils.FirstNonEmpty(fsCdr[REQTYPE], cfg.DefaultReqType) } func (fsCdr FSCdr) GetExtraParameters() string { - return "" + return "" // ToDo: Add and extract from config } func (fsCdr FSCdr) GetFallbackSubj() string { return cfg.DefaultSubject From a2afe0794a958f9818dd6a2a9794dc911ee2e7ad Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 28 May 2013 14:06:08 +0200 Subject: [PATCH 04/10] Fixup timestamps source in cdrs, modified cmd/cgr-rater to support disabling mediator as service in configuration --- cdrs/cdrs.go | 5 ++++- cdrs/fscdr.go | 6 +++--- cmd/cgr-rater/cgr-rater.go | 35 ++++++++++++++++++++++++++--------- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index 63c055b03..38b8d19dc 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -38,7 +38,10 @@ func cdrHandler(w http.ResponseWriter, r *http.Request) { if fsCdr, err := new(FSCdr).New(body); err == nil { storage.SetCdr(fsCdr) if cfg.CDRSMediator == "internal" { - medi.MediateCdrFromDB(fsCdr, storage) + errMedi := medi.MediateCdrFromDB(fsCdr, storage) + if errMedi != nil { + rater.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", errMedi.Error())) + } } else { //TODO: use the connection to mediator } diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go index 6bea99479..697f8ec29 100644 --- a/cdrs/fscdr.go +++ b/cdrs/fscdr.go @@ -42,9 +42,9 @@ const ( UUID = "uuid" // -Unique ID for this call leg CSTMID = "cgr_cstmid" CALL_DEST_NR = "dialed_extension" - PARK_TIME = "start_stamp" - START_TIME = "answer_stamp" - END_TIME = "end_stamp" + PARK_TIME = "start_epoch" + START_TIME = "answer_epoch" + END_TIME = "end_epoch" USERNAME = "user_name" FS_IP = "sip_local_network_addr" RATE = "rate" diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index d7aaadf8f..40275c04e 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -121,14 +121,6 @@ func startMediator(responder *rater.Responder, loggerDb rater.DataStorage) { } connector = &rater.RPCClientConnector{Client: client} } - if _, err := os.Stat(cfg.MediatorCDRInDir); err != nil { - rater.Logger.Crit(fmt.Sprintf("The input path for mediator does not exist: %v", cfg.MediatorCDRInDir)) - exitChan <- true - } - if _, err := os.Stat(cfg.MediatorCDROutDir); err != nil { - rater.Logger.Crit(fmt.Sprintf("The output path for mediator does not exist: %v", cfg.MediatorCDROutDir)) - exitChan <- true - } var err error medi, err = mediator.NewMediator(connector, loggerDb, cfg.MediatorCDROutDir, cfg.MediatorPseudoprepaid, cfg.FreeswitchDirectionIdx, cfg.FreeswitchTORIdx, cfg.FreeswitchTenantIdx, cfg.FreeswitchSubjectIdx, cfg.FreeswitchAccountIdx, @@ -138,7 +130,19 @@ func startMediator(responder *rater.Responder, loggerDb rater.DataStorage) { exitChan <- true } - medi.TrackCDRFiles(cfg.MediatorCDRInDir) + if cfg.MediatorEnabled { //Mediator as standalone service + if _, err := os.Stat(cfg.MediatorCDRInDir); err != nil { + rater.Logger.Crit(fmt.Sprintf("The input path for mediator does not exist: %v", cfg.MediatorCDRInDir)) + exitChan <- true + } + if _, err := os.Stat(cfg.MediatorCDROutDir); err != nil { + rater.Logger.Crit(fmt.Sprintf("The output path for mediator does not exist: %v", cfg.MediatorCDROutDir)) + exitChan <- true + } + medi.TrackCDRFiles(cfg.MediatorCDRInDir) + } + + } func startSessionManager(responder *rater.Responder, loggerDb rater.DataStorage) { @@ -319,6 +323,19 @@ func main() { } if cfg.CDRSListen != "" { rater.Logger.Info("Starting CGRateS CDR Server.") + if !cfg.MediatorEnabled { + go startMediator(responder, loggerDb) // Will start it internally, important to connect the responder + } + for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable + time.Sleep(time.Duration(i/2) * time.Second) + if medi!=nil { // Got our mediator, no need to wait any longer + break + } + } + if medi == nil { + rater.Logger.Crit(" Could not connect to mediator, exiting.") + exitChan <- true + } cs := cdrs.New(loggerDb, medi, cfg) go cs.StartCapturingCDRs() } From f0e82ba4af447862c38893aa6e17c600e8876208 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 28 May 2013 14:38:23 +0200 Subject: [PATCH 05/10] Refactoring startCDRS to be in trend with the rest of start functions --- cmd/cgr-rater/cgr-rater.go | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index 40275c04e..bb3620d6e 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -192,6 +192,25 @@ func startSessionManager(responder *rater.Responder, loggerDb rater.DataStorage) exitChan <- true } +func startCDRS(responder *rater.Responder, loggerDb rater.DataStorage) { + if !cfg.MediatorEnabled { + go startMediator(responder, loggerDb) // Will start it internally, important to connect the responder + } + for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable + time.Sleep(time.Duration(i/2) * time.Second) + if medi!=nil { // Got our mediator, no need to wait any longer + break + } + } + if medi == nil { + rater.Logger.Crit(" Could not connect to mediator, exiting.") + exitChan <- true + } + cs := cdrs.New(loggerDb, medi, cfg) + cs.StartCapturingCDRs() + exitChan <- true +} + func checkConfigSanity() error { if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != DISABLED { rater.Logger.Crit("The session manager must not be enabled on a worker rater (change [rater]/balancer to disabled)!") @@ -323,21 +342,7 @@ func main() { } if cfg.CDRSListen != "" { rater.Logger.Info("Starting CGRateS CDR Server.") - if !cfg.MediatorEnabled { - go startMediator(responder, loggerDb) // Will start it internally, important to connect the responder - } - for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable - time.Sleep(time.Duration(i/2) * time.Second) - if medi!=nil { // Got our mediator, no need to wait any longer - break - } - } - if medi == nil { - rater.Logger.Crit(" Could not connect to mediator, exiting.") - exitChan <- true - } - cs := cdrs.New(loggerDb, medi, cfg) - go cs.StartCapturingCDRs() + go startCDRS(responder, loggerDb) } <-exitChan rater.Logger.Info("Stopped all components. CGRateS shutdown!") From ef6b1cc9f8d25a338b3b896573fb470fb59dff67 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 28 May 2013 16:29:33 +0200 Subject: [PATCH 06/10] Adding some more tables on mysql side --- data/storage/mysql/create_callcost_tables.sql | 21 +++++++++++++++++++ data/storage/mysql/create_user.sql | 5 +++++ 2 files changed, 26 insertions(+) create mode 100644 data/storage/mysql/create_callcost_tables.sql create mode 100644 data/storage/mysql/create_user.sql diff --git a/data/storage/mysql/create_callcost_tables.sql b/data/storage/mysql/create_callcost_tables.sql new file mode 100644 index 000000000..ff4aeea7d --- /dev/null +++ b/data/storage/mysql/create_callcost_tables.sql @@ -0,0 +1,21 @@ + +-- +-- Table structure for table `callcosts` +-- +CREATE TABLE `callcosts` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `uuid` varchar(80), + `source` varchar(32) NOT NULL, + `direction` varchar(32) NOT NULL, + `tenant` varchar(64) NOT NULL, + `tor` varchar(8) NOT NULL, + `account` varchar(64) NOT NULL, + `subject` varchar(64) NOT NULL, + `destination` varchar(64) NOT NULL, + `cost` double(20,4) default NULL, + `connect_fee` double(20,4) default NULL, + `timespans` text, + PRIMARY KEY (`id`), + UNIQUE KEY `cgrid` (`uuid`) +); + diff --git a/data/storage/mysql/create_user.sql b/data/storage/mysql/create_user.sql new file mode 100644 index 000000000..149a38db1 --- /dev/null +++ b/data/storage/mysql/create_user.sql @@ -0,0 +1,5 @@ + +-- +-- Sample user creation. Replace here with your own details +-- +GRANT ALL on cgrates.* TO 'cgrates'@'localhost' IDENTIFIED BY 'CGRateS.org'; From 6875f11c3585e99cf7e1fcff934863b54018ba5e Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 28 May 2013 16:30:29 +0200 Subject: [PATCH 07/10] Replacing GetEndTime with GetDuration, support for MySQL as LogDB --- cdrs/fscdr.go | 7 +++++++ cmd/cgr-rater/cgr-rater.go | 3 +++ mediator/mediator.go | 1 + rater/cdr.go | 1 + rater/storage_mysql.go | 19 ++++++++----------- 5 files changed, 20 insertions(+), 11 deletions(-) diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go index 697f8ec29..85b8001e1 100644 --- a/cdrs/fscdr.go +++ b/cdrs/fscdr.go @@ -45,6 +45,7 @@ const ( PARK_TIME = "start_epoch" START_TIME = "answer_epoch" END_TIME = "end_epoch" + DURATION = "billsec" USERNAME = "user_name" FS_IP = "sip_local_network_addr" RATE = "rate" @@ -133,6 +134,12 @@ func (fsCdr FSCdr) GetEndTime() (t time.Time, err error) { t = time.Unix(0, st*1000) return } +// Extracts duration as considered by the telecom switch +func (fsCdr FSCdr) GetDuration() int64 { + dur, _ := strconv.ParseInt(fsCdr[DURATION], 0, 64) + return dur +} + func (fsCdr FSCdr) GetRate() (float64, error) { rate, ok := fsCdr[RATE] diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index bb3620d6e..601d4dbec 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -45,6 +45,7 @@ const ( JSON = "json" GOB = "gob" POSTGRES = "postgres" + MYSQL = "mysql" MONGO = "mongo" REDIS = "redis" SAME = "same" @@ -241,6 +242,8 @@ func configureDatabase(db_type, host, port, name, user, pass string) (getter rat getter, err = rater.NewMongoStorage(host, port, name, user, pass) case POSTGRES: getter, err = rater.NewPostgresStorage(host, port, name, user, pass) + case MYSQL: + getter, err = rater.NewMySQLStorage(host, port, name, user, pass) default: err = errors.New("unknown db") return nil, err diff --git a/mediator/mediator.go b/mediator/mediator.go index 935c29498..9654881e9 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -245,6 +245,7 @@ func (m *Mediator) MediateCdrFromDB(cdr rater.CDR, db rater.DataStorage) error { TimeStart: startTime, TimeEnd: endTime} if err := m.connector.GetCost(cd, cc); err != nil { + fmt.Println("Got error in the mediator getCost", err.Error()) return err } return db.SetRatedCdr(cdr, cc) diff --git a/rater/cdr.go b/rater/cdr.go index a355bf803..07c20c6c4 100644 --- a/rater/cdr.go +++ b/rater/cdr.go @@ -39,6 +39,7 @@ type CDR interface { GetReqType() string GetStartTime() (time.Time, error) GetEndTime() (time.Time, error) + GetDuration() int64 GetFallbackSubj() string GetExtraParameters() string GetRate() (float64, error) diff --git a/rater/storage_mysql.go b/rater/storage_mysql.go index c198522a9..0cb6d571c 100644 --- a/rater/storage_mysql.go +++ b/rater/storage_mysql.go @@ -30,7 +30,7 @@ type MySQLStorage struct { } func NewMySQLStorage(host, port, name, user, password string) (DataStorage, error) { - db, err := sql.Open("mysql", "cgrates:testus@tcp(192.168.0.17:3306)/cgrates?charset=utf8") + db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8", user, password, host, port, name)) if err != nil { return nil, err } @@ -87,7 +87,7 @@ func (mys *MySQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err err if err != nil { Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) } - _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO call_costs VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", + _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO callcosts VALUES ('NULL','%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", uuid, source, cc.Direction, @@ -106,7 +106,7 @@ func (mys *MySQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err err } func (mys *MySQLStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { - row := mys.Db.QueryRow(fmt.Sprintf("SELECT * FROM call_costs WHERE uuid='%s' AND source='%s'", uuid, source)) + row := mys.Db.QueryRow(fmt.Sprintf("SELECT * FROM callcosts WHERE uuid='%s' AND source='%s'", uuid, source)) var uuid_found string var timespansJson string err = row.Scan(&uuid_found, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson) @@ -127,11 +127,7 @@ func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) { if err != nil { return err } - endTime, err := cdr.GetEndTime() - if err != nil { - return err - } - _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_primary VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", + _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_primary VALUES (NULL, '%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', %d)", cdr.GetCgrId(), cdr.GetAccId(), cdr.GetCdrHost(), @@ -143,12 +139,12 @@ func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) { cdr.GetSubject(), cdr.GetDestination(), startTime, - endTime, //duration + cdr.GetDuration(), //duration )) if err != nil { Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) } - _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s')", + _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('NULL','%s', '%s')", cdr.GetCgrId(), cdr.GetExtraParameters(), )) @@ -162,9 +158,10 @@ func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) { func (mys *MySQLStorage) SetRatedCdr(cdr CDR, callcost *CallCost) (err error) { rate, err := cdr.GetRate() if err != nil { + fmt.Println("Could not find rate in cdr") return err } - _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s', '%s', '%s')", + _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO rated_cdrs VALUES ('%s', '%s', '%s', '%s')", cdr.GetCgrId(), rate, "cgrcostid", From 72e054a9b38ccf4788fc41318910d79364a05e44 Mon Sep 17 00:00:00 2001 From: DanB Date: Wed, 29 May 2013 15:05:28 +0200 Subject: [PATCH 08/10] Adding tariffplan tables --- data/storage/mysql/create_cdrs_tables.sql | 2 +- .../mysql/create_tariffplan_tables.sql | 148 ++++++++++++++++++ 2 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 data/storage/mysql/create_tariffplan_tables.sql diff --git a/data/storage/mysql/create_cdrs_tables.sql b/data/storage/mysql/create_cdrs_tables.sql index 39d8e29d6..2af8370f9 100644 --- a/data/storage/mysql/create_cdrs_tables.sql +++ b/data/storage/mysql/create_cdrs_tables.sql @@ -9,7 +9,7 @@ CREATE TABLE `cdrs_primary` ( `reqtype` varchar(24) NOT NULL, `direction` enum('0','1','2') NOT NULL DEFAULT '1', `tenant` varchar(64) NOT NULL, - `tor` varchar(8) NOT NULL, + `tor` varchar(16) NOT NULL, `account` varchar(64) NOT NULL, `subject` varchar(64) NOT NULL, `destination` varchar(64) NOT NULL, diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql new file mode 100644 index 000000000..3b9ddbd9c --- /dev/null +++ b/data/storage/mysql/create_tariffplan_tables.sql @@ -0,0 +1,148 @@ +-- +-- Table structure for table `tp_timings` +-- + +CREATE TABLE `tp_timings` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `years` varchar(255) NOT NULL, + `months` varchar(255) NOT NULL, + `month_days` varchar(255) NOT NULL, + `week_days` varchar(255) NOT NULL, + `time` varchar(16) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_destinations` +-- + +CREATE TABLE `tp_destinatins` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `prefix` varchar(24) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_rates` +-- + +CREATE TABLE `tp_rates` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `destinations_tag` varchar(24) NOT NULL, + `connect_fee` DECIMAL(5,4) NOT NULL, + `rate` DECIMAL(5,4) NOT NULL, + `rate_increments` INT(11) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_rate_timings` +-- + +CREATE TABLE `tp_rate_timings` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `rates_tag` varchar(24) NOT NULL, + `timings_tag` varchar(24) NOT NULL, + `weight` smallint(5) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_rate_profiles` +-- + +CREATE TABLE `tp_rate_profiles` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tenant` varchar(64) NOT NULL, + `tor` varchar(16) NOT NULL, + `direction` varchar(8) NOT NULL, + `subject` varchar(64) NOT NULL, + `rates_fallback_subject` varchar(64), + `rates_timing_tag` varchar(24) NOT NULL, + `activation_time` char(3) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_actions` +-- + +CREATE TABLE `tp_actions` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `action` varchar(24) NOT NULL, + `balances_tag` varchar(24) NOT NULL, + `direction` varchar(8) NOT NULL, + `units` int(11) NOT NULL, + `destinations_tag` varchar(24) NOT NULL, + `rate_type` varchar(8) NOT NULL, + `rate` DECIMAL(5,4) NOT NULL, + `minutes_weight` smallint(5) NOT NULL, + `weight` smallint(5) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_action_timings` +-- + +CREATE TABLE `tp_action_timings` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `actions_tag` varchar(24) NOT NULL, + `timings_tag` varchar(24) NOT NULL, + `weight` smallint(5) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_action_triggers` +-- + +CREATE TABLE `tp_action_triggers` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `balances_tag` varchar(24) NOT NULL, + `direction` varchar(8) NOT NULL, + `threshold` int(11) NOT NULL, + `destinations_tag` varchar(24) NOT NULL, + `actions_tag` varchar(24) NOT NULL, + `weight` smallint(5) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_account_actions` +-- + +CREATE TABLE `tp_account_actions` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tenant` varchar(64) NOT NULL, + `account` varchar(64) NOT NULL, + `direction` varchar(8) NOT NULL, + `action_timings_tag` varchar(24), + `action_triggers_tag` varchar(24), + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); From bd74e573b9d6d06e66b863cfea7643d13b5500f4 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 29 May 2013 19:16:39 +0300 Subject: [PATCH 09/10] added cost to rated cdrs --- cdrs/fscdr.go | 15 --------------- rater/cdr.go | 2 -- rater/storage_mysql.go | 9 ++------- rater/storage_postgres.go | 8 ++------ 4 files changed, 4 insertions(+), 30 deletions(-) diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go index 370be6f9f..bc99aabc4 100644 --- a/cdrs/fscdr.go +++ b/cdrs/fscdr.go @@ -20,7 +20,6 @@ package cdrs import ( "encoding/json" - "errors" "github.com/cgrates/cgrates/rater" "github.com/cgrates/cgrates/utils" "strconv" @@ -45,7 +44,6 @@ const ( END_TIME = "end_stamp" USERNAME = "user_name" FS_IP = "sip_local_network_addr" - RATE = "rate" ) type FSCdr map[string]string @@ -123,21 +121,8 @@ func (fsCdr FSCdr) GetStartTime() (t time.Time, err error) { t = time.Unix(0, st*1000) return } - func (fsCdr FSCdr) GetEndTime() (t time.Time, err error) { st, err := strconv.ParseInt(fsCdr[END_TIME], 0, 64) t = time.Unix(0, st*1000) return } - -func (fsCdr FSCdr) GetRate() (float64, error) { - rate, ok := fsCdr[RATE] - if !ok { - return -1, errors.New("Not found") - } - return strconv.ParseFloat(rate, 64) -} - -func (fsCdr FSCdr) SetRate(rate float64) { - fsCdr[RATE] = strconv.FormatFloat(rate, 'f', -1, 64) -} diff --git a/rater/cdr.go b/rater/cdr.go index a355bf803..b24917e0f 100644 --- a/rater/cdr.go +++ b/rater/cdr.go @@ -41,6 +41,4 @@ type CDR interface { GetEndTime() (time.Time, error) GetFallbackSubj() string GetExtraParameters() string - GetRate() (float64, error) - SetRate(float64) } diff --git a/rater/storage_mysql.go b/rater/storage_mysql.go index c198522a9..87eeb02c5 100644 --- a/rater/storage_mysql.go +++ b/rater/storage_mysql.go @@ -159,20 +159,15 @@ func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) { return } -func (mys *MySQLStorage) SetRatedCdr(cdr CDR, callcost *CallCost) (err error) { - rate, err := cdr.GetRate() - if err != nil { - return err - } +func (mys *MySQLStorage) SetRatedCdr(cdr CDR, cc *CallCost) (err error) { _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s', '%s', '%s')", cdr.GetCgrId(), - rate, + cc.Cost, "cgrcostid", "cdrsrc", )) if err != nil { Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) } - return } diff --git a/rater/storage_postgres.go b/rater/storage_postgres.go index fc908a0af..78e11607e 100644 --- a/rater/storage_postgres.go +++ b/rater/storage_postgres.go @@ -159,14 +159,10 @@ func (psl *PostgresStorage) SetCdr(cdr CDR) (err error) { return } -func (psl *PostgresStorage) SetRatedCdr(cdr CDR, callcost *CallCost) (err error) { - rate, err := cdr.GetRate() - if err != nil { - return err - } +func (psl *PostgresStorage) SetRatedCdr(cdr CDR, cc *CallCost) (err error) { _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s', '%s', '%s')", cdr.GetCgrId(), - rate, + cc.Cost, "cgrcostid", "cdrsrc", )) From 4e7dc77db9332de1c8d8764ca9d546e37f4cb3b7 Mon Sep 17 00:00:00 2001 From: Radu Ioan Fericean Date: Wed, 29 May 2013 19:21:24 +0300 Subject: [PATCH 10/10] get cost from call cost --- rater/storage_mysql.go | 9 ++------- rater/storage_postgres.go | 5 ++--- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/rater/storage_mysql.go b/rater/storage_mysql.go index 303a8d1ff..2cf62437e 100644 --- a/rater/storage_mysql.go +++ b/rater/storage_mysql.go @@ -155,15 +155,10 @@ func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) { return } -func (mys *MySQLStorage) SetRatedCdr(cdr CDR, callcost *CallCost) (err error) { - rate, err := cdr.GetRate() - if err != nil { - fmt.Println("Could not find rate in cdr") - return err - } +func (mys *MySQLStorage) SetRatedCdr(cdr CDR, cc *CallCost) (err error) { _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO rated_cdrs VALUES ('%s', '%s', '%s', '%s')", cdr.GetCgrId(), - rate, + cc.Cost, "cgrcostid", "cdrsrc", )) diff --git a/rater/storage_postgres.go b/rater/storage_postgres.go index d4b303706..57b6d6028 100644 --- a/rater/storage_postgres.go +++ b/rater/storage_postgres.go @@ -155,14 +155,13 @@ func (psl *PostgresStorage) SetCdr(cdr CDR) (err error) { return } -func (psl *PostgresStorage) SetRatedCdr(cdr CDR, callcost *CallCost) (err error) { - rate, err := cdr.GetRate() +func (psl *PostgresStorage) SetRatedCdr(cdr CDR, cc *CallCost) (err error) { if err != nil { return err } _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s', '%s', '%s')", cdr.GetCgrId(), - rate, + cc.Cost, "cgrcostid", "cdrsrc", ))