Update integration tests

This commit is contained in:
TeoV
2019-10-30 09:03:56 -04:00
committed by Dan Christian Bogos
parent 26d0758bea
commit 722f74f652
22 changed files with 183 additions and 276 deletions

View File

@@ -463,7 +463,7 @@ func main() {
}
}
if cfg.RalsCfg().Enabled || cfg.CdrsCfg().Enabled {
storDb, err := engine.ConfigureStorStorage(cfg.StorDbCfg().Type,
storDb, err := engine.NewStorDBConn(cfg.StorDbCfg().Type,
cfg.StorDbCfg().Host, cfg.StorDbCfg().Port,
cfg.StorDbCfg().Name, cfg.StorDbCfg().User,
cfg.StorDbCfg().Password, cfg.StorDbCfg().SSLMode,

View File

@@ -231,13 +231,15 @@ func main() {
}
if !*toStorDB {
if dm, err = engine.ConfigureDataStorage(ldrCfg.DataDbCfg().DataDbType,
d, err := engine.NewDataDBConn(ldrCfg.DataDbCfg().DataDbType,
ldrCfg.DataDbCfg().DataDbHost, ldrCfg.DataDbCfg().DataDbPort,
ldrCfg.DataDbCfg().DataDbName, ldrCfg.DataDbCfg().DataDbUser,
ldrCfg.DataDbCfg().DataDbPass, ldrCfg.GeneralCfg().DBDataEncoding,
config.CgrConfig().CacheCfg(), ldrCfg.DataDbCfg().DataDbSentinelName); err != nil {
ldrCfg.DataDbCfg().DataDbSentinelName)
if err != nil {
log.Fatalf("Coud not open dataDB connection: %s", err.Error())
}
dm = engine.NewDataManager(d, config.CgrConfig().CacheCfg())
defer dm.DataDB().Close()
}

View File

@@ -71,14 +71,15 @@ var (
)
func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) {
dm, err := engine.ConfigureDataStorage(tstCfg.DataDbCfg().DataDbType,
dbConn, err := engine.NewDataDBConn(tstCfg.DataDbCfg().DataDbType,
tstCfg.DataDbCfg().DataDbHost, tstCfg.DataDbCfg().DataDbPort,
tstCfg.DataDbCfg().DataDbName, tstCfg.DataDbCfg().DataDbUser,
tstCfg.DataDbCfg().DataDbPass, tstCfg.GeneralCfg().DBDataEncoding,
cgrConfig.CacheCfg(), tstCfg.DataDbCfg().DataDbSentinelName) // for the momentn we use here "" for sentinelName
tstCfg.DataDbCfg().DataDbSentinelName)
if err != nil {
return nilDuration, fmt.Errorf("Could not connect to data database: %s", err.Error())
}
dm := engine.NewDataManager(dbConn, cgrConfig.CacheCfg()) // for the momentn we use here "" for sentinelName
defer dm.DataDB().Close()
engine.SetDataStorage(dm)
if err := dm.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil,

View File

@@ -62,8 +62,8 @@ const CGRATES_CFG_JSON = `
"db_password": "", // password to use when connecting to data_db
"redis_sentinel":"", // the name of sentinel when used
"query_timeout":"10s",
"remote_db_urls":[],
"replicate_db_urls":[],
"remote_conns":[],
"replication_conns":[],
},
@@ -351,7 +351,7 @@ const CGRATES_CFG_JSON = `
"stats_conns": [], // connections to StatS for reporting session events <""|*internal|127.0.0.1:2013>
"suppliers_conns": [], // connections to SupplierS for querying suppliers for event <""|*internal|127.0.0.1:2013>
"attributes_conns": [], // connections to AttributeS for altering event fields <""|*internal|127.0.0.1:2013>
"session_replication_conns": [], // replicate sessions towards these session services
"replication_conns": [], // replicate sessions towards these session services
"debit_interval": "0s", // interval to perform debits on.
"store_session_costs": false, // enable storing of the session costs within CDRs
"min_call_duration": "0s", // only authorize calls with allowed duration higher than this

View File

@@ -417,11 +417,11 @@ func TestCGRConfigReloadSessionS(t *testing.T) {
},
},
SessionReplicationConns: []*RemoteHost{},
MaxCallDuration: 3 * time.Hour,
SessionIndexes: utils.NewStringMap(),
ClientProtocol: 1,
TerminateAttempts: 5,
ReplicationConns: []*RemoteHost{},
MaxCallDuration: 3 * time.Hour,
SessionIndexes: utils.NewStringMap(),
ClientProtocol: 1,
TerminateAttempts: 5,
}
if !reflect.DeepEqual(expAttr, cfg.SessionSCfg()) {
t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.SessionSCfg()))

View File

@@ -197,8 +197,8 @@ func TestDfDataDbJsonCfg(t *testing.T) {
Db_password: utils.StringPointer(""),
Redis_sentinel: utils.StringPointer(""),
Query_timeout: utils.StringPointer("10s"),
Replicate_db_urls: &[]string{},
Remote_db_urls: &[]string{},
Replication_conns: &[]*DbJsonCfg{},
Remote_conns: &[]*DbJsonCfg{},
}
if cfg, err := dfCgrJsonCfg.DbJsonCfg(DATADB_JSN); err != nil {
t.Error(err)
@@ -480,26 +480,26 @@ func TestDfCdrcJsonCfg(t *testing.T) {
func TestSmgJsonCfg(t *testing.T) {
eCfg := &SessionSJsonCfg{
Enabled: utils.BoolPointer(false),
Listen_bijson: utils.StringPointer("127.0.0.1:2014"),
Chargers_conns: &[]*RemoteHostJson{},
Rals_conns: &[]*RemoteHostJson{},
Cdrs_conns: &[]*RemoteHostJson{},
Resources_conns: &[]*RemoteHostJson{},
Thresholds_conns: &[]*RemoteHostJson{},
Stats_conns: &[]*RemoteHostJson{},
Suppliers_conns: &[]*RemoteHostJson{},
Attributes_conns: &[]*RemoteHostJson{},
Session_replication_conns: &[]*RemoteHostJson{},
Debit_interval: utils.StringPointer("0s"),
Store_session_costs: utils.BoolPointer(false),
Min_call_duration: utils.StringPointer("0s"),
Max_call_duration: utils.StringPointer("3h"),
Session_ttl: utils.StringPointer("0s"),
Session_indexes: &[]string{},
Client_protocol: utils.Float64Pointer(1.0),
Channel_sync_interval: utils.StringPointer("0"),
Terminate_attempts: utils.IntPointer(5),
Enabled: utils.BoolPointer(false),
Listen_bijson: utils.StringPointer("127.0.0.1:2014"),
Chargers_conns: &[]*RemoteHostJson{},
Rals_conns: &[]*RemoteHostJson{},
Cdrs_conns: &[]*RemoteHostJson{},
Resources_conns: &[]*RemoteHostJson{},
Thresholds_conns: &[]*RemoteHostJson{},
Stats_conns: &[]*RemoteHostJson{},
Suppliers_conns: &[]*RemoteHostJson{},
Attributes_conns: &[]*RemoteHostJson{},
Replication_conns: &[]*RemoteHostJson{},
Debit_interval: utils.StringPointer("0s"),
Store_session_costs: utils.BoolPointer(false),
Min_call_duration: utils.StringPointer("0s"),
Max_call_duration: utils.StringPointer("3h"),
Session_ttl: utils.StringPointer("0s"),
Session_indexes: &[]string{},
Client_protocol: utils.Float64Pointer(1.0),
Channel_sync_interval: utils.StringPointer("0"),
Terminate_attempts: utils.IntPointer(5),
}
if cfg, err := dfCgrJsonCfg.SessionSJsonCfg(); err != nil {
t.Error(err)

View File

@@ -630,26 +630,26 @@ func TestCgrCfgJSONDefaultsCdreProfiles(t *testing.T) {
func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) {
eSessionSCfg := &SessionSCfg{
Enabled: false,
ListenBijson: "127.0.0.1:2014",
ChargerSConns: []*RemoteHost{},
RALsConns: []*RemoteHost{},
CDRsConns: []*RemoteHost{},
ResSConns: []*RemoteHost{},
ThreshSConns: []*RemoteHost{},
StatSConns: []*RemoteHost{},
SupplSConns: []*RemoteHost{},
AttrSConns: []*RemoteHost{},
SessionReplicationConns: []*RemoteHost{},
DebitInterval: 0 * time.Second,
StoreSCosts: false,
MinCallDuration: 0 * time.Second,
MaxCallDuration: 3 * time.Hour,
SessionTTL: 0 * time.Second,
SessionIndexes: utils.StringMap{},
ClientProtocol: 1.0,
ChannelSyncInterval: 0,
TerminateAttempts: 5,
Enabled: false,
ListenBijson: "127.0.0.1:2014",
ChargerSConns: []*RemoteHost{},
RALsConns: []*RemoteHost{},
CDRsConns: []*RemoteHost{},
ResSConns: []*RemoteHost{},
ThreshSConns: []*RemoteHost{},
StatSConns: []*RemoteHost{},
SupplSConns: []*RemoteHost{},
AttrSConns: []*RemoteHost{},
ReplicationConns: []*RemoteHost{},
DebitInterval: 0 * time.Second,
StoreSCosts: false,
MinCallDuration: 0 * time.Second,
MaxCallDuration: 3 * time.Hour,
SessionTTL: 0 * time.Second,
SessionIndexes: utils.StringMap{},
ClientProtocol: 1.0,
ChannelSyncInterval: 0,
TerminateAttempts: 5,
}
if !reflect.DeepEqual(eSessionSCfg, cgrCfg.sessionSCfg) {
t.Errorf("expecting: %s, received: %s",

View File

@@ -75,24 +75,16 @@ func (dbcfg *DataDbCfg) loadFromJsonCfg(jsnDbCfg *DbJsonCfg) (err error) {
return err
}
}
if jsnDbCfg.Remote_db_urls != nil {
dbcfg.RmtDataDBCfgs = make([]*DataDbCfg, len(*jsnDbCfg.Remote_db_urls))
for i, url := range *jsnDbCfg.Remote_db_urls {
db, err := newDataDBCfgFromUrl(url)
if err != nil {
return err
}
dbcfg.RmtDataDBCfgs[i] = db
if jsnDbCfg.Remote_conns != nil {
dbcfg.RmtDataDBCfgs = make([]*DataDbCfg, len(*jsnDbCfg.Remote_conns))
for i, cfg := range *jsnDbCfg.Remote_conns {
dbcfg.RmtDataDBCfgs[i].loadFromJsonCfg(cfg)
}
}
if jsnDbCfg.Replicate_db_urls != nil {
dbcfg.RplDataDBCfgs = make([]*DataDbCfg, len(*jsnDbCfg.Replicate_db_urls))
for i, url := range *jsnDbCfg.Replicate_db_urls {
db, err := newDataDBCfgFromUrl(url)
if err != nil {
return err
}
dbcfg.RplDataDBCfgs[i] = db
if jsnDbCfg.Replication_conns != nil {
dbcfg.RmtDataDBCfgs = make([]*DataDbCfg, len(*jsnDbCfg.Replication_conns))
for i, cfg := range *jsnDbCfg.Replication_conns {
dbcfg.RplDataDBCfgs[i].loadFromJsonCfg(cfg)
}
}
return nil
@@ -111,40 +103,3 @@ func (dbcfg *DataDbCfg) Clone() *DataDbCfg {
QueryTimeout: dbcfg.QueryTimeout,
}
}
//newDataDBCfgFromUrl will create a DataDB configuration out of url
//Format: host:port/?type=valOfType&name=valOFName&etc...
//Sample: 127.0.0.1:6379
func newDataDBCfgFromUrl(pUrl string) (newDbCfg *DataDbCfg, err error) {
newDbCfg = new(DataDbCfg)
if pUrl == utils.EmptyString {
return nil, utils.ErrMandatoryIeMissing
}
// populate with default dataDBCfg and overwrite in case we found arguments in url
dfltCfg, _ := NewDefaultCGRConfig()
*newDbCfg = *dfltCfg.dataDbCfg
hostPortSls := strings.Split(strings.Split(pUrl, utils.Slash)[0], utils.InInFieldSep)
newDbCfg.DataDbHost = hostPortSls[0]
newDbCfg.DataDbPort = hostPortSls[1]
arg := utils.GetUrlRawArguments(pUrl)
if val, has := arg[utils.TypeLow]; has {
newDbCfg.DataDbType = strings.TrimPrefix(val, "*")
}
if val, has := arg[utils.UserLow]; has {
newDbCfg.DataDbUser = val
}
if val, has := arg[utils.PassLow]; has {
newDbCfg.DataDbPass = val
}
if val, has := arg[utils.SentinelLow]; has {
newDbCfg.DataDbSentinelName = val
}
if val, has := arg[utils.QueryLow]; has {
dur, err := utils.ParseDurationWithNanosecs(val)
if err != nil {
return nil, err
}
newDbCfg.QueryTimeout = dur
}
return
}

View File

@@ -20,9 +20,6 @@ package config
import (
"reflect"
"testing"
"time"
"github.com/cgrates/cgrates/utils"
)
func TestDataDbCfgloadFromJsonCfg(t *testing.T) {
@@ -125,65 +122,3 @@ func TestDataDbCfgloadFromJsonCfgPort(t *testing.T) {
t.Errorf("Expected: %+v , recived: %+v", expected, dbcfg)
}
}
func TestDataDbNewDataDbFromUrl(t *testing.T) {
if _, err := newDataDBCfgFromUrl(utils.EmptyString); err != utils.ErrMandatoryIeMissing {
t.Errorf("Expected: %+v , recived: %+v", utils.ErrMandatoryIeMissing, err)
}
url := "127.0.0.1:1234"
expected := &DataDbCfg{
DataDbType: utils.REDIS,
DataDbHost: "127.0.0.1",
DataDbPort: "1234",
DataDbName: "10",
DataDbUser: "cgrates",
DataDbPass: "",
DataDbSentinelName: "",
QueryTimeout: 10 * time.Second,
RmtDataDBCfgs: []*DataDbCfg{},
RplDataDBCfgs: []*DataDbCfg{},
}
if rcv, err := newDataDBCfgFromUrl(url); err != nil || !reflect.DeepEqual(rcv, expected) {
t.Errorf("Error: %+v \n, expected: %+v ,\n recived: %+v", err, utils.ToJSON(expected), utils.ToJSON(rcv))
}
url = "127.0.0.1:1234/?user=test&pass=test"
expected = &DataDbCfg{
DataDbType: utils.REDIS,
DataDbHost: "127.0.0.1",
DataDbPort: "1234",
DataDbName: "10",
DataDbUser: "test",
DataDbPass: "test",
DataDbSentinelName: "",
QueryTimeout: 10 * time.Second,
RmtDataDBCfgs: []*DataDbCfg{},
RplDataDBCfgs: []*DataDbCfg{},
}
if rcv, err := newDataDBCfgFromUrl(url); err != nil || !reflect.DeepEqual(rcv, expected) {
t.Errorf("Error: %+v , expected: %+v , recived: %+v", err, utils.ToJSON(expected), utils.ToJSON(rcv))
}
url = "0.0.0.0:1234/?type=*mongo"
expected = &DataDbCfg{
DataDbType: utils.MONGO,
DataDbHost: "0.0.0.0",
DataDbPort: "1234",
DataDbName: "10",
DataDbUser: "cgrates",
DataDbPass: "",
DataDbSentinelName: "",
QueryTimeout: 10 * time.Second,
RmtDataDBCfgs: []*DataDbCfg{},
RplDataDBCfgs: []*DataDbCfg{},
}
if rcv, err := newDataDBCfgFromUrl(url); err != nil || !reflect.DeepEqual(rcv, expected) {
t.Errorf("Error: %+v , expected: %+v , recived: %+v", err, utils.ToJSON(expected), utils.ToJSON(rcv))
}
url = "0.0.0.0:1234/?type=*mongo&query=error"
if _, err := newDataDBCfgFromUrl(url); err == nil || err.Error() != "time: invalid duration error" {
t.Errorf("Expected:<time: invalid duration error> , recived: <%+v>", err)
}
}

View File

@@ -90,8 +90,8 @@ type DbJsonCfg struct {
Redis_sentinel *string
Query_timeout *string
Sslmode *string // Used only in case of storDb
Remote_db_urls *[]string
Replicate_db_urls *[]string
Remote_conns *[]*DbJsonCfg
Replication_conns *[]*DbJsonCfg
}
// Filters config
@@ -203,29 +203,29 @@ type EventReaderJsonCfg struct {
// SM-Generic config section
type SessionSJsonCfg struct {
Enabled *bool
Listen_bijson *string
Chargers_conns *[]*RemoteHostJson
Rals_conns *[]*RemoteHostJson
Resources_conns *[]*RemoteHostJson
Thresholds_conns *[]*RemoteHostJson
Stats_conns *[]*RemoteHostJson
Suppliers_conns *[]*RemoteHostJson
Cdrs_conns *[]*RemoteHostJson
Session_replication_conns *[]*RemoteHostJson
Attributes_conns *[]*RemoteHostJson
Debit_interval *string
Store_session_costs *bool
Min_call_duration *string
Max_call_duration *string
Session_ttl *string
Session_ttl_max_delay *string
Session_ttl_last_used *string
Session_ttl_usage *string
Session_indexes *[]string
Client_protocol *float64
Channel_sync_interval *string
Terminate_attempts *int
Enabled *bool
Listen_bijson *string
Chargers_conns *[]*RemoteHostJson
Rals_conns *[]*RemoteHostJson
Resources_conns *[]*RemoteHostJson
Thresholds_conns *[]*RemoteHostJson
Stats_conns *[]*RemoteHostJson
Suppliers_conns *[]*RemoteHostJson
Cdrs_conns *[]*RemoteHostJson
Replication_conns *[]*RemoteHostJson
Attributes_conns *[]*RemoteHostJson
Debit_interval *string
Store_session_costs *bool
Min_call_duration *string
Max_call_duration *string
Session_ttl *string
Session_ttl_max_delay *string
Session_ttl_last_used *string
Session_ttl_usage *string
Session_indexes *[]string
Client_protocol *float64
Channel_sync_interval *string
Terminate_attempts *int
}
// FreeSWITCHAgent config section

View File

@@ -99,29 +99,29 @@ func (self *FsConnCfg) loadFromJsonCfg(jsnCfg *FsConnJsonCfg) error {
}
type SessionSCfg struct {
Enabled bool
ListenBijson string
ChargerSConns []*RemoteHost
RALsConns []*RemoteHost
ResSConns []*RemoteHost
ThreshSConns []*RemoteHost
StatSConns []*RemoteHost
SupplSConns []*RemoteHost
AttrSConns []*RemoteHost
CDRsConns []*RemoteHost
SessionReplicationConns []*RemoteHost
DebitInterval time.Duration
StoreSCosts bool
MinCallDuration time.Duration
MaxCallDuration time.Duration
SessionTTL time.Duration
SessionTTLMaxDelay *time.Duration
SessionTTLLastUsed *time.Duration
SessionTTLUsage *time.Duration
SessionIndexes utils.StringMap
ClientProtocol float64
ChannelSyncInterval time.Duration
TerminateAttempts int
Enabled bool
ListenBijson string
ChargerSConns []*RemoteHost
RALsConns []*RemoteHost
ResSConns []*RemoteHost
ThreshSConns []*RemoteHost
StatSConns []*RemoteHost
SupplSConns []*RemoteHost
AttrSConns []*RemoteHost
CDRsConns []*RemoteHost
ReplicationConns []*RemoteHost
DebitInterval time.Duration
StoreSCosts bool
MinCallDuration time.Duration
MaxCallDuration time.Duration
SessionTTL time.Duration
SessionTTLMaxDelay *time.Duration
SessionTTLLastUsed *time.Duration
SessionTTLUsage *time.Duration
SessionIndexes utils.StringMap
ClientProtocol float64
ChannelSyncInterval time.Duration
TerminateAttempts int
}
func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) (err error) {
@@ -190,11 +190,11 @@ func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) (err error) {
self.CDRsConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Session_replication_conns != nil {
self.SessionReplicationConns = make([]*RemoteHost, len(*jsnCfg.Session_replication_conns))
for idx, jsnHaCfg := range *jsnCfg.Session_replication_conns {
self.SessionReplicationConns[idx] = NewDfltRemoteHost()
self.SessionReplicationConns[idx].loadFromJsonCfg(jsnHaCfg)
if jsnCfg.Replication_conns != nil {
self.ReplicationConns = make([]*RemoteHost, len(*jsnCfg.Replication_conns))
for idx, jsnHaCfg := range *jsnCfg.Replication_conns {
self.ReplicationConns[idx] = NewDfltRemoteHost()
self.ReplicationConns[idx].loadFromJsonCfg(jsnHaCfg)
}
}
if jsnCfg.Debit_interval != nil {

View File

@@ -88,7 +88,7 @@ func TestSessionSCfgloadFromJsonCfg(t *testing.T) {
"stats_conns": [], // address where to reach the StatS <""|*internal|127.0.0.1:2013>
"suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013>
"attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013>
"session_replication_conns": [], // replicate sessions towards these session services
"replication_conns": [], // replicate sessions towards these session services
"debit_interval": "0s", // interval to perform debits on.
"min_call_duration": "0s", // only authorize calls with allowed duration higher than this
"max_call_duration": "3h", // maximum call duration a prepaid call can last
@@ -102,19 +102,19 @@ func TestSessionSCfgloadFromJsonCfg(t *testing.T) {
},
}`
expected = SessionSCfg{
ListenBijson: "127.0.0.1:2014",
ChargerSConns: []*RemoteHost{},
RALsConns: []*RemoteHost{{Address: "*internal"}},
ResSConns: []*RemoteHost{},
ThreshSConns: []*RemoteHost{},
StatSConns: []*RemoteHost{},
SupplSConns: []*RemoteHost{},
AttrSConns: []*RemoteHost{},
CDRsConns: []*RemoteHost{{Address: "*internal"}},
SessionReplicationConns: []*RemoteHost{},
MaxCallDuration: time.Duration(3 * time.Hour),
SessionIndexes: map[string]bool{},
ClientProtocol: 1,
ListenBijson: "127.0.0.1:2014",
ChargerSConns: []*RemoteHost{},
RALsConns: []*RemoteHost{{Address: "*internal"}},
ResSConns: []*RemoteHost{},
ThreshSConns: []*RemoteHost{},
StatSConns: []*RemoteHost{},
SupplSConns: []*RemoteHost{},
AttrSConns: []*RemoteHost{},
CDRsConns: []*RemoteHost{{Address: "*internal"}},
ReplicationConns: []*RemoteHost{},
MaxCallDuration: time.Duration(3 * time.Hour),
SessionIndexes: map[string]bool{},
ClientProtocol: 1,
}
if jsnCfg, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil {
t.Error(err)

View File

@@ -328,7 +328,7 @@
// "stats_conns": [], // connections to StatS for reporting session events <""|*internal|127.0.0.1:2013>
// "suppliers_conns": [], // connections to SupplierS for querying suppliers for event <""|*internal|127.0.0.1:2013>
// "attributes_conns": [], // connections to AttributeS for altering event fields <""|*internal|127.0.0.1:2013>
// "session_replication_conns": [], // replicate sessions towards these session services
// "replication_conns": [], // replicate sessions towards these session services
// "debit_interval": "0s", // interval to perform debits on.
// "store_session_costs": false, // enable storing of the session costs within CDRs
// "min_call_duration": "0s", // only authorize calls with allowed duration higher than this

View File

@@ -42,7 +42,7 @@
"sessions": {
"enabled": true,
"session_replication_conns": [
"replication_conns": [
{"address": "127.0.0.1:22012", "transport": "*json"},
],
"rals_conns": [

View File

@@ -43,7 +43,7 @@
"sessions": {
"enabled": true, // starts SessionManager service: <true|false>
"listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests
"session_replication_conns": [
"replication_conns": [
{"address": "127.0.0.1:2012", "transport": "*json"},
],
"rals_conns": [

View File

@@ -284,14 +284,16 @@ cgrates.org,ALL1,127.0.0.1:3012,*json,false
)
func InitDataDb(cfg *config.CGRConfig) error {
dm, err := ConfigureDataStorage(cfg.DataDbCfg().DataDbType,
d, err := NewDataDBConn(cfg.DataDbCfg().DataDbType,
cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort,
cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser,
cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding,
cfg.CacheCfg(), cfg.DataDbCfg().DataDbSentinelName)
cfg.DataDbCfg().DataDbSentinelName)
if err != nil {
return err
}
dm := NewDataManager(d, cfg.CacheCfg())
if err := dm.DataDB().Flush(""); err != nil {
return err
}

View File

@@ -43,24 +43,30 @@ func TestLoaderITConnDataDbs(t *testing.T) {
lCfg, _ = config.NewDefaultCGRConfig()
lCfg.StorDbCfg().Password = "CGRateS.org"
var err error
if dataDbCsv, err = ConfigureDataStorage(lCfg.DataDbCfg().DataDbType,
dbConn, err := NewDataDBConn(lCfg.DataDbCfg().DataDbType,
lCfg.DataDbCfg().DataDbHost, lCfg.DataDbCfg().DataDbPort, "7",
lCfg.DataDbCfg().DataDbUser, lCfg.DataDbCfg().DataDbPass,
lCfg.GeneralCfg().DBDataEncoding, nil, ""); err != nil {
lCfg.GeneralCfg().DBDataEncoding, "")
if err != nil {
t.Fatal("Error on dataDb connection: ", err.Error())
}
if dataDbStor, err = ConfigureDataStorage(lCfg.DataDbCfg().DataDbType,
dataDbCsv = NewDataManager(dbConn, nil)
dbConn, err = NewDataDBConn(lCfg.DataDbCfg().DataDbType,
lCfg.DataDbCfg().DataDbHost, lCfg.DataDbCfg().DataDbPort, "8",
lCfg.DataDbCfg().DataDbUser, lCfg.DataDbCfg().DataDbPass,
lCfg.GeneralCfg().DBDataEncoding, nil, ""); err != nil {
lCfg.GeneralCfg().DBDataEncoding, "")
if err != nil {
t.Fatal("Error on dataDb connection: ", err.Error())
}
if dataDbApier, err = ConfigureDataStorage(lCfg.DataDbCfg().DataDbType,
dataDbStor = NewDataManager(dbConn, nil)
dbConn, err = NewDataDBConn(lCfg.DataDbCfg().DataDbType,
lCfg.DataDbCfg().DataDbHost, lCfg.DataDbCfg().DataDbPort, "9",
lCfg.DataDbCfg().DataDbUser, lCfg.DataDbCfg().DataDbPass,
lCfg.GeneralCfg().DBDataEncoding, nil, ""); err != nil {
lCfg.GeneralCfg().DBDataEncoding, "")
if err != nil {
t.Fatal("Error on dataDb connection: ", err.Error())
}
dataDbApier = NewDataManager(dbConn, nil)
for _, db := range []Storage{dataDbCsv.DataDB(), dataDbStor.DataDB(), dataDbApier.DataDB(),
dataDbCsv.DataDB(), dataDbStor.DataDB(), dataDbApier.DataDB()} {
if err = db.Flush(""); err != nil {

View File

@@ -24,16 +24,14 @@ import (
"strings"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
)
// Various helpers to deal with database
// ConfigureDataStorage returns the DataManager using the given config
func ConfigureDataStorage(dbType, host, port, name, user, pass, marshaler string,
cacheCfg config.CacheCfg, sentinelName string) (dm *DataManager, err error) {
var d DataDB
// NewDataDBConn creates a DataDB connection
func NewDataDBConn(dbType, host, port, name, user,
pass, marshaler, sentinelName string) (d DataDB, err error) {
switch dbType {
case utils.REDIS:
var dbNo int
@@ -61,11 +59,11 @@ func ConfigureDataStorage(dbType, host, port, name, user, pass, marshaler string
if err != nil {
return nil, err
}
return NewDataManager(d, cacheCfg), nil
return
}
// ConfigureStorStorage returns a StorDB(implements Storage interface) based on dbType
func ConfigureStorStorage(dbType, host, port, name, user, pass, sslmode string,
// NewStorDBConn returns a StorDB(implements Storage interface) based on dbType
func NewStorDBConn(dbType, host, port, name, user, pass, sslmode string,
maxConn, maxIdleConn, connMaxLifetime int,
stringIndexedFields, prefixIndexedFields []string) (db StorDB, err error) {
switch dbType {

View File

@@ -45,14 +45,16 @@ func TestVersionsITMongo(t *testing.T) {
if cfg, err = config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "tutmongo")); err != nil {
t.Fatal(err)
}
if dm3, err = ConfigureDataStorage(cfg.DataDbCfg().DataDbType,
dbConn, err := NewDataDBConn(cfg.DataDbCfg().DataDbType,
cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort,
cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser,
cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding,
cfg.CacheCfg(), ""); err != nil {
"")
if err != nil {
log.Fatal(err)
}
storageDb, err = ConfigureStorStorage(cfg.StorDbCfg().Type,
dm3 = NewDataManager(dbConn, cfg.CacheCfg())
storageDb, err = NewStorDBConn(cfg.StorDbCfg().Type,
cfg.StorDbCfg().Host, cfg.StorDbCfg().Port,
cfg.StorDbCfg().Name, cfg.StorDbCfg().User,
cfg.StorDbCfg().Password, cfg.StorDbCfg().SSLMode,
@@ -73,15 +75,17 @@ func TestVersionsITRedisMYSQL(t *testing.T) {
if cfg, err = config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "tutmysql")); err != nil {
t.Fatal(err)
}
dm3, err = ConfigureDataStorage(cfg.DataDbCfg().DataDbType,
dbConn, err := NewDataDBConn(cfg.DataDbCfg().DataDbType,
cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort,
cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser,
cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding, cfg.CacheCfg(), "")
cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding,
"")
if err != nil {
log.Fatal(err)
}
dm3 = NewDataManager(dbConn, cfg.CacheCfg())
storageDb, err = ConfigureStorStorage(cfg.StorDbCfg().Type,
storageDb, err = NewStorDBConn(cfg.StorDbCfg().Type,
cfg.StorDbCfg().Host, cfg.StorDbCfg().Port,
cfg.StorDbCfg().Name, cfg.StorDbCfg().User,
cfg.StorDbCfg().Password, cfg.StorDbCfg().SSLMode,
@@ -102,14 +106,16 @@ func TestVersionsITRedisPostgres(t *testing.T) {
if cfg, err = config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "storage", "postgres")); err != nil {
t.Fatal(err)
}
dm3, err = ConfigureDataStorage(cfg.DataDbCfg().DataDbType,
dbConn, err := NewDataDBConn(cfg.DataDbCfg().DataDbType,
cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort,
cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser,
cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding, cfg.CacheCfg(), "")
cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding,
"")
if err != nil {
log.Fatal(err)
}
storageDb, err = ConfigureStorStorage(cfg.StorDbCfg().Type,
dm3 = NewDataManager(dbConn, cfg.CacheCfg())
storageDb, err = NewStorDBConn(cfg.StorDbCfg().Type,
cfg.StorDbCfg().Host, cfg.StorDbCfg().Port,
cfg.StorDbCfg().Name, cfg.StorDbCfg().User,
cfg.StorDbCfg().Password, cfg.StorDbCfg().SSLMode,

View File

@@ -29,13 +29,14 @@ import (
func NewMigratorDataDB(db_type, host, port, name, user, pass, marshaler string,
cacheCfg config.CacheCfg, sentinelName string) (db MigratorDataDB, err error) {
dm, err := engine.ConfigureDataStorage(db_type,
dbCon, err := engine.NewDataDBConn(db_type,
host, port, name, user, pass, marshaler,
cacheCfg, sentinelName)
var d MigratorDataDB
sentinelName)
if err != nil {
return nil, err
}
dm := engine.NewDataManager(dbCon, cacheCfg)
var d MigratorDataDB
switch db_type {
case utils.REDIS:
d = newRedisMigrator(dm)
@@ -56,7 +57,7 @@ func NewMigratorStorDB(db_type, host, port, name, user, pass, sslmode string,
maxConn, maxIdleConn, connMaxLifetime int,
stringIndexedFields, prefixIndexedFields []string) (db MigratorStorDB, err error) {
var d MigratorStorDB
storDb, err := engine.ConfigureStorStorage(db_type, host, port, name, user, pass, sslmode,
storDb, err := engine.NewStorDBConn(db_type, host, port, name, user, pass, sslmode,
maxConn, maxIdleConn, connMaxLifetime, stringIndexedFields, prefixIndexedFields)
if err != nil {
return nil, err

View File

@@ -55,11 +55,11 @@ func (db *DataDBService) Start() (err error) {
db.Lock()
defer db.Unlock()
db.oldDBCfg = db.cfg.DataDbCfg().Clone()
db.db, err = engine.ConfigureDataStorage(db.cfg.DataDbCfg().DataDbType,
d, err := engine.NewDataDBConn(db.cfg.DataDbCfg().DataDbType,
db.cfg.DataDbCfg().DataDbHost, db.cfg.DataDbCfg().DataDbPort,
db.cfg.DataDbCfg().DataDbName, db.cfg.DataDbCfg().DataDbUser,
db.cfg.DataDbCfg().DataDbPass, db.cfg.GeneralCfg().DBDataEncoding,
db.cfg.CacheCfg(), db.cfg.DataDbCfg().DataDbSentinelName)
db.cfg.DataDbCfg().DataDbSentinelName)
if db.needsDB() && err != nil { // Cannot configure getter database, show stopper
utils.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err))
return
@@ -67,6 +67,7 @@ func (db *DataDBService) Start() (err error) {
utils.Logger.Warning(fmt.Sprintf("Could not configure dataDb: %s.Some SessionS APIs will not work", err))
return
}
db.db = engine.NewDataManager(d, db.cfg.CacheCfg())
engine.SetDataStorage(db.db)
if err = engine.CheckVersions(db.db.DataDB()); err != nil {
fmt.Println(err)

View File

@@ -137,7 +137,7 @@ func (smg *SessionService) Start() (err error) {
return
}
sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().SessionReplicationConns,
sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().ReplicationConns,
smg.cfg.GeneralCfg().Reconnects, smg.cfg.GeneralCfg().ConnectTimeout,
smg.cfg.GeneralCfg().ReplyTimeout)
if err != nil {
@@ -244,7 +244,7 @@ func (smg *SessionService) Reload() (err error) {
return
}
sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().SessionReplicationConns,
sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().ReplicationConns,
smg.cfg.GeneralCfg().Reconnects, smg.cfg.GeneralCfg().ConnectTimeout,
smg.cfg.GeneralCfg().ReplyTimeout)
if err != nil {