mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
CDRs refactoring - removing old derived charging, APIs redesign
This commit is contained in:
@@ -19,9 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package v1
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
@@ -52,7 +49,7 @@ func (apier *ApierV1) GetEventCost(attrs utils.AttrGetCallCost, reply *engine.Ev
|
||||
}
|
||||
|
||||
// Retrieves CDRs based on the filters
|
||||
func (apier *ApierV1) GetCdrs(attrs utils.AttrGetCdrs, reply *[]*engine.ExternalCDR) error {
|
||||
func (apier *ApierV1) GetCDRs(attrs utils.AttrGetCdrs, reply *[]*engine.ExternalCDR) error {
|
||||
cdrsFltr, err := attrs.AsCDRsFilter(apier.Config.GeneralCfg().DefaultTimezone)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -69,18 +66,6 @@ func (apier *ApierV1) GetCdrs(attrs utils.AttrGetCdrs, reply *[]*engine.External
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove Cdrs out of CDR storage
|
||||
func (apier *ApierV1) RemCdrs(attrs utils.AttrRemCdrs, reply *string) error {
|
||||
if len(attrs.CgrIds) == 0 {
|
||||
return fmt.Errorf("%s:CgrIds", utils.ErrMandatoryIeMissing.Error())
|
||||
}
|
||||
if _, _, err := apier.CdrDb.GetCDRs(&utils.CDRsFilter{CGRIDs: attrs.CgrIds}, true); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
// New way of removing CDRs
|
||||
func (apier *ApierV1) RemoveCDRs(attrs utils.RPCCDRsFilter, reply *string) error {
|
||||
cdrsFilter, err := attrs.AsCDRsFilter(apier.Config.GeneralCfg().DefaultTimezone)
|
||||
@@ -94,10 +79,35 @@ func (apier *ApierV1) RemoveCDRs(attrs utils.RPCCDRsFilter, reply *string) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// New way of (re-)rating CDRs
|
||||
func (apier *ApierV1) RateCDRs(attrs utils.AttrRateCDRs, reply *string) error {
|
||||
if apier.CDRs == nil {
|
||||
return errors.New("CDRS_NOT_ENABLED")
|
||||
}
|
||||
return apier.CDRs.Call("CDRsV1.RateCDRs", attrs, reply)
|
||||
// Receive CDRs via RPC methods
|
||||
type CDRsV1 struct {
|
||||
CDRs *engine.CDRServer
|
||||
}
|
||||
|
||||
// ProcessCDR will process a CDR in CGRateS internal format
|
||||
func (cdrSv1 *CDRsV1) ProcessCDR(cdr *engine.CDR, reply *string) error {
|
||||
return cdrSv1.CDRs.V1ProcessCDR(cdr, reply)
|
||||
}
|
||||
|
||||
// ProcessExternalCDR will process a CDR in external format
|
||||
func (cdrSv1 *CDRsV1) ProcessExternalCDR(cdr *engine.ExternalCDR, reply *string) error {
|
||||
return cdrSv1.CDRs.V1ProcessExternalCDR(cdr, reply)
|
||||
}
|
||||
|
||||
// RateCDRs can re-/rate remotely CDRs
|
||||
func (cdrSv1 *CDRsV1) RateCDRs(arg *engine.ArgRateCDRs, reply *string) error {
|
||||
return cdrSv1.CDRs.V1RateCDRs(arg, reply)
|
||||
}
|
||||
|
||||
// StoreSMCost will store
|
||||
func (cdrSv1 *CDRsV1) StoreSessionCost(attr *engine.AttrCDRSStoreSMCost, reply *string) error {
|
||||
return cdrSv1.CDRs.V1StoreSessionCost(attr, reply)
|
||||
}
|
||||
|
||||
func (cdrSv1 *CDRsV1) CountCDRs(args *utils.RPCCDRsFilter, reply *int64) error {
|
||||
return cdrSv1.CDRs.V1CountCDRs(args, reply)
|
||||
}
|
||||
|
||||
func (cdrSv1 *CDRsV1) GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) error {
|
||||
return cdrSv1.CDRs.V1GetCDRs(args, reply)
|
||||
}
|
||||
|
||||
@@ -1,86 +0,0 @@
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
package v1
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// Receive CDRs via RPC methods
|
||||
type CdrsV1 struct {
|
||||
CdrSrv *engine.CdrServer
|
||||
}
|
||||
|
||||
// Designed for CGR internal usage
|
||||
// Deprecated
|
||||
func (self *CdrsV1) ProcessCdr(cdr *engine.CDR, reply *string) error {
|
||||
return self.ProcessCDR(cdr, reply)
|
||||
}
|
||||
|
||||
// Designed for CGR internal usage
|
||||
func (self *CdrsV1) ProcessCDR(cdr *engine.CDR, reply *string) error {
|
||||
return self.CdrSrv.V1ProcessCDR(cdr, reply)
|
||||
}
|
||||
|
||||
// Designed for external programs feeding CDRs to CGRateS
|
||||
// Deprecated
|
||||
func (self *CdrsV1) ProcessExternalCdr(cdr *engine.ExternalCDR, reply *string) error {
|
||||
return self.ProcessExternalCDR(cdr, reply)
|
||||
}
|
||||
|
||||
// Designed for external programs feeding CDRs to CGRateS
|
||||
func (self *CdrsV1) ProcessExternalCDR(cdr *engine.ExternalCDR, reply *string) error {
|
||||
if err := self.CdrSrv.ProcessExternalCdr(cdr); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remotely (re)rating
|
||||
// Deprecated
|
||||
func (self *CdrsV1) RateCdrs(attrs utils.AttrRateCdrs, reply *string) error {
|
||||
return self.RateCDRs(attrs, reply)
|
||||
}
|
||||
|
||||
// Remotely (re)rating
|
||||
func (self *CdrsV1) RateCDRs(attrs utils.AttrRateCdrs, reply *string) error {
|
||||
cdrsFltr, err := attrs.AsCDRsFilter(self.CdrSrv.Timezone())
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
if err := self.CdrSrv.RateCDRs(cdrsFltr, attrs.SendToStats); err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *CdrsV1) StoreSMCost(attr engine.AttrCDRSStoreSMCost, reply *string) error {
|
||||
return self.CdrSrv.V1StoreSMCost(attr, reply)
|
||||
}
|
||||
|
||||
func (self *CdrsV1) CountCDRs(args utils.RPCCDRsFilter, reply *int64) error {
|
||||
return self.CdrSrv.V1CountCDRs(args, reply)
|
||||
}
|
||||
|
||||
func (self *CdrsV1) GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) error {
|
||||
return self.CdrSrv.V1GetCDRs(args, reply)
|
||||
}
|
||||
@@ -19,14 +19,14 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
package v2
|
||||
|
||||
import (
|
||||
"github.com/cgrates/cgrates/apier/v1"
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
|
||||
// Retrieves CDRs based on the filters
|
||||
func (apier *ApierV2) GetCdrs(attrs utils.RPCCDRsFilter, reply *[]*engine.ExternalCDR) error {
|
||||
func (apier *ApierV2) GetCDRs(attrs utils.RPCCDRsFilter, reply *[]*engine.ExternalCDR) error {
|
||||
cdrsFltr, err := attrs.AsCDRsFilter(apier.Config.GeneralCfg().DefaultTimezone)
|
||||
if err != nil {
|
||||
return utils.NewErrServerError(err)
|
||||
@@ -46,7 +46,7 @@ func (apier *ApierV2) GetCdrs(attrs utils.RPCCDRsFilter, reply *[]*engine.Extern
|
||||
return nil
|
||||
}
|
||||
|
||||
func (apier *ApierV2) CountCdrs(attrs utils.RPCCDRsFilter, reply *int64) error {
|
||||
func (apier *ApierV2) CountCDRs(attrs utils.RPCCDRsFilter, reply *int64) error {
|
||||
cdrsFltr, err := attrs.AsCDRsFilter(apier.Config.GeneralCfg().DefaultTimezone)
|
||||
if err != nil {
|
||||
if err.Error() != utils.NotFoundCaps {
|
||||
@@ -64,19 +64,14 @@ func (apier *ApierV2) CountCdrs(attrs utils.RPCCDRsFilter, reply *int64) error {
|
||||
}
|
||||
|
||||
// Receive CDRs via RPC methods, not included with APIer because it has way less dependencies and can be standalone
|
||||
type CdrsV2 struct {
|
||||
v1.CdrsV1
|
||||
type CDRsV2 struct {
|
||||
v1.CDRsV1
|
||||
}
|
||||
|
||||
func (self *CdrsV2) StoreSMCost(args engine.ArgsV2CDRSStoreSMCost, reply *string) error {
|
||||
return self.CdrSrv.V2StoreSMCost(args, reply)
|
||||
func (cdrSv2 *CDRsV2) StoreSessionCost(args *engine.ArgsV2CDRSStoreSMCost, reply *string) error {
|
||||
return cdrSv2.CDRs.V2StoreSessionCost(args, reply)
|
||||
}
|
||||
|
||||
func (self *CdrsV2) ProcessCDR(cgrEv *utils.CGREvent, reply *string) error {
|
||||
return self.CdrSrv.V2ProcessCDR(cgrEv, reply)
|
||||
}
|
||||
|
||||
// RateCDRs will rate/re-rate CDRs using ChargerS
|
||||
func (self *CdrsV2) RateCDRs(args *utils.RPCCDRsFilter, reply *string) error {
|
||||
return self.CdrSrv.V2RateCDRs(args, reply)
|
||||
func (cdrSv2 *CDRsV2) ProcessCDR(arg *engine.ArgV2ProcessCDR, reply *string) error {
|
||||
return cdrSv2.CDRs.V2ProcessCDR(arg, reply)
|
||||
}
|
||||
|
||||
@@ -610,16 +610,15 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
|
||||
return
|
||||
}
|
||||
}
|
||||
cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, dm, ralConn, pubSubConn,
|
||||
attrSConn, usersConn,
|
||||
cdrServer := engine.NewCDRServer(cfg, cdrDb, dm,
|
||||
ralConn, pubSubConn, attrSConn, usersConn,
|
||||
thresholdSConn, statsConn, chargerSConn, filterS)
|
||||
cdrServer.SetTimeToLive(cfg.GeneralCfg().ResponseCacheTTL, nil)
|
||||
utils.Logger.Info("Registering CDRS HTTP Handlers.")
|
||||
cdrServer.RegisterHandlersToServer(server)
|
||||
utils.Logger.Info("Registering CDRS RPC service.")
|
||||
cdrSrv := v1.CdrsV1{CdrSrv: cdrServer}
|
||||
cdrSrv := v1.CDRsV1{CDRs: cdrServer}
|
||||
server.RpcRegister(&cdrSrv)
|
||||
server.RpcRegister(&v2.CdrsV2{CdrsV1: cdrSrv})
|
||||
server.RpcRegister(&v2.CDRsV2{CDRsV1: cdrSrv})
|
||||
// Make the cdr server available for internal communication
|
||||
server.RpcRegister(cdrServer) // register CdrServer for internal usage (TODO: refactor this)
|
||||
internalCdrSChan <- cdrServer // Signal that cdrS is operational
|
||||
|
||||
@@ -21,8 +21,8 @@ package main
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cgrates/cgrates/apier/v1"
|
||||
"github.com/cgrates/cgrates/apier/v2"
|
||||
v1 "github.com/cgrates/cgrates/apier/v1"
|
||||
v2 "github.com/cgrates/cgrates/apier/v2"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -178,8 +178,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
|
||||
|
||||
utils.RegisterRpcParams("PubSubV1", &engine.PubSub{})
|
||||
utils.RegisterRpcParams("UsersV1", &engine.UserMap{})
|
||||
utils.RegisterRpcParams("", &v1.CdrsV1{})
|
||||
utils.RegisterRpcParams("", &v2.CdrsV2{})
|
||||
utils.RegisterRpcParams("", &v1.CDRsV1{})
|
||||
utils.RegisterRpcParams("", &v2.CDRsV2{})
|
||||
utils.RegisterRpcParams("", &v1.SMGenericV1{})
|
||||
utils.RegisterRpcParams("", responder)
|
||||
utils.RegisterRpcParams("", apierRpcV1)
|
||||
|
||||
@@ -32,8 +32,8 @@ CREATE TABLE cdrs (
|
||||
UNIQUE KEY cdrrun (cgrid, run_id, origin_id)
|
||||
);
|
||||
|
||||
DROP TABLE IF EXISTS sessions_costs;
|
||||
CREATE TABLE sessions_costs (
|
||||
DROP TABLE IF EXISTS session_costs;
|
||||
CREATE TABLE session_costs (
|
||||
id int(11) NOT NULL AUTO_INCREMENT,
|
||||
cgrid varchar(40) NOT NULL,
|
||||
run_id varchar(64) NOT NULL,
|
||||
|
||||
@@ -35,8 +35,8 @@ DROP INDEX IF EXISTS deleted_at_cp_idx;
|
||||
CREATE INDEX deleted_at_cp_idx ON cdrs (deleted_at);
|
||||
|
||||
|
||||
DROP TABLE IF EXISTS sessions_costs;
|
||||
CREATE TABLE sessions_costs (
|
||||
DROP TABLE IF EXISTS session_costs;
|
||||
CREATE TABLE session_costs (
|
||||
id SERIAL PRIMARY KEY,
|
||||
cgrid VARCHAR(40) NOT NULL,
|
||||
run_id VARCHAR(64) NOT NULL,
|
||||
@@ -50,10 +50,10 @@ CREATE TABLE sessions_costs (
|
||||
UNIQUE (cgrid, run_id)
|
||||
);
|
||||
DROP INDEX IF EXISTS cgrid_sessionscost_idx;
|
||||
CREATE INDEX cgrid_sessionscost_idx ON sessions_costs (cgrid, run_id);
|
||||
CREATE INDEX cgrid_sessionscost_idx ON session_costs (cgrid, run_id);
|
||||
DROP INDEX IF EXISTS origin_sessionscost_idx;
|
||||
CREATE INDEX origin_sessionscost_idx ON sessions_costs (origin_host, origin_id);
|
||||
CREATE INDEX origin_sessionscost_idx ON session_costs (origin_host, origin_id);
|
||||
DROP INDEX IF EXISTS run_origin_sessionscost_idx;
|
||||
CREATE INDEX run_origin_sessionscost_idx ON sessions_costs (run_id, origin_id);
|
||||
CREATE INDEX run_origin_sessionscost_idx ON session_costs (run_id, origin_id);
|
||||
DROP INDEX IF EXISTS deleted_at_sessionscost_idx;
|
||||
CREATE INDEX deleted_at_sessionscost_idx ON sessions_costs (deleted_at);
|
||||
CREATE INDEX deleted_at_sessionscost_idx ON session_costs (deleted_at);
|
||||
|
||||
940
engine/cdrs.go
940
engine/cdrs.go
File diff suppressed because it is too large
Load Diff
@@ -382,7 +382,7 @@ func (t CDRsql) TableName() string {
|
||||
return utils.CDRsTBL
|
||||
}
|
||||
|
||||
type SessionsCostsSQL struct {
|
||||
type SessionCostsSQL struct {
|
||||
ID int64
|
||||
Cgrid string
|
||||
RunID string
|
||||
@@ -395,8 +395,8 @@ type SessionsCostsSQL struct {
|
||||
DeletedAt *time.Time
|
||||
}
|
||||
|
||||
func (t SessionsCostsSQL) TableName() string {
|
||||
return utils.SessionsCostsTBL
|
||||
func (t SessionCostsSQL) TableName() string {
|
||||
return utils.SessionCostsTBL
|
||||
}
|
||||
|
||||
type TBLVersion struct {
|
||||
|
||||
@@ -298,17 +298,17 @@ func (ms *MongoStorage) EnsureIndexes() (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
if err = ms.EnusureIndex(utils.SessionsCostsTBL, true, CGRIDLow,
|
||||
if err = ms.EnusureIndex(utils.SessionCostsTBL, true, CGRIDLow,
|
||||
RunIDLow); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err = ms.EnusureIndex(utils.SessionsCostsTBL, false, OriginHostLow,
|
||||
if err = ms.EnusureIndex(utils.SessionCostsTBL, false, OriginHostLow,
|
||||
OriginIDLow); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err = ms.EnusureIndex(utils.SessionsCostsTBL, false, RunIDLow,
|
||||
if err = ms.EnusureIndex(utils.SessionCostsTBL, false, RunIDLow,
|
||||
OriginIDLow); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -993,7 +993,7 @@ func (ms *MongoStorage) SetSMCost(smc *SMCost) error {
|
||||
return nil
|
||||
}
|
||||
return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
_, err = ms.getCol(utils.SessionsCostsTBL).InsertOne(sctx, smc)
|
||||
_, err = ms.getCol(utils.SessionCostsTBL).InsertOne(sctx, smc)
|
||||
return err
|
||||
})
|
||||
}
|
||||
@@ -1004,7 +1004,7 @@ func (ms *MongoStorage) RemoveSMCost(smc *SMCost) error {
|
||||
remParams = bson.M{"cgrid": smc.CGRID, "runid": smc.RunID}
|
||||
}
|
||||
return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
_, err = ms.getCol(utils.SessionsCostsTBL).DeleteMany(sctx, remParams)
|
||||
_, err = ms.getCol(utils.SessionCostsTBL).DeleteMany(sctx, remParams)
|
||||
return err
|
||||
})
|
||||
}
|
||||
@@ -1024,7 +1024,7 @@ func (ms *MongoStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri
|
||||
filter[OriginIDLow] = bsonx.Regex(fmt.Sprintf("^%s", originIDPrefix), "")
|
||||
}
|
||||
err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
|
||||
cur, err := ms.getCol(utils.SessionsCostsTBL).Find(sctx, filter)
|
||||
cur, err := ms.getCol(utils.SessionCostsTBL).Find(sctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -107,7 +107,7 @@ func (self *SQLStorage) IsDBEmpty() (resp bool, err error) {
|
||||
utils.TBLTPSharedGroups, utils.TBLTPActions, utils.TBLTPActionTriggers,
|
||||
utils.TBLTPAccountActions, utils.TBLTPDerivedChargers, utils.TBLTPUsers,
|
||||
utils.TBLTPResources, utils.TBLTPStats, utils.TBLTPThresholds,
|
||||
utils.TBLTPFilters, utils.SessionsCostsTBL, utils.CDRsTBL, utils.TBLTPActionPlans,
|
||||
utils.TBLTPFilters, utils.SessionCostsTBL, utils.CDRsTBL, utils.TBLTPActionPlans,
|
||||
utils.TBLVersions, utils.TBLTPSuppliers, utils.TBLTPAttributes, utils.TBLTPChargers,
|
||||
}
|
||||
for _, tbl := range tbls {
|
||||
@@ -745,7 +745,7 @@ func (self *SQLStorage) SetSMCost(smc *SMCost) error {
|
||||
return nil
|
||||
}
|
||||
tx := self.db.Begin()
|
||||
cd := &SessionsCostsSQL{
|
||||
cd := &SessionCostsSQL{
|
||||
Cgrid: smc.CGRID,
|
||||
RunID: smc.RunID,
|
||||
OriginHost: smc.OriginHost,
|
||||
@@ -765,12 +765,12 @@ func (self *SQLStorage) SetSMCost(smc *SMCost) error {
|
||||
|
||||
func (self *SQLStorage) RemoveSMCost(smc *SMCost) error {
|
||||
tx := self.db.Begin()
|
||||
var rmParam *SessionsCostsSQL
|
||||
var rmParam *SessionCostsSQL
|
||||
if smc != nil {
|
||||
rmParam = &SessionsCostsSQL{Cgrid: smc.CGRID,
|
||||
rmParam = &SessionCostsSQL{Cgrid: smc.CGRID,
|
||||
RunID: smc.RunID}
|
||||
}
|
||||
if err := tx.Where(rmParam).Delete(SessionsCostsSQL{}).Error; err != nil {
|
||||
if err := tx.Where(rmParam).Delete(SessionCostsSQL{}).Error; err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
@@ -781,7 +781,7 @@ func (self *SQLStorage) RemoveSMCost(smc *SMCost) error {
|
||||
// GetSMCosts is used to retrieve one or multiple SMCosts based on filter
|
||||
func (self *SQLStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix string) ([]*SMCost, error) {
|
||||
var smCosts []*SMCost
|
||||
filter := &SessionsCostsSQL{}
|
||||
filter := &SessionCostsSQL{}
|
||||
if cgrid != "" {
|
||||
filter.Cgrid = cgrid
|
||||
}
|
||||
@@ -795,7 +795,7 @@ func (self *SQLStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri
|
||||
if originIDPrefix != "" {
|
||||
q = self.db.Where(filter).Where(fmt.Sprintf("origin_id LIKE '%s%%'", originIDPrefix))
|
||||
}
|
||||
results := make([]*SessionsCostsSQL, 0)
|
||||
results := make([]*SessionCostsSQL, 0)
|
||||
if err := q.Find(&results).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -153,7 +153,7 @@ func (v2Cost *v2SessionsCost) V2toV3Cost() (cost *engine.SMCost) {
|
||||
return
|
||||
}
|
||||
|
||||
func NewV2SessionsCostFromSessionsCostSql(smSql *engine.SessionsCostsSQL) (smV2 *v2SessionsCost, err error) {
|
||||
func NewV2SessionsCostFromSessionsCostSql(smSql *engine.SessionCostsSQL) (smV2 *v2SessionsCost, err error) {
|
||||
smV2 = new(v2SessionsCost)
|
||||
smV2.CGRID = smSql.Cgrid
|
||||
smV2.RunID = smSql.RunID
|
||||
@@ -168,8 +168,8 @@ func NewV2SessionsCostFromSessionsCostSql(smSql *engine.SessionsCostsSQL) (smV2
|
||||
return
|
||||
}
|
||||
|
||||
func (v2Cost *v2SessionsCost) AsSessionsCostSql() (smSql *engine.SessionsCostsSQL) {
|
||||
smSql = new(engine.SessionsCostsSQL)
|
||||
func (v2Cost *v2SessionsCost) AsSessionsCostSql() (smSql *engine.SessionCostsSQL) {
|
||||
smSql = new(engine.SessionCostsSQL)
|
||||
smSql.Cgrid = v2Cost.CGRID
|
||||
smSql.RunID = v2Cost.RunID
|
||||
smSql.OriginHost = v2Cost.OriginHost
|
||||
|
||||
@@ -79,12 +79,12 @@ func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) {
|
||||
return err
|
||||
}
|
||||
return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(),
|
||||
bson.D{{"create", utils.SessionsCostsTBL}}).Err()
|
||||
bson.D{{"create", utils.SessionCostsTBL}}).Err()
|
||||
}
|
||||
|
||||
func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) {
|
||||
v1ms.mgoDB.DB().Collection(utils.OldSMCosts).Drop(v1ms.mgoDB.GetContext())
|
||||
v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).Drop(v1ms.mgoDB.GetContext())
|
||||
v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).Drop(v1ms.mgoDB.GetContext())
|
||||
return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(),
|
||||
bson.D{{"create", utils.OldSMCosts}, {"size", 1024}}).Err()
|
||||
}
|
||||
@@ -93,7 +93,7 @@ func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) {
|
||||
func (v1ms *mongoStorDBMigrator) getV2SMCost() (v2Cost *v2SessionsCost, err error) {
|
||||
if v1ms.cursor == nil {
|
||||
var cursor mongo.Cursor
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
cursor, err = v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).Find(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -113,12 +113,12 @@ func (v1ms *mongoStorDBMigrator) getV2SMCost() (v2Cost *v2SessionsCost, err erro
|
||||
|
||||
//set
|
||||
func (v1ms *mongoStorDBMigrator) setV2SMCost(v2Cost *v2SessionsCost) (err error) {
|
||||
_, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).InsertOne(v1ms.mgoDB.GetContext(), v2Cost)
|
||||
_, err = v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).InsertOne(v1ms.mgoDB.GetContext(), v2Cost)
|
||||
return
|
||||
}
|
||||
|
||||
//remove
|
||||
func (v1ms *mongoStorDBMigrator) remV2SMCost(v2Cost *v2SessionsCost) (err error) {
|
||||
_, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).DeleteMany(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
_, err = v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).DeleteMany(v1ms.mgoDB.GetContext(), bson.D{})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -125,7 +125,7 @@ func (mgSQL *migratorSQL) getV2SMCost() (v2Cost *v2SessionsCost, err error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
scSql := new(engine.SessionsCostsSQL)
|
||||
scSql := new(engine.SessionCostsSQL)
|
||||
mgSQL.rowIter.Scan(&scSql)
|
||||
v2Cost, err = NewV2SessionsCostFromSessionsCostSql(scSql)
|
||||
|
||||
@@ -151,12 +151,12 @@ func (mgSQL *migratorSQL) setV2SMCost(v2Cost *v2SessionsCost) (err error) {
|
||||
|
||||
func (mgSQL *migratorSQL) remV2SMCost(v2Cost *v2SessionsCost) (err error) {
|
||||
tx := mgSQL.sqlStorage.ExportGormDB().Begin()
|
||||
var rmParam *engine.SessionsCostsSQL
|
||||
var rmParam *engine.SessionCostsSQL
|
||||
if v2Cost != nil {
|
||||
rmParam = &engine.SessionsCostsSQL{Cgrid: v2Cost.CGRID,
|
||||
rmParam = &engine.SessionCostsSQL{Cgrid: v2Cost.CGRID,
|
||||
RunID: v2Cost.RunID}
|
||||
}
|
||||
if err := tx.Where(rmParam).Delete(engine.SessionsCostsSQL{}).Error; err != nil {
|
||||
if err := tx.Where(rmParam).Delete(engine.SessionCostsSQL{}).Error; err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -765,67 +765,6 @@ type AttrRemCdrs struct {
|
||||
CgrIds []string // List of CgrIds to remove from storeDb
|
||||
}
|
||||
|
||||
type AttrRateCdrs struct {
|
||||
CgrIds []string // If provided, it will filter based on the cgrids present in list
|
||||
MediationRunIds []string // If provided, it will filter on mediation runid
|
||||
TORs []string // If provided, filter on TypeOfRecord
|
||||
CdrHosts []string // If provided, it will filter cdrhost
|
||||
CdrSources []string // If provided, it will filter cdrsource
|
||||
ReqTypes []string // If provided, it will fiter reqtype
|
||||
Tenants []string // If provided, it will filter tenant
|
||||
Categories []string // If provided, it will filter çategory
|
||||
Accounts []string // If provided, it will filter account
|
||||
Subjects []string // If provided, it will filter the rating subject
|
||||
DestinationPrefixes []string // If provided, it will filter on destination prefix
|
||||
OrderIdStart *int64 // Export from this order identifier
|
||||
OrderIdEnd *int64 // Export smaller than this order identifier
|
||||
TimeStart string // If provided, it will represent the starting of the CDRs interval (>=)
|
||||
TimeEnd string // If provided, it will represent the end of the CDRs interval (<)
|
||||
RerateErrors bool // Rerate previous CDRs with errors (makes sense for reqtype rated and pseudoprepaid
|
||||
RerateRated bool // Rerate CDRs which were previously rated (makes sense for reqtype rated and pseudoprepaid)
|
||||
SendToStats bool // Set to true if the CDRs should be sent to stats server
|
||||
}
|
||||
|
||||
func (attrRateCDRs *AttrRateCdrs) AsCDRsFilter(timezone string) (*CDRsFilter, error) {
|
||||
cdrFltr := &CDRsFilter{
|
||||
CGRIDs: attrRateCDRs.CgrIds,
|
||||
RunIDs: attrRateCDRs.MediationRunIds,
|
||||
OriginHosts: attrRateCDRs.CdrHosts,
|
||||
Sources: attrRateCDRs.CdrSources,
|
||||
ToRs: attrRateCDRs.TORs,
|
||||
RequestTypes: attrRateCDRs.ReqTypes,
|
||||
Tenants: attrRateCDRs.Tenants,
|
||||
Categories: attrRateCDRs.Categories,
|
||||
Accounts: attrRateCDRs.Accounts,
|
||||
Subjects: attrRateCDRs.Subjects,
|
||||
DestinationPrefixes: attrRateCDRs.DestinationPrefixes,
|
||||
OrderIDStart: attrRateCDRs.OrderIdStart,
|
||||
OrderIDEnd: attrRateCDRs.OrderIdEnd,
|
||||
}
|
||||
if aTime, err := ParseTimeDetectLayout(attrRateCDRs.TimeStart, timezone); err != nil {
|
||||
return nil, err
|
||||
} else if !aTime.IsZero() {
|
||||
cdrFltr.AnswerTimeStart = &aTime
|
||||
}
|
||||
if aTimeEnd, err := ParseTimeDetectLayout(attrRateCDRs.TimeEnd, timezone); err != nil {
|
||||
return nil, err
|
||||
} else if !aTimeEnd.IsZero() {
|
||||
cdrFltr.AnswerTimeEnd = &aTimeEnd
|
||||
}
|
||||
if attrRateCDRs.RerateErrors {
|
||||
cdrFltr.MinCost = Float64Pointer(-1.0)
|
||||
if !attrRateCDRs.RerateRated {
|
||||
cdrFltr.MaxCost = Float64Pointer(0.0)
|
||||
}
|
||||
} else if attrRateCDRs.RerateRated {
|
||||
cdrFltr.MinCost = Float64Pointer(0.0)
|
||||
}
|
||||
if attrRateCDRs.RerateErrors || attrRateCDRs.RerateRated {
|
||||
cdrFltr.NotRunIDs = append(cdrFltr.NotRunIDs, MetaRaw)
|
||||
}
|
||||
return cdrFltr, nil
|
||||
}
|
||||
|
||||
type AttrLoadTpFromFolder struct {
|
||||
FolderPath string // Take files from folder absolute path
|
||||
DryRun bool // Do not write to database but parse only
|
||||
@@ -1124,13 +1063,6 @@ type AttrGetCallCost struct {
|
||||
RunId string // Run Id
|
||||
}
|
||||
|
||||
type AttrRateCDRs struct {
|
||||
RPCCDRsFilter
|
||||
StoreCDRs *bool
|
||||
SendToStatS *bool // Set to true if the CDRs should be sent to stats server
|
||||
ReplicateCDRs *bool // Replicate results
|
||||
}
|
||||
|
||||
type AttrSetBalance struct {
|
||||
Tenant string
|
||||
Account string
|
||||
|
||||
@@ -906,7 +906,7 @@ const (
|
||||
TBLTPStats = "tp_stats"
|
||||
TBLTPThresholds = "tp_thresholds"
|
||||
TBLTPFilters = "tp_filters"
|
||||
SessionsCostsTBL = "sessions_costs"
|
||||
SessionCostsTBL = "session_costs"
|
||||
CDRsTBL = "cdrs"
|
||||
TBLTPSuppliers = "tp_suppliers"
|
||||
TBLTPAttributes = "tp_attributes"
|
||||
|
||||
Reference in New Issue
Block a user