Merge branch 'master' into marsh

This commit is contained in:
Radu Ioan Fericean
2013-05-30 10:20:03 +03:00
19 changed files with 494 additions and 256 deletions

View File

@@ -24,7 +24,6 @@ import (
"github.com/cgrates/cgrates/mediator"
"github.com/cgrates/cgrates/rater"
"io/ioutil"
"log"
"net/http"
)
@@ -37,9 +36,15 @@ var (
func cdrHandler(w http.ResponseWriter, r *http.Request) {
body, _ := ioutil.ReadAll(r.Body)
if fsCdr, err := new(FSCdr).New(body); err == nil {
log.Printf("CDR: %v", fsCdr)
//storage.SetCdr(fsCdr)
//medi.MediateCdrFromDB(fsCdr.GetAccount(), storage)
storage.SetCdr(fsCdr)
if cfg.CDRSMediator == "internal" {
errMedi := medi.MediateCdrFromDB(fsCdr, storage)
if errMedi != nil {
rater.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", errMedi.Error()))
}
} else {
//TODO: use the connection to mediator
}
} else {
rater.Logger.Err(fmt.Sprintf("Could not create CDR entry: %v", err))
}

View File

@@ -19,7 +19,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package cdrs
import (
"crypto/sha1"
"encoding/json"
"fmt"
"github.com/cgrates/cgrates/rater"
"github.com/cgrates/cgrates/utils"
"strconv"
@@ -39,9 +41,10 @@ const (
UUID = "uuid" // -Unique ID for this call leg
CSTMID = "cgr_cstmid"
CALL_DEST_NR = "dialed_extension"
PARK_TIME = "start_stamp"
START_TIME = "answer_stamp"
END_TIME = "end_stamp"
PARK_TIME = "start_epoch"
START_TIME = "answer_epoch"
END_TIME = "end_epoch"
DURATION = "billsec"
USERNAME = "user_name"
FS_IP = "sip_local_network_addr"
)
@@ -65,10 +68,21 @@ func (fsCdr FSCdr) New(body []byte) (rater.CDR, error) {
return nil, err
}
func (fsCdr FSCdr) GetCgrId() string {
hasher := sha1.New()
hasher.Write([]byte(fsCdr[FS_IP]))
hasher.Write([]byte(fsCdr[UUID]))
return fmt.Sprintf("%x", hasher.Sum(nil))
}
func (fsCdr FSCdr) GetAccId() string {
return fsCdr[UUID]
}
func (fsCdr FSCdr) GetCdrHost() string {
return fsCdr[FS_IP]
}
func (fsCdr FSCdr) GetDirection() string {
//TODO: implement direction
//TODO: implement direction, not related to FS_DIRECTION but traffic towards or from subject/account
return "OUT"
//return fsCdr[DIRECTION]
}
func (fsCdr FSCdr) GetOrigId() string {
return fsCdr[ORIG_ID]
@@ -102,19 +116,24 @@ func (fsCdr FSCdr) GetReqType() string {
return utils.FirstNonEmpty(fsCdr[REQTYPE], cfg.DefaultReqType)
}
func (fsCdr FSCdr) GetExtraParameters() string {
return ""
return "" // ToDo: Add and extract from config
}
func (fsCdr FSCdr) GetFallbackSubj() string {
return cfg.DefaultSubject
}
func (fsCdr FSCdr) GetStartTime(field string) (t time.Time, err error) {
st, err := strconv.ParseInt(fsCdr[field], 0, 64)
func (fsCdr FSCdr) GetStartTime() (t time.Time, err error) {
st, err := strconv.ParseInt(fsCdr[START_TIME], 0, 64)
t = time.Unix(0, st*1000)
return
}
func (fsCdr FSCdr) GetEndTime() (t time.Time, err error) {
st, err := strconv.ParseInt(fsCdr[END_TIME], 0, 64)
t = time.Unix(0, st*1000)
return
}
// Extracts duration as considered by the telecom switch
func (fsCdr FSCdr) GetDuration() int64 {
dur, _ := strconv.ParseInt(fsCdr[DURATION], 0, 64)
return dur
}

View File

@@ -45,6 +45,7 @@ const (
JSON = "json"
GOB = "gob"
POSTGRES = "postgres"
MYSQL = "mysql"
MONGO = "mongo"
REDIS = "redis"
SAME = "same"
@@ -121,14 +122,6 @@ func startMediator(responder *rater.Responder, loggerDb rater.DataStorage) {
}
connector = &rater.RPCClientConnector{Client: client}
}
if _, err := os.Stat(cfg.MediatorCDRInDir); err != nil {
rater.Logger.Crit(fmt.Sprintf("The input path for mediator does not exist: %v", cfg.MediatorCDRInDir))
exitChan <- true
}
if _, err := os.Stat(cfg.MediatorCDROutDir); err != nil {
rater.Logger.Crit(fmt.Sprintf("The output path for mediator does not exist: %v", cfg.MediatorCDROutDir))
exitChan <- true
}
var err error
medi, err = mediator.NewMediator(connector, loggerDb, cfg.MediatorCDROutDir, cfg.MediatorPseudoprepaid,
cfg.FreeswitchDirectionIdx, cfg.FreeswitchTORIdx, cfg.FreeswitchTenantIdx, cfg.FreeswitchSubjectIdx, cfg.FreeswitchAccountIdx,
@@ -138,7 +131,19 @@ func startMediator(responder *rater.Responder, loggerDb rater.DataStorage) {
exitChan <- true
}
medi.TrackCDRFiles(cfg.MediatorCDRInDir)
if cfg.MediatorEnabled { //Mediator as standalone service
if _, err := os.Stat(cfg.MediatorCDRInDir); err != nil {
rater.Logger.Crit(fmt.Sprintf("The input path for mediator does not exist: %v", cfg.MediatorCDRInDir))
exitChan <- true
}
if _, err := os.Stat(cfg.MediatorCDROutDir); err != nil {
rater.Logger.Crit(fmt.Sprintf("The output path for mediator does not exist: %v", cfg.MediatorCDROutDir))
exitChan <- true
}
medi.TrackCDRFiles(cfg.MediatorCDRInDir)
}
}
func startSessionManager(responder *rater.Responder, loggerDb rater.DataStorage) {
@@ -188,6 +193,25 @@ func startSessionManager(responder *rater.Responder, loggerDb rater.DataStorage)
exitChan <- true
}
func startCDRS(responder *rater.Responder, loggerDb rater.DataStorage) {
if !cfg.MediatorEnabled {
go startMediator(responder, loggerDb) // Will start it internally, important to connect the responder
}
for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable
time.Sleep(time.Duration(i/2) * time.Second)
if medi!=nil { // Got our mediator, no need to wait any longer
break
}
}
if medi == nil {
rater.Logger.Crit("<CDRS> Could not connect to mediator, exiting.")
exitChan <- true
}
cs := cdrs.New(loggerDb, medi, cfg)
cs.StartCapturingCDRs()
exitChan <- true
}
func checkConfigSanity() error {
if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != DISABLED {
rater.Logger.Crit("The session manager must not be enabled on a worker rater (change [rater]/balancer to disabled)!")
@@ -218,6 +242,8 @@ func configureDatabase(db_type, host, port, name, user, pass string) (getter rat
getter, err = rater.NewMongoStorage(host, port, name, user, pass)
case POSTGRES:
getter, err = rater.NewPostgresStorage(host, port, name, user, pass)
case MYSQL:
getter, err = rater.NewMySQLStorage(host, port, name, user, pass)
default:
err = errors.New("unknown db")
return nil, err
@@ -317,11 +343,9 @@ func main() {
rater.Logger.Info("Starting CGRateS Mediator.")
go startMediator(responder, loggerDb)
}
if cfg.CDRSListen!="" {
if cfg.CDRSListen != "" {
rater.Logger.Info("Starting CGRateS CDR Server.")
cs := cdrs.New(loggerDb, medi, cfg)
go cs.StartCapturingCDRs()
go startCDRS(responder, loggerDb)
}
<-exitChan
rater.Logger.Info("Stopped all components. CGRateS shutdown!")

View File

@@ -0,0 +1,21 @@
--
-- Table structure for table `callcosts`
--
CREATE TABLE `callcosts` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`uuid` varchar(80),
`source` varchar(32) NOT NULL,
`direction` varchar(32) NOT NULL,
`tenant` varchar(64) NOT NULL,
`tor` varchar(8) NOT NULL,
`account` varchar(64) NOT NULL,
`subject` varchar(64) NOT NULL,
`destination` varchar(64) NOT NULL,
`cost` double(20,4) default NULL,
`connect_fee` double(20,4) default NULL,
`timespans` text,
PRIMARY KEY (`id`),
UNIQUE KEY `cgrid` (`uuid`)
);

View File

@@ -9,7 +9,7 @@ CREATE TABLE `cdrs_primary` (
`reqtype` varchar(24) NOT NULL,
`direction` enum('0','1','2') NOT NULL DEFAULT '1',
`tenant` varchar(64) NOT NULL,
`tor` varchar(8) NOT NULL,
`tor` varchar(16) NOT NULL,
`account` varchar(64) NOT NULL,
`subject` varchar(64) NOT NULL,
`destination` varchar(64) NOT NULL,

View File

@@ -0,0 +1,148 @@
--
-- Table structure for table `tp_timings`
--
CREATE TABLE `tp_timings` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`tpid` char(40) NOT NULL,
`tag` varchar(24) NOT NULL,
`years` varchar(255) NOT NULL,
`months` varchar(255) NOT NULL,
`month_days` varchar(255) NOT NULL,
`week_days` varchar(255) NOT NULL,
`time` varchar(16) NOT NULL,
PRIMARY KEY (`id`),
KEY `tpid` (`tpid`)
);
--
-- Table structure for table `tp_destinations`
--
CREATE TABLE `tp_destinatins` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`tpid` char(40) NOT NULL,
`tag` varchar(24) NOT NULL,
`prefix` varchar(24) NOT NULL,
PRIMARY KEY (`id`),
KEY `tpid` (`tpid`)
);
--
-- Table structure for table `tp_rates`
--
CREATE TABLE `tp_rates` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`tpid` char(40) NOT NULL,
`tag` varchar(24) NOT NULL,
`destinations_tag` varchar(24) NOT NULL,
`connect_fee` DECIMAL(5,4) NOT NULL,
`rate` DECIMAL(5,4) NOT NULL,
`rate_increments` INT(11) NOT NULL,
PRIMARY KEY (`id`),
KEY `tpid` (`tpid`)
);
--
-- Table structure for table `tp_rate_timings`
--
CREATE TABLE `tp_rate_timings` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`tpid` char(40) NOT NULL,
`tag` varchar(24) NOT NULL,
`rates_tag` varchar(24) NOT NULL,
`timings_tag` varchar(24) NOT NULL,
`weight` smallint(5) NOT NULL,
PRIMARY KEY (`id`),
KEY `tpid` (`tpid`)
);
--
-- Table structure for table `tp_rate_profiles`
--
CREATE TABLE `tp_rate_profiles` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`tpid` char(40) NOT NULL,
`tenant` varchar(64) NOT NULL,
`tor` varchar(16) NOT NULL,
`direction` varchar(8) NOT NULL,
`subject` varchar(64) NOT NULL,
`rates_fallback_subject` varchar(64),
`rates_timing_tag` varchar(24) NOT NULL,
`activation_time` char(3) NOT NULL,
PRIMARY KEY (`id`),
KEY `tpid` (`tpid`)
);
--
-- Table structure for table `tp_actions`
--
CREATE TABLE `tp_actions` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`tpid` char(40) NOT NULL,
`tag` varchar(24) NOT NULL,
`action` varchar(24) NOT NULL,
`balances_tag` varchar(24) NOT NULL,
`direction` varchar(8) NOT NULL,
`units` int(11) NOT NULL,
`destinations_tag` varchar(24) NOT NULL,
`rate_type` varchar(8) NOT NULL,
`rate` DECIMAL(5,4) NOT NULL,
`minutes_weight` smallint(5) NOT NULL,
`weight` smallint(5) NOT NULL,
PRIMARY KEY (`id`),
KEY `tpid` (`tpid`)
);
--
-- Table structure for table `tp_action_timings`
--
CREATE TABLE `tp_action_timings` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`tpid` char(40) NOT NULL,
`tag` varchar(24) NOT NULL,
`actions_tag` varchar(24) NOT NULL,
`timings_tag` varchar(24) NOT NULL,
`weight` smallint(5) NOT NULL,
PRIMARY KEY (`id`),
KEY `tpid` (`tpid`)
);
--
-- Table structure for table `tp_action_triggers`
--
CREATE TABLE `tp_action_triggers` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`tpid` char(40) NOT NULL,
`tag` varchar(24) NOT NULL,
`balances_tag` varchar(24) NOT NULL,
`direction` varchar(8) NOT NULL,
`threshold` int(11) NOT NULL,
`destinations_tag` varchar(24) NOT NULL,
`actions_tag` varchar(24) NOT NULL,
`weight` smallint(5) NOT NULL,
PRIMARY KEY (`id`),
KEY `tpid` (`tpid`)
);
--
-- Table structure for table `tp_account_actions`
--
CREATE TABLE `tp_account_actions` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`tpid` char(40) NOT NULL,
`tenant` varchar(64) NOT NULL,
`account` varchar(64) NOT NULL,
`direction` varchar(8) NOT NULL,
`action_timings_tag` varchar(24),
`action_triggers_tag` varchar(24),
PRIMARY KEY (`id`),
KEY `tpid` (`tpid`)
);

