Removing StorDB

This commit is contained in:
andronache98
2022-03-07 18:39:34 +02:00
committed by Dan Christian Bogos
parent 03793c0082
commit 928dbd9e42
24 changed files with 15 additions and 1627 deletions

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package main
import (
"errors"
"flag"
"fmt"
"log"
@@ -246,20 +245,6 @@ func loadConfig() (ldrCfg *config.CGRConfig) {
return
}
// importData imports tariff-plan CSV data into StorDB using the loader
// section of cfg; the configured TPid is mandatory. dataPath, verbose and
// importID are package-level flags.
func importData(cfg *config.CGRConfig) (err error) {
	ldrCfg := cfg.LoaderCgrCfg()
	if ldrCfg.TpID == utils.EmptyString {
		return errors.New("TPid required")
	}
	return (&engine.TPCSVImporter{
		TPid:     ldrCfg.TpID,
		DirPath:  *dataPath,
		Sep:      ldrCfg.FieldSeparator,
		Verbose:  *verbose,
		ImportID: *importID,
	}).Run()
}
func getLoader(cfg *config.CGRConfig) (loader engine.LoadReader, err error) {
if gprefix := utils.MetaGoogleAPI + utils.ConcatenatedKeySep; strings.HasPrefix(*dataPath, gprefix) { // Default load from csv files to dataDb

View File

@@ -139,17 +139,11 @@ func (acS *AccountSCfg) loadFromJSONCfg(jsnCfg *AccountSJsonCfg) (err error) {
// AsMapInterface returns the config as a map[string]interface{}
func (acS AccountSCfg) AsMapInterface(string) interface{} {
opts := map[string]interface{}{
utils.MetaProfileIDs: acS.Opts.ProfileIDs,
utils.MetaUsage: acS.Opts.Usage,
utils.MetaProfileIgnoreFilters: acS.Opts.ProfileIgnoreFilters,
}
mp := map[string]interface{}{
utils.EnabledCfg: acS.Enabled,
utils.IndexedSelectsCfg: acS.IndexedSelects,
utils.NestedFieldsCfg: acS.NestedFields,
utils.MaxIterations: acS.MaxIterations,
utils.OptsCfg: opts,
}
if acS.AttributeSConns != nil {
mp[utils.AttributeSConnsCfg] = getInternalJSONConns(acS.AttributeSConns)
@@ -160,21 +154,6 @@ func (acS AccountSCfg) AsMapInterface(string) interface{} {
if acS.ThresholdSConns != nil {
mp[utils.ThresholdSConnsCfg] = getInternalJSONConns(acS.ThresholdSConns)
}
if acS.StringIndexedFields != nil {
mp[utils.StringIndexedFieldsCfg] = utils.CloneStringSlice(*acS.StringIndexedFields)
}
if acS.PrefixIndexedFields != nil {
mp[utils.PrefixIndexedFieldsCfg] = utils.CloneStringSlice(*acS.PrefixIndexedFields)
}
if acS.SuffixIndexedFields != nil {
mp[utils.SuffixIndexedFieldsCfg] = utils.CloneStringSlice(*acS.SuffixIndexedFields)
}
if acS.ExistsIndexedFields != nil {
mp[utils.ExistsIndexedFieldsCfg] = utils.CloneStringSlice(*acS.ExistsIndexedFields)
}
if acS.NotExistsIndexedFields != nil {
mp[utils.NotExistsIndexedFieldsCfg] = utils.CloneStringSlice(*acS.NotExistsIndexedFields)
}
if acS.MaxUsage != nil {
mp[utils.MaxUsage] = acS.MaxUsage.String()
}

View File

@@ -1,67 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package console
/*
import (
"github.com/cgrates/cgrates/utils"
)
func init() {
c := &LoadTpFromStorDb{
name: "load_tp_from_stordb",
rpcMethod: utils.APIerSv1LoadTariffPlanFromStorDb,
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
}
// Commander implementation
type LoadTpFromStorDb struct {
name string
rpcMethod string
rpcParams *v1.AttrLoadTpFromStorDb
rpcResult string
*CommandExecuter
}
func (self *LoadTpFromStorDb) Name() string {
return self.name
}
func (self *LoadTpFromStorDb) RpcMethod() string {
return self.rpcMethod
}
func (self *LoadTpFromStorDb) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &v1.AttrLoadTpFromStorDb{}
}
return self.rpcParams
}
func (self *LoadTpFromStorDb) PostprocessRpcParams() error {
return nil
}
func (self *LoadTpFromStorDb) RpcResult() interface{} {
var s string
return &s
}
*/

View File

@@ -1,62 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package console
/*
func init() {
c := &CmdSetStorDBVersions{
name: "set_stordb_versions",
rpcMethod: utils.APIerSv1SetStorDBVersions,
rpcParams: &v1.SetVersionsArg{},
}
commands[c.Name()] = c
c.CommandExecuter = &CommandExecuter{c}
}
type CmdSetStorDBVersions struct {
name string
rpcMethod string
rpcParams *v1.SetVersionsArg
*CommandExecuter
}
func (self *CmdSetStorDBVersions) Name() string {
return self.name
}
func (self *CmdSetStorDBVersions) RpcMethod() string {
return self.rpcMethod
}
func (self *CmdSetStorDBVersions) RpcParams(reset bool) interface{} {
if reset || self.rpcParams == nil {
self.rpcParams = &v1.SetVersionsArg{}
}
return self.rpcParams
}
func (self *CmdSetStorDBVersions) PostprocessRpcParams() error {
return nil
}
func (self *CmdSetStorDBVersions) RpcResult() interface{} {
var atr string
return &atr
}
*/

View File

@@ -1,68 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package console
import (
"github.com/cgrates/cgrates/engine"
)
// init registers the stordb_versions console command.
func init() {
	c := &CmdGetStorDBVersions{
		name: "stordb_versions",
		// rpcMethod: utils.APIerSv1GetStorDBVersions,
	}
	commands[c.Name()] = c
	c.CommandExecuter = &CommandExecuter{c}
}

// CmdGetStorDBVersions implements the Commander interface for querying
// the StorDB schema versions. Note the RPC method is currently commented
// out in init, so rpcMethod stays empty.
type CmdGetStorDBVersions struct {
	name      string
	rpcMethod string
	rpcParams *EmptyWrapper
	*CommandExecuter
}

// Name returns the console command name.
func (cmd *CmdGetStorDBVersions) Name() string {
	return cmd.name
}

// RpcMethod returns the RPC method this command maps to (currently unset).
func (cmd *CmdGetStorDBVersions) RpcMethod() string {
	return cmd.rpcMethod
}

// RpcParams returns the command's RPC parameters, allocating the wrapper
// on first use or when reset is requested.
func (cmd *CmdGetStorDBVersions) RpcParams(reset bool) interface{} {
	if reset || cmd.rpcParams == nil {
		cmd.rpcParams = &EmptyWrapper{}
	}
	return cmd.rpcParams
}

// PostprocessRpcParams is a no-op for this command.
func (cmd *CmdGetStorDBVersions) PostprocessRpcParams() error {
	return nil
}

// RpcResult returns a fresh engine.Versions the reply is decoded into.
func (cmd *CmdGetStorDBVersions) RpcResult() interface{} {
	s := engine.Versions{}
	return &s
}

// ClientArgs returns no extra client arguments for this command.
func (cmd *CmdGetStorDBVersions) ClientArgs() (args []string) {
	return
}

View File

@@ -106,12 +106,6 @@ type DataDBDriver interface {
config.ConfigDB
}
// StorDB is the composite interface a tariff-plan storage backend must
// satisfy: generic Storage plus LoadReader/LoadWriter for TP data access.
type StorDB interface {
Storage
LoadReader
LoadWriter
}
type LoadStorage interface {
Storage
LoadReader

View File

@@ -32,7 +32,7 @@ import (
"github.com/cgrates/cgrates/utils"
)
// InternalDB is used as a DataDB and a StorDB
// InternalDB is used as a DataDB
type InternalDB struct {
stringIndexedFields []string
prefixIndexedFields []string
@@ -63,20 +63,6 @@ func NewInternalDB(stringIndexedFields, prefixIndexedFields []string,
}
}
// SetStringIndexedFields replaces the string-indexed field list, used at
// StorDB reload; thread safe via indexedFieldsMutex.
func (iDB *InternalDB) SetStringIndexedFields(stringIndexedFields []string) {
	iDB.indexedFieldsMutex.Lock()
	defer iDB.indexedFieldsMutex.Unlock()
	iDB.stringIndexedFields = stringIndexedFields
}

// SetPrefixIndexedFields replaces the prefix-indexed field list, used at
// StorDB reload; thread safe via indexedFieldsMutex.
func (iDB *InternalDB) SetPrefixIndexedFields(prefixIndexedFields []string) {
	iDB.indexedFieldsMutex.Lock()
	defer iDB.indexedFieldsMutex.Unlock()
	iDB.prefixIndexedFields = prefixIndexedFields
}

// Close only to implement Storage interface.
func (iDB *InternalDB) Close() {}

View File

@@ -1,479 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"strings"
"github.com/cgrates/cgrates/utils"
)
// GetTpIds implements LoadReader interface
// GetTpIds implements LoadReader interface: it returns the distinct TPids
// found in a single partition (colName) or, when colName is empty, across
// every StorDB partition. Keys are of the form "<tpid>:...", so the TPid
// is the first field of each item ID.
func (iDB *InternalDB) GetTpIds(colName string) (ids []string, err error) {
	tpIDs := make(utils.StringSet)
	collect := func(partition string) {
		for _, key := range iDB.db.GetItemIDs(partition, utils.EmptyString) {
			tpIDs.Add(strings.Split(key, utils.InInFieldSep)[0])
		}
	}
	if colName == utils.EmptyString { // no partition given: scan them all
		for _, conNm := range utils.CacheStorDBPartitions {
			collect(conNm)
		}
	} else {
		collect(utils.CacheStorDBPartitions[colName])
	}
	return tpIDs.AsSlice(), nil
}
// GetTpTableIds returns the item IDs stored under tpid in the given table,
// with the "<tpid>:" prefix stripped.
// NOTE(review): distinct, filters and paginator are accepted for interface
// compatibility but look unused here — confirm that is intentional.
func (iDB *InternalDB) GetTpTableIds(tpid, table string, distinct []string,
	filters map[string]string, paginator *utils.PaginatorWithSearch) (ids []string, err error) {
	idSet := make(utils.StringSet)
	for _, fullID := range iDB.db.GetItemIDs(utils.CacheStorDBPartitions[table], tpid) {
		idSet.Add(fullID[len(tpid)+1:]) // drop "<tpid>" plus the separator
	}
	return idSet.AsSlice(), nil
}
// tpItemKey builds the partition search key "<tpid>[:tenant[:id]]" shared
// by all GetTP* readers below; tenant and id are appended only when set.
func tpItemKey(tpid, tenant, id string) (key string) {
	key = tpid
	if tenant != utils.EmptyString {
		key += utils.ConcatenatedKeySep + tenant
	}
	if id != utils.EmptyString {
		key += utils.ConcatenatedKeySep + id
	}
	return
}

// GetTPResources reads the TPResourceProfiles matching tpid/tenant/id,
// returning utils.ErrNotFound when nothing (or a nil entry) matches.
func (iDB *InternalDB) GetTPResources(tpid, tenant, id string) (resources []*utils.TPResourceProfile, err error) {
	for _, itmID := range iDB.db.GetItemIDs(utils.CacheTBLTPResources, tpItemKey(tpid, tenant, id)) {
		x, ok := iDB.db.Get(utils.CacheTBLTPResources, itmID)
		if !ok || x == nil {
			return nil, utils.ErrNotFound
		}
		resources = append(resources, x.(*utils.TPResourceProfile))
	}
	if len(resources) == 0 {
		return nil, utils.ErrNotFound
	}
	return
}

// GetTPStats reads the TPStatProfiles matching tpid/tenant/id.
func (iDB *InternalDB) GetTPStats(tpid, tenant, id string) (stats []*utils.TPStatProfile, err error) {
	for _, itmID := range iDB.db.GetItemIDs(utils.CacheTBLTPStats, tpItemKey(tpid, tenant, id)) {
		x, ok := iDB.db.Get(utils.CacheTBLTPStats, itmID)
		if !ok || x == nil {
			return nil, utils.ErrNotFound
		}
		stats = append(stats, x.(*utils.TPStatProfile))
	}
	if len(stats) == 0 {
		return nil, utils.ErrNotFound
	}
	return
}

// GetTPThresholds reads the TPThresholdProfiles matching tpid/tenant/id.
func (iDB *InternalDB) GetTPThresholds(tpid, tenant, id string) (ths []*utils.TPThresholdProfile, err error) {
	for _, itmID := range iDB.db.GetItemIDs(utils.CacheTBLTPThresholds, tpItemKey(tpid, tenant, id)) {
		x, ok := iDB.db.Get(utils.CacheTBLTPThresholds, itmID)
		if !ok || x == nil {
			return nil, utils.ErrNotFound
		}
		ths = append(ths, x.(*utils.TPThresholdProfile))
	}
	if len(ths) == 0 {
		return nil, utils.ErrNotFound
	}
	return
}

// GetTPFilters reads the TPFilterProfiles matching tpid/tenant/id.
func (iDB *InternalDB) GetTPFilters(tpid, tenant, id string) (fltrs []*utils.TPFilterProfile, err error) {
	for _, itmID := range iDB.db.GetItemIDs(utils.CacheTBLTPFilters, tpItemKey(tpid, tenant, id)) {
		x, ok := iDB.db.Get(utils.CacheTBLTPFilters, itmID)
		if !ok || x == nil {
			return nil, utils.ErrNotFound
		}
		fltrs = append(fltrs, x.(*utils.TPFilterProfile))
	}
	if len(fltrs) == 0 {
		return nil, utils.ErrNotFound
	}
	return
}

// GetTPRoutes reads the TPRouteProfiles matching tpid/tenant/id.
func (iDB *InternalDB) GetTPRoutes(tpid, tenant, id string) (supps []*utils.TPRouteProfile, err error) {
	for _, itmID := range iDB.db.GetItemIDs(utils.CacheTBLTPRoutes, tpItemKey(tpid, tenant, id)) {
		x, ok := iDB.db.Get(utils.CacheTBLTPRoutes, itmID)
		if !ok || x == nil {
			return nil, utils.ErrNotFound
		}
		supps = append(supps, x.(*utils.TPRouteProfile))
	}
	if len(supps) == 0 {
		return nil, utils.ErrNotFound
	}
	return
}

// GetTPAttributes reads the TPAttributeProfiles matching tpid/tenant/id.
func (iDB *InternalDB) GetTPAttributes(tpid, tenant, id string) (attrs []*utils.TPAttributeProfile, err error) {
	for _, itmID := range iDB.db.GetItemIDs(utils.CacheTBLTPAttributes, tpItemKey(tpid, tenant, id)) {
		x, ok := iDB.db.Get(utils.CacheTBLTPAttributes, itmID)
		if !ok || x == nil {
			return nil, utils.ErrNotFound
		}
		attrs = append(attrs, x.(*utils.TPAttributeProfile))
	}
	if len(attrs) == 0 {
		return nil, utils.ErrNotFound
	}
	return
}

// GetTPChargers reads the TPChargerProfiles matching tpid/tenant/id.
func (iDB *InternalDB) GetTPChargers(tpid, tenant, id string) (cpps []*utils.TPChargerProfile, err error) {
	for _, itmID := range iDB.db.GetItemIDs(utils.CacheTBLTPChargers, tpItemKey(tpid, tenant, id)) {
		x, ok := iDB.db.Get(utils.CacheTBLTPChargers, itmID)
		if !ok || x == nil {
			return nil, utils.ErrNotFound
		}
		cpps = append(cpps, x.(*utils.TPChargerProfile))
	}
	if len(cpps) == 0 {
		return nil, utils.ErrNotFound
	}
	return
}

// GetTPDispatcherProfiles reads the TPDispatcherProfiles matching tpid/tenant/id.
func (iDB *InternalDB) GetTPDispatcherProfiles(tpid, tenant, id string) (dpps []*utils.TPDispatcherProfile, err error) {
	for _, itmID := range iDB.db.GetItemIDs(utils.CacheTBLTPDispatchers, tpItemKey(tpid, tenant, id)) {
		x, ok := iDB.db.Get(utils.CacheTBLTPDispatchers, itmID)
		if !ok || x == nil {
			return nil, utils.ErrNotFound
		}
		dpps = append(dpps, x.(*utils.TPDispatcherProfile))
	}
	if len(dpps) == 0 {
		return nil, utils.ErrNotFound
	}
	return
}

// GetTPDispatcherHosts reads the TPDispatcherHosts matching tpid/tenant/id.
func (iDB *InternalDB) GetTPDispatcherHosts(tpid, tenant, id string) (dpps []*utils.TPDispatcherHost, err error) {
	for _, itmID := range iDB.db.GetItemIDs(utils.CacheTBLTPDispatcherHosts, tpItemKey(tpid, tenant, id)) {
		x, ok := iDB.db.Get(utils.CacheTBLTPDispatcherHosts, itmID)
		if !ok || x == nil {
			return nil, utils.ErrNotFound
		}
		dpps = append(dpps, x.(*utils.TPDispatcherHost))
	}
	if len(dpps) == 0 {
		return nil, utils.ErrNotFound
	}
	return
}

// GetTPRateProfiles reads the TPRateProfiles matching tpid/tenant/id.
func (iDB *InternalDB) GetTPRateProfiles(tpid, tenant, id string) (tpPrfs []*utils.TPRateProfile, err error) {
	for _, itmID := range iDB.db.GetItemIDs(utils.CacheTBLTPRateProfiles, tpItemKey(tpid, tenant, id)) {
		x, ok := iDB.db.Get(utils.CacheTBLTPRateProfiles, itmID)
		if !ok || x == nil {
			return nil, utils.ErrNotFound
		}
		tpPrfs = append(tpPrfs, x.(*utils.TPRateProfile))
	}
	if len(tpPrfs) == 0 {
		return nil, utils.ErrNotFound
	}
	return
}

// GetTPActionProfiles reads the TPActionProfiles matching tpid/tenant/id.
func (iDB *InternalDB) GetTPActionProfiles(tpid, tenant, id string) (tpPrfs []*utils.TPActionProfile, err error) {
	for _, itmID := range iDB.db.GetItemIDs(utils.CacheTBLTPActionProfiles, tpItemKey(tpid, tenant, id)) {
		x, ok := iDB.db.Get(utils.CacheTBLTPActionProfiles, itmID)
		if !ok || x == nil {
			return nil, utils.ErrNotFound
		}
		tpPrfs = append(tpPrfs, x.(*utils.TPActionProfile))
	}
	if len(tpPrfs) == 0 {
		return nil, utils.ErrNotFound
	}
	return
}

// GetTPAccounts reads the TPAccounts matching tpid/tenant/id.
func (iDB *InternalDB) GetTPAccounts(tpid, tenant, id string) (tpPrfs []*utils.TPAccount, err error) {
	for _, itmID := range iDB.db.GetItemIDs(utils.CacheTBLTPAccounts, tpItemKey(tpid, tenant, id)) {
		x, ok := iDB.db.Get(utils.CacheTBLTPAccounts, itmID)
		if !ok || x == nil {
			return nil, utils.ErrNotFound
		}
		tpPrfs = append(tpPrfs, x.(*utils.TPAccount))
	}
	if len(tpPrfs) == 0 {
		return nil, utils.ErrNotFound
	}
	return
}
//implement LoadWriter interface

// RemTpData removes tariff-plan data: the whole DB when table is empty,
// otherwise all items of that table under tpid, optionally narrowed by the
// "tag" argument or by "tenant"+"id".
func (iDB *InternalDB) RemTpData(table, tpid string, args map[string]string) (err error) {
if table == utils.EmptyString {
// no table given: wipe everything
return iDB.Flush(utils.EmptyString)
}
key := tpid
if args != nil {
// "tag" takes precedence over a "tenant"/"id" pair
if tag, has := args["tag"]; has {
key += utils.ConcatenatedKeySep + tag
} else if id, has := args["id"]; has {
key += utils.ConcatenatedKeySep + args["tenant"] +
utils.ConcatenatedKeySep + id
}
}
ids := iDB.db.GetItemIDs(utils.CacheStorDBPartitions[table], key)
for _, id := range ids {
iDB.db.Remove(utils.CacheStorDBPartitions[table], id,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}
// SetTPResources stores the profiles under "<TPid>:<Tenant>:<ID>" keys;
// no-op on empty input. The same pattern applies to all SetTP* writers below.
func (iDB *InternalDB) SetTPResources(resources []*utils.TPResourceProfile) (err error) {
if len(resources) == 0 {
return nil
}
for _, resource := range resources {
iDB.db.Set(utils.CacheTBLTPResources, utils.ConcatenatedKey(resource.TPid, resource.Tenant, resource.ID), resource, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}

// SetTPStats stores TPStatProfiles; no-op on empty input.
func (iDB *InternalDB) SetTPStats(stats []*utils.TPStatProfile) (err error) {
if len(stats) == 0 {
return nil
}
for _, stat := range stats {
iDB.db.Set(utils.CacheTBLTPStats, utils.ConcatenatedKey(stat.TPid, stat.Tenant, stat.ID), stat, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}

// SetTPThresholds stores TPThresholdProfiles; no-op on empty input.
func (iDB *InternalDB) SetTPThresholds(thresholds []*utils.TPThresholdProfile) (err error) {
if len(thresholds) == 0 {
return nil
}
for _, threshold := range thresholds {
iDB.db.Set(utils.CacheTBLTPThresholds, utils.ConcatenatedKey(threshold.TPid, threshold.Tenant, threshold.ID), threshold, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}

// SetTPFilters stores TPFilterProfiles; no-op on empty input.
func (iDB *InternalDB) SetTPFilters(filters []*utils.TPFilterProfile) (err error) {
if len(filters) == 0 {
return nil
}
for _, filter := range filters {
iDB.db.Set(utils.CacheTBLTPFilters, utils.ConcatenatedKey(filter.TPid, filter.Tenant, filter.ID), filter, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}

// SetTPRoutes stores TPRouteProfiles; no-op on empty input.
func (iDB *InternalDB) SetTPRoutes(routes []*utils.TPRouteProfile) (err error) {
if len(routes) == 0 {
return nil
}
for _, route := range routes {
iDB.db.Set(utils.CacheTBLTPRoutes, utils.ConcatenatedKey(route.TPid, route.Tenant, route.ID), route, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}

// SetTPAttributes stores TPAttributeProfiles; no-op on empty input.
func (iDB *InternalDB) SetTPAttributes(attributes []*utils.TPAttributeProfile) (err error) {
if len(attributes) == 0 {
return nil
}
for _, attribute := range attributes {
iDB.db.Set(utils.CacheTBLTPAttributes, utils.ConcatenatedKey(attribute.TPid, attribute.Tenant, attribute.ID), attribute, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}

// SetTPChargers stores TPChargerProfiles; no-op on empty input.
func (iDB *InternalDB) SetTPChargers(cpps []*utils.TPChargerProfile) (err error) {
if len(cpps) == 0 {
return nil
}
for _, cpp := range cpps {
iDB.db.Set(utils.CacheTBLTPChargers, utils.ConcatenatedKey(cpp.TPid, cpp.Tenant, cpp.ID), cpp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}

// SetTPDispatcherProfiles stores TPDispatcherProfiles; no-op on empty input.
func (iDB *InternalDB) SetTPDispatcherProfiles(dpps []*utils.TPDispatcherProfile) (err error) {
if len(dpps) == 0 {
return nil
}
for _, dpp := range dpps {
iDB.db.Set(utils.CacheTBLTPDispatchers, utils.ConcatenatedKey(dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}

// SetTPDispatcherHosts stores TPDispatcherHosts; no-op on empty input.
func (iDB *InternalDB) SetTPDispatcherHosts(dpps []*utils.TPDispatcherHost) (err error) {
if len(dpps) == 0 {
return nil
}
for _, dpp := range dpps {
iDB.db.Set(utils.CacheTBLTPDispatcherHosts, utils.ConcatenatedKey(dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}

// SetTPRateProfiles stores TPRateProfiles; no-op on empty input.
func (iDB *InternalDB) SetTPRateProfiles(tpPrfs []*utils.TPRateProfile) (err error) {
if len(tpPrfs) == 0 {
return nil
}
for _, tpPrf := range tpPrfs {
iDB.db.Set(utils.CacheTBLTPRateProfiles, utils.ConcatenatedKey(tpPrf.TPid, tpPrf.Tenant, tpPrf.ID), tpPrf, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}

// SetTPActionProfiles stores TPActionProfiles; no-op on empty input.
func (iDB *InternalDB) SetTPActionProfiles(tpPrfs []*utils.TPActionProfile) (err error) {
if len(tpPrfs) == 0 {
return nil
}
for _, tpPrf := range tpPrfs {
iDB.db.Set(utils.CacheTBLTPActionProfiles, utils.ConcatenatedKey(tpPrf.TPid, tpPrf.Tenant, tpPrf.ID), tpPrf, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}

// SetTPAccounts stores TPAccounts; no-op on empty input.
func (iDB *InternalDB) SetTPAccounts(tpPrfs []*utils.TPAccount) (err error) {
if len(tpPrfs) == 0 {
return nil
}
for _, tpPrf := range tpPrfs {
iDB.db.Set(utils.CacheTBLTPAccounts, utils.ConcatenatedKey(tpPrf.TPid, tpPrf.Tenant, tpPrf.ID), tpPrf, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional)
}
return
}

View File

@@ -237,7 +237,7 @@ type MongoStorage struct {
ctxTTL time.Duration
ctxTTLMutex sync.RWMutex // used for TTL reload
db string
storageType string // datadb, stordb
storageType string // datadb
ms utils.Marshaler
cdrsIndexes []string
cnter *utils.Counter
@@ -322,13 +322,7 @@ func (ms *MongoStorage) ensureIndexesForCol(col string) (err error) { // exporte
if err = ms.enusureIndex(col, true, "id"); err != nil {
return
}
//StorDB
case utils.TBLTPStats, utils.TBLTPResources, utils.TBLTPDispatchers,
utils.TBLTPDispatcherHosts, utils.TBLTPChargers,
utils.TBLTPRoutes, utils.TBLTPThresholds:
if err = ms.enusureIndex(col, true, "tpid", "id"); err != nil {
return
}
case utils.CDRsTBL:
if err = ms.enusureIndex(col, true, MetaOriginLow, RunIDLow,
OriginIDLow); err != nil {
@@ -376,16 +370,6 @@ func (ms *MongoStorage) EnsureIndexes(cols ...string) (err error) {
}
}
}
if ms.storageType == utils.StorDB {
for _, col := range []string{utils.TBLTPStats, utils.TBLTPResources, utils.TBLTPDispatchers,
utils.TBLTPDispatcherHosts, utils.TBLTPChargers,
utils.TBLTPRoutes, utils.TBLTPThresholds,
utils.CDRsTBL, utils.SessionCostsTBL} {
if err = ms.ensureIndexesForCol(col); err != nil {
return
}
}
}
return
}

View File

@@ -54,7 +54,6 @@ func NewMySQLStorage(host, port, name, user, password string,
return &SQLStorage{
DB: mySQLStorage.DB,
db: mySQLStorage.db,
StorDB: mySQLStorage,
SQLImpl: mySQLStorage,
}, nil
}

View File

@@ -19,41 +19,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package engine
import (
"fmt"
"time"
"github.com/cgrates/cgrates/utils"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// NewPostgresStorage returns the posgres storDB
// NewPostgresStorage opens a PostgreSQL-backed SQLStorage, configures the
// connection pool (max open/idle connections, connection lifetime) and
// verifies connectivity with a ping before returning.
func NewPostgresStorage(host, port, name, user, password, sslmode string, maxConn, maxIdleConn int, connMaxLifetime time.Duration) (*SQLStorage, error) {
	connectString := fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s",
		host, port, name, user, password, sslmode)
	gormDB, err := gorm.Open(postgres.Open(connectString), &gorm.Config{AllowGlobalUpdate: true})
	if err != nil {
		return nil, err
	}
	pgStorage := new(PostgresStorage)
	if pgStorage.DB, err = gormDB.DB(); err != nil {
		return nil, err
	}
	if err = pgStorage.DB.Ping(); err != nil {
		return nil, err
	}
	pgStorage.DB.SetMaxIdleConns(maxIdleConn)
	pgStorage.DB.SetMaxOpenConns(maxConn)
	pgStorage.DB.SetConnMaxLifetime(connMaxLifetime)
	// gormDB.LogMode(true) // uncomment for SQL tracing
	pgStorage.db = gormDB
	return &SQLStorage{
		DB:      pgStorage.DB,
		db:      pgStorage.db,
		StorDB:  pgStorage,
		SQLImpl: pgStorage,
	}, nil
}
// PostgresStorage embeds SQLStorage, overriding the dialect-specific
// query helpers for PostgreSQL.
type PostgresStorage struct {
SQLStorage
}
@@ -81,22 +49,6 @@ func (poS *PostgresStorage) SetVersions(vrs Versions, overwrite bool) (err error
return
}
// extraFieldsExistsQry returns a WHERE fragment checking that the JSONB
// extra_fields column contains the given key (Postgres ? operator).
// NOTE(review): field/value are interpolated into SQL unescaped here and in
// the helpers below — confirm callers only pass trusted input, or move to
// parameterized queries.
func (poS *PostgresStorage) extraFieldsExistsQry(field string) string {
return fmt.Sprintf(" extra_fields ?'%s'", field)
}

// extraFieldsValueQry returns a WHERE fragment matching the key's text
// value (Postgres ->> operator).
func (poS *PostgresStorage) extraFieldsValueQry(field, value string) string {
return fmt.Sprintf(" (extra_fields ->> '%s') = '%s'", field, value)
}

// notExtraFieldsExistsQry negates extraFieldsExistsQry.
func (poS *PostgresStorage) notExtraFieldsExistsQry(field string) string {
return fmt.Sprintf(" NOT extra_fields ?'%s'", field)
}

// notExtraFieldsValueQry negates extraFieldsValueQry (key absent OR value differs).
func (poS *PostgresStorage) notExtraFieldsValueQry(field, value string) string {
return fmt.Sprintf(" NOT (extra_fields ?'%s' AND (extra_fields ->> '%s') = '%s')", field, field, value)
}

// GetStorageType identifies this backend as Postgres.
func (poS *PostgresStorage) GetStorageType() string {
return utils.Postgres
}

View File

@@ -40,7 +40,6 @@ type SQLImpl interface {
// SQLStorage pairs the raw *sql.DB and its gorm handle with the embedded
// StorDB and SQL-dialect implementations it delegates to.
type SQLStorage struct {
DB *sql.DB
db *gorm.DB
StorDB
SQLImpl
}

View File

@@ -1,315 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"archive/zip"
"bytes"
"encoding/csv"
"errors"
"fmt"
"io"
"os"
"path"
"unicode/utf8"
"github.com/cgrates/cgrates/utils"
)
// NewTPExporter validates the export parameters and returns a TPExporter
// ready to Run. Only the CSV file format is supported. When compress is
// set, output is zipped either into "tpexport.zip" under expPath or, when
// expPath is empty, into the in-memory cache buffer.
func NewTPExporter(storDB LoadStorage, tpID, expPath, fileFormat, sep string, compress bool) (*TPExporter, error) {
	if len(tpID) == 0 {
		return nil, errors.New("Missing TPid")
	}
	if utils.CSV != fileFormat {
		return nil, errors.New("Unsupported file format")
	}
	tpExp := &TPExporter{
		storDB:     storDB,
		tpID:       tpID,
		exportPath: expPath,
		fileFormat: fileFormat,
		compress:   compress,
		cacheBuff:  new(bytes.Buffer),
	}
	runeSep, _ := utf8.DecodeRuneInString(sep)
	if runeSep == utf8.RuneError {
		return nil, fmt.Errorf("Invalid field separator: %s", sep)
	}
	tpExp.sep = runeSep
	if compress {
		if len(tpExp.exportPath) == 0 {
			// no output folder: zip into the in-memory buffer
			tpExp.zipWritter = zip.NewWriter(tpExp.cacheBuff)
		} else {
			fileOut, err := os.Create(path.Join(tpExp.exportPath, "tpexport.zip"))
			if err != nil {
				return nil, err
			}
			tpExp.zipWritter = zip.NewWriter(fileOut)
		}
	}
	return tpExp, nil
}
// TPExporter exports a TariffPlan to a folder of CSV files or a zip archive.
type TPExporter struct {
storDB LoadStorage // StorDb connection handle
tpID string // Load data on this tpid
exportPath string // Directory path to export to
fileFormat string // The file format <csv>
sep rune // Separator in the csv file
compress bool // Use ZIP to compress the folder
cacheBuff *bytes.Buffer // Will be written in case of no output folder is specified
zipWritter *zip.Writer // Populated in case of needing to write zipped content
exportedFiles []string // Names of the files written so far, for cleanup/stats
}
// Run exports every tariff-plan category of tpID from StorDB to CSV files.
// Each category is fetched, converted to its CSV model rows and collected
// into toExportMap keyed by output file name, then written out in one pass.
// Per-category fetch errors (other than not-found) are logged, the category
// is skipped, and ErrPartiallyExecuted is returned at the end; when nothing
// at all was collected ErrNotFound is returned.
func (tpExp *TPExporter) Run() error {
tpExp.removeFiles() // Make sure we clean the folder before starting with new one
var withError bool
toExportMap := make(map[string][]interface{})
storDataResources, err := tpExp.storDB.GetTPResources(tpExp.tpID, "", "")
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpResources))
withError = true
}
for _, sd := range storDataResources {
sdModels := APItoModelResource(sd)
for _, sdModel := range sdModels {
toExportMap[utils.ResourcesCsv] = append(toExportMap[utils.ResourcesCsv], sdModel)
}
}
storDataStats, err := tpExp.storDB.GetTPStats(tpExp.tpID, "", "")
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpStats))
withError = true
}
for _, sd := range storDataStats {
sdModels := APItoModelStats(sd)
for _, sdModel := range sdModels {
toExportMap[utils.StatsCsv] = append(toExportMap[utils.StatsCsv], sdModel)
}
}
storDataThresholds, err := tpExp.storDB.GetTPThresholds(tpExp.tpID, "", "")
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpThresholds))
withError = true
}
for _, sd := range storDataThresholds {
sdModels := APItoModelTPThreshold(sd)
for _, sdModel := range sdModels {
toExportMap[utils.ThresholdsCsv] = append(toExportMap[utils.ThresholdsCsv], sdModel)
}
}
storDataFilters, err := tpExp.storDB.GetTPFilters(tpExp.tpID, "", "")
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpFilters))
withError = true
}
for _, sd := range storDataFilters {
sdModels := APItoModelTPFilter(sd)
for _, sdModel := range sdModels {
toExportMap[utils.FiltersCsv] = append(toExportMap[utils.FiltersCsv], sdModel)
}
}
storDataRoutes, err := tpExp.storDB.GetTPRoutes(tpExp.tpID, "", "")
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpRoutes))
withError = true
}
for _, sd := range storDataRoutes {
sdModels := APItoModelTPRoutes(sd)
for _, sdModel := range sdModels {
toExportMap[utils.RoutesCsv] = append(toExportMap[utils.RoutesCsv], sdModel)
}
}
storeDataAttributes, err := tpExp.storDB.GetTPAttributes(tpExp.tpID, "", "")
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpAttributes))
withError = true
}
for _, sd := range storeDataAttributes {
sdModels := APItoModelTPAttribute(sd)
for _, sdModel := range sdModels {
toExportMap[utils.AttributesCsv] = append(toExportMap[utils.AttributesCsv], sdModel)
}
}
storDataChargers, err := tpExp.storDB.GetTPChargers(tpExp.tpID, "", "")
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpChargers))
withError = true
}
for _, sd := range storDataChargers {
sdModels := APItoModelTPCharger(sd)
for _, sdModel := range sdModels {
toExportMap[utils.ChargersCsv] = append(toExportMap[utils.ChargersCsv], sdModel)
}
}
storDataDispatcherProfiles, err := tpExp.storDB.GetTPDispatcherProfiles(tpExp.tpID, "", "")
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpDispatcherProfiles))
withError = true
}
for _, sd := range storDataDispatcherProfiles {
sdModels := APItoModelTPDispatcherProfile(sd)
for _, sdModel := range sdModels {
toExportMap[utils.DispatcherProfilesCsv] = append(toExportMap[utils.DispatcherProfilesCsv], sdModel)
}
}
storDataDispatcherHosts, err := tpExp.storDB.GetTPDispatcherHosts(tpExp.tpID, "", "")
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpDispatcherHosts))
withError = true
}
// dispatcher hosts convert one-to-one, no inner model loop needed
for _, sd := range storDataDispatcherHosts {
toExportMap[utils.DispatcherHostsCsv] = append(toExportMap[utils.DispatcherHostsCsv], APItoModelTPDispatcherHost(sd))
}
storDataRateProfiles, err := tpExp.storDB.GetTPRateProfiles(tpExp.tpID, "", "")
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpRateProfiles))
withError = true
}
for _, sd := range storDataRateProfiles {
sdModels := APItoModelTPRateProfile(sd)
for _, sdModel := range sdModels {
toExportMap[utils.RatesCsv] = append(toExportMap[utils.RatesCsv], sdModel)
}
}
storDataActionProfiles, err := tpExp.storDB.GetTPActionProfiles(tpExp.tpID, "", "")
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpActionProfiles))
withError = true
}
for _, sd := range storDataActionProfiles {
sdModels := APItoModelTPActionProfile(sd)
for _, sdModel := range sdModels {
toExportMap[utils.ActionsCsv] = append(toExportMap[utils.ActionsCsv], sdModel)
}
}
storDataAccounts, err := tpExp.storDB.GetTPAccounts(tpExp.tpID, "", "")
if err != nil && err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpAccounts))
withError = true
}
for _, sd := range storDataAccounts {
sdModels := APItoModelTPAccount(sd)
for _, sdModel := range sdModels {
toExportMap[utils.AccountsCsv] = append(toExportMap[utils.AccountsCsv], sdModel)
}
}
if len(toExportMap) == 0 { // if we don't have anything to export we return not found error
return utils.ErrNotFound
}
// write each collected category; on any write failure, clean up and abort
for fileName, storData := range toExportMap {
if err := tpExp.writeOut(fileName, storData); err != nil {
tpExp.removeFiles()
return err
}
tpExp.exportedFiles = append(tpExp.exportedFiles, fileName)
}
if tpExp.compress {
if err := tpExp.zipWritter.Close(); err != nil {
return err
}
}
if withError { // if we export something but have error we return partially executed
return utils.ErrPartiallyExecuted
}
return nil
}
// removeFiles deletes the files written so far for this export.
// It is invoked when an export fails partway so no partial output is
// left behind on disk. Exports that never touched the filesystem
// (zip/in-memory) have an empty exportPath and are a no-op.
// Returns the first removal error encountered (files already gone are
// not treated as errors); previously all errors were silently dropped.
func (tpExp *TPExporter) removeFiles() error {
	if len(tpExp.exportPath) == 0 {
		return nil // nothing was written to disk
	}
	var firstErr error
	for _, fileName := range tpExp.exportedFiles {
		err := os.Remove(path.Join(tpExp.exportPath, fileName))
		if err != nil && !os.IsNotExist(err) && firstErr == nil {
			firstErr = err // remember the first real failure, keep cleaning up
		}
	}
	return firstErr
}
// writeOut serializes tpData under fileName, choosing the destination by
// exporter configuration: an entry inside the zip archive when compression
// is on, a file under exportPath otherwise, or a throwaway in-memory
// buffer when neither is configured. Empty data sets are skipped.
func (tpExp *TPExporter) writeOut(fileName string, tpData []interface{}) error {
	if len(tpData) == 0 {
		return nil // nothing to serialize for this file
	}
	// Pick the output destination.
	var dst io.Writer
	switch {
	case tpExp.compress:
		zipEntry, err := tpExp.zipWritter.Create(fileName)
		if err != nil {
			return err
		}
		dst = zipEntry
	case len(tpExp.exportPath) != 0:
		outFile, err := os.Create(path.Join(tpExp.exportPath, fileName))
		if err != nil {
			return err
		}
		defer outFile.Close()
		dst = outFile
	default:
		dst = new(bytes.Buffer)
	}
	// Wrap the destination in a flushable writer matching the file format.
	var flushWriter utils.NopFlushWriter
	if tpExp.fileFormat == utils.CSV {
		csvW := csv.NewWriter(dst)
		csvW.Comma = tpExp.sep
		flushWriter = csvW
	} else {
		flushWriter = utils.NewNopFlushWriter(dst)
	}
	// Dump each tariff-plan item as one record.
	for _, item := range tpData {
		record, err := CsvDump(item)
		if err != nil {
			return err
		}
		if err = flushWriter.Write(record); err != nil {
			return err
		}
	}
	flushWriter.Flush() // for CSV this pushes the buffered records out
	return nil
}
// ExportStats reports the outcome of the export: where it was written,
// which files were produced and whether the output was compressed.
func (tpExp *TPExporter) ExportStats() *utils.ExportedTPStats {
	stats := &utils.ExportedTPStats{
		ExportPath:    tpExp.exportPath,
		ExportedFiles: tpExp.exportedFiles,
		Compressed:    tpExp.compress,
	}
	return stats
}
// GetCacheBuffer returns the exporter's in-memory buffer.
// NOTE(review): presumably this holds the export payload when no
// filesystem path is configured (e.g. compressed exports served over
// the API) — the wiring to zipWritter is not visible here, confirm.
func (tpExp *TPExporter) GetCacheBuffer() *bytes.Buffer {
	return tpExp.cacheBuff
}

