Start Adding InternalStorage instead of MapStorage

This commit is contained in:
TeoV
2019-09-26 18:35:19 +03:00
committed by Dan Christian Bogos
parent 67544007a4
commit a6eb92d448
8 changed files with 565 additions and 6 deletions

View File

@@ -73,6 +73,13 @@ func TestFilterITMongo(t *testing.T) {
}
}
func TestFilterITInternal(t *testing.T) {
filterConfigDIR = "tutinternal"
for _, stest := range sTestsFilter {
t.Run(filterConfigDIR, stest)
}
}
func testFilterInitCfg(t *testing.T) {
var err error
filterCfgPath = path.Join(filterDataDir, "conf", "samples", filterConfigDIR)

View File

@@ -74,6 +74,13 @@ func TestTPChrgsITMongo(t *testing.T) {
}
}
func TestTPChrgsITMapStorage(t *testing.T) {
tpChrgsConfigDIR = "tutinternal"
for _, stest := range sTestsTPChrgs {
t.Run(tpChrgsConfigDIR, stest)
}
}
func testTPChrgsInitCfg(t *testing.T) {
var err error
tpChrgsCfgPath = path.Join(tpChrgsDataDir, "conf", "samples", tpChrgsConfigDIR)

View File

@@ -0,0 +1,122 @@
{
// CGRateS Configuration file
//
"general": {
"log_level": 7,
"reply_timeout": "50s",
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "*internal", // data_db type: <redis|mongo>
},
"stor_db": {
"db_type": "*internal",
},
"rals": {
"enabled": true,
"thresholds_conns": [
{"address": "*internal"},
],
},
"scheduler": {
"enabled": true,
"cdrs_conns": [
{"address": "*internal"},
],
},
"cdrs": {
"enabled": true,
"chargers_conns":[
{"address": "*internal"},
],
},
"attributes": {
"enabled": true,
},
"chargers": {
"enabled": true,
"attributes_conns": [
{"address": "*internal"},
],
},
"resources": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
},
"stats": {
"enabled": true,
"store_interval": "1s",
"thresholds_conns": [
{"address": "*internal"}
],
},
"thresholds": {
"enabled": true,
"store_interval": "1s",
},
"suppliers": {
"enabled": true,
"prefix_indexed_fields":["Destination"],
"stats_conns": [
{"address": "*internal"},
],
"resources_conns": [
{"address": "*internal"},
],
},
"sessions": {
"enabled": true,
"suppliers_conns": [
{"address": "*internal"}
],
"resources_conns": [
{"address": "*internal"}
],
"attributes_conns": [
{"address": "*internal"}
],
"rals_conns": [
{"address": "*internal"},
],
"cdrs_conns": [
{"address": "*internal"}
],
"chargers_conns": [
{"address": "*internal"},
],
},
}

View File