View File

@@ -0,0 +1,5 @@
--
-- Sample user creation. Replace here with your own details
--
GRANT ALL on cgrates.* TO 'cgrates'@'localhost' IDENTIFIED BY 'CGRateS.org';

View File

@@ -0,0 +1,94 @@
CREATE TABLE ratingprofile IF NOT EXISTS (
id SERIAL PRIMARY KEY,
fallbackkey VARCHAR(512),
);
CREATE TABLE ratingdestinations IF NOT EXISTS (
id SERIAL PRIMARY KEY,
ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE,
destination INTEGER REFERENCES destination(id) ON DELETE CASCADE
);
CREATE TABLE destination IF NOT EXISTS (
id SERIAL PRIMARY KEY,
ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE,
name VARCHAR(512),
prefixes TEXT
);
CREATE TABLE activationprofile IF NOT EXISTS(
id SERIAL PRIMARY KEY,
destination INTEGER REFERENCES destination(id) ON DELETE CASCADE,
activationtime TIMESTAMP
);
CREATE TABLE interval IF NOT EXISTS(
id SERIAL PRIMARY KEY,
activationprofile INTEGER REFERENCES activationprofile(id) ON DELETE CASCADE,
years TEXT,
months TEXT,
monthdays TEXT,
weekdays TEXT,
starttime TIMESTAMP,
endtime TIMESTAMP,
weight FLOAT8,
connectfee FLOAT8,
price FLOAT8,
pricedunits FLOAT8,
rateincrements FLOAT8
);
CREATE TABLE minutebucket IF NOT EXISTS(
id SERIAL PRIMARY KEY,
destination INTEGER REFERENCES destination(id) ON DELETE CASCADE,
seconds FLOAT8,
weight FLOAT8,
price FLOAT8,
percent FLOAT8
);
CREATE TABLE unitcounter IF NOT EXISTS(
id SERIAL PRIMARY KEY,
direction TEXT,
balance TEXT,
units FLOAT8
);
CREATE TABLE unitcounterbucket IF NOT EXISTS(
id SERIAL PRIMARY KEY,
unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE,
minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE
);
CREATE TABLE actiontrigger IF NOT EXISTS(
id SERIAL PRIMARY KEY,
destination INTEGER REFERENCES destination(id) ON DELETE CASCADE,
actions INTEGER REFERENCES action(id) ON DELETE CASCADE,
balance TEXT,
direction TEXT,
thresholdvalue FLOAT8,
weight FLOAT8,
executed BOOL
);
CREATE TABLE balance IF NOT EXISTS(
id SERIAL PRIMARY KEY,
name TEXT;
value FLOAT8
);
CREATE TABLE userbalance IF NOT EXISTS(
id SERIAL PRIMARY KEY,
unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE,
minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE
actiontriggers INTEGER REFERENCES actiontrigger(id) ON DELETE CASCADE,
balances INTEGER REFERENCES balance(id) ON DELETE CASCADE,
type TEXT
);
CREATE TABLE actiontiming IF NOT EXISTS(
id SERIAL PRIMARY KEY,
tag TEXT,
userbalances INTEGER REFERENCES userbalance(id) ON DELETE CASCADE,
timing INTEGER REFERENCES interval(id) ON DELETE CASCADE,
actions INTEGER REFERENCES action(id) ON DELETE CASCADE,
weight FLOAT8
);
CREATE TABLE action IF NOT EXISTS(
id SERIAL PRIMARY KEY,
minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE,
actiontype TEXT,
balance TEXT,
direction TEXT,
units FLOAT8,
weight FLOAT8
);