View File

@@ -1,207 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"log"
"os"
"github.com/cgrates/cgrates/utils"
)
// Import tariff plan from csv into storDb
type TPCSVImporter struct {
TPid string // Load data on this tpid
StorDB LoadWriter // StorDb connection handle
DirPath string // Directory path to import from
Sep rune // Separator in the csv file
Verbose bool // If true will print a detailed information instead of silently discarding it
ImportID string // Use this to differentiate between imports (eg: when autogenerating fields like RatingProfileID
csvr LoadReader
}
// Maps csv file to handler which should process it. Defined like this since tests on 1.0.3 were failing on Travis.
// Change it to func(string) error as soon as Travis updates.
var fileHandlers = map[string]func(*TPCSVImporter, string) error{
utils.ResourcesCsv: (*TPCSVImporter).importResources,
utils.StatsCsv: (*TPCSVImporter).importStats,
utils.ThresholdsCsv: (*TPCSVImporter).importThresholds,
utils.FiltersCsv: (*TPCSVImporter).importFilters,
utils.RoutesCsv: (*TPCSVImporter).importRoutes,
utils.AttributesCsv: (*TPCSVImporter).importAttributeProfiles,
utils.ChargersCsv: (*TPCSVImporter).importChargerProfiles,
utils.DispatcherProfilesCsv: (*TPCSVImporter).importDispatcherProfiles,
utils.DispatcherHostsCsv: (*TPCSVImporter).importDispatcherHosts,
utils.RatesCsv: (*TPCSVImporter).importRateProfiles,
utils.ActionsCsv: (*TPCSVImporter).importActionProfiles,
utils.AccountsCsv: (*TPCSVImporter).importAccounts,
}
func (tpImp *TPCSVImporter) Run() error {
tpImp.csvr = NewFileCSVStorage(tpImp.Sep, tpImp.DirPath)
files, _ := os.ReadDir(tpImp.DirPath)
var withErrors bool
for _, f := range files {
fHandler, hasName := fileHandlers[f.Name()]
if !hasName {
continue
}
if err := fHandler(tpImp, f.Name()); err != nil {
withErrors = true
utils.Logger.Err(fmt.Sprintf("<TPCSVImporter> Importing file: %s, got error: %s", f.Name(), err.Error()))
}
}
if withErrors {
return utils.ErrPartiallyExecuted
}
return nil
}
func (tpImp *TPCSVImporter) importResources(fn string) error {
if tpImp.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
rls, err := tpImp.csvr.GetTPResources(tpImp.TPid, "", "")
if err != nil {
return err
}
return tpImp.StorDB.SetTPResources(rls)
}
func (tpImp *TPCSVImporter) importStats(fn string) error {
if tpImp.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
sts, err := tpImp.csvr.GetTPStats(tpImp.TPid, "", "")
if err != nil {
return err
}
return tpImp.StorDB.SetTPStats(sts)
}
func (tpImp *TPCSVImporter) importThresholds(fn string) error {
if tpImp.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
sts, err := tpImp.csvr.GetTPThresholds(tpImp.TPid, "", "")
if err != nil {
return err
}
return tpImp.StorDB.SetTPThresholds(sts)
}
func (tpImp *TPCSVImporter) importFilters(fn string) error {
if tpImp.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
sts, err := tpImp.csvr.GetTPFilters(tpImp.TPid, "", "")
if err != nil {
return err
}
return tpImp.StorDB.SetTPFilters(sts)
}
func (tpImp *TPCSVImporter) importRoutes(fn string) error {
if tpImp.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
rls, err := tpImp.csvr.GetTPRoutes(tpImp.TPid, "", "")
if err != nil {
return err
}
return tpImp.StorDB.SetTPRoutes(rls)
}
func (tpImp *TPCSVImporter) importAttributeProfiles(fn string) error {
if tpImp.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
rls, err := tpImp.csvr.GetTPAttributes(tpImp.TPid, "", "")
if err != nil {
return err
}
return tpImp.StorDB.SetTPAttributes(rls)
}
func (tpImp *TPCSVImporter) importChargerProfiles(fn string) error {
if tpImp.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
rls, err := tpImp.csvr.GetTPChargers(tpImp.TPid, "", "")
if err != nil {
return err
}
return tpImp.StorDB.SetTPChargers(rls)
}
func (tpImp *TPCSVImporter) importDispatcherProfiles(fn string) error {
if tpImp.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
dpps, err := tpImp.csvr.GetTPDispatcherProfiles(tpImp.TPid, "", "")
if err != nil {
return err
}
return tpImp.StorDB.SetTPDispatcherProfiles(dpps)
}
func (tpImp *TPCSVImporter) importDispatcherHosts(fn string) error {
if tpImp.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
dpps, err := tpImp.csvr.GetTPDispatcherHosts(tpImp.TPid, "", "")
if err != nil {
return err
}
return tpImp.StorDB.SetTPDispatcherHosts(dpps)
}
func (tpImp *TPCSVImporter) importRateProfiles(fn string) error {
if tpImp.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
rpps, err := tpImp.csvr.GetTPRateProfiles(tpImp.TPid, "", "")
if err != nil {
return err
}
return tpImp.StorDB.SetTPRateProfiles(rpps)
}
func (tpImp *TPCSVImporter) importActionProfiles(fn string) error {
if tpImp.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
rpps, err := tpImp.csvr.GetTPActionProfiles(tpImp.TPid, "", "")
if err != nil {
return err
}
return tpImp.StorDB.SetTPActionProfiles(rpps)
}
func (tpImp *TPCSVImporter) importAccounts(fn string) error {
if tpImp.Verbose {
log.Printf("Processing file: <%s> ", fn)
}
rpps, err := tpImp.csvr.GetTPAccounts(tpImp.TPid, "", "")
if err != nil {
return err
}
return tpImp.StorDB.SetTPAccounts(rpps)
}

