Files
cgrates/engine/cdrs.go
ionutboangiu 47af22c724 Update rpcclient library to latest version
`ClientConnector` is no longer defined within `rpcclient` in its latest
version. It has been changed to be obtained from the `cgrates/birpc`
library instead.

Replaced `net/rpc` with `cgrates/birpc` and `net/rpc/jsonrpc` with
`cgrates/birpc/jsonrpc` libraries.

The implementations of `CallBiRPC()` and `Handlers()` were removed,
along with the methods associated with them.

The `rpcclient.BIRPCConector` and the methods prefixed with `BiRPC` were
removed from the `BiRPClient` interface.

The `BiRPClient` interface was renamed to `BIRPCClient`, although not
sure if needed (seems useful just to test if the structure is correct).

`rpcclient.BiRPCConector` has been replaced with `context.ClientConnector`,
which is now passed alongside `context.Context` within the same struct
(`cgrates/birpc/context.Context`). Consequently, functions that were
previously relying on it are now receiving the context instead. The
changes were made in the following functions:

    - `engine/connmanager.go` - `*ConnManager.Call`
    - `engine/connmanager.go` - `*ConnManager.getConn`
    - `engine/connmanager.go` - `*ConnManager.getConnWithConfig`
    - `engine/libengine.go` - `NewRPCPool`
    - `engine/libengine.go` - `NewRPCConnection`
    - `agents/libagents.go` - `processRequest`

Compilation errors related to the `rpcclient.NewRPCClient` function were
resolved by adding the missing `context`, `max_reconnect_interval`, and
`delayFunc` parameters. Additionally, context was added to all calls made
by the client. An effort was made to avoid passing hardcoded values as
much as possible, and extra flags were added where necessary for cgr
binaries.

The `max_reconnect_interval` parameter is now passed from parent
functions, which required adjustments to the function signature.

A new context field was added to all agent objects to ensure access to
it before sending it to the `connmanager's Call`, effectively replacing
`birpcclient`. Although an alternative would have been to create the
new service and add it to the context right before passing it to the
handlers, the chosen approach is definitely more comfortable.

With the addition of a context field for the SIP servers agents, an
additional error needed to be handled, coming from the creation of the
service. Agent constructors within the services package log errors as
they occur and return. Alternate solutions considered were either
shutting down the engine instead of returning, or just logging the
occurrence as a warning, particularly when the `ctx.Client` isn't
required, especially in cases where bidirectional connections are not
needed. For the latter option, it's crucial to return the object with
the error rather than nil or to make the error nil immediately after
logging.

Context has been integrated into all internal Call implementations to
ensure the objects conform to the `birpc.ClientConnector` interface.
These implementations will be removed in the near future as all service
objects are being wrapped in a `birpc.Service` type that satisfies the
`birpc.ClientConnector` interface. Currently, they are being retained
as a reference in case of any unexpected issues that arise.

Ensured that the `birpc.Service` wrapped service objects are passed to
the internal channel getters rather than the objects themselves.

Add context.TODO() to all \*ConnManager.Call function calls. To be
replaced with the context passed to the Method, when available.

For all `*ConnManager.Call` function calls, `context.TODO()` has been
added. This will be replaced with the context passed to the method when
it becomes available.

The value returned by StringGetOpts is now passed directly to the
FirstNonEmpty function, instead of being assigned to a variable
first.

The implementation of the `*AnalyzerService.GetInternalBiRPCCodec`
function has been removed from the services package. Additionally,
the AnalyzerBiRPCConnector type definition and its associated methods
have been removed.

The codec implementation has been revised to include the following
changes:

    - `rpc.ServerCodec` -> `birpc.ServerCodec`;
    - `rpc2.ServerCodec` -> `birpc.BirpcCodec`;
    - `rpc2.Request` -> `birpc.Request`;
    - `rpc2.Response` -> `birpc.Response`;
    - The constructors for json and gob birpc codecs in `cenkalti/rpc`
    have been replaced with ones from the `birpc/jsonrpc` library;
    - The gob codec implementation has been removed in favor of the
    version already implemented in the birpc external library.

The server implementation has been updated with the following changes:

    - A field that represents a simple RPC server has been added to the
    Server struct;
    - Both the simple and bidirectional RPC servers are now initialized
    inside the Server constructor, eliminating the need for nil checks;
    - Usage of `net/rpc` and `cenkalti/rpc2` has been replaced with
    `cgrates/birpc`;
    - Additional `(Bi)RPCUnregisterName` methods have been added;
    - The implementations for (bi)json/gob servers have been somewhat
    simplified.

Before deleting the Call functions and using the `birpc.NewService`
method to register the methods for all cgrates components, update the
Call functions to satisfy the `birpc.ClientConnector` interface. This
way it will be a bit safer. Had to be done for SessionS though.

The `BiRPCCall` method has been removed from coreutils.go. The
`RPCCall` and `APIerRPCCall` methods are also to be removed in the
future.

Ensured that all methods for `SessionSv1` and `SessionS` have the
correct function signature with context. The same adjustments were made
for the session dispatcher methods and for the `SessionSv1Interface`.
Also removed sessionsbirpc.go and smgbirpc.go files.

Implemented the following methods to help with the registration of
methods across all subsystems:

    - `NewServiceWithName`;
    - `NewDispatcherService` for all dispatcher methods;
    - `NewService` for the remaining methods that are already named
    correctly.

Compared to the constructor from the external library, these also make
sure that the naming of the methods is consistent with our constants.

Added context to the Call methods for the mock client connectors (used
in tests).

Removed unused rpc fields from inside the following services:

    - EeS
    - LoaderS
    - ResourceS
    - RouteS
    - StatS
    - ThresholdS
    - SessionS
    - CoreS

