diff --git a/apier/v1/filters_it_test.go b/apier/v1/filters_it_test.go index 0b3f0e293..8fc575011 100644 --- a/apier/v1/filters_it_test.go +++ b/apier/v1/filters_it_test.go @@ -73,6 +73,13 @@ func TestFilterITMongo(t *testing.T) { } } +func TestFilterITInternal(t *testing.T) { + filterConfigDIR = "tutinternal" + for _, stest := range sTestsFilter { + t.Run(filterConfigDIR, stest) + } +} + func testFilterInitCfg(t *testing.T) { var err error filterCfgPath = path.Join(filterDataDir, "conf", "samples", filterConfigDIR) diff --git a/apier/v1/tpchargers_it_test.go b/apier/v1/tpchargers_it_test.go index 7a4e20ef7..fbb382422 100644 --- a/apier/v1/tpchargers_it_test.go +++ b/apier/v1/tpchargers_it_test.go @@ -74,6 +74,13 @@ func TestTPChrgsITMongo(t *testing.T) { } } +func TestTPChrgsITMapStorage(t *testing.T) { + tpChrgsConfigDIR = "tutinternal" + for _, stest := range sTestsTPChrgs { + t.Run(tpChrgsConfigDIR, stest) + } +} + func testTPChrgsInitCfg(t *testing.T) { var err error tpChrgsCfgPath = path.Join(tpChrgsDataDir, "conf", "samples", tpChrgsConfigDIR) diff --git a/data/conf/samples/tutinternal/cgrates.json b/data/conf/samples/tutinternal/cgrates.json new file mode 100644 index 000000000..79ae3adf5 --- /dev/null +++ b/data/conf/samples/tutinternal/cgrates.json @@ -0,0 +1,122 @@ +{ +// CGRateS Configuration file +// + + +"general": { + "log_level": 7, + "reply_timeout": "50s", +}, + + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "*internal", // data_db type: +}, + +"stor_db": { + "db_type": "*internal", +}, + + +"rals": { + "enabled": true, + "thresholds_conns": [ + {"address": "*internal"}, + ], +}, + + +"scheduler": { + "enabled": true, + "cdrs_conns": [ + {"address": "*internal"}, + ], +}, + + +"cdrs": { + "enabled": true, + "chargers_conns":[ + {"address": "*internal"}, + ], +}, + + +"attributes": { + "enabled": true, +}, + + +"chargers": { + "enabled": true, + "attributes_conns": [ + {"address": "*internal"}, + ], +}, + + +"resources": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": [ + {"address": "*internal"} + ], +}, + + +"stats": { + "enabled": true, + "store_interval": "1s", + "thresholds_conns": [ + {"address": "*internal"} + ], +}, + +"thresholds": { + "enabled": true, + "store_interval": "1s", +}, + + +"suppliers": { + "enabled": true, + "prefix_indexed_fields":["Destination"], + "stats_conns": [ + {"address": "*internal"}, + ], + "resources_conns": [ + {"address": "*internal"}, + ], +}, + + +"sessions": { + "enabled": true, + "suppliers_conns": [ + {"address": "*internal"} + ], + "resources_conns": [ + {"address": "*internal"} + ], + "attributes_conns": [ + {"address": "*internal"} + ], + "rals_conns": [ + {"address": "*internal"}, + ], + "cdrs_conns": [ + {"address": "*internal"} + ], + "chargers_conns": [ + {"address": "*internal"}, + ], +}, + + +} diff --git a/engine/storage_intenal_datadb.go b/engine/storage_intenal_datadb.go new file mode 100644 index 000000000..548e2320d --- /dev/null +++ b/engine/storage_intenal_datadb.go @@ -0,0 +1,385 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine + +import ( + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/ltcache" +) + +type IntenalStorage struct { + tasks [][]byte + ms Marshaler + cache *ltcache.TransCache +} + +func NewInternalStorage() *IntenalStorage { + return &IntenalStorage{cache: ltcache.NewTransCache(config.CgrConfig().CacheCfg().AsTransCacheConfig()), + ms: NewCodecMsgpackMarshaler()} +} + +func NewInternalStorageJson() (internalStorage *IntenalStorage) { + internalStorage = NewInternalStorage() + internalStorage.ms = new(JSONBufMarshaler) + return +} + +func (is *IntenalStorage) Close() {} + +func (is *IntenalStorage) Flush(_ string) error { + is.cache = ltcache.NewTransCache(config.CgrConfig().CacheCfg().AsTransCacheConfig()) + return nil +} + +func (is *IntenalStorage) Marshaler() Marshaler { + return is.ms +} + +func (is *IntenalStorage) SelectDatabase(dbName string) (err error) { + return nil +} + +func (is *IntenalStorage) GetKeysForPrefix(string) ([]string, error) { + // keysForPrefix := make([]string, 0) + // for key := range ms.dict { + // if strings.HasPrefix(key, prefix) { + // keysForPrefix = append(keysForPrefix, key) + // } + // } + // is.cache.GetItemIDs(chID, prfx) + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) RebuildReverseForPrefix(string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveReverseForPrefix(string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetVersions(itm string) (vrs Versions, err error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetVersions(vrs Versions, overwrite bool) (err error) { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveVersions(vrs Versions) (err error) { + return utils.ErrNotImplemented +} + +func (is *IntenalStorage) GetStorageType() string { + return utils.INTERNAL +} +func (is *IntenalStorage) IsDBEmpty() (resp bool, err error) { + return false, utils.ErrNotImplemented +} + +func (is *IntenalStorage) HasDataDrv(string, string, string) (bool, error) { + return false, utils.ErrNotImplemented +} +func (is *IntenalStorage) GetRatingPlanDrv(string) (*RatingPlan, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetRatingPlanDrv(*RatingPlan) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveRatingPlanDrv(key string) (err error) { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetRatingProfileDrv(string) (*RatingProfile, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetRatingProfileDrv(*RatingProfile) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveRatingProfileDrv(string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetDestination(string, bool, string) (*Destination, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetDestination(*Destination, string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveDestination(string, string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) SetReverseDestination(*Destination, string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetReverseDestination(string, bool, string) ([]string, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) UpdateReverseDestination(*Destination, *Destination, string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetActionsDrv(string) (Actions, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetActionsDrv(string, Actions) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveActionsDrv(string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetSharedGroupDrv(string) (*SharedGroup, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetSharedGroupDrv(*SharedGroup) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveSharedGroupDrv(id, transactionID string) (err error) { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetActionTriggersDrv(string) (ActionTriggers, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetActionTriggersDrv(string, ActionTriggers) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveActionTriggersDrv(string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetActionPlan(string, bool, string) (*ActionPlan, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetActionPlan(string, *ActionPlan, bool, string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveActionPlan(key string, transactionID string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetAllActionPlans() (map[string]*ActionPlan, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) GetAccountActionPlans(acntID string, skipCache bool, + transactionID string) (apIDs []string, err error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetAccountActionPlans(acntID string, apIDs []string, overwrite bool) (err error) { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemAccountActionPlans(acntID string, apIDs []string) (err error) { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) PushTask(*Task) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) PopTask() (*Task, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) GetAccount(string) (*Account, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetAccount(*Account) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveAccount(string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetResourceProfileDrv(string, string) (*ResourceProfile, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetResourceProfileDrv(*ResourceProfile) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveResourceProfileDrv(string, string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetResourceDrv(string, string) (*Resource, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetResourceDrv(*Resource) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveResourceDrv(string, string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetTimingDrv(string) (*utils.TPTiming, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetTimingDrv(*utils.TPTiming) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveTimingDrv(string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetLoadHistory(int, bool, string) ([]*utils.LoadInstance, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) AddLoadHistory(*utils.LoadInstance, int, string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetFilterIndexesDrv(cacheID, itemIDPrefix, filterType string, + fldNameVal map[string]string) (indexes map[string]utils.StringMap, err error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetFilterIndexesDrv(cacheID, itemIDPrefix string, + indexes map[string]utils.StringMap, commit bool, transactionID string) (err error) { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveFilterIndexesDrv(cacheID, itemIDPrefix string) (err error) { + return utils.ErrNotImplemented +} + +func (is *IntenalStorage) MatchFilterIndexDrv(cacheID, itemIDPrefix, + filterType, fieldName, fieldVal string) (itemIDs utils.StringMap, err error) { + return nil, utils.ErrNotImplemented +} + +func (is *IntenalStorage) GetStatQueueProfileDrv(tenant string, id string) (*StatQueueProfile, error) { + x, ok := is.cache.Get(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(tenant, id)) + if ok && x == nil { + return nil, utils.ErrNotFound + } + return x.(*StatQueueProfile), nil + +} +func (is *IntenalStorage) SetStatQueueProfileDrv(sq *StatQueueProfile) (err error) { + is.cache.Set(utils.CacheStatQueueProfiles, sq.TenantID(), sq, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return +} + +func (is *IntenalStorage) RemStatQueueProfileDrv(tenant, id string) (err error) { + is.cache.Remove(utils.CacheStatQueueProfiles, utils.ConcatenatedKey(tenant, id), + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return +} + +func (is *IntenalStorage) GetStoredStatQueueDrv(tenant, id string) (sq *StoredStatQueue, err error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetStoredStatQueueDrv(sq *StoredStatQueue) (err error) { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemStoredStatQueueDrv(tenant, id string) (err error) { + return +} + +func (is *IntenalStorage) GetThresholdProfileDrv(tenant, id string) (tp *ThresholdProfile, err error) { + x, ok := is.cache.Get(utils.CacheThresholdProfiles, utils.ConcatenatedKey(tenant, id)) + if ok && x == nil { + return nil, utils.ErrNotFound + } + return x.(*ThresholdProfile), nil +} + +func (is *IntenalStorage) SetThresholdProfileDrv(tp *ThresholdProfile) (err error) { + is.cache.Set(utils.CacheThresholdProfiles, tp.TenantID(), tp, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return +} + +func (is *IntenalStorage) RemThresholdProfileDrv(tenant, id string) (err error) { + is.cache.Remove(utils.CacheThresholdProfiles, utils.ConcatenatedKey(tenant, id), + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return +} + +func (is *IntenalStorage) GetThresholdDrv(tenant, id string) (*Threshold, error) { + x, ok := is.cache.Get(utils.CacheThresholds, utils.ConcatenatedKey(tenant, id)) + if ok && x == nil { + return nil, utils.ErrNotFound + } + return x.(*Threshold), nil +} + +func (is *IntenalStorage) SetThresholdDrv(th *Threshold) (err error) { + is.cache.Set(utils.CacheThresholds, th.TenantID(), th, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return +} + +func (is *IntenalStorage) RemoveThresholdDrv(tenant, id string) (err error) { + is.cache.Remove(utils.CacheThresholds, utils.ConcatenatedKey(tenant, id), + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return +} + +func (is *IntenalStorage) GetFilterDrv(tenant, id string) (*Filter, error) { + x, ok := is.cache.Get(utils.CacheFilters, utils.ConcatenatedKey(tenant, id)) + if ok && x == nil { + return nil, utils.ErrNotFound + } + return x.(*Filter), nil +} + +func (is *IntenalStorage) SetFilterDrv(fltr *Filter) (err error) { + is.cache.Set(utils.CacheFilters, fltr.TenantID(), fltr, nil, + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return +} + +func (is *IntenalStorage) RemoveFilterDrv(tenant, id string) (err error) { + is.cache.Remove(utils.CacheFilters, utils.ConcatenatedKey(tenant, id), + cacheCommit(utils.NonTransactional), utils.NonTransactional) + return +} + +func (is *IntenalStorage) GetSupplierProfileDrv(string, string) (*SupplierProfile, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetSupplierProfileDrv(*SupplierProfile) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveSupplierProfileDrv(string, string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetAttributeProfileDrv(string, string) (*AttributeProfile, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetAttributeProfileDrv(*AttributeProfile) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveAttributeProfileDrv(string, string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetChargerProfileDrv(string, string) (*ChargerProfile, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetChargerProfileDrv(*ChargerProfile) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveChargerProfileDrv(string, string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetDispatcherProfileDrv(string, string) (*DispatcherProfile, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetDispatcherProfileDrv(*DispatcherProfile) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveDispatcherProfileDrv(string, string) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetItemLoadIDsDrv(itemIDPrefix string) (loadIDs map[string]int64, err error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetLoadIDsDrv(loadIDs map[string]int64) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) GetDispatcherHostDrv(string, string) (*DispatcherHost, error) { + return nil, utils.ErrNotImplemented +} +func (is *IntenalStorage) SetDispatcherHostDrv(*DispatcherHost) error { + return utils.ErrNotImplemented +} +func (is *IntenalStorage) RemoveDispatcherHostDrv(string, string) error { + return utils.ErrNotImplemented +} diff --git a/engine/storage_internal_stordb.go b/engine/storage_internal_stordb.go new file mode 100644 index 000000000..88c63c3dc --- /dev/null +++ b/engine/storage_internal_stordb.go @@ -0,0 +1,19 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package engine diff --git a/engine/storage_map_datadb.go b/engine/storage_map_datadb.go index 385f5ae59..515136ae3 100644 --- a/engine/storage_map_datadb.go +++ b/engine/storage_map_datadb.go @@ -532,8 +532,8 @@ func (ms *MapStorage) SetAccount(ub *Account) (err error) { func (ms *MapStorage) RemoveAccount(key string) (err error) { ms.mu.Lock() - defer ms.mu.Unlock() delete(ms.dict, utils.ACCOUNT_PREFIX+key) + ms.mu.Unlock() return } diff --git a/engine/storage_map_stordb.go b/engine/storage_map_stordb.go index fcbf5e480..aff1e2e2c 100755 --- a/engine/storage_map_stordb.go +++ b/engine/storage_map_stordb.go @@ -19,6 +19,8 @@ along with this program. If not, see package engine import ( + "strings" + "github.com/cgrates/cgrates/utils" ) @@ -29,7 +31,22 @@ func (ms *MapStorage) GetTpIds(colName string) (ids []string, err error) { func (ms *MapStorage) GetTpTableIds(tpid, table string, distinct utils.TPDistinctIds, filters map[string]string, paginator *utils.PaginatorWithSearch) (ids []string, err error) { - return nil, utils.ErrNotImplemented + key := table + utils.CONCATENATED_KEY_SEP + tpid + fullIDs, _ := ms.GetKeysForPrefix(key) + for _, fullID := range fullIDs { + var buildedID string + sliceID := strings.Split(fullID[len(key)+1:], utils.CONCATENATED_KEY_SEP) + + for i := 0; i < len(distinct); i++ { + if len(buildedID) == 0 { + buildedID += sliceID[len(sliceID)-i-1] + } else { + buildedID += utils.CONCATENATED_KEY_SEP + sliceID[len(sliceID)-i+1] + } + } + ids = append(ids, buildedID) + } + return } func (ms *MapStorage) GetTPTimings(tpid, id string) (timings []*utils.ApierTPTiming, err error) { @@ -503,8 +520,6 @@ func (ms *MapStorage) RemTpData(table, tpid string, args map[string]string) (err if table == utils.EmptyString { return ms.Flush(utils.EmptyString) } - ms.mu.Lock() - defer ms.mu.Unlock() key := table + utils.CONCATENATED_KEY_SEP + tpid if args != nil { for _, val := range args { @@ -512,9 +527,11 @@ func (ms *MapStorage) RemTpData(table, tpid string, args map[string]string) (err } } ids, _ := ms.GetKeysForPrefix(key) + ms.mu.Lock() for _, id := range ids { delete(ms.dict, id) } + ms.mu.Unlock() return } diff --git a/engine/storage_utils.go b/engine/storage_utils.go index 388f9ae9e..1072bfe9e 100644 --- a/engine/storage_utils.go +++ b/engine/storage_utils.go @@ -52,9 +52,11 @@ func ConfigureDataStorage(db_type, host, port, name, user, pass, marshaler strin dm = NewDataManager(d.(DataDB)) case utils.INTERNAL: if marshaler == utils.JSON { - d, err = NewMapStorageJson() + //d, err = NewMapStorageJson() + d = NewInternalStorageJson() } else { - d, err = NewMapStorage() + d = NewInternalStorage() + //d, err = NewMapStorage() } dm = NewDataManager(d.(DataDB)) default: