Adding multiple run for mediated cdr row

This commit is contained in:
DanB
2012-09-25 13:36:58 +02:00
parent 752bd98123
commit e51177a77d
2 changed files with 117 additions and 119 deletions

View File

@@ -93,15 +93,17 @@ var (
freeswitch_server = "localhost:8021" // freeswitch address host:port
freeswitch_pass = "ClueCon" // reeswitch address host:port
freeswitch_direction = ""
freeswitch_tor = ""
freeswitch_tenant = ""
freeswitch_subject = ""
freeswitch_account = ""
freeswitch_destination = ""
freeswitch_time_start = ""
freeswitch_duration = ""
freeswitch_uuid = ""
freeswitch_direction = mediator.MediatorFieldIdxs{}
freeswitch_tor = mediator.MediatorFieldIdxs{}
freeswitch_tenant = mediator.MediatorFieldIdxs{}
freeswitch_subject = mediator.MediatorFieldIdxs{}
freeswitch_account = mediator.MediatorFieldIdxs{}
freeswitch_destination = mediator.MediatorFieldIdxs{}
freeswitch_time_start = mediator.MediatorFieldIdxs{}
freeswitch_duration = mediator.MediatorFieldIdxs{}
freeswitch_uuid = mediator.MediatorFieldIdxs{}
cfgParseErr error
bal = balancer2go.NewBalancer()
exitChan = make(chan bool)
@@ -148,15 +150,15 @@ func readConfig(c *conf.ConfigFile) {
freeswitch_server, _ = c.GetString("freeswitch", "server")
freeswitch_pass, _ = c.GetString("freeswitch", "pass")
freeswitch_tor, _ = c.GetString("freeswitch", "tor_index")
freeswitch_tenant, _ = c.GetString("freeswitch", "tenant_index")
freeswitch_direction, _ = c.GetString("freeswitch", "direction_index")
freeswitch_subject, _ = c.GetString("freeswitch", "subject_index")
freeswitch_account, _ = c.GetString("freeswitch", "account_index")
freeswitch_destination, _ = c.GetString("freeswitch", "destination_index")
freeswitch_time_start, _ = c.GetString("freeswitch", "time_start_index")
freeswitch_duration, _ = c.GetString("freeswitch", "duration_index")
freeswitch_uuid, _ = c.GetString("freeswitch", "uuid_index")
freeswitch_tor, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "tor_index")
freeswitch_tenant, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "tenant_index")
freeswitch_direction, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "direction_index")
freeswitch_subject, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "subject_index")
freeswitch_account, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "account_index")
freeswitch_destination, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "destination_index")
freeswitch_time_start, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "time_start_index")
freeswitch_duration, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "duration_index")
freeswitch_uuid, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "uuid_index")
}
func listenToRPCRequests(rpcResponder interface{}, rpcAddress string, rpc_encoding string) {
@@ -214,11 +216,21 @@ func startMediator(responder *timespans.Responder, loggerDb timespans.DataStorag
timespans.Logger.Crit(fmt.Sprintf("The output path for mediator does not exist: %v", mediator_cdr_out_path))
exitChan <- true
}
m, err := mediator.NewMediator(connector, loggerDb, mediator_skipdb, mediator_cdr_out_path, freeswitch_direction, freeswitch_tor, freeswitch_tenant, freeswitch_subject, freeswitch_account, freeswitch_destination, freeswitch_time_start, freeswitch_duration, freeswitch_uuid)
if err != nil {
timespans.Logger.Crit(fmt.Sprintf("Failed to start mediator: %v", err))
// Check parsing errors
if cfgParseErr != nil {
timespans.Logger.Crit(fmt.Sprintf("Errors on config parsing: <%v>", cfgParseErr))
exitChan <- true
}
// Make sure all indexes are having same lenght
refLen := len(freeswitch_subject)
for _,fldIdxs := range []mediator.MediatorFieldIdxs{freeswitch_tor, freeswitch_tenant, freeswitch_account, freeswitch_destination, freeswitch_time_start, freeswitch_duration, freeswitch_uuid} {
if len(fldIdxs) != refLen {
timespans.Logger.Crit(fmt.Sprintf("All mediator index elements must be of the same size: %d(freeswitch_subject)", freeswitch_subject))
exitChan <- true
}
}
m := mediator.NewMediator(connector, loggerDb, mediator_skipdb, mediator_cdr_out_path, freeswitch_direction, freeswitch_tor, freeswitch_tenant, freeswitch_subject, freeswitch_account, freeswitch_destination, freeswitch_time_start, freeswitch_duration, freeswitch_uuid)
m.TrackCDRFiles(mediator_cdr_path)
}

View File

@@ -30,93 +30,79 @@ import (
"strconv"
"strings"
"time"
"code.google.com/p/goconf/conf"
)
type csvindex int
type MediatorFieldIdxs []int
type Mediator struct {
connector timespans.Connector
loggerDb timespans.DataStorage
skipDb bool
outputDir string
directionIndex,
torIndex,
tenantIndex,
subjectIndex,
accountIndex,
destinationIndex,
timeStartIndex,
durationIndex,
uuidIndex csvindex
directionIndexs,
torIndexs,
tenantIndexs,
subjectIndexs,
accountIndexs,
destinationIndexs,
timeStartIndexs,
durationIndexs,
uuidIndexs MediatorFieldIdxs
}
func NewMediator(connector timespans.Connector,
loggerDb timespans.DataStorage,
skipDb bool,
outputDir,
directionIndex,
torIndex,
tenantIndex,
subjectIndex,
accountIndex,
destinationIndex,
timeStartIndex,
durationIndex,
uuidIndex string) (*Mediator, error) {
m := &Mediator{
outputDir string,
directionIndexs,
torIndexs,
tenantIndexs,
subjectIndexs,
accountIndexs,
destinationIndexs,
timeStartIndexs,
durationIndexs,
uuidIndexs MediatorFieldIdxs) *Mediator {
return &Mediator{
connector: connector,
loggerDb: loggerDb,
skipDb: skipDb,
outputDir: outputDir,
directionIndexs: directionIndexs,
torIndexs: torIndexs,
tenantIndexs: tenantIndexs,
subjectIndexs: subjectIndexs,
accountIndexs: accountIndexs,
destinationIndexs: destinationIndexs,
timeStartIndexs: timeStartIndexs,
durationIndexs: durationIndexs,
uuidIndexs: uuidIndexs,
}
/*i, err := strconv.Atoi(directionIndex)
if err != nil {
return nil, err
}
m.directionIndex = csvindex(i)*/
i, err := strconv.Atoi(torIndex)
if err != nil {
return nil, err
}
m.torIndex = csvindex(i)
i, err = strconv.Atoi(tenantIndex)
if err != nil {
return nil, err
}
m.tenantIndex = csvindex(i)
i, err = strconv.Atoi(subjectIndex)
if err != nil {
return nil, err
}
m.subjectIndex = csvindex(i)
i, err = strconv.Atoi(accountIndex)
if err != nil {
return nil, err
}
m.accountIndex = csvindex(i)
i, err = strconv.Atoi(destinationIndex)
if err != nil {
return nil, err
}
m.destinationIndex = csvindex(i)
i, err = strconv.Atoi(timeStartIndex)
if err != nil {
return nil, err
}
m.timeStartIndex = csvindex(i)
i, err = strconv.Atoi(durationIndex)
if err != nil {
return nil, err
}
m.durationIndex = csvindex(i)
i, err = strconv.Atoi(uuidIndex)
if err != nil {
return nil, err
}
m.uuidIndex = csvindex(i)
return m, nil
}
// Extends goconf to provide us the slice with indexes we need for multiple mediation
func GetFieldIdxs(cfg *conf.ConfigFile, section, option string) (MediatorFieldIdxs, error) {
strConf, err := cfg.GetString(section, option)
if err != nil {
return nil, err
}
cfgStrIdxs := strings.Split(strConf,",")
if len(cfgStrIdxs) == 0 {
return nil, fmt.Errorf("Undefined %s in section %s",option, section)
}
retIdxs := make( MediatorFieldIdxs, len(cfgStrIdxs) )
for i,cfgStrIdx := range cfgStrIdxs {
if cfgIntIdx,errConv := strconv.Atoi(cfgStrIdx); errConv!= nil || cfgStrIdx == ""{
return nil, fmt.Errorf("All [%s]-%s members must be ints",section, option)
} else {
retIdxs[i] = cfgIntIdx
}
}
return retIdxs, nil
}
func (m *Mediator) TrackCDRFiles(cdrPath string) (err error) {
watcher, err := inotify.NewWatcher()
if err != nil {
@@ -166,19 +152,24 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) {
for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() {
//t, _ := time.Parse("2012-05-21 17:48:20", record[5])
var cc *timespans.CallCost
if !m.skipDb {
cc, err = m.GetCostsFromDB(record)
} else {
cc, err = m.GetCostsFromRater(record)
for runIdx := range(m.subjectIndexs) { // Query costs for every run index given by subject
if runIdx == 0 && !m.skipDb { // The first index is matching the session manager one
cc, err = m.GetCostsFromDB(record, runIdx)
if err != nil || cc == nil { // Fallback on rater if no db record found
cc, err = m.GetCostsFromRater(record, runIdx)
}
} else {
cc, err = m.GetCostsFromRater(record, runIdx)
}
cost := "-1"
if err != nil {
timespans.Logger.Err(fmt.Sprintf("Could not get the cost for mediator record with uuid:%s and subject:%s - %s", record[m.uuidIndexs[runIdx]], record[m.subjectIndexs[runIdx]], err.Error()))
} else {
cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64)
timespans.Logger.Debug(fmt.Sprintf("Calculated for uuid:%s, subject:%s cost: %v", record[m.uuidIndexs[runIdx]], record[m.subjectIndexs[runIdx]], cost))
}
record = append(record, cost)
}
cost := "-1"
if err != nil {
timespans.Logger.Err(fmt.Sprintf("Could not get the cost for mediator record (%s): %v", record[m.uuidIndex], err))
} else {
timespans.Logger.Debug(fmt.Sprintf("Calculated for %s cost: %v", record[m.uuidIndex], strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64)))
cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64)
}
record = append(record, cost)
w.WriteString(strings.Join(record, ",") + "\n")
}
@@ -186,45 +177,40 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) {
return
}
func (m *Mediator) GetCostsFromDB(record []string) (cc *timespans.CallCost, err error) {
searchedUUID := record[m.uuidIndex]
func (m *Mediator) GetCostsFromDB(record []string, runIdx int) (cc *timespans.CallCost, err error) {
searchedUUID := record[m.uuidIndexs[runIdx]]
cc, err = m.loggerDb.GetCallCostLog(searchedUUID, timespans.SESSION_MANAGER_SOURCE)
if err != nil || cc == nil {
cc, err = m.GetCostsFromRater(record)
}
return
}
func (m *Mediator) GetCostsFromRater(record []string) (cc *timespans.CallCost, err error) {
d, err := time.ParseDuration(record[m.durationIndex] + "s")
func (m *Mediator) GetCostsFromRater(record []string, runIdx int) (cc *timespans.CallCost, err error) {
d, err := time.ParseDuration(record[m.durationIndexs[runIdx]] + "s")
if err != nil {
return
}
cc = &timespans.CallCost{}
if d.Seconds() == 0 { // failed call
return cc, nil
}
t1, err := time.Parse("2006-01-02 15:04:05", record[m.timeStartIndex])
cc = &timespans.CallCost{}
t1, err := time.Parse("2006-01-02 15:04:05", record[m.timeStartIndexs[runIdx]])
if err != nil {
return
}
cd := timespans.CallDescriptor{
Direction: "OUT", //record[m.directionIndex] TODO: fix me
Tenant: record[m.tenantIndex],
TOR: record[m.torIndex],
Subject: record[m.subjectIndex],
Account: record[m.accountIndex],
Destination: record[m.destinationIndex],
Direction: "OUT", //record[m.directionIndexs[runIdx]] TODO: fix me
Tenant: record[m.tenantIndexs[runIdx]],
TOR: record[m.torIndexs[runIdx]],
Subject: record[m.subjectIndexs[runIdx]],
Account: record[m.accountIndexs[runIdx]],
Destination: record[m.destinationIndexs[runIdx]],
TimeStart: t1,
TimeEnd: t1.Add(d)}
err = m.connector.GetCost(cd, cc)
if err != nil {
m.loggerDb.LogError(record[m.uuidIndex], timespans.MEDIATOR_SOURCE, err.Error())
m.loggerDb.LogError(record[m.uuidIndexs[runIdx]], timespans.MEDIATOR_SOURCE, err.Error())
} else {
m.loggerDb.LogCallCost(record[m.uuidIndex], timespans.MEDIATOR_SOURCE, cc)
m.loggerDb.LogCallCost(record[m.uuidIndexs[runIdx]], timespans.MEDIATOR_SOURCE, cc)
}
return
}