From 59b61a7fbc57ab9ad7a1dc15ddb27d7ee9260cb2 Mon Sep 17 00:00:00 2001 From: TeoV Date: Tue, 5 Nov 2019 04:53:23 -0500 Subject: [PATCH] Implement Remote mechanism for Get method from DataManager --- engine/datamanager.go | 365 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 297 insertions(+), 68 deletions(-) diff --git a/engine/datamanager.go b/engine/datamanager.go index cebef7160..fbdf74c64 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -332,11 +332,23 @@ func (dm *DataManager) GetStatQueue(tenant, id string, } ssq, err := dm.dataDB.GetStoredStatQueueDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && cacheWrite { - Cache.Set(utils.CacheStatQueues, tntID, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if ssq, rmtErr = rmtDM.dataDB.GetStoredStatQueueDrv(tenant, id); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheStatQueues, tntID, nil, nil, + cacheCommit(transactionID), transactionID) + + } + return nil, err } - return nil, err } if sq, err = ssq.AsStatQueue(dm.dataDB.Marshaler()); err != nil { return nil, err @@ -391,13 +403,25 @@ func (dm *DataManager) GetFilter(tenant, id string, cacheRead, cacheWrite bool, fltr, err = dm.DataDB().GetFilterDrv(tenant, id) } if err != nil { - if err == utils.ErrNotFound { - if cacheWrite { + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if fltr, rmtErr = rmtDM.GetFilter(tenant, id, false, + false, utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound && cacheWrite { Cache.Set(utils.CacheFilters, tntID, nil, nil, cacheCommit(transactionID), transactionID) + } + return nil, err } - return nil, err + } if cacheWrite { Cache.Set(utils.CacheFilters, tntID, fltr, nil, @@ -439,11 +463,24 @@ func (dm *DataManager) GetThreshold(tenant, id string, } th, err = dm.dataDB.GetThresholdDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && cacheWrite { - Cache.Set(utils.CacheThresholds, tntID, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if th, rmtErr = rmtDM.GetThreshold(tenant, id, false, + false, utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheThresholds, tntID, nil, nil, + cacheCommit(transactionID), transactionID) + + } + return nil, err } - return nil, err } if cacheWrite { Cache.Set(utils.CacheThresholds, tntID, th, nil, @@ -590,11 +627,24 @@ func (dm *DataManager) GetStatQueueProfile(tenant, id string, cacheRead, cacheWr } sqp, err = dm.dataDB.GetStatQueueProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && cacheWrite { - Cache.Set(utils.CacheStatQueueProfiles, tntID, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if sqp, rmtErr = rmtDM.GetStatQueueProfile(tenant, id, false, + false, utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheStatQueueProfiles, tntID, nil, nil, + cacheCommit(transactionID), transactionID) + + } + return nil, err } - return nil, err } if cacheWrite { Cache.Set(utils.CacheStatQueueProfiles, tntID, sqp, nil, @@ -672,11 +722,24 @@ func (dm *DataManager) GetTiming(id string, skipCache bool, } t, err = dm.dataDB.GetTimingDrv(id) if err != nil { - if err == utils.ErrNotFound { - Cache.Set(utils.CacheTimings, id, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if t, rmtErr = rmtDM.GetTiming(id, false, + utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound { + Cache.Set(utils.CacheTimings, id, nil, nil, + cacheCommit(transactionID), transactionID) + + } + return nil, err } - return nil, err } Cache.Set(utils.CacheTimings, id, t, nil, cacheCommit(transactionID), transactionID) @@ -720,11 +783,24 @@ func (dm *DataManager) GetResource(tenant, id string, cacheRead, cacheWrite bool } rs, err = dm.dataDB.GetResourceDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && cacheWrite { - Cache.Set(utils.CacheResources, tntID, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if rs, rmtErr = rmtDM.GetResource(tenant, id, false, + false, utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheResources, tntID, nil, nil, + cacheCommit(transactionID), transactionID) + + } + return nil, err } - return nil, err } if cacheWrite { Cache.Set(utils.CacheResources, tntID, rs, nil, @@ -768,11 +844,24 @@ func (dm *DataManager) GetResourceProfile(tenant, id string, cacheRead, cacheWri } rp, err = dm.dataDB.GetResourceProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && cacheWrite { - Cache.Set(utils.CacheResourceProfiles, tntID, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if rp, rmtErr = rmtDM.GetResourceProfile(tenant, id, false, + false, utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheResourceProfiles, tntID, nil, nil, + cacheCommit(transactionID), transactionID) + + } + return nil, err } - return nil, err } if cacheWrite { Cache.Set(utils.CacheResourceProfiles, tntID, rp, nil, @@ -852,11 +941,23 @@ func (dm *DataManager) GetActionTriggers(id string, skipCache bool, } attrs, err = dm.dataDB.GetActionTriggersDrv(id) if err != nil { - if err == utils.ErrNotFound { - Cache.Set(utils.CacheActionTriggers, id, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if attrs, rmtErr = rmtDM.GetActionTriggers(id, true, + utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound { + Cache.Set(utils.CacheActionTriggers, id, nil, nil, + cacheCommit(transactionID), transactionID) + } + return nil, err } - return nil, err } Cache.Set(utils.CacheActionTriggers, id, attrs, nil, cacheCommit(transactionID), transactionID) @@ -900,11 +1001,23 @@ func (dm *DataManager) GetSharedGroup(key string, skipCache bool, } sg, err = dm.DataDB().GetSharedGroupDrv(key) if err != nil { - if err == utils.ErrNotFound { - Cache.Set(utils.CacheSharedGroups, key, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if sg, rmtErr = rmtDM.GetSharedGroup(key, true, + utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound { + Cache.Set(utils.CacheSharedGroups, key, nil, nil, + cacheCommit(transactionID), transactionID) + } + return nil, err } - return nil, err } Cache.Set(utils.CacheSharedGroups, key, sg, nil, cacheCommit(transactionID), transactionID) @@ -951,11 +1064,23 @@ func (dm *DataManager) GetActions(key string, skipCache bool, transactionID stri } as, err = dm.DataDB().GetActionsDrv(key) if err != nil { - if err == utils.ErrNotFound { - Cache.Set(utils.CacheActions, key, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if as, rmtErr = rmtDM.GetActions(key, true, + utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound { + Cache.Set(utils.CacheActions, key, nil, nil, + cacheCommit(transactionID), transactionID) + } + return nil, err } - return nil, err } Cache.Set(utils.CacheActions, key, as, nil, cacheCommit(transactionID), transactionID) @@ -998,11 +1123,23 @@ func (dm *DataManager) GetRatingPlan(key string, skipCache bool, } rp, err = dm.DataDB().GetRatingPlanDrv(key) if err != nil { - if err == utils.ErrNotFound { - Cache.Set(utils.CacheRatingPlans, key, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if rp, rmtErr = rmtDM.GetRatingPlan(key, true, + utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound { + Cache.Set(utils.CacheRatingPlans, key, nil, nil, + cacheCommit(transactionID), transactionID) + } + return nil, err } - return nil, err } Cache.Set(utils.CacheRatingPlans, key, rp, nil, cacheCommit(transactionID), transactionID) @@ -1045,11 +1182,23 @@ func (dm *DataManager) GetRatingProfile(key string, skipCache bool, } rpf, err = dm.DataDB().GetRatingProfileDrv(key) if err != nil { - if err == utils.ErrNotFound { - Cache.Set(utils.CacheRatingProfiles, key, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if rpf, rmtErr = rmtDM.GetRatingProfile(key, true, + utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound { + Cache.Set(utils.CacheRatingProfiles, key, nil, nil, + cacheCommit(transactionID), transactionID) + } + return nil, err } - return nil, err } Cache.Set(utils.CacheRatingProfiles, key, rpf, nil, cacheCommit(transactionID), transactionID) @@ -1088,7 +1237,22 @@ func (dm *DataManager) HasData(category, subject, tenant string) (has bool, err func (dm *DataManager) GetFilterIndexes(cacheID, itemIDPrefix, filterType string, fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) { - return dm.DataDB().GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType, fldNameVal) + if indexes, err = dm.DataDB().GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType, fldNameVal); err != nil { + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if indexes, rmtErr = rmtDM.GetFilterIndexes(cacheID, itemIDPrefix, + filterType, fldNameVal); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + return nil, err + } + } + return } func (dm *DataManager) SetFilterIndexes(cacheID, itemIDPrefix string, @@ -1160,11 +1324,24 @@ func (dm *DataManager) GetSupplierProfile(tenant, id string, cacheRead, cacheWri } supp, err = dm.dataDB.GetSupplierProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && cacheWrite { - Cache.Set(utils.CacheSupplierProfiles, tntID, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if supp, rmtErr = rmtDM.GetSupplierProfile(tenant, id, false, + false, utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheSupplierProfiles, tntID, nil, nil, + cacheCommit(transactionID), transactionID) + + } + return nil, err } - return nil, err } // populate cache will compute specific config parameters if err = supp.Compile(); err != nil { @@ -1246,11 +1423,24 @@ func (dm *DataManager) GetAttributeProfile(tenant, id string, cacheRead, cacheWr } attrPrfl, err = dm.dataDB.GetAttributeProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && cacheWrite { - Cache.Set(utils.CacheAttributeProfiles, tntID, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if attrPrfl, rmtErr = rmtDM.GetAttributeProfile(tenant, id, false, + false, utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheAttributeProfiles, tntID, nil, nil, + cacheCommit(transactionID), transactionID) + + } + return nil, err } - return nil, err } if err = attrPrfl.Compile(); err != nil { return nil, err @@ -1344,11 +1534,24 @@ func (dm *DataManager) GetChargerProfile(tenant, id string, cacheRead, cacheWrit } cpp, err = dm.dataDB.GetChargerProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && cacheWrite { - Cache.Set(utils.CacheChargerProfiles, tntID, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if cpp, rmtErr = rmtDM.GetChargerProfile(tenant, id, false, + false, utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheChargerProfiles, tntID, nil, nil, + cacheCommit(transactionID), transactionID) + + } + return nil, err } - return nil, err } if cacheWrite { Cache.Set(utils.CacheChargerProfiles, tntID, cpp, nil, @@ -1427,11 +1630,24 @@ func (dm *DataManager) GetDispatcherProfile(tenant, id string, cacheRead, cacheW } dpp, err = dm.dataDB.GetDispatcherProfileDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && cacheWrite { - Cache.Set(utils.CacheDispatcherProfiles, tntID, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if dpp, rmtErr = rmtDM.GetDispatcherProfile(tenant, id, false, + false, utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheDispatcherProfiles, tntID, nil, nil, + cacheCommit(transactionID), transactionID) + + } + return nil, err } - return nil, err } if cacheWrite { Cache.Set(utils.CacheDispatcherProfiles, tntID, dpp, nil, @@ -1522,11 +1738,24 @@ func (dm *DataManager) GetDispatcherHost(tenant, id string, cacheRead, cacheWrit } dH, err = dm.dataDB.GetDispatcherHostDrv(tenant, id) if err != nil { - if err == utils.ErrNotFound && cacheWrite { - Cache.Set(utils.CacheDispatcherHosts, tntID, nil, nil, - cacheCommit(transactionID), transactionID) + if len(dm.rmtDataDBs) != 0 { + var rmtErr error + for _, rmtDM := range dm.rmtDataDBs { + if dH, rmtErr = rmtDM.GetDispatcherHost(tenant, id, false, + false, utils.NonTransactional); rmtErr == nil { + break + } + } + err = rmtErr + } + if err != nil { + if err == utils.ErrNotFound && cacheWrite { + Cache.Set(utils.CacheDispatcherHosts, tntID, nil, nil, + cacheCommit(transactionID), transactionID) + + } + return nil, err } - return nil, err } if cacheWrite { cfg := config.CgrConfig()