@@ -0,0 +1,385 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/ltcache"
)
type IntenalStorage struct {
tasks [][]byte
ms Marshaler
cache *ltcache.TransCache
}
func NewInternalStorage() *IntenalStorage {
return &IntenalStorage{cache: ltcache.NewTransCache(config.CgrConfig().CacheCfg().AsTransCacheConfig()),
ms: NewCodecMsgpackMarshaler()}
}
func NewInternalStorageJson() (internalStorage *IntenalStorage) {
internalStorage = NewInternalStorage()
internalStorage.ms = new(JSONBufMarshaler)
return
}
func (is *IntenalStorage) Close() {}
func (is *IntenalStorage) Flush(_ string) error {
is.cache = ltcache.NewTransCache(config.CgrConfig().CacheCfg().AsTransCacheConfig())
return nil
}
func (is *IntenalStorage) Marshaler() Marshaler {
return is.ms
}
func (is *IntenalStorage) SelectDatabase(dbName string) (err error) {
return nil
}
func (is *IntenalStorage) GetKeysForPrefix(string) ([]string, error) {
// keysForPrefix := make([]string, 0)
// for key := range ms.dict {
// if strings.HasPrefix(key, prefix) {
// keysForPrefix = append(keysForPrefix, key)
// }
// }
// is.cache.GetItemIDs(chID, prfx)
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) RebuildReverseForPrefix(string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveReverseForPrefix(string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetVersions(itm string) (vrs Versions, err error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetVersions(vrs Versions, overwrite bool) (err error) {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveVersions(vrs Versions) (err error) {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetStorageType() string {
return utils.INTERNAL
}
func (is *IntenalStorage) IsDBEmpty() (resp bool, err error) {
return false, utils.ErrNotImplemented
}
func (is *IntenalStorage) HasDataDrv(string, string, string) (bool, error) {
return false, utils.ErrNotImplemented
}
func (is *IntenalStorage) GetRatingPlanDrv(string) (*RatingPlan, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetRatingPlanDrv(*RatingPlan) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveRatingPlanDrv(key string) (err error) {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetRatingProfileDrv(string) (*RatingProfile, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetRatingProfileDrv(*RatingProfile) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveRatingProfileDrv(string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetDestination(string, bool, string) (*Destination, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetDestination(*Destination, string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveDestination(string, string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) SetReverseDestination(*Destination, string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetReverseDestination(string, bool, string) ([]string, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) UpdateReverseDestination(*Destination, *Destination, string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetActionsDrv(string) (Actions, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetActionsDrv(string, Actions) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveActionsDrv(string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetSharedGroupDrv(string) (*SharedGroup, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetSharedGroupDrv(*SharedGroup) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveSharedGroupDrv(id, transactionID string) (err error) {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetActionTriggersDrv(string) (ActionTriggers, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetActionTriggersDrv(string, ActionTriggers) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveActionTriggersDrv(string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetActionPlan(string, bool, string) (*ActionPlan, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetActionPlan(string, *ActionPlan, bool, string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveActionPlan(key string, transactionID string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetAllActionPlans() (map[string]*ActionPlan, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) GetAccountActionPlans(acntID string, skipCache bool,
transactionID string) (apIDs []string, err error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemAccountActionPlans(acntID string, apIDs []string) (err error) {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) PushTask(*Task) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) PopTask() (*Task, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) GetAccount(string) (*Account, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetAccount(*Account) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveAccount(string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetResourceProfileDrv(string, string) (*ResourceProfile, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetResourceProfileDrv(*ResourceProfile) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveResourceProfileDrv(string, string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetResourceDrv(string, string) (*Resource, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetResourceDrv(*Resource) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveResourceDrv(string, string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetTimingDrv(string) (*utils.TPTiming, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetTimingDrv(*utils.TPTiming) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveTimingDrv(string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) AddLoadHistory(*utils.LoadInstance, int, string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string,
fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetFilterIndexesDrv(cacheID, itemIDPrefix string,
indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) MatchFilterIndexDrv(cacheID, itemIDPrefix,
filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) GetStatQueueProfileDrv(tenant string, id string) (*StatQueueProfile, error) {
x, ok := is.cache.Get(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(tenant, id))
if ok && x == nil {
return nil, utils.ErrNotFound
}
return x.(*StatQueueProfile), nil
}
func (is *IntenalStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) {
is.cache.Set(utils.CacheStatQueueProfiles, sq.TenantID(), sq, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
func (is *IntenalStorage) RemStatQueueProfileDrv(tenant, id string) (err error) {
is.cache.Remove(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(tenant, id),
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
func (is *IntenalStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemStoredStatQueueDrv(tenant, id string) (err error) {
return
}
func (is *IntenalStorage) GetThresholdProfileDrv(tenant, id string) (tp *ThresholdProfile, err error) {
x, ok := is.cache.Get(utils.CacheThresholdProfiles, utils.ConcatenatedKey(tenant, id))
if ok && x == nil {
return nil, utils.ErrNotFound
}
return x.(*ThresholdProfile), nil
}
func (is *IntenalStorage) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) {
is.cache.Set(utils.CacheThresholdProfiles, tp.TenantID(), tp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
func (is *IntenalStorage) RemThresholdProfileDrv(tenant, id string) (err error) {
is.cache.Remove(utils.CacheThresholdProfiles, utils.ConcatenatedKey(tenant, id),
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
func (is *IntenalStorage) GetThresholdDrv(tenant, id string) (*Threshold, error) {
x, ok := is.cache.Get(utils.CacheThresholds, utils.ConcatenatedKey(tenant, id))
if ok && x == nil {
return nil, utils.ErrNotFound
}
return x.(*Threshold), nil
}
func (is *IntenalStorage) SetThresholdDrv(th *Threshold) (err error) {
is.cache.Set(utils.CacheThresholds, th.TenantID(), th, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
func (is *IntenalStorage) RemoveThresholdDrv(tenant, id string) (err error) {
is.cache.Remove(utils.CacheThresholds, utils.ConcatenatedKey(tenant, id),
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
func (is *IntenalStorage) GetFilterDrv(tenant, id string) (*Filter, error) {
x, ok := is.cache.Get(utils.CacheFilters, utils.ConcatenatedKey(tenant, id))
if ok && x == nil {
return nil, utils.ErrNotFound
}
return x.(*Filter), nil
}
func (is *IntenalStorage) SetFilterDrv(fltr *Filter) (err error) {
is.cache.Set(utils.CacheFilters, fltr.TenantID(), fltr, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
func (is *IntenalStorage) RemoveFilterDrv(tenant, id string) (err error) {
is.cache.Remove(utils.CacheFilters, utils.ConcatenatedKey(tenant, id),
cacheCommit(utils.NonTransactional), utils.NonTransactional)
return
}
func (is *IntenalStorage) GetSupplierProfileDrv(string, string) (*SupplierProfile, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetSupplierProfileDrv(*SupplierProfile) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveSupplierProfileDrv(string, string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetAttributeProfileDrv(string, string) (*AttributeProfile, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetAttributeProfileDrv(*AttributeProfile) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveAttributeProfileDrv(string, string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetChargerProfileDrv(string, string) (*ChargerProfile, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetChargerProfileDrv(*ChargerProfile) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveChargerProfileDrv(string, string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetDispatcherProfileDrv(string, string) (*DispatcherProfile, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetDispatcherProfileDrv(*DispatcherProfile) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveDispatcherProfileDrv(string, string) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[string]int64, err error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetLoadIDsDrv(loadIDs map[string]int64) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) GetDispatcherHostDrv(string, string) (*DispatcherHost, error) {
return nil, utils.ErrNotImplemented
}
func (is *IntenalStorage) SetDispatcherHostDrv(*DispatcherHost) error {
return utils.ErrNotImplemented
}
func (is *IntenalStorage) RemoveDispatcherHostDrv(string, string) error {
return utils.ErrNotImplemented
}

View File

@@ -0,0 +1,19 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine

View File

@@ -532,8 +532,8 @@ func (ms *MapStorage) SetAccount(ub *Account) (err error) {
func (ms *MapStorage) RemoveAccount(key string) (err error) {
ms.mu.Lock()
defer ms.mu.Unlock()
delete(ms.dict, utils.ACCOUNT_PREFIX+key)
ms.mu.Unlock()
return
}

View File

@@ -19,6 +19,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"strings"
"github.com/cgrates/cgrates/utils"
)
@@ -29,7 +31,22 @@ func (ms *MapStorage) GetTpIds(colName string) (ids []string, err error) {
func (ms *MapStorage) GetTpTableIds(tpid, table string, distinct utils.TPDistinctIds,
filters map[string]string, paginator *utils.PaginatorWithSearch) (ids []string, err error) {
return nil, utils.ErrNotImplemented
key := table + utils.CONCATENATED_KEY_SEP + tpid
fullIDs, _ := ms.GetKeysForPrefix(key)
for _, fullID := range fullIDs {
var buildedID string
sliceID := strings.Split(fullID[len(key)+1:], utils.CONCATENATED_KEY_SEP)
for i := 0; i < len(distinct); i++ {
if len(buildedID) == 0 {
buildedID += sliceID[len(sliceID)-i-1]
} else {
buildedID += utils.CONCATENATED_KEY_SEP + sliceID[len(sliceID)-i+1]
}
}
ids = append(ids, buildedID)
}
return
}
func (ms *MapStorage) GetTPTimings(tpid, id string) (timings []*utils.ApierTPTiming, err error) {
@@ -503,8 +520,6 @@ func (ms *MapStorage) RemTpData(table, tpid string, args map[string]string) (err
if table == utils.EmptyString {
return ms.Flush(utils.EmptyString)
}
ms.mu.Lock()
defer ms.mu.Unlock()
key := table + utils.CONCATENATED_KEY_SEP + tpid
if args != nil {
for _, val := range args {
@@ -512,9 +527,11 @@ func (ms *MapStorage) RemTpData(table, tpid string, args map[string]string) (err
}
}
ids, _ := ms.GetKeysForPrefix(key)
ms.mu.Lock()
for _, id := range ids {
delete(ms.dict, id)
}
ms.mu.Unlock()
return
}

View File

@@ -52,9 +52,11 @@ func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler strin
dm = NewDataManager(d.(DataDB))
case utils.INTERNAL:
if marshaler == utils.JSON {
d, err = NewMapStorageJson()
//d, err = NewMapStorageJson()
d = NewInternalStorageJson()
} else {
d, err = NewMapStorage()
d = NewInternalStorage()
//d, err = NewMapStorage()
}
dm = NewDataManager(d.(DataDB))
default: