From 19675d6b2063117179620b65c7b184e8439350f1 Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Mon, 17 Nov 2025 17:36:53 +0200 Subject: [PATCH] Make Trends and TrendProfiles storable in MySQL and Postgres --- admins/trends.go | 33 ++++ config/config_defaults.go | 4 +- config/configsanity.go | 2 + data/conf/samples/diambench/cgrates.json | 4 +- .../samples/loaders/tutinternal/cgrates.json | 6 - .../samples/loaders/tutmongo/cgrates.json | 6 - .../samples/loaders/tutmysql/cgrates.json | 6 - .../samples/trends/trends_mongo/cgrates.json | 4 +- .../samples/trends/trends_mysql/cgrates.json | 4 +- .../trends/trends_schedIDs_mongo/cgrates.json | 4 +- .../trends/trends_schedIDs_mysql/cgrates.json | 4 +- data/conf/samples/trends_mysql/cgrates.json | 142 ++++++++++++++++++ .../conf/samples/trends_postgres/cgrates.json | 142 ++++++++++++++++++ data/conf/samples/trends_redis/cgrates.json | 140 +++++++++++++++++ data/storage/mysql/create_db_tables.sql | 24 ++- data/storage/postgres/create_db_tables.sql | 24 ++- engine/libstats.go | 2 + engine/models.go | 24 ++- engine/storage_sql.go | 127 ++++++++++++---- general_tests/ers_start_delay_it_test.go | 4 +- general_tests/ips_load_test.go | 4 +- general_tests/ips_prf_it_test.go | 4 +- general_tests/rankings_schedule_it_test.go | 6 +- general_tests/rankings_stored_it_test.go | 4 +- general_tests/trends_schedule_it_test.go | 137 +++++++++++++++-- general_tests/trends_stored_it_test.go | 110 +++++++++++++- trends/trends_it_test.go | 6 +- utils/consts.go | 28 ++-- utils/trends.go | 140 +++++++++++++++++ 29 files changed, 1034 insertions(+), 111 deletions(-) create mode 100644 data/conf/samples/trends_mysql/cgrates.json create mode 100644 data/conf/samples/trends_postgres/cgrates.json create mode 100644 data/conf/samples/trends_redis/cgrates.json diff --git a/admins/trends.go b/admins/trends.go index df0973c76..0e9f07458 100644 --- a/admins/trends.go +++ b/admins/trends.go @@ -19,6 +19,9 @@ along with this program. If not, see package admins import ( + "fmt" + "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/utils" ) @@ -128,6 +131,21 @@ func (a *AdminS) V1SetTrendProfile(ctx *context.Context, arg *utils.TrendProfile if err = a.dm.SetTrendProfile(ctx, arg.TrendProfile); err != nil { return utils.APIErrorHandler(err) } + //generate a loadID for CacheTrendProfiles and store it in database + loadID := time.Now().UnixNano() + if err = a.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheTrendProfiles: loadID}); err != nil { + return utils.APIErrorHandler(err) + } + // delay if needed before cache call + if a.cfg.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", a.cfg.GeneralCfg().CachingDelay)) + time.Sleep(a.cfg.GeneralCfg().CachingDelay) + } + //handle caching for TrendProfile + if err = a.CallCache(ctx, utils.IfaceAsString(arg.APIOpts[utils.MetaCache]), arg.Tenant, utils.CacheTrendProfiles, + arg.TenantID(), utils.EmptyString, nil, arg.APIOpts); err != nil { + return utils.APIErrorHandler(err) + } *reply = utils.OK return nil } @@ -144,6 +162,21 @@ func (a *AdminS) V1RemoveTrendProfile(ctx *context.Context, args *utils.TenantID if err := a.dm.RemoveTrendProfile(ctx, tnt, args.ID); err != nil { return utils.APIErrorHandler(err) } + // delay if needed before cache call + if a.cfg.GeneralCfg().CachingDelay != 0 { + utils.Logger.Info(fmt.Sprintf(" Delaying cache call for %v", a.cfg.GeneralCfg().CachingDelay)) + time.Sleep(a.cfg.GeneralCfg().CachingDelay) + } + //handle caching for TrendProfile + if err := a.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), tnt, utils.CacheTrendProfiles, + utils.ConcatenatedKey(tnt, args.ID), utils.EmptyString, nil, args.APIOpts); err != nil { + return utils.APIErrorHandler(err) + } + //generate a loadID for CacheTrendProfiles and store it in database + loadID := time.Now().UnixNano() + if err := a.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheTrendProfiles: loadID}); err != nil { + return utils.APIErrorHandler(err) + } *reply = utils.OK return nil } diff --git a/config/config_defaults.go b/config/config_defaults.go index 137d93db5..97ef673d1 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -183,6 +183,8 @@ const CGRATES_CFG_JSON = ` "*rate_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*ranking_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*rankings": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, + "*trend_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, + "*trends": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, // compatible db types: <*internal|*redis|*mongo> "*actions": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, @@ -191,8 +193,6 @@ const CGRATES_CFG_JSON = ` "*ip_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, "*stat_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, "*threshold_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, - "*trend_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, - "*trends": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "*default"}, "*route_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, "*attribute_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, "*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate": false, "dbConn": "*default"}, diff --git a/config/configsanity.go b/config/configsanity.go index d7432e60b..e6589f9e1 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -1045,6 +1045,8 @@ func (cfg *CGRConfig) checkConfigSanity() error { utils.MetaRateProfiles, utils.MetaRankingProfiles, utils.MetaRankings, + utils.MetaTrendProfiles, + utils.MetaTrends, } for _, dbcfg := range cfg.dbCfg.DBConns { if dbcfg.Type == utils.MetaInternal { diff --git a/data/conf/samples/diambench/cgrates.json b/data/conf/samples/diambench/cgrates.json index 5a31f0e0d..9f0004ca6 100644 --- a/data/conf/samples/diambench/cgrates.json +++ b/data/conf/samples/diambench/cgrates.json @@ -1,6 +1,6 @@ { -"general": { - "log_level": 7 +"logger": { + "level": 7 }, "schedulers": { "enabled": true, diff --git a/data/conf/samples/loaders/tutinternal/cgrates.json b/data/conf/samples/loaders/tutinternal/cgrates.json index d288f9cd7..2bdb24c96 100644 --- a/data/conf/samples/loaders/tutinternal/cgrates.json +++ b/data/conf/samples/loaders/tutinternal/cgrates.json @@ -47,7 +47,6 @@ { "id": "CustomLoader", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -68,7 +67,6 @@ { "id": "WithoutMoveToOut", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -90,7 +88,6 @@ { "id": "SubpathLoaderWithoutMove", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -112,7 +109,6 @@ { "id": "SubpathLoaderWithMove", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -134,7 +130,6 @@ { "id": "LoaderWithTemplate", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -155,7 +150,6 @@ { "id": "CustomSep", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "-1", "lockfile_path": ".cgr.lock", diff --git a/data/conf/samples/loaders/tutmongo/cgrates.json b/data/conf/samples/loaders/tutmongo/cgrates.json index bf7eedd2e..87a470e5c 100644 --- a/data/conf/samples/loaders/tutmongo/cgrates.json +++ b/data/conf/samples/loaders/tutmongo/cgrates.json @@ -91,7 +91,6 @@ { "id": "CustomLoader", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -112,7 +111,6 @@ { "id": "WithoutMoveToOut", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -134,7 +132,6 @@ { "id": "SubpathLoaderWithoutMove", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -156,7 +153,6 @@ { "id": "SubpathLoaderWithMove", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -178,7 +174,6 @@ { "id": "LoaderWithTemplate", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -199,7 +194,6 @@ { "id": "CustomSep", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "-1", "lockfile_path": ".cgr.lock", diff --git a/data/conf/samples/loaders/tutmysql/cgrates.json b/data/conf/samples/loaders/tutmysql/cgrates.json index 6c57ffb0a..c67195c25 100644 --- a/data/conf/samples/loaders/tutmysql/cgrates.json +++ b/data/conf/samples/loaders/tutmysql/cgrates.json @@ -55,7 +55,6 @@ { "id": "CustomLoader", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -76,7 +75,6 @@ { "id": "WithoutMoveToOut", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -98,7 +96,6 @@ { "id": "SubpathLoaderWithoutMove", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -120,7 +117,6 @@ { "id": "SubpathLoaderWithMove", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -142,7 +138,6 @@ { "id": "LoaderWithTemplate", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "0", "lockfile_path": ".cgr.lock", @@ -163,7 +158,6 @@ { "id": "CustomSep", "enabled": true, - "dry_run": false, "tenant": "cgrates.org", "run_delay": "-1", "lockfile_path": ".cgr.lock", diff --git a/data/conf/samples/trends/trends_mongo/cgrates.json b/data/conf/samples/trends/trends_mongo/cgrates.json index cc781b7e4..ebfd4d922 100644 --- a/data/conf/samples/trends/trends_mongo/cgrates.json +++ b/data/conf/samples/trends/trends_mongo/cgrates.json @@ -1,7 +1,7 @@ { -"general": { - "log_level": 7, +"logger": { + "level": 7 }, "db": { diff --git a/data/conf/samples/trends/trends_mysql/cgrates.json b/data/conf/samples/trends/trends_mysql/cgrates.json index bd6165545..13535b6d3 100644 --- a/data/conf/samples/trends/trends_mysql/cgrates.json +++ b/data/conf/samples/trends/trends_mysql/cgrates.json @@ -1,7 +1,7 @@ { -"general": { - "log_level": 7, +"logger": { + "level": 7 }, "db": { diff --git a/data/conf/samples/trends/trends_schedIDs_mongo/cgrates.json b/data/conf/samples/trends/trends_schedIDs_mongo/cgrates.json index 970d79fb7..a6b30c28c 100644 --- a/data/conf/samples/trends/trends_schedIDs_mongo/cgrates.json +++ b/data/conf/samples/trends/trends_schedIDs_mongo/cgrates.json @@ -1,7 +1,7 @@ { -"general": { - "log_level": 7, +"logger": { + "level": 7 }, "db": { diff --git a/data/conf/samples/trends/trends_schedIDs_mysql/cgrates.json b/data/conf/samples/trends/trends_schedIDs_mysql/cgrates.json index b83fca5ca..d63314d36 100644 --- a/data/conf/samples/trends/trends_schedIDs_mysql/cgrates.json +++ b/data/conf/samples/trends/trends_schedIDs_mysql/cgrates.json @@ -1,7 +1,7 @@ { -"general": { - "log_level": 7, +"logger": { + "level": 7 }, "db": { diff --git a/data/conf/samples/trends_mysql/cgrates.json b/data/conf/samples/trends_mysql/cgrates.json new file mode 100644 index 000000000..2c20349e1 --- /dev/null +++ b/data/conf/samples/trends_mysql/cgrates.json @@ -0,0 +1,142 @@ +{ +// CGRateS Configuration file +// + + +"general": { + "reply_timeout": "50s", +}, + +"logger": { + "level": 7 +}, + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + +"db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_conns": { + "*default": { // The id of the DB connection + "db_type": "redis", // db type: + "db_host": "127.0.0.1", + "db_port": 6379, // db port to reach the database + "db_name": "10", // db database name to connect to + "db_user": "cgrates", + }, + "StorDB": { // The id of the DB connection + "db_type": "mysql", // db type: + "db_host": "127.0.0.1", // the host to connect to + "db_port": 3306, // db port to reach the database + "db_name": "cgrates", // db database name to connect to + "db_user": "cgrates", // username to use when connecting to the database + "db_password": "CGRateS.org" // password to use when connecting to the database + } + }, + "items": { + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*trend_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*trends": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + } +}, + +"cdrs": { + "enabled": true, + "chargers_conns":["*internal"], +}, + + +"attributes": { + "enabled": true, + "stats_conns": ["*localhost"], + "resources_conns": ["*localhost"], + "accounts_conns": ["*localhost"] +}, + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + + +"resources": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"] +}, + + +"stats": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"], +}, + + +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + + +"routes": { + "enabled": true, + "prefix_indexed_fields":["*req.Destination"], + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], + "rates_conns": ["*internal"], +}, + + +"sessions": { + "enabled": true, + "routes_conns": ["*internal"], + "resources_conns": ["*internal"], + "attributes_conns": ["*internal"], + "rates_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], +}, + + +"migrator":{ + + "users_filters":["Account"], +}, + + +"admins": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + + +"rates": { + "enabled": true +}, + + +"actions": { + "enabled": true, + "accounts_conns": ["*localhost"] +}, + + +"accounts": { + "enabled": true +}, + + +"filters": { + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], + "accounts_conns": ["*internal"], +}, + +"tpes": { + "enabled": true +}, + +} diff --git a/data/conf/samples/trends_postgres/cgrates.json b/data/conf/samples/trends_postgres/cgrates.json new file mode 100644 index 000000000..730b16005 --- /dev/null +++ b/data/conf/samples/trends_postgres/cgrates.json @@ -0,0 +1,142 @@ +{ +// CGRateS Configuration file +// + + +"general": { + "reply_timeout": "50s", +}, + +"logger": { + "level": 7 +}, + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + +"db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_conns": { + "*default": { // The id of the DB connection + "db_type": "redis", // db type: + "db_host": "127.0.0.1", + "db_port": 6379, // db port to reach the database + "db_name": "10", // db database name to connect to + "db_user": "cgrates", + }, + "StorDB": { + "db_type": "postgres", + "db_host": "127.0.0.1", + "db_port": 5432, + "db_name": "cgrates", + "db_user": "cgrates", + "db_password": "CGRateS.org" + } + }, + "items": { + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*trend_profiles": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"}, + "*trends": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + } +}, + +"cdrs": { + "enabled": true, + "chargers_conns":["*internal"], +}, + + +"attributes": { + "enabled": true, + "stats_conns": ["*localhost"], + "resources_conns": ["*localhost"], + "accounts_conns": ["*localhost"] +}, + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + + +"resources": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"] +}, + + +"stats": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"], +}, + + +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + + +"routes": { + "enabled": true, + "prefix_indexed_fields":["*req.Destination"], + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], + "rates_conns": ["*internal"], +}, + + +"sessions": { + "enabled": true, + "routes_conns": ["*internal"], + "resources_conns": ["*internal"], + "attributes_conns": ["*internal"], + "rates_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], +}, + + +"migrator":{ + + "users_filters":["Account"], +}, + + +"admins": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + + +"rates": { + "enabled": true +}, + + +"actions": { + "enabled": true, + "accounts_conns": ["*localhost"] +}, + + +"accounts": { + "enabled": true +}, + + +"filters": { + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], + "accounts_conns": ["*internal"], +}, + +"tpes": { + "enabled": true +}, + +} diff --git a/data/conf/samples/trends_redis/cgrates.json b/data/conf/samples/trends_redis/cgrates.json new file mode 100644 index 000000000..31b7a0631 --- /dev/null +++ b/data/conf/samples/trends_redis/cgrates.json @@ -0,0 +1,140 @@ +{ +// CGRateS Configuration file +// + + +"general": { + "reply_timeout": "50s", +}, + +"logger": { + "level": 7 +}, + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + +"db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_conns": { + "*default": { // The id of the DB connection + "db_type": "redis", // db type: + "db_host": "127.0.0.1", + "db_port": 6379, // db port to reach the database + "db_name": "10", // db database name to connect to + "db_user": "cgrates", + }, + "StorDB": { // The id of the DB connection + "db_type": "mysql", // db type: + "db_host": "127.0.0.1", // the host to connect to + "db_port": 3306, // db port to reach the database + "db_name": "cgrates", // db database name to connect to + "db_user": "cgrates", // username to use when connecting to the database + "db_password": "CGRateS.org" // password to use when connecting to the database + }, + }, + "items": { + "*cdrs": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false, "dbConn": "StorDB"} + } +}, + +"cdrs": { + "enabled": true, + "chargers_conns":["*internal"], +}, + + +"attributes": { + "enabled": true, + "stats_conns": ["*localhost"], + "resources_conns": ["*localhost"], + "accounts_conns": ["*localhost"] +}, + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + + +"resources": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"] +}, + + +"stats": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": ["*internal"], +}, + + +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + + +"routes": { + "enabled": true, + "prefix_indexed_fields":["*req.Destination"], + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], + "rates_conns": ["*internal"], +}, + + +"sessions": { + "enabled": true, + "routes_conns": ["*internal"], + "resources_conns": ["*internal"], + "attributes_conns": ["*internal"], + "rates_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], +}, + + +"migrator":{ + + "users_filters":["Account"], +}, + + +"admins": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + + +"rates": { + "enabled": true +}, + + +"actions": { + "enabled": true, + "accounts_conns": ["*localhost"] +}, + + +"accounts": { + "enabled": true +}, + + +"filters": { + "stats_conns": ["*internal"], + "resources_conns": ["*internal"], + "accounts_conns": ["*internal"], +}, + +"tpes": { + "enabled": true +}, + +} diff --git a/data/storage/mysql/create_db_tables.sql b/data/storage/mysql/create_db_tables.sql index bdd042d28..82e6ad594 100644 --- a/data/storage/mysql/create_db_tables.sql +++ b/data/storage/mysql/create_db_tables.sql @@ -171,8 +171,8 @@ CREATE TABLE rates ( `pk` int(11) NOT NULL AUTO_INCREMENT, `tenant` VARCHAR(40) NOT NULL, `id` VARCHAR(64) NOT NULL, - `rate` JSON NOT NULL, `rate_profile_id` VARCHAR(64) NOT NULL, + `rate` JSON NOT NULL, PRIMARY KEY (`pk`), UNIQUE KEY unique_tenant_id_rate_profile_id (`tenant`, `id`, `rate_profile_id`), FOREIGN KEY (rate_profile_id) REFERENCES rate_profiles (id) @@ -199,3 +199,25 @@ CREATE TABLE rankings ( UNIQUE KEY unique_tenant_id (`tenant`, `id`) ); CREATE UNIQUE INDEX rankings_idx ON rankings (`id`); + +DROP TABLE IF EXISTS trend_profiles; +CREATE TABLE trend_profiles ( + `pk` int(11) NOT NULL AUTO_INCREMENT, + `tenant` VARCHAR(40) NOT NULL, + `id` VARCHAR(64) NOT NULL, + `trend_profile` JSON NOT NULL, + PRIMARY KEY (`pk`), + UNIQUE KEY unique_tenant_id (`tenant`, `id`) +); +CREATE UNIQUE INDEX trend_profiles_idx ON trend_profiles (`id`); + +DROP TABLE IF EXISTS trends; +CREATE TABLE trends ( + `pk` int(11) NOT NULL AUTO_INCREMENT, + `tenant` VARCHAR(40) NOT NULL, + `id` VARCHAR(64) NOT NULL, + `trend` JSON NOT NULL, + PRIMARY KEY (`pk`), + UNIQUE KEY unique_tenant_id (`tenant`, `id`) +); +CREATE UNIQUE INDEX trends_idx ON trends (`id`); diff --git a/data/storage/postgres/create_db_tables.sql b/data/storage/postgres/create_db_tables.sql index c75e9942c..0eefee4ff 100644 --- a/data/storage/postgres/create_db_tables.sql +++ b/data/storage/postgres/create_db_tables.sql @@ -168,8 +168,8 @@ CREATE TABLE rates ( pk SERIAL PRIMARY KEY, tenant VARCHAR(40) NOT NULL, id VARCHAR(64) NOT NULL, - rate JSONB NOT NULL, rate_profile_id VARCHAR(64) NOT NULL, + rate JSONB NOT NULL, UNIQUE (tenant, id, rate_profile_id), FOREIGN KEY (rate_profile_id) REFERENCES rate_profiles (id) ); @@ -195,3 +195,25 @@ CREATE TABLE rankings ( UNIQUE (tenant, id) ); CREATE UNIQUE INDEX rankings_idx ON rankings ("id"); + + +DROP TABLE IF EXISTS trend_profiles; +CREATE TABLE trend_profiles ( + pk SERIAL PRIMARY KEY, + tenant VARCHAR(40) NOT NULL, + id VARCHAR(64) NOT NULL, + trend_profile JSONB NOT NULL, + UNIQUE (tenant, id) +); +CREATE UNIQUE INDEX trend_profiles_idx ON trend_profiles ("id"); + + +DROP TABLE IF EXISTS trends; +CREATE TABLE trends ( + pk SERIAL PRIMARY KEY, + tenant VARCHAR(40) NOT NULL, + id VARCHAR(64) NOT NULL, + trend JSONB NOT NULL, + UNIQUE (tenant, id) +); +CREATE UNIQUE INDEX trends_idx ON trends ("id"); diff --git a/engine/libstats.go b/engine/libstats.go index 2f38c0ba4..8a549bf9c 100644 --- a/engine/libstats.go +++ b/engine/libstats.go @@ -650,6 +650,8 @@ func MapStringInterfaceToStoredStatQueue(m map[string]any) (*StoredStatQueue, er } else if expiryStr, ok := itemMap[utils.ExpiryTime].(string); ok { if parsedTime, err := time.Parse(time.RFC3339, expiryStr); err == nil { sqItem.ExpiryTime = &parsedTime + } else { + return nil, err } } ssq.SQItems = append(ssq.SQItems, sqItem) diff --git a/engine/models.go b/engine/models.go index 179924093..b1140f132 100644 --- a/engine/models.go +++ b/engine/models.go @@ -553,8 +553,8 @@ type RateMdl struct { PK uint `gorm:"primary_key"` Tenant string `index:"0" re:".*"` ID string `index:"1" re:".*"` - Rate utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"` RateProfileID string `gorm:"foreign_key" index:"3" re:".*"` + Rate utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"` } func (RateMdl) TableName() string { @@ -582,3 +582,25 @@ type RankingJSONMdl struct { func (RankingJSONMdl) TableName() string { return utils.TBLRankings } + +type TrendProfileMdl struct { + PK uint `gorm:"primary_key"` + Tenant string `index:"0" re:".*"` + ID string `index:"1" re:".*"` + TrendProfile utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"` +} + +func (TrendProfileMdl) TableName() string { + return utils.TBLTrendProfiles +} + +type TrendJSONMdl struct { + PK uint `gorm:"primary_key"` + Tenant string `index:"0" re:".*"` + ID string `index:"1" re:".*"` + Trend utils.JSONB `gorm:"type:jsonb" index:"2" re:".*"` +} + +func (TrendJSONMdl) TableName() string { + return utils.TBLTrends +} diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 44095ffa1..5c44c3088 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -133,6 +133,10 @@ func (sqls *SQLStorage) GetKeysForPrefix(ctx *context.Context, prefix string) (k keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRankingProfiles, tntID) case utils.RankingPrefix: keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLRankings, tntID) + case utils.TrendProfilePrefix: + keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLTrendProfiles, tntID) + case utils.TrendPrefix: + keys, err = sqls.getAllKeysMatchingTenantID(ctx, utils.TBLTrends, tntID) default: err = fmt.Errorf("unsupported prefix in GetKeysForPrefix: %q", prefix) } @@ -1300,6 +1304,95 @@ func (sqls *SQLStorage) RemoveRankingDrv(ctx *context.Context, tenant, id string return } +func (sqls *SQLStorage) SetTrendProfileDrv(ctx *context.Context, tp *utils.TrendProfile) (err error) { + tx := sqls.db.Begin() + mdl := &TrendProfileMdl{ + Tenant: tp.Tenant, + ID: tp.ID, + TrendProfile: tp.AsMapStringInterface(), + } + if err = tx.Model(&TrendProfileMdl{}).Where( + TrendProfileMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete( + TrendProfileMdl{}).Error; err != nil { + tx.Rollback() + return + } + if err = tx.Save(mdl).Error; err != nil { + tx.Rollback() + return + } + tx.Commit() + return + +} + +func (sqls *SQLStorage) GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (tp *utils.TrendProfile, err error) { + var result []*TrendProfileMdl + if err = sqls.db.Model(&TrendProfileMdl{}).Where(&TrendProfileMdl{Tenant: tenant, + ID: id}).Find(&result).Error; err != nil { + return nil, err + } + if len(result) == 0 { + return nil, utils.ErrNotFound + } + return utils.MapStringInterfaceToTrendProfile(result[0].TrendProfile) +} + +func (sqls *SQLStorage) RemTrendProfileDrv(ctx *context.Context, tenant string, id string) (err error) { + tx := sqls.db.Begin() + if err = tx.Model(&TrendProfileMdl{}).Where(&TrendProfileMdl{Tenant: tenant, ID: id}). + Delete(&TrendProfileMdl{}).Error; err != nil { + tx.Rollback() + return err + } + tx.Commit() + return +} + +func (sqls *SQLStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (t *utils.Trend, err error) { + var result []*TrendJSONMdl + if err = sqls.db.Model(&TrendJSONMdl{}).Where(&TrendJSONMdl{Tenant: tenant, + ID: id}).Find(&result).Error; err != nil { + return nil, err + } + if len(result) == 0 { + return nil, utils.ErrNotFound + } + return utils.MapStringInterfaceToTrend(result[0].Trend) +} + +func (sqls *SQLStorage) SetTrendDrv(ctx *context.Context, t *utils.Trend) (err error) { + tx := sqls.db.Begin() + mdl := &TrendJSONMdl{ + Tenant: t.Tenant, + ID: t.ID, + Trend: t.AsMapStringInterface(), + } + if err = tx.Model(&TrendJSONMdl{}).Where( + TrendJSONMdl{Tenant: mdl.Tenant, ID: mdl.ID}).Delete( + TrendJSONMdl{}).Error; err != nil { + tx.Rollback() + return + } + if err = tx.Save(mdl).Error; err != nil { + tx.Rollback() + return + } + tx.Commit() + return +} + +func (sqls *SQLStorage) RemoveTrendDrv(ctx *context.Context, tenant, id string) (err error) { + tx := sqls.db.Begin() + if err = tx.Model(&TrendJSONMdl{}).Where(&TrendJSONMdl{Tenant: tenant, ID: id}). + Delete(&TrendJSONMdl{}).Error; err != nil { + tx.Rollback() + return err + } + tx.Commit() + return +} + // Used to check if specific subject is stored using prefix key attached to entity func (sqls *SQLStorage) HasDataDrv(ctx *context.Context, category, subject, tenant string) (has bool, err error) { var categoryModelMap = map[string]any{ @@ -1320,8 +1413,8 @@ func (sqls *SQLStorage) HasDataDrv(ctx *context.Context, category, subject, tena utils.RateProfilePrefix: &RateProfileJSONMdl{}, utils.RankingPrefix: &RankingJSONMdl{}, utils.RankingProfilePrefix: &RankingProfileMdl{}, - // utils.TrendPrefix: &TrendJSONMdl{}, - // utils.TrendProfilePrefix: &TrendProfileMdl{}, + utils.TrendPrefix: &TrendJSONMdl{}, + utils.TrendProfilePrefix: &TrendProfileMdl{}, } model, ok := categoryModelMap[category] if !ok { @@ -1371,36 +1464,6 @@ func (sqls *SQLStorage) GetLoadHistory(limit int, skipCache bool, return nil, utils.ErrNotImplemented } -// DataDB method not implemented yet -func (sqls *SQLStorage) SetTrendProfileDrv(ctx *context.Context, sg *utils.TrendProfile) (err error) { - return utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) GetTrendProfileDrv(ctx *context.Context, tenant string, id string) (sg *utils.TrendProfile, err error) { - return nil, utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) RemTrendProfileDrv(ctx *context.Context, tenant string, id string) (err error) { - return utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) GetTrendDrv(ctx *context.Context, tenant, id string) (r *utils.Trend, err error) { - return nil, utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) SetTrendDrv(ctx *context.Context, r *utils.Trend) (err error) { - return utils.ErrNotImplemented -} - -// DataDB method not implemented yet -func (sqls *SQLStorage) RemoveTrendDrv(ctx *context.Context, tenant, id string) (err error) { - return utils.ErrNotImplemented -} - // DataDB method not implemented yet func (sqls *SQLStorage) GetItemLoadIDsDrv(ctx *context.Context, itemIDPrefix string) (loadIDs map[string]int64, err error) { return nil, utils.ErrNotImplemented diff --git a/general_tests/ers_start_delay_it_test.go b/general_tests/ers_start_delay_it_test.go index ae11dd3a6..ec9422323 100644 --- a/general_tests/ers_start_delay_it_test.go +++ b/general_tests/ers_start_delay_it_test.go @@ -42,8 +42,8 @@ func TestErsStartDelay(t *testing.T) { t.Fatalf("could not write to file %s: %v", filePath, err) } content := fmt.Sprintf(`{ - "general": { - "log_level": 7 + "logger": { + "level": 7 }, "db": { "db_conns": { diff --git a/general_tests/ips_load_test.go b/general_tests/ips_load_test.go index 640c6087b..219bb5f62 100644 --- a/general_tests/ips_load_test.go +++ b/general_tests/ips_load_test.go @@ -144,8 +144,8 @@ func TestStressIPsAuthorize(t *testing.T) { } content := `{ - "general": { - "log_level": 7 + "logger": { + "level": 7 }, "db": { "db_conns": { diff --git a/general_tests/ips_prf_it_test.go b/general_tests/ips_prf_it_test.go index 4e0c84a9c..212b84456 100644 --- a/general_tests/ips_prf_it_test.go +++ b/general_tests/ips_prf_it_test.go @@ -35,8 +35,8 @@ import ( func Benchmark10IPsAllocated(b *testing.B) { content := `{ - "general": { - "log_level": 7 + "logger": { + "level": 7 }, "db": { "db_conns": { diff --git a/general_tests/rankings_schedule_it_test.go b/general_tests/rankings_schedule_it_test.go index e6972fa0c..3fcc10828 100644 --- a/general_tests/rankings_schedule_it_test.go +++ b/general_tests/rankings_schedule_it_test.go @@ -42,9 +42,9 @@ func TestRankingSchedule(t *testing.T) { } content := `{ - "general": { - "log_level": 7, - }, + "logger": { + "level": 7 + }, "db": { "db_conns": { "*default": { diff --git a/general_tests/rankings_stored_it_test.go b/general_tests/rankings_stored_it_test.go index ad830e660..f534d341d 100644 --- a/general_tests/rankings_stored_it_test.go +++ b/general_tests/rankings_stored_it_test.go @@ -154,8 +154,8 @@ func TestRankingStore(t *testing.T) { } content := `{ -"general": { - "log_level": 7, +"logger": { + "level": 7 }, "rankings": { "enabled": true, diff --git a/general_tests/trends_schedule_it_test.go b/general_tests/trends_schedule_it_test.go index a11caa5f4..a8d741c4c 100644 --- a/general_tests/trends_schedule_it_test.go +++ b/general_tests/trends_schedule_it_test.go @@ -32,10 +32,130 @@ import ( ) func TestTrendSchedule(t *testing.T) { + var dbcfg engine.DBCfg switch *utils.DBType { case utils.MetaInternal: - case utils.MetaMySQL, utils.MetaRedis, utils.MetaMongo, utils.MetaPostgres: + dbcfg = engine.DBCfg{ + DB: &engine.DBParams{ + DBConns: map[string]engine.DBConn{ + utils.MetaDefault: { + Type: utils.StringPointer(utils.MetaInternal), + Opts: engine.Opts{ + InternalDBDumpInterval: utils.StringPointer("0s"), + InternalDBRewriteInterval: utils.StringPointer("0s"), + }, + }, + }, + }, + } + case utils.MetaMySQL: + dbcfg = engine.DBCfg{ + DB: &engine.DBParams{ + DBConns: map[string]engine.DBConn{ + utils.MetaDefault: { + Type: utils.StringPointer(utils.MetaRedis), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(6379), + Name: utils.StringPointer("10"), + User: utils.StringPointer(utils.CGRateSLwr), + }, + utils.StorDB: { + Type: utils.StringPointer(utils.MetaMySQL), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(3306), + Name: utils.StringPointer(utils.CGRateSLwr), + User: utils.StringPointer(utils.CGRateSLwr), + Password: utils.StringPointer("CGRateS.org"), + }, + }, + Items: map[string]engine.Item{ + utils.MetaCDRs: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaTrendProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaTrends: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaThresholdProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaThresholds: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaStatQueueProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaStatQueues: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + }, + }, + } + case utils.MetaRedis: + dbcfg = engine.RedisDBCfg + case utils.MetaMongo: t.SkipNow() + case utils.MetaPostgres: + dbcfg = engine.DBCfg{ + DB: &engine.DBParams{ + DBConns: map[string]engine.DBConn{ + utils.MetaDefault: { + Type: utils.StringPointer(utils.MetaRedis), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(6379), + Name: utils.StringPointer("10"), + User: utils.StringPointer(utils.CGRateSLwr), + }, + utils.StorDB: { + Type: utils.StringPointer(utils.MetaPostgres), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(5432), + Name: utils.StringPointer(utils.CGRateSLwr), + User: utils.StringPointer(utils.CGRateSLwr), + Password: utils.StringPointer("CGRateS.org"), + }, + }, + Items: map[string]engine.Item{ + utils.MetaCDRs: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaTrendProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaTrends: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaThresholdProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaThresholds: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaStatQueueProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaStatQueues: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + }, + }, + } default: t.Fatal("unsupported dbtype value") } @@ -45,20 +165,6 @@ func TestTrendSchedule(t *testing.T) { "level": 7, }, -"db": { - "db_conns": { - "*default": { - "db_type": "*internal", - "opts":{ - "internalDBRewriteInterval": "0s", - "internalDBDumpInterval": "0s" - } - } - }, -}, - - - "trends": { "enabled": true, "store_interval": "-1", @@ -119,6 +225,7 @@ cgrates.org,Threshold1,*string:~*req.Metrics.*acd.ID:*acd,;10,-1,0,1s,false,,tru cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,true,`} ng := engine.TestEngine{ ConfigJSON: content, + DBCfg: dbcfg, TpFiles: tpFiles, Encoding: *utils.Encoding, } diff --git a/general_tests/trends_stored_it_test.go b/general_tests/trends_stored_it_test.go index 65be6e824..3ea22b138 100644 --- a/general_tests/trends_stored_it_test.go +++ b/general_tests/trends_stored_it_test.go @@ -34,21 +34,119 @@ import ( func TestTrendStore(t *testing.T) { var dbConfig engine.DBCfg switch *utils.DBType { + case utils.MetaInternal: + dbConfig = engine.DBCfg{ + DB: &engine.DBParams{ + DBConns: map[string]engine.DBConn{ + utils.MetaDefault: { + Type: utils.StringPointer(utils.MetaInternal), + Opts: engine.Opts{ + InternalDBDumpInterval: utils.StringPointer("0s"), + InternalDBRewriteInterval: utils.StringPointer("0s"), + }, + }, + }, + }, + } case utils.MetaRedis: - t.SkipNow() + dbConfig = engine.RedisDBCfg case utils.MetaMySQL: - dbConfig = engine.MySQLDBCfg + dbConfig = engine.DBCfg{ + DB: &engine.DBParams{ + DBConns: map[string]engine.DBConn{ + utils.MetaDefault: { + Type: utils.StringPointer(utils.MetaRedis), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(6379), + Name: utils.StringPointer("10"), + User: utils.StringPointer(utils.CGRateSLwr), + }, + utils.StorDB: { + Type: utils.StringPointer(utils.MetaMySQL), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(3306), + Name: utils.StringPointer(utils.CGRateSLwr), + User: utils.StringPointer(utils.CGRateSLwr), + Password: utils.StringPointer("CGRateS.org"), + }, + }, + Items: map[string]engine.Item{ + utils.MetaCDRs: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaTrendProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaTrends: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaStatQueueProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaStatQueues: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + }, + }, + } case utils.MetaMongo: dbConfig = engine.MongoDBCfg - case utils.MetaInternal, utils.MetaPostgres: - t.SkipNow() + case utils.MetaPostgres: + dbConfig = engine.DBCfg{ + DB: &engine.DBParams{ + DBConns: map[string]engine.DBConn{ + utils.MetaDefault: { + Type: utils.StringPointer(utils.MetaRedis), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(6379), + Name: utils.StringPointer("10"), + User: utils.StringPointer(utils.CGRateSLwr), + }, + utils.StorDB: { + Type: utils.StringPointer(utils.MetaPostgres), + Host: utils.StringPointer("127.0.0.1"), + Port: utils.IntPointer(5432), + Name: utils.StringPointer(utils.CGRateSLwr), + User: utils.StringPointer(utils.CGRateSLwr), + Password: utils.StringPointer("CGRateS.org"), + }, + }, + Items: map[string]engine.Item{ + utils.MetaCDRs: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaTrendProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaTrends: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaStatQueueProfiles: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + utils.MetaStatQueues: { + Limit: utils.IntPointer(-1), + DbConn: utils.StringPointer(utils.StorDB), + }, + }, + }, + } default: t.Fatal("unsupported dbtype value") } content := `{ -"general": { - "log_level": 7, +"logger": { + "level": 7 }, "trends": { "enabled": true, diff --git a/trends/trends_it_test.go b/trends/trends_it_test.go index 827ab2961..833b5e04a 100644 --- a/trends/trends_it_test.go +++ b/trends/trends_it_test.go @@ -76,11 +76,11 @@ func TestTrendsIT(t *testing.T) { case utils.MetaMongo: trConfigDIR = "tutmongo" case utils.MetaRedis: - t.SkipNow() + trConfigDIR = "trends_redis" case utils.MetaMySQL: - trConfigDIR = "tutmysql" + trConfigDIR = "trends_mysql" case utils.MetaPostgres: - t.SkipNow() + trConfigDIR = "trends_postgres" default: t.Fatal("Unknown Database type") } diff --git a/utils/consts.go b/utils/consts.go index 89a1a1f95..b6cd270d2 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -211,16 +211,20 @@ const ( FallbackSubject = "FallbackSubject" DryRun = "DryRun" - CustomValue = "CustomValue" - Value = "Value" - Rules = "Rules" - Metrics = "Metrics" - MetricID = "MetricID" - LastUsed = "LastUsed" - PDD = "PDD" - RouteStr = "Route" - RunID = "RunID" - MetaRunID = "*runID" + CustomValue = "CustomValue" + Value = "Value" + Rules = "Rules" + Metrics = "Metrics" + RunTimes = "RunTimes" + CompressedMetrics = "CompressedMetrics" + TrendGrowth = "TrendGrowth" + TrendLabel = "TrendLabel" + MetricID = "MetricID" + LastUsed = "LastUsed" + PDD = "PDD" + RouteStr = "Route" + RunID = "RunID" + MetaRunID = "*runID" AttributeIDs = "AttributeIDs" MetaOptsRunID = "*opts.*runID" @@ -1927,9 +1931,11 @@ const ( TBLFilters = "filters" TBLRouteProfiles = "route_profiles" TBLRateProfiles = "rate_profiles" + TBLRates = "rates" TBLRankingProfiles = "ranking_profiles" TBLRankings = "rankings" - TBLRates = "rates" + TBLTrendProfiles = "trend_profiles" + TBLTrends = "trends" OldSMCosts = "sm_costs" TBLTPDispatchers = "tp_dispatcher_profiles" TBLTPDispatcherHosts = "tp_dispatcher_hosts" diff --git a/utils/trends.go b/utils/trends.go index 141f08d6d..0fcb1e597 100644 --- a/utils/trends.go +++ b/utils/trends.go @@ -19,6 +19,7 @@ along with this program. If not, see package utils import ( + "encoding/base64" "maps" "math" "slices" @@ -224,6 +225,72 @@ func (tp *TrendProfile) FieldAsInterface(fldPath []string) (_ any, err error) { } } +// AsMapStringInterface converts TrendProfile struct to map[string]any +func (tp *TrendProfile) AsMapStringInterface() map[string]any { + if tp == nil { + return nil + } + return map[string]any{ + Tenant: tp.Tenant, + ID: tp.ID, + Schedule: tp.Schedule, + StatID: tp.StatID, + Metrics: tp.Metrics, + TTL: tp.TTL, + QueueLength: tp.QueueLength, + MinItems: tp.MinItems, + CorrelationType: tp.CorrelationType, + Tolerance: tp.Tolerance, + Stored: tp.Stored, + ThresholdIDs: tp.ThresholdIDs, + } +} + +// MapStringInterfaceToTrendProfile converts map[string]any to TrendProfile struct +func MapStringInterfaceToTrendProfile(m map[string]any) (*TrendProfile, error) { + tp := &TrendProfile{} + + if v, ok := m[Tenant].(string); ok { + tp.Tenant = v + } + if v, ok := m[ID].(string); ok { + tp.ID = v + } + if v, ok := m[Schedule].(string); ok { + tp.Schedule = v + } + if v, ok := m[StatID].(string); ok { + tp.StatID = v + } + tp.Metrics = InterfaceToStringSlice(m[Metrics]) + if v, ok := m[TTL].(string); ok { + if dur, err := time.ParseDuration(v); err != nil { + return nil, err + } else { + tp.TTL = dur + } + } else if v, ok := m[TTL].(float64); ok { // for -1 cases + tp.TTL = time.Duration(v) + } + if v, ok := m[QueueLength].(float64); ok { + tp.QueueLength = int(v) + } + if v, ok := m[MinItems].(float64); ok { + tp.MinItems = int(v) + } + if v, ok := m[CorrelationType].(string); ok { + tp.CorrelationType = v + } + if v, ok := m[Tolerance].(float64); ok { + tp.Tolerance = v + } + if v, ok := m[Stored].(bool); ok { + tp.Stored = v + } + tp.ThresholdIDs = InterfaceToStringSlice(m[ThresholdIDs]) + return tp, nil +} + // Trend represents a collection of metrics with trend analysis. type Trend struct { tMux sync.RWMutex @@ -540,3 +607,76 @@ func GetTrendLabel(tGrowth float64, tolerance float64) (lbl string) { } return } + +// AsMapStringInterface converts Trend struct to map[string]any +func (t *Trend) AsMapStringInterface() map[string]any { + if t == nil { + return nil + } + return map[string]any{ + Tenant: t.Tenant, + ID: t.ID, + RunTimes: t.RunTimes, + Metrics: t.Metrics, + CompressedMetrics: t.CompressedMetrics, + } +} + +// MapStringInterfaceToTrend converts map[string]any to Trend struct +func MapStringInterfaceToTrend(m map[string]any) (*Trend, error) { + t := &Trend{} + if v, ok := m[Tenant].(string); ok { + t.Tenant = v + } + if v, ok := m[ID].(string); ok { + t.ID = v + } + if v, ok := m[RunTimes].([]any); ok { + for _, rt := range v { + if timeStr, ok := rt.(string); ok { + parsedTime, err := time.Parse(time.RFC3339, timeStr) + if err != nil { + return nil, err + } + t.RunTimes = append(t.RunTimes, parsedTime) + } + } + } + if cMetrics, ok := m[CompressedMetrics].(string); ok { + var err error + if t.CompressedMetrics, err = base64.StdEncoding.DecodeString(cMetrics); err != nil { + return nil, err + } + } + if v, ok := m[Metrics].(map[string]any); ok { + t.Metrics = make(map[time.Time]map[string]*MetricWithTrend) + for timeStr, innerMap := range v { + parsedTime, err := time.Parse(time.RFC3339, timeStr) + if err != nil { + return nil, err + } + if innerMetrics, ok := innerMap.(map[string]any); ok { + t.Metrics[parsedTime] = make(map[string]*MetricWithTrend) + for metricKey, metricVal := range innerMetrics { + if metricData, ok := metricVal.(map[string]any); ok { + mwt := &MetricWithTrend{} + if id, ok := metricData[ID].(string); ok { + mwt.ID = id + } + if value, ok := metricData[Value].(float64); ok { + mwt.Value = value + } + if trendGrowth, ok := metricData[TrendGrowth].(float64); ok { + mwt.TrendGrowth = trendGrowth + } + if trendLabel, ok := metricData[TrendLabel].(string); ok { + mwt.TrendLabel = trendLabel + } + t.Metrics[parsedTime][metricKey] = mwt + } + } + } + } + } + return t, nil +}