From 6e47749d897ae0381b19ffcfaa0d3d94c5131a37 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 26 Jul 2015 18:33:59 +0200 Subject: [PATCH] CDRC refactoring to support plugable file processors --- apier/v1/auth.go | 4 - cdrc/cdrc.go | 360 ++++++----------------------------- cdrc/cdrc_test.go | 135 ------------- cdrc/csv.go | 336 ++++++++++++++++++++++++++++++++ cdrc/csv_test.go | 151 +++++++++++++++ cdrc/flatstore_local_test.go | 4 +- cmd/cgr-engine/cgr-engine.go | 16 +- 7 files changed, 556 insertions(+), 450 deletions(-) create mode 100644 cdrc/csv.go create mode 100644 cdrc/csv_test.go diff --git a/apier/v1/auth.go b/apier/v1/auth.go index d0ef5e8b3..c7a4a1902 100644 --- a/apier/v1/auth.go +++ b/apier/v1/auth.go @@ -19,7 +19,6 @@ along with this program. If not, see package v1 import ( - "strconv" "time" "github.com/cgrates/cgrates/engine" @@ -54,9 +53,6 @@ func (self *ApierV1) GetMaxUsage(usageRecord engine.UsageRecord, maxUsage *float if usageRecord.SetupTime == "" { usageRecord.SetupTime = utils.META_NOW } - if usageRecord.Usage == "" { - usageRecord.Usage = strconv.FormatFloat(self.Config.MaxCallDuration.Seconds(), 'f', -1, 64) - } storedCdr, err := usageRecord.AsStoredCdr() if err != nil { return utils.NewErrServerError(err) diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 5bd186bf2..8e9c64a73 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -21,15 +21,12 @@ package cdrc import ( "bufio" "encoding/csv" - "errors" "fmt" "io" "io/ioutil" "net/http" "os" "path" - "strconv" - "strings" "time" "github.com/cgrates/cgrates/config" @@ -92,62 +89,9 @@ func populateStoredCdrField(cdr *engine.StoredCdr, fieldId, fieldVal string) err return nil } -func NewPartialFlatstoreRecord(record []string) (*PartialFlatstoreRecord, error) { - if len(record) < 7 { - return nil, errors.New("MISSING_IE") - } - pr := &PartialFlatstoreRecord{Method: record[0], AccId: record[3] + record[1] + record[2], Values: record} - var err error - if pr.Timestamp, err = utils.ParseTimeDetectLayout(record[6]); err != nil { - return nil, err - } - return pr, nil -} - -// This is a partial record received from Flatstore, can be INVITE or BYE and it needs to be paired in order to produce duration -type PartialFlatstoreRecord struct { - Method string // INVITE or BYE - AccId string // Copute here the AccId - Timestamp time.Time // Timestamp of the event, as written by db_flastore module - Values []string // Can contain original values or updated via UpdateValues -} - -// Pairs INVITE and BYE into final record containing as last element the duration -func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) { - var invite, bye *PartialFlatstoreRecord - if part1.Method == "INVITE" { - invite = part1 - } else if part2.Method == "INVITE" { - invite = part2 - } else { - return nil, errors.New("MISSING_INVITE") - } - if part1.Method == "BYE" { - bye = part1 - } else if part2.Method == "BYE" { - bye = part2 - } else { - return nil, errors.New("MISSING_BYE") - } - if len(invite.Values) != len(bye.Values) { - return nil, errors.New("INCONSISTENT_VALUES_LENGTH") - } - record := invite.Values - for idx := range record { - switch idx { - case 0, 1, 2, 3, 6: // Leave these values as they are - case 4, 5: - record[idx] = bye.Values[idx] // Update record with status from bye - default: - if bye.Values[idx] != "" { // Any value higher than 6 is dynamically inserted, overwrite if non empty - record[idx] = bye.Values[idx] - } - - } - } - callDur := bye.Timestamp.Sub(invite.Timestamp) - record = append(record, strconv.FormatFloat(callDur.Seconds(), 'f', -1, 64)) - return record, nil +// Understands and processes a specific format of cdr (eg: .csv or .fwv) +type RecordsProcessor interface { + ProcessNextRecord() ([]*engine.StoredCdr, error) // Process a single record in the CDR file, return a slice of CDRs since based on configuration we can have more templates } /* @@ -157,28 +101,31 @@ Common parameters within configs processed: Parameters specific per config instance: * duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields */ -func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrServer *engine.CdrServer, exitChan chan struct{}) (*Cdrc, error) { +func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrs engine.Connector, exitChan chan struct{}) (*Cdrc, error) { var cdrcCfg *config.CdrcConfig for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one break } - cdrc := &Cdrc{cdrsAddress: cdrcCfg.Cdrs, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir, + cdrc := &Cdrc{cdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir, runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator, - httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles), - partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.NewGuardianLock()} - var processCsvFile struct{} + httpSkipTlsCheck: httpSkipTlsCheck, cdrs: cdrs, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles), + } + var processFile struct{} for i := 0; i < cdrcCfg.MaxOpenFiles; i++ { - cdrc.maxOpenFiles <- processCsvFile // Empty initiate so we do not need to wait later when we pop + cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop } cdrc.cdrSourceIds = make([]string, len(cdrcCfgs)) cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs)) cdrc.cdrFilters = make([]utils.RSRFields, len(cdrcCfgs)) cdrc.cdrFields = make([][]*config.CfgCdrField, len(cdrcCfgs)) idx := 0 + var err error for _, cfg := range cdrcCfgs { if idx == 0 { // Steal the config from just one instance since it should be the same for all - cdrc.partialRecordCache = cfg.PartialRecordCache cdrc.failedCallsPrefix = cfg.FailedCallsPrefix + if cdrc.partialRecordsCache, err = NewPartialRecordsCache(cdrcCfg.PartialRecordCache, cdrcCfg.CdrOutDir, cdrcCfg.FieldSeparator); err != nil { + return nil, err + } } cdrc.cdrSourceIds[idx] = cfg.CdrSourceId cdrc.duMultiplyFactors[idx] = cfg.DataUsageMultiplyFactor @@ -197,25 +144,22 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrS } type Cdrc struct { - cdrsAddress, - CdrFormat, + cdrFormat, cdrInDir, cdrOutDir string - failedCallsPrefix string // Configured failedCallsPrefix, used in case of flatstore CDRs - cdrSourceIds []string // Should be in sync with cdrFields on indexes - runDelay time.Duration - csvSep rune - duMultiplyFactors []float64 - cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes - cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters - httpSkipTlsCheck bool - cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case - httpClient *http.Client - exitChan chan struct{} - maxOpenFiles chan struct{} // Maximum number of simultaneous files processed - partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][AccId]*PartialRecord - partialRecordCache time.Duration // Duration to cache partial records for - guard *engine.GuardianLock + failedCallsPrefix string // Configured failedCallsPrefix, used in case of flatstore CDRs + cdrSourceIds []string // Should be in sync with cdrFields on indexes + runDelay time.Duration + csvSep rune + duMultiplyFactors []float64 + cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes + cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters + httpSkipTlsCheck bool + cdrs engine.Connector + httpClient *http.Client + exitChan chan struct{} + maxOpenFiles chan struct{} // Maximum number of simultaneous files processed + partialRecordsCache *PartialRecordsCache // Shared between all files in the folder we process } // When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing @@ -256,9 +200,9 @@ func (self *Cdrc) trackCDRFiles() (err error) { engine.Logger.Info(fmt.Sprintf(" Shutting down CDRC on path %s.", self.cdrInDir)) return nil case ev := <-watcher.Events: - if ev.Op&fsnotify.Create == fsnotify.Create && (self.CdrFormat != FS_CSV || path.Ext(ev.Name) != ".csv") { + if ev.Op&fsnotify.Create == fsnotify.Create && (self.cdrFormat != FS_CSV || path.Ext(ev.Name) != ".csv") { go func() { //Enable async processing here - if err = self.processCsvFile(ev.Name); err != nil { + if err = self.processFile(ev.Name); err != nil { engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error())) } }() @@ -274,9 +218,9 @@ func (self *Cdrc) processCdrDir() error { engine.Logger.Info(fmt.Sprintf(" Parsing folder %s for CDR files.", self.cdrInDir)) filesInDir, _ := ioutil.ReadDir(self.cdrInDir) for _, file := range filesInDir { - if self.CdrFormat != FS_CSV || path.Ext(file.Name()) != ".csv" { + if self.cdrFormat != FS_CSV || path.Ext(file.Name()) != ".csv" { go func() { //Enable async processing here - if err := self.processCsvFile(path.Join(self.cdrInDir, file.Name())); err != nil { + if err := self.processFile(path.Join(self.cdrInDir, file.Name())); err != nil { engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", file, err.Error())) } }() @@ -286,10 +230,10 @@ func (self *Cdrc) processCdrDir() error { } // Processe file at filePath and posts the valid cdr rows out of it -func (self *Cdrc) processCsvFile(filePath string) error { +func (self *Cdrc) processFile(filePath string) error { if cap(self.maxOpenFiles) != 0 { // 0 goes for no limit - processCsvFile := <-self.maxOpenFiles // Queue here for maxOpenFiles - defer func() { self.maxOpenFiles <- processCsvFile }() + processFile := <-self.maxOpenFiles // Queue here for maxOpenFiles + defer func() { self.maxOpenFiles <- processFile }() } _, fn := path.Split(filePath) engine.Logger.Info(fmt.Sprintf(" Parsing: %s", filePath)) @@ -299,33 +243,33 @@ func (self *Cdrc) processCsvFile(filePath string) error { engine.Logger.Crit(err.Error()) return err } - csvReader := csv.NewReader(bufio.NewReader(file)) - csvReader.Comma = self.csvSep + var recordsProcessor RecordsProcessor + if utils.IsSliceMember([]string{CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.cdrFormat) { + csvReader := csv.NewReader(bufio.NewReader(file)) + csvReader.Comma = self.csvSep + recordsProcessor = NewCsvRecordsProcessor(csvReader, self.cdrFormat, fn, self.failedCallsPrefix, + self.cdrSourceIds, self.duMultiplyFactors, self.cdrFilters, self.cdrFields, self.httpSkipTlsCheck, self.partialRecordsCache) + } procRowNr := 0 timeStart := time.Now() for { - record, err := csvReader.Read() - if err != nil && err == io.EOF { - break // End of file - } - procRowNr += 1 // Only increase if not end of file + cdrs, err := recordsProcessor.ProcessNextRecord() if err != nil { - engine.Logger.Err(fmt.Sprintf(" Row %d - csv error: %s", procRowNr, err.Error())) - continue // Other csv related errors, ignore - } - if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { // partial records for flatstore CDRs - if record, err = self.processPartialRecord(record, fn); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed processing partial record, row: %d, error: %s", procRowNr, err.Error())) - continue - } else if record == nil { - continue + if err == io.EOF { + break } - // Record was overwriten with complete information out of cache - } - if err := self.processRecord(record, procRowNr); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, row: %d, error: %s", procRowNr, err.Error())) + engine.Logger.Err(fmt.Sprintf(" Row %d, error: %s", procRowNr, err.Error())) continue } + procRowNr += 1 + for _, storedCdr := range cdrs { // Send CDRs to CDRS + var reply string + if err := self.cdrs.ProcessCdr(storedCdr, &reply); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed sending CDR, %+v, error: %s", storedCdr, err.Error())) + } else if reply != "OK" { + engine.Logger.Err(fmt.Sprintf(" Received unexpected reply for CDR, %+v, reply: %s", storedCdr, reply)) + } + } } // Finished with file, move it to processed folder newPath := path.Join(self.cdrOutDir, fn) @@ -337,199 +281,3 @@ func (self *Cdrc) processCsvFile(filePath string) error { fn, newPath, procRowNr, time.Now().Sub(timeStart))) return nil } - -// Processes a single partial record for flatstore CDRs -func (self *Cdrc) processPartialRecord(record []string, fileName string) ([]string, error) { - if strings.HasPrefix(fileName, self.failedCallsPrefix) { // Use the first index since they should be the same in all configs - record = append(record, "0") // Append duration 0 for failed calls flatstore CDR and do not process it further - return record, nil - } - pr, err := NewPartialFlatstoreRecord(record) - if err != nil { - return nil, err - } - // Retrieve and complete the record from cache - var cachedFilename string - var cachedPartial *PartialFlatstoreRecord - cachedFNames := []string{fileName} // Higher probability to match as firstFileName - for fName := range self.partialRecords { - if fName != fileName { - cachedFNames = append(cachedFNames, fName) - } - } - for _, fName := range cachedFNames { // Need to lock them individually - self.guard.Guard(func() (interface{}, error) { - var hasPartial bool - if cachedPartial, hasPartial = self.partialRecords[fName][pr.AccId]; hasPartial { - cachedFilename = fName - } - return nil, nil - }, fName) - if cachedPartial != nil { - break - } - } - - if cachedPartial == nil { // Not cached, do it here and stop processing - self.guard.Guard(func() (interface{}, error) { - if fileMp, hasFile := self.partialRecords[fileName]; !hasFile { - self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr} - if self.partialRecordCache != 0 { // Schedule expiry/dump of the just created entry in cache - go func() { - time.Sleep(self.partialRecordCache) - self.dumpUnpairedRecords(fileName) - }() - } - } else if _, hasAccId := fileMp[pr.AccId]; !hasAccId { - self.partialRecords[fileName][pr.AccId] = pr - } - return nil, nil - }, fileName) - return nil, nil - } - - pairedRecord, err := pairToRecord(cachedPartial, pr) - if err != nil { - return nil, err - } - self.guard.Guard(func() (interface{}, error) { - delete(self.partialRecords[cachedFilename], pr.AccId) // Remove the record out of cache - return nil, nil - }, fileName) - return pairedRecord, nil -} - -// Dumps the cache into a .unpaired file in the outdir and cleans cache after -func (self *Cdrc) dumpUnpairedRecords(fileName string) error { - _, err := self.guard.Guard(func() (interface{}, error) { - if len(self.partialRecords[fileName]) != 0 { // Only write the file if there are records in the cache - unpairedFilePath := path.Join(self.cdrOutDir, fileName+UNPAIRED_SUFFIX) - fileOut, err := os.Create(unpairedFilePath) - if err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed creating %s, error: %s", unpairedFilePath, err.Error())) - return nil, err - } - csvWriter := csv.NewWriter(fileOut) - csvWriter.Comma = self.csvSep - for _, pr := range self.partialRecords[fileName] { - if err := csvWriter.Write(pr.Values); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed writing unpaired record %v to file: %s, error: %s", pr, unpairedFilePath, err.Error())) - return nil, err - } - } - csvWriter.Flush() - } - delete(self.partialRecords, fileName) - return nil, nil - }, fileName) - return err -} - -// Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer -func (self *Cdrc) processRecord(record []string, srcRowNr int) error { - recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates - for idx := range self.cdrFields { // cdrFields coming from more templates will produce individual storCdr records - // Make sure filters are matching - filterBreak := false - for _, rsrFilter := range self.cdrFilters[idx] { - if rsrFilter == nil { // Nil filter does not need to match anything - continue - } - if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx { - return fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter) - } else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) { - filterBreak = true - break - } - } - if filterBreak { // Stop importing cdrc fields profile due to non matching filter - continue - } - if storedCdr, err := self.recordToStoredCdr(record, idx); err != nil { - engine.Logger.Err(fmt.Sprintf(" Row %d - failed converting to StoredCdr, error: %s", srcRowNr, err.Error())) - continue - } else { - recordCdrs = append(recordCdrs, storedCdr) - } - } - for _, storedCdr := range recordCdrs { - if self.cdrsAddress == utils.INTERNAL { - if err := self.cdrServer.ProcessCdr(storedCdr); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", srcRowNr, err.Error())) - continue - } - } else { // CDRs listening on IP - if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cdr_http", self.cdrsAddress), storedCdr.AsHttpForm()); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", srcRowNr, err.Error())) - continue - } - } - } - return nil -} - -// Takes the record out of csv and turns it into storedCdr which can be processed by CDRS -func (self *Cdrc) recordToStoredCdr(record []string, cfgIdx int) (*engine.StoredCdr, error) { - storedCdr := &engine.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceIds[cfgIdx], ExtraFields: make(map[string]string), Cost: -1} - var err error - var lazyHttpFields []*config.CfgCdrField - for _, cdrFldCfg := range self.cdrFields[cfgIdx] { - if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { // Hardcode some values in case of flatstore - switch cdrFldCfg.CdrFieldId { - case utils.ACCID: - cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile("3;1;2", utils.INFIELD_SEP) // in case of flatstore, accounting id is made up out of callid, from_tag and to_tag - case utils.USAGE: - cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile(strconv.Itoa(len(record)-1), utils.INFIELD_SEP) // in case of flatstore, last element will be the duration computed by us - } - - } - var fieldVal string - if utils.IsSliceMember([]string{CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { - if cdrFldCfg.Type == utils.CDRFIELD { - for _, cfgFieldRSR := range cdrFldCfg.Value { - if cfgFieldRSR.IsStatic() { - fieldVal += cfgFieldRSR.ParseValue("") - } else { // Dynamic value extracted using index - if cfgFieldIdx, _ := strconv.Atoi(cfgFieldRSR.Id); len(record) <= cfgFieldIdx { - return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, cdrFldCfg.Tag) - } else { - fieldVal += cfgFieldRSR.ParseValue(record[cfgFieldIdx]) - } - } - } - } else if cdrFldCfg.Type == utils.HTTP_POST { - lazyHttpFields = append(lazyHttpFields, cdrFldCfg) // Will process later so we can send an estimation of storedCdr to http server - } else { - return nil, fmt.Errorf("Unsupported field type: %s", cdrFldCfg.Type) - } - } else { // Modify here when we add more supported cdr formats - return nil, fmt.Errorf("Unsupported CDR file format: %s", self.CdrFormat) - } - if err := populateStoredCdrField(storedCdr, cdrFldCfg.CdrFieldId, fieldVal); err != nil { - return nil, err - } - } - storedCdr.CgrId = utils.Sha1(storedCdr.AccId, storedCdr.SetupTime.String()) - if storedCdr.TOR == utils.DATA && self.duMultiplyFactors[cfgIdx] != 0 { - storedCdr.Usage = time.Duration(float64(storedCdr.Usage.Nanoseconds()) * self.duMultiplyFactors[cfgIdx]) - } - for _, httpFieldCfg := range lazyHttpFields { // Lazy process the http fields - var outValByte []byte - var fieldVal, httpAddr string - for _, rsrFld := range httpFieldCfg.Value { - httpAddr += rsrFld.ParseValue("") - } - if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, storedCdr); err != nil && httpFieldCfg.Mandatory { - return nil, err - } else { - fieldVal = string(outValByte) - if len(fieldVal) == 0 && httpFieldCfg.Mandatory { - return nil, fmt.Errorf("MandatoryIeMissing: Empty result for http_post field: %s", httpFieldCfg.Tag) - } - if err := populateStoredCdrField(storedCdr, httpFieldCfg.CdrFieldId, fieldVal); err != nil { - return nil, err - } - } - } - return storedCdr, nil -} diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index a87f5eabb..ac515b468 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -18,113 +18,6 @@ along with this program. If not, see package cdrc -import ( - //"bytes" - //"encoding/csv" - //"io" - "reflect" - "testing" - "time" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -func TestRecordForkCdr(t *testing.T) { - cgrConfig, _ := config.NewDefaultCGRConfig() - cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT] - cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "SupplierTest", Type: utils.CDRFIELD, CdrFieldId: "supplier", Value: []*utils.RSRField{&utils.RSRField{Id: "14"}}}) - cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "DisconnectCauseTest", Type: utils.CDRFIELD, CdrFieldId: utils.DISCONNECT_CAUSE, - Value: []*utils.RSRField{&utils.RSRField{Id: "16"}}}) - cdrc := &Cdrc{CdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: [][]*config.CfgCdrField{cdrcConfig.CdrFields}} - cdrRow := []string{"firstField", "secondField"} - _, err := cdrc.recordToStoredCdr(cdrRow, 0) - if err == nil { - t.Error("Failed to corectly detect missing fields from record") - } - cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", utils.META_PREPAID, "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", - "2013-02-03 19:50:00", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1", "NORMAL_DISCONNECT"} - rtCdr, err := cdrc.recordToStoredCdr(cdrRow, 0) - if err != nil { - t.Error("Failed to parse CDR in rated cdr", err) - } - expectedCdr := &engine.StoredCdr{ - CgrId: utils.Sha1(cdrRow[3], time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC).String()), - TOR: cdrRow[2], - AccId: cdrRow[3], - CdrHost: "0.0.0.0", // Got it over internal interface - CdrSource: "TEST_CDRC", - ReqType: cdrRow[4], - Direction: cdrRow[5], - Tenant: cdrRow[6], - Category: cdrRow[7], - Account: cdrRow[8], - Subject: cdrRow[9], - Destination: cdrRow[10], - SetupTime: time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC), - AnswerTime: time.Date(2013, 2, 3, 19, 54, 0, 0, time.UTC), - Usage: time.Duration(62) * time.Second, - Supplier: "supplier1", - DisconnectCause: "NORMAL_DISCONNECT", - ExtraFields: map[string]string{}, - Cost: -1, - } - if !reflect.DeepEqual(expectedCdr, rtCdr) { - t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) - } -} - -func TestDataMultiplyFactor(t *testing.T) { - cdrFields := []*config.CfgCdrField{&config.CfgCdrField{Tag: "TORField", Type: utils.CDRFIELD, CdrFieldId: "tor", Value: []*utils.RSRField{&utils.RSRField{Id: "0"}}}, - &config.CfgCdrField{Tag: "UsageField", Type: utils.CDRFIELD, CdrFieldId: "usage", Value: []*utils.RSRField{&utils.RSRField{Id: "1"}}}} - cdrc := &Cdrc{CdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, duMultiplyFactors: []float64{0}, cdrFields: [][]*config.CfgCdrField{cdrFields}} - cdrRow := []string{"*data", "1"} - rtCdr, err := cdrc.recordToStoredCdr(cdrRow, 0) - if err != nil { - t.Error("Failed to parse CDR in rated cdr", err) - } - var sTime time.Time - expectedCdr := &engine.StoredCdr{ - CgrId: utils.Sha1("", sTime.String()), - TOR: cdrRow[0], - CdrHost: "0.0.0.0", - CdrSource: "TEST_CDRC", - Usage: time.Duration(1) * time.Second, - ExtraFields: map[string]string{}, - Cost: -1, - } - if !reflect.DeepEqual(expectedCdr, rtCdr) { - t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) - } - cdrc.duMultiplyFactors = []float64{1024} - expectedCdr = &engine.StoredCdr{ - CgrId: utils.Sha1("", sTime.String()), - TOR: cdrRow[0], - CdrHost: "0.0.0.0", - CdrSource: "TEST_CDRC", - Usage: time.Duration(1024) * time.Second, - ExtraFields: map[string]string{}, - Cost: -1, - } - if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, 0); !reflect.DeepEqual(expectedCdr, rtCdr) { - t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) - } - cdrRow = []string{"*voice", "1"} - expectedCdr = &engine.StoredCdr{ - CgrId: utils.Sha1("", sTime.String()), - TOR: cdrRow[0], - CdrHost: "0.0.0.0", - CdrSource: "TEST_CDRC", - Usage: time.Duration(1) * time.Second, - ExtraFields: map[string]string{}, - Cost: -1, - } - if rtCdr, _ := cdrc.recordToStoredCdr(cdrRow, 0); !reflect.DeepEqual(expectedCdr, rtCdr) { - t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) - } -} - /* func TestNewPartialFlatstoreRecord(t *testing.T) { ePr := &PartialFlatstoreRecord{Method: "INVITE", AccId: "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:02daec40c548625ac", Timestamp: time.Date(2015, 7, 9, 15, 6, 48, 0, time.UTC), @@ -140,34 +33,6 @@ func TestNewPartialFlatstoreRecord(t *testing.T) { } */ -func TestPairToRecord(t *testing.T) { - eRecord := []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475", "2"} - invPr := &PartialFlatstoreRecord{Method: "INVITE", Timestamp: time.Date(2015, 7, 9, 15, 6, 48, 0, time.UTC), - Values: []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475"}} - byePr := &PartialFlatstoreRecord{Method: "BYE", Timestamp: time.Date(2015, 7, 9, 15, 6, 50, 0, time.UTC), - Values: []string{"BYE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454410", "", "", "", "", "3401:2069362475"}} - if rec, err := pairToRecord(invPr, byePr); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eRecord, rec) { - t.Errorf("Expected: %+v, received: %+v", eRecord, rec) - } - if rec, err := pairToRecord(byePr, invPr); err != nil { - t.Error(err) - } else if !reflect.DeepEqual(eRecord, rec) { - t.Errorf("Expected: %+v, received: %+v", eRecord, rec) - } - if _, err := pairToRecord(byePr, byePr); err == nil || err.Error() != "MISSING_INVITE" { - t.Error(err) - } - if _, err := pairToRecord(invPr, invPr); err == nil || err.Error() != "MISSING_BYE" { - t.Error(err) - } - byePr.Values = []string{"BYE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454410", "", "", "", "3401:2069362475"} // Took one value out - if _, err := pairToRecord(invPr, byePr); err == nil || err.Error() != "INCONSISTENT_VALUES_LENGTH" { - t.Error(err) - } -} - /* func TestOsipsFlatstoreCdrs(t *testing.T) { flatstoreCdrs := ` diff --git a/cdrc/csv.go b/cdrc/csv.go new file mode 100644 index 000000000..88a6548e7 --- /dev/null +++ b/cdrc/csv.go @@ -0,0 +1,336 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package cdrc + +import ( + "encoding/csv" + "errors" + "fmt" + "os" + "path" + "strconv" + "strings" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func NewPartialFlatstoreRecord(record []string) (*PartialFlatstoreRecord, error) { + if len(record) < 7 { + return nil, errors.New("MISSING_IE") + } + pr := &PartialFlatstoreRecord{Method: record[0], AccId: record[3] + record[1] + record[2], Values: record} + var err error + if pr.Timestamp, err = utils.ParseTimeDetectLayout(record[6]); err != nil { + return nil, err + } + return pr, nil +} + +// This is a partial record received from Flatstore, can be INVITE or BYE and it needs to be paired in order to produce duration +type PartialFlatstoreRecord struct { + Method string // INVITE or BYE + AccId string // Copute here the AccId + Timestamp time.Time // Timestamp of the event, as written by db_flastore module + Values []string // Can contain original values or updated via UpdateValues +} + +// Pairs INVITE and BYE into final record containing as last element the duration +func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) { + var invite, bye *PartialFlatstoreRecord + if part1.Method == "INVITE" { + invite = part1 + } else if part2.Method == "INVITE" { + invite = part2 + } else { + return nil, errors.New("MISSING_INVITE") + } + if part1.Method == "BYE" { + bye = part1 + } else if part2.Method == "BYE" { + bye = part2 + } else { + return nil, errors.New("MISSING_BYE") + } + if len(invite.Values) != len(bye.Values) { + return nil, errors.New("INCONSISTENT_VALUES_LENGTH") + } + record := invite.Values + for idx := range record { + switch idx { + case 0, 1, 2, 3, 6: // Leave these values as they are + case 4, 5: + record[idx] = bye.Values[idx] // Update record with status from bye + default: + if bye.Values[idx] != "" { // Any value higher than 6 is dynamically inserted, overwrite if non empty + record[idx] = bye.Values[idx] + } + + } + } + callDur := bye.Timestamp.Sub(invite.Timestamp) + record = append(record, strconv.FormatFloat(callDur.Seconds(), 'f', -1, 64)) + return record, nil +} + +func NewPartialRecordsCache(ttl time.Duration, cdrOutDir string, csvSep rune) (*PartialRecordsCache, error) { + return &PartialRecordsCache{ttl: ttl, cdrOutDir: cdrOutDir, csvSep: csvSep, + partialRecords: make(map[string]map[string]*PartialFlatstoreRecord), guard: engine.NewGuardianLock()}, nil +} + +type PartialRecordsCache struct { + ttl time.Duration + cdrOutDir string + csvSep rune + partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][AccId]*PartialRecord + guard *engine.GuardianLock +} + +// Dumps the cache into a .unpaired file in the outdir and cleans cache after +func (self *PartialRecordsCache) dumpUnpairedRecords(fileName string) error { + _, err := self.guard.Guard(func() (interface{}, error) { + if len(self.partialRecords[fileName]) != 0 { // Only write the file if there are records in the cache + unpairedFilePath := path.Join(self.cdrOutDir, fileName+UNPAIRED_SUFFIX) + fileOut, err := os.Create(unpairedFilePath) + if err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed creating %s, error: %s", unpairedFilePath, err.Error())) + return nil, err + } + csvWriter := csv.NewWriter(fileOut) + csvWriter.Comma = self.csvSep + for _, pr := range self.partialRecords[fileName] { + if err := csvWriter.Write(pr.Values); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed writing unpaired record %v to file: %s, error: %s", pr, unpairedFilePath, err.Error())) + return nil, err + } + } + csvWriter.Flush() + } + delete(self.partialRecords, fileName) + return nil, nil + }, fileName) + return err +} + +// Search in cache and return the partial record with accountind id defined, prefFilename is searched at beginning because of better match probability +func (self *PartialRecordsCache) GetPartialRecord(accId, prefFileName string) (string, *PartialFlatstoreRecord) { + var cachedFilename string + var cachedPartial *PartialFlatstoreRecord + checkCachedFNames := []string{prefFileName} // Higher probability to match as firstFileName + for fName := range self.partialRecords { + if fName != prefFileName { + checkCachedFNames = append(checkCachedFNames, fName) + } + } + for _, fName := range checkCachedFNames { // Need to lock them individually + self.guard.Guard(func() (interface{}, error) { + var hasPartial bool + if cachedPartial, hasPartial = self.partialRecords[fName][accId]; hasPartial { + cachedFilename = fName + } + return nil, nil + }, fName) + if cachedPartial != nil { + break + } + } + return cachedFilename, cachedPartial +} + +func (self *PartialRecordsCache) CachePartial(fileName string, pr *PartialFlatstoreRecord) { + self.guard.Guard(func() (interface{}, error) { + if fileMp, hasFile := self.partialRecords[fileName]; !hasFile { + self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr} + if self.ttl != 0 { // Schedule expiry/dump of the just created entry in cache + go func() { + time.Sleep(self.ttl) + self.dumpUnpairedRecords(fileName) + }() + } + } else if _, hasAccId := fileMp[pr.AccId]; !hasAccId { + self.partialRecords[fileName][pr.AccId] = pr + } + return nil, nil + }, fileName) +} + +func (self *PartialRecordsCache) UncachePartial(fileName string, pr *PartialFlatstoreRecord) { + self.guard.Guard(func() (interface{}, error) { + delete(self.partialRecords[fileName], pr.AccId) // Remove the record out of cache + return nil, nil + }, fileName) +} + +func NewCsvRecordsProcessor(csvReader *csv.Reader, cdrFormat, fileName, failedCallsPrefix string, + cdrSourceIds []string, duMultiplyFactors []float64, cdrFilters []utils.RSRFields, cdrFields [][]*config.CfgCdrField, + httpSkipTlsCheck bool, partialRecordsCache *PartialRecordsCache) *CsvRecordsProcessor { + return &CsvRecordsProcessor{csvReader: csvReader, cdrFormat: cdrFormat, fileName: fileName, + failedCallsPrefix: failedCallsPrefix, cdrSourceIds: cdrSourceIds, + duMultiplyFactors: duMultiplyFactors, cdrFilters: cdrFilters, cdrFields: cdrFields, + httpSkipTlsCheck: httpSkipTlsCheck, partialRecordsCache: partialRecordsCache} + +} + +type CsvRecordsProcessor struct { + csvReader *csv.Reader + cdrFormat string + fileName string + failedCallsPrefix string + cdrSourceIds []string // Should be in sync with cdrFields on indexes + duMultiplyFactors []float64 + cdrFilters []utils.RSRFields // Should be in sync with cdrFields on indexes + cdrFields [][]*config.CfgCdrField // Profiles directly connected with cdrFilters + httpSkipTlsCheck bool + partialRecordsCache *PartialRecordsCache // Shared by cdrc so we can cache for all files in a folder +} + +func (self *CsvRecordsProcessor) ProcessNextRecord() ([]*engine.StoredCdr, error) { + record, err := self.csvReader.Read() + if err != nil { + return nil, err + } + if record, err = self.processPartialRecord(record); err != nil { + return nil, err + } else if record == nil { + return nil, nil // Due to partial, none returned + } + // Record was overwriten with complete information out of cache + return self.processRecord(record) +} + +// Processes a single partial record for flatstore CDRs +func (self *CsvRecordsProcessor) processPartialRecord(record []string) ([]string, error) { + if strings.HasPrefix(self.fileName, self.failedCallsPrefix) { // Use the first index since they should be the same in all configs + record = append(record, "0") // Append duration 0 for failed calls flatstore CDR and do not process it further + return record, nil + } + pr, err := NewPartialFlatstoreRecord(record) + if err != nil { + return nil, err + } + // Retrieve and complete the record from cache + cachedFilename, cachedPartial := self.partialRecordsCache.GetPartialRecord(pr.AccId, self.fileName) + if cachedPartial == nil { // Not cached, do it here and stop processing + self.partialRecordsCache.CachePartial(self.fileName, pr) + return nil, nil + } + pairedRecord, err := pairToRecord(cachedPartial, pr) + if err != nil { + return nil, err + } + self.partialRecordsCache.UncachePartial(cachedFilename, pr) + return pairedRecord, nil +} + +// Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer +func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.StoredCdr, error) { + recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates + for idx := range self.cdrFields { // cdrFields coming from more templates will produce individual storCdr records + // Make sure filters are matching + filterBreak := false + for _, rsrFilter := range self.cdrFilters[idx] { + if rsrFilter == nil { // Nil filter does not need to match anything + continue + } + if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx { + return nil, fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter) + } else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) { + filterBreak = true + break + } + } + if filterBreak { // Stop importing cdrc fields profile due to non matching filter + continue + } + if storedCdr, err := self.recordToStoredCdr(record, idx); err != nil { + return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error()) + } else { + recordCdrs = append(recordCdrs, storedCdr) + } + } + return recordCdrs, nil +} + +// Takes the record out of csv and turns it into storedCdr which can be processed by CDRS +func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cfgIdx int) (*engine.StoredCdr, error) { + storedCdr := &engine.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceIds[cfgIdx], ExtraFields: make(map[string]string), Cost: -1} + var err error + var lazyHttpFields []*config.CfgCdrField + for _, cdrFldCfg := range self.cdrFields[cfgIdx] { + if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.cdrFormat) { // Hardcode some values in case of flatstore + switch cdrFldCfg.CdrFieldId { + case utils.ACCID: + cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile("3;1;2", utils.INFIELD_SEP) // in case of flatstore, accounting id is made up out of callid, from_tag and to_tag + case utils.USAGE: + cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile(strconv.Itoa(len(record)-1), utils.INFIELD_SEP) // in case of flatstore, last element will be the duration computed by us + } + + } + var fieldVal string + if utils.IsSliceMember([]string{CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.cdrFormat) { + if cdrFldCfg.Type == utils.CDRFIELD { + for _, cfgFieldRSR := range cdrFldCfg.Value { + if cfgFieldRSR.IsStatic() { + fieldVal += cfgFieldRSR.ParseValue("") + } else { // Dynamic value extracted using index + if cfgFieldIdx, _ := strconv.Atoi(cfgFieldRSR.Id); len(record) <= cfgFieldIdx { + return nil, fmt.Errorf("Ignoring record: %v - cannot extract field %s", record, cdrFldCfg.Tag) + } else { + fieldVal += cfgFieldRSR.ParseValue(record[cfgFieldIdx]) + } + } + } + } else if cdrFldCfg.Type == utils.HTTP_POST { + lazyHttpFields = append(lazyHttpFields, cdrFldCfg) // Will process later so we can send an estimation of storedCdr to http server + } else { + return nil, fmt.Errorf("Unsupported field type: %s", cdrFldCfg.Type) + } + } else { // Modify here when we add more supported cdr formats + return nil, fmt.Errorf("Unsupported CDR file format: %s", self.cdrFormat) + } + if err := populateStoredCdrField(storedCdr, cdrFldCfg.CdrFieldId, fieldVal); err != nil { + return nil, err + } + } + storedCdr.CgrId = utils.Sha1(storedCdr.AccId, storedCdr.SetupTime.String()) + if storedCdr.TOR == utils.DATA && self.duMultiplyFactors[cfgIdx] != 0 { + storedCdr.Usage = time.Duration(float64(storedCdr.Usage.Nanoseconds()) * self.duMultiplyFactors[cfgIdx]) + } + for _, httpFieldCfg := range lazyHttpFields { // Lazy process the http fields + var outValByte []byte + var fieldVal, httpAddr string + for _, rsrFld := range httpFieldCfg.Value { + httpAddr += rsrFld.ParseValue("") + } + if outValByte, err = utils.HttpJsonPost(httpAddr, self.httpSkipTlsCheck, storedCdr); err != nil && httpFieldCfg.Mandatory { + return nil, err + } else { + fieldVal = string(outValByte) + if len(fieldVal) == 0 && httpFieldCfg.Mandatory { + return nil, fmt.Errorf("MandatoryIeMissing: Empty result for http_post field: %s", httpFieldCfg.Tag) + } + if err := populateStoredCdrField(storedCdr, httpFieldCfg.CdrFieldId, fieldVal); err != nil { + return nil, err + } + } + } + return storedCdr, nil +} diff --git a/cdrc/csv_test.go b/cdrc/csv_test.go new file mode 100644 index 000000000..7044362ca --- /dev/null +++ b/cdrc/csv_test.go @@ -0,0 +1,151 @@ +/* +Real-time Charging System for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package cdrc + +import ( + "reflect" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +func TestCsvRecordForkCdr(t *testing.T) { + cgrConfig, _ := config.NewDefaultCGRConfig() + cdrcConfig := cgrConfig.CdrcProfiles["/var/log/cgrates/cdrc/in"][utils.META_DEFAULT] + cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "SupplierTest", Type: utils.CDRFIELD, CdrFieldId: "supplier", Value: []*utils.RSRField{&utils.RSRField{Id: "14"}}}) + cdrcConfig.CdrFields = append(cdrcConfig.CdrFields, &config.CfgCdrField{Tag: "DisconnectCauseTest", Type: utils.CDRFIELD, CdrFieldId: utils.DISCONNECT_CAUSE, + Value: []*utils.RSRField{&utils.RSRField{Id: "16"}}}) + csvProcessor := &CsvRecordsProcessor{cdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: [][]*config.CfgCdrField{cdrcConfig.CdrFields}} + cdrRow := []string{"firstField", "secondField"} + _, err := csvProcessor.recordToStoredCdr(cdrRow, 0) + if err == nil { + t.Error("Failed to corectly detect missing fields from record") + } + cdrRow = []string{"ignored", "ignored", utils.VOICE, "acc1", utils.META_PREPAID, "*out", "cgrates.org", "call", "1001", "1001", "+4986517174963", + "2013-02-03 19:50:00", "2013-02-03 19:54:00", "62", "supplier1", "172.16.1.1", "NORMAL_DISCONNECT"} + rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, 0) + if err != nil { + t.Error("Failed to parse CDR in rated cdr", err) + } + expectedCdr := &engine.StoredCdr{ + CgrId: utils.Sha1(cdrRow[3], time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC).String()), + TOR: cdrRow[2], + AccId: cdrRow[3], + CdrHost: "0.0.0.0", // Got it over internal interface + CdrSource: "TEST_CDRC", + ReqType: cdrRow[4], + Direction: cdrRow[5], + Tenant: cdrRow[6], + Category: cdrRow[7], + Account: cdrRow[8], + Subject: cdrRow[9], + Destination: cdrRow[10], + SetupTime: time.Date(2013, 2, 3, 19, 50, 0, 0, time.UTC), + AnswerTime: time.Date(2013, 2, 3, 19, 54, 0, 0, time.UTC), + Usage: time.Duration(62) * time.Second, + Supplier: "supplier1", + DisconnectCause: "NORMAL_DISCONNECT", + ExtraFields: map[string]string{}, + Cost: -1, + } + if !reflect.DeepEqual(expectedCdr, rtCdr) { + t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) + } +} + +func TestCsvDataMultiplyFactor(t *testing.T) { + cdrFields := []*config.CfgCdrField{&config.CfgCdrField{Tag: "TORField", Type: utils.CDRFIELD, CdrFieldId: "tor", Value: []*utils.RSRField{&utils.RSRField{Id: "0"}}}, + &config.CfgCdrField{Tag: "UsageField", Type: utils.CDRFIELD, CdrFieldId: "usage", Value: []*utils.RSRField{&utils.RSRField{Id: "1"}}}} + csvProcessor := &CsvRecordsProcessor{cdrFormat: CSV, cdrSourceIds: []string{"TEST_CDRC"}, duMultiplyFactors: []float64{0}, cdrFields: [][]*config.CfgCdrField{cdrFields}} + cdrRow := []string{"*data", "1"} + rtCdr, err := csvProcessor.recordToStoredCdr(cdrRow, 0) + if err != nil { + t.Error("Failed to parse CDR in rated cdr", err) + } + var sTime time.Time + expectedCdr := &engine.StoredCdr{ + CgrId: utils.Sha1("", sTime.String()), + TOR: cdrRow[0], + CdrHost: "0.0.0.0", + CdrSource: "TEST_CDRC", + Usage: time.Duration(1) * time.Second, + ExtraFields: map[string]string{}, + Cost: -1, + } + if !reflect.DeepEqual(expectedCdr, rtCdr) { + t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) + } + csvProcessor.duMultiplyFactors = []float64{1024} + expectedCdr = &engine.StoredCdr{ + CgrId: utils.Sha1("", sTime.String()), + TOR: cdrRow[0], + CdrHost: "0.0.0.0", + CdrSource: "TEST_CDRC", + Usage: time.Duration(1024) * time.Second, + ExtraFields: map[string]string{}, + Cost: -1, + } + if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow, 0); !reflect.DeepEqual(expectedCdr, rtCdr) { + t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) + } + cdrRow = []string{"*voice", "1"} + expectedCdr = &engine.StoredCdr{ + CgrId: utils.Sha1("", sTime.String()), + TOR: cdrRow[0], + CdrHost: "0.0.0.0", + CdrSource: "TEST_CDRC", + Usage: time.Duration(1) * time.Second, + ExtraFields: map[string]string{}, + Cost: -1, + } + if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow, 0); !reflect.DeepEqual(expectedCdr, rtCdr) { + t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) + } +} + +func TestCsvPairToRecord(t *testing.T) { + eRecord := []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475", "2"} + invPr := &PartialFlatstoreRecord{Method: "INVITE", Timestamp: time.Date(2015, 7, 9, 15, 6, 48, 0, time.UTC), + Values: []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475"}} + byePr := &PartialFlatstoreRecord{Method: "BYE", Timestamp: time.Date(2015, 7, 9, 15, 6, 50, 0, time.UTC), + Values: []string{"BYE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454410", "", "", "", "", "3401:2069362475"}} + if rec, err := pairToRecord(invPr, byePr); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eRecord, rec) { + t.Errorf("Expected: %+v, received: %+v", eRecord, rec) + } + if rec, err := pairToRecord(byePr, invPr); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eRecord, rec) { + t.Errorf("Expected: %+v, received: %+v", eRecord, rec) + } + if _, err := pairToRecord(byePr, byePr); err == nil || err.Error() != "MISSING_INVITE" { + t.Error(err) + } + if _, err := pairToRecord(invPr, invPr); err == nil || err.Error() != "MISSING_BYE" { + t.Error(err) + } + byePr.Values = []string{"BYE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454410", "", "", "", "3401:2069362475"} // Took one value out + if _, err := pairToRecord(invPr, byePr); err == nil || err.Error() != "INCONSISTENT_VALUES_LENGTH" { + t.Error(err) + } +} diff --git a/cdrc/flatstore_local_test.go b/cdrc/flatstore_local_test.go index 4cf03887d..44c3dcdc7 100644 --- a/cdrc/flatstore_local_test.go +++ b/cdrc/flatstore_local_test.go @@ -1,6 +1,6 @@ /* -Rating system designed to be used in VoIP Carriers World -Copyright (C) 2012-2015 ITsysCOM +Real-time Charging System for Telecom & ISP environments +Copyright (C) 2012-2015 ITsysCOM GmbH This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 5b60033f2..9e728235f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -88,15 +88,25 @@ func cacheData(ratingDb engine.RatingStorage, accountDb engine.AccountingStorage } // Fires up a cdrc instance -func startCdrc(cdrsChan chan struct{}, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, closeChan chan struct{}) { +func startCdrc(responder *engine.Responder, cdrsChan chan struct{}, cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, closeChan chan struct{}) { + var cdrsConn engine.Connector var cdrcCfg *config.CdrcConfig for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one break } if cdrcCfg.Cdrs == utils.INTERNAL { <-cdrsChan // Wait for CDRServer to come up before start processing + cdrsConn = responder + } else { + conn, err := rpcclient.NewRpcClient("tcp", cdrcCfg.Cdrs, cfg.ConnectAttempts, cfg.Reconnects, utils.GOB) + if err != nil { + engine.Logger.Crit(fmt.Sprintf(" Could not connect to CDRS via RPC: %v", err)) + exitChan <- true + return + } + cdrsConn = &engine.RPCClientConnector{Client: conn} } - cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrServer, closeChan) + cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan) if err != nil { engine.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error())) exitChan <- true @@ -619,7 +629,7 @@ func main() { } else if !cdrcEnabled { cdrcEnabled = true // Mark that at least one cdrc service is active } - go startCdrc(cdrsChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cfg.ConfigReloads[utils.CDRC]) + go startCdrc(responder, cdrsChan, cdrcCfgs, cfg.HttpSkipTlsVerify, cfg.ConfigReloads[utils.CDRC]) } if cdrcEnabled { engine.Logger.Info("Starting CGRateS CDR client.")