View File

@@ -223,6 +223,30 @@ func (m *Mediator) getCostsFromRater(record []string, runIdx int) (cc *rater.Cal
return
}
func (m *Mediator) MediateCdrFromDB(cdrID string, db rater.DataStorage) error {
return nil
/* Calculates price for the specified cdr and writes the new cdr with price to
the storage. If the cdr is nil then it will fetch it from the storage. */
func (m *Mediator) MediateCdrFromDB(cdr rater.CDR, db rater.DataStorage) error {
cc := &rater.CallCost{}
startTime, err := cdr.GetStartTime()
if err != nil {
return err
}
endTime, err := cdr.GetEndTime()
if err != nil {
return err
}
cd := rater.CallDescriptor{
Direction: cdr.GetDirection(),
Tenant: cdr.GetTenant(),
TOR: cdr.GetTOR(),
Subject: cdr.GetSubject(),
Account: cdr.GetAccount(),
Destination: cdr.GetDestination(),
TimeStart: startTime,
TimeEnd: endTime}
if err := m.connector.GetCost(cd, cc); err != nil {
fmt.Println("Got error in the mediator getCost", err.Error())
return err
}
return db.SetRatedCdr(cdr, cc)
}

View File

@@ -33,6 +33,7 @@ func init() {
Logger, err = syslog.New(syslog.LOG_INFO, "CGRateS")
if err != nil {
Logger = new(utils.StdLogger)
Logger.Err(fmt.Sprintf("Could not connect to syslog: %v", err))
}
}

View File

@@ -24,6 +24,9 @@ import (
type CDR interface {
New([]byte) (CDR, error)
GetCgrId() string
GetAccId() string
GetCdrHost() string
GetDirection() string
GetOrigId() string
GetSubject() string
@@ -34,8 +37,9 @@ type CDR interface {
GetUUID() string
GetTenant() string
GetReqType() string
GetStartTime(string) (time.Time, error)
GetStartTime() (time.Time, error)
GetEndTime() (time.Time, error)
GetDuration() int64
GetFallbackSubj() string
GetExtraParameters() string
}

View File

@@ -217,9 +217,10 @@ func (rs *GosexyStorage) LogError(uuid, source, errstr string) (err error) {
return
}
func (rs *GosexyStorage) GetCdr(string) (CDR, error) {
return nil, nil
}
func (rs *GosexyStorage) SetCdr(CDR) error {
return nil
}
func (rs *GosexyStorage) SetRatedCdr(CDR, *CallCost) error {
return nil
}

View File

@@ -38,10 +38,12 @@ const (
LOG_ACTION_TIMMING_PREFIX = "ltm_"
LOG_ACTION_TRIGGER_PREFIX = "ltr_"
LOG_ERR = "ler_"
LOG_CDR = "cdr_"
LOG_MEDIATED_CDR = "mcd_"
// sources
SESSION_MANAGER_SOURCE = "SMR"
MEDIATOR_SOURCE = "MED"
SCHED_SOURCE = "MED"
SCHED_SOURCE = "SCH"
RATER_SOURCE = "RAT"
)
@@ -62,8 +64,8 @@ type DataStorage interface {
GetActionTimings(string) ([]*ActionTiming, error)
SetActionTimings(string, []*ActionTiming) error
GetAllActionTimings() (map[string][]*ActionTiming, error)
GetCdr(string) (CDR, error)
SetCdr(CDR) error
SetRatedCdr(CDR, *CallCost) error
//GetAllActionTimingsLogs() (map[string][]*ActionTiming, error)
LogCallCost(uuid, source string, cc *CallCost) error
LogError(uuid, source, errstr string) error

View File

@@ -183,9 +183,10 @@ func (ms *MapStorage) LogError(uuid, source, errstr string) (err error) {
return nil
}
func (ms *MapStorage) GetCdr(string) (CDR, error) {
return nil, nil
}
func (ms *MapStorage) SetCdr(CDR) error {
return nil
}
func (ms *MapStorage) SetRatedCdr(CDR, *CallCost) error {
return nil
}

View File

@@ -208,9 +208,10 @@ func (ms *MongoStorage) LogError(uuid, source, errstr string) (err error) {
return ms.db.C("errlog").Insert(&LogErrEntry{uuid, errstr, source})
}
func (ms *MongoStorage) GetCdr(string) (CDR, error) {
return nil, nil
}
func (ms *MongoStorage) SetCdr(CDR) error {
return nil
}
func (ms *MongoStorage) SetRatedCdr(CDR, *CallCost) error {
return nil
}

View File

@@ -29,107 +29,8 @@ type MySQLStorage struct {
Db *sql.DB
}
var (
mysql_schema = `
CREATE TABLE ratingprofile IF NOT EXISTS (
id SERIAL PRIMARY KEY,
fallbackkey VARCHAR(512),
);
CREATE TABLE ratingdestinations IF NOT EXISTS (
id SERIAL PRIMARY KEY,
ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE,
destination INTEGER REFERENCES destination(id) ON DELETE CASCADE
);
CREATE TABLE destination IF NOT EXISTS (
id SERIAL PRIMARY KEY,
ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE,
name VARCHAR(512),
prefixes TEXT
);
CREATE TABLE activationprofile IF NOT EXISTS(
id SERIAL PRIMARY KEY,
destination INTEGER REFERENCES destination(id) ON DELETE CASCADE,
activationtime TIMESTAMP
);
CREATE TABLE interval IF NOT EXISTS(
id SERIAL PRIMARY KEY,
activationprofile INTEGER REFERENCES activationprofile(id) ON DELETE CASCADE,
years TEXT,
months TEXT,
monthdays TEXT,
weekdays TEXT,
starttime TIMESTAMP,
endtime TIMESTAMP,
weight FLOAT8,
connectfee FLOAT8,
price FLOAT8,
pricedunits FLOAT8,
rateincrements FLOAT8
);
CREATE TABLE minutebucket IF NOT EXISTS(
id SERIAL PRIMARY KEY,
destination INTEGER REFERENCES destination(id) ON DELETE CASCADE,
seconds FLOAT8,
weight FLOAT8,
price FLOAT8,
percent FLOAT8
);
CREATE TABLE unitcounter IF NOT EXISTS(
id SERIAL PRIMARY KEY,
direction TEXT,
balance TEXT,
units FLOAT8
);
CREATE TABLE unitcounterbucket IF NOT EXISTS(
id SERIAL PRIMARY KEY,
unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE,
minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE
);
CREATE TABLE actiontrigger IF NOT EXISTS(
id SERIAL PRIMARY KEY,
destination INTEGER REFERENCES destination(id) ON DELETE CASCADE,
actions INTEGER REFERENCES action(id) ON DELETE CASCADE,
balance TEXT,
direction TEXT,
thresholdvalue FLOAT8,
weight FLOAT8,
executed BOOL
);
CREATE TABLE balance IF NOT EXISTS(
id SERIAL PRIMARY KEY,
name TEXT;
value FLOAT8
);
CREATE TABLE userbalance IF NOT EXISTS(
id SERIAL PRIMARY KEY,
unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE,
minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE
actiontriggers INTEGER REFERENCES actiontrigger(id) ON DELETE CASCADE,
balances INTEGER REFERENCES balance(id) ON DELETE CASCADE,
type TEXT
);
CREATE TABLE actiontiming IF NOT EXISTS(
id SERIAL PRIMARY KEY,
tag TEXT,
userbalances INTEGER REFERENCES userbalance(id) ON DELETE CASCADE,
timing INTEGER REFERENCES interval(id) ON DELETE CASCADE,
actions INTEGER REFERENCES action(id) ON DELETE CASCADE,
weight FLOAT8
);
CREATE TABLE action IF NOT EXISTS(
id SERIAL PRIMARY KEY,
minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE,
actiontype TEXT,
balance TEXT,
direction TEXT,
units FLOAT8,
weight FLOAT8
);
`
)
func NewMySQLStorage(host, port, name, user, password string) (DataStorage, error) {
db, err := sql.Open("mysql", "cgrates:testus@tcp(192.168.0.17:3306)/cgrates?charset=utf8")
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8", user, password, host, port, name))
if err != nil {
return nil, err
}
@@ -186,7 +87,7 @@ func (mys *MySQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err err
if err != nil {
Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err))
}
_, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdr VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')",
_, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO callcosts VALUES ('NULL','%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')",
uuid,
source,
cc.Direction,
@@ -205,7 +106,7 @@ func (mys *MySQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err err
}
func (mys *MySQLStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) {
row := mys.Db.QueryRow(fmt.Sprintf("SELECT * FROM cdr WHERE uuid='%s' AND source='%s'", uuid, source))
row := mys.Db.QueryRow(fmt.Sprintf("SELECT * FROM callcosts WHERE uuid='%s' AND source='%s'", uuid, source))
var uuid_found string
var timespansJson string
err = row.Scan(&uuid_found, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, &timespansJson)
@@ -221,9 +122,49 @@ func (mys *MySQLStorage) LogActionTiming(source string, at *ActionTiming, as []*
}
func (mys *MySQLStorage) LogError(uuid, source, errstr string) (err error) { return }
func (mys *MySQLStorage) GetCdr(string) (CDR, error) {
return nil, nil
func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) {
startTime, err := cdr.GetStartTime()
if err != nil {
return err
}
_, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_primary VALUES (NULL, '%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', %d)",
cdr.GetCgrId(),
cdr.GetAccId(),
cdr.GetCdrHost(),
cdr.GetReqType(),
cdr.GetDirection(),
cdr.GetTenant(),
cdr.GetTOR(),
cdr.GetAccount(),
cdr.GetSubject(),
cdr.GetDestination(),
startTime,
cdr.GetDuration(),
))
if err != nil {
Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err))
}
_, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('NULL','%s', '%s')",
cdr.GetCgrId(),
cdr.GetExtraParameters(),
))
if err != nil {
Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err))
}
return
}
func (mys *MySQLStorage) SetCdr(CDR) error {
return nil
func (mys *MySQLStorage) SetRatedCdr(cdr CDR, cc *CallCost) (err error) {
_, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO rated_cdrs VALUES ('%s', '%s', '%s', '%s')",
cdr.GetCgrId(),
cc.Cost,
"cgrcostid",
"cdrsrc",
))
if err != nil {
Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err))
}
return
}

