mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-22 15:48:44 +05:00
Add for cdrc xml new filters
This commit is contained in:
committed by
Dan Christian Bogos
parent
2f52829b7e
commit
7f9b23d422
@@ -123,9 +123,9 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR,
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
csvprovider, _ := newCsvProvider(record)
|
||||
csvProvider, _ := newCsvProvider(record)
|
||||
if pass, err := self.filterS.Pass("cgrates.org",
|
||||
cdrcCfg.Filters, csvprovider); err != nil || !pass {
|
||||
cdrcCfg.Filters, csvProvider); err != nil || !pass {
|
||||
continue // Not passes filters, ignore this CDR
|
||||
}
|
||||
}
|
||||
@@ -249,7 +249,7 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con
|
||||
return storedCdr, nil
|
||||
}
|
||||
|
||||
// newRADataProvider constructs a DataProvider
|
||||
// newCsvProvider constructs a DataProvider
|
||||
func newCsvProvider(record []string) (dP engine.DataProvider, err error) {
|
||||
dP = &csvProvider{req: record, cache: engine.NewNavigableMap(nil)}
|
||||
return
|
||||
|
||||
@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package cdrc
|
||||
|
||||
//
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/rpc"
|
||||
|
||||
87
cdrc/xml.go
87
cdrc/xml.go
@@ -132,21 +132,29 @@ func (xmlProc *XMLRecordsProcessor) ProcessNextRecord() (cdrs []*engine.CDR, err
|
||||
cdrXML := xmlProc.cdrXmlElmts[xmlProc.procItems]
|
||||
xmlProc.procItems += 1
|
||||
for _, cdrcCfg := range xmlProc.cdrcCfgs {
|
||||
filtersPassing := true
|
||||
for _, rsrFltr := range cdrcCfg.CdrFilter {
|
||||
if rsrFltr == nil {
|
||||
continue // Pass
|
||||
if len(cdrcCfg.Filters) == 0 {
|
||||
filtersPassing := true
|
||||
for _, rsrFltr := range cdrcCfg.CdrFilter {
|
||||
if rsrFltr == nil {
|
||||
continue // Pass
|
||||
}
|
||||
absolutePath := utils.ParseHierarchyPath(rsrFltr.Id, "")
|
||||
relPath := utils.HierarchyPath(absolutePath[len(xmlProc.cdrPath)-1:]) // Need relative path to the xmlElmnt
|
||||
fieldVal, _ := elementText(cdrXML, relPath.AsString("/", true))
|
||||
if _, err := rsrFltr.Parse(fieldVal); err != nil {
|
||||
filtersPassing = false
|
||||
break
|
||||
}
|
||||
}
|
||||
absolutePath := utils.ParseHierarchyPath(rsrFltr.Id, "")
|
||||
relPath := utils.HierarchyPath(absolutePath[len(xmlProc.cdrPath)-1:]) // Need relative path to the xmlElmnt
|
||||
fieldVal, _ := elementText(cdrXML, relPath.AsString("/", true))
|
||||
if _, err := rsrFltr.Parse(fieldVal); err != nil {
|
||||
filtersPassing = false
|
||||
break
|
||||
if !filtersPassing {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
xmlProvider, _ := newXmlProvider(cdrXML, xmlProc.cdrPath)
|
||||
if pass, err := xmlProc.filterS.Pass("cgrates.org",
|
||||
cdrcCfg.Filters, xmlProvider); err != nil || !pass {
|
||||
continue // Not passes filters, ignore this CDR
|
||||
}
|
||||
}
|
||||
if !filtersPassing {
|
||||
continue
|
||||
}
|
||||
if cdr, err := xmlProc.recordToCDR(cdrXML, cdrcCfg); err != nil {
|
||||
return nil, fmt.Errorf("<CDRC> Failed converting to CDR, error: %s", err.Error())
|
||||
@@ -236,3 +244,56 @@ func (xmlProc *XMLRecordsProcessor) recordToCDR(xmlEntity tree.Res, cdrcCfg *con
|
||||
}
|
||||
return cdr, nil
|
||||
}
|
||||
|
||||
// newXmlProvider constructs a DataProvider
|
||||
func newXmlProvider(req tree.Res, cdrPath utils.HierarchyPath) (dP engine.DataProvider, err error) {
|
||||
dP = &xmlProvider{req: req, cdrPath: cdrPath, cache: engine.NewNavigableMap(nil)}
|
||||
return
|
||||
}
|
||||
|
||||
// xmlProvider implements engine.DataProvider so we can pass it to filters
|
||||
type xmlProvider struct {
|
||||
req tree.Res
|
||||
cdrPath utils.HierarchyPath //used to compute relative path
|
||||
cache *engine.NavigableMap
|
||||
}
|
||||
|
||||
// String is part of engine.DataProvider interface
|
||||
// when called, it will display the already parsed values out of cache
|
||||
func (xP *xmlProvider) String() string {
|
||||
return utils.ToJSON(xP)
|
||||
}
|
||||
|
||||
// FieldAsInterface is part of engine.DataProvider interface
|
||||
func (xP *xmlProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) {
|
||||
if len(fldPath) != 1 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
if data, err = xP.cache.FieldAsInterface(fldPath); err == nil ||
|
||||
err != utils.ErrNotFound { // item found in cache
|
||||
return
|
||||
}
|
||||
err = nil // cancel previous err
|
||||
absolutePath := utils.ParseHierarchyPath(fldPath[0], "")
|
||||
relPath := utils.HierarchyPath(absolutePath[len(xP.cdrPath)-1:]) // Need relative path to the xmlElmnt
|
||||
data, err = elementText(xP.req, relPath.AsString("/", true))
|
||||
xP.cache.Set(fldPath, data, false)
|
||||
return
|
||||
}
|
||||
|
||||
// FieldAsString is part of engine.DataProvider interface
|
||||
func (xP *xmlProvider) FieldAsString(fldPath []string) (data string, err error) {
|
||||
var valIface interface{}
|
||||
valIface, err = xP.FieldAsInterface(fldPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
data, _ = utils.CastFieldIfToString(valIface)
|
||||
return
|
||||
}
|
||||
|
||||
// AsNavigableMap is part of engine.DataProvider interface
|
||||
func (xP *xmlProvider) AsNavigableMap([]*config.CfgCdrField) (
|
||||
nm *engine.NavigableMap, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
@@ -130,3 +130,89 @@ func TestXmlITKillEngine(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Begin tests for cdrc xml with new filters
|
||||
func TestXmlIT2InitConfig(t *testing.T) {
|
||||
var err error
|
||||
xmlCfgPath = path.Join(*dataDir, "conf", "samples", "cdrcxmlwithfilter")
|
||||
if xmlCfg, err = config.NewCGRConfigFromFolder(xmlCfgPath); err != nil {
|
||||
t.Fatal("Got config error: ", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// InitDb so we can rely on count
|
||||
func TestXmlIT2InitCdrDb(t *testing.T) {
|
||||
if err := engine.InitStorDb(xmlCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestXmlIT2CreateCdrDirs(t *testing.T) {
|
||||
for _, cdrcProfiles := range xmlCfg.CdrcProfiles {
|
||||
for i, cdrcInst := range cdrcProfiles {
|
||||
for _, dir := range []string{cdrcInst.CdrInDir, cdrcInst.CdrOutDir} {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
t.Fatal("Error removing folder: ", dir, err)
|
||||
}
|
||||
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||
t.Fatal("Error creating folder: ", dir, err)
|
||||
}
|
||||
}
|
||||
if i == 0 { // Initialize the folders to check later
|
||||
xmlPathIn1 = cdrcInst.CdrInDir
|
||||
xmlPathOut1 = cdrcInst.CdrOutDir
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestXmlIT2StartEngine(t *testing.T) {
|
||||
if _, err := engine.StopStartEngine(xmlCfgPath, *waitRater); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Connect rpc client to rater
|
||||
func TestXmlIT2RpcConn(t *testing.T) {
|
||||
var err error
|
||||
cdrcXmlRPC, err = jsonrpc.Dial("tcp", xmlCfg.RPCJSONListen) // We connect over JSON so we can also troubleshoot if needed
|
||||
if err != nil {
|
||||
t.Fatal("Could not connect to rater: ", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// The default scenario, out of cdrc defined in .cfg file
|
||||
func TestXmlIT2HandleCdr1File(t *testing.T) {
|
||||
fileName := "file1.xml"
|
||||
tmpFilePath := path.Join("/tmp", fileName)
|
||||
if err := ioutil.WriteFile(tmpFilePath, []byte(cdrXmlBroadsoft), 0644); err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
if err := os.Rename(tmpFilePath, path.Join(xmlPathIn1, fileName)); err != nil {
|
||||
t.Fatal("Error moving file to processing directory: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestXmlIT2ProcessedFiles(t *testing.T) {
|
||||
time.Sleep(time.Duration(2**waitRater) * time.Millisecond)
|
||||
if outContent1, err := ioutil.ReadFile(path.Join(xmlPathOut1, "file1.xml")); err != nil {
|
||||
t.Error(err)
|
||||
} else if cdrXmlBroadsoft != string(outContent1) {
|
||||
t.Errorf("Expecting: %q, received: %q", cdrXmlBroadsoft, string(outContent1))
|
||||
}
|
||||
}
|
||||
|
||||
func TestXmlIT2AnalyseCDRs(t *testing.T) {
|
||||
var reply []*engine.ExternalCDR
|
||||
if err := cdrcXmlRPC.Call("ApierV2.GetCdrs", utils.RPCCDRsFilter{}, &reply); err != nil {
|
||||
t.Error("Unexpected error: ", err.Error())
|
||||
} else if len(reply) != 1 {
|
||||
t.Error("Unexpected number of CDRs returned: ", len(reply))
|
||||
}
|
||||
}
|
||||
|
||||
func TestXmlIT2KillEngine(t *testing.T) {
|
||||
if err := engine.KillEngine(*waitRater); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -271,3 +271,77 @@ func TestXMLRPProcess(t *testing.T) {
|
||||
t.Errorf("Expecting: %+v\n, received: %+v\n", expectedCDRs, cdrs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestXMLRPProcessWithNewFilters(t *testing.T) {
|
||||
cdrcCfgs := []*config.CdrcConfig{
|
||||
&config.CdrcConfig{
|
||||
ID: "XMLWithFilters",
|
||||
Enabled: true,
|
||||
CdrFormat: "xml",
|
||||
DataUsageMultiplyFactor: 1024,
|
||||
CDRPath: utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}),
|
||||
CdrSourceId: "XMLWithFilters",
|
||||
Filters: []string{"*string:broadWorksCDR>cdrData>headerModule>type:Normal"},
|
||||
ContentFields: []*config.CfgCdrField{
|
||||
&config.CfgCdrField{Tag: "TOR", Type: utils.META_COMPOSED, FieldId: utils.ToR,
|
||||
Value: utils.ParseRSRFieldsMustCompile("^*voice", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "OriginID", Type: utils.META_COMPOSED, FieldId: utils.OriginID,
|
||||
Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>localCallId", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "RequestType", Type: utils.META_COMPOSED, FieldId: utils.RequestType,
|
||||
Value: utils.ParseRSRFieldsMustCompile("^*rated", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Tenant", Type: utils.META_COMPOSED, FieldId: utils.Tenant,
|
||||
Value: utils.ParseRSRFieldsMustCompile("~broadWorksCDR>cdrData>basicModule>userId:s/.*@(.*)/${1}/", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Category", Type: utils.META_COMPOSED, FieldId: utils.Category,
|
||||
Value: utils.ParseRSRFieldsMustCompile("^call", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Account", Type: utils.META_COMPOSED, FieldId: utils.Account,
|
||||
Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>userNumber", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Destination", Type: utils.META_COMPOSED, FieldId: utils.Destination,
|
||||
Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>calledNumber", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "SetupTime", Type: utils.META_COMPOSED, FieldId: utils.SetupTime,
|
||||
Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>startTime", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "AnswerTime", Type: utils.META_COMPOSED, FieldId: utils.AnswerTime,
|
||||
Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>answerTime", utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "Usage", Type: utils.META_HANDLER,
|
||||
FieldId: utils.Usage, HandlerId: utils.HandlerSubstractUsage,
|
||||
Value: utils.ParseRSRFieldsMustCompile("broadWorksCDR>cdrData>basicModule>releaseTime;^|;broadWorksCDR>cdrData>basicModule>answerTime",
|
||||
utils.INFIELD_SEP), Mandatory: true},
|
||||
&config.CfgCdrField{Tag: "UsageSeconds", Type: utils.META_COMPOSED, FieldId: utils.Usage,
|
||||
Value: utils.ParseRSRFieldsMustCompile("^s", utils.INFIELD_SEP), Mandatory: true},
|
||||
},
|
||||
},
|
||||
}
|
||||
data, _ := engine.NewMapStorage()
|
||||
defaultCfg, err := config.NewDefaultCGRConfig()
|
||||
if err != nil {
|
||||
t.Errorf("Error: %+v", err)
|
||||
}
|
||||
xmlRP, err := NewXMLRecordsProcessor(bytes.NewBufferString(cdrXmlBroadsoft),
|
||||
utils.HierarchyPath([]string{"broadWorksCDR", "cdrData"}), "UTC", true,
|
||||
cdrcCfgs, engine.NewFilterS(defaultCfg, nil, engine.NewDataManager(data)))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
var cdrs []*engine.CDR
|
||||
for i := 0; i < 4; i++ {
|
||||
cdrs, err = xmlRP.ProcessNextRecord()
|
||||
if i == 1 { // Take second CDR since the first one cannot be processed
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
expectedCDRs := []*engine.CDR{
|
||||
&engine.CDR{CGRID: "1f045359a0784d15e051d7e41ae30132b139d714",
|
||||
OriginHost: "0.0.0.0", Source: "XMLWithFilters", OriginID: "25160047719:0",
|
||||
ToR: "*voice", RequestType: "*rated", Tenant: "cgrates.org",
|
||||
Category: "call", Account: "1001", Destination: "+4986517174963",
|
||||
SetupTime: time.Date(2016, 4, 19, 21, 0, 5, 247000000, time.UTC),
|
||||
AnswerTime: time.Date(2016, 4, 19, 21, 0, 6, 813000000, time.UTC),
|
||||
Usage: time.Duration(13483000000),
|
||||
ExtraFields: map[string]string{}, Cost: -1},
|
||||
}
|
||||
if !reflect.DeepEqual(expectedCDRs, cdrs) {
|
||||
t.Errorf("Expecting: %+v\n, received: %+v\n", expectedCDRs, cdrs)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@
|
||||
"cdr_format": "fwv", // CDR file format <csv|freeswitch_csv|fwv|opensips_flatstore>
|
||||
"cdr_in_dir": "/tmp/cgr_fwv/cdrc/in", // absolute path towards the directory where the CDRs are stored
|
||||
"cdr_out_dir": "/tmp/cgr_fwv/cdrc/out", // absolute path towards the directory where processed CDRs will be moved
|
||||
"cdr_source_id": "fwv_localtest", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
"cdr_source_id": "cdrc", // free form field, tag identifying the source of the CDRs within CDRS database
|
||||
"cdr_filter": "", // filter CDR records to import
|
||||
"header_fields": [
|
||||
{"tag": "FileName", "cdr_field_id": "CdrFileName", "type": "cdrfield", "value": "95", "width": 40, "padding":"right"},
|
||||
|
||||
47
data/conf/samples/cdrcxmlwithfilter/cgrates.json
Executable file
47
data/conf/samples/cdrcxmlwithfilter/cgrates.json
Executable file
@@ -0,0 +1,47 @@
|
||||
{
|
||||
|
||||
// Real-time Charging System for Telecom & ISP environments
|
||||
// Copyright (C) ITsysCOM GmbH
|
||||
|
||||
"stor_db": { // database used to store offline tariff plans and CDRs
|
||||
"db_password": "CGRateS.org", // password to use when connecting to stordb
|
||||
},
|
||||
|
||||
"rals": {
|
||||
"enabled": true
|
||||
},
|
||||
|
||||
"cdrs": {
|
||||
"enabled": true,
|
||||
"rals_conns": [],
|
||||
},
|
||||
|
||||
|
||||
"cdrc": [
|
||||
{
|
||||
"id": "XMLWithFilter",
|
||||
"enabled": true,
|
||||
"cdr_format": "xml",
|
||||
"cdr_in_dir": "/tmp/cdrcxmlwithfilters/xmlit1/in",
|
||||
"cdr_out_dir": "/tmp/cdrcxmlwithfilters/xmlit1/out",
|
||||
"cdr_path": "broadWorksCDR>cdrData",
|
||||
"cdr_source_id": "xmlit1",
|
||||
"filters": ["*string:broadWorksCDR>cdrData>basicModule>userNumber:1002","*string:broadWorksCDR>cdrData>headerModule>type:Normal"],
|
||||
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
|
||||
{"tag": "TOR", "field_id": "ToR", "type": "*composed", "value": "^*voice", "mandatory": true},
|
||||
{"tag": "OriginID", "field_id": "OriginID", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>localCallId", "mandatory": true},
|
||||
{"tag": "RequestType", "field_id": "RequestType", "type": "*composed", "value": "^*rated", "mandatory": true},
|
||||
{"tag": "Direction", "field_id": "Direction", "type": "*composed", "value": "^*out", "mandatory": true},
|
||||
{"tag": "Tenant", "field_id": "Tenant", "type": "*composed", "value": "~broadWorksCDR>cdrData>basicModule>userId:s/.*@(.*)/${1}/", "mandatory": true},
|
||||
{"tag": "Category", "field_id": "Category", "type": "*composed", "value": "^call", "mandatory": true},
|
||||
{"tag": "Account", "field_id": "Account", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>userNumber", "mandatory": true},
|
||||
{"tag": "Destination", "field_id": "Destination", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>calledNumber", "mandatory": true},
|
||||
{"tag": "SetupTime", "field_id": "SetupTime", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>startTime", "mandatory": true},
|
||||
{"tag": "AnswerTime", "field_id": "AnswerTime", "type": "*composed", "value": "broadWorksCDR>cdrData>basicModule>answerTime", "mandatory": true},
|
||||
{"tag": "Usage", "field_id": "Usage", "type": "*handler", "handler_id": "*substract_usage", "value": "broadWorksCDR>cdrData>basicModule>releaseTime;^|;broadWorksCDR>cdrData>basicModule>answerTime", "mandatory": true},
|
||||
],
|
||||
},
|
||||
],
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user