diff --git a/cmd/cgr-loader/cgr-loader.go b/cmd/cgr-loader/cgr-loader.go index b0f41c0a2..422347518 100755 --- a/cmd/cgr-loader/cgr-loader.go +++ b/cmd/cgr-loader/cgr-loader.go @@ -19,7 +19,6 @@ along with this program. If not, see package main import ( - "errors" "flag" "fmt" "log" @@ -246,20 +245,6 @@ func loadConfig() (ldrCfg *config.CGRConfig) { return } -func importData(cfg *config.CGRConfig) (err error) { - if cfg.LoaderCgrCfg().TpID == utils.EmptyString { - return errors.New("TPid required") - } - csvImporter := engine.TPCSVImporter{ - TPid: cfg.LoaderCgrCfg().TpID, - DirPath: *dataPath, - Sep: cfg.LoaderCgrCfg().FieldSeparator, - Verbose: *verbose, - ImportID: *importID, - } - return csvImporter.Run() -} - func getLoader(cfg *config.CGRConfig) (loader engine.LoadReader, err error) { if gprefix := utils.MetaGoogleAPI + utils.ConcatenatedKeySep; strings.HasPrefix(*dataPath, gprefix) { // Default load from csv files to dataDb diff --git a/config/accountscfg.go b/config/accountscfg.go index 75b959ed9..b9c44b9aa 100644 --- a/config/accountscfg.go +++ b/config/accountscfg.go @@ -139,17 +139,11 @@ func (acS *AccountSCfg) loadFromJSONCfg(jsnCfg *AccountSJsonCfg) (err error) { // AsMapInterface returns the config as a map[string]interface{} func (acS AccountSCfg) AsMapInterface(string) interface{} { - opts := map[string]interface{}{ - utils.MetaProfileIDs: acS.Opts.ProfileIDs, - utils.MetaUsage: acS.Opts.Usage, - utils.MetaProfileIgnoreFilters: acS.Opts.ProfileIgnoreFilters, - } mp := map[string]interface{}{ utils.EnabledCfg: acS.Enabled, utils.IndexedSelectsCfg: acS.IndexedSelects, utils.NestedFieldsCfg: acS.NestedFields, utils.MaxIterations: acS.MaxIterations, - utils.OptsCfg: opts, } if acS.AttributeSConns != nil { mp[utils.AttributeSConnsCfg] = getInternalJSONConns(acS.AttributeSConns) @@ -160,21 +154,6 @@ func (acS AccountSCfg) AsMapInterface(string) interface{} { if acS.ThresholdSConns != nil { mp[utils.ThresholdSConnsCfg] = getInternalJSONConns(acS.ThresholdSConns) } - if acS.StringIndexedFields != nil { - mp[utils.StringIndexedFieldsCfg] = utils.CloneStringSlice(*acS.StringIndexedFields) - } - if acS.PrefixIndexedFields != nil { - mp[utils.PrefixIndexedFieldsCfg] = utils.CloneStringSlice(*acS.PrefixIndexedFields) - } - if acS.SuffixIndexedFields != nil { - mp[utils.SuffixIndexedFieldsCfg] = utils.CloneStringSlice(*acS.SuffixIndexedFields) - } - if acS.ExistsIndexedFields != nil { - mp[utils.ExistsIndexedFieldsCfg] = utils.CloneStringSlice(*acS.ExistsIndexedFields) - } - if acS.NotExistsIndexedFields != nil { - mp[utils.NotExistsIndexedFieldsCfg] = utils.CloneStringSlice(*acS.NotExistsIndexedFields) - } if acS.MaxUsage != nil { mp[utils.MaxUsage] = acS.MaxUsage.String() } diff --git a/console/load_tp_from_stordb.go b/console/load_tp_from_stordb.go deleted file mode 100644 index dec32b8f7..000000000 --- a/console/load_tp_from_stordb.go +++ /dev/null @@ -1,67 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package console - -/* -import ( - "github.com/cgrates/cgrates/utils" -) - -func init() { - c := &LoadTpFromStorDb{ - name: "load_tp_from_stordb", - rpcMethod: utils.APIerSv1LoadTariffPlanFromStorDb, - } - commands[c.Name()] = c - c.CommandExecuter = &CommandExecuter{c} -} - -// Commander implementation -type LoadTpFromStorDb struct { - name string - rpcMethod string - rpcParams *v1.AttrLoadTpFromStorDb - rpcResult string - *CommandExecuter -} - -func (self *LoadTpFromStorDb) Name() string { - return self.name -} - -func (self *LoadTpFromStorDb) RpcMethod() string { - return self.rpcMethod -} - -func (self *LoadTpFromStorDb) RpcParams(reset bool) interface{} { - if reset || self.rpcParams == nil { - self.rpcParams = &v1.AttrLoadTpFromStorDb{} - } - return self.rpcParams -} - -func (self *LoadTpFromStorDb) PostprocessRpcParams() error { - return nil -} - -func (self *LoadTpFromStorDb) RpcResult() interface{} { - var s string - return &s -} -*/ diff --git a/console/set_stordb_versions.go b/console/set_stordb_versions.go deleted file mode 100644 index 7b81a3a56..000000000 --- a/console/set_stordb_versions.go +++ /dev/null @@ -1,62 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package console - -/* -func init() { - c := &CmdSetStorDBVersions{ - name: "set_stordb_versions", - rpcMethod: utils.APIerSv1SetStorDBVersions, - rpcParams: &v1.SetVersionsArg{}, - } - commands[c.Name()] = c - c.CommandExecuter = &CommandExecuter{c} -} - -type CmdSetStorDBVersions struct { - name string - rpcMethod string - rpcParams *v1.SetVersionsArg - *CommandExecuter -} - -func (self *CmdSetStorDBVersions) Name() string { - return self.name -} - -func (self *CmdSetStorDBVersions) RpcMethod() string { - return self.rpcMethod -} - -func (self *CmdSetStorDBVersions) RpcParams(reset bool) interface{} { - if reset || self.rpcParams == nil { - self.rpcParams = &v1.SetVersionsArg{} - } - return self.rpcParams -} - -func (self *CmdSetStorDBVersions) PostprocessRpcParams() error { - return nil -} - -func (self *CmdSetStorDBVersions) RpcResult() interface{} { - var atr string - return &atr -} -*/ diff --git a/console/stordb_versions.go b/console/stordb_versions.go deleted file mode 100644 index 1c14429d1..000000000 --- a/console/stordb_versions.go +++ /dev/null @@ -1,68 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package console - -import ( - "github.com/cgrates/cgrates/engine" -) - -func init() { - c := &CmdGetStorDBVersions{ - name: "stordb_versions", - // rpcMethod: utils.APIerSv1GetStorDBVersions, - } - commands[c.Name()] = c - c.CommandExecuter = &CommandExecuter{c} -} - -// Commander implementation -type CmdGetStorDBVersions struct { - name string - rpcMethod string - rpcParams *EmptyWrapper - *CommandExecuter -} - -func (self *CmdGetStorDBVersions) Name() string { - return self.name -} - -func (self *CmdGetStorDBVersions) RpcMethod() string { - return self.rpcMethod -} - -func (self *CmdGetStorDBVersions) RpcParams(reset bool) interface{} { - if reset || self.rpcParams == nil { - self.rpcParams = &EmptyWrapper{} - } - return self.rpcParams -} - -func (self *CmdGetStorDBVersions) PostprocessRpcParams() error { - return nil -} - -func (self *CmdGetStorDBVersions) RpcResult() interface{} { - s := engine.Versions{} - return &s -} - -func (self *CmdGetStorDBVersions) ClientArgs() (args []string) { - return -} diff --git a/engine/storage_interface.go b/engine/storage_interface.go index e4ef88c87..1352dde26 100644 --- a/engine/storage_interface.go +++ b/engine/storage_interface.go @@ -106,12 +106,6 @@ type DataDBDriver interface { config.ConfigDB } -type StorDB interface { - Storage - LoadReader - LoadWriter -} - type LoadStorage interface { Storage LoadReader diff --git a/engine/storage_internal_datadb.go b/engine/storage_internal_datadb.go index 227067fee..c52a64b3d 100644 --- a/engine/storage_internal_datadb.go +++ b/engine/storage_internal_datadb.go @@ -32,7 +32,7 @@ import ( "github.com/cgrates/cgrates/utils" ) -// InternalDB is used as a DataDB and a StorDB +// InternalDB is used as a DataDB type InternalDB struct { stringIndexedFields []string prefixIndexedFields []string @@ -63,20 +63,6 @@ func NewInternalDB(stringIndexedFields, prefixIndexedFields []string, } } -// SetStringIndexedFields set the stringIndexedFields, used at StorDB reload (is thread safe) -func (iDB *InternalDB) SetStringIndexedFields(stringIndexedFields []string) { - iDB.indexedFieldsMutex.Lock() - iDB.stringIndexedFields = stringIndexedFields - iDB.indexedFieldsMutex.Unlock() -} - -// SetPrefixIndexedFields set the prefixIndexedFields, used at StorDB reload (is thread safe) -func (iDB *InternalDB) SetPrefixIndexedFields(prefixIndexedFields []string) { - iDB.indexedFieldsMutex.Lock() - iDB.prefixIndexedFields = prefixIndexedFields - iDB.indexedFieldsMutex.Unlock() -} - // Close only to implement Storage interface func (iDB *InternalDB) Close() {} diff --git a/engine/storage_internal_stordb.go b/engine/storage_internal_stordb.go deleted file mode 100644 index ccc6a4a9c..000000000 --- a/engine/storage_internal_stordb.go +++ /dev/null @@ -1,479 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package engine - -import ( - "strings" - - "github.com/cgrates/cgrates/utils" -) - -// GetTpIds implements LoadReader interface -func (iDB *InternalDB) GetTpIds(colName string) (ids []string, err error) { - tpIDs := make(utils.StringSet) - if colName == utils.EmptyString { // if colName is empty we need to parse all partitions - for _, conNm := range utils.CacheStorDBPartitions { // iterate through all columns - for _, key := range iDB.db.GetItemIDs(conNm, utils.EmptyString) { - tpIDs.Add(strings.Split(key, utils.InInFieldSep)[0]) - } - } - } else { - for _, key := range iDB.db.GetItemIDs(utils.CacheStorDBPartitions[colName], utils.EmptyString) { - tpIDs.Add(strings.Split(key, utils.InInFieldSep)[0]) - } - } - return tpIDs.AsSlice(), nil -} - -func (iDB *InternalDB) GetTpTableIds(tpid, table string, distinct []string, - filters map[string]string, paginator *utils.PaginatorWithSearch) (ids []string, err error) { - fullIDs := iDB.db.GetItemIDs(utils.CacheStorDBPartitions[table], tpid) - idSet := make(utils.StringSet) - for _, fullID := range fullIDs { - idSet.Add(fullID[len(tpid)+1:]) - } - ids = idSet.AsSlice() - return -} - -func (iDB *InternalDB) GetTPResources(tpid, tenant, id string) (resources []*utils.TPResourceProfile, err error) { - key := tpid - if tenant != utils.EmptyString { - key += utils.ConcatenatedKeySep + tenant - } - if id != utils.EmptyString { - key += utils.ConcatenatedKeySep + id - } - ids := iDB.db.GetItemIDs(utils.CacheTBLTPResources, key) - for _, id := range ids { - x, ok := iDB.db.Get(utils.CacheTBLTPResources, id) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - resources = append(resources, x.(*utils.TPResourceProfile)) - - } - if len(resources) == 0 { - return nil, utils.ErrNotFound - } - return -} - -func (iDB *InternalDB) GetTPStats(tpid, tenant, id string) (stats []*utils.TPStatProfile, err error) { - key := tpid - if tenant != utils.EmptyString { - key += utils.ConcatenatedKeySep + tenant - } - if id != utils.EmptyString { - key += utils.ConcatenatedKeySep + id - } - ids := iDB.db.GetItemIDs(utils.CacheTBLTPStats, key) - for _, id := range ids { - x, ok := iDB.db.Get(utils.CacheTBLTPStats, id) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - stats = append(stats, x.(*utils.TPStatProfile)) - - } - if len(stats) == 0 { - return nil, utils.ErrNotFound - } - return -} - -func (iDB *InternalDB) GetTPThresholds(tpid, tenant, id string) (ths []*utils.TPThresholdProfile, err error) { - key := tpid - if tenant != utils.EmptyString { - key += utils.ConcatenatedKeySep + tenant - } - if id != utils.EmptyString { - key += utils.ConcatenatedKeySep + id - } - ids := iDB.db.GetItemIDs(utils.CacheTBLTPThresholds, key) - for _, id := range ids { - x, ok := iDB.db.Get(utils.CacheTBLTPThresholds, id) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - ths = append(ths, x.(*utils.TPThresholdProfile)) - - } - if len(ths) == 0 { - return nil, utils.ErrNotFound - } - return -} - -func (iDB *InternalDB) GetTPFilters(tpid, tenant, id string) (fltrs []*utils.TPFilterProfile, err error) { - key := tpid - if tenant != utils.EmptyString { - key += utils.ConcatenatedKeySep + tenant - } - if id != utils.EmptyString { - key += utils.ConcatenatedKeySep + id - } - ids := iDB.db.GetItemIDs(utils.CacheTBLTPFilters, key) - for _, id := range ids { - x, ok := iDB.db.Get(utils.CacheTBLTPFilters, id) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - fltrs = append(fltrs, x.(*utils.TPFilterProfile)) - - } - if len(fltrs) == 0 { - return nil, utils.ErrNotFound - } - return -} - -func (iDB *InternalDB) GetTPRoutes(tpid, tenant, id string) (supps []*utils.TPRouteProfile, err error) { - key := tpid - if tenant != utils.EmptyString { - key += utils.ConcatenatedKeySep + tenant - } - if id != utils.EmptyString { - key += utils.ConcatenatedKeySep + id - } - ids := iDB.db.GetItemIDs(utils.CacheTBLTPRoutes, key) - for _, id := range ids { - x, ok := iDB.db.Get(utils.CacheTBLTPRoutes, id) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - supps = append(supps, x.(*utils.TPRouteProfile)) - - } - if len(supps) == 0 { - return nil, utils.ErrNotFound - } - return -} - -func (iDB *InternalDB) GetTPAttributes(tpid, tenant, id string) (attrs []*utils.TPAttributeProfile, err error) { - key := tpid - if tenant != utils.EmptyString { - key += utils.ConcatenatedKeySep + tenant - } - if id != utils.EmptyString { - key += utils.ConcatenatedKeySep + id - } - ids := iDB.db.GetItemIDs(utils.CacheTBLTPAttributes, key) - for _, id := range ids { - x, ok := iDB.db.Get(utils.CacheTBLTPAttributes, id) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - attrs = append(attrs, x.(*utils.TPAttributeProfile)) - - } - if len(attrs) == 0 { - return nil, utils.ErrNotFound - } - return -} - -func (iDB *InternalDB) GetTPChargers(tpid, tenant, id string) (cpps []*utils.TPChargerProfile, err error) { - key := tpid - if tenant != utils.EmptyString { - key += utils.ConcatenatedKeySep + tenant - } - if id != utils.EmptyString { - key += utils.ConcatenatedKeySep + id - } - ids := iDB.db.GetItemIDs(utils.CacheTBLTPChargers, key) - for _, id := range ids { - x, ok := iDB.db.Get(utils.CacheTBLTPChargers, id) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - cpps = append(cpps, x.(*utils.TPChargerProfile)) - - } - if len(cpps) == 0 { - return nil, utils.ErrNotFound - } - return -} - -func (iDB *InternalDB) GetTPDispatcherProfiles(tpid, tenant, id string) (dpps []*utils.TPDispatcherProfile, err error) { - key := tpid - if tenant != utils.EmptyString { - key += utils.ConcatenatedKeySep + tenant - } - if id != utils.EmptyString { - key += utils.ConcatenatedKeySep + id - } - ids := iDB.db.GetItemIDs(utils.CacheTBLTPDispatchers, key) - for _, id := range ids { - x, ok := iDB.db.Get(utils.CacheTBLTPDispatchers, id) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - dpps = append(dpps, x.(*utils.TPDispatcherProfile)) - - } - if len(dpps) == 0 { - return nil, utils.ErrNotFound - } - return -} - -func (iDB *InternalDB) GetTPDispatcherHosts(tpid, tenant, id string) (dpps []*utils.TPDispatcherHost, err error) { - key := tpid - if tenant != utils.EmptyString { - key += utils.ConcatenatedKeySep + tenant - } - if id != utils.EmptyString { - key += utils.ConcatenatedKeySep + id - } - ids := iDB.db.GetItemIDs(utils.CacheTBLTPDispatcherHosts, key) - for _, id := range ids { - x, ok := iDB.db.Get(utils.CacheTBLTPDispatcherHosts, id) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - dpps = append(dpps, x.(*utils.TPDispatcherHost)) - - } - if len(dpps) == 0 { - return nil, utils.ErrNotFound - } - return -} - -func (iDB *InternalDB) GetTPRateProfiles(tpid, tenant, id string) (tpPrfs []*utils.TPRateProfile, err error) { - key := tpid - if tenant != utils.EmptyString { - key += utils.ConcatenatedKeySep + tenant - } - if id != utils.EmptyString { - key += utils.ConcatenatedKeySep + id - } - ids := iDB.db.GetItemIDs(utils.CacheTBLTPRateProfiles, key) - for _, id := range ids { - x, ok := iDB.db.Get(utils.CacheTBLTPRateProfiles, id) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - tpPrfs = append(tpPrfs, x.(*utils.TPRateProfile)) - } - if len(tpPrfs) == 0 { - return nil, utils.ErrNotFound - } - return -} - -func (iDB *InternalDB) GetTPActionProfiles(tpid, tenant, id string) (tpPrfs []*utils.TPActionProfile, err error) { - key := tpid - if tenant != utils.EmptyString { - key += utils.ConcatenatedKeySep + tenant - } - if id != utils.EmptyString { - key += utils.ConcatenatedKeySep + id - } - ids := iDB.db.GetItemIDs(utils.CacheTBLTPActionProfiles, key) - for _, id := range ids { - x, ok := iDB.db.Get(utils.CacheTBLTPActionProfiles, id) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - tpPrfs = append(tpPrfs, x.(*utils.TPActionProfile)) - } - if len(tpPrfs) == 0 { - return nil, utils.ErrNotFound - } - return -} - -func (iDB *InternalDB) GetTPAccounts(tpid, tenant, id string) (tpPrfs []*utils.TPAccount, err error) { - key := tpid - if tenant != utils.EmptyString { - key += utils.ConcatenatedKeySep + tenant - } - if id != utils.EmptyString { - key += utils.ConcatenatedKeySep + id - } - ids := iDB.db.GetItemIDs(utils.CacheTBLTPAccounts, key) - for _, id := range ids { - x, ok := iDB.db.Get(utils.CacheTBLTPAccounts, id) - if !ok || x == nil { - return nil, utils.ErrNotFound - } - tpPrfs = append(tpPrfs, x.(*utils.TPAccount)) - } - if len(tpPrfs) == 0 { - return nil, utils.ErrNotFound - } - return -} - -//implement LoadWriter interface -func (iDB *InternalDB) RemTpData(table, tpid string, args map[string]string) (err error) { - if table == utils.EmptyString { - return iDB.Flush(utils.EmptyString) - } - key := tpid - if args != nil { - if tag, has := args["tag"]; has { - key += utils.ConcatenatedKeySep + tag - } else if id, has := args["id"]; has { - key += utils.ConcatenatedKeySep + args["tenant"] + - utils.ConcatenatedKeySep + id - } - } - ids := iDB.db.GetItemIDs(utils.CacheStorDBPartitions[table], key) - for _, id := range ids { - iDB.db.Remove(utils.CacheStorDBPartitions[table], id, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} - -func (iDB *InternalDB) SetTPResources(resources []*utils.TPResourceProfile) (err error) { - if len(resources) == 0 { - return nil - } - for _, resource := range resources { - iDB.db.Set(utils.CacheTBLTPResources, utils.ConcatenatedKey(resource.TPid, resource.Tenant, resource.ID), resource, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} -func (iDB *InternalDB) SetTPStats(stats []*utils.TPStatProfile) (err error) { - if len(stats) == 0 { - return nil - } - for _, stat := range stats { - iDB.db.Set(utils.CacheTBLTPStats, utils.ConcatenatedKey(stat.TPid, stat.Tenant, stat.ID), stat, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} -func (iDB *InternalDB) SetTPThresholds(thresholds []*utils.TPThresholdProfile) (err error) { - if len(thresholds) == 0 { - return nil - } - - for _, threshold := range thresholds { - iDB.db.Set(utils.CacheTBLTPThresholds, utils.ConcatenatedKey(threshold.TPid, threshold.Tenant, threshold.ID), threshold, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} -func (iDB *InternalDB) SetTPFilters(filters []*utils.TPFilterProfile) (err error) { - if len(filters) == 0 { - return nil - } - - for _, filter := range filters { - iDB.db.Set(utils.CacheTBLTPFilters, utils.ConcatenatedKey(filter.TPid, filter.Tenant, filter.ID), filter, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} - -func (iDB *InternalDB) SetTPRoutes(routes []*utils.TPRouteProfile) (err error) { - if len(routes) == 0 { - return nil - } - for _, route := range routes { - iDB.db.Set(utils.CacheTBLTPRoutes, utils.ConcatenatedKey(route.TPid, route.Tenant, route.ID), route, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} - -func (iDB *InternalDB) SetTPAttributes(attributes []*utils.TPAttributeProfile) (err error) { - if len(attributes) == 0 { - return nil - } - - for _, attribute := range attributes { - iDB.db.Set(utils.CacheTBLTPAttributes, utils.ConcatenatedKey(attribute.TPid, attribute.Tenant, attribute.ID), attribute, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} -func (iDB *InternalDB) SetTPChargers(cpps []*utils.TPChargerProfile) (err error) { - if len(cpps) == 0 { - return nil - } - - for _, cpp := range cpps { - iDB.db.Set(utils.CacheTBLTPChargers, utils.ConcatenatedKey(cpp.TPid, cpp.Tenant, cpp.ID), cpp, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} -func (iDB *InternalDB) SetTPDispatcherProfiles(dpps []*utils.TPDispatcherProfile) (err error) { - if len(dpps) == 0 { - return nil - } - - for _, dpp := range dpps { - iDB.db.Set(utils.CacheTBLTPDispatchers, utils.ConcatenatedKey(dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} -func (iDB *InternalDB) SetTPDispatcherHosts(dpps []*utils.TPDispatcherHost) (err error) { - if len(dpps) == 0 { - return nil - } - for _, dpp := range dpps { - iDB.db.Set(utils.CacheTBLTPDispatcherHosts, utils.ConcatenatedKey(dpp.TPid, dpp.Tenant, dpp.ID), dpp, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} - -func (iDB *InternalDB) SetTPRateProfiles(tpPrfs []*utils.TPRateProfile) (err error) { - if len(tpPrfs) == 0 { - return nil - } - for _, tpPrf := range tpPrfs { - iDB.db.Set(utils.CacheTBLTPRateProfiles, utils.ConcatenatedKey(tpPrf.TPid, tpPrf.Tenant, tpPrf.ID), tpPrf, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} - -func (iDB *InternalDB) SetTPActionProfiles(tpPrfs []*utils.TPActionProfile) (err error) { - if len(tpPrfs) == 0 { - return nil - } - for _, tpPrf := range tpPrfs { - iDB.db.Set(utils.CacheTBLTPActionProfiles, utils.ConcatenatedKey(tpPrf.TPid, tpPrf.Tenant, tpPrf.ID), tpPrf, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} - -func (iDB *InternalDB) SetTPAccounts(tpPrfs []*utils.TPAccount) (err error) { - if len(tpPrfs) == 0 { - return nil - } - for _, tpPrf := range tpPrfs { - iDB.db.Set(utils.CacheTBLTPAccounts, utils.ConcatenatedKey(tpPrf.TPid, tpPrf.Tenant, tpPrf.ID), tpPrf, nil, - cacheCommit(utils.NonTransactional), utils.NonTransactional) - } - return -} diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index bb432d5a4..c6a06c087 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -237,7 +237,7 @@ type MongoStorage struct { ctxTTL time.Duration ctxTTLMutex sync.RWMutex // used for TTL reload db string - storageType string // datadb, stordb + storageType string // datadb ms utils.Marshaler cdrsIndexes []string cnter *utils.Counter @@ -322,13 +322,7 @@ func (ms *MongoStorage) ensureIndexesForCol(col string) (err error) { // exporte if err = ms.enusureIndex(col, true, "id"); err != nil { return } - //StorDB - case utils.TBLTPStats, utils.TBLTPResources, utils.TBLTPDispatchers, - utils.TBLTPDispatcherHosts, utils.TBLTPChargers, - utils.TBLTPRoutes, utils.TBLTPThresholds: - if err = ms.enusureIndex(col, true, "tpid", "id"); err != nil { - return - } + case utils.CDRsTBL: if err = ms.enusureIndex(col, true, MetaOriginLow, RunIDLow, OriginIDLow); err != nil { @@ -376,16 +370,6 @@ func (ms *MongoStorage) EnsureIndexes(cols ...string) (err error) { } } } - if ms.storageType == utils.StorDB { - for _, col := range []string{utils.TBLTPStats, utils.TBLTPResources, utils.TBLTPDispatchers, - utils.TBLTPDispatcherHosts, utils.TBLTPChargers, - utils.TBLTPRoutes, utils.TBLTPThresholds, - utils.CDRsTBL, utils.SessionCostsTBL} { - if err = ms.ensureIndexesForCol(col); err != nil { - return - } - } - } return } diff --git a/engine/storage_mysql.go b/engine/storage_mysql.go index 7f796f670..7b61414f0 100644 --- a/engine/storage_mysql.go +++ b/engine/storage_mysql.go @@ -54,7 +54,6 @@ func NewMySQLStorage(host, port, name, user, password string, return &SQLStorage{ DB: mySQLStorage.DB, db: mySQLStorage.db, - StorDB: mySQLStorage, SQLImpl: mySQLStorage, }, nil } diff --git a/engine/storage_postgres.go b/engine/storage_postgres.go index 096bd342b..9daa7a166 100644 --- a/engine/storage_postgres.go +++ b/engine/storage_postgres.go @@ -19,41 +19,9 @@ along with this program. If not, see package engine import ( - "fmt" - "time" - "github.com/cgrates/cgrates/utils" - "gorm.io/driver/postgres" - "gorm.io/gorm" ) -// NewPostgresStorage returns the posgres storDB -func NewPostgresStorage(host, port, name, user, password, sslmode string, maxConn, maxIdleConn int, connMaxLifetime time.Duration) (*SQLStorage, error) { - connectString := fmt.Sprintf("host=%s port=%s dbname=%s user=%s password=%s sslmode=%s", host, port, name, user, password, sslmode) - db, err := gorm.Open(postgres.Open(connectString), &gorm.Config{AllowGlobalUpdate: true}) - if err != nil { - return nil, err - } - postgressStorage := new(PostgresStorage) - if postgressStorage.DB, err = db.DB(); err != nil { - return nil, err - } - if err = postgressStorage.DB.Ping(); err != nil { - return nil, err - } - postgressStorage.DB.SetMaxIdleConns(maxIdleConn) - postgressStorage.DB.SetMaxOpenConns(maxConn) - postgressStorage.DB.SetConnMaxLifetime(connMaxLifetime) - //db.LogMode(true) - postgressStorage.db = db - return &SQLStorage{ - DB: postgressStorage.DB, - db: postgressStorage.db, - StorDB: postgressStorage, - SQLImpl: postgressStorage, - }, nil -} - type PostgresStorage struct { SQLStorage } @@ -81,22 +49,6 @@ func (poS *PostgresStorage) SetVersions(vrs Versions, overwrite bool) (err error return } -func (poS *PostgresStorage) extraFieldsExistsQry(field string) string { - return fmt.Sprintf(" extra_fields ?'%s'", field) -} - -func (poS *PostgresStorage) extraFieldsValueQry(field, value string) string { - return fmt.Sprintf(" (extra_fields ->> '%s') = '%s'", field, value) -} - -func (poS *PostgresStorage) notExtraFieldsExistsQry(field string) string { - return fmt.Sprintf(" NOT extra_fields ?'%s'", field) -} - -func (poS *PostgresStorage) notExtraFieldsValueQry(field, value string) string { - return fmt.Sprintf(" NOT (extra_fields ?'%s' AND (extra_fields ->> '%s') = '%s')", field, field, value) -} - func (poS *PostgresStorage) GetStorageType() string { return utils.Postgres } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 3e85f95f9..fbdf0940c 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -40,7 +40,6 @@ type SQLImpl interface { type SQLStorage struct { DB *sql.DB db *gorm.DB - StorDB SQLImpl } diff --git a/engine/tpexporter.go b/engine/tpexporter.go deleted file mode 100644 index ce263c94f..000000000 --- a/engine/tpexporter.go +++ /dev/null @@ -1,315 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package engine - -import ( - "archive/zip" - "bytes" - "encoding/csv" - "errors" - "fmt" - "io" - "os" - "path" - "unicode/utf8" - - "github.com/cgrates/cgrates/utils" -) - -func NewTPExporter(storDB LoadStorage, tpID, expPath, fileFormat, sep string, compress bool) (*TPExporter, error) { - if len(tpID) == 0 { - return nil, errors.New("Missing TPid") - } - if utils.CSV != fileFormat { - return nil, errors.New("Unsupported file format") - } - tpExp := &TPExporter{ - storDB: storDB, - tpID: tpID, - exportPath: expPath, - fileFormat: fileFormat, - compress: compress, - cacheBuff: new(bytes.Buffer), - } - runeSep, _ := utf8.DecodeRuneInString(sep) - if runeSep == utf8.RuneError { - return nil, fmt.Errorf("Invalid field separator: %s", sep) - } else { - tpExp.sep = runeSep - } - if compress { - if len(tpExp.exportPath) == 0 { - tpExp.zipWritter = zip.NewWriter(tpExp.cacheBuff) - } else { - if fileOut, err := os.Create(path.Join(tpExp.exportPath, "tpexport.zip")); err != nil { - return nil, err - } else { - tpExp.zipWritter = zip.NewWriter(fileOut) - } - } - } - return tpExp, nil -} - -// Export TariffPlan to a folder -type TPExporter struct { - storDB LoadStorage // StorDb connection handle - tpID string // Load data on this tpid - exportPath string // Directory path to export to - fileFormat string // The file format - sep rune // Separator in the csv file - compress bool // Use ZIP to compress the folder - cacheBuff *bytes.Buffer // Will be written in case of no output folder is specified - zipWritter *zip.Writer // Populated in case of needing to write zipped content - exportedFiles []string -} - -func (tpExp *TPExporter) Run() error { - tpExp.removeFiles() // Make sure we clean the folder before starting with new one - var withError bool - toExportMap := make(map[string][]interface{}) - - storDataResources, err := tpExp.storDB.GetTPResources(tpExp.tpID, "", "") - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpResources)) - withError = true - } - for _, sd := range storDataResources { - sdModels := APItoModelResource(sd) - for _, sdModel := range sdModels { - toExportMap[utils.ResourcesCsv] = append(toExportMap[utils.ResourcesCsv], sdModel) - } - } - - storDataStats, err := tpExp.storDB.GetTPStats(tpExp.tpID, "", "") - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpStats)) - withError = true - } - for _, sd := range storDataStats { - sdModels := APItoModelStats(sd) - for _, sdModel := range sdModels { - toExportMap[utils.StatsCsv] = append(toExportMap[utils.StatsCsv], sdModel) - } - } - - storDataThresholds, err := tpExp.storDB.GetTPThresholds(tpExp.tpID, "", "") - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpThresholds)) - withError = true - } - for _, sd := range storDataThresholds { - sdModels := APItoModelTPThreshold(sd) - for _, sdModel := range sdModels { - toExportMap[utils.ThresholdsCsv] = append(toExportMap[utils.ThresholdsCsv], sdModel) - } - } - - storDataFilters, err := tpExp.storDB.GetTPFilters(tpExp.tpID, "", "") - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpFilters)) - withError = true - } - for _, sd := range storDataFilters { - sdModels := APItoModelTPFilter(sd) - for _, sdModel := range sdModels { - toExportMap[utils.FiltersCsv] = append(toExportMap[utils.FiltersCsv], sdModel) - } - } - - storDataRoutes, err := tpExp.storDB.GetTPRoutes(tpExp.tpID, "", "") - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpRoutes)) - withError = true - } - for _, sd := range storDataRoutes { - sdModels := APItoModelTPRoutes(sd) - for _, sdModel := range sdModels { - toExportMap[utils.RoutesCsv] = append(toExportMap[utils.RoutesCsv], sdModel) - } - } - - storeDataAttributes, err := tpExp.storDB.GetTPAttributes(tpExp.tpID, "", "") - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpAttributes)) - withError = true - } - for _, sd := range storeDataAttributes { - sdModels := APItoModelTPAttribute(sd) - for _, sdModel := range sdModels { - toExportMap[utils.AttributesCsv] = append(toExportMap[utils.AttributesCsv], sdModel) - } - } - - storDataChargers, err := tpExp.storDB.GetTPChargers(tpExp.tpID, "", "") - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpChargers)) - withError = true - } - for _, sd := range storDataChargers { - sdModels := APItoModelTPCharger(sd) - for _, sdModel := range sdModels { - toExportMap[utils.ChargersCsv] = append(toExportMap[utils.ChargersCsv], sdModel) - } - } - - storDataDispatcherProfiles, err := tpExp.storDB.GetTPDispatcherProfiles(tpExp.tpID, "", "") - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpDispatcherProfiles)) - withError = true - } - for _, sd := range storDataDispatcherProfiles { - sdModels := APItoModelTPDispatcherProfile(sd) - for _, sdModel := range sdModels { - toExportMap[utils.DispatcherProfilesCsv] = append(toExportMap[utils.DispatcherProfilesCsv], sdModel) - } - } - - storDataDispatcherHosts, err := tpExp.storDB.GetTPDispatcherHosts(tpExp.tpID, "", "") - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpDispatcherHosts)) - withError = true - } - for _, sd := range storDataDispatcherHosts { - toExportMap[utils.DispatcherHostsCsv] = append(toExportMap[utils.DispatcherHostsCsv], APItoModelTPDispatcherHost(sd)) - } - - storDataRateProfiles, err := tpExp.storDB.GetTPRateProfiles(tpExp.tpID, "", "") - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpRateProfiles)) - withError = true - } - for _, sd := range storDataRateProfiles { - sdModels := APItoModelTPRateProfile(sd) - for _, sdModel := range sdModels { - toExportMap[utils.RatesCsv] = append(toExportMap[utils.RatesCsv], sdModel) - } - } - - storDataActionProfiles, err := tpExp.storDB.GetTPActionProfiles(tpExp.tpID, "", "") - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpActionProfiles)) - withError = true - } - for _, sd := range storDataActionProfiles { - sdModels := APItoModelTPActionProfile(sd) - for _, sdModel := range sdModels { - toExportMap[utils.ActionsCsv] = append(toExportMap[utils.ActionsCsv], sdModel) - } - } - - storDataAccounts, err := tpExp.storDB.GetTPAccounts(tpExp.tpID, "", "") - if err != nil && err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning(fmt.Sprintf("<%s> error: %s, when getting %s from stordb for export", utils.AdminS, err, utils.TpAccounts)) - withError = true - } - for _, sd := range storDataAccounts { - sdModels := APItoModelTPAccount(sd) - for _, sdModel := range sdModels { - toExportMap[utils.AccountsCsv] = append(toExportMap[utils.AccountsCsv], sdModel) - } - } - - if len(toExportMap) == 0 { // if we don't have anything to export we return not found error - return utils.ErrNotFound - } - - for fileName, storData := range toExportMap { - if err := tpExp.writeOut(fileName, storData); err != nil { - tpExp.removeFiles() - return err - } - tpExp.exportedFiles = append(tpExp.exportedFiles, fileName) - } - - if tpExp.compress { - if err := tpExp.zipWritter.Close(); err != nil { - return err - } - } - if withError { // if we export something but have error we return partially executed - return utils.ErrPartiallyExecuted - } - return nil -} - -// Some export did not end up well, remove the files here -func (tpExp *TPExporter) removeFiles() error { - if len(tpExp.exportPath) == 0 { - return nil - } - for _, fileName := range tpExp.exportedFiles { - os.Remove(path.Join(tpExp.exportPath, fileName)) - } - return nil -} - -// General method to write the content out to a file on path or zip archive -func (tpExp *TPExporter) writeOut(fileName string, tpData []interface{}) error { - if len(tpData) == 0 { - return nil - } - var fWriter io.Writer - var writerOut utils.NopFlushWriter - var err error - - if tpExp.compress { - if fWriter, err = tpExp.zipWritter.Create(fileName); err != nil { - return err - } - } else if len(tpExp.exportPath) != 0 { - if f, err := os.Create(path.Join(tpExp.exportPath, fileName)); err != nil { - return err - } else { - fWriter = f - defer f.Close() - } - - } else { - fWriter = new(bytes.Buffer) - } - - switch tpExp.fileFormat { - case utils.CSV: - csvWriter := csv.NewWriter(fWriter) - csvWriter.Comma = tpExp.sep - writerOut = csvWriter - default: - writerOut = utils.NewNopFlushWriter(fWriter) - } - for _, tpItem := range tpData { - record, err := CsvDump(tpItem) - if err != nil { - return err - } - if err := writerOut.Write(record); err != nil { - return err - } - } - writerOut.Flush() // In case of .csv will dump data on hdd - return nil -} - -func (tpExp *TPExporter) ExportStats() *utils.ExportedTPStats { - return &utils.ExportedTPStats{ExportPath: tpExp.exportPath, ExportedFiles: tpExp.exportedFiles, Compressed: tpExp.compress} -} - -func (tpExp *TPExporter) GetCacheBuffer() *bytes.Buffer { - return tpExp.cacheBuff -} diff --git a/engine/tpimporter_csv.go b/engine/tpimporter_csv.go deleted file mode 100644 index 3912a98f3..000000000 --- a/engine/tpimporter_csv.go +++ /dev/null @@ -1,207 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package engine - -import ( - "fmt" - "log" - "os" - - "github.com/cgrates/cgrates/utils" -) - -// Import tariff plan from csv into storDb -type TPCSVImporter struct { - TPid string // Load data on this tpid - StorDB LoadWriter // StorDb connection handle - DirPath string // Directory path to import from - Sep rune // Separator in the csv file - Verbose bool // If true will print a detailed information instead of silently discarding it - ImportID string // Use this to differentiate between imports (eg: when autogenerating fields like RatingProfileID - csvr LoadReader -} - -// Maps csv file to handler which should process it. Defined like this since tests on 1.0.3 were failing on Travis. -// Change it to func(string) error as soon as Travis updates. -var fileHandlers = map[string]func(*TPCSVImporter, string) error{ - utils.ResourcesCsv: (*TPCSVImporter).importResources, - utils.StatsCsv: (*TPCSVImporter).importStats, - utils.ThresholdsCsv: (*TPCSVImporter).importThresholds, - utils.FiltersCsv: (*TPCSVImporter).importFilters, - utils.RoutesCsv: (*TPCSVImporter).importRoutes, - utils.AttributesCsv: (*TPCSVImporter).importAttributeProfiles, - utils.ChargersCsv: (*TPCSVImporter).importChargerProfiles, - utils.DispatcherProfilesCsv: (*TPCSVImporter).importDispatcherProfiles, - utils.DispatcherHostsCsv: (*TPCSVImporter).importDispatcherHosts, - utils.RatesCsv: (*TPCSVImporter).importRateProfiles, - utils.ActionsCsv: (*TPCSVImporter).importActionProfiles, - utils.AccountsCsv: (*TPCSVImporter).importAccounts, -} - -func (tpImp *TPCSVImporter) Run() error { - tpImp.csvr = NewFileCSVStorage(tpImp.Sep, tpImp.DirPath) - files, _ := os.ReadDir(tpImp.DirPath) - var withErrors bool - for _, f := range files { - fHandler, hasName := fileHandlers[f.Name()] - if !hasName { - continue - } - if err := fHandler(tpImp, f.Name()); err != nil { - withErrors = true - utils.Logger.Err(fmt.Sprintf(" Importing file: %s, got error: %s", f.Name(), err.Error())) - } - } - if withErrors { - return utils.ErrPartiallyExecuted - } - return nil -} - -func (tpImp *TPCSVImporter) importResources(fn string) error { - if tpImp.Verbose { - log.Printf("Processing file: <%s> ", fn) - } - rls, err := tpImp.csvr.GetTPResources(tpImp.TPid, "", "") - if err != nil { - return err - } - return tpImp.StorDB.SetTPResources(rls) -} - -func (tpImp *TPCSVImporter) importStats(fn string) error { - if tpImp.Verbose { - log.Printf("Processing file: <%s> ", fn) - } - sts, err := tpImp.csvr.GetTPStats(tpImp.TPid, "", "") - if err != nil { - return err - } - return tpImp.StorDB.SetTPStats(sts) -} - -func (tpImp *TPCSVImporter) importThresholds(fn string) error { - if tpImp.Verbose { - log.Printf("Processing file: <%s> ", fn) - } - sts, err := tpImp.csvr.GetTPThresholds(tpImp.TPid, "", "") - if err != nil { - return err - } - return tpImp.StorDB.SetTPThresholds(sts) -} - -func (tpImp *TPCSVImporter) importFilters(fn string) error { - if tpImp.Verbose { - log.Printf("Processing file: <%s> ", fn) - } - sts, err := tpImp.csvr.GetTPFilters(tpImp.TPid, "", "") - if err != nil { - return err - } - return tpImp.StorDB.SetTPFilters(sts) -} - -func (tpImp *TPCSVImporter) importRoutes(fn string) error { - if tpImp.Verbose { - log.Printf("Processing file: <%s> ", fn) - } - rls, err := tpImp.csvr.GetTPRoutes(tpImp.TPid, "", "") - if err != nil { - return err - } - return tpImp.StorDB.SetTPRoutes(rls) -} - -func (tpImp *TPCSVImporter) importAttributeProfiles(fn string) error { - if tpImp.Verbose { - log.Printf("Processing file: <%s> ", fn) - } - rls, err := tpImp.csvr.GetTPAttributes(tpImp.TPid, "", "") - if err != nil { - return err - } - return tpImp.StorDB.SetTPAttributes(rls) -} - -func (tpImp *TPCSVImporter) importChargerProfiles(fn string) error { - if tpImp.Verbose { - log.Printf("Processing file: <%s> ", fn) - } - rls, err := tpImp.csvr.GetTPChargers(tpImp.TPid, "", "") - if err != nil { - return err - } - return tpImp.StorDB.SetTPChargers(rls) -} - -func (tpImp *TPCSVImporter) importDispatcherProfiles(fn string) error { - if tpImp.Verbose { - log.Printf("Processing file: <%s> ", fn) - } - dpps, err := tpImp.csvr.GetTPDispatcherProfiles(tpImp.TPid, "", "") - if err != nil { - return err - } - return tpImp.StorDB.SetTPDispatcherProfiles(dpps) -} - -func (tpImp *TPCSVImporter) importDispatcherHosts(fn string) error { - if tpImp.Verbose { - log.Printf("Processing file: <%s> ", fn) - } - dpps, err := tpImp.csvr.GetTPDispatcherHosts(tpImp.TPid, "", "") - if err != nil { - return err - } - return tpImp.StorDB.SetTPDispatcherHosts(dpps) -} - -func (tpImp *TPCSVImporter) importRateProfiles(fn string) error { - if tpImp.Verbose { - log.Printf("Processing file: <%s> ", fn) - } - rpps, err := tpImp.csvr.GetTPRateProfiles(tpImp.TPid, "", "") - if err != nil { - return err - } - return tpImp.StorDB.SetTPRateProfiles(rpps) -} - -func (tpImp *TPCSVImporter) importActionProfiles(fn string) error { - if tpImp.Verbose { - log.Printf("Processing file: <%s> ", fn) - } - rpps, err := tpImp.csvr.GetTPActionProfiles(tpImp.TPid, "", "") - if err != nil { - return err - } - return tpImp.StorDB.SetTPActionProfiles(rpps) -} - -func (tpImp *TPCSVImporter) importAccounts(fn string) error { - if tpImp.Verbose { - log.Printf("Processing file: <%s> ", fn) - } - rpps, err := tpImp.csvr.GetTPAccounts(tpImp.TPid, "", "") - if err != nil { - return err - } - return tpImp.StorDB.SetTPAccounts(rpps) -} diff --git a/engine/version.go b/engine/version.go index 76339c0fc..467f3fca5 100644 --- a/engine/version.go +++ b/engine/version.go @@ -34,10 +34,6 @@ var ( utils.RQF: "cgr-migrator -exec=*filters", utils.Routes: "cgr-migrator -exec=*routes", } - storDBVers = map[string]string{ - utils.CostDetails: "cgr-migrator -exec=*cost_details", - utils.SessionSCosts: "cgr-migrator -exec=*sessions_costs", - } allVers map[string]string // init will fill this with a merge of data+stor ) @@ -46,9 +42,6 @@ func init() { for k, v := range dataDBVers { allVers[k] = v } - for k, v := range storDBVers { - allVers[k] = v - } } // Versions will keep trac of various item versions @@ -114,15 +107,10 @@ func (vers Versions) Compare(curent Versions, storType string, isDataDB bool) st var message map[string]string switch storType { case utils.Mongo: - if isDataDB { - message = dataDBVers - } else { - message = storDBVers - } + message = dataDBVers case utils.Internal: message = allVers - case utils.Postgres, utils.MySQL: - message = storDBVers + case utils.Redis: message = dataDBVers } @@ -154,36 +142,13 @@ func CurrentDataDBVersions() Versions { } } -// CurrentStorDBVersions returns the needed StorDB versions -func CurrentStorDBVersions() Versions { - return Versions{ - utils.CostDetails: 2, - utils.SessionSCosts: 3, - utils.CDRs: 2, - utils.TpFilters: 1, - utils.TpThresholds: 1, - utils.TpRoutes: 1, - utils.TpStats: 1, - utils.TpResources: 1, - utils.TpResource: 1, - utils.TpChargers: 1, - utils.TpDispatchers: 1, - utils.TpRateProfiles: 1, - utils.TpActionProfiles: 1, - } -} - -// CurrentAllDBVersions returns the both DataDB and StorDB versions +// CurrentAllDBVersions returns the both DataDB func CurrentAllDBVersions() Versions { dataDBVersions := CurrentDataDBVersions() - storDBVersions := CurrentStorDBVersions() allVersions := make(Versions) for k, v := range dataDBVersions { allVersions[k] = v } - for k, v := range storDBVersions { - allVersions[k] = v - } return allVersions } @@ -191,14 +156,9 @@ func CurrentAllDBVersions() Versions { func CurrentDBVersions(storType string, isDataDB bool) Versions { switch storType { case utils.Mongo: - if isDataDB { - return CurrentDataDBVersions() - } - return CurrentStorDBVersions() + return CurrentDataDBVersions() case utils.Internal: return CurrentAllDBVersions() - case utils.Postgres, utils.MySQL: - return CurrentStorDBVersions() case utils.Redis: return CurrentDataDBVersions() } diff --git a/migrator/migrator.go b/migrator/migrator.go index a2b486cb7..44c8106ad 100644 --- a/migrator/migrator.go +++ b/migrator/migrator.go @@ -67,10 +67,6 @@ func (m *Migrator) Migrate(taskIDs []string) (err error, stats map[string]int) { return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), fmt.Sprintf("error: <%s> when seting versions for DataDB", err.Error())), nil } - if err != nil { - return utils.NewCGRError(utils.Migrator, utils.ServerErrorCaps, err.Error(), - fmt.Sprintf("error: <%s> when seting versions for StorDB", err.Error())), nil - } case utils.MetaEnsureIndexes: if m.dmOut.DataManager().DataDB().GetStorageType() == utils.Mongo { diff --git a/migrator/migrator_stordb.go b/migrator/migrator_stordb.go deleted file mode 100644 index 0910b799c..000000000 --- a/migrator/migrator_stordb.go +++ /dev/null @@ -1,30 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package migrator - -import ( - "github.com/cgrates/cgrates/engine" -) - -type MigratorStorDB interface { - createV1SMCosts() (err error) - renameV1SMCosts() (err error) - StorDB() engine.StorDB - close() -} diff --git a/migrator/storage_map_stordb.go b/migrator/storage_map_stordb.go deleted file mode 100644 index 4c87927b4..000000000 --- a/migrator/storage_map_stordb.go +++ /dev/null @@ -1,54 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package migrator - -import ( - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -func newInternalStorDBMigrator(stor engine.StorDB) (iDBMig *internalStorDBMigrator) { - return &internalStorDBMigrator{ - storDB: &stor, - iDB: stor.(*engine.InternalDB), - } -} - -type internalStorDBMigrator struct { - storDB *engine.StorDB - iDB *engine.InternalDB - dataKeys []string - qryIdx *int -} - -func (iDBMig *internalStorDBMigrator) close() {} - -func (iDBMig *internalStorDBMigrator) StorDB() engine.StorDB { - return *iDBMig.storDB -} - -//SMCost methods -//rename -func (iDBMig *internalStorDBMigrator) renameV1SMCosts() (err error) { - return utils.ErrNotImplemented -} - -func (iDBMig *internalStorDBMigrator) createV1SMCosts() (err error) { - return utils.ErrNotImplemented -} diff --git a/migrator/storage_mongo_stordb.go b/migrator/storage_mongo_stordb.go deleted file mode 100644 index 177f99d73..000000000 --- a/migrator/storage_mongo_stordb.go +++ /dev/null @@ -1,65 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package migrator - -import ( - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" -) - -func newMongoStorDBMigrator(stor engine.StorDB) (mgoMig *mongoStorDBMigrator) { - return &mongoStorDBMigrator{ - storDB: &stor, - mgoDB: stor.(*engine.MongoStorage), - cursor: nil, - } -} - -type mongoStorDBMigrator struct { - storDB *engine.StorDB - mgoDB *engine.MongoStorage - cursor *mongo.Cursor -} - -func (mgoMig *mongoStorDBMigrator) close() { - mgoMig.mgoDB.Close() -} - -func (mgoMig *mongoStorDBMigrator) StorDB() engine.StorDB { - return *mgoMig.storDB -} - -//SMCost methods -//rename -func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) { - if err = v1ms.mgoDB.DB().Collection(utils.OldSMCosts).Drop(v1ms.mgoDB.GetContext()); err != nil { - return err - } - return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(), - bson.D{{Key: "create", Value: utils.SessionCostsTBL}}).Err() -} - -func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) { - v1ms.mgoDB.DB().Collection(utils.OldSMCosts).Drop(v1ms.mgoDB.GetContext()) - v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).Drop(v1ms.mgoDB.GetContext()) - return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(), - bson.D{{Key: "create", Value: utils.OldSMCosts}, {Key: "size", Value: 1024}, {Key: "capped", Value: true}}).Err() -} diff --git a/migrator/storage_sql.go b/migrator/storage_sql.go index 76b827e40..6be338e8f 100644 --- a/migrator/storage_sql.go +++ b/migrator/storage_sql.go @@ -23,19 +23,10 @@ import ( "fmt" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" _ "github.com/go-sql-driver/mysql" ) -func newMigratorSQL(stor engine.StorDB) (sqlMig *migratorSQL) { - return &migratorSQL{ - storDB: &stor, - sqlStorage: stor.(*engine.SQLStorage), - } -} - type migratorSQL struct { - storDB *engine.StorDB sqlStorage *engine.SQLStorage rowIter *sql.Rows } @@ -44,15 +35,8 @@ func (sqlMig *migratorSQL) close() { sqlMig.sqlStorage.Close() } -func (sqlMig *migratorSQL) StorDB() engine.StorDB { - return *sqlMig.storDB -} - func (mgSQL *migratorSQL) renameV1SMCosts() (err error) { qry := "RENAME TABLE sm_costs TO session_costs;" - if mgSQL.StorDB().GetStorageType() == utils.Postgres { - qry = "ALTER TABLE sm_costs RENAME TO session_costs" - } if _, err := mgSQL.sqlStorage.DB.Exec(qry); err != nil { return err } @@ -61,22 +45,7 @@ func (mgSQL *migratorSQL) renameV1SMCosts() (err error) { func (mgSQL *migratorSQL) createV1SMCosts() (err error) { qry := fmt.Sprint("CREATE TABLE sm_costs ( id int(11) NOT NULL AUTO_INCREMENT, run_id varchar(64) NOT NULL, origin_host varchar(64) NOT NULL, origin_id varchar(128) NOT NULL, cost_source varchar(64) NOT NULL, `usage` BIGINT NOT NULL, cost_details MEDIUMTEXT, created_at TIMESTAMP NULL,deleted_at TIMESTAMP NULL, PRIMARY KEY (`id`),UNIQUE KEY costid ( run_id),KEY origin_idx (origin_host, origin_id),KEY run_origin_idx (run_id, origin_id),KEY deleted_at_idx (deleted_at));") - if mgSQL.StorDB().GetStorageType() == utils.Postgres { - qry = ` - CREATE TABLE sm_costs ( - id SERIAL PRIMARY KEY, - run_id VARCHAR(64) NOT NULL, - origin_host VARCHAR(64) NOT NULL, - origin_id VARCHAR(128) NOT NULL, - cost_source VARCHAR(64) NOT NULL, - usage BIGINT NOT NULL, - cost_details jsonb, - created_at TIMESTAMP WITH TIME ZONE, - deleted_at TIMESTAMP WITH TIME ZONE NULL, - UNIQUE ( run_id) - ); - ` - } + if _, err := mgSQL.sqlStorage.DB.Exec("DROP TABLE IF EXISTS session_costs;"); err != nil { return err } diff --git a/services/adminsv1.go b/services/adminsv1.go index b22f3c8f5..72210c70d 100644 --- a/services/adminsv1.go +++ b/services/adminsv1.go @@ -82,11 +82,6 @@ func (apiService *AdminSv1Service) Start(ctx *context.Context, _ context.CancelF return } - // apiService.stopChan = make(chan struct{}) - // storDBChan := make(chan engine.StorDB, 1) - // apiService.storDB.RegisterSyncChan(storDBChan) - // stordb := <-storDBChan - apiService.Lock() defer apiService.Unlock() diff --git a/services/cgr-engine.go b/services/cgr-engine.go index 565554f81..42c6304bb 100644 --- a/services/cgr-engine.go +++ b/services/cgr-engine.go @@ -75,7 +75,6 @@ func NewCGREngine(cfg *config.CGRConfig, cM *engine.ConnManager, shdWg *sync.Wai utils.SessionS: new(sync.WaitGroup), utils.SIPAgent: new(sync.WaitGroup), utils.StatS: new(sync.WaitGroup), - utils.StorDB: new(sync.WaitGroup), utils.ThresholdS: new(sync.WaitGroup), utils.ActionS: new(sync.WaitGroup), utils.AccountS: new(sync.WaitGroup), diff --git a/utils/consts.go b/utils/consts.go index eca5645a3..c980d7dfd 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -45,13 +45,8 @@ var ( CacheActionProfilesFilterIndexes, CacheAccountsFilterIndexes, CacheReverseFilterIndexes, CacheAccounts}) - storDBPartition = NewStringSet([]string{ - CacheTBLTPResources, CacheTBLTPStats, CacheTBLTPThresholds, CacheTBLTPFilters, CacheSessionCostsTBL, CacheCDRsTBL, - CacheTBLTPRoutes, CacheTBLTPAttributes, CacheTBLTPChargers, CacheTBLTPDispatchers, - CacheTBLTPDispatcherHosts, CacheTBLTPRateProfiles, CacheTBLTPActionProfiles, CacheTBLTPAccounts, CacheVersions}) - // CachePartitions enables creation of cache partitions - CachePartitions = JoinStringSet(extraDBPartition, DataDBPartitions /*,storDBPartition*/) + CachePartitions = JoinStringSet(extraDBPartition, DataDBPartitions) CacheInstanceToPrefix = map[string]string{ CacheResourceProfiles: ResourceProfilesPrefix, @@ -114,23 +109,6 @@ var ( CacheAccounts: CacheAccountsFilterIndexes, } - CacheStorDBPartitions = map[string]string{ - TBLTPResources: CacheTBLTPResources, - TBLTPStats: CacheTBLTPStats, - TBLTPThresholds: CacheTBLTPThresholds, - TBLTPFilters: CacheTBLTPFilters, - SessionCostsTBL: CacheSessionCostsTBL, - CDRsTBL: CacheCDRsTBL, - TBLTPRoutes: CacheTBLTPRoutes, - TBLTPAttributes: CacheTBLTPAttributes, - TBLTPChargers: CacheTBLTPChargers, - TBLTPDispatchers: CacheTBLTPDispatchers, - TBLTPDispatcherHosts: CacheTBLTPDispatcherHosts, - TBLTPRateProfiles: CacheTBLTPRateProfiles, - TBLTPActionProfiles: CacheTBLTPActionProfiles, - TBLTPAccounts: CacheTBLTPAccounts, - } - // ProtectedSFlds are the fields that sessions should not alter ProtectedSFlds = NewStringSet([]string{OriginHost, OriginID, Usage}) @@ -352,7 +330,6 @@ const ( MetaDumpToJSON = "*dump_to_json" NonTransactional = "" DataDB = "data_db" - StorDB = "stor_db" NotFoundCaps = "NOT_FOUND" ServerErrorCaps = "SERVER_ERROR" MandatoryIEMissingCaps = "MANDATORY_IE_MISSING" @@ -490,7 +467,6 @@ const ( Subscribers = "Subscribers" //Destinations = "Destinations" MetaSubscribers = "*subscribers" - MetaStorDB = "*stordb" MetaDataDB = "*datadb" MetaWeight = "*weight" MetaLC = "*lc" @@ -1188,11 +1164,11 @@ const ( AdminSv1GetFiltersCount = "AdminSv1.GetFiltersCount" AdminSv1GetFilters = "AdminSv1.GetFilters" // APIerSv1SetDataDBVersions = "APIerSv1.SetDataDBVersions" - // APIerSv1SetStorDBVersions = "APIerSv1.SetStorDBVersions" + // APIerSv1GetActions = "APIerSv1.GetActions" // APIerSv1GetDataDBVersions = "APIerSv1.GetDataDBVersions" - // APIerSv1GetStorDBVersions = "APIerSv1.GetStorDBVersions" + // APIerSv1GetCDRs = "APIerSv1.GetCDRs" // APIerSv1GetTPActions = "APIerSv1.GetTPActions" // APIerSv1GetTPAttributeProfile = "APIerSv1.GetTPAttributeProfile" @@ -1263,7 +1239,7 @@ const ( // APIerSv1 TP APIs const ( -// APIerSv1LoadTariffPlanFromStorDb = "APIerSv1.LoadTariffPlanFromStorDb" + // APIerSv1RemoveTPFromFolder = "APIerSv1.RemoveTPFromFolder" ) @@ -1678,20 +1654,8 @@ const ( // storDB - CacheTBLTPResources = "*tp_resources" - CacheTBLTPStats = "*tp_stats" - CacheTBLTPThresholds = "*tp_thresholds" - CacheTBLTPFilters = "*tp_filters" - CacheSessionCostsTBL = "*session_costs" - CacheCDRsTBL = "*cdrs" - CacheTBLTPRoutes = "*tp_routes" - CacheTBLTPAttributes = "*tp_attributes" - CacheTBLTPChargers = "*tp_chargers" - CacheTBLTPDispatchers = "*tp_dispatcher_profiles" - CacheTBLTPDispatcherHosts = "*tp_dispatcher_hosts" - CacheTBLTPRateProfiles = "*tp_rate_profiles" - CacheTBLTPActionProfiles = "*tp_action_profiles" - CacheTBLTPAccounts = "*tp_accounts" + CacheSessionCostsTBL = "*session_costs" + CacheCDRsTBL = "*cdrs" ) // Prefix for indexing @@ -1728,16 +1692,6 @@ const ( GoogleCredentialsFileName = "credentials.json" ) -// StorDB -var ( - PostgressSSLModeDisable = "disable" - PostgressSSLModeAllow = "allow" - PostgressSSLModePrefer = "prefer" - PostgressSSLModeRequire = "require" - PostgressSSLModeVerifyCa = "verify-ca" - PostgressSSLModeVerifyFull = "verify-full" -) - // GeneralCfg const ( NodeIDCfg = "node_id" @@ -2171,13 +2125,6 @@ const ( OutDataDBPasswordCfg = "out_datadb_password" OutDataDBEncodingCfg = "out_datadb_encoding" OutDataDBRedisSentinel = "out_redis_sentinel" - OutStorDBTypeCfg = "out_stordb_type" - OutStorDBHostCfg = "out_stordb_host" - OutStorDBPortCfg = "out_stordb_port" - OutStorDBNameCfg = "out_stordb_name" - OutStorDBUserCfg = "out_stordb_user" - OutStorDBPasswordCfg = "out_stordb_password" - OutStorDBOptsCfg = "out_stordb_opts" OutDataDBOptsCfg = "out_datadb_opts" UsersFiltersCfg = "users_filters" ) @@ -2630,20 +2577,11 @@ const ( CpuPathCgr = "cpu.prof" //Cgr loader CgrLoader = "cgr-loader" - StorDBTypeCgr = "stordb_type" - StorDBHostCgr = "stordb_host" - StorDBPortCgr = "stordb_port" - StorDBNameCgr = "stordb_name" - StorDBUserCgr = "stordb_user" - StorDBPasswdCgr = "stordb_passwd" CachingArgCgr = "caching" FieldSepCgr = "field_sep" ImportIDCgr = "import_id" DisableReverseCgr = "disable_reverse_mappings" - FlushStorDB = "flush_stordb" RemoveCgr = "remove" - FromStorDBCgr = "from_stordb" - ToStorDBcgr = "to_stordb" CacheSAddress = "caches_address" SchedulerAddress = "scheduler_address" //Cgr migrator diff --git a/utils/dynamicweight.go b/utils/dynamicweight.go index c656f83db..12f832d2a 100644 --- a/utils/dynamicweight.go +++ b/utils/dynamicweight.go @@ -24,7 +24,7 @@ import ( "strings" ) -// NewDynamicWeightsFromString creates a DynamicWeight list based on the string received from .csv/StorDB +// NewDynamicWeightsFromString creates a DynamicWeight list based on the string received from .csv func NewDynamicWeightsFromString(s, dWSep, fltrSep string) (dWs DynamicWeights, err error) { if len(s) == 0 { return DynamicWeights{{}}, nil