Mirror of https://github.com/cgrates/cgrates.git
Commit 2f52829b7e: "Add for cdrc csv new filters"
Committed by: Dan Christian Bogos
Parent commit: acef7743c6
cdrc/cdrc.go  16
@@ -55,7 +55,8 @@ Common parameters within configs processed:
 Parameters specific per config instance:
 * duMultiplyFactor, cdrSourceId, cdrFilter, cdrFields
 */
-func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection, closeChan chan struct{}, dfltTimezone string, roundDecimals int) (*Cdrc, error) {
+func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclient.RpcClientConnection,
+    closeChan chan struct{}, dfltTimezone string, roundDecimals int, filterS *engine.FilterS) (*Cdrc, error) {
     var cdrcCfg *config.CdrcConfig
     for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
         break
@@ -80,6 +81,7 @@ func NewCdrc(cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool, cdrs rpcclien
             return nil, fmt.Errorf("Nonexistent folder: %s", dir)
         }
     }
+    cdrc.filterS = filterS
     cdrc.httpClient = new(http.Client)
     return cdrc, nil
 }
@@ -95,6 +97,7 @@ type Cdrc struct {
     maxOpenFiles chan struct{} // Maximum number of simultaneous files processed
     unpairedRecordsCache *UnpairedRecordsCache // Shared between all files in the folder we process
     partialRecordsCache *PartialRecordsCache
+    filterS *engine.FilterS
 }

 // When called fires up folder monitoring, either automated via inotify or manual by sleeping between processing
@@ -181,12 +184,15 @@ func (self *Cdrc) processFile(filePath string) error {
     case CSV, FS_CSV, utils.KAM_FLATSTORE, utils.OSIPS_FLATSTORE, utils.PartialCSV:
         csvReader := csv.NewReader(bufio.NewReader(file))
         csvReader.Comma = self.dfltCdrcCfg.FieldSeparator
-        recordsProcessor = NewCsvRecordsProcessor(csvReader, self.timezone, fn, self.dfltCdrcCfg, self.cdrcCfgs,
-            self.httpSkipTlsCheck, self.unpairedRecordsCache, self.partialRecordsCache, self.dfltCdrcCfg.CacheDumpFields)
+        recordsProcessor = NewCsvRecordsProcessor(csvReader, self.timezone, fn, self.dfltCdrcCfg,
+            self.cdrcCfgs, self.httpSkipTlsCheck, self.unpairedRecordsCache, self.partialRecordsCache,
+            self.dfltCdrcCfg.CacheDumpFields, self.filterS)
     case utils.FWV:
-        recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs, self.httpClient, self.httpSkipTlsCheck, self.timezone)
+        recordsProcessor = NewFwvRecordsProcessor(file, self.dfltCdrcCfg, self.cdrcCfgs,
+            self.httpClient, self.httpSkipTlsCheck, self.timezone, self.filterS)
     case utils.XML:
-        if recordsProcessor, err = NewXMLRecordsProcessor(file, self.dfltCdrcCfg.CDRPath, self.timezone, self.httpSkipTlsCheck, self.cdrcCfgs); err != nil {
+        if recordsProcessor, err = NewXMLRecordsProcessor(file, self.dfltCdrcCfg.CDRPath,
+            self.timezone, self.httpSkipTlsCheck, self.cdrcCfgs, self.filterS); err != nil {
             return err
         }
     default:
cdrc/csv.go  89
@@ -33,10 +33,11 @@ import (

 func NewCsvRecordsProcessor(csvReader *csv.Reader, timezone, fileName string,
     dfltCdrcCfg *config.CdrcConfig, cdrcCfgs []*config.CdrcConfig,
-    httpSkipTlsCheck bool, unpairedRecordsCache *UnpairedRecordsCache, partialRecordsCache *PartialRecordsCache, cacheDumpFields []*config.CfgCdrField) *CsvRecordsProcessor {
+    httpSkipTlsCheck bool, unpairedRecordsCache *UnpairedRecordsCache, partialRecordsCache *PartialRecordsCache,
+    cacheDumpFields []*config.CfgCdrField, filterS *engine.FilterS) *CsvRecordsProcessor {
     return &CsvRecordsProcessor{csvReader: csvReader, timezone: timezone, fileName: fileName,
         dfltCdrcCfg: dfltCdrcCfg, cdrcCfgs: cdrcCfgs, httpSkipTlsCheck: httpSkipTlsCheck, unpairedRecordsCache: unpairedRecordsCache,
-        partialRecordsCache: partialRecordsCache, partialCacheDumpFields: cacheDumpFields}
+        partialRecordsCache: partialRecordsCache, partialCacheDumpFields: cacheDumpFields, filterS: filterS}
 }

@@ -51,6 +52,7 @@ type CsvRecordsProcessor struct {
     unpairedRecordsCache *UnpairedRecordsCache // Shared by cdrc so we can cache for all files in a folder
     partialRecordsCache *PartialRecordsCache // Cache records which are of type "Partial"
     partialCacheDumpFields []*config.CfgCdrField
+    filterS *engine.FilterS
 }

 func (self *CsvRecordsProcessor) ProcessedRecordsNr() int64 {
@@ -100,24 +102,33 @@ func (self *CsvRecordsProcessor) processFlatstoreRecord(record []string) ([]stri

 // Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer
 func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR, error) {
+    utils.Logger.Debug(fmt.Sprintf("Record from CSV : %+v \n", record))
     recordCdrs := make([]*engine.CDR, 0) // More CDRs based on the number of filters and field templates
     for _, cdrcCfg := range self.cdrcCfgs { // cdrFields coming from more templates will produce individual storCdr records
         // Make sure filters are matching
         passes := true
-        for _, rsrFilter := range cdrcCfg.CdrFilter {
-            if rsrFilter == nil { // Nil filter does not need to match anything
-                continue
-            }
-            if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx {
-                return nil, fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter)
-            } else if _, err := rsrFilter.Parse(record[cfgFieldIdx]); err != nil {
-                passes = false
-                break
-            }
-        }
-        if !passes { // Stop importing cdrc fields profile due to non matching filter
-            continue
-        }
+        if len(cdrcCfg.Filters) == 0 {
+            for _, rsrFilter := range cdrcCfg.CdrFilter {
+                if rsrFilter == nil { // Nil filter does not need to match anything
+                    continue
+                }
+                if cfgFieldIdx, _ := strconv.Atoi(rsrFilter.Id); len(record) <= cfgFieldIdx {
+                    return nil, fmt.Errorf("Ignoring record: %v - cannot compile filter %+v", record, rsrFilter)
+                } else if _, err := rsrFilter.Parse(record[cfgFieldIdx]); err != nil {
+                    passes = false
+                    break
+                }
+            }
+            if !passes { // Stop importing cdrc fields profile due to non matching filter
+                continue
+            }
+        } else {
+            csvprovider, _ := newCsvProvider(record)
+            if pass, err := self.filterS.Pass("cgrates.org",
+                cdrcCfg.Filters, csvprovider); err != nil || !pass {
+                continue // Not passes filters, ignore this CDR
+            }
+        }
         storedCdr, err := self.recordToStoredCdr(record, cdrcCfg)
         if err != nil {
             return nil, fmt.Errorf("Failed converting to StoredCdr, error: %s", err.Error())
@@ -237,3 +248,57 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con
     }
     return storedCdr, nil
 }
+
+// newRADataProvider constructs a DataProvider
+func newCsvProvider(record []string) (dP engine.DataProvider, err error) {
+    dP = &csvProvider{req: record, cache: engine.NewNavigableMap(nil)}
+    return
+}
+
+// csvProvider implements engine.DataProvider so we can pass it to filters
+type csvProvider struct {
+    req []string
+    cache *engine.NavigableMap
+}
+
+// String is part of engine.DataProvider interface
+// when called, it will display the already parsed values out of cache
+func (cP *csvProvider) String() string {
+    return utils.ToJSON(cP)
+}
+
+// FieldAsInterface is part of engine.DataProvider interface
+func (cP *csvProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) {
+    if len(fldPath) != 1 {
+        return nil, utils.ErrNotFound
+    }
+    if data, err = cP.cache.FieldAsInterface(fldPath); err == nil ||
+        err != utils.ErrNotFound { // item found in cache
+        return
+    }
+    err = nil // cancel previous err
+    if cfgFieldIdx, _ := strconv.Atoi(fldPath[0]); len(cP.req) <= cfgFieldIdx {
+        return nil, fmt.Errorf("Ignoring record: %v", cP.req)
+    } else {
+        data = cP.req[cfgFieldIdx]
+    }
+    cP.cache.Set(fldPath, data, false)
+    return
+}
+
+// FieldAsString is part of engine.DataProvider interface
+func (cP *csvProvider) FieldAsString(fldPath []string) (data string, err error) {
+    var valIface interface{}
+    valIface, err = cP.FieldAsInterface(fldPath)
+    if err != nil {
+        return
+    }
+    data, _ = utils.CastFieldIfToString(valIface)
+    return
+}
+
+// AsNavigableMap is part of engine.DataProvider interface
+func (cP *csvProvider) AsNavigableMap([]*config.CfgCdrField) (
+    nm *engine.NavigableMap, err error) {
+    return nil, utils.ErrNotImplemented
+}
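As an illustration (not part of the commit): the csvProvider above resolves a one-element field path as a CSV column index, so a FilterS rule that names field "3" ends up reading record[3] and caching it in the NavigableMap. The standalone sketch below mimics only that indexing convention; it deliberately avoids the real engine.DataProvider and engine.FilterS types, whose construction is outside this diff.

    package main

    import (
        "fmt"
        "strconv"
        "strings"
    )

    // toy stand-in for cdrc's csvProvider: fields are addressed by zero-based column index
    type indexProvider struct{ req []string }

    func (p indexProvider) FieldAsString(path string) (string, error) {
        idx, err := strconv.Atoi(path)
        if err != nil || idx < 0 || idx >= len(p.req) {
            return "", fmt.Errorf("invalid field reference %q", path)
        }
        return p.req[idx], nil
    }

    func main() {
        record := strings.Split("accid21;*prepaid;itsyscom.com;1002;086517174963;2013-02-03 19:54:00;62", ";")
        p := indexProvider{req: record}
        acnt, _ := p.FieldAsString("3")
        // roughly the comparison a "*string:3:1002" rule asks FilterS to make
        fmt.Println("field 3 =", acnt, "-> matches 1002:", acnt == "1002")
    }

In the committed code the same lookup happens through csvProvider.FieldAsInterface, and the decision itself is delegated to self.filterS.Pass("cgrates.org", cdrcCfg.Filters, csvprovider) inside processRecord.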
@@ -165,3 +165,180 @@ func TestCsvITKillEngine(t *testing.T) {
         t.Error(err)
     }
 }
+
+// Begin tests for cdrc csv with new filters
+var fileContent1_2 = `accid21;*prepaid;itsyscom.com;1002;086517174963;2013-02-03 19:54:00;62;val_extra3;"";val_extra1
+accid22;*postpaid;itsyscom.com;1002;+4986517174963;2013-02-03 19:54:00;123;val_extra3;"";val_extra1
+accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26;val_extra3;"";val_extra1`
+
+func TestCsvIT2InitConfig(t *testing.T) {
+    var err error
+    csvCfgPath = path.Join(*dataDir, "conf", "samples", "cdrccsvwithfilter")
+    if csvCfg, err = config.NewCGRConfigFromFolder(csvCfgPath); err != nil {
+        t.Fatal("Got config error: ", err.Error())
+    }
+}
+
+// InitDb so we can rely on count
+func TestCsvIT2InitCdrDb(t *testing.T) {
+    if err := engine.InitStorDb(csvCfg); err != nil {
+        t.Fatal(err)
+    }
+}
+
+func TestCsvIT2CreateCdrDirs(t *testing.T) {
+    for _, cdrcProfiles := range csvCfg.CdrcProfiles {
+        for _, cdrcInst := range cdrcProfiles {
+            for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} {
+                if err := os.RemoveAll(dir); err != nil {
+                    t.Fatal("Error removing folder: ", dir, err)
+                }
+                if err := os.MkdirAll(dir, 0755); err != nil {
+                    t.Fatal("Error creating folder: ", dir, err)
+                }
+            }
+        }
+    }
+}
+
+func TestCsvIT2StartEngine(t *testing.T) {
+    if _, err := engine.StopStartEngine(csvCfgPath, *waitRater); err != nil {
+        t.Fatal(err)
+    }
+}
+
+// Connect rpc client to rater
+func TestCsvIT2RpcConn(t *testing.T) {
+    var err error
+    cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
+    if err != nil {
+        t.Fatal("Could not connect to rater: ", err.Error())
+    }
+}
+
+// Scenario out of first .xml config
+func TestCsvIT2HandleCdr2File(t *testing.T) {
+    fileName := "file1.csv"
+    tmpFilePath := path.Join("/tmp", fileName)
+    if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1_2), 0644); err != nil {
+        t.Fatal(err.Error())
+    }
+    if err := os.Rename(tmpFilePath, path.Join("/tmp/cdrctestswithfilters/csvit1/in", fileName)); err != nil {
+        t.Fatal("Error moving file to processing directory: ", err)
+    }
+}
+
+func TestCsvIT2ProcessedFiles(t *testing.T) {
+    time.Sleep(time.Duration(2**waitRater) * time.Millisecond)
+    if outContent2, err := ioutil.ReadFile("/tmp/cdrctestswithfilters/csvit1/out/file1.csv"); err != nil {
+        t.Error(err)
+    } else if fileContent1_2 != string(outContent2) {
+        t.Errorf("Expecting: %q, received: %q", fileContent1_2, string(outContent2))
+    }
+}
+
+func TestCsvIT2AnalyseCDRs(t *testing.T) {
+    var reply []*engine.ExternalCDR
+    if err := cdrcRpc.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{}, &reply); err != nil {
+        t.Error("Unexpected error: ", err.Error())
+    } else if len(reply) != 2 {
+        t.Error("Unexpected number of CDRs returned: ", len(reply))
+    }
+    if err := cdrcRpc.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{RequestTypes: []string{utils.META_PREPAID}}, &reply); err != nil {
+        t.Error("Unexpected error: ", err) // Original 08651 was converted
+    } else if len(reply) != 1 {
+        t.Error("Unexpected number of CDRs returned: ", len(reply))
+    }
+}
+
+func TestCsvIT2KillEngine(t *testing.T) {
+    if err := engine.KillEngine(*waitRater); err != nil {
+        t.Error(err)
+    }
+}
+
+// Begin tests for cdrc csv with new filters
+var fileContent1_3 = `accid21;*prepaid;itsyscom.com;1002;086517174963;2013-02-03 19:54:00;62;val_extra3;"";val_extra1
+accid22;*prepaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;123;val_extra3;"";val_extra1
+accid23;*prepaid;cgrates.org;1002;086517174963;2013-02-03 19:54:00;76;val_extra3;"";val_extra1`
+
+func TestCsvIT3InitConfig(t *testing.T) {
+    var err error
+    csvCfgPath = path.Join(*dataDir, "conf", "samples", "cdrccsvwithfilter")
+    if csvCfg, err = config.NewCGRConfigFromFolder(csvCfgPath); err != nil {
+        t.Fatal("Got config error: ", err.Error())
+    }
+}
+
+// InitDb so we can rely on count
+func TestCsvIT3InitCdrDb(t *testing.T) {
+    if err := engine.InitStorDb(csvCfg); err != nil {
+        t.Fatal(err)
+    }
+}
+
+func TestCsvIT3CreateCdrDirs(t *testing.T) {
+    for _, cdrcProfiles := range csvCfg.CdrcProfiles {
+        for _, cdrcInst := range cdrcProfiles {
+            for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} {
+                if err := os.RemoveAll(dir); err != nil {
+                    t.Fatal("Error removing folder: ", dir, err)
+                }
+                if err := os.MkdirAll(dir, 0755); err != nil {
+                    t.Fatal("Error creating folder: ", dir, err)
+                }
+            }
+        }
+    }
+}
+
+func TestCsvIT3StartEngine(t *testing.T) {
+    if _, err := engine.StopStartEngine(csvCfgPath, *waitRater); err != nil {
+        t.Fatal(err)
+    }
+}
+
+// Connect rpc client to rater
+func TestCsvIT3RpcConn(t *testing.T) {
+    var err error
+    cdrcRpc, err = jsonrpc.Dial("tcp", csvCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
+    if err != nil {
+        t.Fatal("Could not connect to rater: ", err.Error())
+    }
+}
+
+// Scenario out of first .xml config
+func TestCsvIT3HandleCdr2File(t *testing.T) {
+    fileName := "file1.csv"
+    tmpFilePath := path.Join("/tmp", fileName)
+    if err := ioutil.WriteFile(tmpFilePath, []byte(fileContent1_3), 0644); err != nil {
+        t.Fatal(err.Error())
+    }
+    if err := os.Rename(tmpFilePath, path.Join("/tmp/cdrctestswithfilters/csvit2/in", fileName)); err != nil {
+        t.Fatal("Error moving file to processing directory: ", err)
+    }
+}
+
+func TestCsvIT3ProcessedFiles(t *testing.T) {
+    time.Sleep(time.Duration(2**waitRater) * time.Millisecond)
+    if outContent2, err := ioutil.ReadFile("/tmp/cdrctestswithfilters/csvit2/out/file1.csv"); err != nil {
+        t.Error(err)
+    } else if fileContent1_3 != string(outContent2) {
+        t.Errorf("Expecting: %q, received: %q", fileContent1_3, string(outContent2))
+    }
+}
+
+func TestCsvIT3AnalyseCDRs(t *testing.T) {
+    var reply []*engine.ExternalCDR
+    if err := cdrcRpc.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{}, &reply); err != nil {
+        t.Error("Unexpected error: ", err.Error())
+    } else if len(reply) != 1 {
+        t.Error("Unexpected number of CDRs returned: ", len(reply))
+    }
+}
+
+func TestCsvIT3KillEngine(t *testing.T) {
+    if err := engine.KillEngine(*waitRater); err != nil {
+        t.Error(err)
+    }
+}
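To see why the two integration scenarios above expect 2 and then 1 CDR, it helps to apply the sample filters by hand (a standalone illustration, not part of the commit). The csvit1 profile uses only *string:3:1002, which fileContent1_2's accid21 and accid22 satisfy (column 3 is 1002) while accid23 carries 1001; the csvit2 profile additionally requires *string:1:*prepaid and *gte:6:70, and in fileContent1_3 only accid23 meets all three (account 1002, *prepaid, usage 76 >= 70).

    package main

    import (
        "fmt"
        "strconv"
        "strings"
    )

    func main() {
        // records trimmed to the columns the filters look at (index 1, 3 and 6)
        fileContent1_2 := []string{
            "accid21;*prepaid;itsyscom.com;1002;086517174963;2013-02-03 19:54:00;62",
            "accid22;*postpaid;itsyscom.com;1002;+4986517174963;2013-02-03 19:54:00;123",
            "accid23;*rated;cgrates.org;1001;086517174963;2013-02-03 19:54:00;26",
        }
        fileContent1_3 := []string{
            "accid21;*prepaid;itsyscom.com;1002;086517174963;2013-02-03 19:54:00;62",
            "accid22;*prepaid;itsyscom.com;1001;+4986517174963;2013-02-03 19:54:00;123",
            "accid23;*prepaid;cgrates.org;1002;086517174963;2013-02-03 19:54:00;76",
        }

        csvit1 := 0 // "*string:3:1002"
        for _, ln := range fileContent1_2 {
            rec := strings.Split(ln, ";")
            if rec[3] == "1002" {
                csvit1++
            }
        }

        csvit2 := 0 // "*string:3:1002" AND "*string:1:*prepaid" AND "*gte:6:70"
        for _, ln := range fileContent1_3 {
            rec := strings.Split(ln, ";")
            usage, _ := strconv.Atoi(rec[6])
            if rec[3] == "1002" && rec[1] == "*prepaid" && usage >= 70 {
                csvit2++
            }
        }

        fmt.Println("scenario 2 expects", csvit1, "CDRs") // 2
        fmt.Println("scenario 3 expects", csvit2, "CDR")  // 1
    }

The all-rules-must-pass reading of the filters list is an inference from these expected counts, not something spelled out in this hunk.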
@@ -49,8 +49,9 @@ func fwvValue(cdrLine string, indexStart, width int, padding string) string {
     return rawVal
 }

-func NewFwvRecordsProcessor(file *os.File, dfltCfg *config.CdrcConfig, cdrcCfgs []*config.CdrcConfig, httpClient *http.Client, httpSkipTlsCheck bool, timezone string) *FwvRecordsProcessor {
-    return &FwvRecordsProcessor{file: file, cdrcCfgs: cdrcCfgs, dfltCfg: dfltCfg, httpSkipTlsCheck: httpSkipTlsCheck, timezone: timezone}
+func NewFwvRecordsProcessor(file *os.File, dfltCfg *config.CdrcConfig, cdrcCfgs []*config.CdrcConfig, httpClient *http.Client,
+    httpSkipTlsCheck bool, timezone string, filterS *engine.FilterS) *FwvRecordsProcessor {
+    return &FwvRecordsProcessor{file: file, cdrcCfgs: cdrcCfgs, dfltCfg: dfltCfg, httpSkipTlsCheck: httpSkipTlsCheck, timezone: timezone, filterS: filterS}
 }

 type FwvRecordsProcessor struct {
@@ -65,6 +66,7 @@ type FwvRecordsProcessor struct {
     processedRecordsNr int64 // Number of content records in file
     trailerOffset int64 // Index where trailer starts, to be used as boundary when reading cdrs
     headerCdr *engine.CDR // Cache here the general purpose stored CDR
+    filterS *engine.FilterS
 }

 // Sets the line length based on first line, sets offset back to initial after reading
@@ -92,7 +92,7 @@ func handlerSubstractUsage(xmlElmnt tree.Res, argsTpl utils.RSRFields, cdrPath u
 }

 func NewXMLRecordsProcessor(recordsReader io.Reader, cdrPath utils.HierarchyPath, timezone string,
-    httpSkipTlsCheck bool, cdrcCfgs []*config.CdrcConfig) (*XMLRecordsProcessor, error) {
+    httpSkipTlsCheck bool, cdrcCfgs []*config.CdrcConfig, filterS *engine.FilterS) (*XMLRecordsProcessor, error) {
     xp, err := goxpath.Parse(cdrPath.AsString("/", true))
     if err != nil {
         return nil, err
@@ -105,7 +105,7 @@ func NewXMLRecordsProcessor(recordsReader io.Reader, cdrPath utils.HierarchyPath
         return nil, err
     }
     xmlProc := &XMLRecordsProcessor{cdrPath: cdrPath, timezone: timezone,
-        httpSkipTlsCheck: httpSkipTlsCheck, cdrcCfgs: cdrcCfgs}
+        httpSkipTlsCheck: httpSkipTlsCheck, cdrcCfgs: cdrcCfgs, filterS: filterS}
     xmlProc.cdrXmlElmts = goxpath.MustExec(xp, xmlNode, nil)
     return xmlProc, nil
 }
@@ -117,6 +117,7 @@ type XMLRecordsProcessor struct {
     timezone string
     httpSkipTlsCheck bool
     cdrcCfgs []*config.CdrcConfig // individual configs for the folder CDRC is monitoring
+    filterS *engine.FilterS
 }

 func (xmlProc *XMLRecordsProcessor) ProcessedRecordsNr() int64 {
@@ -242,7 +242,8 @@ func TestXMLRPProcess(t *testing.T) {
         },
     },
 }
-    xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft), utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true, cdrcCfgs)
+    xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft),
+        utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true, cdrcCfgs, nil)
     if err != nil {
         t.Error(err)
     }
@@ -72,7 +72,11 @@ var (
     cfg *config.CGRConfig
 )

-func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, exitChan chan bool) {
+func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection,
+    exitChan chan bool, filterSChan chan *engine.FilterS) {
+    // Not sure here if FilterS is passed correct at line 103 in fo start cdrc
+    filterS := <-filterSChan
+    filterSChan <- filterS
     cdrcInitialized := false // Control whether the cdrc was already initialized (so we don't reload in that case)
     var cdrcChildrenChan chan struct{} // Will use it to communicate with the children of one fork
     for {
@@ -94,9 +98,9 @@ func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConn
             enabledCfgs = append(enabledCfgs, cdrcCfg)
         }
     }
     if len(enabledCfgs) != 0 {
-        go startCdrc(internalCdrSChan, internalRaterChan, enabledCfgs, cfg.HttpSkipTlsVerify, cdrcChildrenChan, exitChan)
+        go startCdrc(internalCdrSChan, internalRaterChan, enabledCfgs, cfg.HttpSkipTlsVerify,
+            cdrcChildrenChan, exitChan, filterSChan)
     } else {
         utils.Logger.Info("<CDRC> No enabled CDRC clients")
     }
@@ -107,7 +111,9 @@ func startCdrcs(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConn

 // Fires up a cdrc instance
 func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConnection, cdrcCfgs []*config.CdrcConfig, httpSkipTlsCheck bool,
-    closeChan chan struct{}, exitChan chan bool) {
+    closeChan chan struct{}, exitChan chan bool, filterSChan chan *engine.FilterS) {
+    filterS := <-filterSChan
+    filterSChan <- filterS
     var cdrcCfg *config.CdrcConfig
     for _, cdrcCfg = range cdrcCfgs { // Take the first config out, does not matter which one
         break
@@ -120,7 +126,8 @@ func startCdrc(internalCdrSChan, internalRaterChan chan rpcclient.RpcClientConne
         exitChan <- true
         return
     }
-    cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn, closeChan, cfg.DefaultTimezone, cfg.RoundingDecimals)
+    cdrc, err := cdrc.NewCdrc(cdrcCfgs, httpSkipTlsCheck, cdrsConn,
+        closeChan, cfg.DefaultTimezone, cfg.RoundingDecimals, filterS)
     if err != nil {
         utils.Logger.Crit(fmt.Sprintf("Cdrc config parsing error: %s", err.Error()))
         exitChan <- true
@@ -1244,7 +1251,7 @@ func main() {
     }

     // Start CDRC components if necessary
-    go startCdrcs(internalCdrSChan, internalRaterChan, exitChan)
+    go startCdrcs(internalCdrSChan, internalRaterChan, exitChan, filterSChan)

     // Start SM-Generic
     if cfg.SessionSCfg().Enabled {
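A note on the FilterS hand-off used in startCdrcs and startCdrc above (explanatory sketch, not part of the commit): both functions read the shared *engine.FilterS from filterSChan and immediately write it back, so the single instance stays available to any other goroutine that performs the same receive-and-restore. The minimal standalone program below shows that channel pattern with a stand-in type; the buffered one-slot channel mirrors how such a singleton is typically shared, which is an assumption about the surrounding cgr-engine code rather than something visible in this hunk.

    package main

    import (
        "fmt"
        "sync"
    )

    type filterS struct{ name string } // stand-in for *engine.FilterS

    func worker(id int, ch chan *filterS, wg *sync.WaitGroup) {
        defer wg.Done()
        fs := <-ch // take the shared instance...
        ch <- fs   // ...and put it straight back so other goroutines can grab it too
        fmt.Printf("worker %d uses %s\n", id, fs.name)
    }

    func main() {
        ch := make(chan *filterS, 1) // one buffered slot holds the singleton
        ch <- &filterS{name: "shared FilterS"}
        var wg sync.WaitGroup
        for i := 1; i <= 3; i++ {
            wg.Add(1)
            go worker(i, ch, &wg)
        }
        wg.Wait()
    }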
@@ -41,8 +41,9 @@ type CdrcConfig struct {
     CDRPath utils.HierarchyPath // used for XML CDRs to specify the path towards CDR elements
     CdrSourceId string // Source identifier for the processed CDRs
     CdrFilter utils.RSRFields // Filter CDR records to import
-    ContinueOnSuccess bool // Continue after execution
-    PartialRecordCache time.Duration // Duration to cache partial records when not pairing
+    Filters []string
+    ContinueOnSuccess bool // Continue after execution
+    PartialRecordCache time.Duration // Duration to cache partial records when not pairing
     PartialCacheExpiryAction string
     HeaderFields []*CfgCdrField
     ContentFields []*CfgCdrField
@@ -110,6 +111,12 @@ func (self *CdrcConfig) loadFromJsonCfg(jsnCfg *CdrcJsonCfg) error {
             return err
         }
     }
+    if jsnCfg.Filters != nil {
+        self.Filters = make([]string, len(*jsnCfg.Filters))
+        for i, fltr := range *jsnCfg.Filters {
+            self.Filters[i] = fltr
+        }
+    }
     if jsnCfg.Continue_on_success != nil {
         self.ContinueOnSuccess = *jsnCfg.Continue_on_success
     }
@@ -166,6 +173,10 @@ func (self *CdrcConfig) Clone() *CdrcConfig {
     for i, path := range self.CDRPath {
         clnCdrc.CDRPath[i] = path
     }
+    clnCdrc.Filters = make([]string, len(self.Filters))
+    for i, fltr := range self.Filters {
+        clnCdrc.Filters[i] = fltr
+    }
     clnCdrc.CdrSourceId = self.CdrSourceId
     clnCdrc.PartialRecordCache = self.PartialRecordCache
     clnCdrc.PartialCacheExpiryAction = self.PartialCacheExpiryAction
@@ -256,6 +256,7 @@ const CGRATES_CFG_JSON = `
     "cdr_path": "", // path towards one CDR element in case of XML CDRs
     "cdr_source_id": "freeswitch_csv", // free form field, tag identifying the source of the CDRs within CDRS database
     "cdr_filter": "", // filter CDR records to import
+    "filters" :[], // new filters used in FilterS subsystem
     "continue_on_success": false, // continue to the next template if executed
     "partial_record_cache": "10s", // duration to cache partial records when not pairing
     "partial_cache_expiry_action": "*dump_to_file", // action taken when cache when records in cache are timed-out <*dump_to_file|*post_cdr>
@@ -476,6 +476,7 @@ func TestDfCdrcJsonCfg(t *testing.T) {
     Cdr_path: utils.StringPointer(""),
     Cdr_source_id: utils.StringPointer("freeswitch_csv"),
     Cdr_filter: utils.StringPointer(""),
+    Filters: &[]string{},
     Continue_on_success: utils.BoolPointer(false),
     Partial_record_cache: utils.StringPointer("10s"),
     Partial_cache_expiry_action: utils.StringPointer(utils.MetaDumpToFile),
@@ -187,6 +187,7 @@ func TestCgrCfgCDRC(t *testing.T) {
     FailedCallsPrefix: "missed_calls",
     CDRPath: utils.HierarchyPath([]string{""}),
     CdrSourceId: "freeswitch_csv",
+    Filters: []string{},
     ContinueOnSuccess: false,
     PartialRecordCache: time.Duration(10 * time.Second),
     PartialCacheExpiryAction: "*dump_to_file",
@@ -49,6 +49,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
     CDRPath: utils.HierarchyPath([]string{""}),
     CdrSourceId: "freeswitch_csv",
     CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
+    Filters: []string{},
     PartialRecordCache: time.Duration(10) * time.Second,
     PartialCacheExpiryAction: utils.MetaDumpToFile,
     HeaderFields: make([]*CfgCdrField, 0),
@@ -146,6 +147,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
     CDRPath: utils.HierarchyPath([]string{""}),
     CdrSourceId: "csv1",
     CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
+    Filters: []string{},
     PartialRecordCache: time.Duration(10) * time.Second,
     PartialCacheExpiryAction: utils.MetaDumpToFile,
     HeaderFields: make([]*CfgCdrField, 0),
@@ -243,6 +245,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
     CDRPath: utils.HierarchyPath([]string{""}),
     CdrSourceId: "csv2",
     CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
+    Filters: []string{},
     PartialRecordCache: time.Duration(10) * time.Second,
     PartialCacheExpiryAction: utils.MetaDumpToFile,
     HeaderFields: make([]*CfgCdrField, 0),
@@ -310,6 +313,7 @@ func TestLoadCdrcConfigMultipleFiles(t *testing.T) {
     CDRPath: utils.HierarchyPath([]string{""}),
     CdrSourceId: "csv3",
     CdrFilter: utils.ParseRSRFieldsMustCompile("", utils.INFIELD_SEP),
+    Filters: []string{},
     PartialRecordCache: time.Duration(10) * time.Second,
     PartialCacheExpiryAction: utils.MetaDumpToFile,
     HeaderFields: make([]*CfgCdrField, 0),
@@ -201,6 +201,7 @@ type CdrcJsonCfg struct {
     Cdr_path *string
     Cdr_source_id *string
     Cdr_filter *string
+    Filters *[]string
     Continue_on_success *bool
     Max_open_files *int
     Partial_record_cache *string
data/conf/samples/cdrccsvwithfilter/cgrates.json  81  (Executable file)
@@ -0,0 +1,81 @@
+{
+
+// Real-time Charging System for Telecom & ISP environments
+// Copyright (C) ITsysCOM GmbH
+//
+// This file contains the default configuration hardcoded into CGRateS.
+// This is what you get when you load CGRateS with an empty configuration file.
+
+
+"stor_db": { // database used to store offline tariff plans and CDRs
+    "db_password": "CGRateS.org", // password to use when connecting to stordb
+},
+
+"rals": {
+    "enabled": true // so we can query CDRs
+},
+
+"cdrs": {
+    "enabled": true,
+    "rals_conns": [], // no rating support, just *raw CDR testing
+},
+
+
+"cdrc": [
+    {
+        "id": "*CSVWithFilter1", // identifier of the CDRC runner
+        "enabled": true, // enable CDR client functionality
+        "field_separator": ";",
+        "cdr_in_dir": "/tmp/cdrctestswithfilters/csvit1/in", // absolute path towards the directory where the CDRs are stored
+        "cdr_out_dir": "/tmp/cdrctestswithfilters/csvit1/out", // absolute path towards the directory where processed CDRs will be moved
+        "cdr_source_id": "csvit1", // free form field, tag identifying the source of the CDRs within CDRS database
+        "filters":["*string:3:1002"], //filter Account to be 1002
+        "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
+            {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true},
+            {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "0", "mandatory": true},
+            {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "1", "mandatory": true},
+            {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true},
+            {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "2", "mandatory": true},
+            {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true},
+            {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "3", "mandatory": true},
+            {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "3", "mandatory": true},
+            {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "~4:s/0([1-9]\\d+)/+49${1}/", "mandatory": true},
+            {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "5", "mandatory": true},
+            {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "5", "mandatory": true},
+            {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "6", "mandatory": true},
+            {"tag": "HDRExtra3", "field_id": "HDRExtra3", "type": "*composed", "value": "6", "mandatory": true},
+            {"tag": "HDRExtra2", "field_id": "HDRExtra2", "type": "*composed", "value": "6", "mandatory": true},
+            {"tag": "HDRExtra1", "field_id": "HDRExtra1", "type": "*composed", "value": "6", "mandatory": true},
+        ],
+    },
+    {
+        "id": "*CSVWithFilter2", // identifier of the CDRC runner
+        "enabled": true, // enable CDR client functionality
+        "field_separator": ";",
+        "cdr_in_dir": "/tmp/cdrctestswithfilters/csvit2/in", // absolute path towards the directory where the CDRs are stored
+        "cdr_out_dir": "/tmp/cdrctestswithfilters/csvit2/out", // absolute path towards the directory where processed CDRs will be moved
+        "cdr_source_id": "csvit2", // free form field, tag identifying the source of the CDRs within CDRS database
+        "filters":["*string:3:1002","*string:1:*prepaid","*gte:6:70"], //filter Account to be 1002 and RequestType *prepaid
+        "content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
+            {"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true},
+            {"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "0", "mandatory": true},
+            {"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "1", "mandatory": true},
+            {"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true},
+            {"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "2", "mandatory": true},
+            {"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true},
+            {"tag": "Account", "field_id": "Account", "type": "*composed", "value": "3", "mandatory": true},
+            {"tag": "Subject", "field_id": "Subject", "type": "*composed", "value": "3", "mandatory": true},
+            {"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "~4:s/0([1-9]\\d+)/+49${1}/", "mandatory": true},
+            {"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "5", "mandatory": true},
+            {"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "5", "mandatory": true},
+            {"tag": "Usage", "field_id": "Usage", "type": "*composed", "value": "6", "mandatory": true},
+            {"tag": "HDRExtra3", "field_id": "HDRExtra3", "type": "*composed", "value": "6", "mandatory": true},
+            {"tag": "HDRExtra2", "field_id": "HDRExtra2", "type": "*composed", "value": "6", "mandatory": true},
+            {"tag": "HDRExtra1", "field_id": "HDRExtra1", "type": "*composed", "value": "6", "mandatory": true},
+        ],
+    },
+],
+
+
+}
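Reading the two profiles above together with the csvProvider added earlier in this commit, the inline rules appear to follow the shape *<type>:<csv column index>:<value>: *string:3:1002 asks for column 3 to equal "1002" and *gte:6:70 for column 6 to be at least 70, which is what the in-file comments also suggest. The real parsing and evaluation live in CGRateS's FilterS subsystem; the standalone snippet below only splits the sample strings to make that three-part shape visible and should not be read as the engine's parser.

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        rules := []string{"*string:3:1002", "*string:1:*prepaid", "*gte:6:70"}
        for _, r := range rules {
            parts := strings.SplitN(r, ":", 3) // type, CSV column index, expected value
            fmt.Printf("type=%s column=%s value=%s\n", parts[0], parts[1], parts[2])
        }
    }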
@@ -249,7 +249,7 @@ func testDspSupTestAuthKey2(t *testing.T) {
     Sorting: utils.MetaLeastCost,
     SortedSuppliers: []*engine.SortedSupplier{
         &engine.SortedSupplier{
-            SupplierID: "supplier2",
+            SupplierID: "supplier1",
             SupplierParameters: "",
             SortingData: map[string]interface{}{
                 utils.Cost: 0.1166,
@@ -258,7 +258,7 @@ func testDspSupTestAuthKey2(t *testing.T) {
             },
         },
         &engine.SortedSupplier{
-            SupplierID: "supplier1",
+            SupplierID: "supplier2",
            SupplierParameters: "",
             SortingData: map[string]interface{}{
                 utils.Cost: 0.2334,