Remove marshaling items from InternalDB

This commit is contained in:
TeoV
2019-10-02 08:44:36 -04:00
committed by Dan Christian Bogos
parent d930e82886
commit e95127d84a
4 changed files with 113 additions and 414 deletions

View File

@@ -155,3 +155,35 @@ func TestCsvPairToRecord(t *testing.T) {
t.Error(err)
}
}
func TestCsvSecondUsage(t *testing.T) {
cgrConfig, _ := config.NewDefaultCGRConfig()
cdrcConfig := cgrConfig.CdrcProfiles["/var/spool/cgrates/cdrc/in"][0]
data, _ := engine.NewMapStorage()
dm := engine.NewDataManager(data)
filterS := engine.NewFilterS(cgrConfig, nil, nil, nil, dm)
cdrcConfig.CdrSourceId = "TEST_CDRC"
cdrcConfig.ContentFields = []*config.FCTemplate{
{Tag: "TORField", Type: utils.META_COMPOSED, FieldId: utils.ToR,
Value: config.NewRSRParsersMustCompile("~0", true, utils.INFIELD_SEP)},
{Tag: "Usage", Type: utils.META_COMPOSED, FieldId: utils.Usage,
Value: config.NewRSRParsersMustCompile("~1;s", true, utils.INFIELD_SEP)},
}
csvProcessor := &CsvRecordsProcessor{dfltCdrcCfg: cdrcConfig, cdrcCfgs: []*config.CdrcCfg{cdrcConfig}, filterS: filterS}
cdrRow := []string{"*voice", "12"}
expectedCdr := &engine.CDR{
CGRID: utils.Sha1("", "0.0.0.0"),
ToR: cdrRow[0],
OriginHost: "0.0.0.0",
Source: "TEST_CDRC",
Usage: time.Duration(12) * time.Second,
ExtraFields: map[string]string{},
Cost: -1,
}
if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow,
cdrcConfig, "cgrates.org"); !reflect.DeepEqual(expectedCdr, rtCdr) {
t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr)
}
}

View File

