Make Trends and TrendProfiles storable in MySQL and Postgres

This commit is contained in:
arberkatellari
2025-11-17 17:36:53 +02:00
committed by Dan Christian Bogos
parent 8a4244b26d
commit 19675d6b20
29 changed files with 1034 additions and 111 deletions

View File

@@ -19,6 +19,9 @@ along with this program. If not, see <https://www.gnu.org/licenses/>
package admins
import (
"fmt"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
)
@@ -128,6 +131,21 @@ func (a *AdminS) V1SetTrendProfile(ctx *context.Context, arg *utils.TrendProfile
if err = a.dm.SetTrendProfile(ctx, arg.TrendProfile); err != nil {
return utils.APIErrorHandler(err)
}
//generate a loadID for CacheTrendProfiles and store it in database
loadID := time.Now().UnixNano()
if err = a.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheTrendProfiles: loadID}); err != nil {
return utils.APIErrorHandler(err)
}
// delay if needed before cache call
if a.cfg.GeneralCfg().CachingDelay != 0 {
utils.Logger.Info(fmt.Sprintf("<AdminSv1.SetTrendProfile> Delaying cache call for %v", a.cfg.GeneralCfg().CachingDelay))
time.Sleep(a.cfg.GeneralCfg().CachingDelay)
}
//handle caching for TrendProfile
if err = a.CallCache(ctx, utils.IfaceAsString(arg.APIOpts[utils.MetaCache]), arg.Tenant, utils.CacheTrendProfiles,
arg.TenantID(), utils.EmptyString, nil, arg.APIOpts); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil
}
@@ -144,6 +162,21 @@ func (a *AdminS) V1RemoveTrendProfile(ctx *context.Context, args *utils.TenantID
if err := a.dm.RemoveTrendProfile(ctx, tnt, args.ID); err != nil {
return utils.APIErrorHandler(err)
}
// delay if needed before cache call
if a.cfg.GeneralCfg().CachingDelay != 0 {
utils.Logger.Info(fmt.Sprintf("<AdminSv1.RemoveTrendProfile> Delaying cache call for %v", a.cfg.GeneralCfg().CachingDelay))
time.Sleep(a.cfg.GeneralCfg().CachingDelay)
}
//handle caching for TrendProfile
if err := a.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), tnt, utils.CacheTrendProfiles,
utils.ConcatenatedKey(tnt, args.ID), utils.EmptyString, nil, args.APIOpts); err != nil {
return utils.APIErrorHandler(err)
}
//generate a loadID for CacheTrendProfiles and store it in database
loadID := time.Now().UnixNano()
if err := a.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheTrendProfiles: loadID}); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK
return nil
}

View File

@@ -183,6 +183,8 @@ const CGRATES_CFG_JSON = `
"*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*ranking_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*rankings": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*trend_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*trends": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
// compatible db types: <*internal|*redis|*mongo>
"*actions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
@@ -191,8 +193,6 @@ const CGRATES_CFG_JSON = `
"*ip_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"},
"*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"},
"*threshold_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"},
"*trend_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*trends": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"},
"*route_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"},
"*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"},
"*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"},

View File

