diff --git a/apier/v1/filters.go b/apier/v1/filters.go index 96ced8c3c..d93d53774 100644 --- a/apier/v1/filters.go +++ b/apier/v1/filters.go @@ -28,7 +28,7 @@ func (self *ApierV1) SetFilter(attrs *engine.Filter, reply *string) error { if missing := utils.MissingStructFields(attrs, []string{"Tenant", "ID"}); len(missing) != 0 { return utils.NewErrMandatoryIeMissing(missing...) } - if err := self.DataManager.DataDB().SetFilter(attrs); err != nil { + if err := self.DataManager.SetFilter(attrs); err != nil { return utils.APIErrorHandler(err) } *reply = utils.OK diff --git a/engine/datamanager.go b/engine/datamanager.go index 3d28888e8..4d88ffa1c 100644 --- a/engine/datamanager.go +++ b/engine/datamanager.go @@ -99,3 +99,7 @@ func (dm *DataManager) GetFilter(tenant, id string, skipCache bool, transactionI cache.Set(key, fltr, cacheCommit(transactionID), transactionID) return } + +func (dm *DataManager) SetFilter(fltr *Filter) (err error) { + return dm.DataDB().SetFilterDrv(fltr) +} diff --git a/engine/onstor_it_test.go b/engine/onstor_it_test.go index 09152d539..b15486775 100644 --- a/engine/onstor_it_test.go +++ b/engine/onstor_it_test.go @@ -2139,7 +2139,7 @@ func testOnStorITCRUDFilter(t *testing.T) { if _, rcvErr := onStor.GetFilter("cgrates.org", "Filter1", true, utils.NonTransactional); rcvErr != nil && rcvErr != utils.ErrNotFound { t.Error(rcvErr) } - if err := onStor.DataDB().SetFilter(fp); err != nil { + if err := onStor.SetFilter(fp); err != nil { t.Error(err) } if rcv, err := onStor.GetFilter("cgrates.org", "Filter1", true, utils.NonTransactional); err != nil { diff --git a/engine/storage_interface.go b/engine/storage_interface.go index b1fea3039..f9ca1e1e8 100755 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -127,7 +127,7 @@ type DataDB interface { SetThreshold(*Threshold) error RemoveThreshold(string, string, string) error GetFilterDrv(string, string) (*Filter, error) - SetFilter(*Filter) error + SetFilterDrv(*Filter) error RemoveFilter(string, string, string) error // CacheDataFromDB loads data to cache, prefix represents the cache prefix, IDs should be nil if all available data should be loaded CacheDataFromDB(prefix string, IDs []string, mustBeCached bool) error // ToDo: Move this to dataManager diff --git a/engine/storage_map.go b/engine/storage_map.go index d4985afc5..653f64483 100755 --- a/engine/storage_map.go +++ b/engine/storage_map.go @@ -1680,7 +1680,7 @@ func (ms *MapStorage) GetFilterDrv(tenant, id string) (r *Filter, err error) { return } -func (ms *MapStorage) SetFilter(r *Filter) (err error) { +func (ms *MapStorage) SetFilterDrv(r *Filter) (err error) { ms.mu.Lock() defer ms.mu.Unlock() result, err := ms.ms.Marshal(r) diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index fd8a39de0..860e05d16 100755 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -2265,7 +2265,7 @@ func (ms *MongoStorage) GetFilterDrv(tenant, id string) (r *Filter, err error) { return } -func (ms *MongoStorage) SetFilter(r *Filter) (err error) { +func (ms *MongoStorage) SetFilterDrv(r *Filter) (err error) { session, col := ms.conn(colFlt) defer session.Close() _, err = col.Upsert(bson.M{"tenant": r.Tenant, "id": r.ID}, r) diff --git a/engine/storage_redis.go b/engine/storage_redis.go index 68933c1ee..c7f9231aa 100755 --- a/engine/storage_redis.go +++ b/engine/storage_redis.go @@ -1791,7 +1791,7 @@ func (rs *RedisStorage) GetFilterDrv(tenant, id string) (r *Filter, err error) { return } -func (rs *RedisStorage) SetFilter(r *Filter) (err error) { +func (rs *RedisStorage) SetFilterDrv(r *Filter) (err error) { result, err := rs.ms.Marshal(r) if err != nil { return err diff --git a/engine/tp_reader.go b/engine/tp_reader.go index 10323ac5d..eb669ed94 100755 --- a/engine/tp_reader.go +++ b/engine/tp_reader.go @@ -153,10 +153,10 @@ func (tpr *TpReader) LoadDestinationsFiltered(tag string) (bool, error) { for _, tpDst := range tpDests { dst := NewDestinationFromTPDestination(tpDst) // ToDo: Fix transactions at onlineDB level - if err = tpr.dataStorage.SetDestination(dst, transID); err != nil { + if err = tpr.dm.DataDB().SetDestination(dst, transID); err != nil { cache.RollbackTransaction(transID) } - if err = tpr.dataStorage.SetReverseDestination(dst, transID); err != nil { + if err = tpr.dm.DataDB().SetReverseDestination(dst, transID); err != nil { cache.RollbackTransaction(transID) } } @@ -239,7 +239,7 @@ func (tpr *TpReader) LoadDestinationRates() (err error) { _, destinationExists = tpr.destinations[dr.DestinationId] } if !destinationExists && tpr.dataStorage != nil { - if destinationExists, err = tpr.dataStorage.HasData(utils.DESTINATION_PREFIX, dr.DestinationId); err != nil { + if destinationExists, err = tpr.dm.DataDB().HasData(utils.DESTINATION_PREFIX, dr.DestinationId); err != nil { return err } } @@ -308,7 +308,7 @@ func (tpr *TpReader) LoadRatingPlansFiltered(tag string) (bool, error) { } destsExist := len(dms) != 0 if !destsExist && tpr.dataStorage != nil { - if dbExists, err := tpr.dataStorage.HasData(utils.DESTINATION_PREFIX, drate.DestinationId); err != nil { + if dbExists, err := tpr.dm.DataDB().HasData(utils.DESTINATION_PREFIX, drate.DestinationId); err != nil { return false, err } else if dbExists { destsExist = true @@ -319,12 +319,12 @@ func (tpr *TpReader) LoadRatingPlansFiltered(tag string) (bool, error) { return false, fmt.Errorf("could not get destination for tag %v", drate.DestinationId) } for _, destination := range dms { - tpr.dataStorage.SetDestination(destination, utils.NonTransactional) - tpr.dataStorage.SetReverseDestination(destination, utils.NonTransactional) + tpr.dm.DataDB().SetDestination(destination, utils.NonTransactional) + tpr.dm.DataDB().SetReverseDestination(destination, utils.NonTransactional) } } } - if err := tpr.dataStorage.SetRatingPlan(ratingPlan, utils.NonTransactional); err != nil { + if err := tpr.dm.DataDB().SetRatingPlan(ratingPlan, utils.NonTransactional); err != nil { return false, err } } @@ -381,7 +381,7 @@ func (tpr *TpReader) LoadRatingProfilesFiltered(qriedRpf *utils.TPRatingProfile) } _, exists := tpr.ratingPlans[tpRa.RatingPlanId] if !exists && tpr.dataStorage != nil { - if exists, err = tpr.dataStorage.HasData(utils.RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil { + if exists, err = tpr.dm.DataDB().HasData(utils.RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil { return err } } @@ -396,7 +396,7 @@ func (tpr *TpReader) LoadRatingProfilesFiltered(qriedRpf *utils.TPRatingProfile) CdrStatQueueIds: strings.Split(tpRa.CdrStatQueueIds, utils.INFIELD_SEP), }) } - if err := tpr.dataStorage.SetRatingProfile(resultRatingProfile, utils.NonTransactional); err != nil { + if err := tpr.dm.DataDB().SetRatingProfile(resultRatingProfile, utils.NonTransactional); err != nil { return err } } @@ -421,7 +421,7 @@ func (tpr *TpReader) LoadRatingProfiles() (err error) { } _, exists := tpr.ratingPlans[tpRa.RatingPlanId] if !exists && tpr.dataStorage != nil { // Only query if there is a connection, eg on dry run there is none - if exists, err = tpr.dataStorage.HasData(utils.RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil { + if exists, err = tpr.dm.DataDB().HasData(utils.RATING_PLAN_PREFIX, tpRa.RatingPlanId); err != nil { return err } } @@ -465,7 +465,7 @@ func (tpr *TpReader) LoadSharedGroupsFiltered(tag string, save bool) (err error) } if save { for _, sg := range tpr.sharedGroups { - if err := tpr.dataStorage.SetSharedGroup(sg, utils.NonTransactional); err != nil { + if err := tpr.dm.DataDB().SetSharedGroup(sg, utils.NonTransactional); err != nil { return err } } @@ -495,7 +495,7 @@ func (tpr *TpReader) LoadLCRs() (err error) { } } if !found && tpr.dataStorage != nil { - if keys, err := tpr.dataStorage.GetKeysForPrefix(utils.RATING_PROFILE_PREFIX + ratingProfileSearchKey); err != nil { + if keys, err := tpr.dm.DataDB().GetKeysForPrefix(utils.RATING_PROFILE_PREFIX + ratingProfileSearchKey); err != nil { return fmt.Errorf("[LCR] error querying dataDb %s", err.Error()) } else if len(keys) != 0 { found = true @@ -509,7 +509,7 @@ func (tpr *TpReader) LoadLCRs() (err error) { if rule.DestinationId != "" && rule.DestinationId != utils.ANY { _, found := tpr.destinations[rule.DestinationId] if !found && tpr.dataStorage != nil { - if found, err = tpr.dataStorage.HasData(utils.DESTINATION_PREFIX, rule.DestinationId); err != nil { + if found, err = tpr.dm.DataDB().HasData(utils.DESTINATION_PREFIX, rule.DestinationId); err != nil { return fmt.Errorf("[LCR] error querying dataDb %s", err.Error()) } } @@ -675,7 +675,7 @@ func (tpr *TpReader) LoadActionPlans() (err error) { _, exists := tpr.actions[at.ActionsId] if !exists && tpr.dataStorage != nil { - if exists, err = tpr.dataStorage.HasData(utils.ACTION_PREFIX, at.ActionsId); err != nil { + if exists, err = tpr.dm.DataDB().HasData(utils.ACTION_PREFIX, at.ActionsId); err != nil { return fmt.Errorf("[ActionPlans] Error querying actions: %v - %s", at.ActionsId, err.Error()) } } @@ -829,7 +829,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions) if accountAction.ActionPlanId != "" { // get old userBalanceIds exitingAccountIds := make(utils.StringMap) - existingActionPlan, err := tpr.dataStorage.GetActionPlan(accountAction.ActionPlanId, true, utils.NonTransactional) + existingActionPlan, err := tpr.dm.DataDB().GetActionPlan(accountAction.ActionPlanId, true, utils.NonTransactional) if err == nil && existingActionPlan != nil { exitingAccountIds = existingActionPlan.AccountIDs } @@ -899,20 +899,20 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions) AccountID: accID, ActionsID: at.ActionsID, } - if err = tpr.dataStorage.PushTask(t); err != nil { + if err = tpr.dm.DataDB().PushTask(t); err != nil { return err } } } } // write action plan - if err = tpr.dataStorage.SetActionPlan(accountAction.ActionPlanId, actionPlan, false, utils.NonTransactional); err != nil { + if err = tpr.dm.DataDB().SetActionPlan(accountAction.ActionPlanId, actionPlan, false, utils.NonTransactional); err != nil { return errors.New(err.Error() + " (SetActionPlan): " + accountAction.ActionPlanId) } - if err = tpr.dataStorage.SetAccountActionPlans(id, []string{accountAction.ActionPlanId}, false); err != nil { + if err = tpr.dm.DataDB().SetAccountActionPlans(id, []string{accountAction.ActionPlanId}, false); err != nil { return err } - if err = tpr.dataStorage.CacheDataFromDB(utils.AccountActionPlansPrefix, []string{id}, true); err != nil { + if err = tpr.dm.DataDB().CacheDataFromDB(utils.AccountActionPlansPrefix, []string{id}, true); err != nil { return err } } @@ -1012,7 +1012,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions) actionIDs = append(actionIDs, atr.ActionsID) } // write action triggers - err = tpr.dataStorage.SetActionTriggers(accountAction.ActionTriggersId, actionTriggers, utils.NonTransactional) + err = tpr.dm.DataDB().SetActionTriggers(accountAction.ActionTriggersId, actionTriggers, utils.NonTransactional) if err != nil { return errors.New(err.Error() + " (SetActionTriggers): " + accountAction.ActionTriggersId) } @@ -1124,12 +1124,12 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions) } // write actions for k, as := range facts { - err = tpr.dataStorage.SetActions(k, as, utils.NonTransactional) + err = tpr.dm.DataDB().SetActions(k, as, utils.NonTransactional) if err != nil { return err } } - ub, err := tpr.dataStorage.GetAccount(id) + ub, err := tpr.dm.DataDB().GetAccount(id) if err != nil { ub = &Account{ ID: id, @@ -1138,7 +1138,7 @@ func (tpr *TpReader) LoadAccountActionsFiltered(qriedAA *utils.TPAccountActions) ub.ActionTriggers = actionTriggers // init counters ub.InitCounters() - if err := tpr.dataStorage.SetAccount(ub); err != nil { + if err := tpr.dm.DataDB().SetAccount(ub); err != nil { return err } } @@ -1224,7 +1224,7 @@ func (tpr *TpReader) LoadDerivedChargersFiltered(filter *utils.TPDerivedChargers } if save { for dcsKey, dcs := range tpr.derivedChargers { - if err := tpr.dataStorage.SetDerivedChargers(dcsKey, dcs, utils.NonTransactional); err != nil { + if err := tpr.dm.DataDB().SetDerivedChargers(dcsKey, dcs, utils.NonTransactional); err != nil { return err } } @@ -1352,7 +1352,7 @@ func (tpr *TpReader) LoadCdrStatsFiltered(tag string, save bool) (err error) { return fmt.Errorf("could not get action triggers for cdr stats id %s: %s", cs.Id, triggerTag) } // write action triggers - err = tpr.dataStorage.SetActionTriggers(triggerTag, triggers, utils.NonTransactional) + err = tpr.dm.DataDB().SetActionTriggers(triggerTag, triggers, utils.NonTransactional) if err != nil { return errors.New(err.Error() + " (SetActionTriggers): " + triggerTag) } @@ -1453,13 +1453,13 @@ func (tpr *TpReader) LoadCdrStatsFiltered(tag string, save bool) (err error) { if save { // write actions for k, as := range tpr.actions { - err = tpr.dataStorage.SetActions(k, as, utils.NonTransactional) + err = tpr.dm.DataDB().SetActions(k, as, utils.NonTransactional) if err != nil { return err } } for _, stat := range tpr.cdrStats { - if err := tpr.dataStorage.SetCdrStats(stat); err != nil { + if err := tpr.dm.DataDB().SetCdrStats(stat); err != nil { return err } } @@ -1485,7 +1485,7 @@ func (tpr *TpReader) LoadUsersFiltered(filter *utils.TPUsers) (bool, error) { for _, up := range tpUser.Profile { user.Profile[up.AttrName] = up.AttrValue } - tpr.dataStorage.SetUser(user) + tpr.dm.DataDB().SetUser(user) } return len(tpUsers) > 0, err } @@ -1546,8 +1546,8 @@ func (tpr *TpReader) LoadAliasesFiltered(filter *utils.TPAliases) (bool, error) } } - tpr.dataStorage.SetAlias(alias, utils.NonTransactional) - tpr.dataStorage.SetReverseAlias(alias, utils.NonTransactional) + tpr.dm.DataDB().SetAlias(alias, utils.NonTransactional) + tpr.dm.DataDB().SetReverseAlias(alias, utils.NonTransactional) return len(tpAliases) > 0, err } @@ -1615,7 +1615,7 @@ func (tpr *TpReader) LoadResourceProfilesFiltered(tag string) error { for tenant, mpID := range mapRsPfls { for id := range mpID { rTid := &utils.TenantID{tenant, id} - if has, err := tpr.dataStorage.HasData(utils.ResourcesPrefix, rTid.TenantID()); err != nil { + if has, err := tpr.dm.DataDB().HasData(utils.ResourcesPrefix, rTid.TenantID()); err != nil { return err } else if !has { tpr.resources = append(tpr.resources, rTid) @@ -1645,7 +1645,7 @@ func (tpr *TpReader) LoadStatsFiltered(tag string) error { for tenant, mpID := range mapSTs { for sqID := range mpID { sqTntID := &utils.TenantID{tenant, sqID} - if has, err := tpr.dataStorage.HasData(utils.StatQueuePrefix, sqTntID.TenantID()); err != nil { + if has, err := tpr.dm.DataDB().HasData(utils.StatQueuePrefix, sqTntID.TenantID()); err != nil { return err } else if !has { tpr.statQueues = append(tpr.statQueues, sqTntID) @@ -1675,7 +1675,7 @@ func (tpr *TpReader) LoadThresholdsFiltered(tag string) error { for tenant, mpID := range mapTHs { for thID := range mpID { thTntID := &utils.TenantID{Tenant: tenant, ID: thID} - if has, err := tpr.dataStorage.HasData(utils.ThresholdPrefix, thTntID.TenantID()); err != nil { + if has, err := tpr.dm.DataDB().HasData(utils.ThresholdPrefix, thTntID.TenantID()); err != nil { return err } else if !has { tpr.thresholds = append(tpr.thresholds, thTntID) @@ -1705,7 +1705,7 @@ func (tpr *TpReader) LoadFilterFiltered(tag string) error { for tenant, mpID := range mapTHs { for thID := range mpID { thTntID := &utils.TenantID{Tenant: tenant, ID: thID} - if has, err := tpr.dataStorage.HasData(utils.FilterPrefix, thTntID.TenantID()); err != nil { + if has, err := tpr.dm.DataDB().HasData(utils.FilterPrefix, thTntID.TenantID()); err != nil { return err } else if !has { tpr.filters = append(tpr.filters, thTntID) @@ -1807,13 +1807,13 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err return errors.New("no database connection") } if flush { // ToDo - //tpr.dataStorage.Flush("") + //tpr.dm.DataDB().Flush("") } if verbose { log.Print("Destinations:") } for _, d := range tpr.destinations { - err = tpr.dataStorage.SetDestination(d, utils.NonTransactional) + err = tpr.dm.DataDB().SetDestination(d, utils.NonTransactional) if err != nil { return err } @@ -1831,7 +1831,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Rating Plans:") } for _, rp := range tpr.ratingPlans { - err = tpr.dataStorage.SetRatingPlan(rp, utils.NonTransactional) + err = tpr.dm.DataDB().SetRatingPlan(rp, utils.NonTransactional) if err != nil { return err } @@ -1843,7 +1843,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Rating Profiles:") } for _, rp := range tpr.ratingProfiles { - err = tpr.dataStorage.SetRatingProfile(rp, utils.NonTransactional) + err = tpr.dm.DataDB().SetRatingProfile(rp, utils.NonTransactional) if err != nil { return err } @@ -1866,7 +1866,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if verbose { log.Println("\tTask: ", t) } - if err = tpr.dataStorage.PushTask(t); err != nil { + if err = tpr.dm.DataDB().PushTask(t); err != nil { return err } } @@ -1878,13 +1878,13 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if verbose { log.Println("\tTask: ", t) } - if err = tpr.dataStorage.PushTask(t); err != nil { + if err = tpr.dm.DataDB().PushTask(t); err != nil { return err } } } } - err = tpr.dataStorage.SetActionPlan(k, ap, false, utils.NonTransactional) + err = tpr.dm.DataDB().SetActionPlan(k, ap, false, utils.NonTransactional) if err != nil { return err } @@ -1902,7 +1902,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Action Triggers:") } for k, atrs := range tpr.actionsTriggers { - err = tpr.dataStorage.SetActionTriggers(k, atrs, utils.NonTransactional) + err = tpr.dm.DataDB().SetActionTriggers(k, atrs, utils.NonTransactional) if err != nil { return err } @@ -1914,7 +1914,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Shared Groups:") } for k, sg := range tpr.sharedGroups { - err = tpr.dataStorage.SetSharedGroup(sg, utils.NonTransactional) + err = tpr.dm.DataDB().SetSharedGroup(sg, utils.NonTransactional) if err != nil { return err } @@ -1926,7 +1926,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("LCR Rules:") } for k, lcr := range tpr.lcrs { - err = tpr.dataStorage.SetLCR(lcr, utils.NonTransactional) + err = tpr.dm.DataDB().SetLCR(lcr, utils.NonTransactional) if err != nil { return err } @@ -1938,7 +1938,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Actions:") } for k, as := range tpr.actions { - err = tpr.dataStorage.SetActions(k, as, utils.NonTransactional) + err = tpr.dm.DataDB().SetActions(k, as, utils.NonTransactional) if err != nil { return err } @@ -1950,7 +1950,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Account Actions:") } for _, ub := range tpr.accountActions { - err = tpr.dataStorage.SetAccount(ub) + err = tpr.dm.DataDB().SetAccount(ub) if err != nil { return err } @@ -1962,7 +1962,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Derived Chargers:") } for key, dcs := range tpr.derivedChargers { - err = tpr.dataStorage.SetDerivedChargers(key, dcs, utils.NonTransactional) + err = tpr.dm.DataDB().SetDerivedChargers(key, dcs, utils.NonTransactional) if err != nil { return err } @@ -1974,7 +1974,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("CDR Stats Queues:") } for _, sq := range tpr.cdrStats { - err = tpr.dataStorage.SetCdrStats(sq) + err = tpr.dm.DataDB().SetCdrStats(sq) if err != nil { return err } @@ -1986,7 +1986,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Users:") } for _, u := range tpr.users { - err = tpr.dataStorage.SetUser(u) + err = tpr.dm.DataDB().SetUser(u) if err != nil { return err } @@ -1998,7 +1998,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Aliases:") } for _, al := range tpr.aliases { - err = tpr.dataStorage.SetAlias(al, utils.NonTransactional) + err = tpr.dm.DataDB().SetAlias(al, utils.NonTransactional) if err != nil { return err } @@ -2021,7 +2021,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - if err = tpr.dataStorage.SetResourceProfile(rsp); err != nil { + if err = tpr.dm.DataDB().SetResourceProfile(rsp); err != nil { return err } if verbose { @@ -2033,7 +2033,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Resources:") } for _, rTid := range tpr.resources { - if err = tpr.dataStorage.SetResource(&Resource{Tenant: rTid.Tenant, ID: rTid.ID, Usages: make(map[string]*ResourceUsage)}); err != nil { + if err = tpr.dm.DataDB().SetResource(&Resource{Tenant: rTid.Tenant, ID: rTid.ID, Usages: make(map[string]*ResourceUsage)}); err != nil { return } if verbose { @@ -2049,7 +2049,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - if err = tpr.dataStorage.SetStatQueueProfile(st); err != nil { + if err = tpr.dm.DataDB().SetStatQueueProfile(st); err != nil { return err } if verbose { @@ -2086,7 +2086,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - if err = tpr.dataStorage.SetThresholdProfile(th); err != nil { + if err = tpr.dm.DataDB().SetThresholdProfile(th); err != nil { return err } if verbose { @@ -2098,7 +2098,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Thresholds:") } for _, thd := range tpr.thresholds { - if err = tpr.dataStorage.SetThreshold(&Threshold{Tenant: thd.Tenant, ID: thd.ID}); err != nil { + if err = tpr.dm.DataDB().SetThreshold(&Threshold{Tenant: thd.Tenant, ID: thd.ID}); err != nil { return err } if verbose { @@ -2114,7 +2114,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if err != nil { return err } - if err = tpr.dataStorage.SetFilter(th); err != nil { + if err = tpr.dm.SetFilter(th); err != nil { return err } if verbose { @@ -2126,7 +2126,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Filters:") } for _, thd := range tpr.filters { - if err = tpr.dataStorage.SetFilter(&Filter{Tenant: thd.Tenant, ID: thd.ID}); err != nil { + if err = tpr.dm.SetFilter(&Filter{Tenant: thd.Tenant, ID: thd.ID}); err != nil { return err } if verbose { @@ -2137,7 +2137,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err log.Print("Timings:") } for _, t := range tpr.timings { - if err = tpr.dataStorage.SetTiming(t, utils.NonTransactional); err != nil { + if err = tpr.dm.DataDB().SetTiming(t, utils.NonTransactional); err != nil { return err } if verbose { @@ -2149,7 +2149,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if verbose { log.Print("Rebuilding reverse destinations") } - if err = tpr.dataStorage.RebuildReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil { + if err = tpr.dm.DataDB().RebuildReverseForPrefix(utils.REVERSE_DESTINATION_PREFIX); err != nil { return err } } @@ -2157,7 +2157,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if verbose { log.Print("Rebuilding account action plans") } - if err = tpr.dataStorage.RebuildReverseForPrefix(utils.AccountActionPlansPrefix); err != nil { + if err = tpr.dm.DataDB().RebuildReverseForPrefix(utils.AccountActionPlansPrefix); err != nil { return err } } @@ -2165,7 +2165,7 @@ func (tpr *TpReader) WriteToDatabase(flush, verbose, disable_reverse bool) (err if verbose { log.Print("Rebuilding reverse aliases") } - if err = tpr.dataStorage.RebuildReverseForPrefix(utils.REVERSE_ALIASES_PREFIX); err != nil { + if err = tpr.dm.DataDB().RebuildReverseForPrefix(utils.REVERSE_ALIASES_PREFIX); err != nil { return err } }