@@ -19,11 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"bytes"
"compress/zlib"
"errors"
"fmt"
"io/ioutil"
"sync"
"github.com/cgrates/cgrates/config"
@@ -226,33 +223,11 @@ func (iDB *InternalDB) GetRatingPlanDrv(id string) (rp *RatingPlan, err error) {
if !ok || x == nil {
return nil, utils.ErrNotFound
}
b := bytes.NewBuffer(x.([]byte))
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
err = iDB.ms.Unmarshal(out, &rp)
if err != nil {
return nil, err
}
return
return x.(*RatingPlan), nil
}
func (iDB *InternalDB) SetRatingPlanDrv(rp *RatingPlan) (err error) {
result, err := iDB.ms.Marshal(rp)
if err != nil {
return err
}
var b bytes.Buffer
w := zlib.NewWriter(&b)
w.Write(result)
w.Close()
iDB.db.Set(utils.CacheRatingPlans, utils.RATING_PLAN_PREFIX+rp.Id, b.Bytes(), nil,
iDB.db.Set(utils.CacheRatingPlans, utils.RATING_PLAN_PREFIX+rp.Id, rp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -268,19 +243,11 @@ func (iDB *InternalDB) GetRatingProfileDrv(id string) (rp *RatingProfile, err er
if !ok || x == nil {
return nil, utils.ErrNotFound
}
err = iDB.ms.Unmarshal(x.([]byte), &rp)
if err != nil {
return nil, err
}
return
return x.(*RatingProfile), nil
}
func (iDB *InternalDB) SetRatingProfileDrv(rp *RatingProfile) (err error) {
result, err := iDB.ms.Marshal(rp)
if err != nil {
return err
}
iDB.db.Set(utils.CacheRatingProfiles, utils.RATING_PROFILE_PREFIX+rp.Id, result, nil,
iDB.db.Set(utils.CacheRatingProfiles, utils.RATING_PROFILE_PREFIX+rp.Id, rp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -305,44 +272,16 @@ func (iDB *InternalDB) GetDestination(key string, skipCache bool, transactionID
x, ok := iDB.db.Get(utils.CacheDestinations, utils.DESTINATION_PREFIX+key)
if !ok || x == nil {
return nil, utils.ErrNotFound
}
b := bytes.NewBuffer(x.([]byte))
r, err := zlib.NewReader(b)
if err != nil {
return nil, err
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
r.Close()
dest = new(Destination)
err = iDB.ms.Unmarshal(out, &dest)
if err != nil {
Cache.Set(utils.CacheDestinations, key, nil, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
if dest == nil {
Cache.Set(utils.CacheDestinations, key, nil, nil, cCommit, transactionID)
return nil, utils.ErrNotFound
}
dest = x.(*Destination)
Cache.Set(utils.CacheDestinations, key, dest, nil, cCommit, transactionID)
return
}
func (iDB *InternalDB) SetDestination(dest *Destination, transactionID string) (err error) {
result, err := iDB.ms.Marshal(dest)
if err != nil {
return err
}
var b bytes.Buffer
w := zlib.NewWriter(&b)
w.Write(result)
w.Close()
iDB.db.Set(utils.CacheDestinations, utils.DESTINATION_PREFIX+dest.Id, b.Bytes(), nil,
iDB.db.Set(utils.CacheDestinations, utils.DESTINATION_PREFIX+dest.Id, dest, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
Cache.Remove(utils.CacheDestinations, dest.Id,
cacheCommit(transactionID), transactionID)
@@ -487,18 +426,11 @@ func (iDB *InternalDB) GetActionsDrv(id string) (acts Actions, err error) {
if !ok || x == nil {
return nil, utils.ErrNotFound
}
if err = iDB.ms.Unmarshal(x.([]byte), &acts); err != nil {
return nil, err
}
return
return x.(Actions), err
}
func (iDB *InternalDB) SetActionsDrv(id string, acts Actions) (err error) {
result, err := iDB.ms.Marshal(acts)
if err != nil {
return err
}
iDB.db.Set(utils.CacheActions, utils.ACTION_PREFIX+id, result, nil,
iDB.db.Set(utils.CacheActions, utils.ACTION_PREFIX+id, acts, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -514,18 +446,11 @@ func (iDB *InternalDB) GetSharedGroupDrv(id string) (sh *SharedGroup, err error)
if !ok || x == nil {
return nil, utils.ErrNotFound
}
if err = iDB.ms.Unmarshal(x.([]byte), &sh); err != nil {
return nil, err
}
return
return x.(*SharedGroup), nil
}
func (iDB *InternalDB) SetSharedGroupDrv(sh *SharedGroup) (err error) {
result, err := iDB.ms.Marshal(sh)
if err != nil {
return err
}
iDB.db.Set(utils.CacheSharedGroups, utils.SHARED_GROUP_PREFIX+sh.Id, result, nil,
iDB.db.Set(utils.CacheSharedGroups, utils.SHARED_GROUP_PREFIX+sh.Id, sh, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -541,18 +466,11 @@ func (iDB *InternalDB) GetActionTriggersDrv(id string) (at ActionTriggers, err e
if !ok || x == nil {
return nil, utils.ErrNotFound
}
if err = iDB.ms.Unmarshal(x.([]byte), &at); err != nil {
return nil, err
}
return
return x.(ActionTriggers), nil
}
func (iDB *InternalDB) SetActionTriggersDrv(id string, at ActionTriggers) (err error) {
result, err := iDB.ms.Marshal(at)
if err != nil {
return err
}
iDB.db.Set(utils.CacheActionTriggers, utils.ACTION_TRIGGER_PREFIX+id, result, nil,
iDB.db.Set(utils.CacheActionTriggers, utils.ACTION_TRIGGER_PREFIX+id, at, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -579,7 +497,7 @@ func (iDB *InternalDB) GetActionPlan(key string, skipCache bool, transactionID s
cCommit, transactionID)
return nil, utils.ErrNotFound
}
err = iDB.ms.Unmarshal(x.([]byte), &ats)
ats = x.(*ActionPlan)
Cache.Set(utils.CacheActionPlans, key, ats, nil,
cCommit, transactionID)
return
@@ -607,12 +525,7 @@ func (iDB *InternalDB) SetActionPlan(key string, ats *ActionPlan,
}
}
}
result, err := iDB.ms.Marshal(&ats)
if err != nil {
return err
}
iDB.db.Set(utils.CacheActionPlans, utils.ACTION_PLAN_PREFIX+key, result, nil,
iDB.db.Set(utils.CacheActionPlans, utils.ACTION_PLAN_PREFIX+key, ats, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -656,15 +569,12 @@ func (iDB *InternalDB) GetAccountActionPlans(acntID string,
cacheCommit(transactionID), transactionID)
return nil, utils.ErrNotFound
}
if err = iDB.ms.Unmarshal(x.([]byte), &apIDs); err != nil {
return nil, err
}
apIDs = x.([]string)
Cache.Set(utils.CacheAccountActionPlans, acntID, apIDs, nil,
cacheCommit(transactionID), transactionID)
return
}
func (iDB *InternalDB) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) {
if !overwrite {
if oldaPlIDs, err := iDB.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound {
@@ -677,16 +587,11 @@ func (iDB *InternalDB) SetAccountActionPlans(acntID string, apIDs []string, over
}
}
}
result, err := iDB.ms.Marshal(apIDs)
if err != nil {
return err
}
iDB.db.Set(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, result, nil,
iDB.db.Set(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, apIDs, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
func (iDB *InternalDB) RemAccountActionPlans(acntID string, apIDs []string) (err error) {
key := utils.AccountActionPlansPrefix + acntID
if len(apIDs) == 0 {
@@ -710,11 +615,7 @@ func (iDB *InternalDB) RemAccountActionPlans(acntID string, apIDs []string) (err
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
var result []byte
if result, err = iDB.ms.Marshal(oldaPlIDs); err != nil {
return err
}
iDB.db.Set(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, result, nil,
iDB.db.Set(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, oldaPlIDs, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -744,12 +645,7 @@ func (iDB *InternalDB) GetAccount(id string) (acc *Account, err error) {
if !ok || x == nil {
return nil, utils.ErrNotFound
}
acc = &Account{ID: id}
err = iDB.ms.Unmarshal(x.([]byte), &acc)
if err != nil {
return nil, err
}
return
return x.(*Account), nil
}
func (iDB *InternalDB) SetAccount(acc *Account) (err error) {
@@ -766,11 +662,7 @@ func (iDB *InternalDB) SetAccount(acc *Account) (err error) {
}
}
result, err := iDB.ms.Marshal(acc)
if err != nil {
return err
}
iDB.db.Set(utils.CacheAccounts, utils.ACCOUNT_PREFIX+acc.ID, result, nil,
iDB.db.Set(utils.CacheAccounts, utils.ACCOUNT_PREFIX+acc.ID, acc, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -786,19 +678,11 @@ func (iDB *InternalDB) GetResourceProfileDrv(tenant, id string) (rp *ResourcePro
if !ok || x == nil {
return nil, utils.ErrNotFound
}
err = iDB.ms.Unmarshal(x.([]byte), &rp)
if err != nil {
return nil, err
}
return
return x.(*ResourceProfile), nil
}
func (iDB *InternalDB) SetResourceProfileDrv(rp *ResourceProfile) (err error) {
result, err := iDB.ms.Marshal(rp)
if err != nil {
return err
}
iDB.db.Set(utils.CacheResourceProfiles, utils.ResourceProfilesPrefix+rp.TenantID(), result, nil,
iDB.db.Set(utils.CacheResourceProfiles, utils.ResourceProfilesPrefix+rp.TenantID(), rp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -814,19 +698,11 @@ func (iDB *InternalDB) GetResourceDrv(tenant, id string) (r *Resource, err error
if !ok || x == nil {
return nil, utils.ErrNotFound
}
err = iDB.ms.Unmarshal(x.([]byte), &r)
if err != nil {
return nil, err
}
return
return x.(*Resource), nil
}
func (iDB *InternalDB) SetResourceDrv(r *Resource) (err error) {
result, err := iDB.ms.Marshal(r)
if err != nil {
return err
}
iDB.db.Set(utils.CacheResources, utils.ResourcesPrefix+r.TenantID(), result, nil,
iDB.db.Set(utils.CacheResources, utils.ResourcesPrefix+r.TenantID(), r, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -842,19 +718,11 @@ func (iDB *InternalDB) GetTimingDrv(id string) (tmg *utils.TPTiming, err error)
if !ok || x == nil {
return nil, utils.ErrNotFound
}
err = iDB.ms.Unmarshal(x.([]byte), &tmg)
if err != nil {
return nil, err
}
return
return x.(*utils.TPTiming), nil
}
func (iDB *InternalDB) SetTimingDrv(timing *utils.TPTiming) (err error) {
result, err := iDB.ms.Marshal(timing)
if err != nil {
return err
}
iDB.db.Set(utils.CacheTimings, utils.TimingsPrefix+timing.ID, result, nil,
iDB.db.Set(utils.CacheTimings, utils.TimingsPrefix+timing.ID, timing, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -881,10 +749,7 @@ func (iDB *InternalDB) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType str
return nil, utils.ErrNotFound
}
if len(fldNameVal) != 0 {
rcvidx := make(map[string]utils.StringMap)
if err = iDB.ms.Unmarshal(x.([]byte), &rcvidx); err != nil {
return nil, err
}
rcvidx := x.(map[string]utils.StringMap)
indexes = make(map[string]utils.StringMap)
for fldName, fldVal := range fldNameVal {
if _, has := indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)]; !has {
@@ -898,9 +763,7 @@ func (iDB *InternalDB) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType str
}
return
} else {
if err = iDB.ms.Unmarshal(x.([]byte), &indexes); err != nil {
return nil, err
}
indexes = x.(map[string]utils.StringMap)
if len(indexes) == 0 {
return nil, utils.ErrNotFound
}
@@ -937,20 +800,12 @@ func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, itemIDPrefix string,
x, ok := iDB.db.Get(cacheID, dbKey)
if !ok || x == nil {
result, err := iDB.ms.Marshal(toBeAdded)
if err != nil {
return err
}
iDB.db.Set(cacheID, dbKey, result, nil,
iDB.db.Set(cacheID, dbKey, toBeAdded, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return err
}
mp := make(map[string]utils.StringMap)
err = iDB.ms.Unmarshal(x.([]byte), &mp)
if err != nil {
return err
}
mp := x.(map[string]utils.StringMap)
for _, key := range toBeDeleted {
delete(mp, key)
}
@@ -960,11 +815,7 @@ func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, itemIDPrefix string,
}
mp[key] = strMp
}
result, err := iDB.ms.Marshal(mp)
if err != nil {
return err
}
iDB.db.Set(cacheID, dbKey, result, nil,
iDB.db.Set(cacheID, dbKey, mp, nil,
cacheCommit(transactionID), transactionID)
return nil
}
@@ -982,10 +833,8 @@ func (iDB *InternalDB) MatchFilterIndexDrv(cacheID, itemIDPrefix,
return nil, utils.ErrNotFound
}
var indexes map[string]utils.StringMap
if err = iDB.ms.Unmarshal(x.([]byte), &indexes); err != nil {
return nil, err
}
indexes := x.(map[string]utils.StringMap)
if _, hasIt := indexes[utils.ConcatenatedKey(filterType, fieldName, fieldVal)]; hasIt {
itemIDs = indexes[utils.ConcatenatedKey(filterType, fieldName, fieldVal)]
}
@@ -1000,19 +849,11 @@ func (iDB *InternalDB) GetStatQueueProfileDrv(tenant string, id string) (sq *Sta
if !ok || x == nil {
return nil, utils.ErrNotFound
}
err = iDB.ms.Unmarshal(x.([]byte), &sq)
if err != nil {
return nil, err
}
return
return x.(*StatQueueProfile), nil
}
func (iDB *InternalDB) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) {
result, err := iDB.ms.Marshal(sq)
if err != nil {
return err
}
iDB.db.Set(utils.CacheStatQueueProfiles, utils.StatQueueProfilePrefix+sq.TenantID(), result, nil,
iDB.db.Set(utils.CacheStatQueueProfiles, utils.StatQueueProfilePrefix+sq.TenantID(), sq, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -1028,18 +869,10 @@ func (iDB *InternalDB) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQ
if !ok || x == nil {
return nil, utils.ErrNotFound
}
err = iDB.ms.Unmarshal(x.([]byte), &sq)
if err != nil {
return nil, err
}
return
return x.(*StoredStatQueue), nil
}
func (iDB *InternalDB) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) {
result, err := iDB.ms.Marshal(sq)
if err != nil {
return err
}
iDB.db.Set(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(sq.Tenant, sq.ID), result, nil,
iDB.db.Set(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(sq.Tenant, sq.ID), sq, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
@@ -1184,10 +1017,7 @@ func (iDB *InternalDB) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[strin
if !ok || x == nil {
return nil, utils.ErrNotFound
}
err = iDB.ms.Unmarshal(x.([]byte), &loadIDs)
if err != nil {
return nil, err
}
loadIDs = x.(map[string]int64)
if itemIDPrefix != utils.EmptyString {
return map[string]int64{itemIDPrefix: loadIDs[itemIDPrefix]}, nil
}
@@ -1195,11 +1025,7 @@ func (iDB *InternalDB) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[strin
}
func (iDB *InternalDB) SetLoadIDsDrv(loadIDs map[string]int64) (err error) {
result, err := iDB.ms.Marshal(loadIDs)
if err != nil {
return err
}
iDB.db.Set(utils.CacheLoadIDs, utils.LoadIDs, result, nil,
iDB.db.Set(utils.CacheLoadIDs, utils.LoadIDs, loadIDs, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}

View File

@@ -108,16 +108,11 @@ func (iDB *InternalDB) GetTPTimings(tpid, id string) (timings []*utils.ApierTPTi
ids := iDB.db.GetItemIDs(utils.TBLTPTimings, key)
for _, id := range ids {
x, ok := iDB.db.Get(utils.TBLTPTimings, id)
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.ApierTPTiming
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
timings = append(timings, result)
timings = append(timings, x.(*utils.ApierTPTiming))
}
if len(timings) == 0 {
return nil, utils.ErrNotFound
@@ -136,11 +131,7 @@ func (iDB *InternalDB) GetTPDestinations(tpid, id string) (dsts []*utils.TPDesti
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPDestination
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
dsts = append(dsts, result)
dsts = append(dsts, x.(*utils.TPDestination))
}
if len(dsts) == 0 {
@@ -160,10 +151,7 @@ func (iDB *InternalDB) GetTPRates(tpid, id string) (rates []*utils.TPRate, err e
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPRate
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
result := x.(*utils.TPRate)
for _, rs := range result.RateSlots {
rs.SetDurations()
}
@@ -188,11 +176,8 @@ func (iDB *InternalDB) GetTPDestinationRates(tpid, id string,
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPDestinationRate
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
dRates = append(dRates, result)
dRates = append(dRates, x.(*utils.TPDestinationRate))
}
if len(dRates) == 0 {
@@ -236,11 +221,7 @@ func (iDB *InternalDB) GetTPRatingPlans(tpid, id string, paginator *utils.Pagina
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPRatingPlan
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
rPlans = append(rPlans, result)
rPlans = append(rPlans, x.(*utils.TPRatingPlan))
}
if len(rPlans) == 0 {
@@ -294,11 +275,7 @@ func (iDB *InternalDB) GetTPRatingProfiles(filter *utils.TPRatingProfile) (rProf
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPRatingProfile
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
rProfiles = append(rProfiles, result)
rProfiles = append(rProfiles, x.(*utils.TPRatingProfile))
}
if len(rProfiles) == 0 {
@@ -318,11 +295,7 @@ func (iDB *InternalDB) GetTPSharedGroups(tpid, id string) (sGroups []*utils.TPSh
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPSharedGroups
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
sGroups = append(sGroups, result)
sGroups = append(sGroups, x.(*utils.TPSharedGroups))
}
if len(sGroups) == 0 {
@@ -342,11 +315,7 @@ func (iDB *InternalDB) GetTPActions(tpid, id string) (actions []*utils.TPActions
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPActions
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
actions = append(actions, result)
actions = append(actions, x.(*utils.TPActions))
}
if len(actions) == 0 {
@@ -366,11 +335,7 @@ func (iDB *InternalDB) GetTPActionPlans(tpid, id string) (aPlans []*utils.TPActi
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPActionPlan
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
aPlans = append(aPlans, result)
aPlans = append(aPlans, x.(*utils.TPActionPlan))
}
if len(aPlans) == 0 {
@@ -390,11 +355,7 @@ func (iDB *InternalDB) GetTPActionTriggers(tpid, id string) (aTriggers []*utils.
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPActionTriggers
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
aTriggers = append(aTriggers, result)
aTriggers = append(aTriggers, x.(*utils.TPActionTriggers))
}
if len(aTriggers) == 0 {
return nil, utils.ErrNotFound
@@ -419,11 +380,7 @@ func (iDB *InternalDB) GetTPAccountActions(filter *utils.TPAccountActions) (acco
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPAccountActions
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
accounts = append(accounts, result)
accounts = append(accounts, x.(*utils.TPAccountActions))
}
if len(accounts) == 0 {
@@ -446,11 +403,7 @@ func (iDB *InternalDB) GetTPResources(tpid, tenant, id string) (resources []*uti
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPResourceProfile
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
resources = append(resources, result)
resources = append(resources, x.(*utils.TPResourceProfile))
}
if len(resources) == 0 {
@@ -473,11 +426,7 @@ func (iDB *InternalDB) GetTPStats(tpid, tenant, id string) (stats []*utils.TPSta
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPStatProfile
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
stats = append(stats, result)
stats = append(stats, x.(*utils.TPStatProfile))
}
if len(stats) == 0 {
@@ -500,11 +449,7 @@ func (iDB *InternalDB) GetTPThresholds(tpid, tenant, id string) (ths []*utils.TP
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPThresholdProfile
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
ths = append(ths, result)
ths = append(ths, x.(*utils.TPThresholdProfile))
}
if len(ths) == 0 {
@@ -527,11 +472,7 @@ func (iDB *InternalDB) GetTPFilters(tpid, tenant, id string) (fltrs []*utils.TPF
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPFilterProfile
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
fltrs = append(fltrs, result)
fltrs = append(fltrs, x.(*utils.TPFilterProfile))
}
if len(fltrs) == 0 {
@@ -554,11 +495,7 @@ func (iDB *InternalDB) GetTPSuppliers(tpid, tenant, id string) (supps []*utils.T
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPSupplierProfile
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
supps = append(supps, result)
supps = append(supps, x.(*utils.TPSupplierProfile))
}
if len(supps) == 0 {
@@ -581,11 +518,7 @@ func (iDB *InternalDB) GetTPAttributes(tpid, tenant, id string) (attrs []*utils.
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPAttributeProfile
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
attrs = append(attrs, result)
attrs = append(attrs, x.(*utils.TPAttributeProfile))
}
if len(attrs) == 0 {
@@ -608,11 +541,7 @@ func (iDB *InternalDB) GetTPChargers(tpid, tenant, id string) (cpps []*utils.TPC
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPChargerProfile
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
cpps = append(cpps, result)
cpps = append(cpps, x.(*utils.TPChargerProfile))
}
if len(cpps) == 0 {
@@ -635,11 +564,7 @@ func (iDB *InternalDB) GetTPDispatcherProfiles(tpid, tenant, id string) (dpps []
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPDispatcherProfile
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
dpps = append(dpps, result)
dpps = append(dpps, x.(*utils.TPDispatcherProfile))
}
if len(dpps) == 0 {
@@ -662,11 +587,7 @@ func (iDB *InternalDB) GetTPDispatcherHosts(tpid, tenant, id string) (dpps []*ut
if !ok || x == nil {
return nil, utils.ErrNotFound
}
var result *utils.TPDispatcherHost
if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil {
return nil, err
}
dpps = append(dpps, result)
dpps = append(dpps, x.(*utils.TPDispatcherHost))
}
if len(dpps) == 0 {
@@ -699,11 +620,7 @@ func (iDB *InternalDB) SetTPTimings(timings []*utils.ApierTPTiming) (err error)
return nil
}
for _, timing := range timings {
result, err := iDB.ms.Marshal(timing)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPTimings, utils.ConcatenatedKey(utils.TBLTPTimings, timing.TPid, timing.ID), result, nil,
iDB.db.Set(utils.TBLTPTimings, utils.ConcatenatedKey(utils.TBLTPTimings, timing.TPid, timing.ID), timing, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -713,11 +630,7 @@ func (iDB *InternalDB) SetTPDestinations(dests []*utils.TPDestination) (err erro
return nil
}
for _, destination := range dests {
result, err := iDB.ms.Marshal(destination)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPDestinations, utils.ConcatenatedKey(utils.TBLTPDestinations, destination.TPid, destination.ID), result, nil,
iDB.db.Set(utils.TBLTPDestinations, utils.ConcatenatedKey(utils.TBLTPDestinations, destination.TPid, destination.ID), destination, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -728,11 +641,7 @@ func (iDB *InternalDB) SetTPRates(rates []*utils.TPRate) (err error) {
return nil
}
for _, rate := range rates {
result, err := iDB.ms.Marshal(rate)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPRates, utils.ConcatenatedKey(utils.TBLTPRates, rate.TPid, rate.ID), result, nil,
iDB.db.Set(utils.TBLTPRates, utils.ConcatenatedKey(utils.TBLTPRates, rate.TPid, rate.ID), rate, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -743,11 +652,7 @@ func (iDB *InternalDB) SetTPDestinationRates(dRates []*utils.TPDestinationRate)
return nil
}
for _, dRate := range dRates {
result, err := iDB.ms.Marshal(dRate)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPDestinationRates, utils.ConcatenatedKey(utils.TBLTPDestinationRates, dRate.TPid, dRate.ID), result, nil,
iDB.db.Set(utils.TBLTPDestinationRates, utils.ConcatenatedKey(utils.TBLTPDestinationRates, dRate.TPid, dRate.ID), dRate, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -758,11 +663,7 @@ func (iDB *InternalDB) SetTPRatingPlans(ratingPlans []*utils.TPRatingPlan) (err
return nil
}
for _, rPlan := range ratingPlans {
result, err := iDB.ms.Marshal(rPlan)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPRatingPlans, utils.ConcatenatedKey(utils.TBLTPRatingPlans, rPlan.TPid, rPlan.ID), result, nil,
iDB.db.Set(utils.TBLTPRatingPlans, utils.ConcatenatedKey(utils.TBLTPRatingPlans, rPlan.TPid, rPlan.ID), rPlan, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -773,12 +674,8 @@ func (iDB *InternalDB) SetTPRatingProfiles(ratingProfiles []*utils.TPRatingProfi
return nil
}
for _, rProfile := range ratingProfiles {
result, err := iDB.ms.Marshal(rProfile)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPRateProfiles, utils.ConcatenatedKey(utils.TBLTPRateProfiles, rProfile.TPid,
rProfile.LoadId, rProfile.Tenant, rProfile.Category, rProfile.Subject), result, nil,
rProfile.LoadId, rProfile.Tenant, rProfile.Category, rProfile.Subject), rProfile, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -789,11 +686,7 @@ func (iDB *InternalDB) SetTPSharedGroups(groups []*utils.TPSharedGroups) (err er
return nil
}
for _, group := range groups {
result, err := iDB.ms.Marshal(group)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPSharedGroups, utils.ConcatenatedKey(utils.TBLTPSharedGroups, group.TPid, group.ID), result, nil,
iDB.db.Set(utils.TBLTPSharedGroups, utils.ConcatenatedKey(utils.TBLTPSharedGroups, group.TPid, group.ID), group, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -804,11 +697,7 @@ func (iDB *InternalDB) SetTPActions(acts []*utils.TPActions) (err error) {
return nil
}
for _, action := range acts {
result, err := iDB.ms.Marshal(action)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPActions, utils.ConcatenatedKey(utils.TBLTPActions, action.TPid, action.ID), result, nil,
iDB.db.Set(utils.TBLTPActions, utils.ConcatenatedKey(utils.TBLTPActions, action.TPid, action.ID), action, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -819,11 +708,7 @@ func (iDB *InternalDB) SetTPActionPlans(aPlans []*utils.TPActionPlan) (err error
return nil
}
for _, aPlan := range aPlans {
result, err := iDB.ms.Marshal(aPlan)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPActionPlans, utils.ConcatenatedKey(utils.TBLTPActionPlans, aPlan.TPid, aPlan.ID), result, nil,
iDB.db.Set(utils.TBLTPActionPlans, utils.ConcatenatedKey(utils.TBLTPActionPlans, aPlan.TPid, aPlan.ID), aPlan, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -834,11 +719,7 @@ func (iDB *InternalDB) SetTPActionTriggers(aTriggers []*utils.TPActionTriggers)
return nil
}
for _, aTrigger := range aTriggers {
result, err := iDB.ms.Marshal(aTrigger)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPActionTriggers, utils.ConcatenatedKey(utils.TBLTPActionTriggers, aTrigger.TPid, aTrigger.ID), result, nil,
iDB.db.Set(utils.TBLTPActionTriggers, utils.ConcatenatedKey(utils.TBLTPActionTriggers, aTrigger.TPid, aTrigger.ID), aTrigger, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -849,12 +730,8 @@ func (iDB *InternalDB) SetTPAccountActions(accActions []*utils.TPAccountActions)
return nil
}
for _, accAction := range accActions {
result, err := iDB.ms.Marshal(accAction)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPAccountActions, utils.ConcatenatedKey(utils.TBLTPAccountActions, accAction.TPid,
accAction.LoadId, accAction.Tenant, accAction.Account), result, nil,
accAction.LoadId, accAction.Tenant, accAction.Account), accAction, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -865,11 +742,7 @@ func (iDB *InternalDB) SetTPResources(resources []*utils.TPResourceProfile) (err
return nil
}
for _, resource := range resources {
result, err := iDB.ms.Marshal(resource)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPResources, utils.ConcatenatedKey(utils.TBLTPResources, resource.TPid, resource.Tenant, resource.ID), result, nil,
iDB.db.Set(utils.TBLTPResources, utils.ConcatenatedKey(utils.TBLTPResources, resource.TPid, resource.Tenant, resource.ID), resource, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -879,11 +752,7 @@ func (iDB *InternalDB) SetTPStats(stats []*utils.TPStatProfile) (err error) {
return nil
}
for _, stat := range stats {
result, err := iDB.ms.Marshal(stat)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPStats, utils.ConcatenatedKey(utils.TBLTPStats, stat.TPid, stat.Tenant, stat.ID), result, nil,
iDB.db.Set(utils.TBLTPStats, utils.ConcatenatedKey(utils.TBLTPStats, stat.TPid, stat.Tenant, stat.ID), stat, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -894,11 +763,7 @@ func (iDB *InternalDB) SetTPThresholds(thresholds []*utils.TPThresholdProfile) (
}
for _, threshold := range thresholds {
result, err := iDB.ms.Marshal(threshold)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPThresholds, utils.ConcatenatedKey(utils.TBLTPThresholds, threshold.TPid, threshold.Tenant, threshold.ID), result, nil,
iDB.db.Set(utils.TBLTPThresholds, utils.ConcatenatedKey(utils.TBLTPThresholds, threshold.TPid, threshold.Tenant, threshold.ID), threshold, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -909,11 +774,7 @@ func (iDB *InternalDB) SetTPFilters(filters []*utils.TPFilterProfile) (err error
}
for _, filter := range filters {
result, err := iDB.ms.Marshal(filter)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPFilters, utils.ConcatenatedKey(utils.TBLTPFilters, filter.TPid, filter.Tenant, filter.ID), result, nil,
iDB.db.Set(utils.TBLTPFilters, utils.ConcatenatedKey(utils.TBLTPFilters, filter.TPid, filter.Tenant, filter.ID), filter, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -924,11 +785,7 @@ func (iDB *InternalDB) SetTPSuppliers(suppliers []*utils.TPSupplierProfile) (err
return nil
}
for _, supplier := range suppliers {
result, err := iDB.ms.Marshal(supplier)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPSuppliers, utils.ConcatenatedKey(utils.TBLTPSuppliers, supplier.TPid, supplier.Tenant, supplier.ID), result, nil,
iDB.db.Set(utils.TBLTPSuppliers, utils.ConcatenatedKey(utils.TBLTPSuppliers, supplier.TPid, supplier.Tenant, supplier.ID), supplier, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -940,11 +797,7 @@ func (iDB *InternalDB) SetTPAttributes(attributes []*utils.TPAttributeProfile) (
}
for _, attribute := range attributes {
result, err := iDB.ms.Marshal(attribute)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPAttributes, utils.ConcatenatedKey(utils.TBLTPAttributes, attribute.TPid, attribute.Tenant, attribute.ID), result, nil,
iDB.db.Set(utils.TBLTPAttributes, utils.ConcatenatedKey(utils.TBLTPAttributes, attribute.TPid, attribute.Tenant, attribute.ID), attribute, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -955,11 +808,7 @@ func (iDB *InternalDB) SetTPChargers(cpps []*utils.TPChargerProfile) (err error)
}
for _, cpp := range cpps {
result, err := iDB.ms.Marshal(cpp)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPChargers, utils.ConcatenatedKey(utils.TBLTPChargers, cpp.TPid, cpp.Tenant, cpp.ID), result, nil,
iDB.db.Set(utils.TBLTPChargers, utils.ConcatenatedKey(utils.TBLTPChargers, cpp.TPid, cpp.Tenant, cpp.ID), cpp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -970,11 +819,7 @@ func (iDB *InternalDB) SetTPDispatcherProfiles(dpps []*utils.TPDispatcherProfile
}
for _, dpp := range dpps {
result, err := iDB.ms.Marshal(dpp)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPDispatchers, utils.ConcatenatedKey(utils.TBLTPDispatchers, dpp.TPid, dpp.Tenant, dpp.ID), result, nil,
iDB.db.Set(utils.TBLTPDispatchers, utils.ConcatenatedKey(utils.TBLTPDispatchers, dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
@@ -984,11 +829,7 @@ func (iDB *InternalDB) SetTPDispatcherHosts(dpps []*utils.TPDispatcherHost) (err
return nil
}
for _, dpp := range dpps {
result, err := iDB.ms.Marshal(dpp)
if err != nil {
return err
}
iDB.db.Set(utils.TBLTPDispatcherHosts, utils.ConcatenatedKey(utils.TBLTPDispatcherHosts, dpp.TPid, dpp.Tenant, dpp.ID), result, nil,
iDB.db.Set(utils.TBLTPDispatcherHosts, utils.ConcatenatedKey(utils.TBLTPDispatcherHosts, dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return

View File

@@ -25,7 +25,7 @@ import (
type mapMigrator struct {
dm *engine.DataManager
mp *engine.MapStorage
iDB *engine.InternalDB
dataKeys []string
qryIdx *int
}
@@ -33,7 +33,7 @@ type mapMigrator struct {
func newMapMigrator(dm *engine.DataManager) (mM *mapMigrator) {
return &mapMigrator{
dm: dm,
mp: dm.DataDB().(*engine.MapStorage),
iDB: dm.DataDB().(*engine.InternalDB),
}
}