mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-16 21:59:53 +05:00
Add StorDB service
Add StorDB to config and services. Put back the store_cdrs option under cdrs and update the CDRs service to depend on StorDB. Define the StorDB interface and add a constructor for it. Add a constructor for postgres storage. Add a config sanity check to validate SSL modes for postgres. Update cgr-engine to consider StorDB on startup.
This commit is contained in:
committed by
Dan Christian Bogos
parent
0cd4ee5ca8
commit
df1dc5e838
@@ -391,7 +391,13 @@ func storeDiffSection(ctx *context.Context, section string, db ConfigDB, v1, v2
|
||||
if err = db.GetSection(ctx, section, jsn); err != nil {
|
||||
return
|
||||
}
|
||||
return db.SetSection(ctx, section, diffDataDbJsonCfg(jsn, v1.DataDbCfg(), v2.DataDbCfg()))
|
||||
return db.SetSection(ctx, section, diffDataDBJsonCfg(jsn, v1.DataDbCfg(), v2.DataDbCfg()))
|
||||
case StorDBJSON:
|
||||
jsn := new(DbJsonCfg)
|
||||
if err = db.GetSection(ctx, section, jsn); err != nil {
|
||||
return
|
||||
}
|
||||
return db.SetSection(ctx, section, diffStorDBJsonCfg(jsn, v1.StorDbCfg(), v2.StorDbCfg()))
|
||||
case FilterSJSON:
|
||||
jsn := new(FilterSJsonCfg)
|
||||
if err = db.GetSection(ctx, section, jsn); err != nil {
|
||||
|
||||
@@ -47,6 +47,7 @@ type CdrsOpts struct {
|
||||
type CdrsCfg struct {
|
||||
Enabled bool // Enable CDR Server service
|
||||
ExtraFields RSRParsers // Extra fields to store in CDRs
|
||||
StoreCdrs bool // store cdrs in storDb
|
||||
SMCostRetries int
|
||||
ChargerSConns []string
|
||||
AttributeSConns []string
|
||||
@@ -109,6 +110,9 @@ func (cdrscfg *CdrsCfg) loadFromJSONCfg(jsnCdrsCfg *CdrsJsonCfg) (err error) {
|
||||
return
|
||||
}
|
||||
}
|
||||
if jsnCdrsCfg.Store_cdrs != nil {
|
||||
cdrscfg.StoreCdrs = *jsnCdrsCfg.Store_cdrs
|
||||
}
|
||||
if jsnCdrsCfg.Session_cost_retries != nil {
|
||||
cdrscfg.SMCostRetries = *jsnCdrsCfg.Session_cost_retries
|
||||
}
|
||||
@@ -159,6 +163,7 @@ func (cdrscfg CdrsCfg) AsMapInterface(string) interface{} {
|
||||
mp := map[string]interface{}{
|
||||
utils.EnabledCfg: cdrscfg.Enabled,
|
||||
utils.SMCostRetriesCfg: cdrscfg.SMCostRetries,
|
||||
utils.StoreCdrsCfg: cdrscfg.StoreCdrs,
|
||||
utils.ExtraFieldsCfg: cdrscfg.ExtraFields.AsStringSlice(),
|
||||
utils.OnlineCDRExportsCfg: utils.CloneStringSlice(cdrscfg.OnlineCDRExports),
|
||||
utils.OptsCfg: opts,
|
||||
@@ -239,6 +244,7 @@ func (cdrscfg CdrsCfg) Clone() (cln *CdrsCfg) {
|
||||
cln = &CdrsCfg{
|
||||
Enabled: cdrscfg.Enabled,
|
||||
ExtraFields: cdrscfg.ExtraFields.Clone(),
|
||||
StoreCdrs: cdrscfg.StoreCdrs,
|
||||
SMCostRetries: cdrscfg.SMCostRetries,
|
||||
Opts: cdrscfg.Opts.Clone(),
|
||||
}
|
||||
@@ -287,6 +293,7 @@ type CdrsOptsJson struct {
|
||||
type CdrsJsonCfg struct {
|
||||
Enabled *bool
|
||||
Extra_fields *[]string
|
||||
Store_cdrs *bool
|
||||
Session_cost_retries *int
|
||||
Chargers_conns *[]string
|
||||
Attributes_conns *[]string
|
||||
@@ -340,6 +347,9 @@ func diffCdrsJsonCfg(d *CdrsJsonCfg, v1, v2 *CdrsCfg) *CdrsJsonCfg {
|
||||
if !utils.SliceStringEqual(extra1, extra2) {
|
||||
d.Extra_fields = &extra2
|
||||
}
|
||||
if v1.StoreCdrs != v2.StoreCdrs {
|
||||
d.Store_cdrs = utils.BoolPointer(v2.StoreCdrs)
|
||||
}
|
||||
if v1.SMCostRetries != v2.SMCostRetries {
|
||||
d.Session_cost_retries = utils.IntPointer(v2.SMCostRetries)
|
||||
}
|
||||
|
||||
@@ -110,6 +110,10 @@ func newCGRConfig(config []byte) (cfg *CGRConfig, err error) {
|
||||
Items: make(map[string]*ItemOpts),
|
||||
Opts: &DataDBOpts{},
|
||||
},
|
||||
storDbCfg: &StorDbCfg{
|
||||
Items: make(map[string]*ItemOpts),
|
||||
Opts: &StorDBOpts{},
|
||||
},
|
||||
tlsCfg: new(TLSCfg),
|
||||
cacheCfg: &CacheCfg{Partitions: make(map[string]*CacheParamCfg)},
|
||||
listenCfg: new(ListenCfg),
|
||||
@@ -328,6 +332,7 @@ type CGRConfig struct {
|
||||
generalCfg *GeneralCfg // General config
|
||||
loggerCfg *LoggerCfg // Logger config
|
||||
dataDbCfg *DataDbCfg // Database config
|
||||
storDbCfg *StorDbCfg // StorDb config
|
||||
tlsCfg *TLSCfg // TLS config
|
||||
cacheCfg *CacheCfg // Cache config
|
||||
listenCfg *ListenCfg // Listen config
|
||||
@@ -604,6 +609,13 @@ func (cfg *CGRConfig) DataDbCfg() *DataDbCfg {
|
||||
return cfg.dataDbCfg
|
||||
}
|
||||
|
||||
// StorDbCfg returns the config for StorDb
|
||||
func (cfg *CGRConfig) StorDbCfg() *StorDbCfg {
|
||||
cfg.lks[StorDBJSON].Lock()
|
||||
defer cfg.lks[StorDBJSON].Unlock()
|
||||
return cfg.storDbCfg
|
||||
}
|
||||
|
||||
// GeneralCfg returns the General config section
|
||||
func (cfg *CGRConfig) GeneralCfg() *GeneralCfg {
|
||||
cfg.lks[GeneralJSON].Lock()
|
||||
@@ -983,20 +995,27 @@ func (cfg *CGRConfig) reloadSections(sections ...string) {
|
||||
ChargerSJSON, ResourceSJSON, StatSJSON, ThresholdSJSON,
|
||||
RouteSJSON, LoaderSJSON, DispatcherSJSON, RateSJSON, AdminSJSON, AccountSJSON,
|
||||
ActionSJSON})
|
||||
subsystemsThatNeedStorDB := utils.NewStringSet([]string{StorDBJSON, CDRsJSON})
|
||||
needsDataDB := false
|
||||
needsStorDB := false
|
||||
for _, section := range sections {
|
||||
if !needsDataDB && subsystemsThatNeedDataDB.Has(section) {
|
||||
needsDataDB = true
|
||||
cfg.rldCh <- SectionToService[DataDBJSON] // reload datadb before
|
||||
}
|
||||
if needsDataDB {
|
||||
if !needsStorDB && subsystemsThatNeedStorDB.Has(section) {
|
||||
needsStorDB = true
|
||||
cfg.rldCh <- SectionToService[StorDBJSON] // reload stordb before
|
||||
}
|
||||
if needsDataDB && needsStorDB {
|
||||
break
|
||||
}
|
||||
}
|
||||
runtime.Gosched()
|
||||
for _, section := range sections {
|
||||
if srv := SectionToService[section]; srv != utils.EmptyString &&
|
||||
section != DataDBJSON {
|
||||
section != DataDBJSON &&
|
||||
section != StorDBJSON {
|
||||
cfg.rldCh <- srv
|
||||
}
|
||||
}
|
||||
@@ -1020,6 +1039,7 @@ func (cfg *CGRConfig) Clone() (cln *CGRConfig) {
|
||||
generalCfg: cfg.generalCfg.Clone(),
|
||||
loggerCfg: cfg.loggerCfg.Clone(),
|
||||
dataDbCfg: cfg.dataDbCfg.Clone(),
|
||||
storDbCfg: cfg.storDbCfg.Clone(),
|
||||
tlsCfg: cfg.tlsCfg.Clone(),
|
||||
cacheCfg: cfg.cacheCfg.Clone(),
|
||||
listenCfg: cfg.listenCfg.Clone(),
|
||||
|
||||
@@ -173,8 +173,28 @@ const CGRATES_CFG_JSON = `
|
||||
}
|
||||
},
|
||||
|
||||
|
||||
|
||||
"stor_db": { // database used to store offline tariff plans and CDRs
|
||||
"db_type": "*mysql", // stor database type to use: <*mongo|*mysql|*postgres|*internal>
|
||||
"db_host": "127.0.0.1", // the host to connect to
|
||||
"db_port": 3306, // the port to reach the stor_db
|
||||
"db_name": "cgrates", // stor database name
|
||||
"db_user": "cgrates", // username to use when connecting to stor_db
|
||||
"db_password": "", // password to use when connecting to stor_db
|
||||
"string_indexed_fields": [], // indexes on cdrs table to speed up queries, used in case of *mongo and *internal
|
||||
"prefix_indexed_fields": [], // prefix indexes on cdrs table to speed up queries, used in case of *internal
|
||||
"opts": {
|
||||
"sqlMaxOpenConns": 100, // maximum database connections opened, not applying for mongo
|
||||
"sqlMaxIdleConns": 10, // maximum database connections idle, not applying for mongo
|
||||
"sqlConnMaxLifetime": "0", // maximum amount of time a connection may be reused (0 for unlimited), not applying for mongo
|
||||
"mysqlDSNParams":{}, // DSN params for opening db
|
||||
"mongoQueryTimeout":"10s", // timeout for query when mongo is used
|
||||
"pgSSLMode":"disable", // ssl mode in case of *postgres
|
||||
"mysqlLocation": "Local", // the location the time from mysql is retrived
|
||||
},
|
||||
"items":{
|
||||
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false},
|
||||
},
|
||||
},
|
||||
|
||||
"listen": {
|
||||
"rpc_json": "127.0.0.1:2012", // RPC JSON listening address
|
||||
@@ -288,6 +308,7 @@ const CGRATES_CFG_JSON = `
|
||||
"cdrs": { // CDRs config
|
||||
"enabled": false, // start the CDR Server: <true|false>
|
||||
"extra_fields": [], // extra fields to store in CDRs for non-generic CDRs (ie: FreeSWITCH JSON)
|
||||
"store_cdrs": true, // store cdrs in StorDB
|
||||
"session_cost_retries": 5, // number of queries to session_costs before recalculating CDR
|
||||
"chargers_conns": [], // connection to ChargerS for CDR forking, empty to disable billing for CDRs: <""|*internal|$rpc_conns_id>
|
||||
"attributes_conns": [], // connection to AttributeS for altering *raw CDRs, empty to disable attributes functionality: <""|*internal|$rpc_conns_id>
|
||||
|
||||
@@ -32,6 +32,7 @@ const (
|
||||
ListenJSON = "listen"
|
||||
HTTPJSON = "http"
|
||||
DataDBJSON = "data_db"
|
||||
StorDBJSON = "stor_db"
|
||||
FilterSJSON = "filters"
|
||||
CDRsJSON = "cdrs"
|
||||
SessionSJSON = "sessions"
|
||||
@@ -97,6 +98,7 @@ var (
|
||||
AnalyzerSJSON: utils.AnalyzerS,
|
||||
DispatcherSJSON: utils.DispatcherS,
|
||||
DataDBJSON: utils.DataDB,
|
||||
StorDBJSON: utils.StorDB,
|
||||
EEsJSON: utils.EEs,
|
||||
EFsJSON: utils.EFs,
|
||||
RateSJSON: utils.RateS,
|
||||
@@ -157,6 +159,7 @@ func newSections(cfg *CGRConfig) Sections {
|
||||
cfg.efsCfg,
|
||||
cfg.rpcConns,
|
||||
cfg.dataDbCfg,
|
||||
cfg.storDbCfg,
|
||||
cfg.listenCfg,
|
||||
cfg.tlsCfg,
|
||||
cfg.httpCfg,
|
||||
|
||||
@@ -825,6 +825,16 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// StorDB sanity checks
|
||||
if cfg.storDbCfg.Type == utils.Postgres {
|
||||
if !utils.IsSliceMember([]string{utils.PostgresSSLModeDisable, utils.PostgresSSLModeAllow,
|
||||
utils.PostgresSSLModePrefer, utils.PostgresSSLModeRequire, utils.PostgresSSLModeVerifyCa,
|
||||
utils.PostgresSSLModeVerifyFull}, cfg.storDbCfg.Opts.PgSSLMode) {
|
||||
return fmt.Errorf("<%s> unsupported ssl mode for storDB", utils.StorDB)
|
||||
}
|
||||
}
|
||||
|
||||
// DataDB sanity checks
|
||||
if cfg.dataDbCfg.Type == utils.MetaInternal {
|
||||
for key, config := range cfg.cacheCfg.Partitions {
|
||||
|
||||
@@ -561,7 +561,7 @@ func diffDataDBOptsJsonCfg(d *DBOptsJson, v1, v2 *DataDBOpts) *DBOptsJson {
|
||||
return d
|
||||
}
|
||||
|
||||
func diffDataDbJsonCfg(d *DbJsonCfg, v1, v2 *DataDbCfg) *DbJsonCfg {
|
||||
func diffDataDBJsonCfg(d *DbJsonCfg, v1, v2 *DataDbCfg) *DbJsonCfg {
|
||||
if d == nil {
|
||||
d = new(DbJsonCfg)
|
||||
}
|
||||
|
||||
323
config/stordbcfg.go
Normal file
323
config/stordbcfg.go
Normal file
@@ -0,0 +1,323 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
type StorDBOpts struct {
|
||||
SQLMaxOpenConns int
|
||||
SQLMaxIdleConns int
|
||||
SQLConnMaxLifetime time.Duration
|
||||
SQLDSNParams map[string]string
|
||||
MongoQueryTimeout time.Duration
|
||||
PgSSLMode string
|
||||
MySQLLocation string
|
||||
}
|
||||
|
||||
// StorDbCfg StroreDb config
|
||||
type StorDbCfg struct {
|
||||
Type string // should reflect the database type used to store logs
|
||||
Host string // the host to connect to. Values that start with / are for UNIX domain sockets
|
||||
Port string // the port to bind to
|
||||
Name string // the name of the database to connect to
|
||||
User string // the user to sign in as
|
||||
Password string // the user's password
|
||||
StringIndexedFields []string
|
||||
PrefixIndexedFields []string
|
||||
RmtConns []string // remote DataDB connIDs
|
||||
RplConns []string // replication connIDs
|
||||
Items map[string]*ItemOpts
|
||||
Opts *StorDBOpts
|
||||
}
|
||||
|
||||
// loadStorDBCfg loads the StorDB section of the configuration
|
||||
func (dbcfg *StorDbCfg) Load(ctx *context.Context, jsnCfg ConfigDB, _ *CGRConfig) (err error) {
|
||||
jsnDataDbCfg := new(DbJsonCfg)
|
||||
if err = jsnCfg.GetSection(ctx, StorDBJSON, jsnDataDbCfg); err != nil {
|
||||
return
|
||||
}
|
||||
return dbcfg.loadFromJSONCfg(jsnDataDbCfg)
|
||||
}
|
||||
|
||||
func (dbOpts *StorDBOpts) loadFromJSONCfg(jsnCfg *DBOptsJson) (err error) {
|
||||
if jsnCfg == nil {
|
||||
return
|
||||
}
|
||||
if jsnCfg.SQLMaxOpenConns != nil {
|
||||
dbOpts.SQLMaxOpenConns = *jsnCfg.SQLMaxOpenConns
|
||||
}
|
||||
if jsnCfg.SQLMaxIdleConns != nil {
|
||||
dbOpts.SQLMaxIdleConns = *jsnCfg.SQLMaxIdleConns
|
||||
}
|
||||
if jsnCfg.SQLConnMaxLifetime != nil {
|
||||
if dbOpts.SQLConnMaxLifetime, err = utils.ParseDurationWithNanosecs(*jsnCfg.SQLConnMaxLifetime); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if jsnCfg.MYSQLDSNParams != nil {
|
||||
dbOpts.SQLDSNParams = make(map[string]string)
|
||||
dbOpts.SQLDSNParams = jsnCfg.MYSQLDSNParams
|
||||
}
|
||||
if jsnCfg.MongoQueryTimeout != nil {
|
||||
if dbOpts.MongoQueryTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.MongoQueryTimeout); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
if jsnCfg.PgSSLMode != nil {
|
||||
dbOpts.PgSSLMode = *jsnCfg.PgSSLMode
|
||||
}
|
||||
if jsnCfg.MySQLLocation != nil {
|
||||
dbOpts.MySQLLocation = *jsnCfg.MySQLLocation
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// loadFromJSONCfg loads StoreDb config from JsonCfg
|
||||
func (dbcfg *StorDbCfg) loadFromJSONCfg(jsnDbCfg *DbJsonCfg) (err error) {
|
||||
if jsnDbCfg == nil {
|
||||
return nil
|
||||
}
|
||||
if jsnDbCfg.Db_type != nil {
|
||||
if !strings.HasPrefix(*jsnDbCfg.Db_type, "*") {
|
||||
dbcfg.Type = fmt.Sprintf("*%v", *jsnDbCfg.Db_type)
|
||||
} else {
|
||||
dbcfg.Type = *jsnDbCfg.Db_type
|
||||
}
|
||||
}
|
||||
if jsnDbCfg.Db_host != nil {
|
||||
dbcfg.Host = *jsnDbCfg.Db_host
|
||||
}
|
||||
if jsnDbCfg.Db_port != nil {
|
||||
port := strconv.Itoa(*jsnDbCfg.Db_port)
|
||||
if port == "-1" {
|
||||
port = utils.MetaDynamic
|
||||
}
|
||||
dbcfg.Port = defaultDBPort(dbcfg.Type, port)
|
||||
}
|
||||
if jsnDbCfg.Db_name != nil {
|
||||
dbcfg.Name = *jsnDbCfg.Db_name
|
||||
}
|
||||
if jsnDbCfg.Db_user != nil {
|
||||
dbcfg.User = *jsnDbCfg.Db_user
|
||||
}
|
||||
if jsnDbCfg.Db_password != nil {
|
||||
dbcfg.Password = *jsnDbCfg.Db_password
|
||||
}
|
||||
if jsnDbCfg.String_indexed_fields != nil {
|
||||
dbcfg.StringIndexedFields = *jsnDbCfg.String_indexed_fields
|
||||
}
|
||||
if jsnDbCfg.Prefix_indexed_fields != nil {
|
||||
dbcfg.PrefixIndexedFields = *jsnDbCfg.Prefix_indexed_fields
|
||||
}
|
||||
if jsnDbCfg.Remote_conns != nil {
|
||||
dbcfg.RmtConns = make([]string, len(*jsnDbCfg.Remote_conns))
|
||||
for i, item := range *jsnDbCfg.Remote_conns {
|
||||
if item == utils.MetaInternal {
|
||||
return fmt.Errorf("Remote connection ID needs to be different than *internal ")
|
||||
}
|
||||
dbcfg.RmtConns[i] = item
|
||||
}
|
||||
}
|
||||
if jsnDbCfg.Replication_conns != nil {
|
||||
dbcfg.RplConns = make([]string, len(*jsnDbCfg.Replication_conns))
|
||||
for i, item := range *jsnDbCfg.Replication_conns {
|
||||
if item == utils.MetaInternal {
|
||||
return fmt.Errorf("Replication connection ID needs to be different than *internal ")
|
||||
}
|
||||
dbcfg.RplConns[i] = item
|
||||
}
|
||||
}
|
||||
if jsnDbCfg.Items != nil {
|
||||
for kJsn, vJsn := range jsnDbCfg.Items {
|
||||
val := new(ItemOpts)
|
||||
if err = val.loadFromJSONCfg(vJsn); err != nil {
|
||||
return
|
||||
}
|
||||
dbcfg.Items[kJsn] = val
|
||||
}
|
||||
}
|
||||
if jsnDbCfg.Opts != nil {
|
||||
err = dbcfg.Opts.loadFromJSONCfg(jsnDbCfg.Opts)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (StorDbCfg) SName() string { return StorDBJSON }
|
||||
func (dbcfg StorDbCfg) CloneSection() Section { return dbcfg.Clone() }
|
||||
|
||||
func (dbOpts *StorDBOpts) Clone() *StorDBOpts {
|
||||
return &StorDBOpts{
|
||||
SQLMaxOpenConns: dbOpts.SQLMaxOpenConns,
|
||||
SQLMaxIdleConns: dbOpts.SQLMaxIdleConns,
|
||||
SQLConnMaxLifetime: dbOpts.SQLConnMaxLifetime,
|
||||
SQLDSNParams: dbOpts.SQLDSNParams,
|
||||
MongoQueryTimeout: dbOpts.MongoQueryTimeout,
|
||||
PgSSLMode: dbOpts.PgSSLMode,
|
||||
MySQLLocation: dbOpts.MySQLLocation,
|
||||
}
|
||||
}
|
||||
|
||||
// Clone returns the cloned object
|
||||
func (dbcfg StorDbCfg) Clone() (cln *StorDbCfg) {
|
||||
cln = &StorDbCfg{
|
||||
Type: dbcfg.Type,
|
||||
Host: dbcfg.Host,
|
||||
Port: dbcfg.Port,
|
||||
Name: dbcfg.Name,
|
||||
User: dbcfg.User,
|
||||
Password: dbcfg.Password,
|
||||
|
||||
Items: make(map[string]*ItemOpts),
|
||||
Opts: dbcfg.Opts.Clone(),
|
||||
}
|
||||
for key, item := range dbcfg.Items {
|
||||
cln.Items[key] = item.Clone()
|
||||
}
|
||||
if dbcfg.StringIndexedFields != nil {
|
||||
cln.StringIndexedFields = utils.CloneStringSlice(dbcfg.StringIndexedFields)
|
||||
}
|
||||
if dbcfg.PrefixIndexedFields != nil {
|
||||
cln.PrefixIndexedFields = utils.CloneStringSlice(dbcfg.PrefixIndexedFields)
|
||||
}
|
||||
if dbcfg.RmtConns != nil {
|
||||
cln.RmtConns = utils.CloneStringSlice(dbcfg.RmtConns)
|
||||
}
|
||||
if dbcfg.RplConns != nil {
|
||||
cln.RplConns = utils.CloneStringSlice(dbcfg.RplConns)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// AsMapInterface returns the config as a map[string]interface{}
|
||||
func (dbcfg StorDbCfg) AsMapInterface(string) interface{} {
|
||||
opts := map[string]interface{}{
|
||||
utils.SQLMaxOpenConnsCfg: dbcfg.Opts.SQLMaxOpenConns,
|
||||
utils.SQLMaxIdleConnsCfg: dbcfg.Opts.SQLMaxIdleConns,
|
||||
utils.SQLConnMaxLifetime: dbcfg.Opts.SQLConnMaxLifetime.String(),
|
||||
utils.MYSQLDSNParams: dbcfg.Opts.SQLDSNParams,
|
||||
utils.MongoQueryTimeoutCfg: dbcfg.Opts.MongoQueryTimeout.String(),
|
||||
utils.PgSSLModeCfg: dbcfg.Opts.PgSSLMode,
|
||||
utils.MysqlLocation: dbcfg.Opts.MySQLLocation,
|
||||
}
|
||||
mp := map[string]interface{}{
|
||||
utils.DataDbTypeCfg: utils.Meta + dbcfg.Type,
|
||||
utils.DataDbHostCfg: dbcfg.Host,
|
||||
utils.DataDbNameCfg: dbcfg.Name,
|
||||
utils.DataDbUserCfg: dbcfg.User,
|
||||
utils.DataDbPassCfg: dbcfg.Password,
|
||||
utils.StringIndexedFieldsCfg: dbcfg.StringIndexedFields,
|
||||
utils.PrefixIndexedFieldsCfg: dbcfg.PrefixIndexedFields,
|
||||
utils.RemoteConnsCfg: dbcfg.RmtConns,
|
||||
utils.ReplicationConnsCfg: dbcfg.RplConns,
|
||||
utils.OptsCfg: opts,
|
||||
}
|
||||
if dbcfg.Items != nil {
|
||||
items := make(map[string]interface{})
|
||||
for key, item := range dbcfg.Items {
|
||||
items[key] = item.AsMapInterface()
|
||||
}
|
||||
mp[utils.ItemsCfg] = items
|
||||
}
|
||||
if dbcfg.Port != utils.EmptyString {
|
||||
dbPort, _ := strconv.Atoi(dbcfg.Port)
|
||||
mp[utils.DataDbPortCfg] = dbPort
|
||||
}
|
||||
return mp
|
||||
}
|
||||
|
||||
func diffStorDBOptsJsonCfg(d *DBOptsJson, v1, v2 *StorDBOpts) *DBOptsJson {
|
||||
if d == nil {
|
||||
d = new(DBOptsJson)
|
||||
}
|
||||
if v1.SQLMaxOpenConns != v2.SQLMaxOpenConns {
|
||||
d.SQLMaxOpenConns = utils.IntPointer(v2.SQLMaxOpenConns)
|
||||
}
|
||||
if v1.SQLMaxIdleConns != v2.SQLMaxIdleConns {
|
||||
d.SQLMaxIdleConns = utils.IntPointer(v2.SQLMaxIdleConns)
|
||||
}
|
||||
if v1.SQLConnMaxLifetime != v2.SQLConnMaxLifetime {
|
||||
d.SQLConnMaxLifetime = utils.StringPointer(v2.SQLConnMaxLifetime.String())
|
||||
}
|
||||
if !reflect.DeepEqual(v1.SQLDSNParams, v2.SQLDSNParams) {
|
||||
d.MYSQLDSNParams = v2.SQLDSNParams
|
||||
}
|
||||
if v1.MongoQueryTimeout != v2.MongoQueryTimeout {
|
||||
d.MongoQueryTimeout = utils.StringPointer(v2.MongoQueryTimeout.String())
|
||||
}
|
||||
if v1.PgSSLMode != v2.PgSSLMode {
|
||||
d.PgSSLMode = utils.StringPointer(v2.PgSSLMode)
|
||||
}
|
||||
if v1.MySQLLocation != v2.MySQLLocation {
|
||||
d.MySQLLocation = utils.StringPointer(v2.MySQLLocation)
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
func diffStorDBJsonCfg(d *DbJsonCfg, v1, v2 *StorDbCfg) *DbJsonCfg {
|
||||
if d == nil {
|
||||
d = new(DbJsonCfg)
|
||||
}
|
||||
if v1.Type != v2.Type {
|
||||
d.Db_type = utils.StringPointer(v2.Type)
|
||||
}
|
||||
if v1.Host != v2.Host {
|
||||
d.Db_host = utils.StringPointer(v2.Host)
|
||||
}
|
||||
if v1.Port != v2.Port {
|
||||
port, _ := strconv.Atoi(v2.Port)
|
||||
d.Db_port = utils.IntPointer(port)
|
||||
}
|
||||
if v1.Name != v2.Name {
|
||||
d.Db_name = utils.StringPointer(v2.Name)
|
||||
}
|
||||
if v1.User != v2.User {
|
||||
d.Db_user = utils.StringPointer(v2.User)
|
||||
}
|
||||
if v1.Password != v2.Password {
|
||||
d.Db_password = utils.StringPointer(v2.Password)
|
||||
}
|
||||
if !utils.SliceStringEqual(v1.RmtConns, v2.RmtConns) {
|
||||
d.Remote_conns = &v2.RmtConns
|
||||
}
|
||||
|
||||
if !utils.SliceStringEqual(v1.RplConns, v2.RplConns) {
|
||||
d.Replication_conns = &v2.RplConns
|
||||
}
|
||||
|
||||
if !utils.SliceStringEqual(v1.StringIndexedFields, v2.StringIndexedFields) {
|
||||
d.String_indexed_fields = &v2.StringIndexedFields
|
||||
}
|
||||
if !utils.SliceStringEqual(v1.PrefixIndexedFields, v2.PrefixIndexedFields) {
|
||||
d.Prefix_indexed_fields = &v2.PrefixIndexedFields
|
||||
}
|
||||
|
||||
d.Items = diffMapItemOptJson(d.Items, v1.Items, v2.Items)
|
||||
d.Opts = diffStorDBOptsJsonCfg(d.Opts, v1.Opts, v2.Opts)
|
||||
|
||||
return d
|
||||
}
|
||||
@@ -65,7 +65,7 @@ type testDispatcher struct {
|
||||
cmd *exec.Cmd
|
||||
}
|
||||
|
||||
func newTestEngine(t *testing.T, cfgPath string, initDataDB bool) (d *testDispatcher) {
|
||||
func newTestEngine(t *testing.T, cfgPath string, initDataDB, initStorDB bool) (d *testDispatcher) {
|
||||
d = new(testDispatcher)
|
||||
d.CfgPath = cfgPath
|
||||
var err error
|
||||
@@ -78,7 +78,9 @@ func newTestEngine(t *testing.T, cfgPath string, initDataDB bool) (d *testDispat
|
||||
if initDataDB {
|
||||
d.initDataDb(t)
|
||||
}
|
||||
|
||||
if initStorDB {
|
||||
d.resetStorDb(t)
|
||||
}
|
||||
d.startEngine(t)
|
||||
return d
|
||||
}
|
||||
@@ -111,6 +113,13 @@ func (d *testDispatcher) initDataDb(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Wipe out the cdr database
|
||||
func (d *testDispatcher) resetStorDb(t *testing.T) {
|
||||
if err := engine.InitStorDB(d.Cfg); err != nil {
|
||||
t.Fatalf("Error at DataDB init:%v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
// func (d *testDispatcher) loadData(t *testing.T, path string) {
|
||||
// var reply string
|
||||
// attrs := &utils.AttrLoadTpFromFolder{FolderPath: path}
|
||||
@@ -143,9 +152,9 @@ func (d *testDispatcher) loadData2(t *testing.T, path string) {
|
||||
|
||||
func testDsp(t *testing.T, tests []func(t *testing.T), testName, all, all2, disp, allTF, all2TF, attrTF string) {
|
||||
// engine.KillEngine(0)
|
||||
allEngine = newTestEngine(t, path.Join(*dataDir, "conf", "samples", "dispatchers", all), true)
|
||||
allEngine2 = newTestEngine(t, path.Join(*dataDir, "conf", "samples", "dispatchers", all2), true)
|
||||
dispEngine = newTestEngine(t, path.Join(*dataDir, "conf", "samples", "dispatchers", disp), true)
|
||||
allEngine = newTestEngine(t, path.Join(*dataDir, "conf", "samples", "dispatchers", all), true, true)
|
||||
allEngine2 = newTestEngine(t, path.Join(*dataDir, "conf", "samples", "dispatchers", all2), true, true)
|
||||
dispEngine = newTestEngine(t, path.Join(*dataDir, "conf", "samples", "dispatchers", disp), true, true)
|
||||
dispEngine.loadData2(t, path.Join(*dataDir, "tariffplans", attrTF))
|
||||
allEngine.loadData2(t, path.Join(*dataDir, "tariffplans", allTF))
|
||||
allEngine2.loadData2(t, path.Join(*dataDir, "tariffplans", all2TF))
|
||||
|
||||
@@ -43,24 +43,26 @@ func newMapEventFromReqForm(r *http.Request) (mp MapEvent, err error) {
|
||||
}
|
||||
|
||||
// NewCDRServer is a constructor for CDRServer
|
||||
func NewCDRServer(cfg *config.CGRConfig, dm *DataManager, filterS *FilterS,
|
||||
func NewCDRServer(cfg *config.CGRConfig, storDBChan chan StorDB, dm *DataManager, filterS *FilterS,
|
||||
connMgr *ConnManager) *CDRServer {
|
||||
return &CDRServer{
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
guard: guardian.Guardian,
|
||||
fltrS: filterS,
|
||||
connMgr: connMgr,
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
guard: guardian.Guardian,
|
||||
fltrS: filterS,
|
||||
connMgr: connMgr,
|
||||
storDBChan: storDBChan,
|
||||
}
|
||||
}
|
||||
|
||||
// CDRServer stores and rates CDRs
|
||||
type CDRServer struct {
|
||||
cfg *config.CGRConfig
|
||||
dm *DataManager
|
||||
guard *guardian.GuardianLocker
|
||||
fltrS *FilterS
|
||||
connMgr *ConnManager
|
||||
cfg *config.CGRConfig
|
||||
dm *DataManager
|
||||
guard *guardian.GuardianLocker
|
||||
fltrS *FilterS
|
||||
connMgr *ConnManager
|
||||
storDBChan chan StorDB
|
||||
}
|
||||
|
||||
// ListenAndServe listen for storbd reload
|
||||
@@ -69,6 +71,10 @@ func (cdrS *CDRServer) ListenAndServe(stopChan chan struct{}) {
|
||||
select {
|
||||
case <-stopChan:
|
||||
return
|
||||
case _, ok := <-cdrS.storDBChan:
|
||||
if !ok { // the channel was closed by the shutdown of the StorDB Service
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,6 +25,8 @@ import (
|
||||
"net/rpc/jsonrpc"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
@@ -182,6 +184,30 @@ func InitDataDB(cfg *config.CGRConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func InitStorDB(cfg *config.CGRConfig) error {
|
||||
storDB, err := NewStorDBConn(cfg.StorDbCfg().Type,
|
||||
cfg.StorDbCfg().Host, cfg.StorDbCfg().Port,
|
||||
cfg.StorDbCfg().Name, cfg.StorDbCfg().User,
|
||||
cfg.StorDbCfg().Password, cfg.GeneralCfg().DBDataEncoding,
|
||||
cfg.StorDbCfg().StringIndexedFields, cfg.StorDbCfg().PrefixIndexedFields,
|
||||
cfg.StorDbCfg().Opts, cfg.StorDbCfg().Items)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dbPath := strings.Trim(cfg.StorDbCfg().Type, "*")
|
||||
if err := storDB.Flush(path.Join(cfg.DataFolderPath, "storage",
|
||||
dbPath)); err != nil {
|
||||
return err
|
||||
}
|
||||
if utils.IsSliceMember([]string{utils.MetaMongo, utils.MetaMySQL, utils.MetaPostgres},
|
||||
cfg.StorDbCfg().Type) {
|
||||
if err := SetDBVersions(storDB); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func InitConfigDB(cfg *config.CGRConfig) error {
|
||||
d, err := NewDataDBConn(cfg.ConfigDBCfg().Type,
|
||||
cfg.ConfigDBCfg().Host, cfg.ConfigDBCfg().Port,
|
||||
|
||||
@@ -106,6 +106,10 @@ type DataDBDriver interface {
|
||||
config.ConfigDB
|
||||
}
|
||||
|
||||
type StorDB interface {
|
||||
Storage
|
||||
}
|
||||
|
||||
type LoadStorage interface {
|
||||
Storage
|
||||
LoadReader
|
||||
|
||||
@@ -32,7 +32,7 @@ import (
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// InternalDB is used as a DataDB
|
||||
// InternalDB is used as a DataDB and/or StorDB
|
||||
type InternalDB struct {
|
||||
stringIndexedFields []string
|
||||
prefixIndexedFields []string
|
||||
@@ -63,6 +63,20 @@ func NewInternalDB(stringIndexedFields, prefixIndexedFields []string,
|
||||
}
|
||||
}
|
||||
|
||||
// SetStringIndexedFields set the stringIndexedFields, used at StorDB reload (is thread safe)
|
||||
func (iDB *InternalDB) SetStringIndexedFields(stringIndexedFields []string) {
|
||||
iDB.indexedFieldsMutex.Lock()
|
||||
iDB.stringIndexedFields = stringIndexedFields
|
||||
iDB.indexedFieldsMutex.Unlock()
|
||||
}
|
||||
|
||||
// SetPrefixIndexedFields set the prefixIndexedFields, used at StorDB reload (is thread safe)
|
||||
func (iDB *InternalDB) SetPrefixIndexedFields(prefixIndexedFields []string) {
|
||||
iDB.indexedFieldsMutex.Lock()
|
||||
iDB.prefixIndexedFields = prefixIndexedFields
|
||||
iDB.indexedFieldsMutex.Unlock()
|
||||
}
|
||||
|
||||
// Close only to implement Storage interface
|
||||
func (iDB *InternalDB) Close() {}
|
||||
|
||||
|
||||
@@ -237,7 +237,7 @@ type MongoStorage struct {
|
||||
ctxTTL time.Duration
|
||||
ctxTTLMutex sync.RWMutex // used for TTL reload
|
||||
db string
|
||||
storageType string // datadb
|
||||
storageType string // datadb, stordb
|
||||
ms utils.Marshaler
|
||||
cdrsIndexes []string
|
||||
cnter *utils.Counter
|
||||
@@ -370,6 +370,11 @@ func (ms *MongoStorage) EnsureIndexes(cols ...string) (err error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if ms.storageType == utils.StorDB {
|
||||
if err = ms.ensureIndexesForCol(utils.CDRsTBL); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -54,6 +54,7 @@ func NewMySQLStorage(host, port, name, user, password string,
|
||||
return &SQLStorage{
|
||||
DB: mySQLStorage.DB,
|
||||
db: mySQLStorage.db,
|
||||
StorDB: mySQLStorage,
|
||||
SQLImpl: mySQLStorage,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -19,13 +19,45 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type PostgresStorage struct {
|
||||
SQLStorage
|
||||
}
|
||||
|
||||
// NewPostgresStorage returns the posgres storDB
|
||||
func NewPostgresStorage(host, port, name, user, password, sslmode string, maxConn, maxIdleConn int, connMaxLifetime time.Duration) (*SQLStorage, error) {
|
||||
connectString := fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", host, port, name, user, password, sslmode)
|
||||
db, err := gorm.Open(postgres.Open(connectString), &gorm.Config{AllowGlobalUpdate: true})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
postgresStorage := new(PostgresStorage)
|
||||
if postgresStorage.DB, err = db.DB(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = postgresStorage.DB.Ping(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
postgresStorage.DB.SetMaxIdleConns(maxIdleConn)
|
||||
postgresStorage.DB.SetMaxOpenConns(maxConn)
|
||||
postgresStorage.DB.SetConnMaxLifetime(connMaxLifetime)
|
||||
//db.LogMode(true)
|
||||
postgresStorage.db = db
|
||||
return &SQLStorage{
|
||||
DB: postgresStorage.DB,
|
||||
db: postgresStorage.db,
|
||||
StorDB: postgresStorage,
|
||||
SQLImpl: postgresStorage,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (poS *PostgresStorage) SetVersions(vrs Versions, overwrite bool) (err error) {
|
||||
tx := poS.db.Begin()
|
||||
if overwrite {
|
||||
@@ -49,6 +81,22 @@ func (poS *PostgresStorage) SetVersions(vrs Versions, overwrite bool) (err error
|
||||
return
|
||||
}
|
||||
|
||||
func (poS *PostgresStorage) extraFieldsExistsQry(field string) string {
|
||||
return fmt.Sprintf(" extra_fields ?'%s'", field)
|
||||
}
|
||||
|
||||
func (poS *PostgresStorage) extraFieldsValueQry(field, value string) string {
|
||||
return fmt.Sprintf(" (extra_fields ->> '%s') = '%s'", field, value)
|
||||
}
|
||||
|
||||
func (poS *PostgresStorage) notExtraFieldsExistsQry(field string) string {
|
||||
return fmt.Sprintf(" NOT extra_fields ?'%s'", field)
|
||||
}
|
||||
|
||||
func (poS *PostgresStorage) notExtraFieldsValueQry(field, value string) string {
|
||||
return fmt.Sprintf(" NOT (extra_fields ?'%s' AND (extra_fields ->> '%s') = '%s')", field, field, value)
|
||||
}
|
||||
|
||||
func (poS *PostgresStorage) GetStorageType() string {
|
||||
return utils.MetaPostgres
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@ type SQLImpl interface {
|
||||
type SQLStorage struct {
|
||||
DB *sql.DB
|
||||
db *gorm.DB
|
||||
StorDB
|
||||
SQLImpl
|
||||
}
|
||||
|
||||
|
||||
@@ -57,3 +57,25 @@ func NewDataDBConn(dbType, host, port, name, user,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// NewStorDBConn returns a StorDB(implements Storage interface) based on dbType
|
||||
func NewStorDBConn(dbType, host, port, name, user, pass, marshaler string,
|
||||
stringIndexedFields, prefixIndexedFields []string,
|
||||
opts *config.StorDBOpts, itmsCfg map[string]*config.ItemOpts) (db StorDB, err error) {
|
||||
switch dbType {
|
||||
case utils.MetaMongo:
|
||||
db, err = NewMongoStorage(host, port, name, user, pass, marshaler, utils.MetaStorDB, stringIndexedFields, opts.MongoQueryTimeout)
|
||||
case utils.MetaPostgres:
|
||||
db, err = NewPostgresStorage(host, port, name, user, pass, opts.PgSSLMode,
|
||||
opts.SQLMaxOpenConns, opts.SQLMaxIdleConns, opts.SQLConnMaxLifetime)
|
||||
case utils.MetaMySQL:
|
||||
db, err = NewMySQLStorage(host, port, name, user, pass, opts.SQLMaxOpenConns, opts.SQLMaxIdleConns,
|
||||
opts.SQLConnMaxLifetime, opts.MySQLLocation, opts.SQLDSNParams)
|
||||
case utils.MetaInternal:
|
||||
db = NewInternalDB(stringIndexedFields, prefixIndexedFields, itmsCfg)
|
||||
default:
|
||||
err = fmt.Errorf("unknown db '%s' valid options are [%s, %s, %s, %s]",
|
||||
dbType, utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres, utils.MetaInternal)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -34,7 +34,7 @@ import (
|
||||
|
||||
// NewCDRServer returns the CDR Server
|
||||
func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
|
||||
filterSChan chan *engine.FilterS,
|
||||
storDB *StorDBService, filterSChan chan *engine.FilterS,
|
||||
server *cores.Server, internalCDRServerChan chan birpc.ClientConnector,
|
||||
connMgr *engine.ConnManager, anz *AnalyzerService,
|
||||
srvDep map[string]*sync.WaitGroup) servmanager.Service {
|
||||
@@ -42,6 +42,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
|
||||
connChan: internalCDRServerChan,
|
||||
cfg: cfg,
|
||||
dm: dm,
|
||||
storDB: storDB,
|
||||
filterSChan: filterSChan,
|
||||
server: server,
|
||||
connMgr: connMgr,
|
||||
@@ -53,8 +54,9 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService,
|
||||
// CDRServer implements Service interface
|
||||
type CDRServer struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
dm *DataDBService
|
||||
cfg *config.CGRConfig
|
||||
dm *DataDBService
|
||||
storDB *StorDBService
|
||||
|
||||
filterSChan chan *engine.FilterS
|
||||
server *cores.Server
|
||||
@@ -86,12 +88,14 @@ func (cdrService *CDRServer) Start(ctx *context.Context, _ context.CancelFunc) (
|
||||
return
|
||||
}
|
||||
|
||||
storDBChan := make(chan engine.StorDB, 1)
|
||||
cdrService.stopChan = make(chan struct{})
|
||||
cdrService.storDB.RegisterSyncChan(storDBChan)
|
||||
|
||||
cdrService.Lock()
|
||||
defer cdrService.Unlock()
|
||||
|
||||
cdrService.cdrS = engine.NewCDRServer(cdrService.cfg, datadb, filterS, cdrService.connMgr)
|
||||
cdrService.cdrS = engine.NewCDRServer(cdrService.cfg, storDBChan, datadb, filterS, cdrService.connMgr)
|
||||
go cdrService.cdrS.ListenAndServe(cdrService.stopChan)
|
||||
runtime.Gosched()
|
||||
utils.Logger.Info("Registering CDRS RPC service.")
|
||||
|
||||
@@ -50,8 +50,10 @@ func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.Wai
|
||||
srvManager: servmanager.NewServiceManager(shdWg, cM, cfg),
|
||||
server: server, // Rpc/http server
|
||||
srvDep: map[string]*sync.WaitGroup{
|
||||
utils.AnalyzerS: new(sync.WaitGroup),
|
||||
utils.AccountS: new(sync.WaitGroup),
|
||||
utils.ActionS: new(sync.WaitGroup),
|
||||
utils.AdminS: new(sync.WaitGroup),
|
||||
utils.AnalyzerS: new(sync.WaitGroup),
|
||||
utils.AsteriskAgent: new(sync.WaitGroup),
|
||||
utils.AttributeS: new(sync.WaitGroup),
|
||||
utils.CDRServer: new(sync.WaitGroup),
|
||||
@@ -59,10 +61,10 @@ func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.Wai
|
||||
utils.CoreS: new(sync.WaitGroup),
|
||||
utils.DataDB: new(sync.WaitGroup),
|
||||
utils.DiameterAgent: new(sync.WaitGroup),
|
||||
utils.RegistrarC: new(sync.WaitGroup),
|
||||
utils.DispatcherS: new(sync.WaitGroup),
|
||||
utils.DNSAgent: new(sync.WaitGroup),
|
||||
utils.EEs: new(sync.WaitGroup),
|
||||
utils.EFs: new(sync.WaitGroup),
|
||||
utils.ERs: new(sync.WaitGroup),
|
||||
utils.FreeSWITCHAgent: new(sync.WaitGroup),
|
||||
utils.GlobalVarS: new(sync.WaitGroup),
|
||||
@@ -71,17 +73,16 @@ func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.Wai
|
||||
utils.LoaderS: new(sync.WaitGroup),
|
||||
utils.RadiusAgent: new(sync.WaitGroup),
|
||||
utils.RateS: new(sync.WaitGroup),
|
||||
utils.RegistrarC: new(sync.WaitGroup),
|
||||
utils.ResourceS: new(sync.WaitGroup),
|
||||
utils.RouteS: new(sync.WaitGroup),
|
||||
utils.SchedulerS: new(sync.WaitGroup),
|
||||
utils.SessionS: new(sync.WaitGroup),
|
||||
utils.SIPAgent: new(sync.WaitGroup),
|
||||
utils.StatS: new(sync.WaitGroup),
|
||||
utils.StorDB: new(sync.WaitGroup),
|
||||
utils.ThresholdS: new(sync.WaitGroup),
|
||||
utils.ActionS: new(sync.WaitGroup),
|
||||
utils.AccountS: new(sync.WaitGroup),
|
||||
utils.TPeS: new(sync.WaitGroup),
|
||||
utils.EFs: new(sync.WaitGroup),
|
||||
},
|
||||
iFilterSCh: make(chan *engine.FilterS, 1),
|
||||
}
|
||||
@@ -103,6 +104,7 @@ type CGREngine struct {
|
||||
// services
|
||||
gvS servmanager.Service
|
||||
dmS *DataDBService
|
||||
sdbS *StorDBService
|
||||
anzS *AnalyzerService
|
||||
coreS *CoreService
|
||||
cacheS *CacheService
|
||||
@@ -194,6 +196,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
|
||||
|
||||
cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep)
|
||||
cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, cgr.srvDep)
|
||||
cgr.sdbS = NewStorDBService(cgr.cfg, cgr.srvDep)
|
||||
cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.server,
|
||||
cgr.iFilterSCh, iAnalyzerSCh, cgr.srvDep) // init AnalyzerS
|
||||
|
||||
@@ -217,7 +220,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
|
||||
cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.server, cgr.srvDep)
|
||||
|
||||
cgr.srvManager.AddServices(cgr.gvS, cgr.coreS, cgr.cacheS,
|
||||
cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.efs,
|
||||
cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.sdbS, cgr.efs,
|
||||
NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server,
|
||||
iAdminSCh, cgr.cM, cgr.anzS, cgr.srvDep),
|
||||
NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server, iSessionSCh, cgr.cM, cgr.anzS, cgr.srvDep),
|
||||
@@ -246,7 +249,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
|
||||
|
||||
NewEventExporterService(cgr.cfg, cgr.iFilterSCh,
|
||||
cgr.cM, cgr.server, iEEsCh, cgr.anzS, cgr.srvDep),
|
||||
NewCDRServer(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server, iCDRServerCh,
|
||||
NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.server, iCDRServerCh,
|
||||
cgr.cM, cgr.anzS, cgr.srvDep),
|
||||
|
||||
NewRegistrarCService(cgr.cfg, cgr.server, cgr.cM, cgr.anzS, cgr.srvDep),
|
||||
@@ -272,7 +275,7 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu
|
||||
cgr.shdWg.Done()
|
||||
return
|
||||
}
|
||||
if cgr.efs.ShouldRun() { // efs checking first beacause of loggers
|
||||
if cgr.efs.ShouldRun() { // efs checking first because of loggers
|
||||
cgr.shdWg.Add(1)
|
||||
if err = cgr.efs.Start(ctx, shtDw); err != nil {
|
||||
cgr.shdWg.Done()
|
||||
@@ -286,7 +289,13 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if cgr.sdbS.ShouldRun() {
|
||||
cgr.shdWg.Add(1)
|
||||
if err = cgr.sdbS.Start(ctx, shtDw); err != nil {
|
||||
cgr.shdWg.Done()
|
||||
return
|
||||
}
|
||||
}
|
||||
if cgr.anzS.ShouldRun() {
|
||||
cgr.shdWg.Add(1)
|
||||
if err = cgr.anzS.Start(ctx, shtDw); err != nil {
|
||||
|
||||
188
services/stordb.go
Normal file
188
services/stordb.go
Normal file
@@ -0,0 +1,188 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// NewStorDBService returns the StorDB Service
|
||||
func NewStorDBService(cfg *config.CGRConfig,
|
||||
srvDep map[string]*sync.WaitGroup) *StorDBService {
|
||||
return &StorDBService{
|
||||
cfg: cfg,
|
||||
srvDep: srvDep,
|
||||
}
|
||||
}
|
||||
|
||||
// StorDBService implements Service interface
|
||||
type StorDBService struct {
|
||||
sync.RWMutex
|
||||
cfg *config.CGRConfig
|
||||
oldDBCfg *config.StorDbCfg
|
||||
|
||||
db engine.StorDB
|
||||
syncChans []chan engine.StorDB
|
||||
|
||||
srvDep map[string]*sync.WaitGroup
|
||||
}
|
||||
|
||||
// Start should handle the service start
|
||||
func (db *StorDBService) Start(*context.Context, context.CancelFunc) (err error) {
|
||||
if db.IsRunning() {
|
||||
return utils.ErrServiceAlreadyRunning
|
||||
}
|
||||
db.Lock()
|
||||
defer db.Unlock()
|
||||
db.oldDBCfg = db.cfg.StorDbCfg().Clone()
|
||||
d, err := engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host,
|
||||
db.cfg.StorDbCfg().Port, db.cfg.StorDbCfg().Name, db.cfg.StorDbCfg().User,
|
||||
db.cfg.StorDbCfg().Password, db.cfg.GeneralCfg().DBDataEncoding,
|
||||
db.cfg.StorDbCfg().StringIndexedFields, db.cfg.StorDbCfg().PrefixIndexedFields,
|
||||
db.cfg.StorDbCfg().Opts, db.cfg.StorDbCfg().Items)
|
||||
if err != nil { // Cannot configure getter database, show stopper
|
||||
utils.Logger.Crit(fmt.Sprintf("Could not configure storDB: %s exiting!", err))
|
||||
return
|
||||
}
|
||||
db.db = d
|
||||
db.sync()
|
||||
return
|
||||
}
|
||||
|
||||
// Reload handles the change of config
|
||||
func (db *StorDBService) Reload(*context.Context, context.CancelFunc) (err error) {
|
||||
db.Lock()
|
||||
defer db.Unlock()
|
||||
if db.needsConnectionReload() {
|
||||
var d engine.StorDB
|
||||
if d, err = engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host,
|
||||
db.cfg.StorDbCfg().Port, db.cfg.StorDbCfg().Name, db.cfg.StorDbCfg().User,
|
||||
db.cfg.StorDbCfg().Password, db.cfg.GeneralCfg().DBDataEncoding,
|
||||
db.cfg.StorDbCfg().StringIndexedFields, db.cfg.StorDbCfg().PrefixIndexedFields,
|
||||
db.cfg.StorDbCfg().Opts, db.cfg.StorDbCfg().Items); err != nil {
|
||||
return
|
||||
}
|
||||
db.db.Close()
|
||||
db.db = d
|
||||
db.oldDBCfg = db.cfg.StorDbCfg().Clone()
|
||||
db.sync() // sync only if needed
|
||||
return
|
||||
}
|
||||
if db.cfg.StorDbCfg().Type == utils.Mongo {
|
||||
mgo, canCast := db.db.(*engine.MongoStorage)
|
||||
if !canCast {
|
||||
return fmt.Errorf("can't conver StorDB of type %s to MongoStorage",
|
||||
db.cfg.StorDbCfg().Type)
|
||||
}
|
||||
mgo.SetTTL(db.cfg.StorDbCfg().Opts.MongoQueryTimeout)
|
||||
} else if db.cfg.StorDbCfg().Type == utils.Postgres ||
|
||||
db.cfg.StorDbCfg().Type == utils.MySQL {
|
||||
msql, canCast := db.db.(*engine.SQLStorage)
|
||||
if !canCast {
|
||||
return fmt.Errorf("can't conver StorDB of type %s to SQLStorage",
|
||||
db.cfg.StorDbCfg().Type)
|
||||
}
|
||||
msql.DB.SetMaxOpenConns(db.cfg.StorDbCfg().Opts.SQLMaxOpenConns)
|
||||
msql.DB.SetMaxIdleConns(db.cfg.StorDbCfg().Opts.SQLMaxIdleConns)
|
||||
msql.DB.SetConnMaxLifetime(db.cfg.StorDbCfg().Opts.SQLConnMaxLifetime)
|
||||
} else if db.cfg.StorDbCfg().Type == utils.Internal {
|
||||
idb, canCast := db.db.(*engine.InternalDB)
|
||||
if !canCast {
|
||||
return fmt.Errorf("can't conver StorDB of type %s to InternalDB",
|
||||
db.cfg.StorDbCfg().Type)
|
||||
}
|
||||
idb.SetStringIndexedFields(db.cfg.StorDbCfg().StringIndexedFields)
|
||||
idb.SetPrefixIndexedFields(db.cfg.StorDbCfg().PrefixIndexedFields)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Shutdown stops the service
|
||||
func (db *StorDBService) Shutdown() (_ error) {
|
||||
db.Lock()
|
||||
db.db.Close()
|
||||
db.db = nil
|
||||
for _, c := range db.syncChans {
|
||||
close(c)
|
||||
}
|
||||
db.syncChans = nil
|
||||
db.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// IsRunning returns if the service is running
|
||||
func (db *StorDBService) IsRunning() bool {
|
||||
db.RLock()
|
||||
defer db.RUnlock()
|
||||
return db.isRunning()
|
||||
}
|
||||
|
||||
// isRunning returns if the service is running (not thread safe)
|
||||
func (db *StorDBService) isRunning() bool {
|
||||
return db != nil && db.db != nil
|
||||
}
|
||||
|
||||
// ServiceName returns the service name
|
||||
func (db *StorDBService) ServiceName() string {
|
||||
return utils.StorDB
|
||||
}
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (db *StorDBService) ShouldRun() bool {
|
||||
return db.cfg.CdrsCfg().Enabled
|
||||
}
|
||||
|
||||
// RegisterSyncChan used by dependent subsystems to register a chanel to reload only the storDB(thread safe)
|
||||
func (db *StorDBService) RegisterSyncChan(c chan engine.StorDB) {
|
||||
db.Lock()
|
||||
db.syncChans = append(db.syncChans, c)
|
||||
if db.isRunning() {
|
||||
c <- db.db
|
||||
}
|
||||
db.Unlock()
|
||||
}
|
||||
|
||||
// sync sends the storDB over syncChansv (not thrad safe)
|
||||
func (db *StorDBService) sync() {
|
||||
if db.isRunning() {
|
||||
for _, c := range db.syncChans {
|
||||
c <- db.db
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// needsConnectionReload returns if the DB connection needs to reloaded
|
||||
func (db *StorDBService) needsConnectionReload() bool {
|
||||
if db.oldDBCfg.Type != db.cfg.StorDbCfg().Type ||
|
||||
db.oldDBCfg.Host != db.cfg.StorDbCfg().Host ||
|
||||
db.oldDBCfg.Name != db.cfg.StorDbCfg().Name ||
|
||||
db.oldDBCfg.Port != db.cfg.StorDbCfg().Port ||
|
||||
db.oldDBCfg.User != db.cfg.StorDbCfg().User ||
|
||||
db.oldDBCfg.Password != db.cfg.StorDbCfg().Password {
|
||||
return true
|
||||
}
|
||||
return db.cfg.StorDbCfg().Type == utils.Postgres &&
|
||||
db.oldDBCfg.Opts.PgSSLMode != db.cfg.StorDbCfg().Opts.PgSSLMode
|
||||
}
|
||||
@@ -335,6 +335,7 @@ const (
|
||||
MetaDumpToJSON = "*dump_to_json"
|
||||
NonTransactional = ""
|
||||
DataDB = "data_db"
|
||||
StorDB = "stor_db"
|
||||
NotFoundCaps = "NOT_FOUND"
|
||||
ServerErrorCaps = "SERVER_ERROR"
|
||||
MandatoryIEMissingCaps = "MANDATORY_IE_MISSING"
|
||||
@@ -482,6 +483,7 @@ const (
|
||||
//Destinations = "Destinations"
|
||||
MetaSubscribers = "*subscribers"
|
||||
MetaDataDB = "*datadb"
|
||||
MetaStorDB = "*stordb"
|
||||
MetaWeight = "*weight"
|
||||
MetaLC = "*lc"
|
||||
MetaHC = "*hc"
|
||||
@@ -1680,6 +1682,8 @@ const (
|
||||
CacheVersions = "*versions"
|
||||
CacheCapsEvents = "*caps_events"
|
||||
CacheReplicationHosts = "*replication_hosts"
|
||||
// storDB
|
||||
CacheCDRsTBL = "*cdrs"
|
||||
)
|
||||
|
||||
// Prefix for indexing
|
||||
@@ -1716,6 +1720,16 @@ const (
|
||||
GoogleCredentialsFileName = "credentials.json"
|
||||
)
|
||||
|
||||
// StorDB
|
||||
var (
|
||||
PostgresSSLModeDisable = "disable"
|
||||
PostgresSSLModeAllow = "allow"
|
||||
PostgresSSLModePrefer = "prefer"
|
||||
PostgresSSLModeRequire = "require"
|
||||
PostgresSSLModeVerifyCa = "verify-ca"
|
||||
PostgresSSLModeVerifyFull = "verify-full"
|
||||
)
|
||||
|
||||
// GeneralCfg
|
||||
const (
|
||||
NodeIDCfg = "node_id"
|
||||
@@ -1882,6 +1896,7 @@ const (
|
||||
CDRsConnsCfg = "cdrs_conns"
|
||||
FiltersCfg = "filters"
|
||||
ExtraFieldsCfg = "extra_fields"
|
||||
StoreCdrsCfg = "store_cdrs"
|
||||
SMCostRetriesCfg = "session_cost_retries"
|
||||
ChargerSConnsCfg = "chargers_conns"
|
||||
AttributeSConnsCfg = "attributes_conns"
|
||||
|
||||
Reference in New Issue
Block a user