diff --git a/cdrc/.cdrc.go.swp b/cdrc/.cdrc.go.swp
new file mode 100644
index 000000000..4916112cb
Binary files /dev/null and b/cdrc/.cdrc.go.swp differ
diff --git a/cdrc/.cdrc_test.go.swp b/cdrc/.cdrc_test.go.swp
new file mode 100644
index 000000000..e8ddb1e84
Binary files /dev/null and b/cdrc/.cdrc_test.go.swp differ
diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go
new file mode 100644
index 000000000..c426d616a
--- /dev/null
+++ b/cdrc/cdrc.go
@@ -0,0 +1,138 @@
+/*
+Rating system designed to be used in VoIP Carriers World
+Copyright (C) 2013 ITsysCOM
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see
+*/
+
+package cdrc
+
+import (
+ "fmt"
+ "errors"
+ "github.com/howeyc/fsnotify"
+ "os"
+ "path"
+ "net/http"
+ "net/url"
+ "strconv"
+ "strings"
+ "bufio"
+ "encoding/csv"
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/utils"
+ "github.com/cgrates/cgrates/engine"
+)
+
+
+type Cdrc struct {
+ cgrCfg *config.CGRConfig
+ fieldIndxes map[string]int // Key is the name of the field, int is the position in the csv file
+ httpClient *http.Client
+}
+
+// Parses fieldIndex strings into fieldIndex integers needed
+func (self *Cdrc) parseFieldIndexesFromConfig() error {
+ var err error
+ // Add main fields here
+ self.fieldIndxes = make(map[string]int)
+ // PrimaryCdrFields []string = []string{ACCID, CDRHOST, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, ANSWER_TIME, DURATION}
+ fieldKeys := []string{utils.ACCID, utils.REQTYPE, utils.DIRECTION, utils.TENANT, utils.TOR, utils.ACCOUNT, utils.SUBJECT, utils.DESTINATION, utils.ANSWER_TIME, utils.DURATION}
+ fieldIdxStrs := []string{self.cgrCfg.CdrcAccIdField, self.cgrCfg.CdrcReqTypeField, self.cgrCfg.CdrcDirectionField, self.cgrCfg.CdrcTenantField, self.cgrCfg.CdrcTorField,
+ self.cgrCfg.CdrcAccountField, self.cgrCfg.CdrcSubjectField, self.cgrCfg.CdrcDestinationField, self.cgrCfg.CdrcAnswerTimeField, self.cgrCfg.CdrcDurationField}
+ for i, strVal := range fieldIdxStrs {
+ if self.fieldIndxes[fieldKeys[i]], err = strconv.Atoi(strVal); err != nil {
+ return fmt.Errorf("Cannot parse configuration field %s into integer", fieldKeys[i])
+ }
+ }
+ // Add extra fields here, extra fields in the form of []string{"indxInCsv1:fieldName1","indexInCsv2:fieldName2"}
+ for _, fieldWithIdx := range self.cgrCfg.CdrcExtraFields {
+ splt := strings.Split(fieldWithIdx, ":")
+ if len(splt) != 2 {
+ return errors.New("Cannot parse cdrc.extra_fields")
+ }
+ if utils.IsSliceMember(utils.PrimaryCdrFields, splt[0]) {
+ return errors.New("Extra cdrc.extra_fields overwriting primary fields")
+ }
+ if self.fieldIndxes[splt[1]], err = strconv.Atoi(splt[0]); err != nil {
+ return fmt.Errorf("Cannot parse configuration cdrc extra field %s into integer", splt[1])
+ }
+ }
+ return nil
+}
+
+// Takes the record out of csv and turns it into http form which can be posted
+func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) {
+ v := url.Values{}
+ for fldName, idx := range self.fieldIndxes {
+ if len(record) <= idx {
+ return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, fldName)
+ }
+ v.Set(fldName, record[idx])
+ }
+ return v, nil
+}
+
+// Watch the specified folder for file moves and parse the files on events
+func (self *Cdrc) trackCDRFiles() (err error) {
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ return
+ }
+ defer watcher.Close()
+ err = watcher.Watch(self.cgrCfg.CdrcCdrInDir)
+ if err != nil {
+ return
+ }
+ engine.Logger.Info(fmt.Sprintf("Monitoring %s for file moves.", self.cgrCfg.CdrcCdrInDir))
+ for {
+ select {
+ case ev := <-watcher.Event:
+ if ev.IsCreate() && path.Ext(ev.Name) != ".csv" {
+ engine.Logger.Info(fmt.Sprintf("Parsing: %s", ev.Name))
+ if err = self.processFile(ev.Name); err != nil {
+ return err
+ }
+ }
+ case err := <-watcher.Error:
+ engine.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error()))
+ }
+ }
+ return
+}
+
+// Processe file at filePath and posts the valid cdr rows out of it
+func (self *Cdrc) processFile(filePath string) error {
+ file, err := os.Open(filePath)
+ defer file.Close()
+ if err != nil {
+ engine.Logger.Crit(err.Error())
+ return err
+ }
+ csvReader := csv.NewReader(bufio.NewReader(file))
+ for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() {
+ cdrAsForm, err := self.cdrAsHttpForm(record)
+ if err != nil {
+ engine.Logger.Err(err.Error())
+ continue
+ }
+ if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cgr", self.cgrCfg.CdrcCdrs), cdrAsForm); err != nil {
+ engine.Logger.Err(fmt.Sprintf("Failed posting CDR, error: %s",err.Error()))
+ continue
+ }
+ }
+ // Finished with file, move it to processed folder
+ _, fn := path.Split(filePath)
+ return os.Rename(filePath, path.Join(self.cgrCfg.CdrcCdrOutDir, fn))
+}
diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go
new file mode 100644
index 000000000..4f3b4fb1b
--- /dev/null
+++ b/cdrc/cdrc_test.go
@@ -0,0 +1,41 @@
+package cdrc
+
+import (
+ "testing"
+ "github.com/cgrates/cgrates/config"
+ "github.com/cgrates/cgrates/utils"
+)
+
+var cgrConfig *config.CGRConfig
+var cdrc *Cdrc
+
+func init() {
+ cgrConfig, _ = config.NewDefaultCGRConfig()
+ cdrc = &Cdrc{cgrCfg:cgrConfig}
+}
+
+func TestParseFieldIndexesFromConfig(t *testing.T) {
+ if err := cdrc.parseFieldIndexesFromConfig(); err != nil {
+ t.Error("Failed parsing default fieldIndexesFromConfig", err)
+ }
+}
+
+
+func TestCdrAsHttpForm(t *testing.T) {
+ cdrRow := []string{"firstField", "secondField"}
+ _, err := cdrc.cdrAsHttpForm(cdrRow)
+ if err == nil {
+ t.Error("Failed to corectly detect missing fields from record")
+ }
+ cdrRow = []string{"acc1", "prepaid", "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1"}
+ cdrAsForm, err := cdrc.cdrAsHttpForm(cdrRow);
+ if err != nil {
+ t.Error("Failed to parse CDR in form", err)
+ }
+ if cdrAsForm.Get(utils.REQTYPE) != "prepaid" {
+ t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE))
+ }
+ if cdrAsForm.Get("supplier") != "supplier1" {
+ t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE))
+ }
+}
diff --git a/cdrs/cgrcdr.go b/cdrs/cgrcdr.go
index 4dd46f4a2..69e141984 100644
--- a/cdrs/cgrcdr.go
+++ b/cdrs/cgrcdr.go
@@ -25,22 +25,6 @@ import (
"time"
)
-const (
- ACCID = "accid"
- CDRHOST = "cdrhost"
- REQTYPE = "reqtype"
- DIRECTION = "direction"
- TENANT = "tenant"
- TOR = "tor"
- ACCOUNT = "account"
- SUBJECT = "subject"
- DESTINATION = "destination"
- TIME_ANSWER = "time_answer"
- DURATION = "duration"
-)
-
-var primaryFields []string = []string{ACCID, CDRHOST, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, TIME_ANSWER, DURATION}
-
func NewCgrCdrFromHttpReq(req *http.Request) (CgrCdr, error) {
if req.Form == nil {
if err := req.ParseForm(); err != nil {
@@ -67,6 +51,10 @@ func (cgrCdr CgrCdr) GetAccId() string {
func (cgrCdr CgrCdr) GetCdrHost() string {
return cgrCdr[CDRHOST]
}
+
+func (cgrCdr CgrCdr) GetCdrSource() string {
+ return cgrCdr[CDRSOURCE]
+}
func (cgrCdr CgrCdr) GetDirection() string {
//TODO: implement direction
return "*out"
@@ -99,7 +87,7 @@ func (cgrCdr CgrCdr) GetReqType() string {
func (cgrCdr CgrCdr) GetExtraFields() map[string]string {
extraFields := make(map[string]string)
for k, v := range cgrCdr {
- if !utils.IsSliceMember(primaryFields, k) {
+ if !utils.IsSliceMember(utils.PrimaryCdrFields, k) {
extraFields[k] = v
}
}
diff --git a/cdrs/fscdr.go b/cdrs/fscdr.go
index 69889103e..da76f7c57 100644
--- a/cdrs/fscdr.go
+++ b/cdrs/fscdr.go
@@ -44,6 +44,7 @@ const (
FS_DURATION = "billsec"
FS_USERNAME = "user_name"
FS_IP = "sip_local_network_addr"
+ FS_CDR_SOURCE = "freeswitch_json"
)
type FSCdr map[string]string
@@ -74,6 +75,9 @@ func (fsCdr FSCdr) GetAccId() string {
func (fsCdr FSCdr) GetCdrHost() string {
return fsCdr[FS_IP]
}
+func (fsCdr FSCdr) GetCdrSource() string {
+ return FS_CDR_SOURCE
+}
func (fsCdr FSCdr) GetDirection() string {
//TODO: implement direction, not related to FS_DIRECTION but traffic towards or from subject/account
return "*out"
diff --git a/config/config.go b/config/config.go
index 98aadb482..ccd03bb92 100644
--- a/config/config.go
+++ b/config/config.go
@@ -54,7 +54,7 @@ type CGRConfig struct {
AccountDBPass string // The user's password.
StorDBType string // Should reflect the database type used to store logs
StorDBHost string // The host to connect to. Values that start with / are for UNIX domain sockets.
- StorDBPort string // The port to bind to.
+ StorDBPort string // Th e port to bind to.
StorDBName string // The name of the database to connect to.
StorDBUser string // The user to sign in as.
StorDBPass string // The user's password.
@@ -78,6 +78,25 @@ type CGRConfig struct {
CDRSMediator string // Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
CDRSExportPath string // Path towards exported cdrs
CDRSExportExtraFields []string // Extra fields list to add in exported CDRs
+ CdrcEnabled bool // Enable CDR client functionality
+ CdrcCdrs string // Address where to reach CDR server
+ CdrcCdrsMethod string // Mechanism to use when posting CDRs on server
+ CdrcRunDelay int // Sleep interval in seconds between consecutive runs, 0 to use automation via inotify
+ CdrcCdrType string // CDR file format .
+ CdrcCdrInDir string // Absolute path towards the directory where the CDRs are stored.
+ CdrcCdrOutDir string // Absolute path towards the directory where processed CDRs will be moved.
+ CdrcSourceId string // Tag identifying the source of the CDRs within CGRS database.
+ CdrcAccIdField string // Accounting id field identifier. Use index number in case of .csv cdrs.
+ CdrcReqTypeField string // Request type field identifier. Use index number in case of .csv cdrs.
+ CdrcDirectionField string // Direction field identifier. Use index numbers in case of .csv cdrs.
+ CdrcTenantField string // Tenant field identifier. Use index numbers in case of .csv cdrs.
+ CdrcTorField string // Type of Record field identifier. Use index numbers in case of .csv cdrs.
+ CdrcAccountField string // Account field identifier. Use index numbers in case of .csv cdrs.
+ CdrcSubjectField string // Subject field identifier. Use index numbers in case of .csv CDRs.
+ CdrcDestinationField string // Destination field identifier. Use index numbers in case of .csv cdrs.
+ CdrcAnswerTimeField string // Answer time field identifier. Use index numbers in case of .csv cdrs.
+ CdrcDurationField string // Duration field identifier. Use index numbers in case of .csv cdrs.
+ CdrcExtraFields []string // Field identifiers of the fields to add in extra fields section, special format in case of .csv "index1:field1,index2:field2"
SMEnabled bool
SMSwitchType string
SMRater string // address where to access rater. Can be internal, direct rater address or the address of a balancer
@@ -87,20 +106,16 @@ type CGRConfig struct {
MediatorListen string // Mediator's listening interface: .
MediatorRater string // Address where to reach the Rater:
MediatorRaterReconnects int // Number of reconnects to rater before giving up.
- MediatorCDRType string // CDR type .
- MediatorAccIdField string // Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs.
MediatorRunIds []string // Identifiers for each mediation run on CDRs
- MediatorSubjectFields []string // Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorReqTypeFields []string // Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
MediatorDirectionFields []string // Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTenantFields []string // Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorTORFields []string // Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorAccountFields []string // Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
+ MediatorSubjectFields []string // Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDestFields []string // Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
- MediatorTimeAnswerFields []string // Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs.
+ MediatorAnswerTimeFields []string // Name of time_start fields to be used during mediation. Use index numbers in case of .csv cdrs.
MediatorDurationFields []string // Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
- MediatorCDRInDir string // Absolute path towards the directory where the CDRs are kept (file stored CDRs).
- MediatorCDROutDir string // Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).
FreeswitchServer string // freeswitch address host:port
FreeswitchPass string // FS socket password
FreeswitchReconnects int // number of times to attempt reconnect after connect fails
@@ -151,24 +166,39 @@ func (self *CGRConfig) setDefaults() error {
self.CDRSMediator = ""
self.CDRSExportPath = "/var/log/cgrates/cdr/out"
self.CDRSExportExtraFields = []string{}
+ self.CdrcEnabled = false
+ self.CdrcCdrs = "127.0.0.1:2022"
+ self.CdrcCdrsMethod = "http_cgr"
+ self.CdrcRunDelay = 0
+ self.CdrcCdrType = "csv"
+ self.CdrcCdrInDir = "/var/log/cgrates/cdr/in/csv"
+ self.CdrcCdrOutDir = "/var/log/cgrates/cdr/out/csv"
+ self.CdrcSourceId = "freeswitch_csv"
+ self.CdrcAccIdField = "0"
+ self.CdrcReqTypeField = "1"
+ self.CdrcDirectionField = "2"
+ self.CdrcTenantField = "3"
+ self.CdrcTorField = "4"
+ self.CdrcAccountField = "5"
+ self.CdrcSubjectField = "6"
+ self.CdrcDestinationField = "7"
+ self.CdrcAnswerTimeField = "8"
+ self.CdrcDurationField = "9"
+ self.CdrcExtraFields = []string{"10:supplier","11:orig_ip"}
self.MediatorEnabled = false
self.MediatorListen = "127.0.0.1:2032"
self.MediatorRater = "127.0.0.1:2012"
self.MediatorRaterReconnects = 3
- self.MediatorCDRType = utils.FSCDR_HTTP_JSON
- self.MediatorAccIdField = "accid"
- self.MediatorRunIds = []string{"default"}
- self.MediatorSubjectFields = []string{"subject"}
- self.MediatorReqTypeFields = []string{"reqtype"}
- self.MediatorDirectionFields = []string{"direction"}
- self.MediatorTenantFields = []string{"tenant"}
- self.MediatorTORFields = []string{"tor"}
- self.MediatorAccountFields = []string{"account"}
- self.MediatorDestFields = []string{"destination"}
- self.MediatorTimeAnswerFields = []string{"time_answer"}
- self.MediatorDurationFields = []string{"duration"}
- self.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv"
- self.MediatorCDROutDir = "/var/log/cgrates/cdr/out/freeswitch/csv"
+ self.MediatorRunIds = []string{}
+ self.MediatorSubjectFields = []string{}
+ self.MediatorReqTypeFields = []string{}
+ self.MediatorDirectionFields = []string{}
+ self.MediatorTenantFields = []string{}
+ self.MediatorTORFields = []string{}
+ self.MediatorAccountFields = []string{}
+ self.MediatorDestFields = []string{}
+ self.MediatorAnswerTimeFields = []string{}
+ self.MediatorDurationFields = []string{}
self.SMEnabled = false
self.SMSwitchType = FS
self.SMRater = "127.0.0.1:2012"
@@ -332,6 +362,65 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
return nil, errParse
}
}
+ if hasOpt = c.HasOption("cdrc", "enabled"); hasOpt {
+ cfg.CdrcEnabled, _ = c.GetBool("cdrc", "enabled")
+ }
+ if hasOpt = c.HasOption("cdrc", "cdrs"); hasOpt {
+ cfg.CdrcCdrs, _ = c.GetString("cdrc", "cdrs")
+ }
+ if hasOpt = c.HasOption("cdrc", "cdrs_method"); hasOpt {
+ cfg.CdrcCdrsMethod, _ = c.GetString("cdrc", "cdrs_method")
+ }
+ if hasOpt = c.HasOption("cdrc", "run_delay"); hasOpt {
+ cfg.CdrcRunDelay, _ = c.GetInt("cdrc", "run_delay")
+ }
+ if hasOpt = c.HasOption("cdrc", "cdr_type"); hasOpt {
+ cfg.CdrcCdrType, _ = c.GetString("cdrc", "cdr_type")
+ }
+ if hasOpt = c.HasOption("cdrc", "cdr_in_dir"); hasOpt {
+ cfg.CdrcCdrInDir, _ = c.GetString("cdrc", "cdr_in_dir")
+ }
+ if hasOpt = c.HasOption("cdrc", "cdr_out_dir"); hasOpt {
+ cfg.CdrcCdrOutDir, _ = c.GetString("cdrc", "cdr_out_dir")
+ }
+ if hasOpt = c.HasOption("cdrc", "cdr_source_id"); hasOpt {
+ cfg.CdrcSourceId, _ = c.GetString("cdrc", "cdr_source_id")
+ }
+ if hasOpt = c.HasOption("cdrc", "accid_field"); hasOpt {
+ cfg.CdrcAccIdField, _ = c.GetString("cdrc", "accid_field")
+ }
+ if hasOpt = c.HasOption("cdrc", "reqtype_field"); hasOpt {
+ cfg.CdrcReqTypeField, _ = c.GetString("cdrc", "reqtype_field")
+ }
+ if hasOpt = c.HasOption("cdrc", "direction_field"); hasOpt {
+ cfg.CdrcDirectionField, _ = c.GetString("cdrc", "direction_field")
+ }
+ if hasOpt = c.HasOption("cdrc", "tenant_field"); hasOpt {
+ cfg.CdrcTenantField, _ = c.GetString("cdrc", "tenant_field")
+ }
+ if hasOpt = c.HasOption("cdrc", "tor_field"); hasOpt {
+ cfg.CdrcTorField, _ = c.GetString("cdrc", "tor_field")
+ }
+ if hasOpt = c.HasOption("cdrc", "account_field"); hasOpt {
+ cfg.CdrcAccountField, _ = c.GetString("cdrc", "account_field")
+ }
+ if hasOpt = c.HasOption("cdrc", "subject_field"); hasOpt {
+ cfg.CdrcSubjectField, _ = c.GetString("cdrc", "subject_field")
+ }
+ if hasOpt = c.HasOption("cdrc", "destination_field"); hasOpt {
+ cfg.CdrcDestinationField, _ = c.GetString("cdrc", "destination_field")
+ }
+ if hasOpt = c.HasOption("cdrc", "answer_time_field"); hasOpt {
+ cfg.CdrcAnswerTimeField, _ = c.GetString("cdrc", "answer_time_field")
+ }
+ if hasOpt = c.HasOption("cdrc", "duration_field"); hasOpt {
+ cfg.CdrcDurationField, _ = c.GetString("cdrc", "duration_field")
+ }
+ if hasOpt = c.HasOption("cdrc", "extra_fields"); hasOpt {
+ if cfg.CdrcExtraFields, errParse = ConfigSlice(c, "cdrc", "extra_fields"); errParse != nil {
+ return nil, errParse
+ }
+ }
if hasOpt = c.HasOption("mediator", "enabled"); hasOpt {
cfg.MediatorEnabled, _ = c.GetBool("mediator", "enabled")
}
@@ -344,12 +433,6 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
if hasOpt = c.HasOption("mediator", "rater_reconnects"); hasOpt {
cfg.MediatorRaterReconnects, _ = c.GetInt("mediator", "rater_reconnects")
}
- if hasOpt = c.HasOption("mediator", "cdr_type"); hasOpt {
- cfg.MediatorCDRType, _ = c.GetString("mediator", "cdr_type")
- }
- if hasOpt = c.HasOption("mediator", "accid_field"); hasOpt {
- cfg.MediatorAccIdField, _ = c.GetString("mediator", "accid_field")
- }
if hasOpt = c.HasOption("mediator", "run_ids"); hasOpt {
if cfg.MediatorRunIds, errParse = ConfigSlice(c, "mediator", "run_ids"); errParse != nil {
return nil, errParse
@@ -390,8 +473,8 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
return nil, errParse
}
}
- if hasOpt = c.HasOption("mediator", "time_answer_fields"); hasOpt {
- if cfg.MediatorTimeAnswerFields, errParse = ConfigSlice(c, "mediator", "time_answer_fields"); errParse != nil {
+ if hasOpt = c.HasOption("mediator", "answer_time_fields"); hasOpt {
+ if cfg.MediatorAnswerTimeFields, errParse = ConfigSlice(c, "mediator", "answer_time_fields"); errParse != nil {
return nil, errParse
}
}
@@ -400,12 +483,6 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
return nil, errParse
}
}
- if hasOpt = c.HasOption("mediator", "cdr_in_dir"); hasOpt {
- cfg.MediatorCDRInDir, _ = c.GetString("mediator", "cdr_in_dir")
- }
- if hasOpt = c.HasOption("mediator", "cdr_out_dir"); hasOpt {
- cfg.MediatorCDROutDir, _ = c.GetString("mediator", "cdr_out_dir")
- }
if hasOpt = c.HasOption("session_manager", "enabled"); hasOpt {
cfg.SMEnabled, _ = c.GetBool("session_manager", "enabled")
}
diff --git a/config/config_test.go b/config/config_test.go
index 873c67971..06e8d3f0b 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -70,6 +70,25 @@ func TestDefaults(t *testing.T) {
eCfg.CDRSEnabled = false
eCfg.CDRSListen = "127.0.0.1:2022"
eCfg.CDRSExtraFields = []string{}
+ eCfg.CdrcEnabled = false
+ eCfg.CdrcCdrs = "127.0.0.1:2022"
+ eCfg.CdrcCdrsMethod = "http_cgr"
+ eCfg.CdrcRunDelay = 0
+ eCfg.CdrcCdrType = "csv"
+ eCfg.CdrcCdrInDir = "/var/log/cgrates/cdr/in/csv"
+ eCfg.CdrcCdrOutDir = "/var/log/cgrates/cdr/out/csv"
+ eCfg.CdrcSourceId = "freeswitch_csv"
+ eCfg.CdrcAccIdField = "0"
+ eCfg.CdrcReqTypeField = "1"
+ eCfg.CdrcDirectionField = "2"
+ eCfg.CdrcTenantField = "3"
+ eCfg.CdrcTorField = "4"
+ eCfg.CdrcAccountField = "5"
+ eCfg.CdrcSubjectField = "6"
+ eCfg.CdrcDestinationField = "7"
+ eCfg.CdrcAnswerTimeField = "8"
+ eCfg.CdrcDurationField = "9"
+ eCfg.CdrcExtraFields = []string{"10:supplier","11:orig_ip"}
eCfg.CDRSMediator = ""
eCfg.CDRSExportPath = "/var/log/cgrates/cdr/out"
eCfg.CDRSExportExtraFields = []string{}
@@ -77,20 +96,16 @@ func TestDefaults(t *testing.T) {
eCfg.MediatorListen = "127.0.0.1:2032"
eCfg.MediatorRater = "127.0.0.1:2012"
eCfg.MediatorRaterReconnects = 3
- eCfg.MediatorCDRType = "freeswitch_http_json"
- eCfg.MediatorAccIdField = "accid"
- eCfg.MediatorRunIds = []string{"default"}
- eCfg.MediatorSubjectFields = []string{"subject"}
- eCfg.MediatorReqTypeFields = []string{"reqtype"}
- eCfg.MediatorDirectionFields = []string{"direction"}
- eCfg.MediatorTenantFields = []string{"tenant"}
- eCfg.MediatorTORFields = []string{"tor"}
- eCfg.MediatorAccountFields = []string{"account"}
- eCfg.MediatorDestFields = []string{"destination"}
- eCfg.MediatorTimeAnswerFields = []string{"time_answer"}
- eCfg.MediatorDurationFields = []string{"duration"}
- eCfg.MediatorCDRInDir = "/var/log/freeswitch/cdr-csv"
- eCfg.MediatorCDROutDir = "/var/log/cgrates/cdr/out/freeswitch/csv"
+ eCfg.MediatorRunIds = []string{}
+ eCfg.MediatorSubjectFields = []string{}
+ eCfg.MediatorReqTypeFields = []string{}
+ eCfg.MediatorDirectionFields = []string{}
+ eCfg.MediatorTenantFields = []string{}
+ eCfg.MediatorTORFields = []string{}
+ eCfg.MediatorAccountFields = []string{}
+ eCfg.MediatorDestFields = []string{}
+ eCfg.MediatorAnswerTimeFields = []string{}
+ eCfg.MediatorDurationFields = []string{}
eCfg.SMEnabled = false
eCfg.SMSwitchType = FS
eCfg.SMRater = "127.0.0.1:2012"
@@ -179,12 +194,29 @@ func TestConfigFromFile(t *testing.T) {
eCfg.CDRSMediator = "test"
eCfg.CDRSExportPath = "test"
eCfg.CDRSExportExtraFields = []string{"test"}
+ eCfg.CdrcEnabled = true
+ eCfg.CdrcCdrs = "test"
+ eCfg.CdrcCdrsMethod = "test"
+ eCfg.CdrcRunDelay = 99
+ eCfg.CdrcCdrType = "test"
+ eCfg.CdrcCdrInDir = "test"
+ eCfg.CdrcCdrOutDir = "test"
+ eCfg.CdrcSourceId = "test"
+ eCfg.CdrcAccIdField = "test"
+ eCfg.CdrcReqTypeField = "test"
+ eCfg.CdrcDirectionField = "test"
+ eCfg.CdrcTenantField = "test"
+ eCfg.CdrcTorField = "test"
+ eCfg.CdrcAccountField = "test"
+ eCfg.CdrcSubjectField = "test"
+ eCfg.CdrcDestinationField = "test"
+ eCfg.CdrcAnswerTimeField = "test"
+ eCfg.CdrcDurationField = "test"
+ eCfg.CdrcExtraFields = []string{"test"}
eCfg.MediatorEnabled = true
eCfg.MediatorListen = "test"
eCfg.MediatorRater = "test"
eCfg.MediatorRaterReconnects = 99
- eCfg.MediatorCDRType = "test"
- eCfg.MediatorAccIdField = "test"
eCfg.MediatorRunIds = []string{"test"}
eCfg.MediatorSubjectFields = []string{"test"}
eCfg.MediatorReqTypeFields = []string{"test"}
@@ -193,10 +225,8 @@ func TestConfigFromFile(t *testing.T) {
eCfg.MediatorTORFields = []string{"test"}
eCfg.MediatorAccountFields = []string{"test"}
eCfg.MediatorDestFields = []string{"test"}
- eCfg.MediatorTimeAnswerFields = []string{"test"}
+ eCfg.MediatorAnswerTimeFields = []string{"test"}
eCfg.MediatorDurationFields = []string{"test"}
- eCfg.MediatorCDRInDir = "test"
- eCfg.MediatorCDROutDir = "test"
eCfg.SMEnabled = true
eCfg.SMSwitchType = "test"
eCfg.SMRater = "test"
diff --git a/config/test_data.txt b/config/test_data.txt
index 8e491467b..9bc43ddc2 100644
--- a/config/test_data.txt
+++ b/config/test_data.txt
@@ -50,13 +50,32 @@ mediator = test # Address where to reach the Mediator. Empty for disabling me
export_path = test # Path where exported cdrs will be written
export_extra_fields = test # Extra fields list to be exported
+[cdrc]
+enabled = true # Enable CDR client functionality
+cdrs = test # Address where to reach CDR server
+cdrs_method = test # Mechanism to use when posting CDRs on server
+run_delay = 99 # Period to sleep between two runs, 0 to use automation via inotify
+cdr_type = test # CDR file format .
+cdr_in_dir = test # Absolute path towards the directory where the CDRs are kept (file stored CDRs).
+cdr_out_dir = test # Absolute path towards the directory where processed CDRs will be moved after processing.
+cdr_source_id = test # Tag identifying the source of the CDRs within CGRS database.
+accid_field = test # Accounting id field identifier. Use index number in case of .csv cdrs.
+reqtype_field = test # Request type field identifier. Use index number in case of .csv cdrs.
+direction_field = test # Direction field identifier. Use index numbers in case of .csv cdrs.
+tenant_field = test # Tenant field identifier. Use index numbers in case of .csv cdrs.
+tor_field = test # Type of Record field identifier. Use index numbers in case of .csv cdrs.
+account_field = test # Account field identifier. Use index numbers in case of .csv cdrs.
+subject_field = test # Subject field identifier. Use index numbers in case of .csv CDRs.
+destination_field = test # Destination field identifier. Use index numbers in case of .csv cdrs.
+answer_time_field = test # Answer time field identifier. Use index numbers in case of .csv cdrs.
+duration_field = test # Duration field identifier. Use index numbers in case of .csv cdrs.
+extra_fields = test # Field identifiers of the fields to add in extra fields section, special format in case of .csv "index1:field1,index2:field2"
+
[mediator]
enabled = true # Starts Mediator service: .
listen=test # Mediator's listening interface: .
rater = test # Address where to reach the Rater:
rater_reconnects = 99 # Number of reconnects to rater before giving up.
-cdr_type = test # CDR type .
-accid_field = test # Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs.
run_ids = test # Identifiers for each mediation run on CDRs
subject_fields = test # Name of subject fields to be used during mediation. Use index numbers in case of .csv cdrs.
reqtype_fields = test # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
@@ -65,10 +84,8 @@ tenant_fields = test # Name of tenant fields to be used during mediation. Use
tor_fields = test # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
account_fields = test # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
destination_fields = test # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
-time_answer_fields = test # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs.
+answer_time_fields = test # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs.
duration_fields = test # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
-cdr_in_dir = test # Absolute path towards the directory where the CDRs are kept (file stored CDRs).
-cdr_out_dir = test # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).
[session_manager]
enabled = true # Starts SessionManager service: .
diff --git a/data/conf/cgrates.cfg b/data/conf/cgrates.cfg
index 7da70c77e..af1e17c3a 100644
--- a/data/conf/cgrates.cfg
+++ b/data/conf/cgrates.cfg
@@ -32,7 +32,6 @@
# rounding_method = *middle # Rounding method for floats/costs: <*up|*middle|*down>
# rounding_decimals = 4 # Number of decimals to round float/costs at
-
[balancer]
# enabled = false # Start Balancer service: .
# listen = 127.0.0.1:2012 # Balancer listen interface: .
@@ -50,29 +49,45 @@
# listen=127.0.0.1:2022 # CDRS's listening interface: .
# extra_fields = # Extra fields to store in CDRs
# mediator = # Address where to reach the Mediator. Empty for disabling mediation. <""|internal>
-# export_path = /var/log/cgrates/cdr/out # Path where the exported CDRs will be placed
+# export_path = /var/log/cgrates/cdr/out/cgr # Path where the exported CDRs will be placed
# export_extra_fields = # List of extra fields to be exported out in CDRs
+[cdrc]
+# enabled = false # Enable CDR client functionality
+# cdrs = 127.0.0.1:2022 # Address where to reach CDR server
+# cdrs_method = http_cgr # Mechanism to use when posting CDRs on server
+# run_delay = 0 # Sleep interval in seconds between consecutive runs, 0 to use automation via inotify
+# cdr_type = csv # CDR file format .
+# cdr_in_dir = /var/log/cgrates/cdr/in/csv # Absolute path towards the directory where the CDRs are stored.
+# cdr_out_dir = /var/log/cgrates/cdr/out/csv # Absolute path towards the directory where processed CDRs will be moved.
+# cdr_source_id = freeswitch_csv # Tag identifying the source of the CDRs within CGRS database.
+# accid_field = 0 # Accounting id field identifier. Use index number in case of .csv cdrs.
+# reqtype_field = 1 # Request type field identifier. Use index number in case of .csv cdrs.
+# direction_field = 2 # Direction field identifier. Use index numbers in case of .csv cdrs.
+# tenant_field = 3 # Tenant field identifier. Use index numbers in case of .csv cdrs.
+# tor_field = 4 # Type of Record field identifier. Use index numbers in case of .csv cdrs.
+# account_field = 5 # Account field identifier. Use index numbers in case of .csv cdrs.
+# subject_field = 6 # Subject field identifier. Use index numbers in case of .csv CDRs.
+# destination_field = 7 # Destination field identifier. Use index numbers in case of .csv cdrs.
+# answer_time_field = 8 # Answer time field identifier. Use index numbers in case of .csv cdrs.
+# duration_field = 9 # Duration field identifier. Use index numbers in case of .csv cdrs.
+# extra_fields = 10:supplier,11:orig_ip # Field identifiers of the fields to add in extra fields section, special format in case of .csv "index1:field1,index2:field2"
+
[mediator]
# enabled = false # Starts Mediator service: .
# listen=internal # Mediator's listening interface: .
# rater = 127.0.0.1:2012 # Address where to reach the Rater:
# rater_reconnects = 3 # Number of reconnects to rater before giving up.
-# accid_field = accid # Name of field identifying accounting id used during mediation. Use index number in case of .csv cdrs.
-# run_ids = default # Identifiers for each mediation run on CDRs
-# subject_fields = subject # Name of fields to be used during mediation. Use index numbers in case of .csv cdrs.
-# reqtype_fields = reqtype # Name of request type fields to be used during mediation. Use index number in case of .csv cdrs.
-# direction_fields = direction # Name of direction fields to be used during mediation. Use index numbers in case of .csv cdrs.
-# tenant_fields = tenant # Name of tenant fields to be used during mediation. Use index numbers in case of .csv cdrs.
-# tor_fields = tor # Name of tor fields to be used during mediation. Use index numbers in case of .csv cdrs.
-# account_fields = account # Name of account fields to be used during mediation. Use index numbers in case of .csv cdrs.
-# destination_fields = destination # Name of destination fields to be used during mediation. Use index numbers in case of .csv cdrs.
-# time_answer_fields = time_answer # Name of time_answer fields to be used during mediation. Use index numbers in case of .csv cdrs.
-# duration_fields = duration # Name of duration fields to be used during mediation. Use index numbers in case of .csv cdrs.
-# cdr_type = # CDR type, used when running mediator as service .
-# cdr_in_dir = /var/log/freeswitch/cdr-csv # Absolute path towards the directory where the CDRs are kept (file stored CDRs).
-# cdr_out_dir = /var/log/cgrates/cdr/out/freeswitch/csv
- # Absolute path towards the directory where processed CDRs will be exported (file stored CDRs).
+# run_ids = # Identifiers of each extra mediation to run on CDRs
+# reqtype_fields = # Name of request type fields to be used during extra mediation. Use index number in case of .csv cdrs.
+# direction_fields = # Name of direction fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
+# tenant_fields = # Name of tenant fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
+# tor_fields = # Name of tor fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
+# account_fields = # Name of account fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
+# subject_fields = # Name of fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
+# destination_fields = # Name of destination fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
+# answer_time_fields = # Name of time_answer fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
+# duration_fields = # Name of duration fields to be used during extra mediation. Use index numbers in case of .csv cdrs.
[session_manager]
# enabled = false # Starts SessionManager service: .
diff --git a/data/storage/mysql/create_cdrs_tables.sql b/data/storage/mysql/create_cdrs_tables.sql
index e8f453f44..43d6f5bf5 100644
--- a/data/storage/mysql/create_cdrs_tables.sql
+++ b/data/storage/mysql/create_cdrs_tables.sql
@@ -5,6 +5,7 @@ CREATE TABLE cdrs_primary (
cgrid char(40) NOT NULL,
accid varchar(64) NOT NULL,
cdrhost varchar(64) NOT NULL,
+ cdrsource varchar(64) NOT NULL,
reqtype varchar(24) NOT NULL,
direction varchar(8) NOT NULL,
tenant varchar(64) NOT NULL,
diff --git a/data/storage/mysql/create_mediator_tables.sql b/data/storage/mysql/create_mediator_tables.sql
index d5c98120a..0b2e017d2 100644
--- a/data/storage/mysql/create_mediator_tables.sql
+++ b/data/storage/mysql/create_mediator_tables.sql
@@ -6,9 +6,10 @@ DROP TABLE IF EXISTS `rated_cdrs`;
CREATE TABLE `rated_cdrs` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`cgrid` char(40) NOT NULL,
+ `runid` varchar(64) NOT NULL,
`subject` varchar(64) NOT NULL,
`cost` DECIMAL(20,4) DEFAULT NULL,
`extra_info` text,
PRIMARY KEY (`id`),
- UNIQUE KEY `costid` (`cgrid`,`subject`)
+ UNIQUE KEY `costid` (`cgrid`,`runid`)
);
diff --git a/mediator/mediator.go b/mediator/mediator.go
index 3f4f0d563..a86ccfd98 100644
--- a/mediator/mediator.go
+++ b/mediator/mediator.go
@@ -27,7 +27,6 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
- "github.com/howeyc/fsnotify"
"os"
"path"
"strconv"
@@ -83,20 +82,19 @@ func (self *Mediator) loadConfig() error {
self.cgrCfg.MediatorTenantFields, self.cgrCfg.MediatorTORFields, self.cgrCfg.MediatorAccountFields, self.cgrCfg.MediatorDestFields,
self.cgrCfg.MediatorTimeAnswerFields, self.cgrCfg.MediatorDurationFields}
- refIdx := 0 // Subject becomes reference for our checks
- if len(cfgVals[refIdx]) == 0 {
- return fmt.Errorf("Unconfigured %s fields", fieldKeys[refIdx])
+ if len(self.cgrCfg.MediatorRunIds) == 0 {
+ return errors.New("Unconfigured mediator run_ids")
}
// All other configured fields must match the length of reference fields
for iCfgVal := range cfgVals {
- if len(cfgVals[refIdx]) != len(cfgVals[iCfgVal]) {
- // Make sure we have everywhere the length of reference key (subject)
+ if len(self.cgrCfg.MediatorRunIds) != len(cfgVals[iCfgVal]) {
+ // Make sure we have everywhere the length of runIds
return errors.New("Inconsistent lenght of mediator fields.")
}
}
// AccIdField has no special requirements, should just exist
- if self.cgrCfg.MediatorAccIdField == "" {
+ if len(self.cgrCfg.MediatorAccIdField) == 0 {
return errors.New("Undefined mediator accid field")
}
self.accIdField = self.cgrCfg.MediatorAccIdField
@@ -136,35 +134,6 @@ func (self *Mediator) loadConfig() error {
return nil
}
-// Watch the specified folder for file moves and parse the files on events
-func (self *Mediator) TrackCDRFiles() (err error) {
- watcher, err := fsnotify.NewWatcher()
- if err != nil {
- return
- }
- defer watcher.Close()
- err = watcher.Watch(self.cdrInDir)
- if err != nil {
- return
- }
- engine.Logger.Info(fmt.Sprintf("Monitoring %s for file moves.", self.cdrInDir))
- for {
- select {
- case ev := <-watcher.Event:
- if ev.IsCreate() && path.Ext(ev.Name) != ".csv" {
- engine.Logger.Info(fmt.Sprintf("Parsing: %s", ev.Name))
- err = self.MediateCSVCDR(ev.Name)
- if err != nil {
- return err
- }
- }
- case err := <-watcher.Error:
- engine.Logger.Err(fmt.Sprintf("Inotify error: %s", err.Error()))
- }
- }
- return
-}
-
// Retrive the cost from logging database
func (self *Mediator) getCostsFromDB(cdr utils.CDR) (cc *engine.CallCost, err error) {
for i := 0; i < 3; i++ { // Mechanism to avoid concurrency between SessionManager writing the costs and mediator picking them up
@@ -217,66 +186,7 @@ func (self *Mediator) getCostsFromRater(cdr utils.CDR) (*engine.CallCost, error)
return cc, err
}
-// Parse the files and get cost for every record
-func (self *Mediator) MediateCSVCDR(cdrfn string) (err error) {
- flag.Parse()
- file, err := os.Open(cdrfn)
- defer file.Close()
- if err != nil {
- engine.Logger.Crit(err.Error())
- os.Exit(1)
- }
- csvReader := csv.NewReader(bufio.NewReader(file))
- _, fn := path.Split(cdrfn)
- fout, err := os.Create(path.Join(self.cdrOutDir, fn))
- if err != nil {
- return err
- }
- defer fout.Close()
-
- w := bufio.NewWriter(fout)
- for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() {
- //t, _ := time.Parse("2006-01-02 15:04:05", record[5])
- var cc *engine.CallCost
-
- for runIdx := range self.fieldIdxs["subject"] { // Query costs for every run index given by subject
- csvCDR, errCDR := NewFScsvCDR(record, self.accIdIdx,
- self.fieldIdxs["subject"][runIdx],
- self.fieldIdxs["reqtype"][runIdx],
- self.fieldIdxs["direction"][runIdx],
- self.fieldIdxs["tenant"][runIdx],
- self.fieldIdxs["tor"][runIdx],
- self.fieldIdxs["account"][runIdx],
- self.fieldIdxs["destination"][runIdx],
- self.fieldIdxs["answer_time"][runIdx],
- self.fieldIdxs["duration"][runIdx],
- self.cgrCfg)
- if errCDR != nil {
- engine.Logger.Err(fmt.Sprintf(" Could not calculate price for accid: <%s>, err: <%s>",
- record[self.accIdIdx], errCDR.Error()))
- }
- var errCost error
- if csvCDR.GetReqType() == utils.PREPAID || csvCDR.GetReqType() == utils.POSTPAID {
- // Should be previously calculated and stored in DB
- cc, errCost = self.getCostsFromDB(csvCDR)
- } else {
- cc, errCost = self.getCostsFromRater(csvCDR)
- }
- cost := "-1"
- if errCost != nil || cc == nil {
- engine.Logger.Err(fmt.Sprintf(" Could not calculate price for accid: <%s>, err: <%s>, cost: <%v>", csvCDR.GetAccId(), err.Error(), cc))
- } else {
- cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64)
- engine.Logger.Debug(fmt.Sprintf("Calculated for accid:%s, cost: %v", csvCDR.GetAccId(), cost))
- }
- record = append(record, cost)
- }
- w.WriteString(strings.Join(record, ",") + "\n")
- }
- w.Flush()
- return
-}
func (self *Mediator) MediateDBCDR(cdr utils.CDR) error {
var qryCC *engine.CallCost
diff --git a/utils/cdr.go b/utils/cdr.go
index 2c3cbb28b..dc1f179aa 100644
--- a/utils/cdr.go
+++ b/utils/cdr.go
@@ -22,6 +22,8 @@ import (
"time"
)
+var PrimaryCdrFields []string = []string{ACCID, CDRHOST, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, ANSWER_TIME, DURATION}
+
type CDR interface {
GetCgrId() string
GetAccId() string
diff --git a/utils/consts.go b/utils/consts.go
index bc24fa015..ec820bf6a 100644
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -63,4 +63,16 @@ const (
JSON = "json"
MSGPACK = "msgpack"
CSV_LOAD = "CSVLOAD"
+ ACCID = "accid"
+ CDRHOST = "cdrhost"
+ CDRSOURCE = "cdrsource"
+ REQTYPE = "reqtype"
+ DIRECTION = "direction"
+ TENANT = "tenant"
+ TOR = "tor"
+ ACCOUNT = "account"
+ SUBJECT = "subject"
+ DESTINATION = "destination"
+ ANSWER_TIME = "answer_time"
+ DURATION = "duration"
)