Merge pull request #1826 from Trial97/master

Added *sql as event reader
This commit is contained in:
Dan Christian Bogos
2019-12-17 14:49:34 +01:00
committed by GitHub
14 changed files with 786 additions and 8 deletions

View File

@@ -38,7 +38,7 @@
"cdrs": {
"enabled": true, // start the CDR Server service: <true|false>
"chargers_conns":["*conn1"],
"chargers_conns":["conn1"],
},
"chargers": {

View File

@@ -0,0 +1,73 @@
{
// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
// Copyright (C) ITsysCOM GmbH
//
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
"general": {
"node_id": "DispatcherS1",
"log_level": 7,
"reconnects": 1,
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"stor_db": {
"db_type":"*internal",
},
"cache":{
"*dispatcher_routes": {"limit": -1, "ttl": "2s"}
},
"attributes": {
"enabled": true
},
"scheduler": {
"enabled": true,
},
"rals": {
"enabled": true,
},
"chargers": {
"enabled": true,
},
"rpc_conns": {
"conn1": {
"strategy": "*first",
"conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}],
},
},
"sessions": {
"enabled": true,
"attributes_conns": ["conn1"],
"rals_conns": ["conn1"],
"resources_conns": ["conn1"],
"chargers_conns": ["conn1"],
"listen_bijson": ":3014",
},
"dispatchers":{
"enabled": true,
"attributes_conns": [
{"address": "*internal"},
],
},
"apier": {
"scheduler_conns": ["*internal"],
},
}

View File

@@ -0,0 +1,84 @@
{
// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
// Copyright (C) ITsysCOM GmbH
//
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
"general": {
"node_id": "DispatcherS1",
"log_level": 7,
"reconnects": 1,
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"data_db": {
"db_type": "mongo",
"db_name": "10",
"db_port": 27017,
},
"stor_db": {
"db_type": "mongo",
"db_name": "cgrates",
"db_port": 27017,
},
"cache":{
"*dispatcher_routes": {"limit": -1, "ttl": "2s"}
},
"scheduler": {
"enabled": true,
},
"attributes": {
"enabled": true
},
"rals": {
"enabled": true,
},
"chargers": {
"enabled": true,
},
"rpc_conns": {
"conn1": {
"strategy": "*first",
"conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}],
},
},
"sessions": {
"enabled": true,
"attributes_conns": ["conn1"],
"rals_conns": ["conn1"],
"resources_conns": ["conn1"],
"chargers_conns": ["conn1"],
"listen_bijson": ":3014",
},
"dispatchers":{
"enabled": true,
"attributes_conns": [
{"address": "*internal"},
],
},
"apier": {
"scheduler_conns": ["*internal"],
},
}

View File

@@ -14,6 +14,13 @@
"http": ":2080", // HTTP listening address
},
"rpc_conns": {
"conn1": {
"strategy": "*first",
"conns": [{"address": "127.0.0.1:2013", "transport":"*gob"}],
},
},
"stor_db": { // database used to store offline tariff plans and CDRs
"db_password": "CGRateS.org", // password to use when connecting to stordb
},

View File

@@ -14,7 +14,7 @@ cgrates.org,ATTR_API_RSP_AUTH,*auth,*string:~*req.APIKey:rsp12345,,,APIMethods,*
cgrates.org,ATTR_API_CHC_AUTH,*auth,*string:~*req.APIKey:chc12345,,,APIMethods,*constant,CacheSv1.Ping&CacheSv1.GetCacheStats&CacheSv1.LoadCache&CacheSv1.PrecacheStatus&CacheSv1.GetItemIDs&CacheSv1.HasItem&CacheSv1.GetItemExpiryTime&CacheSv1.ReloadCache&CacheSv1.RemoveItem&CacheSv1.FlushCache&CacheSv1.Clear,false,20
cgrates.org,ATTR_API_GRD_AUTH,*auth,*string:~*req.APIKey:grd12345,,,APIMethods,*constant,GuardianSv1.Ping&GuardianSv1.RemoteLock&GuardianSv1.RemoteUnlock,false,20
cgrates.org,ATTR_API_SCHD_AUTH,*auth,*string:~*req.APIKey:sched12345,,,APIMethods,*constant,SchedulerSv1.Ping,false,20
cgrates.org,ATTR_API_CDRS_AUTH,*auth,*string:~*req.APIKey:cdrs12345,,,APIMethods,*constant,CDRsV1.Ping&CDRsV1.ProcessEvent&CDRsV1.GetCDRs&CDRsV1.CountCDRs&CDRsV1.ProcessCDR&CDRsV1.ProcessExternalCDR,false,20
cgrates.org,ATTR_API_CDRS_AUTH,*auth,*string:~*req.APIKey:cdrs12345,,,APIMethods,*constant,CDRsV1.Ping&CDRsV1.ProcessEvent&CDRsV1.GetCDRs&CDRsV1.GetCDRsCount&CDRsV1.ProcessCDR&CDRsV1.ProcessExternalCDR,false,20
cgrates.org,ATTR_API_DSP_AUTH,*auth,*string:~*req.APIKey:dsp12345,,,APIMethods,*constant,DispatcherSv1.Ping&DispatcherSv1.GetProfileForEvent,false,20
cgrates.org,ATTR_API_PSE_AUTH,*auth,*string:~*req.APIKey:pse12345,,,APIMethods,*constant,SessionSv1.Ping&SessionSv1.AuthorizeEvent&SessionSv1.AuthorizeEventWithDigest&SessionSv1.InitiateSession&SessionSv1.InitiateSessionWithDigest&SessionSv1.UpdateSession&SessionSv1.SyncSessions&SessionSv1.TerminateSession&SessionSv1.ProcessCDR&SessionSv1.ProcessMessage&SessionSv1.GetActiveSessions&SessionSv1.GetActiveSessionsCount&SessionSv1.ForceDisconnect&SessionSv1.GetPassiveSessions&SessionSv1.GetPassiveSessionsCount&SessionSv1.ReplicateSessions&SessionSv1.SetPassiveSession&AttributeSv1.ProcessEvent&Responder.Debit&ResourceSv1.AllocateResources&ChargerSv1.ProcessEvent&Responder.MaxDebit,false,20
cgrates.org,ATTR_API_CFG_AUTH,*auth,*string:~*req.APIKey:cfg12345,,,APIMethods,*constant,ConfigSv1.GetJSONSection&ConfigSv1.ReloadConfig,false,20
1 #Tenant ID Contexts FilterIDs ActivationInterval AttributeFilterIDs FieldName Type Value Blocker Weight
14 cgrates.org ATTR_API_CHC_AUTH *auth *string:~*req.APIKey:chc12345 APIMethods *constant CacheSv1.Ping&CacheSv1.GetCacheStats&CacheSv1.LoadCache&CacheSv1.PrecacheStatus&CacheSv1.GetItemIDs&CacheSv1.HasItem&CacheSv1.GetItemExpiryTime&CacheSv1.ReloadCache&CacheSv1.RemoveItem&CacheSv1.FlushCache&CacheSv1.Clear false 20
15 cgrates.org ATTR_API_GRD_AUTH *auth *string:~*req.APIKey:grd12345 APIMethods *constant GuardianSv1.Ping&GuardianSv1.RemoteLock&GuardianSv1.RemoteUnlock false 20
16 cgrates.org ATTR_API_SCHD_AUTH *auth *string:~*req.APIKey:sched12345 APIMethods *constant SchedulerSv1.Ping false 20
17 cgrates.org ATTR_API_CDRS_AUTH *auth *string:~*req.APIKey:cdrs12345 APIMethods *constant CDRsV1.Ping&CDRsV1.ProcessEvent&CDRsV1.GetCDRs&CDRsV1.CountCDRs&CDRsV1.ProcessCDR&CDRsV1.ProcessExternalCDR CDRsV1.Ping&CDRsV1.ProcessEvent&CDRsV1.GetCDRs&CDRsV1.GetCDRsCount&CDRsV1.ProcessCDR&CDRsV1.ProcessExternalCDR false 20
18 cgrates.org ATTR_API_DSP_AUTH *auth *string:~*req.APIKey:dsp12345 APIMethods *constant DispatcherSv1.Ping&DispatcherSv1.GetProfileForEvent false 20
19 cgrates.org ATTR_API_PSE_AUTH *auth *string:~*req.APIKey:pse12345 APIMethods *constant SessionSv1.Ping&SessionSv1.AuthorizeEvent&SessionSv1.AuthorizeEventWithDigest&SessionSv1.InitiateSession&SessionSv1.InitiateSessionWithDigest&SessionSv1.UpdateSession&SessionSv1.SyncSessions&SessionSv1.TerminateSession&SessionSv1.ProcessCDR&SessionSv1.ProcessMessage&SessionSv1.GetActiveSessions&SessionSv1.GetActiveSessionsCount&SessionSv1.ForceDisconnect&SessionSv1.GetPassiveSessions&SessionSv1.GetPassiveSessionsCount&SessionSv1.ReplicateSessions&SessionSv1.SetPassiveSession&AttributeSv1.ProcessEvent&Responder.Debit&ResourceSv1.AllocateResources&ChargerSv1.ProcessEvent&Responder.MaxDebit false 20
20 cgrates.org ATTR_API_CFG_AUTH *auth *string:~*req.APIKey:cfg12345 APIMethods *constant ConfigSv1.GetJSONSection&ConfigSv1.ReloadConfig false 20

View File

@@ -60,7 +60,7 @@ var sTestsDspSession = []func(t *testing.T){
//Test start here
func TestDspSessionSTMySQL(t *testing.T) {
if *encoding == utils.MetaGOB {
testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "dispatchers", "testit", "tutorial", "dispatchers_gob")
testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "dispatchers_gob", "testit", "tutorial", "dispatchers_gob")
} else {
testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "dispatchers", "testit", "tutorial", "dispatchers")
}
@@ -68,7 +68,7 @@ func TestDspSessionSTMySQL(t *testing.T) {
func TestDspSessionSMongo(t *testing.T) {
if *encoding == utils.MetaGOB {
testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "dispatchers_mongo", "testit", "tutorial", "dispatchers_gob")
testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "dispatchers_mongo_gob", "testit", "tutorial", "dispatchers_gob")
} else {
testDsp(t, sTestsDspSession, "TestDspSessionS", "all", "all2", "dispatchers_mongo", "testit", "tutorial", "dispatchers")
}

View File

@@ -92,7 +92,7 @@ func (cM *ConnManager) getConn(connID string, biRPCClient rpcclient.ClientConnec
func (cM *ConnManager) Call(connIDs []string, biRPCClient rpcclient.ClientConnector,
method string, arg, reply interface{}) (err error) {
if len(connIDs) == 0 {
return utils.ErrMandatoryIeMissing
return utils.NewErrMandatoryIeMissing("connIDs")
}
var conn rpcclient.ClientConnector
for _, connID := range connIDs {

View File

@@ -76,7 +76,6 @@ func TestKafkaER(t *testing.T) {
rdrErr, new(engine.FilterS), rdrExit); err != nil {
t.Fatal(err)
}
kfk.Serve()
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: defaultTopic,
@@ -90,6 +89,8 @@ func TestKafkaER(t *testing.T) {
)
w.Close()
kfk.Serve()
select {
case err = <-rdrErr:
t.Error(err)

View File

@@ -42,6 +42,8 @@ func NewEventReader(cfg *config.CGRConfig, cfgIdx int,
return NewCSVFileER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
case utils.MetaKafkajsonMap:
return NewKafkaER(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
case utils.MetaSQL:
return NewSQLEventReader(cfg, cfgIdx, rdrEvents, rdrErr, fltrS, rdrExit)
}
return
}

324
ers/sql.go Normal file
View File

@@ -0,0 +1,324 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ers
import (
"fmt"
"net/url"
"strings"
"time"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
// libs for sql DBs
_ "github.com/go-sql-driver/mysql"
"github.com/jinzhu/gorm"
_ "github.com/lib/pq"
)
const (
dbName = "db_name"
tableName = "table_name"
sslMode = "sslmode"
defaultSSLMode = "disable"
defaultDBName = "cgrates"
)
// NewSQLEventReader return a new kafka event reader
func NewSQLEventReader(cfg *config.CGRConfig, cfgIdx int,
rdrEvents chan *erEvent, rdrErr chan error,
fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) {
rdr := &SQLEventReader{
cgrCfg: cfg,
cfgIdx: cfgIdx,
fltrS: fltrS,
rdrEvents: rdrEvents,
rdrExit: rdrExit,
rdrErr: rdrErr,
}
if concReq := rdr.Config().ConcurrentReqs; concReq != -1 {
rdr.cap = make(chan struct{}, concReq)
for i := 0; i < concReq; i++ {
rdr.cap <- struct{}{}
}
}
if err = rdr.setURL(rdr.Config().SourcePath, rdr.Config().ProcessedPath); err != nil {
return
}
er = rdr
return
}
// SQLEventReader implements EventReader interface for sql
type SQLEventReader struct {
// sync.RWMutex
cgrCfg *config.CGRConfig
cfgIdx int // index of config instance within ERsCfg.Readers
fltrS *engine.FilterS
connString string
connType string
tableName string
expConnString string
expConnType string
expTableName string
rdrEvents chan *erEvent // channel to dispatch the events created to
rdrExit chan struct{}
rdrErr chan error
cap chan struct{}
}
// Config returns the curent configuration
func (rdr *SQLEventReader) Config() *config.EventReaderCfg {
return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx]
}
// Serve will start the gorutines needed to watch the kafka topic
func (rdr *SQLEventReader) Serve() (err error) {
var db *gorm.DB
if db, err = gorm.Open(rdr.connType, rdr.connString); err != nil {
return
}
if err = db.DB().Ping(); err != nil {
return
}
if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API
return
}
go rdr.readLoop(db) // read until the connection is closed
return
}
func (rdr *SQLEventReader) readLoop(db *gorm.DB) {
for {
if db = db.Table(rdr.tableName).Select("*"); db.Error != nil {
rdr.rdrErr <- db.Error
return
}
rows, err := db.Rows()
if err != nil {
rdr.rdrErr <- err
return
}
colNames, err := rows.Columns()
if err != nil {
rdr.rdrErr <- err
return
}
for rows.Next() {
select {
case <-rdr.rdrExit:
utils.Logger.Info(
fmt.Sprintf("<%s> stop monitoring sql DB <%s>",
utils.ERs, rdr.Config().SourcePath))
db.Close()
return
default:
}
if err := rows.Err(); err != nil {
rdr.rdrErr <- err
return
}
if rdr.Config().ConcurrentReqs != -1 {
<-rdr.cap // do not try to read if the limit is reached
}
columns := make([]interface{}, len(colNames))
columnPointers := make([]interface{}, len(colNames))
for i := range columns {
columnPointers[i] = &columns[i]
}
if err = rows.Scan(columnPointers...); err != nil {
rdr.rdrErr <- err
return
}
go func(columns []interface{}, colNames []string) {
msg := make(map[string]interface{})
for i, colName := range colNames {
msg[colName] = columns[i]
}
if err := rdr.processMessage(msg); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing message %s error: %s",
utils.ERs, utils.ToJSON(msg), err.Error()))
}
db = db.Delete(msg) // to ensure we don't read it again
if rdr.Config().ProcessedPath != utils.EmptyString {
if err = rdr.postCDR(columns); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> posting message %s error: %s",
utils.ERs, utils.ToJSON(msg), err.Error()))
}
}
if rdr.Config().ConcurrentReqs != -1 {
rdr.cap <- struct{}{}
}
}(columns, colNames)
}
if rdr.Config().RunDelay < 0 {
return
}
time.Sleep(rdr.Config().RunDelay)
}
}
func (rdr *SQLEventReader) processMessage(msg map[string]interface{}) (err error) {
reqVars := make(map[string]interface{})
agReq := agents.NewAgentRequest(
config.NewNavigableMap(msg), reqVars,
nil, nil, rdr.Config().Tenant,
rdr.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(rdr.Config().Timezone,
rdr.cgrCfg.GeneralCfg().DefaultTimezone),
rdr.fltrS) // create an AgentRequest
var pass bool
if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters,
agReq); err != nil || !pass {
return
}
var navMp *config.NavigableMap
if navMp, err = agReq.AsNavigableMap(rdr.Config().ContentFields); err != nil {
return
}
rdr.rdrEvents <- &erEvent{
cgrEvent: navMp.AsCGREvent(agReq.Tenant, utils.NestingSep),
rdrCfg: rdr.Config(),
}
return
}
func (rdr *SQLEventReader) setURL(inURL, outURL string) (err error) {
inURL = strings.TrimPrefix(inURL, utils.Meta)
var u *url.URL
if u, err = url.Parse(inURL); err != nil {
return
}
password, _ := u.User.Password()
qry := u.Query()
rdr.connType = u.Scheme
dbname := defaultDBName
if vals, has := qry[dbName]; has && len(vals) != 0 {
dbname = vals[0]
}
ssl := defaultSSLMode
if vals, has := qry[sslMode]; has && len(vals) != 0 {
ssl = vals[0]
}
rdr.tableName = utils.CDRsTBL
if vals, has := qry[tableName]; has && len(vals) != 0 {
rdr.tableName = vals[0]
}
switch rdr.connType {
case utils.MYSQL:
rdr.connString = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
u.User.Username(), password, u.Hostname(), u.Port(), dbname)
case utils.POSTGRES:
rdr.connString = fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", u.Hostname(), u.Port(), dbname, u.User.Username(), password, ssl)
default:
return fmt.Errorf("unknown db_type %s", rdr.connType)
}
// outURL
if len(outURL) == 0 {
return
}
var outUser, outPassword, outDBname, outSSL, outHost, outPort string
var oqry url.Values
if !strings.HasPrefix(outURL, utils.Meta) {
rdr.expConnType = rdr.connType
outUser = u.User.Username()
outPassword = password
outHost = u.Hostname()
outPort = u.Port()
if oqry, err = url.ParseQuery(outURL); err != nil {
return
}
} else {
outURL = strings.TrimPrefix(outURL, utils.Meta)
var oURL *url.URL
if oURL, err = url.Parse(inURL); err != nil {
return
}
rdr.expConnType = oURL.Scheme
outPassword, _ = oURL.User.Password()
outUser = oURL.User.Username()
outHost = oURL.Hostname()
outPort = oURL.Port()
oqry = oURL.Query()
}
outDBname = defaultDBName
if vals, has := oqry[dbName]; has && len(vals) != 0 {
outDBname = vals[0]
}
outSSL = defaultSSLMode
if vals, has := oqry[sslMode]; has && len(vals) != 0 {
outSSL = vals[0]
}
rdr.expTableName = utils.CDRsTBL
if vals, has := oqry[tableName]; has && len(vals) != 0 {
rdr.expTableName = vals[0]
}
switch rdr.expConnType {
case utils.MYSQL:
rdr.expConnString = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
outUser, outPassword, outHost, outPort, outDBname)
case utils.POSTGRES:
rdr.expConnString = fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s",
outHost, outPort, outDBname, outUser, outPassword, outSSL)
default:
return fmt.Errorf("unknown db_type")
}
return
}
func (rdr *SQLEventReader) postCDR(in []interface{}) (err error) {
sqlValues := make([]string, len(in))
for i := range in {
sqlValues[i] = "?"
}
sqlStatement := fmt.Sprintf("INSERT INTO %s VALUES (%s); ", rdr.expTableName, strings.Join(sqlValues, ","))
var db *gorm.DB
if db, err = gorm.Open(rdr.expConnType, rdr.expConnString); err != nil {
return
}
if err = db.DB().Ping(); err != nil {
return
}
tx := db.Begin()
_, err = db.DB().Exec(sqlStatement, in...)
if err != nil {
tx.Rollback()
if strings.Contains(err.Error(), "1062") || strings.Contains(err.Error(), "duplicate key") { // returns 1062/pq when key is duplicated
return utils.ErrExists
}
return
}
tx.Commit()
return
}

