diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index 0e83a4b8f..38b8d19dc 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -24,7 +24,6 @@ import ( "github.com/cgrates/cgrates/mediator" "github.com/cgrates/cgrates/rater" "io/ioutil" - "log" "net/http" ) @@ -37,9 +36,15 @@ var ( func cdrHandler(w http.ResponseWriter, r *http.Request) { body, _ := ioutil.ReadAll(r.Body) if fsCdr, err := new(FSCdr).New(body); err == nil { - log.Printf("CDR: %v", fsCdr) - //storage.SetCdr(fsCdr) - //medi.MediateCdrFromDB(fsCdr.GetAccount(), storage) + storage.SetCdr(fsCdr) + if cfg.CDRSMediator == "internal" { + errMedi := medi.MediateCdrFromDB(fsCdr, storage) + if errMedi != nil { + rater.Logger.Err(fmt.Sprintf("Could not run mediation on CDR: %s", errMedi.Error())) + } + } else { + //TODO: use the connection to mediator + } } else { rater.Logger.Err(fmt.Sprintf("Could not create CDR entry: %v", err)) } diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go index df01dfac2..1a5808e34 100644 --- a/cdrs/fscdr.go +++ b/cdrs/fscdr.go @@ -19,7 +19,9 @@ along with this program. If not, see package cdrs import ( + "crypto/sha1" "encoding/json" + "fmt" "github.com/cgrates/cgrates/rater" "github.com/cgrates/cgrates/utils" "strconv" @@ -39,9 +41,10 @@ const ( UUID = "uuid" // -Unique ID for this call leg CSTMID = "cgr_cstmid" CALL_DEST_NR = "dialed_extension" - PARK_TIME = "start_stamp" - START_TIME = "answer_stamp" - END_TIME = "end_stamp" + PARK_TIME = "start_epoch" + START_TIME = "answer_epoch" + END_TIME = "end_epoch" + DURATION = "billsec" USERNAME = "user_name" FS_IP = "sip_local_network_addr" ) @@ -65,10 +68,21 @@ func (fsCdr FSCdr) New(body []byte) (rater.CDR, error) { return nil, err } +func (fsCdr FSCdr) GetCgrId() string { + hasher := sha1.New() + hasher.Write([]byte(fsCdr[FS_IP])) + hasher.Write([]byte(fsCdr[UUID])) + return fmt.Sprintf("%x", hasher.Sum(nil)) +} +func (fsCdr FSCdr) GetAccId() string { + return fsCdr[UUID] +} +func (fsCdr FSCdr) GetCdrHost() string { + return fsCdr[FS_IP] +} func (fsCdr FSCdr) GetDirection() string { - //TODO: implement direction + //TODO: implement direction, not related to FS_DIRECTION but traffic towards or from subject/account return "OUT" - //return fsCdr[DIRECTION] } func (fsCdr FSCdr) GetOrigId() string { return fsCdr[ORIG_ID] @@ -102,19 +116,24 @@ func (fsCdr FSCdr) GetReqType() string { return utils.FirstNonEmpty(fsCdr[REQTYPE], cfg.DefaultReqType) } func (fsCdr FSCdr) GetExtraParameters() string { - return "" + return "" // ToDo: Add and extract from config } func (fsCdr FSCdr) GetFallbackSubj() string { return cfg.DefaultSubject } -func (fsCdr FSCdr) GetStartTime(field string) (t time.Time, err error) { - st, err := strconv.ParseInt(fsCdr[field], 0, 64) +func (fsCdr FSCdr) GetStartTime() (t time.Time, err error) { + st, err := strconv.ParseInt(fsCdr[START_TIME], 0, 64) t = time.Unix(0, st*1000) return } - func (fsCdr FSCdr) GetEndTime() (t time.Time, err error) { st, err := strconv.ParseInt(fsCdr[END_TIME], 0, 64) t = time.Unix(0, st*1000) return } + +// Extracts duration as considered by the telecom switch +func (fsCdr FSCdr) GetDuration() int64 { + dur, _ := strconv.ParseInt(fsCdr[DURATION], 0, 64) + return dur +} diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index fe1b847ce..601d4dbec 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -45,6 +45,7 @@ const ( JSON = "json" GOB = "gob" POSTGRES = "postgres" + MYSQL = "mysql" MONGO = "mongo" REDIS = "redis" SAME = "same" @@ -121,14 +122,6 @@ func startMediator(responder *rater.Responder, loggerDb rater.DataStorage) { } connector = &rater.RPCClientConnector{Client: client} } - if _, err := os.Stat(cfg.MediatorCDRInDir); err != nil { - rater.Logger.Crit(fmt.Sprintf("The input path for mediator does not exist: %v", cfg.MediatorCDRInDir)) - exitChan <- true - } - if _, err := os.Stat(cfg.MediatorCDROutDir); err != nil { - rater.Logger.Crit(fmt.Sprintf("The output path for mediator does not exist: %v", cfg.MediatorCDROutDir)) - exitChan <- true - } var err error medi, err = mediator.NewMediator(connector, loggerDb, cfg.MediatorCDROutDir, cfg.MediatorPseudoprepaid, cfg.FreeswitchDirectionIdx, cfg.FreeswitchTORIdx, cfg.FreeswitchTenantIdx, cfg.FreeswitchSubjectIdx, cfg.FreeswitchAccountIdx, @@ -138,7 +131,19 @@ func startMediator(responder *rater.Responder, loggerDb rater.DataStorage) { exitChan <- true } - medi.TrackCDRFiles(cfg.MediatorCDRInDir) + if cfg.MediatorEnabled { //Mediator as standalone service + if _, err := os.Stat(cfg.MediatorCDRInDir); err != nil { + rater.Logger.Crit(fmt.Sprintf("The input path for mediator does not exist: %v", cfg.MediatorCDRInDir)) + exitChan <- true + } + if _, err := os.Stat(cfg.MediatorCDROutDir); err != nil { + rater.Logger.Crit(fmt.Sprintf("The output path for mediator does not exist: %v", cfg.MediatorCDROutDir)) + exitChan <- true + } + medi.TrackCDRFiles(cfg.MediatorCDRInDir) + } + + } func startSessionManager(responder *rater.Responder, loggerDb rater.DataStorage) { @@ -188,6 +193,25 @@ func startSessionManager(responder *rater.Responder, loggerDb rater.DataStorage) exitChan <- true } +func startCDRS(responder *rater.Responder, loggerDb rater.DataStorage) { + if !cfg.MediatorEnabled { + go startMediator(responder, loggerDb) // Will start it internally, important to connect the responder + } + for i := 0; i < 3; i++ { // ToDo: If the right approach, make the reconnects configurable + time.Sleep(time.Duration(i/2) * time.Second) + if medi!=nil { // Got our mediator, no need to wait any longer + break + } + } + if medi == nil { + rater.Logger.Crit(" Could not connect to mediator, exiting.") + exitChan <- true + } + cs := cdrs.New(loggerDb, medi, cfg) + cs.StartCapturingCDRs() + exitChan <- true +} + func checkConfigSanity() error { if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != DISABLED { rater.Logger.Crit("The session manager must not be enabled on a worker rater (change [rater]/balancer to disabled)!") @@ -218,6 +242,8 @@ func configureDatabase(db_type, host, port, name, user, pass string) (getter rat getter, err = rater.NewMongoStorage(host, port, name, user, pass) case POSTGRES: getter, err = rater.NewPostgresStorage(host, port, name, user, pass) + case MYSQL: + getter, err = rater.NewMySQLStorage(host, port, name, user, pass) default: err = errors.New("unknown db") return nil, err @@ -317,11 +343,9 @@ func main() { rater.Logger.Info("Starting CGRateS Mediator.") go startMediator(responder, loggerDb) } - - if cfg.CDRSListen!="" { + if cfg.CDRSListen != "" { rater.Logger.Info("Starting CGRateS CDR Server.") - cs := cdrs.New(loggerDb, medi, cfg) - go cs.StartCapturingCDRs() + go startCDRS(responder, loggerDb) } <-exitChan rater.Logger.Info("Stopped all components. CGRateS shutdown!") diff --git a/data/storage/mysql/create_callcost_tables.sql b/data/storage/mysql/create_callcost_tables.sql new file mode 100644 index 000000000..ff4aeea7d --- /dev/null +++ b/data/storage/mysql/create_callcost_tables.sql @@ -0,0 +1,21 @@ + +-- +-- Table structure for table `callcosts` +-- +CREATE TABLE `callcosts` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `uuid` varchar(80), + `source` varchar(32) NOT NULL, + `direction` varchar(32) NOT NULL, + `tenant` varchar(64) NOT NULL, + `tor` varchar(8) NOT NULL, + `account` varchar(64) NOT NULL, + `subject` varchar(64) NOT NULL, + `destination` varchar(64) NOT NULL, + `cost` double(20,4) default NULL, + `connect_fee` double(20,4) default NULL, + `timespans` text, + PRIMARY KEY (`id`), + UNIQUE KEY `cgrid` (`uuid`) +); + diff --git a/data/storage/mysql/create_cdrs_tables.sql b/data/storage/mysql/create_cdrs_tables.sql index 39d8e29d6..2af8370f9 100644 --- a/data/storage/mysql/create_cdrs_tables.sql +++ b/data/storage/mysql/create_cdrs_tables.sql @@ -9,7 +9,7 @@ CREATE TABLE `cdrs_primary` ( `reqtype` varchar(24) NOT NULL, `direction` enum('0','1','2') NOT NULL DEFAULT '1', `tenant` varchar(64) NOT NULL, - `tor` varchar(8) NOT NULL, + `tor` varchar(16) NOT NULL, `account` varchar(64) NOT NULL, `subject` varchar(64) NOT NULL, `destination` varchar(64) NOT NULL, diff --git a/data/storage/mysql/create_tariffplan_tables.sql b/data/storage/mysql/create_tariffplan_tables.sql new file mode 100644 index 000000000..3b9ddbd9c --- /dev/null +++ b/data/storage/mysql/create_tariffplan_tables.sql @@ -0,0 +1,148 @@ +-- +-- Table structure for table `tp_timings` +-- + +CREATE TABLE `tp_timings` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `years` varchar(255) NOT NULL, + `months` varchar(255) NOT NULL, + `month_days` varchar(255) NOT NULL, + `week_days` varchar(255) NOT NULL, + `time` varchar(16) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_destinations` +-- + +CREATE TABLE `tp_destinatins` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `prefix` varchar(24) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_rates` +-- + +CREATE TABLE `tp_rates` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `destinations_tag` varchar(24) NOT NULL, + `connect_fee` DECIMAL(5,4) NOT NULL, + `rate` DECIMAL(5,4) NOT NULL, + `rate_increments` INT(11) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_rate_timings` +-- + +CREATE TABLE `tp_rate_timings` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `rates_tag` varchar(24) NOT NULL, + `timings_tag` varchar(24) NOT NULL, + `weight` smallint(5) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_rate_profiles` +-- + +CREATE TABLE `tp_rate_profiles` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tenant` varchar(64) NOT NULL, + `tor` varchar(16) NOT NULL, + `direction` varchar(8) NOT NULL, + `subject` varchar(64) NOT NULL, + `rates_fallback_subject` varchar(64), + `rates_timing_tag` varchar(24) NOT NULL, + `activation_time` char(3) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_actions` +-- + +CREATE TABLE `tp_actions` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `action` varchar(24) NOT NULL, + `balances_tag` varchar(24) NOT NULL, + `direction` varchar(8) NOT NULL, + `units` int(11) NOT NULL, + `destinations_tag` varchar(24) NOT NULL, + `rate_type` varchar(8) NOT NULL, + `rate` DECIMAL(5,4) NOT NULL, + `minutes_weight` smallint(5) NOT NULL, + `weight` smallint(5) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_action_timings` +-- + +CREATE TABLE `tp_action_timings` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `actions_tag` varchar(24) NOT NULL, + `timings_tag` varchar(24) NOT NULL, + `weight` smallint(5) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_action_triggers` +-- + +CREATE TABLE `tp_action_triggers` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tag` varchar(24) NOT NULL, + `balances_tag` varchar(24) NOT NULL, + `direction` varchar(8) NOT NULL, + `threshold` int(11) NOT NULL, + `destinations_tag` varchar(24) NOT NULL, + `actions_tag` varchar(24) NOT NULL, + `weight` smallint(5) NOT NULL, + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); + +-- +-- Table structure for table `tp_account_actions` +-- + +CREATE TABLE `tp_account_actions` ( + `id` int(11) NOT NULL AUTO_INCREMENT, + `tpid` char(40) NOT NULL, + `tenant` varchar(64) NOT NULL, + `account` varchar(64) NOT NULL, + `direction` varchar(8) NOT NULL, + `action_timings_tag` varchar(24), + `action_triggers_tag` varchar(24), + PRIMARY KEY (`id`), + KEY `tpid` (`tpid`) +); diff --git a/data/storage/mysql/create_user.sql b/data/storage/mysql/create_user.sql new file mode 100644 index 000000000..149a38db1 --- /dev/null +++ b/data/storage/mysql/create_user.sql @@ -0,0 +1,5 @@ + +-- +-- Sample user creation. Replace here with your own details +-- +GRANT ALL on cgrates.* TO 'cgrates'@'localhost' IDENTIFIED BY 'CGRateS.org'; diff --git a/data/storage/postgres/create_rater_tables.sql b/data/storage/postgres/create_rater_tables.sql new file mode 100644 index 000000000..118e742a8 --- /dev/null +++ b/data/storage/postgres/create_rater_tables.sql @@ -0,0 +1,94 @@ +CREATE TABLE ratingprofile IF NOT EXISTS ( + id SERIAL PRIMARY KEY, + fallbackkey VARCHAR(512), +); +CREATE TABLE ratingdestinations IF NOT EXISTS ( + id SERIAL PRIMARY KEY, + ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE, + destination INTEGER REFERENCES destination(id) ON DELETE CASCADE +); +CREATE TABLE destination IF NOT EXISTS ( + id SERIAL PRIMARY KEY, + ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE, + name VARCHAR(512), + prefixes TEXT +); +CREATE TABLE activationprofile IF NOT EXISTS( + id SERIAL PRIMARY KEY, + destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, + activationtime TIMESTAMP +); +CREATE TABLE interval IF NOT EXISTS( + id SERIAL PRIMARY KEY, + activationprofile INTEGER REFERENCES activationprofile(id) ON DELETE CASCADE, + years TEXT, + months TEXT, + monthdays TEXT, + weekdays TEXT, + starttime TIMESTAMP, + endtime TIMESTAMP, + weight FLOAT8, + connectfee FLOAT8, + price FLOAT8, + pricedunits FLOAT8, + rateincrements FLOAT8 +); +CREATE TABLE minutebucket IF NOT EXISTS( + id SERIAL PRIMARY KEY, + destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, + seconds FLOAT8, + weight FLOAT8, + price FLOAT8, + percent FLOAT8 +); +CREATE TABLE unitcounter IF NOT EXISTS( + id SERIAL PRIMARY KEY, + direction TEXT, + balance TEXT, + units FLOAT8 +); +CREATE TABLE unitcounterbucket IF NOT EXISTS( + id SERIAL PRIMARY KEY, + unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE, + minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE +); +CREATE TABLE actiontrigger IF NOT EXISTS( + id SERIAL PRIMARY KEY, + destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, + actions INTEGER REFERENCES action(id) ON DELETE CASCADE, + balance TEXT, + direction TEXT, + thresholdvalue FLOAT8, + weight FLOAT8, + executed BOOL +); +CREATE TABLE balance IF NOT EXISTS( + id SERIAL PRIMARY KEY, + name TEXT; + value FLOAT8 +); +CREATE TABLE userbalance IF NOT EXISTS( + id SERIAL PRIMARY KEY, + unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE, + minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE + actiontriggers INTEGER REFERENCES actiontrigger(id) ON DELETE CASCADE, + balances INTEGER REFERENCES balance(id) ON DELETE CASCADE, + type TEXT +); +CREATE TABLE actiontiming IF NOT EXISTS( + id SERIAL PRIMARY KEY, + tag TEXT, + userbalances INTEGER REFERENCES userbalance(id) ON DELETE CASCADE, + timing INTEGER REFERENCES interval(id) ON DELETE CASCADE, + actions INTEGER REFERENCES action(id) ON DELETE CASCADE, + weight FLOAT8 +); +CREATE TABLE action IF NOT EXISTS( + id SERIAL PRIMARY KEY, + minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE, + actiontype TEXT, + balance TEXT, + direction TEXT, + units FLOAT8, + weight FLOAT8 +); \ No newline at end of file diff --git a/mediator/mediator.go b/mediator/mediator.go index 2b8c806f8..9654881e9 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -223,6 +223,30 @@ func (m *Mediator) getCostsFromRater(record []string, runIdx int) (cc *rater.Cal return } -func (m *Mediator) MediateCdrFromDB(cdrID string, db rater.DataStorage) error { - return nil +/* Calculates price for the specified cdr and writes the new cdr with price to +the storage. If the cdr is nil then it will fetch it from the storage. */ +func (m *Mediator) MediateCdrFromDB(cdr rater.CDR, db rater.DataStorage) error { + cc := &rater.CallCost{} + startTime, err := cdr.GetStartTime() + if err != nil { + return err + } + endTime, err := cdr.GetEndTime() + if err != nil { + return err + } + cd := rater.CallDescriptor{ + Direction: cdr.GetDirection(), + Tenant: cdr.GetTenant(), + TOR: cdr.GetTOR(), + Subject: cdr.GetSubject(), + Account: cdr.GetAccount(), + Destination: cdr.GetDestination(), + TimeStart: startTime, + TimeEnd: endTime} + if err := m.connector.GetCost(cd, cc); err != nil { + fmt.Println("Got error in the mediator getCost", err.Error()) + return err + } + return db.SetRatedCdr(cdr, cc) } diff --git a/rater/calldesc.go b/rater/calldesc.go index 62e7d1acd..ca5c2711a 100644 --- a/rater/calldesc.go +++ b/rater/calldesc.go @@ -33,6 +33,7 @@ func init() { Logger, err = syslog.New(syslog.LOG_INFO, "CGRateS") if err != nil { Logger = new(utils.StdLogger) + Logger.Err(fmt.Sprintf("Could not connect to syslog: %v", err)) } } diff --git a/rater/cdr.go b/rater/cdr.go index db2d2b106..a25b66e3c 100644 --- a/rater/cdr.go +++ b/rater/cdr.go @@ -24,6 +24,9 @@ import ( type CDR interface { New([]byte) (CDR, error) + GetCgrId() string + GetAccId() string + GetCdrHost() string GetDirection() string GetOrigId() string GetSubject() string @@ -34,8 +37,9 @@ type CDR interface { GetUUID() string GetTenant() string GetReqType() string - GetStartTime(string) (time.Time, error) + GetStartTime() (time.Time, error) GetEndTime() (time.Time, error) + GetDuration() int64 GetFallbackSubj() string GetExtraParameters() string } diff --git a/rater/storage_gosexy.go b/rater/storage_gosexy.go index 6840131de..e82d4654c 100644 --- a/rater/storage_gosexy.go +++ b/rater/storage_gosexy.go @@ -217,9 +217,10 @@ func (rs *GosexyStorage) LogError(uuid, source, errstr string) (err error) { return } -func (rs *GosexyStorage) GetCdr(string) (CDR, error) { - return nil, nil -} func (rs *GosexyStorage) SetCdr(CDR) error { return nil } + +func (rs *GosexyStorage) SetRatedCdr(CDR, *CallCost) error { + return nil +} diff --git a/rater/storage_interface.go b/rater/storage_interface.go index 359d99832..876b65a2d 100644 --- a/rater/storage_interface.go +++ b/rater/storage_interface.go @@ -38,10 +38,12 @@ const ( LOG_ACTION_TIMMING_PREFIX = "ltm_" LOG_ACTION_TRIGGER_PREFIX = "ltr_" LOG_ERR = "ler_" + LOG_CDR = "cdr_" + LOG_MEDIATED_CDR = "mcd_" // sources SESSION_MANAGER_SOURCE = "SMR" MEDIATOR_SOURCE = "MED" - SCHED_SOURCE = "MED" + SCHED_SOURCE = "SCH" RATER_SOURCE = "RAT" ) @@ -62,8 +64,8 @@ type DataStorage interface { GetActionTimings(string) ([]*ActionTiming, error) SetActionTimings(string, []*ActionTiming) error GetAllActionTimings() (map[string][]*ActionTiming, error) - GetCdr(string) (CDR, error) SetCdr(CDR) error + SetRatedCdr(CDR, *CallCost) error //GetAllActionTimingsLogs() (map[string][]*ActionTiming, error) LogCallCost(uuid, source string, cc *CallCost) error LogError(uuid, source, errstr string) error diff --git a/rater/storage_map.go b/rater/storage_map.go index 4336279c3..92885fdf7 100644 --- a/rater/storage_map.go +++ b/rater/storage_map.go @@ -183,9 +183,10 @@ func (ms *MapStorage) LogError(uuid, source, errstr string) (err error) { return nil } -func (ms *MapStorage) GetCdr(string) (CDR, error) { - return nil, nil -} func (ms *MapStorage) SetCdr(CDR) error { return nil } + +func (ms *MapStorage) SetRatedCdr(CDR, *CallCost) error { + return nil +} diff --git a/rater/storage_mongo.go b/rater/storage_mongo.go index 43e7726eb..7652601be 100644 --- a/rater/storage_mongo.go +++ b/rater/storage_mongo.go @@ -208,9 +208,10 @@ func (ms *MongoStorage) LogError(uuid, source, errstr string) (err error) { return ms.db.C("errlog").Insert(&LogErrEntry{uuid, errstr, source}) } -func (ms *MongoStorage) GetCdr(string) (CDR, error) { - return nil, nil -} func (ms *MongoStorage) SetCdr(CDR) error { return nil } + +func (ms *MongoStorage) SetRatedCdr(CDR, *CallCost) error { + return nil +} diff --git a/rater/storage_mysql.go b/rater/storage_mysql.go index a30822965..2cf62437e 100644 --- a/rater/storage_mysql.go +++ b/rater/storage_mysql.go @@ -29,107 +29,8 @@ type MySQLStorage struct { Db *sql.DB } -var ( - mysql_schema = ` -CREATE TABLE ratingprofile IF NOT EXISTS ( - id SERIAL PRIMARY KEY, - fallbackkey VARCHAR(512), -); -CREATE TABLE ratingdestinations IF NOT EXISTS ( - id SERIAL PRIMARY KEY, - ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE -); -CREATE TABLE destination IF NOT EXISTS ( - id SERIAL PRIMARY KEY, - ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE, - name VARCHAR(512), - prefixes TEXT -); -CREATE TABLE activationprofile IF NOT EXISTS( - id SERIAL PRIMARY KEY, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, - activationtime TIMESTAMP -); -CREATE TABLE interval IF NOT EXISTS( - id SERIAL PRIMARY KEY, - activationprofile INTEGER REFERENCES activationprofile(id) ON DELETE CASCADE, - years TEXT, - months TEXT, - monthdays TEXT, - weekdays TEXT, - starttime TIMESTAMP, - endtime TIMESTAMP, - weight FLOAT8, - connectfee FLOAT8, - price FLOAT8, - pricedunits FLOAT8, - rateincrements FLOAT8 -); -CREATE TABLE minutebucket IF NOT EXISTS( - id SERIAL PRIMARY KEY, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, - seconds FLOAT8, - weight FLOAT8, - price FLOAT8, - percent FLOAT8 -); -CREATE TABLE unitcounter IF NOT EXISTS( - id SERIAL PRIMARY KEY, - direction TEXT, - balance TEXT, - units FLOAT8 -); -CREATE TABLE unitcounterbucket IF NOT EXISTS( - id SERIAL PRIMARY KEY, - unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE, - minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE -); -CREATE TABLE actiontrigger IF NOT EXISTS( - id SERIAL PRIMARY KEY, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, - actions INTEGER REFERENCES action(id) ON DELETE CASCADE, - balance TEXT, - direction TEXT, - thresholdvalue FLOAT8, - weight FLOAT8, - executed BOOL -); -CREATE TABLE balance IF NOT EXISTS( - id SERIAL PRIMARY KEY, - name TEXT; - value FLOAT8 -); -CREATE TABLE userbalance IF NOT EXISTS( - id SERIAL PRIMARY KEY, - unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE, - minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE - actiontriggers INTEGER REFERENCES actiontrigger(id) ON DELETE CASCADE, - balances INTEGER REFERENCES balance(id) ON DELETE CASCADE, - type TEXT -); -CREATE TABLE actiontiming IF NOT EXISTS( - id SERIAL PRIMARY KEY, - tag TEXT, - userbalances INTEGER REFERENCES userbalance(id) ON DELETE CASCADE, - timing INTEGER REFERENCES interval(id) ON DELETE CASCADE, - actions INTEGER REFERENCES action(id) ON DELETE CASCADE, - weight FLOAT8 -); -CREATE TABLE action IF NOT EXISTS( - id SERIAL PRIMARY KEY, - minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE, - actiontype TEXT, - balance TEXT, - direction TEXT, - units FLOAT8, - weight FLOAT8 -); -` -) - func NewMySQLStorage(host, port, name, user, password string) (DataStorage, error) { - db, err := sql.Open("mysql", "cgrates:testus@tcp(192.168.0.17:3306)/cgrates?charset=utf8") + db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8", user, password, host, port, name)) if err != nil { return nil, err } @@ -186,7 +87,7 @@ func (mys *MySQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err err if err != nil { Logger.Err(fmt.Sprintf("Error marshalling timespans to json: %v", err)) } - _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdr VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", + _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO callcosts VALUES ('NULL','%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", uuid, source, cc.Direction, @@ -205,7 +106,7 @@ func (mys *MySQLStorage) LogCallCost(uuid, source string, cc *CallCost) (err err } func (mys *MySQLStorage) GetCallCostLog(uuid, source string) (cc *CallCost, err error) { - row := mys.Db.QueryRow(fmt.Sprintf("SELECT * FROM cdr WHERE uuid='%s' AND source='%s'", uuid, source)) + row := mys.Db.QueryRow(fmt.Sprintf("SELECT * FROM callcosts WHERE uuid='%s' AND source='%s'", uuid, source)) var uuid_found string var timespansJson string err = row.Scan(&uuid_found, &cc.Direction, &cc.Tenant, &cc.TOR, &cc.Subject, &cc.Destination, &cc.Cost, &cc.ConnectFee, ×pansJson) @@ -221,9 +122,49 @@ func (mys *MySQLStorage) LogActionTiming(source string, at *ActionTiming, as []* } func (mys *MySQLStorage) LogError(uuid, source, errstr string) (err error) { return } -func (mys *MySQLStorage) GetCdr(string) (CDR, error) { - return nil, nil +func (mys *MySQLStorage) SetCdr(cdr CDR) (err error) { + startTime, err := cdr.GetStartTime() + if err != nil { + return err + } + _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_primary VALUES (NULL, '%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', %d)", + cdr.GetCgrId(), + cdr.GetAccId(), + cdr.GetCdrHost(), + cdr.GetReqType(), + cdr.GetDirection(), + cdr.GetTenant(), + cdr.GetTOR(), + cdr.GetAccount(), + cdr.GetSubject(), + cdr.GetDestination(), + startTime, + cdr.GetDuration(), + )) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + } + _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('NULL','%s', '%s')", + cdr.GetCgrId(), + cdr.GetExtraParameters(), + )) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + } + + return } -func (mys *MySQLStorage) SetCdr(CDR) error { - return nil + +func (mys *MySQLStorage) SetRatedCdr(cdr CDR, cc *CallCost) (err error) { + _, err = mys.Db.Exec(fmt.Sprintf("INSERT INTO rated_cdrs VALUES ('%s', '%s', '%s', '%s')", + cdr.GetCgrId(), + cc.Cost, + "cgrcostid", + "cdrsrc", + )) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + } + + return } diff --git a/rater/storage_postgres.go b/rater/storage_postgres.go index 54822171a..57b6d6028 100644 --- a/rater/storage_postgres.go +++ b/rater/storage_postgres.go @@ -29,105 +29,6 @@ type PostgresStorage struct { Db *sql.DB } -var ( - postgres_schema = ` -CREATE TABLE ratingprofile IF NOT EXISTS ( - id SERIAL PRIMARY KEY, - fallbackkey VARCHAR(512), -); -CREATE TABLE ratingdestinations IF NOT EXISTS ( - id SERIAL PRIMARY KEY, - ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE -); -CREATE TABLE destination IF NOT EXISTS ( - id SERIAL PRIMARY KEY, - ratingprofile INTEGER REFERENCES ratingprofile(id) ON DELETE CASCADE, - name VARCHAR(512), - prefixes TEXT -); -CREATE TABLE activationprofile IF NOT EXISTS( - id SERIAL PRIMARY KEY, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, - activationtime TIMESTAMP -); -CREATE TABLE interval IF NOT EXISTS( - id SERIAL PRIMARY KEY, - activationprofile INTEGER REFERENCES activationprofile(id) ON DELETE CASCADE, - years TEXT, - months TEXT, - monthdays TEXT, - weekdays TEXT, - starttime TIMESTAMP, - endtime TIMESTAMP, - weight FLOAT8, - connectfee FLOAT8, - price FLOAT8, - pricedunits FLOAT8, - rateincrements FLOAT8 -); -CREATE TABLE minutebucket IF NOT EXISTS( - id SERIAL PRIMARY KEY, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, - seconds FLOAT8, - weight FLOAT8, - price FLOAT8, - percent FLOAT8 -); -CREATE TABLE unitcounter IF NOT EXISTS( - id SERIAL PRIMARY KEY, - direction TEXT, - balance TEXT, - units FLOAT8 -); -CREATE TABLE unitcounterbucket IF NOT EXISTS( - id SERIAL PRIMARY KEY, - unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE, - minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE -); -CREATE TABLE actiontrigger IF NOT EXISTS( - id SERIAL PRIMARY KEY, - destination INTEGER REFERENCES destination(id) ON DELETE CASCADE, - actions INTEGER REFERENCES action(id) ON DELETE CASCADE, - balance TEXT, - direction TEXT, - thresholdvalue FLOAT8, - weight FLOAT8, - executed BOOL -); -CREATE TABLE balance IF NOT EXISTS( - id SERIAL PRIMARY KEY, - name TEXT; - value FLOAT8 -); -CREATE TABLE userbalance IF NOT EXISTS( - id SERIAL PRIMARY KEY, - unitcounter INTEGER REFERENCES unitcounter(id) ON DELETE CASCADE, - minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE - actiontriggers INTEGER REFERENCES actiontrigger(id) ON DELETE CASCADE, - balances INTEGER REFERENCES balance(id) ON DELETE CASCADE, - type TEXT -); -CREATE TABLE actiontiming IF NOT EXISTS( - id SERIAL PRIMARY KEY, - tag TEXT, - userbalances INTEGER REFERENCES userbalance(id) ON DELETE CASCADE, - timing INTEGER REFERENCES interval(id) ON DELETE CASCADE, - actions INTEGER REFERENCES action(id) ON DELETE CASCADE, - weight FLOAT8 -); -CREATE TABLE action IF NOT EXISTS( - id SERIAL PRIMARY KEY, - minutebucket INTEGER REFERENCES minutebucket(id) ON DELETE CASCADE, - actiontype TEXT, - balance TEXT, - direction TEXT, - units FLOAT8, - weight FLOAT8 -); -` -) - func NewPostgresStorage(host, port, name, user, password string) (DataStorage, error) { db, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=disable", host, port, name, user, password)) if err != nil { @@ -221,9 +122,52 @@ func (psl *PostgresStorage) LogActionTiming(source string, at *ActionTiming, as } func (psl *PostgresStorage) LogError(uuid, source, errstr string) (err error) { return } -func (psl *PostgresStorage) GetCdr(string) (CDR, error) { - return nil, nil +func (psl *PostgresStorage) SetCdr(cdr CDR) (err error) { + startTime, err := cdr.GetStartTime() + if err != nil { + return err + } + _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_primary VALUES ('%s', '%s','%s', '%s', '%s', '%s', '%s', '%s', %v, %v, '%s')", + cdr.GetCgrId(), + cdr.GetAccId(), + cdr.GetCdrHost(), + cdr.GetReqType(), + cdr.GetDirection(), + cdr.GetTenant(), + cdr.GetTOR(), + cdr.GetAccount(), + cdr.GetSubject(), + cdr.GetDestination(), + startTime, + cdr.GetDuration(), + )) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + } + _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s')", + cdr.GetCgrId(), + cdr.GetExtraParameters(), + )) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + } + + return } -func (psl *PostgresStorage) SetCdr(CDR) error { - return nil + +func (psl *PostgresStorage) SetRatedCdr(cdr CDR, cc *CallCost) (err error) { + if err != nil { + return err + } + _, err = psl.Db.Exec(fmt.Sprintf("INSERT INTO cdrs_extra VALUES ('%s', '%s', '%s', '%s')", + cdr.GetCgrId(), + cc.Cost, + "cgrcostid", + "cdrsrc", + )) + if err != nil { + Logger.Err(fmt.Sprintf("failed to execute cdr insert statement: %v", err)) + } + + return } diff --git a/rater/storage_redigo.go b/rater/storage_redigo.go index 25051761e..0fb0b2bc6 100644 --- a/rater/storage_redigo.go +++ b/rater/storage_redigo.go @@ -215,9 +215,10 @@ func (rs *RedigoStorage) LogError(uuid, source, errstr string) (err error) { return } -func (rs *RedigoStorage) GetCdr(string) (CDR, error) { - return nil, nil -} func (rs *RedigoStorage) SetCdr(CDR) error { return nil } + +func (rs *RedigoStorage) SetRatedCdr(CDR, *CallCost) error { + return nil +} diff --git a/rater/storage_redis.go b/rater/storage_redis.go index e503c74a4..623c4909a 100644 --- a/rater/storage_redis.go +++ b/rater/storage_redis.go @@ -241,9 +241,11 @@ func (rs *RedisStorage) LogError(uuid, source, errstr string) (err error) { } return } -func (rs *RedisStorage) GetCdr(string) (CDR, error) { - return nil, nil -} + func (rs *RedisStorage) SetCdr(CDR) error { return nil } + +func (rs *RedisStorage) SetRatedCdr(CDR, *CallCost) error { + return nil +}