Implement Remote mechanism for Get method from DataManager

This commit is contained in:
TeoV
2019-11-05 04:53:23 -05:00
committed by Dan Christian Bogos
parent 1a768269c6
commit 59b61a7fbc

View File

@@ -332,11 +332,23 @@ func (dm *DataManager) GetStatQueue(tenant, id string,
}
ssq, err := dm.dataDB.GetStoredStatQueueDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheStatQueues, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if ssq, rmtErr = rmtDM.dataDB.GetStoredStatQueueDrv(tenant, id); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheStatQueues, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
if sq, err = ssq.AsStatQueue(dm.dataDB.Marshaler()); err != nil {
return nil, err
@@ -391,13 +403,25 @@ func (dm *DataManager) GetFilter(tenant, id string, cacheRead, cacheWrite bool,
fltr, err = dm.DataDB().GetFilterDrv(tenant, id)
}
if err != nil {
if err == utils.ErrNotFound {
if cacheWrite {
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if fltr, rmtErr = rmtDM.GetFilter(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheFilters, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
if cacheWrite {
Cache.Set(utils.CacheFilters, tntID, fltr, nil,
@@ -439,11 +463,24 @@ func (dm *DataManager) GetThreshold(tenant, id string,
}
th, err = dm.dataDB.GetThresholdDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheThresholds, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if th, rmtErr = rmtDM.GetThreshold(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheThresholds, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
if cacheWrite {
Cache.Set(utils.CacheThresholds, tntID, th, nil,
@@ -590,11 +627,24 @@ func (dm *DataManager) GetStatQueueProfile(tenant, id string, cacheRead, cacheWr
}
sqp, err = dm.dataDB.GetStatQueueProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheStatQueueProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if sqp, rmtErr = rmtDM.GetStatQueueProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheStatQueueProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
if cacheWrite {
Cache.Set(utils.CacheStatQueueProfiles, tntID, sqp, nil,
@@ -672,11 +722,24 @@ func (dm *DataManager) GetTiming(id string, skipCache bool,
}
t, err = dm.dataDB.GetTimingDrv(id)
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(utils.CacheTimings, id, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if t, rmtErr = rmtDM.GetTiming(id, false,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(utils.CacheTimings, id, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
Cache.Set(utils.CacheTimings, id, t, nil,
cacheCommit(transactionID), transactionID)
@@ -720,11 +783,24 @@ func (dm *DataManager) GetResource(tenant, id string, cacheRead, cacheWrite bool
}
rs, err = dm.dataDB.GetResourceDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheResources, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if rs, rmtErr = rmtDM.GetResource(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheResources, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
if cacheWrite {
Cache.Set(utils.CacheResources, tntID, rs, nil,
@@ -768,11 +844,24 @@ func (dm *DataManager) GetResourceProfile(tenant, id string, cacheRead, cacheWri
}
rp, err = dm.dataDB.GetResourceProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheResourceProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if rp, rmtErr = rmtDM.GetResourceProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheResourceProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
if cacheWrite {
Cache.Set(utils.CacheResourceProfiles, tntID, rp, nil,
@@ -852,11 +941,23 @@ func (dm *DataManager) GetActionTriggers(id string, skipCache bool,
}
attrs, err = dm.dataDB.GetActionTriggersDrv(id)
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(utils.CacheActionTriggers, id, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if attrs, rmtErr = rmtDM.GetActionTriggers(id, true,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(utils.CacheActionTriggers, id, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
Cache.Set(utils.CacheActionTriggers, id, attrs, nil,
cacheCommit(transactionID), transactionID)
@@ -900,11 +1001,23 @@ func (dm *DataManager) GetSharedGroup(key string, skipCache bool,
}
sg, err = dm.DataDB().GetSharedGroupDrv(key)
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(utils.CacheSharedGroups, key, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if sg, rmtErr = rmtDM.GetSharedGroup(key, true,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(utils.CacheSharedGroups, key, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
Cache.Set(utils.CacheSharedGroups, key, sg, nil,
cacheCommit(transactionID), transactionID)
@@ -951,11 +1064,23 @@ func (dm *DataManager) GetActions(key string, skipCache bool, transactionID stri
}
as, err = dm.DataDB().GetActionsDrv(key)
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(utils.CacheActions, key, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if as, rmtErr = rmtDM.GetActions(key, true,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(utils.CacheActions, key, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
Cache.Set(utils.CacheActions, key, as, nil,
cacheCommit(transactionID), transactionID)
@@ -998,11 +1123,23 @@ func (dm *DataManager) GetRatingPlan(key string, skipCache bool,
}
rp, err = dm.DataDB().GetRatingPlanDrv(key)
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(utils.CacheRatingPlans, key, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if rp, rmtErr = rmtDM.GetRatingPlan(key, true,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(utils.CacheRatingPlans, key, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
Cache.Set(utils.CacheRatingPlans, key, rp, nil,
cacheCommit(transactionID), transactionID)
@@ -1045,11 +1182,23 @@ func (dm *DataManager) GetRatingProfile(key string, skipCache bool,
}
rpf, err = dm.DataDB().GetRatingProfileDrv(key)
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(utils.CacheRatingProfiles, key, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if rpf, rmtErr = rmtDM.GetRatingProfile(key, true,
utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound {
Cache.Set(utils.CacheRatingProfiles, key, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
Cache.Set(utils.CacheRatingProfiles, key, rpf, nil,
cacheCommit(transactionID), transactionID)
@@ -1088,7 +1237,22 @@ func (dm *DataManager) HasData(category, subject, tenant string) (has bool, err
func (dm *DataManager) GetFilterIndexes(cacheID, itemIDPrefix, filterType string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) {
return dm.DataDB().GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType, fldNameVal)
if indexes, err = dm.DataDB().GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType, fldNameVal); err != nil {
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if indexes, rmtErr = rmtDM.GetFilterIndexes(cacheID, itemIDPrefix,
filterType, fldNameVal); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
return nil, err
}
}
return
}
func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string,
@@ -1160,11 +1324,24 @@ func (dm *DataManager) GetSupplierProfile(tenant, id string, cacheRead, cacheWri
}
supp, err = dm.dataDB.GetSupplierProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheSupplierProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if supp, rmtErr = rmtDM.GetSupplierProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheSupplierProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
// populate cache will compute specific config parameters
if err = supp.Compile(); err != nil {
@@ -1246,11 +1423,24 @@ func (dm *DataManager) GetAttributeProfile(tenant, id string, cacheRead, cacheWr
}
attrPrfl, err = dm.dataDB.GetAttributeProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheAttributeProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if attrPrfl, rmtErr = rmtDM.GetAttributeProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheAttributeProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
if err = attrPrfl.Compile(); err != nil {
return nil, err
@@ -1344,11 +1534,24 @@ func (dm *DataManager) GetChargerProfile(tenant, id string, cacheRead, cacheWrit
}
cpp, err = dm.dataDB.GetChargerProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheChargerProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if cpp, rmtErr = rmtDM.GetChargerProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheChargerProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
if cacheWrite {
Cache.Set(utils.CacheChargerProfiles, tntID, cpp, nil,
@@ -1427,11 +1630,24 @@ func (dm *DataManager) GetDispatcherProfile(tenant, id string, cacheRead, cacheW
}
dpp, err = dm.dataDB.GetDispatcherProfileDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheDispatcherProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if dpp, rmtErr = rmtDM.GetDispatcherProfile(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheDispatcherProfiles, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
if cacheWrite {
Cache.Set(utils.CacheDispatcherProfiles, tntID, dpp, nil,
@@ -1522,11 +1738,24 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit
}
dH, err = dm.dataDB.GetDispatcherHostDrv(tenant, id)
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheDispatcherHosts, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
if len(dm.rmtDataDBs) != 0 {
var rmtErr error
for _, rmtDM := range dm.rmtDataDBs {
if dH, rmtErr = rmtDM.GetDispatcherHost(tenant, id, false,
false, utils.NonTransactional); rmtErr == nil {
break
}
}
err = rmtErr
}
if err != nil {
if err == utils.ErrNotFound && cacheWrite {
Cache.Set(utils.CacheDispatcherHosts, tntID, nil, nil,
cacheCommit(transactionID), transactionID)
}
return nil, err
}
return nil, err
}
if cacheWrite {
cfg := config.CgrConfig()