Make StatQueues and StatQueueProfiles storable in MySQL and Postgres

This commit is contained in:
arberkatellari
2025-11-07 15:45:30 +02:00
committed by Dan Christian Bogos
parent bb5d589dce
commit dcdf55e5d4
19 changed files with 704 additions and 43 deletions

View File

@@ -89,11 +89,11 @@ func TestStatsIT(t *testing.T) {
case utils.MetaMongo:
sqConfigDIR = "stats_mongo"
case utils.MetaRedis:
t.SkipNow()
sqConfigDIR = "stats_redis"
case utils.MetaMySQL:
sqConfigDIR = "stats_mysql"
case utils.MetaPostgres:
t.SkipNow()
sqConfigDIR = "stats_postgres"
default:
t.Fatal("Unknown Database type")
}

View File

@@ -174,11 +174,11 @@ const CGRATES_CFG_JSON = `
"*attribute_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*resource_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*resources": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
// compatible db types: <*internal|*redis|*mongo>
"*actions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*threshold_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*thresholds": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*filters": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},

View File

@@ -1035,6 +1035,8 @@ func (cfg *CGRConfig) checkConfigSanity() error {
utils.MetaAttributeProfiles,
utils.MetaResourceProfiles,
utils.MetaResources,
utils.MetaStatQueueProfiles,
utils.MetaStatQueues,
}
for _, dbcfg := range cfg.dbCfg.DBConns {
if dbcfg.Type == utils.MetaInternal {

View File

@@ -26,7 +26,9 @@
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},

View File

@@ -0,0 +1,125 @@
{
"general": {
"reply_timeout": "50s"
},
"logger": {
"level": 7
},
"db": {
"db_conns": {
"*default": {
"db_type": "redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10",
"db_user": "cgrates"
},
"StorDB": {
"db_type": "postgres",
"db_host": "127.0.0.1",
"db_port": 5432,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"attributes": {
"enabled": true,
"stats_conns": ["*internal"],
"resources_conns": ["*localhost"],
"accounts_conns": ["*localhost"]
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"]
},
"cdrs": {
"enabled": true
},
"resources": {
"enabled": true,
"store_interval": "-1"
},
"loaders": [
{
"id": "*default",
"enabled": true,
"tenant": "cgrates.org",
"tp_in_dir": "/usr/share/cgrates/tariffplans/tutroutes",
"tp_out_dir": "",
"lockfile_path": ""
}
],
"stats": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": ["*internal"]
},
"thresholds": {
"enabled": true,
"store_interval": "-1"
},
"routes": {
"enabled": true,
"prefix_indexed_fields":["*req.Destination"],
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"accounts_conns": ["*localhost"],
"rates_conns": ["*internal"]
},
"sessions": {
"enabled": true,
"routes_conns": ["*internal"],
"resources_conns": ["*internal"],
"attributes_conns": ["*internal"],
"rates_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"]
},
"admins": {
"enabled": true
},
"rates": {
"enabled": true
},
"actions": {
"enabled": true,
"accounts_conns": ["*localhost"]
},
"accounts": {
"enabled": true
},
"filters": {
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"accounts_conns": ["*localhost"]
}
}

View File

@@ -0,0 +1,123 @@
{
"general": {
"reply_timeout": "50s"
},
"logger": {
"level": 7
},
"db": {
"db_conns": {
"*default": {
"db_type": "redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10",
"db_user": "cgrates"
},
"StorDB": {
"db_type": "mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"attributes": {
"enabled": true,
"stats_conns": ["*internal"],
"resources_conns": ["*localhost"],
"accounts_conns": ["*localhost"]
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"]
},
"cdrs": {
"enabled": true
},
"resources": {
"enabled": true,
"store_interval": "-1"
},
"loaders": [
{
"id": "*default",
"enabled": true,
"tenant": "cgrates.org",
"tp_in_dir": "/usr/share/cgrates/tariffplans/tutroutes",
"tp_out_dir": "",
"lockfile_path": ""
}
],
"stats": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": ["*internal"]
},
"thresholds": {
"enabled": true,
"store_interval": "-1"
},
"routes": {
"enabled": true,
"prefix_indexed_fields":["*req.Destination"],
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"accounts_conns": ["*localhost"],
"rates_conns": ["*internal"]
},
"sessions": {
"enabled": true,
"routes_conns": ["*internal"],
"resources_conns": ["*internal"],
"attributes_conns": ["*internal"],
"rates_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"]
},
"admins": {
"enabled": true
},
"rates": {
"enabled": true
},
"actions": {
"enabled": true,
"accounts_conns": ["*localhost"]
},
"accounts": {
"enabled": true
},
"filters": {
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"accounts_conns": ["*localhost"]
}
}

View File

@@ -24,7 +24,9 @@
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},

View File

@@ -0,0 +1,59 @@
{
// CGRateS Configuration file
// will be used in apis/stats_it_test.go
"logger": {
"level": 7
},
"db": {
"db_conns": {
"*default": {
"db_type": "redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10",
"db_user": "cgrates"
},
"StorDB": {
"db_type": "postgres",
"db_host": "127.0.0.1",
"db_port": 5432,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*statqueue_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*statqueues": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"listen": {
"rpc_json": ":2012", // RPC JSON listening address
"rpc_gob": ":2013", // RPC GOB listening address
"http": ":2080", // HTTP listening address
},
"actions": {
"enabled": true,
"thresholds_conns": ["*internal"],
},
"stats": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": ["*internal"]
},
"thresholds": {
"enabled": true,
"store_interval": "-1",
"actions_conns": ["*internal"],
},
"admins": {
"enabled": true,
}
}

View File

@@ -0,0 +1,57 @@
{
// CGRateS Configuration file
// will be used in apis/stats_it_test.go
"logger": {
"level": 7
},
"db": {
"db_conns": {
"*default": {
"db_type": "redis",
"db_host": "127.0.0.1",
"db_port": 6379,
"db_name": "10",
"db_user": "cgrates"
},
"StorDB": {
"db_type": "mysql",
"db_host": "127.0.0.1",
"db_port": 3306,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"listen": {
"rpc_json": ":2012", // RPC JSON listening address
"rpc_gob": ":2013", // RPC GOB listening address
"http": ":2080", // HTTP listening address
},
"actions": {
"enabled": true,
"thresholds_conns": ["*internal"],
},
"stats": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": ["*internal"]
},
"thresholds": {
"enabled": true,
"store_interval": "-1",
"actions_conns": ["*internal"],
},
"admins": {
"enabled": true,
}
}

View File

@@ -88,4 +88,26 @@ CREATE TABLE resources (
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX resources_idx ON resources (`id`);
CREATE UNIQUE INDEX resources_idx ON resources (`id`);
DROP TABLE IF EXISTS stat_queue_profiles;
CREATE TABLE stat_queue_profiles (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`stat_queue_profile` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX stat_queue_profiles_idx ON stat_queue_profiles (`id`);
DROP TABLE IF EXISTS stat_queues;
CREATE TABLE stat_queues (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`stat_queue` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX stat_queues_idx ON stat_queues (`id`);

View File

@@ -87,3 +87,24 @@ CREATE TABLE resources (
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX resources_idx ON resources ("id");
DROP TABLE IF EXISTS stat_queue_profiles;
CREATE TABLE stat_queue_profiles (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
stat_queue_profile JSONB NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX stat_queue_profiles_idx ON stat_queue_profiles ("id");
DROP TABLE IF EXISTS stat_queues;
CREATE TABLE stat_queues (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
stat_queue JSONB NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX stat_queues_idx ON stat_queues ("id");

View File

@@ -20,6 +20,7 @@ package engine
import (
"bytes"
"encoding/base64"
"encoding/gob"
"encoding/json"
"fmt"
@@ -614,6 +615,65 @@ func (sq *StatQueue) CacheClone() any {
return sq.Clone()
}
// AsMapStringInterface converts StoredStatQueue struct to map[string]any
func (ssq *StoredStatQueue) AsMapStringInterface() map[string]any {
if ssq == nil {
return nil
}
return map[string]any{
utils.Tenant: ssq.Tenant,
utils.ID: ssq.ID,
utils.SQItems: ssq.SQItems,
utils.SQMetrics: ssq.SQMetrics,
utils.Compressed: ssq.Compressed,
}
}
// MapStringInterfaceToStoredStatQueue converts map[string]any to StoredStatQueue struct
func MapStringInterfaceToStoredStatQueue(m map[string]any) (*StoredStatQueue, error) {
ssq := &StoredStatQueue{}
if v, ok := m[utils.Tenant].(string); ok {
ssq.Tenant = v
}
if v, ok := m[utils.ID].(string); ok {
ssq.ID = v
}
if items, ok := m[utils.SQItems].([]any); ok {
for _, item := range items {
if itemMap, ok := item.(map[string]any); ok {
sqItem := SQItem{}
if eventID, ok := itemMap[utils.EventID].(string); ok {
sqItem.EventID = eventID
}
if expiryTime, ok := itemMap[utils.ExpiryTime].(*time.Time); ok {
sqItem.ExpiryTime = expiryTime
} else if expiryStr, ok := itemMap[utils.ExpiryTime].(string); ok {
if parsedTime, err := time.Parse(time.RFC3339, expiryStr); err == nil {
sqItem.ExpiryTime = &parsedTime
}
}
ssq.SQItems = append(ssq.SQItems, sqItem)
}
}
}
if metrics, ok := m[utils.SQMetrics].(map[string]any); ok {
ssq.SQMetrics = make(map[string][]byte)
for key, value := range metrics {
if metricBytes, ok := value.(string); ok {
decodedBytes, err := base64.StdEncoding.DecodeString(metricBytes)
if err != nil {
return nil, fmt.Errorf("failed to decode base64 string: %v", err)
}
ssq.SQMetrics[key] = decodedBytes
}
}
}
if v, ok := m[utils.Compressed].(bool); ok {
ssq.Compressed = v
}
return ssq, nil
}
// unlockStatQueues unlocks all locked StatQueues in the given slice.
func unlockStatQueues(sqs []*StatQueue) {
for _, sq := range sqs {
@@ -867,3 +927,81 @@ func (mf *MetricWithFilters) FieldAsInterface(fldPath []string) (_ any, err erro
return mf.Blockers, nil
}
}
// AsMapStringInterface converts StatQueueProfile struct to map[string]any
func (sqp *StatQueueProfile) AsMapStringInterface() map[string]any {
if sqp == nil {
return nil
}
return map[string]any{
utils.Tenant: sqp.Tenant,
utils.ID: sqp.ID,
utils.FilterIDs: sqp.FilterIDs,
utils.Weights: sqp.Weights,
utils.Blockers: sqp.Blockers,
utils.QueueLength: sqp.QueueLength,
utils.TTL: sqp.TTL,
utils.MinItems: sqp.MinItems,
utils.Stored: sqp.Stored,
utils.ThresholdIDs: sqp.ThresholdIDs,
utils.Metrics: sqp.Metrics,
}
}
// MapStringInterfaceToStatQueueProfile converts map[string]any to StatQueueProfile struct
func MapStringInterfaceToStatQueueProfile(m map[string]any) (*StatQueueProfile, error) {
sqp := &StatQueueProfile{}
if v, ok := m[utils.Tenant].(string); ok {
sqp.Tenant = v
}
if v, ok := m[utils.ID].(string); ok {
sqp.ID = v
}
sqp.FilterIDs = utils.InterfaceToStringSlice(m[utils.FilterIDs])
sqp.Weights = utils.InterfaceToDynamicWeights(m[utils.Weights])
sqp.Blockers = utils.InterfaceToDynamicBlockers(m[utils.Blockers])
if v, ok := m[utils.QueueLength].(float64); ok {
sqp.QueueLength = int(v)
}
if v, ok := m[utils.TTL].(string); ok {
if dur, err := time.ParseDuration(v); err != nil {
return nil, err
} else {
sqp.TTL = dur
}
} else if v, ok := m[utils.TTL].(float64); ok { // for -1 cases
sqp.TTL = time.Duration(v)
}
if v, ok := m[utils.MinItems].(float64); ok {
sqp.MinItems = int(v)
}
if v, ok := m[utils.Stored].(bool); ok {
sqp.Stored = v
}
sqp.ThresholdIDs = utils.InterfaceToStringSlice(m[utils.ThresholdIDs])
sqp.Metrics = InterfaceToMetrics(m[utils.Metrics])
return sqp, nil
}
func InterfaceToMetrics(v any) []*MetricWithFilters {
if v == nil {
return nil
}
switch val := v.(type) {
case []any:
result := make([]*MetricWithFilters, 0, len(val))
for _, item := range val {
if itemMap, ok := item.(map[string]any); ok {
metric := &MetricWithFilters{}
if metricID, ok := itemMap[utils.MetricID].(string); ok {
metric.MetricID = metricID
}
metric.FilterIDs = utils.InterfaceToStringSlice(itemMap[utils.FilterIDs])
metric.Blockers = utils.InterfaceToDynamicBlockers(itemMap[utils.Blockers])
result = append(result, metric)
}
}
return result
}
return nil
}

View File

@@ -470,3 +470,25 @@ type ResourceJSONMdl struct {
func (ResourceJSONMdl) TableName() string {
return utils.TBLResources
}
type StatQueueProfileMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
StatQueueProfile utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (StatQueueProfileMdl) TableName() string {
return utils.TBLStatQueueProfiles
}
type StatQueueMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
StatQueue utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (StatQueueMdl) TableName() string {
return utils.TBLStatQueues
}

View File

@@ -32,9 +32,13 @@ type MySQLStorage struct {
SQLStorage
}
func NewMySQLStorage(host, port, name, user, password string, maxConn, maxIdleConn,
func NewMySQLStorage(host, port, name, user, password, mrshlerStr string, maxConn, maxIdleConn,
logLevel int, connMaxLifetime time.Duration, location string,
dsnParams map[string]string) (*SQLStorage, error) {
ms, err := utils.NewMarshaler(mrshlerStr)
if err != nil {
return nil, err
}
connectString := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&loc=%s&parseTime=true&sql_mode='ALLOW_INVALID_DATES'",
user, password, host, port, name, location)
db, err := gorm.Open(mysql.Open(connectString+AppendToMysqlDSNOpts(dsnParams)), &gorm.Config{AllowGlobalUpdate: true, Logger: logger.Default.LogMode(logger.LogLevel(logLevel))})
@@ -56,6 +60,7 @@ func NewMySQLStorage(host, port, name, user, password string, maxConn, maxIdleCo
return &SQLStorage{
DB: mySQLStorage.DB,
db: mySQLStorage.db,
ms: ms,
DataDB: mySQLStorage,
SQLImpl: mySQLStorage,
}, nil

View File

@@ -33,9 +33,13 @@ type PostgresStorage struct {
}
// NewPostgresStorage returns the posgres DB
func NewPostgresStorage(host, port, name, user, password, sslmode, sslcert,
func NewPostgresStorage(host, port, name, user, password, mrshlerStr, sslmode, sslcert,
sslkey, sslpassword, sslcertmode, sslrootcert string,
maxConn, maxIdleConn, sqlLogLevel int, connMaxLifetime time.Duration) (*SQLStorage, error) {
ms, err := utils.NewMarshaler(mrshlerStr)
if err != nil {
return nil, err
}
connStr := fmt.Sprintf(
"host=%s port=%s dbname=%s user=%s password=%s sslmode=%s",
host, port, name, user, password, sslmode)
@@ -73,6 +77,7 @@ func NewPostgresStorage(host, port, name, user, password, sslmode, sslcert,
return &SQLStorage{
DB: pgStor.DB,
db: pgStor.db,
ms: ms,
DataDB: pgStor,
SQLImpl: pgStor,
}, nil

View File

@@ -44,6 +44,7 @@ type SQLImpl interface {
type SQLStorage struct {
DB *sql.DB
db *gorm.DB
ms utils.Marshaler
DataDB
SQLImpl
}
@@ -114,6 +115,10 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLResourceProfiles, tntID)
case utils.ResourcesPrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLResources, tntID)
case utils.StatQueueProfilePrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLStatQueueProfiles, tntID)
case utils.StatQueuePrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLStatQueues, tntID)
default:
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix)
}
@@ -754,6 +759,103 @@ func (sqls *SQLStorage) RemoveResourceDrv(ctx *context.Context, tenant, id strin
return
}
func (sqls *SQLStorage) GetStatQueueProfileDrv(ctx *context.Context, tenant string, id string) (sq *StatQueueProfile, err error) {
var result []*StatQueueProfileMdl
if err = sqls.db.Model(&StatQueueProfileMdl{}).Where(&StatQueueProfileMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return MapStringInterfaceToStatQueueProfile(result[0].StatQueueProfile)
}
func (sqls *SQLStorage) SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error) {
tx := sqls.db.Begin()
mdl := &StatQueueProfileMdl{
Tenant: sq.Tenant,
ID: sq.ID,
StatQueueProfile: sq.AsMapStringInterface(),
}
if err = tx.Model(&StatQueueProfileMdl{}).Where(
StatQueueProfileMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
StatQueueProfileMdl{}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (sqls *SQLStorage) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&StatQueueProfileMdl{}).Where(&StatQueueProfileMdl{Tenant: tenant, ID: id}).
Delete(&StatQueueProfileMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return
}
func (sqls *SQLStorage) GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *StatQueue, err error) {
var result []*StatQueueMdl
if err = sqls.db.Model(&StatQueueMdl{}).Where(&StatQueueMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
ssq, err := MapStringInterfaceToStoredStatQueue(result[0].StatQueue)
if err != nil {
return nil, err
}
return ssq.AsStatQueue(sqls.ms)
}
func (sqls *SQLStorage) SetStatQueueDrv(ctx *context.Context, ssq *StoredStatQueue, sq *StatQueue) (err error) {
if ssq == nil {
if ssq, err = NewStoredStatQueue(sq, sqls.ms); err != nil {
return
}
}
tx := sqls.db.Begin()
mdl := &StatQueueMdl{
Tenant: ssq.Tenant,
ID: ssq.ID,
StatQueue: ssq.AsMapStringInterface(),
}
if err = tx.Model(&StatQueueMdl{}).Where(
StatQueueMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
StatQueueMdl{}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (sqls *SQLStorage) RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&StatQueueMdl{}).Where(&StatQueueMdl{Tenant: tenant, ID: id}).
Delete(&StatQueueMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return
}
// AddLoadHistory DataDB method not implemented yet
func (sqls *SQLStorage) AddLoadHistory(ldInst *utils.LoadInstance,
loadHistSize int, transactionID string) error {
@@ -796,36 +898,6 @@ func (sqls *SQLStorage) GetLoadHistory(limit int, skipCache bool,
return nil, utils.ErrNotImplemented
}
// GetStatQueueProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) GetStatQueueProfileDrv(ctx *context.Context, tenant string, id string) (sq *StatQueueProfile, err error) {
return nil, utils.ErrNotImplemented
}
// SetStatQueueProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) SetStatQueueProfileDrv(ctx *context.Context, sq *StatQueueProfile) (err error) {
return utils.ErrNotImplemented
}
// RemStatQueueProfileDrv DataDB method not implemented yet
func (sqls *SQLStorage) RemStatQueueProfileDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// GetStatQueueDrv DataDB method not implemented yet
func (sqls *SQLStorage) GetStatQueueDrv(ctx *context.Context, tenant, id string) (sq *StatQueue, err error) {
return nil, utils.ErrNotImplemented
}
// SetStatQueueDrv DataDB method not implemented yet
func (sqls *SQLStorage) SetStatQueueDrv(ctx *context.Context, ssq *StoredStatQueue, sq *StatQueue) (err error) {
return utils.ErrNotImplemented
}
// RemStatQueueDrv DataDB method not implemented yet
func (sqls *SQLStorage) RemStatQueueDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetTrendProfileDrv(ctx *context.Context, sg *utils.TrendProfile) (err error) {
return utils.ErrNotImplemented

View File

@@ -55,12 +55,12 @@ func NewDataDBConn(dbType, host, port, name, user,
d, err = NewMongoStorage(opts.MongoConnScheme, host, port, name, user, pass,
marshaler, stringIndexedFields, opts.MongoQueryTimeout)
case utils.MetaPostgres:
d, err = NewPostgresStorage(host, port, name, user, pass, opts.PgSSLMode,
d, err = NewPostgresStorage(host, port, name, user, pass, marshaler, opts.PgSSLMode,
opts.PgSSLCert, opts.PgSSLKey, opts.PgSSLPassword, opts.PgSSLCertMode,
opts.PgSSLRootCert, opts.SQLMaxOpenConns, opts.SQLMaxIdleConns,
opts.SQLLogLevel, opts.SQLConnMaxLifetime)
case utils.MetaMySQL:
d, err = NewMySQLStorage(host, port, name, user, pass, opts.SQLMaxOpenConns,
d, err = NewMySQLStorage(host, port, name, user, pass, marshaler, opts.SQLMaxOpenConns,
opts.SQLMaxIdleConns, opts.SQLLogLevel, opts.SQLConnMaxLifetime,
opts.MySQLLocation, opts.SQLDSNParams)
case utils.MetaInternal:

View File

@@ -64,13 +64,13 @@ func TestRtStatsCaseV1IT(t *testing.T) {
case utils.MetaInternal:
RtStatsSv1ConfDIR = "routes_cases_internal"
case utils.MetaRedis:
t.SkipNow()
RtStatsSv1ConfDIR = "routes_cases_redis"
case utils.MetaMySQL:
RtStatsSv1ConfDIR = "routes_cases_mysql"
case utils.MetaMongo:
RtStatsSv1ConfDIR = "routes_cases_mongo"
case utils.MetaPostgres:
t.SkipNow()
RtStatsSv1ConfDIR = "routes_cases_postgres"
default:
t.Fatal("Unknown Database type")
}

View File

@@ -505,6 +505,7 @@ const (
CDRKey = "CDR"
CDRs = "CDRs"
ExpiryTime = "ExpiryTime"
EventID = "EventID"
AllowNegative = "AllowNegative"
Disabled = "Disabled"
Action = "Action"
@@ -1919,6 +1920,8 @@ const (
TBLAttributeProfiles = "attribute_profiles"
TBLResourceProfiles = "resource_profiles"
TBLResources = "resources"
TBLStatQueueProfiles = "stat_queue_profiles"
TBLStatQueues = "stat_queues"
OldSMCosts = "sm_costs"
TBLTPDispatchers = "tp_dispatcher_profiles"
TBLTPDispatcherHosts = "tp_dispatcher_hosts"
@@ -2699,6 +2702,9 @@ const (
// Stats
OptsStatsProfileIDs = "*statsProfileIDs"
OptsRoundingDecimals = "*roundingDecimals"
SQItems = "SQItems"
SQMetrics = "SQMetrics"
Compressed = "Compressed"
// Thresholds
ThresholdHit = "ThresholdHit"