Implement Replication Remote mechanism for DataManager for some methods

This commit is contained in:
TeoV
2019-11-05 03:29:39 -05:00
committed by Dan Christian Bogos
parent c68440aaec
commit 8fae8c1acf

View File

@@ -435,8 +435,18 @@ func (dm *DataManager) SetThreshold(th *Threshold) (err error) {
}
func (dm *DataManager) RemoveThreshold(tenant, id, transactionID string) (err error) {
return dm.DataDB().RemoveThresholdDrv(tenant, id)
if err = dm.DataDB().RemoveThresholdDrv(tenant, id); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveThreshold(tenant, id,
utils.NonTransactional); err != nil {
return
}
}
}
return
}
func (dm *DataManager) GetThresholdProfile(tenant, id string, cacheRead, cacheWrite bool,
@@ -452,11 +462,24 @@ func (dm *DataManager) GetThresholdProfile(tenant, id string, cacheRead, cacheWr
}
th, err = dm.dataDB.GetThresholdProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheThresholdProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if th, rmtErr = rmtDM.GetThresholdProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheThresholdProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
if cacheWrite {
Cache.Set(utils.CacheThresholdProfiles, tntID, th, nil,
@@ -488,7 +511,17 @@ func (dm *DataManager) SetThresholdProfile(th *ThresholdProfile, withIndex bool)
}
}
}
return createAndIndex(utils.ThresholdProfilePrefix, th.Tenant, utils.EmptyString, th.ID, th.FilterIDs, dm)
if err := createAndIndex(utils.ThresholdProfilePrefix, th.Tenant,
utils.EmptyString, th.ID, th.FilterIDs, dm); err != nil {
return err
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err := rplDM.SetThresholdProfile(th, withIndex); err != nil {
return err
}
}
}
return
}
@@ -506,8 +539,18 @@ func (dm *DataManager) RemoveThresholdProfile(tenant, id,
return utils.ErrNotFound
}
if withIndex {
return NewFilterIndexer(dm,
utils.ThresholdProfilePrefix, tenant).RemoveItemFromIndex(tenant, id, oldTh.FilterIDs)
if err = NewFilterIndexer(dm, utils.ThresholdProfilePrefix,
tenant).RemoveItemFromIndex(tenant, id, oldTh.FilterIDs); err != nil {
return
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveThresholdProfile(tenant, id,
utils.NonTransactional, withIndex); err != nil {
return
}
}
}
return
}
@@ -579,7 +622,18 @@ func (dm *DataManager) RemoveStatQueueProfile(tenant, id,
return utils.ErrNotFound
}
if withIndex {
return NewFilterIndexer(dm, utils.StatQueueProfilePrefix, tenant).RemoveItemFromIndex(tenant, id, oldSts.FilterIDs)
if err = NewFilterIndexer(dm, utils.StatQueueProfilePrefix,
tenant).RemoveItemFromIndex(tenant, id, oldSts.FilterIDs); err != nil {
return
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveStatQueueProfile(tenant, id,
utils.NonTransactional, withIndex); err != nil {
return
}
}
}
return
}
@@ -618,6 +672,14 @@ func (dm *DataManager) RemoveTiming(id, transactionID string) (err error) {
if err = dm.DataDB().RemoveTimingDrv(id); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveTiming(id,
utils.NonTransactional); err != nil {
return
}
}
}
Cache.Remove(utils.CacheTimings, id,
cacheCommit(transactionID), transactionID)
return
@@ -660,6 +722,14 @@ func (dm *DataManager) RemoveResource(tenant, id, transactionID string) (err err
if err = dm.DataDB().RemoveResourceDrv(tenant, id); err != nil {
return
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveResource(tenant, id,
utils.NonTransactional); err != nil {
return
}
}
}
return
}
@@ -732,7 +802,18 @@ func (dm *DataManager) RemoveResourceProfile(tenant, id, transactionID string, w
return utils.ErrNotFound
}
if withIndex {
return NewFilterIndexer(dm, utils.ResourceProfilesPrefix, tenant).RemoveItemFromIndex(tenant, id, oldRes.FilterIDs)
if err = NewFilterIndexer(dm, utils.ResourceProfilesPrefix,
tenant).RemoveItemFromIndex(tenant, id, oldRes.FilterIDs); err != nil {
return
}
}
if len(dm.rplDataDBs) != 0 {
for _, rplDM := range dm.rplDataDBs {
if err = rplDM.RemoveResourceProfile(tenant, id,
utils.NonTransactional, withIndex); err != nil {
return
}
}
}
return
}