View File

@@ -34,10 +34,6 @@ var (
utils.RQF: "cgr-migrator -exec=*filters",
utils.Routes: "cgr-migrator -exec=*routes",
}
storDBVers = map[string]string{
utils.CostDetails: "cgr-migrator -exec=*cost_details",
utils.SessionSCosts: "cgr-migrator -exec=*sessions_costs",
}
allVers map[string]string // init will fill this with a merge of data+stor
)
@@ -46,9 +42,6 @@ func init() {
for k, v := range dataDBVers {
allVers[k] = v
}
for k, v := range storDBVers {
allVers[k] = v
}
}
// Versions will keep trac of various item versions
@@ -114,15 +107,10 @@ func (vers Versions) Compare(curent Versions, storType string, isDataDB bool) st
var message map[string]string
switch storType {
case utils.Mongo:
if isDataDB {
message = dataDBVers
} else {
message = storDBVers
}
message = dataDBVers
case utils.Internal:
message = allVers
case utils.Postgres, utils.MySQL:
message = storDBVers
case utils.Redis:
message = dataDBVers
}
@@ -154,36 +142,13 @@ func CurrentDataDBVersions() Versions {
}
}
// CurrentStorDBVersions returns the needed StorDB versions
func CurrentStorDBVersions() Versions {
return Versions{
utils.CostDetails: 2,
utils.SessionSCosts: 3,
utils.CDRs: 2,
utils.TpFilters: 1,
utils.TpThresholds: 1,
utils.TpRoutes: 1,
utils.TpStats: 1,
utils.TpResources: 1,
utils.TpResource: 1,
utils.TpChargers: 1,
utils.TpDispatchers: 1,
utils.TpRateProfiles: 1,
utils.TpActionProfiles: 1,
}
}
// CurrentAllDBVersions returns the both DataDB and StorDB versions
// CurrentAllDBVersions returns the both DataDB
func CurrentAllDBVersions() Versions {
dataDBVersions := CurrentDataDBVersions()
storDBVersions := CurrentStorDBVersions()
allVersions := make(Versions)
for k, v := range dataDBVersions {
allVersions[k] = v
}
for k, v := range storDBVersions {
allVersions[k] = v
}
return allVersions
}
@@ -191,14 +156,9 @@ func CurrentAllDBVersions() Versions {
func CurrentDBVersions(storType string, isDataDB bool) Versions {
switch storType {
case utils.Mongo:
if isDataDB {
return CurrentDataDBVersions()
}
return CurrentStorDBVersions()
return CurrentDataDBVersions()
case utils.Internal:
return CurrentAllDBVersions()
case utils.Postgres, utils.MySQL:
return CurrentStorDBVersions()
case utils.Redis:
return CurrentDataDBVersions()
}

View File

@@ -67,10 +67,6 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) {
return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(),
fmt.Sprintf("error: <%s> when seting versions for DataDB", err.Error())), nil
}
if err != nil {
return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(),
fmt.Sprintf("error: <%s> when seting versions for StorDB", err.Error())), nil
}
case utils.MetaEnsureIndexes:
if m.dmOut.DataManager().DataDB().GetStorageType() == utils.Mongo {

View File

@@ -1,30 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"github.com/cgrates/cgrates/engine"
)
type MigratorStorDB interface {
createV1SMCosts() (err error)
renameV1SMCosts() (err error)
StorDB() engine.StorDB
close()
}