Updated the methods implementing the logic for API methods to align
with the latest changes, ensuring consistency and correctness. The
modifications include:

    - Adjusting the function signature to the new format
    (ctx, args, reply).
    - Prefixing names with 'V*' to indicate that they are utilized by
    or registered as APIs.
    - Containing the complete logic within the methods, enabling APIs
    to call them and return their reply directly.

The subsystems affected by these changes are detailed as follows:

    - CoreS: Additional methods were implementing utilizing the
    existing ones. Though modifying them directly was possible, certain
    methods (e.g., StopCPUProfiling()) were used elsewhere and not as
    RPC requests.
    - CDRs: Renamed V1CountCDRs to V1GetCDRsCount.
    - StatS: V1GetQueueFloatMetrics, V1GetQueueStringMetrics,
    V1GetStatQueue accept different arguments compared to API functions
    (opted to register StatSv1 instead).
    - ResourceS: Renamed V1ResourcesForEvent to V1GetResourcesForEvent
    to align with API naming.
    - DispatcherS: Renamed V1GetProfilesForEvent to
    DispatcherSv1GetProfilesForEvent.
    - For the rest, adding context to the function signature was enough.

In the unit tests, wrapping the object within a biprc.Service is now
ensured before passing it to the internal connections map under the
corresponding key.

Some tests that are covering error cases, are also checking the other
return value besides the error. That check has been removed since it
is redundant.

Revised the RPC/BiRPC clients' constructors (for use in tests)

A different approach has been chosen for the handling of ping methods
within subsystems. Instead of defining the same structure in every file,
the ping methods were added inside the Service constructor function.
Though the existing Ping methods were left as they were, they will be
removed in the future.

An additional method has been implemented to register the Ping method
from outside of the engine package.

Implemented Sleep and CapsError methods for SessionS (before they were
exclusively for bidirectional use, I believe).

A specific issue has been fixed within the CapsError SessionSv1 API
implementation, which is designed to overwrite methods that cannot be
allocated due to the threshold limit being reached. Previously, it was
deallocating when writing the response, even when a spot hadn't been
allocated in the first place (due to the cap being hit). The reason
behind this, especially why the test was passing before, still needs
to be looked into, as the problem should have occurred from before.

Implement `*SessionSv1.RegisterInternalBiJSONConn` method in apier.

All agent methods have been registered under the SessionSv1 name. For
the correct method names, the leading "V1" prefix has been trimmed
using the `birpc.NewServiceWithMethodsRename` function.

Revise the RegisterRpcParams function to populate the parameters
while relying on the `*birpc.Service` type instead. This will
automatically also deal with the validation. At the moment,
any error encountered is logged without being returned. Might
be changed in the future.

Inside the cgrRPCAction function, `mapstructure.Decode`'s output parameter
is now guaranteed to always be a pointer.

Updated go.mod and go.sum.

Fixed some typos.
2023-09-01 11:23:54 +02:00