285
ers/sql_it_test.go Normal file
View File

@@ -0,0 +1,285 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package ers
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
_ "github.com/go-sql-driver/mysql"
"github.com/jinzhu/gorm"
)
var (
sqlCfgPath string
sqlCfg *config.CGRConfig
sqlTests = []func(t *testing.T){
testSQLInitConfig,
testSQLInitCdrDb,
testSQLInitDB,
testSQLReader,
testSQLEmptyTable,
testSQLPoster,
testSQLInitDB,
testSQLReader2,
testSQLStop,
}
cdr = &engine.CDR{
CGRID: "CGRID",
RunID: "RunID",
}
db *gorm.DB
dbConnString = "cgrates:CGRateS.org@tcp(127.0.0.1:3306)/cgrates2?charset=utf8&loc=Local&parseTime=true&sql_mode='ALLOW_INVALID_DATES'"
)
func TestSQL(t *testing.T) {
// sqlCfgPath = path.Join(*dataDir, "conf", "samples", "ers_reload", "disabled")
for _, test := range sqlTests {
t.Run("TestSQL", test)
}
}
func testSQLInitConfig(t *testing.T) {
var err error
if sqlCfg, err = config.NewCGRConfigFromJsonStringWithDefaults(`{
"stor_db": {
"db_password": "CGRateS.org",
},
"ers": { // EventReaderService
"enabled": true, // starts the EventReader service: <true|false>
"readers": [
{
"id": "mysql", // identifier of the EventReader profile
"type": "*sql", // reader type <*file_csv>
"run_delay": 1, // sleep interval in seconds between consecutive runs, -1 to use automation via inotify or 0 to disable running all together
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "*mysql://cgrates:CGRateS.org@127.0.0.1:3306?db_name=cgrates2", // read data from this path
"processed_path": "db_name=cgrates2&table_name=cdrs2", // move processed data here
"tenant": "cgrates.org", // tenant used by import
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing
// "header_fields": [], // template of the import header fields
"content_fields":[ // import content_fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "CGRID", "type": "*composed", "value": "~*req.cgrid", "field_id": "CGRID"},
],
// "trailer_fields": [], // template of the import trailer fields
},
],
},
}`); err != nil {
t.Fatal(err)
}
utils.Newlogger(utils.MetaSysLog, sqlCfg.GeneralCfg().NodeID)
utils.Logger.SetLogLevel(7)
}
func testSQLInitCdrDb(t *testing.T) {
rdrsql := sqlCfg.StorDbCfg().Clone()
if err := engine.InitStorDb(sqlCfg); err != nil {
t.Fatal(err)
}
sqlCfg.StorDbCfg().Name = "cgrates2"
if err := engine.InitStorDb(sqlCfg); err != nil {
t.Fatal(err)
}
*sqlCfg.StorDbCfg() = *rdrsql
}
type testModelSql struct {
ID int64
Cgrid string
RunID string
OriginHost string
Source string
OriginID string
TOR string
RequestType string
Tenant string
Category string
Account string
Subject string
Destination string
SetupTime time.Time
AnswerTime time.Time
Usage int64
ExtraFields string
CostSource string
Cost float64
CostDetails string
ExtraInfo string
CreatedAt time.Time
UpdatedAt time.Time
DeletedAt *time.Time
}
func (_ *testModelSql) TableName() string {
return "cdrs2"
}
func testSQLInitDB(t *testing.T) {
cdr.CGRID = utils.UUIDSha1Prefix()
var err error
db, err = gorm.Open("mysql", dbConnString)
if err != nil {
t.Fatal(err)
}
if !db.HasTable("cdrs2") {
db = db.CreateTable(&testModelSql{})
}
db = db.Table(utils.CDRsTBL)
tx := db.Begin()
cdrSql := cdr.AsCDRsql()
cdrSql.CreatedAt = time.Now()
saved := tx.Save(cdrSql)
if saved.Error != nil {
tx.Rollback()
t.Fatal(err)
}
tx.Commit()
}
func testSQLReader(t *testing.T) {
rdrEvents = make(chan *erEvent, 1)
rdrErr = make(chan error, 1)
rdrExit = make(chan struct{}, 1)
sqlER, err := NewEventReader(sqlCfg, 1, rdrEvents, rdrErr, new(engine.FilterS), rdrExit)
if err != nil {
t.Fatal(err)
}
sqlER.Serve()
select {
case err = <-rdrErr:
t.Error(err)
case ev := <-rdrEvents:
if ev.rdrCfg.ID != "mysql" {
t.Errorf("Expected 'mysql' received `%s`", ev.rdrCfg.ID)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
ID: ev.cgrEvent.ID,
Time: ev.cgrEvent.Time,
Event: map[string]interface{}{
"CGRID": cdr.CGRID,
},
}
if !reflect.DeepEqual(ev.cgrEvent, expected) {
t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
}
case <-time.After(time.Second):
t.Fatal("Timeout")
}
}
func testSQLEmptyTable(t *testing.T) {
time.Sleep(10 * time.Millisecond)
rows, err := db.Table(utils.CDRsTBL).Select("*").Rows()
if err != nil {
t.Fatal(err)
}
colNames, err := rows.Columns()
if err != nil {
t.Fatal(err)
}
for rows.Next() {
columns := make([]interface{}, len(colNames))
columnPointers := make([]interface{}, len(colNames))
for i := range columns {
columnPointers[i] = &columns[i]
}
if err = rows.Scan(columnPointers...); err != nil {
t.Fatal(err)
}
msg := make(map[string]interface{})
for i, colName := range colNames {
msg[colName] = columns[i]
}
t.Fatal("Expected empty table ", utils.ToJSON(msg))
}
}
func testSQLReader2(t *testing.T) {
select {
case err := <-rdrErr:
t.Error(err)
case ev := <-rdrEvents:
if ev.rdrCfg.ID != "mysql" {
t.Errorf("Expected 'mysql' received `%s`", ev.rdrCfg.ID)
}
expected := &utils.CGREvent{
Tenant: "cgrates.org",
ID: ev.cgrEvent.ID,
Time: ev.cgrEvent.Time,
Event: map[string]interface{}{
"CGRID": cdr.CGRID,
},
}
if !reflect.DeepEqual(ev.cgrEvent, expected) {
t.Errorf("Expected %s ,received %s", utils.ToJSON(expected), utils.ToJSON(ev.cgrEvent))
}
case <-time.After(time.Second):
t.Fatal("Timeout")
}
}
func testSQLPoster(t *testing.T) {
rows, err := db.Table("cdrs2").Select("*").Rows()
if err != nil {
t.Fatal(err)
}
colNames, err := rows.Columns()
if err != nil {
t.Fatal(err)
}
for rows.Next() {
columns := make([]interface{}, len(colNames))
columnPointers := make([]interface{}, len(colNames))
for i := range columns {
columnPointers[i] = &columns[i]
}
if err = rows.Scan(columnPointers...); err != nil {
t.Fatal(err)
}
msg := make(map[string]interface{})
for i, colName := range colNames {
msg[colName] = columns[i]
}
db.Table("cdrs2").Delete(msg)
if cgrid := utils.IfaceAsString(msg["cgrid"]); cgrid != cdr.CGRID {
t.Errorf("Expected: %s ,receieved: %s", cgrid, cdr.CGRID)
}
}
}
func testSQLStop(t *testing.T) {
rdrExit <- struct{}{}
db = db.DropTable("cdrs2")
if err := db.Close(); err != nil {
t.Error(err)
}
}

