mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
Updated integration tests
This commit is contained in:
committed by
Dan Christian Bogos
parent
1d13dd2ec3
commit
a1e99e37d5
@@ -96,10 +96,12 @@ func TestRadComposedFieldValue(t *testing.T) {
|
||||
if err := pkt.AddAVPWithName("Cisco-NAS-Port", "CGR1", "Cisco"); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil, nil, nil)
|
||||
agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false, false)
|
||||
agReq.Vars.Set([]string{"Cisco"}, "CGR1", false, false)
|
||||
agReq.Vars.Set([]string{"User-Name"}, "flopsy", false, false)
|
||||
vars := utils.NavigableMap2{
|
||||
MetaRadReqType: utils.NewNMData(MetaRadAcctStart),
|
||||
"Cisco": utils.NewNMData("CGR1"),
|
||||
"User-Name": utils.NewNMData("flopsy"),
|
||||
}
|
||||
agReq := NewAgentRequest(nil, vars, nil, nil, nil, "cgrates.org", "", nil, nil, nil)
|
||||
eOut := "*radAcctStart|flopsy|CGR1"
|
||||
if out := radComposedFieldValue(pkt, agReq,
|
||||
config.NewRSRParsersMustCompile("~*vars.*radReqType;|;~*vars.User-Name;|;~*vars.Cisco", true, utils.INFIELD_SEP)); out != eOut {
|
||||
@@ -116,10 +118,12 @@ func TestRadFieldOutVal(t *testing.T) {
|
||||
t.Error(err)
|
||||
}
|
||||
eOut := fmt.Sprintf("%s|flopsy|CGR1", MetaRadAcctStart)
|
||||
agReq := NewAgentRequest(nil, nil, nil, nil, nil, "cgrates.org", "", nil, nil, nil)
|
||||
agReq.Vars.Set([]string{MetaRadReqType}, MetaRadAcctStart, false, false)
|
||||
agReq.Vars.Set([]string{"Cisco"}, "CGR1", false, false)
|
||||
agReq.Vars.Set([]string{"User-Name"}, "flopsy", false, false)
|
||||
vars := utils.NavigableMap2{
|
||||
MetaRadReqType: utils.NewNMData(MetaRadAcctStart),
|
||||
"Cisco": utils.NewNMData("CGR1"),
|
||||
"User-Name": utils.NewNMData("flopsy"),
|
||||
}
|
||||
agReq := NewAgentRequest(nil, vars, nil, nil, nil, "cgrates.org", "", nil, nil, nil)
|
||||
cfgFld := &config.FCTemplate{Tag: "ComposedTest", Type: utils.META_COMPOSED, Path: utils.Destination,
|
||||
Value: config.NewRSRParsersMustCompile("~*vars.*radReqType;|;~*vars.User-Name;|;~*vars.Cisco", true, utils.INFIELD_SEP), Mandatory: true}
|
||||
if outVal, err := radFieldOutVal(pkt, agReq, cfgFld); err != nil {
|
||||
|
||||
@@ -346,7 +346,7 @@ func testCacheSClear(t *testing.T) {
|
||||
func testCacheSPrecacheStatus(t *testing.T) {
|
||||
var reply map[string]string
|
||||
expected := make(map[string]string)
|
||||
for k := range utils.CachePartitions.Data() {
|
||||
for k := range utils.CachePartitions {
|
||||
expected[k] = utils.MetaReady
|
||||
}
|
||||
if err := chcRPC.Call(utils.CacheSv1PrecacheStatus, &utils.AttrCacheIDsWithArgDispatcher{}, &reply); err != nil {
|
||||
|
||||
@@ -441,6 +441,9 @@ func testCGRConfigReloadERs(t *testing.T) {
|
||||
{Tag: utils.AnswerTime, Path: utils.MetaCgreq + utils.NestingSep + utils.AnswerTime, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.12", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
{Tag: utils.Usage, Path: utils.MetaCgreq + utils.NestingSep + utils.Usage, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.13", true, utils.INFIELD_SEP), Mandatory: true},
|
||||
}
|
||||
for _, v := range content {
|
||||
v.ComputePath()
|
||||
}
|
||||
expAttr := &ERsCfg{
|
||||
Enabled: true,
|
||||
SessionSConns: []string{utils.MetaLocalHost},
|
||||
|
||||
@@ -206,7 +206,16 @@ func TestMfHttpAgentMultipleFields(t *testing.T) {
|
||||
ReplyFields: []*FCTemplate{},
|
||||
}}},
|
||||
}
|
||||
|
||||
for _, profile := range expected {
|
||||
for _, rp := range profile.RequestProcessors {
|
||||
for _, v := range rp.ReplyFields {
|
||||
v.ComputePath()
|
||||
}
|
||||
for _, v := range rp.RequestFields {
|
||||
v.ComputePath()
|
||||
}
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(mfCgrCfg.HttpAgentCfg(), expected) {
|
||||
t.Errorf("Expected: %+v\n, recived: %+v", utils.ToJSON(expected), utils.ToJSON(mfCgrCfg.HttpAgentCfg()))
|
||||
}
|
||||
|
||||
@@ -1182,3 +1182,333 @@ func TestNavMapGetKeys(t *testing.T) {
|
||||
t.Errorf("Expecting: %+v, received: %+v", utils.ToJSON(expKeys), utils.ToJSON(keys))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNMAsXMLElements(t *testing.T) {
|
||||
nM := utils.NewOrderedNavigableMap()
|
||||
order := []utils.PathItems{
|
||||
{{Field: "FirstLevel2"}, {Field: "SecondLevel2"}, {Field: "Field2"}},
|
||||
{{Field: "FirstLevel"}, {Field: "SecondLevel"}, {Field: "ThirdLevel"}, {Field: "Fld1"}},
|
||||
{{Field: "FirstLevel2"}, {Field: "Field3"}},
|
||||
{{Field: "FirstLevel2"}, {Field: "Field5"}},
|
||||
{{Field: "Field4"}},
|
||||
{{Field: "FirstLevel2"}, {Field: "Field6"}},
|
||||
}
|
||||
if _, err := nM.Set(&utils.FullPath{Path: order[0].String(), PathItems: order[0]}, &utils.NMSlice{
|
||||
&NMItem{Path: strings.Split(order[0].String(), utils.NestingSep),
|
||||
Data: "attrVal1",
|
||||
Config: &FCTemplate{Tag: "AttributeTest", AttributeID: "attribute1"}},
|
||||
&NMItem{Path: strings.Split(order[0].String(), utils.NestingSep),
|
||||
Data: "Value2"}}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, err := nM.Set(&utils.FullPath{Path: order[1].String(), PathItems: order[1]}, &utils.NMSlice{
|
||||
&NMItem{Path: strings.Split(order[1].String(), utils.NestingSep),
|
||||
Data: "Val1"}}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, err := nM.Set(&utils.FullPath{Path: order[2].String(), PathItems: order[2]}, &utils.NMSlice{
|
||||
&NMItem{Path: strings.Split(order[2].String(), utils.NestingSep),
|
||||
Data: "Value3"}}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, err := nM.Set(&utils.FullPath{Path: order[3].String(), PathItems: order[3]}, &utils.NMSlice{
|
||||
&NMItem{Path: strings.Split(order[3].String(), utils.NestingSep),
|
||||
Data: "Value5"},
|
||||
&NMItem{Path: strings.Split(order[3].String(), utils.NestingSep),
|
||||
Data: "attrVal5",
|
||||
Config: &FCTemplate{Tag: "AttributeTest", AttributeID: "attribute5"}}}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, err := nM.Set(&utils.FullPath{Path: order[4].String(), PathItems: order[4]}, &utils.NMSlice{
|
||||
&NMItem{Path: strings.Split(order[4].String(), utils.NestingSep),
|
||||
Data: "Val4"},
|
||||
&NMItem{Path: strings.Split(order[4].String(), utils.NestingSep),
|
||||
Data: "attrVal2",
|
||||
Config: &FCTemplate{Tag: "AttributeTest", AttributeID: "attribute2"}}}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, err := nM.Set(&utils.FullPath{Path: order[5].String(), PathItems: order[5]}, &utils.NMSlice{
|
||||
&NMItem{Path: strings.Split(order[5].String(), utils.NestingSep),
|
||||
Data: "Value6",
|
||||
Config: &FCTemplate{Tag: "NewBranchTest", NewBranch: true}},
|
||||
&NMItem{Path: strings.Split(order[5].String(), utils.NestingSep),
|
||||
Data: "attrVal6",
|
||||
Config: &FCTemplate{Tag: "AttributeTest", AttributeID: "attribute6"}}}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
eXMLElmnts := []*XMLElement{
|
||||
&XMLElement{
|
||||
XMLName: xml.Name{Local: order[0][0].String()},
|
||||
Elements: []*XMLElement{
|
||||
&XMLElement{
|
||||
XMLName: xml.Name{Local: order[0][1].String()},
|
||||
Elements: []*XMLElement{
|
||||
&XMLElement{
|
||||
XMLName: xml.Name{Local: order[0][2].String()},
|
||||
Attributes: []*xml.Attr{
|
||||
&xml.Attr{
|
||||
Name: xml.Name{Local: "attribute1"},
|
||||
Value: "attrVal1",
|
||||
},
|
||||
},
|
||||
Value: "Value2",
|
||||
},
|
||||
},
|
||||
},
|
||||
&XMLElement{
|
||||
XMLName: xml.Name{Local: "Field3"},
|
||||
Value: "Value3",
|
||||
},
|
||||
&XMLElement{
|
||||
XMLName: xml.Name{Local: order[3][1].String()},
|
||||
Attributes: []*xml.Attr{
|
||||
&xml.Attr{
|
||||
Name: xml.Name{Local: "attribute5"},
|
||||
Value: "attrVal5",
|
||||
},
|
||||
},
|
||||
Value: "Value5",
|
||||
},
|
||||
},
|
||||
},
|
||||
&XMLElement{
|
||||
XMLName: xml.Name{Local: order[1][0].String()},
|
||||
Elements: []*XMLElement{
|
||||
&XMLElement{
|
||||
XMLName: xml.Name{Local: order[1][1].String()},
|
||||
Elements: []*XMLElement{
|
||||
&XMLElement{
|
||||
XMLName: xml.Name{Local: order[1][2].String()},
|
||||
Elements: []*XMLElement{
|
||||
&XMLElement{
|
||||
XMLName: xml.Name{Local: "Fld1"},
|
||||
Value: "Val1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&XMLElement{
|
||||
XMLName: xml.Name{Local: order[4][0].String()},
|
||||
Attributes: []*xml.Attr{
|
||||
&xml.Attr{
|
||||
Name: xml.Name{Local: "attribute2"},
|
||||
Value: "attrVal2",
|
||||
},
|
||||
},
|
||||
Value: "Val4",
|
||||
},
|
||||
&XMLElement{
|
||||
XMLName: xml.Name{Local: order[5][0].String()},
|
||||
Elements: []*XMLElement{
|
||||
&XMLElement{
|
||||
XMLName: xml.Name{Local: order[5][1].String()},
|
||||
Attributes: []*xml.Attr{
|
||||
&xml.Attr{
|
||||
Name: xml.Name{Local: "attribute6"},
|
||||
Value: "attrVal6",
|
||||
},
|
||||
},
|
||||
Value: "Value6",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
xmlEnts, err := NMAsXMLElements(nM)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eXMLElmnts, xmlEnts) {
|
||||
t.Errorf("expecting: %s, received: %s", utils.ToJSON(eXMLElmnts), utils.ToJSON(xmlEnts))
|
||||
}
|
||||
eXML := []byte(`<FirstLevel2>
|
||||
<SecondLevel2>
|
||||
<Field2 attribute1="attrVal1">Value2</Field2>
|
||||
</SecondLevel2>
|
||||
<Field3>Value3</Field3>
|
||||
<Field5 attribute5="attrVal5">Value5</Field5>
|
||||
</FirstLevel2>
|
||||
<FirstLevel>
|
||||
<SecondLevel>
|
||||
<ThirdLevel>
|
||||
<Fld1>Val1</Fld1>
|
||||
</ThirdLevel>
|
||||
</SecondLevel>
|
||||
</FirstLevel>
|
||||
<Field4 attribute2="attrVal2">Val4</Field4>
|
||||
<FirstLevel2>
|
||||
<Field6 attribute6="attrVal6">Value6</Field6>
|
||||
</FirstLevel2>`)
|
||||
if output, err := xml.MarshalIndent(xmlEnts, "", " "); err != nil {
|
||||
t.Error(err)
|
||||
} else if !reflect.DeepEqual(eXML, output) {
|
||||
t.Errorf("expecting: \n%s, received: \n%s\n", string(eXML), string(output))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNMAsCGREvent(t *testing.T) {
|
||||
if cgrEv := NMAsCGREvent(nil, "cgrates.org",
|
||||
utils.NestingSep); cgrEv != nil {
|
||||
t.Errorf("expecting: %+v, \nreceived: %+v", utils.ToJSON(nil), utils.ToJSON(cgrEv.Event))
|
||||
}
|
||||
|
||||
nM := utils.NewOrderedNavigableMap()
|
||||
if cgrEv := NMAsCGREvent(nM, "cgrates.org",
|
||||
utils.NestingSep); cgrEv != nil {
|
||||
t.Errorf("expecting: %+v, \nreceived: %+v", utils.ToJSON(nil), utils.ToJSON(cgrEv.Event))
|
||||
}
|
||||
|
||||
path := utils.PathItems{{Field: "FirstLevel"}, {Field: "SecondLevel"}, {Field: "ThirdLevel"}, {Field: "Fld1"}}
|
||||
if _, err := nM.Set(&utils.FullPath{Path: path.String(), PathItems: path}, &utils.NMSlice{&NMItem{
|
||||
Path: strings.Split(path.String(), utils.NestingSep),
|
||||
Data: "Val1",
|
||||
}}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
path = utils.PathItems{{Field: "FirstLevel2"}, {Field: "SecondLevel2"}, {Field: "Field2"}}
|
||||
if _, err := nM.Set(&utils.FullPath{Path: path.String(), PathItems: path}, &utils.NMSlice{&NMItem{
|
||||
Path: strings.Split(path.String(), utils.NestingSep),
|
||||
Data: "attrVal1",
|
||||
Config: &FCTemplate{Tag: "AttributeTest",
|
||||
AttributeID: "attribute1"},
|
||||
}, &NMItem{
|
||||
Path: strings.Split(path.String(), utils.NestingSep),
|
||||
Data: "Value2",
|
||||
}}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
path = utils.PathItems{{Field: "FirstLevel2"}, {Field: "Field3"}}
|
||||
if _, err := nM.Set(&utils.FullPath{Path: path.String(), PathItems: path}, &utils.NMSlice{&NMItem{
|
||||
Path: strings.Split(path.String(), utils.NestingSep),
|
||||
Data: "Value3",
|
||||
}}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
path = utils.PathItems{{Field: "FirstLevel2"}, {Field: "Field5"}}
|
||||
if _, err := nM.Set(&utils.FullPath{Path: path.String(), PathItems: path}, &utils.NMSlice{&NMItem{
|
||||
Path: strings.Split(path.String(), utils.NestingSep),
|
||||
Data: "Value5",
|
||||
}, &NMItem{
|
||||
Path: strings.Split(path.String(), utils.NestingSep),
|
||||
Data: "attrVal5",
|
||||
Config: &FCTemplate{Tag: "AttributeTest",
|
||||
AttributeID: "attribute5"},
|
||||
}}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
path = utils.PathItems{{Field: "FirstLevel2"}, {Field: "Field6"}}
|
||||
if _, err := nM.Set(&utils.FullPath{Path: path.String(), PathItems: path}, &utils.NMSlice{&NMItem{
|
||||
Path: strings.Split(path.String(), utils.NestingSep),
|
||||
Data: "Value6",
|
||||
Config: &FCTemplate{Tag: "NewBranchTest",
|
||||
NewBranch: true},
|
||||
}, &NMItem{
|
||||
Path: strings.Split(path.String(), utils.NestingSep),
|
||||
Data: "attrVal6",
|
||||
Config: &FCTemplate{Tag: "AttributeTest",
|
||||
AttributeID: "attribute6"},
|
||||
}}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
path = utils.PathItems{{Field: "Field4"}}
|
||||
if _, err := nM.Set(&utils.FullPath{Path: path.String(), PathItems: path}, &utils.NMSlice{&NMItem{
|
||||
Path: strings.Split(path.String(), utils.NestingSep),
|
||||
Data: "Val4",
|
||||
}, &NMItem{
|
||||
Path: strings.Split(path.String(), utils.NestingSep),
|
||||
Data: "attrVal2",
|
||||
Config: &FCTemplate{Tag: "AttributeTest",
|
||||
AttributeID: "attribute2"},
|
||||
}}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
eEv := map[string]interface{}{
|
||||
"FirstLevel2.SecondLevel2.Field2": "Value2",
|
||||
"FirstLevel.SecondLevel.ThirdLevel.Fld1": "Val1",
|
||||
"FirstLevel2.Field3": "Value3",
|
||||
"FirstLevel2.Field5": "Value5",
|
||||
"FirstLevel2.Field6": "Value6",
|
||||
"Field4": "Val4",
|
||||
}
|
||||
if cgrEv := NMAsCGREvent(nM, "cgrates.org",
|
||||
utils.NestingSep); cgrEv.Tenant != "cgrates.org" ||
|
||||
cgrEv.Time == nil ||
|
||||
!reflect.DeepEqual(eEv, cgrEv.Event) {
|
||||
t.Errorf("expecting: %+v, \nreceived: %+v", utils.ToJSON(eEv), utils.ToJSON(cgrEv.Event))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNMItemLen(t *testing.T) {
|
||||
var nm utils.NMInterface = &NMItem{Data: "1001"}
|
||||
if rply := nm.Len(); rply != 0 {
|
||||
t.Errorf("Expected 0 ,received: %v", rply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNMItemString(t *testing.T) {
|
||||
var nm utils.NMInterface = &NMItem{Data: "1001"}
|
||||
expected := "{\"Path\":null,\"Data\":\"1001\",\"Config\":null}"
|
||||
if rply := nm.String(); rply != expected {
|
||||
t.Errorf("Expected %q ,received: %q", expected, rply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNMItemInterface(t *testing.T) {
|
||||
var nm utils.NMInterface = &NMItem{Data: "1001"}
|
||||
expected := "1001"
|
||||
if rply := nm.Interface(); rply != expected {
|
||||
t.Errorf("Expected %q ,received: %q", expected, rply)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNMItemField(t *testing.T) {
|
||||
var nm utils.NMInterface = &NMItem{Data: "1001"}
|
||||
if _, err := nm.Field(nil); err != utils.ErrNotImplemented {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNMItemRemove(t *testing.T) {
|
||||
var nm utils.NMInterface = &NMItem{Data: "1001"}
|
||||
if err := nm.Remove(nil); err != utils.ErrNotImplemented {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNMItemEmpty(t *testing.T) {
|
||||
var nm utils.NMInterface = &NMItem{Data: "1001"}
|
||||
if nm.Empty() {
|
||||
t.Error("Expected not empty type")
|
||||
}
|
||||
nm = &NMItem{Data: nil}
|
||||
if !nm.Empty() {
|
||||
t.Error("Expected empty type")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNMItemType(t *testing.T) {
|
||||
var nm utils.NMInterface = &NMItem{Data: "1001"}
|
||||
if nm.Type() != utils.NMDataType {
|
||||
t.Errorf("Expected %v ,received: %v", utils.NMDataType, nm.Type())
|
||||
}
|
||||
}
|
||||
|
||||
func TestNMItemSet(t *testing.T) {
|
||||
var nm utils.NMInterface = &NMItem{Data: "1001"}
|
||||
if _, err := nm.Set(utils.PathItems{{}}, nil); err != utils.ErrWrongPath {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, err := nm.Set(nil, &NMItem{Data: "1002"}); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
expected := "1002"
|
||||
if rply := nm.Interface(); rply != expected {
|
||||
t.Errorf("Expected %q ,received: %q", expected, rply)
|
||||
}
|
||||
}
|
||||
|
||||
180
ers/filejson.go
180
ers/filejson.go
@@ -1,180 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package ers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/agents"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
)
|
||||
|
||||
func NewJSONFileER(cfg *config.CGRConfig, cfgIdx int,
|
||||
rdrEvents chan *erEvent, rdrErr chan error,
|
||||
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
|
||||
srcPath := cfg.ERsCfg().Readers[cfgIdx].SourcePath
|
||||
if strings.HasSuffix(srcPath, utils.Slash) {
|
||||
srcPath = srcPath[:len(srcPath)-1]
|
||||
}
|
||||
jsonEr := &JSONFileER{
|
||||
cgrCfg: cfg,
|
||||
cfgIdx: cfgIdx,
|
||||
fltrS: fltrS,
|
||||
rdrDir: srcPath,
|
||||
rdrEvents: rdrEvents,
|
||||
rdrError: rdrErr,
|
||||
rdrExit: rdrExit,
|
||||
conReqs: make(chan struct{}, cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs)}
|
||||
var processFile struct{}
|
||||
for i := 0; i < cfg.ERsCfg().Readers[cfgIdx].ConcurrentReqs; i++ {
|
||||
jsonEr.conReqs <- processFile // Empty initiate so we do not need to wait later when we pop
|
||||
}
|
||||
return jsonEr, nil
|
||||
}
|
||||
|
||||
// JSONFileER implements EventReader interface for .json files
|
||||
type JSONFileER struct {
|
||||
sync.RWMutex
|
||||
cgrCfg *config.CGRConfig
|
||||
cfgIdx int // index of config instance within ERsCfg.Readers
|
||||
fltrS *engine.FilterS
|
||||
rdrDir string
|
||||
rdrEvents chan *erEvent // channel to dispatch the events created to
|
||||
rdrError chan error
|
||||
rdrExit chan struct{}
|
||||
conReqs chan struct{} // limit number of opened files
|
||||
}
|
||||
|
||||
func (rdr *JSONFileER) Config() *config.EventReaderCfg {
|
||||
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
|
||||
}
|
||||
|
||||
func (rdr *JSONFileER) Serve() (err error) {
|
||||
switch rdr.Config().RunDelay {
|
||||
case time.Duration(0): // 0 disables the automatic read, maybe done per API
|
||||
return
|
||||
case time.Duration(-1):
|
||||
return watchDir(rdr.rdrDir, rdr.processFile,
|
||||
utils.ERs, rdr.rdrExit)
|
||||
default:
|
||||
go func() {
|
||||
for {
|
||||
// Not automated, process and sleep approach
|
||||
select {
|
||||
case <-rdr.rdrExit:
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> stop monitoring path <%s>",
|
||||
utils.ERs, rdr.rdrDir))
|
||||
return
|
||||
default:
|
||||
}
|
||||
filesInDir, _ := ioutil.ReadDir(rdr.rdrDir)
|
||||
for _, file := range filesInDir {
|
||||
if !strings.HasSuffix(file.Name(), utils.JSNSuffix) { // hardcoded file extension for json event reader
|
||||
continue // used in order to filter the files from directory
|
||||
}
|
||||
go func(fileName string) {
|
||||
if err := rdr.processFile(rdr.rdrDir, fileName); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> processing file %s, error: %s",
|
||||
utils.ERs, fileName, err.Error()))
|
||||
}
|
||||
}(file.Name())
|
||||
}
|
||||
time.Sleep(rdr.Config().RunDelay)
|
||||
}
|
||||
}()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// processFile is called for each file in a directory and dispatches erEvents from it
|
||||
func (rdr *JSONFileER) processFile(fPath, fName string) (err error) {
|
||||
if cap(rdr.conReqs) != 0 { // 0 goes for no limit
|
||||
processFile := <-rdr.conReqs // Queue here for maxOpenFiles
|
||||
defer func() { rdr.conReqs <- processFile }()
|
||||
}
|
||||
absPath := path.Join(fPath, fName)
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("<%s> parsing <%s>", utils.ERs, absPath))
|
||||
var file *os.File
|
||||
if file, err = os.Open(absPath); err != nil {
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
timeStart := time.Now()
|
||||
var byteValue []byte
|
||||
if byteValue, err = ioutil.ReadAll(file); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var data map[string]interface{}
|
||||
if err = json.Unmarshal(byteValue, &data); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
evsPosted := 0
|
||||
reqVars := utils.NavigableMap2{utils.FileName: utils.NewNMData(fName)}
|
||||
|
||||
agReq := agents.NewAgentRequest(
|
||||
config.NewNavigableMap(data), reqVars,
|
||||
nil, nil, rdr.Config().Tenant,
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(rdr.Config().Timezone,
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
|
||||
rdr.fltrS, nil, nil) // create an AgentRequest
|
||||
if pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
|
||||
agReq); err != nil || !pass {
|
||||
return err
|
||||
}
|
||||
if err = agReq.SetFields(rdr.Config().Fields); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> reading file: <%s> ignoring due to error: <%s>",
|
||||
utils.ERs, absPath, err.Error()))
|
||||
return
|
||||
}
|
||||
rdr.rdrEvents <- &erEvent{
|
||||
cgrEvent: config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep),
|
||||
rdrCfg: rdr.Config(),
|
||||
}
|
||||
evsPosted++
|
||||
|
||||
if rdr.Config().ProcessedPath != "" {
|
||||
// Finished with file, move it to processed folder
|
||||
outPath := path.Join(rdr.Config().ProcessedPath, fName)
|
||||
if err = os.Rename(absPath, outPath); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
utils.Logger.Info(
|
||||
fmt.Sprintf("%s finished processing file <%s>. Events posted: %d, run duration: %s",
|
||||
utils.ERs, absPath, evsPosted, time.Now().Sub(timeStart)))
|
||||
return
|
||||
}
|
||||
@@ -503,8 +503,8 @@ func testCDRsOnExpFileFailover(t *testing.T) {
|
||||
t.Errorf("For file <%s> and event <%s> received %s", filePath, utils.ToJSON(ev), err)
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(expectedFormats.Data(), rcvFormats.Data()) {
|
||||
t.Errorf("Missing format expecting: %s received: %s", utils.ToJSON(expectedFormats.Data()), utils.ToJSON(rcvFormats.Data()))
|
||||
if !reflect.DeepEqual(expectedFormats, rcvFormats) {
|
||||
t.Errorf("Missing format expecting: %s received: %s", utils.ToJSON(expectedFormats), utils.ToJSON(rcvFormats))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user