diff --git a/apier/v1/cdrs.go b/apier/v1/cdrs.go
index eadebf057..05ef3afed 100644
--- a/apier/v1/cdrs.go
+++ b/apier/v1/cdrs.go
@@ -19,9 +19,6 @@ along with this program. If not, see
package v1
import (
- "errors"
- "fmt"
-
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
@@ -52,7 +49,7 @@ func (apier *ApierV1) GetEventCost(attrs utils.AttrGetCallCost, reply *engine.Ev
}
// Retrieves CDRs based on the filters
-func (apier *ApierV1) GetCdrs(attrs utils.AttrGetCdrs, reply *[]*engine.ExternalCDR) error {
+func (apier *ApierV1) GetCDRs(attrs utils.AttrGetCdrs, reply *[]*engine.ExternalCDR) error {
cdrsFltr, err := attrs.AsCDRsFilter(apier.Config.GeneralCfg().DefaultTimezone)
if err != nil {
return utils.NewErrServerError(err)
@@ -69,18 +66,6 @@ func (apier *ApierV1) GetCdrs(attrs utils.AttrGetCdrs, reply *[]*engine.External
return nil
}
-// Remove Cdrs out of CDR storage
-func (apier *ApierV1) RemCdrs(attrs utils.AttrRemCdrs, reply *string) error {
- if len(attrs.CgrIds) == 0 {
- return fmt.Errorf("%s:CgrIds", utils.ErrMandatoryIeMissing.Error())
- }
- if _, _, err := apier.CdrDb.GetCDRs(&utils.CDRsFilter{CGRIDs: attrs.CgrIds}, true); err != nil {
- return utils.NewErrServerError(err)
- }
- *reply = utils.OK
- return nil
-}
-
// New way of removing CDRs
func (apier *ApierV1) RemoveCDRs(attrs utils.RPCCDRsFilter, reply *string) error {
cdrsFilter, err := attrs.AsCDRsFilter(apier.Config.GeneralCfg().DefaultTimezone)
@@ -94,10 +79,35 @@ func (apier *ApierV1) RemoveCDRs(attrs utils.RPCCDRsFilter, reply *string) error
return nil
}
-// New way of (re-)rating CDRs
-func (apier *ApierV1) RateCDRs(attrs utils.AttrRateCDRs, reply *string) error {
- if apier.CDRs == nil {
- return errors.New("CDRS_NOT_ENABLED")
- }
- return apier.CDRs.Call("CDRsV1.RateCDRs", attrs, reply)
+// Receive CDRs via RPC methods
+type CDRsV1 struct {
+ CDRs *engine.CDRServer
+}
+
+// ProcessCDR will process a CDR in CGRateS internal format
+func (cdrSv1 *CDRsV1) ProcessCDR(cdr *engine.CDR, reply *string) error {
+ return cdrSv1.CDRs.V1ProcessCDR(cdr, reply)
+}
+
+// ProcessExternalCDR will process a CDR in external format
+func (cdrSv1 *CDRsV1) ProcessExternalCDR(cdr *engine.ExternalCDR, reply *string) error {
+ return cdrSv1.CDRs.V1ProcessExternalCDR(cdr, reply)
+}
+
+// RateCDRs can re-/rate remotely CDRs
+func (cdrSv1 *CDRsV1) RateCDRs(arg *engine.ArgRateCDRs, reply *string) error {
+ return cdrSv1.CDRs.V1RateCDRs(arg, reply)
+}
+
+// StoreSMCost will store
+func (cdrSv1 *CDRsV1) StoreSessionCost(attr *engine.AttrCDRSStoreSMCost, reply *string) error {
+ return cdrSv1.CDRs.V1StoreSessionCost(attr, reply)
+}
+
+func (cdrSv1 *CDRsV1) CountCDRs(args *utils.RPCCDRsFilter, reply *int64) error {
+ return cdrSv1.CDRs.V1CountCDRs(args, reply)
+}
+
+func (cdrSv1 *CDRsV1) GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) error {
+ return cdrSv1.CDRs.V1GetCDRs(args, reply)
}
diff --git a/apier/v1/cdrsv1.go b/apier/v1/cdrsv1.go
deleted file mode 100644
index 56078a1bf..000000000
--- a/apier/v1/cdrsv1.go
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
-Copyright (C) ITsysCOM GmbH
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program. If not, see
-*/
-
-package v1
-
-import (
- "github.com/cgrates/cgrates/engine"
- "github.com/cgrates/cgrates/utils"
-)
-
-// Receive CDRs via RPC methods
-type CdrsV1 struct {
- CdrSrv *engine.CdrServer
-}
-
-// Designed for CGR internal usage
-// Deprecated
-func (self *CdrsV1) ProcessCdr(cdr *engine.CDR, reply *string) error {
- return self.ProcessCDR(cdr, reply)
-}
-
-// Designed for CGR internal usage
-func (self *CdrsV1) ProcessCDR(cdr *engine.CDR, reply *string) error {
- return self.CdrSrv.V1ProcessCDR(cdr, reply)
-}
-
-// Designed for external programs feeding CDRs to CGRateS
-// Deprecated
-func (self *CdrsV1) ProcessExternalCdr(cdr *engine.ExternalCDR, reply *string) error {
- return self.ProcessExternalCDR(cdr, reply)
-}
-
-// Designed for external programs feeding CDRs to CGRateS
-func (self *CdrsV1) ProcessExternalCDR(cdr *engine.ExternalCDR, reply *string) error {
- if err := self.CdrSrv.ProcessExternalCdr(cdr); err != nil {
- return utils.NewErrServerError(err)
- }
- *reply = utils.OK
- return nil
-}
-
-// Remotely (re)rating
-// Deprecated
-func (self *CdrsV1) RateCdrs(attrs utils.AttrRateCdrs, reply *string) error {
- return self.RateCDRs(attrs, reply)
-}
-
-// Remotely (re)rating
-func (self *CdrsV1) RateCDRs(attrs utils.AttrRateCdrs, reply *string) error {
- cdrsFltr, err := attrs.AsCDRsFilter(self.CdrSrv.Timezone())
- if err != nil {
- return utils.NewErrServerError(err)
- }
- if err := self.CdrSrv.RateCDRs(cdrsFltr, attrs.SendToStats); err != nil {
- return utils.NewErrServerError(err)
- }
- *reply = utils.OK
- return nil
-}
-
-func (self *CdrsV1) StoreSMCost(attr engine.AttrCDRSStoreSMCost, reply *string) error {
- return self.CdrSrv.V1StoreSMCost(attr, reply)
-}
-
-func (self *CdrsV1) CountCDRs(args utils.RPCCDRsFilter, reply *int64) error {
- return self.CdrSrv.V1CountCDRs(args, reply)
-}
-
-func (self *CdrsV1) GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) error {
- return self.CdrSrv.V1GetCDRs(args, reply)
-}
diff --git a/apier/v2/cdrs.go b/apier/v2/cdrs.go
index 130aaa39e..ab4e0d4b3 100644
--- a/apier/v2/cdrs.go
+++ b/apier/v2/cdrs.go
@@ -19,14 +19,14 @@ along with this program. If not, see
package v2
import (
- "github.com/cgrates/cgrates/apier/v1"
+ v1 "github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
// Retrieves CDRs based on the filters
-func (apier *ApierV2) GetCdrs(attrs utils.RPCCDRsFilter, reply *[]*engine.ExternalCDR) error {
+func (apier *ApierV2) GetCDRs(attrs utils.RPCCDRsFilter, reply *[]*engine.ExternalCDR) error {
cdrsFltr, err := attrs.AsCDRsFilter(apier.Config.GeneralCfg().DefaultTimezone)
if err != nil {
return utils.NewErrServerError(err)
@@ -46,7 +46,7 @@ func (apier *ApierV2) GetCdrs(attrs utils.RPCCDRsFilter, reply *[]*engine.Extern
return nil
}
-func (apier *ApierV2) CountCdrs(attrs utils.RPCCDRsFilter, reply *int64) error {
+func (apier *ApierV2) CountCDRs(attrs utils.RPCCDRsFilter, reply *int64) error {
cdrsFltr, err := attrs.AsCDRsFilter(apier.Config.GeneralCfg().DefaultTimezone)
if err != nil {
if err.Error() != utils.NotFoundCaps {
@@ -64,19 +64,14 @@ func (apier *ApierV2) CountCdrs(attrs utils.RPCCDRsFilter, reply *int64) error {
}
// Receive CDRs via RPC methods, not included with APIer because it has way less dependencies and can be standalone
-type CdrsV2 struct {
- v1.CdrsV1
+type CDRsV2 struct {
+ v1.CDRsV1
}
-func (self *CdrsV2) StoreSMCost(args engine.ArgsV2CDRSStoreSMCost, reply *string) error {
- return self.CdrSrv.V2StoreSMCost(args, reply)
+func (cdrSv2 *CDRsV2) StoreSessionCost(args *engine.ArgsV2CDRSStoreSMCost, reply *string) error {
+ return cdrSv2.CDRs.V2StoreSessionCost(args, reply)
}
-func (self *CdrsV2) ProcessCDR(cgrEv *utils.CGREvent, reply *string) error {
- return self.CdrSrv.V2ProcessCDR(cgrEv, reply)
-}
-
-// RateCDRs will rate/re-rate CDRs using ChargerS
-func (self *CdrsV2) RateCDRs(args *utils.RPCCDRsFilter, reply *string) error {
- return self.CdrSrv.V2RateCDRs(args, reply)
+func (cdrSv2 *CDRsV2) ProcessCDR(arg *engine.ArgV2ProcessCDR, reply *string) error {
+ return cdrSv2.CDRs.V2ProcessCDR(arg, reply)
}
diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go
index d87a034f1..b193896e5 100644
--- a/cmd/cgr-engine/cgr-engine.go
+++ b/cmd/cgr-engine/cgr-engine.go
@@ -610,16 +610,15 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection,
return
}
}
- cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, dm, ralConn, pubSubConn,
- attrSConn, usersConn,
+ cdrServer := engine.NewCDRServer(cfg, cdrDb, dm,
+ ralConn, pubSubConn, attrSConn, usersConn,
thresholdSConn, statsConn, chargerSConn, filterS)
- cdrServer.SetTimeToLive(cfg.GeneralCfg().ResponseCacheTTL, nil)
utils.Logger.Info("Registering CDRS HTTP Handlers.")
cdrServer.RegisterHandlersToServer(server)
utils.Logger.Info("Registering CDRS RPC service.")
- cdrSrv := v1.CdrsV1{CdrSrv: cdrServer}
+ cdrSrv := v1.CDRsV1{CDRs: cdrServer}
server.RpcRegister(&cdrSrv)
- server.RpcRegister(&v2.CdrsV2{CdrsV1: cdrSrv})
+ server.RpcRegister(&v2.CDRsV2{CDRsV1: cdrSrv})
// Make the cdr server available for internal communication
server.RpcRegister(cdrServer) // register CdrServer for internal usage (TODO: refactor this)
internalCdrSChan <- cdrServer // Signal that cdrS is operational
diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go
index daf4b3169..4d25629c3 100755
--- a/cmd/cgr-engine/rater.go
+++ b/cmd/cgr-engine/rater.go
@@ -21,8 +21,8 @@ package main
import (
"fmt"
- "github.com/cgrates/cgrates/apier/v1"
- "github.com/cgrates/cgrates/apier/v2"
+ v1 "github.com/cgrates/cgrates/apier/v1"
+ v2 "github.com/cgrates/cgrates/apier/v2"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
@@ -178,8 +178,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en
utils.RegisterRpcParams("PubSubV1", &engine.PubSub{})
utils.RegisterRpcParams("UsersV1", &engine.UserMap{})
- utils.RegisterRpcParams("", &v1.CdrsV1{})
- utils.RegisterRpcParams("", &v2.CdrsV2{})
+ utils.RegisterRpcParams("", &v1.CDRsV1{})
+ utils.RegisterRpcParams("", &v2.CDRsV2{})
utils.RegisterRpcParams("", &v1.SMGenericV1{})
utils.RegisterRpcParams("", responder)
utils.RegisterRpcParams("", apierRpcV1)
diff --git a/data/storage/mysql/create_cdrs_tables.sql b/data/storage/mysql/create_cdrs_tables.sql
index 8cf89b7dc..3ceb65729 100644
--- a/data/storage/mysql/create_cdrs_tables.sql
+++ b/data/storage/mysql/create_cdrs_tables.sql
@@ -32,8 +32,8 @@ CREATE TABLE cdrs (
UNIQUE KEY cdrrun (cgrid, run_id, origin_id)
);
-DROP TABLE IF EXISTS sessions_costs;
-CREATE TABLE sessions_costs (
+DROP TABLE IF EXISTS session_costs;
+CREATE TABLE session_costs (
id int(11) NOT NULL AUTO_INCREMENT,
cgrid varchar(40) NOT NULL,
run_id varchar(64) NOT NULL,
diff --git a/data/storage/postgres/create_cdrs_tables.sql b/data/storage/postgres/create_cdrs_tables.sql
index 8bb674556..70e8b1b26 100644
--- a/data/storage/postgres/create_cdrs_tables.sql
+++ b/data/storage/postgres/create_cdrs_tables.sql
@@ -35,8 +35,8 @@ DROP INDEX IF EXISTS deleted_at_cp_idx;
CREATE INDEX deleted_at_cp_idx ON cdrs (deleted_at);
-DROP TABLE IF EXISTS sessions_costs;
-CREATE TABLE sessions_costs (
+DROP TABLE IF EXISTS session_costs;
+CREATE TABLE session_costs (
id SERIAL PRIMARY KEY,
cgrid VARCHAR(40) NOT NULL,
run_id VARCHAR(64) NOT NULL,
@@ -50,10 +50,10 @@ CREATE TABLE sessions_costs (
UNIQUE (cgrid, run_id)
);
DROP INDEX IF EXISTS cgrid_sessionscost_idx;
-CREATE INDEX cgrid_sessionscost_idx ON sessions_costs (cgrid, run_id);
+CREATE INDEX cgrid_sessionscost_idx ON session_costs (cgrid, run_id);
DROP INDEX IF EXISTS origin_sessionscost_idx;
-CREATE INDEX origin_sessionscost_idx ON sessions_costs (origin_host, origin_id);
+CREATE INDEX origin_sessionscost_idx ON session_costs (origin_host, origin_id);
DROP INDEX IF EXISTS run_origin_sessionscost_idx;
-CREATE INDEX run_origin_sessionscost_idx ON sessions_costs (run_id, origin_id);
+CREATE INDEX run_origin_sessionscost_idx ON session_costs (run_id, origin_id);
DROP INDEX IF EXISTS deleted_at_sessionscost_idx;
-CREATE INDEX deleted_at_sessionscost_idx ON sessions_costs (deleted_at);
+CREATE INDEX deleted_at_sessionscost_idx ON session_costs (deleted_at);
diff --git a/engine/cdrs.go b/engine/cdrs.go
index 6c2f2bc48..b8957c169 100644
--- a/engine/cdrs.go
+++ b/engine/cdrs.go
@@ -32,30 +32,28 @@ import (
"github.com/cgrates/rpcclient"
)
-var cdrServer *CdrServer // Share the server so we can use it in http handlers
+var cdrServer *CDRServer // Share the server so we can use it in http handlers
-type CallCostLog struct {
- CgrId string
- Source string
- RunId string
- Usage float64 // real usage (not increment rounded)
- CallCost *CallCost
- CheckDuplicate bool
-}
-
-// Handler for generic cgr cdr http
+// cgrCdrHandler handles CDRs received over HTTP REST
func cgrCdrHandler(w http.ResponseWriter, r *http.Request) {
- cgrCdr, err := NewCgrCdrFromHttpReq(r, cdrServer.cgrCfg.GeneralCfg().DefaultTimezone)
+ cgrCdr, err := NewCgrCdrFromHttpReq(r,
+ cdrServer.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
- utils.Logger.Err(fmt.Sprintf(" Could not create CDR entry: %s", err.Error()))
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> could not create CDR entry from http: %+v, err <%s>",
+ utils.CDRs, r.Form, err.Error()))
return
}
- if err := cdrServer.processCdr(cgrCdr.AsCDR(cdrServer.cgrCfg.GeneralCfg().DefaultTimezone)); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Errors when storing CDR entry: %s", err.Error()))
+ cdr := cgrCdr.AsCDR(cdrServer.cgrCfg.GeneralCfg().DefaultTimezone)
+ var ignored string
+ if err := cdrServer.V1ProcessCDR(cdr, &ignored); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> processing CDR: %s, err: <%s>",
+ utils.CDRs, cdr, err.Error()))
}
}
-// Handler for fs http
+// fsCdrHandler will handle CDRs received from FreeSWITCH over HTTP-JSON
func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
body, _ := ioutil.ReadAll(r.Body)
fsCdr, err := NewFSCdr(body, cdrServer.cgrCfg)
@@ -63,21 +61,26 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) {
utils.Logger.Err(fmt.Sprintf(" Could not create CDR entry: %s", err.Error()))
return
}
- if err := cdrServer.processCdr(fsCdr.AsCDR(cdrServer.Timezone())); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Errors when storing CDR entry: %s", err.Error()))
+ cdr := fsCdr.AsCDR(cdrServer.cgrCfg.GeneralCfg().DefaultTimezone)
+ var ignored string
+ if err := cdrServer.V1ProcessCDR(cdr, &ignored); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> processing CDR: %s, err: <%s>",
+ utils.CDRs, cdr, err.Error()))
}
}
-func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub,
- attrs, users, thdS, stats, chargerS rpcclient.RpcClientConnection, filterS *FilterS) (*CdrServer, error) {
- if rater != nil && reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code
+// NewCDRServer is a constructor for CDRServer
+func NewCDRServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub,
+ attrS, users, thdS, statS, chargerS rpcclient.RpcClientConnection, filterS *FilterS) *CDRServer {
+ if rater != nil && reflect.ValueOf(rater).IsNil() {
rater = nil
}
if pubsub != nil && reflect.ValueOf(pubsub).IsNil() {
pubsub = nil
}
- if attrs != nil && reflect.ValueOf(attrs).IsNil() {
- attrs = nil
+ if attrS != nil && reflect.ValueOf(attrS).IsNil() {
+ attrS = nil
}
if users != nil && reflect.ValueOf(users).IsNil() {
users = nil
@@ -85,278 +88,70 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r
if thdS != nil && reflect.ValueOf(thdS).IsNil() {
thdS = nil
}
- if stats != nil && reflect.ValueOf(stats).IsNil() {
- stats = nil
+ if statS != nil && reflect.ValueOf(statS).IsNil() {
+ statS = nil
}
if chargerS != nil && reflect.ValueOf(chargerS).IsNil() {
chargerS = nil
}
- return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm,
- rals: rater, pubsub: pubsub, attrS: attrs,
+ return &CDRServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm,
+ rals: rater, pubsub: pubsub, attrS: attrS,
users: users,
- stats: stats, thdS: thdS,
+ statS: statS, thdS: thdS,
chargerS: chargerS, guard: guardian.Guardian,
+ respCache: utils.NewResponseCache(cgrCfg.GeneralCfg().ResponseCacheTTL),
httpPoster: NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify,
- cgrCfg.GeneralCfg().ReplyTimeout), filterS: filterS}, nil
+ cgrCfg.GeneralCfg().ReplyTimeout), filterS: filterS}
}
-type CdrServer struct {
- cgrCfg *config.CGRConfig
- cdrDb CdrStorage
- dm *DataManager
- rals rpcclient.RpcClientConnection
- pubsub rpcclient.RpcClientConnection
- attrS rpcclient.RpcClientConnection
- users rpcclient.RpcClientConnection
- thdS rpcclient.RpcClientConnection
- stats rpcclient.RpcClientConnection
- chargerS rpcclient.RpcClientConnection
- guard *guardian.GuardianLocker
- responseCache *utils.ResponseCache
- httpPoster *HTTPPoster // used for replication
- filterS *FilterS
+// CDRServer stores and rates CDRs
+type CDRServer struct {
+ cgrCfg *config.CGRConfig
+ cdrDb CdrStorage
+ dm *DataManager
+ rals rpcclient.RpcClientConnection
+ pubsub rpcclient.RpcClientConnection
+ attrS rpcclient.RpcClientConnection
+ users rpcclient.RpcClientConnection
+ thdS rpcclient.RpcClientConnection
+ statS rpcclient.RpcClientConnection
+ chargerS rpcclient.RpcClientConnection
+ guard *guardian.GuardianLocker
+ respCache *utils.ResponseCache
+ httpPoster *HTTPPoster // used for replication
+ filterS *FilterS
}
-func (self *CdrServer) Timezone() string {
- return self.cgrCfg.GeneralCfg().DefaultTimezone
-}
-func (self *CdrServer) SetTimeToLive(timeToLive time.Duration, out *int) error {
- self.responseCache = utils.NewResponseCache(timeToLive)
- return nil
+// RegisterHandlersToServer is called by cgr-engine to register HTTP URL handlers
+func (cdrS *CDRServer) RegisterHandlersToServer(server *utils.Server) {
+ cdrServer = cdrS // Share the server object for handlers
+ server.RegisterHttpFunc(cdrS.cgrCfg.HTTPCfg().HTTPCDRsURL, cgrCdrHandler)
+ server.RegisterHttpFunc(cdrS.cgrCfg.HTTPCfg().HTTPFreeswitchCDRsURL, fsCdrHandler)
}
-func (self *CdrServer) getCache() *utils.ResponseCache {
- if self.responseCache == nil {
- self.responseCache = utils.NewResponseCache(0)
- }
- return self.responseCache
-}
-
-func (self *CdrServer) RegisterHandlersToServer(server *utils.Server) {
- cdrServer = self // Share the server object for handlers
- server.RegisterHttpFunc(self.cgrCfg.HTTPCfg().HTTPCDRsURL, cgrCdrHandler)
- server.RegisterHttpFunc(self.cgrCfg.HTTPCfg().HTTPFreeswitchCDRsURL, fsCdrHandler)
-}
-
-// Used to process external CDRs
-func (self *CdrServer) ProcessExternalCdr(eCDR *ExternalCDR) error {
- cdr, err := NewCDRFromExternalCDR(eCDR, self.cgrCfg.GeneralCfg().DefaultTimezone)
- if err != nil {
- return err
- }
- return self.processCdr(cdr)
-}
-
-func (self *CdrServer) storeSMCost(smCost *SMCost, checkDuplicate bool) error {
+// storeSMCost will store a SMCost
+func (cdrS *CDRServer) storeSMCost(smCost *SMCost, checkDuplicate bool) error {
smCost.CostDetails.Compute() // make sure the total cost reflect the increment
lockKey := utils.MetaCDRs + smCost.CGRID + smCost.RunID + smCost.OriginID // Will lock on this ID
if checkDuplicate {
- _, err := self.guard.Guard(func() (interface{}, error) {
- smCosts, err := self.cdrDb.GetSMCosts(smCost.CGRID, smCost.RunID, "", "")
+ _, err := cdrS.guard.Guard(func() (interface{}, error) {
+ smCosts, err := cdrS.cdrDb.GetSMCosts(smCost.CGRID, smCost.RunID, "", "")
if err != nil && err.Error() != utils.NotFoundCaps {
return nil, err
}
if len(smCosts) != 0 {
return nil, utils.ErrExists
}
- return nil, self.cdrDb.SetSMCost(smCost)
+ return nil, cdrS.cdrDb.SetSMCost(smCost)
}, time.Duration(2*time.Second), lockKey) // FixMe: Possible deadlock with Guard from SMG session close()
return err
}
- return self.cdrDb.SetSMCost(smCost)
-}
-
-// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline
-func (self *CdrServer) processCdr(cdr *CDR) (err error) {
- if cdr.RequestType == "" {
- cdr.RequestType = self.cgrCfg.GeneralCfg().DefaultReqType
- }
- if cdr.Tenant == "" {
- cdr.Tenant = self.cgrCfg.GeneralCfg().DefaultTenant
- }
- if cdr.Category == "" {
- cdr.Category = self.cgrCfg.GeneralCfg().DefaultCategory
- }
- if cdr.Subject == "" { // Use account information as rating subject if missing
- cdr.Subject = cdr.Account
- }
- if !cdr.PreRated { // Enforce the RunID if CDR is not rated
- cdr.RunID = utils.MetaRaw
- }
- if cdr.RunID == utils.MetaRaw {
- cdr.Cost = -1.0
- }
- if self.cgrCfg.CdrsCfg().CDRSStoreCdrs { // Store RawCDRs, this we do sync so we can reply with the status
- if err := self.cdrDb.SetCDR(cdr, false); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Storing primary CDR %+v, got error: %s", cdr, err.Error()))
- return err // Error is propagated back and we don't continue processing the CDR if we cannot store it
- }
- }
- if self.thdS != nil {
- // process CDR with thresholdS
- go self.thdSProcessEvent(cdr.AsCGREvent())
- }
- if self.stats != nil {
- var reply []string
-
- go self.stats.Call(utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: *cdr.AsCGREvent()}, &reply)
- }
- if len(self.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 { // Replicate raw CDR
- self.replicateCDRs([]*CDR{cdr})
- }
- if self.rals != nil && !cdr.PreRated { // CDRs not rated will be processed by Rating
- go self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CdrsCfg().CDRSStoreCdrs,
- true, len(self.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0)
- }
- return nil
-}
-
-// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline
-func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, cdrstats, replicate bool) (err error) {
- cdrRuns, err := self.deriveCdrs(cdr)
- if err != nil {
- utils.Logger.Err(fmt.Sprintf(" Deriving CDR %+v, got error: %s", cdr, err.Error()))
- return err
- }
- var ratedCDRs []*CDR // Gather all CDRs received from rating subsystem
- for _, cdrRun := range cdrRuns {
- if self.attrS != nil {
- var rplyEv AttrSProcessEventReply
- cgrEv := cdrRun.AsCGREvent()
- cgrEv.Context = utils.StringPointer(utils.MetaCDRs)
- if err = self.attrS.Call(utils.AttributeSv1ProcessEvent,
- &AttrArgsProcessEvent{
- CGREvent: *cgrEv},
- &rplyEv); err == nil {
- if err = cdrRun.UpdateFromCGREvent(rplyEv.CGREvent,
- rplyEv.AlteredFields); err != nil {
- return
- }
- } else if err.Error() != utils.ErrNotFound.Error() {
- return
- }
- }
- if err := LoadUserProfile(cdrRun, utils.EXTRA_FIELDS); err != nil {
- utils.Logger.Err(fmt.Sprintf(" UserS handling for CDR %+v, got error: %s", cdrRun, err.Error()))
- continue
- }
- rcvRatedCDRs, err := self.rateCDR(cdrRun)
- if err != nil {
- cdrRun.Cost = -1.0 // If there was an error, mark the CDR
- cdrRun.ExtraInfo = err.Error()
- rcvRatedCDRs = []*CDR{cdrRun}
- }
- ratedCDRs = append(ratedCDRs, rcvRatedCDRs...)
- }
- // Request should be processed by SureTax
- for _, ratedCDR := range ratedCDRs {
- if ratedCDR.RunID == utils.META_SURETAX {
- if err := SureTaxProcessCdr(ratedCDR); err != nil {
- ratedCDR.Cost = -1.0
- ratedCDR.ExtraInfo = err.Error() // Something failed, write the error in the ExtraInfo
- }
- }
- }
- // Store rated CDRs
- if store {
- for _, ratedCDR := range ratedCDRs {
- if err := self.cdrDb.SetCDR(ratedCDR, true); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Storing rated CDR %+v, got error: %s", ratedCDR, err.Error()))
- }
- }
- }
- // Attach CDR to stats
- if cdrstats { // Send CDR to stats
- for _, ratedCDR := range ratedCDRs {
- if self.stats != nil {
- var reply []string
- go self.stats.Call(utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: *ratedCDR.AsCGREvent()}, &reply)
- }
- }
- }
- if replicate {
- self.replicateCDRs(ratedCDRs)
- }
- return nil
-}
-
-func (self *CdrServer) deriveCdrs(cdr *CDR) (drvdCDRs []*CDR, err error) {
- dfltCDRRun := cdr.Clone()
- cdrRuns := []*CDR{dfltCDRRun}
- if cdr.RunID != utils.MetaRaw { // Only derive *raw CDRs
- return cdrRuns, nil
- }
- dfltCDRRun.RunID = utils.META_DEFAULT // Rewrite *raw with *default since we have it as first run
- if self.attrS != nil {
- var rplyEv AttrSProcessEventReply
- if err = self.attrS.Call(utils.AttributeSv1ProcessEvent,
- &AttrArgsProcessEvent{
- CGREvent: *(cdr.AsCGREvent())}, &rplyEv); err != nil {
- return
- }
- if err = cdr.UpdateFromCGREvent(rplyEv.CGREvent,
- rplyEv.AlteredFields); err != nil {
- return
- }
- }
- if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil {
- return nil, err
- }
- attrsDC := &utils.AttrDerivedChargers{Tenant: cdr.Tenant, Category: cdr.Category, Direction: utils.OUT,
- Account: cdr.Account, Subject: cdr.Subject, Destination: cdr.Destination}
- var dcs utils.DerivedChargers
- if err := self.rals.Call("Responder.GetDerivedChargers", attrsDC, &dcs); err != nil {
- utils.Logger.Err(fmt.Sprintf("Could not get derived charging for cgrid %s, error: %s", cdr.CGRID, err.Error()))
- return nil, err
- }
- for _, dc := range dcs.Chargers {
- runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP)
- matchingAllFilters := true
- for _, dcRunFilter := range runFilters {
- if _, err := cdr.FieldAsStringWithRSRField(dcRunFilter); err != nil {
- matchingAllFilters = false
- break
- }
- }
- if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched
- continue
- }
- dcRequestTypeFld, _ := utils.NewRSRField(dc.RequestTypeField)
- dcTenantFld, _ := utils.NewRSRField(dc.TenantField)
- dcCategoryFld, _ := utils.NewRSRField(dc.CategoryField)
- dcAcntFld, _ := utils.NewRSRField(dc.AccountField)
- dcSubjFld, _ := utils.NewRSRField(dc.SubjectField)
- dcDstFld, _ := utils.NewRSRField(dc.DestinationField)
- dcSTimeFld, _ := utils.NewRSRField(dc.SetupTimeField)
- dcATimeFld, _ := utils.NewRSRField(dc.AnswerTimeField)
- dcDurFld, _ := utils.NewRSRField(dc.UsageField)
- dcRatedFld, _ := utils.NewRSRField(dc.PreRatedField)
- dcCostFld, _ := utils.NewRSRField(dc.CostField)
-
- dcExtraFields := []*utils.RSRField{}
- for key := range cdr.ExtraFields {
- dcExtraFields = append(dcExtraFields, &utils.RSRField{Id: key})
- }
-
- forkedCdr, err := cdr.ForkCdr(dc.RunID, dcRequestTypeFld, dcTenantFld,
- dcCategoryFld, dcAcntFld, dcSubjFld, dcDstFld, dcSTimeFld, dcATimeFld,
- dcDurFld, dcRatedFld, dcCostFld, dcExtraFields, true,
- self.cgrCfg.GeneralCfg().DefaultTimezone)
- if err != nil {
- utils.Logger.Err(fmt.Sprintf("Could not fork CGR with cgrid %s, run: %s, error: %s", cdr.CGRID, dc.RunID, err.Error()))
- continue // do not add it to the forked CDR list
- }
- if !forkedCdr.PreRated {
- forkedCdr.Cost = -1.0 // Make sure that un-rated CDRs start with Cost -1
- }
- cdrRuns = append(cdrRuns, forkedCdr)
- }
- return cdrRuns, nil
+ return cdrS.cdrDb.SetSMCost(smCost)
}
// rateCDR will populate cost field
// Returns more than one rated CDR in case of SMCost retrieved based on prefix
-func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) {
+func (cdrS *CDRServer) rateCDR(cdr *CDR) ([]*CDR, error) {
var qryCC *CallCost
var err error
if cdr.RequestType == utils.META_NONE {
@@ -377,8 +172,8 @@ func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) {
if _, hasIT := cdr.ExtraFields[utils.OriginIDPrefix]; hasIT {
cgrID = "" // for queries involving originIDPrefix we ignore CGRID
}
- for i := 0; i < self.cgrCfg.CdrsCfg().CDRSSMCostRetries; i++ {
- smCosts, err = self.cdrDb.GetSMCosts(cgrID, cdr.RunID, cdr.OriginHost,
+ for i := 0; i < cdrS.cgrCfg.CdrsCfg().CDRSSMCostRetries; i++ {
+ smCosts, err = cdrS.cdrDb.GetSMCosts(cgrID, cdr.RunID, cdr.OriginHost,
cdr.ExtraFields[utils.OriginIDPrefix])
if err == nil && len(smCosts) != 0 {
break
@@ -404,10 +199,10 @@ func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) {
utils.Logger.Warning(
fmt.Sprintf(" WARNING: Could not find CallCostLog for cgrid: %s, source: %s, runid: %s, originID: %s originHost: %s, will recalculate",
cdr.CGRID, utils.MetaSessionS, cdr.RunID, cdr.OriginID, cdr.OriginHost))
- qryCC, err = self.getCostFromRater(cdr)
+ qryCC, err = cdrS.getCostFromRater(cdr)
}
} else {
- qryCC, err = self.getCostFromRater(cdr)
+ qryCC, err = cdrS.getCostFromRater(cdr)
}
if err != nil {
return nil, err
@@ -419,8 +214,8 @@ func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) {
return []*CDR{cdr}, nil
}
-// Retrive the cost from engine
-func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
+// getCostFromRater will retrieve the cost from RALs
+func (cdrS *CDRServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
cc := new(CallCost)
var err error
timeStart := cdr.AnswerTime
@@ -441,9 +236,9 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
}
if utils.IsSliceMember([]string{utils.META_PSEUDOPREPAID, utils.META_POSTPAID, utils.META_PREPAID,
utils.PSEUDOPREPAID, utils.POSTPAID, utils.PREPAID}, cdr.RequestType) { // Prepaid - Cost can be recalculated in case of missing records from SM
- err = self.rals.Call("Responder.Debit", cd, cc)
+ err = cdrS.rals.Call("Responder.Debit", cd, cc)
} else {
- err = self.rals.Call("Responder.GetCost", cd, cc)
+ err = cdrS.rals.Call("Responder.GetCost", cd, cc)
}
if err != nil {
return cc, err
@@ -452,17 +247,142 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) {
return cc, nil
}
-func (self *CdrServer) replicateCDRs(cdrs []*CDR) (err error) {
- for _, exportID := range self.cgrCfg.CdrsCfg().CDRSOnlineCDRExports {
- expTpl := self.cgrCfg.CdreProfiles[exportID] // not checking for existence of profile since this should be done in a higher layer
+// chrgRaStoReThStaCDR will process the CGREvent with ChargerS subsystem
+// it is designed to run in it's own goroutine
+func (cdrS *CDRServer) chrgRaStoReThStaCDR(cgrEv *utils.CGREvent,
+ store, export, thdS, statS *bool) (err error) {
+ var chrgrs []*ChrgSProcessEventReply
+ if err = cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent,
+ cgrEv, &chrgrs); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.",
+ utils.CDRs, err.Error(), cgrEv, utils.ChargerS))
+ return
+ }
+ var partExec bool
+ for _, chrgr := range chrgrs {
+ cdr, errCdr := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg,
+ cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
+ if errCdr != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> error: %s converting CDR event %+v with %s.",
+ utils.CDRs, errCdr.Error(), cgrEv, utils.ChargerS))
+ partExec = true
+ continue
+ }
+ cdrS.raStoReThStaCDR(cdr, store, export, thdS, statS)
+ }
+ if partExec {
+ err = utils.ErrPartiallyExecuted
+ }
+ return
+}
+
+// raStoReThStaCDR will RAte/STOtore/REplicate/THresholds/STAts the CDR received
+// used by both chargerS as well as re-/rating
+func (cdrS *CDRServer) raStoReThStaCDR(cdr *CDR,
+ store, export, thdS, statS *bool) {
+ ratedCDRs, err := cdrS.rateCDR(cdr)
+ if err != nil {
+ cdr.Cost = -1.0 // If there was an error, mark the CDR
+ cdr.ExtraInfo = err.Error()
+ ratedCDRs = []*CDR{cdr}
+ }
+ for _, rtCDR := range ratedCDRs {
+ shouldStore := cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs
+ if store != nil {
+ shouldStore = *store
+ }
+ if shouldStore { // Store CDR
+ go func(rtCDR *CDR) {
+ if err := cdrS.cdrDb.SetCDR(rtCDR, true); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> error: %s storing CDR %+v.",
+ utils.CDRs, err.Error(), rtCDR))
+ }
+ }(rtCDR)
+ }
+ shouldExport := len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0
+ if export != nil {
+ shouldExport = *export
+ }
+ if shouldExport {
+ go cdrS.exportCDRs([]*CDR{rtCDR})
+ }
+ cgrEv := rtCDR.AsCGREvent()
+ shouldThdS := cdrS.thdS != nil
+ if thdS != nil {
+ shouldThdS = *thdS
+ }
+ if shouldThdS {
+ go cdrS.thdSProcessEvent(cgrEv)
+ }
+ shouldStatS := cdrS.statS != nil
+ if statS != nil {
+ shouldStatS = *statS
+ }
+ if shouldStatS {
+ go cdrS.statSProcessEvent(cgrEv)
+ }
+ }
+}
+
+// statSProcessEvent will send the event to StatS if the connection is configured
+func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREvent) (err error) {
+ if cgrEv.Context == nil { // populate if not already in
+ cgrEv.Context = utils.StringPointer(utils.MetaCDRs)
+ }
+ var rplyEv AttrSProcessEventReply
+ if err = cdrS.attrS.Call(utils.AttributeSv1ProcessEvent,
+ &AttrArgsProcessEvent{
+ CGREvent: *cgrEv},
+ &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 {
+ *cgrEv = *rplyEv.CGREvent
+ } else if err.Error() == utils.ErrNotFound.Error() {
+ err = nil // cancel ErrNotFound
+ }
+ return
+}
+
+// thdSProcessEvent will send the event to ThresholdS if the connection is configured
+func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREvent) {
+ var tIDs []string
+ if err := cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent,
+ &ArgsProcessEvent{CGREvent: *cgrEv}, &tIDs); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> error: %s processing CDR event %+v with thdS.",
+ utils.CDRs, err.Error(), cgrEv))
+ return
+ }
+}
+
+// statSProcessEvent will send the event to StatS if the connection is configured
+func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREvent) {
+ var reply []string
+ if err := cdrS.statS.Call(utils.StatSv1ProcessEvent,
+ &StatsArgsProcessEvent{CGREvent: *cgrEv}, &reply); err != nil &&
+ err.Error() != utils.ErrNotFound.Error() {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s.",
+ utils.CDRs, err.Error(), cgrEv, utils.StatS))
+ return
+ }
+}
+
+// exportCDRs will export the CDRs received
+func (cdrS *CDRServer) exportCDRs(cdrs []*CDR) (err error) {
+ for _, exportID := range cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports {
+ expTpl := cdrS.cgrCfg.CdreProfiles[exportID] // not checking for existence of profile since this should be done in a higher layer
var cdre *CDRExporter
if cdre, err = NewCDRExporter(cdrs, expTpl, expTpl.ExportFormat,
- expTpl.ExportPath, self.cgrCfg.GeneralCfg().FailedPostsDir,
+ expTpl.ExportPath, cdrS.cgrCfg.GeneralCfg().FailedPostsDir,
"CDRSReplication", expTpl.Synchronous, expTpl.Attempts,
expTpl.FieldSeparator, expTpl.UsageMultiplyFactor,
- expTpl.CostMultiplyFactor, self.cgrCfg.GeneralCfg().RoundingDecimals,
- self.cgrCfg.GeneralCfg().HttpSkipTlsVerify, self.httpPoster,
- self.filterS); err != nil {
+ expTpl.CostMultiplyFactor, cdrS.cgrCfg.GeneralCfg().RoundingDecimals,
+ cdrS.cgrCfg.GeneralCfg().HttpSkipTlsVerify, cdrS.httpPoster,
+ cdrS.filterS); err != nil {
utils.Logger.Err(fmt.Sprintf(" Building CDRExporter for online exports got error: <%s>", err.Error()))
continue
}
@@ -474,147 +394,14 @@ func (self *CdrServer) replicateCDRs(cdrs []*CDR) (err error) {
return
}
-// Called by rate/re-rate API, FixMe: deprecate it once new APIer structure is operational
-func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) error {
- cdrs, _, err := self.cdrDb.GetCDRs(cdrFltr, false)
- if err != nil {
- return err
- }
- for _, cdr := range cdrs {
- if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CdrsCfg().CDRSStoreCdrs,
- sendToStats, len(self.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Processing CDR %+v, got error: %s", cdr, err.Error()))
- }
- }
- return nil
-}
-
-// Internally used and called from CDRSv1
-// Cached requests for HA setups
-func (self *CdrServer) V1ProcessCDR(cdr *CDR, reply *string) error {
- if len(cdr.CGRID) == 0 { // Populate CGRID if not present
- cdr.ComputeCGRID()
- }
- cacheKey := "V1ProcessCDR" + cdr.CGRID + cdr.RunID
- if item, err := self.getCache().Get(cacheKey); err == nil && item != nil {
- if item.Value != nil {
- *reply = item.Value.(string)
- }
- return item.Err
- }
- if err := self.processCdr(cdr); err != nil {
- self.getCache().Cache(cacheKey, &utils.ResponseCacheItem{Err: err})
- return utils.NewErrServerError(err)
- }
- self.getCache().Cache(cacheKey, &utils.ResponseCacheItem{Value: utils.OK})
- *reply = utils.OK
- return nil
-}
-
-// RPC method, differs from storeSMCost through it's signature
-func (self *CdrServer) V1StoreSMCost(attr AttrCDRSStoreSMCost, reply *string) error {
- if attr.Cost.CGRID == "" {
- return utils.NewCGRError(utils.CDRSCtx,
- utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing),
- "SMCost: %+v with empty CGRID")
- }
- cacheKey := "V1StoreSMCost" + attr.Cost.CGRID + attr.Cost.RunID + attr.Cost.OriginID
- if item, err := self.getCache().Get(cacheKey); err == nil && item != nil {
- if item.Value != nil {
- *reply = item.Value.(string)
- }
- return item.Err
- }
- if err := self.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil {
- self.getCache().Cache(cacheKey, &utils.ResponseCacheItem{Err: err})
- return utils.NewErrServerError(err)
- }
- self.getCache().Cache(cacheKey, &utils.ResponseCacheItem{Value: utils.OK})
- *reply = utils.OK
- return nil
-}
-
-func (cdrs *CdrServer) V2StoreSMCost(args ArgsV2CDRSStoreSMCost, reply *string) error {
- if args.Cost.CGRID == "" {
- return utils.NewCGRError(utils.CDRSCtx,
- utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing),
- "SMCost: %+v with empty CGRID")
- }
- cacheKey := "V2StoreSMCost" + args.Cost.CGRID + args.Cost.RunID + args.Cost.OriginID
- if item, err := cdrs.getCache().Get(cacheKey); err == nil && item != nil {
- if item.Value != nil {
- *reply = item.Value.(string)
- }
- return item.Err
- }
- cc := args.Cost.CostDetails.AsCallCost()
- cc.Round()
- roundIncrements := cc.GetRoundIncrements()
- if len(roundIncrements) != 0 {
- cd := cc.CreateCallDescriptor()
- cd.CgrID = args.Cost.CGRID
- cd.RunID = args.Cost.RunID
- cd.Increments = roundIncrements
- var response float64
- if err := cdrs.rals.Call("Responder.RefundRounding", cd, &response); err != nil {
- utils.Logger.Err(fmt.Sprintf(" RefundRounding for cc: %+v, got error: %s", cc, err.Error()))
- }
- }
- if err := cdrs.storeSMCost(&SMCost{
- CGRID: args.Cost.CGRID,
- RunID: args.Cost.RunID,
- OriginHost: args.Cost.OriginHost,
- OriginID: args.Cost.OriginID,
- CostSource: args.Cost.CostSource,
- Usage: args.Cost.Usage,
- CostDetails: args.Cost.CostDetails,
- }, args.CheckDuplicate); err != nil {
- cdrs.getCache().Cache(cacheKey, &utils.ResponseCacheItem{Err: err})
- return utils.NewErrServerError(err)
- }
- *reply = utils.OK
- cdrs.getCache().Cache(cacheKey, &utils.ResponseCacheItem{Value: *reply})
- return nil
-
-}
-
-// Called by rate/re-rate API, RPC method
-func (self *CdrServer) V1RateCDRs(attrs utils.AttrRateCDRs, reply *string) error {
- cdrFltr, err := attrs.RPCCDRsFilter.AsCDRsFilter(self.cgrCfg.GeneralCfg().DefaultTimezone)
- if err != nil {
- return utils.NewErrServerError(err)
- }
- cdrs, _, err := self.cdrDb.GetCDRs(cdrFltr, false)
- if err != nil {
- return err
- }
- storeCDRs := self.cgrCfg.CdrsCfg().CDRSStoreCdrs
- if attrs.StoreCDRs != nil {
- storeCDRs = *attrs.StoreCDRs
- }
- sendToStats := self.stats != nil
- if attrs.SendToStatS != nil {
- sendToStats = *attrs.SendToStatS
- }
- replicate := len(self.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0
- if attrs.ReplicateCDRs != nil {
- replicate = *attrs.ReplicateCDRs
- }
- for _, cdr := range cdrs {
- if err := self.deriveRateStoreStatsReplicate(cdr, storeCDRs, sendToStats, replicate); err != nil {
- utils.Logger.Err(fmt.Sprintf(" Processing CDR %+v, got error: %s", cdr, err.Error()))
- }
- }
- return nil
-}
-
-func (cdrsrv *CdrServer) Call(serviceMethod string, args interface{}, reply interface{}) error {
+// Call implements the rpcclient.RpcClientConnection interface
+func (cdrS *CDRServer) Call(serviceMethod string, args interface{}, reply interface{}) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return rpcclient.ErrUnsupporteServiceMethod
}
// get method
- method := reflect.ValueOf(cdrsrv).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method
+ method := reflect.ValueOf(cdrS).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method
if !method.IsValid() {
return rpcclient.ErrUnsupporteServiceMethod
}
@@ -634,145 +421,210 @@ func (cdrsrv *CdrServer) Call(serviceMethod string, args interface{}, reply inte
return err
}
-// thdSProcessEvent will send the event to ThresholdS if the connection is configured
-func (cdrS *CdrServer) thdSProcessEvent(cgrEv *utils.CGREvent) {
- var tIDs []string
- if err := cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent,
- &ArgsProcessEvent{CGREvent: *cgrEv}, &tIDs); err != nil &&
- err.Error() != utils.ErrNotFound.Error() {
- utils.Logger.Warning(
- fmt.Sprintf("<%s> error: %s processing CDR event %+v with thdS.",
- utils.CDRs, err.Error(), cgrEv))
- return
+// V1ProcessCDR processes a CDR
+func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) {
+ if cdr.CGRID == utils.EmptyString { // Populate CGRID if not present
+ cdr.ComputeCGRID()
}
-}
-
-// statSProcessEvent will send the event to StatS if the connection is configured
-func (cdrS *CdrServer) statSProcessEvent(cgrEv *utils.CGREvent) {
- var reply []string
- if err := cdrS.stats.Call(utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: *cgrEv}, &reply); err != nil &&
- err.Error() != utils.ErrNotFound.Error() {
- utils.Logger.Warning(
- fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s.",
- utils.CDRs, err.Error(), cgrEv, utils.StatS))
- return
- }
-}
-
-// rarethsta will RAte/STOtore/REplicate/THresholds/STAts the CDR received
-// used by both chargerS as well as re-/rating
-func (cdrS *CdrServer) raStoReThStaCDR(cdr *CDR) {
- ratedCDRs, err := cdrS.rateCDR(cdr)
- if err != nil {
- cdr.Cost = -1.0 // If there was an error, mark the CDR
- cdr.ExtraInfo = err.Error()
- ratedCDRs = []*CDR{cdr}
- }
- for _, rtCDR := range ratedCDRs {
- if cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs { // Store CDR
- go func(rtCDR *CDR) {
- if err := cdrS.cdrDb.SetCDR(rtCDR, true); err != nil {
- utils.Logger.Warning(
- fmt.Sprintf("<%s> error: %s storing CDR %+v.",
- utils.CDRs, err.Error(), rtCDR))
- }
- }(rtCDR)
- }
- if len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 {
- go cdrS.replicateCDRs([]*CDR{rtCDR})
- }
- cgrEv := rtCDR.AsCGREvent()
- if cdrS.thdS != nil {
- go cdrS.thdSProcessEvent(cgrEv)
- }
- if cdrS.stats != nil {
- go cdrS.statSProcessEvent(cgrEv)
+ cacheKey := "V1ProcessCDR" + cdr.CGRID + cdr.RunID
+ if item, err := cdrS.respCache.Get(cacheKey); err == nil && item != nil {
+ if item.Err == nil {
+ *reply = *item.Value.(*string)
}
+ return item.Err
}
-}
+ defer cdrS.respCache.Cache(cacheKey,
+ &utils.ResponseCacheItem{Value: reply, Err: err})
-// chrgrSProcessEvent will process the CGREvent with ChargerS subsystem
-func (cdrS *CdrServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) {
- var chrgrs []*ChrgSProcessEventReply
- if err := cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent, cgrEv, &chrgrs); err != nil &&
- err.Error() != utils.ErrNotFound.Error() {
- utils.Logger.Warning(
- fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.",
- utils.CDRs, err.Error(), cgrEv, utils.ChargerS))
- return
+ if cdr.RequestType == utils.EmptyString {
+ cdr.RequestType = cdrS.cgrCfg.GeneralCfg().DefaultReqType
}
- for _, chrgr := range chrgrs {
- cdr, err := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg,
- cgrEv.Tenant, cdrS.Timezone())
- if err != nil {
- utils.Logger.Warning(
- fmt.Sprintf("<%s> error: %s converting CDR event %+v with %s.",
- utils.CDRs, err.Error(), cgrEv, utils.ChargerS))
- continue
- }
- cdrS.raStoReThStaCDR(cdr)
-
+ if cdr.Tenant == utils.EmptyString {
+ cdr.Tenant = cdrS.cgrCfg.GeneralCfg().DefaultTenant
}
-}
-
-// statSProcessEvent will send the event to StatS if the connection is configured
-func (cdrS *CdrServer) attrSProcessEvent(cgrEv *utils.CGREvent) (err error) {
- if cgrEv.Context == nil { // populate if not already in
- cgrEv.Context = utils.StringPointer(utils.MetaCDRs)
+ if cdr.Category == utils.EmptyString {
+ cdr.Category = cdrS.cgrCfg.GeneralCfg().DefaultCategory
}
- var rplyEv AttrSProcessEventReply
- if err = cdrS.attrS.Call(utils.AttributeSv1ProcessEvent,
- &AttrArgsProcessEvent{
- CGREvent: *cgrEv},
- &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 {
- *cgrEv = *rplyEv.CGREvent
- } else if err.Error() == utils.ErrNotFound.Error() {
- err = nil // cancel ErrNotFound
+ if cdr.Subject == utils.EmptyString { // Use account information as rating subject if missing
+ cdr.Subject = cdr.Account
+ }
+ if !cdr.PreRated { // Enforce the RunID if CDR is not rated
+ cdr.RunID = utils.MetaRaw
+ }
+ if utils.IsSliceMember([]string{"", utils.MetaRaw}, cdr.RunID) {
+ cdr.Cost = -1.0
+ }
+ cgrEv := &utils.CGREvent{
+ Tenant: cdr.Tenant,
+ ID: utils.UUIDSha1Prefix(),
+ Event: cdr.AsMapStringIface(),
}
- return
-}
-
-// V2ProcessCDR will process the CDR out of CGREvent
-func (cdrS *CdrServer) V2ProcessCDR(cgrEv *utils.CGREvent, reply *string) (err error) {
if cdrS.attrS != nil {
- if err := cdrS.attrSProcessEvent(cgrEv); err != nil {
+ if err = cdrS.attrSProcessEvent(cgrEv); err != nil {
return utils.NewErrServerError(err)
}
}
- rawCDR, err := NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, cgrEv.Tenant, cdrS.Timezone())
- if err != nil {
- return utils.NewErrServerError(err)
- }
- if cdrS.chargerS == nil { // backwards compatibility for DerivedChargers
- return cdrS.V1ProcessCDR(rawCDR, reply)
- }
if cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs { // Store *raw CDR
- if err = cdrS.cdrDb.SetCDR(rawCDR, false); err != nil {
+ if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf("<%s> storing primary CDR %+v, got error: %s",
+ utils.CDRs, cdr, err.Error()))
return utils.NewErrServerError(err) // Cannot store CDR
}
}
if len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 {
- cdrS.replicateCDRs([]*CDR{rawCDR}) // Replicate raw CDR
+ cdrS.exportCDRs([]*CDR{cdr}) // Replicate raw CDR
}
if cdrS.thdS != nil {
go cdrS.thdSProcessEvent(cgrEv)
}
- if cdrS.stats != nil {
+ if cdrS.statS != nil {
go cdrS.statSProcessEvent(cgrEv)
}
- if cdrS.chargerS != nil {
- go cdrS.chrgrSProcessEvent(cgrEv)
+ if cdrS.chargerS != nil &&
+ utils.IsSliceMember([]string{"", utils.MetaRaw}, cdr.RunID) {
+ go cdrS.chrgRaStoReThStaCDR(cgrEv, nil, nil, nil, nil)
+ }
+ *reply = utils.OK
+
+ return
+}
+
+type ArgV2ProcessCDR struct {
+ utils.CGREvent
+ AttributeS *bool // control AttributeS processing
+ ChargerS *bool // control ChargerS processing
+ Store *bool // control storing of the CDR
+ Export *bool // control online exports for the CDR
+ ThresholdS *bool // control ThresholdS
+ StatS *bool // control sending the CDR to StatS for aggregation
+}
+
+// V2ProcessCDR will process the CDR out of CGREvent
+func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err error) {
+ attrS := cdrS.attrS != nil
+ if arg.AttributeS != nil {
+ attrS = *arg.AttributeS
+ }
+ cgrEv := &arg.CGREvent
+ if attrS {
+ if err := cdrS.attrSProcessEvent(cgrEv); err != nil {
+ return utils.NewErrServerError(err)
+ }
+ }
+ rawCDR, err := NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
+ cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
+ if err != nil {
+ return utils.NewErrServerError(err)
+ }
+ store := cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs
+ if arg.Store != nil {
+ store = *arg.Store
+ }
+ if store { // Store *raw CDR
+ if err = cdrS.cdrDb.SetCDR(rawCDR, false); err != nil {
+ return utils.NewErrServerError(err) // Cannot store CDR
+ }
+ }
+ export := len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0
+ if arg.Export != nil {
+ export = *arg.Export
+ }
+ if export {
+ cdrS.exportCDRs([]*CDR{rawCDR}) // Replicate raw CDR
+ }
+ thrdS := cdrS.thdS != nil
+ if arg.ThresholdS != nil {
+ thrdS = *arg.ThresholdS
+ }
+ if thrdS {
+ go cdrS.thdSProcessEvent(cgrEv)
+ }
+ statS := cdrS.statS != nil
+ if arg.StatS != nil {
+ statS = *arg.StatS
+ }
+ if statS {
+ go cdrS.statSProcessEvent(cgrEv)
+ }
+ chrgS := cdrS.chargerS != nil
+ if arg.ChargerS != nil {
+ chrgS = *arg.ChargerS
+ }
+ if chrgS {
+ go cdrS.chrgRaStoReThStaCDR(cgrEv,
+ arg.Store, arg.Export, arg.ThresholdS, arg.StatS)
}
*reply = utils.OK
return nil
}
-// Called by rate/re-rate API, RPC method
-func (cdrS *CdrServer) V2RateCDRs(attrs *utils.RPCCDRsFilter, reply *string) error {
- if cdrS.chargerS == nil {
- return utils.NewErrNotConnected(utils.ChargerS)
+// V1StoreSMCost handles storing of the cost into session_costs table
+func (cdrS *CDRServer) V1StoreSessionCost(attr *AttrCDRSStoreSMCost, reply *string) error {
+ if attr.Cost.CGRID == "" {
+ return utils.NewCGRError(utils.CDRSCtx,
+ utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing),
+ "SMCost: %+v with empty CGRID")
}
- cdrFltr, err := attrs.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
+ if err := cdrS.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil {
+ return utils.NewErrServerError(err)
+ }
+ *reply = utils.OK
+ return nil
+}
+
+// V2StoreSessionCost will store the SessionCost into session_costs table
+func (cdrS *CDRServer) V2StoreSessionCost(args *ArgsV2CDRSStoreSMCost, reply *string) error {
+ if args.Cost.CGRID == "" {
+ return utils.NewCGRError(utils.CDRSCtx,
+ utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing),
+ "SMCost: %+v with empty CGRID")
+ }
+ cc := args.Cost.CostDetails.AsCallCost()
+ cc.Round()
+ roundIncrements := cc.GetRoundIncrements()
+ if len(roundIncrements) != 0 {
+ cd := cc.CreateCallDescriptor()
+ cd.CgrID = args.Cost.CGRID
+ cd.RunID = args.Cost.RunID
+ cd.Increments = roundIncrements
+ var response float64
+ if err := cdrS.rals.Call("Responder.RefundRounding",
+ cd, &response); err != nil {
+ utils.Logger.Warning(
+ fmt.Sprintf(" RefundRounding for cc: %+v, got error: %s",
+ cc, err.Error()))
+ }
+ }
+ if err := cdrS.storeSMCost(
+ &SMCost{
+ CGRID: args.Cost.CGRID,
+ RunID: args.Cost.RunID,
+ OriginHost: args.Cost.OriginHost,
+ OriginID: args.Cost.OriginID,
+ CostSource: args.Cost.CostSource,
+ Usage: args.Cost.Usage,
+ CostDetails: args.Cost.CostDetails},
+ args.CheckDuplicate); err != nil {
+ return utils.NewErrServerError(err)
+ }
+ *reply = utils.OK
+ return nil
+
+}
+
+type ArgRateCDRs struct {
+ utils.RPCCDRsFilter
+ ChargerS *bool
+ Store *bool
+ Export *bool // Replicate results
+ ThresholdS *bool
+ StatS *bool // Set to true if the CDRs should be sent to stats server
+}
+
+// V1RateCDRs is used for re-/rate CDRs which are already stored within StorDB
+func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) {
+ cdrFltr, err := arg.RPCCDRsFilter.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
return utils.NewErrServerError(err)
}
@@ -781,22 +633,42 @@ func (cdrS *CdrServer) V2RateCDRs(attrs *utils.RPCCDRsFilter, reply *string) err
return err
}
for _, cdr := range cdrs {
- go cdrS.raStoReThStaCDR(cdr)
+ if arg.ChargerS != nil && *arg.ChargerS {
+ if cdrS.chargerS == nil {
+ return utils.NewErrNotConnected(utils.ChargerS)
+ }
+ if err = cdrS.chrgRaStoReThStaCDR(cdr.AsCGREvent(),
+ arg.Store, arg.Export, arg.ThresholdS, arg.StatS); err != nil {
+ return utils.NewErrServerError(err)
+ }
+ } else {
+ cdrS.raStoReThStaCDR(cdr, arg.Store,
+ arg.Export, arg.ThresholdS, arg.StatS)
+ }
}
*reply = utils.OK
return nil
}
+// Used to process external CDRs
+func (cdrS *CDRServer) V1ProcessExternalCDR(eCDR *ExternalCDR, reply *string) error {
+ cdr, err := NewCDRFromExternalCDR(eCDR, cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
+ if err != nil {
+ return err
+ }
+ return cdrS.V1ProcessCDR(cdr, reply)
+}
+
// V1GetCDRs returns CDRs from DB
-func (self *CdrServer) V1GetCDRs(args utils.RPCCDRsFilter, cdrs *[]*CDR) error {
- cdrsFltr, err := args.AsCDRsFilter(self.Timezone())
+func (cdrS *CDRServer) V1GetCDRs(args utils.RPCCDRsFilter, cdrs *[]*CDR) error {
+ cdrsFltr, err := args.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
if err.Error() != utils.NotFoundCaps {
err = utils.NewErrServerError(err)
}
return err
}
- if qryCDRs, _, err := self.cdrDb.GetCDRs(cdrsFltr, false); err != nil {
+ if qryCDRs, _, err := cdrS.cdrDb.GetCDRs(cdrsFltr, false); err != nil {
return utils.NewErrServerError(err)
} else {
*cdrs = qryCDRs
@@ -805,8 +677,8 @@ func (self *CdrServer) V1GetCDRs(args utils.RPCCDRsFilter, cdrs *[]*CDR) error {
}
// V1CountCDRs counts CDRs from DB
-func (self *CdrServer) V1CountCDRs(args utils.RPCCDRsFilter, cnt *int64) error {
- cdrsFltr, err := args.AsCDRsFilter(self.Timezone())
+func (cdrS *CDRServer) V1CountCDRs(args *utils.RPCCDRsFilter, cnt *int64) error {
+ cdrsFltr, err := args.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
if err.Error() != utils.NotFoundCaps {
err = utils.NewErrServerError(err)
@@ -814,7 +686,7 @@ func (self *CdrServer) V1CountCDRs(args utils.RPCCDRsFilter, cnt *int64) error {
return err
}
cdrsFltr.Count = true
- if _, qryCnt, err := self.cdrDb.GetCDRs(cdrsFltr, false); err != nil {
+ if _, qryCnt, err := cdrS.cdrDb.GetCDRs(cdrsFltr, false); err != nil {
return utils.NewErrServerError(err)
} else {
*cnt = qryCnt
diff --git a/engine/models.go b/engine/models.go
index 351e8a222..8c9aefa55 100644
--- a/engine/models.go
+++ b/engine/models.go
@@ -382,7 +382,7 @@ func (t CDRsql) TableName() string {
return utils.CDRsTBL
}
-type SessionsCostsSQL struct {
+type SessionCostsSQL struct {
ID int64
Cgrid string
RunID string
@@ -395,8 +395,8 @@ type SessionsCostsSQL struct {
DeletedAt *time.Time
}
-func (t SessionsCostsSQL) TableName() string {
- return utils.SessionsCostsTBL
+func (t SessionCostsSQL) TableName() string {
+ return utils.SessionCostsTBL
}
type TBLVersion struct {
diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go
index 24185aa53..39a405761 100644
--- a/engine/storage_mongo_datadb.go
+++ b/engine/storage_mongo_datadb.go
@@ -298,17 +298,17 @@ func (ms *MongoStorage) EnsureIndexes() (err error) {
}
}
- if err = ms.EnusureIndex(utils.SessionsCostsTBL, true, CGRIDLow,
+ if err = ms.EnusureIndex(utils.SessionCostsTBL, true, CGRIDLow,
RunIDLow); err != nil {
return
}
- if err = ms.EnusureIndex(utils.SessionsCostsTBL, false, OriginHostLow,
+ if err = ms.EnusureIndex(utils.SessionCostsTBL, false, OriginHostLow,
OriginIDLow); err != nil {
return
}
- if err = ms.EnusureIndex(utils.SessionsCostsTBL, false, RunIDLow,
+ if err = ms.EnusureIndex(utils.SessionCostsTBL, false, RunIDLow,
OriginIDLow); err != nil {
return
}
diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go
index a89c0e477..099c48e10 100644
--- a/engine/storage_mongo_stordb.go
+++ b/engine/storage_mongo_stordb.go
@@ -993,7 +993,7 @@ func (ms *MongoStorage) SetSMCost(smc *SMCost) error {
return nil
}
return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
- _, err = ms.getCol(utils.SessionsCostsTBL).InsertOne(sctx, smc)
+ _, err = ms.getCol(utils.SessionCostsTBL).InsertOne(sctx, smc)
return err
})
}
@@ -1004,7 +1004,7 @@ func (ms *MongoStorage) RemoveSMCost(smc *SMCost) error {
remParams = bson.M{"cgrid": smc.CGRID, "runid": smc.RunID}
}
return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
- _, err = ms.getCol(utils.SessionsCostsTBL).DeleteMany(sctx, remParams)
+ _, err = ms.getCol(utils.SessionCostsTBL).DeleteMany(sctx, remParams)
return err
})
}
@@ -1024,7 +1024,7 @@ func (ms *MongoStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri
filter[OriginIDLow] = bsonx.Regex(fmt.Sprintf("^%s", originIDPrefix), "")
}
err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) {
- cur, err := ms.getCol(utils.SessionsCostsTBL).Find(sctx, filter)
+ cur, err := ms.getCol(utils.SessionCostsTBL).Find(sctx, filter)
if err != nil {
return err
}
diff --git a/engine/storage_sql.go b/engine/storage_sql.go
index 07078f2fb..79784bd92 100644
--- a/engine/storage_sql.go
+++ b/engine/storage_sql.go
@@ -107,7 +107,7 @@ func (self *SQLStorage) IsDBEmpty() (resp bool, err error) {
utils.TBLTPSharedGroups, utils.TBLTPActions, utils.TBLTPActionTriggers,
utils.TBLTPAccountActions, utils.TBLTPDerivedChargers, utils.TBLTPUsers,
utils.TBLTPResources, utils.TBLTPStats, utils.TBLTPThresholds,
- utils.TBLTPFilters, utils.SessionsCostsTBL, utils.CDRsTBL, utils.TBLTPActionPlans,
+ utils.TBLTPFilters, utils.SessionCostsTBL, utils.CDRsTBL, utils.TBLTPActionPlans,
utils.TBLVersions, utils.TBLTPSuppliers, utils.TBLTPAttributes, utils.TBLTPChargers,
}
for _, tbl := range tbls {
@@ -745,7 +745,7 @@ func (self *SQLStorage) SetSMCost(smc *SMCost) error {
return nil
}
tx := self.db.Begin()
- cd := &SessionsCostsSQL{
+ cd := &SessionCostsSQL{
Cgrid: smc.CGRID,
RunID: smc.RunID,
OriginHost: smc.OriginHost,
@@ -765,12 +765,12 @@ func (self *SQLStorage) SetSMCost(smc *SMCost) error {
func (self *SQLStorage) RemoveSMCost(smc *SMCost) error {
tx := self.db.Begin()
- var rmParam *SessionsCostsSQL
+ var rmParam *SessionCostsSQL
if smc != nil {
- rmParam = &SessionsCostsSQL{Cgrid: smc.CGRID,
+ rmParam = &SessionCostsSQL{Cgrid: smc.CGRID,
RunID: smc.RunID}
}
- if err := tx.Where(rmParam).Delete(SessionsCostsSQL{}).Error; err != nil {
+ if err := tx.Where(rmParam).Delete(SessionCostsSQL{}).Error; err != nil {
tx.Rollback()
return err
}
@@ -781,7 +781,7 @@ func (self *SQLStorage) RemoveSMCost(smc *SMCost) error {
// GetSMCosts is used to retrieve one or multiple SMCosts based on filter
func (self *SQLStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix string) ([]*SMCost, error) {
var smCosts []*SMCost
- filter := &SessionsCostsSQL{}
+ filter := &SessionCostsSQL{}
if cgrid != "" {
filter.Cgrid = cgrid
}
@@ -795,7 +795,7 @@ func (self *SQLStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri
if originIDPrefix != "" {
q = self.db.Where(filter).Where(fmt.Sprintf("origin_id LIKE '%s%%'", originIDPrefix))
}
- results := make([]*SessionsCostsSQL, 0)
+ results := make([]*SessionCostsSQL, 0)
if err := q.Find(&results).Error; err != nil {
return nil, err
}
diff --git a/migrator/sessions_costs.go b/migrator/sessions_costs.go
index 931f9d735..be5962117 100644
--- a/migrator/sessions_costs.go
+++ b/migrator/sessions_costs.go
@@ -153,7 +153,7 @@ func (v2Cost *v2SessionsCost) V2toV3Cost() (cost *engine.SMCost) {
return
}
-func NewV2SessionsCostFromSessionsCostSql(smSql *engine.SessionsCostsSQL) (smV2 *v2SessionsCost, err error) {
+func NewV2SessionsCostFromSessionsCostSql(smSql *engine.SessionCostsSQL) (smV2 *v2SessionsCost, err error) {
smV2 = new(v2SessionsCost)
smV2.CGRID = smSql.Cgrid
smV2.RunID = smSql.RunID
@@ -168,8 +168,8 @@ func NewV2SessionsCostFromSessionsCostSql(smSql *engine.SessionsCostsSQL) (smV2
return
}
-func (v2Cost *v2SessionsCost) AsSessionsCostSql() (smSql *engine.SessionsCostsSQL) {
- smSql = new(engine.SessionsCostsSQL)
+func (v2Cost *v2SessionsCost) AsSessionsCostSql() (smSql *engine.SessionCostsSQL) {
+ smSql = new(engine.SessionCostsSQL)
smSql.Cgrid = v2Cost.CGRID
smSql.RunID = v2Cost.RunID
smSql.OriginHost = v2Cost.OriginHost
diff --git a/migrator/storage_mongo_stordb.go b/migrator/storage_mongo_stordb.go
index 0fd0715b4..3feae80b0 100644
--- a/migrator/storage_mongo_stordb.go
+++ b/migrator/storage_mongo_stordb.go
@@ -79,12 +79,12 @@ func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) {
return err
}
return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(),
- bson.D{{"create", utils.SessionsCostsTBL}}).Err()
+ bson.D{{"create", utils.SessionCostsTBL}}).Err()
}
func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) {
v1ms.mgoDB.DB().Collection(utils.OldSMCosts).Drop(v1ms.mgoDB.GetContext())
- v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).Drop(v1ms.mgoDB.GetContext())
+ v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).Drop(v1ms.mgoDB.GetContext())
return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(),
bson.D{{"create", utils.OldSMCosts}, {"size", 1024}}).Err()
}
@@ -93,7 +93,7 @@ func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) {
func (v1ms *mongoStorDBMigrator) getV2SMCost() (v2Cost *v2SessionsCost, err error) {
if v1ms.cursor == nil {
var cursor mongo.Cursor
- cursor, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).Find(v1ms.mgoDB.GetContext(), bson.D{})
+ cursor, err = v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).Find(v1ms.mgoDB.GetContext(), bson.D{})
if err != nil {
return nil, err
}
@@ -113,12 +113,12 @@ func (v1ms *mongoStorDBMigrator) getV2SMCost() (v2Cost *v2SessionsCost, err erro
//set
func (v1ms *mongoStorDBMigrator) setV2SMCost(v2Cost *v2SessionsCost) (err error) {
- _, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).InsertOne(v1ms.mgoDB.GetContext(), v2Cost)
+ _, err = v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).InsertOne(v1ms.mgoDB.GetContext(), v2Cost)
return
}
//remove
func (v1ms *mongoStorDBMigrator) remV2SMCost(v2Cost *v2SessionsCost) (err error) {
- _, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).DeleteMany(v1ms.mgoDB.GetContext(), bson.D{})
+ _, err = v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).DeleteMany(v1ms.mgoDB.GetContext(), bson.D{})
return
}
diff --git a/migrator/storage_sql.go b/migrator/storage_sql.go
index b4d42039c..e90a8b9e2 100755
--- a/migrator/storage_sql.go
+++ b/migrator/storage_sql.go
@@ -125,7 +125,7 @@ func (mgSQL *migratorSQL) getV2SMCost() (v2Cost *v2SessionsCost, err error) {
return nil, err
}
}
- scSql := new(engine.SessionsCostsSQL)
+ scSql := new(engine.SessionCostsSQL)
mgSQL.rowIter.Scan(&scSql)
v2Cost, err = NewV2SessionsCostFromSessionsCostSql(scSql)
@@ -151,12 +151,12 @@ func (mgSQL *migratorSQL) setV2SMCost(v2Cost *v2SessionsCost) (err error) {
func (mgSQL *migratorSQL) remV2SMCost(v2Cost *v2SessionsCost) (err error) {
tx := mgSQL.sqlStorage.ExportGormDB().Begin()
- var rmParam *engine.SessionsCostsSQL
+ var rmParam *engine.SessionCostsSQL
if v2Cost != nil {
- rmParam = &engine.SessionsCostsSQL{Cgrid: v2Cost.CGRID,
+ rmParam = &engine.SessionCostsSQL{Cgrid: v2Cost.CGRID,
RunID: v2Cost.RunID}
}
- if err := tx.Where(rmParam).Delete(engine.SessionsCostsSQL{}).Error; err != nil {
+ if err := tx.Where(rmParam).Delete(engine.SessionCostsSQL{}).Error; err != nil {
tx.Rollback()
return err
}
diff --git a/utils/apitpdata.go b/utils/apitpdata.go
index 13cd9bdd9..2693249d6 100755
--- a/utils/apitpdata.go
+++ b/utils/apitpdata.go
@@ -765,67 +765,6 @@ type AttrRemCdrs struct {
CgrIds []string // List of CgrIds to remove from storeDb
}
-type AttrRateCdrs struct {
- CgrIds []string // If provided, it will filter based on the cgrids present in list
- MediationRunIds []string // If provided, it will filter on mediation runid
- TORs []string // If provided, filter on TypeOfRecord
- CdrHosts []string // If provided, it will filter cdrhost
- CdrSources []string // If provided, it will filter cdrsource
- ReqTypes []string // If provided, it will fiter reqtype
- Tenants []string // If provided, it will filter tenant
- Categories []string // If provided, it will filter çategory
- Accounts []string // If provided, it will filter account
- Subjects []string // If provided, it will filter the rating subject
- DestinationPrefixes []string // If provided, it will filter on destination prefix
- OrderIdStart *int64 // Export from this order identifier
- OrderIdEnd *int64 // Export smaller than this order identifier
- TimeStart string // If provided, it will represent the starting of the CDRs interval (>=)
- TimeEnd string // If provided, it will represent the end of the CDRs interval (<)
- RerateErrors bool // Rerate previous CDRs with errors (makes sense for reqtype rated and pseudoprepaid
- RerateRated bool // Rerate CDRs which were previously rated (makes sense for reqtype rated and pseudoprepaid)
- SendToStats bool // Set to true if the CDRs should be sent to stats server
-}
-
-func (attrRateCDRs *AttrRateCdrs) AsCDRsFilter(timezone string) (*CDRsFilter, error) {
- cdrFltr := &CDRsFilter{
- CGRIDs: attrRateCDRs.CgrIds,
- RunIDs: attrRateCDRs.MediationRunIds,
- OriginHosts: attrRateCDRs.CdrHosts,
- Sources: attrRateCDRs.CdrSources,
- ToRs: attrRateCDRs.TORs,
- RequestTypes: attrRateCDRs.ReqTypes,
- Tenants: attrRateCDRs.Tenants,
- Categories: attrRateCDRs.Categories,
- Accounts: attrRateCDRs.Accounts,
- Subjects: attrRateCDRs.Subjects,
- DestinationPrefixes: attrRateCDRs.DestinationPrefixes,
- OrderIDStart: attrRateCDRs.OrderIdStart,
- OrderIDEnd: attrRateCDRs.OrderIdEnd,
- }
- if aTime, err := ParseTimeDetectLayout(attrRateCDRs.TimeStart, timezone); err != nil {
- return nil, err
- } else if !aTime.IsZero() {
- cdrFltr.AnswerTimeStart = &aTime
- }
- if aTimeEnd, err := ParseTimeDetectLayout(attrRateCDRs.TimeEnd, timezone); err != nil {
- return nil, err
- } else if !aTimeEnd.IsZero() {
- cdrFltr.AnswerTimeEnd = &aTimeEnd
- }
- if attrRateCDRs.RerateErrors {
- cdrFltr.MinCost = Float64Pointer(-1.0)
- if !attrRateCDRs.RerateRated {
- cdrFltr.MaxCost = Float64Pointer(0.0)
- }
- } else if attrRateCDRs.RerateRated {
- cdrFltr.MinCost = Float64Pointer(0.0)
- }
- if attrRateCDRs.RerateErrors || attrRateCDRs.RerateRated {
- cdrFltr.NotRunIDs = append(cdrFltr.NotRunIDs, MetaRaw)
- }
- return cdrFltr, nil
-}
-
type AttrLoadTpFromFolder struct {
FolderPath string // Take files from folder absolute path
DryRun bool // Do not write to database but parse only
@@ -1124,13 +1063,6 @@ type AttrGetCallCost struct {
RunId string // Run Id
}
-type AttrRateCDRs struct {
- RPCCDRsFilter
- StoreCDRs *bool
- SendToStatS *bool // Set to true if the CDRs should be sent to stats server
- ReplicateCDRs *bool // Replicate results
-}
-
type AttrSetBalance struct {
Tenant string
Account string
diff --git a/utils/consts.go b/utils/consts.go
index 2df873197..99e8736da 100755
--- a/utils/consts.go
+++ b/utils/consts.go
@@ -906,7 +906,7 @@ const (
TBLTPStats = "tp_stats"
TBLTPThresholds = "tp_thresholds"
TBLTPFilters = "tp_filters"
- SessionsCostsTBL = "sessions_costs"
+ SessionCostsTBL = "session_costs"
CDRsTBL = "cdrs"
TBLTPSuppliers = "tp_suppliers"
TBLTPAttributes = "tp_attributes"