From dcdf55e5d430c50b25c91916c21eb18fa2b19e79 Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Fri, 7 Nov 2025 15:45:30 +0200 Subject: [PATCH] Make StatQueues and StatQueueProfiles storable in MySQL and Postgres --- apis/stats_it_test.go | 4 +- config/config_defaults.go | 4 +- config/configsanity.go | 2 + .../samples/routes_cases_mysql/cgrates.json | 4 +- .../routes_cases_postgres/cgrates.json | 125 ++++++++++++++++ .../samples/routes_cases_redis/cgrates.json | 123 ++++++++++++++++ data/conf/samples/stats_mysql/cgrates.json | 4 +- data/conf/samples/stats_postgres/cgrates.json | 59 ++++++++ data/conf/samples/stats_redis/cgrates.json | 57 ++++++++ data/storage/mysql/create_db_tables.sql | 24 ++- data/storage/postgres/create_db_tables.sql | 21 +++ engine/libstats.go | 138 ++++++++++++++++++ engine/models.go | 22 +++ engine/storage_mysql.go | 7 +- engine/storage_postgres.go | 7 +- engine/storage_sql.go | 132 +++++++++++++---- engine/storage_utils.go | 4 +- general_tests/route_stats_it_test.go | 4 +- utils/consts.go | 6 + 19 files changed, 704 insertions(+), 43 deletions(-) create mode 100644 data/conf/samples/routes_cases_postgres/cgrates.json create mode 100644 data/conf/samples/routes_cases_redis/cgrates.json create mode 100644 data/conf/samples/stats_postgres/cgrates.json create mode 100644 data/conf/samples/stats_redis/cgrates.json diff --git a/apis/stats_it_test.go b/apis/stats_it_test.go index 925566063..3e9f9daee 100644 --- a/apis/stats_it_test.go +++ b/apis/stats_it_test.go @@ -89,11 +89,11 @@ func TestStatsIT(t *testing.T) { case utils.MetaMongo: sqConfigDIR = "stats_mongo" case utils.MetaRedis: - t.SkipNow() + sqConfigDIR = "stats_redis" case utils.MetaMySQL: sqConfigDIR = "stats_mysql" case utils.MetaPostgres: - t.SkipNow() + sqConfigDIR = "stats_postgres" default: t.Fatal("Unknown Database type") } diff --git a/config/config_defaults.go b/config/config_defaults.go index 8656c60eb..585d3b93b 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -174,11 +174,11 @@ const CGRATES_CFG_JSON = ` "*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*resources": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, + "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, + "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, // compatible db types: <*internal|*redis|*mongo> "*actions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, - "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, - "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*filters": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, diff --git a/config/configsanity.go b/config/configsanity.go index c20015dfa..1af67639d 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -1035,6 +1035,8 @@ func (cfg *CGRConfig) checkConfigSanity() error { utils.MetaAttributeProfiles, utils.MetaResourceProfiles, utils.MetaResources, + utils.MetaStatQueueProfiles, + utils.MetaStatQueues, } for _, dbcfg := range cfg.dbCfg.DBConns { if dbcfg.Type == utils.MetaInternal { diff --git a/data/conf/samples/routes_cases_mysql/cgrates.json b/data/conf/samples/routes_cases_mysql/cgrates.json index a129f87b9..d92d2f12b 100644 --- a/data/conf/samples/routes_cases_mysql/cgrates.json +++ b/data/conf/samples/routes_cases_mysql/cgrates.json @@ -26,7 +26,9 @@ } }, "items": { - "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} } }, diff --git a/data/conf/samples/routes_cases_postgres/cgrates.json b/data/conf/samples/routes_cases_postgres/cgrates.json new file mode 100644 index 000000000..a7b7f17aa --- /dev/null +++ b/data/conf/samples/routes_cases_postgres/cgrates.json @@ -0,0 +1,125 @@ +{ +"general": { + "reply_timeout": "50s" +}, + +"logger": { + "level": 7 +}, + +"db": { + "db_conns": { + "*default": { + "db_type": "redis", + "db_host": "127.0.0.1", + "db_port": 6379, + "db_name": "10", + "db_user": "cgrates" + }, + "StorDB": { + "db_type": "postgres", + "db_host": "127.0.0.1", + "db_port": 5432, + "db_name": "cgrates", + "db_user": "cgrates", + "db_password": "CGRateS.org" + } + }, + "items": { + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + } +}, + +"attributes": { + "enabled": true, + "stats_conns": ["*internal"], + "resources_conns": ["*localhost"], + "accounts_conns": ["*localhost"] +}, + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"] +}, + +"cdrs": { + "enabled": true +}, + +"resources": { + "enabled": true, + "store_interval": "-1" +}, + +"loaders": [ + { + "id": "*default", + "enabled": true, + "tenant": "cgrates.org", + "tp_in_dir": "/usr/share/cgrates/tariffplans/tutroutes", + "tp_out_dir": "", + "lockfile_path": "" + } +], + +"stats": { + "enabled": true, + "store_interval": "-1", + "thresholds_conns": ["*internal"] +}, + +"thresholds": { + "enabled": true, + "store_interval": "-1" +}, + + +"routes": { + "enabled": true, + "prefix_indexed_fields":["*req.Destination"], + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], + "accounts_conns": ["*localhost"], + "rates_conns": ["*internal"] +}, + + +"sessions": { + "enabled": true, + "routes_conns": ["*internal"], + "resources_conns": ["*internal"], + "attributes_conns": ["*internal"], + "rates_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"] +}, + +"admins": { + "enabled": true +}, + +"rates": { + "enabled": true +}, + + +"actions": { + "enabled": true, + "accounts_conns": ["*localhost"] +}, + +"accounts": { + "enabled": true +}, + + +"filters": { + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], + "accounts_conns": ["*localhost"] +} + + } + \ No newline at end of file diff --git a/data/conf/samples/routes_cases_redis/cgrates.json b/data/conf/samples/routes_cases_redis/cgrates.json new file mode 100644 index 000000000..a129f87b9 --- /dev/null +++ b/data/conf/samples/routes_cases_redis/cgrates.json @@ -0,0 +1,123 @@ +{ +"general": { + "reply_timeout": "50s" +}, + +"logger": { + "level": 7 +}, + +"db": { + "db_conns": { + "*default": { + "db_type": "redis", + "db_host": "127.0.0.1", + "db_port": 6379, + "db_name": "10", + "db_user": "cgrates" + }, + "StorDB": { + "db_type": "mysql", + "db_host": "127.0.0.1", + "db_port": 3306, + "db_name": "cgrates", + "db_user": "cgrates", + "db_password": "CGRateS.org" + } + }, + "items": { + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + } +}, + +"attributes": { + "enabled": true, + "stats_conns": ["*internal"], + "resources_conns": ["*localhost"], + "accounts_conns": ["*localhost"] +}, + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"] +}, + +"cdrs": { + "enabled": true +}, + +"resources": { + "enabled": true, + "store_interval": "-1" +}, + +"loaders": [ + { + "id": "*default", + "enabled": true, + "tenant": "cgrates.org", + "tp_in_dir": "/usr/share/cgrates/tariffplans/tutroutes", + "tp_out_dir": "", + "lockfile_path": "" + } +], + +"stats": { + "enabled": true, + "store_interval": "-1", + "thresholds_conns": ["*internal"] +}, + +"thresholds": { + "enabled": true, + "store_interval": "-1" +}, + + +"routes": { + "enabled": true, + "prefix_indexed_fields":["*req.Destination"], + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], + "accounts_conns": ["*localhost"], + "rates_conns": ["*internal"] +}, + + +"sessions": { + "enabled": true, + "routes_conns": ["*internal"], + "resources_conns": ["*internal"], + "attributes_conns": ["*internal"], + "rates_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"] +}, + +"admins": { + "enabled": true +}, + +"rates": { + "enabled": true +}, + + +"actions": { + "enabled": true, + "accounts_conns": ["*localhost"] +}, + +"accounts": { + "enabled": true +}, + + +"filters": { + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], + "accounts_conns": ["*localhost"] +} + + } + \ No newline at end of file diff --git a/data/conf/samples/stats_mysql/cgrates.json b/data/conf/samples/stats_mysql/cgrates.json index 1c7cef146..0342ab2d8 100644 --- a/data/conf/samples/stats_mysql/cgrates.json +++ b/data/conf/samples/stats_mysql/cgrates.json @@ -24,7 +24,9 @@ } }, "items": { - "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} } }, diff --git a/data/conf/samples/stats_postgres/cgrates.json b/data/conf/samples/stats_postgres/cgrates.json new file mode 100644 index 000000000..f6f229899 --- /dev/null +++ b/data/conf/samples/stats_postgres/cgrates.json @@ -0,0 +1,59 @@ +{ + // CGRateS Configuration file + // will be used in apis/stats_it_test.go + "logger": { + "level": 7 + }, + + "db": { + "db_conns": { + "*default": { + "db_type": "redis", + "db_host": "127.0.0.1", + "db_port": 6379, + "db_name": "10", + "db_user": "cgrates" + }, + "StorDB": { + "db_type": "postgres", + "db_host": "127.0.0.1", + "db_port": 5432, + "db_name": "cgrates", + "db_user": "cgrates", + "db_password": "CGRateS.org" + } + }, + "items": { + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + } + }, + + "listen": { + "rpc_json": ":2012", // RPC JSON listening address + "rpc_gob": ":2013", // RPC GOB listening address + "http": ":2080", // HTTP listening address + }, + + "actions": { + "enabled": true, + "thresholds_conns": ["*internal"], + }, + + "stats": { + "enabled": true, + "store_interval": "-1", + "thresholds_conns": ["*internal"] + }, + + "thresholds": { + "enabled": true, + "store_interval": "-1", + "actions_conns": ["*internal"], + }, + + "admins": { + "enabled": true, + } + } diff --git a/data/conf/samples/stats_redis/cgrates.json b/data/conf/samples/stats_redis/cgrates.json new file mode 100644 index 000000000..1c7cef146 --- /dev/null +++ b/data/conf/samples/stats_redis/cgrates.json @@ -0,0 +1,57 @@ +{ + // CGRateS Configuration file + // will be used in apis/stats_it_test.go + "logger": { + "level": 7 + }, + + "db": { + "db_conns": { + "*default": { + "db_type": "redis", + "db_host": "127.0.0.1", + "db_port": 6379, + "db_name": "10", + "db_user": "cgrates" + }, + "StorDB": { + "db_type": "mysql", + "db_host": "127.0.0.1", + "db_port": 3306, + "db_name": "cgrates", + "db_user": "cgrates", + "db_password": "CGRateS.org" + } + }, + "items": { + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + } + }, + + "listen": { + "rpc_json": ":2012", // RPC JSON listening address + "rpc_gob": ":2013", // RPC GOB listening address + "http": ":2080", // HTTP listening address + }, + + "actions": { + "enabled": true, + "thresholds_conns": ["*internal"], + }, + + "stats": { + "enabled": true, + "store_interval": "-1", + "thresholds_conns": ["*internal"] + }, + + "thresholds": { + "enabled": true, + "store_interval": "-1", + "actions_conns": ["*internal"], + }, + + "admins": { + "enabled": true, + } + } diff --git a/data/storage/mysql/create_db_tables.sql b/data/storage/mysql/create_db_tables.sql index dda60e944..9fbdd9c70 100644 --- a/data/storage/mysql/create_db_tables.sql +++ b/data/storage/mysql/create_db_tables.sql @@ -88,4 +88,26 @@ CREATE TABLE resources ( PRIMARY KEY (`pk`), UNIQUE KEY unique_tenant_id (`tenant`, `id`) ); -CREATE UNIQUE INDEX resources_idx ON resources (`id`); \ No newline at end of file +CREATE UNIQUE INDEX resources_idx ON resources (`id`); + +DROP TABLE IF EXISTS stat_queue_profiles; +CREATE TABLE stat_queue_profiles ( + `pk` int(11) NOT NULL AUTO_INCREMENT, + `tenant` VARCHAR(40) NOT NULL, + `id` VARCHAR(64) NOT NULL, + `stat_queue_profile` JSON NOT NULL, + PRIMARY KEY (`pk`), + UNIQUE KEY unique_tenant_id (`tenant`, `id`) +); +CREATE UNIQUE INDEX stat_queue_profiles_idx ON stat_queue_profiles (`id`); + +DROP TABLE IF EXISTS stat_queues; +CREATE TABLE stat_queues ( + `pk` int(11) NOT NULL AUTO_INCREMENT, + `tenant` VARCHAR(40) NOT NULL, + `id` VARCHAR(64) NOT NULL, + `stat_queue` JSON NOT NULL, + PRIMARY KEY (`pk`), + UNIQUE KEY unique_tenant_id (`tenant`, `id`) +); +CREATE UNIQUE INDEX stat_queues_idx ON stat_queues (`id`); \ No newline at end of file diff --git a/data/storage/postgres/create_db_tables.sql b/data/storage/postgres/create_db_tables.sql index 6775c95f7..9b39d121e 100644 --- a/data/storage/postgres/create_db_tables.sql +++ b/data/storage/postgres/create_db_tables.sql @@ -87,3 +87,24 @@ CREATE TABLE resources ( UNIQUE (tenant, id) ); CREATE UNIQUE INDEX resources_idx ON resources ("id"); + +DROP TABLE IF EXISTS stat_queue_profiles; +CREATE TABLE stat_queue_profiles ( + pk SERIAL PRIMARY KEY, + tenant VARCHAR(40) NOT NULL, + id VARCHAR(64) NOT NULL, + stat_queue_profile JSONB NOT NULL, + UNIQUE (tenant, id) +); +CREATE UNIQUE INDEX stat_queue_profiles_idx ON stat_queue_profiles ("id"); + + +DROP TABLE IF EXISTS stat_queues; +CREATE TABLE stat_queues ( + pk SERIAL PRIMARY KEY, + tenant VARCHAR(40) NOT NULL, + id VARCHAR(64) NOT NULL, + stat_queue JSONB NOT NULL, + UNIQUE (tenant, id) +); +CREATE UNIQUE INDEX stat_queues_idx ON stat_queues ("id"); diff --git a/engine/libstats.go b/engine/libstats.go index 4b78b105b..2f38c0ba4 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -20,6 +20,7 @@ package engine import ( "bytes" + "encoding/base64" "encoding/gob" "encoding/json" "fmt" @@ -614,6 +615,65 @@ func (sq *StatQueue) CacheClone() any { return sq.Clone() } +// AsMapStringInterface converts StoredStatQueue struct to map[string]any +func (ssq *StoredStatQueue) AsMapStringInterface() map[string]any { + if ssq == nil { + return nil + } + return map[string]any{ + utils.Tenant: ssq.Tenant, + utils.ID: ssq.ID, + utils.SQItems: ssq.SQItems, + utils.SQMetrics: ssq.SQMetrics, + utils.Compressed: ssq.Compressed, + } +} + +// MapStringInterfaceToStoredStatQueue converts map[string]any to StoredStatQueue struct +func MapStringInterfaceToStoredStatQueue(m map[string]any) (*StoredStatQueue, error) { + ssq := &StoredStatQueue{} + if v, ok := m[utils.Tenant].(string); ok { + ssq.Tenant = v + } + if v, ok := m[utils.ID].(string); ok { + ssq.ID = v + } + if items, ok := m[utils.SQItems].([]any); ok { + for _, item := range items { + if itemMap, ok := item.(map[string]any); ok { + sqItem := SQItem{} + if eventID, ok := itemMap[utils.EventID].(string); ok { + sqItem.EventID = eventID + } + if expiryTime, ok := itemMap[utils.ExpiryTime].(*time.Time); ok { + sqItem.ExpiryTime = expiryTime + } else if expiryStr, ok := itemMap[utils.ExpiryTime].(string); ok { + if parsedTime, err := time.Parse(time.RFC3339, expiryStr); err == nil { + sqItem.ExpiryTime = &parsedTime + } + } + ssq.SQItems = append(ssq.SQItems, sqItem) + } + } + } + if metrics, ok := m[utils.SQMetrics].(map[string]any); ok { + ssq.SQMetrics = make(map[string][]byte) + for key, value := range metrics { + if metricBytes, ok := value.(string); ok { + decodedBytes, err := base64.StdEncoding.DecodeString(metricBytes) + if err != nil { + return nil, fmt.Errorf("failed to decode base64 string: %v", err) + } + ssq.SQMetrics[key] = decodedBytes + } + } + } + if v, ok := m[utils.Compressed].(bool); ok { + ssq.Compressed = v + } + return ssq, nil +} + // unlockStatQueues unlocks all locked StatQueues in the given slice. func unlockStatQueues(sqs []*StatQueue) { for _, sq := range sqs { @@ -867,3 +927,81 @@ func (mf *MetricWithFilters) FieldAsInterface(fldPath []string) (_ any, err erro return mf.Blockers, nil } } + +// AsMapStringInterface converts StatQueueProfile struct to map[string]any +func (sqp *StatQueueProfile) AsMapStringInterface() map[string]any { + if sqp == nil { + return nil + } + return map[string]any{ + utils.Tenant: sqp.Tenant, + utils.ID: sqp.ID, + utils.FilterIDs: sqp.FilterIDs, + utils.Weights: sqp.Weights, + utils.Blockers: sqp.Blockers, + utils.QueueLength: sqp.QueueLength, + utils.TTL: sqp.TTL, + utils.MinItems: sqp.MinItems, + utils.Stored: sqp.Stored, + utils.ThresholdIDs: sqp.ThresholdIDs, + utils.Metrics: sqp.Metrics, + } +} + +// MapStringInterfaceToStatQueueProfile converts map[string]any to StatQueueProfile struct +func MapStringInterfaceToStatQueueProfile(m map[string]any) (*StatQueueProfile, error) { + sqp := &StatQueueProfile{} + if v, ok := m[utils.Tenant].(string); ok { + sqp.Tenant = v + } + if v, ok := m[utils.ID].(string); ok { + sqp.ID = v + } + sqp.FilterIDs = utils.InterfaceToStringSlice(m[utils.FilterIDs]) + sqp.Weights = utils.InterfaceToDynamicWeights(m[utils.Weights]) + sqp.Blockers = utils.InterfaceToDynamicBlockers(m[utils.Blockers]) + if v, ok := m[utils.QueueLength].(float64); ok { + sqp.QueueLength = int(v) + } + if v, ok := m[utils.TTL].(string); ok { + if dur, err := time.ParseDuration(v); err != nil { + return nil, err + } else { + sqp.TTL = dur + } + } else if v, ok := m[utils.TTL].(float64); ok { // for -1 cases + sqp.TTL = time.Duration(v) + } + if v, ok := m[utils.MinItems].(float64); ok { + sqp.MinItems = int(v) + } + if v, ok := m[utils.Stored].(bool); ok { + sqp.Stored = v + } + sqp.ThresholdIDs = utils.InterfaceToStringSlice(m[utils.ThresholdIDs]) + sqp.Metrics = InterfaceToMetrics(m[utils.Metrics]) + return sqp, nil +} + +func InterfaceToMetrics(v any) []*MetricWithFilters { + if v == nil { + return nil + } + switch val := v.(type) { + case []any: + result := make([]*MetricWithFilters, 0, len(val)) + for _, item := range val { + if itemMap, ok := item.(map[string]any); ok { + metric := &MetricWithFilters{} + if metricID, ok := itemMap[utils.MetricID].(string); ok { + metric.MetricID = metricID + } + metric.FilterIDs = utils.InterfaceToStringSlice(itemMap[utils.FilterIDs]) + metric.Blockers = utils.InterfaceToDynamicBlockers(itemMap[utils.Blockers]) + result = append(result, metric) + } + } + return result + } + return nil +} diff --git a/engine/models.go b/engine/models.go index 0a7f1c807..26beac4b5 100644 --- a/engine/models.go +++ b/engine/models.go @@ -470,3 +470,25 @@ type ResourceJSONMdl struct { func (ResourceJSONMdl) TableName() string { return utils.TBLResources } + +type StatQueueProfileMdl struct { + PK uint `gorm:"primary_key"` + Tenant string `index:"0" re:".*"` + ID string `index:"1" re:".*"` + StatQueueProfile utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"` +} + +func (StatQueueProfileMdl) TableName() string { + return utils.TBLStatQueueProfiles +} + +type StatQueueMdl struct { + PK uint `gorm:"primary_key"` + Tenant string `index:"0" re:".*"` + ID string `index:"1" re:".*"` + StatQueue utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"` +} + +func (StatQueueMdl) TableName() string { + return utils.TBLStatQueues +} diff --git a/engine/storage_mysql.go b/engine/storage_mysql.go index 14b4a8b29..1c3b8fb2d 100644 --- a/engine/storage_mysql.go +++ b/engine/storage_mysql.go @@ -32,9 +32,13 @@ type MySQLStorage struct { SQLStorage } -func NewMySQLStorage(host, port, name, user, password string, maxConn, maxIdleConn, +func NewMySQLStorage(host, port, name, user, password, mrshlerStr string, maxConn, maxIdleConn, logLevel int, connMaxLifetime time.Duration, location string, dsnParams map[string]string) (*SQLStorage, error) { + ms, err := utils.NewMarshaler(mrshlerStr) + if err != nil { + return nil, err + } connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=%s&parseTime=true&sql_mode='ALLOW_INVALID_DATES'", user, password, host, port, name, location) db, err := gorm.Open(mysql.Open(connectString+AppendToMysqlDSNOpts(dsnParams)), &gorm.Config{AllowGlobalUpdate: true, Logger: logger.Default.LogMode(logger.LogLevel(logLevel))}) @@ -56,6 +60,7 @@ func NewMySQLStorage(host, port, name, user, password string, maxConn, maxIdleCo return &SQLStorage{ DB: mySQLStorage.DB, db: mySQLStorage.db, + ms: ms, DataDB: mySQLStorage, SQLImpl: mySQLStorage, }, nil diff --git a/engine/storage_postgres.go b/engine/storage_postgres.go index 45e23dc6f..8711e0ff9 100644 --- a/engine/storage_postgres.go +++ b/engine/storage_postgres.go @@ -33,9 +33,13 @@ type PostgresStorage struct { } // NewPostgresStorage returns the posgres DB -func NewPostgresStorage(host, port, name, user, password, sslmode, sslcert, +func NewPostgresStorage(host, port, name, user, password, mrshlerStr, sslmode, sslcert, sslkey, sslpassword, sslcertmode, sslrootcert string, maxConn, maxIdleConn, sqlLogLevel int, connMaxLifetime time.Duration) (*SQLStorage, error) { + ms, err := utils.NewMarshaler(mrshlerStr) + if err != nil { + return nil, err + } connStr := fmt.Sprintf( "host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", host, port, name, user, password, sslmode) @@ -73,6 +77,7 @@ func NewPostgresStorage(host, port, name, user, password, sslmode, sslcert, return &SQLStorage{ DB: pgStor.DB, db: pgStor.db, + ms: ms, DataDB: pgStor, SQLImpl: pgStor, }, nil diff --git a/engine/storage_sql.go b/engine/storage_sql.go index fe0cea72b..4e62b2206 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -44,6 +44,7 @@ type SQLImpl interface { type SQLStorage struct { DB *sql.DB db *gorm.DB + ms utils.Marshaler DataDB SQLImpl } @@ -114,6 +115,10 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLResourceProfiles, tntID) case utils.ResourcesPrefix: keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLResources, tntID) + case utils.StatQueueProfilePrefix: + keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLStatQueueProfiles, tntID) + case utils.StatQueuePrefix: + keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLStatQueues, tntID) default: err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix) } @@ -754,6 +759,103 @@ func (sqls *SQLStorage) RemoveResourceDrv(ctx *context.Context, tenant, id strin return } +func (sqls *SQLStorage) GetStatQueueProfileDrv(ctx *context.Context, tenant string, id string) (sq *StatQueueProfile, err error) { + var result []*StatQueueProfileMdl + if err = sqls.db.Model(&StatQueueProfileMdl{}).Where(&StatQueueProfileMdl{Tenant: tenant, + ID: id}).Find(&result).Error; err != nil { + return nil, err + } + if len(result) == 0 { + return nil, utils.ErrNotFound + } + return MapStringInterfaceToStatQueueProfile(result[0].StatQueueProfile) +} + +func (sqls *SQLStorage) SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error) { + tx := sqls.db.Begin() + mdl := &StatQueueProfileMdl{ + Tenant: sq.Tenant, + ID: sq.ID, + StatQueueProfile: sq.AsMapStringInterface(), + } + if err = tx.Model(&StatQueueProfileMdl{}).Where( + StatQueueProfileMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete( + StatQueueProfileMdl{}).Error; err != nil { + tx.Rollback() + return + } + if err = tx.Save(mdl).Error; err != nil { + tx.Rollback() + return + } + tx.Commit() + return +} + +func (sqls *SQLStorage) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) { + tx := sqls.db.Begin() + if err = tx.Model(&StatQueueProfileMdl{}).Where(&StatQueueProfileMdl{Tenant: tenant, ID: id}). + Delete(&StatQueueProfileMdl{}).Error; err != nil { + tx.Rollback() + return err + } + tx.Commit() + return +} + +func (sqls *SQLStorage) GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *StatQueue, err error) { + var result []*StatQueueMdl + if err = sqls.db.Model(&StatQueueMdl{}).Where(&StatQueueMdl{Tenant: tenant, + ID: id}).Find(&result).Error; err != nil { + return nil, err + } + if len(result) == 0 { + return nil, utils.ErrNotFound + } + ssq, err := MapStringInterfaceToStoredStatQueue(result[0].StatQueue) + if err != nil { + return nil, err + } + return ssq.AsStatQueue(sqls.ms) +} + +func (sqls *SQLStorage) SetStatQueueDrv(ctx *context.Context, ssq *StoredStatQueue, sq *StatQueue) (err error) { + if ssq == nil { + if ssq, err = NewStoredStatQueue(sq, sqls.ms); err != nil { + return + } + } + tx := sqls.db.Begin() + mdl := &StatQueueMdl{ + Tenant: ssq.Tenant, + ID: ssq.ID, + StatQueue: ssq.AsMapStringInterface(), + } + if err = tx.Model(&StatQueueMdl{}).Where( + StatQueueMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete( + StatQueueMdl{}).Error; err != nil { + tx.Rollback() + return + } + if err = tx.Save(mdl).Error; err != nil { + tx.Rollback() + return + } + tx.Commit() + return +} + +func (sqls *SQLStorage) RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error) { + tx := sqls.db.Begin() + if err = tx.Model(&StatQueueMdl{}).Where(&StatQueueMdl{Tenant: tenant, ID: id}). + Delete(&StatQueueMdl{}).Error; err != nil { + tx.Rollback() + return err + } + tx.Commit() + return +} + // AddLoadHistory DataDB method not implemented yet func (sqls *SQLStorage) AddLoadHistory(ldInst *utils.LoadInstance, loadHistSize int, transactionID string) error { @@ -796,36 +898,6 @@ func (sqls *SQLStorage) GetLoadHistory(limit int, skipCache bool, return nil, utils.ErrNotImplemented } -// GetStatQueueProfileDrv DataDB method not implemented yet -func (sqls *SQLStorage) GetStatQueueProfileDrv(ctx *context.Context, tenant string, id string) (sq *StatQueueProfile, err error) { - return nil, utils.ErrNotImplemented -} - -// SetStatQueueProfileDrv DataDB method not implemented yet -func (sqls *SQLStorage) SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error) { - return utils.ErrNotImplemented -} - -// RemStatQueueProfileDrv DataDB method not implemented yet -func (sqls *SQLStorage) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) { - return utils.ErrNotImplemented -} - -// GetStatQueueDrv DataDB method not implemented yet -func (sqls *SQLStorage) GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *StatQueue, err error) { - return nil, utils.ErrNotImplemented -} - -// SetStatQueueDrv DataDB method not implemented yet -func (sqls *SQLStorage) SetStatQueueDrv(ctx *context.Context, ssq *StoredStatQueue, sq *StatQueue) (err error) { - return utils.ErrNotImplemented -} - -// RemStatQueueDrv DataDB method not implemented yet -func (sqls *SQLStorage) RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error) { - return utils.ErrNotImplemented -} - // DataDB method not implemented yet func (sqls *SQLStorage) SetTrendProfileDrv(ctx *context.Context, sg *utils.TrendProfile) (err error) { return utils.ErrNotImplemented diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 0172d5570..6e343f6e1 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -55,12 +55,12 @@ func NewDataDBConn(dbType, host, port, name, user, d, err = NewMongoStorage(opts.MongoConnScheme, host, port, name, user, pass, marshaler, stringIndexedFields, opts.MongoQueryTimeout) case utils.MetaPostgres: - d, err = NewPostgresStorage(host, port, name, user, pass, opts.PgSSLMode, + d, err = NewPostgresStorage(host, port, name, user, pass, marshaler, opts.PgSSLMode, opts.PgSSLCert, opts.PgSSLKey, opts.PgSSLPassword, opts.PgSSLCertMode, opts.PgSSLRootCert, opts.SQLMaxOpenConns, opts.SQLMaxIdleConns, opts.SQLLogLevel, opts.SQLConnMaxLifetime) case utils.MetaMySQL: - d, err = NewMySQLStorage(host, port, name, user, pass, opts.SQLMaxOpenConns, + d, err = NewMySQLStorage(host, port, name, user, pass, marshaler, opts.SQLMaxOpenConns, opts.SQLMaxIdleConns, opts.SQLLogLevel, opts.SQLConnMaxLifetime, opts.MySQLLocation, opts.SQLDSNParams) case utils.MetaInternal: diff --git a/general_tests/route_stats_it_test.go b/general_tests/route_stats_it_test.go index d2420779c..3ffd19dfe 100644 --- a/general_tests/route_stats_it_test.go +++ b/general_tests/route_stats_it_test.go @@ -64,13 +64,13 @@ func TestRtStatsCaseV1IT(t *testing.T) { case utils.MetaInternal: RtStatsSv1ConfDIR = "routes_cases_internal" case utils.MetaRedis: - t.SkipNow() + RtStatsSv1ConfDIR = "routes_cases_redis" case utils.MetaMySQL: RtStatsSv1ConfDIR = "routes_cases_mysql" case utils.MetaMongo: RtStatsSv1ConfDIR = "routes_cases_mongo" case utils.MetaPostgres: - t.SkipNow() + RtStatsSv1ConfDIR = "routes_cases_postgres" default: t.Fatal("Unknown Database type") } diff --git a/utils/consts.go b/utils/consts.go index 488ef93ed..30a461df9 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -505,6 +505,7 @@ const ( CDRKey = "CDR" CDRs = "CDRs" ExpiryTime = "ExpiryTime" + EventID = "EventID" AllowNegative = "AllowNegative" Disabled = "Disabled" Action = "Action" @@ -1919,6 +1920,8 @@ const ( TBLAttributeProfiles = "attribute_profiles" TBLResourceProfiles = "resource_profiles" TBLResources = "resources" + TBLStatQueueProfiles = "stat_queue_profiles" + TBLStatQueues = "stat_queues" OldSMCosts = "sm_costs" TBLTPDispatchers = "tp_dispatcher_profiles" TBLTPDispatcherHosts = "tp_dispatcher_hosts" @@ -2699,6 +2702,9 @@ const ( // Stats OptsStatsProfileIDs = "*statsProfileIDs" OptsRoundingDecimals = "*roundingDecimals" + SQItems = "SQItems" + SQMetrics = "SQMetrics" + Compressed = "Compressed" // Thresholds ThresholdHit = "ThresholdHit"