Added RawJsonReader

This commit is contained in:
Trial97
2018-10-22 16:03:38 +03:00
committed by Dan Christian Bogos
parent 08e2304516
commit 922fffc63f
6 changed files with 212 additions and 143 deletions

View File

@@ -22,8 +22,6 @@ import (
"encoding/json"
"io"
"os"
"github.com/DisposaBoy/JsonConfigReader"
)
const (
@@ -75,7 +73,7 @@ const (
// Loads the json config out of io.Reader, eg other sources than file, maybe over http
func NewCgrJsonCfgFromReader(r io.Reader) (*CgrJsonCfg, error) {
var cgrJsonCfg CgrJsonCfg
jr := JsonConfigReader.New(r)
jr := NewRawJSONReader(r)
if err := json.NewDecoder(jr).Decode(&cgrJsonCfg); err != nil {
return nil, err
}

View File

@@ -20,73 +20,127 @@ package config
import (
"bufio"
"bytes"
"errors"
// "fmt"
"io"
"os"
"github.com/cgrates/cgrates/utils"
)
// Reads the enviorment variable
func ReadEnv(key string) (string, error) {
if env := os.Getenv(key); env != "" {
return env, nil
}
return "", errEnvNotFound
}
/*
*ToDo:
* -add errors to util x
* -make 'new' functions ?
* -restructurate the code !!
* -optimize it a lot !!!
* -add test for them +
*/
func NewRawJSONReader(r io.Reader) io.Reader {
return &EnvReader{
rd: &comentByteReader{
rd: bufio.NewReader(r),
rd: &rawJson{
rdr: bufio.NewReader(r),
},
}
}
type comentByteReader struct {
rd *bufio.Reader
isPosComment bool // for comment removal
isInString bool // ignore coment delimiters in string declarations
}
func isNL(c byte) bool {
func isNewLine(c byte) bool {
return c == '\n' || c == '\r'
}
func isWS(c byte) bool {
return c == ' ' || c == '\t' || isNL(c)
func isWhiteSpace(c byte) bool {
return c == ' ' || c == '\t' || isNewLine(c) || c == 0
}
func (b *comentByteReader) consumeNL() error {
// Reads the enviorment variable
func ReadEnv(key string) (string, error) { //it shod print a warning not a error
if env := os.Getenv(key); env != "" {
return env, nil
}
return "", utils.ErrEnvNotFound(key)
}
func isAlfanum(bit byte) bool {
return (bit >= 'a' && bit <= 'z') ||
(bit >= 'A' && bit <= 'Z') ||
(bit >= '0' && bit <= '9')
}
type rawJson struct {
isInString bool
rdr *bufio.Reader
}
func (b *rawJson) ReadByte() (bit byte, err error) {
if b.isInString { //ignore commas in strings
return b.ReadByteWC()
}
bit, err = b.ReadByteWC()
if err != nil {
return bit, err
}
if bit == ',' {
bit2, err := b.PeekByteWC()
if err != nil {
return bit, err
}
if bit2 == ']' || bit2 == '}' {
return b.ReadByteWC()
}
}
return bit, err
}
func (b *rawJson) consumeComent(pkbit byte) (bool, error) {
switch pkbit {
case '/':
for {
bit, err := b.rdr.ReadByte()
if err != nil || isNewLine(bit) { //read until newline or EOF
return true, err
}
}
case '*':
for bit, err := b.rdr.ReadByte(); bit != '*'; bit, err = b.rdr.ReadByte() { //max 2 reads
if err == io.EOF {
return true, utils.ErrJsonIncompleteComment
}
if err != nil {
return true, err
}
}
simbolMeet := false
for {
bit, err := b.rdr.ReadByte()
if err == io.EOF {
return true, utils.ErrJsonIncompleteComment
}
if err != nil {
return true, err
}
if simbolMeet && bit == '/' {
return true, nil
}
simbolMeet = bit == '*'
}
}
return false, nil
}
func (b *rawJson) readFirstNonWhiteSpace() (byte, error) {
for {
bit, err := b.rd.ReadByte()
if err != nil || isNL(bit) {
return err
bit, err := b.rdr.ReadByte()
if err != nil || !isWhiteSpace(bit) {
return bit, err
}
}
}
func (b *comentByteReader) consumeMLC() error {
stop := false
for {
bit, err := b.rd.ReadByte()
if err != nil && err != io.EOF {
return err
}
if err == io.EOF {
return errors.New("Incomplete Comment")
}
if stop && bit == '/' {
return nil
}
stop = bit == '*'
func (b *rawJson) ReadByteWC() (bit byte, err error) {
if b.isInString {
bit, err = b.rdr.ReadByte()
} else {
bit, err = b.readFirstNonWhiteSpace()
}
}
func (b *comentByteReader) ReadByte() (byte, error) {
bit, err := b.rd.ReadByte()
if err != nil {
return bit, err
}
@@ -94,30 +148,52 @@ func (b *comentByteReader) ReadByte() (byte, error) {
b.isInString = !b.isInString
return bit, err
}
if !b.isInString {
if bit == '/' {
bit2, err := b.rd.Peek(1)
if err != nil {
return bit, err
}
switch bit2[0] {
case '/':
err = b.consumeNL()
if err == io.EOF {
return 0, err
}
return '\n', err
case '*':
if err = b.consumeMLC(); err != nil {
return 0, err
}
return b.rd.ReadByte()
}
if !b.isInString && bit == '/' {
bit2, err := b.rdr.Peek(1)
if err != nil {
return bit, err
}
isComment, err := b.consumeComent(bit2[0])
if err != nil && err != io.EOF {
return bit, err
}
if isComment {
return b.ReadByteWC()
}
}
return bit, err
}
func (b *rawJson) PeekByteWC() (byte, error) {
for {
bit, err := b.rdr.Peek(1)
if err != nil {
return bit[0], err
}
if !b.isInString && bit[0] == '/' { //try consume comment
bit, err = b.rdr.Peek(2)
if err != nil {
return bit[0], err
}
isComment, err := b.consumeComent(bit[1])
if err != nil {
return bit[0], err
}
if isComment {
return b.PeekByteWC()
}
return bit[0], err
}
if !isWhiteSpace(bit[0]) {
return bit[0], err
}
bit2, err := b.rdr.ReadByte()
if err != nil {
return bit2, err
}
}
}
type EnvReader struct {
buf []byte
rd io.ByteReader // reader provided by the client
@@ -125,17 +201,13 @@ type EnvReader struct {
m int // meta Ofset
}
var errNegativeRead = errors.New("reader returned negative count from Read")
var errEnvNotFound = errors.New("reader cant find enviormental variable")
func (b *EnvReader) readEnvName() (name []byte, bit byte, err error) { //0 if not set
esc := []byte{' ', '\t', '\n', '\r', ',', '}', ']', '\'', '"', '/'}
for { //read byte by byte
bit, err := b.rd.ReadByte()
if err != nil {
return name, 0, err
}
if bytes.IndexByte(esc, bit) != -1 {
if !isAlfanum(bit) && bit != '_' { //[a-zA-Z_]+[a-zA-Z0-9_]*
return name, bit, nil
}
name = append(name, bit)
@@ -152,7 +224,7 @@ func (b *EnvReader) replaceEnv(buf []byte, startEnv, midEnv int) (n int, err err
return 0, err
}
if endEnv := midEnv + len(key); endEnv > len(b.buf) { // garbage
if endEnv := midEnv + len(key); endEnv > len(b.buf) { // for garbage colector
b.buf = nil
}
@@ -164,12 +236,12 @@ func (b *EnvReader) replaceEnv(buf []byte, startEnv, midEnv int) (n int, err err
if startEnv+i < len(buf) { //add the bit
buf[startEnv+i] = bit
for j := startEnv + i + 1; j <= midEnv && j < len(buf); j++ { //replace *env: if value < len("*env:")
buf[j] = 0
buf[j] = ' '
}
return startEnv + i, nil
}
if i <= len(value) { //pune restul in buferul extra
if i <= len(value) { // add the remaining in the extra buffer
b.buf = make([]byte, len(value)-i+1)
for j := 0; j+i < len(value); j++ {
b.buf[j] = value[j+i]
@@ -188,7 +260,7 @@ func (b *EnvReader) checkMeta(bit byte) bool {
b.m++
return false
}
b.m = 0
b.m = 0 //reset counting
return false
}
@@ -203,9 +275,7 @@ func (b *EnvReader) Read(p []byte) (n int, err error) {
for ; b.r < len(b.buf) && b.r-pOf < len(p); b.r++ {
p[b.r-pOf] = b.buf[b.r]
if isEnv := b.checkMeta(p[b.r-pOf]); isEnv {
startEnv := b.r - len(utils.MetaEnv) + 1
midEnv := b.r
b.r, err = b.replaceEnv(p, startEnv, midEnv)
b.r, err = b.replaceEnv(p, b.r-len(utils.MetaEnv)+1, b.r)
if err != nil {
return b.r, err
}
@@ -226,51 +296,32 @@ func (b *EnvReader) Read(p []byte) (n int, err error) {
return pOf, err
}
if isEnv := b.checkMeta(p[pOf]); isEnv {
startEnv := pOf - len(utils.MetaEnv) + 1
midEnv := pOf
pOf, err = b.replaceEnv(p, startEnv, midEnv)
pOf, err = b.replaceEnv(p, pOf-len(utils.MetaEnv)+1, pOf)
if err != nil {
return pOf, err
}
}
}
if b.m != 0 { //continue to read if posible meta
initMeta := b.m
buf := make([]byte, len(utils.MetaEnv)-initMeta)
for i := 0; b.m != 0; i++ {
i := 0
for ; b.m != 0; i++ {
buf[i], err = b.rd.ReadByte()
if err != nil {
return i - 1, err
}
if isEnv := b.checkMeta(buf[i]); isEnv {
startEnv := len(p) - initMeta
midEnv := i
i, err = b.replaceEnv(p, startEnv, midEnv)
i, err = b.replaceEnv(p, len(p)-initMeta, i)
if err != nil {
return i, err
}
buf = nil
}
}
if len(buf) > 0 {
b.buf = buf
b.buf = buf[:i]
}
}
return len(p), nil
}
// func (b *EnvReader) commentGuard(bit byte) byte {
// // fmt.Println(b.isPosComment, string(bit))
// if b.isPosComment && bit == '/' {
// b.isPosComment = false
// return 1
// } else if b.isPosComment && bit == '*' {
// b.isPosComment = false
// return 2
// }
// b.isPosComment = bit == '/'
// return 0
// }

View File

@@ -18,54 +18,71 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package config
import (
// "reflect"
"fmt"
"bufio"
"os"
"reflect"
"strings"
"testing"
)
func TestEnvReaderRead(t *testing.T) {
envStr := `{
// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
// Copyright (C) ITsysCOM GmbH
//
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
var (
envStr = `{//nonprocess string
/***********************************************/
"general": {
"node_id": "", // identifier of this instance in the cluster, if empty it will be autogenerated
"logger":"*syslog", // controls the destination of logs <*syslog|*stdout>
"log_level": 6, // control the level of messages logged (0-emerg to 7-debug)
"http_skip_tls_verify": false, // if enabled Http Client will accept any TLS certificate
"rounding_decimals": 5, // system level precision for floats
"dbdata_encoding": "/usr/*env:PATH", // encoding used to store object data in strings: <msgpack|json>
"tpexport_dir": "/var/spool//cgrates/tpe", // path towards export folder for offline Tariff Plans
"poster_attempts": 3, // number of attempts before considering post request failed (eg: *call_url, CDR replication)
"failed_posts_dir": "/var/spool/cgrates/failed_posts", // directory path where we store failed requests
"default_request_type": "*rated", // default request type to consider when missing from requests: <""|*prepaid|*postpaid|*pseudoprepaid|*rated>
"default_category": "call", // default category to consider when missing from requests
"default_tenant": "cgrates.org", // default tenant to consider when missing from requests
"default_timezone": "Local", // default timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"connect_attempts": 3, // initial server connect attempts
"reconnects": -1, // number of retries in case of connection lost
"connect_timeout": "1s", // consider connection unsuccessful on timeout, 0 to disable the feature
"reply_timeout": "2s", // consider connection down for replies taking longer than this value
"response_cache_ttl": "0s", // the life span of a cached response
"internal_ttl": "2m", // maximum duration to wait for internal connections before giving up
"locking_timeout": "0", // timeout internal locks to avoid deadlocks
"digest_separator": ",",
"digest_equal": ":",
},
}`
r := strings.NewReader(envStr)
envR := NewRawJSONReader(r)
// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
// Copyright (C) ITsysCOM GmbH
//
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "redis", // data_db type: <*redis|*mongo|*internal>
"db_host": "127.0.0.1", /* data_db host address*/
"db_port": 6379, // data_db port to reach the database
"db_name": "10",/*/*asd*/ // data_db database name to connect to
"db_user": "*env:TESTVAR", // username to use when connecting to data_db
"db_password": ",/**/", // password to use when connecting to data_db
"redis_sentinel":"", // redis_sentinel is the name of sentinel
},/*Multiline coment
Line1
Line2
Line3
*/
/**/ }//`
)
func TestEnvRawJsonReadByte(t *testing.T) {
raw := &rawJson{rdr: bufio.NewReader(strings.NewReader(envStr))}
expected := []byte(`{"data_db":{"db_type":"redis","db_host":"127.0.0.1","db_port":6379,"db_name":"10","db_user":"*env:TESTVAR","db_password":",/**/","redis_sentinel":""}}`)
rply := []byte{}
bit, err := raw.ReadByte()
for ; err == nil; bit, err = raw.ReadByte() {
rply = append(rply, bit)
}
if !reflect.DeepEqual(expected, rply) {
t.Errorf("Expected: %+v\n, recived: %+v", string(expected), string(rply))
}
}
func TestEnvReaderRead(t *testing.T) {
os.Setenv("TESTVAR", "cgRates")
envR := NewRawJSONReader(strings.NewReader(envStr))
expected := []byte(`{"data_db":{"db_type":"redis","db_host":"127.0.0.1","db_port":6379,"db_name":"10","db_user":"cgRates","db_password":",/**/","redis_sentinel":""}}`)
rply := []byte{}
buf := make([]byte, 20)
n, err := envR.Read(buf)
for ; err == nil && n > 0; n, err = envR.Read(buf) {
fmt.Printf("%v", string(buf) /*, err, n*/) /*/////*/
rply = append(rply, buf[:n]...)
buf = make([]byte, 20)
}
fmt.Printf("%v", string(buf))
rply = append(rply, buf[:n]...)
if !reflect.DeepEqual(expected, rply) {
// for i := 0; i < len(expected); i++ {
// if expected[i] != rply[i] {
// t.Errorf("Expected: %q\n, recived: %q pe pozitia %+v", (string(expected[i-2 : i+2])), (string(rply[i-2 : i+2])), i)
// break
// }
// }
t.Errorf("Expected: %+v\n, recived: %+v", (string(expected)), (string(rply)))
}
}

2
glide.lock generated
View File

@@ -21,8 +21,6 @@ imports:
version: 69d4269e21990c0f120b8e60d5b75d533db7f3dd
- name: github.com/cgrates/rpcclient
version: 7316bff37a2b8692fbadd57f9c9cda070cc33081
- name: github.com/DisposaBoy/JsonConfigReader
version: 5ea4d0ddac554439159cd6f191cb94a110d73352
- name: github.com/fiorix/go-diameter
version: 16028e641c19a8dd67509053bc558d389258ff6d
subpackages:

View File

@@ -1,6 +1,5 @@
package: ""
import:
- package: github.com/DisposaBoy/JsonConfigReader
- package: github.com/cenkalti/hub
- package: github.com/cenkalti/rpc2
- package: github.com/cgrates/fsock

View File

@@ -57,6 +57,8 @@ var (
ErrUnauthorizedApi = errors.New("UNAUTHORIZED_API")
ErrUnknownApiKey = errors.New("UNKNOWN_API_KEY")
RalsErrorPrfx = "RALS_ERROR"
ErrJsonIncompleteComment = errors.New("JSON_INCOMPLETE_COMMENT")
)
// NewCGRError initialises a new CGRError
@@ -155,3 +157,7 @@ func ErrHasPrefix(err error, prfx string) (has bool) {
}
return strings.HasPrefix(err.Error(), prfx)
}
func ErrEnvNotFound(key string) error {
return errors.New("ENV_VAR_NOT_FOUND:" + key)
}