diff --git a/data/conf/samples/sessions_backup_interval_mongo2/cgrates.json b/data/conf/samples/sessions_backup_interval_mongo2/cgrates.json deleted file mode 100644 index f87825849..000000000 --- a/data/conf/samples/sessions_backup_interval_mongo2/cgrates.json +++ /dev/null @@ -1,75 +0,0 @@ -{ - // Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments - // Copyright (C) ITsysCOM GmbH - - "general": { - "log_level": 7, - "node_id":"BackupSessionsIntervalNode2", - }, - - "listen": { - "rpc_json": "127.0.0.1:22012", // RPC JSON listening address - "rpc_gob": "127.0.0.1:22013", // RPC GOB listening address - "http": "127.0.0.1:22080", // HTTP listening address - }, - - "rpc_conns": { - "conn1": { - "strategy": "*first", - "conns": [{"address": "127.0.0.1:22012", "transport":"*json"}], - }, - }, - - "data_db": { - "db_type": "mongo", - "db_name": "10", - "db_port": 27017, - }, - - - "stor_db": { - "db_type": "mongo", - "db_name": "cgrates", - "db_port": 27017, - "db_password": "", - }, - - "schedulers": { - "enabled": true, - }, - - "rals": { - "enabled": true, - }, - - "cdrs": { - "enabled": true, - }, - - "chargers": { - "enabled": true, - "attributes_conns": ["*internal"], - }, - - "sessions": { - "enabled": true, - "rals_conns": ["*internal"], - "cdrs_conns": ["*internal"], - "listen_bijson": "127.0.0.1:22014", - "chargers_conns": ["*internal"], - "default_usage":{ - "*voice":"1h" - }, - "backup_interval": "500ms", - }, - - "attributes": { - "enabled": true, - }, - - "apiers": { - "enabled": true, - "scheduler_conns": ["*internal"], - } -} - \ No newline at end of file diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 59561ea12..e80b4e0ef 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -2036,17 +2036,116 @@ func (ms *MongoStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) erro }) } +// used to "mold" the structure so that it appears in the first level of the mongo document and to make the conversion from mongo back to cgrates simpler +type mongoStoredSession struct { + NodeID string + CGRID string + Tenant string + ResourceID string + ClientConnID string + EventStart MapEvent + DebitInterval time.Duration + Chargeable bool + SRuns []*StoredSRun + OptsStart MapEvent + UpdatedAt time.Time +} + // Will backup active sessions in DataDB func (ms *MongoStorage) SetBackupSessionsDrv(nodeID, tnt string, storedSessions []*StoredSession) error { - return utils.ErrNotImplemented + return ms.query(func(sctx mongo.SessionContext) error { + for i := 0; i < len(storedSessions); i += 1000 { + end := i + 1000 + if end > len(storedSessions) { + end = len(storedSessions) + } + // split sessions into batches of 1001 sessons + batch := storedSessions[i:end] // if sessions < 1001, puts all sessions in 1 batch + var models []mongo.WriteModel + for _, sess := range batch { + doc := bson.M{"$set": mongoStoredSession{ + NodeID: nodeID, + CGRID: sess.CGRID, + Tenant: sess.Tenant, + ResourceID: sess.ResourceID, + ClientConnID: sess.ClientConnID, + EventStart: sess.EventStart, + DebitInterval: sess.DebitInterval, + Chargeable: sess.Chargeable, + SRuns: sess.SRuns, + OptsStart: sess.OptsStart, + UpdatedAt: sess.UpdatedAt, + }} + model := mongo.NewUpdateOneModel().SetUpdate(doc).SetUpsert(true).SetFilter(bson.M{"nodeid": nodeID, "cgrid": sess.CGRID}) + models = append(models, model) + } + if len(models) != 0 { + _, err := ms.getCol(ColBkup).BulkWrite(sctx, models) + if err != nil { + return err + } + } + } + return nil + }) } // Will restore sessions that were active from dataDB backup func (ms *MongoStorage) GetSessionsBackupDrv(nodeID, tnt string) ([]*StoredSession, error) { - return nil, utils.ErrNotImplemented + var storeSessions []*StoredSession + if err := ms.query(func(sctx mongo.SessionContext) (qryErr error) { + cur, qryErr := ms.getCol(ColBkup).Find(sctx, bson.M{"nodeid": nodeID}) + if qryErr != nil { + return qryErr + } + defer func() { + closeErr := cur.Close(sctx) + if closeErr != nil && qryErr == nil { + qryErr = closeErr + } + }() + for cur.Next(sctx) { + var result mongoStoredSession + qryErr := cur.Decode(&result) + if errors.Is(qryErr, mongo.ErrNoDocuments) { + return utils.ErrNoBackupFound + } else if qryErr != nil { + return qryErr + } + oneStSession := &StoredSession{ + CGRID: result.CGRID, + Tenant: result.Tenant, + ResourceID: result.ResourceID, + ClientConnID: result.ClientConnID, + EventStart: result.EventStart, + DebitInterval: result.DebitInterval, + Chargeable: result.Chargeable, + SRuns: result.SRuns, + OptsStart: result.OptsStart, + UpdatedAt: result.UpdatedAt, + } + storeSessions = append(storeSessions, oneStSession) + } + if len(storeSessions) == 0 { + return utils.ErrNoBackupFound + } + return + }); err != nil { + return nil, err + } + return storeSessions, nil } // Will remove one or all sessions from dataDB Backup func (ms *MongoStorage) RemoveSessionsBackupDrv(nodeID, tnt, cgrid string) error { - return utils.ErrNotImplemented + if cgrid == utils.EmptyString { + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColBkup).DeleteMany(sctx, bson.M{"nodeid": nodeID}) + return err + }) + } + return ms.query(func(sctx mongo.SessionContext) error { + _, err := ms.getCol(ColBkup).DeleteOne(sctx, bson.M{"nodeid": nodeID, "cgrid": cgrid}) + return err + }) } diff --git a/general_tests/session_bkup_interval_it_test.go b/general_tests/session_bkup_interval_it_test.go index 0e0f5920f..abe267192 100644 --- a/general_tests/session_bkup_interval_it_test.go +++ b/general_tests/session_bkup_interval_it_test.go @@ -73,8 +73,6 @@ func TestSessionsBkupIntrvl(t *testing.T) { case utils.MetaMySQL: sBkupCfgDIR = "sessions_backup_interval_mysql" case utils.MetaMongo: - // mongo is unfinished - t.Skip() sBkupCfgDIR = "sessions_backup_interval_mongo" case utils.MetaPostgres: sBkupCfgDIR = "sessions_backup_interval_postgres" @@ -240,7 +238,7 @@ func testSessionSBkupIntrvlGetBackedupSessions1(t *testing.T) { sBkupCfg.DataDbCfg().Port, sBkupCfg.DataDbCfg().Name, sBkupCfg.DataDbCfg().User, sBkupCfg.DataDbCfg().Password, sBkupCfg.GeneralCfg().DBDataEncoding, - utils.StorDB, nil, 10*time.Second) + utils.DataDB, nil, 10*time.Second) if err != nil { t.Fatal(err) } diff --git a/sessions/sessions_bkup_it_test.go b/sessions/sessions_bkup_it_test.go index c6d40cd61..7b7c5acad 100644 --- a/sessions/sessions_bkup_it_test.go +++ b/sessions/sessions_bkup_it_test.go @@ -98,8 +98,6 @@ func TestSessionsBkup(t *testing.T) { case utils.MetaMySQL: sBkupCfgDIR = "sessions_backup_mysql" case utils.MetaMongo: - // mongo is unfinished - t.Skip() sBkupCfgDIR = "sessions_backup_mongo" case utils.MetaPostgres: sBkupCfgDIR = "sessions_backup_postgres" diff --git a/sessions/sessions_bkup_rpl_it_test.go b/sessions/sessions_bkup_rpl_it_test.go index ec42fef7b..a02a34b85 100644 --- a/sessions/sessions_bkup_rpl_it_test.go +++ b/sessions/sessions_bkup_rpl_it_test.go @@ -68,18 +68,12 @@ func TestSessionSBkupRplc(t *testing.T) { case utils.MetaInternal: t.SkipNow() case utils.MetaMySQL: - // mongo is unfinished - t.SkipNow() sBRplcEng1CfgDIR = "sbkupreplcengine1_mysql" sBRplcEng2CfgDIR = "sbkupreplcengine2_mongo" case utils.MetaMongo: - // mongo is unfinished - t.SkipNow() sBRplcEng1CfgDIR = "sbkupreplcengine1_mongo" sBRplcEng2CfgDIR = "sbkupreplcengine2_mysql" case utils.MetaPostgres: - // mongo is unfinished - t.SkipNow() sBRplcEng1CfgDIR = "sbkupreplcengine1_postgres" sBRplcEng2CfgDIR = "sbkupreplcengine2_mongo" default: