diff --git a/apier/v1/apier.go b/apier/v1/apier.go index b2b74a81b..fd910da87 100644 --- a/apier/v1/apier.go +++ b/apier/v1/apier.go @@ -1494,6 +1494,62 @@ func (apierSv1 *APIerSv1) ReplayFailedPosts(ctx *context.Context, args ReplayFai return nil } +// ReplayFailedReplicationsArgs contains args for replaying failed replications. +type ReplayFailedReplicationsArgs struct { + SourcePath string // path for events to be replayed + FailedPath string // path for events that failed to replay, *none to discard, defaults to SourcePath if empty +} + +// ReplayFailedReplications will repost failed requests found in the SourcePath. +func (a *APIerSv1) ReplayFailedReplications(ctx *context.Context, args ReplayFailedReplicationsArgs, reply *string) error { + + // Set default directories if not provided. + if args.SourcePath == "" { + args.SourcePath = a.Config.DataDbCfg().RplFailedDir + } + if args.SourcePath == "" { + return utils.NewErrServerError( + errors.New("no source directory specified: both SourcePath and replication_failed_dir configuration are empty"), + ) + } + if args.FailedPath == "" { + args.FailedPath = args.SourcePath + } + + if err := filepath.WalkDir(args.SourcePath, func(path string, d fs.DirEntry, err error) error { + if err != nil { + utils.Logger.Warning(fmt.Sprintf(" failed to access path %s: %v", path, err)) + return nil // skip paths that cause an error + } + if d.IsDir() { + return nil // skip directories + } + + task, err := engine.NewReplicationTaskFromFile(path) + if err != nil { + return fmt.Errorf("failed to init ExportEvents from %s: %v", path, err) + } + + // Determine the failover path. + failoverPath := utils.MetaNone + if args.FailedPath != utils.MetaNone { + failoverPath = filepath.Join(args.FailedPath, d.Name()) + } + + if err := task.Execute(a.ConnMgr); err != nil && failoverPath != utils.MetaNone { + // Write the events that failed to be replayed to the failover directory + if err = task.WriteToFile(failoverPath); err != nil { + return fmt.Errorf("failed to write the events that failed to be replayed to %s: %v", path, err) + } + } + return nil + }); err != nil { + return utils.NewErrServerError(err) + } + *reply = utils.OK + return nil +} + func (apierSv1 *APIerSv1) GetLoadIDs(ctx *context.Context, args *string, reply *map[string]int64) (err error) { var loadIDs map[string]int64 if loadIDs, err = apierSv1.DataManager.GetItemLoadIDs(*args, false); err != nil { diff --git a/engine/datamanager.go b/engine/datamanager.go index 9d5c1cab8..3d3e8c152 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -86,21 +86,29 @@ var ( // NewDataManager returns a new DataManager func NewDataManager(dataDB DataDB, cacheCfg *config.CacheCfg, connMgr *ConnManager) *DataManager { ms, _ := NewMarshaler(config.CgrConfig().GeneralCfg().DBDataEncoding) + rpl := newReplicator(connMgr) return &DataManager{ - dataDB: dataDB, - cacheCfg: cacheCfg, - connMgr: connMgr, - ms: ms, + dataDB: dataDB, + cacheCfg: cacheCfg, + connMgr: connMgr, + ms: ms, + replicator: rpl, } } // DataManager is the data storage manager for CGRateS // transparently manages data retrieval, further serialization and caching type DataManager struct { - dataDB DataDB - cacheCfg *config.CacheCfg - connMgr *ConnManager - ms Marshaler + dataDB DataDB + cacheCfg *config.CacheCfg + connMgr *ConnManager + ms Marshaler + replicator *replicator +} + +func (dm *DataManager) Close() { + dm.replicator.close() + dm.dataDB.Close() } // DataDB exports access to dataDB @@ -408,18 +416,16 @@ func (dm *DataManager) SetDestination(dest *Destination, transactionID string) ( if err = dm.dataDB.SetDestinationDrv(dest, transactionID); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDestinations]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.DestinationPrefix, dest.Id, // this are used to get the host IDs from cache - utils.ReplicatorSv1SetDestination, - &DestinationWithAPIOpts{ - Destination: dest, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDestinations] + return dm.replicator.replicate( + utils.DestinationPrefix, dest.Id, // these are used to get the host IDs from cache + utils.ReplicatorSv1SetDestination, + &DestinationWithAPIOpts{ + Destination: dest, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveDestination(destID string, transactionID string) (err error) { @@ -450,17 +456,16 @@ func (dm *DataManager) RemoveDestination(destID string, transactionID string) (e dm.GetReverseDestination(prfx, false, true, transactionID) // it will recache the destination } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDestinations]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.DestinationPrefix, destID, // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveDestination, - &utils.StringWithAPIOpts{ - Arg: destID, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDestinations] + _ = dm.replicator.replicate( + utils.DestinationPrefix, destID, // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveDestination, + &utils.StringWithAPIOpts{ + Arg: destID, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -471,14 +476,16 @@ func (dm *DataManager) SetReverseDestination(destID string, prefixes []string, t if err = dm.dataDB.SetReverseDestinationDrv(destID, prefixes, transactionID); err != nil { return } - if config.CgrConfig().DataDbCfg().Items[utils.MetaReverseDestinations].Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.DestinationPrefix, destID, // this are used to get the host IDs from cache - utils.ReplicatorSv1SetReverseDestination, - &DestinationWithAPIOpts{Destination: &Destination{Id: destID, Prefixes: prefixes}}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaReverseDestinations] + return dm.replicator.replicate( + utils.DestinationPrefix, destID, // these are used to get the host IDs from cache + utils.ReplicatorSv1SetReverseDestination, + &DestinationWithAPIOpts{ + Destination: &Destination{ + Id: destID, + Prefixes: prefixes, + }, + }, itm) } func (dm *DataManager) GetReverseDestination(prefix string, @@ -604,45 +611,39 @@ func (dm *DataManager) GetAccount(id string) (acc *Account, err error) { return } -func (dm *DataManager) SetAccount(acc *Account) (err error) { +func (dm *DataManager) SetAccount(acc *Account) error { if dm == nil { return utils.ErrNoDatabaseConn } - if err = dm.dataDB.SetAccountDrv(acc); err != nil { - return + if err := dm.dataDB.SetAccountDrv(acc); err != nil { + return err } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccounts]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.AccountPrefix, acc.ID, // this are used to get the host IDs from cache - utils.ReplicatorSv1SetAccount, - &AccountWithAPIOpts{ - Account: acc, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - utils.EmptyString, utils.EmptyString)}) // the account doesn't have cache - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccounts] + return dm.replicator.replicate(utils.AccountPrefix, acc.ID, + utils.ReplicatorSv1SetAccount, + &AccountWithAPIOpts{ + Account: acc, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, "", ""), + }, itm) } -func (dm *DataManager) RemoveAccount(id string) (err error) { +func (dm *DataManager) RemoveAccount(id string) error { if dm == nil { return utils.ErrNoDatabaseConn } - if err = dm.dataDB.RemoveAccountDrv(id); err != nil { - return + if err := dm.dataDB.RemoveAccountDrv(id); err != nil { + return err } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccounts]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.AccountPrefix, id, // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveAccount, - &utils.StringWithAPIOpts{ - Arg: id, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - utils.EmptyString, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccounts] + _ = dm.replicator.replicate( + utils.AccountPrefix, id, + utils.ReplicatorSv1RemoveAccount, + &utils.StringWithAPIOpts{ + Arg: id, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, "", ""), + }, itm) + return nil } // GetFilter returns a filter based on the given ID @@ -722,17 +723,15 @@ func (dm *DataManager) SetFilter(fltr *Filter, withIndex bool) (err error) { return } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilters]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.FilterPrefix, fltr.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetFilter, - &FilterWithAPIOpts{ - Filter: fltr, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilters] + return dm.replicator.replicate( + utils.FilterPrefix, fltr.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetFilter, + &FilterWithAPIOpts{ + Filter: fltr, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveFilter(tenant, id string, withIndex bool) (err error) { @@ -753,7 +752,7 @@ func (dm *DataManager) RemoveFilter(tenant, id string, withIndex bool) (err erro if err != utils.ErrNotFound { return } - err = nil // no index for this filter so no remove needed from index side + err = nil // no index for this filter so no remove needed from index side } else { return fmt.Errorf("cannot remove filter <%s> because will broken the reference to following items: %s", tntCtx, utils.ToJSON(rcvIndx)) @@ -765,16 +764,15 @@ func (dm *DataManager) RemoveFilter(tenant, id string, withIndex bool) (err erro if oldFlt == nil { return utils.ErrNotFound } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilters]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.FilterPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveFilter, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaFilters] + _ = dm.replicator.replicate( + utils.FilterPrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveFilter, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -833,17 +831,15 @@ func (dm *DataManager) SetThreshold(th *Threshold) (err error) { if err = dm.DataDB().SetThresholdDrv(th); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholds]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ThresholdPrefix, th.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetThreshold, - &ThresholdWithAPIOpts{ - Threshold: th, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholds] + return dm.replicator.replicate( + utils.ThresholdPrefix, th.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetThreshold, + &ThresholdWithAPIOpts{ + Threshold: th, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveThreshold(tenant, id string) (err error) { @@ -853,16 +849,15 @@ func (dm *DataManager) RemoveThreshold(tenant, id string) (err error) { if err = dm.DataDB().RemoveThresholdDrv(tenant, id); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholds]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ThresholdPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveThreshold, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholds] + _ = dm.replicator.replicate( + utils.ThresholdPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache + utils.ReplicatorSv1RemoveThreshold, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -943,17 +938,16 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) return err } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles]; itm.Replicate { - if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ThresholdProfilePrefix, th.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetThresholdProfile, - &ThresholdProfileWithAPIOpts{ - ThresholdProfile: th, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil { - return - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles] + if err = dm.replicator.replicate( + utils.ThresholdProfilePrefix, th.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetThresholdProfile, + &ThresholdProfileWithAPIOpts{ + ThresholdProfile: th, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm); err != nil { + return } if oldTh == nil || // create the threshold if it didn't exist before @@ -999,16 +993,15 @@ func (dm *DataManager) RemoveThresholdProfile(tenant, id string, withIndex bool) return } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ThresholdProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveThresholdProfile, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaThresholdProfiles] + _ = dm.replicator.replicate( + utils.ThresholdProfilePrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveThresholdProfile, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return dm.RemoveThreshold(tenant, id) // remove the threshold } @@ -1084,17 +1077,15 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) { if err = dm.dataDB.SetStatQueueDrv(ssq, sq); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.StatQueuePrefix, sq.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetStatQueue, - &StatQueueWithAPIOpts{ - StatQueue: sq, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues] + return dm.replicator.replicate( + utils.StatQueuePrefix, sq.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetStatQueue, + &StatQueueWithAPIOpts{ + StatQueue: sq, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } // RemoveStatQueue removes the StoredStatQueue @@ -1105,16 +1096,15 @@ func (dm *DataManager) RemoveStatQueue(tenant, id string) (err error) { if err = dm.dataDB.RemStatQueueDrv(tenant, id); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.StatQueuePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveStatQueue, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueues] + _ = dm.replicator.replicate( + utils.StatQueuePrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveStatQueue, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -1196,23 +1186,22 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool return err } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles]; itm.Replicate { - if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.StatQueueProfilePrefix, sqp.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetStatQueueProfile, - &StatQueueProfileWithAPIOpts{ - StatQueueProfile: sqp, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil { - return - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles] + if err = dm.replicator.replicate( + utils.StatQueueProfilePrefix, sqp.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetStatQueueProfile, + &StatQueueProfileWithAPIOpts{ + StatQueueProfile: sqp, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm); err != nil { + return } if oldSts == nil || // create the stats queue if it didn't exist before oldSts.QueueLength != sqp.QueueLength || oldSts.TTL != sqp.TTL || oldSts.MinItems != sqp.MinItems || - (oldSts.Stored != sqp.Stored && oldSts.Stored) { // reset the stats queue if the profile changed this fields + (oldSts.Stored != sqp.Stored && oldSts.Stored) { // reset the stats queue if the profile changed these fields guardian.Guardian.Guard(func() (_ error) { // we change the queue so lock it var sq *StatQueue if sq, err = NewStatQueue(sqp.Tenant, sqp.ID, sqp.Metrics, @@ -1284,16 +1273,15 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id string, withIndex bool) return } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.StatQueueProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveStatQueueProfile, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaStatQueueProfiles] + _ = dm.replicator.replicate( + utils.StatQueueProfilePrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveStatQueueProfile, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return dm.RemoveStatQueue(tenant, id) } @@ -1372,19 +1360,15 @@ func (dm *DataManager) SetTrend(tr *Trend) (err error) { if err = dm.DataDB().SetTrendDrv(tr); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; itm.Replicate { - if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.TrendPrefix, tr.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetTrend, - &TrendWithAPIOpts{ - Trend: tr, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil { - return - } - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends] + return dm.replicator.replicate( + utils.TrendPrefix, tr.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetTrend, + &TrendWithAPIOpts{ + Trend: tr, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } // RemoveTrend removes the stored Trend @@ -1395,16 +1379,15 @@ func (dm *DataManager) RemoveTrend(tenant, id string) (err error) { if err = dm.DataDB().RemoveTrendDrv(tenant, id); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.TrendPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveTrend, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrends] + _ = dm.replicator.replicate( + utils.TrendPrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveTrend, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -1501,15 +1484,16 @@ func (dm *DataManager) SetTrendProfile(trp *TrendProfile) (err error) { if err = dm.DataDB().SetTrendProfileDrv(trp); err != nil { return err } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrendProfiles]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.TrendsProfilePrefix, trp.TenantID(), - utils.ReplicatorSv1SetTrendProfile, - &TrendProfileWithAPIOpts{ - TrendProfile: trp, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTrendProfiles] + if err = dm.replicator.replicate( + utils.TrendsProfilePrefix, trp.TenantID(), + utils.ReplicatorSv1SetTrendProfile, + &TrendProfileWithAPIOpts{ + TrendProfile: trp, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm); err != nil { + return } if oldTrd == nil || oldTrd.QueueLength != trp.QueueLength || @@ -1536,16 +1520,15 @@ func (dm *DataManager) RemoveTrendProfile(tenant, id string) (err error) { if oldTrs == nil { return utils.ErrNotFound } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.TrendsProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveTrendProfile, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles] + _ = dm.replicator.replicate( + utils.TrendsProfilePrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveTrendProfile, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return dm.RemoveTrend(tenant, id) } @@ -1640,15 +1623,16 @@ func (dm *DataManager) SetRankingProfile(rnp *RankingProfile) (err error) { if err = dm.DataDB().SetRankingProfileDrv(rnp); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.RankingsProfilePrefix, rnp.TenantID(), - utils.ReplicatorSv1SetRankingProfile, - &RankingProfileWithAPIOpts{ - RankingProfile: rnp, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles] + if err = dm.replicator.replicate( + utils.RankingsProfilePrefix, rnp.TenantID(), + utils.ReplicatorSv1SetRankingProfile, + &RankingProfileWithAPIOpts{ + RankingProfile: rnp, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm); err != nil { + return } if oldRnk == nil || oldRnk.Sorting != rnp.Sorting || oldRnk.Schedule != rnp.Schedule { @@ -1673,16 +1657,15 @@ func (dm *DataManager) RemoveRankingProfile(tenant, id string) (err error) { if oldSgs == nil { return utils.ErrNotFound } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.RankingsProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveRankingProfile, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankingProfiles] + _ = dm.replicator.replicate( + utils.RankingsProfilePrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveRankingProfile, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } func (dm *DataManager) GetRanking(tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rn *Ranking, err error) { @@ -1741,19 +1724,15 @@ func (dm *DataManager) SetRanking(rn *Ranking) (err error) { if err = dm.DataDB().SetRankingDrv(rn); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankings]; itm.Replicate { - if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.RankingPrefix, rn.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetRanking, - &RankingWithAPIOpts{ - Ranking: rn, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil { - return - } - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankings] + return dm.replicator.replicate( + utils.RankingPrefix, rn.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetRanking, + &RankingWithAPIOpts{ + Ranking: rn, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } // RemoveRanking removes the stored Ranking @@ -1764,16 +1743,15 @@ func (dm *DataManager) RemoveRanking(tenant, id string) (err error) { if err = dm.DataDB().RemoveRankingDrv(tenant, id); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankings]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.RankingPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveRanking, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRankings] + _ = dm.replicator.replicate( + utils.RankingPrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveRanking, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -1841,17 +1819,15 @@ func (dm *DataManager) SetTiming(t *utils.TPTiming) (err error) { if err = dm.CacheDataFromDB(utils.TimingsPrefix, []string{t.ID}, true); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTimings]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.TimingsPrefix, t.ID, // this are used to get the host IDs from cache - utils.ReplicatorSv1SetTiming, - &utils.TPTimingWithAPIOpts{ - TPTiming: t, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTimings] + return dm.replicator.replicate( + utils.TimingsPrefix, t.ID, // these are used to get the host IDs from cache + utils.ReplicatorSv1SetTiming, + &utils.TPTimingWithAPIOpts{ + TPTiming: t, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveTiming(id, transactionID string) (err error) { @@ -1865,13 +1841,11 @@ func (dm *DataManager) RemoveTiming(id, transactionID string) (err error) { cacheCommit(transactionID), transactionID); errCh != nil { return errCh } - if config.CgrConfig().DataDbCfg().Items[utils.MetaTimings].Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.TimingsPrefix, id, // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveTiming, - id) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaTimings] + _ = dm.replicator.replicate( + utils.TimingsPrefix, id, // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveTiming, + id, itm) return } @@ -1932,17 +1906,15 @@ func (dm *DataManager) SetResource(rs *Resource) (err error) { if err = dm.DataDB().SetResourceDrv(rs); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResources]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ResourcesPrefix, rs.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetResource, - &ResourceWithAPIOpts{ - Resource: rs, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResources] + return dm.replicator.replicate( + utils.ResourcesPrefix, rs.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetResource, + &ResourceWithAPIOpts{ + Resource: rs, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveResource(tenant, id string) (err error) { @@ -1952,16 +1924,15 @@ func (dm *DataManager) RemoveResource(tenant, id string) (err error) { if err = dm.DataDB().RemoveResourceDrv(tenant, id); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResources]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ResourcesPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveResource, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResources] + _ = dm.replicator.replicate( + utils.ResourcesPrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveResource, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -2043,22 +2014,21 @@ func (dm *DataManager) SetResourceProfile(rp *ResourceProfile, withIndex bool) ( } Cache.Clear([]string{utils.CacheEventResources}) } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile]; itm.Replicate { - if err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ResourceProfilesPrefix, rp.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetResourceProfile, - &ResourceProfileWithAPIOpts{ - ResourceProfile: rp, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}); err != nil { - return - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile] + if err = dm.replicator.replicate( + utils.ResourceProfilesPrefix, rp.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetResourceProfile, + &ResourceProfileWithAPIOpts{ + ResourceProfile: rp, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm); err != nil { + return } if oldRes == nil || // create the resource if it didn't exist before oldRes.UsageTTL != rp.UsageTTL || oldRes.Limit != rp.Limit || - (oldRes.Stored != rp.Stored && oldRes.Stored) { // reset the resource if the profile changed this fields + (oldRes.Stored != rp.Stored && oldRes.Stored) { // reset the resource if the profile changed these fields err = dm.SetResource(&Resource{ Tenant: rp.Tenant, ID: rp.ID, @@ -2098,16 +2068,15 @@ func (dm *DataManager) RemoveResourceProfile(tenant, id string, withIndex bool) return } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ResourceProfilesPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveResourceProfile, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaResourceProfile] + _ = dm.replicator.replicate( + utils.ResourceProfilesPrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveResourceProfile, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return dm.RemoveResource(tenant, id) } @@ -2168,17 +2137,16 @@ func (dm *DataManager) RemoveActionTriggers(id, transactionID string) (err error cacheCommit(transactionID), transactionID); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ActionTriggerPrefix, id, // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveActionTriggers, - &utils.StringWithAPIOpts{ - Arg: id, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers] + _ = dm.replicator.replicate( + utils.ActionTriggerPrefix, id, // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveActionTriggers, + &utils.StringWithAPIOpts{ + Arg: id, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -2200,19 +2168,17 @@ func (dm *DataManager) SetActionTriggers(key string, attr ActionTriggers) (err e if err = dm.CacheDataFromDB(utils.ActionTriggerPrefix, []string{key}, true); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ActionTriggerPrefix, key, // this are used to get the host IDs from cache - utils.ReplicatorSv1SetActionTriggers, - &SetActionTriggersArgWithAPIOpts{ - Attrs: attr, - Key: key, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers] + return dm.replicator.replicate( + utils.ActionTriggerPrefix, key, // these are used to get the host IDs from cache + utils.ReplicatorSv1SetActionTriggers, + &SetActionTriggersArgWithAPIOpts{ + Attrs: attr, + Key: key, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) GetSharedGroup(key string, skipCache bool, @@ -2272,18 +2238,16 @@ func (dm *DataManager) SetSharedGroup(sg *SharedGroup) (err error) { []string{sg.Id}, true); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.SharedGroupPrefix, sg.Id, // this are used to get the host IDs from cache - utils.ReplicatorSv1SetSharedGroup, - &SharedGroupWithAPIOpts{ - SharedGroup: sg, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups] + return dm.replicator.replicate( + utils.SharedGroupPrefix, sg.Id, // these are used to get the host IDs from cache + utils.ReplicatorSv1SetSharedGroup, + &SharedGroupWithAPIOpts{ + SharedGroup: sg, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveSharedGroup(id, transactionID string) (err error) { @@ -2297,17 +2261,16 @@ func (dm *DataManager) RemoveSharedGroup(id, transactionID string) (err error) { cacheCommit(transactionID), transactionID); errCh != nil { return errCh } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.SharedGroupPrefix, id, // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveSharedGroup, - &utils.StringWithAPIOpts{ - Arg: id, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups] + _ = dm.replicator.replicate( + utils.SharedGroupPrefix, id, // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveSharedGroup, + &utils.StringWithAPIOpts{ + Arg: id, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -2372,19 +2335,17 @@ func (dm *DataManager) SetActions(key string, as Actions) (err error) { if err = dm.DataDB().SetActionsDrv(key, as); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActions]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ActionPrefix, key, // this are used to get the host IDs from cache - utils.ReplicatorSv1SetActions, - &SetActionsArgsWithAPIOpts{ - Key: key, - Acs: as, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActions] + return dm.replicator.replicate( + utils.ActionPrefix, key, // these are used to get the host IDs from cache + utils.ReplicatorSv1SetActions, + &SetActionsArgsWithAPIOpts{ + Key: key, + Acs: as, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveActions(key string) (err error) { @@ -2394,17 +2355,16 @@ func (dm *DataManager) RemoveActions(key string) (err error) { if err = dm.DataDB().RemoveActionsDrv(key); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActions]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ActionPrefix, key, // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveActions, - &utils.StringWithAPIOpts{ - Arg: key, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActions] + _ = dm.replicator.replicate( + utils.ActionPrefix, key, // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveActions, + &utils.StringWithAPIOpts{ + Arg: key, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -2487,19 +2447,17 @@ func (dm *DataManager) SetActionPlan(key string, ats *ActionPlan, if err = dm.dataDB.SetActionPlanDrv(key, ats); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ActionPlanPrefix, key, // this are used to get the host IDs from cache - utils.ReplicatorSv1SetActionPlan, - &SetActionPlanArgWithAPIOpts{ - Key: key, - Ats: ats, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans] + return dm.replicator.replicate( + utils.ActionPlanPrefix, key, // these are used to get the host IDs from cache + utils.ReplicatorSv1SetActionPlan, + &SetActionPlanArgWithAPIOpts{ + Key: key, + Ats: ats, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) GetAllActionPlans() (ats map[string]*ActionPlan, err error) { @@ -2533,17 +2491,16 @@ func (dm *DataManager) RemoveActionPlan(key string, transactionID string) (err e if err = dm.dataDB.RemoveActionPlanDrv(key); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ActionPlanPrefix, key, // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveActionPlan, - &utils.StringWithAPIOpts{ - Arg: key, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionPlans] + _ = dm.replicator.replicate( + utils.ActionPlanPrefix, key, // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveActionPlan, + &utils.StringWithAPIOpts{ + Arg: key, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } func (dm *DataManager) GetAccountActionPlans(acntID string, cacheRead, cacheWrite bool, transactionID string) (apIDs []string, err error) { @@ -2620,19 +2577,17 @@ func (dm *DataManager) SetAccountActionPlans(acntID string, aPlIDs []string, ove if err = dm.dataDB.SetAccountActionPlansDrv(acntID, aPlIDs); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.AccountActionPlansPrefix, acntID, // this are used to get the host IDs from cache - utils.ReplicatorSv1SetAccountActionPlans, - &SetAccountActionPlansArgWithAPIOpts{ - AcntID: acntID, - AplIDs: aPlIDs, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans] + return dm.replicator.replicate( + utils.AccountActionPlansPrefix, acntID, // these are used to get the host IDs from cache + utils.ReplicatorSv1SetAccountActionPlans, + &SetAccountActionPlansArgWithAPIOpts{ + AcntID: acntID, + AplIDs: aPlIDs, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } // RemAccountActionPlansArgsWithAPIOpts is used in replicatorV1 for dispatcher @@ -2668,7 +2623,7 @@ func (dm *DataManager) RemAccountActionPlans(acntID string, apIDs []string) (err if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAccountActionPlans]; itm.Replicate { replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, config.CgrConfig().DataDbCfg().RplFiltered, - utils.AccountActionPlansPrefix, acntID, // this are used to get the host IDs from cache + utils.AccountActionPlansPrefix, acntID, // these are used to get the host IDs from cache utils.ReplicatorSv1RemAccountActionPlans, &RemAccountActionPlansArgsWithAPIOpts{ AcntID: acntID, @@ -2734,18 +2689,16 @@ func (dm *DataManager) SetRatingPlan(rp *RatingPlan) (err error) { if err = dm.DataDB().SetRatingPlanDrv(rp); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.RatingPlanPrefix, rp.Id, // this are used to get the host IDs from cache - utils.ReplicatorSv1SetRatingPlan, - &RatingPlanWithAPIOpts{ - RatingPlan: rp, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans] + return dm.replicator.replicate( + utils.RatingPlanPrefix, rp.Id, // these are used to get the host IDs from cache + utils.ReplicatorSv1SetRatingPlan, + &RatingPlanWithAPIOpts{ + RatingPlan: rp, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveRatingPlan(key string, transactionID string) (err error) { @@ -2755,17 +2708,16 @@ func (dm *DataManager) RemoveRatingPlan(key string, transactionID string) (err e if err = dm.DataDB().RemoveRatingPlanDrv(key); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.RatingPlanPrefix, key, // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveRatingPlan, - &utils.StringWithAPIOpts{ - Arg: key, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans] + _ = dm.replicator.replicate( + utils.RatingPlanPrefix, key, // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveRatingPlan, + &utils.StringWithAPIOpts{ + Arg: key, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -2826,18 +2778,16 @@ func (dm *DataManager) SetRatingProfile(rpf *RatingProfile) (err error) { if err = dm.DataDB().SetRatingProfileDrv(rpf); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.RatingProfilePrefix, rpf.Id, // this are used to get the host IDs from cache - utils.ReplicatorSv1SetRatingProfile, - &RatingProfileWithAPIOpts{ - RatingProfile: rpf, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles] + return dm.replicator.replicate( + utils.RatingProfilePrefix, rpf.Id, // these are used to get the host IDs from cache + utils.ReplicatorSv1SetRatingProfile, + &RatingProfileWithAPIOpts{ + RatingProfile: rpf, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveRatingProfile(key string) (err error) { @@ -2847,17 +2797,16 @@ func (dm *DataManager) RemoveRatingProfile(key string) (err error) { if err = dm.DataDB().RemoveRatingProfileDrv(key); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.RatingProfilePrefix, key, // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveRatingProfile, - &utils.StringWithAPIOpts{ - Arg: key, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles] + _ = dm.replicator.replicate( + utils.RatingProfilePrefix, key, // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveRatingProfile, + &utils.StringWithAPIOpts{ + Arg: key, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -2950,17 +2899,15 @@ func (dm *DataManager) SetRouteProfile(rpp *RouteProfile, withIndex bool) (err e return err } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRouteProfiles]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.RouteProfilePrefix, rpp.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetRouteProfile, - &RouteProfileWithAPIOpts{ - RouteProfile: rpp, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRouteProfiles] + return dm.replicator.replicate( + utils.RouteProfilePrefix, rpp.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetRouteProfile, + &RouteProfileWithAPIOpts{ + RouteProfile: rpp, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveRouteProfile(tenant, id string, withIndex bool) (err error) { @@ -2986,16 +2933,15 @@ func (dm *DataManager) RemoveRouteProfile(tenant, id string, withIndex bool) (er return } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRouteProfiles]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.RouteProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveRouteProfile, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRouteProfiles] + _ = dm.replicator.replicate( + utils.RouteProfilePrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveRouteProfile, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -3089,17 +3035,15 @@ func (dm *DataManager) SetAttributeProfile(ap *AttributeProfile, withIndex bool) return } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.AttributeProfilePrefix, ap.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetAttributeProfile, - &AttributeProfileWithAPIOpts{ - AttributeProfile: ap, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles] + return dm.replicator.replicate( + utils.AttributeProfilePrefix, ap.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetAttributeProfile, + &AttributeProfileWithAPIOpts{ + AttributeProfile: ap, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveAttributeProfile(tenant, id string, withIndex bool) (err error) { @@ -3127,16 +3071,15 @@ func (dm *DataManager) RemoveAttributeProfile(tenant, id string, withIndex bool) } } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.AttributeProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveAttributeProfile, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaAttributeProfiles] + _ = dm.replicator.replicate( + utils.AttributeProfilePrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveAttributeProfile, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -3218,17 +3161,15 @@ func (dm *DataManager) SetChargerProfile(cpp *ChargerProfile, withIndex bool) (e return err } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaChargerProfiles]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ChargerProfilePrefix, cpp.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetChargerProfile, - &ChargerProfileWithAPIOpts{ - ChargerProfile: cpp, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaChargerProfiles] + return dm.replicator.replicate( + utils.ChargerProfilePrefix, cpp.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetChargerProfile, + &ChargerProfileWithAPIOpts{ + ChargerProfile: cpp, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveChargerProfile(tenant, id string, withIndex bool) (err error) { @@ -3254,16 +3195,15 @@ func (dm *DataManager) RemoveChargerProfile(tenant, id string, withIndex bool) ( return } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaChargerProfiles]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.ChargerProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveChargerProfile, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaChargerProfiles] + _ = dm.replicator.replicate( + utils.ChargerProfilePrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveChargerProfile, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -3349,17 +3289,15 @@ func (dm *DataManager) SetDispatcherProfile(dpp *DispatcherProfile, withIndex bo return } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDispatcherProfiles]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.DispatcherProfilePrefix, dpp.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetDispatcherProfile, - &DispatcherProfileWithAPIOpts{ - DispatcherProfile: dpp, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDispatcherProfiles] + return dm.replicator.replicate( + utils.DispatcherProfilePrefix, dpp.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetDispatcherProfile, + &DispatcherProfileWithAPIOpts{ + DispatcherProfile: dpp, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveDispatcherProfile(tenant, id string, withIndex bool) (err error) { @@ -3387,16 +3325,15 @@ func (dm *DataManager) RemoveDispatcherProfile(tenant, id string, withIndex bool } } } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDispatcherProfiles]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.DispatcherProfilePrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveDispatcherProfile, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDispatcherProfiles] + _ = dm.replicator.replicate( + utils.DispatcherProfilePrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveDispatcherProfile, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -3457,17 +3394,15 @@ func (dm *DataManager) SetDispatcherHost(dpp *DispatcherHost) (err error) { if err = dm.DataDB().SetDispatcherHostDrv(dpp); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDispatcherHosts]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.DispatcherHostPrefix, dpp.TenantID(), // this are used to get the host IDs from cache - utils.ReplicatorSv1SetDispatcherHost, - &DispatcherHostWithAPIOpts{ - DispatcherHost: dpp, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDispatcherHosts] + return dm.replicator.replicate( + utils.DispatcherHostPrefix, dpp.TenantID(), // these are used to get the host IDs from cache + utils.ReplicatorSv1SetDispatcherHost, + &DispatcherHostWithAPIOpts{ + DispatcherHost: dpp, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveDispatcherHost(tenant, id string) (err error) { @@ -3484,16 +3419,15 @@ func (dm *DataManager) RemoveDispatcherHost(tenant, id string) (err error) { if oldDpp == nil { return utils.ErrDSPHostNotFound } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDispatcherHosts]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.DispatcherHostPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveDispatcherHost, - &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{Tenant: tenant, ID: id}, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDispatcherHosts] + _ = dm.replicator.replicate( + utils.DispatcherHostPrefix, utils.ConcatenatedKey(tenant, id), // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveDispatcherHost, + &utils.TenantIDWithAPIOpts{ + TenantID: &utils.TenantID{Tenant: tenant, ID: id}, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -3557,7 +3491,7 @@ func (dm *DataManager) SetLoadIDs(loadIDs map[string]int64) (err error) { } err = replicateMultipleIDs(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, config.CgrConfig().DataDbCfg().RplFiltered, - utils.LoadIDPrefix, objIDs, // this are used to get the host IDs from cache + utils.LoadIDPrefix, objIDs, // these are used to get the host IDs from cache utils.ReplicatorSv1SetLoadIDs, &utils.LoadIDsWithAPIOpts{ LoadIDs: loadIDs, @@ -3646,20 +3580,18 @@ func (dm *DataManager) SetIndexes(idxItmType, tntCtx string, indexes, commit, transactionID); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[idxItmType]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.CacheInstanceToPrefix[idxItmType], tntCtx, // this are used to get the host IDs from cache - utils.ReplicatorSv1SetIndexes, - &utils.SetIndexesArg{ - IdxItmType: idxItmType, - TntCtx: tntCtx, - Indexes: indexes, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } - return + itm := config.CgrConfig().DataDbCfg().Items[idxItmType] + return dm.replicator.replicate( + utils.CacheInstanceToPrefix[idxItmType], tntCtx, // these are used to get the host IDs from cache + utils.ReplicatorSv1SetIndexes, + &utils.SetIndexesArg{ + IdxItmType: idxItmType, + TntCtx: tntCtx, + Indexes: indexes, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) } func (dm *DataManager) RemoveIndexes(idxItmType, tntCtx, idxKey string) (err error) { @@ -3669,19 +3601,18 @@ func (dm *DataManager) RemoveIndexes(idxItmType, tntCtx, idxKey string) (err err if err = dm.DataDB().RemoveIndexesDrv(idxItmType, tntCtx, idxKey); err != nil { return } - if itm := config.CgrConfig().DataDbCfg().Items[idxItmType]; itm.Replicate { - replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.CacheInstanceToPrefix[idxItmType], tntCtx, // this are used to get the host IDs from cache - utils.ReplicatorSv1RemoveIndexes, - &utils.GetIndexesArg{ - IdxItmType: idxItmType, - TntCtx: tntCtx, - IdxKey: idxKey, - Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, - APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, - config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString)}) - } + itm := config.CgrConfig().DataDbCfg().Items[idxItmType] + _ = dm.replicator.replicate( + utils.CacheInstanceToPrefix[idxItmType], tntCtx, // these are used to get the host IDs from cache + utils.ReplicatorSv1RemoveIndexes, + &utils.GetIndexesArg{ + IdxItmType: idxItmType, + TntCtx: tntCtx, + IdxKey: idxKey, + Tenant: config.CgrConfig().GeneralCfg().DefaultTenant, + APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, + config.CgrConfig().DataDbCfg().RplCache, utils.EmptyString), + }, itm) return } @@ -3786,18 +3717,15 @@ func (dm *DataManager) SetBackupSessions(nodeID, tenant string, return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSessionsBackup]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.SessionsBackupPrefix, utils.ConcatenatedKey(tenant, nodeID), - utils.ReplicatorSv1SetBackupSessions, - &SetBackupSessionsArgs{ - StoredSessions: storedSessions, - NodeID: nodeID, - Tenant: tenant, - }) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSessionsBackup] + return dm.replicator.replicate( + utils.SessionsBackupPrefix, utils.ConcatenatedKey(tenant, nodeID), + utils.ReplicatorSv1SetBackupSessions, + &SetBackupSessionsArgs{ + StoredSessions: storedSessions, + NodeID: nodeID, + Tenant: tenant, + }, itm) } type RemoveSessionBackupArgs struct { @@ -3815,16 +3743,13 @@ func (dm *DataManager) RemoveSessionsBackup(nodeID, tenant, cgrid string) (err e return } - if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSessionsBackup]; itm.Replicate { - err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns, - config.CgrConfig().DataDbCfg().RplFiltered, - utils.SessionsBackupPrefix, utils.ConcatenatedKey(tenant, nodeID), - utils.ReplicatorSv1RemoveSessionBackup, - &RemoveSessionBackupArgs{ - CGRID: cgrid, - NodeID: nodeID, - Tenant: tenant, - }) - } - return + itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSessionsBackup] + return dm.replicator.replicate( + utils.SessionsBackupPrefix, utils.ConcatenatedKey(tenant, nodeID), + utils.ReplicatorSv1RemoveSessionBackup, + &RemoveSessionBackupArgs{ + CGRID: cgrid, + NodeID: nodeID, + Tenant: tenant, + }, itm) } diff --git a/engine/datamanager_test.go b/engine/datamanager_test.go index 0e73eff75..6ec324639 100644 --- a/engine/datamanager_test.go +++ b/engine/datamanager_test.go @@ -195,8 +195,8 @@ func TestDmGetFilterRemote(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) exp := &Filter{ Tenant: "cgrates.org", @@ -611,9 +611,9 @@ func TestDMSetAccount(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) + config.SetCgrConfig(cfg) dm := NewDataManager(db, cfg.CacheCfg(), connMgr) dm.ms = &JSONMarshaler{} - config.SetCgrConfig(cfg) SetDataStorage(dm) if err := dm.SetAccount(acc); err != nil { t.Error(err) @@ -683,8 +683,8 @@ func TestDMRemoveAccount(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if err = dm.RemoveAccount(acc.ID); err != nil { t.Error(err) @@ -750,8 +750,8 @@ func TestDmSetFilter(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if err := dm.SetFilter(filter, false); err != nil { t.Error(err) @@ -808,8 +808,8 @@ func TestDMSetThreshold(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if err = dm.SetThreshold(thS); err != nil { @@ -867,8 +867,8 @@ func TestDmRemoveThreshold(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if err := dm.RemoveThreshold(thS.Tenant, thS.ID); err != nil { t.Error(err) @@ -919,8 +919,8 @@ func TestDMReverseDestinationRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) dest := &Destination{ Id: "nat", Prefixes: []string{"0257", "0256", "0723"}, @@ -986,8 +986,8 @@ func TestDMStatQueueRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) sq := &StatQueue{ Tenant: "cgrates.org", @@ -1063,8 +1063,8 @@ func TestDmTimingR(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) tp := &utils.TPTiming{ ID: "MIDNIGHT", @@ -1136,8 +1136,8 @@ func TestDMSetActionTriggers(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) attrs := ActionTriggers{ &ActionTrigger{ @@ -1224,8 +1224,8 @@ func TestDMResourceProfileRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) rp := &ResourceProfile{ Tenant: "cgrates.org", @@ -1309,8 +1309,8 @@ func TestDmSharedGroup(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) sg := &SharedGroup{ Id: "SG2", @@ -1399,8 +1399,8 @@ func TestDMThresholdProfile(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) th := &ThresholdProfile{ Tenant: "cgrates.org", @@ -1536,8 +1536,8 @@ func TestDmDispatcherHost(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) dH := &DispatcherHost{ Tenant: "testTenant", @@ -1603,8 +1603,8 @@ func TestGetDispatcherHostErr(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) dH := &DispatcherHost{ Tenant: "testTenant", @@ -1675,8 +1675,8 @@ func TestChargerProfileRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) chrPrf := &ChargerProfile{ Tenant: "cgrates.org", @@ -1753,8 +1753,8 @@ func TestDispatcherProfileRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) dsp := &DispatcherProfile{ Tenant: "cgrates.org", @@ -1828,8 +1828,8 @@ func TestRouteProfileRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) rpp := &RouteProfile{ Tenant: "cgrates.org", @@ -1900,8 +1900,8 @@ func TestRatingPlanRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) rP := &RatingPlan{ Id: "RP1", @@ -1991,8 +1991,8 @@ func TestGetResourceRemote(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if val, err := dm.GetResource(rS.Tenant, rS.ID, false, true, utils.NonTransactional); err != nil { @@ -2059,8 +2059,8 @@ func TestGetResourceProfileRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if val, err := dm.GetResourceProfile(rsP.Tenant, rsP.ID, false, true, utils.NonTransactional); err != nil { t.Error(err) @@ -2123,8 +2123,8 @@ func TestGetActionTriggers(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) Cache.Set(utils.CacheActionTriggers, "Test", ActionTriggers{}, []string{}, false, utils.NonTransactional) if val, err := dm.GetActionTriggers(aT[0].ID, false, utils.NonTransactional); err != nil { @@ -2187,8 +2187,8 @@ func TestGetActionTriggersErr(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr1) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr1) SetDataStorage(dm) if _, err := dm.GetActionTriggers(aT[0].ID, true, utils.NonTransactional); err == nil { t.Error(err) @@ -2266,8 +2266,8 @@ func TestGetSharedGroupRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if val, err := dm.GetSharedGroup(shG.Id, true, utils.NonTransactional); err != nil { t.Error(err) @@ -2324,8 +2324,8 @@ func TestGetStatQueueProfileRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if val, err := dm.GetStatQueueProfile(sqP.Tenant, sqP.ID, true, true, utils.NonTransactional); err != nil { t.Error(err) @@ -2397,8 +2397,8 @@ func TestStatQueueProfileRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) Cache.Set(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(sqP.Tenant, sqP.ID), &StatQueueProfile{ QueueLength: 2, @@ -2469,8 +2469,8 @@ func TestDMActionsRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.MetaReplicator): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if err := dm.SetActions("KeyActions", acs); err != nil { t.Error(err) @@ -2540,8 +2540,8 @@ func TestGetDispatcherHost(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if val, err := dm.GetDispatcherHost("cgrates.org", "HostID", false, true, utils.NonTransactional); err != nil { @@ -2598,8 +2598,8 @@ func TestGetReverseDestinationRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if val, err := dm.GetReverseDestination("CRUDReverseDestination", false, true, utils.NonTransactional); err != nil { t.Error(err) @@ -2686,8 +2686,8 @@ func TestDMRemoveDestination(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) dm.DataDB().SetDestinationDrv(dest, utils.NonTransactional) if err := dm.RemoveDestination(dest.Id, utils.NonTransactional); err != nil { @@ -2765,8 +2765,8 @@ func TestDMRemoveFilter(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) fltr := &Filter{ Tenant: "cgrates.org", @@ -2859,8 +2859,8 @@ func TestRemoveStatQueueProfile(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) dm.DataDB().SetStatQueueProfileDrv(sQ) if err = dm.RemoveStatQueueProfile(sQ.Tenant, sQ.ID, true); err == nil { @@ -2935,8 +2935,8 @@ func TestDMGetTimingRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if _, err := dm.GetTiming(tp.ID, true, utils.NonTransactional); err != nil { t.Error(err) @@ -3011,8 +3011,8 @@ func TestDmGetActions(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if _, err := dm.GetActions("MINI", true, utils.NonTransactional); err != nil { t.Error(err) @@ -3061,8 +3061,8 @@ func TestDMSetLoadIDs(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) ld := map[string]int64{ "load1": 23, @@ -3130,8 +3130,8 @@ func TestGetItemLoadIDsRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if val, err := dm.GetItemLoadIDs("load1", true); err != nil { t.Error(err) @@ -3206,11 +3206,10 @@ func TestDMItemLoadIDsRemoteErr(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) + config.SetCgrConfig(cfg) dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetConnManager(connMgr) - config.SetCgrConfig(cfg) - Cache = NewCacheS(cfg, dm, nil) if _, err := dm.GetItemLoadIDs("load1", true); err == nil || err.Error() != "Can't replicate" { t.Error(err) @@ -3294,8 +3293,8 @@ func TestActionPlanRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if err := dm.SetActionPlan("act_key", actPln, true, utils.NonTransactional); err != nil { @@ -3361,8 +3360,8 @@ func TestAccountActionPlansRemote(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if err := dm.SetAccountActionPlans("acc_ID", []string{"act_pln", "act_pln"}, true); err != nil { @@ -3669,8 +3668,8 @@ func TestDMRatingProfile(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if err := dm.SetRatingProfile(rpf); err != nil { t.Error(err) @@ -3811,8 +3810,8 @@ func TestDMGetRatingPlan(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if _, err := dm.GetRatingPlan("id", true, utils.NonTransactional); err != nil { t.Error(err) @@ -3877,8 +3876,8 @@ func TestDMChargerProfile(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if _, err := dm.GetChargerProfile(chP.Tenant, chP.ID, false, true, utils.NonTransactional); err != nil { t.Error(err) @@ -3952,8 +3951,8 @@ func TestDMDispatcherProfile(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if _, err := dm.GetDispatcherProfile(dPP.Tenant, dPP.ID, false, true, utils.NonTransactional); err != nil { t.Error(err) @@ -4386,8 +4385,8 @@ func TestDMGetRouteProfile(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if val, err := dm.GetRouteProfile(rpL.Tenant, rpL.ID, false, true, utils.NonTransactional); err != nil { t.Error(err) @@ -4441,8 +4440,8 @@ func TestDMGetRouteProfileErr(t *testing.T) { utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, utils.ConcatenatedKey(utils.MetaInternal, utils.MetaCaches): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) Cache = NewCacheS(cfg, dm, nil) SetConnManager(connMgr) @@ -4628,8 +4627,8 @@ func TestDMAttributeProfile(t *testing.T) { connMgr := NewConnManager(cfg, map[string]chan birpc.ClientConnector{ utils.ConcatenatedKey(utils.MetaInternal, utils.ReplicatorSv1): clientConn, }) - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) SetDataStorage(dm) if err := dm.SetAttributeProfile(attrPrf, false); err != nil { t.Error(err) @@ -4813,6 +4812,7 @@ func TestDmIndexes(t *testing.T) { if dErr != nil { t.Error(dErr) } + config.SetCgrConfig(cfg) dm := NewDataManager(db, cfg.CacheCfg(), connMgr) idxes := map[string]utils.StringSet{ "*string:Account:1001": { @@ -4827,7 +4827,6 @@ func TestDmIndexes(t *testing.T) { "RL5": struct{}{}, }, } - config.SetCgrConfig(cfg) if err := dm.SetIndexes(utils.CacheResourceFilterIndexes, "cgrates.org", idxes, false, utils.NonTransactional); err != nil { t.Error(err) @@ -4875,8 +4874,8 @@ func TestDmCheckFilters(t *testing.T) { if dErr != nil { t.Error(dErr) } - dm := NewDataManager(db, cfg.CacheCfg(), connMgr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMgr) if err := dm.checkFilters("cgrates.org", []string{"FLTR_1"}); err == nil || err.Error() != "broken reference to filter: " { t.Error(err) } diff --git a/engine/destinations_test.go b/engine/destinations_test.go index 968e411a5..edf277c33 100644 --- a/engine/destinations_test.go +++ b/engine/destinations_test.go @@ -212,8 +212,8 @@ func TestDMSetDestinationSucces(t *testing.T) { Id: "dest21", Prefixes: []string{}, } - dm := NewDataManager(db, cfg.CacheCfg(), connMngr) config.SetCgrConfig(cfg) + dm := NewDataManager(db, cfg.CacheCfg(), connMngr) if err := dm.SetDestination(dest, utils.NonTransactional); err != nil { t.Error(err) diff --git a/engine/remoterepl.go b/engine/remoterepl.go deleted file mode 100644 index d2e17ef74..000000000 --- a/engine/remoterepl.go +++ /dev/null @@ -1,79 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package engine - -import ( - "github.com/cgrates/birpc/context" - "github.com/cgrates/cgrates/utils" -) - -// UpdateReplicationFilters will set the connID in cache -func UpdateReplicationFilters(objType, objID, connID string) { - if connID == utils.EmptyString { - return - } - Cache.SetWithoutReplicate(utils.CacheReplicationHosts, objType+objID+utils.ConcatenatedKeySep+connID, connID, []string{objType + objID}, - true, utils.NonTransactional) -} - -// replicate will call Set/Remove APIs on ReplicatorSv1 -func replicate(connMgr *ConnManager, connIDs []string, filtered bool, objType, objID, method string, args any) (err error) { - // the reply is string for Set/Remove APIs - // ignored in favor of the error - var reply string - if !filtered { - // is not partial so send to all defined connections - return utils.CastRPCErr(connMgr.Call(context.TODO(), connIDs, method, args, &reply)) - } - // is partial so get all the replicationHosts from cache based on object Type and ID - // alp_cgrates.org:ATTR1 - rplcHostIDsIfaces := Cache.tCache.GetGroupItems(utils.CacheReplicationHosts, objType+objID) - rplcHostIDs := make(utils.StringSet) - for _, hostID := range rplcHostIDsIfaces { - rplcHostIDs.Add(hostID.(string)) - } - // using the replication hosts call the method - return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs, - method, args, &reply)) -} - -// replicateMultipleIDs will do the same thing as replicate but uses multiple objectIDs -// used when setting the LoadIDs -func replicateMultipleIDs(connMgr *ConnManager, connIDs []string, filtered bool, objType string, objIDs []string, method string, args any) (err error) { - // the reply is string for Set/Remove APIs - // ignored in favor of the error - var reply string - if !filtered { - // is not partial so send to all defined connections - return utils.CastRPCErr(connMgr.Call(context.TODO(), connIDs, method, args, &reply)) - } - // is partial so get all the replicationHosts from cache based on object Type and ID - // combine all hosts in a single set so if we receive a get with one ID in list - // send all list to that hos - rplcHostIDs := make(utils.StringSet) - for _, objID := range objIDs { - rplcHostIDsIfaces := Cache.tCache.GetGroupItems(utils.CacheReplicationHosts, objType+objID) - for _, hostID := range rplcHostIDsIfaces { - rplcHostIDs.Add(hostID.(string)) - } - } - // using the replication hosts call the method - return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs, - method, args, &reply)) -} diff --git a/engine/replicator.go b/engine/replicator.go new file mode 100644 index 000000000..a46e916b9 --- /dev/null +++ b/engine/replicator.go @@ -0,0 +1,310 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "bytes" + "encoding/gob" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/guardian" + "github.com/cgrates/cgrates/utils" +) + +// replicationData holds the information about a pending replication task. +type replicationData struct { + objType string + objID string + method string + args any +} + +// replicator manages replication tasks to synchronize data across instances. +// It can perform immediate replication or batch tasks to replicate on intervals. +// +// For failed replications, files are created with predictable names based on +// "methodName_objTypeObjID" as the key. Before each replication attempt, any existing +// file for that key is removed. A new file is created only if the replication fails. +// This ensures at most one failed replication file exists per unique item. +type replicator struct { + mu sync.Mutex + + cm *ConnManager + conns []string // ids of connections to replicate to + + // pending stores the latest version of the object, named by the key, that + // is to be replicated. + pending map[string]*replicationData + + interval time.Duration // replication frequency + failedDir string // where failed replications are stored (one per id) + filtered bool // whether to replicate only objects coming from remote + stop chan struct{} // stop replication loop + wg sync.WaitGroup // wait for any pending replications before closing +} + +// newReplicator creates a replication manager that either performs immediate +// or batched replications based on configuration. +// When interval > 0, replications are queued and processed in batches at that interval. +// When interval = 0, each replication is performed immediately when requested. +func newReplicator(cm *ConnManager) *replicator { + cfg := config.CgrConfig().DataDbCfg() + r := &replicator{ + cm: cm, + pending: make(map[string]*replicationData), + interval: cfg.RplInterval, + failedDir: cfg.RplFailedDir, + conns: cfg.RplConns, + filtered: cfg.RplFiltered, + stop: make(chan struct{}), + } + if r.interval > 0 { + r.wg.Add(1) + go r.replicationLoop() + } + return r + +} + +// replicate handles the object replication based on configuration. +// When interval > 0, the replication task is queued for the next batch. +// Otherwise, it executes immediately. +func (r *replicator) replicate(objType, objID, method string, args any, + item *config.ItemOpt) error { + if !item.Replicate { + return nil + } + + if r.interval > 0 { + + // Form a unique key by joining method name with object identifiers. + // Including the method name (Set/Remove) allows different operations + // on the same object to have distinct keys, which also serve as + // predictable filenames if replication fails. + _, methodName, _ := strings.Cut(method, utils.NestingSep) + key := methodName + "_" + objType + objID + + r.mu.Lock() + defer r.mu.Unlock() + r.pending[key] = &replicationData{ + objType: objType, + objID: objID, + method: method, + args: args, + } + return nil + } + + return replicate(r.cm, r.conns, r.filtered, objType, objID, method, args) +} + +// replicate performs the actual replication by calling Set/Remove APIs on ReplicatorSv1 +// It either replicates to all connections or only to filtered ones based on configuration. +func replicate(connMgr *ConnManager, connIDs []string, filtered bool, objType, objID, method string, args any) (err error) { + // the reply is string for Set/Remove APIs + // ignored in favor of the error + var reply string + if !filtered { + // is not partial so send to all defined connections + return utils.CastRPCErr(connMgr.Call(context.TODO(), connIDs, method, args, &reply)) + } + // is partial so get all the replicationHosts from cache based on object Type and ID + // alp_cgrates.org:ATTR1 + rplcHostIDsIfaces := Cache.tCache.GetGroupItems(utils.CacheReplicationHosts, objType+objID) + rplcHostIDs := make(utils.StringSet) + for _, hostID := range rplcHostIDsIfaces { + rplcHostIDs.Add(hostID.(string)) + } + // using the replication hosts call the method + return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs, + method, args, &reply)) +} + +// replicationLoop runs periodically according to the configured interval +// to flush pending replications. It stops when the Replicator is closed. +func (r *replicator) replicationLoop() { + defer r.wg.Done() + ticker := time.NewTicker(r.interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + r.flush() + case <-r.stop: + r.flush() + return + } + } +} + +// flush immediately processes all pending replications. +// Failed replications are saved to disk if a failedDir is configured. +func (r *replicator) flush() { + r.mu.Lock() + if len(r.pending) == 0 { + // Skip processing when there are no pending replications. + r.mu.Unlock() + return + } + pending := r.pending + r.pending = make(map[string]*replicationData) + r.mu.Unlock() + + for key, data := range pending { + var failedPath string + + if r.failedDir != "" { + failedPath = filepath.Join(r.failedDir, key+utils.GOBSuffix) + + // Clean up any existing file containing failed replications. + if err := os.Remove(failedPath); err != nil && !os.IsNotExist(err) { + utils.Logger.Warning(fmt.Sprintf( + " failed to remove file for %q: %v", key, err)) + } + } + + if err := replicate(r.cm, r.conns, r.filtered, data.objType, data.objID, + data.method, data.args); err != nil { + utils.Logger.Warning(fmt.Sprintf( + " failed to replicate %q for object %q: %v", + data.method, data.objType+data.objID, err)) + + if failedPath != "" { + task := &ReplicationTask{ + ConnIDs: r.conns, + Filtered: r.filtered, + ObjType: data.objType, + ObjID: data.objID, + Method: data.method, + Args: data.args, + } + if err := task.WriteToFile(failedPath); err != nil { + utils.Logger.Err(fmt.Sprintf( + " failed to dump replication task: %v", err)) + } + } + } + } +} + +// close stops the replication loop if it's running and waits for pending +// replications to complete. +func (r *replicator) close() { + if r.interval > 0 { + close(r.stop) + r.wg.Wait() + } +} + +// UpdateReplicationFilters sets the connection ID in cache for filtered replication. +// It's a no-op if connID is empty. +func UpdateReplicationFilters(objType, objID, connID string) { + if connID == utils.EmptyString { + return + } + Cache.SetWithoutReplicate(utils.CacheReplicationHosts, objType+objID+utils.ConcatenatedKeySep+connID, connID, []string{objType + objID}, + true, utils.NonTransactional) +} + +// replicateMultipleIDs replicates operations for multiple object IDs. +// It functions similarly to replicate but handles a collection of IDs rather than a single one. +// Used primarily for setting LoadIDs. +// TODO: merge with replicate function. +func replicateMultipleIDs(connMgr *ConnManager, connIDs []string, filtered bool, objType string, objIDs []string, method string, args any) (err error) { + // the reply is string for Set/Remove APIs + // ignored in favor of the error + var reply string + if !filtered { + // is not partial so send to all defined connections + return utils.CastRPCErr(connMgr.Call(context.TODO(), connIDs, method, args, &reply)) + } + // is partial so get all the replicationHosts from cache based on object Type and ID + // combine all hosts in a single set so if we receive a get with one ID in list + // send all list to that hos + rplcHostIDs := make(utils.StringSet) + for _, objID := range objIDs { + rplcHostIDsIfaces := Cache.tCache.GetGroupItems(utils.CacheReplicationHosts, objType+objID) + for _, hostID := range rplcHostIDsIfaces { + rplcHostIDs.Add(hostID.(string)) + } + } + // using the replication hosts call the method + return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs, + method, args, &reply)) +} + +// ReplicationTask represents a replication operation that can be saved to disk +// and executed later, typically used for failed replications. +type ReplicationTask struct { + ConnIDs []string + Filtered bool + Path string + ObjType string + ObjID string + Method string + Args any + failedDir string +} + +// NewReplicationTaskFromFile loads a replication task from the specified file. +// The file is removed after successful loading. +func NewReplicationTaskFromFile(path string) (*ReplicationTask, error) { + var taskBytes []byte + if err := guardian.Guardian.Guard(func() error { + var err error + if taskBytes, err = os.ReadFile(path); err != nil { + return err + } + return os.Remove(path) // file is not needed anymore + }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+path); err != nil { + return nil, err + } + dec := gob.NewDecoder(bytes.NewBuffer(taskBytes)) + var task *ReplicationTask + if err := dec.Decode(&task); err != nil { + return nil, err + } + return task, nil +} + +// WriteToFile saves the replication task to the specified path. +// This allows failed tasks to be recovered and retried later. +func (r *ReplicationTask) WriteToFile(path string) error { + return guardian.Guardian.Guard(func() error { + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + enc := gob.NewEncoder(f) + return enc.Encode(r) + }, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+path) +} + +// Execute performs the replication task. +func (r *ReplicationTask) Execute(cm *ConnManager) error { + return replicate(cm, r.ConnIDs, r.Filtered, r.ObjType, r.ObjID, r.Method, r.Args) +} diff --git a/services/datadb.go b/services/datadb.go index 2c8180d16..9e1858803 100644 --- a/services/datadb.go +++ b/services/datadb.go @@ -117,7 +117,7 @@ func (db *DataDBService) Reload() (err error) { func (db *DataDBService) Shutdown() (err error) { db.srvDep[utils.DataDB].Wait() db.Lock() - db.dm.DataDB().Close() + db.dm.Close() db.dm = nil db.Unlock() return