From b7e34f675a218cfbb555b6898f810385f3630b7f Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 17 Feb 2019 20:37:07 +0100 Subject: [PATCH] CDRs refactoring - removing old derived charging, APIs redesign --- apier/v1/cdrs.go | 54 +- apier/v1/cdrsv1.go | 86 -- apier/v2/cdrs.go | 23 +- cmd/cgr-engine/cgr-engine.go | 9 +- cmd/cgr-engine/rater.go | 8 +- data/storage/mysql/create_cdrs_tables.sql | 4 +- data/storage/postgres/create_cdrs_tables.sql | 12 +- engine/cdrs.go | 940 ++++++++----------- engine/models.go | 6 +- engine/storage_mongo_datadb.go | 6 +- engine/storage_mongo_stordb.go | 6 +- engine/storage_sql.go | 14 +- migrator/sessions_costs.go | 6 +- migrator/storage_mongo_stordb.go | 10 +- migrator/storage_sql.go | 8 +- utils/apitpdata.go | 68 -- utils/consts.go | 2 +- 17 files changed, 492 insertions(+), 770 deletions(-) delete mode 100644 apier/v1/cdrsv1.go diff --git a/apier/v1/cdrs.go b/apier/v1/cdrs.go index eadebf057..05ef3afed 100644 --- a/apier/v1/cdrs.go +++ b/apier/v1/cdrs.go @@ -19,9 +19,6 @@ along with this program. If not, see package v1 import ( - "errors" - "fmt" - "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) @@ -52,7 +49,7 @@ func (apier *ApierV1) GetEventCost(attrs utils.AttrGetCallCost, reply *engine.Ev } // Retrieves CDRs based on the filters -func (apier *ApierV1) GetCdrs(attrs utils.AttrGetCdrs, reply *[]*engine.ExternalCDR) error { +func (apier *ApierV1) GetCDRs(attrs utils.AttrGetCdrs, reply *[]*engine.ExternalCDR) error { cdrsFltr, err := attrs.AsCDRsFilter(apier.Config.GeneralCfg().DefaultTimezone) if err != nil { return utils.NewErrServerError(err) @@ -69,18 +66,6 @@ func (apier *ApierV1) GetCdrs(attrs utils.AttrGetCdrs, reply *[]*engine.External return nil } -// Remove Cdrs out of CDR storage -func (apier *ApierV1) RemCdrs(attrs utils.AttrRemCdrs, reply *string) error { - if len(attrs.CgrIds) == 0 { - return fmt.Errorf("%s:CgrIds", utils.ErrMandatoryIeMissing.Error()) - } - if _, _, err := apier.CdrDb.GetCDRs(&utils.CDRsFilter{CGRIDs: attrs.CgrIds}, true); err != nil { - return utils.NewErrServerError(err) - } - *reply = utils.OK - return nil -} - // New way of removing CDRs func (apier *ApierV1) RemoveCDRs(attrs utils.RPCCDRsFilter, reply *string) error { cdrsFilter, err := attrs.AsCDRsFilter(apier.Config.GeneralCfg().DefaultTimezone) @@ -94,10 +79,35 @@ func (apier *ApierV1) RemoveCDRs(attrs utils.RPCCDRsFilter, reply *string) error return nil } -// New way of (re-)rating CDRs -func (apier *ApierV1) RateCDRs(attrs utils.AttrRateCDRs, reply *string) error { - if apier.CDRs == nil { - return errors.New("CDRS_NOT_ENABLED") - } - return apier.CDRs.Call("CDRsV1.RateCDRs", attrs, reply) +// Receive CDRs via RPC methods +type CDRsV1 struct { + CDRs *engine.CDRServer +} + +// ProcessCDR will process a CDR in CGRateS internal format +func (cdrSv1 *CDRsV1) ProcessCDR(cdr *engine.CDR, reply *string) error { + return cdrSv1.CDRs.V1ProcessCDR(cdr, reply) +} + +// ProcessExternalCDR will process a CDR in external format +func (cdrSv1 *CDRsV1) ProcessExternalCDR(cdr *engine.ExternalCDR, reply *string) error { + return cdrSv1.CDRs.V1ProcessExternalCDR(cdr, reply) +} + +// RateCDRs can re-/rate remotely CDRs +func (cdrSv1 *CDRsV1) RateCDRs(arg *engine.ArgRateCDRs, reply *string) error { + return cdrSv1.CDRs.V1RateCDRs(arg, reply) +} + +// StoreSMCost will store +func (cdrSv1 *CDRsV1) StoreSessionCost(attr *engine.AttrCDRSStoreSMCost, reply *string) error { + return cdrSv1.CDRs.V1StoreSessionCost(attr, reply) +} + +func (cdrSv1 *CDRsV1) CountCDRs(args *utils.RPCCDRsFilter, reply *int64) error { + return cdrSv1.CDRs.V1CountCDRs(args, reply) +} + +func (cdrSv1 *CDRsV1) GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) error { + return cdrSv1.CDRs.V1GetCDRs(args, reply) } diff --git a/apier/v1/cdrsv1.go b/apier/v1/cdrsv1.go deleted file mode 100644 index 56078a1bf..000000000 --- a/apier/v1/cdrsv1.go +++ /dev/null @@ -1,86 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package v1 - -import ( - "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" -) - -// Receive CDRs via RPC methods -type CdrsV1 struct { - CdrSrv *engine.CdrServer -} - -// Designed for CGR internal usage -// Deprecated -func (self *CdrsV1) ProcessCdr(cdr *engine.CDR, reply *string) error { - return self.ProcessCDR(cdr, reply) -} - -// Designed for CGR internal usage -func (self *CdrsV1) ProcessCDR(cdr *engine.CDR, reply *string) error { - return self.CdrSrv.V1ProcessCDR(cdr, reply) -} - -// Designed for external programs feeding CDRs to CGRateS -// Deprecated -func (self *CdrsV1) ProcessExternalCdr(cdr *engine.ExternalCDR, reply *string) error { - return self.ProcessExternalCDR(cdr, reply) -} - -// Designed for external programs feeding CDRs to CGRateS -func (self *CdrsV1) ProcessExternalCDR(cdr *engine.ExternalCDR, reply *string) error { - if err := self.CdrSrv.ProcessExternalCdr(cdr); err != nil { - return utils.NewErrServerError(err) - } - *reply = utils.OK - return nil -} - -// Remotely (re)rating -// Deprecated -func (self *CdrsV1) RateCdrs(attrs utils.AttrRateCdrs, reply *string) error { - return self.RateCDRs(attrs, reply) -} - -// Remotely (re)rating -func (self *CdrsV1) RateCDRs(attrs utils.AttrRateCdrs, reply *string) error { - cdrsFltr, err := attrs.AsCDRsFilter(self.CdrSrv.Timezone()) - if err != nil { - return utils.NewErrServerError(err) - } - if err := self.CdrSrv.RateCDRs(cdrsFltr, attrs.SendToStats); err != nil { - return utils.NewErrServerError(err) - } - *reply = utils.OK - return nil -} - -func (self *CdrsV1) StoreSMCost(attr engine.AttrCDRSStoreSMCost, reply *string) error { - return self.CdrSrv.V1StoreSMCost(attr, reply) -} - -func (self *CdrsV1) CountCDRs(args utils.RPCCDRsFilter, reply *int64) error { - return self.CdrSrv.V1CountCDRs(args, reply) -} - -func (self *CdrsV1) GetCDRs(args utils.RPCCDRsFilter, reply *[]*engine.CDR) error { - return self.CdrSrv.V1GetCDRs(args, reply) -} diff --git a/apier/v2/cdrs.go b/apier/v2/cdrs.go index 130aaa39e..ab4e0d4b3 100644 --- a/apier/v2/cdrs.go +++ b/apier/v2/cdrs.go @@ -19,14 +19,14 @@ along with this program. If not, see package v2 import ( - "github.com/cgrates/cgrates/apier/v1" + v1 "github.com/cgrates/cgrates/apier/v1" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" ) // Retrieves CDRs based on the filters -func (apier *ApierV2) GetCdrs(attrs utils.RPCCDRsFilter, reply *[]*engine.ExternalCDR) error { +func (apier *ApierV2) GetCDRs(attrs utils.RPCCDRsFilter, reply *[]*engine.ExternalCDR) error { cdrsFltr, err := attrs.AsCDRsFilter(apier.Config.GeneralCfg().DefaultTimezone) if err != nil { return utils.NewErrServerError(err) @@ -46,7 +46,7 @@ func (apier *ApierV2) GetCdrs(attrs utils.RPCCDRsFilter, reply *[]*engine.Extern return nil } -func (apier *ApierV2) CountCdrs(attrs utils.RPCCDRsFilter, reply *int64) error { +func (apier *ApierV2) CountCDRs(attrs utils.RPCCDRsFilter, reply *int64) error { cdrsFltr, err := attrs.AsCDRsFilter(apier.Config.GeneralCfg().DefaultTimezone) if err != nil { if err.Error() != utils.NotFoundCaps { @@ -64,19 +64,14 @@ func (apier *ApierV2) CountCdrs(attrs utils.RPCCDRsFilter, reply *int64) error { } // Receive CDRs via RPC methods, not included with APIer because it has way less dependencies and can be standalone -type CdrsV2 struct { - v1.CdrsV1 +type CDRsV2 struct { + v1.CDRsV1 } -func (self *CdrsV2) StoreSMCost(args engine.ArgsV2CDRSStoreSMCost, reply *string) error { - return self.CdrSrv.V2StoreSMCost(args, reply) +func (cdrSv2 *CDRsV2) StoreSessionCost(args *engine.ArgsV2CDRSStoreSMCost, reply *string) error { + return cdrSv2.CDRs.V2StoreSessionCost(args, reply) } -func (self *CdrsV2) ProcessCDR(cgrEv *utils.CGREvent, reply *string) error { - return self.CdrSrv.V2ProcessCDR(cgrEv, reply) -} - -// RateCDRs will rate/re-rate CDRs using ChargerS -func (self *CdrsV2) RateCDRs(args *utils.RPCCDRsFilter, reply *string) error { - return self.CdrSrv.V2RateCDRs(args, reply) +func (cdrSv2 *CDRsV2) ProcessCDR(arg *engine.ArgV2ProcessCDR, reply *string) error { + return cdrSv2.CDRs.V2ProcessCDR(arg, reply) } diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index d87a034f1..b193896e5 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -610,16 +610,15 @@ func startCDRS(internalCdrSChan chan rpcclient.RpcClientConnection, return } } - cdrServer, _ := engine.NewCdrServer(cfg, cdrDb, dm, ralConn, pubSubConn, - attrSConn, usersConn, + cdrServer := engine.NewCDRServer(cfg, cdrDb, dm, + ralConn, pubSubConn, attrSConn, usersConn, thresholdSConn, statsConn, chargerSConn, filterS) - cdrServer.SetTimeToLive(cfg.GeneralCfg().ResponseCacheTTL, nil) utils.Logger.Info("Registering CDRS HTTP Handlers.") cdrServer.RegisterHandlersToServer(server) utils.Logger.Info("Registering CDRS RPC service.") - cdrSrv := v1.CdrsV1{CdrSrv: cdrServer} + cdrSrv := v1.CDRsV1{CDRs: cdrServer} server.RpcRegister(&cdrSrv) - server.RpcRegister(&v2.CdrsV2{CdrsV1: cdrSrv}) + server.RpcRegister(&v2.CDRsV2{CDRsV1: cdrSrv}) // Make the cdr server available for internal communication server.RpcRegister(cdrServer) // register CdrServer for internal usage (TODO: refactor this) internalCdrSChan <- cdrServer // Signal that cdrS is operational diff --git a/cmd/cgr-engine/rater.go b/cmd/cgr-engine/rater.go index daf4b3169..4d25629c3 100755 --- a/cmd/cgr-engine/rater.go +++ b/cmd/cgr-engine/rater.go @@ -21,8 +21,8 @@ package main import ( "fmt" - "github.com/cgrates/cgrates/apier/v1" - "github.com/cgrates/cgrates/apier/v2" + v1 "github.com/cgrates/cgrates/apier/v1" + v2 "github.com/cgrates/cgrates/apier/v2" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/servmanager" "github.com/cgrates/cgrates/utils" @@ -178,8 +178,8 @@ func startRater(internalRaterChan chan rpcclient.RpcClientConnection, cacheS *en utils.RegisterRpcParams("PubSubV1", &engine.PubSub{}) utils.RegisterRpcParams("UsersV1", &engine.UserMap{}) - utils.RegisterRpcParams("", &v1.CdrsV1{}) - utils.RegisterRpcParams("", &v2.CdrsV2{}) + utils.RegisterRpcParams("", &v1.CDRsV1{}) + utils.RegisterRpcParams("", &v2.CDRsV2{}) utils.RegisterRpcParams("", &v1.SMGenericV1{}) utils.RegisterRpcParams("", responder) utils.RegisterRpcParams("", apierRpcV1) diff --git a/data/storage/mysql/create_cdrs_tables.sql b/data/storage/mysql/create_cdrs_tables.sql index 8cf89b7dc..3ceb65729 100644 --- a/data/storage/mysql/create_cdrs_tables.sql +++ b/data/storage/mysql/create_cdrs_tables.sql @@ -32,8 +32,8 @@ CREATE TABLE cdrs ( UNIQUE KEY cdrrun (cgrid, run_id, origin_id) ); -DROP TABLE IF EXISTS sessions_costs; -CREATE TABLE sessions_costs ( +DROP TABLE IF EXISTS session_costs; +CREATE TABLE session_costs ( id int(11) NOT NULL AUTO_INCREMENT, cgrid varchar(40) NOT NULL, run_id varchar(64) NOT NULL, diff --git a/data/storage/postgres/create_cdrs_tables.sql b/data/storage/postgres/create_cdrs_tables.sql index 8bb674556..70e8b1b26 100644 --- a/data/storage/postgres/create_cdrs_tables.sql +++ b/data/storage/postgres/create_cdrs_tables.sql @@ -35,8 +35,8 @@ DROP INDEX IF EXISTS deleted_at_cp_idx; CREATE INDEX deleted_at_cp_idx ON cdrs (deleted_at); -DROP TABLE IF EXISTS sessions_costs; -CREATE TABLE sessions_costs ( +DROP TABLE IF EXISTS session_costs; +CREATE TABLE session_costs ( id SERIAL PRIMARY KEY, cgrid VARCHAR(40) NOT NULL, run_id VARCHAR(64) NOT NULL, @@ -50,10 +50,10 @@ CREATE TABLE sessions_costs ( UNIQUE (cgrid, run_id) ); DROP INDEX IF EXISTS cgrid_sessionscost_idx; -CREATE INDEX cgrid_sessionscost_idx ON sessions_costs (cgrid, run_id); +CREATE INDEX cgrid_sessionscost_idx ON session_costs (cgrid, run_id); DROP INDEX IF EXISTS origin_sessionscost_idx; -CREATE INDEX origin_sessionscost_idx ON sessions_costs (origin_host, origin_id); +CREATE INDEX origin_sessionscost_idx ON session_costs (origin_host, origin_id); DROP INDEX IF EXISTS run_origin_sessionscost_idx; -CREATE INDEX run_origin_sessionscost_idx ON sessions_costs (run_id, origin_id); +CREATE INDEX run_origin_sessionscost_idx ON session_costs (run_id, origin_id); DROP INDEX IF EXISTS deleted_at_sessionscost_idx; -CREATE INDEX deleted_at_sessionscost_idx ON sessions_costs (deleted_at); +CREATE INDEX deleted_at_sessionscost_idx ON session_costs (deleted_at); diff --git a/engine/cdrs.go b/engine/cdrs.go index 6c2f2bc48..b8957c169 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -32,30 +32,28 @@ import ( "github.com/cgrates/rpcclient" ) -var cdrServer *CdrServer // Share the server so we can use it in http handlers +var cdrServer *CDRServer // Share the server so we can use it in http handlers -type CallCostLog struct { - CgrId string - Source string - RunId string - Usage float64 // real usage (not increment rounded) - CallCost *CallCost - CheckDuplicate bool -} - -// Handler for generic cgr cdr http +// cgrCdrHandler handles CDRs received over HTTP REST func cgrCdrHandler(w http.ResponseWriter, r *http.Request) { - cgrCdr, err := NewCgrCdrFromHttpReq(r, cdrServer.cgrCfg.GeneralCfg().DefaultTimezone) + cgrCdr, err := NewCgrCdrFromHttpReq(r, + cdrServer.cgrCfg.GeneralCfg().DefaultTimezone) if err != nil { - utils.Logger.Err(fmt.Sprintf(" Could not create CDR entry: %s", err.Error())) + utils.Logger.Warning( + fmt.Sprintf("<%s> could not create CDR entry from http: %+v, err <%s>", + utils.CDRs, r.Form, err.Error())) return } - if err := cdrServer.processCdr(cgrCdr.AsCDR(cdrServer.cgrCfg.GeneralCfg().DefaultTimezone)); err != nil { - utils.Logger.Err(fmt.Sprintf(" Errors when storing CDR entry: %s", err.Error())) + cdr := cgrCdr.AsCDR(cdrServer.cgrCfg.GeneralCfg().DefaultTimezone) + var ignored string + if err := cdrServer.V1ProcessCDR(cdr, &ignored); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing CDR: %s, err: <%s>", + utils.CDRs, cdr, err.Error())) } } -// Handler for fs http +// fsCdrHandler will handle CDRs received from FreeSWITCH over HTTP-JSON func fsCdrHandler(w http.ResponseWriter, r *http.Request) { body, _ := ioutil.ReadAll(r.Body) fsCdr, err := NewFSCdr(body, cdrServer.cgrCfg) @@ -63,21 +61,26 @@ func fsCdrHandler(w http.ResponseWriter, r *http.Request) { utils.Logger.Err(fmt.Sprintf(" Could not create CDR entry: %s", err.Error())) return } - if err := cdrServer.processCdr(fsCdr.AsCDR(cdrServer.Timezone())); err != nil { - utils.Logger.Err(fmt.Sprintf(" Errors when storing CDR entry: %s", err.Error())) + cdr := fsCdr.AsCDR(cdrServer.cgrCfg.GeneralCfg().DefaultTimezone) + var ignored string + if err := cdrServer.V1ProcessCDR(cdr, &ignored); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> processing CDR: %s, err: <%s>", + utils.CDRs, cdr, err.Error())) } } -func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub, - attrs, users, thdS, stats, chargerS rpcclient.RpcClientConnection, filterS *FilterS) (*CdrServer, error) { - if rater != nil && reflect.ValueOf(rater).IsNil() { // Work around so we store actual nil instead of nil interface value, faster to check here than in CdrServer code +// NewCDRServer is a constructor for CDRServer +func NewCDRServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, rater, pubsub, + attrS, users, thdS, statS, chargerS rpcclient.RpcClientConnection, filterS *FilterS) *CDRServer { + if rater != nil && reflect.ValueOf(rater).IsNil() { rater = nil } if pubsub != nil && reflect.ValueOf(pubsub).IsNil() { pubsub = nil } - if attrs != nil && reflect.ValueOf(attrs).IsNil() { - attrs = nil + if attrS != nil && reflect.ValueOf(attrS).IsNil() { + attrS = nil } if users != nil && reflect.ValueOf(users).IsNil() { users = nil @@ -85,278 +88,70 @@ func NewCdrServer(cgrCfg *config.CGRConfig, cdrDb CdrStorage, dm *DataManager, r if thdS != nil && reflect.ValueOf(thdS).IsNil() { thdS = nil } - if stats != nil && reflect.ValueOf(stats).IsNil() { - stats = nil + if statS != nil && reflect.ValueOf(statS).IsNil() { + statS = nil } if chargerS != nil && reflect.ValueOf(chargerS).IsNil() { chargerS = nil } - return &CdrServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm, - rals: rater, pubsub: pubsub, attrS: attrs, + return &CDRServer{cgrCfg: cgrCfg, cdrDb: cdrDb, dm: dm, + rals: rater, pubsub: pubsub, attrS: attrS, users: users, - stats: stats, thdS: thdS, + statS: statS, thdS: thdS, chargerS: chargerS, guard: guardian.Guardian, + respCache: utils.NewResponseCache(cgrCfg.GeneralCfg().ResponseCacheTTL), httpPoster: NewHTTPPoster(cgrCfg.GeneralCfg().HttpSkipTlsVerify, - cgrCfg.GeneralCfg().ReplyTimeout), filterS: filterS}, nil + cgrCfg.GeneralCfg().ReplyTimeout), filterS: filterS} } -type CdrServer struct { - cgrCfg *config.CGRConfig - cdrDb CdrStorage - dm *DataManager - rals rpcclient.RpcClientConnection - pubsub rpcclient.RpcClientConnection - attrS rpcclient.RpcClientConnection - users rpcclient.RpcClientConnection - thdS rpcclient.RpcClientConnection - stats rpcclient.RpcClientConnection - chargerS rpcclient.RpcClientConnection - guard *guardian.GuardianLocker - responseCache *utils.ResponseCache - httpPoster *HTTPPoster // used for replication - filterS *FilterS +// CDRServer stores and rates CDRs +type CDRServer struct { + cgrCfg *config.CGRConfig + cdrDb CdrStorage + dm *DataManager + rals rpcclient.RpcClientConnection + pubsub rpcclient.RpcClientConnection + attrS rpcclient.RpcClientConnection + users rpcclient.RpcClientConnection + thdS rpcclient.RpcClientConnection + statS rpcclient.RpcClientConnection + chargerS rpcclient.RpcClientConnection + guard *guardian.GuardianLocker + respCache *utils.ResponseCache + httpPoster *HTTPPoster // used for replication + filterS *FilterS } -func (self *CdrServer) Timezone() string { - return self.cgrCfg.GeneralCfg().DefaultTimezone -} -func (self *CdrServer) SetTimeToLive(timeToLive time.Duration, out *int) error { - self.responseCache = utils.NewResponseCache(timeToLive) - return nil +// RegisterHandlersToServer is called by cgr-engine to register HTTP URL handlers +func (cdrS *CDRServer) RegisterHandlersToServer(server *utils.Server) { + cdrServer = cdrS // Share the server object for handlers + server.RegisterHttpFunc(cdrS.cgrCfg.HTTPCfg().HTTPCDRsURL, cgrCdrHandler) + server.RegisterHttpFunc(cdrS.cgrCfg.HTTPCfg().HTTPFreeswitchCDRsURL, fsCdrHandler) } -func (self *CdrServer) getCache() *utils.ResponseCache { - if self.responseCache == nil { - self.responseCache = utils.NewResponseCache(0) - } - return self.responseCache -} - -func (self *CdrServer) RegisterHandlersToServer(server *utils.Server) { - cdrServer = self // Share the server object for handlers - server.RegisterHttpFunc(self.cgrCfg.HTTPCfg().HTTPCDRsURL, cgrCdrHandler) - server.RegisterHttpFunc(self.cgrCfg.HTTPCfg().HTTPFreeswitchCDRsURL, fsCdrHandler) -} - -// Used to process external CDRs -func (self *CdrServer) ProcessExternalCdr(eCDR *ExternalCDR) error { - cdr, err := NewCDRFromExternalCDR(eCDR, self.cgrCfg.GeneralCfg().DefaultTimezone) - if err != nil { - return err - } - return self.processCdr(cdr) -} - -func (self *CdrServer) storeSMCost(smCost *SMCost, checkDuplicate bool) error { +// storeSMCost will store a SMCost +func (cdrS *CDRServer) storeSMCost(smCost *SMCost, checkDuplicate bool) error { smCost.CostDetails.Compute() // make sure the total cost reflect the increment lockKey := utils.MetaCDRs + smCost.CGRID + smCost.RunID + smCost.OriginID // Will lock on this ID if checkDuplicate { - _, err := self.guard.Guard(func() (interface{}, error) { - smCosts, err := self.cdrDb.GetSMCosts(smCost.CGRID, smCost.RunID, "", "") + _, err := cdrS.guard.Guard(func() (interface{}, error) { + smCosts, err := cdrS.cdrDb.GetSMCosts(smCost.CGRID, smCost.RunID, "", "") if err != nil && err.Error() != utils.NotFoundCaps { return nil, err } if len(smCosts) != 0 { return nil, utils.ErrExists } - return nil, self.cdrDb.SetSMCost(smCost) + return nil, cdrS.cdrDb.SetSMCost(smCost) }, time.Duration(2*time.Second), lockKey) // FixMe: Possible deadlock with Guard from SMG session close() return err } - return self.cdrDb.SetSMCost(smCost) -} - -// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline -func (self *CdrServer) processCdr(cdr *CDR) (err error) { - if cdr.RequestType == "" { - cdr.RequestType = self.cgrCfg.GeneralCfg().DefaultReqType - } - if cdr.Tenant == "" { - cdr.Tenant = self.cgrCfg.GeneralCfg().DefaultTenant - } - if cdr.Category == "" { - cdr.Category = self.cgrCfg.GeneralCfg().DefaultCategory - } - if cdr.Subject == "" { // Use account information as rating subject if missing - cdr.Subject = cdr.Account - } - if !cdr.PreRated { // Enforce the RunID if CDR is not rated - cdr.RunID = utils.MetaRaw - } - if cdr.RunID == utils.MetaRaw { - cdr.Cost = -1.0 - } - if self.cgrCfg.CdrsCfg().CDRSStoreCdrs { // Store RawCDRs, this we do sync so we can reply with the status - if err := self.cdrDb.SetCDR(cdr, false); err != nil { - utils.Logger.Err(fmt.Sprintf(" Storing primary CDR %+v, got error: %s", cdr, err.Error())) - return err // Error is propagated back and we don't continue processing the CDR if we cannot store it - } - } - if self.thdS != nil { - // process CDR with thresholdS - go self.thdSProcessEvent(cdr.AsCGREvent()) - } - if self.stats != nil { - var reply []string - - go self.stats.Call(utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: *cdr.AsCGREvent()}, &reply) - } - if len(self.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 { // Replicate raw CDR - self.replicateCDRs([]*CDR{cdr}) - } - if self.rals != nil && !cdr.PreRated { // CDRs not rated will be processed by Rating - go self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CdrsCfg().CDRSStoreCdrs, - true, len(self.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0) - } - return nil -} - -// Returns error if not able to properly store the CDR, mediation is async since we can always recover offline -func (self *CdrServer) deriveRateStoreStatsReplicate(cdr *CDR, store, cdrstats, replicate bool) (err error) { - cdrRuns, err := self.deriveCdrs(cdr) - if err != nil { - utils.Logger.Err(fmt.Sprintf(" Deriving CDR %+v, got error: %s", cdr, err.Error())) - return err - } - var ratedCDRs []*CDR // Gather all CDRs received from rating subsystem - for _, cdrRun := range cdrRuns { - if self.attrS != nil { - var rplyEv AttrSProcessEventReply - cgrEv := cdrRun.AsCGREvent() - cgrEv.Context = utils.StringPointer(utils.MetaCDRs) - if err = self.attrS.Call(utils.AttributeSv1ProcessEvent, - &AttrArgsProcessEvent{ - CGREvent: *cgrEv}, - &rplyEv); err == nil { - if err = cdrRun.UpdateFromCGREvent(rplyEv.CGREvent, - rplyEv.AlteredFields); err != nil { - return - } - } else if err.Error() != utils.ErrNotFound.Error() { - return - } - } - if err := LoadUserProfile(cdrRun, utils.EXTRA_FIELDS); err != nil { - utils.Logger.Err(fmt.Sprintf(" UserS handling for CDR %+v, got error: %s", cdrRun, err.Error())) - continue - } - rcvRatedCDRs, err := self.rateCDR(cdrRun) - if err != nil { - cdrRun.Cost = -1.0 // If there was an error, mark the CDR - cdrRun.ExtraInfo = err.Error() - rcvRatedCDRs = []*CDR{cdrRun} - } - ratedCDRs = append(ratedCDRs, rcvRatedCDRs...) - } - // Request should be processed by SureTax - for _, ratedCDR := range ratedCDRs { - if ratedCDR.RunID == utils.META_SURETAX { - if err := SureTaxProcessCdr(ratedCDR); err != nil { - ratedCDR.Cost = -1.0 - ratedCDR.ExtraInfo = err.Error() // Something failed, write the error in the ExtraInfo - } - } - } - // Store rated CDRs - if store { - for _, ratedCDR := range ratedCDRs { - if err := self.cdrDb.SetCDR(ratedCDR, true); err != nil { - utils.Logger.Err(fmt.Sprintf(" Storing rated CDR %+v, got error: %s", ratedCDR, err.Error())) - } - } - } - // Attach CDR to stats - if cdrstats { // Send CDR to stats - for _, ratedCDR := range ratedCDRs { - if self.stats != nil { - var reply []string - go self.stats.Call(utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: *ratedCDR.AsCGREvent()}, &reply) - } - } - } - if replicate { - self.replicateCDRs(ratedCDRs) - } - return nil -} - -func (self *CdrServer) deriveCdrs(cdr *CDR) (drvdCDRs []*CDR, err error) { - dfltCDRRun := cdr.Clone() - cdrRuns := []*CDR{dfltCDRRun} - if cdr.RunID != utils.MetaRaw { // Only derive *raw CDRs - return cdrRuns, nil - } - dfltCDRRun.RunID = utils.META_DEFAULT // Rewrite *raw with *default since we have it as first run - if self.attrS != nil { - var rplyEv AttrSProcessEventReply - if err = self.attrS.Call(utils.AttributeSv1ProcessEvent, - &AttrArgsProcessEvent{ - CGREvent: *(cdr.AsCGREvent())}, &rplyEv); err != nil { - return - } - if err = cdr.UpdateFromCGREvent(rplyEv.CGREvent, - rplyEv.AlteredFields); err != nil { - return - } - } - if err := LoadUserProfile(cdr, utils.EXTRA_FIELDS); err != nil { - return nil, err - } - attrsDC := &utils.AttrDerivedChargers{Tenant: cdr.Tenant, Category: cdr.Category, Direction: utils.OUT, - Account: cdr.Account, Subject: cdr.Subject, Destination: cdr.Destination} - var dcs utils.DerivedChargers - if err := self.rals.Call("Responder.GetDerivedChargers", attrsDC, &dcs); err != nil { - utils.Logger.Err(fmt.Sprintf("Could not get derived charging for cgrid %s, error: %s", cdr.CGRID, err.Error())) - return nil, err - } - for _, dc := range dcs.Chargers { - runFilters, _ := utils.ParseRSRFields(dc.RunFilters, utils.INFIELD_SEP) - matchingAllFilters := true - for _, dcRunFilter := range runFilters { - if _, err := cdr.FieldAsStringWithRSRField(dcRunFilter); err != nil { - matchingAllFilters = false - break - } - } - if !matchingAllFilters { // Do not process the derived charger further if not all filters were matched - continue - } - dcRequestTypeFld, _ := utils.NewRSRField(dc.RequestTypeField) - dcTenantFld, _ := utils.NewRSRField(dc.TenantField) - dcCategoryFld, _ := utils.NewRSRField(dc.CategoryField) - dcAcntFld, _ := utils.NewRSRField(dc.AccountField) - dcSubjFld, _ := utils.NewRSRField(dc.SubjectField) - dcDstFld, _ := utils.NewRSRField(dc.DestinationField) - dcSTimeFld, _ := utils.NewRSRField(dc.SetupTimeField) - dcATimeFld, _ := utils.NewRSRField(dc.AnswerTimeField) - dcDurFld, _ := utils.NewRSRField(dc.UsageField) - dcRatedFld, _ := utils.NewRSRField(dc.PreRatedField) - dcCostFld, _ := utils.NewRSRField(dc.CostField) - - dcExtraFields := []*utils.RSRField{} - for key := range cdr.ExtraFields { - dcExtraFields = append(dcExtraFields, &utils.RSRField{Id: key}) - } - - forkedCdr, err := cdr.ForkCdr(dc.RunID, dcRequestTypeFld, dcTenantFld, - dcCategoryFld, dcAcntFld, dcSubjFld, dcDstFld, dcSTimeFld, dcATimeFld, - dcDurFld, dcRatedFld, dcCostFld, dcExtraFields, true, - self.cgrCfg.GeneralCfg().DefaultTimezone) - if err != nil { - utils.Logger.Err(fmt.Sprintf("Could not fork CGR with cgrid %s, run: %s, error: %s", cdr.CGRID, dc.RunID, err.Error())) - continue // do not add it to the forked CDR list - } - if !forkedCdr.PreRated { - forkedCdr.Cost = -1.0 // Make sure that un-rated CDRs start with Cost -1 - } - cdrRuns = append(cdrRuns, forkedCdr) - } - return cdrRuns, nil + return cdrS.cdrDb.SetSMCost(smCost) } // rateCDR will populate cost field // Returns more than one rated CDR in case of SMCost retrieved based on prefix -func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) { +func (cdrS *CDRServer) rateCDR(cdr *CDR) ([]*CDR, error) { var qryCC *CallCost var err error if cdr.RequestType == utils.META_NONE { @@ -377,8 +172,8 @@ func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) { if _, hasIT := cdr.ExtraFields[utils.OriginIDPrefix]; hasIT { cgrID = "" // for queries involving originIDPrefix we ignore CGRID } - for i := 0; i < self.cgrCfg.CdrsCfg().CDRSSMCostRetries; i++ { - smCosts, err = self.cdrDb.GetSMCosts(cgrID, cdr.RunID, cdr.OriginHost, + for i := 0; i < cdrS.cgrCfg.CdrsCfg().CDRSSMCostRetries; i++ { + smCosts, err = cdrS.cdrDb.GetSMCosts(cgrID, cdr.RunID, cdr.OriginHost, cdr.ExtraFields[utils.OriginIDPrefix]) if err == nil && len(smCosts) != 0 { break @@ -404,10 +199,10 @@ func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) { utils.Logger.Warning( fmt.Sprintf(" WARNING: Could not find CallCostLog for cgrid: %s, source: %s, runid: %s, originID: %s originHost: %s, will recalculate", cdr.CGRID, utils.MetaSessionS, cdr.RunID, cdr.OriginID, cdr.OriginHost)) - qryCC, err = self.getCostFromRater(cdr) + qryCC, err = cdrS.getCostFromRater(cdr) } } else { - qryCC, err = self.getCostFromRater(cdr) + qryCC, err = cdrS.getCostFromRater(cdr) } if err != nil { return nil, err @@ -419,8 +214,8 @@ func (self *CdrServer) rateCDR(cdr *CDR) ([]*CDR, error) { return []*CDR{cdr}, nil } -// Retrive the cost from engine -func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) { +// getCostFromRater will retrieve the cost from RALs +func (cdrS *CDRServer) getCostFromRater(cdr *CDR) (*CallCost, error) { cc := new(CallCost) var err error timeStart := cdr.AnswerTime @@ -441,9 +236,9 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) { } if utils.IsSliceMember([]string{utils.META_PSEUDOPREPAID, utils.META_POSTPAID, utils.META_PREPAID, utils.PSEUDOPREPAID, utils.POSTPAID, utils.PREPAID}, cdr.RequestType) { // Prepaid - Cost can be recalculated in case of missing records from SM - err = self.rals.Call("Responder.Debit", cd, cc) + err = cdrS.rals.Call("Responder.Debit", cd, cc) } else { - err = self.rals.Call("Responder.GetCost", cd, cc) + err = cdrS.rals.Call("Responder.GetCost", cd, cc) } if err != nil { return cc, err @@ -452,17 +247,142 @@ func (self *CdrServer) getCostFromRater(cdr *CDR) (*CallCost, error) { return cc, nil } -func (self *CdrServer) replicateCDRs(cdrs []*CDR) (err error) { - for _, exportID := range self.cgrCfg.CdrsCfg().CDRSOnlineCDRExports { - expTpl := self.cgrCfg.CdreProfiles[exportID] // not checking for existence of profile since this should be done in a higher layer +// chrgRaStoReThStaCDR will process the CGREvent with ChargerS subsystem +// it is designed to run in it's own goroutine +func (cdrS *CDRServer) chrgRaStoReThStaCDR(cgrEv *utils.CGREvent, + store, export, thdS, statS *bool) (err error) { + var chrgrs []*ChrgSProcessEventReply + if err = cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent, + cgrEv, &chrgrs); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.", + utils.CDRs, err.Error(), cgrEv, utils.ChargerS)) + return + } + var partExec bool + for _, chrgr := range chrgrs { + cdr, errCdr := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg, + cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone) + if errCdr != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s converting CDR event %+v with %s.", + utils.CDRs, errCdr.Error(), cgrEv, utils.ChargerS)) + partExec = true + continue + } + cdrS.raStoReThStaCDR(cdr, store, export, thdS, statS) + } + if partExec { + err = utils.ErrPartiallyExecuted + } + return +} + +// raStoReThStaCDR will RAte/STOtore/REplicate/THresholds/STAts the CDR received +// used by both chargerS as well as re-/rating +func (cdrS *CDRServer) raStoReThStaCDR(cdr *CDR, + store, export, thdS, statS *bool) { + ratedCDRs, err := cdrS.rateCDR(cdr) + if err != nil { + cdr.Cost = -1.0 // If there was an error, mark the CDR + cdr.ExtraInfo = err.Error() + ratedCDRs = []*CDR{cdr} + } + for _, rtCDR := range ratedCDRs { + shouldStore := cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs + if store != nil { + shouldStore = *store + } + if shouldStore { // Store CDR + go func(rtCDR *CDR) { + if err := cdrS.cdrDb.SetCDR(rtCDR, true); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s storing CDR %+v.", + utils.CDRs, err.Error(), rtCDR)) + } + }(rtCDR) + } + shouldExport := len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 + if export != nil { + shouldExport = *export + } + if shouldExport { + go cdrS.exportCDRs([]*CDR{rtCDR}) + } + cgrEv := rtCDR.AsCGREvent() + shouldThdS := cdrS.thdS != nil + if thdS != nil { + shouldThdS = *thdS + } + if shouldThdS { + go cdrS.thdSProcessEvent(cgrEv) + } + shouldStatS := cdrS.statS != nil + if statS != nil { + shouldStatS = *statS + } + if shouldStatS { + go cdrS.statSProcessEvent(cgrEv) + } + } +} + +// statSProcessEvent will send the event to StatS if the connection is configured +func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREvent) (err error) { + if cgrEv.Context == nil { // populate if not already in + cgrEv.Context = utils.StringPointer(utils.MetaCDRs) + } + var rplyEv AttrSProcessEventReply + if err = cdrS.attrS.Call(utils.AttributeSv1ProcessEvent, + &AttrArgsProcessEvent{ + CGREvent: *cgrEv}, + &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { + *cgrEv = *rplyEv.CGREvent + } else if err.Error() == utils.ErrNotFound.Error() { + err = nil // cancel ErrNotFound + } + return +} + +// thdSProcessEvent will send the event to ThresholdS if the connection is configured +func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREvent) { + var tIDs []string + if err := cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent, + &ArgsProcessEvent{CGREvent: *cgrEv}, &tIDs); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing CDR event %+v with thdS.", + utils.CDRs, err.Error(), cgrEv)) + return + } +} + +// statSProcessEvent will send the event to StatS if the connection is configured +func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREvent) { + var reply []string + if err := cdrS.statS.Call(utils.StatSv1ProcessEvent, + &StatsArgsProcessEvent{CGREvent: *cgrEv}, &reply); err != nil && + err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s.", + utils.CDRs, err.Error(), cgrEv, utils.StatS)) + return + } +} + +// exportCDRs will export the CDRs received +func (cdrS *CDRServer) exportCDRs(cdrs []*CDR) (err error) { + for _, exportID := range cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports { + expTpl := cdrS.cgrCfg.CdreProfiles[exportID] // not checking for existence of profile since this should be done in a higher layer var cdre *CDRExporter if cdre, err = NewCDRExporter(cdrs, expTpl, expTpl.ExportFormat, - expTpl.ExportPath, self.cgrCfg.GeneralCfg().FailedPostsDir, + expTpl.ExportPath, cdrS.cgrCfg.GeneralCfg().FailedPostsDir, "CDRSReplication", expTpl.Synchronous, expTpl.Attempts, expTpl.FieldSeparator, expTpl.UsageMultiplyFactor, - expTpl.CostMultiplyFactor, self.cgrCfg.GeneralCfg().RoundingDecimals, - self.cgrCfg.GeneralCfg().HttpSkipTlsVerify, self.httpPoster, - self.filterS); err != nil { + expTpl.CostMultiplyFactor, cdrS.cgrCfg.GeneralCfg().RoundingDecimals, + cdrS.cgrCfg.GeneralCfg().HttpSkipTlsVerify, cdrS.httpPoster, + cdrS.filterS); err != nil { utils.Logger.Err(fmt.Sprintf(" Building CDRExporter for online exports got error: <%s>", err.Error())) continue } @@ -474,147 +394,14 @@ func (self *CdrServer) replicateCDRs(cdrs []*CDR) (err error) { return } -// Called by rate/re-rate API, FixMe: deprecate it once new APIer structure is operational -func (self *CdrServer) RateCDRs(cdrFltr *utils.CDRsFilter, sendToStats bool) error { - cdrs, _, err := self.cdrDb.GetCDRs(cdrFltr, false) - if err != nil { - return err - } - for _, cdr := range cdrs { - if err := self.deriveRateStoreStatsReplicate(cdr, self.cgrCfg.CdrsCfg().CDRSStoreCdrs, - sendToStats, len(self.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0); err != nil { - utils.Logger.Err(fmt.Sprintf(" Processing CDR %+v, got error: %s", cdr, err.Error())) - } - } - return nil -} - -// Internally used and called from CDRSv1 -// Cached requests for HA setups -func (self *CdrServer) V1ProcessCDR(cdr *CDR, reply *string) error { - if len(cdr.CGRID) == 0 { // Populate CGRID if not present - cdr.ComputeCGRID() - } - cacheKey := "V1ProcessCDR" + cdr.CGRID + cdr.RunID - if item, err := self.getCache().Get(cacheKey); err == nil && item != nil { - if item.Value != nil { - *reply = item.Value.(string) - } - return item.Err - } - if err := self.processCdr(cdr); err != nil { - self.getCache().Cache(cacheKey, &utils.ResponseCacheItem{Err: err}) - return utils.NewErrServerError(err) - } - self.getCache().Cache(cacheKey, &utils.ResponseCacheItem{Value: utils.OK}) - *reply = utils.OK - return nil -} - -// RPC method, differs from storeSMCost through it's signature -func (self *CdrServer) V1StoreSMCost(attr AttrCDRSStoreSMCost, reply *string) error { - if attr.Cost.CGRID == "" { - return utils.NewCGRError(utils.CDRSCtx, - utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing), - "SMCost: %+v with empty CGRID") - } - cacheKey := "V1StoreSMCost" + attr.Cost.CGRID + attr.Cost.RunID + attr.Cost.OriginID - if item, err := self.getCache().Get(cacheKey); err == nil && item != nil { - if item.Value != nil { - *reply = item.Value.(string) - } - return item.Err - } - if err := self.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil { - self.getCache().Cache(cacheKey, &utils.ResponseCacheItem{Err: err}) - return utils.NewErrServerError(err) - } - self.getCache().Cache(cacheKey, &utils.ResponseCacheItem{Value: utils.OK}) - *reply = utils.OK - return nil -} - -func (cdrs *CdrServer) V2StoreSMCost(args ArgsV2CDRSStoreSMCost, reply *string) error { - if args.Cost.CGRID == "" { - return utils.NewCGRError(utils.CDRSCtx, - utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing), - "SMCost: %+v with empty CGRID") - } - cacheKey := "V2StoreSMCost" + args.Cost.CGRID + args.Cost.RunID + args.Cost.OriginID - if item, err := cdrs.getCache().Get(cacheKey); err == nil && item != nil { - if item.Value != nil { - *reply = item.Value.(string) - } - return item.Err - } - cc := args.Cost.CostDetails.AsCallCost() - cc.Round() - roundIncrements := cc.GetRoundIncrements() - if len(roundIncrements) != 0 { - cd := cc.CreateCallDescriptor() - cd.CgrID = args.Cost.CGRID - cd.RunID = args.Cost.RunID - cd.Increments = roundIncrements - var response float64 - if err := cdrs.rals.Call("Responder.RefundRounding", cd, &response); err != nil { - utils.Logger.Err(fmt.Sprintf(" RefundRounding for cc: %+v, got error: %s", cc, err.Error())) - } - } - if err := cdrs.storeSMCost(&SMCost{ - CGRID: args.Cost.CGRID, - RunID: args.Cost.RunID, - OriginHost: args.Cost.OriginHost, - OriginID: args.Cost.OriginID, - CostSource: args.Cost.CostSource, - Usage: args.Cost.Usage, - CostDetails: args.Cost.CostDetails, - }, args.CheckDuplicate); err != nil { - cdrs.getCache().Cache(cacheKey, &utils.ResponseCacheItem{Err: err}) - return utils.NewErrServerError(err) - } - *reply = utils.OK - cdrs.getCache().Cache(cacheKey, &utils.ResponseCacheItem{Value: *reply}) - return nil - -} - -// Called by rate/re-rate API, RPC method -func (self *CdrServer) V1RateCDRs(attrs utils.AttrRateCDRs, reply *string) error { - cdrFltr, err := attrs.RPCCDRsFilter.AsCDRsFilter(self.cgrCfg.GeneralCfg().DefaultTimezone) - if err != nil { - return utils.NewErrServerError(err) - } - cdrs, _, err := self.cdrDb.GetCDRs(cdrFltr, false) - if err != nil { - return err - } - storeCDRs := self.cgrCfg.CdrsCfg().CDRSStoreCdrs - if attrs.StoreCDRs != nil { - storeCDRs = *attrs.StoreCDRs - } - sendToStats := self.stats != nil - if attrs.SendToStatS != nil { - sendToStats = *attrs.SendToStatS - } - replicate := len(self.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 - if attrs.ReplicateCDRs != nil { - replicate = *attrs.ReplicateCDRs - } - for _, cdr := range cdrs { - if err := self.deriveRateStoreStatsReplicate(cdr, storeCDRs, sendToStats, replicate); err != nil { - utils.Logger.Err(fmt.Sprintf(" Processing CDR %+v, got error: %s", cdr, err.Error())) - } - } - return nil -} - -func (cdrsrv *CdrServer) Call(serviceMethod string, args interface{}, reply interface{}) error { +// Call implements the rpcclient.RpcClientConnection interface +func (cdrS *CDRServer) Call(serviceMethod string, args interface{}, reply interface{}) error { parts := strings.Split(serviceMethod, ".") if len(parts) != 2 { return rpcclient.ErrUnsupporteServiceMethod } // get method - method := reflect.ValueOf(cdrsrv).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method + method := reflect.ValueOf(cdrS).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method if !method.IsValid() { return rpcclient.ErrUnsupporteServiceMethod } @@ -634,145 +421,210 @@ func (cdrsrv *CdrServer) Call(serviceMethod string, args interface{}, reply inte return err } -// thdSProcessEvent will send the event to ThresholdS if the connection is configured -func (cdrS *CdrServer) thdSProcessEvent(cgrEv *utils.CGREvent) { - var tIDs []string - if err := cdrS.thdS.Call(utils.ThresholdSv1ProcessEvent, - &ArgsProcessEvent{CGREvent: *cgrEv}, &tIDs); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing CDR event %+v with thdS.", - utils.CDRs, err.Error(), cgrEv)) - return +// V1ProcessCDR processes a CDR +func (cdrS *CDRServer) V1ProcessCDR(cdr *CDR, reply *string) (err error) { + if cdr.CGRID == utils.EmptyString { // Populate CGRID if not present + cdr.ComputeCGRID() } -} - -// statSProcessEvent will send the event to StatS if the connection is configured -func (cdrS *CdrServer) statSProcessEvent(cgrEv *utils.CGREvent) { - var reply []string - if err := cdrS.stats.Call(utils.StatSv1ProcessEvent, &StatsArgsProcessEvent{CGREvent: *cgrEv}, &reply); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing CDR event %+v with %s.", - utils.CDRs, err.Error(), cgrEv, utils.StatS)) - return - } -} - -// rarethsta will RAte/STOtore/REplicate/THresholds/STAts the CDR received -// used by both chargerS as well as re-/rating -func (cdrS *CdrServer) raStoReThStaCDR(cdr *CDR) { - ratedCDRs, err := cdrS.rateCDR(cdr) - if err != nil { - cdr.Cost = -1.0 // If there was an error, mark the CDR - cdr.ExtraInfo = err.Error() - ratedCDRs = []*CDR{cdr} - } - for _, rtCDR := range ratedCDRs { - if cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs { // Store CDR - go func(rtCDR *CDR) { - if err := cdrS.cdrDb.SetCDR(rtCDR, true); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s storing CDR %+v.", - utils.CDRs, err.Error(), rtCDR)) - } - }(rtCDR) - } - if len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 { - go cdrS.replicateCDRs([]*CDR{rtCDR}) - } - cgrEv := rtCDR.AsCGREvent() - if cdrS.thdS != nil { - go cdrS.thdSProcessEvent(cgrEv) - } - if cdrS.stats != nil { - go cdrS.statSProcessEvent(cgrEv) + cacheKey := "V1ProcessCDR" + cdr.CGRID + cdr.RunID + if item, err := cdrS.respCache.Get(cacheKey); err == nil && item != nil { + if item.Err == nil { + *reply = *item.Value.(*string) } + return item.Err } -} + defer cdrS.respCache.Cache(cacheKey, + &utils.ResponseCacheItem{Value: reply, Err: err}) -// chrgrSProcessEvent will process the CGREvent with ChargerS subsystem -func (cdrS *CdrServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) { - var chrgrs []*ChrgSProcessEventReply - if err := cdrS.chargerS.Call(utils.ChargerSv1ProcessEvent, cgrEv, &chrgrs); err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing CGR event %+v with %s.", - utils.CDRs, err.Error(), cgrEv, utils.ChargerS)) - return + if cdr.RequestType == utils.EmptyString { + cdr.RequestType = cdrS.cgrCfg.GeneralCfg().DefaultReqType } - for _, chrgr := range chrgrs { - cdr, err := NewMapEvent(chrgr.CGREvent.Event).AsCDR(cdrS.cgrCfg, - cgrEv.Tenant, cdrS.Timezone()) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s converting CDR event %+v with %s.", - utils.CDRs, err.Error(), cgrEv, utils.ChargerS)) - continue - } - cdrS.raStoReThStaCDR(cdr) - + if cdr.Tenant == utils.EmptyString { + cdr.Tenant = cdrS.cgrCfg.GeneralCfg().DefaultTenant } -} - -// statSProcessEvent will send the event to StatS if the connection is configured -func (cdrS *CdrServer) attrSProcessEvent(cgrEv *utils.CGREvent) (err error) { - if cgrEv.Context == nil { // populate if not already in - cgrEv.Context = utils.StringPointer(utils.MetaCDRs) + if cdr.Category == utils.EmptyString { + cdr.Category = cdrS.cgrCfg.GeneralCfg().DefaultCategory } - var rplyEv AttrSProcessEventReply - if err = cdrS.attrS.Call(utils.AttributeSv1ProcessEvent, - &AttrArgsProcessEvent{ - CGREvent: *cgrEv}, - &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 { - *cgrEv = *rplyEv.CGREvent - } else if err.Error() == utils.ErrNotFound.Error() { - err = nil // cancel ErrNotFound + if cdr.Subject == utils.EmptyString { // Use account information as rating subject if missing + cdr.Subject = cdr.Account + } + if !cdr.PreRated { // Enforce the RunID if CDR is not rated + cdr.RunID = utils.MetaRaw + } + if utils.IsSliceMember([]string{"", utils.MetaRaw}, cdr.RunID) { + cdr.Cost = -1.0 + } + cgrEv := &utils.CGREvent{ + Tenant: cdr.Tenant, + ID: utils.UUIDSha1Prefix(), + Event: cdr.AsMapStringIface(), } - return -} - -// V2ProcessCDR will process the CDR out of CGREvent -func (cdrS *CdrServer) V2ProcessCDR(cgrEv *utils.CGREvent, reply *string) (err error) { if cdrS.attrS != nil { - if err := cdrS.attrSProcessEvent(cgrEv); err != nil { + if err = cdrS.attrSProcessEvent(cgrEv); err != nil { return utils.NewErrServerError(err) } } - rawCDR, err := NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, cgrEv.Tenant, cdrS.Timezone()) - if err != nil { - return utils.NewErrServerError(err) - } - if cdrS.chargerS == nil { // backwards compatibility for DerivedChargers - return cdrS.V1ProcessCDR(rawCDR, reply) - } if cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs { // Store *raw CDR - if err = cdrS.cdrDb.SetCDR(rawCDR, false); err != nil { + if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> storing primary CDR %+v, got error: %s", + utils.CDRs, cdr, err.Error())) return utils.NewErrServerError(err) // Cannot store CDR } } if len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 { - cdrS.replicateCDRs([]*CDR{rawCDR}) // Replicate raw CDR + cdrS.exportCDRs([]*CDR{cdr}) // Replicate raw CDR } if cdrS.thdS != nil { go cdrS.thdSProcessEvent(cgrEv) } - if cdrS.stats != nil { + if cdrS.statS != nil { go cdrS.statSProcessEvent(cgrEv) } - if cdrS.chargerS != nil { - go cdrS.chrgrSProcessEvent(cgrEv) + if cdrS.chargerS != nil && + utils.IsSliceMember([]string{"", utils.MetaRaw}, cdr.RunID) { + go cdrS.chrgRaStoReThStaCDR(cgrEv, nil, nil, nil, nil) + } + *reply = utils.OK + + return +} + +type ArgV2ProcessCDR struct { + utils.CGREvent + AttributeS *bool // control AttributeS processing + ChargerS *bool // control ChargerS processing + Store *bool // control storing of the CDR + Export *bool // control online exports for the CDR + ThresholdS *bool // control ThresholdS + StatS *bool // control sending the CDR to StatS for aggregation +} + +// V2ProcessCDR will process the CDR out of CGREvent +func (cdrS *CDRServer) V2ProcessCDR(arg *ArgV2ProcessCDR, reply *string) (err error) { + attrS := cdrS.attrS != nil + if arg.AttributeS != nil { + attrS = *arg.AttributeS + } + cgrEv := &arg.CGREvent + if attrS { + if err := cdrS.attrSProcessEvent(cgrEv); err != nil { + return utils.NewErrServerError(err) + } + } + rawCDR, err := NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg, + cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone) + if err != nil { + return utils.NewErrServerError(err) + } + store := cdrS.cgrCfg.CdrsCfg().CDRSStoreCdrs + if arg.Store != nil { + store = *arg.Store + } + if store { // Store *raw CDR + if err = cdrS.cdrDb.SetCDR(rawCDR, false); err != nil { + return utils.NewErrServerError(err) // Cannot store CDR + } + } + export := len(cdrS.cgrCfg.CdrsCfg().CDRSOnlineCDRExports) != 0 + if arg.Export != nil { + export = *arg.Export + } + if export { + cdrS.exportCDRs([]*CDR{rawCDR}) // Replicate raw CDR + } + thrdS := cdrS.thdS != nil + if arg.ThresholdS != nil { + thrdS = *arg.ThresholdS + } + if thrdS { + go cdrS.thdSProcessEvent(cgrEv) + } + statS := cdrS.statS != nil + if arg.StatS != nil { + statS = *arg.StatS + } + if statS { + go cdrS.statSProcessEvent(cgrEv) + } + chrgS := cdrS.chargerS != nil + if arg.ChargerS != nil { + chrgS = *arg.ChargerS + } + if chrgS { + go cdrS.chrgRaStoReThStaCDR(cgrEv, + arg.Store, arg.Export, arg.ThresholdS, arg.StatS) } *reply = utils.OK return nil } -// Called by rate/re-rate API, RPC method -func (cdrS *CdrServer) V2RateCDRs(attrs *utils.RPCCDRsFilter, reply *string) error { - if cdrS.chargerS == nil { - return utils.NewErrNotConnected(utils.ChargerS) +// V1StoreSMCost handles storing of the cost into session_costs table +func (cdrS *CDRServer) V1StoreSessionCost(attr *AttrCDRSStoreSMCost, reply *string) error { + if attr.Cost.CGRID == "" { + return utils.NewCGRError(utils.CDRSCtx, + utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing), + "SMCost: %+v with empty CGRID") } - cdrFltr, err := attrs.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone) + if err := cdrS.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil { + return utils.NewErrServerError(err) + } + *reply = utils.OK + return nil +} + +// V2StoreSessionCost will store the SessionCost into session_costs table +func (cdrS *CDRServer) V2StoreSessionCost(args *ArgsV2CDRSStoreSMCost, reply *string) error { + if args.Cost.CGRID == "" { + return utils.NewCGRError(utils.CDRSCtx, + utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing), + "SMCost: %+v with empty CGRID") + } + cc := args.Cost.CostDetails.AsCallCost() + cc.Round() + roundIncrements := cc.GetRoundIncrements() + if len(roundIncrements) != 0 { + cd := cc.CreateCallDescriptor() + cd.CgrID = args.Cost.CGRID + cd.RunID = args.Cost.RunID + cd.Increments = roundIncrements + var response float64 + if err := cdrS.rals.Call("Responder.RefundRounding", + cd, &response); err != nil { + utils.Logger.Warning( + fmt.Sprintf(" RefundRounding for cc: %+v, got error: %s", + cc, err.Error())) + } + } + if err := cdrS.storeSMCost( + &SMCost{ + CGRID: args.Cost.CGRID, + RunID: args.Cost.RunID, + OriginHost: args.Cost.OriginHost, + OriginID: args.Cost.OriginID, + CostSource: args.Cost.CostSource, + Usage: args.Cost.Usage, + CostDetails: args.Cost.CostDetails}, + args.CheckDuplicate); err != nil { + return utils.NewErrServerError(err) + } + *reply = utils.OK + return nil + +} + +type ArgRateCDRs struct { + utils.RPCCDRsFilter + ChargerS *bool + Store *bool + Export *bool // Replicate results + ThresholdS *bool + StatS *bool // Set to true if the CDRs should be sent to stats server +} + +// V1RateCDRs is used for re-/rate CDRs which are already stored within StorDB +func (cdrS *CDRServer) V1RateCDRs(arg *ArgRateCDRs, reply *string) (err error) { + cdrFltr, err := arg.RPCCDRsFilter.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone) if err != nil { return utils.NewErrServerError(err) } @@ -781,22 +633,42 @@ func (cdrS *CdrServer) V2RateCDRs(attrs *utils.RPCCDRsFilter, reply *string) err return err } for _, cdr := range cdrs { - go cdrS.raStoReThStaCDR(cdr) + if arg.ChargerS != nil && *arg.ChargerS { + if cdrS.chargerS == nil { + return utils.NewErrNotConnected(utils.ChargerS) + } + if err = cdrS.chrgRaStoReThStaCDR(cdr.AsCGREvent(), + arg.Store, arg.Export, arg.ThresholdS, arg.StatS); err != nil { + return utils.NewErrServerError(err) + } + } else { + cdrS.raStoReThStaCDR(cdr, arg.Store, + arg.Export, arg.ThresholdS, arg.StatS) + } } *reply = utils.OK return nil } +// Used to process external CDRs +func (cdrS *CDRServer) V1ProcessExternalCDR(eCDR *ExternalCDR, reply *string) error { + cdr, err := NewCDRFromExternalCDR(eCDR, cdrS.cgrCfg.GeneralCfg().DefaultTimezone) + if err != nil { + return err + } + return cdrS.V1ProcessCDR(cdr, reply) +} + // V1GetCDRs returns CDRs from DB -func (self *CdrServer) V1GetCDRs(args utils.RPCCDRsFilter, cdrs *[]*CDR) error { - cdrsFltr, err := args.AsCDRsFilter(self.Timezone()) +func (cdrS *CDRServer) V1GetCDRs(args utils.RPCCDRsFilter, cdrs *[]*CDR) error { + cdrsFltr, err := args.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone) if err != nil { if err.Error() != utils.NotFoundCaps { err = utils.NewErrServerError(err) } return err } - if qryCDRs, _, err := self.cdrDb.GetCDRs(cdrsFltr, false); err != nil { + if qryCDRs, _, err := cdrS.cdrDb.GetCDRs(cdrsFltr, false); err != nil { return utils.NewErrServerError(err) } else { *cdrs = qryCDRs @@ -805,8 +677,8 @@ func (self *CdrServer) V1GetCDRs(args utils.RPCCDRsFilter, cdrs *[]*CDR) error { } // V1CountCDRs counts CDRs from DB -func (self *CdrServer) V1CountCDRs(args utils.RPCCDRsFilter, cnt *int64) error { - cdrsFltr, err := args.AsCDRsFilter(self.Timezone()) +func (cdrS *CDRServer) V1CountCDRs(args *utils.RPCCDRsFilter, cnt *int64) error { + cdrsFltr, err := args.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone) if err != nil { if err.Error() != utils.NotFoundCaps { err = utils.NewErrServerError(err) @@ -814,7 +686,7 @@ func (self *CdrServer) V1CountCDRs(args utils.RPCCDRsFilter, cnt *int64) error { return err } cdrsFltr.Count = true - if _, qryCnt, err := self.cdrDb.GetCDRs(cdrsFltr, false); err != nil { + if _, qryCnt, err := cdrS.cdrDb.GetCDRs(cdrsFltr, false); err != nil { return utils.NewErrServerError(err) } else { *cnt = qryCnt diff --git a/engine/models.go b/engine/models.go index 351e8a222..8c9aefa55 100644 --- a/engine/models.go +++ b/engine/models.go @@ -382,7 +382,7 @@ func (t CDRsql) TableName() string { return utils.CDRsTBL } -type SessionsCostsSQL struct { +type SessionCostsSQL struct { ID int64 Cgrid string RunID string @@ -395,8 +395,8 @@ type SessionsCostsSQL struct { DeletedAt *time.Time } -func (t SessionsCostsSQL) TableName() string { - return utils.SessionsCostsTBL +func (t SessionCostsSQL) TableName() string { + return utils.SessionCostsTBL } type TBLVersion struct { diff --git a/engine/storage_mongo_datadb.go b/engine/storage_mongo_datadb.go index 24185aa53..39a405761 100644 --- a/engine/storage_mongo_datadb.go +++ b/engine/storage_mongo_datadb.go @@ -298,17 +298,17 @@ func (ms *MongoStorage) EnsureIndexes() (err error) { } } - if err = ms.EnusureIndex(utils.SessionsCostsTBL, true, CGRIDLow, + if err = ms.EnusureIndex(utils.SessionCostsTBL, true, CGRIDLow, RunIDLow); err != nil { return } - if err = ms.EnusureIndex(utils.SessionsCostsTBL, false, OriginHostLow, + if err = ms.EnusureIndex(utils.SessionCostsTBL, false, OriginHostLow, OriginIDLow); err != nil { return } - if err = ms.EnusureIndex(utils.SessionsCostsTBL, false, RunIDLow, + if err = ms.EnusureIndex(utils.SessionCostsTBL, false, RunIDLow, OriginIDLow); err != nil { return } diff --git a/engine/storage_mongo_stordb.go b/engine/storage_mongo_stordb.go index a89c0e477..099c48e10 100644 --- a/engine/storage_mongo_stordb.go +++ b/engine/storage_mongo_stordb.go @@ -993,7 +993,7 @@ func (ms *MongoStorage) SetSMCost(smc *SMCost) error { return nil } return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(utils.SessionsCostsTBL).InsertOne(sctx, smc) + _, err = ms.getCol(utils.SessionCostsTBL).InsertOne(sctx, smc) return err }) } @@ -1004,7 +1004,7 @@ func (ms *MongoStorage) RemoveSMCost(smc *SMCost) error { remParams = bson.M{"cgrid": smc.CGRID, "runid": smc.RunID} } return ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) { - _, err = ms.getCol(utils.SessionsCostsTBL).DeleteMany(sctx, remParams) + _, err = ms.getCol(utils.SessionCostsTBL).DeleteMany(sctx, remParams) return err }) } @@ -1024,7 +1024,7 @@ func (ms *MongoStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri filter[OriginIDLow] = bsonx.Regex(fmt.Sprintf("^%s", originIDPrefix), "") } err = ms.client.UseSession(ms.ctx, func(sctx mongo.SessionContext) (err error) { - cur, err := ms.getCol(utils.SessionsCostsTBL).Find(sctx, filter) + cur, err := ms.getCol(utils.SessionCostsTBL).Find(sctx, filter) if err != nil { return err } diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 07078f2fb..79784bd92 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -107,7 +107,7 @@ func (self *SQLStorage) IsDBEmpty() (resp bool, err error) { utils.TBLTPSharedGroups, utils.TBLTPActions, utils.TBLTPActionTriggers, utils.TBLTPAccountActions, utils.TBLTPDerivedChargers, utils.TBLTPUsers, utils.TBLTPResources, utils.TBLTPStats, utils.TBLTPThresholds, - utils.TBLTPFilters, utils.SessionsCostsTBL, utils.CDRsTBL, utils.TBLTPActionPlans, + utils.TBLTPFilters, utils.SessionCostsTBL, utils.CDRsTBL, utils.TBLTPActionPlans, utils.TBLVersions, utils.TBLTPSuppliers, utils.TBLTPAttributes, utils.TBLTPChargers, } for _, tbl := range tbls { @@ -745,7 +745,7 @@ func (self *SQLStorage) SetSMCost(smc *SMCost) error { return nil } tx := self.db.Begin() - cd := &SessionsCostsSQL{ + cd := &SessionCostsSQL{ Cgrid: smc.CGRID, RunID: smc.RunID, OriginHost: smc.OriginHost, @@ -765,12 +765,12 @@ func (self *SQLStorage) SetSMCost(smc *SMCost) error { func (self *SQLStorage) RemoveSMCost(smc *SMCost) error { tx := self.db.Begin() - var rmParam *SessionsCostsSQL + var rmParam *SessionCostsSQL if smc != nil { - rmParam = &SessionsCostsSQL{Cgrid: smc.CGRID, + rmParam = &SessionCostsSQL{Cgrid: smc.CGRID, RunID: smc.RunID} } - if err := tx.Where(rmParam).Delete(SessionsCostsSQL{}).Error; err != nil { + if err := tx.Where(rmParam).Delete(SessionCostsSQL{}).Error; err != nil { tx.Rollback() return err } @@ -781,7 +781,7 @@ func (self *SQLStorage) RemoveSMCost(smc *SMCost) error { // GetSMCosts is used to retrieve one or multiple SMCosts based on filter func (self *SQLStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix string) ([]*SMCost, error) { var smCosts []*SMCost - filter := &SessionsCostsSQL{} + filter := &SessionCostsSQL{} if cgrid != "" { filter.Cgrid = cgrid } @@ -795,7 +795,7 @@ func (self *SQLStorage) GetSMCosts(cgrid, runid, originHost, originIDPrefix stri if originIDPrefix != "" { q = self.db.Where(filter).Where(fmt.Sprintf("origin_id LIKE '%s%%'", originIDPrefix)) } - results := make([]*SessionsCostsSQL, 0) + results := make([]*SessionCostsSQL, 0) if err := q.Find(&results).Error; err != nil { return nil, err } diff --git a/migrator/sessions_costs.go b/migrator/sessions_costs.go index 931f9d735..be5962117 100644 --- a/migrator/sessions_costs.go +++ b/migrator/sessions_costs.go @@ -153,7 +153,7 @@ func (v2Cost *v2SessionsCost) V2toV3Cost() (cost *engine.SMCost) { return } -func NewV2SessionsCostFromSessionsCostSql(smSql *engine.SessionsCostsSQL) (smV2 *v2SessionsCost, err error) { +func NewV2SessionsCostFromSessionsCostSql(smSql *engine.SessionCostsSQL) (smV2 *v2SessionsCost, err error) { smV2 = new(v2SessionsCost) smV2.CGRID = smSql.Cgrid smV2.RunID = smSql.RunID @@ -168,8 +168,8 @@ func NewV2SessionsCostFromSessionsCostSql(smSql *engine.SessionsCostsSQL) (smV2 return } -func (v2Cost *v2SessionsCost) AsSessionsCostSql() (smSql *engine.SessionsCostsSQL) { - smSql = new(engine.SessionsCostsSQL) +func (v2Cost *v2SessionsCost) AsSessionsCostSql() (smSql *engine.SessionCostsSQL) { + smSql = new(engine.SessionCostsSQL) smSql.Cgrid = v2Cost.CGRID smSql.RunID = v2Cost.RunID smSql.OriginHost = v2Cost.OriginHost diff --git a/migrator/storage_mongo_stordb.go b/migrator/storage_mongo_stordb.go index 0fd0715b4..3feae80b0 100644 --- a/migrator/storage_mongo_stordb.go +++ b/migrator/storage_mongo_stordb.go @@ -79,12 +79,12 @@ func (v1ms *mongoStorDBMigrator) renameV1SMCosts() (err error) { return err } return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(), - bson.D{{"create", utils.SessionsCostsTBL}}).Err() + bson.D{{"create", utils.SessionCostsTBL}}).Err() } func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) { v1ms.mgoDB.DB().Collection(utils.OldSMCosts).Drop(v1ms.mgoDB.GetContext()) - v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).Drop(v1ms.mgoDB.GetContext()) + v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).Drop(v1ms.mgoDB.GetContext()) return v1ms.mgoDB.DB().RunCommand(v1ms.mgoDB.GetContext(), bson.D{{"create", utils.OldSMCosts}, {"size", 1024}}).Err() } @@ -93,7 +93,7 @@ func (v1ms *mongoStorDBMigrator) createV1SMCosts() (err error) { func (v1ms *mongoStorDBMigrator) getV2SMCost() (v2Cost *v2SessionsCost, err error) { if v1ms.cursor == nil { var cursor mongo.Cursor - cursor, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).Find(v1ms.mgoDB.GetContext(), bson.D{}) + cursor, err = v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).Find(v1ms.mgoDB.GetContext(), bson.D{}) if err != nil { return nil, err } @@ -113,12 +113,12 @@ func (v1ms *mongoStorDBMigrator) getV2SMCost() (v2Cost *v2SessionsCost, err erro //set func (v1ms *mongoStorDBMigrator) setV2SMCost(v2Cost *v2SessionsCost) (err error) { - _, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).InsertOne(v1ms.mgoDB.GetContext(), v2Cost) + _, err = v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).InsertOne(v1ms.mgoDB.GetContext(), v2Cost) return } //remove func (v1ms *mongoStorDBMigrator) remV2SMCost(v2Cost *v2SessionsCost) (err error) { - _, err = v1ms.mgoDB.DB().Collection(utils.SessionsCostsTBL).DeleteMany(v1ms.mgoDB.GetContext(), bson.D{}) + _, err = v1ms.mgoDB.DB().Collection(utils.SessionCostsTBL).DeleteMany(v1ms.mgoDB.GetContext(), bson.D{}) return } diff --git a/migrator/storage_sql.go b/migrator/storage_sql.go index b4d42039c..e90a8b9e2 100755 --- a/migrator/storage_sql.go +++ b/migrator/storage_sql.go @@ -125,7 +125,7 @@ func (mgSQL *migratorSQL) getV2SMCost() (v2Cost *v2SessionsCost, err error) { return nil, err } } - scSql := new(engine.SessionsCostsSQL) + scSql := new(engine.SessionCostsSQL) mgSQL.rowIter.Scan(&scSql) v2Cost, err = NewV2SessionsCostFromSessionsCostSql(scSql) @@ -151,12 +151,12 @@ func (mgSQL *migratorSQL) setV2SMCost(v2Cost *v2SessionsCost) (err error) { func (mgSQL *migratorSQL) remV2SMCost(v2Cost *v2SessionsCost) (err error) { tx := mgSQL.sqlStorage.ExportGormDB().Begin() - var rmParam *engine.SessionsCostsSQL + var rmParam *engine.SessionCostsSQL if v2Cost != nil { - rmParam = &engine.SessionsCostsSQL{Cgrid: v2Cost.CGRID, + rmParam = &engine.SessionCostsSQL{Cgrid: v2Cost.CGRID, RunID: v2Cost.RunID} } - if err := tx.Where(rmParam).Delete(engine.SessionsCostsSQL{}).Error; err != nil { + if err := tx.Where(rmParam).Delete(engine.SessionCostsSQL{}).Error; err != nil { tx.Rollback() return err } diff --git a/utils/apitpdata.go b/utils/apitpdata.go index 13cd9bdd9..2693249d6 100755 --- a/utils/apitpdata.go +++ b/utils/apitpdata.go @@ -765,67 +765,6 @@ type AttrRemCdrs struct { CgrIds []string // List of CgrIds to remove from storeDb } -type AttrRateCdrs struct { - CgrIds []string // If provided, it will filter based on the cgrids present in list - MediationRunIds []string // If provided, it will filter on mediation runid - TORs []string // If provided, filter on TypeOfRecord - CdrHosts []string // If provided, it will filter cdrhost - CdrSources []string // If provided, it will filter cdrsource - ReqTypes []string // If provided, it will fiter reqtype - Tenants []string // If provided, it will filter tenant - Categories []string // If provided, it will filter çategory - Accounts []string // If provided, it will filter account - Subjects []string // If provided, it will filter the rating subject - DestinationPrefixes []string // If provided, it will filter on destination prefix - OrderIdStart *int64 // Export from this order identifier - OrderIdEnd *int64 // Export smaller than this order identifier - TimeStart string // If provided, it will represent the starting of the CDRs interval (>=) - TimeEnd string // If provided, it will represent the end of the CDRs interval (<) - RerateErrors bool // Rerate previous CDRs with errors (makes sense for reqtype rated and pseudoprepaid - RerateRated bool // Rerate CDRs which were previously rated (makes sense for reqtype rated and pseudoprepaid) - SendToStats bool // Set to true if the CDRs should be sent to stats server -} - -func (attrRateCDRs *AttrRateCdrs) AsCDRsFilter(timezone string) (*CDRsFilter, error) { - cdrFltr := &CDRsFilter{ - CGRIDs: attrRateCDRs.CgrIds, - RunIDs: attrRateCDRs.MediationRunIds, - OriginHosts: attrRateCDRs.CdrHosts, - Sources: attrRateCDRs.CdrSources, - ToRs: attrRateCDRs.TORs, - RequestTypes: attrRateCDRs.ReqTypes, - Tenants: attrRateCDRs.Tenants, - Categories: attrRateCDRs.Categories, - Accounts: attrRateCDRs.Accounts, - Subjects: attrRateCDRs.Subjects, - DestinationPrefixes: attrRateCDRs.DestinationPrefixes, - OrderIDStart: attrRateCDRs.OrderIdStart, - OrderIDEnd: attrRateCDRs.OrderIdEnd, - } - if aTime, err := ParseTimeDetectLayout(attrRateCDRs.TimeStart, timezone); err != nil { - return nil, err - } else if !aTime.IsZero() { - cdrFltr.AnswerTimeStart = &aTime - } - if aTimeEnd, err := ParseTimeDetectLayout(attrRateCDRs.TimeEnd, timezone); err != nil { - return nil, err - } else if !aTimeEnd.IsZero() { - cdrFltr.AnswerTimeEnd = &aTimeEnd - } - if attrRateCDRs.RerateErrors { - cdrFltr.MinCost = Float64Pointer(-1.0) - if !attrRateCDRs.RerateRated { - cdrFltr.MaxCost = Float64Pointer(0.0) - } - } else if attrRateCDRs.RerateRated { - cdrFltr.MinCost = Float64Pointer(0.0) - } - if attrRateCDRs.RerateErrors || attrRateCDRs.RerateRated { - cdrFltr.NotRunIDs = append(cdrFltr.NotRunIDs, MetaRaw) - } - return cdrFltr, nil -} - type AttrLoadTpFromFolder struct { FolderPath string // Take files from folder absolute path DryRun bool // Do not write to database but parse only @@ -1124,13 +1063,6 @@ type AttrGetCallCost struct { RunId string // Run Id } -type AttrRateCDRs struct { - RPCCDRsFilter - StoreCDRs *bool - SendToStatS *bool // Set to true if the CDRs should be sent to stats server - ReplicateCDRs *bool // Replicate results -} - type AttrSetBalance struct { Tenant string Account string diff --git a/utils/consts.go b/utils/consts.go index 2df873197..99e8736da 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -906,7 +906,7 @@ const ( TBLTPStats = "tp_stats" TBLTPThresholds = "tp_thresholds" TBLTPFilters = "tp_filters" - SessionsCostsTBL = "sessions_costs" + SessionCostsTBL = "session_costs" CDRsTBL = "cdrs" TBLTPSuppliers = "tp_suppliers" TBLTPAttributes = "tp_attributes"