View File

@@ -1,54 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
func newInternalStorDBMigrator(stor engine.StorDB) (iDBMig *internalStorDBMigrator) {
return &internalStorDBMigrator{
storDB: &stor,
iDB: stor.(*engine.InternalDB),
}
}
type internalStorDBMigrator struct {
storDB *engine.StorDB
iDB *engine.InternalDB
dataKeys []string
qryIdx *int
}
func (iDBMig *internalStorDBMigrator) close() {}
func (iDBMig *internalStorDBMigrator) StorDB() engine.StorDB {
return *iDBMig.storDB
}
//SMCost methods
//rename
func (iDBMig *internalStorDBMigrator) renameV1SMCosts() (err error) {
return utils.ErrNotImplemented
}
func (iDBMig *internalStorDBMigrator) createV1SMCosts() (err error) {
return utils.ErrNotImplemented
}

View File

@@ -1,65 +0,0 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package migrator
import (
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
func newMongoStorDBMigrator(stor engine.StorDB) (mgoMig *mongoStorDBMigrator) {
return &mongoStorDBMigrator{
storDB: &stor,
mgoDB: stor.(*engine.MongoStorage),
cursor: nil,
}
}
type mongoStorDBMigrator struct {
storDB *engine.StorDB
mgoDB *engine.MongoStorage
cursor *mongo.Cursor
}
func (mgoMig *mongoStorDBMigrator) close() {
mgoMig.mgoDB.Close()
}
func (mgoMig *mongoStorDBMigrator) StorDB() engine.StorDB {
return *mgoMig.storDB
}
//SMCost methods
//rename
func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) {
if err = v1ms.mgoDB.DB().Collection(utils.OldSMCosts).Drop(v1ms.mgoDB.GetContext()); err != nil {
return err
}
return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(),
bson.D{{Key: "create", Value: utils.SessionCostsTBL}}).Err()
}
func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) {
v1ms.mgoDB.DB().Collection(utils.OldSMCosts).Drop(v1ms.mgoDB.GetContext())
v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).Drop(v1ms.mgoDB.GetContext())
return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(),
bson.D{{Key: "create", Value: utils.OldSMCosts}, {Key: "size", Value: 1024}, {Key: "capped", Value: true}}).Err()
}

