Cdrc further refactoring

This commit is contained in:
DanB
2013-12-22 13:06:40 +01:00
parent ca3b13651f
commit c38179c01a
16 changed files with 135 additions and 93 deletions

Binary file not shown.

Binary file not shown.

View File

@@ -24,30 +24,66 @@ import (
"github.com/howeyc/fsnotify"
"os"
"path"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"bufio"
"encoding/csv"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/cgrates/engine"
)
func NewCdrc(config *config.CGRConfig) (*Cdrc, error) {
cdrc := &Cdrc{cgrCfg: config}
// Before processing, make sure in and out folders exist
for _, dir := range []string{ cdrc.cgrCfg.CdrcCdrInDir, cdrc.cgrCfg.CdrcCdrOutDir } {
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
return nil, fmt.Errorf("Folder %s does not exist", dir)
}
}
if err := cdrc.parseFieldIndexesFromConfig(); err != nil {
return nil, err
}
return cdrc, nil
}
type Cdrc struct {
cgrCfg *config.CGRConfig
fieldIndxes map[string]int // Key is the name of the field, int is the position in the csv file
httpClient *http.Client
}
// When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
func (self *Cdrc) Run() error {
if self.cgrCfg.CdrcRunDelay == time.Duration(0) { // Automated via inotify
return self.trackCDRFiles()
}
// No automated, process and sleep approach
for {
engine.Logger.Info(fmt.Sprintf("Parsing folder %s for CDR files.", self.cgrCfg.CdrcCdrInDir))
filesInDir,_ := ioutil.ReadDir(self.cgrCfg.CdrcCdrInDir)
for _, file := range filesInDir {
if err := self.processFile(file.Name()); err != nil {
return err
}
}
time.Sleep(self.cgrCfg.CdrcRunDelay)
}
return nil
}
// Parses fieldIndex strings into fieldIndex integers needed
func (self *Cdrc) parseFieldIndexesFromConfig() error {
var err error
// Add main fields here
self.fieldIndxes = make(map[string]int)
// PrimaryCdrFields []string = []string{ACCID, CDRHOST, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, ANSWER_TIME, DURATION}
// PrimaryCdrFields []string = []string{ACCID, CDRHOST, CDRSOURCE, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, ANSWER_TIME, DURATION}
fieldKeys := []string{utils.ACCID, utils.REQTYPE, utils.DIRECTION, utils.TENANT, utils.TOR, utils.ACCOUNT, utils.SUBJECT, utils.DESTINATION, utils.ANSWER_TIME, utils.DURATION}
fieldIdxStrs := []string{self.cgrCfg.CdrcAccIdField, self.cgrCfg.CdrcReqTypeField, self.cgrCfg.CdrcDirectionField, self.cgrCfg.CdrcTenantField, self.cgrCfg.CdrcTorField,
self.cgrCfg.CdrcAccountField, self.cgrCfg.CdrcSubjectField, self.cgrCfg.CdrcDestinationField, self.cgrCfg.CdrcAnswerTimeField, self.cgrCfg.CdrcDurationField}
@@ -75,6 +111,7 @@ func (self *Cdrc) parseFieldIndexesFromConfig() error {
// Takes the record out of csv and turns it into http form which can be posted
func (self *Cdrc) cdrAsHttpForm(record []string) (url.Values, error) {
v := url.Values{}
v.Set(utils.CDRSOURCE, self.cgrCfg.CdrcSourceId)
for fldName, idx := range self.fieldIndxes {
if len(record) <= idx {
return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, fldName)
@@ -100,7 +137,6 @@ func (self *Cdrc) trackCDRFiles() (err error) {
select {
case ev := <-watcher.Event:
if ev.IsCreate() && path.Ext(ev.Name) != ".csv" {
engine.Logger.Info(fmt.Sprintf("Parsing: %s", ev.Name))
if err = self.processFile(ev.Name); err != nil {
return err
}
@@ -114,6 +150,8 @@ func (self *Cdrc) trackCDRFiles() (err error) {
// Processe file at filePath and posts the valid cdr rows out of it
func (self *Cdrc) processFile(filePath string) error {
_, fn := path.Split(filePath)
engine.Logger.Info(fmt.Sprintf("Parsing: %s", filePath))
file, err := os.Open(filePath)
defer file.Close()
if err != nil {
@@ -133,6 +171,5 @@ func (self *Cdrc) processFile(filePath string) error {
}
}
// Finished with file, move it to processed folder
_, fn := path.Split(filePath)
return os.Rename(filePath, path.Join(self.cgrCfg.CdrcCdrOutDir, fn))
}

View File

@@ -32,6 +32,9 @@ func TestCdrAsHttpForm(t *testing.T) {
if err != nil {
t.Error("Failed to parse CDR in form", err)
}
if cdrAsForm.Get(utils.CDRSOURCE) != cgrConfig.CdrcSourceId {
t.Error("Unexpected cdrsource received", cdrAsForm.Get(utils.CDRSOURCE))
}
if cdrAsForm.Get(utils.REQTYPE) != "prepaid" {
t.Error("Unexpected CDR value received", cdrAsForm.Get(utils.REQTYPE))
}

View File

@@ -41,48 +41,48 @@ func NewCgrCdrFromHttpReq(req *http.Request) (CgrCdr, error) {
type CgrCdr map[string]string
func (cgrCdr CgrCdr) GetCgrId() string {
return utils.FSCgrId(cgrCdr[ACCID])
return utils.FSCgrId(cgrCdr[utils.ACCID])
}
func (cgrCdr CgrCdr) GetAccId() string {
return cgrCdr[ACCID]
return cgrCdr[utils.ACCID]
}
func (cgrCdr CgrCdr) GetCdrHost() string {
return cgrCdr[CDRHOST]
return cgrCdr[utils.CDRHOST]
}
func (cgrCdr CgrCdr) GetCdrSource() string {
return cgrCdr[CDRSOURCE]
return cgrCdr[utils.CDRSOURCE]
}
func (cgrCdr CgrCdr) GetDirection() string {
//TODO: implement direction
return "*out"
}
func (cgrCdr CgrCdr) GetOrigId() string {
return cgrCdr[CDRHOST]
return cgrCdr[utils.CDRHOST]
}
func (cgrCdr CgrCdr) GetSubject() string {
return cgrCdr[SUBJECT]
return cgrCdr[utils.SUBJECT]
}
func (cgrCdr CgrCdr) GetAccount() string {
return cgrCdr[ACCOUNT]
return cgrCdr[utils.ACCOUNT]
}
// Charging destination number
func (cgrCdr CgrCdr) GetDestination() string {
return cgrCdr[DESTINATION]
return cgrCdr[utils.DESTINATION]
}
func (cgrCdr CgrCdr) GetTOR() string {
return cgrCdr[TOR]
return cgrCdr[utils.TOR]
}
func (cgrCdr CgrCdr) GetTenant() string {
return cgrCdr[TENANT]
return cgrCdr[utils.TENANT]
}
func (cgrCdr CgrCdr) GetReqType() string {
return cgrCdr[REQTYPE]
return cgrCdr[utils.REQTYPE]
}
func (cgrCdr CgrCdr) GetExtraFields() map[string]string {
extraFields := make(map[string]string)
@@ -94,11 +94,11 @@ func (cgrCdr CgrCdr) GetExtraFields() map[string]string {
return extraFields
}
func (cgrCdr CgrCdr) GetAnswerTime() (t time.Time, err error) {
return utils.ParseDate(cgrCdr[TIME_ANSWER])
return utils.ParseDate(cgrCdr[utils.ANSWER_TIME])
}
// Extracts duration as considered by the telecom switch
func (cgrCdr CgrCdr) GetDuration() int64 {
dur, _ := strconv.ParseInt(cgrCdr[DURATION], 0, 64)
dur, _ := strconv.ParseInt(cgrCdr[utils.DURATION], 0, 64)
return dur
}

View File

@@ -32,7 +32,7 @@ curl --data "accid=asbfdsaf&cdrhost=192.168.1.1&reqtype=rated&direction=*out&ten
func TestCgrCdrFields(t *testing.T) {
cfg, _ = config.NewDefaultCGRConfig()
cgrCdr := CgrCdr{"accid": "dsafdsaf", "cdrhost": "192.168.1.1", "reqtype": "rated", "direction": "*out", "tenant": "cgrates.org", "tor": "call",
"account": "1001", "subject": "1001", "destination": "1002", "time_answer": "1383813746", "duration": "10", "field_extr1": "val_extr1", "fieldextr2": "valextr2"}
"account": "1001", "subject": "1001", "destination": "1002", "answer_time": "1383813746", "duration": "10", "field_extr1": "val_extr1", "fieldextr2": "valextr2"}
if cgrCdr.GetCgrId() != utils.FSCgrId("dsafdsaf") {
t.Error("Error parsing cdr: ", cgrCdr)
}

View File

@@ -32,6 +32,7 @@ import (
"github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/balancer2go"
"github.com/cgrates/cgrates/cdrs"
"github.com/cgrates/cgrates/cdrc"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/history"
@@ -133,10 +134,19 @@ func startMediator(responder *engine.Responder, loggerDb engine.LogStorage, cdrD
engine.Logger.Crit(fmt.Sprintf("Mediator config parsing error: %v", err))
exitChan <- true
}
}
if cfg.MediatorCDRType == utils.FSCDR_FILE_CSV { //Mediator as standalone service for file CDRs
medi.TrackCDRFiles()
func startCdrc() {
cdrc, err := cdrc.NewCdrc(cfg)
if err != nil {
engine.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error()))
exitChan <- true
return
}
if err := cdrc.Run(); err != nil {
engine.Logger.Crit(fmt.Sprintf("Cdrc run error: %s", err.Error()))
}
exitChan <- true // If run stopped, something is bad, stop the application
}
func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage) {
@@ -416,6 +426,10 @@ func main() {
engine.Logger.Info("Starting History Service.")
go startHistoryScribe()
}
if cfg.CdrcEnabled {
engine.Logger.Info("Starting CGRateS CDR Client.")
go startCdrc()
}
<-exitChan
engine.Logger.Info("Stopped all components. CGRateS shutdown!")
}

View File

@@ -21,6 +21,7 @@ package config
import (
"errors"
"fmt"
"time"
"code.google.com/p/goconf/conf"
"github.com/cgrates/cgrates/utils"
@@ -81,7 +82,7 @@ type CGRConfig struct {
CdrcEnabled bool // Enable CDR client functionality
CdrcCdrs string // Address where to reach CDR server
CdrcCdrsMethod string // Mechanism to use when posting CDRs on server <http_cgr>
CdrcRunDelay int // Sleep interval in seconds between consecutive runs, 0 to use automation via inotify
CdrcRunDelay time.Duration // Sleep interval between consecutive runs, if time unit missing, defaults to seconds, 0 to use automation via inotify
CdrcCdrType string // CDR file format <csv>.
CdrcCdrInDir string // Absolute path towards the directory where the CDRs are stored.
CdrcCdrOutDir string // Absolute path towards the directory where processed CDRs will be moved.
@@ -169,7 +170,7 @@ func (self *CGRConfig) setDefaults() error {
self.CdrcEnabled = false
self.CdrcCdrs = "127.0.0.1:2022"
self.CdrcCdrsMethod = "http_cgr"
self.CdrcRunDelay = 0
self.CdrcRunDelay = time.Duration(0)
self.CdrcCdrType = "csv"
self.CdrcCdrInDir = "/var/log/cgrates/cdr/in/csv"
self.CdrcCdrOutDir = "/var/log/cgrates/cdr/out/csv"
@@ -372,7 +373,10 @@ func loadConfig(c *conf.ConfigFile) (*CGRConfig, error) {
cfg.CdrcCdrsMethod, _ = c.GetString("cdrc", "cdrs_method")
}
if hasOpt = c.HasOption("cdrc", "run_delay"); hasOpt {
cfg.CdrcRunDelay, _ = c.GetInt("cdrc", "run_delay")
durStr,_ := c.GetString("cdrc", "run_delay")
if cfg.CdrcRunDelay, errParse = utils.ParseDurationWithSecs(durStr); errParse != nil {
return nil, errParse
}
}
if hasOpt = c.HasOption("cdrc", "cdr_type"); hasOpt {
cfg.CdrcCdrType, _ = c.GetString("cdrc", "cdr_type")

View File

@@ -22,6 +22,7 @@ import (
"fmt"
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
)
@@ -73,7 +74,7 @@ func TestDefaults(t *testing.T) {
eCfg.CdrcEnabled = false
eCfg.CdrcCdrs = "127.0.0.1:2022"
eCfg.CdrcCdrsMethod = "http_cgr"
eCfg.CdrcRunDelay = 0
eCfg.CdrcRunDelay = time.Duration(0)
eCfg.CdrcCdrType = "csv"
eCfg.CdrcCdrInDir = "/var/log/cgrates/cdr/in/csv"
eCfg.CdrcCdrOutDir = "/var/log/cgrates/cdr/out/csv"
@@ -197,7 +198,7 @@ func TestConfigFromFile(t *testing.T) {
eCfg.CdrcEnabled = true
eCfg.CdrcCdrs = "test"
eCfg.CdrcCdrsMethod = "test"
eCfg.CdrcRunDelay = 99
eCfg.CdrcRunDelay = time.Duration(99)*time.Second
eCfg.CdrcCdrType = "test"
eCfg.CdrcCdrInDir = "test"
eCfg.CdrcCdrOutDir = "test"

View File

@@ -21,7 +21,6 @@ package console
import (
"fmt"
"time"
"strconv"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -71,11 +70,7 @@ func (self *CmdGetCost) FromArgs(args []string) error {
return fmt.Errorf(self.Usage(""))
}
}
if _, err := strconv.Atoi(args[7]); err == nil { // No suffix, default to seconds
fmt.Println("\n*duration* needs suffix, defaulting to *s* (valid suffixes: ns, us/µs, ms, s, m, h)\n")
args[7] += "s"
}
callDur, err := time.ParseDuration(args[7])
callDur, err := utils.ParseDurationWithSecs(args[7])
if err != nil {
fmt.Println("\n\tExample durations: 60s for 60 seconds, 25m for 25minutes, 1m25s for one minute and 25 seconds\n")
return fmt.Errorf(self.Usage(""))

View File

@@ -19,18 +19,12 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package mediator
import (
"bufio"
"encoding/csv"
"errors"
"flag"
"fmt"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"os"
"path"
"strconv"
"strings"
"time"
)
@@ -41,11 +35,9 @@ func NewMediator(connector engine.Connector, logDb engine.LogStorage, cdrDb engi
cdrDb: cdrDb,
cgrCfg: cfg,
}
m.fieldNames = make(map[string][]string)
m.fieldIdxs = make(map[string][]int)
// Load config fields
if errLoad := m.loadConfig(); errLoad != nil {
return nil, errLoad
// Parse config
if err := m.parseConfig(); err != nil {
return nil, err
}
return m, nil
}
@@ -55,16 +47,10 @@ type Mediator struct {
logDb engine.LogStorage
cdrDb engine.CdrStorage
cgrCfg *config.CGRConfig
cdrInDir, cdrOutDir string
accIdField string
accIdIdx int // Populated only for csv files where we have no names but indexes for the fields
fieldNames map[string][]string
fieldIdxs map[string][]int // Populated only for csv files where we have no names but indexes for the fields
}
/*
Responsible for loading configuration fields out of CGRConfig instance, doing the necessary pre-checks.
@param fieldKeys: stores the keys which will be referenced inside fieldNames/fieldIdxs
Responsible for parsing configuration fields out of CGRConfig instance, doing the necessary pre-checks.
@param cfgVals: keep ordered references to configuration fields from CGRConfig instance.
fieldKeys and cfgVals are directly related through index.
Method logic:
@@ -73,14 +59,12 @@ Method logic:
* Accounting id field should not be empty.
* If we run mediation on csv file:
* Make sure cdrInDir and cdrOutDir are valid paths.
* Populate accIdIdx by converting accIdField into integer.
* Populate fieldIdxs by converting fieldNames into integers
*/
func (self *Mediator) loadConfig() error {
fieldKeys := []string{"subject", "reqtype", "direction", "tenant", "tor", "account", "destination", "time_start", "duration"}
func (self *Mediator) parseConfig() error {
cfgVals := [][]string{self.cgrCfg.MediatorSubjectFields, self.cgrCfg.MediatorReqTypeFields, self.cgrCfg.MediatorDirectionFields,
self.cgrCfg.MediatorTenantFields, self.cgrCfg.MediatorTORFields, self.cgrCfg.MediatorAccountFields, self.cgrCfg.MediatorDestFields,
self.cgrCfg.MediatorTimeAnswerFields, self.cgrCfg.MediatorDurationFields}
self.cgrCfg.MediatorAnswerTimeFields, self.cgrCfg.MediatorDurationFields}
if len(self.cgrCfg.MediatorRunIds) == 0 {
return errors.New("Unconfigured mediator run_ids")
@@ -93,44 +77,6 @@ func (self *Mediator) loadConfig() error {
}
}
// AccIdField has no special requirements, should just exist
if len(self.cgrCfg.MediatorAccIdField) == 0 {
return errors.New("Undefined mediator accid field")
}
self.accIdField = self.cgrCfg.MediatorAccIdField
var errConv error
// Specific settings of CSV style CDRS
if self.cgrCfg.MediatorCDRType == utils.FSCDR_FILE_CSV {
// Check paths to be valid before adding as configuration
if _, err := os.Stat(self.cgrCfg.MediatorCDRInDir); err != nil {
return fmt.Errorf("The input path for mediator does not exist: %v", self.cgrCfg.MediatorCDRInDir)
} else {
self.cdrInDir = self.cgrCfg.MediatorCDRInDir
}
if _, err := os.Stat(self.cgrCfg.MediatorCDROutDir); err != nil {
return fmt.Errorf("The output path for mediator does not exist: %v", self.cgrCfg.MediatorCDROutDir)
} else {
self.cdrOutDir = self.cgrCfg.MediatorCDROutDir
}
if self.accIdIdx, errConv = strconv.Atoi(self.cgrCfg.MediatorAccIdField); errConv != nil {
return errors.New("AccIdIndex must be integer.")
}
}
// Load here field names and convert to integers in case of unamed cdrs like CSV
for idx, key := range fieldKeys {
self.fieldNames[key] = cfgVals[idx]
if self.cgrCfg.MediatorCDRType == utils.FSCDR_FILE_CSV { // Special case when field names represent indexes of their location in file
self.fieldIdxs[key] = make([]int, len(cfgVals[idx]))
for iStr, cfgStr := range cfgVals[idx] {
if self.fieldIdxs[key][iStr], errConv = strconv.Atoi(cfgStr); errConv != nil {
return fmt.Errorf("All mediator index members (%s) must be ints", key)
}
}
}
}
return nil
}

View File

@@ -8,6 +8,7 @@ go test -i github.com/cgrates/cgrates/mediator
go test -i github.com/cgrates/fsock
go test -i github.com/cgrates/cgrates/cache2go
go test -i github.com/cgrates/cgrates/cdrs
go test -i github.com/cgrates/cgrates/cdrc
go test -i github.com/cgrates/cgrates/utils
go test -i github.com/cgrates/cgrates/history
go test -i github.com/cgrates/cgrates/cdrexporter
@@ -23,7 +24,9 @@ cr=$?
go test github.com/cgrates/cgrates/mediator
md=$?
go test github.com/cgrates/cgrates/cdrs
cdr=$?
cdrs=$?
go test github.com/cgrates/cgrates/cdrc
cdrcs=$?
go test github.com/cgrates/cgrates/utils
ut=$?
go test github.com/cgrates/fsock
@@ -36,4 +39,4 @@ go test github.com/cgrates/cgrates/cdrexporter
cdre=$?
exit $en && $sm && $cfg && $bl && $cr && $md && $cdr && $fs && $ut && $hs && $c2g && $cdre
exit $en && $sm && $cfg && $bl && $cr && $md && $cdrs && $cdrc && $fs && $ut && $hs && $c2g && $cdre

View File

@@ -22,12 +22,13 @@ import (
"time"
)
var PrimaryCdrFields []string = []string{ACCID, CDRHOST, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, ANSWER_TIME, DURATION}
var PrimaryCdrFields []string = []string{ACCID, CDRHOST, CDRSOURCE, REQTYPE, DIRECTION, TENANT, TOR, ACCOUNT, SUBJECT, DESTINATION, ANSWER_TIME, DURATION}
type CDR interface {
GetCgrId() string
GetAccId() string
GetCdrHost() string
GetCdrSource() string
GetDirection() string
GetSubject() string
GetAccount() string

View File

@@ -148,3 +148,11 @@ func CopyHour(src, dest time.Time) time.Time {
}
return time.Date(dest.Year(), dest.Month(), dest.Day(), src.Hour(), src.Minute(), src.Second(), src.Nanosecond(), src.Location())
}
// Parses duration, considers s as time unit if not provided
func ParseDurationWithSecs(durStr string) (time.Duration, error) {
if _, err := strconv.Atoi(durStr); err == nil { // No suffix, default to seconds
durStr += "s"
}
return time.ParseDuration(durStr)
}

View File

@@ -27,6 +27,7 @@ type RatedCDR struct {
CgrId string
AccId string
CdrHost string
CdrSource string
ReqType string
Direction string
Tenant string
@@ -52,6 +53,10 @@ func (ratedCdr *RatedCDR) GetCdrHost() string {
return ratedCdr.CdrHost
}
func (ratedCdr *RatedCDR) GetCdrSource() string {
return ratedCdr.CdrSource
}
func (ratedCdr *RatedCDR) GetDirection() string {
return ratedCdr.Direction
}

View File

@@ -247,3 +247,28 @@ func TestSplitPrefixEmpty(t *testing.T) {
t.Error("Error splitting prefix: ", a)
}
}
func TestParseDurationWithSecs(t *testing.T) {
durStr := "2"
durExpected := time.Duration(2)*time.Second
if parsed, err := ParseDurationWithSecs(durStr); err != nil {
t.Error(err)
} else if parsed != durExpected {
t.Error("Parsed different than expected")
}
durStr = "2s"
if parsed, err := ParseDurationWithSecs(durStr); err != nil {
t.Error(err)
} else if parsed != durExpected {
t.Error("Parsed different than expected")
}
durStr = "2ms"
durExpected = time.Duration(2)*time.Millisecond
if parsed, err := ParseDurationWithSecs(durStr); err != nil {
t.Error(err)
} else if parsed != durExpected {
t.Error("Parsed different than expected")
}
}