Add mongo support for sessions backup

This commit is contained in:
arberkatellari
2024-06-24 08:50:00 +02:00
committed by Dan Christian Bogos
parent 692279ef8b
commit 5ccb77c5dc
5 changed files with 103 additions and 89 deletions

View File

@@ -1,75 +0,0 @@
{
// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
// Copyright (C) ITsysCOM GmbH
"general": {
"log_level": 7,
"node_id":"BackupSessionsIntervalNode2",
},
"listen": {
"rpc_json": "127.0.0.1:22012", // RPC JSON listening address
"rpc_gob": "127.0.0.1:22013", // RPC GOB listening address
"http": "127.0.0.1:22080", // HTTP listening address
},
"rpc_conns": {
"conn1": {
"strategy": "*first",
"conns": [{"address": "127.0.0.1:22012", "transport":"*json"}],
},
},
"data_db": {
"db_type": "mongo",
"db_name": "10",
"db_port": 27017,
},
"stor_db": {
"db_type": "mongo",
"db_name": "cgrates",
"db_port": 27017,
"db_password": "",
},
"schedulers": {
"enabled": true,
},
"rals": {
"enabled": true,
},
"cdrs": {
"enabled": true,
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"sessions": {
"enabled": true,
"rals_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"listen_bijson": "127.0.0.1:22014",
"chargers_conns": ["*internal"],
"default_usage":{
"*voice":"1h"
},
"backup_interval": "500ms",
},
"attributes": {
"enabled": true,
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
}
}

View File

@@ -2036,17 +2036,116 @@ func (ms *MongoStorage) RemoveIndexesDrv(idxItmType, tntCtx, idxKey string) erro
})
}
// used to "mold" the structure so that it appears in the first level of the mongo document and to make the conversion from mongo back to cgrates simpler
type mongoStoredSession struct {
NodeID string
CGRID string
Tenant string
ResourceID string
ClientConnID string
EventStart MapEvent
DebitInterval time.Duration
Chargeable bool
SRuns []*StoredSRun
OptsStart MapEvent
UpdatedAt time.Time
}
// Will backup active sessions in DataDB
func (ms *MongoStorage) SetBackupSessionsDrv(nodeID, tnt string, storedSessions []*StoredSession) error {
return utils.ErrNotImplemented
return ms.query(func(sctx mongo.SessionContext) error {
for i := 0; i < len(storedSessions); i += 1000 {
end := i + 1000
if end > len(storedSessions) {
end = len(storedSessions)
}
// split sessions into batches of 1001 sessons
batch := storedSessions[i:end] // if sessions < 1001, puts all sessions in 1 batch
var models []mongo.WriteModel
for _, sess := range batch {
doc := bson.M{"$set": mongoStoredSession{
NodeID: nodeID,
CGRID: sess.CGRID,
Tenant: sess.Tenant,
ResourceID: sess.ResourceID,
ClientConnID: sess.ClientConnID,
EventStart: sess.EventStart,
DebitInterval: sess.DebitInterval,
Chargeable: sess.Chargeable,
SRuns: sess.SRuns,
OptsStart: sess.OptsStart,
UpdatedAt: sess.UpdatedAt,
}}
model := mongo.NewUpdateOneModel().SetUpdate(doc).SetUpsert(true).SetFilter(bson.M{"nodeid": nodeID, "cgrid": sess.CGRID})
models = append(models, model)
}
if len(models) != 0 {
_, err := ms.getCol(ColBkup).BulkWrite(sctx, models)
if err != nil {
return err
}
}
}
return nil
})
}
// Will restore sessions that were active from dataDB backup
func (ms *MongoStorage) GetSessionsBackupDrv(nodeID, tnt string) ([]*StoredSession, error) {
return nil, utils.ErrNotImplemented
var storeSessions []*StoredSession
if err := ms.query(func(sctx mongo.SessionContext) (qryErr error) {
cur, qryErr := ms.getCol(ColBkup).Find(sctx, bson.M{"nodeid": nodeID})
if qryErr != nil {
return qryErr
}
defer func() {
closeErr := cur.Close(sctx)
if closeErr != nil && qryErr == nil {
qryErr = closeErr
}
}()
for cur.Next(sctx) {
var result mongoStoredSession
qryErr := cur.Decode(&result)
if errors.Is(qryErr, mongo.ErrNoDocuments) {
return utils.ErrNoBackupFound
} else if qryErr != nil {
return qryErr
}
oneStSession := &StoredSession{
CGRID: result.CGRID,
Tenant: result.Tenant,
ResourceID: result.ResourceID,
ClientConnID: result.ClientConnID,
EventStart: result.EventStart,
DebitInterval: result.DebitInterval,
Chargeable: result.Chargeable,
SRuns: result.SRuns,
OptsStart: result.OptsStart,
UpdatedAt: result.UpdatedAt,
}
storeSessions = append(storeSessions, oneStSession)
}
if len(storeSessions) == 0 {
return utils.ErrNoBackupFound
}
return
}); err != nil {
return nil, err
}
return storeSessions, nil
}
// Will remove one or all sessions from dataDB Backup
func (ms *MongoStorage) RemoveSessionsBackupDrv(nodeID, tnt, cgrid string) error {
return utils.ErrNotImplemented
if cgrid == utils.EmptyString {
return ms.query(func(sctx mongo.SessionContext) error {
_, err := ms.getCol(ColBkup).DeleteMany(sctx, bson.M{"nodeid": nodeID})
return err
})
}
return ms.query(func(sctx mongo.SessionContext) error {
_, err := ms.getCol(ColBkup).DeleteOne(sctx, bson.M{"nodeid": nodeID, "cgrid": cgrid})
return err
})
}

