From 8fae8c1acf326cd1db431ea6905ca54edab7e06c Mon Sep 17 00:00:00 2001 From: TeoV Date: Tue, 5 Nov 2019 03:29:39 -0500 Subject: [PATCH] Implement Replication Remote mechanism for DataManager for some methods --- engine/datamanager.go | 103 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 92 insertions(+), 11 deletions(-) diff --git a/engine/datamanager.go b/engine/datamanager.go index 45e269aef..bb905fac9 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -435,8 +435,18 @@ func (dm *DataManager) SetThreshold(th *Threshold) (err error) { } func (dm *DataManager) RemoveThreshold(tenant, id, transactionID string) (err error) { - return dm.DataDB().RemoveThresholdDrv(tenant, id) - + if err = dm.DataDB().RemoveThresholdDrv(tenant, id); err != nil { + return + } + if len(dm.rplDataDBs) != 0 { + for _, rplDM := range dm.rplDataDBs { + if err = rplDM.RemoveThreshold(tenant, id, + utils.NonTransactional); err != nil { + return + } + } + } + return } func (dm *DataManager) GetThresholdProfile(tenant, id string, cacheRead, cacheWrite bool, @@ -452,11 +462,24 @@ func (dm *DataManager) GetThresholdProfile(tenant, id string, cacheRead, cacheWr } th, err = dm.dataDB.GetThresholdProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && cacheWrite { - Cache.Set(utils.CacheThresholdProfiles, tntID, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if th, rmtErr = rmtDM.GetThresholdProfile(tenant, id, false, + false, utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheThresholdProfiles, tntID, nil, nil, + cacheCommit(transactionID), transactionID) + + } + return nil, err } - return nil, err } if cacheWrite { Cache.Set(utils.CacheThresholdProfiles, tntID, th, nil, @@ -488,7 +511,17 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool) } } } - return createAndIndex(utils.ThresholdProfilePrefix, th.Tenant, utils.EmptyString, th.ID, th.FilterIDs, dm) + if err := createAndIndex(utils.ThresholdProfilePrefix, th.Tenant, + utils.EmptyString, th.ID, th.FilterIDs, dm); err != nil { + return err + } + } + if len(dm.rplDataDBs) != 0 { + for _, rplDM := range dm.rplDataDBs { + if err := rplDM.SetThresholdProfile(th, withIndex); err != nil { + return err + } + } } return } @@ -506,8 +539,18 @@ func (dm *DataManager) RemoveThresholdProfile(tenant, id, return utils.ErrNotFound } if withIndex { - return NewFilterIndexer(dm, - utils.ThresholdProfilePrefix, tenant).RemoveItemFromIndex(tenant, id, oldTh.FilterIDs) + if err = NewFilterIndexer(dm, utils.ThresholdProfilePrefix, + tenant).RemoveItemFromIndex(tenant, id, oldTh.FilterIDs); err != nil { + return + } + } + if len(dm.rplDataDBs) != 0 { + for _, rplDM := range dm.rplDataDBs { + if err = rplDM.RemoveThresholdProfile(tenant, id, + utils.NonTransactional, withIndex); err != nil { + return + } + } } return } @@ -579,7 +622,18 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id, return utils.ErrNotFound } if withIndex { - return NewFilterIndexer(dm, utils.StatQueueProfilePrefix, tenant).RemoveItemFromIndex(tenant, id, oldSts.FilterIDs) + if err = NewFilterIndexer(dm, utils.StatQueueProfilePrefix, + tenant).RemoveItemFromIndex(tenant, id, oldSts.FilterIDs); err != nil { + return + } + } + if len(dm.rplDataDBs) != 0 { + for _, rplDM := range dm.rplDataDBs { + if err = rplDM.RemoveStatQueueProfile(tenant, id, + utils.NonTransactional, withIndex); err != nil { + return + } + } } return } @@ -618,6 +672,14 @@ func (dm *DataManager) RemoveTiming(id, transactionID string) (err error) { if err = dm.DataDB().RemoveTimingDrv(id); err != nil { return } + if len(dm.rplDataDBs) != 0 { + for _, rplDM := range dm.rplDataDBs { + if err = rplDM.RemoveTiming(id, + utils.NonTransactional); err != nil { + return + } + } + } Cache.Remove(utils.CacheTimings, id, cacheCommit(transactionID), transactionID) return @@ -660,6 +722,14 @@ func (dm *DataManager) RemoveResource(tenant, id, transactionID string) (err err if err = dm.DataDB().RemoveResourceDrv(tenant, id); err != nil { return } + if len(dm.rplDataDBs) != 0 { + for _, rplDM := range dm.rplDataDBs { + if err = rplDM.RemoveResource(tenant, id, + utils.NonTransactional); err != nil { + return + } + } + } return } @@ -732,7 +802,18 @@ func (dm *DataManager) RemoveResourceProfile(tenant, id, transactionID string, w return utils.ErrNotFound } if withIndex { - return NewFilterIndexer(dm, utils.ResourceProfilesPrefix, tenant).RemoveItemFromIndex(tenant, id, oldRes.FilterIDs) + if err = NewFilterIndexer(dm, utils.ResourceProfilesPrefix, + tenant).RemoveItemFromIndex(tenant, id, oldRes.FilterIDs); err != nil { + return + } + } + if len(dm.rplDataDBs) != 0 { + for _, rplDM := range dm.rplDataDBs { + if err = rplDM.RemoveResourceProfile(tenant, id, + utils.NonTransactional, withIndex); err != nil { + return + } + } } return }