View File

@@ -68,7 +68,7 @@ func testbrodcastItLoadConfig(t *testing.T) {
t.Error(err)
}
brodcastInternalCfgPath = path.Join(*dataDir, "conf", "samples", "tutinternal")
if brodcastInternalCfg, err = config.NewCGRConfigFromPath(brodcastCfgPath); err != nil {
if brodcastInternalCfg, err = config.NewCGRConfigFromPath(brodcastInternalCfgPath); err != nil {
t.Error(err)
}
}

View File

@@ -223,7 +223,6 @@ func testV1CDRsProcessEventAttrS(t *testing.T) {
}
func testV1CDRsProcessEventChrgS(t *testing.T) {
argsEv := &engine.ArgV1ProcessEvent{
Flags: []string{utils.MetaChargers, "*attributes:false"},
CGREvent: utils.CGREvent{

View File

@@ -301,6 +301,9 @@ const (
MetaAMQPV1jsonMap = "*amqpv1_json_map"
MetaSQSjsonMap = "*sqs_json_map"
MetaKafkajsonMap = "*kafka_json_map"
MetaSQL = "*sql"
MetaMySQL = "*mysql"
MetaPostgres = "*postgres"
MetaS3jsonMap = "*s3_json_map"
CONFIG_PATH = "/etc/cgrates/"
DISCONNECT_CAUSE = "DisconnectCause"