@@ -1045,6 +1045,8 @@ func (cfg *CGRConfig) checkConfigSanity() error {
utils.MetaRateProfiles,
utils.MetaRankingProfiles,
utils.MetaRankings,
utils.MetaTrendProfiles,
utils.MetaTrends,
}
for _, dbcfg := range cfg.dbCfg.DBConns {
if dbcfg.Type == utils.MetaInternal {

View File

@@ -1,6 +1,6 @@
{
"general": {
"log_level": 7
"logger": {
"level": 7
},
"schedulers": {
"enabled": true,

View File

@@ -47,7 +47,6 @@
{
"id": "CustomLoader",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -68,7 +67,6 @@
{
"id": "WithoutMoveToOut",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -90,7 +88,6 @@
{
"id": "SubpathLoaderWithoutMove",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -112,7 +109,6 @@
{
"id": "SubpathLoaderWithMove",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -134,7 +130,6 @@
{
"id": "LoaderWithTemplate",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -155,7 +150,6 @@
{
"id": "CustomSep",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "-1",
"lockfile_path": ".cgr.lock",

View File

@@ -91,7 +91,6 @@
{
"id": "CustomLoader",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -112,7 +111,6 @@
{
"id": "WithoutMoveToOut",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -134,7 +132,6 @@
{
"id": "SubpathLoaderWithoutMove",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -156,7 +153,6 @@
{
"id": "SubpathLoaderWithMove",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -178,7 +174,6 @@
{
"id": "LoaderWithTemplate",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -199,7 +194,6 @@
{
"id": "CustomSep",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "-1",
"lockfile_path": ".cgr.lock",

View File

@@ -55,7 +55,6 @@
{
"id": "CustomLoader",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -76,7 +75,6 @@
{
"id": "WithoutMoveToOut",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -98,7 +96,6 @@
{
"id": "SubpathLoaderWithoutMove",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -120,7 +117,6 @@
{
"id": "SubpathLoaderWithMove",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -142,7 +138,6 @@
{
"id": "LoaderWithTemplate",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "0",
"lockfile_path": ".cgr.lock",
@@ -163,7 +158,6 @@
{
"id": "CustomSep",
"enabled": true,
"dry_run": false,
"tenant": "cgrates.org",
"run_delay": "-1",
"lockfile_path": ".cgr.lock",

View File

@@ -1,7 +1,7 @@
{
"general": {
"log_level": 7,
"logger": {
"level": 7
},
"db": {

View File

@@ -1,7 +1,7 @@
{
"general": {
"log_level": 7,
"logger": {
"level": 7
},
"db": {

View File

@@ -1,7 +1,7 @@
{
"general": {
"log_level": 7,
"logger": {
"level": 7
},
"db": {

View File

@@ -1,7 +1,7 @@
{
"general": {
"log_level": 7,
"logger": {
"level": 7
},
"db": {

View File

@@ -0,0 +1,142 @@
{
// CGRateS Configuration file
//
"general": {
"reply_timeout": "50s",
},
"logger": {
"level": 7
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_conns": {
"*default": { // The id of the DB connection
"db_type": "redis", // db type: <internal|redis|mysql|mongo|postgres>
"db_host": "127.0.0.1",
"db_port": 6379, // db port to reach the database
"db_name": "10", // db database name to connect to
"db_user": "cgrates",
},
"StorDB": { // The id of the DB connection
"db_type": "mysql", // db type: <internal|redis|mysql|mongo|postgres>
"db_host": "127.0.0.1", // the host to connect to
"db_port": 3306, // db port to reach the database
"db_name": "cgrates", // db database name to connect to
"db_user": "cgrates", // username to use when connecting to the database
"db_password": "CGRateS.org" // password to use when connecting to the database
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*trend_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*trends": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"cdrs": {
"enabled": true,
"chargers_conns":["*internal"],
},
"attributes": {
"enabled": true,
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"accounts_conns": ["*localhost"]
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"]
},
"stats": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"],
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"routes": {
"enabled": true,
"prefix_indexed_fields":["*req.Destination"],
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"rates_conns": ["*internal"],
},
"sessions": {
"enabled": true,
"routes_conns": ["*internal"],
"resources_conns": ["*internal"],
"attributes_conns": ["*internal"],
"rates_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"],
},
"migrator":{
"users_filters":["Account"],
},
"admins": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
"rates": {
"enabled": true
},
"actions": {
"enabled": true,
"accounts_conns": ["*localhost"]
},
"accounts": {
"enabled": true
},
"filters": {
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"accounts_conns": ["*internal"],
},
"tpes": {
"enabled": true
},
}

View File

@@ -0,0 +1,142 @@
{
// CGRateS Configuration file
//
"general": {
"reply_timeout": "50s",
},
"logger": {
"level": 7
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_conns": {
"*default": { // The id of the DB connection
"db_type": "redis", // db type: <internal|redis|mysql|mongo|postgres>
"db_host": "127.0.0.1",
"db_port": 6379, // db port to reach the database
"db_name": "10", // db database name to connect to
"db_user": "cgrates",
},
"StorDB": {
"db_type": "postgres",
"db_host": "127.0.0.1",
"db_port": 5432,
"db_name": "cgrates",
"db_user": "cgrates",
"db_password": "CGRateS.org"
}
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*trend_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"},
"*trends": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"cdrs": {
"enabled": true,
"chargers_conns":["*internal"],
},
"attributes": {
"enabled": true,
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"accounts_conns": ["*localhost"]
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"]
},
"stats": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"],
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"routes": {
"enabled": true,
"prefix_indexed_fields":["*req.Destination"],
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"rates_conns": ["*internal"],
},
"sessions": {
"enabled": true,
"routes_conns": ["*internal"],
"resources_conns": ["*internal"],
"attributes_conns": ["*internal"],
"rates_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"],
},
"migrator":{
"users_filters":["Account"],
},
"admins": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
"rates": {
"enabled": true
},
"actions": {
"enabled": true,
"accounts_conns": ["*localhost"]
},
"accounts": {
"enabled": true
},
"filters": {
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"accounts_conns": ["*internal"],
},
"tpes": {
"enabled": true
},
}

View File

@@ -0,0 +1,140 @@
{
// CGRateS Configuration file
//
"general": {
"reply_timeout": "50s",
},
"logger": {
"level": 7
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_conns": {
"*default": { // The id of the DB connection
"db_type": "redis", // db type: <internal|redis|mysql|mongo|postgres>
"db_host": "127.0.0.1",
"db_port": 6379, // db port to reach the database
"db_name": "10", // db database name to connect to
"db_user": "cgrates",
},
"StorDB": { // The id of the DB connection
"db_type": "mysql", // db type: <internal|redis|mysql|mongo|postgres>
"db_host": "127.0.0.1", // the host to connect to
"db_port": 3306, // db port to reach the database
"db_name": "cgrates", // db database name to connect to
"db_user": "cgrates", // username to use when connecting to the database
"db_password": "CGRateS.org" // password to use when connecting to the database
},
},
"items": {
"*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}
}
},
"cdrs": {
"enabled": true,
"chargers_conns":["*internal"],
},
"attributes": {
"enabled": true,
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"accounts_conns": ["*localhost"]
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"]
},
"stats": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": ["*internal"],
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"routes": {
"enabled": true,
"prefix_indexed_fields":["*req.Destination"],
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"rates_conns": ["*internal"],
},
"sessions": {
"enabled": true,
"routes_conns": ["*internal"],
"resources_conns": ["*internal"],
"attributes_conns": ["*internal"],
"rates_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"],
},
"migrator":{
"users_filters":["Account"],
},
"admins": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
"rates": {
"enabled": true
},
"actions": {
"enabled": true,
"accounts_conns": ["*localhost"]
},
"accounts": {
"enabled": true
},
"filters": {
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"accounts_conns": ["*internal"],
},
"tpes": {
"enabled": true
},
}

View File

@@ -171,8 +171,8 @@ CREATE TABLE rates (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`rate` JSON NOT NULL,
`rate_profile_id` VARCHAR(64) NOT NULL,
`rate` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id_rate_profile_id (`tenant`, `id`, `rate_profile_id`),
FOREIGN KEY (rate_profile_id) REFERENCES rate_profiles (id)
@@ -199,3 +199,25 @@ CREATE TABLE rankings (
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX rankings_idx ON rankings (`id`);
DROP TABLE IF EXISTS trend_profiles;
CREATE TABLE trend_profiles (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`trend_profile` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX trend_profiles_idx ON trend_profiles (`id`);
DROP TABLE IF EXISTS trends;
CREATE TABLE trends (
`pk` int(11) NOT NULL AUTO_INCREMENT,
`tenant` VARCHAR(40) NOT NULL,
`id` VARCHAR(64) NOT NULL,
`trend` JSON NOT NULL,
PRIMARY KEY (`pk`),
UNIQUE KEY unique_tenant_id (`tenant`, `id`)
);
CREATE UNIQUE INDEX trends_idx ON trends (`id`);

View File

@@ -168,8 +168,8 @@ CREATE TABLE rates (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
rate JSONB NOT NULL,
rate_profile_id VARCHAR(64) NOT NULL,
rate JSONB NOT NULL,
UNIQUE (tenant, id, rate_profile_id),
FOREIGN KEY (rate_profile_id) REFERENCES rate_profiles (id)
);
@@ -195,3 +195,25 @@ CREATE TABLE rankings (
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX rankings_idx ON rankings ("id");
DROP TABLE IF EXISTS trend_profiles;
CREATE TABLE trend_profiles (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
trend_profile JSONB NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX trend_profiles_idx ON trend_profiles ("id");
DROP TABLE IF EXISTS trends;
CREATE TABLE trends (
pk SERIAL PRIMARY KEY,
tenant VARCHAR(40) NOT NULL,
id VARCHAR(64) NOT NULL,
trend JSONB NOT NULL,
UNIQUE (tenant, id)
);
CREATE UNIQUE INDEX trends_idx ON trends ("id");

View File

@@ -650,6 +650,8 @@ func MapStringInterfaceToStoredStatQueue(m map[string]any) (*StoredStatQueue, er
} else if expiryStr, ok := itemMap[utils.ExpiryTime].(string); ok {
if parsedTime, err := time.Parse(time.RFC3339, expiryStr); err == nil {
sqItem.ExpiryTime = &parsedTime
} else {
return nil, err
}
}
ssq.SQItems = append(ssq.SQItems, sqItem)

View File

@@ -553,8 +553,8 @@ type RateMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
Rate utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
RateProfileID string `gorm:"foreign_key" index:"3" re:".*"`
Rate utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (RateMdl) TableName() string {
@@ -582,3 +582,25 @@ type RankingJSONMdl struct {
func (RankingJSONMdl) TableName() string {
return utils.TBLRankings
}
type TrendProfileMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
TrendProfile utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (TrendProfileMdl) TableName() string {
return utils.TBLTrendProfiles
}
type TrendJSONMdl struct {
PK uint `gorm:"primary_key"`
Tenant string `index:"0" re:".*"`
ID string `index:"1" re:".*"`
Trend utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"`
}
func (TrendJSONMdl) TableName() string {
return utils.TBLTrends
}

View File

@@ -133,6 +133,10 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRankingProfiles, tntID)
case utils.RankingPrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRankings, tntID)
case utils.TrendProfilePrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLTrendProfiles, tntID)
case utils.TrendPrefix:
keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLTrends, tntID)
default:
err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix)
}
@@ -1300,6 +1304,95 @@ func (sqls *SQLStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string
return
}
func (sqls *SQLStorage) SetTrendProfileDrv(ctx *context.Context, tp *utils.TrendProfile) (err error) {
tx := sqls.db.Begin()
mdl := &TrendProfileMdl{
Tenant: tp.Tenant,
ID: tp.ID,
TrendProfile: tp.AsMapStringInterface(),
}
if err = tx.Model(&TrendProfileMdl{}).Where(
TrendProfileMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
TrendProfileMdl{}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (sqls *SQLStorage) GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (tp *utils.TrendProfile, err error) {
var result []*TrendProfileMdl
if err = sqls.db.Model(&TrendProfileMdl{}).Where(&TrendProfileMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return utils.MapStringInterfaceToTrendProfile(result[0].TrendProfile)
}
func (sqls *SQLStorage) RemTrendProfileDrv(ctx *context.Context, tenant string, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&TrendProfileMdl{}).Where(&TrendProfileMdl{Tenant: tenant, ID: id}).
Delete(&TrendProfileMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return
}
func (sqls *SQLStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (t *utils.Trend, err error) {
var result []*TrendJSONMdl
if err = sqls.db.Model(&TrendJSONMdl{}).Where(&TrendJSONMdl{Tenant: tenant,
ID: id}).Find(&result).Error; err != nil {
return nil, err
}
if len(result) == 0 {
return nil, utils.ErrNotFound
}
return utils.MapStringInterfaceToTrend(result[0].Trend)
}
func (sqls *SQLStorage) SetTrendDrv(ctx *context.Context, t *utils.Trend) (err error) {
tx := sqls.db.Begin()
mdl := &TrendJSONMdl{
Tenant: t.Tenant,
ID: t.ID,
Trend: t.AsMapStringInterface(),
}
if err = tx.Model(&TrendJSONMdl{}).Where(
TrendJSONMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete(
TrendJSONMdl{}).Error; err != nil {
tx.Rollback()
return
}
if err = tx.Save(mdl).Error; err != nil {
tx.Rollback()
return
}
tx.Commit()
return
}
func (sqls *SQLStorage) RemoveTrendDrv(ctx *context.Context, tenant, id string) (err error) {
tx := sqls.db.Begin()
if err = tx.Model(&TrendJSONMdl{}).Where(&TrendJSONMdl{Tenant: tenant, ID: id}).
Delete(&TrendJSONMdl{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return
}
// Used to check if specific subject is stored using prefix key attached to entity
func (sqls *SQLStorage) HasDataDrv(ctx *context.Context, category, subject, tenant string) (has bool, err error) {
var categoryModelMap = map[string]any{
@@ -1320,8 +1413,8 @@ func (sqls *SQLStorage) HasDataDrv(ctx *context.Context, category, subject, tena
utils.RateProfilePrefix: &RateProfileJSONMdl{},
utils.RankingPrefix: &RankingJSONMdl{},
utils.RankingProfilePrefix: &RankingProfileMdl{},
// utils.TrendPrefix: &TrendJSONMdl{},
// utils.TrendProfilePrefix: &TrendProfileMdl{},
utils.TrendPrefix: &TrendJSONMdl{},
utils.TrendProfilePrefix: &TrendProfileMdl{},
}
model, ok := categoryModelMap[category]
if !ok {
@@ -1371,36 +1464,6 @@ func (sqls *SQLStorage) GetLoadHistory(limit int, skipCache bool,
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetTrendProfileDrv(ctx *context.Context, sg *utils.TrendProfile) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sg *utils.TrendProfile, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemTrendProfileDrv(ctx *context.Context, tenant string, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r *utils.Trend, err error) {
return nil, utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) SetTrendDrv(ctx *context.Context, r *utils.Trend) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) RemoveTrendDrv(ctx *context.Context, tenant, id string) (err error) {
return utils.ErrNotImplemented
}
// DataDB method not implemented yet
func (sqls *SQLStorage) GetItemLoadIDsDrv(ctx *context.Context, itemIDPrefix string) (loadIDs map[string]int64, err error) {
return nil, utils.ErrNotImplemented

View File

@@ -42,8 +42,8 @@ func TestErsStartDelay(t *testing.T) {
t.Fatalf("could not write to file %s: %v", filePath, err)
}
content := fmt.Sprintf(`{
"general": {
"log_level": 7
"logger": {
"level": 7
},
"db": {
"db_conns": {

View File

@@ -144,8 +144,8 @@ func TestStressIPsAuthorize(t *testing.T) {
}
content := `{
"general": {
"log_level": 7
"logger": {
"level": 7
},
"db": {
"db_conns": {

View File

@@ -35,8 +35,8 @@ import (
func Benchmark10IPsAllocated(b *testing.B) {
content := `{
"general": {
"log_level": 7
"logger": {
"level": 7
},
"db": {
"db_conns": {

View File

@@ -42,9 +42,9 @@ func TestRankingSchedule(t *testing.T) {
}
content := `{
"general": {
"log_level": 7,
},
"logger": {
"level": 7
},
"db": {
"db_conns": {
"*default": {

View File

@@ -154,8 +154,8 @@ func TestRankingStore(t *testing.T) {
}
content := `{
"general": {
"log_level": 7,
"logger": {
"level": 7
},
"rankings": {
"enabled": true,

View File

@@ -32,10 +32,130 @@ import (
)
func TestTrendSchedule(t *testing.T) {
var dbcfg engine.DBCfg
switch *utils.DBType {
case utils.MetaInternal:
case utils.MetaMySQL, utils.MetaRedis, utils.MetaMongo, utils.MetaPostgres:
dbcfg = engine.DBCfg{
DB: &engine.DBParams{
DBConns: map[string]engine.DBConn{
utils.MetaDefault: {
Type: utils.StringPointer(utils.MetaInternal),
Opts: engine.Opts{
InternalDBDumpInterval: utils.StringPointer("0s"),
InternalDBRewriteInterval: utils.StringPointer("0s"),
},
},
},
},
}
case utils.MetaMySQL:
dbcfg = engine.DBCfg{
DB: &engine.DBParams{
DBConns: map[string]engine.DBConn{
utils.MetaDefault: {
Type: utils.StringPointer(utils.MetaRedis),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(6379),
Name: utils.StringPointer("10"),
User: utils.StringPointer(utils.CGRateSLwr),
},
utils.StorDB: {
Type: utils.StringPointer(utils.MetaMySQL),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(3306),
Name: utils.StringPointer(utils.CGRateSLwr),
User: utils.StringPointer(utils.CGRateSLwr),
Password: utils.StringPointer("CGRateS.org"),
},
},
Items: map[string]engine.Item{
utils.MetaCDRs: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaTrendProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaTrends: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaThresholdProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaThresholds: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaStatQueueProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaStatQueues: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
},
},
}
case utils.MetaRedis:
dbcfg = engine.RedisDBCfg
case utils.MetaMongo:
t.SkipNow()
case utils.MetaPostgres:
dbcfg = engine.DBCfg{
DB: &engine.DBParams{
DBConns: map[string]engine.DBConn{
utils.MetaDefault: {
Type: utils.StringPointer(utils.MetaRedis),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(6379),
Name: utils.StringPointer("10"),
User: utils.StringPointer(utils.CGRateSLwr),
},
utils.StorDB: {
Type: utils.StringPointer(utils.MetaPostgres),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(5432),
Name: utils.StringPointer(utils.CGRateSLwr),
User: utils.StringPointer(utils.CGRateSLwr),
Password: utils.StringPointer("CGRateS.org"),
},
},
Items: map[string]engine.Item{
utils.MetaCDRs: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaTrendProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaTrends: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaThresholdProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaThresholds: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaStatQueueProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaStatQueues: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
},
},
}
default:
t.Fatal("unsupported dbtype value")
}
@@ -45,20 +165,6 @@ func TestTrendSchedule(t *testing.T) {
"level": 7,
},
"db": {
"db_conns": {
"*default": {
"db_type": "*internal",
"opts":{
"internalDBRewriteInterval": "0s",
"internalDBDumpInterval": "0s"
}
}
},
},
"trends": {
"enabled": true,
"store_interval": "-1",
@@ -119,6 +225,7 @@ cgrates.org,Threshold1,*string:~*req.Metrics.*acd.ID:*acd,;10,-1,0,1s,false,,tru
cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,true,`}
ng := engine.TestEngine{
ConfigJSON: content,
DBCfg: dbcfg,
TpFiles: tpFiles,
Encoding: *utils.Encoding,
}

View File

@@ -34,21 +34,119 @@ import (
func TestTrendStore(t *testing.T) {
var dbConfig engine.DBCfg
switch *utils.DBType {
case utils.MetaInternal:
dbConfig = engine.DBCfg{
DB: &engine.DBParams{
DBConns: map[string]engine.DBConn{
utils.MetaDefault: {
Type: utils.StringPointer(utils.MetaInternal),
Opts: engine.Opts{
InternalDBDumpInterval: utils.StringPointer("0s"),
InternalDBRewriteInterval: utils.StringPointer("0s"),
},
},
},
},
}
case utils.MetaRedis:
t.SkipNow()
dbConfig = engine.RedisDBCfg
case utils.MetaMySQL:
dbConfig = engine.MySQLDBCfg
dbConfig = engine.DBCfg{
DB: &engine.DBParams{
DBConns: map[string]engine.DBConn{
utils.MetaDefault: {
Type: utils.StringPointer(utils.MetaRedis),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(6379),
Name: utils.StringPointer("10"),
User: utils.StringPointer(utils.CGRateSLwr),
},
utils.StorDB: {
Type: utils.StringPointer(utils.MetaMySQL),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(3306),
Name: utils.StringPointer(utils.CGRateSLwr),
User: utils.StringPointer(utils.CGRateSLwr),
Password: utils.StringPointer("CGRateS.org"),
},
},
Items: map[string]engine.Item{
utils.MetaCDRs: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaTrendProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaTrends: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaStatQueueProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaStatQueues: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
},
},
}
case utils.MetaMongo:
dbConfig = engine.MongoDBCfg
case utils.MetaInternal, utils.MetaPostgres:
t.SkipNow()
case utils.MetaPostgres:
dbConfig = engine.DBCfg{
DB: &engine.DBParams{
DBConns: map[string]engine.DBConn{
utils.MetaDefault: {
Type: utils.StringPointer(utils.MetaRedis),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(6379),
Name: utils.StringPointer("10"),
User: utils.StringPointer(utils.CGRateSLwr),
},
utils.StorDB: {
Type: utils.StringPointer(utils.MetaPostgres),
Host: utils.StringPointer("127.0.0.1"),
Port: utils.IntPointer(5432),
Name: utils.StringPointer(utils.CGRateSLwr),
User: utils.StringPointer(utils.CGRateSLwr),
Password: utils.StringPointer("CGRateS.org"),
},
},
Items: map[string]engine.Item{
utils.MetaCDRs: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaTrendProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaTrends: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaStatQueueProfiles: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
utils.MetaStatQueues: {
Limit: utils.IntPointer(-1),
DbConn: utils.StringPointer(utils.StorDB),
},
},
},
}
default:
t.Fatal("unsupported dbtype value")
}
content := `{
"general": {
"log_level": 7,
"logger": {
"level": 7
},
"trends": {
"enabled": true,

View File

@@ -76,11 +76,11 @@ func TestTrendsIT(t *testing.T) {
case utils.MetaMongo:
trConfigDIR = "tutmongo"
case utils.MetaRedis:
t.SkipNow()
trConfigDIR = "trends_redis"
case utils.MetaMySQL:
trConfigDIR = "tutmysql"
trConfigDIR = "trends_mysql"
case utils.MetaPostgres:
t.SkipNow()
trConfigDIR = "trends_postgres"
default:
t.Fatal("Unknown Database type")
}

View File

@@ -211,16 +211,20 @@ const (
FallbackSubject = "FallbackSubject"
DryRun = "DryRun"
CustomValue = "CustomValue"
Value = "Value"
Rules = "Rules"
Metrics = "Metrics"
MetricID = "MetricID"
LastUsed = "LastUsed"
PDD = "PDD"
RouteStr = "Route"
RunID = "RunID"
MetaRunID = "*runID"
CustomValue = "CustomValue"
Value = "Value"
Rules = "Rules"
Metrics = "Metrics"
RunTimes = "RunTimes"
CompressedMetrics = "CompressedMetrics"
TrendGrowth = "TrendGrowth"
TrendLabel = "TrendLabel"
MetricID = "MetricID"
LastUsed = "LastUsed"
PDD = "PDD"
RouteStr = "Route"
RunID = "RunID"
MetaRunID = "*runID"
AttributeIDs = "AttributeIDs"
MetaOptsRunID = "*opts.*runID"
@@ -1927,9 +1931,11 @@ const (
TBLFilters = "filters"
TBLRouteProfiles = "route_profiles"
TBLRateProfiles = "rate_profiles"
TBLRates = "rates"
TBLRankingProfiles = "ranking_profiles"
TBLRankings = "rankings"
TBLRates = "rates"
TBLTrendProfiles = "trend_profiles"
TBLTrends = "trends"
OldSMCosts = "sm_costs"
TBLTPDispatchers = "tp_dispatcher_profiles"
TBLTPDispatcherHosts = "tp_dispatcher_hosts"

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <https://www.gnu.org/licenses/>
package utils
import (
"encoding/base64"
"maps"
"math"
"slices"
@@ -224,6 +225,72 @@ func (tp *TrendProfile) FieldAsInterface(fldPath []string) (_ any, err error) {
}
}
// AsMapStringInterface converts TrendProfile struct to map[string]any
func (tp *TrendProfile) AsMapStringInterface() map[string]any {
if tp == nil {
return nil
}
return map[string]any{
Tenant: tp.Tenant,
ID: tp.ID,
Schedule: tp.Schedule,
StatID: tp.StatID,
Metrics: tp.Metrics,
TTL: tp.TTL,
QueueLength: tp.QueueLength,
MinItems: tp.MinItems,
CorrelationType: tp.CorrelationType,
Tolerance: tp.Tolerance,
Stored: tp.Stored,
ThresholdIDs: tp.ThresholdIDs,
}
}
// MapStringInterfaceToTrendProfile converts map[string]any to TrendProfile struct
func MapStringInterfaceToTrendProfile(m map[string]any) (*TrendProfile, error) {
tp := &TrendProfile{}
if v, ok := m[Tenant].(string); ok {
tp.Tenant = v
}
if v, ok := m[ID].(string); ok {
tp.ID = v
}
if v, ok := m[Schedule].(string); ok {
tp.Schedule = v
}
if v, ok := m[StatID].(string); ok {
tp.StatID = v
}
tp.Metrics = InterfaceToStringSlice(m[Metrics])
if v, ok := m[TTL].(string); ok {
if dur, err := time.ParseDuration(v); err != nil {
return nil, err
} else {
tp.TTL = dur
}
} else if v, ok := m[TTL].(float64); ok { // for -1 cases
tp.TTL = time.Duration(v)
}
if v, ok := m[QueueLength].(float64); ok {
tp.QueueLength = int(v)
}
if v, ok := m[MinItems].(float64); ok {
tp.MinItems = int(v)
}
if v, ok := m[CorrelationType].(string); ok {
tp.CorrelationType = v
}
if v, ok := m[Tolerance].(float64); ok {
tp.Tolerance = v
}
if v, ok := m[Stored].(bool); ok {
tp.Stored = v
}
tp.ThresholdIDs = InterfaceToStringSlice(m[ThresholdIDs])
return tp, nil
}
// Trend represents a collection of metrics with trend analysis.
type Trend struct {
tMux sync.RWMutex
@@ -540,3 +607,76 @@ func GetTrendLabel(tGrowth float64, tolerance float64) (lbl string) {
}
return
}
// AsMapStringInterface converts Trend struct to map[string]any
func (t *Trend) AsMapStringInterface() map[string]any {
if t == nil {
return nil
}
return map[string]any{
Tenant: t.Tenant,
ID: t.ID,
RunTimes: t.RunTimes,
Metrics: t.Metrics,
CompressedMetrics: t.CompressedMetrics,
}
}
// MapStringInterfaceToTrend converts map[string]any to Trend struct
func MapStringInterfaceToTrend(m map[string]any) (*Trend, error) {
t := &Trend{}
if v, ok := m[Tenant].(string); ok {
t.Tenant = v
}
if v, ok := m[ID].(string); ok {
t.ID = v
}
if v, ok := m[RunTimes].([]any); ok {
for _, rt := range v {
if timeStr, ok := rt.(string); ok {
parsedTime, err := time.Parse(time.RFC3339, timeStr)
if err != nil {
return nil, err
}
t.RunTimes = append(t.RunTimes, parsedTime)
}
}
}
if cMetrics, ok := m[CompressedMetrics].(string); ok {
var err error
if t.CompressedMetrics, err = base64.StdEncoding.DecodeString(cMetrics); err != nil {
return nil, err
}
}
if v, ok := m[Metrics].(map[string]any); ok {
t.Metrics = make(map[time.Time]map[string]*MetricWithTrend)
for timeStr, innerMap := range v {
parsedTime, err := time.Parse(time.RFC3339, timeStr)
if err != nil {
return nil, err
}
if innerMetrics, ok := innerMap.(map[string]any); ok {
t.Metrics[parsedTime] = make(map[string]*MetricWithTrend)
for metricKey, metricVal := range innerMetrics {
if metricData, ok := metricVal.(map[string]any); ok {
mwt := &MetricWithTrend{}
if id, ok := metricData[ID].(string); ok {
mwt.ID = id
}
if value, ok := metricData[Value].(float64); ok {
mwt.Value = value
}
if trendGrowth, ok := metricData[TrendGrowth].(float64); ok {
mwt.TrendGrowth = trendGrowth
}
if trendLabel, ok := metricData[TrendLabel].(string); ok {
mwt.TrendLabel = trendLabel
}
t.Metrics[parsedTime][metricKey] = mwt
}
}
}
}
}
return t, nil
}