From f356695f6fe8f150dea7b29fd555ec082ee009c4 Mon Sep 17 00:00:00 2001 From: arberkatellari Date: Tue, 11 Jun 2024 17:01:50 +0200 Subject: [PATCH] Add active sessions backup functionalitiy --- apier/v1/replicator.go | 18 ++ apier/v1/sessions.go | 5 + config/config_defaults.go | 4 +- config/configsanity.go | 3 + config/libconfig_json.go | 1 + config/sessionscfg.go | 12 + .../sbkupreplcengine1_mongo/cgrates.json | 73 +++++ .../sbkupreplcengine1_mongo_gob/cgrates.json | 73 +++++ .../sbkupreplcengine1_mysql/cgrates.json | 64 +++++ .../sbkupreplcengine1_mysql_gob/cgrates.json | 64 +++++ .../sbkupreplcengine1_postgres/cgrates.json | 66 +++++ .../cgrates.json | 66 +++++ .../sbkupreplcengine2_mongo/cgrates.json | 78 ++++++ .../sbkupreplcengine2_mongo_gob/cgrates.json | 77 ++++++ .../sbkupreplcengine2_mysql/cgrates.json | 68 +++++ .../sbkupreplcengine2_mysql_gob/cgrates.json | 67 +++++ .../cgrates.json | 56 ++++ .../cgrates.json | 61 +++++ .../cgrates.json | 75 ++++++ .../cgrates.json | 47 ++++ .../cgrates.json | 53 ++++ .../sessions_backup_mongo/cgrates.json | 61 +++++ .../sessions_backup_mysql/cgrates.json | 47 ++++ .../sessions_backup_postgres/cgrates.json | 53 ++++ engine/datadbmock.go | 12 + engine/datamanager.go | 67 +++++ engine/storage_interface.go | 3 + engine/storage_internal_datadb.go | 35 +++ engine/storage_mongo_datadb.go | 16 ++ engine/storage_redis.go | 58 ++++ engine/stored_session.go | 49 ++++ services/sessions.go | 17 +- sessions/session.go | 59 +++++ sessions/sessions.go | 249 ++++++++++++++++-- utils/consts.go | 7 + utils/errors.go | 1 + utils/reflect.go | 11 + 37 files changed, 1748 insertions(+), 28 deletions(-) create mode 100644 data/conf/samples/sbkupreplcengine1_mongo/cgrates.json create mode 100644 data/conf/samples/sbkupreplcengine1_mongo_gob/cgrates.json create mode 100644 data/conf/samples/sbkupreplcengine1_mysql/cgrates.json create mode 100644 data/conf/samples/sbkupreplcengine1_mysql_gob/cgrates.json create mode 100644 data/conf/samples/sbkupreplcengine1_postgres/cgrates.json create mode 100644 data/conf/samples/sbkupreplcengine1_postgres_gob/cgrates.json create mode 100644 data/conf/samples/sbkupreplcengine2_mongo/cgrates.json create mode 100644 data/conf/samples/sbkupreplcengine2_mongo_gob/cgrates.json create mode 100644 data/conf/samples/sbkupreplcengine2_mysql/cgrates.json create mode 100644 data/conf/samples/sbkupreplcengine2_mysql_gob/cgrates.json create mode 100644 data/conf/samples/sessions_backup_interval_internal/cgrates.json create mode 100644 data/conf/samples/sessions_backup_interval_mongo/cgrates.json create mode 100644 data/conf/samples/sessions_backup_interval_mongo2/cgrates.json create mode 100644 data/conf/samples/sessions_backup_interval_mysql/cgrates.json create mode 100644 data/conf/samples/sessions_backup_interval_postgres/cgrates.json create mode 100644 data/conf/samples/sessions_backup_mongo/cgrates.json create mode 100644 data/conf/samples/sessions_backup_mysql/cgrates.json create mode 100644 data/conf/samples/sessions_backup_postgres/cgrates.json create mode 100644 engine/stored_session.go diff --git a/apier/v1/replicator.go b/apier/v1/replicator.go index 55e7be64f..8c09a7302 100644 --- a/apier/v1/replicator.go +++ b/apier/v1/replicator.go @@ -706,6 +706,24 @@ func (rplSv1 *ReplicatorSv1) SetIndexes(ctx *context.Context, args *utils.SetInd return } +// SetBackupSessions is the replication method coresponding to the dataDB driver method +func (rplSv1 *ReplicatorSv1) SetBackupSessions(ctx *context.Context, args *engine.SetBackupSessionsArgs, reply *string) (err error) { + if err = rplSv1.dm.DataDB().SetBackupSessionsDrv(args.StoredSessions, args.NodeID, args.Tenant); err != nil { + return + } + *reply = utils.OK + return +} + +// RemoveSessionBackup is the replication method coresponding to the dataDB driver method +func (rplSv1 *ReplicatorSv1) RemoveSessionBackup(ctx *context.Context, args *engine.RemoveSessionBackupArgs, reply *string) (err error) { + if err = rplSv1.dm.DataDB().RemoveSessionsBackupDrv(args.NodeID, args.Tenant, args.CGRID); err != nil { + return + } + *reply = utils.OK + return +} + // RemoveThreshold is the replication method coresponding to the dataDb driver method func (rplSv1 *ReplicatorSv1) RemoveThreshold(ctx *context.Context, args *utils.TenantIDWithAPIOpts, reply *string) (err error) { if err = rplSv1.dm.DataDB().RemoveThresholdDrv(args.Tenant, args.ID); err != nil { diff --git a/apier/v1/sessions.go b/apier/v1/sessions.go index bcc3c7541..abaeb454b 100644 --- a/apier/v1/sessions.go +++ b/apier/v1/sessions.go @@ -171,3 +171,8 @@ func (ssv1 *SessionSv1) CapsError(ctx *context.Context, args any, reply *string) func (ssv1 *SessionSv1) RegisterInternalBiJSONConn(ctx *context.Context, args string, rply *string) (err error) { return ssv1.sS.BiRPCv1RegisterInternalBiJSONConn(ctx, args, rply) } + +// BackupActiveSessions stores all active sessions in dataDB and replies with the amount of sessions it stored +func (ssv1 *SessionSv1) BackupActiveSessions(ctx *context.Context, args string, rply *int) (err error) { + return ssv1.sS.BiRPCv1BackupActiveSessions(ctx, args, rply) +} diff --git a/config/config_defaults.go b/config/config_defaults.go index 029a0c7cc..52beb258e 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -133,6 +133,7 @@ const CGRATES_CFG_JSON = ` "*charger_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false}, "*dispatcher_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false}, "*reverse_filter_indexes" : {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false}, + "*sessions_backup": {"limit": -1, "ttl": "", "static_ttl": false, "remote":false, "replicate":false}, }, "opts":{ "redisMaxConns": 10, // the connection pool size @@ -625,7 +626,7 @@ const CGRATES_CFG_JSON = ` "session_indexes": [], // index sessions based on these fields for GetActiveSessions API "client_protocol": 2.0, // version of protocol to use when acting as JSON-PRC client <"0","1.0","2.0"> "channel_sync_interval": "0", // sync channels to detect stale sessions (0 to disable) - "stale_chan_max_extra_usage": "0", // add random usage belllow max for stale channels + "stale_chan_max_extra_usage": "0", // add random usage below max for stale channels "terminate_attempts": 5, // attempts to get the session before terminating it "alterable_fields": [], // the session fields that can be updated //"min_dur_low_balance": "5s", // threshold which will trigger low balance warnings for prepaid calls (needs to be lower than debit_interval) @@ -637,6 +638,7 @@ const CGRATES_CFG_JSON = ` "privatekey_path": "", // the path to the private key }, "scheduler_conns": [], // connections to SchedulerS in case of *dynaprepaid request + "backup_interval": "0s", // backup active sessions regularly to dataDB: "0" - disables it; "-1" - dump at shutdown; <""|$dur> }, diff --git a/config/configsanity.go b/config/configsanity.go index 8ae58a3e8..275ed91ce 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -986,6 +986,9 @@ func (cfg *CGRConfig) checkConfigSanity() error { if cfg.thresholdSCfg.Enabled && cfg.thresholdSCfg.StoreInterval != -1 { return fmt.Errorf("<%s> the StoreInterval field needs to be -1 when DataBD is *internal, received : %d", utils.ThresholdS, cfg.thresholdSCfg.StoreInterval) } + // if cfg.sessionSCfg.Enabled && cfg.sessionSCfg.BackupInterval != -1 { + // return fmt.Errorf("<%s> the BackupInterval field needs to be -1 when DataBD is *internal, received : %d", utils.SessionS, cfg.sessionSCfg.BackupInterval) + // } } for item, val := range cfg.dataDbCfg.Items { if val.Remote && len(cfg.dataDbCfg.RmtConns) == 0 { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 52f643462..48dbae87c 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -407,6 +407,7 @@ type SessionSJsonCfg struct { Scheduler_conns *[]string Stir *STIRJsonCfg Default_usage *map[string]string + Backup_interval *string } // FreeSWITCHAgent config section diff --git a/config/sessionscfg.go b/config/sessionscfg.go index 1da5fbdb3..68eff79f7 100644 --- a/config/sessionscfg.go +++ b/config/sessionscfg.go @@ -132,6 +132,7 @@ type SessionSCfg struct { SchedulerConns []string STIRCfg *STIRcfg DefaultUsage map[string]time.Duration + BackupInterval time.Duration } func (scfg *SessionSCfg) loadFromJSONCfg(jsnCfg *SessionSJsonCfg) (err error) { @@ -321,6 +322,11 @@ func (scfg *SessionSCfg) loadFromJSONCfg(jsnCfg *SessionSJsonCfg) (err error) { } } } + if jsnCfg.Backup_interval != nil { + if scfg.BackupInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Backup_interval); err != nil { + return err + } + } return scfg.STIRCfg.loadFromJSONCfg(jsnCfg.Stir) } @@ -357,6 +363,7 @@ func (scfg *SessionSCfg) AsMapInterface() (initialMP map[string]any) { utils.StaleChanMaxExtraUsageCfg: "0", utils.DebitIntervalCfg: "0", utils.SessionTTLCfg: "0", + utils.BackupIntervalCfg: "0", utils.DefaultUsageCfg: maxComputed, } if scfg.DebitInterval != 0 { @@ -476,6 +483,10 @@ func (scfg *SessionSCfg) AsMapInterface() (initialMP map[string]any) { } initialMP[utils.SchedulerConnsCfg] = schedulerConns } + + if scfg.BackupInterval != 0 { + initialMP[utils.BackupIntervalCfg] = scfg.BackupInterval.String() + } return } @@ -487,6 +498,7 @@ func (scfg SessionSCfg) Clone() (cln *SessionSCfg) { DebitInterval: scfg.DebitInterval, StoreSCosts: scfg.StoreSCosts, SessionTTL: scfg.SessionTTL, + BackupInterval: scfg.BackupInterval, ClientProtocol: scfg.ClientProtocol, ChannelSyncInterval: scfg.ChannelSyncInterval, StaleChanMaxExtraUsage: scfg.StaleChanMaxExtraUsage, diff --git a/data/conf/samples/sbkupreplcengine1_mongo/cgrates.json b/data/conf/samples/sbkupreplcengine1_mongo/cgrates.json new file mode 100644 index 000000000..69f8c0a33 --- /dev/null +++ b/data/conf/samples/sbkupreplcengine1_mongo/cgrates.json @@ -0,0 +1,73 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"BackupReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:2012", + "rpc_gob": "127.0.0.1:2013", + "http": "127.0.0.1:2080", +}, + +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "datadb", + "db_password": "", +}, + +"stor_db": { + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "stordb", + "db_password": "", +}, + +"rpc_conns": { + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:22012", "transport": "*json"}], + }, +}, + + +"rals": { + "enabled": true, +}, + +"schedulers": { + "enabled": true, +}, + +"cdrs": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*localhost"], +}, + +"sessions": { + "enabled": true, + "replication_conns": ["rplConn"], + "rals_conns": ["*localhost"], + "cdrs_conns": ["*localhost"], + "chargers_conns": ["*localhost"], + "backup_interval": "500ms", +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*localhost"], +}, + +} diff --git a/data/conf/samples/sbkupreplcengine1_mongo_gob/cgrates.json b/data/conf/samples/sbkupreplcengine1_mongo_gob/cgrates.json new file mode 100644 index 000000000..95225e883 --- /dev/null +++ b/data/conf/samples/sbkupreplcengine1_mongo_gob/cgrates.json @@ -0,0 +1,73 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"BackupReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:2012", + "rpc_gob": "127.0.0.1:2013", + "http": "127.0.0.1:2080", +}, + + +"rpc_conns": { + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:22013", "transport": "*gob"}], + }, +}, + +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "datadb", + "db_password": "", +}, + +"stor_db": { + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "stordb", + "db_password": "", +}, + +"rals": { + "enabled": true, +}, + +"schedulers": { + "enabled": true, +}, + +"cdrs": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + +"sessions": { + "enabled": true, + "replication_conns": ["rplConn"], + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "backup_interval": "500ms", +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + +} diff --git a/data/conf/samples/sbkupreplcengine1_mysql/cgrates.json b/data/conf/samples/sbkupreplcengine1_mysql/cgrates.json new file mode 100644 index 000000000..90d1f4ffb --- /dev/null +++ b/data/conf/samples/sbkupreplcengine1_mysql/cgrates.json @@ -0,0 +1,64 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"BackupReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:2012", + "rpc_gob": "127.0.0.1:2013", + "http": "127.0.0.1:2080", +}, + +"stor_db": { // database used to store offline tariff plans and CDRs + "db_password": "CGRateS.org", // password to use when connecting to stordb +}, + + +"rpc_conns": { + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:22012", "transport": "*json"}], + }, +}, + + +"rals": { + "enabled": true, +}, + +"schedulers": { + "enabled": true, +}, + +"cdrs": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + +"sessions": { + "enabled": true, + "replication_conns": ["rplConn"], + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "backup_interval": "500ms", +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + +} diff --git a/data/conf/samples/sbkupreplcengine1_mysql_gob/cgrates.json b/data/conf/samples/sbkupreplcengine1_mysql_gob/cgrates.json new file mode 100644 index 000000000..1452413cf --- /dev/null +++ b/data/conf/samples/sbkupreplcengine1_mysql_gob/cgrates.json @@ -0,0 +1,64 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"BackupReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:2012", + "rpc_gob": "127.0.0.1:2013", + "http": "127.0.0.1:2080", +}, + + +"rpc_conns": { + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:22013", "transport": "*gob"}], + }, +}, + + +"stor_db": { // database used to store offline tariff plans and CDRs + "db_password": "CGRateS.org", // password to use when connecting to stordb +}, + +"rals": { + "enabled": true, +}, + +"schedulers": { + "enabled": true, +}, + +"cdrs": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + +"sessions": { + "enabled": true, + "replication_conns": ["rplConn"], + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "backup_interval": "500ms", +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + +} diff --git a/data/conf/samples/sbkupreplcengine1_postgres/cgrates.json b/data/conf/samples/sbkupreplcengine1_postgres/cgrates.json new file mode 100644 index 000000000..b775130d9 --- /dev/null +++ b/data/conf/samples/sbkupreplcengine1_postgres/cgrates.json @@ -0,0 +1,66 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"BackupReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:2012", + "rpc_gob": "127.0.0.1:2013", + "http": "127.0.0.1:2080", +}, + +"stor_db": { + "db_type": "postgres", + "db_port": 5432, + "db_password": "CGRateS.org", +}, + + +"rpc_conns": { + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:22012", "transport": "*json"}], + }, +}, + + +"rals": { + "enabled": true, +}, + +"schedulers": { + "enabled": true, +}, + +"cdrs": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + +"sessions": { + "enabled": true, + "replication_conns": ["rplConn"], + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "backup_interval": "500ms", +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + +} diff --git a/data/conf/samples/sbkupreplcengine1_postgres_gob/cgrates.json b/data/conf/samples/sbkupreplcengine1_postgres_gob/cgrates.json new file mode 100644 index 000000000..778824ecd --- /dev/null +++ b/data/conf/samples/sbkupreplcengine1_postgres_gob/cgrates.json @@ -0,0 +1,66 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"BackupReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:2012", + "rpc_gob": "127.0.0.1:2013", + "http": "127.0.0.1:2080", +}, + + +"rpc_conns": { + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:22013", "transport": "*gob"}], + }, +}, + + +"stor_db": { + "db_type": "postgres", + "db_port": 5432, + "db_password": "CGRateS.org", +}, + +"rals": { + "enabled": true, +}, + +"schedulers": { + "enabled": true, +}, + +"cdrs": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + +"sessions": { + "enabled": true, + "replication_conns": ["rplConn"], + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "backup_interval": "500ms", +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + +} diff --git a/data/conf/samples/sbkupreplcengine2_mongo/cgrates.json b/data/conf/samples/sbkupreplcengine2_mongo/cgrates.json new file mode 100644 index 000000000..ea528a99b --- /dev/null +++ b/data/conf/samples/sbkupreplcengine2_mongo/cgrates.json @@ -0,0 +1,78 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"BackupReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:22012", // RPC JSON listening address + "rpc_gob": "127.0.0.1:22013", // RPC GOB listening address + "http": "127.0.0.1:22080", // HTTP listening address +}, + +"rpc_conns": { + "conn1": { + "strategy": "*first", + "conns": [{"address": "127.0.0.1:22012", "transport":"*json"}], + }, + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:2012", "transport": "*json"}], + } +}, + + +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "datadb", + "db_password": "", +}, + +"stor_db": { + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "stordb", + "db_password": "", +}, + +"rals": { + "enabled": true, // enable Rater service: +}, + +"cdrs": { + "enabled": true, // start the CDR Server service: +}, + +"schedulers": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + +"sessions": { + "enabled": true, // starts SessionManager service: + "listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests + "replication_conns": ["rplConn"], + "rals_conns": ["conn1"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "backup_interval": "500ms", +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + +} diff --git a/data/conf/samples/sbkupreplcengine2_mongo_gob/cgrates.json b/data/conf/samples/sbkupreplcengine2_mongo_gob/cgrates.json new file mode 100644 index 000000000..f3f3c7991 --- /dev/null +++ b/data/conf/samples/sbkupreplcengine2_mongo_gob/cgrates.json @@ -0,0 +1,77 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"BackupReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:22012", // RPC JSON listening address + "rpc_gob": "127.0.0.1:22013", // RPC GOB listening address + "http": "127.0.0.1:22080", // HTTP listening address +}, + +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "datadb", + "db_password": "", +}, + +"stor_db": { + "db_type": "mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "stordb", + "db_password": "", +}, + +"rals": { + "enabled": true, // enable Rater service: +}, + +"cdrs": { + "enabled": true, // start the CDR Server service: +}, + +"schedulers": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + +"rpc_conns": { + "conn1": { + "strategy": "*first", + "conns": [{"address": "127.0.0.1:22013", "transport":"*gob"}], + }, + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:2013", "transport": "*gob"}], + } +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + +"sessions": { + "enabled": true, // starts SessionManager service: + "listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests + "replication_conns": ["rplConn"], + "rals_conns": ["conn1"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "backup_interval": "500ms", +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + +} diff --git a/data/conf/samples/sbkupreplcengine2_mysql/cgrates.json b/data/conf/samples/sbkupreplcengine2_mysql/cgrates.json new file mode 100644 index 000000000..65773d598 --- /dev/null +++ b/data/conf/samples/sbkupreplcengine2_mysql/cgrates.json @@ -0,0 +1,68 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"BackupReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:22012", // RPC JSON listening address + "rpc_gob": "127.0.0.1:22013", // RPC GOB listening address + "http": "127.0.0.1:22080", // HTTP listening address +}, + +"rpc_conns": { + "conn1": { + "strategy": "*first", + "conns": [{"address": "127.0.0.1:22012", "transport":"*json"}], + }, + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:2012", "transport": "*json"}], + } +}, + + +"stor_db": { // database used to store offline tariff plans and CDRs + "db_password": "CGRateS.org", // password to use when connecting to stordb +}, + +"rals": { + "enabled": true, // enable Rater service: +}, + +"cdrs": { + "enabled": true, // start the CDR Server service: +}, + +"schedulers": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + +"sessions": { + "enabled": true, // starts SessionManager service: + "listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests + "replication_conns": ["rplConn"], + "rals_conns": ["conn1"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "backup_interval": "500ms", +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + +} diff --git a/data/conf/samples/sbkupreplcengine2_mysql_gob/cgrates.json b/data/conf/samples/sbkupreplcengine2_mysql_gob/cgrates.json new file mode 100644 index 000000000..eda0d9086 --- /dev/null +++ b/data/conf/samples/sbkupreplcengine2_mysql_gob/cgrates.json @@ -0,0 +1,67 @@ +{ +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +"general": { + "log_level": 7, + "node_id":"BackupReplication", +}, + +"listen": { + "rpc_json": "127.0.0.1:22012", // RPC JSON listening address + "rpc_gob": "127.0.0.1:22013", // RPC GOB listening address + "http": "127.0.0.1:22080", // HTTP listening address +}, + +"stor_db": { // database used to store offline tariff plans and CDRs + "db_password": "CGRateS.org", // password to use when connecting to stordb +}, + +"rals": { + "enabled": true, // enable Rater service: +}, + +"cdrs": { + "enabled": true, // start the CDR Server service: +}, + +"schedulers": { + "enabled": true, +}, + +"attributes": { + "enabled": true, +}, + +"rpc_conns": { + "conn1": { + "strategy": "*first", + "conns": [{"address": "127.0.0.1:22013", "transport":"*gob"}], + }, + "rplConn": { + "strategy": "*broadcast_sync", + "conns": [{"address": "127.0.0.1:2013", "transport": "*gob"}], + } +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + +"sessions": { + "enabled": true, // starts SessionManager service: + "listen_bijson": "127.0.0.1:22014", // address where to listen for bidirectional JSON-RPC requests + "replication_conns": ["rplConn"], + "rals_conns": ["conn1"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "backup_interval": "500ms", +}, + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + +} diff --git a/data/conf/samples/sessions_backup_interval_internal/cgrates.json b/data/conf/samples/sessions_backup_interval_internal/cgrates.json new file mode 100644 index 000000000..163321e36 --- /dev/null +++ b/data/conf/samples/sessions_backup_interval_internal/cgrates.json @@ -0,0 +1,56 @@ +{ + // Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments + // Copyright (C) ITsysCOM GmbH + + "general": { + "log_level": 7, + "node_id":"BackupSessionsIntervalNode", + }, + + "schedulers": { + "enabled": true, + }, + + "data_db": { + "db_type": "internal", + }, + + + "stor_db": { + "db_type": "internal", + }, + + "rals": { + "enabled": true, + }, + + "cdrs": { + "enabled": true, + }, + + "chargers": { + "enabled": true, + "attributes_conns": ["*internal"], + }, + + "sessions": { + "enabled": true, + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "default_usage":{ + "*voice":"1h" + }, + "backup_interval": "500ms", + }, + + "attributes": { + "enabled": true, + }, + + "apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], + } +} + \ No newline at end of file diff --git a/data/conf/samples/sessions_backup_interval_mongo/cgrates.json b/data/conf/samples/sessions_backup_interval_mongo/cgrates.json new file mode 100644 index 000000000..4542f4c3f --- /dev/null +++ b/data/conf/samples/sessions_backup_interval_mongo/cgrates.json @@ -0,0 +1,61 @@ +{ + // Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments + // Copyright (C) ITsysCOM GmbH + + "general": { + "log_level": 7, + "node_id":"BackupSessionsIntervalNode", + }, + + "data_db": { + "db_type": "mongo", + "db_name": "10", + "db_port": 27017, + }, + + + "stor_db": { + "db_type": "mongo", + "db_name": "cgrates", + "db_port": 27017, + "db_password": "", + }, + + "schedulers": { + "enabled": true, + }, + + "rals": { + "enabled": true, + }, + + "cdrs": { + "enabled": true, + }, + + "chargers": { + "enabled": true, + "attributes_conns": ["*internal"], + }, + + "sessions": { + "enabled": true, + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "default_usage":{ + "*voice":"1h" + }, + "backup_interval": "500ms", + }, + + "attributes": { + "enabled": true, + }, + + "apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], + } +} + \ No newline at end of file diff --git a/data/conf/samples/sessions_backup_interval_mongo2/cgrates.json b/data/conf/samples/sessions_backup_interval_mongo2/cgrates.json new file mode 100644 index 000000000..f87825849 --- /dev/null +++ b/data/conf/samples/sessions_backup_interval_mongo2/cgrates.json @@ -0,0 +1,75 @@ +{ + // Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments + // Copyright (C) ITsysCOM GmbH + + "general": { + "log_level": 7, + "node_id":"BackupSessionsIntervalNode2", + }, + + "listen": { + "rpc_json": "127.0.0.1:22012", // RPC JSON listening address + "rpc_gob": "127.0.0.1:22013", // RPC GOB listening address + "http": "127.0.0.1:22080", // HTTP listening address + }, + + "rpc_conns": { + "conn1": { + "strategy": "*first", + "conns": [{"address": "127.0.0.1:22012", "transport":"*json"}], + }, + }, + + "data_db": { + "db_type": "mongo", + "db_name": "10", + "db_port": 27017, + }, + + + "stor_db": { + "db_type": "mongo", + "db_name": "cgrates", + "db_port": 27017, + "db_password": "", + }, + + "schedulers": { + "enabled": true, + }, + + "rals": { + "enabled": true, + }, + + "cdrs": { + "enabled": true, + }, + + "chargers": { + "enabled": true, + "attributes_conns": ["*internal"], + }, + + "sessions": { + "enabled": true, + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "listen_bijson": "127.0.0.1:22014", + "chargers_conns": ["*internal"], + "default_usage":{ + "*voice":"1h" + }, + "backup_interval": "500ms", + }, + + "attributes": { + "enabled": true, + }, + + "apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], + } +} + \ No newline at end of file diff --git a/data/conf/samples/sessions_backup_interval_mysql/cgrates.json b/data/conf/samples/sessions_backup_interval_mysql/cgrates.json new file mode 100644 index 000000000..36159e2c6 --- /dev/null +++ b/data/conf/samples/sessions_backup_interval_mysql/cgrates.json @@ -0,0 +1,47 @@ +{ + // Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments + // Copyright (C) ITsysCOM GmbH + + "general": { + "log_level": 7, + "node_id":"BackupSessionsIntervalNode", + }, + + "schedulers": { + "enabled": true, + }, + + "rals": { + "enabled": true, + }, + + "cdrs": { + "enabled": true, + }, + + "chargers": { + "enabled": true, + "attributes_conns": ["*internal"], + }, + + "sessions": { + "enabled": true, + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "default_usage":{ + "*voice":"1h" + }, + "backup_interval": "500ms", + }, + + "attributes": { + "enabled": true, + }, + + "apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], + } +} + \ No newline at end of file diff --git a/data/conf/samples/sessions_backup_interval_postgres/cgrates.json b/data/conf/samples/sessions_backup_interval_postgres/cgrates.json new file mode 100644 index 000000000..58959a6ee --- /dev/null +++ b/data/conf/samples/sessions_backup_interval_postgres/cgrates.json @@ -0,0 +1,53 @@ +{ + // Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments + // Copyright (C) ITsysCOM GmbH + + "general": { + "log_level": 7, + "node_id":"BackupSessionsIntervalNode", + }, + + "stor_db": { + "db_type": "postgres", + "db_port": 5432, + "db_password": "CGRateS.org", + }, + + "schedulers": { + "enabled": true, + }, + + "rals": { + "enabled": true, + }, + + "cdrs": { + "enabled": true, + }, + + "chargers": { + "enabled": true, + "attributes_conns": ["*localhost"], + }, + + "sessions": { + "enabled": true, + "rals_conns": ["*localhost"], + "cdrs_conns": ["*localhost"], + "chargers_conns": ["*localhost"], + "default_usage":{ + "*voice":"1h" + }, + "backup_interval": "500ms", + }, + + "attributes": { + "enabled": true, + }, + + "apiers": { + "enabled": true, + "scheduler_conns": ["*localhost"], + } +} + \ No newline at end of file diff --git a/data/conf/samples/sessions_backup_mongo/cgrates.json b/data/conf/samples/sessions_backup_mongo/cgrates.json new file mode 100644 index 000000000..32b00213d --- /dev/null +++ b/data/conf/samples/sessions_backup_mongo/cgrates.json @@ -0,0 +1,61 @@ +{ + // Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments + // Copyright (C) ITsysCOM GmbH + + "general": { + "log_level": 7, + "node_id":"BackupSessionsNode", + }, + + "data_db": { + "db_type": "mongo", + "db_name": "10", + "db_port": 27017, + }, + + + "stor_db": { + "db_type": "mongo", + "db_name": "cgrates", + "db_port": 27017, + "db_password": "", + }, + + "schedulers": { + "enabled": true, + }, + + "rals": { + "enabled": true, + }, + + "cdrs": { + "enabled": true, + }, + + "chargers": { + "enabled": true, + "attributes_conns": ["*internal"], + }, + + "sessions": { + "enabled": true, + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "default_usage":{ + "*voice":"4s" + }, + "backup_interval": "-1", + }, + + "attributes": { + "enabled": true, + }, + + "apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], + } +} + \ No newline at end of file diff --git a/data/conf/samples/sessions_backup_mysql/cgrates.json b/data/conf/samples/sessions_backup_mysql/cgrates.json new file mode 100644 index 000000000..1b095c88e --- /dev/null +++ b/data/conf/samples/sessions_backup_mysql/cgrates.json @@ -0,0 +1,47 @@ +{ + // Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments + // Copyright (C) ITsysCOM GmbH + + "general": { + "log_level": 7, + "node_id":"BackupSessionsNode", + }, + + "schedulers": { + "enabled": true, + }, + + "rals": { + "enabled": true, + }, + + "cdrs": { + "enabled": true, + }, + + "chargers": { + "enabled": true, + "attributes_conns": ["*localhost"], + }, + + "sessions": { + "enabled": true, + "rals_conns": ["*localhost"], + "cdrs_conns": ["*localhost"], + "chargers_conns": ["*localhost"], + "default_usage":{ + "*voice":"4s" + }, + "backup_interval": "-1", + }, + + "attributes": { + "enabled": true, + }, + + "apiers": { + "enabled": true, + "scheduler_conns": ["*localhost"], + } +} + \ No newline at end of file diff --git a/data/conf/samples/sessions_backup_postgres/cgrates.json b/data/conf/samples/sessions_backup_postgres/cgrates.json new file mode 100644 index 000000000..688322235 --- /dev/null +++ b/data/conf/samples/sessions_backup_postgres/cgrates.json @@ -0,0 +1,53 @@ +{ + // Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments + // Copyright (C) ITsysCOM GmbH + + "general": { + "log_level": 7, + "node_id":"BackupSessionsNode", + }, + + "stor_db": { + "db_type": "postgres", + "db_port": 5432, + "db_password": "CGRateS.org", + }, + + "schedulers": { + "enabled": true, + }, + + "rals": { + "enabled": true, + }, + + "cdrs": { + "enabled": true, + }, + + "chargers": { + "enabled": true, + "attributes_conns": ["*localhost"], + }, + + "sessions": { + "enabled": true, + "rals_conns": ["*localhost"], + "cdrs_conns": ["*localhost"], + "chargers_conns": ["*localhost"], + "default_usage":{ + "*voice":"4s" + }, + "backup_interval": "-1", + }, + + "attributes": { + "enabled": true, + }, + + "apiers": { + "enabled": true, + "scheduler_conns": ["*localhost"], + } +} + \ No newline at end of file diff --git a/engine/datadbmock.go b/engine/datadbmock.go index 5c9637210..9785410c2 100644 --- a/engine/datadbmock.go +++ b/engine/datadbmock.go @@ -482,3 +482,15 @@ func (dbM *DataDBMock) SetVersions(vrs Versions, overwrite bool) (err error) { func (dbM *DataDBMock) RemoveRatingProfileDrv(string) error { return utils.ErrNotImplemented } + +func (dbM *DataDBMock) SetBackupSessionsDrv(storedSessions []*StoredSession, nodeID string, tnt string) error { + return utils.ErrNotImplemented +} + +func (dbM *DataDBMock) GetSessionsBackupDrv(nodeID string, tnt string) ([]*StoredSession, error) { + return nil, utils.ErrNotImplemented +} + +func (dbM *DataDBMock) RemoveSessionsBackupDrv(nodeID, tnt, cgrid string) error { + return utils.ErrNotImplemented +} diff --git a/engine/datamanager.go b/engine/datamanager.go index b2c9db9ee..4da34c123 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -3263,3 +3263,70 @@ func (dm *DataManager) checkFilters(tenant string, ids []string) (err error) { } return } + +// GetSessionsBackup gets sessions from dataDB backup +func (dm *DataManager) GetSessionsBackup(nodeID, tenant string) ([]*StoredSession, error) { + if dm == nil { + return nil, utils.ErrNoDatabaseConn + } + return dm.dataDB.GetSessionsBackupDrv(nodeID, tenant) +} + +type SetBackupSessionsArgs struct { + StoredSessions []*StoredSession // all active sessions ready for backup + NodeID string // used as part of filter of DataDB query + Tenant string // used as part of filter of DataDB query +} + +// SetBackupSessions stores the active sessions in dataDB +func (dm *DataManager) SetBackupSessions(nodeID, tenant string, + storedSessions []*StoredSession) (err error) { + if dm == nil { + return utils.ErrNoDatabaseConn + } + if err = dm.dataDB.SetBackupSessionsDrv(storedSessions, nodeID, tenant); err != nil { + return + } + + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSessionsBackup]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.SessionsBackupPrefix, utils.ConcatenatedKey(tenant, nodeID), + utils.ReplicatorSv1SetBackupSessions, + &SetBackupSessionsArgs{ + StoredSessions: storedSessions, + NodeID: nodeID, + Tenant: tenant, + }) + } + return +} + +type RemoveSessionBackupArgs struct { + Tenant string // used as part of filter of DataDB query + NodeID string // used as part of filter of DataDB query + CGRID string // used as part of filter of DataDB query +} + +// RemoveSessionsBackup remove one or all sessions from dataDB backup +func (dm *DataManager) RemoveSessionsBackup(nodeID, tenant, cgrid string) (err error) { + if dm == nil { + return utils.ErrNoDatabaseConn + } + if err = dm.dataDB.RemoveSessionsBackupDrv(nodeID, tenant, cgrid); err != nil { + return + } + + if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSessionsBackup]; itm.Replicate { + err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, + config.CgrConfig().DataDbCfg().RplFiltered, + utils.SessionsBackupPrefix, utils.ConcatenatedKey(tenant, nodeID), + utils.ReplicatorSv1RemoveSessionBackup, + &RemoveSessionBackupArgs{ + CGRID: cgrid, + NodeID: nodeID, + Tenant: tenant, + }) + } + return +} diff --git a/engine/storage_interface.go b/engine/storage_interface.go index 7e981f343..0810199cf 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -128,6 +128,9 @@ type DataDB interface { GetDispatcherHostDrv(string, string) (*DispatcherHost, error) SetDispatcherHostDrv(*DispatcherHost) error RemoveDispatcherHostDrv(string, string) error + SetBackupSessionsDrv(sessions []*StoredSession, nodeID string, tenant string) error + GetSessionsBackupDrv(nodeID string, tenant string) ([]*StoredSession, error) + RemoveSessionsBackupDrv(nodeID, tenant, cgrid string) error } type StorDB interface { diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index a3e2825a5..32ad7a75d 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -854,3 +854,38 @@ func (iDB *InternalDB) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err iDB.db.Remove(idxItmType, utils.ConcatenatedKey(tntCtx, idxKey), true, utils.NonTransactional) return } + +// Will backup active sessions in DataDB +func (iDB *InternalDB) SetBackupSessionsDrv(storedSessions []*StoredSession, nodeID string, + tnt string) error { + for _, sess := range storedSessions { + iDB.db.Set(utils.CacheSessionsBackup, sess.CGRID, sess, + []string{utils.ConcatenatedKey(tnt, nodeID)}, true, utils.NonTransactional) + } + return nil +} + +// Will restore sessions that were active from dataDB backup +func (iDB *InternalDB) GetSessionsBackupDrv(nodeID, tnt string) ([]*StoredSession, error) { + var storedSessions []*StoredSession + for _, sessIface := range iDB.db.GetGroupItems(utils.CacheSessionsBackup, utils.ConcatenatedKey(tnt, + nodeID)) { + sess := sessIface.(*StoredSession) + storedSessions = append(storedSessions, sess) + } + if len(storedSessions) == 0 { + return nil, utils.ErrNoBackupFound + } + return storedSessions, nil +} + +// Will remove one or all sessions from dataDB backup +func (iDB *InternalDB) RemoveSessionsBackupDrv(nodeID, tnt, cgrid string) error { + if cgrid == utils.EmptyString { + iDB.db.RemoveGroup(utils.CacheSessionsBackup, utils.ConcatenatedKey(tnt, + nodeID), true, utils.NonTransactional) + return nil + } + iDB.db.Remove(utils.CacheSessionsBackup, cgrid, true, utils.NonTransactional) + return nil +} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 4ce574419..09415e780 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -75,6 +75,7 @@ const ( ColDpp = "dispatcher_profiles" ColDph = "dispatcher_hosts" ColLID = "load_ids" + ColBkup = "sessions_backup" ) var ( @@ -1956,3 +1957,18 @@ func (ms *MongoStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) erro return err }) } + +// Will backup active sessions in DataDB +func (ms *MongoStorage) SetBackupSessionsDrv(storedSessions []*StoredSession, nodeID, tnt string) error { + return utils.ErrNotImplemented +} + +// Will restore sessions that were active from dataDB backup +func (ms *MongoStorage) GetSessionsBackupDrv(nodeID, tnt string) ([]*StoredSession, error) { + return nil, utils.ErrNotImplemented +} + +// Will remove one or all sessions from dataDB Backup +func (ms *MongoStorage) RemoveSessionsBackupDrv(nodeID, tnt, cgrid string) error { + return utils.ErrNotImplemented +} diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 623841bb2..c7508e2cc 100644 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -66,6 +66,7 @@ const ( redis_HGET = "HGET" redis_RENAME = "RENAME" redis_HMSET = "HMSET" + redis_HSET = "HSET" redisLoadError = "Redis is loading the dataset in memory" RedisLimit = 524287 // https://github.com/StackExchange/StackExchange.Redis/issues/201#issuecomment-98639005 @@ -1246,3 +1247,60 @@ func (rs *RedisStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) (err } return rs.Cmd(nil, redis_HDEL, utils.CacheInstanceToPrefix[idxItmType]+tntCtx, idxKey) } + +// Converts time.Time values inside EventStart and SRuns Events, to string type values. Used before marshaling StoredSessions with msgpack +func StoredSessionEvTimeAsStr(sess *StoredSession) { + utils.MapIfaceTimeAsString(sess.EventStart) + for i := range sess.SRuns { + utils.MapIfaceTimeAsString(sess.SRuns[i].Event) + } +} + +// Will backup active sessions in DataDB +func (rs *RedisStorage) SetBackupSessionsDrv(storedSessions []*StoredSession, nodeID string, + tnt string) (err error) { + mp := make(map[string]string) + for _, sess := range storedSessions { + StoredSessionEvTimeAsStr(sess) + var sessByte []byte + if sessByte, err = rs.ms.Marshal(sess); err != nil { + return + } + mp[sess.CGRID] = string(sessByte) + if len(mp) == RedisLimit { + if err = rs.FlatCmd(nil, redis_HMSET, utils.SessionsBackupPrefix+utils.ConcatenatedKey(tnt, + nodeID), mp); err != nil { + return + } + mp = make(map[string]string) + } + } + return rs.FlatCmd(nil, redis_HMSET, utils.SessionsBackupPrefix+utils.ConcatenatedKey(tnt, nodeID), mp) +} + +// Will restore sessions that were active from dataDB backup +func (rs *RedisStorage) GetSessionsBackupDrv(nodeID, tnt string) (r []*StoredSession, err error) { + mp := make(map[string]string) + if err = rs.Cmd(&mp, redis_HGETALL, utils.SessionsBackupPrefix+utils.ConcatenatedKey(tnt, + nodeID)); err != nil { + return + } else if len(mp) == 0 { + return nil, utils.ErrNoBackupFound + } + for _, v := range mp { + var ss *StoredSession + if err = rs.ms.Unmarshal([]byte(v), &ss); err != nil { + return + } + r = append(r, ss) + } + return +} + +// Will remove one or all sessions from dataDB backup +func (rs *RedisStorage) RemoveSessionsBackupDrv(nodeID, tnt, cgrid string) error { + if cgrid == utils.EmptyString { + return rs.Cmd(nil, redis_DEL, utils.SessionsBackupPrefix+utils.ConcatenatedKey(tnt, nodeID)) + } + return rs.Cmd(nil, redis_HDEL, utils.SessionsBackupPrefix+utils.ConcatenatedKey(tnt, nodeID), cgrid) +} diff --git a/engine/stored_session.go b/engine/stored_session.go new file mode 100644 index 000000000..1617fc382 --- /dev/null +++ b/engine/stored_session.go @@ -0,0 +1,49 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "time" +) + +// used to evade import cycle of the real sessions.SRun struct +type StoredSRun struct { + Event MapEvent // Event received from ChargerS + CD *CallDescriptor // initial CD used for debits, updated on each debit + EventCost *EventCost + + ExtraDuration time.Duration // keeps the current duration debited on top of what has been asked + LastUsage time.Duration // last requested Duration + TotalUsage time.Duration // sum of lastUsage + NextAutoDebit *time.Time +} + +// Holds a Session for storing in DataDB +type StoredSession struct { + CGRID string + Tenant string + ResourceID string + ClientConnID string // connection ID towards the client so we can recover from passive + EventStart MapEvent // Event which started the session + DebitInterval time.Duration // execute debits for *prepaid runs + Chargeable bool // used in case of pausing debit + SRuns []*StoredSRun // forked based on ChargerS + OptsStart MapEvent + UpdatedAt time.Time // time when session was changed +} diff --git a/services/sessions.go b/services/sessions.go index ede17c978..449ca9ba7 100644 --- a/services/sessions.go +++ b/services/sessions.go @@ -75,9 +75,9 @@ func (smg *SessionService) Start() error { if smg.IsRunning() { return utils.ErrServiceAlreadyRunning } - + smg.srvDep[utils.DataDB].Add(1) // DataDB will wait for session service to close before closing var datadb *engine.DataManager - if smg.dm.IsRunning() { + if smg.dm.ShouldRun() { dbchan := smg.dm.GetDMChan() datadb = <-dbchan dbchan <- datadb @@ -86,9 +86,17 @@ func (smg *SessionService) Start() error { defer smg.Unlock() smg.sm = sessions.NewSessionS(smg.cfg, datadb, smg.connMgr) - //start sync session in a separate gorutine smg.stopChan = make(chan struct{}) - go smg.sm.ListenAndServe(smg.stopChan) + + // Restore previuos sessions backup and start backup looping + if smg.cfg.SessionSCfg().BackupInterval != 0 { + if err := smg.sm.RestoreAndBackupSessions(smg.stopChan); err != nil { + return err + } + } + + //start sync session in a separate gorutine + go smg.sm.SyncSessions(smg.stopChan) // Pass internal connection srv, err := engine.NewServiceWithName(v1.NewSessionSv1(smg.sm), "", false) if err != nil { @@ -143,6 +151,7 @@ func (smg *SessionService) Reload() (err error) { // Shutdown stops the service func (smg *SessionService) Shutdown() (err error) { + defer smg.srvDep[utils.DataDB].Done() // signal DataDB when session service finishes shutting down smg.Lock() defer smg.Unlock() close(smg.stopChan) diff --git a/sessions/session.go b/sessions/session.go index 90452b64d..23ea09cca 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -79,6 +79,7 @@ type Session struct { Chargeable bool // used in case of pausing debit SRuns []*SRun // forked based on ChargerS OptsStart engine.MapEvent + UpdatedAt time.Time // time when session was changed debitStop chan struct{} sTerminator *sTerminator // automatic timeout for the session @@ -132,6 +133,64 @@ func (s *Session) Clone() (cln *Session) { return } +// asStoredSession converts a Session to a StoredSession to be stored later in DataDB +func (s *Session) asStoredSession() *engine.StoredSession { + storedSRuns := make([]*engine.StoredSRun, len(s.SRuns)) + for i, sRun := range s.SRuns { + storedSRuns[i] = &engine.StoredSRun{ + Event: sRun.Event, + CD: sRun.CD, + EventCost: sRun.EventCost, + ExtraDuration: sRun.ExtraDuration, + LastUsage: sRun.LastUsage, + TotalUsage: sRun.TotalUsage, + NextAutoDebit: sRun.NextAutoDebit, + } + } + + return &engine.StoredSession{ + CGRID: s.CGRID, + Tenant: s.Tenant, + ResourceID: s.ResourceID, + ClientConnID: s.ClientConnID, + EventStart: s.EventStart, + DebitInterval: s.DebitInterval, + Chargeable: s.Chargeable, + SRuns: storedSRuns, + OptsStart: s.OptsStart, + UpdatedAt: s.UpdatedAt, + } +} + +// newSessionFromStoredSession converts a StoredSession to a Session to be restored and activated later on +func newSessionFromStoredSession(s *engine.StoredSession) *Session { + storedSRuns := make([]*SRun, len(s.SRuns)) + for i, sRun := range s.SRuns { + storedSRuns[i] = &SRun{ + Event: sRun.Event, + CD: sRun.CD, + EventCost: sRun.EventCost, + ExtraDuration: sRun.ExtraDuration, + LastUsage: sRun.LastUsage, + TotalUsage: sRun.TotalUsage, + NextAutoDebit: sRun.NextAutoDebit, + } + } + + return &Session{ + CGRID: s.CGRID, + Tenant: s.Tenant, + ResourceID: s.ResourceID, + ClientConnID: s.ClientConnID, + EventStart: s.EventStart, + DebitInterval: s.DebitInterval, + Chargeable: s.Chargeable, + SRuns: storedSRuns, + OptsStart: s.OptsStart, + UpdatedAt: s.UpdatedAt, + } +} + // AsExternalSessions returns the session as a list of ExternalSession using all SRuns (thread safe) func (s *Session) AsExternalSessions(tmz, nodeID string) (aSs []*ExternalSession) { s.RLock() diff --git a/sessions/sessions.go b/sessions/sessions.go index 056e80dbe..17b718a22 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -44,17 +44,19 @@ func NewSessionS(cgrCfg *config.CGRConfig, cgrCfg.SessionSCfg().SessionIndexes.Add(utils.OriginID) // Make sure we have indexing for OriginID since it is a requirement on prefix searching return &SessionS{ - cgrCfg: cgrCfg, - dm: dm, - connMgr: connMgr, - biJClnts: make(map[birpc.ClientConnector]string), - biJIDs: make(map[string]*biJClient), - aSessions: make(map[string]*Session), - aSessionsIdx: make(map[string]map[string]map[string]utils.StringSet), - aSessionsRIdx: make(map[string][]*riFieldNameVal), - pSessions: make(map[string]*Session), - pSessionsIdx: make(map[string]map[string]map[string]utils.StringSet), - pSessionsRIdx: make(map[string][]*riFieldNameVal), + cgrCfg: cgrCfg, + dm: dm, + connMgr: connMgr, + biJClnts: make(map[birpc.ClientConnector]string), + biJIDs: make(map[string]*biJClient), + aSessions: make(map[string]*Session), + aSessionsIdx: make(map[string]map[string]map[string]utils.StringSet), + aSessionsRIdx: make(map[string][]*riFieldNameVal), + pSessions: make(map[string]*Session), + pSessionsIdx: make(map[string]map[string]map[string]utils.StringSet), + pSessionsRIdx: make(map[string][]*riFieldNameVal), + markedSsCGRIDs: make(utils.StringSet), + removeSsCGRIDs: make(utils.StringSet), } } @@ -87,10 +89,16 @@ type SessionS struct { pSIMux sync.RWMutex // protects pSessionsIdx pSessionsIdx map[string]map[string]map[string]utils.StringSet // map[fieldName]map[fieldValue][cgrID]utils.StringSet[runID]sID pSessionsRIdx map[string][]*riFieldNameVal // reverse indexes for passive sessions, used on remove + + markedSsCGRIDs utils.StringSet // keep a record of session cgrids to be stored in dataDB backup + markedSsCGRIDsMux sync.RWMutex // prevent concurrency when adding/deleting CGRIDs from map + removeSsCGRIDs utils.StringSet // keep a record of session cgrids to be removed from dataDB backup + removeSsCGRIDsMux sync.RWMutex // prevent concurrency when adding/deleting CGRIDs from map + storeSessMux sync.RWMutex // protects storeSessions } -// ListenAndServe starts the service and binds it to the listen loop -func (sS *SessionS) ListenAndServe(stopChan chan struct{}) { +// SyncSessions starts the service and binds it to the listen loop +func (sS *SessionS) SyncSessions(stopChan chan struct{}) { utils.Logger.Info(fmt.Sprintf("<%s> starting <%s> subsystem", utils.CoreS, utils.SessionS)) if sS.cgrCfg.SessionSCfg().ChannelSyncInterval != 0 { for { // Schedule sync channels to run repeately @@ -106,15 +114,9 @@ func (sS *SessionS) ListenAndServe(stopChan chan struct{}) { // Shutdown is called by engine to clear states func (sS *SessionS) Shutdown() (err error) { - if len(sS.cgrCfg.SessionSCfg().ReplicationConns) == 0 { - var hasErr bool - for _, s := range sS.getSessions("", false) { // Force sessions shutdown - if err = sS.terminateSession(s, nil, nil, nil, false); err != nil { - hasErr = true - } - } - if hasErr { - return utils.ErrPartiallyExecuted + if sS.cgrCfg.SessionSCfg().BackupInterval != 0 { + if _, err := sS.storeSessions(); err != nil { + utils.Logger.Err(fmt.Sprintf("Backup Sessions error on shutdown: <%v>", err)) } } return @@ -661,6 +663,14 @@ func (sS *SessionS) debitLoopSession(s *Session, sRunIdx int, } return } + s.Lock() + s.UpdatedAt = time.Now() + s.Unlock() + if sS.cgrCfg.SessionSCfg().BackupInterval > 0 { + sS.markedSsCGRIDsMux.Lock() + sS.markedSsCGRIDs.Add(s.CGRID) + sS.markedSsCGRIDsMux.Unlock() + } select { case <-debitStop: return @@ -726,6 +736,12 @@ func (sS *SessionS) refundSession(s *Session, sRunIdx int, rUsage time.Duration) acntSummary.UpdateInitialValue(sr.EventCost.AccountSummary) sr.EventCost.AccountSummary = acntSummary } + s.UpdatedAt = time.Now() + if sS.cgrCfg.SessionSCfg().BackupInterval > 0 { + sS.markedSsCGRIDsMux.Lock() + sS.markedSsCGRIDs.Add(s.CGRID) + sS.markedSsCGRIDsMux.Unlock() + } return } @@ -908,6 +924,7 @@ func (sS *SessionS) registerSession(s *Session, passive bool) { sMp = sS.pSessions } sMux.Lock() + s.UpdatedAt = time.Now() sMp[s.CGRID] = s sMux.Unlock() sS.indexSession(s, passive) @@ -936,6 +953,14 @@ func (sS *SessionS) unregisterSession(cgrID string, passive bool) bool { sMux = &sS.pSsMux sMp = sS.pSessions } + if !passive && sS.cgrCfg.SessionSCfg().BackupInterval > 0 { + sS.markedSsCGRIDsMux.Lock() + delete(sS.markedSsCGRIDs, cgrID) // in case not yet in backup, dont needlessly store session + sS.markedSsCGRIDsMux.Unlock() + sS.removeSsCGRIDsMux.Lock() + sS.removeSsCGRIDs.Add(cgrID) + sS.removeSsCGRIDsMux.Unlock() + } sMux.Lock() if _, has := sMp[cgrID]; !has { sMux.Unlock() @@ -1385,7 +1410,13 @@ func (sS *SessionS) getActivateSession(cgrID string) (s *Session) { if len(ss) != 0 { return ss[0] } - return sS.transitSState(cgrID, false) + s = sS.transitSState(cgrID, false) + if len(ss) != 0 && sS.cgrCfg.SessionSCfg().BackupInterval > 0 { + sS.markedSsCGRIDsMux.Lock() + sS.markedSsCGRIDs.Add(s.CGRID) + sS.markedSsCGRIDsMux.Unlock() + } + return } // relocateSession will change the CGRID of a session (ie: prefix based session group) @@ -1411,6 +1442,11 @@ func (sS *SessionS) relocateSession(initOriginID, originID, originHost string) ( } s.Unlock() sS.registerSession(s, false) + if sS.cgrCfg.SessionSCfg().BackupInterval > 0 { + sS.markedSsCGRIDsMux.Lock() + sS.markedSsCGRIDs.Add(s.CGRID) + sS.markedSsCGRIDsMux.Unlock() + } sS.replicateSessions(initCGRID, false, sS.cgrCfg.SessionSCfg().ReplicationConns) return } @@ -1554,6 +1590,26 @@ func (sS *SessionS) authEvent(cgrEv *utils.CGREvent, forceDuration bool) (usage return } +// restoreSessions reinitiates sessions stored on dataDB backup +// no session protection needed since it runs only once at start of service, +// before the start modifying/creating sessions +func (sS *SessionS) restoreSessions(sessions []*Session) { + for _, s := range sessions { + tor, _ := s.EventStart[utils.ToR].(string) + if tor == utils.EmptyString { + tor = utils.MetaVoice + } + if time.Since(s.UpdatedAt) <= sS.cgrCfg.SessionSCfg().DefaultUsage[tor] { + sS.initSessionDebitLoops(s) + sS.registerSession(s, false) + } else { // remove expired sessions from dataDB + sS.removeSsCGRIDsMux.Lock() + sS.removeSsCGRIDs.Add(s.CGRID) + sS.removeSsCGRIDsMux.Unlock() + } + } +} + // initSession handles a new session // not thread-safe for Session since it is constructed here func (sS *SessionS) initSession(cgrEv *utils.CGREvent, clntConnID, @@ -1588,6 +1644,7 @@ func (sS *SessionS) updateSession(s *Session, updtEv, opts engine.MapEvent, isMs sS.setSTerminator(s, opts) // reset the terminator } s.Chargeable = opts.GetBoolOrDefault(utils.OptsChargeable, true) + s.UpdatedAt = time.Now() //init has no updtEv if updtEv == nil { updtEv = engine.MapEvent(s.EventStart.Clone()) @@ -1719,6 +1776,14 @@ func (sS *SessionS) endSession(s *Session, tUsage, lastUsage *time.Duration, nil, true, utils.NonTransactional); errCh != nil { return errCh } + if sS.cgrCfg.SessionSCfg().BackupInterval > 0 { + sS.markedSsCGRIDsMux.Lock() + delete(sS.markedSsCGRIDs, s.CGRID) // in case not yet in backup, dont needlessly store session + sS.markedSsCGRIDsMux.Unlock() + sS.removeSsCGRIDsMux.Lock() + sS.removeSsCGRIDs.Add(s.CGRID) + sS.removeSsCGRIDsMux.Unlock() + } return } @@ -2385,6 +2450,11 @@ func (sS *SessionS) BiRPCv1InitiateSession(ctx *context.Context, if sRunsUsage, err = sS.updateSession(s, nil, args.APIOpts, false); err != nil { return utils.NewErrRALs(err) } + if sS.cgrCfg.SessionSCfg().BackupInterval > 0 { + sS.markedSsCGRIDsMux.Lock() + sS.markedSsCGRIDs.Add(s.CGRID) + sS.markedSsCGRIDsMux.Unlock() + } var maxUsage time.Duration var maxUsageSet bool // so we know if we have set the 0 on purpose @@ -2602,6 +2672,11 @@ func (sS *SessionS) BiRPCv1UpdateSession(ctx *context.Context, if sRunsUsage, err = sS.updateSession(s, ev, args.APIOpts, false); err != nil { return utils.NewErrRALs(err) } + if sS.cgrCfg.SessionSCfg().BackupInterval > 0 { + sS.markedSsCGRIDsMux.Lock() + sS.markedSsCGRIDs.Add(s.CGRID) + sS.markedSsCGRIDsMux.Unlock() + } var maxUsage time.Duration var maxUsageSet bool // so we know if we have set the 0 on purpose for _, rplyMaxUsage := range sRunsUsage { @@ -3542,6 +3617,11 @@ func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context, } else if sRunsMaxUsage, err = sS.updateSession(s, nil, args.APIOpts, false); err != nil { return utils.NewErrRALs(err) } + if sS.cgrCfg.SessionSCfg().BackupInterval > 0 { + sS.markedSsCGRIDsMux.Lock() + sS.markedSsCGRIDs.Add(s.CGRID) + sS.markedSsCGRIDsMux.Unlock() + } rply.MaxUsage = getDerivedMaxUsage(sRunsMaxUsage, ralsOpts.Has(utils.MetaDerivedReply)) //check for update session case ralsOpts.Has(utils.MetaUpdate): @@ -3564,6 +3644,11 @@ func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context, if sRunsMaxUsage, err = sS.updateSession(s, ev, args.APIOpts, false); err != nil { return utils.NewErrRALs(err) } + if sS.cgrCfg.SessionSCfg().BackupInterval > 0 { + sS.markedSsCGRIDsMux.Lock() + sS.markedSsCGRIDs.Add(s.CGRID) + sS.markedSsCGRIDsMux.Unlock() + } rply.MaxUsage = getDerivedMaxUsage(sRunsMaxUsage, ralsOpts.Has(utils.MetaDerivedReply)) // check for terminate session case ralsOpts.Has(utils.MetaTerminate): @@ -3790,6 +3875,12 @@ func (sS *SessionS) BiRPCv1ActivateSessions(ctx *context.Context, if s := sS.transitSState(sID, false); s == nil { utils.Logger.Warning(fmt.Sprintf("<%s> no passive session with id: <%s>", utils.SessionS, sID)) err = utils.ErrPartiallyExecuted + } else { + if sS.cgrCfg.SessionSCfg().BackupInterval > 0 { + sS.markedSsCGRIDsMux.Lock() + sS.markedSsCGRIDs.Add(sID) + sS.markedSsCGRIDsMux.Unlock() + } } } if err == nil { @@ -3816,6 +3907,12 @@ func (sS *SessionS) BiRPCv1DeactivateSessions(ctx *context.Context, if s := sS.transitSState(sID, true); s == nil { utils.Logger.Warning(fmt.Sprintf("<%s> no active session with id: <%s>", utils.SessionS, sID)) err = utils.ErrPartiallyExecuted + } else { + if sS.cgrCfg.SessionSCfg().BackupInterval > 0 { + sS.removeSsCGRIDsMux.Lock() + sS.removeSsCGRIDs.Add(sID) + sS.removeSsCGRIDsMux.Unlock() + } } } if err == nil { @@ -4221,3 +4318,109 @@ func (ssv1 *SessionS) BiRPCv1Sleep(ctx *context.Context, args *utils.DurationArg *reply = utils.OK return nil } + +// RestoreAndBackupSessions will restore previuos backup sessions and start backup looping +func (sS *SessionS) RestoreAndBackupSessions(stopChan chan struct{}) error { + var restoredSess []*Session //holds the restored sessions gotten from datadb + storedSessions, err := sS.dm.GetSessionsBackup(sS.cgrCfg.GeneralCfg().NodeID, + sS.cgrCfg.GeneralCfg().DefaultTenant) + if err != nil && err != utils.ErrNoBackupFound { // if backup is not found we still want to start the backup loop + return err + } else { + for _, storSess := range storedSessions { + storSess := newSessionFromStoredSession(storSess) + restoredSess = append(restoredSess, storSess) + } + sS.restoreSessions(restoredSess) + } + + go sS.runBackup(stopChan) + return nil +} + +// Start running backup loop +func (sS *SessionS) runBackup(stopChan chan struct{}) { + backupInterval := sS.cgrCfg.SessionSCfg().BackupInterval + if backupInterval > 0 { + for { + if err := sS.storeSessionsMarked(); err != nil { + utils.Logger.Err(fmt.Sprintf("Backup Sessions error: <%v>", err)) + } + select { + case <-stopChan: + return + case <-time.After(backupInterval): + } + } + } +} + +// storeSessionsMarked stores only marked active sessions for backup in DataDB, and removes inactive sessions from it +func (sS *SessionS) storeSessionsMarked() (err error) { + sS.markedSsCGRIDsMux.Lock() + var storedSessions []*engine.StoredSession // hold the converted active marked sessions + for cgrID := range sS.markedSsCGRIDs { + activeSess := sS.getSessions(cgrID, false) + if len(activeSess) == 0 { + utils.Logger.Warning(" Couldn't backup session with CGRID <" + cgrID + ">. Session is not active") + delete(sS.markedSsCGRIDs, cgrID) // remove inactive session cgrids from the map + continue + } + storedSessions = append(storedSessions, activeSess[0].asStoredSession()) + } + if len(storedSessions) != 0 { + if err := sS.dm.SetBackupSessions(sS.cgrCfg.GeneralCfg().NodeID, + sS.cgrCfg.GeneralCfg().DefaultTenant, storedSessions); err != nil { + return err + } + } + for _, sess := range storedSessions { + delete(sS.markedSsCGRIDs, sess.CGRID) + } + sS.markedSsCGRIDsMux.Unlock() + sS.removeSsCGRIDsMux.Lock() + for cgrID := range sS.removeSsCGRIDs { + if err := sS.dm.RemoveSessionsBackup(sS.cgrCfg.GeneralCfg().NodeID, + sS.cgrCfg.GeneralCfg().DefaultTenant, cgrID); err != nil { + return err + } + delete(sS.removeSsCGRIDs, cgrID) + } + sS.removeSsCGRIDsMux.Unlock() + return nil +} + +// storeSessions clears current sessions stored in datadb, and stores active sessions for backup in it +func (sS *SessionS) storeSessions() (sessStored int, err error) { + sS.storeSessMux.Lock() // prevents concurrent execution of the function + defer sS.storeSessMux.Unlock() + activeSess := sS.getSessions(utils.EmptyString, false) + // remove all sessions from dataDB backup if any + if err := sS.dm.RemoveSessionsBackup(sS.cgrCfg.GeneralCfg().NodeID, + sS.cgrCfg.GeneralCfg().DefaultTenant, utils.EmptyString); err != nil { + return 0, err + } + if len(activeSess) == 0 { + return + } + var storedSessions []*engine.StoredSession + for _, sess := range activeSess { + storedSessions = append(storedSessions, sess.asStoredSession()) + } + if err := sS.dm.SetBackupSessions(sS.cgrCfg.GeneralCfg().NodeID, + sS.cgrCfg.GeneralCfg().DefaultTenant, storedSessions); err != nil { + return 0, err + } + return len(activeSess), nil +} + +// BiRPCv1BackupActiveSessions will store all active sessions in dataDB and reply with the amount of sessions it stored +func (sS *SessionS) BiRPCv1BackupActiveSessions(ctx *context.Context, + args string, reply *int) error { + if sessCount, err := sS.storeSessions(); err != nil { + return err + } else { + *reply = sessCount + } + return nil +} diff --git a/utils/consts.go b/utils/consts.go index d12331a4d..b67eba2fe 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -303,6 +303,7 @@ const ( ThresholdProfilePrefix = "thp_" StatQueuePrefix = "stq_" LoadIDPrefix = "lid_" + SessionsBackupPrefix = "sbk_" LoadInstKey = "load_history" CreateCDRsTablesSQL = "create_cdrs_tables.sql" CreateTariffPlanTablesSQL = "create_tariffplan_tables.sql" @@ -972,6 +973,7 @@ const ( MetaThresholds = "*thresholds" MetaRoutes = "*routes" MetaAttributes = "*attributes" + MetaSessionsBackup = "*sessions_backup" MetaLoadIDs = "*load_ids" MetaNodeID = "*node_id" ) @@ -1280,6 +1282,8 @@ const ( ReplicatorSv1SetDispatcherProfile = "ReplicatorSv1.SetDispatcherProfile" ReplicatorSv1SetDispatcherHost = "ReplicatorSv1.SetDispatcherHost" ReplicatorSv1SetLoadIDs = "ReplicatorSv1.SetLoadIDs" + ReplicatorSv1SetBackupSessions = "ReplicatorSv1.SetBackupSessions" + ReplicatorSv1RemoveSessionBackup = "ReplicatorSv1.RemoveSessionBackup" ReplicatorSv1RemoveThreshold = "ReplicatorSv1.RemoveThreshold" ReplicatorSv1RemoveDestination = "ReplicatorSv1.RemoveDestination" ReplicatorSv1RemoveAccount = "ReplicatorSv1.RemoveAccount" @@ -1683,6 +1687,7 @@ const ( SessionSv1STIRIdentity = "SessionSv1.STIRIdentity" SessionSv1Sleep = "SessionSv1.Sleep" SessionSv1CapsError = "SessionSv1.CapsError" + SessionSv1BackupActiveSessions = "SessionSv1.BackupActiveSessions" ) // Agent APIs @@ -1938,6 +1943,7 @@ const ( CacheAccounts = "*accounts" CacheVersions = "*versions" CacheCapsEvents = "*caps_events" + CacheSessionsBackup = "*sessions_backup" CacheReplicationHosts = "*replication_hosts" // storDB @@ -2210,6 +2216,7 @@ const ( MinDurLowBalanceCfg = "min_dur_low_balance" DefaultUsageCfg = "default_usage" STIRCfg = "stir" + BackupIntervalCfg = "backup_interval" AllowedAtestCfg = "allowed_attest" PayloadMaxdurationCfg = "payload_maxduration" diff --git a/utils/errors.go b/utils/errors.go index a82398aa2..c04d99d14 100644 --- a/utils/errors.go +++ b/utils/errors.go @@ -81,6 +81,7 @@ var ( ErrMaxIterationsReached = errors.New("maximum iterations reached") ErrNegative = errors.New("NEGATIVE") ErrCastFailed = errors.New("CAST_FAILED") + ErrNoBackupFound = errors.New("NO_BACKUP_FOUND") ErrMap = map[string]error{ ErrNoMoreData.Error(): ErrNoMoreData, diff --git a/utils/reflect.go b/utils/reflect.go index a7cfc24aa..63751a385 100644 --- a/utils/reflect.go +++ b/utils/reflect.go @@ -343,6 +343,17 @@ func IfaceAsBool(itm any) (b bool, err error) { return } +// MapIfaceTimeAsString converts time.Time type fields in a map[string]any to RFC3339 time format string. Used before msgpack marshaling since time.Time variables put inside interfaces arent encoded/decoded into a readable string or time.Time type +func MapIfaceTimeAsString(me map[string]any) { + for k, v := range me { + if timeStr, ok := v.(time.Time); ok { + v = timeStr.Format(time.RFC3339) + me[k] = v + } + } + +} + func IfaceAsString(fld any) (out string) { switch value := fld.(type) { case nil: