Adding cdrc together with config dependencies and tests

This commit is contained in:
DanB
2013-12-20 22:57:45 +01:00
parent d8977fc504
commit ca3b13651f
15 changed files with 425 additions and 189 deletions

BIN
cdrc/.cdrc.go.swp Normal file

Binary file not shown.

BIN
cdrc/.cdrc_test.go.swp Normal file

Binary file not shown.

138
cdrc/cdrc.go Normal file
View File

@@ -0,0 +1,138 @@
/*
Rating system designed to be used in VoIP Carriers World
Copyright (C) 2013 ITsysCOM
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package cdrc
import (
"fmt"
"errors"
"github.com/howeyc/fsnotify"
"os"
"path"
"net/http"
"net/url"
"strconv"
"strings"
"bufio"
"encoding/csv"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/engine"
)
type Cdrc struct {
cgrCfg *config.CGRConfig
fieldIndxes map[string]int // Key is the name of the field, int is the position in the csv file
httpClient *http.Client
}
// Parses fieldIndex strings into fieldIndex integers needed
func (self *Cdrc) parseFieldIndexesFromConfig() error {
var err error
// Add main fields here
self.fieldIndxes = make(map[string]int)
// PrimaryCdrFields []string = []string{ACCID, CDRHOST, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, ANSWER_TIME, DURATION}
fieldKeys := []string{utils.ACCID, utils.REQTYPE, utils.DIRECTION, utils.TENANT, utils.TOR, utils.ACCOUNT, utils.SUBJECT, utils.DESTINATION, utils.ANSWER_TIME, utils.DURATION}
fieldIdxStrs := []string{self.cgrCfg.CdrcAccIdField, self.cgrCfg.CdrcReqTypeField, self.cgrCfg.CdrcDirectionField, self.cgrCfg.CdrcTenantField, self.cgrCfg.CdrcTorField,
self.cgrCfg.CdrcAccountField, self.cgrCfg.CdrcSubjectField, self.cgrCfg.CdrcDestinationField, self.cgrCfg.CdrcAnswerTimeField, self.cgrCfg.CdrcDurationField}
for i, strVal := range fieldIdxStrs {
if self.fieldIndxes[fieldKeys[i]], err = strconv.Atoi(strVal); err != nil {
return fmt.Errorf("Cannot parse configuration field %s into integer", fieldKeys[i])
}
}
// Add extra fields here, extra fields in the form of []string{"indxInCsv1:fieldName1","indexInCsv2:fieldName2"}
for _, fieldWithIdx := range self.cgrCfg.CdrcExtraFields {
splt := strings.Split(fieldWithIdx, ":")
if len(splt) != 2 {
return errors.New("Cannot parse cdrc.extra_fields")
}
if utils.IsSliceMember(utils.PrimaryCdrFields, splt[0]) {
return errors.New("Extra cdrc.extra_fields overwriting primary fields")
}
if self.fieldIndxes[splt[1]], err = strconv.Atoi(splt[0]); err != nil {
return fmt.Errorf("Cannot parse configuration cdrc extra field %s into integer", splt[1])
}
}
return nil
}
// Takes the record out of csv and turns it into http form which can be posted
func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) {
v := url.Values{}
for fldName, idx := range self.fieldIndxes {
if len(record) <= idx {
return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, fldName)
}
v.Set(fldName, record[idx])
}
return v, nil
}
// Watch the specified folder for file moves and parse the files on events
func (self *Cdrc) trackCDRFiles() (err error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return
}
defer watcher.Close()
err = watcher.Watch(self.cgrCfg.CdrcCdrInDir)
if err != nil {
return
}
engine.Logger.Info(fmt.Sprintf("Monitoring %s for file moves.", self.cgrCfg.CdrcCdrInDir))
for {
select {
case ev := <-watcher.Event:
if ev.IsCreate() && path.Ext(ev.Name) != ".csv" {
engine.Logger.Info(fmt.Sprintf("Parsing: %s", ev.Name))
if err = self.processFile(ev.Name); err != nil {
return err
}
}
case err := <-watcher.Error:
engine.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error()))
}
}
return
}
// Processe file at filePath and posts the valid cdr rows out of it
func (self *Cdrc) processFile(filePath string) error {
file, err := os.Open(filePath)
defer file.Close()
if err != nil {
engine.Logger.Crit(err.Error())
return err
}
csvReader := csv.NewReader(bufio.NewReader(file))
for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() {
cdrAsForm, err := self.cdrAsHttpForm(record)
if err != nil {
engine.Logger.Err(err.Error())
continue
}
if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.CdrcCdrs), cdrAsForm); err != nil {
engine.Logger.Err(fmt.Sprintf("Failed posting CDR, error: %s",err.Error()))
continue
}
}
// Finished with file, move it to processed folder
_, fn := path.Split(filePath)
return os.Rename(filePath, path.Join(self.cgrCfg.CdrcCdrOutDir, fn))
}

41
cdrc/cdrc_test.go Normal file
View File

@@ -0,0 +1,41 @@
package cdrc
import (
"testing"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
var cgrConfig *config.CGRConfig
var cdrc *Cdrc
func init() {
cgrConfig, _ = config.NewDefaultCGRConfig()
cdrc = &Cdrc{cgrCfg:cgrConfig}
}
func TestParseFieldIndexesFromConfig(t *testing.T) {
if err := cdrc.parseFieldIndexesFromConfig(); err != nil {
t.Error("Failed parsing default fieldIndexesFromConfig", err)
}
}
func TestCdrAsHttpForm(t *testing.T) {
cdrRow := []string{"firstField", "secondField"}
_, err := cdrc.cdrAsHttpForm(cdrRow)
if err == nil {
t.Error("Failed to corectly detect missing fields from record")
}
cdrRow = []string{"acc1", "prepaid", "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"}
cdrAsForm, err := cdrc.cdrAsHttpForm(cdrRow);
if err != nil {
t.Error("Failed to parse CDR in form", err)
}
if cdrAsForm.Get(utils.REQTYPE) != "prepaid" {
t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE))
}
if cdrAsForm.Get("supplier") != "supplier1" {
t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE))
}
}

View File

@@ -25,22 +25,6 @@ import (
"time"
)
const (
ACCID = "accid"
CDRHOST = "cdrhost"
REQTYPE = "reqtype"
DIRECTION = "direction"
TENANT = "tenant"
TOR = "tor"
ACCOUNT = "account"
SUBJECT = "subject"
DESTINATION = "destination"
TIME_ANSWER = "time_answer"
DURATION = "duration"
)
var primaryFields []string = []string{ACCID, CDRHOST, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, TIME_ANSWER, DURATION}
func NewCgrCdrFromHttpReq(req *http.Request) (CgrCdr, error) {
if req.Form == nil {
if err := req.ParseForm(); err != nil {
@@ -67,6 +51,10 @@ func (cgrCdr CgrCdr) GetAccId() string {
func (cgrCdr CgrCdr) GetCdrHost() string {
return cgrCdr[CDRHOST]
}
func (cgrCdr CgrCdr) GetCdrSource() string {
return cgrCdr[CDRSOURCE]
}
func (cgrCdr CgrCdr) GetDirection() string {
//TODO: implement direction
return "*out"
@@ -99,7 +87,7 @@ func (cgrCdr CgrCdr) GetReqType() string {
func (cgrCdr CgrCdr) GetExtraFields() map[string]string {
extraFields := make(map[string]string)
for k, v := range cgrCdr {
if !utils.IsSliceMember(primaryFields, k) {
if !utils.IsSliceMember(utils.PrimaryCdrFields, k) {
extraFields[k] = v
}
}

View File

@@ -44,6 +44,7 @@ const (
FS_DURATION = "billsec"
FS_USERNAME = "user_name"
FS_IP = "sip_local_network_addr"
FS_CDR_SOURCE = "freeswitch_json"
)
type FSCdr map[string]string
@@ -74,6 +75,9 @@ func (fsCdr FSCdr) GetAccId() string {
func (fsCdr FSCdr) GetCdrHost() string {
return fsCdr[FS_IP]
}
func (fsCdr FSCdr) GetCdrSource() string {
return FS_CDR_SOURCE
}
func (fsCdr FSCdr) GetDirection() string {
//TODO: implement direction, not related to FS_DIRECTION but traffic towards or from subject/account
return "*out"

View File

@@ -54,7 +54,7 @@ type CGRConfig struct {
AccountDBPass string // The user's password.
StorDBType string // Should reflect the database type used to store logs
StorDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
StorDBPort string // The port to bind to.
StorDBPort string // Th e port to bind to.
StorDBName string // The name of the database to connect to.
StorDBUser string // The user to sign in as.
StorDBPass string // The user's password.
@@ -78,6 +78,25 @@ type CGRConfig struct {
CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
CDRSExportPath string // Path towards exported cdrs
CDRSExportExtraFields []string // Extra fields list to add in exported CDRs
CdrcEnabled bool // Enable CDR client functionality
CdrcCdrs string // Address where to reach CDR server
CdrcCdrsMethod string // Mechanism to use when posting CDRs on server <http_cgr>
CdrcRunDelay int // Sleep interval in seconds between consecutive runs, 0 to use automation via inotify
CdrcCdrType string // CDR file format <csv>.
CdrcCdrInDir string // Absolute path towards the directory where the CDRs are stored.
CdrcCdrOutDir string // Absolute path towards the directory where processed CDRs will be moved.
CdrcSourceId string // Tag identifying the source of the CDRs within CGRS database.
CdrcAccIdField string // Accounting id field identifier. Use index number in case of .csv cdrs.
CdrcReqTypeField string // Request type field identifier. Use index number in case of .csv cdrs.
CdrcDirectionField string // Direction field identifier. Use index numbers in case of .csv cdrs.
CdrcTenantField string // Tenant field identifier. Use index numbers in case of .csv cdrs.
CdrcTorField string // Type of Record field identifier. Use index numbers in case of .csv cdrs.
CdrcAccountField string // Account field identifier. Use index numbers in case of .csv cdrs.
CdrcSubjectField string // Subject field identifier. Use index numbers in case of .csv CDRs.
CdrcDestinationField string // Destination field identifier. Use index numbers in case of .csv cdrs.
CdrcAnswerTimeField string // Answer time field identifier. Use index numbers in case of .csv cdrs.
CdrcDurationField string // Duration field identifier. Use index numbers in case of .csv cdrs.
CdrcExtraFields []string // Field identifiers of the fields to add in extra fields section, special format in case of .csv "index1:field1,index2:field2"
SMEnabled bool
SMSwitchType string
SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
@@ -87,20 +106,16 @@ type CGRConfig struct {
MediatorListen string // Mediator's listening interface: <internal>.
MediatorRater string // Address where to reach the Rater: <internal|x.y.z.y:1234>
MediatorRaterReconnects int // Number of reconnects to rater before giving up.
MediatorCDRType string // CDR type <freeswitch_http_json|freeswitch_file_csv>.
MediatorAccIdField string // Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs.
MediatorRunIds []string // Identifiers for each mediation run on CDRs
MediatorSubjectFields []string // Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorReqTypeFields []string // Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
MediatorDirectionFields []string // Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTenantFields []string // Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTORFields []string // Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorAccountFields []string // Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorSubjectFields []string // Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDestFields []string // Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTimeAnswerFields []string // Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorAnswerTimeFields []string // Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDurationFields []string // Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorCDRInDir string // Absolute path towards the directory where the CDRs are kept (file stored CDRs).
MediatorCDROutDir string // Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).
FreeswitchServer string // freeswitch address host:port
FreeswitchPass string // FS socket password
FreeswitchReconnects int // number of times to attempt reconnect after connect fails
@@ -151,24 +166,39 @@ func (self *CGRConfig) setDefaults() error {
self.CDRSMediator = ""
self.CDRSExportPath = "/var/log/cgrates/cdr/out"
self.CDRSExportExtraFields = []string{}
self.CdrcEnabled = false
self.CdrcCdrs = "127.0.0.1:2022"
self.CdrcCdrsMethod = "http_cgr"
self.CdrcRunDelay = 0
self.CdrcCdrType = "csv"
self.CdrcCdrInDir = "/var/log/cgrates/cdr/in/csv"
self.CdrcCdrOutDir = "/var/log/cgrates/cdr/out/csv"
self.CdrcSourceId = "freeswitch_csv"
self.CdrcAccIdField = "0"
self.CdrcReqTypeField = "1"
self.CdrcDirectionField = "2"
self.CdrcTenantField = "3"
self.CdrcTorField = "4"
self.CdrcAccountField = "5"
self.CdrcSubjectField = "6"
self.CdrcDestinationField = "7"
self.CdrcAnswerTimeField = "8"
self.CdrcDurationField = "9"
self.CdrcExtraFields = []string{"10:supplier","11:orig_ip"}
self.MediatorEnabled = false
self.MediatorListen = "127.0.0.1:2032"
self.MediatorRater = "127.0.0.1:2012"
self.MediatorRaterReconnects = 3
self.MediatorCDRType = utils.FSCDR_HTTP_JSON
self.MediatorAccIdField = "accid"
self.MediatorRunIds = []string{"default"}
self.MediatorSubjectFields = []string{"subject"}
self.MediatorReqTypeFields = []string{"reqtype"}
self.MediatorDirectionFields = []string{"direction"}
self.MediatorTenantFields = []string{"tenant"}
self.MediatorTORFields = []string{"tor"}
self.MediatorAccountFields = []string{"account"}
self.MediatorDestFields = []string{"destination"}
self.MediatorTimeAnswerFields = []string{"time_answer"}
self.MediatorDurationFields = []string{"duration"}
self.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv"
self.MediatorCDROutDir = "/var/log/cgrates/cdr/out/freeswitch/csv"
self.MediatorRunIds = []string{}
self.MediatorSubjectFields = []string{}
self.MediatorReqTypeFields = []string{}
self.MediatorDirectionFields = []string{}
self.MediatorTenantFields = []string{}
self.MediatorTORFields = []string{}
self.MediatorAccountFields = []string{}
self.MediatorDestFields = []string{}
self.MediatorAnswerTimeFields = []string{}
self.MediatorDurationFields = []string{}
self.SMEnabled = false
self.SMSwitchType = FS
self.SMRater = "127.0.0.1:2012"
@@ -332,6 +362,65 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
return nil, errParse
}
}
if hasOpt = c.HasOption("cdrc", "enabled"); hasOpt {
cfg.CdrcEnabled, _ = c.GetBool("cdrc", "enabled")
}
if hasOpt = c.HasOption("cdrc", "cdrs"); hasOpt {
cfg.CdrcCdrs, _ = c.GetString("cdrc", "cdrs")
}
if hasOpt = c.HasOption("cdrc", "cdrs_method"); hasOpt {
cfg.CdrcCdrsMethod, _ = c.GetString("cdrc", "cdrs_method")
}
if hasOpt = c.HasOption("cdrc", "run_delay"); hasOpt {
cfg.CdrcRunDelay, _ = c.GetInt("cdrc", "run_delay")
}
if hasOpt = c.HasOption("cdrc", "cdr_type"); hasOpt {
cfg.CdrcCdrType, _ = c.GetString("cdrc", "cdr_type")
}
if hasOpt = c.HasOption("cdrc", "cdr_in_dir"); hasOpt {
cfg.CdrcCdrInDir, _ = c.GetString("cdrc", "cdr_in_dir")
}
if hasOpt = c.HasOption("cdrc", "cdr_out_dir"); hasOpt {
cfg.CdrcCdrOutDir, _ = c.GetString("cdrc", "cdr_out_dir")
}
if hasOpt = c.HasOption("cdrc", "cdr_source_id"); hasOpt {
cfg.CdrcSourceId, _ = c.GetString("cdrc", "cdr_source_id")
}
if hasOpt = c.HasOption("cdrc", "accid_field"); hasOpt {
cfg.CdrcAccIdField, _ = c.GetString("cdrc", "accid_field")
}
if hasOpt = c.HasOption("cdrc", "reqtype_field"); hasOpt {
cfg.CdrcReqTypeField, _ = c.GetString("cdrc", "reqtype_field")
}
if hasOpt = c.HasOption("cdrc", "direction_field"); hasOpt {
cfg.CdrcDirectionField, _ = c.GetString("cdrc", "direction_field")
}
if hasOpt = c.HasOption("cdrc", "tenant_field"); hasOpt {
cfg.CdrcTenantField, _ = c.GetString("cdrc", "tenant_field")
}
if hasOpt = c.HasOption("cdrc", "tor_field"); hasOpt {
cfg.CdrcTorField, _ = c.GetString("cdrc", "tor_field")
}
if hasOpt = c.HasOption("cdrc", "account_field"); hasOpt {
cfg.CdrcAccountField, _ = c.GetString("cdrc", "account_field")
}
if hasOpt = c.HasOption("cdrc", "subject_field"); hasOpt {
cfg.CdrcSubjectField, _ = c.GetString("cdrc", "subject_field")
}
if hasOpt = c.HasOption("cdrc", "destination_field"); hasOpt {
cfg.CdrcDestinationField, _ = c.GetString("cdrc", "destination_field")
}
if hasOpt = c.HasOption("cdrc", "answer_time_field"); hasOpt {
cfg.CdrcAnswerTimeField, _ = c.GetString("cdrc", "answer_time_field")
}
if hasOpt = c.HasOption("cdrc", "duration_field"); hasOpt {
cfg.CdrcDurationField, _ = c.GetString("cdrc", "duration_field")
}
if hasOpt = c.HasOption("cdrc", "extra_fields"); hasOpt {
if cfg.CdrcExtraFields, errParse = ConfigSlice(c, "cdrc", "extra_fields"); errParse != nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "enabled"); hasOpt {
cfg.MediatorEnabled, _ = c.GetBool("mediator", "enabled")
}
@@ -344,12 +433,6 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("mediator", "rater_reconnects"); hasOpt {
cfg.MediatorRaterReconnects, _ = c.GetInt("mediator", "rater_reconnects")
}
if hasOpt = c.HasOption("mediator", "cdr_type"); hasOpt {
cfg.MediatorCDRType, _ = c.GetString("mediator", "cdr_type")
}
if hasOpt = c.HasOption("mediator", "accid_field"); hasOpt {
cfg.MediatorAccIdField, _ = c.GetString("mediator", "accid_field")
}
if hasOpt = c.HasOption("mediator", "run_ids"); hasOpt {
if cfg.MediatorRunIds, errParse = ConfigSlice(c, "mediator", "run_ids"); errParse != nil {
return nil, errParse
@@ -390,8 +473,8 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "time_answer_fields"); hasOpt {
if cfg.MediatorTimeAnswerFields, errParse = ConfigSlice(c, "mediator", "time_answer_fields"); errParse != nil {
if hasOpt = c.HasOption("mediator", "answer_time_fields"); hasOpt {
if cfg.MediatorAnswerTimeFields, errParse = ConfigSlice(c, "mediator", "answer_time_fields"); errParse != nil {
return nil, errParse
}
}
@@ -400,12 +483,6 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
return nil, errParse
}
}
if hasOpt = c.HasOption("mediator", "cdr_in_dir"); hasOpt {
cfg.MediatorCDRInDir, _ = c.GetString("mediator", "cdr_in_dir")
}
if hasOpt = c.HasOption("mediator", "cdr_out_dir"); hasOpt {
cfg.MediatorCDROutDir, _ = c.GetString("mediator", "cdr_out_dir")
}
if hasOpt = c.HasOption("session_manager", "enabled"); hasOpt {
cfg.SMEnabled, _ = c.GetBool("session_manager", "enabled")
}

View File

@@ -70,6 +70,25 @@ func TestDefaults(t *testing.T) {
eCfg.CDRSEnabled = false
eCfg.CDRSListen = "127.0.0.1:2022"
eCfg.CDRSExtraFields = []string{}
eCfg.CdrcEnabled = false
eCfg.CdrcCdrs = "127.0.0.1:2022"
eCfg.CdrcCdrsMethod = "http_cgr"
eCfg.CdrcRunDelay = 0
eCfg.CdrcCdrType = "csv"
eCfg.CdrcCdrInDir = "/var/log/cgrates/cdr/in/csv"
eCfg.CdrcCdrOutDir = "/var/log/cgrates/cdr/out/csv"
eCfg.CdrcSourceId = "freeswitch_csv"
eCfg.CdrcAccIdField = "0"
eCfg.CdrcReqTypeField = "1"
eCfg.CdrcDirectionField = "2"
eCfg.CdrcTenantField = "3"
eCfg.CdrcTorField = "4"
eCfg.CdrcAccountField = "5"
eCfg.CdrcSubjectField = "6"
eCfg.CdrcDestinationField = "7"
eCfg.CdrcAnswerTimeField = "8"
eCfg.CdrcDurationField = "9"
eCfg.CdrcExtraFields = []string{"10:supplier","11:orig_ip"}
eCfg.CDRSMediator = ""
eCfg.CDRSExportPath = "/var/log/cgrates/cdr/out"
eCfg.CDRSExportExtraFields = []string{}
@@ -77,20 +96,16 @@ func TestDefaults(t *testing.T) {
eCfg.MediatorListen = "127.0.0.1:2032"
eCfg.MediatorRater = "127.0.0.1:2012"
eCfg.MediatorRaterReconnects = 3
eCfg.MediatorCDRType = "freeswitch_http_json"
eCfg.MediatorAccIdField = "accid"
eCfg.MediatorRunIds = []string{"default"}
eCfg.MediatorSubjectFields = []string{"subject"}
eCfg.MediatorReqTypeFields = []string{"reqtype"}
eCfg.MediatorDirectionFields = []string{"direction"}
eCfg.MediatorTenantFields = []string{"tenant"}
eCfg.MediatorTORFields = []string{"tor"}
eCfg.MediatorAccountFields = []string{"account"}
eCfg.MediatorDestFields = []string{"destination"}
eCfg.MediatorTimeAnswerFields = []string{"time_answer"}
eCfg.MediatorDurationFields = []string{"duration"}
eCfg.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv"
eCfg.MediatorCDROutDir = "/var/log/cgrates/cdr/out/freeswitch/csv"
eCfg.MediatorRunIds = []string{}
eCfg.MediatorSubjectFields = []string{}
eCfg.MediatorReqTypeFields = []string{}
eCfg.MediatorDirectionFields = []string{}
eCfg.MediatorTenantFields = []string{}
eCfg.MediatorTORFields = []string{}
eCfg.MediatorAccountFields = []string{}
eCfg.MediatorDestFields = []string{}
eCfg.MediatorAnswerTimeFields = []string{}
eCfg.MediatorDurationFields = []string{}
eCfg.SMEnabled = false
eCfg.SMSwitchType = FS
eCfg.SMRater = "127.0.0.1:2012"
@@ -179,12 +194,29 @@ func TestConfigFromFile(t *testing.T) {
eCfg.CDRSMediator = "test"
eCfg.CDRSExportPath = "test"
eCfg.CDRSExportExtraFields = []string{"test"}
eCfg.CdrcEnabled = true
eCfg.CdrcCdrs = "test"
eCfg.CdrcCdrsMethod = "test"
eCfg.CdrcRunDelay = 99
eCfg.CdrcCdrType = "test"
eCfg.CdrcCdrInDir = "test"
eCfg.CdrcCdrOutDir = "test"
eCfg.CdrcSourceId = "test"
eCfg.CdrcAccIdField = "test"
eCfg.CdrcReqTypeField = "test"
eCfg.CdrcDirectionField = "test"
eCfg.CdrcTenantField = "test"
eCfg.CdrcTorField = "test"
eCfg.CdrcAccountField = "test"
eCfg.CdrcSubjectField = "test"
eCfg.CdrcDestinationField = "test"
eCfg.CdrcAnswerTimeField = "test"
eCfg.CdrcDurationField = "test"
eCfg.CdrcExtraFields = []string{"test"}
eCfg.MediatorEnabled = true
eCfg.MediatorListen = "test"
eCfg.MediatorRater = "test"
eCfg.MediatorRaterReconnects = 99
eCfg.MediatorCDRType = "test"
eCfg.MediatorAccIdField = "test"
eCfg.MediatorRunIds = []string{"test"}
eCfg.MediatorSubjectFields = []string{"test"}
eCfg.MediatorReqTypeFields = []string{"test"}
@@ -193,10 +225,8 @@ func TestConfigFromFile(t *testing.T) {
eCfg.MediatorTORFields = []string{"test"}
eCfg.MediatorAccountFields = []string{"test"}
eCfg.MediatorDestFields = []string{"test"}
eCfg.MediatorTimeAnswerFields = []string{"test"}
eCfg.MediatorAnswerTimeFields = []string{"test"}
eCfg.MediatorDurationFields = []string{"test"}
eCfg.MediatorCDRInDir = "test"
eCfg.MediatorCDROutDir = "test"
eCfg.SMEnabled = true
eCfg.SMSwitchType = "test"
eCfg.SMRater = "test"

View File

@@ -50,13 +50,32 @@ mediator = test # Address where to reach the Mediator. Empty for disabling me
export_path = test # Path where exported cdrs will be written
export_extra_fields = test # Extra fields list to be exported
[cdrc]
enabled = true # Enable CDR client functionality
cdrs = test # Address where to reach CDR server
cdrs_method = test # Mechanism to use when posting CDRs on server <http_cgr>
run_delay = 99 # Period to sleep between two runs, 0 to use automation via inotify
cdr_type = test # CDR file format <csv>.
cdr_in_dir = test # Absolute path towards the directory where the CDRs are kept (file stored CDRs).
cdr_out_dir = test # Absolute path towards the directory where processed CDRs will be moved after processing.
cdr_source_id = test # Tag identifying the source of the CDRs within CGRS database.
accid_field = test # Accounting id field identifier. Use index number in case of .csv cdrs.
reqtype_field = test # Request type field identifier. Use index number in case of .csv cdrs.
direction_field = test # Direction field identifier. Use index numbers in case of .csv cdrs.
tenant_field = test # Tenant field identifier. Use index numbers in case of .csv cdrs.
tor_field = test # Type of Record field identifier. Use index numbers in case of .csv cdrs.
account_field = test # Account field identifier. Use index numbers in case of .csv cdrs.
subject_field = test # Subject field identifier. Use index numbers in case of .csv CDRs.
destination_field = test # Destination field identifier. Use index numbers in case of .csv cdrs.
answer_time_field = test # Answer time field identifier. Use index numbers in case of .csv cdrs.
duration_field = test # Duration field identifier. Use index numbers in case of .csv cdrs.
extra_fields = test # Field identifiers of the fields to add in extra fields section, special format in case of .csv "index1:field1,index2:field2"
[mediator]
enabled = true # Starts Mediator service: <true|false>.
listen=test # Mediator's listening interface: <internal>.
rater = test # Address where to reach the Rater: <internal|x.y.z.y:1234>
rater_reconnects = 99 # Number of reconnects to rater before giving up.
cdr_type = test # CDR type <freeswitch_http_json|freeswitch_file_csv>.
accid_field = test # Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs.
run_ids = test # Identifiers for each mediation run on CDRs
subject_fields = test # Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
reqtype_fields = test # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
@@ -65,10 +84,8 @@ tenant_fields = test # Name of tenant fields to be used during mediation. Use
tor_fields = test # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
account_fields = test # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
destination_fields = test # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
time_answer_fields = test # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs.
answer_time_fields = test # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs.
duration_fields = test # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
cdr_in_dir = test # Absolute path towards the directory where the CDRs are kept (file stored CDRs).
cdr_out_dir = test # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).
[session_manager]
enabled = true # Starts SessionManager service: <true|false>.

View File

@@ -32,7 +32,6 @@
# rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down>
# rounding_decimals = 4 # Number of decimals to round float/costs at
[balancer]
# enabled = false # Start Balancer service: <true|false>.
# listen = 127.0.0.1:2012 # Balancer listen interface: <disabled|x.y.z.y:1234>.
@@ -50,29 +49,45 @@
# listen=127.0.0.1:2022 # CDRS's listening interface: <x.y.z.y:1234>.
# extra_fields = # Extra fields to store in CDRs
# mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
# export_path = /var/log/cgrates/cdr/out # Path where the exported CDRs will be placed
# export_path = /var/log/cgrates/cdr/out/cgr # Path where the exported CDRs will be placed
# export_extra_fields = # List of extra fields to be exported out in CDRs
[cdrc]
# enabled = false # Enable CDR client functionality
# cdrs = 127.0.0.1:2022 # Address where to reach CDR server
# cdrs_method = http_cgr # Mechanism to use when posting CDRs on server <http_cgr>
# run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify
# cdr_type = csv # CDR file format <csv>.
# cdr_in_dir = /var/log/cgrates/cdr/in/csv # Absolute path towards the directory where the CDRs are stored.
# cdr_out_dir = /var/log/cgrates/cdr/out/csv # Absolute path towards the directory where processed CDRs will be moved.
# cdr_source_id = freeswitch_csv # Tag identifying the source of the CDRs within CGRS database.
# accid_field = 0 # Accounting id field identifier. Use index number in case of .csv cdrs.
# reqtype_field = 1 # Request type field identifier. Use index number in case of .csv cdrs.
# direction_field = 2 # Direction field identifier. Use index numbers in case of .csv cdrs.
# tenant_field = 3 # Tenant field identifier. Use index numbers in case of .csv cdrs.
# tor_field = 4 # Type of Record field identifier. Use index numbers in case of .csv cdrs.
# account_field = 5 # Account field identifier. Use index numbers in case of .csv cdrs.
# subject_field = 6 # Subject field identifier. Use index numbers in case of .csv CDRs.
# destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs.
# answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs.
# duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs.
# extra_fields = 10:supplier,11:orig_ip # Field identifiers of the fields to add in extra fields section, special format in case of .csv "index1:field1,index2:field2"
[mediator]
# enabled = false # Starts Mediator service: <true|false>.
# listen=internal # Mediator's listening interface: <internal>.
# rater = 127.0.0.1:2012 # Address where to reach the Rater: <internal|x.y.z.y:1234>
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
# accid_field = accid # Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs.
# run_ids = default # Identifiers for each mediation run on CDRs
# subject_fields = subject # Name of fields to be used during mediation. Use index numbers in case of .csv cdrs.
# reqtype_fields = reqtype # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
# direction_fields = direction # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
# tenant_fields = tenant # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
# tor_fields = tor # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
# account_fields = account # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
# destination_fields = destination # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
# time_answer_fields = time_answer # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs.
# duration_fields = duration # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
# cdr_type = # CDR type, used when running mediator as service <freeswitch_http_json|freeswitch_file_csv>.
# cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are kept (file stored CDRs).
# cdr_out_dir = /var/log/cgrates/cdr/out/freeswitch/csv
# Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).
# run_ids = # Identifiers of each extra mediation to run on CDRs
# reqtype_fields = # Name of request type fields to be used during extra mediation. Use index number in case of .csv cdrs.
# direction_fields = # Name of direction fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# tenant_fields = # Name of tenant fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# tor_fields = # Name of tor fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# account_fields = # Name of account fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# subject_fields = # Name of fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# destination_fields = # Name of destination fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# answer_time_fields = # Name of time_answer fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
# duration_fields = # Name of duration fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
[session_manager]
# enabled = false # Starts SessionManager service: <true|false>.

View File

@@ -5,6 +5,7 @@ CREATE TABLE cdrs_primary (
cgrid char(40) NOT NULL,
accid varchar(64) NOT NULL,
cdrhost varchar(64) NOT NULL,
cdrsource varchar(64) NOT NULL,
reqtype varchar(24) NOT NULL,
direction varchar(8) NOT NULL,
tenant varchar(64) NOT NULL,

View File

@@ -6,9 +6,10 @@ DROP TABLE IF EXISTS `rated_cdrs`;
CREATE TABLE `rated_cdrs` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`cgrid` char(40) NOT NULL,
`runid` varchar(64) NOT NULL,
`subject` varchar(64) NOT NULL,
`cost` DECIMAL(20,4) DEFAULT NULL,
`extra_info` text,
PRIMARY KEY (`id`),
UNIQUE KEY `costid` (`cgrid`,`subject`)
UNIQUE KEY `costid` (`cgrid`,`runid`)
);

View File

@@ -27,7 +27,6 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/howeyc/fsnotify"
"os"
"path"
"strconv"
@@ -83,20 +82,19 @@ func (self *Mediator) loadConfig() error {
self.cgrCfg.MediatorTenantFields, self.cgrCfg.MediatorTORFields, self.cgrCfg.MediatorAccountFields, self.cgrCfg.MediatorDestFields,
self.cgrCfg.MediatorTimeAnswerFields, self.cgrCfg.MediatorDurationFields}
refIdx := 0 // Subject becomes reference for our checks
if len(cfgVals[refIdx]) == 0 {
return fmt.Errorf("Unconfigured %s fields", fieldKeys[refIdx])
if len(self.cgrCfg.MediatorRunIds) == 0 {
return errors.New("Unconfigured mediator run_ids")
}
// All other configured fields must match the length of reference fields
for iCfgVal := range cfgVals {
if len(cfgVals[refIdx]) != len(cfgVals[iCfgVal]) {
// Make sure we have everywhere the length of reference key (subject)
if len(self.cgrCfg.MediatorRunIds) != len(cfgVals[iCfgVal]) {
// Make sure we have everywhere the length of runIds
return errors.New("Inconsistent lenght of mediator fields.")
}
}
// AccIdField has no special requirements, should just exist
if self.cgrCfg.MediatorAccIdField == "" {
if len(self.cgrCfg.MediatorAccIdField) == 0 {
return errors.New("Undefined mediator accid field")
}
self.accIdField = self.cgrCfg.MediatorAccIdField
@@ -136,35 +134,6 @@ func (self *Mediator) loadConfig() error {
return nil
}
// Watch the specified folder for file moves and parse the files on events
func (self *Mediator) TrackCDRFiles() (err error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return
}
defer watcher.Close()
err = watcher.Watch(self.cdrInDir)
if err != nil {
return
}
engine.Logger.Info(fmt.Sprintf("Monitoring %s for file moves.", self.cdrInDir))
for {
select {
case ev := <-watcher.Event:
if ev.IsCreate() && path.Ext(ev.Name) != ".csv" {
engine.Logger.Info(fmt.Sprintf("Parsing: %s", ev.Name))
err = self.MediateCSVCDR(ev.Name)
if err != nil {
return err
}
}
case err := <-watcher.Error:
engine.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error()))
}
}
return
}
// Retrive the cost from logging database
func (self *Mediator) getCostsFromDB(cdr utils.CDR) (cc *engine.CallCost, err error) {
for i := 0; i < 3; i++ { // Mechanism to avoid concurrency between SessionManager writing the costs and mediator picking them up
@@ -217,66 +186,7 @@ func (self *Mediator) getCostsFromRater(cdr utils.CDR) (*engine.CallCost, error)
return cc, err
}
// Parse the files and get cost for every record
func (self *Mediator) MediateCSVCDR(cdrfn string) (err error) {
flag.Parse()
file, err := os.Open(cdrfn)
defer file.Close()
if err != nil {
engine.Logger.Crit(err.Error())
os.Exit(1)
}
csvReader := csv.NewReader(bufio.NewReader(file))
_, fn := path.Split(cdrfn)
fout, err := os.Create(path.Join(self.cdrOutDir, fn))
if err != nil {
return err
}
defer fout.Close()
w := bufio.NewWriter(fout)
for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() {
//t, _ := time.Parse("2006-01-02 15:04:05", record[5])
var cc *engine.CallCost
for runIdx := range self.fieldIdxs["subject"] { // Query costs for every run index given by subject
csvCDR, errCDR := NewFScsvCDR(record, self.accIdIdx,
self.fieldIdxs["subject"][runIdx],
self.fieldIdxs["reqtype"][runIdx],
self.fieldIdxs["direction"][runIdx],
self.fieldIdxs["tenant"][runIdx],
self.fieldIdxs["tor"][runIdx],
self.fieldIdxs["account"][runIdx],
self.fieldIdxs["destination"][runIdx],
self.fieldIdxs["answer_time"][runIdx],
self.fieldIdxs["duration"][runIdx],
self.cgrCfg)
if errCDR != nil {
engine.Logger.Err(fmt.Sprintf("<Mediator> Could not calculate price for accid: <%s>, err: <%s>",
record[self.accIdIdx], errCDR.Error()))
}
var errCost error
if csvCDR.GetReqType() == utils.PREPAID || csvCDR.GetReqType() == utils.POSTPAID {
// Should be previously calculated and stored in DB
cc, errCost = self.getCostsFromDB(csvCDR)
} else {
cc, errCost = self.getCostsFromRater(csvCDR)
}
cost := "-1"
if errCost != nil || cc == nil {
engine.Logger.Err(fmt.Sprintf("<Mediator> Could not calculate price for accid: <%s>, err: <%s>, cost: <%v>", csvCDR.GetAccId(), err.Error(), cc))
} else {
cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64)
engine.Logger.Debug(fmt.Sprintf("Calculated for accid:%s, cost: %v", csvCDR.GetAccId(), cost))
}
record = append(record, cost)
}
w.WriteString(strings.Join(record, ",") + "\n")
}
w.Flush()
return
}
func (self *Mediator) MediateDBCDR(cdr utils.CDR) error {
var qryCC *engine.CallCost

View File

@@ -22,6 +22,8 @@ import (
"time"
)
var PrimaryCdrFields []string = []string{ACCID, CDRHOST, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, ANSWER_TIME, DURATION}
type CDR interface {
GetCgrId() string
GetAccId() string

View File

@@ -63,4 +63,16 @@ const (
JSON = "json"
MSGPACK = "msgpack"
CSV_LOAD = "CSVLOAD"
ACCID = "accid"
CDRHOST = "cdrhost"
CDRSOURCE = "cdrsource"
REQTYPE = "reqtype"
DIRECTION = "direction"
TENANT = "tenant"
TOR = "tor"
ACCOUNT = "account"
SUBJECT = "subject"
DESTINATION = "destination"
ANSWER_TIME = "answer_time"
DURATION = "duration"
)