Implementation of ApierV1.ExportCsvCdrs

This commit is contained in:
DanB
2013-11-10 14:37:20 +01:00
parent 25db02a6fb
commit 84fbf30376
10 changed files with 184 additions and 68 deletions

View File

@@ -24,6 +24,7 @@ import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/scheduler"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/config"
)
const (
@@ -33,7 +34,9 @@ const (
type ApierV1 struct {
StorDb engine.LoadStorage
DataDb engine.DataStorage
CdrDb engine.CdrStorage
Sched *scheduler.Scheduler
Config *config.CGRConfig
}
type AttrDestination struct {

72
apier/v1/cdrs.go Normal file
View File

@@ -0,0 +1,72 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2013 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package apier
import (
"fmt"
"os"
"time"
"path"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/cdrexporter"
)
type AttrExpCsvCdrs struct {
TimeStart string // If provided, will represent the starting of the CDRs interval (>=)
TimeEnd string // If provided, will represent the end of the CDRs interval (<)
}
func (self *ApierV1) ExportCsvCdrs(attr *AttrExpCsvCdrs, reply *string) error {
var tStart, tEnd time.Time
var err error
if len(attr.TimeStart) !=0 {
if tStart, err = utils.ParseDate( attr.TimeStart ); err != nil {
return err
}
}
if len(attr.TimeEnd) !=0 {
if tEnd, err = utils.ParseDate( attr.TimeEnd ); err != nil {
return err
}
}
cdrs, err := self.CdrDb.GetRatedCdrs(tStart, tEnd)
if err != nil {
return err
}
fileName := path.Join(self.Config.CDRSExportPath, "cgr", "csv", fmt.Sprintf("cdrs_%d.csv",time.Now().Unix()))
fileOut, err := os.Create(fileName)
if err != nil {
return err
} else {
defer fileOut.Close()
}
csvWriter := cdrexporter.NewCsvCdrWriter(fileOut, self.Config.RoundingDecimals, self.Config.CDRSExportExtraFields)
for _, cdr := range cdrs {
if err := csvWriter.Write( cdr.(*utils.RatedCDR) ); err != nil {
os.Remove(fileName)
return err
}
}
csvWriter.Close()
*reply = fileName
return nil
}

View File

@@ -18,7 +18,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package cdrexporter
import (
"github.com/cgrates/cgrates/utils"
)
type CdrWriter interface {
Write(cdr map[string]string) string
Write(cdr utils.RatedCDR) string
Close()
}

View File

@@ -21,20 +21,35 @@ package cdrexporter
import (
"encoding/csv"
"io"
"sort"
"strconv"
"github.com/cgrates/cgrates/utils"
)
type CsvCdrWriter struct {
writer *csv.Writer
roundDecimals int // Round floats like Cost using this number of decimals
extraFields []string // Extra fields to append after primary ones, order important
}
func NewCsvCdrWriter(writer io.Writer) *CsvCdrWriter {
return &CsvCdrWriter{csv.NewWriter(writer)}
func NewCsvCdrWriter(writer io.Writer, roundDecimals int, extraFields []string) *CsvCdrWriter {
return &CsvCdrWriter{csv.NewWriter(writer), roundDecimals, extraFields}
}
func (dcw *CsvCdrWriter) Write(cdr map[string]string) error {
var row []string
for _, v := range cdr {
row = append(row, v)
func (dcw *CsvCdrWriter) Write(cdr *utils.RatedCDR) error {
primaryFields := []string{cdr.CgrId, cdr.AccId, cdr.CdrHost, cdr.ReqType, cdr.Direction, cdr.Tenant, cdr.TOR, cdr.Account, cdr.Subject,
cdr.Destination, cdr.AnswerTime.String(), strconv.Itoa(int(cdr.Duration)), strconv.FormatFloat(cdr.Cost, 'f', dcw.roundDecimals, 64)}
if len(dcw.extraFields) == 0 {
dcw.extraFields = utils.MapKeys(cdr.ExtraFields)
sort.Strings(dcw.extraFields) // Controlled order in case of dynamic extra fields
}
lenPrimary := len(primaryFields)
row := make([]string, lenPrimary+len(dcw.extraFields))
for idx, fld := range primaryFields { // Add primary fields
row[idx] = fld
}
for idx, fldKey := range dcw.extraFields { // Add extra fields
row[lenPrimary+idx] = cdr.ExtraFields[fldKey]
}
return dcw.writer.Write(row)
}

View File

@@ -346,7 +346,7 @@ func main() {
go stopRaterSingnalHandler()
}
responder := &engine.Responder{ExitChan: exitChan}
apier := &apier.ApierV1{StorDb: loadDb, DataDb: dataDb}
apier := &apier.ApierV1{StorDb: loadDb, DataDb: dataDb, CdrDb: cdrDb, Config: cfg}
if cfg.RaterEnabled && !cfg.BalancerEnabled && cfg.RaterListen != INTERNAL {
engine.Logger.Info(fmt.Sprintf("Starting CGRateS Rater on %s.", cfg.RaterListen))
go listenToRPCRequests(responder, apier, cfg.RaterListen, cfg.RPCEncoding, dataDb, logDb)

View File

@@ -70,6 +70,8 @@ type CGRConfig struct {
CDRSListen string // CDRS's listening interface: <x.y.z.y:1234>.
CDRSExtraFields []string //Extra fields to store in CDRs
CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
CDRSExportPath string // Path towards exported cdrs
CDRSExportExtraFields []string // Extra fields list to add in exported CDRs
SMEnabled bool
SMSwitchType string
SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
@@ -133,6 +135,8 @@ func (self *CGRConfig) setDefaults() error {
self.CDRSListen = "127.0.0.1:2022"
self.CDRSExtraFields = []string{}
self.CDRSMediator = ""
self.CDRSExportPath = "/var/log/cgrates/cdr/out"
self.CDRSExportExtraFields = []string{}
self.MediatorEnabled = false
self.MediatorListen = "127.0.0.1:2032"
self.MediatorRater = "127.0.0.1:2012"
@@ -287,6 +291,14 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("cdrs", "mediator"); hasOpt {
cfg.CDRSMediator, _ = c.GetString("cdrs", "mediator")
}
if hasOpt = c.HasOption("cdrs", "export_path"); hasOpt {
cfg.CDRSExportPath, _ = c.GetString("cdrs", "export_path")
}
if hasOpt = c.HasOption("cdrs", "export_extra_fields"); hasOpt {
if cfg.CDRSExportExtraFields, errParse = ConfigSlice(c, "cdrs", "export_extra_fields"); errParse != nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "enabled"); hasOpt {
cfg.MediatorEnabled, _ = c.GetBool("mediator", "enabled")
}

View File

@@ -64,6 +64,8 @@ func TestDefaults(t *testing.T) {
eCfg.CDRSListen = "127.0.0.1:2022"
eCfg.CDRSExtraFields = []string{}
eCfg.CDRSMediator = ""
eCfg.CDRSExportPath = "/var/log/cgrates/cdr/out"
eCfg.CDRSExportExtraFields = []string{}
eCfg.MediatorEnabled = false
eCfg.MediatorListen = "127.0.0.1:2032"
eCfg.MediatorRater = "127.0.0.1:2012"
@@ -160,6 +162,8 @@ func TestConfigFromFile(t *testing.T) {
eCfg.CDRSListen = "test"
eCfg.CDRSExtraFields = []string{"test"}
eCfg.CDRSMediator = "test"
eCfg.CDRSExportPath = "test"
eCfg.CDRSExportExtraFields = []string{"test"}
eCfg.MediatorEnabled = true
eCfg.MediatorListen = "test"
eCfg.MediatorRater = "test"

View File

@@ -41,6 +41,8 @@ enabled = true # Start the CDR Server service: <true|false>.
listen=test # CDRS's listening interface: <x.y.z.y:1234>.
extra_fields = test # Extra fields to store in CDRs
mediator = test # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
export_path = test # Path where exported cdrs will be written
export_extra_fields = test # Extra fields list to be exported
[mediator]
enabled = true # Starts Mediator service: <true|false>.

View File

@@ -5,82 +5,85 @@
# [global] must exist in all files, rest of the configuration is inter-changeable.
[global]
# datadb_type = redis # The main database: <redis>.
# datadb_host = 127.0.0.1 # Database host address.
# datadb_port = 6379 # Port to reach the database.
# datadb_name = 10 # The name of the database to connect to.
# datadb_user = # Username to use when connecting to database.
# datadb_passwd = # Password to use when connecting to database.
# stordb_type = mysql # Log/stored database type to use: <same|postgres|mongo|redis|mysql>
# stordb_host = 127.0.0.1 # The host to connect to. Values that start with / are for UNIX domain sockets.
# stordb_port = 3306 # The port to reach the logdb.
# stordb_name = cgrates # The name of the log database to connect to.
# stordb_user = cgrates # Username to use when connecting to stordb.
# stordb_passwd = CGRateS.org # Password to use when connecting to stordb.
# dbdata_encoding = msgpack # The encoding used to store object data in strings: <msgpack|json>
# rpc_encoding = json # RPC encoding used on APIs: <gob|json>.
# default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>.
# default_tor = 0 # Default Type of Record to consider when missing from requests.
# default_tenant = 0 # Default Tenant to consider when missing from requests.
# default_subject = 0 # Default rating Subject to consider when missing from requests.
# rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down>
# rounding_decimals = 4 # Number of decimals to round float/costs at
# datadb_type = redis # The main database: <redis>.
# datadb_host = 127.0.0.1 # Database host address.
# datadb_port = 6379 # Port to reach the database.
# datadb_name = 10 # The name of the database to connect to.
# datadb_user = # Username to use when connecting to database.
# datadb_passwd = # Password to use when connecting to database.
# stordb_type = mysql # Log/stored database type to use: <same|postgres|mongo|redis|mysql>
# stordb_host = 127.0.0.1 # The host to connect to. Values that start with / are for UNIX domain sockets.
# stordb_port = 3306 # The port to reach the logdb.
# stordb_name = cgrates # The name of the log database to connect to.
# stordb_user = cgrates # Username to use when connecting to stordb.
# stordb_passwd = CGRateS.org # Password to use when connecting to stordb.
# dbdata_encoding = msgpack # The encoding used to store object data in strings: <msgpack|json>
# rpc_encoding = json # RPC encoding used on APIs: <gob|json>.
# default_reqtype = rated # Default request type to consider when missing from requests: <""|prepaid|postpaid|pseudoprepaid|rated>.
# default_tor = 0 # Default Type of Record to consider when missing from requests.
# default_tenant = 0 # Default Tenant to consider when missing from requests.
# default_subject = 0 # Default rating Subject to consider when missing from requests.
# rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down>
# rounding_decimals = 4 # Number of decimals to round float/costs at
[balancer]
# enabled = false # Start Balancer service: <true|false>.
# listen = 127.0.0.1:2012 # Balancer listen interface: <disabled|x.y.z.y:1234>.
# enabled = false # Start Balancer service: <true|false>.
# listen = 127.0.0.1:2012 # Balancer listen interface: <disabled|x.y.z.y:1234>.
[rater]
# enabled = false # Enable Rater service: <true|false>.
# balancer = disabled # Register to Balancer as worker: <enabled|disabled>.
# listen = 127.0.0.1:2012 # Rater's listening interface: <internal|x.y.z.y:1234>.
# enabled = false # Enable RaterCDRSExportPath service: <true|false>.
# balancer = disabled # Register to Balancer as worker: <enabled|disabled>.
# listen = 127.0.0.1:2012 # Rater's listening interface: <internal|x.y.z.y:1234>.
[scheduler]
# enabled = false # Starts Scheduler service: <true|false>.
# enabled = false # Starts Scheduler service: <true|false>.
[cdrs]
# enabled = false # Start the CDR Server service: <true|false>.
# listen=127.0.0.1:2022 # CDRS's listening interface: <x.y.z.y:1234>.
# extra_fields = # Extra fields to store in CDRs
# mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
# enabled = false # Start the CDR Server service: <true|false>.
# listen=127.0.0.1:2022 # CDRS's listening interface: <x.y.z.y:1234>.
# extra_fields = # Extra fields to store in CDRs
# mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
# export_path = /var/log/cgrates/cdr/out # Path where the exported CDRs will be placed
# export_extra_fields = # List of extra fields to be exported out in CDRs
[mediator]
# enabled = false # Starts Mediator service: <true|false>.
# listen=internal # Mediator's listening interface: <internal>.
# rater = 127.0.0.1:2012 # Address where to reach the Rater: <internal|x.y.z.y:1234>
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
# accid_field = accid # Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs.
# subject_fields = subject # Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
# reqtype_fields = reqtype # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
# direction_fields = direction # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
# tenant_fields = tenant # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
# tor_fields = tor # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
# account_fields = account # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
# destination_fields = destination # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
# time_answer_fields = time_answer # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs.
# duration_fields = duration # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
# cdr_type = # CDR type, used when running mediator as service <freeswitch_http_json|freeswitch_file_csv>.
# cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are kept (file stored CDRs).
# cdr_out_dir = /var/log/cgrates/cdr/out/freeswitch/csv # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).
# enabled = false # Starts Mediator service: <true|false>.
# listen=internal # Mediator's listening interface: <internal>.
# rater = 127.0.0.1:2012 # Address where to reach the Rater: <internal|x.y.z.y:1234>
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
# accid_field = accid # Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs.
# subject_fields = subject # Name of sCDRSExportPathubject fields to be used during mediation. Use index numbers in case of .csv cdrs.
# reqtype_fields = reqtype # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
# direction_fields = direction # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
# tenant_fields = tenant # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
# tor_fields = tor # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
# account_fields = account # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
# destination_fields = destination # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
# time_answer_fields = time_answer # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs.
# duration_fields = duration # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
# cdr_type = # CDR type, used when running mediator as service <freeswitch_http_json|freeswitch_file_csv>.
# cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are kept (file stored CDRs).
# cdr_out_dir = /var/log/cgrates/cdr/out/freeswitch/csv
# Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).
[session_manager]
# enabled = false # Starts SessionManager service: <true|false>.
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>.
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
# debit_interval = 5 # Interval to perform debits on.
# enabled = false # Starts SessionManager service: <true|false>.
# switch_type = freeswitch # Defines the type of switch behind: <freeswitch>.
# rater = 127.0.0.1:2012 # Address where to reach the Rater.
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
# debit_interval = 5 # Interval to perform debits on.
[freeswitch]
# server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket.
# passwd = ClueCon # FreeSWITCH socket password.
# reconnects = 5 # Number of attempts on connect failure.
# server = 127.0.0.1:8021 # Adress where to connect to FreeSWITCH socket.
# passwd = ClueCon # FreeSWITCH socket password.
# reconnects = 5 # Number of attempts on connect failure.
[history_agent]
#enabled = false # Starts History as a client: <true|false>.
#server = 127.0.0.1:2013 # Address where to reach the master history server: <internal|x.y.z.y:1234>
#enabled = false # Starts History as a client: <true|false>.
#server = 127.0.0.1:2013 # Address where to reach the master history server: <internal|x.y.z.y:1234>
[history_server]
#enabled = false # Starts History service: <true|false>.
#listen = 127.0.0.1:2013 # Listening addres for history server: <internal|x.y.z.y:1234>
#path = /var/log/cgrates/history # Location on disk where to store history files.
#enabled = false # Starts History service: <true|false>.
#listen = 127.0.0.1:2013 # Listening addres for history server: <internal|x.y.z.y:1234>
#path = /var/log/cgrates/history # Location on disk where to store history files.

View File

@@ -18,5 +18,6 @@ mkdir -p $PKG_DIR/usr/share/cgrates
cp -r ../../data/ $PKG_DIR/usr/share/cgrates/
mkdir -p $PKG_DIR/usr/bin
cp /usr/local/goapps/bin/cgr-* $PKG_DIR/usr/bin/
mkdir -p $PKG_DIR/var/log/cgrates/cdr/out
dpkg-deb --build $PKG_DIR