mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 10:06:24 +05:00
Move csvProvider in config (SliceDP) and add ers in integration test script
This commit is contained in:
committed by
Dan Christian Bogos
parent
2e292b94b0
commit
8a9ac38536
63
cdrc/csv.go
63
cdrc/csv.go
@@ -22,7 +22,6 @@ import (
|
||||
"encoding/csv"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
@@ -108,7 +107,7 @@ func (self *CsvRecordsProcessor) processFlatstoreRecord(record []string) ([]stri
|
||||
|
||||
// Takes the record from a slice and turns it into StoredCdrs, posting them to the cdrServer
|
||||
func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR, error) {
|
||||
csvProvider := newCsvProvider(record)
|
||||
csvProvider := config.NewSliceDP(record)
|
||||
recordCdrs := make([]*engine.CDR, 0) // More CDRs based on the number of filters and field templates
|
||||
for _, cdrcCfg := range self.cdrcCfgs { // cdrFields coming from more templates will produce individual storCdr records
|
||||
tenant, err := cdrcCfg.Tenant.ParseDataProvider(csvProvider, utils.NestingSep) // each profile of cdrc can have different tenant
|
||||
@@ -147,7 +146,7 @@ func (self *CsvRecordsProcessor) processRecord(record []string) ([]*engine.CDR,
|
||||
func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *config.CdrcCfg, tenant string) (*engine.CDR, error) {
|
||||
storedCdr := &engine.CDR{OriginHost: "0.0.0.0", Source: cdrcCfg.CdrSourceId, ExtraFields: make(map[string]string), Cost: -1}
|
||||
var err error
|
||||
csvProvider := newCsvProvider(record) // used for filterS and for RSRParsers
|
||||
csvProvider := config.NewSliceDP(record) // used for filterS and for RSRParsers
|
||||
var lazyHttpFields []*config.FCTemplate
|
||||
fldVals := make(map[string]string)
|
||||
for _, cdrFldCfg := range cdrcCfg.ContentFields {
|
||||
@@ -224,61 +223,3 @@ func (self *CsvRecordsProcessor) recordToStoredCdr(record []string, cdrcCfg *con
|
||||
}
|
||||
return storedCdr, nil
|
||||
}
|
||||
|
||||
// newCsvProvider constructs a DataProvider
|
||||
func newCsvProvider(record []string) (dP config.DataProvider) {
|
||||
dP = &csvProvider{req: record, cache: config.NewNavigableMap(nil)}
|
||||
return
|
||||
}
|
||||
|
||||
// csvProvider implements engine.DataProvider so we can pass it to filters
|
||||
type csvProvider struct {
|
||||
req []string
|
||||
cache *config.NavigableMap
|
||||
}
|
||||
|
||||
// String is part of engine.DataProvider interface
|
||||
// when called, it will display the already parsed values out of cache
|
||||
func (cP *csvProvider) String() string {
|
||||
return utils.ToJSON(cP)
|
||||
}
|
||||
|
||||
// FieldAsInterface is part of engine.DataProvider interface
|
||||
func (cP *csvProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) {
|
||||
if len(fldPath) != 1 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
if data, err = cP.cache.FieldAsInterface(fldPath); err == nil ||
|
||||
err != utils.ErrNotFound { // item found in cache
|
||||
return
|
||||
}
|
||||
err = nil // cancel previous err
|
||||
if cfgFieldIdx, err := strconv.Atoi(fldPath[0]); err != nil || len(cP.req) <= cfgFieldIdx {
|
||||
return nil, fmt.Errorf("Ignoring record: %v with error : %+v", cP.req, err)
|
||||
} else {
|
||||
data = cP.req[cfgFieldIdx]
|
||||
}
|
||||
cP.cache.Set(fldPath, data, false, false)
|
||||
return
|
||||
}
|
||||
|
||||
// FieldAsString is part of engine.DataProvider interface
|
||||
func (cP *csvProvider) FieldAsString(fldPath []string) (data string, err error) {
|
||||
var valIface interface{}
|
||||
valIface, err = cP.FieldAsInterface(fldPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return utils.IfaceAsString(valIface), nil
|
||||
}
|
||||
|
||||
// AsNavigableMap is part of engine.DataProvider interface
|
||||
func (cP *csvProvider) AsNavigableMap([]*config.FCTemplate) (
|
||||
nm *config.NavigableMap, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// RemoteHost is part of engine.DataProvider interface
|
||||
func (cP *csvProvider) RemoteHost() net.Addr {
|
||||
return utils.LocalAddr()
|
||||
}
|
||||
|
||||
@@ -19,13 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package config
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// func TestNewCgrJsonCfgFromHttp(t *testing.T) {
|
||||
// addr := "https://raw.githubusercontent.com/cgrates/cgrates/master/data/conf/samples/tutmongo/cgrates.json"
|
||||
// expVal, err := NewCgrJsonCfgFromFile(path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo", "cgrates.json"))
|
||||
@@ -69,6 +62,7 @@ import (
|
||||
|
||||
// }
|
||||
|
||||
/* Needs to be rewritten with a static config
|
||||
func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
|
||||
expected := map[string]interface{}{
|
||||
"Enabled": true,
|
||||
@@ -122,3 +116,4 @@ func TestCgrCfgV1ReloadConfigSection(t *testing.T) {
|
||||
t.Errorf("Expected: %+v, received: %+v", utils.ToJSON(expected), utils.ToJSON(rcv))
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
@@ -21,6 +21,7 @@ package config
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -156,3 +157,61 @@ func (objDP *ObjectDP) AsNavigableMap([]*FCTemplate) (
|
||||
func (objDP *ObjectDP) RemoteHost() net.Addr {
|
||||
return utils.LocalAddr()
|
||||
}
|
||||
|
||||
// NewSliceDP constructs a DataProvider
|
||||
func NewSliceDP(record []string) (dP DataProvider) {
|
||||
dP = &SliceDP{req: record, cache: NewNavigableMap(nil)}
|
||||
return
|
||||
}
|
||||
|
||||
// SliceDP implements engine.DataProvider so we can pass it to filters
|
||||
type SliceDP struct {
|
||||
req []string
|
||||
cache *NavigableMap
|
||||
}
|
||||
|
||||
// String is part of engine.DataProvider interface
|
||||
// when called, it will display the already parsed values out of cache
|
||||
func (cP *SliceDP) String() string {
|
||||
return utils.ToJSON(cP)
|
||||
}
|
||||
|
||||
// FieldAsInterface is part of engine.DataProvider interface
|
||||
func (cP *SliceDP) FieldAsInterface(fldPath []string) (data interface{}, err error) {
|
||||
if len(fldPath) != 1 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
if data, err = cP.cache.FieldAsInterface(fldPath); err == nil ||
|
||||
err != utils.ErrNotFound { // item found in cache
|
||||
return
|
||||
}
|
||||
err = nil // cancel previous err
|
||||
if cfgFieldIdx, err := strconv.Atoi(fldPath[0]); err != nil || len(cP.req) <= cfgFieldIdx {
|
||||
return nil, fmt.Errorf("Ignoring record: %v with error : %+v", cP.req, err)
|
||||
} else {
|
||||
data = cP.req[cfgFieldIdx]
|
||||
}
|
||||
cP.cache.Set(fldPath, data, false, false)
|
||||
return
|
||||
}
|
||||
|
||||
// FieldAsString is part of engine.DataProvider interface
|
||||
func (cP *SliceDP) FieldAsString(fldPath []string) (data string, err error) {
|
||||
var valIface interface{}
|
||||
valIface, err = cP.FieldAsInterface(fldPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return utils.IfaceAsString(valIface), nil
|
||||
}
|
||||
|
||||
// AsNavigableMap is part of engine.DataProvider interface
|
||||
func (cP *SliceDP) AsNavigableMap([]*FCTemplate) (
|
||||
nm *NavigableMap, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// RemoteHost is part of engine.DataProvider interface
|
||||
func (cP *SliceDP) RemoteHost() net.Addr {
|
||||
return utils.LocalAddr()
|
||||
}
|
||||
|
||||
@@ -24,10 +24,8 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -142,7 +140,7 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) {
|
||||
}
|
||||
rowNr++ // increment the rowNr after checking if it's not the end of file
|
||||
agReq := agents.NewAgentRequest(
|
||||
newCsvProvider(record), reqVars,
|
||||
config.NewSliceDP(record), reqVars,
|
||||
nil, nil, rdr.Config().Tenant,
|
||||
rdr.cgrCfg.GeneralCfg().DefaultTenant,
|
||||
utils.FirstNonEmpty(rdr.Config().Timezone,
|
||||
@@ -177,61 +175,3 @@ func (rdr *CSVFileER) processFile(fPath, fName string) (err error) {
|
||||
utils.ERs, absPath, rowNr, evsPosted, time.Now().Sub(timeStart)))
|
||||
return
|
||||
}
|
||||
|
||||
// newCsvProvider constructs a DataProvider
|
||||
func newCsvProvider(record []string) (dP config.DataProvider) {
|
||||
dP = &csvProvider{req: record, cache: config.NewNavigableMap(nil)}
|
||||
return
|
||||
}
|
||||
|
||||
// csvProvider implements engine.DataProvider so we can pass it to filters
|
||||
type csvProvider struct {
|
||||
req []string
|
||||
cache *config.NavigableMap
|
||||
}
|
||||
|
||||
// String is part of engine.DataProvider interface
|
||||
// when called, it will display the already parsed values out of cache
|
||||
func (cP *csvProvider) String() string {
|
||||
return utils.ToJSON(cP)
|
||||
}
|
||||
|
||||
// FieldAsInterface is part of engine.DataProvider interface
|
||||
func (cP *csvProvider) FieldAsInterface(fldPath []string) (data interface{}, err error) {
|
||||
if len(fldPath) != 1 {
|
||||
return nil, utils.ErrNotFound
|
||||
}
|
||||
if data, err = cP.cache.FieldAsInterface(fldPath); err == nil ||
|
||||
err != utils.ErrNotFound { // item found in cache
|
||||
return
|
||||
}
|
||||
err = nil // cancel previous err
|
||||
if cfgFieldIdx, err := strconv.Atoi(fldPath[0]); err != nil || len(cP.req) <= cfgFieldIdx {
|
||||
return nil, fmt.Errorf("Ignoring record: %v with error : %+v", cP.req, err)
|
||||
} else {
|
||||
data = cP.req[cfgFieldIdx]
|
||||
}
|
||||
cP.cache.Set(fldPath, data, false, false)
|
||||
return
|
||||
}
|
||||
|
||||
// FieldAsString is part of engine.DataProvider interface
|
||||
func (cP *csvProvider) FieldAsString(fldPath []string) (data string, err error) {
|
||||
var valIface interface{}
|
||||
valIface, err = cP.FieldAsInterface(fldPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return utils.IfaceAsString(valIface), nil
|
||||
}
|
||||
|
||||
// AsNavigableMap is part of engine.DataProvider interface
|
||||
func (cP *csvProvider) AsNavigableMap([]*config.FCTemplate) (
|
||||
nm *config.NavigableMap, err error) {
|
||||
return nil, utils.ErrNotImplemented
|
||||
}
|
||||
|
||||
// RemoteHost is part of engine.DataProvider interface
|
||||
func (cP *csvProvider) RemoteHost() net.Addr {
|
||||
return utils.LocalAddr()
|
||||
}
|
||||
|
||||
@@ -14,6 +14,9 @@ en=$?
|
||||
echo 'go test github.com/cgrates/cgrates/cdrc -tags=integration'
|
||||
go test github.com/cgrates/cgrates/cdrc -tags=integration
|
||||
cdrc=$?
|
||||
echo 'go test github.com/cgrates/cgrates/ers -tags=integration'
|
||||
go test github.com/cgrates/cgrates/ers -tags=integration
|
||||
ers=$?
|
||||
echo 'go test github.com/cgrates/cgrates/config -tags=integration'
|
||||
go test github.com/cgrates/cgrates/config -tags=integration
|
||||
cfg=$?
|
||||
@@ -39,4 +42,4 @@ echo 'go test github.com/cgrates/cgrates/loaders -tags=integration'
|
||||
go test github.com/cgrates/cgrates/loaders -tags=integration
|
||||
lds=$?
|
||||
|
||||
exit $gen && $ap1 && $ap2 && $en && $cdrc && $cfg && $utl && $gnr && $agts && $smg && $mgr && $dis && $lds
|
||||
exit $gen && $ap1 && $ap2 && $en && $cdrc && $cfg && $utl && $gnr && $agts && $smg && $mgr && $dis && $lds && $ers
|
||||
|
||||
Reference in New Issue
Block a user