1180 lines
38 KiB
Go

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"net/http"
"reflect"
"strings"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
func newMapEventFromReqForm(r *http.Request) (mp MapEvent, err error) {
if r.Form == nil {
if err = r.ParseForm(); err != nil {
return
}
}
mp = MapEvent{utils.Source: r.RemoteAddr}
for k, vals := range r.Form {
mp[k] = vals[0] // We only support the first value for now, if more are provided it is considered remote's fault
}
return
}
// cgrCdrHandler handles CDRs received over HTTP REST
func (cdrS *CDRServer) cgrCdrHandler(w http.ResponseWriter, r *http.Request) {
cgrCDR, err := newMapEventFromReqForm(r)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> could not create CDR entry from http: %+v, err <%s>",
utils.CDRs, r.Form, err.Error()))
return
}
cdr, err := cgrCDR.AsCDR(cdrS.cgrCfg, cdrS.cgrCfg.GeneralCfg().DefaultTenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> could not create CDR entry from rawCDR: %+v, err <%s>",
utils.CDRs, cgrCDR, err.Error()))
return
}
var ignored string
if err := cdrS.V1ProcessCDR(context.TODO(), &CDRWithAPIOpts{CDR: cdr}, &ignored); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing CDR: %s, err: <%s>",
utils.CDRs, cdr, err.Error()))
}
}
// fsCdrHandler will handle CDRs received from FreeSWITCH over HTTP-JSON
func (cdrS *CDRServer) fsCdrHandler(w http.ResponseWriter, r *http.Request) {
fsCdr, err := NewFSCdr(r.Body, cdrS.cgrCfg)
r.Body.Close()
if err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Could not create CDR entry: %s", err.Error()))
return
}
cdr, err := fsCdr.AsCDR(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Could not create AsCDR entry: %s", err.Error()))
return
}
var ignored string
if err := cdrS.V1ProcessCDR(context.TODO(), &CDRWithAPIOpts{CDR: cdr}, &ignored); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing CDR: %s, err: <%s>",
utils.CDRs, cdr, err.Error()))
}
}
// NewCDRServer is a constructor for CDRServer
func NewCDRServer(cgrCfg *config.CGRConfig, storDBChan chan StorDB, dm *DataManager, filterS *FilterS,
connMgr *ConnManager) *CDRServer {
cdrDb := <-storDBChan
return &CDRServer{
cgrCfg: cgrCfg,
cdrDb: cdrDb,
dm: dm,
guard: guardian.Guardian,
filterS: filterS,
connMgr: connMgr,
storDBChan: storDBChan,
}
}
// CDRServer stores and rates CDRs
type CDRServer struct {
cgrCfg *config.CGRConfig
cdrDb CdrStorage
dm *DataManager
guard *guardian.GuardianLocker
filterS *FilterS
connMgr *ConnManager
storDBChan chan StorDB
}
// ListenAndServe listen for storbd reload
func (cdrS *CDRServer) ListenAndServe(stopChan chan struct{}) {
for {
select {
case <-stopChan:
return
case stordb, ok := <-cdrS.storDBChan:
if !ok { // the chanel was closed by the shutdown of stordbService
return
}
cdrS.cdrDb = stordb
}
}
}
// RegisterHandlersToServer is called by cgr-engine to register HTTP URL handlers
func (cdrS *CDRServer) RegisterHandlersToServer(server utils.Server) {
server.RegisterHttpFunc(cdrS.cgrCfg.HTTPCfg().HTTPCDRsURL, cdrS.cgrCdrHandler)
server.RegisterHttpFunc(cdrS.cgrCfg.HTTPCfg().HTTPFreeswitchCDRsURL, cdrS.fsCdrHandler)
}
// storeSMCost will store a SMCost
func (cdrS *CDRServer) storeSMCost(smCost *SMCost, checkDuplicate bool) error {
smCost.CostDetails.Compute() // make sure the total cost reflect the increment
lockKey := utils.MetaCDRs + smCost.CGRID + smCost.RunID + smCost.OriginID // Will lock on this ID
if checkDuplicate {
return cdrS.guard.Guard(func() error {
smCosts, err := cdrS.cdrDb.GetSMCosts(smCost.CGRID, smCost.RunID, "", "")
if err != nil && err.Error() != utils.NotFoundCaps {
return err
}
if len(smCosts) != 0 {
return utils.ErrExists
}
return cdrS.cdrDb.SetSMCost(smCost)
}, config.CgrConfig().GeneralCfg().LockingTimeout, lockKey) // FixMe: Possible deadlock with Guard from SMG session close()
}
return cdrS.cdrDb.SetSMCost(smCost)
}
// rateCDR will populate cost field
// Returns more than one rated CDR in case of SMCost retrieved based on prefix
func (cdrS *CDRServer) rateCDR(cdr *CDRWithAPIOpts) ([]*CDR, error) {
var qryCC *CallCost
var err error
if cdr.RequestType == utils.MetaNone {
return nil, nil
}
if cdr.Usage < 0 {
cdr.Usage = time.Duration(0)
}
cdr.ExtraInfo = "" // Clean previous ExtraInfo, useful when re-rating
var cdrsRated []*CDR
_, hasLastUsed := cdr.ExtraFields[utils.LastUsed]
if utils.SliceHasMember([]string{utils.MetaPrepaid, utils.Prepaid}, cdr.RequestType) &&
(cdr.Usage != 0 || hasLastUsed) && cdr.CostDetails == nil {
// ToDo: Get rid of Prepaid as soon as we don't want to support it backwards
// Should be previously calculated and stored in DB
fib := utils.FibDuration(time.Second, 0)
var smCosts []*SMCost
cgrID := cdr.CGRID
if _, hasIT := cdr.ExtraFields[utils.OriginIDPrefix]; hasIT {
cgrID = "" // for queries involving originIDPrefix we ignore CGRID
}
for i := 0; i < cdrS.cgrCfg.CdrsCfg().SMCostRetries; i++ {
smCosts, err = cdrS.cdrDb.GetSMCosts(cgrID, cdr.RunID, cdr.OriginHost,
cdr.ExtraFields[utils.OriginIDPrefix])
if err == nil && len(smCosts) != 0 {
break
}
if i <= cdrS.cgrCfg.CdrsCfg().SMCostRetries-1 {
time.Sleep(fib())
}
}
if len(smCosts) != 0 { // Cost retrieved from SMCost table
for _, smCost := range smCosts {
cdrClone := cdr.CDR.Clone()
cdrClone.OriginID = smCost.OriginID
if cdr.Usage == 0 {
cdrClone.Usage = smCost.Usage
} else if smCost.Usage != cdr.Usage {
if _, err = cdrS.refundEventCost(smCost.CostDetails, // ToDo: need to maybe mark it in the future in the processed flags
cdrClone.RequestType, cdrClone.ToR); err != nil {
return nil, err
}
cdrClone.CostDetails = nil
if qryCC, err = cdrS.getCostFromRater(&CDRWithAPIOpts{CDR: cdrClone}); err != nil {
return nil, err
}
smCost = &SMCost{
CGRID: cdrClone.CGRID,
RunID: cdrClone.RunID,
OriginHost: cdrClone.OriginID,
CostSource: utils.CDRs,
Usage: cdrClone.Usage,
CostDetails: NewEventCostFromCallCost(qryCC, cdrClone.CGRID, cdrClone.RunID),
}
}
cdrClone.Cost = smCost.CostDetails.GetCost()
cdrClone.CostDetails = smCost.CostDetails
cdrClone.CostSource = smCost.CostSource
cdrsRated = append(cdrsRated, cdrClone)
}
return cdrsRated, nil
}
utils.Logger.Warning(
fmt.Sprintf("<Cdrs> WARNING: Could not find CallCostLog for cgrid: %s, source: %s, runid: %s, originID: %s originHost: %s, will recalculate",
cdr.CGRID, utils.MetaSessionS, cdr.RunID, cdr.OriginID, cdr.OriginHost))
}
if cdr.CostDetails != nil {
if cdr.Usage == cdr.CostDetails.GetUsage() { // Costs were previously calculated, make sure they cover the full usage
cdr.Cost = cdr.CostDetails.GetCost()
cdr.CostDetails.Compute()
return []*CDR{cdr.CDR}, nil
}
// ToDo: need to maybe mark it in the future in the processed flags
if _, err = cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); err != nil {
return nil, err
}
cdr.CostDetails = nil
}
qryCC, err = cdrS.getCostFromRater(cdr)
if err != nil {
return nil, err
}
if qryCC != nil {
cdr.Cost = qryCC.Cost
cdr.CostDetails = NewEventCostFromCallCost(qryCC, cdr.CGRID, cdr.RunID)
}
cdr.CostDetails.Compute()
return []*CDR{cdr.CDR}, nil
}
var reqTypes = utils.NewStringSet([]string{utils.MetaPseudoPrepaid, utils.MetaPostpaid, utils.MetaPrepaid,
utils.PseudoPrepaid, utils.Postpaid, utils.Prepaid, utils.MetaDynaprepaid})
// getCostFromRater will retrieve the cost from RALs
func (cdrS *CDRServer) getCostFromRater(cdr *CDRWithAPIOpts) (*CallCost, error) {
if len(cdrS.cgrCfg.CdrsCfg().RaterConns) == 0 {
return nil, utils.NewErrNotConnected(utils.RALService)
}
cc := new(CallCost)
var err error
timeStart := cdr.AnswerTime
if timeStart.IsZero() { // Fix for FreeSWITCH unanswered calls
timeStart = cdr.SetupTime
}
cd := &CallDescriptor{
ToR: cdr.ToR,
Tenant: cdr.Tenant,
Category: cdr.Category,
Subject: cdr.Subject,
Account: cdr.Account,
Destination: cdr.Destination,
TimeStart: timeStart,
TimeEnd: timeStart.Add(cdr.Usage),
DurationIndex: cdr.Usage,
PerformRounding: true,
}
if reqTypes.Has(cdr.RequestType) { // Prepaid - Cost can be recalculated in case of missing records from SM
err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().RaterConns,
utils.ResponderDebit,
&CallDescriptorWithAPIOpts{
CallDescriptor: cd,
APIOpts: cdr.APIOpts,
}, cc)
if err != nil && err.Error() == utils.ErrAccountNotFound.Error() &&
cdr.RequestType == utils.MetaDynaprepaid {
var reply string
// execute the actionPlan configured in Scheduler
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().SchedulerConns,
utils.SchedulerSv1ExecuteActionPlans, &utils.AttrsExecuteActionPlans{
ActionPlanIDs: cdrS.cgrCfg.SchedulerCfg().DynaprepaidActionPlans,
AccountID: cdr.Account, Tenant: cdr.Tenant},
&reply); err != nil {
return cc, err
}
// execute again the Debit operation
err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().RaterConns,
utils.ResponderDebit,
&CallDescriptorWithAPIOpts{
CallDescriptor: cd,
APIOpts: cdr.APIOpts,
}, cc)
}
} else {
err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().RaterConns,
utils.ResponderGetCost,
&CallDescriptorWithAPIOpts{
CallDescriptor: cd,
APIOpts: cdr.APIOpts,
}, cc)
}
if err != nil {
return cc, err
}
cdr.CostSource = utils.MetaCDRs
return cc, nil
}
// rateCDRWithErr rates a CDR including errors
func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithAPIOpts) (ratedCDRs []*CDR) {
var err error
ratedCDRs, err = cdrS.rateCDR(cdr)
if err != nil {
cdr.Cost = -1.0 // If there was an error, mark the CDR
cdr.ExtraInfo = err.Error()
ratedCDRs = []*CDR{cdr.CDR}
}
return
}
// refundEventCost will refund the EventCost using RefundIncrements
func (cdrS *CDRServer) refundEventCost(ec *EventCost, reqType, tor string) (rfnd bool, err error) {
if len(cdrS.cgrCfg.CdrsCfg().RaterConns) == 0 {
return false, utils.NewErrNotConnected(utils.RALService)
}
if ec == nil || !utils.AccountableRequestTypes.Has(reqType) {
return // non refundable
}
cd := ec.AsRefundIncrements(tor)
if cd == nil || len(cd.Increments) == 0 {
return
}
var acnt Account
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().RaterConns,
utils.ResponderRefundIncrements,
&CallDescriptorWithAPIOpts{CallDescriptor: cd}, &acnt); err != nil {
return
}
return true, nil
}
// chrgrSProcessEvent forks CGREventWithOpts into multiples based on matching ChargerS profiles
func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) (cgrEvs []*utils.CGREvent, err error) {
var chrgrs []*ChrgSProcessEventReply
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().ChargerSConns,
utils.ChargerSv1ProcessEvent,
cgrEv, &chrgrs); err != nil {
return
}
if len(chrgrs) == 0 {
return
}
cgrEvs = make([]*utils.CGREvent, len(chrgrs))
for i, cgrPrfl := range chrgrs {
cgrEvs[i] = cgrPrfl.CGREvent
}
return
}
// attrSProcessEvent will send the event to StatS if the connection is configured
func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREvent) (err error) {
var rplyEv AttrSProcessEventReply
if cgrEv.APIOpts == nil {
cgrEv.APIOpts = make(map[string]any)
}
cgrEv.APIOpts[utils.MetaSubsys] = utils.MetaCDRs
ctx, has := cgrEv.APIOpts[utils.OptsContext]
cgrEv.APIOpts[utils.OptsContext] = utils.FirstNonEmpty(
utils.IfaceAsString(ctx),
utils.MetaCDRs)
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().AttributeSConns,
utils.AttributeSv1ProcessEvent,
cgrEv, &rplyEv); err == nil && len(rplyEv.AlteredFields) != 0 {
*cgrEv = *rplyEv.CGREvent
if !has && utils.IfaceAsString(cgrEv.APIOpts[utils.OptsContext]) == utils.MetaCDRs {
delete(cgrEv.APIOpts, utils.OptsContext)
}
} else if err != nil &&
err.Error() == utils.ErrNotFound.Error() {
err = nil // cancel ErrNotFound
}
return
}
// thdSProcessEvent will send the event to ThresholdS
func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREvent) (err error) {
var tIDs []string
// we clone the CGREvent so we can add EventType without being propagated
thArgs := cgrEv.Clone()
if thArgs.APIOpts == nil {
thArgs.APIOpts = make(map[string]any)
}
thArgs.APIOpts[utils.MetaEventType] = utils.CDR
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent,
thArgs, &tIDs); err != nil &&
err.Error() == utils.ErrNotFound.Error() {
err = nil // NotFound is not considered error
}
return
}
// statSProcessEvent will send the event to StatS
func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREvent) (err error) {
var reply []string
statArgs := cgrEv.Clone()
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().StatSConns,
utils.StatSv1ProcessEvent,
statArgs, &reply); err != nil &&
err.Error() == utils.ErrNotFound.Error() {
err = nil // NotFound is not considered error
}
return
}
// eeSProcessEvent will process the event with the EEs component
func (cdrS *CDRServer) eeSProcessEvent(cgrEv *CGREventWithEeIDs) (err error) {
var reply map[string]map[string]any
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().EEsConns,
utils.EeSv1ProcessEvent,
cgrEv, &reply); err != nil &&
err.Error() == utils.ErrNotFound.Error() {
err = nil // NotFound is not considered error
}
return
}
// processEvent processes a CGREvent based on arguments
// in case of partially executed, both error and evs will be returned
func (cdrS *CDRServer) processEvents(evs []*utils.CGREvent,
chrgS, attrS, refund, ralS, store, reRate, export, thdS, stS bool) (outEvs []*utils.EventWithFlags, err error) {
if reRate {
refund = true
}
if attrS {
for _, ev := range evs {
if err = cdrS.attrSProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), utils.ToJSON(ev), utils.AttributeS))
err = utils.ErrPartiallyExecuted
return
}
}
}
var cgrEvs []*utils.CGREvent
if chrgS {
for _, ev := range evs {
var chrgEvs []*utils.CGREvent
if chrgEvs, err = cdrS.chrgrSProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), utils.ToJSON(ev), utils.ChargerS))
err = utils.ErrPartiallyExecuted
return
} else {
cgrEvs = append(cgrEvs, chrgEvs...)
}
}
} else { // ChargerS not requested, charge the original event
cgrEvs = evs
}
// Check if the unique ID was not already processed
if !refund {
for _, cgrEv := range cgrEvs {
me := MapEvent(cgrEv.Event)
if !me.HasField(utils.CGRID) { // try to compute the CGRID if missing
me[utils.CGRID] = utils.Sha1(
me.GetStringIgnoreErrors(utils.OriginID),
me.GetStringIgnoreErrors(utils.OriginHost),
)
}
uID := utils.ConcatenatedKey(
me.GetStringIgnoreErrors(utils.CGRID),
me.GetStringIgnoreErrors(utils.RunID),
)
if Cache.HasItem(utils.CacheCDRIDs, uID) && !reRate {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, utils.ErrExists, utils.ToJSON(cgrEv), utils.CacheS))
return nil, utils.ErrExists
}
if errCh := Cache.Set(utils.CacheCDRIDs, uID, true, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional); errCh != nil {
return nil, errCh
}
}
}
// Populate CDR list out of events
cdrs := make([]*CDR, len(cgrEvs))
if refund || ralS || store || reRate || export {
for i, cgrEv := range cgrEvs {
if refund {
if _, has := cgrEv.Event[utils.CostDetails]; !has {
// if CostDetails is not populated or is nil, look for it inside the previously stored cdr
var cgrID string // prepare CGRID to filter for previous CDR
if val, has := cgrEv.Event[utils.CGRID]; !has {
cgrID = utils.Sha1(utils.IfaceAsString(cgrEv.Event[utils.OriginID]),
utils.IfaceAsString(cgrEv.Event[utils.OriginHost]))
} else {
cgrID = utils.IfaceAsString(val)
}
var prevCDRs []*CDR // only one should be returned
if prevCDRs, _, err = cdrS.cdrDb.GetCDRs(
&utils.CDRsFilter{CGRIDs: []string{cgrID},
RunIDs: []string{utils.IfaceAsString(cgrEv.Event[utils.RunID])}}, false); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> could not retrieve previously stored CDR, error: <%s>",
utils.CDRs, err.Error()))
err = utils.ErrPartiallyExecuted
return
} else {
cgrEv.Event[utils.CostDetails] = prevCDRs[0].CostDetails
}
}
}
if cdrs[i], err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> converting event %+v to CDR",
utils.CDRs, err.Error(), utils.ToJSON(cgrEv)))
err = utils.ErrPartiallyExecuted
return
}
}
}
procFlgs := make([]utils.StringSet, len(cgrEvs)) // will save the flags for the reply here
for i := range cgrEvs {
procFlgs[i] = utils.NewStringSet(nil)
}
if refund {
for i, cdr := range cdrs {
if rfnd, errRfd := cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); errRfd != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> refunding CDR %+v",
utils.CDRs, errRfd.Error(), utils.ToJSON(cdr)))
} else if rfnd {
cdr.CostDetails = nil // this makes sure that the rater will recalculate (and debit) the cost
procFlgs[i].Add(utils.MetaRefund)
}
}
}
if ralS {
for i, cdr := range cdrs {
for j, rtCDR := range cdrS.rateCDRWithErr(
&CDRWithAPIOpts{
CDR: cdr,
APIOpts: cgrEvs[i].APIOpts,
}) {
cgrEv := rtCDR.AsCGREvent()
cgrEv.APIOpts = cgrEvs[i].APIOpts
if j == 0 { // the first CDR will replace the events we got already as a small optimization
cdrs[i] = rtCDR
cgrEvs[i] = cgrEv
} else {
cdrs = append(cdrs, cdr)
cgrEvs = append(cgrEvs, cgrEv)
}
}
}
}
if store {
refundCDRCosts := func() { // will be used to refund all CDRs on errors
for _, cdr := range cdrs { // refund what we have charged since duplicates are not allowed
if _, errRfd := cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); errRfd != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> refunding CDR %+v",
utils.CDRs, errRfd.Error(), utils.ToJSON(cdr)))
}
}
}
for _, cdr := range cdrs {
if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil {
if err != utils.ErrExists || !reRate {
refundCDRCosts()
return
}
if err = cdrS.cdrDb.SetCDR(cdr, true); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> updating CDR %+v",
utils.CDRs, err.Error(), utils.ToJSON(cdr)))
err = utils.ErrPartiallyExecuted
return
}
}
}
}
var partiallyExecuted bool // from here actions are optional and a general error is returned
if export {
if len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0 {
for _, cgrEv := range cgrEvs {
evWithOpts := &CGREventWithEeIDs{
CGREvent: cgrEv,
EeIDs: cdrS.cgrCfg.CdrsCfg().OnlineCDRExports,
}
if err = cdrS.eeSProcessEvent(evWithOpts); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> exporting cdr %+v",
utils.CDRs, err.Error(), utils.ToJSON(evWithOpts)))
partiallyExecuted = true
}
}
}
}
if thdS {
for _, cgrEv := range cgrEvs {
if err = cdrS.thdSProcessEvent(cgrEv); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), utils.ToJSON(cgrEv), utils.ThresholdS))
partiallyExecuted = true
}
}
}
if stS {
for _, cgrEv := range cgrEvs {
if err = cdrS.statSProcessEvent(cgrEv); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), utils.ToJSON(cgrEv), utils.StatS))
partiallyExecuted = true
}
}
}
if partiallyExecuted {
err = utils.ErrPartiallyExecuted
}
outEvs = make([]*utils.EventWithFlags, len(cgrEvs))
for i, cgrEv := range cgrEvs {
outEvs[i] = &utils.EventWithFlags{
Flags: procFlgs[i].AsSlice(),
Event: cgrEv.Event,
}
}
return
}
// Call implements the birpc.ClientConnector interface
func (cdrS *CDRServer) Call(ctx *context.Context, serviceMethod string, args any, reply any) error {
parts := strings.Split(serviceMethod, ".")
if len(parts) != 2 {
return rpcclient.ErrUnsupporteServiceMethod
}
// get method
method := reflect.ValueOf(cdrS).MethodByName(parts[0][len(parts[0])-2:] + parts[1]) // Inherit the version in the method
if !method.IsValid() {
return rpcclient.ErrUnsupporteServiceMethod
}
// construct the params
params := []reflect.Value{reflect.ValueOf(args), reflect.ValueOf(reply)}
ret := method.Call(params)
if len(ret) != 1 {
return utils.ErrServerError
}
if ret[0].Interface() == nil {
return nil
}
err, ok := ret[0].Interface().(error)
if !ok {
return utils.ErrServerError
}
return err
}
// V1ProcessCDR processes a CDR
func (cdrS *CDRServer) V1ProcessCDR(ctx *context.Context, cdr *CDRWithAPIOpts, reply *string) (err error) {
if cdr.CGRID == utils.EmptyString { // Populate CGRID if not present
cdr.ComputeCGRID()
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessCDR, cdr.CGRID, cdr.RunID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
if cdr.RequestType == utils.EmptyString {
cdr.RequestType = cdrS.cgrCfg.GeneralCfg().DefaultReqType
}
if cdr.Tenant == utils.EmptyString {
cdr.Tenant = cdrS.cgrCfg.GeneralCfg().DefaultTenant
}
if cdr.Category == utils.EmptyString {
cdr.Category = cdrS.cgrCfg.GeneralCfg().DefaultCategory
}
if cdr.Subject == utils.EmptyString { // Use account information as rating subject if missing
cdr.Subject = cdr.Account
}
if cdr.RunID == utils.EmptyString {
cdr.RunID = utils.MetaDefault
}
cgrEv := cdr.AsCGREvent()
cgrEv.APIOpts = cdr.APIOpts
if _, err = cdrS.processEvents([]*utils.CGREvent{cgrEv},
len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 && !cdr.PreRated,
len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0,
false,
!cdr.PreRated, // rate the CDR if is not PreRated
cdrS.cgrCfg.CdrsCfg().StoreCdrs,
false, // no rerate
len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 || len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0,
len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0,
len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0); err != nil {
return
}
*reply = utils.OK
return
}
// ArgV1ProcessEvent is the CGREvent with proccesing Flags
type ArgV1ProcessEvent struct {
Flags []string
utils.CGREvent
clnb bool //rpcclonable
}
// SetCloneable sets if the args should be clonned on internal connections
func (attr *ArgV1ProcessEvent) SetCloneable(rpcCloneable bool) {
attr.clnb = rpcCloneable
}
// RPCClone implements rpcclient.RPCCloner interface
func (attr *ArgV1ProcessEvent) RPCClone() (any, error) {
if !attr.clnb {
return attr, nil
}
return attr.Clone(), nil
}
// Clone creates a clone of the object
func (attr *ArgV1ProcessEvent) Clone() *ArgV1ProcessEvent {
var flags []string
if attr.Flags != nil {
flags = make([]string, len(attr.Flags))
for i, id := range attr.Flags {
flags[i] = id
}
}
return &ArgV1ProcessEvent{
Flags: flags,
CGREvent: *attr.CGREvent.Clone(),
}
}
// V1ProcessEvent will process the CGREvent
func (cdrS *CDRServer) V1ProcessEvent(ctx *context.Context, arg *ArgV1ProcessEvent, reply *string) (err error) {
if arg.CGREvent.ID == utils.EmptyString {
arg.CGREvent.ID = utils.GenUUID()
}
if arg.CGREvent.Tenant == utils.EmptyString {
arg.CGREvent.Tenant = cdrS.cgrCfg.GeneralCfg().DefaultTenant
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessEvent, arg.CGREvent.ID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
// processing options
flgs := utils.FlagsWithParamsFromSlice(arg.Flags)
attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0
if v, has := arg.APIOpts[utils.OptsAttributeS]; has {
if attrS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaAttributes) {
attrS = flgs.GetBool(utils.MetaAttributes)
}
store := cdrS.cgrCfg.CdrsCfg().StoreCdrs
if flgs.Has(utils.MetaStore) {
store = flgs.GetBool(utils.MetaStore)
}
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 || len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0
if flgs.Has(utils.MetaExport) {
export = flgs.GetBool(utils.MetaExport)
}
thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0
if v, has := arg.APIOpts[utils.OptsThresholdS]; has {
if thdS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaThresholds) {
thdS = flgs.GetBool(utils.MetaThresholds)
}
stS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0
if v, has := arg.APIOpts[utils.OptsStatS]; has {
if stS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaStats) {
stS = flgs.GetBool(utils.MetaStats)
}
chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 // activate charging for the Event
if v, has := arg.APIOpts[utils.OptsChargerS]; has {
if chrgS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaChargers) {
chrgS = flgs.GetBool(utils.MetaChargers)
}
var ralS bool // activate single rating for the CDR
if v, has := arg.APIOpts[utils.OptsRALs]; has {
if ralS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaRALs) {
ralS = flgs.GetBool(utils.MetaRALs)
}
var reRate bool
if v, has := arg.APIOpts[utils.OptsRerate]; has {
if reRate, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaRerate) {
if reRate = flgs.GetBool(utils.MetaRerate); reRate {
ralS = true
}
}
var refund bool
if v, has := arg.APIOpts[utils.OptsRefund]; has {
if refund, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaRefund) {
refund = flgs.GetBool(utils.MetaRefund)
}
// end of processing options
if _, err = cdrS.processEvents([]*utils.CGREvent{&arg.CGREvent}, chrgS, attrS, refund,
ralS, store, reRate, export, thdS, stS); err != nil {
return
}
*reply = utils.OK
return nil
}
// V2ProcessEvent has the same logic with V1ProcessEvent except it adds the proccessed events to the reply
func (cdrS *CDRServer) V2ProcessEvent(ctx *context.Context, arg *ArgV1ProcessEvent, evs *[]*utils.EventWithFlags) (err error) {
if arg.ID == "" {
arg.ID = utils.GenUUID()
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV2ProcessEvent, arg.CGREvent.ID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*evs = *cachedResp.Result.(*[]*utils.EventWithFlags)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: evs, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
// processing options
flgs := utils.FlagsWithParamsFromSlice(arg.Flags)
attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0
if flgs.Has(utils.MetaAttributes) {
attrS = flgs.GetBool(utils.MetaAttributes)
}
store := cdrS.cgrCfg.CdrsCfg().StoreCdrs
if flgs.Has(utils.MetaStore) {
store = flgs.GetBool(utils.MetaStore)
}
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 || len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0
if flgs.Has(utils.MetaExport) {
export = flgs.GetBool(utils.MetaExport)
}
thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0
if flgs.Has(utils.MetaThresholds) {
thdS = flgs.GetBool(utils.MetaThresholds)
}
stS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0
if flgs.Has(utils.MetaStats) {
stS = flgs.GetBool(utils.MetaStats)
}
chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 // activate charging for the Event
if flgs.Has(utils.MetaChargers) {
chrgS = flgs.GetBool(utils.MetaChargers)
}
var ralS bool // activate single rating for the CDR
if flgs.Has(utils.MetaRALs) {
ralS = flgs.GetBool(utils.MetaRALs)
}
var reRate bool
if flgs.Has(utils.MetaRerate) {
reRate = flgs.GetBool(utils.MetaRerate)
if reRate {
ralS = true
}
}
var refund bool
if flgs.Has(utils.MetaRefund) {
refund = flgs.GetBool(utils.MetaRefund)
}
// end of processing options
var procEvs []*utils.EventWithFlags
if procEvs, err = cdrS.processEvents([]*utils.CGREvent{&arg.CGREvent}, chrgS, attrS, refund,
ralS, store, reRate, export, thdS, stS); err != nil {
return
}
*evs = procEvs
return nil
}
// V1StoreSessionCost handles storing of the cost into session_costs table
func (cdrS *CDRServer) V1StoreSessionCost(ctx *context.Context, attr *AttrCDRSStoreSMCost, reply *string) (err error) {
if attr.Cost.CGRID == "" {
return utils.NewCGRError(utils.CDRsCtx,
utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing),
"SMCost: %+v with empty CGRID")
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1StoreSessionCost, attr.Cost.CGRID, attr.Cost.RunID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
if err = cdrS.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil {
err = utils.NewErrServerError(err)
return
}
*reply = utils.OK
return nil
}
// V2StoreSessionCost will store the SessionCost into session_costs table
func (cdrS *CDRServer) V2StoreSessionCost(ctx *context.Context, args *ArgsV2CDRSStoreSMCost, reply *string) (err error) {
if args.Cost.CGRID == "" {
return utils.NewCGRError(utils.CDRsCtx,
utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing),
"SMCost: %+v with empty CGRID")
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1StoreSessionCost, args.Cost.CGRID, args.Cost.RunID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
cc := args.Cost.CostDetails.AsCallCost(utils.EmptyString)
if args.Cost.CostDetails.AccountSummary != nil {
cc.Tenant = args.Cost.CostDetails.AccountSummary.Tenant
cc.Account = args.Cost.CostDetails.AccountSummary.ID
}
cc.Round()
roundIncrements := cc.GetRoundIncrements()
if len(roundIncrements) != 0 {
cd := cc.CreateCallDescriptor()
cd.CgrID = args.Cost.CGRID
cd.RunID = args.Cost.RunID
cd.Increments = roundIncrements
response := new(Account)
if err := cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().RaterConns,
utils.ResponderRefundRounding,
&CallDescriptorWithAPIOpts{CallDescriptor: cd},
response); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<CDRS> RefundRounding for cc: %+v, got error: %s",
cc, err.Error()))
}
if response != nil {
accSum := response.AsAccountSummary()
accSum.UpdateInitialValue(cc.AccountSummary)
cc.AccountSummary = accSum
}
}
if err = cdrS.storeSMCost(
&SMCost{
CGRID: args.Cost.CGRID,
RunID: args.Cost.RunID,
OriginHost: args.Cost.OriginHost,
OriginID: args.Cost.OriginID,
CostSource: args.Cost.CostSource,
Usage: args.Cost.Usage,
CostDetails: NewEventCostFromCallCost(cc, args.Cost.CGRID, args.Cost.RunID)},
args.CheckDuplicate); err != nil {
err = utils.NewErrServerError(err)
return
}
*reply = utils.OK
return
}
// ArgRateCDRs a cdr with extra flags
type ArgRateCDRs struct {
Flags []string
utils.RPCCDRsFilter
Tenant string
APIOpts map[string]any
}
// V1RateCDRs is used for re-/rate CDRs which are already stored within StorDB
// FixMe: add RPC caching
func (cdrS *CDRServer) V1RateCDRs(ctx *context.Context, arg *ArgRateCDRs, reply *string) (err error) {
var cdrFltr *utils.CDRsFilter
if cdrFltr, err = arg.RPCCDRsFilter.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
return utils.NewErrServerError(err)
}
var cdrs []*CDR
if cdrs, _, err = cdrS.cdrDb.GetCDRs(cdrFltr, false); err != nil {
return
}
flgs := utils.FlagsWithParamsFromSlice(arg.Flags)
store := cdrS.cgrCfg.CdrsCfg().StoreCdrs
if flgs.Has(utils.MetaStore) {
store = flgs.GetBool(utils.MetaStore)
}
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 || len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0
if flgs.Has(utils.MetaExport) {
export = flgs.GetBool(utils.MetaExport)
}
thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0
if flgs.Has(utils.MetaThresholds) {
thdS = flgs.GetBool(utils.MetaThresholds)
}
statS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0
if flgs.Has(utils.MetaStats) {
statS = flgs.GetBool(utils.MetaStats)
}
chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0
if flgs.Has(utils.MetaChargers) {
chrgS = flgs.GetBool(utils.MetaChargers)
}
attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0
if flgs.Has(utils.MetaAttributes) {
attrS = flgs.GetBool(utils.MetaAttributes)
}
var reRate bool
if flgs.Has(utils.MetaRerate) {
reRate = flgs.GetBool(utils.MetaRerate)
}
if chrgS && len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) == 0 {
return utils.NewErrNotConnected(utils.ChargerS)
}
cgrEvs := make([]*utils.CGREvent, len(cdrs))
for i, cdr := range cdrs {
cdr.Cost = -1 // the cost will be recalculated
cgrEvs[i] = cdr.AsCGREvent()
cgrEvs[i].APIOpts = arg.APIOpts
}
if _, err = cdrS.processEvents(cgrEvs, chrgS, attrS, false,
true, store, reRate, export, thdS, statS); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return
}
// V1ProcessExternalCDR is used to process external CDRs
func (cdrS *CDRServer) V1ProcessExternalCDR(ctx *context.Context, eCDR *ExternalCDRWithAPIOpts, reply *string) error {
cdr, err := NewCDRFromExternalCDR(eCDR.ExternalCDR,
cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
return err
}
return cdrS.V1ProcessCDR(ctx,
&CDRWithAPIOpts{
CDR: cdr,
APIOpts: eCDR.APIOpts,
}, reply)
}
// V1GetCDRs returns CDRs from DB
func (cdrS *CDRServer) V1GetCDRs(ctx *context.Context, args utils.RPCCDRsFilterWithAPIOpts, cdrs *[]*CDR) error {
cdrsFltr, err := args.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
if err.Error() != utils.NotFoundCaps {
err = utils.NewErrServerError(err)
}
return err
}
qryCDRs, _, err := cdrS.cdrDb.GetCDRs(cdrsFltr, false)
if err != nil {
return utils.NewErrServerError(err)
}
*cdrs = qryCDRs
return nil
}
// V1GetCDRsCount counts CDRs from DB
func (cdrS *CDRServer) V1GetCDRsCount(ctx *context.Context, args *utils.RPCCDRsFilterWithAPIOpts, cnt *int64) error {
cdrsFltr, err := args.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
if err.Error() != utils.NotFoundCaps {
err = utils.NewErrServerError(err)
}
return err
}
cdrsFltr.Count = true
_, qryCnt, err := cdrS.cdrDb.GetCDRs(cdrsFltr, false)
if err != nil {
return utils.NewErrServerError(err)
}
*cnt = qryCnt
return nil
}