add replication buffer support

ionutboangiu
2025-04-03 19:57:49 +03:00
parent 37925b8f59
commit 76f5d931bd
7 changed files with 973 additions and 762 deletions

View File

@@ -1494,6 +1494,62 @@ func (apierSv1 *APIerSv1) ReplayFailedPosts(ctx *context.Context, args ReplayFai
return nil
}
// ReplayFailedReplicationsArgs contains args for replaying failed replications.
type ReplayFailedReplicationsArgs struct {
SourcePath string // path for events to be replayed
FailedPath string // path for events that failed to replay, *none to discard, defaults to SourcePath if empty
}
// ReplayFailedReplications will replay failed replication tasks found in SourcePath.
func (a *APIerSv1) ReplayFailedReplications(ctx *context.Context, args ReplayFailedReplicationsArgs, reply *string) error {
// Set default directories if not provided.
if args.SourcePath == "" {
args.SourcePath = a.Config.DataDbCfg().RplFailedDir
}
if args.SourcePath == "" {
return utils.NewErrServerError(
errors.New("no source directory specified: both SourcePath and replication_failed_dir configuration are empty"),
)
}
if args.FailedPath == "" {
args.FailedPath = args.SourcePath
}
if err := filepath.WalkDir(args.SourcePath, func(path string, d fs.DirEntry, err error) error {
if err != nil {
utils.Logger.Warning(fmt.Sprintf("<ReplayFailedReplications> failed to access path %s: %v", path, err))
return nil // skip paths that cause an error
}
if d.IsDir() {
return nil // skip directories
}
task, err := engine.NewReplicationTaskFromFile(path)
if err != nil {
return fmt.Errorf("failed to init ExportEvents from %s: %v", path, err)
}
// Determine the failover path.
failoverPath := utils.MetaNone
if args.FailedPath != utils.MetaNone {
failoverPath = filepath.Join(args.FailedPath, d.Name())
}
if err := task.Execute(a.ConnMgr); err != nil && failoverPath != utils.MetaNone {
// Write the task that failed to replay to the failover directory.
if err = task.WriteToFile(failoverPath); err != nil {
return fmt.Errorf("failed to write the failed replication task to %s: %v", failoverPath, err)
}
}
return nil
}); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return nil
}
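For illustration, a minimal client-side sketch of invoking this API over JSON-RPC. The method name "APIerSv1.ReplayFailedReplications" is inferred from the receiver, and port 2012 is the conventional CGRateS JSON-RPC listen address; both are assumptions, not confirmed by this diff.
package main
import (
	"fmt"
	"log"
	"net/rpc/jsonrpc"
)
func main() {
	// Assumed engine address/port; adjust to the actual rpc_json listen config.
	client, err := jsonrpc.Dial("tcp", "localhost:2012")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	args := struct {
		SourcePath string
		FailedPath string
	}{
		SourcePath: "", // empty falls back to replication_failed_dir from config
		FailedPath: "", // empty defaults to SourcePath
	}
	var reply string
	if err := client.Call("APIerSv1.ReplayFailedReplications", args, &reply); err != nil {
		log.Fatal(err)
	}
	fmt.Println(reply) // "OK" on success
}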
func (apierSv1 *APIerSv1) GetLoadIDs(ctx *context.Context, args *string, reply *map[string]int64) (err error) {
var loadIDs map[string]int64
if loadIDs, err = apierSv1.DataManager.GetItemLoadIDs(*args, false); err != nil {

File diff suppressed because it is too large

View File

@@ -195,8 +195,8 @@ func TestDmGetFilterRemote(t *testing.T) {
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
exp := &Filter{
Tenant: "cgrates.org",
@@ -611,9 +611,9 @@ func TestDMSetAccount(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
dm.ms = &JSONMarshaler{}
config.SetCgrConfig(cfg)
SetDataStorage(dm)
if err := dm.SetAccount(acc); err != nil {
t.Error(err)
@@ -683,8 +683,8 @@ func TestDMRemoveAccount(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if err = dm.RemoveAccount(acc.ID); err != nil {
t.Error(err)
@@ -750,8 +750,8 @@ func TestDmSetFilter(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if err := dm.SetFilter(filter, false); err != nil {
t.Error(err)
@@ -808,8 +808,8 @@ func TestDMSetThreshold(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if err = dm.SetThreshold(thS); err != nil {
@@ -867,8 +867,8 @@ func TestDmRemoveThreshold(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if err := dm.RemoveThreshold(thS.Tenant, thS.ID); err != nil {
t.Error(err)
@@ -919,8 +919,8 @@ func TestDMReverseDestinationRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
dest := &Destination{
Id: "nat", Prefixes: []string{"0257", "0256", "0723"},
@@ -986,8 +986,8 @@ func TestDMStatQueueRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
sq := &StatQueue{
Tenant: "cgrates.org",
@@ -1063,8 +1063,8 @@ func TestDmTimingR(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
tp := &utils.TPTiming{
ID: "MIDNIGHT",
@@ -1136,8 +1136,8 @@ func TestDMSetActionTriggers(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
attrs := ActionTriggers{
&ActionTrigger{
@@ -1224,8 +1224,8 @@ func TestDMResourceProfileRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
rp := &ResourceProfile{
Tenant: "cgrates.org",
@@ -1309,8 +1309,8 @@ func TestDmSharedGroup(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
sg := &SharedGroup{
Id: "SG2",
@@ -1399,8 +1399,8 @@ func TestDMThresholdProfile(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
th := &ThresholdProfile{
Tenant: "cgrates.org",
@@ -1536,8 +1536,8 @@ func TestDmDispatcherHost(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
dH := &DispatcherHost{
Tenant: "testTenant",
@@ -1603,8 +1603,8 @@ func TestGetDispatcherHostErr(t *testing.T) {
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
dH := &DispatcherHost{
Tenant: "testTenant",
@@ -1675,8 +1675,8 @@ func TestChargerProfileRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
chrPrf := &ChargerProfile{
Tenant: "cgrates.org",
@@ -1753,8 +1753,8 @@ func TestDispatcherProfileRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
dsp := &DispatcherProfile{
Tenant: "cgrates.org",
@@ -1828,8 +1828,8 @@ func TestRouteProfileRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
rpp := &RouteProfile{
Tenant: "cgrates.org",
@@ -1900,8 +1900,8 @@ func TestRatingPlanRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
rP := &RatingPlan{
Id: "RP1",
@@ -1991,8 +1991,8 @@ func TestGetResourceRemote(t *testing.T) {
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if val, err := dm.GetResource(rS.Tenant, rS.ID, false, true, utils.NonTransactional); err != nil {
@@ -2059,8 +2059,8 @@ func TestGetResourceProfileRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if val, err := dm.GetResourceProfile(rsP.Tenant, rsP.ID, false, true, utils.NonTransactional); err != nil {
t.Error(err)
@@ -2123,8 +2123,8 @@ func TestGetActionTriggers(t *testing.T) {
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
Cache.Set(utils.CacheActionTriggers, "Test", ActionTriggers{}, []string{}, false, utils.NonTransactional)
if val, err := dm.GetActionTriggers(aT[0].ID, false, utils.NonTransactional); err != nil {
@@ -2187,8 +2187,8 @@ func TestGetActionTriggersErr(t *testing.T) {
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr1)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr1)
SetDataStorage(dm)
if _, err := dm.GetActionTriggers(aT[0].ID, true, utils.NonTransactional); err == nil {
t.Error(err)
@@ -2266,8 +2266,8 @@ func TestGetSharedGroupRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if val, err := dm.GetSharedGroup(shG.Id, true, utils.NonTransactional); err != nil {
t.Error(err)
@@ -2324,8 +2324,8 @@ func TestGetStatQueueProfileRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if val, err := dm.GetStatQueueProfile(sqP.Tenant, sqP.ID, true, true, utils.NonTransactional); err != nil {
t.Error(err)
@@ -2397,8 +2397,8 @@ func TestStatQueueProfileRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
Cache.Set(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(sqP.Tenant, sqP.ID), &StatQueueProfile{
QueueLength: 2,
@@ -2469,8 +2469,8 @@ func TestDMActionsRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if err := dm.SetActions("KeyActions", acs); err != nil {
t.Error(err)
@@ -2540,8 +2540,8 @@ func TestGetDispatcherHost(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if val, err := dm.GetDispatcherHost("cgrates.org", "HostID", false, true, utils.NonTransactional); err != nil {
@@ -2598,8 +2598,8 @@ func TestGetReverseDestinationRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if val, err := dm.GetReverseDestination("CRUDReverseDestination", false, true, utils.NonTransactional); err != nil {
t.Error(err)
@@ -2686,8 +2686,8 @@ func TestDMRemoveDestination(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
dm.DataDB().SetDestinationDrv(dest, utils.NonTransactional)
if err := dm.RemoveDestination(dest.Id, utils.NonTransactional); err != nil {
@@ -2765,8 +2765,8 @@ func TestDMRemoveFilter(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
fltr := &Filter{
Tenant: "cgrates.org",
@@ -2859,8 +2859,8 @@ func TestRemoveStatQueueProfile(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
dm.DataDB().SetStatQueueProfileDrv(sQ)
if err = dm.RemoveStatQueueProfile(sQ.Tenant, sQ.ID, true); err == nil {
@@ -2935,8 +2935,8 @@ func TestDMGetTimingRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if _, err := dm.GetTiming(tp.ID, true, utils.NonTransactional); err != nil {
t.Error(err)
@@ -3011,8 +3011,8 @@ func TestDmGetActions(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if _, err := dm.GetActions("MINI", true, utils.NonTransactional); err != nil {
t.Error(err)
@@ -3061,8 +3061,8 @@ func TestDMSetLoadIDs(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
ld := map[string]int64{
"load1": 23,
@@ -3130,8 +3130,8 @@ func TestGetItemLoadIDsRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if val, err := dm.GetItemLoadIDs("load1", true); err != nil {
t.Error(err)
@@ -3206,11 +3206,10 @@ func TestDMItemLoadIDsRemoteErr(t *testing.T) {
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetConnManager(connMgr)
config.SetCgrConfig(cfg)
Cache = NewCacheS(cfg, dm, nil)
if _, err := dm.GetItemLoadIDs("load1", true); err == nil || err.Error() != "Can't replicate" {
t.Error(err)
@@ -3294,8 +3293,8 @@ func TestActionPlanRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if err := dm.SetActionPlan("act_key", actPln, true, utils.NonTransactional); err != nil {
@@ -3361,8 +3360,8 @@ func TestAccountActionPlansRemote(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if err := dm.SetAccountActionPlans("acc_ID", []string{"act_pln", "act_pln"}, true); err != nil {
@@ -3669,8 +3668,8 @@ func TestDMRatingProfile(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if err := dm.SetRatingProfile(rpf); err != nil {
t.Error(err)
@@ -3811,8 +3810,8 @@ func TestDMGetRatingPlan(t *testing.T) {
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if _, err := dm.GetRatingPlan("id", true, utils.NonTransactional); err != nil {
t.Error(err)
@@ -3877,8 +3876,8 @@ func TestDMChargerProfile(t *testing.T) {
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if _, err := dm.GetChargerProfile(chP.Tenant, chP.ID, false, true, utils.NonTransactional); err != nil {
t.Error(err)
@@ -3952,8 +3951,8 @@ func TestDMDispatcherProfile(t *testing.T) {
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if _, err := dm.GetDispatcherProfile(dPP.Tenant, dPP.ID, false, true, utils.NonTransactional); err != nil {
t.Error(err)
@@ -4386,8 +4385,8 @@ func TestDMGetRouteProfile(t *testing.T) {
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if val, err := dm.GetRouteProfile(rpL.Tenant, rpL.ID, false, true, utils.NonTransactional); err != nil {
t.Error(err)
@@ -4441,8 +4440,8 @@ func TestDMGetRouteProfileErr(t *testing.T) {
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
Cache = NewCacheS(cfg, dm, nil)
SetConnManager(connMgr)
@@ -4628,8 +4627,8 @@ func TestDMAttributeProfile(t *testing.T) {
connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{
utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn,
})
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
SetDataStorage(dm)
if err := dm.SetAttributeProfile(attrPrf, false); err != nil {
t.Error(err)
@@ -4813,6 +4812,7 @@ func TestDmIndexes(t *testing.T) {
if dErr != nil {
t.Error(dErr)
}
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
idxes := map[string]utils.StringSet{
"*string:Account:1001": {
@@ -4827,7 +4827,6 @@ func TestDmIndexes(t *testing.T) {
"RL5": struct{}{},
},
}
config.SetCgrConfig(cfg)
if err := dm.SetIndexes(utils.CacheResourceFilterIndexes,
"cgrates.org", idxes, false, utils.NonTransactional); err != nil {
t.Error(err)
@@ -4875,8 +4874,8 @@ func TestDmCheckFilters(t *testing.T) {
if dErr != nil {
t.Error(dErr)
}
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMgr)
if err := dm.checkFilters("cgrates.org", []string{"FLTR_1"}); err == nil || err.Error() != "broken reference to filter: <FLTR_1>" {
t.Error(err)
}

View File

@@ -212,8 +212,8 @@ func TestDMSetDestinationSucces(t *testing.T) {
Id: "dest21",
Prefixes: []string{},
}
dm := NewDataManager(db, cfg.CacheCfg(), connMngr)
config.SetCgrConfig(cfg)
dm := NewDataManager(db, cfg.CacheCfg(), connMngr)
if err := dm.SetDestination(dest, utils.NonTransactional); err != nil {
t.Error(err)

View File

@@ -1,79 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
)
// UpdateReplicationFilters will set the connID in cache
func UpdateReplicationFilters(objType, objID, connID string) {
if connID == utils.EmptyString {
return
}
Cache.SetWithoutReplicate(utils.CacheReplicationHosts, objType+objID+utils.ConcatenatedKeySep+connID, connID, []string{objType + objID},
true, utils.NonTransactional)
}
// replicate will call Set/Remove APIs on ReplicatorSv1
func replicate(connMgr *ConnManager, connIDs []string, filtered bool, objType, objID, method string, args any) (err error) {
// the reply is string for Set/Remove APIs
// ignored in favor of the error
var reply string
if !filtered {
// is not partial so send to all defined connections
return utils.CastRPCErr(connMgr.Call(context.TODO(), connIDs, method, args, &reply))
}
// is partial so get all the replicationHosts from cache based on object Type and ID
// alp_cgrates.org:ATTR1
rplcHostIDsIfaces := Cache.tCache.GetGroupItems(utils.CacheReplicationHosts, objType+objID)
rplcHostIDs := make(utils.StringSet)
for _, hostID := range rplcHostIDsIfaces {
rplcHostIDs.Add(hostID.(string))
}
// using the replication hosts call the method
return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs,
method, args, &reply))
}
// replicateMultipleIDs will do the same thing as replicate but uses multiple objectIDs
// used when setting the LoadIDs
func replicateMultipleIDs(connMgr *ConnManager, connIDs []string, filtered bool, objType string, objIDs []string, method string, args any) (err error) {
// the reply is string for Set/Remove APIs
// ignored in favor of the error
var reply string
if !filtered {
// is not partial so send to all defined connections
return utils.CastRPCErr(connMgr.Call(context.TODO(), connIDs, method, args, &reply))
}
// is partial so get all the replicationHosts from cache based on object Type and ID
// combine all hosts in a single set so if we receive a get with one ID in list
// send all list to that host
rplcHostIDs := make(utils.StringSet)
for _, objID := range objIDs {
rplcHostIDsIfaces := Cache.tCache.GetGroupItems(utils.CacheReplicationHosts, objType+objID)
for _, hostID := range rplcHostIDsIfaces {
rplcHostIDs.Add(hostID.(string))
}
}
// using the replication hosts call the method
return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs,
method, args, &reply))
}

engine/replicator.go (new file, 310 lines)
View File

@@ -0,0 +1,310 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"bytes"
"encoding/gob"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
// replicationData holds the information about a pending replication task.
type replicationData struct {
objType string
objID string
method string
args any
}
// replicator manages replication tasks to synchronize data across instances.
// It either replicates immediately or batches tasks and replicates them at a configured interval.
//
// For failed replications, files are created with predictable names based on
// "methodName_objTypeObjID" as the key. Before each replication attempt, any existing
// file for that key is removed. A new file is created only if the replication fails.
// This ensures at most one failed replication file exists per unique item.
type replicator struct {
mu sync.Mutex
cm *ConnManager
conns []string // ids of connections to replicate to
// pending stores the latest version of the object, named by the key, that
// is to be replicated.
pending map[string]*replicationData
interval time.Duration // replication frequency
failedDir string // where failed replications are stored (one file per key)
filtered bool // whether to replicate only objects coming from remote
stop chan struct{} // stop replication loop
wg sync.WaitGroup // wait for any pending replications before closing
}
// newReplicator creates a replication manager that either performs immediate
// or batched replications based on configuration.
// When interval > 0, replications are queued and processed in batches at that interval.
// When interval = 0, each replication is performed immediately when requested.
func newReplicator(cm *ConnManager) *replicator {
cfg := config.CgrConfig().DataDbCfg()
r := &replicator{
cm: cm,
pending: make(map[string]*replicationData),
interval: cfg.RplInterval,
failedDir: cfg.RplFailedDir,
conns: cfg.RplConns,
filtered: cfg.RplFiltered,
stop: make(chan struct{}),
}
if r.interval > 0 {
r.wg.Add(1)
go r.replicationLoop()
}
return r
}
// replicate handles the object replication based on configuration.
// When interval > 0, the replication task is queued for the next batch.
// Otherwise, it executes immediately.
func (r *replicator) replicate(objType, objID, method string, args any,
item *config.ItemOpt) error {
if !item.Replicate {
return nil
}
if r.interval > 0 {
// Form a unique key by joining method name with object identifiers.
// Including the method name (Set/Remove) allows different operations
// on the same object to have distinct keys, which also serve as
// predictable filenames if replication fails.
_, methodName, _ := strings.Cut(method, utils.NestingSep)
key := methodName + "_" + objType + objID
r.mu.Lock()
defer r.mu.Unlock()
r.pending[key] = &replicationData{
objType: objType,
objID: objID,
method: method,
args: args,
}
return nil
}
return replicate(r.cm, r.conns, r.filtered, objType, objID, method, args)
}
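As a standalone illustration of the coalescing behavior described above (not CGRateS code): repeated updates queued under the same key overwrite each other in the pending map, so only the latest version is replicated when the interval elapses. The key below follows the methodName_objTypeObjID scheme with hypothetical identifiers.
package main
import (
	"fmt"
	"sync"
	"time"
)
// pendingSet mirrors the replicator's pending map: it coalesces queued
// replications by key, keeping only the latest value per key between flushes.
type pendingSet struct {
	mu      sync.Mutex
	pending map[string]string
}
func (p *pendingSet) queue(key, val string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.pending[key] = val // a later write for the same key overwrites the earlier one
}
func (p *pendingSet) flush() map[string]string {
	p.mu.Lock()
	defer p.mu.Unlock()
	out := p.pending
	p.pending = make(map[string]string)
	return out
}
func main() {
	ps := &pendingSet{pending: make(map[string]string)}
	// Three updates to the same object arrive before the interval elapses;
	// they collapse into a single replication of the latest version.
	key := "SetThresholdProfile_thp_cgrates.org:TH1" // hypothetical key
	ps.queue(key, "v1")
	ps.queue(key, "v2")
	ps.queue(key, "v3")
	time.Sleep(50 * time.Millisecond) // stand-in for the ticker interval
	fmt.Println(ps.flush())           // map[SetThresholdProfile_thp_cgrates.org:TH1:v3]
}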
// replicate performs the actual replication by calling Set/Remove APIs on ReplicatorSv1.
// It either replicates to all connections or only to filtered ones, based on configuration.
func replicate(connMgr *ConnManager, connIDs []string, filtered bool, objType, objID, method string, args any) (err error) {
// the reply is string for Set/Remove APIs
// ignored in favor of the error
var reply string
if !filtered {
// is not partial so send to all defined connections
return utils.CastRPCErr(connMgr.Call(context.TODO(), connIDs, method, args, &reply))
}
// is partial, so get all the replication hosts from cache based on object type and ID
// group key example: objType+objID => "alp_cgrates.org:ATTR1"
rplcHostIDsIfaces := Cache.tCache.GetGroupItems(utils.CacheReplicationHosts, objType+objID)
rplcHostIDs := make(utils.StringSet)
for _, hostID := range rplcHostIDsIfaces {
rplcHostIDs.Add(hostID.(string))
}
// using the replication hosts call the method
return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs,
method, args, &reply))
}
// replicationLoop runs periodically according to the configured interval
// to flush pending replications. It stops when the Replicator is closed.
func (r *replicator) replicationLoop() {
defer r.wg.Done()
ticker := time.NewTicker(r.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.flush()
case <-r.stop:
r.flush()
return
}
}
}
// flush immediately processes all pending replications.
// Failed replications are saved to disk if a failedDir is configured.
func (r *replicator) flush() {
r.mu.Lock()
if len(r.pending) == 0 {
// Skip processing when there are no pending replications.
r.mu.Unlock()
return
}
pending := r.pending
r.pending = make(map[string]*replicationData)
r.mu.Unlock()
for key, data := range pending {
var failedPath string
if r.failedDir != "" {
failedPath = filepath.Join(r.failedDir, key+utils.GOBSuffix)
// Clean up any existing file containing failed replications.
if err := os.Remove(failedPath); err != nil && !os.IsNotExist(err) {
utils.Logger.Warning(fmt.Sprintf(
"<DataManager> failed to remove file for %q: %v", key, err))
}
}
if err := replicate(r.cm, r.conns, r.filtered, data.objType, data.objID,
data.method, data.args); err != nil {
utils.Logger.Warning(fmt.Sprintf(
"<DataManager> failed to replicate %q for object %q: %v",
data.method, data.objType+data.objID, err))
if failedPath != "" {
task := &ReplicationTask{
ConnIDs: r.conns,
Filtered: r.filtered,
ObjType: data.objType,
ObjID: data.objID,
Method: data.method,
Args: data.args,
}
if err := task.WriteToFile(failedPath); err != nil {
utils.Logger.Err(fmt.Sprintf(
"<DataManager> failed to dump replication task: %v", err))
}
}
}
}
}
// close stops the replication loop if it's running and waits for pending
// replications to complete.
func (r *replicator) close() {
if r.interval > 0 {
close(r.stop)
r.wg.Wait()
}
}
// UpdateReplicationFilters sets the connection ID in cache for filtered replication.
// It's a no-op if connID is empty.
func UpdateReplicationFilters(objType, objID, connID string) {
if connID == utils.EmptyString {
return
}
Cache.SetWithoutReplicate(utils.CacheReplicationHosts, objType+objID+utils.ConcatenatedKeySep+connID, connID, []string{objType + objID},
true, utils.NonTransactional)
}
// replicateMultipleIDs replicates operations for multiple object IDs.
// It functions similarly to replicate but handles a collection of IDs rather than a single one.
// Used primarily for setting LoadIDs.
// TODO: merge with replicate function.
func replicateMultipleIDs(connMgr *ConnManager, connIDs []string, filtered bool, objType string, objIDs []string, method string, args any) (err error) {
// the reply is string for Set/Remove APIs
// ignored in favor of the error
var reply string
if !filtered {
// is not partial so send to all defined connections
return utils.CastRPCErr(connMgr.Call(context.TODO(), connIDs, method, args, &reply))
}
// is partial, so get all the replication hosts from cache based on object type and ID;
// combine all hosts into a single set so that, if a get is received for any one ID in
// the list, the whole list is sent to that host
rplcHostIDs := make(utils.StringSet)
for _, objID := range objIDs {
rplcHostIDsIfaces := Cache.tCache.GetGroupItems(utils.CacheReplicationHosts, objType+objID)
for _, hostID := range rplcHostIDsIfaces {
rplcHostIDs.Add(hostID.(string))
}
}
// using the replication hosts call the method
return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs,
method, args, &reply))
}
// ReplicationTask represents a replication operation that can be saved to disk
// and executed later, typically used for failed replications.
type ReplicationTask struct {
ConnIDs []string
Filtered bool
Path string
ObjType string
ObjID string
Method string
Args any
failedDir string
}
// NewReplicationTaskFromFile loads a replication task from the specified file.
// The file is removed once its contents have been read.
func NewReplicationTaskFromFile(path string) (*ReplicationTask, error) {
var taskBytes []byte
if err := guardian.Guardian.Guard(func() error {
var err error
if taskBytes, err = os.ReadFile(path); err != nil {
return err
}
return os.Remove(path) // file is not needed anymore
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+path); err != nil {
return nil, err
}
dec := gob.NewDecoder(bytes.NewBuffer(taskBytes))
var task *ReplicationTask
if err := dec.Decode(&task); err != nil {
return nil, err
}
return task, nil
}
// WriteToFile saves the replication task to the specified path.
// This allows failed tasks to be recovered and retried later.
func (r *ReplicationTask) WriteToFile(path string) error {
return guardian.Guardian.Guard(func() error {
f, err := os.Create(path)
if err != nil {
return err
}
defer f.Close()
enc := gob.NewEncoder(f)
return enc.Encode(r)
}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+path)
}
// Execute performs the replication task.
func (r *ReplicationTask) Execute(cm *ConnManager) error {
return replicate(cm, r.ConnIDs, r.Filtered, r.ObjType, r.ObjID, r.Method, r.Args)
}
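One caveat worth noting for the gob-based persistence above: Args is declared as any, so the concrete argument types must be registered with encoding/gob before a task can be encoded or decoded. A self-contained round-trip sketch; the task and argument types here are stand-ins, not CGRateS types.
package main
import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)
// task stands in for a struct whose Args field is an interface, like ReplicationTask.
type task struct {
	Method string
	Args   any
}
// setThresholdArgs is a hypothetical argument payload; any concrete type
// stored in an interface field must be registered with gob on both ends.
type setThresholdArgs struct {
	Tenant string
	ID     string
}
func main() {
	gob.Register(setThresholdArgs{})
	in := task{
		Method: "ReplicatorSv1.SetThresholdProfile",
		Args:   setThresholdArgs{Tenant: "cgrates.org", ID: "TH1"},
	}
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		log.Fatal(err)
	}
	var out task
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v\n", out) // Args decodes back to the registered concrete type
}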

View File

@@ -117,7 +117,7 @@ func (db *DataDBService) Reload() (err error) {
func (db *DataDBService) Shutdown() (err error) {
db.srvDep[utils.DataDB].Wait()
db.Lock()
db.dm.DataDB().Close()
db.dm.Close()
db.dm = nil
db.Unlock()
return