diff --git a/cdrc/csv_test.go b/cdrc/csv_test.go index ffaee3fb5..b0a63678f 100644 --- a/cdrc/csv_test.go +++ b/cdrc/csv_test.go @@ -155,3 +155,35 @@ func TestCsvPairToRecord(t *testing.T) { t.Error(err) } } + +func TestCsvSecondUsage(t *testing.T) { + cgrConfig, _ := config.NewDefaultCGRConfig() + cdrcConfig := cgrConfig.CdrcProfiles["/var/spool/cgrates/cdrc/in"][0] + data, _ := engine.NewMapStorage() + dm := engine.NewDataManager(data) + filterS := engine.NewFilterS(cgrConfig, nil, nil, nil, dm) + cdrcConfig.CdrSourceId = "TEST_CDRC" + cdrcConfig.ContentFields = []*config.FCTemplate{ + {Tag: "TORField", Type: utils.META_COMPOSED, FieldId: utils.ToR, + Value: config.NewRSRParsersMustCompile("~0", true, utils.INFIELD_SEP)}, + + {Tag: "Usage", Type: utils.META_COMPOSED, FieldId: utils.Usage, + Value: config.NewRSRParsersMustCompile("~1;s", true, utils.INFIELD_SEP)}, + } + csvProcessor := &CsvRecordsProcessor{dfltCdrcCfg: cdrcConfig, cdrcCfgs: []*config.CdrcCfg{cdrcConfig}, filterS: filterS} + + cdrRow := []string{"*voice", "12"} + expectedCdr := &engine.CDR{ + CGRID: utils.Sha1("", "0.0.0.0"), + ToR: cdrRow[0], + OriginHost: "0.0.0.0", + Source: "TEST_CDRC", + Usage: time.Duration(12) * time.Second, + ExtraFields: map[string]string{}, + Cost: -1, + } + if rtCdr, _ := csvProcessor.recordToStoredCdr(cdrRow, + cdrcConfig, "cgrates.org"); !reflect.DeepEqual(expectedCdr, rtCdr) { + t.Errorf("Expected: \n%v, \nreceived: \n%v", expectedCdr, rtCdr) + } +} diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index a1c355795..a5b5fdef6 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -19,11 +19,8 @@ along with this program. If not, see package engine import ( - "bytes" - "compress/zlib" "errors" "fmt" - "io/ioutil" "sync" "github.com/cgrates/cgrates/config" @@ -226,33 +223,11 @@ func (iDB *InternalDB) GetRatingPlanDrv(id string) (rp *RatingPlan, err error) { if !ok || x == nil { return nil, utils.ErrNotFound } - b := bytes.NewBuffer(x.([]byte)) - r, err := zlib.NewReader(b) - if err != nil { - return nil, err - } - out, err := ioutil.ReadAll(r) - if err != nil { - return nil, err - } - r.Close() - err = iDB.ms.Unmarshal(out, &rp) - if err != nil { - return nil, err - } - return + return x.(*RatingPlan), nil } func (iDB *InternalDB) SetRatingPlanDrv(rp *RatingPlan) (err error) { - result, err := iDB.ms.Marshal(rp) - if err != nil { - return err - } - var b bytes.Buffer - w := zlib.NewWriter(&b) - w.Write(result) - w.Close() - iDB.db.Set(utils.CacheRatingPlans, utils.RATING_PLAN_PREFIX+rp.Id, b.Bytes(), nil, + iDB.db.Set(utils.CacheRatingPlans, utils.RATING_PLAN_PREFIX+rp.Id, rp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -268,19 +243,11 @@ func (iDB *InternalDB) GetRatingProfileDrv(id string) (rp *RatingProfile, err er if !ok || x == nil { return nil, utils.ErrNotFound } - err = iDB.ms.Unmarshal(x.([]byte), &rp) - if err != nil { - return nil, err - } - return + return x.(*RatingProfile), nil } func (iDB *InternalDB) SetRatingProfileDrv(rp *RatingProfile) (err error) { - result, err := iDB.ms.Marshal(rp) - if err != nil { - return err - } - iDB.db.Set(utils.CacheRatingProfiles, utils.RATING_PROFILE_PREFIX+rp.Id, result, nil, + iDB.db.Set(utils.CacheRatingProfiles, utils.RATING_PROFILE_PREFIX+rp.Id, rp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -305,44 +272,16 @@ func (iDB *InternalDB) GetDestination(key string, skipCache bool, transactionID x, ok := iDB.db.Get(utils.CacheDestinations, utils.DESTINATION_PREFIX+key) if !ok || x == nil { - return nil, utils.ErrNotFound - } - - b := bytes.NewBuffer(x.([]byte)) - r, err := zlib.NewReader(b) - if err != nil { - return nil, err - } - out, err := ioutil.ReadAll(r) - if err != nil { - return nil, err - } - r.Close() - dest = new(Destination) - err = iDB.ms.Unmarshal(out, &dest) - if err != nil { - Cache.Set(utils.CacheDestinations, key, nil, nil, cCommit, transactionID) - return nil, utils.ErrNotFound - } - - if dest == nil { Cache.Set(utils.CacheDestinations, key, nil, nil, cCommit, transactionID) return nil, utils.ErrNotFound } + dest = x.(*Destination) Cache.Set(utils.CacheDestinations, key, dest, nil, cCommit, transactionID) return } func (iDB *InternalDB) SetDestination(dest *Destination, transactionID string) (err error) { - result, err := iDB.ms.Marshal(dest) - if err != nil { - return err - } - var b bytes.Buffer - w := zlib.NewWriter(&b) - w.Write(result) - w.Close() - iDB.db.Set(utils.CacheDestinations, utils.DESTINATION_PREFIX+dest.Id, b.Bytes(), nil, + iDB.db.Set(utils.CacheDestinations, utils.DESTINATION_PREFIX+dest.Id, dest, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) Cache.Remove(utils.CacheDestinations, dest.Id, cacheCommit(transactionID), transactionID) @@ -487,18 +426,11 @@ func (iDB *InternalDB) GetActionsDrv(id string) (acts Actions, err error) { if !ok || x == nil { return nil, utils.ErrNotFound } - if err = iDB.ms.Unmarshal(x.([]byte), &acts); err != nil { - return nil, err - } - return + return x.(Actions), err } func (iDB *InternalDB) SetActionsDrv(id string, acts Actions) (err error) { - result, err := iDB.ms.Marshal(acts) - if err != nil { - return err - } - iDB.db.Set(utils.CacheActions, utils.ACTION_PREFIX+id, result, nil, + iDB.db.Set(utils.CacheActions, utils.ACTION_PREFIX+id, acts, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -514,18 +446,11 @@ func (iDB *InternalDB) GetSharedGroupDrv(id string) (sh *SharedGroup, err error) if !ok || x == nil { return nil, utils.ErrNotFound } - if err = iDB.ms.Unmarshal(x.([]byte), &sh); err != nil { - return nil, err - } - return + return x.(*SharedGroup), nil } func (iDB *InternalDB) SetSharedGroupDrv(sh *SharedGroup) (err error) { - result, err := iDB.ms.Marshal(sh) - if err != nil { - return err - } - iDB.db.Set(utils.CacheSharedGroups, utils.SHARED_GROUP_PREFIX+sh.Id, result, nil, + iDB.db.Set(utils.CacheSharedGroups, utils.SHARED_GROUP_PREFIX+sh.Id, sh, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -541,18 +466,11 @@ func (iDB *InternalDB) GetActionTriggersDrv(id string) (at ActionTriggers, err e if !ok || x == nil { return nil, utils.ErrNotFound } - if err = iDB.ms.Unmarshal(x.([]byte), &at); err != nil { - return nil, err - } - return + return x.(ActionTriggers), nil } func (iDB *InternalDB) SetActionTriggersDrv(id string, at ActionTriggers) (err error) { - result, err := iDB.ms.Marshal(at) - if err != nil { - return err - } - iDB.db.Set(utils.CacheActionTriggers, utils.ACTION_TRIGGER_PREFIX+id, result, nil, + iDB.db.Set(utils.CacheActionTriggers, utils.ACTION_TRIGGER_PREFIX+id, at, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -579,7 +497,7 @@ func (iDB *InternalDB) GetActionPlan(key string, skipCache bool, transactionID s cCommit, transactionID) return nil, utils.ErrNotFound } - err = iDB.ms.Unmarshal(x.([]byte), &ats) + ats = x.(*ActionPlan) Cache.Set(utils.CacheActionPlans, key, ats, nil, cCommit, transactionID) return @@ -607,12 +525,7 @@ func (iDB *InternalDB) SetActionPlan(key string, ats *ActionPlan, } } } - - result, err := iDB.ms.Marshal(&ats) - if err != nil { - return err - } - iDB.db.Set(utils.CacheActionPlans, utils.ACTION_PLAN_PREFIX+key, result, nil, + iDB.db.Set(utils.CacheActionPlans, utils.ACTION_PLAN_PREFIX+key, ats, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -656,15 +569,12 @@ func (iDB *InternalDB) GetAccountActionPlans(acntID string, cacheCommit(transactionID), transactionID) return nil, utils.ErrNotFound } - if err = iDB.ms.Unmarshal(x.([]byte), &apIDs); err != nil { - return nil, err - } + apIDs = x.([]string) Cache.Set(utils.CacheAccountActionPlans, acntID, apIDs, nil, cacheCommit(transactionID), transactionID) return } - func (iDB *InternalDB) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) { if !overwrite { if oldaPlIDs, err := iDB.GetAccountActionPlans(acntID, true, utils.NonTransactional); err != nil && err != utils.ErrNotFound { @@ -677,16 +587,11 @@ func (iDB *InternalDB) SetAccountActionPlans(acntID string, apIDs []string, over } } } - result, err := iDB.ms.Marshal(apIDs) - if err != nil { - return err - } - iDB.db.Set(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, result, nil, + iDB.db.Set(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, apIDs, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } - func (iDB *InternalDB) RemAccountActionPlans(acntID string, apIDs []string) (err error) { key := utils.AccountActionPlansPrefix + acntID if len(apIDs) == 0 { @@ -710,11 +615,7 @@ func (iDB *InternalDB) RemAccountActionPlans(acntID string, apIDs []string) (err cacheCommit(utils.NonTransactional), utils.NonTransactional) return } - var result []byte - if result, err = iDB.ms.Marshal(oldaPlIDs); err != nil { - return err - } - iDB.db.Set(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, result, nil, + iDB.db.Set(utils.CacheAccountActionPlans, utils.AccountActionPlansPrefix+acntID, oldaPlIDs, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -744,12 +645,7 @@ func (iDB *InternalDB) GetAccount(id string) (acc *Account, err error) { if !ok || x == nil { return nil, utils.ErrNotFound } - acc = &Account{ID: id} - err = iDB.ms.Unmarshal(x.([]byte), &acc) - if err != nil { - return nil, err - } - return + return x.(*Account), nil } func (iDB *InternalDB) SetAccount(acc *Account) (err error) { @@ -766,11 +662,7 @@ func (iDB *InternalDB) SetAccount(acc *Account) (err error) { } } - result, err := iDB.ms.Marshal(acc) - if err != nil { - return err - } - iDB.db.Set(utils.CacheAccounts, utils.ACCOUNT_PREFIX+acc.ID, result, nil, + iDB.db.Set(utils.CacheAccounts, utils.ACCOUNT_PREFIX+acc.ID, acc, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -786,19 +678,11 @@ func (iDB *InternalDB) GetResourceProfileDrv(tenant, id string) (rp *ResourcePro if !ok || x == nil { return nil, utils.ErrNotFound } - err = iDB.ms.Unmarshal(x.([]byte), &rp) - if err != nil { - return nil, err - } - return + return x.(*ResourceProfile), nil } func (iDB *InternalDB) SetResourceProfileDrv(rp *ResourceProfile) (err error) { - result, err := iDB.ms.Marshal(rp) - if err != nil { - return err - } - iDB.db.Set(utils.CacheResourceProfiles, utils.ResourceProfilesPrefix+rp.TenantID(), result, nil, + iDB.db.Set(utils.CacheResourceProfiles, utils.ResourceProfilesPrefix+rp.TenantID(), rp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -814,19 +698,11 @@ func (iDB *InternalDB) GetResourceDrv(tenant, id string) (r *Resource, err error if !ok || x == nil { return nil, utils.ErrNotFound } - err = iDB.ms.Unmarshal(x.([]byte), &r) - if err != nil { - return nil, err - } - return + return x.(*Resource), nil } func (iDB *InternalDB) SetResourceDrv(r *Resource) (err error) { - result, err := iDB.ms.Marshal(r) - if err != nil { - return err - } - iDB.db.Set(utils.CacheResources, utils.ResourcesPrefix+r.TenantID(), result, nil, + iDB.db.Set(utils.CacheResources, utils.ResourcesPrefix+r.TenantID(), r, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -842,19 +718,11 @@ func (iDB *InternalDB) GetTimingDrv(id string) (tmg *utils.TPTiming, err error) if !ok || x == nil { return nil, utils.ErrNotFound } - err = iDB.ms.Unmarshal(x.([]byte), &tmg) - if err != nil { - return nil, err - } - return + return x.(*utils.TPTiming), nil } func (iDB *InternalDB) SetTimingDrv(timing *utils.TPTiming) (err error) { - result, err := iDB.ms.Marshal(timing) - if err != nil { - return err - } - iDB.db.Set(utils.CacheTimings, utils.TimingsPrefix+timing.ID, result, nil, + iDB.db.Set(utils.CacheTimings, utils.TimingsPrefix+timing.ID, timing, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -881,10 +749,7 @@ func (iDB *InternalDB) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType str return nil, utils.ErrNotFound } if len(fldNameVal) != 0 { - rcvidx := make(map[string]utils.StringMap) - if err = iDB.ms.Unmarshal(x.([]byte), &rcvidx); err != nil { - return nil, err - } + rcvidx := x.(map[string]utils.StringMap) indexes = make(map[string]utils.StringMap) for fldName, fldVal := range fldNameVal { if _, has := indexes[utils.ConcatenatedKey(filterType, fldName, fldVal)]; !has { @@ -898,9 +763,7 @@ func (iDB *InternalDB) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType str } return } else { - if err = iDB.ms.Unmarshal(x.([]byte), &indexes); err != nil { - return nil, err - } + indexes = x.(map[string]utils.StringMap) if len(indexes) == 0 { return nil, utils.ErrNotFound } @@ -937,20 +800,12 @@ func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, itemIDPrefix string, x, ok := iDB.db.Get(cacheID, dbKey) if !ok || x == nil { - result, err := iDB.ms.Marshal(toBeAdded) - if err != nil { - return err - } - iDB.db.Set(cacheID, dbKey, result, nil, + iDB.db.Set(cacheID, dbKey, toBeAdded, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return err } - mp := make(map[string]utils.StringMap) - err = iDB.ms.Unmarshal(x.([]byte), &mp) - if err != nil { - return err - } + mp := x.(map[string]utils.StringMap) for _, key := range toBeDeleted { delete(mp, key) } @@ -960,11 +815,7 @@ func (iDB *InternalDB) SetFilterIndexesDrv(cacheID, itemIDPrefix string, } mp[key] = strMp } - result, err := iDB.ms.Marshal(mp) - if err != nil { - return err - } - iDB.db.Set(cacheID, dbKey, result, nil, + iDB.db.Set(cacheID, dbKey, mp, nil, cacheCommit(transactionID), transactionID) return nil } @@ -982,10 +833,8 @@ func (iDB *InternalDB) MatchFilterIndexDrv(cacheID, itemIDPrefix, return nil, utils.ErrNotFound } - var indexes map[string]utils.StringMap - if err = iDB.ms.Unmarshal(x.([]byte), &indexes); err != nil { - return nil, err - } + indexes := x.(map[string]utils.StringMap) + if _, hasIt := indexes[utils.ConcatenatedKey(filterType, fieldName, fieldVal)]; hasIt { itemIDs = indexes[utils.ConcatenatedKey(filterType, fieldName, fieldVal)] } @@ -1000,19 +849,11 @@ func (iDB *InternalDB) GetStatQueueProfileDrv(tenant string, id string) (sq *Sta if !ok || x == nil { return nil, utils.ErrNotFound } - err = iDB.ms.Unmarshal(x.([]byte), &sq) - if err != nil { - return nil, err - } - return + return x.(*StatQueueProfile), nil } func (iDB *InternalDB) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { - result, err := iDB.ms.Marshal(sq) - if err != nil { - return err - } - iDB.db.Set(utils.CacheStatQueueProfiles, utils.StatQueueProfilePrefix+sq.TenantID(), result, nil, + iDB.db.Set(utils.CacheStatQueueProfiles, utils.StatQueueProfilePrefix+sq.TenantID(), sq, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -1028,18 +869,10 @@ func (iDB *InternalDB) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQ if !ok || x == nil { return nil, utils.ErrNotFound } - err = iDB.ms.Unmarshal(x.([]byte), &sq) - if err != nil { - return nil, err - } - return + return x.(*StoredStatQueue), nil } func (iDB *InternalDB) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) { - result, err := iDB.ms.Marshal(sq) - if err != nil { - return err - } - iDB.db.Set(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(sq.Tenant, sq.ID), result, nil, + iDB.db.Set(utils.CacheStatQueues, utils.StatQueuePrefix+utils.ConcatenatedKey(sq.Tenant, sq.ID), sq, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } @@ -1184,10 +1017,7 @@ func (iDB *InternalDB) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[strin if !ok || x == nil { return nil, utils.ErrNotFound } - err = iDB.ms.Unmarshal(x.([]byte), &loadIDs) - if err != nil { - return nil, err - } + loadIDs = x.(map[string]int64) if itemIDPrefix != utils.EmptyString { return map[string]int64{itemIDPrefix: loadIDs[itemIDPrefix]}, nil } @@ -1195,11 +1025,7 @@ func (iDB *InternalDB) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[strin } func (iDB *InternalDB) SetLoadIDsDrv(loadIDs map[string]int64) (err error) { - result, err := iDB.ms.Marshal(loadIDs) - if err != nil { - return err - } - iDB.db.Set(utils.CacheLoadIDs, utils.LoadIDs, result, nil, + iDB.db.Set(utils.CacheLoadIDs, utils.LoadIDs, loadIDs, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) return } diff --git a/engine/storage_internal_stordb.go b/engine/storage_internal_stordb.go index 174c14b9a..4c5aedd52 100644 --- a/engine/storage_internal_stordb.go +++ b/engine/storage_internal_stordb.go @@ -108,16 +108,11 @@ func (iDB *InternalDB) GetTPTimings(tpid, id string) (timings []*utils.ApierTPTi ids := iDB.db.GetItemIDs(utils.TBLTPTimings, key) for _, id := range ids { - x, ok := iDB.db.Get(utils.TBLTPTimings, id) if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.ApierTPTiming - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - timings = append(timings, result) + timings = append(timings, x.(*utils.ApierTPTiming)) } if len(timings) == 0 { return nil, utils.ErrNotFound @@ -136,11 +131,7 @@ func (iDB *InternalDB) GetTPDestinations(tpid, id string) (dsts []*utils.TPDesti if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPDestination - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - dsts = append(dsts, result) + dsts = append(dsts, x.(*utils.TPDestination)) } if len(dsts) == 0 { @@ -160,10 +151,7 @@ func (iDB *InternalDB) GetTPRates(tpid, id string) (rates []*utils.TPRate, err e if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPRate - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } + result := x.(*utils.TPRate) for _, rs := range result.RateSlots { rs.SetDurations() } @@ -188,11 +176,8 @@ func (iDB *InternalDB) GetTPDestinationRates(tpid, id string, if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPDestinationRate - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - dRates = append(dRates, result) + + dRates = append(dRates, x.(*utils.TPDestinationRate)) } if len(dRates) == 0 { @@ -236,11 +221,7 @@ func (iDB *InternalDB) GetTPRatingPlans(tpid, id string, paginator *utils.Pagina if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPRatingPlan - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - rPlans = append(rPlans, result) + rPlans = append(rPlans, x.(*utils.TPRatingPlan)) } if len(rPlans) == 0 { @@ -294,11 +275,7 @@ func (iDB *InternalDB) GetTPRatingProfiles(filter *utils.TPRatingProfile) (rProf if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPRatingProfile - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - rProfiles = append(rProfiles, result) + rProfiles = append(rProfiles, x.(*utils.TPRatingProfile)) } if len(rProfiles) == 0 { @@ -318,11 +295,7 @@ func (iDB *InternalDB) GetTPSharedGroups(tpid, id string) (sGroups []*utils.TPSh if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPSharedGroups - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - sGroups = append(sGroups, result) + sGroups = append(sGroups, x.(*utils.TPSharedGroups)) } if len(sGroups) == 0 { @@ -342,11 +315,7 @@ func (iDB *InternalDB) GetTPActions(tpid, id string) (actions []*utils.TPActions if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPActions - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - actions = append(actions, result) + actions = append(actions, x.(*utils.TPActions)) } if len(actions) == 0 { @@ -366,11 +335,7 @@ func (iDB *InternalDB) GetTPActionPlans(tpid, id string) (aPlans []*utils.TPActi if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPActionPlan - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - aPlans = append(aPlans, result) + aPlans = append(aPlans, x.(*utils.TPActionPlan)) } if len(aPlans) == 0 { @@ -390,11 +355,7 @@ func (iDB *InternalDB) GetTPActionTriggers(tpid, id string) (aTriggers []*utils. if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPActionTriggers - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - aTriggers = append(aTriggers, result) + aTriggers = append(aTriggers, x.(*utils.TPActionTriggers)) } if len(aTriggers) == 0 { return nil, utils.ErrNotFound @@ -419,11 +380,7 @@ func (iDB *InternalDB) GetTPAccountActions(filter *utils.TPAccountActions) (acco if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPAccountActions - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - accounts = append(accounts, result) + accounts = append(accounts, x.(*utils.TPAccountActions)) } if len(accounts) == 0 { @@ -446,11 +403,7 @@ func (iDB *InternalDB) GetTPResources(tpid, tenant, id string) (resources []*uti if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPResourceProfile - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - resources = append(resources, result) + resources = append(resources, x.(*utils.TPResourceProfile)) } if len(resources) == 0 { @@ -473,11 +426,7 @@ func (iDB *InternalDB) GetTPStats(tpid, tenant, id string) (stats []*utils.TPSta if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPStatProfile - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - stats = append(stats, result) + stats = append(stats, x.(*utils.TPStatProfile)) } if len(stats) == 0 { @@ -500,11 +449,7 @@ func (iDB *InternalDB) GetTPThresholds(tpid, tenant, id string) (ths []*utils.TP if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPThresholdProfile - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - ths = append(ths, result) + ths = append(ths, x.(*utils.TPThresholdProfile)) } if len(ths) == 0 { @@ -527,11 +472,7 @@ func (iDB *InternalDB) GetTPFilters(tpid, tenant, id string) (fltrs []*utils.TPF if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPFilterProfile - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - fltrs = append(fltrs, result) + fltrs = append(fltrs, x.(*utils.TPFilterProfile)) } if len(fltrs) == 0 { @@ -554,11 +495,7 @@ func (iDB *InternalDB) GetTPSuppliers(tpid, tenant, id string) (supps []*utils.T if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPSupplierProfile - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - supps = append(supps, result) + supps = append(supps, x.(*utils.TPSupplierProfile)) } if len(supps) == 0 { @@ -581,11 +518,7 @@ func (iDB *InternalDB) GetTPAttributes(tpid, tenant, id string) (attrs []*utils. if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPAttributeProfile - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - attrs = append(attrs, result) + attrs = append(attrs, x.(*utils.TPAttributeProfile)) } if len(attrs) == 0 { @@ -608,11 +541,7 @@ func (iDB *InternalDB) GetTPChargers(tpid, tenant, id string) (cpps []*utils.TPC if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPChargerProfile - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - cpps = append(cpps, result) + cpps = append(cpps, x.(*utils.TPChargerProfile)) } if len(cpps) == 0 { @@ -635,11 +564,7 @@ func (iDB *InternalDB) GetTPDispatcherProfiles(tpid, tenant, id string) (dpps [] if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPDispatcherProfile - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - dpps = append(dpps, result) + dpps = append(dpps, x.(*utils.TPDispatcherProfile)) } if len(dpps) == 0 { @@ -662,11 +587,7 @@ func (iDB *InternalDB) GetTPDispatcherHosts(tpid, tenant, id string) (dpps []*ut if !ok || x == nil { return nil, utils.ErrNotFound } - var result *utils.TPDispatcherHost - if err = iDB.ms.Unmarshal(x.([]byte), &result); err != nil { - return nil, err - } - dpps = append(dpps, result) + dpps = append(dpps, x.(*utils.TPDispatcherHost)) } if len(dpps) == 0 { @@ -699,11 +620,7 @@ func (iDB *InternalDB) SetTPTimings(timings []*utils.ApierTPTiming) (err error) return nil } for _, timing := range timings { - result, err := iDB.ms.Marshal(timing) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPTimings, utils.ConcatenatedKey(utils.TBLTPTimings, timing.TPid, timing.ID), result, nil, + iDB.db.Set(utils.TBLTPTimings, utils.ConcatenatedKey(utils.TBLTPTimings, timing.TPid, timing.ID), timing, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -713,11 +630,7 @@ func (iDB *InternalDB) SetTPDestinations(dests []*utils.TPDestination) (err erro return nil } for _, destination := range dests { - result, err := iDB.ms.Marshal(destination) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPDestinations, utils.ConcatenatedKey(utils.TBLTPDestinations, destination.TPid, destination.ID), result, nil, + iDB.db.Set(utils.TBLTPDestinations, utils.ConcatenatedKey(utils.TBLTPDestinations, destination.TPid, destination.ID), destination, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -728,11 +641,7 @@ func (iDB *InternalDB) SetTPRates(rates []*utils.TPRate) (err error) { return nil } for _, rate := range rates { - result, err := iDB.ms.Marshal(rate) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPRates, utils.ConcatenatedKey(utils.TBLTPRates, rate.TPid, rate.ID), result, nil, + iDB.db.Set(utils.TBLTPRates, utils.ConcatenatedKey(utils.TBLTPRates, rate.TPid, rate.ID), rate, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -743,11 +652,7 @@ func (iDB *InternalDB) SetTPDestinationRates(dRates []*utils.TPDestinationRate) return nil } for _, dRate := range dRates { - result, err := iDB.ms.Marshal(dRate) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPDestinationRates, utils.ConcatenatedKey(utils.TBLTPDestinationRates, dRate.TPid, dRate.ID), result, nil, + iDB.db.Set(utils.TBLTPDestinationRates, utils.ConcatenatedKey(utils.TBLTPDestinationRates, dRate.TPid, dRate.ID), dRate, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -758,11 +663,7 @@ func (iDB *InternalDB) SetTPRatingPlans(ratingPlans []*utils.TPRatingPlan) (err return nil } for _, rPlan := range ratingPlans { - result, err := iDB.ms.Marshal(rPlan) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPRatingPlans, utils.ConcatenatedKey(utils.TBLTPRatingPlans, rPlan.TPid, rPlan.ID), result, nil, + iDB.db.Set(utils.TBLTPRatingPlans, utils.ConcatenatedKey(utils.TBLTPRatingPlans, rPlan.TPid, rPlan.ID), rPlan, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -773,12 +674,8 @@ func (iDB *InternalDB) SetTPRatingProfiles(ratingProfiles []*utils.TPRatingProfi return nil } for _, rProfile := range ratingProfiles { - result, err := iDB.ms.Marshal(rProfile) - if err != nil { - return err - } iDB.db.Set(utils.TBLTPRateProfiles, utils.ConcatenatedKey(utils.TBLTPRateProfiles, rProfile.TPid, - rProfile.LoadId, rProfile.Tenant, rProfile.Category, rProfile.Subject), result, nil, + rProfile.LoadId, rProfile.Tenant, rProfile.Category, rProfile.Subject), rProfile, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -789,11 +686,7 @@ func (iDB *InternalDB) SetTPSharedGroups(groups []*utils.TPSharedGroups) (err er return nil } for _, group := range groups { - result, err := iDB.ms.Marshal(group) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPSharedGroups, utils.ConcatenatedKey(utils.TBLTPSharedGroups, group.TPid, group.ID), result, nil, + iDB.db.Set(utils.TBLTPSharedGroups, utils.ConcatenatedKey(utils.TBLTPSharedGroups, group.TPid, group.ID), group, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -804,11 +697,7 @@ func (iDB *InternalDB) SetTPActions(acts []*utils.TPActions) (err error) { return nil } for _, action := range acts { - result, err := iDB.ms.Marshal(action) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPActions, utils.ConcatenatedKey(utils.TBLTPActions, action.TPid, action.ID), result, nil, + iDB.db.Set(utils.TBLTPActions, utils.ConcatenatedKey(utils.TBLTPActions, action.TPid, action.ID), action, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -819,11 +708,7 @@ func (iDB *InternalDB) SetTPActionPlans(aPlans []*utils.TPActionPlan) (err error return nil } for _, aPlan := range aPlans { - result, err := iDB.ms.Marshal(aPlan) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPActionPlans, utils.ConcatenatedKey(utils.TBLTPActionPlans, aPlan.TPid, aPlan.ID), result, nil, + iDB.db.Set(utils.TBLTPActionPlans, utils.ConcatenatedKey(utils.TBLTPActionPlans, aPlan.TPid, aPlan.ID), aPlan, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -834,11 +719,7 @@ func (iDB *InternalDB) SetTPActionTriggers(aTriggers []*utils.TPActionTriggers) return nil } for _, aTrigger := range aTriggers { - result, err := iDB.ms.Marshal(aTrigger) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPActionTriggers, utils.ConcatenatedKey(utils.TBLTPActionTriggers, aTrigger.TPid, aTrigger.ID), result, nil, + iDB.db.Set(utils.TBLTPActionTriggers, utils.ConcatenatedKey(utils.TBLTPActionTriggers, aTrigger.TPid, aTrigger.ID), aTrigger, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -849,12 +730,8 @@ func (iDB *InternalDB) SetTPAccountActions(accActions []*utils.TPAccountActions) return nil } for _, accAction := range accActions { - result, err := iDB.ms.Marshal(accAction) - if err != nil { - return err - } iDB.db.Set(utils.TBLTPAccountActions, utils.ConcatenatedKey(utils.TBLTPAccountActions, accAction.TPid, - accAction.LoadId, accAction.Tenant, accAction.Account), result, nil, + accAction.LoadId, accAction.Tenant, accAction.Account), accAction, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -865,11 +742,7 @@ func (iDB *InternalDB) SetTPResources(resources []*utils.TPResourceProfile) (err return nil } for _, resource := range resources { - result, err := iDB.ms.Marshal(resource) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPResources, utils.ConcatenatedKey(utils.TBLTPResources, resource.TPid, resource.Tenant, resource.ID), result, nil, + iDB.db.Set(utils.TBLTPResources, utils.ConcatenatedKey(utils.TBLTPResources, resource.TPid, resource.Tenant, resource.ID), resource, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -879,11 +752,7 @@ func (iDB *InternalDB) SetTPStats(stats []*utils.TPStatProfile) (err error) { return nil } for _, stat := range stats { - result, err := iDB.ms.Marshal(stat) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPStats, utils.ConcatenatedKey(utils.TBLTPStats, stat.TPid, stat.Tenant, stat.ID), result, nil, + iDB.db.Set(utils.TBLTPStats, utils.ConcatenatedKey(utils.TBLTPStats, stat.TPid, stat.Tenant, stat.ID), stat, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -894,11 +763,7 @@ func (iDB *InternalDB) SetTPThresholds(thresholds []*utils.TPThresholdProfile) ( } for _, threshold := range thresholds { - result, err := iDB.ms.Marshal(threshold) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPThresholds, utils.ConcatenatedKey(utils.TBLTPThresholds, threshold.TPid, threshold.Tenant, threshold.ID), result, nil, + iDB.db.Set(utils.TBLTPThresholds, utils.ConcatenatedKey(utils.TBLTPThresholds, threshold.TPid, threshold.Tenant, threshold.ID), threshold, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -909,11 +774,7 @@ func (iDB *InternalDB) SetTPFilters(filters []*utils.TPFilterProfile) (err error } for _, filter := range filters { - result, err := iDB.ms.Marshal(filter) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPFilters, utils.ConcatenatedKey(utils.TBLTPFilters, filter.TPid, filter.Tenant, filter.ID), result, nil, + iDB.db.Set(utils.TBLTPFilters, utils.ConcatenatedKey(utils.TBLTPFilters, filter.TPid, filter.Tenant, filter.ID), filter, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -924,11 +785,7 @@ func (iDB *InternalDB) SetTPSuppliers(suppliers []*utils.TPSupplierProfile) (err return nil } for _, supplier := range suppliers { - result, err := iDB.ms.Marshal(supplier) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPSuppliers, utils.ConcatenatedKey(utils.TBLTPSuppliers, supplier.TPid, supplier.Tenant, supplier.ID), result, nil, + iDB.db.Set(utils.TBLTPSuppliers, utils.ConcatenatedKey(utils.TBLTPSuppliers, supplier.TPid, supplier.Tenant, supplier.ID), supplier, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -940,11 +797,7 @@ func (iDB *InternalDB) SetTPAttributes(attributes []*utils.TPAttributeProfile) ( } for _, attribute := range attributes { - result, err := iDB.ms.Marshal(attribute) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPAttributes, utils.ConcatenatedKey(utils.TBLTPAttributes, attribute.TPid, attribute.Tenant, attribute.ID), result, nil, + iDB.db.Set(utils.TBLTPAttributes, utils.ConcatenatedKey(utils.TBLTPAttributes, attribute.TPid, attribute.Tenant, attribute.ID), attribute, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -955,11 +808,7 @@ func (iDB *InternalDB) SetTPChargers(cpps []*utils.TPChargerProfile) (err error) } for _, cpp := range cpps { - result, err := iDB.ms.Marshal(cpp) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPChargers, utils.ConcatenatedKey(utils.TBLTPChargers, cpp.TPid, cpp.Tenant, cpp.ID), result, nil, + iDB.db.Set(utils.TBLTPChargers, utils.ConcatenatedKey(utils.TBLTPChargers, cpp.TPid, cpp.Tenant, cpp.ID), cpp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -970,11 +819,7 @@ func (iDB *InternalDB) SetTPDispatcherProfiles(dpps []*utils.TPDispatcherProfile } for _, dpp := range dpps { - result, err := iDB.ms.Marshal(dpp) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPDispatchers, utils.ConcatenatedKey(utils.TBLTPDispatchers, dpp.TPid, dpp.Tenant, dpp.ID), result, nil, + iDB.db.Set(utils.TBLTPDispatchers, utils.ConcatenatedKey(utils.TBLTPDispatchers, dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return @@ -984,11 +829,7 @@ func (iDB *InternalDB) SetTPDispatcherHosts(dpps []*utils.TPDispatcherHost) (err return nil } for _, dpp := range dpps { - result, err := iDB.ms.Marshal(dpp) - if err != nil { - return err - } - iDB.db.Set(utils.TBLTPDispatcherHosts, utils.ConcatenatedKey(utils.TBLTPDispatcherHosts, dpp.TPid, dpp.Tenant, dpp.ID), result, nil, + iDB.db.Set(utils.TBLTPDispatcherHosts, utils.ConcatenatedKey(utils.TBLTPDispatcherHosts, dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil, cacheCommit(utils.NonTransactional), utils.NonTransactional) } return diff --git a/migrator/storage_map_datadb.go b/migrator/storage_map_datadb.go index 496a2f74d..aa5ae5b88 100755 --- a/migrator/storage_map_datadb.go +++ b/migrator/storage_map_datadb.go @@ -25,7 +25,7 @@ import ( type mapMigrator struct { dm *engine.DataManager - mp *engine.MapStorage + iDB *engine.InternalDB dataKeys []string qryIdx *int } @@ -33,7 +33,7 @@ type mapMigrator struct { func newMapMigrator(dm *engine.DataManager) (mM *mapMigrator) { return &mapMigrator{ dm: dm, - mp: dm.DataDB().(*engine.MapStorage), + iDB: dm.DataDB().(*engine.InternalDB), } }