/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/

package engine

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/cgrates/birpc/context"
	"github.com/cgrates/cgrates/config"
	"github.com/cgrates/cgrates/guardian"
	"github.com/cgrates/cgrates/utils"
)

// replicationData holds the information about a pending replication task.
type replicationData struct {
	objType string
	objID   string
	method  string
	args    any
}

// replicator manages replication tasks to synchronize data across instances.
// It can perform immediate replication, or batch tasks and replicate them at
// intervals.
//
// For failed replications, files are created with predictable names based on
// "methodName_objTypeObjID" as the key. Before each replication attempt, any
// existing file for that key is removed. A new file is created only if the
// replication fails. This ensures at most one failed replication file exists
// per unique item.
type replicator struct {
	mu    sync.Mutex
	cm    *ConnManager
	conns []string // ids of connections to replicate to
	// pending stores the latest version of each object to be replicated,
	// keyed by its replication key.
	pending   map[string]*replicationData
	interval  time.Duration  // replication frequency
	failedDir string         // where failed replications are stored (one per id)
	filtered  bool           // whether to replicate only objects coming from remote
	stop      chan struct{}  // stop replication loop
	wg        sync.WaitGroup // wait for any pending replications before closing
}

// newReplicator creates a replication manager that performs either immediate
// or batched replications, depending on configuration.
// When interval > 0, replications are queued and processed in batches at that
// interval. When interval = 0, each replication is performed immediately when
// requested.
func newReplicator(cm *ConnManager) *replicator {
	cfg := config.CgrConfig().DataDbCfg()
	r := &replicator{
		cm:        cm,
		pending:   make(map[string]*replicationData),
		interval:  cfg.RplInterval,
		failedDir: cfg.RplFailedDir,
		conns:     cfg.RplConns,
		filtered:  cfg.RplFiltered,
		stop:      make(chan struct{}),
	}
	if r.interval > 0 {
		r.wg.Add(1)
		go r.replicationLoop()
	}
	return r
}
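
// A minimal usage sketch (illustrative, not part of this file): "cm" is
// assumed to be an already configured *ConnManager, "attrPrf" a profile to
// replicate and "itemOpt" the *config.ItemOpt of its item type; the method
// name constant is assumed from the utils package.
//
//	rpl := newReplicator(cm)
//	defer rpl.close() // flushes any pending batch on shutdown
//	err := rpl.replicate(utils.AttributeProfilePrefix, "cgrates.org:ATTR1",
//		utils.ReplicatorSv1SetAttributeProfile, attrPrf, itemOpt)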

// replicate handles the object replication based on configuration.
// When interval > 0, the replication task is queued for the next batch.
// Otherwise, it executes immediately.
func (r *replicator) replicate(objType, objID, method string, args any,
	item *config.ItemOpt) error {
	if !item.Replicate {
		return nil
	}
	if r.interval > 0 {
		// Form a unique key by joining method name with object identifiers.
		// Including the method name (Set/Remove) allows different operations
		// on the same object to have distinct keys, which also serve as
		// predictable filenames if replication fails.
		_, methodName, _ := strings.Cut(method, utils.NestingSep)
		key := methodName + "_" + objType + objID
		r.mu.Lock()
		defer r.mu.Unlock()
		r.pending[key] = &replicationData{
			objType: objType,
			objID:   objID,
			method:  method,
			args:    args,
		}
		return nil
	}
	return replicate(r.cm, r.conns, r.filtered, objType, objID, method, args)
}
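
// Key derivation example (illustrative values): method
// "ReplicatorSv1.SetAttributeProfile" with objType "alp_" and objID
// "cgrates.org:ATTR1" queues under key
// "SetAttributeProfile_alp_cgrates.org:ATTR1". A newer Set of the same
// object overwrites the pending entry, while a Remove queues separately
// under its own "Remove..."-prefixed key.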

// replicate performs the actual replication by calling Set/Remove APIs on
// ReplicatorSv1. It either replicates to all connections or only to filtered
// ones, based on configuration.
func replicate(connMgr *ConnManager, connIDs []string, filtered bool,
	objType, objID, method string, args any) (err error) {
	// The reply is a string for Set/Remove APIs; it's ignored in favor of
	// the returned error.
	var reply string
	if !filtered {
		// Not partial, so send to all defined connections.
		return utils.CastRPCErr(connMgr.Call(context.TODO(), connIDs, method, args, &reply))
	}
	// Partial, so get all the replication hosts from cache based on object
	// type and ID (group key, e.g. "alp_cgrates.org:ATTR1").
	rplcHostIDsIfaces := Cache.tCache.GetGroupItems(utils.CacheReplicationHosts, objType+objID)
	rplcHostIDs := make(utils.StringSet)
	for _, hostID := range rplcHostIDsIfaces {
		rplcHostIDs.Add(hostID.(string))
	}
	// Using the replication hosts, call the method.
	return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs,
		method, args, &reply))
}
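
// Note on filtered mode (a consequence of the lookup above, not an API
// guarantee): an object that no remote host has registered interest in has
// no entries under CacheReplicationHosts, so it is not pushed anywhere;
// only hosts previously recorded via UpdateReplicationFilters receive it.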

// replicationLoop runs periodically according to the configured interval
// to flush pending replications. It stops when the replicator is closed.
func (r *replicator) replicationLoop() {
	defer r.wg.Done()
	ticker := time.NewTicker(r.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			r.flush()
		case <-r.stop:
			r.flush()
			return
		}
	}
}

// flush immediately processes all pending replications.
// Failed replications are saved to disk if a failedDir is configured.
func (r *replicator) flush() {
	r.mu.Lock()
	if len(r.pending) == 0 {
		// Skip processing when there are no pending replications.
		r.mu.Unlock()
		return
	}
	pending := r.pending
	r.pending = make(map[string]*replicationData)
	r.mu.Unlock()
	for key, data := range pending {
		var failedPath string
		if r.failedDir != "" {
			failedPath = filepath.Join(r.failedDir, key+utils.GOBSuffix)
			// Clean up any existing file containing failed replications.
			if err := os.Remove(failedPath); err != nil && !os.IsNotExist(err) {
				utils.Logger.Warning(fmt.Sprintf(
					"<DataManager> failed to remove file for %q: %v", key, err))
			}
		}
		if err := replicate(r.cm, r.conns, r.filtered, data.objType, data.objID,
			data.method, data.args); err != nil {
			utils.Logger.Warning(fmt.Sprintf(
				"<DataManager> failed to replicate %q for object %q: %v",
				data.method, data.objType+data.objID, err))
			if failedPath != "" {
				task := &ReplicationTask{
					ConnIDs:  r.conns,
					Filtered: r.filtered,
					ObjType:  data.objType,
					ObjID:    data.objID,
					Method:   data.method,
					Args:     data.args,
				}
				if err := task.WriteToFile(failedPath); err != nil {
					utils.Logger.Err(fmt.Sprintf(
						"<DataManager> failed to dump replication task: %v", err))
				}
			}
		}
	}
}
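
// Illustrative on-disk result: assuming failedDir is
// "/var/spool/cgrates/failed" (a hypothetical path), a failed
// SetAttributeProfile replication of "alp_cgrates.org:ATTR1" is gob-dumped to
// "/var/spool/cgrates/failed/SetAttributeProfile_alp_cgrates.org:ATTR1.gob"
// and removed again before the next retry of that same key.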

// close stops the replication loop if it's running and waits for pending
// replications to complete.
func (r *replicator) close() {
	if r.interval > 0 {
		close(r.stop)
		r.wg.Wait()
	}
}

// UpdateReplicationFilters sets the connection ID in cache for filtered
// replication. It's a no-op if connID is empty.
func UpdateReplicationFilters(objType, objID, connID string) {
	if connID == utils.EmptyString {
		return
	}
	Cache.SetWithoutReplicate(utils.CacheReplicationHosts,
		objType+objID+utils.ConcatenatedKeySep+connID,
		connID, []string{objType + objID},
		true, utils.NonTransactional)
}
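
// Illustrative cache layout (hypothetical connection ID "engine2"):
//
//	UpdateReplicationFilters("alp_", "cgrates.org:ATTR1", "engine2")
//
// stores item key "alp_cgrates.org:ATTR1:engine2" in the
// CacheReplicationHosts partition, grouped under "alp_cgrates.org:ATTR1",
// the same group key that replicate() reads back via GetGroupItems.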

// replicateMultipleIDs replicates operations for multiple object IDs.
// It functions like replicate but handles a collection of IDs rather than a
// single one. Used primarily for setting LoadIDs.
// TODO: merge with replicate function.
func replicateMultipleIDs(connMgr *ConnManager, connIDs []string, filtered bool,
	objType string, objIDs []string, method string, args any) (err error) {
	// The reply is a string for Set/Remove APIs; it's ignored in favor of
	// the returned error.
	var reply string
	if !filtered {
		// Not partial, so send to all defined connections.
		return utils.CastRPCErr(connMgr.Call(context.TODO(), connIDs, method, args, &reply))
	}
	// Partial, so get all the replication hosts from cache based on object
	// type and ID. Combine all hosts into a single set so that if a get was
	// received for even one ID in the list, the whole list is sent to that
	// host.
	rplcHostIDs := make(utils.StringSet)
	for _, objID := range objIDs {
		rplcHostIDsIfaces := Cache.tCache.GetGroupItems(utils.CacheReplicationHosts, objType+objID)
		for _, hostID := range rplcHostIDsIfaces {
			rplcHostIDs.Add(hostID.(string))
		}
	}
	// Using the replication hosts, call the method.
	return utils.CastRPCErr(connMgr.CallWithConnIDs(connIDs, rplcHostIDs,
		method, args, &reply))
}
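
// Illustrative consequence: when load IDs for several cache partitions
// change at once, the union of replication hosts across all changed IDs
// receives the complete set, so a host that registered interest in a single
// ID still ends up with a consistent load-ID map.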

// ReplicationTask represents a replication operation that can be saved to
// disk and executed later, typically used for failed replications.
type ReplicationTask struct {
	ConnIDs   []string
	Filtered  bool
	Path      string
	ObjType   string
	ObjID     string
	Method    string
	Args      any
	failedDir string
}

// NewReplicationTaskFromFile loads a replication task from the specified file.
// The file is removed after successful loading.
func NewReplicationTaskFromFile(path string) (*ReplicationTask, error) {
	var taskBytes []byte
	if err := guardian.Guardian.Guard(func() error {
		var err error
		if taskBytes, err = os.ReadFile(path); err != nil {
			return err
		}
		return os.Remove(path) // file is not needed anymore
	}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+path); err != nil {
		return nil, err
	}
	dec := gob.NewDecoder(bytes.NewBuffer(taskBytes))
	var task *ReplicationTask
	if err := dec.Decode(&task); err != nil {
		return nil, err
	}
	return task, nil
}

// WriteToFile saves the replication task to the specified path.
// This allows failed tasks to be recovered and retried later.
func (r *ReplicationTask) WriteToFile(path string) error {
	return guardian.Guardian.Guard(func() error {
		f, err := os.Create(path)
		if err != nil {
			return err
		}
		defer f.Close()
		enc := gob.NewEncoder(f)
		return enc.Encode(r)
	}, config.CgrConfig().GeneralCfg().LockingTimeout, utils.FileLockPrefix+path)
}

// Execute performs the replication task.
func (r *ReplicationTask) Execute(cm *ConnManager) error {
	return replicate(cm, r.ConnIDs, r.Filtered, r.ObjType, r.ObjID, r.Method, r.Args)
}
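
// Hedged recovery sketch (illustrative, not part of this file; "cm" and
// "failedDir" are assumed to be available in the caller's scope):
//
//	files, _ := filepath.Glob(filepath.Join(failedDir, "*"+utils.GOBSuffix))
//	for _, fpath := range files {
//		task, err := NewReplicationTaskFromFile(fpath) // removes the file on success
//		if err != nil {
//			continue // skip unreadable dumps
//		}
//		if err := task.Execute(cm); err != nil {
//			_ = task.WriteToFile(fpath) // re-dump so it can be retried later
//		}
//	}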