From df1dc5e8388cd989bb835f61de1f51ee9e3611e8 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Thu, 27 Apr 2023 14:59:27 -0400 Subject: [PATCH] Add StorDB service Add StorDB to config and services. Put back the store_cdrs option under cdrs and update the CDRs service to depend on StorDB. Define the StorDB interface and add a constructor for it. Add a constructor for postgres storage. Add a config sanity check to validate SSL modes for postgres. Update cgr-engine to consider StorDB on startup. --- config/apis.go | 8 +- config/cdrscfg.go | 10 + config/config.go | 24 ++- config/config_defaults.go | 25 ++- config/config_json.go | 3 + config/configsanity.go | 10 + config/datadbcfg.go | 2 +- config/stordbcfg.go | 323 ++++++++++++++++++++++++++++++ dispatchers/lib_test.go | 19 +- engine/cdrs.go | 28 ++- engine/libtest.go | 26 +++ engine/storage_interface.go | 4 + engine/storage_internal_datadb.go | 16 +- engine/storage_mongo_datadb.go | 7 +- engine/storage_mysql.go | 1 + engine/storage_postgres.go | 48 +++++ engine/storage_sql.go | 1 + engine/storage_utils.go | 22 ++ services/cdrs.go | 12 +- services/cgr-engine.go | 27 ++- services/stordb.go | 188 +++++++++++++++++ utils/consts.go | 15 ++ 22 files changed, 782 insertions(+), 37 deletions(-) create mode 100644 config/stordbcfg.go create mode 100644 services/stordb.go diff --git a/config/apis.go b/config/apis.go index f9adc0b5d..56e5ea378 100644 --- a/config/apis.go +++ b/config/apis.go @@ -391,7 +391,13 @@ func storeDiffSection(ctx *context.Context, section string, db ConfigDB, v1, v2 if err = db.GetSection(ctx, section, jsn); err != nil { return } - return db.SetSection(ctx, section, diffDataDbJsonCfg(jsn, v1.DataDbCfg(), v2.DataDbCfg())) + return db.SetSection(ctx, section, diffDataDBJsonCfg(jsn, v1.DataDbCfg(), v2.DataDbCfg())) + case StorDBJSON: + jsn := new(DbJsonCfg) + if err = db.GetSection(ctx, section, jsn); err != nil { + return + } + return db.SetSection(ctx, section, diffStorDBJsonCfg(jsn, v1.StorDbCfg(), v2.StorDbCfg())) case FilterSJSON: jsn := new(FilterSJsonCfg) if err = db.GetSection(ctx, section, jsn); err != nil { diff --git a/config/cdrscfg.go b/config/cdrscfg.go index ee7bf9730..08f8e8452 100644 --- a/config/cdrscfg.go +++ b/config/cdrscfg.go @@ -47,6 +47,7 @@ type CdrsOpts struct { type CdrsCfg struct { Enabled bool // Enable CDR Server service ExtraFields RSRParsers // Extra fields to store in CDRs + StoreCdrs bool // store cdrs in storDb SMCostRetries int ChargerSConns []string AttributeSConns []string @@ -109,6 +110,9 @@ func (cdrscfg *CdrsCfg) loadFromJSONCfg(jsnCdrsCfg *CdrsJsonCfg) (err error) { return } } + if jsnCdrsCfg.Store_cdrs != nil { + cdrscfg.StoreCdrs = *jsnCdrsCfg.Store_cdrs + } if jsnCdrsCfg.Session_cost_retries != nil { cdrscfg.SMCostRetries = *jsnCdrsCfg.Session_cost_retries } @@ -159,6 +163,7 @@ func (cdrscfg CdrsCfg) AsMapInterface(string) interface{} { mp := map[string]interface{}{ utils.EnabledCfg: cdrscfg.Enabled, utils.SMCostRetriesCfg: cdrscfg.SMCostRetries, + utils.StoreCdrsCfg: cdrscfg.StoreCdrs, utils.ExtraFieldsCfg: cdrscfg.ExtraFields.AsStringSlice(), utils.OnlineCDRExportsCfg: utils.CloneStringSlice(cdrscfg.OnlineCDRExports), utils.OptsCfg: opts, @@ -239,6 +244,7 @@ func (cdrscfg CdrsCfg) Clone() (cln *CdrsCfg) { cln = &CdrsCfg{ Enabled: cdrscfg.Enabled, ExtraFields: cdrscfg.ExtraFields.Clone(), + StoreCdrs: cdrscfg.StoreCdrs, SMCostRetries: cdrscfg.SMCostRetries, Opts: cdrscfg.Opts.Clone(), } @@ -287,6 +293,7 @@ type CdrsOptsJson struct { type CdrsJsonCfg struct { Enabled *bool Extra_fields *[]string + Store_cdrs *bool Session_cost_retries *int Chargers_conns *[]string Attributes_conns *[]string @@ -340,6 +347,9 @@ func diffCdrsJsonCfg(d *CdrsJsonCfg, v1, v2 *CdrsCfg) *CdrsJsonCfg { if !utils.SliceStringEqual(extra1, extra2) { d.Extra_fields = &extra2 } + if v1.StoreCdrs != v2.StoreCdrs { + d.Store_cdrs = utils.BoolPointer(v2.StoreCdrs) + } if v1.SMCostRetries != v2.SMCostRetries { d.Session_cost_retries = utils.IntPointer(v2.SMCostRetries) } diff --git a/config/config.go b/config/config.go index 0ec2aacc2..b389cd612 100644 --- a/config/config.go +++ b/config/config.go @@ -110,6 +110,10 @@ func newCGRConfig(config []byte) (cfg *CGRConfig, err error) { Items: make(map[string]*ItemOpts), Opts: &DataDBOpts{}, }, + storDbCfg: &StorDbCfg{ + Items: make(map[string]*ItemOpts), + Opts: &StorDBOpts{}, + }, tlsCfg: new(TLSCfg), cacheCfg: &CacheCfg{Partitions: make(map[string]*CacheParamCfg)}, listenCfg: new(ListenCfg), @@ -328,6 +332,7 @@ type CGRConfig struct { generalCfg *GeneralCfg // General config loggerCfg *LoggerCfg // Logger config dataDbCfg *DataDbCfg // Database config + storDbCfg *StorDbCfg // StorDb config tlsCfg *TLSCfg // TLS config cacheCfg *CacheCfg // Cache config listenCfg *ListenCfg // Listen config @@ -604,6 +609,13 @@ func (cfg *CGRConfig) DataDbCfg() *DataDbCfg { return cfg.dataDbCfg } +// StorDbCfg returns the config for StorDb +func (cfg *CGRConfig) StorDbCfg() *StorDbCfg { + cfg.lks[StorDBJSON].Lock() + defer cfg.lks[StorDBJSON].Unlock() + return cfg.storDbCfg +} + // GeneralCfg returns the General config section func (cfg *CGRConfig) GeneralCfg() *GeneralCfg { cfg.lks[GeneralJSON].Lock() @@ -983,20 +995,27 @@ func (cfg *CGRConfig) reloadSections(sections ...string) { ChargerSJSON, ResourceSJSON, StatSJSON, ThresholdSJSON, RouteSJSON, LoaderSJSON, DispatcherSJSON, RateSJSON, AdminSJSON, AccountSJSON, ActionSJSON}) + subsystemsThatNeedStorDB := utils.NewStringSet([]string{StorDBJSON, CDRsJSON}) needsDataDB := false + needsStorDB := false for _, section := range sections { if !needsDataDB && subsystemsThatNeedDataDB.Has(section) { needsDataDB = true cfg.rldCh <- SectionToService[DataDBJSON] // reload datadb before } - if needsDataDB { + if !needsStorDB && subsystemsThatNeedStorDB.Has(section) { + needsStorDB = true + cfg.rldCh <- SectionToService[StorDBJSON] // reload stordb before + } + if needsDataDB && needsStorDB { break } } runtime.Gosched() for _, section := range sections { if srv := SectionToService[section]; srv != utils.EmptyString && - section != DataDBJSON { + section != DataDBJSON && + section != StorDBJSON { cfg.rldCh <- srv } } @@ -1020,6 +1039,7 @@ func (cfg *CGRConfig) Clone() (cln *CGRConfig) { generalCfg: cfg.generalCfg.Clone(), loggerCfg: cfg.loggerCfg.Clone(), dataDbCfg: cfg.dataDbCfg.Clone(), + storDbCfg: cfg.storDbCfg.Clone(), tlsCfg: cfg.tlsCfg.Clone(), cacheCfg: cfg.cacheCfg.Clone(), listenCfg: cfg.listenCfg.Clone(), diff --git a/config/config_defaults.go b/config/config_defaults.go index 9adfd91a1..8f72c7ba4 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -173,8 +173,28 @@ const CGRATES_CFG_JSON = ` } }, - - +"stor_db": { // database used to store offline tariff plans and CDRs + "db_type": "*mysql", // stor database type to use: <*mongo|*mysql|*postgres|*internal> + "db_host": "127.0.0.1", // the host to connect to + "db_port": 3306, // the port to reach the stor_db + "db_name": "cgrates", // stor database name + "db_user": "cgrates", // username to use when connecting to stor_db + "db_password": "", // password to use when connecting to stor_db + "string_indexed_fields": [], // indexes on cdrs table to speed up queries, used in case of *mongo and *internal + "prefix_indexed_fields": [], // prefix indexes on cdrs table to speed up queries, used in case of *internal + "opts": { + "sqlMaxOpenConns": 100, // maximum database connections opened, not applying for mongo + "sqlMaxIdleConns": 10, // maximum database connections idle, not applying for mongo + "sqlConnMaxLifetime": "0", // maximum amount of time a connection may be reused (0 for unlimited), not applying for mongo + "mysqlDSNParams":{}, // DSN params for opening db + "mongoQueryTimeout":"10s", // timeout for query when mongo is used + "pgSSLMode":"disable", // ssl mode in case of *postgres + "mysqlLocation": "Local", // the location the time from mysql is retrived + }, + "items":{ + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false}, + }, +}, "listen": { "rpc_json": "127.0.0.1:2012", // RPC JSON listening address @@ -288,6 +308,7 @@ const CGRATES_CFG_JSON = ` "cdrs": { // CDRs config "enabled": false, // start the CDR Server: "extra_fields": [], // extra fields to store in CDRs for non-generic CDRs (ie: FreeSWITCH JSON) + "store_cdrs": true, // store cdrs in StorDB "session_cost_retries": 5, // number of queries to session_costs before recalculating CDR "chargers_conns": [], // connection to ChargerS for CDR forking, empty to disable billing for CDRs: <""|*internal|$rpc_conns_id> "attributes_conns": [], // connection to AttributeS for altering *raw CDRs, empty to disable attributes functionality: <""|*internal|$rpc_conns_id> diff --git a/config/config_json.go b/config/config_json.go index adb63d1c8..455632d6e 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -32,6 +32,7 @@ const ( ListenJSON = "listen" HTTPJSON = "http" DataDBJSON = "data_db" + StorDBJSON = "stor_db" FilterSJSON = "filters" CDRsJSON = "cdrs" SessionSJSON = "sessions" @@ -97,6 +98,7 @@ var ( AnalyzerSJSON: utils.AnalyzerS, DispatcherSJSON: utils.DispatcherS, DataDBJSON: utils.DataDB, + StorDBJSON: utils.StorDB, EEsJSON: utils.EEs, EFsJSON: utils.EFs, RateSJSON: utils.RateS, @@ -157,6 +159,7 @@ func newSections(cfg *CGRConfig) Sections { cfg.efsCfg, cfg.rpcConns, cfg.dataDbCfg, + cfg.storDbCfg, cfg.listenCfg, cfg.tlsCfg, cfg.httpCfg, diff --git a/config/configsanity.go b/config/configsanity.go index dc30c1ccb..757d4018c 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -825,6 +825,16 @@ func (cfg *CGRConfig) checkConfigSanity() error { } } } + + // StorDB sanity checks + if cfg.storDbCfg.Type == utils.Postgres { + if !utils.IsSliceMember([]string{utils.PostgresSSLModeDisable, utils.PostgresSSLModeAllow, + utils.PostgresSSLModePrefer, utils.PostgresSSLModeRequire, utils.PostgresSSLModeVerifyCa, + utils.PostgresSSLModeVerifyFull}, cfg.storDbCfg.Opts.PgSSLMode) { + return fmt.Errorf("<%s> unsupported ssl mode for storDB", utils.StorDB) + } + } + // DataDB sanity checks if cfg.dataDbCfg.Type == utils.MetaInternal { for key, config := range cfg.cacheCfg.Partitions { diff --git a/config/datadbcfg.go b/config/datadbcfg.go index d19242073..0a07a88ed 100644 --- a/config/datadbcfg.go +++ b/config/datadbcfg.go @@ -561,7 +561,7 @@ func diffDataDBOptsJsonCfg(d *DBOptsJson, v1, v2 *DataDBOpts) *DBOptsJson { return d } -func diffDataDbJsonCfg(d *DbJsonCfg, v1, v2 *DataDbCfg) *DbJsonCfg { +func diffDataDBJsonCfg(d *DbJsonCfg, v1, v2 *DataDbCfg) *DbJsonCfg { if d == nil { d = new(DbJsonCfg) } diff --git a/config/stordbcfg.go b/config/stordbcfg.go new file mode 100644 index 000000000..bb6cb5df3 --- /dev/null +++ b/config/stordbcfg.go @@ -0,0 +1,323 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package config + +import ( + "fmt" + "reflect" + "strconv" + "strings" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/utils" +) + +type StorDBOpts struct { + SQLMaxOpenConns int + SQLMaxIdleConns int + SQLConnMaxLifetime time.Duration + SQLDSNParams map[string]string + MongoQueryTimeout time.Duration + PgSSLMode string + MySQLLocation string +} + +// StorDbCfg StroreDb config +type StorDbCfg struct { + Type string // should reflect the database type used to store logs + Host string // the host to connect to. Values that start with / are for UNIX domain sockets + Port string // the port to bind to + Name string // the name of the database to connect to + User string // the user to sign in as + Password string // the user's password + StringIndexedFields []string + PrefixIndexedFields []string + RmtConns []string // remote DataDB connIDs + RplConns []string // replication connIDs + Items map[string]*ItemOpts + Opts *StorDBOpts +} + +// loadStorDBCfg loads the StorDB section of the configuration +func (dbcfg *StorDbCfg) Load(ctx *context.Context, jsnCfg ConfigDB, _ *CGRConfig) (err error) { + jsnDataDbCfg := new(DbJsonCfg) + if err = jsnCfg.GetSection(ctx, StorDBJSON, jsnDataDbCfg); err != nil { + return + } + return dbcfg.loadFromJSONCfg(jsnDataDbCfg) +} + +func (dbOpts *StorDBOpts) loadFromJSONCfg(jsnCfg *DBOptsJson) (err error) { + if jsnCfg == nil { + return + } + if jsnCfg.SQLMaxOpenConns != nil { + dbOpts.SQLMaxOpenConns = *jsnCfg.SQLMaxOpenConns + } + if jsnCfg.SQLMaxIdleConns != nil { + dbOpts.SQLMaxIdleConns = *jsnCfg.SQLMaxIdleConns + } + if jsnCfg.SQLConnMaxLifetime != nil { + if dbOpts.SQLConnMaxLifetime, err = utils.ParseDurationWithNanosecs(*jsnCfg.SQLConnMaxLifetime); err != nil { + return + } + } + if jsnCfg.MYSQLDSNParams != nil { + dbOpts.SQLDSNParams = make(map[string]string) + dbOpts.SQLDSNParams = jsnCfg.MYSQLDSNParams + } + if jsnCfg.MongoQueryTimeout != nil { + if dbOpts.MongoQueryTimeout, err = utils.ParseDurationWithNanosecs(*jsnCfg.MongoQueryTimeout); err != nil { + return + } + } + if jsnCfg.PgSSLMode != nil { + dbOpts.PgSSLMode = *jsnCfg.PgSSLMode + } + if jsnCfg.MySQLLocation != nil { + dbOpts.MySQLLocation = *jsnCfg.MySQLLocation + } + return +} + +// loadFromJSONCfg loads StoreDb config from JsonCfg +func (dbcfg *StorDbCfg) loadFromJSONCfg(jsnDbCfg *DbJsonCfg) (err error) { + if jsnDbCfg == nil { + return nil + } + if jsnDbCfg.Db_type != nil { + if !strings.HasPrefix(*jsnDbCfg.Db_type, "*") { + dbcfg.Type = fmt.Sprintf("*%v", *jsnDbCfg.Db_type) + } else { + dbcfg.Type = *jsnDbCfg.Db_type + } + } + if jsnDbCfg.Db_host != nil { + dbcfg.Host = *jsnDbCfg.Db_host + } + if jsnDbCfg.Db_port != nil { + port := strconv.Itoa(*jsnDbCfg.Db_port) + if port == "-1" { + port = utils.MetaDynamic + } + dbcfg.Port = defaultDBPort(dbcfg.Type, port) + } + if jsnDbCfg.Db_name != nil { + dbcfg.Name = *jsnDbCfg.Db_name + } + if jsnDbCfg.Db_user != nil { + dbcfg.User = *jsnDbCfg.Db_user + } + if jsnDbCfg.Db_password != nil { + dbcfg.Password = *jsnDbCfg.Db_password + } + if jsnDbCfg.String_indexed_fields != nil { + dbcfg.StringIndexedFields = *jsnDbCfg.String_indexed_fields + } + if jsnDbCfg.Prefix_indexed_fields != nil { + dbcfg.PrefixIndexedFields = *jsnDbCfg.Prefix_indexed_fields + } + if jsnDbCfg.Remote_conns != nil { + dbcfg.RmtConns = make([]string, len(*jsnDbCfg.Remote_conns)) + for i, item := range *jsnDbCfg.Remote_conns { + if item == utils.MetaInternal { + return fmt.Errorf("Remote connection ID needs to be different than *internal ") + } + dbcfg.RmtConns[i] = item + } + } + if jsnDbCfg.Replication_conns != nil { + dbcfg.RplConns = make([]string, len(*jsnDbCfg.Replication_conns)) + for i, item := range *jsnDbCfg.Replication_conns { + if item == utils.MetaInternal { + return fmt.Errorf("Replication connection ID needs to be different than *internal ") + } + dbcfg.RplConns[i] = item + } + } + if jsnDbCfg.Items != nil { + for kJsn, vJsn := range jsnDbCfg.Items { + val := new(ItemOpts) + if err = val.loadFromJSONCfg(vJsn); err != nil { + return + } + dbcfg.Items[kJsn] = val + } + } + if jsnDbCfg.Opts != nil { + err = dbcfg.Opts.loadFromJSONCfg(jsnDbCfg.Opts) + } + return +} + +func (StorDbCfg) SName() string { return StorDBJSON } +func (dbcfg StorDbCfg) CloneSection() Section { return dbcfg.Clone() } + +func (dbOpts *StorDBOpts) Clone() *StorDBOpts { + return &StorDBOpts{ + SQLMaxOpenConns: dbOpts.SQLMaxOpenConns, + SQLMaxIdleConns: dbOpts.SQLMaxIdleConns, + SQLConnMaxLifetime: dbOpts.SQLConnMaxLifetime, + SQLDSNParams: dbOpts.SQLDSNParams, + MongoQueryTimeout: dbOpts.MongoQueryTimeout, + PgSSLMode: dbOpts.PgSSLMode, + MySQLLocation: dbOpts.MySQLLocation, + } +} + +// Clone returns the cloned object +func (dbcfg StorDbCfg) Clone() (cln *StorDbCfg) { + cln = &StorDbCfg{ + Type: dbcfg.Type, + Host: dbcfg.Host, + Port: dbcfg.Port, + Name: dbcfg.Name, + User: dbcfg.User, + Password: dbcfg.Password, + + Items: make(map[string]*ItemOpts), + Opts: dbcfg.Opts.Clone(), + } + for key, item := range dbcfg.Items { + cln.Items[key] = item.Clone() + } + if dbcfg.StringIndexedFields != nil { + cln.StringIndexedFields = utils.CloneStringSlice(dbcfg.StringIndexedFields) + } + if dbcfg.PrefixIndexedFields != nil { + cln.PrefixIndexedFields = utils.CloneStringSlice(dbcfg.PrefixIndexedFields) + } + if dbcfg.RmtConns != nil { + cln.RmtConns = utils.CloneStringSlice(dbcfg.RmtConns) + } + if dbcfg.RplConns != nil { + cln.RplConns = utils.CloneStringSlice(dbcfg.RplConns) + } + return +} + +// AsMapInterface returns the config as a map[string]interface{} +func (dbcfg StorDbCfg) AsMapInterface(string) interface{} { + opts := map[string]interface{}{ + utils.SQLMaxOpenConnsCfg: dbcfg.Opts.SQLMaxOpenConns, + utils.SQLMaxIdleConnsCfg: dbcfg.Opts.SQLMaxIdleConns, + utils.SQLConnMaxLifetime: dbcfg.Opts.SQLConnMaxLifetime.String(), + utils.MYSQLDSNParams: dbcfg.Opts.SQLDSNParams, + utils.MongoQueryTimeoutCfg: dbcfg.Opts.MongoQueryTimeout.String(), + utils.PgSSLModeCfg: dbcfg.Opts.PgSSLMode, + utils.MysqlLocation: dbcfg.Opts.MySQLLocation, + } + mp := map[string]interface{}{ + utils.DataDbTypeCfg: utils.Meta + dbcfg.Type, + utils.DataDbHostCfg: dbcfg.Host, + utils.DataDbNameCfg: dbcfg.Name, + utils.DataDbUserCfg: dbcfg.User, + utils.DataDbPassCfg: dbcfg.Password, + utils.StringIndexedFieldsCfg: dbcfg.StringIndexedFields, + utils.PrefixIndexedFieldsCfg: dbcfg.PrefixIndexedFields, + utils.RemoteConnsCfg: dbcfg.RmtConns, + utils.ReplicationConnsCfg: dbcfg.RplConns, + utils.OptsCfg: opts, + } + if dbcfg.Items != nil { + items := make(map[string]interface{}) + for key, item := range dbcfg.Items { + items[key] = item.AsMapInterface() + } + mp[utils.ItemsCfg] = items + } + if dbcfg.Port != utils.EmptyString { + dbPort, _ := strconv.Atoi(dbcfg.Port) + mp[utils.DataDbPortCfg] = dbPort + } + return mp +} + +func diffStorDBOptsJsonCfg(d *DBOptsJson, v1, v2 *StorDBOpts) *DBOptsJson { + if d == nil { + d = new(DBOptsJson) + } + if v1.SQLMaxOpenConns != v2.SQLMaxOpenConns { + d.SQLMaxOpenConns = utils.IntPointer(v2.SQLMaxOpenConns) + } + if v1.SQLMaxIdleConns != v2.SQLMaxIdleConns { + d.SQLMaxIdleConns = utils.IntPointer(v2.SQLMaxIdleConns) + } + if v1.SQLConnMaxLifetime != v2.SQLConnMaxLifetime { + d.SQLConnMaxLifetime = utils.StringPointer(v2.SQLConnMaxLifetime.String()) + } + if !reflect.DeepEqual(v1.SQLDSNParams, v2.SQLDSNParams) { + d.MYSQLDSNParams = v2.SQLDSNParams + } + if v1.MongoQueryTimeout != v2.MongoQueryTimeout { + d.MongoQueryTimeout = utils.StringPointer(v2.MongoQueryTimeout.String()) + } + if v1.PgSSLMode != v2.PgSSLMode { + d.PgSSLMode = utils.StringPointer(v2.PgSSLMode) + } + if v1.MySQLLocation != v2.MySQLLocation { + d.MySQLLocation = utils.StringPointer(v2.MySQLLocation) + } + return d +} + +func diffStorDBJsonCfg(d *DbJsonCfg, v1, v2 *StorDbCfg) *DbJsonCfg { + if d == nil { + d = new(DbJsonCfg) + } + if v1.Type != v2.Type { + d.Db_type = utils.StringPointer(v2.Type) + } + if v1.Host != v2.Host { + d.Db_host = utils.StringPointer(v2.Host) + } + if v1.Port != v2.Port { + port, _ := strconv.Atoi(v2.Port) + d.Db_port = utils.IntPointer(port) + } + if v1.Name != v2.Name { + d.Db_name = utils.StringPointer(v2.Name) + } + if v1.User != v2.User { + d.Db_user = utils.StringPointer(v2.User) + } + if v1.Password != v2.Password { + d.Db_password = utils.StringPointer(v2.Password) + } + if !utils.SliceStringEqual(v1.RmtConns, v2.RmtConns) { + d.Remote_conns = &v2.RmtConns + } + + if !utils.SliceStringEqual(v1.RplConns, v2.RplConns) { + d.Replication_conns = &v2.RplConns + } + + if !utils.SliceStringEqual(v1.StringIndexedFields, v2.StringIndexedFields) { + d.String_indexed_fields = &v2.StringIndexedFields + } + if !utils.SliceStringEqual(v1.PrefixIndexedFields, v2.PrefixIndexedFields) { + d.Prefix_indexed_fields = &v2.PrefixIndexedFields + } + + d.Items = diffMapItemOptJson(d.Items, v1.Items, v2.Items) + d.Opts = diffStorDBOptsJsonCfg(d.Opts, v1.Opts, v2.Opts) + + return d +} diff --git a/dispatchers/lib_test.go b/dispatchers/lib_test.go index d15c9c6e5..58037df04 100644 --- a/dispatchers/lib_test.go +++ b/dispatchers/lib_test.go @@ -65,7 +65,7 @@ type testDispatcher struct { cmd *exec.Cmd } -func newTestEngine(t *testing.T, cfgPath string, initDataDB bool) (d *testDispatcher) { +func newTestEngine(t *testing.T, cfgPath string, initDataDB, initStorDB bool) (d *testDispatcher) { d = new(testDispatcher) d.CfgPath = cfgPath var err error @@ -78,7 +78,9 @@ func newTestEngine(t *testing.T, cfgPath string, initDataDB bool) (d *testDispat if initDataDB { d.initDataDb(t) } - + if initStorDB { + d.resetStorDb(t) + } d.startEngine(t) return d } @@ -111,6 +113,13 @@ func (d *testDispatcher) initDataDb(t *testing.T) { } } +// Wipe out the cdr database +func (d *testDispatcher) resetStorDb(t *testing.T) { + if err := engine.InitStorDB(d.Cfg); err != nil { + t.Fatalf("Error at DataDB init:%v\n", err) + } +} + // func (d *testDispatcher) loadData(t *testing.T, path string) { // var reply string // attrs := &utils.AttrLoadTpFromFolder{FolderPath: path} @@ -143,9 +152,9 @@ func (d *testDispatcher) loadData2(t *testing.T, path string) { func testDsp(t *testing.T, tests []func(t *testing.T), testName, all, all2, disp, allTF, all2TF, attrTF string) { // engine.KillEngine(0) - allEngine = newTestEngine(t, path.Join(*dataDir, "conf", "samples", "dispatchers", all), true) - allEngine2 = newTestEngine(t, path.Join(*dataDir, "conf", "samples", "dispatchers", all2), true) - dispEngine = newTestEngine(t, path.Join(*dataDir, "conf", "samples", "dispatchers", disp), true) + allEngine = newTestEngine(t, path.Join(*dataDir, "conf", "samples", "dispatchers", all), true, true) + allEngine2 = newTestEngine(t, path.Join(*dataDir, "conf", "samples", "dispatchers", all2), true, true) + dispEngine = newTestEngine(t, path.Join(*dataDir, "conf", "samples", "dispatchers", disp), true, true) dispEngine.loadData2(t, path.Join(*dataDir, "tariffplans", attrTF)) allEngine.loadData2(t, path.Join(*dataDir, "tariffplans", allTF)) allEngine2.loadData2(t, path.Join(*dataDir, "tariffplans", all2TF)) diff --git a/engine/cdrs.go b/engine/cdrs.go index 62f651de6..0c41776b4 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -43,24 +43,26 @@ func newMapEventFromReqForm(r *http.Request) (mp MapEvent, err error) { } // NewCDRServer is a constructor for CDRServer -func NewCDRServer(cfg *config.CGRConfig, dm *DataManager, filterS *FilterS, +func NewCDRServer(cfg *config.CGRConfig, storDBChan chan StorDB, dm *DataManager, filterS *FilterS, connMgr *ConnManager) *CDRServer { return &CDRServer{ - cfg: cfg, - dm: dm, - guard: guardian.Guardian, - fltrS: filterS, - connMgr: connMgr, + cfg: cfg, + dm: dm, + guard: guardian.Guardian, + fltrS: filterS, + connMgr: connMgr, + storDBChan: storDBChan, } } // CDRServer stores and rates CDRs type CDRServer struct { - cfg *config.CGRConfig - dm *DataManager - guard *guardian.GuardianLocker - fltrS *FilterS - connMgr *ConnManager + cfg *config.CGRConfig + dm *DataManager + guard *guardian.GuardianLocker + fltrS *FilterS + connMgr *ConnManager + storDBChan chan StorDB } // ListenAndServe listen for storbd reload @@ -69,6 +71,10 @@ func (cdrS *CDRServer) ListenAndServe(stopChan chan struct{}) { select { case <-stopChan: return + case _, ok := <-cdrS.storDBChan: + if !ok { // the channel was closed by the shutdown of the StorDB Service + return + } } } } diff --git a/engine/libtest.go b/engine/libtest.go index 49071e998..a42414323 100644 --- a/engine/libtest.go +++ b/engine/libtest.go @@ -25,6 +25,8 @@ import ( "net/rpc/jsonrpc" "os" "os/exec" + "path" + "strings" "time" "github.com/cgrates/birpc/context" @@ -182,6 +184,30 @@ func InitDataDB(cfg *config.CGRConfig) error { return nil } +func InitStorDB(cfg *config.CGRConfig) error { + storDB, err := NewStorDBConn(cfg.StorDbCfg().Type, + cfg.StorDbCfg().Host, cfg.StorDbCfg().Port, + cfg.StorDbCfg().Name, cfg.StorDbCfg().User, + cfg.StorDbCfg().Password, cfg.GeneralCfg().DBDataEncoding, + cfg.StorDbCfg().StringIndexedFields, cfg.StorDbCfg().PrefixIndexedFields, + cfg.StorDbCfg().Opts, cfg.StorDbCfg().Items) + if err != nil { + return err + } + dbPath := strings.Trim(cfg.StorDbCfg().Type, "*") + if err := storDB.Flush(path.Join(cfg.DataFolderPath, "storage", + dbPath)); err != nil { + return err + } + if utils.IsSliceMember([]string{utils.MetaMongo, utils.MetaMySQL, utils.MetaPostgres}, + cfg.StorDbCfg().Type) { + if err := SetDBVersions(storDB); err != nil { + return err + } + } + return nil +} + func InitConfigDB(cfg *config.CGRConfig) error { d, err := NewDataDBConn(cfg.ConfigDBCfg().Type, cfg.ConfigDBCfg().Host, cfg.ConfigDBCfg().Port, diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 8222190d6..06b90467a 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -106,6 +106,10 @@ type DataDBDriver interface { config.ConfigDB } +type StorDB interface { + Storage +} + type LoadStorage interface { Storage LoadReader diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 22f4e97a2..81b54792c 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -32,7 +32,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -// InternalDB is used as a DataDB +// InternalDB is used as a DataDB and/or StorDB type InternalDB struct { stringIndexedFields []string prefixIndexedFields []string @@ -63,6 +63,20 @@ func NewInternalDB(stringIndexedFields, prefixIndexedFields []string, } } +// SetStringIndexedFields set the stringIndexedFields, used at StorDB reload (is thread safe) +func (iDB *InternalDB) SetStringIndexedFields(stringIndexedFields []string) { + iDB.indexedFieldsMutex.Lock() + iDB.stringIndexedFields = stringIndexedFields + iDB.indexedFieldsMutex.Unlock() +} + +// SetPrefixIndexedFields set the prefixIndexedFields, used at StorDB reload (is thread safe) +func (iDB *InternalDB) SetPrefixIndexedFields(prefixIndexedFields []string) { + iDB.indexedFieldsMutex.Lock() + iDB.prefixIndexedFields = prefixIndexedFields + iDB.indexedFieldsMutex.Unlock() +} + // Close only to implement Storage interface func (iDB *InternalDB) Close() {} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index fc9281d34..b39c18159 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -237,7 +237,7 @@ type MongoStorage struct { ctxTTL time.Duration ctxTTLMutex sync.RWMutex // used for TTL reload db string - storageType string // datadb + storageType string // datadb, stordb ms utils.Marshaler cdrsIndexes []string cnter *utils.Counter @@ -370,6 +370,11 @@ func (ms *MongoStorage) EnsureIndexes(cols ...string) (err error) { } } } + if ms.storageType == utils.StorDB { + if err = ms.ensureIndexesForCol(utils.CDRsTBL); err != nil { + return + } + } return } diff --git a/engine/storage_mysql.go b/engine/storage_mysql.go index cae580c3d..cec2f21d7 100644 --- a/engine/storage_mysql.go +++ b/engine/storage_mysql.go @@ -54,6 +54,7 @@ func NewMySQLStorage(host, port, name, user, password string, return &SQLStorage{ DB: mySQLStorage.DB, db: mySQLStorage.db, + StorDB: mySQLStorage, SQLImpl: mySQLStorage, }, nil } diff --git a/engine/storage_postgres.go b/engine/storage_postgres.go index b62def445..7a4662b64 100644 --- a/engine/storage_postgres.go +++ b/engine/storage_postgres.go @@ -19,13 +19,45 @@ along with this program. If not, see package engine import ( + "fmt" + "time" + "github.com/cgrates/cgrates/utils" + "gorm.io/driver/postgres" + "gorm.io/gorm" ) type PostgresStorage struct { SQLStorage } +// NewPostgresStorage returns the posgres storDB +func NewPostgresStorage(host, port, name, user, password, sslmode string, maxConn, maxIdleConn int, connMaxLifetime time.Duration) (*SQLStorage, error) { + connectString := fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", host, port, name, user, password, sslmode) + db, err := gorm.Open(postgres.Open(connectString), &gorm.Config{AllowGlobalUpdate: true}) + if err != nil { + return nil, err + } + postgresStorage := new(PostgresStorage) + if postgresStorage.DB, err = db.DB(); err != nil { + return nil, err + } + if err = postgresStorage.DB.Ping(); err != nil { + return nil, err + } + postgresStorage.DB.SetMaxIdleConns(maxIdleConn) + postgresStorage.DB.SetMaxOpenConns(maxConn) + postgresStorage.DB.SetConnMaxLifetime(connMaxLifetime) + //db.LogMode(true) + postgresStorage.db = db + return &SQLStorage{ + DB: postgresStorage.DB, + db: postgresStorage.db, + StorDB: postgresStorage, + SQLImpl: postgresStorage, + }, nil +} + func (poS *PostgresStorage) SetVersions(vrs Versions, overwrite bool) (err error) { tx := poS.db.Begin() if overwrite { @@ -49,6 +81,22 @@ func (poS *PostgresStorage) SetVersions(vrs Versions, overwrite bool) (err error return } +func (poS *PostgresStorage) extraFieldsExistsQry(field string) string { + return fmt.Sprintf(" extra_fields ?'%s'", field) +} + +func (poS *PostgresStorage) extraFieldsValueQry(field, value string) string { + return fmt.Sprintf(" (extra_fields ->> '%s') = '%s'", field, value) +} + +func (poS *PostgresStorage) notExtraFieldsExistsQry(field string) string { + return fmt.Sprintf(" NOT extra_fields ?'%s'", field) +} + +func (poS *PostgresStorage) notExtraFieldsValueQry(field, value string) string { + return fmt.Sprintf(" NOT (extra_fields ?'%s' AND (extra_fields ->> '%s') = '%s')", field, field, value) +} + func (poS *PostgresStorage) GetStorageType() string { return utils.MetaPostgres } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index fbdf0940c..3e85f95f9 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -40,6 +40,7 @@ type SQLImpl interface { type SQLStorage struct { DB *sql.DB db *gorm.DB + StorDB SQLImpl } diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 4664e40d7..0c02aab42 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -57,3 +57,25 @@ func NewDataDBConn(dbType, host, port, name, user, } return } + +// NewStorDBConn returns a StorDB(implements Storage interface) based on dbType +func NewStorDBConn(dbType, host, port, name, user, pass, marshaler string, + stringIndexedFields, prefixIndexedFields []string, + opts *config.StorDBOpts, itmsCfg map[string]*config.ItemOpts) (db StorDB, err error) { + switch dbType { + case utils.MetaMongo: + db, err = NewMongoStorage(host, port, name, user, pass, marshaler, utils.MetaStorDB, stringIndexedFields, opts.MongoQueryTimeout) + case utils.MetaPostgres: + db, err = NewPostgresStorage(host, port, name, user, pass, opts.PgSSLMode, + opts.SQLMaxOpenConns, opts.SQLMaxIdleConns, opts.SQLConnMaxLifetime) + case utils.MetaMySQL: + db, err = NewMySQLStorage(host, port, name, user, pass, opts.SQLMaxOpenConns, opts.SQLMaxIdleConns, + opts.SQLConnMaxLifetime, opts.MySQLLocation, opts.SQLDSNParams) + case utils.MetaInternal: + db = NewInternalDB(stringIndexedFields, prefixIndexedFields, itmsCfg) + default: + err = fmt.Errorf("unknown db '%s' valid options are [%s, %s, %s, %s]", + dbType, utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres, utils.MetaInternal) + } + return +} diff --git a/services/cdrs.go b/services/cdrs.go index 9f9cea029..ae36a2ec1 100644 --- a/services/cdrs.go +++ b/services/cdrs.go @@ -34,7 +34,7 @@ import ( // NewCDRServer returns the CDR Server func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, - filterSChan chan *engine.FilterS, + storDB *StorDBService, filterSChan chan *engine.FilterS, server *cores.Server, internalCDRServerChan chan birpc.ClientConnector, connMgr *engine.ConnManager, anz *AnalyzerService, srvDep map[string]*sync.WaitGroup) servmanager.Service { @@ -42,6 +42,7 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, connChan: internalCDRServerChan, cfg: cfg, dm: dm, + storDB: storDB, filterSChan: filterSChan, server: server, connMgr: connMgr, @@ -53,8 +54,9 @@ func NewCDRServer(cfg *config.CGRConfig, dm *DataDBService, // CDRServer implements Service interface type CDRServer struct { sync.RWMutex - cfg *config.CGRConfig - dm *DataDBService + cfg *config.CGRConfig + dm *DataDBService + storDB *StorDBService filterSChan chan *engine.FilterS server *cores.Server @@ -86,12 +88,14 @@ func (cdrService *CDRServer) Start(ctx *context.Context, _ context.CancelFunc) ( return } + storDBChan := make(chan engine.StorDB, 1) cdrService.stopChan = make(chan struct{}) + cdrService.storDB.RegisterSyncChan(storDBChan) cdrService.Lock() defer cdrService.Unlock() - cdrService.cdrS = engine.NewCDRServer(cdrService.cfg, datadb, filterS, cdrService.connMgr) + cdrService.cdrS = engine.NewCDRServer(cdrService.cfg, storDBChan, datadb, filterS, cdrService.connMgr) go cdrService.cdrS.ListenAndServe(cdrService.stopChan) runtime.Gosched() utils.Logger.Info("Registering CDRS RPC service.") diff --git a/services/cgr-engine.go b/services/cgr-engine.go index 92b9e975a..5f843ffd9 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -50,8 +50,10 @@ func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.Wai srvManager: servmanager.NewServiceManager(shdWg, cM, cfg), server: server, // Rpc/http server srvDep: map[string]*sync.WaitGroup{ - utils.AnalyzerS: new(sync.WaitGroup), + utils.AccountS: new(sync.WaitGroup), + utils.ActionS: new(sync.WaitGroup), utils.AdminS: new(sync.WaitGroup), + utils.AnalyzerS: new(sync.WaitGroup), utils.AsteriskAgent: new(sync.WaitGroup), utils.AttributeS: new(sync.WaitGroup), utils.CDRServer: new(sync.WaitGroup), @@ -59,10 +61,10 @@ func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.Wai utils.CoreS: new(sync.WaitGroup), utils.DataDB: new(sync.WaitGroup), utils.DiameterAgent: new(sync.WaitGroup), - utils.RegistrarC: new(sync.WaitGroup), utils.DispatcherS: new(sync.WaitGroup), utils.DNSAgent: new(sync.WaitGroup), utils.EEs: new(sync.WaitGroup), + utils.EFs: new(sync.WaitGroup), utils.ERs: new(sync.WaitGroup), utils.FreeSWITCHAgent: new(sync.WaitGroup), utils.GlobalVarS: new(sync.WaitGroup), @@ -71,17 +73,16 @@ func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.Wai utils.LoaderS: new(sync.WaitGroup), utils.RadiusAgent: new(sync.WaitGroup), utils.RateS: new(sync.WaitGroup), + utils.RegistrarC: new(sync.WaitGroup), utils.ResourceS: new(sync.WaitGroup), utils.RouteS: new(sync.WaitGroup), utils.SchedulerS: new(sync.WaitGroup), utils.SessionS: new(sync.WaitGroup), utils.SIPAgent: new(sync.WaitGroup), utils.StatS: new(sync.WaitGroup), + utils.StorDB: new(sync.WaitGroup), utils.ThresholdS: new(sync.WaitGroup), - utils.ActionS: new(sync.WaitGroup), - utils.AccountS: new(sync.WaitGroup), utils.TPeS: new(sync.WaitGroup), - utils.EFs: new(sync.WaitGroup), }, iFilterSCh: make(chan *engine.FilterS, 1), } @@ -103,6 +104,7 @@ type CGREngine struct { // services gvS servmanager.Service dmS *DataDBService + sdbS *StorDBService anzS *AnalyzerService coreS *CoreService cacheS *CacheService @@ -194,6 +196,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr cgr.gvS = NewGlobalVarS(cgr.cfg, cgr.srvDep) cgr.dmS = NewDataDBService(cgr.cfg, cgr.cM, cgr.srvDep) + cgr.sdbS = NewStorDBService(cgr.cfg, cgr.srvDep) cgr.anzS = NewAnalyzerService(cgr.cfg, cgr.server, cgr.iFilterSCh, iAnalyzerSCh, cgr.srvDep) // init AnalyzerS @@ -217,7 +220,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr cgr.efs = NewExportFailoverService(cgr.cfg, cgr.cM, iEFsCh, cgr.server, cgr.srvDep) cgr.srvManager.AddServices(cgr.gvS, cgr.coreS, cgr.cacheS, - cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.efs, + cgr.ldrs, cgr.anzS, dspS, cgr.dmS, cgr.sdbS, cgr.efs, NewAdminSv1Service(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server, iAdminSCh, cgr.cM, cgr.anzS, cgr.srvDep), NewSessionService(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server, iSessionSCh, cgr.cM, cgr.anzS, cgr.srvDep), @@ -246,7 +249,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr NewEventExporterService(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.server, iEEsCh, cgr.anzS, cgr.srvDep), - NewCDRServer(cgr.cfg, cgr.dmS, cgr.iFilterSCh, cgr.server, iCDRServerCh, + NewCDRServer(cgr.cfg, cgr.dmS, cgr.sdbS, cgr.iFilterSCh, cgr.server, iCDRServerCh, cgr.cM, cgr.anzS, cgr.srvDep), NewRegistrarCService(cgr.cfg, cgr.server, cgr.cM, cgr.anzS, cgr.srvDep), @@ -272,7 +275,7 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu cgr.shdWg.Done() return } - if cgr.efs.ShouldRun() { // efs checking first beacause of loggers + if cgr.efs.ShouldRun() { // efs checking first because of loggers cgr.shdWg.Add(1) if err = cgr.efs.Start(ctx, shtDw); err != nil { cgr.shdWg.Done() @@ -286,7 +289,13 @@ func (cgr *CGREngine) StartServices(ctx *context.Context, shtDw context.CancelFu return } } - + if cgr.sdbS.ShouldRun() { + cgr.shdWg.Add(1) + if err = cgr.sdbS.Start(ctx, shtDw); err != nil { + cgr.shdWg.Done() + return + } + } if cgr.anzS.ShouldRun() { cgr.shdWg.Add(1) if err = cgr.anzS.Start(ctx, shtDw); err != nil { diff --git a/services/stordb.go b/services/stordb.go new file mode 100644 index 000000000..268dc72d6 --- /dev/null +++ b/services/stordb.go @@ -0,0 +1,188 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +// NewStorDBService returns the StorDB Service +func NewStorDBService(cfg *config.CGRConfig, + srvDep map[string]*sync.WaitGroup) *StorDBService { + return &StorDBService{ + cfg: cfg, + srvDep: srvDep, + } +} + +// StorDBService implements Service interface +type StorDBService struct { + sync.RWMutex + cfg *config.CGRConfig + oldDBCfg *config.StorDbCfg + + db engine.StorDB + syncChans []chan engine.StorDB + + srvDep map[string]*sync.WaitGroup +} + +// Start should handle the service start +func (db *StorDBService) Start(*context.Context, context.CancelFunc) (err error) { + if db.IsRunning() { + return utils.ErrServiceAlreadyRunning + } + db.Lock() + defer db.Unlock() + db.oldDBCfg = db.cfg.StorDbCfg().Clone() + d, err := engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host, + db.cfg.StorDbCfg().Port, db.cfg.StorDbCfg().Name, db.cfg.StorDbCfg().User, + db.cfg.StorDbCfg().Password, db.cfg.GeneralCfg().DBDataEncoding, + db.cfg.StorDbCfg().StringIndexedFields, db.cfg.StorDbCfg().PrefixIndexedFields, + db.cfg.StorDbCfg().Opts, db.cfg.StorDbCfg().Items) + if err != nil { // Cannot configure getter database, show stopper + utils.Logger.Crit(fmt.Sprintf("Could not configure storDB: %s exiting!", err)) + return + } + db.db = d + db.sync() + return +} + +// Reload handles the change of config +func (db *StorDBService) Reload(*context.Context, context.CancelFunc) (err error) { + db.Lock() + defer db.Unlock() + if db.needsConnectionReload() { + var d engine.StorDB + if d, err = engine.NewStorDBConn(db.cfg.StorDbCfg().Type, db.cfg.StorDbCfg().Host, + db.cfg.StorDbCfg().Port, db.cfg.StorDbCfg().Name, db.cfg.StorDbCfg().User, + db.cfg.StorDbCfg().Password, db.cfg.GeneralCfg().DBDataEncoding, + db.cfg.StorDbCfg().StringIndexedFields, db.cfg.StorDbCfg().PrefixIndexedFields, + db.cfg.StorDbCfg().Opts, db.cfg.StorDbCfg().Items); err != nil { + return + } + db.db.Close() + db.db = d + db.oldDBCfg = db.cfg.StorDbCfg().Clone() + db.sync() // sync only if needed + return + } + if db.cfg.StorDbCfg().Type == utils.Mongo { + mgo, canCast := db.db.(*engine.MongoStorage) + if !canCast { + return fmt.Errorf("can't conver StorDB of type %s to MongoStorage", + db.cfg.StorDbCfg().Type) + } + mgo.SetTTL(db.cfg.StorDbCfg().Opts.MongoQueryTimeout) + } else if db.cfg.StorDbCfg().Type == utils.Postgres || + db.cfg.StorDbCfg().Type == utils.MySQL { + msql, canCast := db.db.(*engine.SQLStorage) + if !canCast { + return fmt.Errorf("can't conver StorDB of type %s to SQLStorage", + db.cfg.StorDbCfg().Type) + } + msql.DB.SetMaxOpenConns(db.cfg.StorDbCfg().Opts.SQLMaxOpenConns) + msql.DB.SetMaxIdleConns(db.cfg.StorDbCfg().Opts.SQLMaxIdleConns) + msql.DB.SetConnMaxLifetime(db.cfg.StorDbCfg().Opts.SQLConnMaxLifetime) + } else if db.cfg.StorDbCfg().Type == utils.Internal { + idb, canCast := db.db.(*engine.InternalDB) + if !canCast { + return fmt.Errorf("can't conver StorDB of type %s to InternalDB", + db.cfg.StorDbCfg().Type) + } + idb.SetStringIndexedFields(db.cfg.StorDbCfg().StringIndexedFields) + idb.SetPrefixIndexedFields(db.cfg.StorDbCfg().PrefixIndexedFields) + } + return +} + +// Shutdown stops the service +func (db *StorDBService) Shutdown() (_ error) { + db.Lock() + db.db.Close() + db.db = nil + for _, c := range db.syncChans { + close(c) + } + db.syncChans = nil + db.Unlock() + return +} + +// IsRunning returns if the service is running +func (db *StorDBService) IsRunning() bool { + db.RLock() + defer db.RUnlock() + return db.isRunning() +} + +// isRunning returns if the service is running (not thread safe) +func (db *StorDBService) isRunning() bool { + return db != nil && db.db != nil +} + +// ServiceName returns the service name +func (db *StorDBService) ServiceName() string { + return utils.StorDB +} + +// ShouldRun returns if the service should be running +func (db *StorDBService) ShouldRun() bool { + return db.cfg.CdrsCfg().Enabled +} + +// RegisterSyncChan used by dependent subsystems to register a chanel to reload only the storDB(thread safe) +func (db *StorDBService) RegisterSyncChan(c chan engine.StorDB) { + db.Lock() + db.syncChans = append(db.syncChans, c) + if db.isRunning() { + c <- db.db + } + db.Unlock() +} + +// sync sends the storDB over syncChansv (not thrad safe) +func (db *StorDBService) sync() { + if db.isRunning() { + for _, c := range db.syncChans { + c <- db.db + } + } +} + +// needsConnectionReload returns if the DB connection needs to reloaded +func (db *StorDBService) needsConnectionReload() bool { + if db.oldDBCfg.Type != db.cfg.StorDbCfg().Type || + db.oldDBCfg.Host != db.cfg.StorDbCfg().Host || + db.oldDBCfg.Name != db.cfg.StorDbCfg().Name || + db.oldDBCfg.Port != db.cfg.StorDbCfg().Port || + db.oldDBCfg.User != db.cfg.StorDbCfg().User || + db.oldDBCfg.Password != db.cfg.StorDbCfg().Password { + return true + } + return db.cfg.StorDbCfg().Type == utils.Postgres && + db.oldDBCfg.Opts.PgSSLMode != db.cfg.StorDbCfg().Opts.PgSSLMode +} diff --git a/utils/consts.go b/utils/consts.go index bfc51e31b..8ad5f058b 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -335,6 +335,7 @@ const ( MetaDumpToJSON = "*dump_to_json" NonTransactional = "" DataDB = "data_db" + StorDB = "stor_db" NotFoundCaps = "NOT_FOUND" ServerErrorCaps = "SERVER_ERROR" MandatoryIEMissingCaps = "MANDATORY_IE_MISSING" @@ -482,6 +483,7 @@ const ( //Destinations = "Destinations" MetaSubscribers = "*subscribers" MetaDataDB = "*datadb" + MetaStorDB = "*stordb" MetaWeight = "*weight" MetaLC = "*lc" MetaHC = "*hc" @@ -1680,6 +1682,8 @@ const ( CacheVersions = "*versions" CacheCapsEvents = "*caps_events" CacheReplicationHosts = "*replication_hosts" + // storDB + CacheCDRsTBL = "*cdrs" ) // Prefix for indexing @@ -1716,6 +1720,16 @@ const ( GoogleCredentialsFileName = "credentials.json" ) +// StorDB +var ( + PostgresSSLModeDisable = "disable" + PostgresSSLModeAllow = "allow" + PostgresSSLModePrefer = "prefer" + PostgresSSLModeRequire = "require" + PostgresSSLModeVerifyCa = "verify-ca" + PostgresSSLModeVerifyFull = "verify-full" +) + // GeneralCfg const ( NodeIDCfg = "node_id" @@ -1882,6 +1896,7 @@ const ( CDRsConnsCfg = "cdrs_conns" FiltersCfg = "filters" ExtraFieldsCfg = "extra_fields" + StoreCdrsCfg = "store_cdrs" SMCostRetriesCfg = "session_cost_retries" ChargerSConnsCfg = "chargers_conns" AttributeSConnsCfg = "attributes_conns"