From c3595169967d1bf227219abcdf7efa46beb9efd1 Mon Sep 17 00:00:00 2001 From: DanB Date: Fri, 10 Jul 2015 21:03:38 +0200 Subject: [PATCH] Partial implementation of db_flatstore CDRs from *ser --- cdrc/cdrc.go | 192 +++++++++++++---- cdrc/cdrc_test.go | 197 ++++++++++++++++++ config/cdrcconfig.go | 7 +- config/config_defaults.go | 4 +- config/config_json_test.go | 1 + config/configcdrc_test.go | 1 + config/libconfig_json.go | 2 + .../opensips/etc/opensips/opensips.cfg | 28 ++- general_tests/tutorial_osips_calls_test.go | 6 +- utils/consts.go | 2 + 10 files changed, 386 insertions(+), 54 deletions(-) diff --git a/cdrc/cdrc.go b/cdrc/cdrc.go index 564c537cb..57e55a100 100644 --- a/cdrc/cdrc.go +++ b/cdrc/cdrc.go @@ -21,6 +21,7 @@ package cdrc import ( "bufio" "encoding/csv" + "errors" "fmt" "io" "io/ioutil" @@ -104,9 +105,9 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrS cdrc := &Cdrc{cdrsAddress: cdrcCfg.Cdrs, CdrFormat: cdrcCfg.CdrFormat, cdrInDir: cdrcCfg.CdrInDir, cdrOutDir: cdrcCfg.CdrOutDir, runDelay: cdrcCfg.RunDelay, csvSep: cdrcCfg.FieldSeparator, httpSkipTlsCheck: httpSkipTlsCheck, cdrServer: cdrServer, exitChan: exitChan, maxOpenFiles: make(chan struct{}, cdrcCfg.MaxOpenFiles)} - var processFile struct{} + var processCsvFile struct{} for i := 0; i < cdrcCfg.MaxOpenFiles; i++ { - cdrc.maxOpenFiles <- processFile // Empty initiate so we do not need to wait later when we pop + cdrc.maxOpenFiles <- processCsvFile // Empty initiate so we do not need to wait later when we pop } cdrc.cdrSourceIds = make([]string, len(cdrcCfgs)) cdrc.duMultiplyFactors = make([]float64, len(cdrcCfgs)) @@ -130,6 +131,64 @@ func NewCdrc(cdrcCfgs map[string]*config.CdrcConfig, httpSkipTlsCheck bool, cdrS return cdrc, nil } +func NewPartialFlatstoreRecord(record []string) (*PartialFlatstoreRecord, error) { + if len(record) < 7 { + return nil, errors.New("MISSING_IE") + } + pr := &PartialFlatstoreRecord{Method: record[0], AccId: record[3] + record[1] + record[2], Values: record} + var err error + if pr.Timestamp, err = utils.ParseTimeDetectLayout(record[6]); err != nil { + return nil, err + } + return pr, nil +} + +// This is a partial record received from Flatstore, can be INVITE or BYE and it needs to be paired in order to produce duration +type PartialFlatstoreRecord struct { + Method string // INVITE or BYE + AccId string // Copute here the AccId + Timestamp time.Time // Timestamp of the event, as written by db_flastore module + Values []string // Can contain original values or updated via UpdateValues +} + +// Pairs INVITE and BYE into final record containing as last element the duration +func pairToRecord(part1, part2 *PartialFlatstoreRecord) ([]string, error) { + var invite, bye *PartialFlatstoreRecord + if part1.Method == "INVITE" { + invite = part1 + } else if part2.Method == "INVITE" { + invite = part2 + } else { + return nil, errors.New("MISSING_INVITE") + } + if part1.Method == "BYE" { + bye = part1 + } else if part2.Method == "BYE" { + bye = part2 + } else { + return nil, errors.New("MISSING_BYE") + } + if len(invite.Values) != len(bye.Values) { + return nil, errors.New("INCONSISTENT_VALUES_LENGTH") + } + record := invite.Values + for idx := range record { + switch idx { + case 0, 1, 2, 3, 6: // Leave these values as they are + case 4, 5: + record[idx] = bye.Values[idx] // Update record with status from bye + default: + if bye.Values[idx] != "" { // Any value higher than 6 is dynamically inserted, overwrite if non empty + record[idx] = bye.Values[idx] + } + + } + } + callDur := bye.Timestamp.Sub(invite.Timestamp) + record = append(record, strconv.FormatFloat(callDur.Seconds(), 'f', -1, 64)) + return record, nil +} + type Cdrc struct { cdrsAddress, CdrFormat, @@ -145,7 +204,8 @@ type Cdrc struct { cdrServer *engine.CdrServer // Reference towards internal cdrServer if that is the case httpClient *http.Client exitChan chan struct{} - maxOpenFiles chan struct{} // Maximum number of simultaneous files processed + maxOpenFiles chan struct{} // Maximum number of simultaneous files processed + partialRecords map[string]map[string]*PartialFlatstoreRecord // [FileName"][AccId]*PartialRecord } // When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing @@ -188,7 +248,7 @@ func (self *Cdrc) trackCDRFiles() (err error) { case ev := <-watcher.Events: if ev.Op&fsnotify.Create == fsnotify.Create && (self.CdrFormat != FS_CSV || path.Ext(ev.Name) != ".csv") { go func() { //Enable async processing here - if err = self.processFile(ev.Name); err != nil { + if err = self.processCsvFile(ev.Name); err != nil { engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", ev.Name, err.Error())) } }() @@ -206,7 +266,7 @@ func (self *Cdrc) processCdrDir() error { for _, file := range filesInDir { if self.CdrFormat != FS_CSV || path.Ext(file.Name()) != ".csv" { go func() { //Enable async processing here - if err := self.processFile(path.Join(self.cdrInDir, file.Name())); err != nil { + if err := self.processCsvFile(path.Join(self.cdrInDir, file.Name())); err != nil { engine.Logger.Err(fmt.Sprintf("Processing file %s, error: %s", file, err.Error())) } }() @@ -216,9 +276,9 @@ func (self *Cdrc) processCdrDir() error { } // Processe file at filePath and posts the valid cdr rows out of it -func (self *Cdrc) processFile(filePath string) error { - processFile := <-self.maxOpenFiles // Queue here for maxOpenFiles - defer func() { self.maxOpenFiles <- processFile }() +func (self *Cdrc) processCsvFile(filePath string) error { + processCsvFile := <-self.maxOpenFiles // Queue here for maxOpenFiles + defer func() { self.maxOpenFiles <- processCsvFile }() _, fn := path.Split(filePath) engine.Logger.Info(fmt.Sprintf(" Parsing: %s", filePath)) file, err := os.Open(filePath) @@ -241,43 +301,18 @@ func (self *Cdrc) processFile(filePath string) error { engine.Logger.Err(fmt.Sprintf(" Row %d - csv error: %s", procRowNr, err.Error())) continue // Other csv related errors, ignore } - recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates - for idx := range self.cdrFields { - // Make sure filters are matching - filterBreak := false - for _, rsrFilter := range self.cdrFilters[idx] { - if rsrFilter == nil { // Nil filter does not need to match anything - continue - } - if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx { - return fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter) - } else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) { - filterBreak = true - break - } - } - if filterBreak { // Stop importing cdrc fields profile due to non matching filter + if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { // partial records for flatstore CDRs + if record, err = self.processPartialRecord(record, fn); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed processing partial record, row: %d, error: %s", procRowNr, err.Error())) + continue + } else if record == nil { continue } - if storedCdr, err := self.recordToStoredCdr(record, idx); err != nil { - engine.Logger.Err(fmt.Sprintf(" Row %d - failed converting to StoredCdr, error: %s", procRowNr, err.Error())) - continue - } else { - recordCdrs = append(recordCdrs, storedCdr) - } + // Record was overwriten with complete information out of cache } - for _, storedCdr := range recordCdrs { - if self.cdrsAddress == utils.INTERNAL { - if err := self.cdrServer.ProcessCdr(storedCdr); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", procRowNr, err.Error())) - continue - } - } else { // CDRs listening on IP - if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cdr_post", self.cdrsAddress), storedCdr.AsHttpForm()); err != nil { - engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", procRowNr, err.Error())) - continue - } - } + if err := self.processRecord(record, procRowNr); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed processing CDR, row: %d, error: %s", procRowNr, err.Error())) + continue } } // Finished with file, move it to processed folder @@ -291,14 +326,85 @@ func (self *Cdrc) processFile(filePath string) error { return nil } +// Processes a single partial record for flatstore CDRs, in case of failed calls it will emulate the BYE by copying the INVITE +func (self *Cdrc) processPartialRecord(record []string, fileName string) ([]string, error) { + pr, err := NewPartialFlatstoreRecord(record) + if err != nil { + return nil, err + } + // ToDo: cache locking + // ToDo: schedule dumping of the .unpaired files + if fileMp, hasFile := self.partialRecords[fileName]; !hasFile { + self.partialRecords[fileName] = map[string]*PartialFlatstoreRecord{pr.AccId: pr} + return nil, nil + } else if _, hasAccId := fileMp[pr.AccId]; !hasAccId { + self.partialRecords[fileName][pr.AccId] = pr + return nil, nil + } + // The paired is already in cache, build up the final record + return pairToRecord(self.partialRecords[fileName][pr.AccId], pr) +} + +// Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer +func (self *Cdrc) processRecord(record []string, srcRowNr int) error { + recordCdrs := make([]*engine.StoredCdr, 0) // More CDRs based on the number of filters and field templates + for idx := range self.cdrFields { // cdrFields coming from more templates will produce individual storCdr records + // Make sure filters are matching + filterBreak := false + for _, rsrFilter := range self.cdrFilters[idx] { + if rsrFilter == nil { // Nil filter does not need to match anything + continue + } + if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx { + return fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter) + } else if !rsrFilter.FilterPasses(record[cfgFieldIdx]) { + filterBreak = true + break + } + } + if filterBreak { // Stop importing cdrc fields profile due to non matching filter + continue + } + if storedCdr, err := self.recordToStoredCdr(record, idx); err != nil { + engine.Logger.Err(fmt.Sprintf(" Row %d - failed converting to StoredCdr, error: %s", srcRowNr, err.Error())) + continue + } else { + recordCdrs = append(recordCdrs, storedCdr) + } + } + for _, storedCdr := range recordCdrs { + if self.cdrsAddress == utils.INTERNAL { + if err := self.cdrServer.ProcessCdr(storedCdr); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", srcRowNr, err.Error())) + continue + } + } else { // CDRs listening on IP + if _, err := self.httpClient.PostForm(fmt.Sprintf("http://%s/cdr_post", self.cdrsAddress), storedCdr.AsHttpForm()); err != nil { + engine.Logger.Err(fmt.Sprintf(" Failed posting CDR, row: %d, error: %s", srcRowNr, err.Error())) + continue + } + } + } + return nil +} + // Takes the record out of csv and turns it into storedCdr which can be processed by CDRS func (self *Cdrc) recordToStoredCdr(record []string, cfgIdx int) (*engine.StoredCdr, error) { storedCdr := &engine.StoredCdr{CdrHost: "0.0.0.0", CdrSource: self.cdrSourceIds[cfgIdx], ExtraFields: make(map[string]string), Cost: -1} var err error var lazyHttpFields []*config.CfgCdrField for _, cdrFldCfg := range self.cdrFields[cfgIdx] { + if utils.IsSliceMember([]string{utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { // Hardcode some values in case of flatstore + switch cdrFldCfg.CdrFieldId { + case utils.ACCID: + cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile("3;1;2", utils.INFIELD_SEP) // in case of flatstore, last element will be the duration computed by us + case utils.USAGE: + cdrFldCfg.Value = utils.ParseRSRFieldsMustCompile(strconv.Itoa(len(record)-1), utils.INFIELD_SEP) // in case of flatstore, last element will be the duration computed by us + } + + } var fieldVal string - if utils.IsSliceMember([]string{CSV, FS_CSV}, self.CdrFormat) { + if utils.IsSliceMember([]string{CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE}, self.CdrFormat) { if cdrFldCfg.Type == utils.CDRFIELD { for _, cfgFieldRSR := range cdrFldCfg.Value { if cfgFieldRSR.IsStatic() { diff --git a/cdrc/cdrc_test.go b/cdrc/cdrc_test.go index fbf467716..909440162 100644 --- a/cdrc/cdrc_test.go +++ b/cdrc/cdrc_test.go @@ -19,6 +19,9 @@ along with this program. If not, see package cdrc import ( + "bytes" + "encoding/csv" + "io" "reflect" "testing" "time" @@ -238,3 +241,197 @@ func TestDnTdmCdrs(t *testing.T) { } */ + +func TestNewPartialFlatstoreRecord(t *testing.T) { + ePr := &PartialFlatstoreRecord{Method: "INVITE", AccId: "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:02daec40c548625ac", Timestamp: time.Date(2015, 7, 9, 17, 6, 48, 0, time.Local), + Values: []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475"}} + if pr, err := NewPartialFlatstoreRecord(ePr.Values); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(ePr, pr) { + t.Errorf("Expecting: %+v, received: %+v", ePr, pr) + } + if _, err := NewPartialFlatstoreRecord([]string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK"}); err == nil || err.Error() != "MISSING_IE" { + t.Error(err) + } +} + +func TestPairToRecord(t *testing.T) { + eRecord := []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475", "2"} + invPr := &PartialFlatstoreRecord{Method: "INVITE", Timestamp: time.Date(2015, 7, 9, 17, 6, 48, 0, time.Local), + Values: []string{"INVITE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454408", "*prepaid", "1001", "1002", "", "3401:2069362475"}} + byePr := &PartialFlatstoreRecord{Method: "BYE", Timestamp: time.Date(2015, 7, 9, 17, 6, 50, 0, time.Local), + Values: []string{"BYE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454410", "", "", "", "", "3401:2069362475"}} + if rec, err := pairToRecord(invPr, byePr); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eRecord, rec) { + t.Errorf("Expected: %+v, received: %+v", eRecord, rec) + } + if rec, err := pairToRecord(byePr, invPr); err != nil { + t.Error(err) + } else if !reflect.DeepEqual(eRecord, rec) { + t.Errorf("Expected: %+v, received: %+v", eRecord, rec) + } + if _, err := pairToRecord(byePr, byePr); err == nil || err.Error() != "MISSING_INVITE" { + t.Error(err) + } + if _, err := pairToRecord(invPr, invPr); err == nil || err.Error() != "MISSING_BYE" { + t.Error(err) + } + byePr.Values = []string{"BYE", "2daec40c", "548625ac", "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0", "200", "OK", "1436454410", "", "", "", "3401:2069362475"} // Took one value out + if _, err := pairToRecord(invPr, byePr); err == nil || err.Error() != "INCONSISTENT_VALUES_LENGTH" { + t.Error(err) + } +} + +func TestOsipsFlatstoreCdrs(t *testing.T) { + osipsCdrs := ` +INVITE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0|200|OK|1436454408|*prepaid|1001|1002||3401:2069362475 +BYE|2daec40c|548625ac|dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:0|200|OK|1436454410|||||3401:2069362475 +INVITE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0|200|OK|1436454647|*postpaid|1002|1001||1877:893549741 +BYE|f9d3d5c3|c863a6e3|214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0|200|OK|1436454651|||||1877:893549741 +INVITE|36e39a5|42d996f9|3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:0|200|OK|1436454657|*prepaid|1001|1002||2407:1884881533 +BYE|36e39a5|42d996f9|3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:0|200|OK|1436454661|||||2407:1884881533 +INVITE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|1436454690|*prepaid|1001|1002||3099:1909036290 +BYE|3111f3c9|49ca4c42|a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:0|200|OK|1436454692|||||3099:1909036290 +` + /* + eCdrs := []*engine.StoredCdr{ + &engine.StoredCdr{ + CgrId: utils.Sha1("dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:02daec40c548625ac", time.Date(2014, 7, 2, 15, 24, 40, 0, time.UTC).String()), + TOR: utils.VOICE, + AccId: "dd0c4c617a9919d29a6175cdff223a9e@0:0:0:0:0:0:0:02daec40c548625ac", + CdrHost: "0.0.0.0", + CdrSource: "TEST_CDRC", + ReqType: utils.META_PREPAID, + Direction: "*out", + Tenant: "sip.test.deanconnect.nl", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "1002", + SetupTime: time.Date(2014, 7, 2, 15, 24, 40, 0, time.UTC), + AnswerTime: time.Date(2014, 7, 2, 15, 24, 40, 0, time.UTC), + Usage: time.Duration(25) * time.Second, + Cost: -1, + }, + &engine.StoredCdr{ + CgrId: utils.Sha1("214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0f9d3d5c3c863a6e3", time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC).String()), + TOR: utils.VOICE, + AccId: "214d8f52b566e33a9349b184e72a4cca@0:0:0:0:0:0:0:0f9d3d5c3c863a6e3", + CdrHost: "0.0.0.0", + CdrSource: "TEST_CDRC", + ReqType: utils.META_PREPAID, + Direction: "*out", + Tenant: "sip.test.deanconnect.nl", + Category: "call", + Account: "1002", + Subject: "1002", + Destination: "1001", + SetupTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC), + AnswerTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC), + Usage: time.Duration(8) * time.Second, + Cost: -1, + }, + &engine.StoredCdr{ + CgrId: utils.Sha1("3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:036e39a542d996f9", time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC).String()), + TOR: utils.VOICE, + AccId: "3a63321dd3b325eec688dc2aefb6ac2d@0:0:0:0:0:0:0:036e39a542d996f9", + CdrHost: "0.0.0.0", + CdrSource: "TEST_CDRC", + ReqType: utils.META_PREPAID, + Direction: "*out", + Tenant: "sip.test.deanconnect.nl", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "1002", + SetupTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC), + AnswerTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC), + Usage: time.Duration(8) * time.Second, + Cost: -1, + }, + &engine.StoredCdr{ + CgrId: utils.Sha1("a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:03111f3c949ca4c42", time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC).String()), + TOR: utils.VOICE, + AccId: "a58ebaae40d08d6757d8424fb09c4c54@0:0:0:0:0:0:0:03111f3c949ca4c42", + CdrHost: "0.0.0.0", + CdrSource: "TEST_CDRC", + ReqType: utils.META_PREPAID, + Direction: "*out", + Tenant: "sip.test.deanconnect.nl", + Category: "call", + Account: "1001", + Subject: "1001", + Destination: "1002", + SetupTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC), + AnswerTime: time.Date(2014, 7, 2, 15, 24, 41, 0, time.UTC), + Usage: time.Duration(8) * time.Second, + Cost: -1, + }, + } + */ + //cgrConfig, _ := config.NewDefaultCGRConfig() + cdrFields := [][]*config.CfgCdrField{[]*config.CfgCdrField{ + &config.CfgCdrField{Tag: "Tor", Type: utils.CDRFIELD, CdrFieldId: utils.TOR, Value: utils.ParseRSRFieldsMustCompile("^*voice", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "AccId", Type: utils.CDRFIELD, CdrFieldId: utils.ACCID, Mandatory: true}, + &config.CfgCdrField{Tag: "ReqType", Type: utils.CDRFIELD, CdrFieldId: utils.REQTYPE, Value: utils.ParseRSRFieldsMustCompile("7", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Direction", Type: utils.CDRFIELD, CdrFieldId: utils.DIRECTION, Value: utils.ParseRSRFieldsMustCompile("^*out", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Direction", Type: utils.CDRFIELD, CdrFieldId: utils.DIRECTION, Value: utils.ParseRSRFieldsMustCompile("^*out", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Tenant", Type: utils.CDRFIELD, CdrFieldId: utils.TENANT, Value: utils.ParseRSRFieldsMustCompile("^cgrates.org", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Category", Type: utils.CDRFIELD, CdrFieldId: utils.CATEGORY, Value: utils.ParseRSRFieldsMustCompile("^call", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Account", Type: utils.CDRFIELD, CdrFieldId: utils.ACCOUNT, Value: utils.ParseRSRFieldsMustCompile("8", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Subject", Type: utils.CDRFIELD, CdrFieldId: utils.SUBJECT, Value: utils.ParseRSRFieldsMustCompile("8", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Destination", Type: utils.CDRFIELD, CdrFieldId: utils.DESTINATION, Value: utils.ParseRSRFieldsMustCompile("9", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "SetupTime", Type: utils.CDRFIELD, CdrFieldId: utils.SETUP_TIME, Value: utils.ParseRSRFieldsMustCompile("6", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "AnswerTime", Type: utils.CDRFIELD, CdrFieldId: utils.ANSWER_TIME, Value: utils.ParseRSRFieldsMustCompile("6", utils.INFIELD_SEP), Mandatory: true}, + &config.CfgCdrField{Tag: "Duration", Type: utils.CDRFIELD, CdrFieldId: utils.ANSWER_TIME, Mandatory: true}, + &config.CfgCdrField{Tag: "DialogId", Type: utils.CDRFIELD, CdrFieldId: "DialogIdentifier", Value: utils.ParseRSRFieldsMustCompile("11", utils.INFIELD_SEP)}, + }} + cdrc := &Cdrc{CdrFormat: utils.OSIPS_FLATSTORE, cdrSourceIds: []string{"TEST_CDRC"}, cdrFields: cdrFields, partialRecords: make(map[string]map[string]*PartialFlatstoreRecord)} + cdrsContent := bytes.NewReader([]byte(osipsCdrs)) + csvReader := csv.NewReader(cdrsContent) + csvReader.Comma = '|' + cdrs := make([]*engine.StoredCdr, 0) + recNrs := 0 + for { + recNrs++ + cdrCsv, err := csvReader.Read() + if err != nil && err == io.EOF { + break // End of file + } else if err != nil { + t.Error("Unexpected error:", err) + } + record, err := cdrc.processPartialRecord(cdrCsv, "dummyfilename") + if err != nil { + t.Error(err) + } + if record == nil { + continue // Partial record + } + if storedCdr, err := cdrc.recordToStoredCdr(record, 0); err != nil { + t.Error(err) + } else if storedCdr != nil { + cdrs = append(cdrs, storedCdr) + } + } + /* + if !reflect.DeepEqual(eCdrs, cdrs) { + t.Errorf("Expecting: %+v, received: %+v", eCdrs, cdrs) + } + */ + +} + +/* +func TestOsipsFlatstoreMissedCdrs(t *testing.T) { + osipsCdrs := ` +INVITE|ef6c6256|da501581|0bfdd176d1b93e7df3de5c6f4873ee04@0:0:0:0:0:0:0:0|487|Request Terminated|1436454643|*prepaid|1001|1002||1224:339382783 +INVITE|7905e511||81880da80a94bda81b425b09009e055c@0:0:0:0:0:0:0:0|404|Not Found|1436454668|*prepaid|1001|1002||1980:1216490844 +INVITE|25f7def5||9984b9ec535a7d317b542744d48d0ed6@0:0:0:0:0:0:0:0|404|Not Found|1436454669|*prepaid|1001|1002||14:1595110662 +INVITE|ae0a7f6c||02f7fa9334db7aa4130bbf7627370621@0:0:0:0:0:0:0:0|404|Not Found|1436454670|*prepaid|1001|1002||176:1975670970 +INVITE|32b97104||3154c2a80294f538991a88d86f4e1085@0:0:0:0:0:0:0:0|404|Not Found|1436454678|*prepaid|1001|1002||2607:1024337552 +INVITE|324cb497|d4af7023|8deaadf2ae9a17809a391f05af31afb0@0:0:0:0:0:0:0:0|486|Busy here|1436454687|*postpaid|1002|1002||474:130115066 +INVITE|167ac4db|c53c85e5|4b3885cb78dde44dc7936abd2fa281e1@0:0:0:0:0:0:0:0|487|Request Terminated|1436454695|*postpaid|1002|1002||1922:549002535 +` +} +*/ diff --git a/config/cdrcconfig.go b/config/cdrcconfig.go index 4ef7a6d07..513a9c805 100644 --- a/config/cdrcconfig.go +++ b/config/cdrcconfig.go @@ -27,15 +27,17 @@ import ( type CdrcConfig struct { Enabled bool // Enable/Disable the profile Cdrs string // The address where CDRs can be reached - CdrFormat string // The type of CDR file to process + CdrFormat string // The type of CDR file to process FieldSeparator rune // The separator to use when reading csvs DataUsageMultiplyFactor float64 // Conversion factor for data usage RunDelay time.Duration // Delay between runs, 0 for inotify driven requests MaxOpenFiles int // Maximum number of files opened simultaneously CdrInDir string // Folder to process CDRs from CdrOutDir string // Folder to move processed CDRs to + FailedCallsPrefix string // Used in case of flatstore CDRs to avoid searching for BYE records CdrSourceId string // Source identifier for the processed CDRs CdrFilter utils.RSRFields // Filter CDR records to import + PartialRecordCache time.Duration // Duration to cache partial records when not pairing CdrFields []*CfgCdrField // List of fields to be processed } @@ -72,6 +74,9 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error { if jsnCfg.Cdr_out_dir != nil { self.CdrOutDir = *jsnCfg.Cdr_out_dir } + if jsnCfg.Failed_calls_prefix != nil { + self.FailedCallsPrefix = *jsnCfg.Failed_calls_prefix + } if jsnCfg.Cdr_source_id != nil { self.CdrSourceId = *jsnCfg.Cdr_source_id } diff --git a/config/config_defaults.go b/config/config_defaults.go index 99dc4ed3f..590bcb51a 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -152,15 +152,17 @@ const CGRATES_CFG_JSON = ` "*default": { "enabled": false, // enable CDR client functionality "cdrs": "internal", // address where to reach CDR server. - "cdr_format": "csv", // CDR file format + "cdr_format": "csv", // CDR file format "field_separator": ",", // separator used in case of csv files "run_delay": 0, // sleep interval in seconds between consecutive runs, 0 to use automation via inotify "max_open_files": 1024, // maximum simultaneous files to process "data_usage_multiply_factor": 1024, // conversion factor for data usage "cdr_in_dir": "/var/log/cgrates/cdrc/in", // absolute path towards the directory where the CDRs are stored "cdr_out_dir": "/var/log/cgrates/cdrc/out", // absolute path towards the directory where processed CDRs will be moved + "failed_calls_prefix": "missed_calls", // used in case of flatstore CDRs to avoid searching for BYE records "cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database "cdr_filter": "", // filter CDR records to import + "partial_record_cache": "10s", // duration to cache partial records when not pairing "cdr_fields":[ // import template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "tor", "cdr_field_id": "tor", "type": "cdrfield", "value": "2", "mandatory": true}, {"tag": "accid", "cdr_field_id": "accid", "type": "cdrfield", "value": "3", "mandatory": true}, diff --git a/config/config_json_test.go b/config/config_json_test.go index 5e2e3c1ed..970a53a88 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -294,6 +294,7 @@ func TestDfCdrcJsonCfg(t *testing.T) { Data_usage_multiply_factor: utils.Float64Pointer(1024.0), Cdr_in_dir: utils.StringPointer("/var/log/cgrates/cdrc/in"), Cdr_out_dir: utils.StringPointer("/var/log/cgrates/cdrc/out"), + Failed_calls_prefix: utils.StringPointer("missed_calls"), Cdr_source_id: utils.StringPointer("freeswitch_csv"), Cdr_filter: utils.StringPointer(""), Cdr_fields: &cdrFields, diff --git a/config/configcdrc_test.go b/config/configcdrc_test.go index 1dda59c00..de2bc106a 100644 --- a/config/configcdrc_test.go +++ b/config/configcdrc_test.go @@ -44,6 +44,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) { MaxOpenFiles: 1024, CdrInDir: "/var/log/cgrates/cdrc/in", CdrOutDir: "/var/log/cgrates/cdrc/out", + FailedCallsPrefix: "missed_calls", CdrSourceId: "freeswitch_csv", CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP), CdrFields: []*CfgCdrField{ diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 8dfe5b257..6309f76a9 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -132,9 +132,11 @@ type CdrcJsonCfg struct { Data_usage_multiply_factor *float64 Cdr_in_dir *string Cdr_out_dir *string + Failed_calls_prefix *string Cdr_source_id *string Cdr_filter *string Max_open_files *int + PartialRecordCache *string Cdr_fields *[]*CdrFieldJsonCfg } diff --git a/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg b/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg index 2a52b73dc..bb8b57d66 100644 --- a/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg +++ b/data/tutorials/osips_async/opensips/etc/opensips/opensips.cfg @@ -37,8 +37,7 @@ modparam("tm", "onreply_avp_mode", 1) #### Record Route Module loadmodule "rr.so" -/* do not append from tag to the RR (no need for this script) */ -modparam("rr", "append_fromtag", 0) + #### MAX ForWarD module loadmodule "maxfwd.so" @@ -83,17 +82,31 @@ modparam("dialog", "dlg_match_mode", 1) modparam("dialog", "default_timeout", 21600) # 6 hours timeout modparam("dialog", "db_mode", 0) +#### DbFlatstore module +loadmodule "db_flatstore.so" +modparam("db_flatstore", "single_file", 1) + #### ACCounting module loadmodule "acc.so" +modparam("acc", "detect_direction", 1) #modparam("acc", "cdr_flag", "CDR") -modparam("acc", "evi_flag", "CDR") -modparam("acc", "evi_missed_flag", "CDR") +#modparam("acc", "evi_flag", "CDR") +#modparam("acc", "evi_missed_flag", "CDR") modparam("acc", "evi_extra", "cgr_reqtype=$avp(cgr_reqtype); cgr_account=$avp(cgr_account); cgr_destination=$avp(cgr_destination); cgr_supplier=$avp(cgr_supplier); dialog_id=$DLG_did") +modparam("acc", "db_url", "flatstore:/tmp") +modparam("acc", "db_flag", "CDR") +modparam("acc", "db_missed_flag", "CDR") +modparam("acc", "db_table_missed_calls", "cgr_missed") +modparam("acc", "db_extra", "cgr_reqtype=$avp(cgr_reqtype); + cgr_account=$avp(cgr_account); + cgr_destination=$avp(cgr_destination); + cgr_supplier=$avp(cgr_supplier); + dialog_id=$DLG_did") #### CfgUtils module loadmodule "cfgutils.so" @@ -205,9 +218,6 @@ route{ # during the dialog. record_route(); } - - - # route it out to whatever destination was set by loose_route() # in $du (destination URI). route(relay); @@ -300,7 +310,6 @@ route{ } if (!uri==myself) { - append_hf("P-hint: outbound\r\n"); route(relay); } @@ -349,6 +358,9 @@ route[location] { t_reply("404", "Not Found"); exit; } + append_branch(); + append_branch(); + setflag(CDR); } failure_route[missed_call] { diff --git a/general_tests/tutorial_osips_calls_test.go b/general_tests/tutorial_osips_calls_test.go index d48b74265..d2104e386 100644 --- a/general_tests/tutorial_osips_calls_test.go +++ b/general_tests/tutorial_osips_calls_test.go @@ -43,7 +43,7 @@ func TestTutOsipsCallsInitCfg(t *testing.T) { } // Init config first var err error - tutOsipsCallsCfg, err = config.NewCGRConfigFromFolder(path.Join(*dataDir, "tutorials", "kamevapi", "cgrates", "etc", "cgrates")) + tutOsipsCallsCfg, err = config.NewCGRConfigFromFolder(path.Join(*dataDir, "tutorials", "osips_async", "cgrates", "etc", "cgrates")) if err != nil { t.Error(err) } @@ -71,6 +71,7 @@ func TestTutOsipsCallsResetStorDb(t *testing.T) { } } +/* // start Kam server func TestTutOsipsCallsStartOsips(t *testing.T) { if !*testCalls { @@ -81,6 +82,7 @@ func TestTutOsipsCallsStartOsips(t *testing.T) { t.Fatal(err) } } +*/ // Start CGR Engine func TestTutOsipsCallsStartEngine(t *testing.T) { @@ -446,9 +448,11 @@ func TestTutOsipsCallsStopCgrEngine(t *testing.T) { } } +/* func TestTutOsipsCallsStopOpensips(t *testing.T) { if !*testCalls { return } engine.KillProcName("opensips", 100) } +*/ diff --git a/utils/consts.go b/utils/consts.go index 4e432df9d..1f0a58551 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -208,6 +208,8 @@ const ( CGR_DISCONNECT_CAUSE = "cgr_disconnectcause" CGR_COMPUTELCR = "cgr_computelcr" CGR_SUPPLIERS = "cgr_suppliers" + KAM_FLATSTORE = "kamailio_flatstore" + OSIPS_FLATSTORE = "opensips_flatstore" ) var (