Finished replacing replication in v0.10

This commit is contained in:
andronache
2021-07-13 16:08:04 +03:00
committed by Dan Christian Bogos
parent 07111d1e4d
commit 03fce91219
5 changed files with 51 additions and 48 deletions

View File

@@ -310,7 +310,6 @@ func testPrecacheGetCacheStatsAfterRestart(t *testing.T) {
Items: 0,
Groups: 0,
},
utils.CacheReplicationHosts: {},
}
if err := precacheRPC.Call(utils.CacheSv1GetCacheStats, args, &reply); err != nil {
t.Error(err.Error())

View File

@@ -19,6 +19,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package v1
import (
"fmt"
"time"
"github.com/cgrates/cgrates/engine"
@@ -65,6 +66,7 @@ func (APIerSv1 *APIerSv1) SetStatQueueProfile(arg *engine.StatQueueWithCache, re
if missing := utils.MissingStructFields(arg.StatQueueProfile, []string{"Tenant", "ID"}); len(missing) != 0 {
return utils.NewErrMandatoryIeMissing(missing...)
}
utils.Logger.Warning(fmt.Sprintf("yay2================================================================"))
if err := APIerSv1.DataManager.SetStatQueueProfile(arg.StatQueueProfile, true); err != nil {
return utils.APIErrorHandler(err)
}
@@ -85,6 +87,7 @@ func (APIerSv1 *APIerSv1) SetStatQueueProfile(arg *engine.StatQueueWithCache, re
if has, err := APIerSv1.DataManager.HasData(utils.StatQueuePrefix, arg.ID, arg.Tenant); err != nil {
return err
} else if !has {
utils.Logger.Warning(fmt.Sprintf("yay2================================================================"))
//compose metrics for StatQueue
metrics := make(map[string]engine.StatMetric)
for _, metric := range arg.Metrics {

View File

@@ -389,11 +389,11 @@ func (dm *DataManager) SetDestination(dest *Destination, transactionID string) (
return
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaDestinations]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
err = replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.DestinationPrefix, destID, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveDestination,
destID)
utils.DestinationPrefix, dest.Id, // this are used to get the host IDs from cache
utils.ReplicatorSv1SetDestination,
dest)
}
return
}
@@ -556,7 +556,7 @@ func (dm *DataManager) SetStatQueue(sq *StatQueue) (err error) {
config.CgrConfig().DataDbCfg().RplFiltered,
utils.StatQueuePrefix, sq.TenantID(), // this are used to get the host IDs from cache
utils.ReplicatorSv1SetStatQueue,
sq)
ssq)
}
return
}
@@ -896,6 +896,8 @@ func (dm *DataManager) SetStatQueueProfile(sqp *StatQueueProfile, withIndex bool
utils.ReplicatorSv1SetStatQueueProfile,
sqp); err != nil {
return
}
}
return
}
@@ -1054,7 +1056,7 @@ func (dm *DataManager) RemoveResource(tenant, id, transactionID string) (err err
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ResourcesPrefix, utils.ConcatenatedKey(tenant, id), // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveResource,
&utils.TenantID{Tenant: tenant, ID: id}
&utils.TenantID{Tenant: tenant, ID: id})
}
return
}
@@ -1202,13 +1204,13 @@ func (dm *DataManager) RemoveActionTriggers(id, transactionID string) (err error
}
Cache.Remove(utils.CacheActionTriggers, id,
cacheCommit(transactionID), transactionID)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ACTION_TRIGGER_PREFIX, id, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveActionTriggers,
id)
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActionTriggers]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ACTION_TRIGGER_PREFIX, id, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveActionTriggers,
id)
}
return
}
@@ -1297,13 +1299,13 @@ func (dm *DataManager) RemoveSharedGroup(id, transactionID string) (err error) {
}
Cache.Remove(utils.CacheSharedGroups, id,
cacheCommit(transactionID), transactionID)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.SHARED_GROUP_PREFIX, id, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveSharedGroup,
id)
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaSharedGroups]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.SHARED_GROUP_PREFIX, id, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveSharedGroup,
id)
}
return
}
@@ -1374,13 +1376,13 @@ func (dm *DataManager) RemoveActions(key, transactionID string) (err error) {
}
Cache.Remove(utils.CacheActions, key,
cacheCommit(transactionID), transactionID)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActions]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ACTION_PREFIX, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveActions,
key)
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaActions]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.ACTION_PREFIX, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveActions,
key)
}
return
}
@@ -1454,7 +1456,6 @@ func (dm *DataManager) SetActionPlan(key string, ats *ActionPlan,
&SetActionPlanArg{
Key: key,
Ats: ats})
}
}
return
}
@@ -1647,13 +1648,13 @@ func (dm *DataManager) RemoveRatingPlan(key string, transactionID string) (err e
}
Cache.Remove(utils.CacheRatingPlans, key,
cacheCommit(transactionID), transactionID)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RATING_PLAN_PREFIX, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveRatingPlan,
key)
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingPlans]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RATING_PLAN_PREFIX, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveRatingPlan,
key)
}
return
}
@@ -1719,13 +1720,13 @@ func (dm *DataManager) RemoveRatingProfile(key string,
}
Cache.Remove(utils.CacheRatingProfiles, key,
cacheCommit(transactionID), transactionID)
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RATING_PROFILE_PREFIX, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveRatingProfile,
key)
}
if itm := config.CgrConfig().DataDbCfg().Items[utils.MetaRatingProfiles]; itm.Replicate {
replicate(dm.connMgr, config.CgrConfig().DataDbCfg().RplConns,
config.CgrConfig().DataDbCfg().RplFiltered,
utils.RATING_PROFILE_PREFIX, key, // this are used to get the host IDs from cache
utils.ReplicatorSv1RemoveRatingProfile,
key)
}
return
}

View File

@@ -621,6 +621,5 @@ func GetDefaultEmptyCacheStats() map[string]*ltcache.CacheStats {
Items: 0,
Groups: 0,
},
utils.CacheReplicationHosts: {},
}
}

View File

@@ -891,11 +891,12 @@ type SetFilterIndexesArg struct {
}
func CastRPCErr(err error) error {
if _, has := ErrMap[err.Error()]; has {
return ErrMap[err.Error()]
} else {
return err
if err != nil {
if _, has := ErrMap[err.Error()]; has {
return ErrMap[err.Error()]
}
}
return err
}
// RandomInteger returns a random integer between min and max values