diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 51a5a7f1c..e91fe3c6c 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -463,7 +463,7 @@ func main() { } } if cfg.RalsCfg().Enabled || cfg.CdrsCfg().Enabled { - storDb, err := engine.ConfigureStorStorage(cfg.StorDbCfg().Type, + storDb, err := engine.NewStorDBConn(cfg.StorDbCfg().Type, cfg.StorDbCfg().Host, cfg.StorDbCfg().Port, cfg.StorDbCfg().Name, cfg.StorDbCfg().User, cfg.StorDbCfg().Password, cfg.StorDbCfg().SSLMode, diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index 2928194fe..075626546 100755 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -231,13 +231,15 @@ func main() { } if !*toStorDB { - if dm, err = engine.ConfigureDataStorage(ldrCfg.DataDbCfg().DataDbType, + d, err := engine.NewDataDBConn(ldrCfg.DataDbCfg().DataDbType, ldrCfg.DataDbCfg().DataDbHost, ldrCfg.DataDbCfg().DataDbPort, ldrCfg.DataDbCfg().DataDbName, ldrCfg.DataDbCfg().DataDbUser, ldrCfg.DataDbCfg().DataDbPass, ldrCfg.GeneralCfg().DBDataEncoding, - config.CgrConfig().CacheCfg(), ldrCfg.DataDbCfg().DataDbSentinelName); err != nil { + ldrCfg.DataDbCfg().DataDbSentinelName) + if err != nil { log.Fatalf("Coud not open dataDB connection: %s", err.Error()) } + dm = engine.NewDataManager(d, config.CgrConfig().CacheCfg()) defer dm.DataDB().Close() } diff --git a/cmd/cgr-tester/cgr-tester.go b/cmd/cgr-tester/cgr-tester.go index 2d255c85c..3d19d288c 100644 --- a/cmd/cgr-tester/cgr-tester.go +++ b/cmd/cgr-tester/cgr-tester.go @@ -71,14 +71,15 @@ var ( ) func durInternalRater(cd *engine.CallDescriptor) (time.Duration, error) { - dm, err := engine.ConfigureDataStorage(tstCfg.DataDbCfg().DataDbType, + dbConn, err := engine.NewDataDBConn(tstCfg.DataDbCfg().DataDbType, tstCfg.DataDbCfg().DataDbHost, tstCfg.DataDbCfg().DataDbPort, tstCfg.DataDbCfg().DataDbName, tstCfg.DataDbCfg().DataDbUser, tstCfg.DataDbCfg().DataDbPass, tstCfg.GeneralCfg().DBDataEncoding, - cgrConfig.CacheCfg(), tstCfg.DataDbCfg().DataDbSentinelName) // for the momentn we use here "" for sentinelName + tstCfg.DataDbCfg().DataDbSentinelName) if err != nil { return nilDuration, fmt.Errorf("Could not connect to data database: %s", err.Error()) } + dm := engine.NewDataManager(dbConn, cgrConfig.CacheCfg()) // for the momentn we use here "" for sentinelName defer dm.DataDB().Close() engine.SetDataStorage(dm) if err := dm.LoadDataDBCache(nil, nil, nil, nil, nil, nil, nil, nil, diff --git a/config/config_defaults.go b/config/config_defaults.go index 0d6142af7..4d127292f 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -62,8 +62,8 @@ const CGRATES_CFG_JSON = ` "db_password": "", // password to use when connecting to data_db "redis_sentinel":"", // the name of sentinel when used "query_timeout":"10s", - "remote_db_urls":[], - "replicate_db_urls":[], + "remote_conns":[], + "replication_conns":[], }, @@ -351,7 +351,7 @@ const CGRATES_CFG_JSON = ` "stats_conns": [], // connections to StatS for reporting session events <""|*internal|127.0.0.1:2013> "suppliers_conns": [], // connections to SupplierS for querying suppliers for event <""|*internal|127.0.0.1:2013> "attributes_conns": [], // connections to AttributeS for altering event fields <""|*internal|127.0.0.1:2013> - "session_replication_conns": [], // replicate sessions towards these session services + "replication_conns": [], // replicate sessions towards these session services "debit_interval": "0s", // interval to perform debits on. "store_session_costs": false, // enable storing of the session costs within CDRs "min_call_duration": "0s", // only authorize calls with allowed duration higher than this diff --git a/config/config_it_test.go b/config/config_it_test.go index b929cc1c1..c86cbad7a 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -417,11 +417,11 @@ func TestCGRConfigReloadSessionS(t *testing.T) { }, }, - SessionReplicationConns: []*RemoteHost{}, - MaxCallDuration: 3 * time.Hour, - SessionIndexes: utils.NewStringMap(), - ClientProtocol: 1, - TerminateAttempts: 5, + ReplicationConns: []*RemoteHost{}, + MaxCallDuration: 3 * time.Hour, + SessionIndexes: utils.NewStringMap(), + ClientProtocol: 1, + TerminateAttempts: 5, } if !reflect.DeepEqual(expAttr, cfg.SessionSCfg()) { t.Errorf("Expected %s , received: %s ", utils.ToJSON(expAttr), utils.ToJSON(cfg.SessionSCfg())) diff --git a/config/config_json_test.go b/config/config_json_test.go index 9ca79dc04..6edae14b5 100755 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -197,8 +197,8 @@ func TestDfDataDbJsonCfg(t *testing.T) { Db_password: utils.StringPointer(""), Redis_sentinel: utils.StringPointer(""), Query_timeout: utils.StringPointer("10s"), - Replicate_db_urls: &[]string{}, - Remote_db_urls: &[]string{}, + Replication_conns: &[]*DbJsonCfg{}, + Remote_conns: &[]*DbJsonCfg{}, } if cfg, err := dfCgrJsonCfg.DbJsonCfg(DATADB_JSN); err != nil { t.Error(err) @@ -480,26 +480,26 @@ func TestDfCdrcJsonCfg(t *testing.T) { func TestSmgJsonCfg(t *testing.T) { eCfg := &SessionSJsonCfg{ - Enabled: utils.BoolPointer(false), - Listen_bijson: utils.StringPointer("127.0.0.1:2014"), - Chargers_conns: &[]*RemoteHostJson{}, - Rals_conns: &[]*RemoteHostJson{}, - Cdrs_conns: &[]*RemoteHostJson{}, - Resources_conns: &[]*RemoteHostJson{}, - Thresholds_conns: &[]*RemoteHostJson{}, - Stats_conns: &[]*RemoteHostJson{}, - Suppliers_conns: &[]*RemoteHostJson{}, - Attributes_conns: &[]*RemoteHostJson{}, - Session_replication_conns: &[]*RemoteHostJson{}, - Debit_interval: utils.StringPointer("0s"), - Store_session_costs: utils.BoolPointer(false), - Min_call_duration: utils.StringPointer("0s"), - Max_call_duration: utils.StringPointer("3h"), - Session_ttl: utils.StringPointer("0s"), - Session_indexes: &[]string{}, - Client_protocol: utils.Float64Pointer(1.0), - Channel_sync_interval: utils.StringPointer("0"), - Terminate_attempts: utils.IntPointer(5), + Enabled: utils.BoolPointer(false), + Listen_bijson: utils.StringPointer("127.0.0.1:2014"), + Chargers_conns: &[]*RemoteHostJson{}, + Rals_conns: &[]*RemoteHostJson{}, + Cdrs_conns: &[]*RemoteHostJson{}, + Resources_conns: &[]*RemoteHostJson{}, + Thresholds_conns: &[]*RemoteHostJson{}, + Stats_conns: &[]*RemoteHostJson{}, + Suppliers_conns: &[]*RemoteHostJson{}, + Attributes_conns: &[]*RemoteHostJson{}, + Replication_conns: &[]*RemoteHostJson{}, + Debit_interval: utils.StringPointer("0s"), + Store_session_costs: utils.BoolPointer(false), + Min_call_duration: utils.StringPointer("0s"), + Max_call_duration: utils.StringPointer("3h"), + Session_ttl: utils.StringPointer("0s"), + Session_indexes: &[]string{}, + Client_protocol: utils.Float64Pointer(1.0), + Channel_sync_interval: utils.StringPointer("0"), + Terminate_attempts: utils.IntPointer(5), } if cfg, err := dfCgrJsonCfg.SessionSJsonCfg(); err != nil { t.Error(err) diff --git a/config/config_test.go b/config/config_test.go index e1ffd8fb6..29057d5d6 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -630,26 +630,26 @@ func TestCgrCfgJSONDefaultsCdreProfiles(t *testing.T) { func TestCgrCfgJSONDefaultsSMGenericCfg(t *testing.T) { eSessionSCfg := &SessionSCfg{ - Enabled: false, - ListenBijson: "127.0.0.1:2014", - ChargerSConns: []*RemoteHost{}, - RALsConns: []*RemoteHost{}, - CDRsConns: []*RemoteHost{}, - ResSConns: []*RemoteHost{}, - ThreshSConns: []*RemoteHost{}, - StatSConns: []*RemoteHost{}, - SupplSConns: []*RemoteHost{}, - AttrSConns: []*RemoteHost{}, - SessionReplicationConns: []*RemoteHost{}, - DebitInterval: 0 * time.Second, - StoreSCosts: false, - MinCallDuration: 0 * time.Second, - MaxCallDuration: 3 * time.Hour, - SessionTTL: 0 * time.Second, - SessionIndexes: utils.StringMap{}, - ClientProtocol: 1.0, - ChannelSyncInterval: 0, - TerminateAttempts: 5, + Enabled: false, + ListenBijson: "127.0.0.1:2014", + ChargerSConns: []*RemoteHost{}, + RALsConns: []*RemoteHost{}, + CDRsConns: []*RemoteHost{}, + ResSConns: []*RemoteHost{}, + ThreshSConns: []*RemoteHost{}, + StatSConns: []*RemoteHost{}, + SupplSConns: []*RemoteHost{}, + AttrSConns: []*RemoteHost{}, + ReplicationConns: []*RemoteHost{}, + DebitInterval: 0 * time.Second, + StoreSCosts: false, + MinCallDuration: 0 * time.Second, + MaxCallDuration: 3 * time.Hour, + SessionTTL: 0 * time.Second, + SessionIndexes: utils.StringMap{}, + ClientProtocol: 1.0, + ChannelSyncInterval: 0, + TerminateAttempts: 5, } if !reflect.DeepEqual(eSessionSCfg, cgrCfg.sessionSCfg) { t.Errorf("expecting: %s, received: %s", diff --git a/config/datadbcfg.go b/config/datadbcfg.go index 85994acc7..2cd07905f 100644 --- a/config/datadbcfg.go +++ b/config/datadbcfg.go @@ -75,24 +75,16 @@ func (dbcfg *DataDbCfg) loadFromJsonCfg(jsnDbCfg *DbJsonCfg) (err error) { return err } } - if jsnDbCfg.Remote_db_urls != nil { - dbcfg.RmtDataDBCfgs = make([]*DataDbCfg, len(*jsnDbCfg.Remote_db_urls)) - for i, url := range *jsnDbCfg.Remote_db_urls { - db, err := newDataDBCfgFromUrl(url) - if err != nil { - return err - } - dbcfg.RmtDataDBCfgs[i] = db + if jsnDbCfg.Remote_conns != nil { + dbcfg.RmtDataDBCfgs = make([]*DataDbCfg, len(*jsnDbCfg.Remote_conns)) + for i, cfg := range *jsnDbCfg.Remote_conns { + dbcfg.RmtDataDBCfgs[i].loadFromJsonCfg(cfg) } } - if jsnDbCfg.Replicate_db_urls != nil { - dbcfg.RplDataDBCfgs = make([]*DataDbCfg, len(*jsnDbCfg.Replicate_db_urls)) - for i, url := range *jsnDbCfg.Replicate_db_urls { - db, err := newDataDBCfgFromUrl(url) - if err != nil { - return err - } - dbcfg.RplDataDBCfgs[i] = db + if jsnDbCfg.Replication_conns != nil { + dbcfg.RmtDataDBCfgs = make([]*DataDbCfg, len(*jsnDbCfg.Replication_conns)) + for i, cfg := range *jsnDbCfg.Replication_conns { + dbcfg.RplDataDBCfgs[i].loadFromJsonCfg(cfg) } } return nil @@ -111,40 +103,3 @@ func (dbcfg *DataDbCfg) Clone() *DataDbCfg { QueryTimeout: dbcfg.QueryTimeout, } } - -//newDataDBCfgFromUrl will create a DataDB configuration out of url -//Format: host:port/?type=valOfType&name=valOFName&etc... -//Sample: 127.0.0.1:6379 -func newDataDBCfgFromUrl(pUrl string) (newDbCfg *DataDbCfg, err error) { - newDbCfg = new(DataDbCfg) - if pUrl == utils.EmptyString { - return nil, utils.ErrMandatoryIeMissing - } - // populate with default dataDBCfg and overwrite in case we found arguments in url - dfltCfg, _ := NewDefaultCGRConfig() - *newDbCfg = *dfltCfg.dataDbCfg - hostPortSls := strings.Split(strings.Split(pUrl, utils.Slash)[0], utils.InInFieldSep) - newDbCfg.DataDbHost = hostPortSls[0] - newDbCfg.DataDbPort = hostPortSls[1] - arg := utils.GetUrlRawArguments(pUrl) - if val, has := arg[utils.TypeLow]; has { - newDbCfg.DataDbType = strings.TrimPrefix(val, "*") - } - if val, has := arg[utils.UserLow]; has { - newDbCfg.DataDbUser = val - } - if val, has := arg[utils.PassLow]; has { - newDbCfg.DataDbPass = val - } - if val, has := arg[utils.SentinelLow]; has { - newDbCfg.DataDbSentinelName = val - } - if val, has := arg[utils.QueryLow]; has { - dur, err := utils.ParseDurationWithNanosecs(val) - if err != nil { - return nil, err - } - newDbCfg.QueryTimeout = dur - } - return -} diff --git a/config/datadbcfg_test.go b/config/datadbcfg_test.go index ff46c2c44..95a826d46 100644 --- a/config/datadbcfg_test.go +++ b/config/datadbcfg_test.go @@ -20,9 +20,6 @@ package config import ( "reflect" "testing" - "time" - - "github.com/cgrates/cgrates/utils" ) func TestDataDbCfgloadFromJsonCfg(t *testing.T) { @@ -125,65 +122,3 @@ func TestDataDbCfgloadFromJsonCfgPort(t *testing.T) { t.Errorf("Expected: %+v , recived: %+v", expected, dbcfg) } } - -func TestDataDbNewDataDbFromUrl(t *testing.T) { - if _, err := newDataDBCfgFromUrl(utils.EmptyString); err != utils.ErrMandatoryIeMissing { - t.Errorf("Expected: %+v , recived: %+v", utils.ErrMandatoryIeMissing, err) - } - - url := "127.0.0.1:1234" - expected := &DataDbCfg{ - DataDbType: utils.REDIS, - DataDbHost: "127.0.0.1", - DataDbPort: "1234", - DataDbName: "10", - DataDbUser: "cgrates", - DataDbPass: "", - DataDbSentinelName: "", - QueryTimeout: 10 * time.Second, - RmtDataDBCfgs: []*DataDbCfg{}, - RplDataDBCfgs: []*DataDbCfg{}, - } - if rcv, err := newDataDBCfgFromUrl(url); err != nil || !reflect.DeepEqual(rcv, expected) { - t.Errorf("Error: %+v \n, expected: %+v ,\n recived: %+v", err, utils.ToJSON(expected), utils.ToJSON(rcv)) - } - - url = "127.0.0.1:1234/?user=test&pass=test" - expected = &DataDbCfg{ - DataDbType: utils.REDIS, - DataDbHost: "127.0.0.1", - DataDbPort: "1234", - DataDbName: "10", - DataDbUser: "test", - DataDbPass: "test", - DataDbSentinelName: "", - QueryTimeout: 10 * time.Second, - RmtDataDBCfgs: []*DataDbCfg{}, - RplDataDBCfgs: []*DataDbCfg{}, - } - if rcv, err := newDataDBCfgFromUrl(url); err != nil || !reflect.DeepEqual(rcv, expected) { - t.Errorf("Error: %+v , expected: %+v , recived: %+v", err, utils.ToJSON(expected), utils.ToJSON(rcv)) - } - - url = "0.0.0.0:1234/?type=*mongo" - expected = &DataDbCfg{ - DataDbType: utils.MONGO, - DataDbHost: "0.0.0.0", - DataDbPort: "1234", - DataDbName: "10", - DataDbUser: "cgrates", - DataDbPass: "", - DataDbSentinelName: "", - QueryTimeout: 10 * time.Second, - RmtDataDBCfgs: []*DataDbCfg{}, - RplDataDBCfgs: []*DataDbCfg{}, - } - if rcv, err := newDataDBCfgFromUrl(url); err != nil || !reflect.DeepEqual(rcv, expected) { - t.Errorf("Error: %+v , expected: %+v , recived: %+v", err, utils.ToJSON(expected), utils.ToJSON(rcv)) - } - - url = "0.0.0.0:1234/?type=*mongo&query=error" - if _, err := newDataDBCfgFromUrl(url); err == nil || err.Error() != "time: invalid duration error" { - t.Errorf("Expected: , recived: <%+v>", err) - } -} diff --git a/config/libconfig_json.go b/config/libconfig_json.go index a07135029..23c470bb2 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -90,8 +90,8 @@ type DbJsonCfg struct { Redis_sentinel *string Query_timeout *string Sslmode *string // Used only in case of storDb - Remote_db_urls *[]string - Replicate_db_urls *[]string + Remote_conns *[]*DbJsonCfg + Replication_conns *[]*DbJsonCfg } // Filters config @@ -203,29 +203,29 @@ type EventReaderJsonCfg struct { // SM-Generic config section type SessionSJsonCfg struct { - Enabled *bool - Listen_bijson *string - Chargers_conns *[]*RemoteHostJson - Rals_conns *[]*RemoteHostJson - Resources_conns *[]*RemoteHostJson - Thresholds_conns *[]*RemoteHostJson - Stats_conns *[]*RemoteHostJson - Suppliers_conns *[]*RemoteHostJson - Cdrs_conns *[]*RemoteHostJson - Session_replication_conns *[]*RemoteHostJson - Attributes_conns *[]*RemoteHostJson - Debit_interval *string - Store_session_costs *bool - Min_call_duration *string - Max_call_duration *string - Session_ttl *string - Session_ttl_max_delay *string - Session_ttl_last_used *string - Session_ttl_usage *string - Session_indexes *[]string - Client_protocol *float64 - Channel_sync_interval *string - Terminate_attempts *int + Enabled *bool + Listen_bijson *string + Chargers_conns *[]*RemoteHostJson + Rals_conns *[]*RemoteHostJson + Resources_conns *[]*RemoteHostJson + Thresholds_conns *[]*RemoteHostJson + Stats_conns *[]*RemoteHostJson + Suppliers_conns *[]*RemoteHostJson + Cdrs_conns *[]*RemoteHostJson + Replication_conns *[]*RemoteHostJson + Attributes_conns *[]*RemoteHostJson + Debit_interval *string + Store_session_costs *bool + Min_call_duration *string + Max_call_duration *string + Session_ttl *string + Session_ttl_max_delay *string + Session_ttl_last_used *string + Session_ttl_usage *string + Session_indexes *[]string + Client_protocol *float64 + Channel_sync_interval *string + Terminate_attempts *int } // FreeSWITCHAgent config section diff --git a/config/smconfig.go b/config/smconfig.go index 81d2ef489..46aa32507 100644 --- a/config/smconfig.go +++ b/config/smconfig.go @@ -99,29 +99,29 @@ func (self *FsConnCfg) loadFromJsonCfg(jsnCfg *FsConnJsonCfg) error { } type SessionSCfg struct { - Enabled bool - ListenBijson string - ChargerSConns []*RemoteHost - RALsConns []*RemoteHost - ResSConns []*RemoteHost - ThreshSConns []*RemoteHost - StatSConns []*RemoteHost - SupplSConns []*RemoteHost - AttrSConns []*RemoteHost - CDRsConns []*RemoteHost - SessionReplicationConns []*RemoteHost - DebitInterval time.Duration - StoreSCosts bool - MinCallDuration time.Duration - MaxCallDuration time.Duration - SessionTTL time.Duration - SessionTTLMaxDelay *time.Duration - SessionTTLLastUsed *time.Duration - SessionTTLUsage *time.Duration - SessionIndexes utils.StringMap - ClientProtocol float64 - ChannelSyncInterval time.Duration - TerminateAttempts int + Enabled bool + ListenBijson string + ChargerSConns []*RemoteHost + RALsConns []*RemoteHost + ResSConns []*RemoteHost + ThreshSConns []*RemoteHost + StatSConns []*RemoteHost + SupplSConns []*RemoteHost + AttrSConns []*RemoteHost + CDRsConns []*RemoteHost + ReplicationConns []*RemoteHost + DebitInterval time.Duration + StoreSCosts bool + MinCallDuration time.Duration + MaxCallDuration time.Duration + SessionTTL time.Duration + SessionTTLMaxDelay *time.Duration + SessionTTLLastUsed *time.Duration + SessionTTLUsage *time.Duration + SessionIndexes utils.StringMap + ClientProtocol float64 + ChannelSyncInterval time.Duration + TerminateAttempts int } func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) (err error) { @@ -190,11 +190,11 @@ func (self *SessionSCfg) loadFromJsonCfg(jsnCfg *SessionSJsonCfg) (err error) { self.CDRsConns[idx].loadFromJsonCfg(jsnHaCfg) } } - if jsnCfg.Session_replication_conns != nil { - self.SessionReplicationConns = make([]*RemoteHost, len(*jsnCfg.Session_replication_conns)) - for idx, jsnHaCfg := range *jsnCfg.Session_replication_conns { - self.SessionReplicationConns[idx] = NewDfltRemoteHost() - self.SessionReplicationConns[idx].loadFromJsonCfg(jsnHaCfg) + if jsnCfg.Replication_conns != nil { + self.ReplicationConns = make([]*RemoteHost, len(*jsnCfg.Replication_conns)) + for idx, jsnHaCfg := range *jsnCfg.Replication_conns { + self.ReplicationConns[idx] = NewDfltRemoteHost() + self.ReplicationConns[idx].loadFromJsonCfg(jsnHaCfg) } } if jsnCfg.Debit_interval != nil { diff --git a/config/smconfig_test.go b/config/smconfig_test.go index c14344ea3..689887151 100644 --- a/config/smconfig_test.go +++ b/config/smconfig_test.go @@ -88,7 +88,7 @@ func TestSessionSCfgloadFromJsonCfg(t *testing.T) { "stats_conns": [], // address where to reach the StatS <""|*internal|127.0.0.1:2013> "suppliers_conns": [], // address where to reach the SupplierS <""|*internal|127.0.0.1:2013> "attributes_conns": [], // address where to reach the AttributeS <""|*internal|127.0.0.1:2013> - "session_replication_conns": [], // replicate sessions towards these session services + "replication_conns": [], // replicate sessions towards these session services "debit_interval": "0s", // interval to perform debits on. "min_call_duration": "0s", // only authorize calls with allowed duration higher than this "max_call_duration": "3h", // maximum call duration a prepaid call can last @@ -102,19 +102,19 @@ func TestSessionSCfgloadFromJsonCfg(t *testing.T) { }, }` expected = SessionSCfg{ - ListenBijson: "127.0.0.1:2014", - ChargerSConns: []*RemoteHost{}, - RALsConns: []*RemoteHost{{Address: "*internal"}}, - ResSConns: []*RemoteHost{}, - ThreshSConns: []*RemoteHost{}, - StatSConns: []*RemoteHost{}, - SupplSConns: []*RemoteHost{}, - AttrSConns: []*RemoteHost{}, - CDRsConns: []*RemoteHost{{Address: "*internal"}}, - SessionReplicationConns: []*RemoteHost{}, - MaxCallDuration: time.Duration(3 * time.Hour), - SessionIndexes: map[string]bool{}, - ClientProtocol: 1, + ListenBijson: "127.0.0.1:2014", + ChargerSConns: []*RemoteHost{}, + RALsConns: []*RemoteHost{{Address: "*internal"}}, + ResSConns: []*RemoteHost{}, + ThreshSConns: []*RemoteHost{}, + StatSConns: []*RemoteHost{}, + SupplSConns: []*RemoteHost{}, + AttrSConns: []*RemoteHost{}, + CDRsConns: []*RemoteHost{{Address: "*internal"}}, + ReplicationConns: []*RemoteHost{}, + MaxCallDuration: time.Duration(3 * time.Hour), + SessionIndexes: map[string]bool{}, + ClientProtocol: 1, } if jsnCfg, err := NewCgrJsonCfgFromBytes([]byte(cfgJSONStr)); err != nil { t.Error(err) diff --git a/data/conf/cgrates/cgrates.json b/data/conf/cgrates/cgrates.json index 6cfbf251c..e252e6e51 100644 --- a/data/conf/cgrates/cgrates.json +++ b/data/conf/cgrates/cgrates.json @@ -328,7 +328,7 @@ // "stats_conns": [], // connections to StatS for reporting session events <""|*internal|127.0.0.1:2013> // "suppliers_conns": [], // connections to SupplierS for querying suppliers for event <""|*internal|127.0.0.1:2013> // "attributes_conns": [], // connections to AttributeS for altering event fields <""|*internal|127.0.0.1:2013> -// "session_replication_conns": [], // replicate sessions towards these session services +// "replication_conns": [], // replicate sessions towards these session services // "debit_interval": "0s", // interval to perform debits on. // "store_session_costs": false, // enable storing of the session costs within CDRs // "min_call_duration": "0s", // only authorize calls with allowed duration higher than this diff --git a/data/conf/samples/smgreplcmaster/cgrates.json b/data/conf/samples/smgreplcmaster/cgrates.json index 1e023e1b6..2721bb0d4 100644 --- a/data/conf/samples/smgreplcmaster/cgrates.json +++ b/data/conf/samples/smgreplcmaster/cgrates.json @@ -42,7 +42,7 @@ "sessions": { "enabled": true, - "session_replication_conns": [ + "replication_conns": [ {"address": "127.0.0.1:22012", "transport": "*json"}, ], "rals_conns": [ diff --git a/data/conf/samples/smgreplcslave/cgrates.json b/data/conf/samples/smgreplcslave/cgrates.json index 192168c71..7a6ff18b9 100644 --- a/data/conf/samples/smgreplcslave/cgrates.json +++ b/data/conf/samples/smgreplcslave/cgrates.json @@ -43,7 +43,7 @@ "sessions": { "enabled": true, // starts SessionManager service: "listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests - "session_replication_conns": [ + "replication_conns": [ {"address": "127.0.0.1:2012", "transport": "*json"}, ], "rals_conns": [ diff --git a/engine/libtest.go b/engine/libtest.go index 13dd56467..cf537597f 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -284,14 +284,16 @@ cgrates.org,ALL1,127.0.0.1:3012,*json,false ) func InitDataDb(cfg *config.CGRConfig) error { - dm, err := ConfigureDataStorage(cfg.DataDbCfg().DataDbType, + d, err := NewDataDBConn(cfg.DataDbCfg().DataDbType, cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort, cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser, cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding, - cfg.CacheCfg(), cfg.DataDbCfg().DataDbSentinelName) + cfg.DataDbCfg().DataDbSentinelName) if err != nil { return err } + dm := NewDataManager(d, cfg.CacheCfg()) + if err := dm.DataDB().Flush(""); err != nil { return err } diff --git a/engine/loader_it_test.go b/engine/loader_it_test.go index cbe93c9be..27cf0ac2b 100644 --- a/engine/loader_it_test.go +++ b/engine/loader_it_test.go @@ -43,24 +43,30 @@ func TestLoaderITConnDataDbs(t *testing.T) { lCfg, _ = config.NewDefaultCGRConfig() lCfg.StorDbCfg().Password = "CGRateS.org" var err error - if dataDbCsv, err = ConfigureDataStorage(lCfg.DataDbCfg().DataDbType, + dbConn, err := NewDataDBConn(lCfg.DataDbCfg().DataDbType, lCfg.DataDbCfg().DataDbHost, lCfg.DataDbCfg().DataDbPort, "7", lCfg.DataDbCfg().DataDbUser, lCfg.DataDbCfg().DataDbPass, - lCfg.GeneralCfg().DBDataEncoding, nil, ""); err != nil { + lCfg.GeneralCfg().DBDataEncoding, "") + if err != nil { t.Fatal("Error on dataDb connection: ", err.Error()) } - if dataDbStor, err = ConfigureDataStorage(lCfg.DataDbCfg().DataDbType, + dataDbCsv = NewDataManager(dbConn, nil) + dbConn, err = NewDataDBConn(lCfg.DataDbCfg().DataDbType, lCfg.DataDbCfg().DataDbHost, lCfg.DataDbCfg().DataDbPort, "8", lCfg.DataDbCfg().DataDbUser, lCfg.DataDbCfg().DataDbPass, - lCfg.GeneralCfg().DBDataEncoding, nil, ""); err != nil { + lCfg.GeneralCfg().DBDataEncoding, "") + if err != nil { t.Fatal("Error on dataDb connection: ", err.Error()) } - if dataDbApier, err = ConfigureDataStorage(lCfg.DataDbCfg().DataDbType, + dataDbStor = NewDataManager(dbConn, nil) + dbConn, err = NewDataDBConn(lCfg.DataDbCfg().DataDbType, lCfg.DataDbCfg().DataDbHost, lCfg.DataDbCfg().DataDbPort, "9", lCfg.DataDbCfg().DataDbUser, lCfg.DataDbCfg().DataDbPass, - lCfg.GeneralCfg().DBDataEncoding, nil, ""); err != nil { + lCfg.GeneralCfg().DBDataEncoding, "") + if err != nil { t.Fatal("Error on dataDb connection: ", err.Error()) } + dataDbApier = NewDataManager(dbConn, nil) for _, db := range []Storage{dataDbCsv.DataDB(), dataDbStor.DataDB(), dataDbApier.DataDB(), dataDbCsv.DataDB(), dataDbStor.DataDB(), dataDbApier.DataDB()} { if err = db.Flush(""); err != nil { diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 1ece45035..59b1453ce 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -24,16 +24,14 @@ import ( "strings" "time" - "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" ) // Various helpers to deal with database -// ConfigureDataStorage returns the DataManager using the given config -func ConfigureDataStorage(dbType, host, port, name, user, pass, marshaler string, - cacheCfg config.CacheCfg, sentinelName string) (dm *DataManager, err error) { - var d DataDB +// NewDataDBConn creates a DataDB connection +func NewDataDBConn(dbType, host, port, name, user, + pass, marshaler, sentinelName string) (d DataDB, err error) { switch dbType { case utils.REDIS: var dbNo int @@ -61,11 +59,11 @@ func ConfigureDataStorage(dbType, host, port, name, user, pass, marshaler string if err != nil { return nil, err } - return NewDataManager(d, cacheCfg), nil + return } -// ConfigureStorStorage returns a StorDB(implements Storage interface) based on dbType -func ConfigureStorStorage(dbType, host, port, name, user, pass, sslmode string, +// NewStorDBConn returns a StorDB(implements Storage interface) based on dbType +func NewStorDBConn(dbType, host, port, name, user, pass, sslmode string, maxConn, maxIdleConn, connMaxLifetime int, stringIndexedFields, prefixIndexedFields []string) (db StorDB, err error) { switch dbType { diff --git a/engine/versions_it_test.go b/engine/versions_it_test.go index aeace66ba..76eb265e7 100644 --- a/engine/versions_it_test.go +++ b/engine/versions_it_test.go @@ -45,14 +45,16 @@ func TestVersionsITMongo(t *testing.T) { if cfg, err = config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "tutmongo")); err != nil { t.Fatal(err) } - if dm3, err = ConfigureDataStorage(cfg.DataDbCfg().DataDbType, + dbConn, err := NewDataDBConn(cfg.DataDbCfg().DataDbType, cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort, cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser, cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding, - cfg.CacheCfg(), ""); err != nil { + "") + if err != nil { log.Fatal(err) } - storageDb, err = ConfigureStorStorage(cfg.StorDbCfg().Type, + dm3 = NewDataManager(dbConn, cfg.CacheCfg()) + storageDb, err = NewStorDBConn(cfg.StorDbCfg().Type, cfg.StorDbCfg().Host, cfg.StorDbCfg().Port, cfg.StorDbCfg().Name, cfg.StorDbCfg().User, cfg.StorDbCfg().Password, cfg.StorDbCfg().SSLMode, @@ -73,15 +75,17 @@ func TestVersionsITRedisMYSQL(t *testing.T) { if cfg, err = config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "tutmysql")); err != nil { t.Fatal(err) } - dm3, err = ConfigureDataStorage(cfg.DataDbCfg().DataDbType, + dbConn, err := NewDataDBConn(cfg.DataDbCfg().DataDbType, cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort, cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser, - cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding, cfg.CacheCfg(), "") + cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding, + "") if err != nil { log.Fatal(err) } + dm3 = NewDataManager(dbConn, cfg.CacheCfg()) - storageDb, err = ConfigureStorStorage(cfg.StorDbCfg().Type, + storageDb, err = NewStorDBConn(cfg.StorDbCfg().Type, cfg.StorDbCfg().Host, cfg.StorDbCfg().Port, cfg.StorDbCfg().Name, cfg.StorDbCfg().User, cfg.StorDbCfg().Password, cfg.StorDbCfg().SSLMode, @@ -102,14 +106,16 @@ func TestVersionsITRedisPostgres(t *testing.T) { if cfg, err = config.NewCGRConfigFromPath(path.Join(*dataDir, "conf", "samples", "storage", "postgres")); err != nil { t.Fatal(err) } - dm3, err = ConfigureDataStorage(cfg.DataDbCfg().DataDbType, + dbConn, err := NewDataDBConn(cfg.DataDbCfg().DataDbType, cfg.DataDbCfg().DataDbHost, cfg.DataDbCfg().DataDbPort, cfg.DataDbCfg().DataDbName, cfg.DataDbCfg().DataDbUser, - cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding, cfg.CacheCfg(), "") + cfg.DataDbCfg().DataDbPass, cfg.GeneralCfg().DBDataEncoding, + "") if err != nil { log.Fatal(err) } - storageDb, err = ConfigureStorStorage(cfg.StorDbCfg().Type, + dm3 = NewDataManager(dbConn, cfg.CacheCfg()) + storageDb, err = NewStorDBConn(cfg.StorDbCfg().Type, cfg.StorDbCfg().Host, cfg.StorDbCfg().Port, cfg.StorDbCfg().Name, cfg.StorDbCfg().User, cfg.StorDbCfg().Password, cfg.StorDbCfg().SSLMode, diff --git a/migrator/migrator_utils.go b/migrator/migrator_utils.go index 2e312c158..cbdbff2b8 100644 --- a/migrator/migrator_utils.go +++ b/migrator/migrator_utils.go @@ -29,13 +29,14 @@ import ( func NewMigratorDataDB(db_type, host, port, name, user, pass, marshaler string, cacheCfg config.CacheCfg, sentinelName string) (db MigratorDataDB, err error) { - dm, err := engine.ConfigureDataStorage(db_type, + dbCon, err := engine.NewDataDBConn(db_type, host, port, name, user, pass, marshaler, - cacheCfg, sentinelName) - var d MigratorDataDB + sentinelName) if err != nil { return nil, err } + dm := engine.NewDataManager(dbCon, cacheCfg) + var d MigratorDataDB switch db_type { case utils.REDIS: d = newRedisMigrator(dm) @@ -56,7 +57,7 @@ func NewMigratorStorDB(db_type, host, port, name, user, pass, sslmode string, maxConn, maxIdleConn, connMaxLifetime int, stringIndexedFields, prefixIndexedFields []string) (db MigratorStorDB, err error) { var d MigratorStorDB - storDb, err := engine.ConfigureStorStorage(db_type, host, port, name, user, pass, sslmode, + storDb, err := engine.NewStorDBConn(db_type, host, port, name, user, pass, sslmode, maxConn, maxIdleConn, connMaxLifetime, stringIndexedFields, prefixIndexedFields) if err != nil { return nil, err diff --git a/services/datadb.go b/services/datadb.go index 523ba2fd3..fc7af8aca 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -55,11 +55,11 @@ func (db *DataDBService) Start() (err error) { db.Lock() defer db.Unlock() db.oldDBCfg = db.cfg.DataDbCfg().Clone() - db.db, err = engine.ConfigureDataStorage(db.cfg.DataDbCfg().DataDbType, + d, err := engine.NewDataDBConn(db.cfg.DataDbCfg().DataDbType, db.cfg.DataDbCfg().DataDbHost, db.cfg.DataDbCfg().DataDbPort, db.cfg.DataDbCfg().DataDbName, db.cfg.DataDbCfg().DataDbUser, db.cfg.DataDbCfg().DataDbPass, db.cfg.GeneralCfg().DBDataEncoding, - db.cfg.CacheCfg(), db.cfg.DataDbCfg().DataDbSentinelName) + db.cfg.DataDbCfg().DataDbSentinelName) if db.needsDB() && err != nil { // Cannot configure getter database, show stopper utils.Logger.Crit(fmt.Sprintf("Could not configure dataDb: %s exiting!", err)) return @@ -67,6 +67,7 @@ func (db *DataDBService) Start() (err error) { utils.Logger.Warning(fmt.Sprintf("Could not configure dataDb: %s.Some SessionS APIs will not work", err)) return } + db.db = engine.NewDataManager(d, db.cfg.CacheCfg()) engine.SetDataStorage(db.db) if err = engine.CheckVersions(db.db.DataDB()); err != nil { fmt.Println(err) diff --git a/services/sessions.go b/services/sessions.go index 0d45a39bc..47cc2b7fd 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -137,7 +137,7 @@ func (smg *SessionService) Start() (err error) { return } - sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().SessionReplicationConns, + sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().ReplicationConns, smg.cfg.GeneralCfg().Reconnects, smg.cfg.GeneralCfg().ConnectTimeout, smg.cfg.GeneralCfg().ReplyTimeout) if err != nil { @@ -244,7 +244,7 @@ func (smg *SessionService) Reload() (err error) { return } - sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().SessionReplicationConns, + sReplConns, err := sessions.NewSReplConns(smg.cfg.SessionSCfg().ReplicationConns, smg.cfg.GeneralCfg().Reconnects, smg.cfg.GeneralCfg().ConnectTimeout, smg.cfg.GeneralCfg().ReplyTimeout) if err != nil {