View File

@@ -73,8 +73,6 @@ func TestSessionsBkupIntrvl(t *testing.T) {
case utils.MetaMySQL:
sBkupCfgDIR = "sessions_backup_interval_mysql"
case utils.MetaMongo:
// mongo is unfinished
t.Skip()
sBkupCfgDIR = "sessions_backup_interval_mongo"
case utils.MetaPostgres:
sBkupCfgDIR = "sessions_backup_interval_postgres"
@@ -240,7 +238,7 @@ func testSessionSBkupIntrvlGetBackedupSessions1(t *testing.T) {
sBkupCfg.DataDbCfg().Port, sBkupCfg.DataDbCfg().Name,
sBkupCfg.DataDbCfg().User, sBkupCfg.DataDbCfg().Password,
sBkupCfg.GeneralCfg().DBDataEncoding,
utils.StorDB, nil, 10*time.Second)
utils.DataDB, nil, 10*time.Second)
if err != nil {
t.Fatal(err)
}

View File

@@ -98,8 +98,6 @@ func TestSessionsBkup(t *testing.T) {
case utils.MetaMySQL:
sBkupCfgDIR = "sessions_backup_mysql"
case utils.MetaMongo:
// mongo is unfinished
t.Skip()
sBkupCfgDIR = "sessions_backup_mongo"
case utils.MetaPostgres:
sBkupCfgDIR = "sessions_backup_postgres"

View File

@@ -68,18 +68,12 @@ func TestSessionSBkupRplc(t *testing.T) {
case utils.MetaInternal:
t.SkipNow()
case utils.MetaMySQL:
// mongo is unfinished
t.SkipNow()
sBRplcEng1CfgDIR = "sbkupreplcengine1_mysql"
sBRplcEng2CfgDIR = "sbkupreplcengine2_mongo"
case utils.MetaMongo:
// mongo is unfinished
t.SkipNow()
sBRplcEng1CfgDIR = "sbkupreplcengine1_mongo"
sBRplcEng2CfgDIR = "sbkupreplcengine2_mysql"
case utils.MetaPostgres:
// mongo is unfinished
t.SkipNow()
sBRplcEng1CfgDIR = "sbkupreplcengine1_postgres"
sBRplcEng2CfgDIR = "sbkupreplcengine2_mongo"
default: