From e51177a77d0436c0ed705142e49a65efd171f914 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 25 Sep 2012 13:36:58 +0200 Subject: [PATCH] Adding multiple run for mediated cdr row --- cmd/cgr-rater/cgr-rater.go | 54 ++++++----- mediator/mediator.go | 182 +++++++++++++++++-------------------- 2 files changed, 117 insertions(+), 119 deletions(-) diff --git a/cmd/cgr-rater/cgr-rater.go b/cmd/cgr-rater/cgr-rater.go index d424ca1e5..f1c97e08d 100644 --- a/cmd/cgr-rater/cgr-rater.go +++ b/cmd/cgr-rater/cgr-rater.go @@ -93,15 +93,17 @@ var ( freeswitch_server = "localhost:8021" // freeswitch address host:port freeswitch_pass = "ClueCon" // reeswitch address host:port - freeswitch_direction = "" - freeswitch_tor = "" - freeswitch_tenant = "" - freeswitch_subject = "" - freeswitch_account = "" - freeswitch_destination = "" - freeswitch_time_start = "" - freeswitch_duration = "" - freeswitch_uuid = "" + freeswitch_direction = mediator.MediatorFieldIdxs{} + freeswitch_tor = mediator.MediatorFieldIdxs{} + freeswitch_tenant = mediator.MediatorFieldIdxs{} + freeswitch_subject = mediator.MediatorFieldIdxs{} + freeswitch_account = mediator.MediatorFieldIdxs{} + freeswitch_destination = mediator.MediatorFieldIdxs{} + freeswitch_time_start = mediator.MediatorFieldIdxs{} + freeswitch_duration = mediator.MediatorFieldIdxs{} + freeswitch_uuid = mediator.MediatorFieldIdxs{} + + cfgParseErr error bal = balancer2go.NewBalancer() exitChan = make(chan bool) @@ -148,15 +150,15 @@ func readConfig(c *conf.ConfigFile) { freeswitch_server, _ = c.GetString("freeswitch", "server") freeswitch_pass, _ = c.GetString("freeswitch", "pass") - freeswitch_tor, _ = c.GetString("freeswitch", "tor_index") - freeswitch_tenant, _ = c.GetString("freeswitch", "tenant_index") - freeswitch_direction, _ = c.GetString("freeswitch", "direction_index") - freeswitch_subject, _ = c.GetString("freeswitch", "subject_index") - freeswitch_account, _ = c.GetString("freeswitch", "account_index") - freeswitch_destination, _ = c.GetString("freeswitch", "destination_index") - freeswitch_time_start, _ = c.GetString("freeswitch", "time_start_index") - freeswitch_duration, _ = c.GetString("freeswitch", "duration_index") - freeswitch_uuid, _ = c.GetString("freeswitch", "uuid_index") + freeswitch_tor, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "tor_index") + freeswitch_tenant, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "tenant_index") + freeswitch_direction, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "direction_index") + freeswitch_subject, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "subject_index") + freeswitch_account, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "account_index") + freeswitch_destination, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "destination_index") + freeswitch_time_start, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "time_start_index") + freeswitch_duration, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "duration_index") + freeswitch_uuid, cfgParseErr = mediator.GetFieldIdxs(c, "freeswitch", "uuid_index") } func listenToRPCRequests(rpcResponder interface{}, rpcAddress string, rpc_encoding string) { @@ -214,11 +216,21 @@ func startMediator(responder *timespans.Responder, loggerDb timespans.DataStorag timespans.Logger.Crit(fmt.Sprintf("The output path for mediator does not exist: %v", mediator_cdr_out_path)) exitChan <- true } - m, err := mediator.NewMediator(connector, loggerDb, mediator_skipdb, mediator_cdr_out_path, freeswitch_direction, freeswitch_tor, freeswitch_tenant, freeswitch_subject, freeswitch_account, freeswitch_destination, freeswitch_time_start, freeswitch_duration, freeswitch_uuid) - if err != nil { - timespans.Logger.Crit(fmt.Sprintf("Failed to start mediator: %v", err)) + // Check parsing errors + if cfgParseErr != nil { + timespans.Logger.Crit(fmt.Sprintf("Errors on config parsing: <%v>", cfgParseErr)) exitChan <- true } + // Make sure all indexes are having same lenght + refLen := len(freeswitch_subject) + for _,fldIdxs := range []mediator.MediatorFieldIdxs{freeswitch_tor, freeswitch_tenant, freeswitch_account, freeswitch_destination, freeswitch_time_start, freeswitch_duration, freeswitch_uuid} { + if len(fldIdxs) != refLen { + timespans.Logger.Crit(fmt.Sprintf("All mediator index elements must be of the same size: %d(freeswitch_subject)", freeswitch_subject)) + exitChan <- true + } + } + + m := mediator.NewMediator(connector, loggerDb, mediator_skipdb, mediator_cdr_out_path, freeswitch_direction, freeswitch_tor, freeswitch_tenant, freeswitch_subject, freeswitch_account, freeswitch_destination, freeswitch_time_start, freeswitch_duration, freeswitch_uuid) m.TrackCDRFiles(mediator_cdr_path) } diff --git a/mediator/mediator.go b/mediator/mediator.go index 59d937841..e27a1aec6 100644 --- a/mediator/mediator.go +++ b/mediator/mediator.go @@ -30,93 +30,79 @@ import ( "strconv" "strings" "time" + "code.google.com/p/goconf/conf" ) -type csvindex int +type MediatorFieldIdxs []int type Mediator struct { connector timespans.Connector loggerDb timespans.DataStorage skipDb bool outputDir string - directionIndex, - torIndex, - tenantIndex, - subjectIndex, - accountIndex, - destinationIndex, - timeStartIndex, - durationIndex, - uuidIndex csvindex + directionIndexs, + torIndexs, + tenantIndexs, + subjectIndexs, + accountIndexs, + destinationIndexs, + timeStartIndexs, + durationIndexs, + uuidIndexs MediatorFieldIdxs } func NewMediator(connector timespans.Connector, loggerDb timespans.DataStorage, skipDb bool, - outputDir, - directionIndex, - torIndex, - tenantIndex, - subjectIndex, - accountIndex, - destinationIndex, - timeStartIndex, - durationIndex, - uuidIndex string) (*Mediator, error) { - m := &Mediator{ + outputDir string, + directionIndexs, + torIndexs, + tenantIndexs, + subjectIndexs, + accountIndexs, + destinationIndexs, + timeStartIndexs, + durationIndexs, + uuidIndexs MediatorFieldIdxs) *Mediator { + return &Mediator{ connector: connector, loggerDb: loggerDb, skipDb: skipDb, outputDir: outputDir, + directionIndexs: directionIndexs, + torIndexs: torIndexs, + tenantIndexs: tenantIndexs, + subjectIndexs: subjectIndexs, + accountIndexs: accountIndexs, + destinationIndexs: destinationIndexs, + timeStartIndexs: timeStartIndexs, + durationIndexs: durationIndexs, + uuidIndexs: uuidIndexs, } - /*i, err := strconv.Atoi(directionIndex) - if err != nil { - return nil, err - } - m.directionIndex = csvindex(i)*/ - i, err := strconv.Atoi(torIndex) - if err != nil { - return nil, err - } - m.torIndex = csvindex(i) - i, err = strconv.Atoi(tenantIndex) - if err != nil { - return nil, err - } - m.tenantIndex = csvindex(i) - i, err = strconv.Atoi(subjectIndex) - if err != nil { - return nil, err - } - m.subjectIndex = csvindex(i) - i, err = strconv.Atoi(accountIndex) - if err != nil { - return nil, err - } - m.accountIndex = csvindex(i) - i, err = strconv.Atoi(destinationIndex) - if err != nil { - return nil, err - } - m.destinationIndex = csvindex(i) - i, err = strconv.Atoi(timeStartIndex) - if err != nil { - return nil, err - } - m.timeStartIndex = csvindex(i) - i, err = strconv.Atoi(durationIndex) - if err != nil { - return nil, err - } - m.durationIndex = csvindex(i) - i, err = strconv.Atoi(uuidIndex) - if err != nil { - return nil, err - } - m.uuidIndex = csvindex(i) - return m, nil } +// Extends goconf to provide us the slice with indexes we need for multiple mediation +func GetFieldIdxs(cfg *conf.ConfigFile, section, option string) (MediatorFieldIdxs, error) { + strConf, err := cfg.GetString(section, option) + if err != nil { + return nil, err + } + cfgStrIdxs := strings.Split(strConf,",") + if len(cfgStrIdxs) == 0 { + return nil, fmt.Errorf("Undefined %s in section %s",option, section) + } + retIdxs := make( MediatorFieldIdxs, len(cfgStrIdxs) ) + for i,cfgStrIdx := range cfgStrIdxs { + if cfgIntIdx,errConv := strconv.Atoi(cfgStrIdx); errConv!= nil || cfgStrIdx == ""{ + return nil, fmt.Errorf("All [%s]-%s members must be ints",section, option) + } else { + retIdxs[i] = cfgIntIdx + } + } + return retIdxs, nil +} + + func (m *Mediator) TrackCDRFiles(cdrPath string) (err error) { watcher, err := inotify.NewWatcher() if err != nil { @@ -166,19 +152,24 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) { for record, ok := csvReader.Read(); ok == nil; record, ok = csvReader.Read() { //t, _ := time.Parse("2012-05-21 17:48:20", record[5]) var cc *timespans.CallCost - if !m.skipDb { - cc, err = m.GetCostsFromDB(record) - } else { - cc, err = m.GetCostsFromRater(record) + for runIdx := range(m.subjectIndexs) { // Query costs for every run index given by subject + if runIdx == 0 && !m.skipDb { // The first index is matching the session manager one + cc, err = m.GetCostsFromDB(record, runIdx) + if err != nil || cc == nil { // Fallback on rater if no db record found + cc, err = m.GetCostsFromRater(record, runIdx) + } + } else { + cc, err = m.GetCostsFromRater(record, runIdx) + } + cost := "-1" + if err != nil { + timespans.Logger.Err(fmt.Sprintf("Could not get the cost for mediator record with uuid:%s and subject:%s - %s", record[m.uuidIndexs[runIdx]], record[m.subjectIndexs[runIdx]], err.Error())) + } else { + cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64) + timespans.Logger.Debug(fmt.Sprintf("Calculated for uuid:%s, subject:%s cost: %v", record[m.uuidIndexs[runIdx]], record[m.subjectIndexs[runIdx]], cost)) + } + record = append(record, cost) } - cost := "-1" - if err != nil { - timespans.Logger.Err(fmt.Sprintf("Could not get the cost for mediator record (%s): %v", record[m.uuidIndex], err)) - } else { - timespans.Logger.Debug(fmt.Sprintf("Calculated for %s cost: %v", record[m.uuidIndex], strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64))) - cost = strconv.FormatFloat(cc.ConnectFee+cc.Cost, 'f', -1, 64) - } - record = append(record, cost) w.WriteString(strings.Join(record, ",") + "\n") } @@ -186,45 +177,40 @@ func (m *Mediator) parseCSV(cdrfn string) (err error) { return } -func (m *Mediator) GetCostsFromDB(record []string) (cc *timespans.CallCost, err error) { - searchedUUID := record[m.uuidIndex] +func (m *Mediator) GetCostsFromDB(record []string, runIdx int) (cc *timespans.CallCost, err error) { + searchedUUID := record[m.uuidIndexs[runIdx]] cc, err = m.loggerDb.GetCallCostLog(searchedUUID, timespans.SESSION_MANAGER_SOURCE) - if err != nil || cc == nil { - cc, err = m.GetCostsFromRater(record) - } return } -func (m *Mediator) GetCostsFromRater(record []string) (cc *timespans.CallCost, err error) { - d, err := time.ParseDuration(record[m.durationIndex] + "s") +func (m *Mediator) GetCostsFromRater(record []string, runIdx int) (cc *timespans.CallCost, err error) { + d, err := time.ParseDuration(record[m.durationIndexs[runIdx]] + "s") if err != nil { return } - - cc = ×pans.CallCost{} - if d.Seconds() == 0 { // failed call return cc, nil } - t1, err := time.Parse("2006-01-02 15:04:05", record[m.timeStartIndex]) + cc = ×pans.CallCost{} + t1, err := time.Parse("2006-01-02 15:04:05", record[m.timeStartIndexs[runIdx]]) if err != nil { return } cd := timespans.CallDescriptor{ - Direction: "OUT", //record[m.directionIndex] TODO: fix me - Tenant: record[m.tenantIndex], - TOR: record[m.torIndex], - Subject: record[m.subjectIndex], - Account: record[m.accountIndex], - Destination: record[m.destinationIndex], + Direction: "OUT", //record[m.directionIndexs[runIdx]] TODO: fix me + Tenant: record[m.tenantIndexs[runIdx]], + TOR: record[m.torIndexs[runIdx]], + Subject: record[m.subjectIndexs[runIdx]], + Account: record[m.accountIndexs[runIdx]], + Destination: record[m.destinationIndexs[runIdx]], TimeStart: t1, TimeEnd: t1.Add(d)} err = m.connector.GetCost(cd, cc) if err != nil { - m.loggerDb.LogError(record[m.uuidIndex], timespans.MEDIATOR_SOURCE, err.Error()) + m.loggerDb.LogError(record[m.uuidIndexs[runIdx]], timespans.MEDIATOR_SOURCE, err.Error()) } else { - m.loggerDb.LogCallCost(record[m.uuidIndex], timespans.MEDIATOR_SOURCE, cc) + m.loggerDb.LogCallCost(record[m.uuidIndexs[runIdx]], timespans.MEDIATOR_SOURCE, cc) } return }