View File

@@ -23,19 +23,10 @@ import (
"fmt"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
_ "github.com/go-sql-driver/mysql"
)
func newMigratorSQL(stor engine.StorDB) (sqlMig *migratorSQL) {
return &migratorSQL{
storDB: &stor,
sqlStorage: stor.(*engine.SQLStorage),
}
}
type migratorSQL struct {
storDB *engine.StorDB
sqlStorage *engine.SQLStorage
rowIter *sql.Rows
}
@@ -44,15 +35,8 @@ func (sqlMig *migratorSQL) close() {
sqlMig.sqlStorage.Close()
}
func (sqlMig *migratorSQL) StorDB() engine.StorDB {
return *sqlMig.storDB
}
func (mgSQL *migratorSQL) renameV1SMCosts() (err error) {
qry := "RENAME TABLE sm_costs TO session_costs;"
if mgSQL.StorDB().GetStorageType() == utils.Postgres {
qry = "ALTER TABLE sm_costs RENAME TO session_costs"
}
if _, err := mgSQL.sqlStorage.DB.Exec(qry); err != nil {
return err
}
@@ -61,22 +45,7 @@ func (mgSQL *migratorSQL) renameV1SMCosts() (err error) {
func (mgSQL *migratorSQL) createV1SMCosts() (err error) {
qry := fmt.Sprint("CREATE TABLE sm_costs ( id int(11) NOT NULL AUTO_INCREMENT, run_id varchar(64) NOT NULL, origin_host varchar(64) NOT NULL, origin_id varchar(128) NOT NULL, cost_source varchar(64) NOT NULL, `usage` BIGINT NOT NULL, cost_details MEDIUMTEXT, created_at TIMESTAMP NULL,deleted_at TIMESTAMP NULL, PRIMARY KEY (`id`),UNIQUE KEY costid ( run_id),KEY origin_idx (origin_host, origin_id),KEY run_origin_idx (run_id, origin_id),KEY deleted_at_idx (deleted_at));")
if mgSQL.StorDB().GetStorageType() == utils.Postgres {
qry = `
CREATE TABLE sm_costs (
id SERIAL PRIMARY KEY,
run_id VARCHAR(64) NOT NULL,
origin_host VARCHAR(64) NOT NULL,
origin_id VARCHAR(128) NOT NULL,
cost_source VARCHAR(64) NOT NULL,
usage BIGINT NOT NULL,
cost_details jsonb,
created_at TIMESTAMP WITH TIME ZONE,
deleted_at TIMESTAMP WITH TIME ZONE NULL,
UNIQUE ( run_id)
);
`
}
if _, err := mgSQL.sqlStorage.DB.Exec("DROP TABLE IF EXISTS session_costs;"); err != nil {
return err
}

View File

@@ -82,11 +82,6 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF
return
}
// apiService.stopChan = make(chan struct{})
// storDBChan := make(chan engine.StorDB, 1)
// apiService.storDB.RegisterSyncChan(storDBChan)
// stordb := <-storDBChan
apiService.Lock()
defer apiService.Unlock()

View File

@@ -75,7 +75,6 @@ func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.Wai
utils.SessionS: new(sync.WaitGroup),
utils.SIPAgent: new(sync.WaitGroup),
utils.StatS: new(sync.WaitGroup),
utils.StorDB: new(sync.WaitGroup),
utils.ThresholdS: new(sync.WaitGroup),
utils.ActionS: new(sync.WaitGroup),
utils.AccountS: new(sync.WaitGroup),

View File

@@ -45,13 +45,8 @@ var (
CacheActionProfilesFilterIndexes, CacheAccountsFilterIndexes, CacheReverseFilterIndexes,
CacheAccounts})
storDBPartition = NewStringSet([]string{
CacheTBLTPResources, CacheTBLTPStats, CacheTBLTPThresholds, CacheTBLTPFilters, CacheSessionCostsTBL, CacheCDRsTBL,
CacheTBLTPRoutes, CacheTBLTPAttributes, CacheTBLTPChargers, CacheTBLTPDispatchers,
CacheTBLTPDispatcherHosts, CacheTBLTPRateProfiles, CacheTBLTPActionProfiles, CacheTBLTPAccounts, CacheVersions})
// CachePartitions enables creation of cache partitions
CachePartitions = JoinStringSet(extraDBPartition, DataDBPartitions /*,storDBPartition*/)
CachePartitions = JoinStringSet(extraDBPartition, DataDBPartitions)
CacheInstanceToPrefix = map[string]string{
CacheResourceProfiles: ResourceProfilesPrefix,
@@ -114,23 +109,6 @@ var (
CacheAccounts: CacheAccountsFilterIndexes,
}
CacheStorDBPartitions = map[string]string{
TBLTPResources: CacheTBLTPResources,
TBLTPStats: CacheTBLTPStats,
TBLTPThresholds: CacheTBLTPThresholds,
TBLTPFilters: CacheTBLTPFilters,
SessionCostsTBL: CacheSessionCostsTBL,
CDRsTBL: CacheCDRsTBL,
TBLTPRoutes: CacheTBLTPRoutes,
TBLTPAttributes: CacheTBLTPAttributes,
TBLTPChargers: CacheTBLTPChargers,
TBLTPDispatchers: CacheTBLTPDispatchers,
TBLTPDispatcherHosts: CacheTBLTPDispatcherHosts,
TBLTPRateProfiles: CacheTBLTPRateProfiles,
TBLTPActionProfiles: CacheTBLTPActionProfiles,
TBLTPAccounts: CacheTBLTPAccounts,
}
// ProtectedSFlds are the fields that sessions should not alter
ProtectedSFlds = NewStringSet([]string{OriginHost, OriginID, Usage})
@@ -352,7 +330,6 @@ const (
MetaDumpToJSON = "*dump_to_json"
NonTransactional = ""
DataDB = "data_db"
StorDB = "stor_db"
NotFoundCaps = "NOT_FOUND"
ServerErrorCaps = "SERVER_ERROR"
MandatoryIEMissingCaps = "MANDATORY_IE_MISSING"
@@ -490,7 +467,6 @@ const (
Subscribers = "Subscribers"
//Destinations = "Destinations"
MetaSubscribers = "*subscribers"
MetaStorDB = "*stordb"
MetaDataDB = "*datadb"
MetaWeight = "*weight"
MetaLC = "*lc"
@@ -1188,11 +1164,11 @@ const (
AdminSv1GetFiltersCount = "AdminSv1.GetFiltersCount"
AdminSv1GetFilters = "AdminSv1.GetFilters"
// APIerSv1SetDataDBVersions = "APIerSv1.SetDataDBVersions"
// APIerSv1SetStorDBVersions = "APIerSv1.SetStorDBVersions"
// APIerSv1GetActions = "APIerSv1.GetActions"
// APIerSv1GetDataDBVersions = "APIerSv1.GetDataDBVersions"
// APIerSv1GetStorDBVersions = "APIerSv1.GetStorDBVersions"
// APIerSv1GetCDRs = "APIerSv1.GetCDRs"
// APIerSv1GetTPActions = "APIerSv1.GetTPActions"
// APIerSv1GetTPAttributeProfile = "APIerSv1.GetTPAttributeProfile"
@@ -1263,7 +1239,7 @@ const (
// APIerSv1 TP APIs
const (
// APIerSv1LoadTariffPlanFromStorDb = "APIerSv1.LoadTariffPlanFromStorDb"
// APIerSv1RemoveTPFromFolder = "APIerSv1.RemoveTPFromFolder"
)
@@ -1678,20 +1654,8 @@ const (
// storDB
CacheTBLTPResources = "*tp_resources"
CacheTBLTPStats = "*tp_stats"
CacheTBLTPThresholds = "*tp_thresholds"
CacheTBLTPFilters = "*tp_filters"
CacheSessionCostsTBL = "*session_costs"
CacheCDRsTBL = "*cdrs"
CacheTBLTPRoutes = "*tp_routes"
CacheTBLTPAttributes = "*tp_attributes"
CacheTBLTPChargers = "*tp_chargers"
CacheTBLTPDispatchers = "*tp_dispatcher_profiles"
CacheTBLTPDispatcherHosts = "*tp_dispatcher_hosts"
CacheTBLTPRateProfiles = "*tp_rate_profiles"
CacheTBLTPActionProfiles = "*tp_action_profiles"
CacheTBLTPAccounts = "*tp_accounts"
CacheSessionCostsTBL = "*session_costs"
CacheCDRsTBL = "*cdrs"
)
// Prefix for indexing
@@ -1728,16 +1692,6 @@ const (
GoogleCredentialsFileName = "credentials.json"
)
// StorDB
var (
PostgressSSLModeDisable = "disable"
PostgressSSLModeAllow = "allow"
PostgressSSLModePrefer = "prefer"
PostgressSSLModeRequire = "require"
PostgressSSLModeVerifyCa = "verify-ca"
PostgressSSLModeVerifyFull = "verify-full"
)
// GeneralCfg
const (
NodeIDCfg = "node_id"
@@ -2171,13 +2125,6 @@ const (
OutDataDBPasswordCfg = "out_datadb_password"
OutDataDBEncodingCfg = "out_datadb_encoding"
OutDataDBRedisSentinel = "out_redis_sentinel"
OutStorDBTypeCfg = "out_stordb_type"
OutStorDBHostCfg = "out_stordb_host"
OutStorDBPortCfg = "out_stordb_port"
OutStorDBNameCfg = "out_stordb_name"
OutStorDBUserCfg = "out_stordb_user"
OutStorDBPasswordCfg = "out_stordb_password"
OutStorDBOptsCfg = "out_stordb_opts"
OutDataDBOptsCfg = "out_datadb_opts"
UsersFiltersCfg = "users_filters"
)
@@ -2630,20 +2577,11 @@ const (
CpuPathCgr = "cpu.prof"
//Cgr loader
CgrLoader = "cgr-loader"
StorDBTypeCgr = "stordb_type"
StorDBHostCgr = "stordb_host"
StorDBPortCgr = "stordb_port"
StorDBNameCgr = "stordb_name"
StorDBUserCgr = "stordb_user"
StorDBPasswdCgr = "stordb_passwd"
CachingArgCgr = "caching"
FieldSepCgr = "field_sep"
ImportIDCgr = "import_id"
DisableReverseCgr = "disable_reverse_mappings"
FlushStorDB = "flush_stordb"
RemoveCgr = "remove"
FromStorDBCgr = "from_stordb"
ToStorDBcgr = "to_stordb"
CacheSAddress = "caches_address"
SchedulerAddress = "scheduler_address"
//Cgr migrator

View File

@@ -24,7 +24,7 @@ import (
"strings"
)
// NewDynamicWeightsFromString creates a DynamicWeight list based on the string received from .csv/StorDB
// NewDynamicWeightsFromString creates a DynamicWeight list based on the string received from .csv
func NewDynamicWeightsFromString(s, dWSep, fltrSep string) (dWs DynamicWeights, err error) {
if len(s) == 0 {
return DynamicWeights{{}}, nil