View File

@@ -29,105 +29,6 @@ type PostgresStorage struct {
Db *sql.DB
}
var (
postgres_schema = `
CREATE TABLE ratingprofile IF NOT EXISTS (
id SERIAL PRIMARY KEY,
fallbackkey VARCHAR(512),
);
CREATE TABLE ratingdestinations IF NOT EXISTS (
id SERIAL PRIMARY KEY,
ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE,
destination INTEGER REFERENCES destination(id) ON DELETE CASCADE
);
CREATE TABLE destination IF NOT EXISTS (
id SERIAL PRIMARY KEY,
ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE,
name VARCHAR(512),
prefixes TEXT
);
CREATE TABLE activationprofile IF NOT EXISTS(
id SERIAL PRIMARY KEY,
destination INTEGER REFERENCES destination(id) ON DELETE CASCADE,
activationtime TIMESTAMP
);
CREATE TABLE interval IF NOT EXISTS(
id SERIAL PRIMARY KEY,
activationprofile INTEGER REFERENCES activationprofile(id) ON DELETE CASCADE,
years TEXT,
months TEXT,
monthdays TEXT,
weekdays TEXT,
starttime TIMESTAMP,
endtime TIMESTAMP,
weight FLOAT8,
connectfee FLOAT8,
price FLOAT8,
pricedunits FLOAT8,
rateincrements FLOAT8
);
CREATE TABLE minutebucket IF NOT EXISTS(
id SERIAL PRIMARY KEY,
destination INTEGER REFERENCES destination(id) ON DELETE CASCADE,
seconds FLOAT8,
weight FLOAT8,
price FLOAT8,
percent FLOAT8
);
CREATE TABLE unitcounter IF NOT EXISTS(
id SERIAL PRIMARY KEY,
direction TEXT,
balance TEXT,
units FLOAT8
);
CREATE TABLE unitcounterbucket IF NOT EXISTS(
id SERIAL PRIMARY KEY,
unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE,
minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE
);
CREATE TABLE actiontrigger IF NOT EXISTS(
id SERIAL PRIMARY KEY,
destination INTEGER REFERENCES destination(id) ON DELETE CASCADE,
actions INTEGER REFERENCES action(id) ON DELETE CASCADE,
balance TEXT,
direction TEXT,
thresholdvalue FLOAT8,
weight FLOAT8,
executed BOOL
);
CREATE TABLE balance IF NOT EXISTS(
id SERIAL PRIMARY KEY,
name TEXT;
value FLOAT8
);
CREATE TABLE userbalance IF NOT EXISTS(
id SERIAL PRIMARY KEY,
unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE,
minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE
actiontriggers INTEGER REFERENCES actiontrigger(id) ON DELETE CASCADE,
balances INTEGER REFERENCES balance(id) ON DELETE CASCADE,
type TEXT
);
CREATE TABLE actiontiming IF NOT EXISTS(
id SERIAL PRIMARY KEY,
tag TEXT,
userbalances INTEGER REFERENCES userbalance(id) ON DELETE CASCADE,
timing INTEGER REFERENCES interval(id) ON DELETE CASCADE,
actions INTEGER REFERENCES action(id) ON DELETE CASCADE,
weight FLOAT8
);
CREATE TABLE action IF NOT EXISTS(
id SERIAL PRIMARY KEY,
minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE,
actiontype TEXT,
balance TEXT,
direction TEXT,
units FLOAT8,
weight FLOAT8
);
`
)
func NewPostgresStorage(host, port, name, user, password string) (DataStorage, error) {
db, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", host, port, name, user, password))
if err != nil {
@@ -221,9 +122,52 @@ func (psl *PostgresStorage) LogActionTiming(source string, at *ActionTiming, as
}
func (psl *PostgresStorage) LogError(uuid, source, errstr string) (err error) { return }
func (psl *PostgresStorage) GetCdr(string) (CDR, error) {
return nil, nil
func (psl *PostgresStorage) SetCdr(cdr CDR) (err error) {
startTime, err := cdr.GetStartTime()
if err != nil {
return err
}
_, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_primary VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')",
cdr.GetCgrId(),
cdr.GetAccId(),
cdr.GetCdrHost(),
cdr.GetReqType(),
cdr.GetDirection(),
cdr.GetTenant(),
cdr.GetTOR(),
cdr.GetAccount(),
cdr.GetSubject(),
cdr.GetDestination(),
startTime,
cdr.GetDuration(),
))
if err != nil {
Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err))
}
_, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s')",
cdr.GetCgrId(),
cdr.GetExtraParameters(),
))
if err != nil {
Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err))
}
return
}
func (psl *PostgresStorage) SetCdr(CDR) error {
return nil
func (psl *PostgresStorage) SetRatedCdr(cdr CDR, cc *CallCost) (err error) {
if err != nil {
return err
}
_, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s', '%s', '%s')",
cdr.GetCgrId(),
cc.Cost,
"cgrcostid",
"cdrsrc",
))
if err != nil {
Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err))
}
return
}

View File

@@ -215,9 +215,10 @@ func (rs *RedigoStorage) LogError(uuid, source, errstr string) (err error) {
return
}
func (rs *RedigoStorage) GetCdr(string) (CDR, error) {
return nil, nil
}
func (rs *RedigoStorage) SetCdr(CDR) error {
return nil
}
func (rs *RedigoStorage) SetRatedCdr(CDR, *CallCost) error {
return nil
}

View File

@@ -241,9 +241,11 @@ func (rs *RedisStorage) LogError(uuid, source, errstr string) (err error) {
}
return
}
func (rs *RedisStorage) GetCdr(string) (CDR, error) {
return nil, nil
}
func (rs *RedisStorage) SetCdr(CDR) error {
return nil
}
func (rs *RedisStorage) SetRatedCdr(CDR, *CallCost) error {
return nil
}