refactored mediator index parsing

This commit is contained in:
Radu Ioan Fericean
2012-09-26 12:06:02 +03:00
parent 3ba5c28125
commit 2b974655f4
2 changed files with 89 additions and 70 deletions

View File

@@ -93,15 +93,15 @@ var (
freeswitch_server = "localhost:8021" // freeswitch address host:port
freeswitch_pass = "ClueCon" // reeswitch address host:port
freeswitch_direction = mediator.MediatorFieldIdxs{}
freeswitch_tor = mediator.MediatorFieldIdxs{}
freeswitch_tenant = mediator.MediatorFieldIdxs{}
freeswitch_subject = mediator.MediatorFieldIdxs{}
freeswitch_account = mediator.MediatorFieldIdxs{}
freeswitch_destination = mediator.MediatorFieldIdxs{}
freeswitch_time_start = mediator.MediatorFieldIdxs{}
freeswitch_duration = mediator.MediatorFieldIdxs{}
freeswitch_uuid = mediator.MediatorFieldIdxs{}
freeswitch_direction = ""
freeswitch_tor = ""
freeswitch_tenant = ""
freeswitch_subject = ""
freeswitch_account = ""
freeswitch_destination = ""
freeswitch_time_start = ""
freeswitch_duration = ""
freeswitch_uuid = ""
cfgParseErr error
@@ -150,15 +150,15 @@ func readConfig(c *conf.ConfigFile) {
freeswitch_server, _ = c.GetString("freeswitch", "server")
freeswitch_pass, _ = c.GetString("freeswitch", "pass")
freeswitch_tor, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "tor_index")
freeswitch_tenant, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "tenant_index")
freeswitch_direction, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "direction_index")
freeswitch_subject, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "subject_index")
freeswitch_account, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "account_index")
freeswitch_destination, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "destination_index")
freeswitch_time_start, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "time_start_index")
freeswitch_duration, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "duration_index")
freeswitch_uuid, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "uuid_index")
freeswitch_tor, _ = c.GetString("freeswitch", "tor_index")
freeswitch_tenant, _ = c.GetString("freeswitch", "tenant_index")
freeswitch_direction, _ = c.GetString("freeswitch", "direction_index")
freeswitch_subject, _ = c.GetString("freeswitch", "subject_index")
freeswitch_account, _ = c.GetString("freeswitch", "account_index")
freeswitch_destination, _ = c.GetString("freeswitch", "destination_index")
freeswitch_time_start, _ = c.GetString("freeswitch", "time_start_index")
freeswitch_duration, _ = c.GetString("freeswitch", "duration_index")
freeswitch_uuid, _ = c.GetString("freeswitch", "uuid_index")
}
func listenToRPCRequests(rpcResponder interface{}, rpcAddress string, rpc_encoding string) {
@@ -221,16 +221,13 @@ func startMediator(responder *timespans.Responder, loggerDb timespans.DataStorag
timespans.Logger.Crit(fmt.Sprintf("Errors on config parsing: <%v>", cfgParseErr))
exitChan <- true
}
// Make sure all indexes are having same lenght
refLen := len(freeswitch_subject)
for _,fldIdxs := range []mediator.MediatorFieldIdxs{freeswitch_tor, freeswitch_tenant, freeswitch_account, freeswitch_destination, freeswitch_time_start, freeswitch_duration, freeswitch_uuid} {
if len(fldIdxs) != refLen {
timespans.Logger.Crit(fmt.Sprintf("All mediator index elements must be of the same size: %d(freeswitch_subject)", len(freeswitch_subject)))
exitChan <- true
}
m, err := mediator.NewMediator(connector, loggerDb, mediator_skipdb, mediator_cdr_out_path, freeswitch_direction, freeswitch_tor, freeswitch_tenant, freeswitch_subject, freeswitch_account, freeswitch_destination, freeswitch_time_start, freeswitch_duration, freeswitch_uuid)
if err != nil || m.ValidateIndexses() {
timespans.Logger.Crit(fmt.Sprintf("Mediator config parsing error: %v", err))
exitChan <- true
}
m := mediator.NewMediator(connector, loggerDb, mediator_skipdb, mediator_cdr_out_path, freeswitch_direction, freeswitch_tor, freeswitch_tenant, freeswitch_subject, freeswitch_account, freeswitch_destination, freeswitch_time_start, freeswitch_duration, freeswitch_uuid)
m.TrackCDRFiles(mediator_cdr_path)
}

View File

@@ -30,10 +30,25 @@ import (
"strconv"
"strings"
"time"
"code.google.com/p/goconf/conf"
)
type MediatorFieldIdxs []int
type mediatorFieldIdxs []int
// Extends goconf to provide us the slice with indexes we need for multiple mediation
func (mfi *mediatorFieldIdxs) Load(idxs string) error {
cfgStrIdxs := strings.Split(idxs, ",")
if len(cfgStrIdxs) == 0 {
return fmt.Errorf("Undefined %s", idxs)
}
for _, cfgStrIdx := range cfgStrIdxs {
if cfgIntIdx, errConv := strconv.Atoi(cfgStrIdx); errConv != nil || cfgStrIdx == "" {
return fmt.Errorf("All %s members must be ints", idxs)
} else {
*mfi = append(*mfi, cfgIntIdx)
}
}
return nil
}
type Mediator struct {
connector timespans.Connector
@@ -48,61 +63,68 @@ type Mediator struct {
destinationIndexs,
timeStartIndexs,
durationIndexs,
uuidIndexs MediatorFieldIdxs
uuidIndexs mediatorFieldIdxs
}
func NewMediator(connector timespans.Connector,
loggerDb timespans.DataStorage,
skipDb bool,
outputDir string,
directionIndexs,
torIndexs,
tenantIndexs,
subjectIndexs,
accountIndexs,
destinationIndexs,
timeStartIndexs,
durationIndexs,
uuidIndexs MediatorFieldIdxs) *Mediator {
return &Mediator{
directionIndexs, torIndexs, tenantIndexs, subjectIndexs, accountIndexs, destinationIndexs, timeStartIndexs, durationIndexs, uuidIndexs string) (m *Mediator, err error) {
m = &Mediator{
connector: connector,
loggerDb: loggerDb,
skipDb: skipDb,
outputDir: outputDir,
directionIndexs: directionIndexs,
torIndexs: torIndexs,
tenantIndexs: tenantIndexs,
subjectIndexs: subjectIndexs,
accountIndexs: accountIndexs,
destinationIndexs: destinationIndexs,
timeStartIndexs: timeStartIndexs,
durationIndexs: durationIndexs,
uuidIndexs: uuidIndexs,
}
err = m.directionIndexs.Load(directionIndexs)
if err != nil {
return
}
err = m.torIndexs.Load(torIndexs)
if err != nil {
return
}
err = m.tenantIndexs.Load(tenantIndexs)
if err != nil {
return
}
err = m.subjectIndexs.Load(subjectIndexs)
if err != nil {
return
}
err = m.accountIndexs.Load(accountIndexs)
if err != nil {
return
}
err = m.destinationIndexs.Load(destinationIndexs)
if err != nil {
return
}
err = m.timeStartIndexs.Load(timeStartIndexs)
if err != nil {
return
}
err = m.durationIndexs.Load(durationIndexs)
if err != nil {
return
}
err = m.uuidIndexs.Load(uuidIndexs)
return
}
// Extends goconf to provide us the slice with indexes we need for multiple mediation
func GetFieldIdxs(cfg *conf.ConfigFile, section, option string) (MediatorFieldIdxs, error) {
strConf, err := cfg.GetString(section, option)
if err != nil {
return nil, err
}
cfgStrIdxs := strings.Split(strConf,",")
if len(cfgStrIdxs) == 0 {
return nil, fmt.Errorf("Undefined %s in section %s",option, section)
}
retIdxs := make( MediatorFieldIdxs, len(cfgStrIdxs) )
for i,cfgStrIdx := range cfgStrIdxs {
if cfgIntIdx,errConv := strconv.Atoi(cfgStrIdx); errConv!= nil || cfgStrIdx == ""{
return nil, fmt.Errorf("All [%s]-%s members must be ints",section, option)
} else {
retIdxs[i] = cfgIntIdx
// Make sure all indexes are having same lenght
func (m *Mediator) ValidateIndexses() bool {
refLen := len(m.subjectIndexs)
for _, fldIdxs := range []mediatorFieldIdxs{m.directionIndexs, m.torIndexs, m.tenantIndexs,
m.accountIndexs, m.destinationIndexs, m.timeStartIndexs, m.durationIndexs, m.uuidIndexs} {
if len(fldIdxs) != refLen {
return false
}
}
return retIdxs, nil
return true
}
func (m *Mediator) TrackCDRFiles(cdrPath string) (err error) {
watcher, err := inotify.NewWatcher()
if err != nil {
@@ -152,7 +174,7 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) {
for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() {
//t, _ := time.Parse("2012-05-21 17:48:20", record[5])
var cc *timespans.CallCost
for runIdx := range(m.subjectIndexs) { // Query costs for every run index given by subject
for runIdx := range m.subjectIndexs { // Query costs for every run index given by subject
if runIdx == 0 && !m.skipDb { // The first index is matching the session manager one
cc, err = m.GetCostsFromDB(record, runIdx)
if err != nil || cc == nil { // Fallback on rater if no db record found
@@ -191,8 +213,8 @@ func (m *Mediator) GetCostsFromRater(record []string, runIdx int) (cc *timespans
cc = &timespans.CallCost{}
if d.Seconds() == 0 { // failed call, returning empty callcost, no error
return cc, nil
}
return cc, nil
}
t1, err := time.Parse("2006-01-02 15:04:05", record[m.timeStartIndexs[runIdx]])
if err != nil {
return