Files
cgrates/engine/cdrs.go
2025-10-29 19:42:40 +01:00

1305 lines
41 KiB
Go

/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>
*/
package engine
import (
"fmt"
"net/http"
"slices"
"time"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
func newMapEventFromReqForm(r *http.Request) (mp MapEvent, err error) {
if r.Form == nil {
if err = r.ParseForm(); err != nil {
return
}
}
mp = MapEvent{utils.Source: r.RemoteAddr}
for k, vals := range r.Form {
mp[k] = vals[0] // We only support the first value for now, if more are provided it is considered remote's fault
}
return
}
// cgrCdrHandler handles CDRs received over HTTP REST
func (cdrS *CDRServer) cgrCdrHandler(w http.ResponseWriter, r *http.Request) {
cgrCDR, err := newMapEventFromReqForm(r)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> could not create CDR entry from http: %+v, err <%s>",
utils.CDRs, r.Form, err.Error()))
return
}
cdr, err := cgrCDR.AsCDR(cdrS.cgrCfg, cdrS.cgrCfg.GeneralCfg().DefaultTenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> could not create CDR entry from rawCDR: %+v, err <%s>",
utils.CDRs, cgrCDR, err.Error()))
return
}
var ignored string
if err := cdrS.V1ProcessCDR(context.TODO(), &CDRWithAPIOpts{CDR: cdr}, &ignored); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing CDR: %s, err: <%s>",
utils.CDRs, cdr, err.Error()))
}
}
// fsCdrHandler will handle CDRs received from FreeSWITCH over HTTP-JSON
func (cdrS *CDRServer) fsCdrHandler(w http.ResponseWriter, r *http.Request) {
fsCdr, err := NewFSCdr(r.Body, cdrS.cgrCfg)
r.Body.Close()
if err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Could not create CDR entry: %s", err.Error()))
return
}
cdr, err := fsCdr.AsCDR(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
utils.Logger.Err(fmt.Sprintf("<CDRS> Could not create AsCDR entry: %s", err.Error()))
return
}
var ignored string
if err := cdrS.V1ProcessCDR(context.TODO(), &CDRWithAPIOpts{CDR: cdr}, &ignored); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> processing CDR: %s, err: <%s>",
utils.CDRs, cdr, err.Error()))
}
}
// NewCDRServer is a constructor for CDRServer
func NewCDRServer(cgrCfg *config.CGRConfig, storDBChan chan StorDB, dm *DataManager, filterS *FilterS,
connMgr *ConnManager) *CDRServer {
cdrDb := <-storDBChan
return &CDRServer{
cgrCfg: cgrCfg,
cdrDb: cdrDb,
dm: dm,
guard: guardian.Guardian,
filterS: filterS,
connMgr: connMgr,
storDBChan: storDBChan,
}
}
// CDRServer stores and rates CDRs
type CDRServer struct {
cgrCfg *config.CGRConfig
cdrDb CdrStorage
dm *DataManager
guard *guardian.GuardianLocker
filterS *FilterS
connMgr *ConnManager
storDBChan chan StorDB
}
// ListenAndServe listen for storbd reload
func (cdrS *CDRServer) ListenAndServe(stopChan chan struct{}) {
for {
select {
case <-stopChan:
return
case stordb, ok := <-cdrS.storDBChan:
if !ok { // the chanel was closed by the shutdown of stordbService
return
}
cdrS.cdrDb = stordb
}
}
}
// RegisterHandlersToServer is called by cgr-engine to register HTTP URL handlers
func (cdrS *CDRServer) RegisterHandlersToServer(server utils.Server) {
server.RegisterHttpFunc(cdrS.cgrCfg.HTTPCfg().HTTPCDRsURL, cdrS.cgrCdrHandler)
server.RegisterHttpFunc(cdrS.cgrCfg.HTTPCfg().HTTPFreeswitchCDRsURL, cdrS.fsCdrHandler)
}
// storeSMCost will store a SMCost
func (cdrS *CDRServer) storeSMCost(smCost *SMCost, checkDuplicate bool) error {
smCost.CostDetails.Compute() // make sure the total cost reflect the increment
lockKey := utils.MetaCDRs + smCost.CGRID + smCost.RunID + smCost.OriginID // Will lock on this ID
if checkDuplicate {
return cdrS.guard.Guard(func() error {
smCosts, err := cdrS.cdrDb.GetSMCosts(smCost.CGRID, smCost.RunID, "", "")
if err != nil && err.Error() != utils.NotFoundCaps {
return err
}
if len(smCosts) != 0 {
return utils.ErrExists
}
return cdrS.cdrDb.SetSMCost(smCost)
}, config.CgrConfig().GeneralCfg().LockingTimeout, lockKey) // FixMe: Possible deadlock with Guard from SMG session close()
}
return cdrS.cdrDb.SetSMCost(smCost)
}
// rateCDR will populate cost field
// Returns more than one rated CDR in case of SMCost retrieved based on prefix
func (cdrS *CDRServer) rateCDR(cdr *CDRWithAPIOpts) ([]*CDR, error) {
var qryCC *CallCost
var err error
if cdr.RequestType == utils.MetaNone {
return nil, nil
}
if cdr.Usage < 0 {
cdr.Usage = time.Duration(0)
}
cdr.ExtraInfo = "" // Clean previous ExtraInfo, useful when re-rating
var cdrsRated []*CDR
_, hasLastUsed := cdr.ExtraFields[utils.LastUsed]
if slices.Contains([]string{utils.MetaPrepaid, utils.Prepaid}, cdr.RequestType) &&
(cdr.Usage != 0 || hasLastUsed) && cdr.CostDetails == nil {
// ToDo: Get rid of Prepaid as soon as we don't want to support it backwards
// Should be previously calculated and stored in DB
fib := utils.FibDuration(time.Millisecond, 0)
var smCosts []*SMCost
cgrID := cdr.CGRID
if _, hasIT := cdr.ExtraFields[utils.OriginIDPrefix]; hasIT {
cgrID = "" // for queries involving originIDPrefix we ignore CGRID
}
for i := 0; i < cdrS.cgrCfg.CdrsCfg().SMCostRetries; i++ {
smCosts, err = cdrS.cdrDb.GetSMCosts(cgrID, cdr.RunID, cdr.OriginHost,
cdr.ExtraFields[utils.OriginIDPrefix])
if err == nil && len(smCosts) != 0 {
break
}
if i <= cdrS.cgrCfg.CdrsCfg().SMCostRetries-1 {
time.Sleep(fib())
}
}
if len(smCosts) != 0 { // Cost retrieved from SMCost table
for _, smCost := range smCosts {
cdrClone := cdr.CDR.Clone()
cdrClone.OriginID = smCost.OriginID
if cdr.Usage == 0 {
cdrClone.Usage = smCost.Usage
} else if smCost.Usage != cdr.Usage {
if _, err = cdrS.refundEventCost(smCost.CostDetails, // ToDo: need to maybe mark it in the future in the processed flags
cdrClone.RequestType, cdrClone.ToR); err != nil {
return nil, err
}
cdrClone.CostDetails = nil
if qryCC, err = cdrS.getCostFromRater(&CDRWithAPIOpts{CDR: cdrClone}); err != nil {
return nil, err
}
smCost = &SMCost{
CGRID: cdrClone.CGRID,
RunID: cdrClone.RunID,
OriginHost: cdrClone.OriginID,
CostSource: utils.CDRs,
Usage: cdrClone.Usage,
CostDetails: NewEventCostFromCallCost(qryCC, cdrClone.CGRID, cdrClone.RunID),
}
}
cdrClone.Cost = smCost.CostDetails.GetCost()
cdrClone.CostDetails = smCost.CostDetails
cdrClone.CostSource = smCost.CostSource
cdrsRated = append(cdrsRated, cdrClone)
}
return cdrsRated, nil
}
utils.Logger.Warning(
fmt.Sprintf("<Cdrs> WARNING: Could not find CallCostLog for cgrid: %s, source: %s, runid: %s, originID: %s originHost: %s, will recalculate",
cdr.CGRID, utils.MetaSessionS, cdr.RunID, cdr.OriginID, cdr.OriginHost))
}
if cdr.CostDetails != nil {
if cdr.Usage <= cdr.CostDetails.GetUsage() { // Costs were previously calculated, make sure they cover the full usage
cdr.Cost = cdr.CostDetails.GetCost()
cdr.CostDetails.Compute()
return []*CDR{cdr.CDR}, nil
}
// ToDo: need to maybe mark it in the future in the processed flags
if _, err = cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); err != nil {
return nil, err
}
cdr.CostDetails = nil
}
qryCC, err = cdrS.getCostFromRater(cdr)
if err != nil {
return nil, err
}
if qryCC != nil {
cdr.Cost = qryCC.Cost
cdr.CostDetails = NewEventCostFromCallCost(qryCC, cdr.CGRID, cdr.RunID)
}
cdr.CostDetails.Compute()
return []*CDR{cdr.CDR}, nil
}
var reqTypes = utils.NewStringSet([]string{utils.MetaPseudoPrepaid, utils.MetaPostpaid, utils.MetaPrepaid,
utils.PseudoPrepaid, utils.Postpaid, utils.Prepaid, utils.MetaDynaprepaid})
// getCostFromRater will retrieve the cost from RALs
func (cdrS *CDRServer) getCostFromRater(cdr *CDRWithAPIOpts) (*CallCost, error) {
if len(cdrS.cgrCfg.CdrsCfg().RaterConns) == 0 {
return nil, utils.NewErrNotConnected(utils.RALService)
}
cc := new(CallCost)
var err error
timeStart := cdr.AnswerTime
if timeStart.IsZero() { // Fix for FreeSWITCH unanswered calls
timeStart = cdr.SetupTime
}
cd := &CallDescriptor{
ToR: cdr.ToR,
Tenant: cdr.Tenant,
Category: cdr.Category,
Subject: cdr.Subject,
Account: cdr.Account,
Destination: cdr.Destination,
ExtraFields: cdr.ExtraFields,
TimeStart: timeStart,
TimeEnd: timeStart.Add(cdr.Usage),
DurationIndex: cdr.Usage,
PerformRounding: true,
}
if reqTypes.Has(cdr.RequestType) { // Prepaid - Cost can be recalculated in case of missing records from SM
err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().RaterConns,
utils.ResponderDebit,
&CallDescriptorWithAPIOpts{
CallDescriptor: cd,
APIOpts: cdr.APIOpts,
}, cc)
if err != nil && err.Error() == utils.ErrAccountNotFound.Error() &&
cdr.RequestType == utils.MetaDynaprepaid {
var reply string
// execute the actionPlan configured in Scheduler
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().SchedulerConns,
utils.SchedulerSv1ExecuteActionPlans, &utils.AttrsExecuteActionPlans{
ActionPlanIDs: cdrS.cgrCfg.SchedulerCfg().DynaprepaidActionPlans,
AccountID: cdr.Account, Tenant: cdr.Tenant},
&reply); err != nil {
return cc, err
}
// execute again the Debit operation
err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().RaterConns,
utils.ResponderDebit,
&CallDescriptorWithAPIOpts{
CallDescriptor: cd,
APIOpts: cdr.APIOpts,
}, cc)
}
} else {
err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().RaterConns,
utils.ResponderGetCost,
&CallDescriptorWithAPIOpts{
CallDescriptor: cd,
APIOpts: cdr.APIOpts,
}, cc)
}
if err != nil {
return cc, err
}
cdr.CostSource = utils.MetaCDRs
return cc, nil
}
// rateCDRWithErr rates a CDR including errors
func (cdrS *CDRServer) rateCDRWithErr(cdr *CDRWithAPIOpts) (ratedCDRs []*CDR) {
var err error
ratedCDRs, err = cdrS.rateCDR(cdr)
if err != nil {
cdr.Cost = -1.0 // If there was an error, mark the CDR
cdr.ExtraInfo = err.Error()
ratedCDRs = []*CDR{cdr.CDR}
}
return
}
// refundEventCost will refund the EventCost using RefundIncrements
func (cdrS *CDRServer) refundEventCost(ec *EventCost, reqType, tor string) (rfnd bool, err error) {
if len(cdrS.cgrCfg.CdrsCfg().RaterConns) == 0 {
return false, utils.NewErrNotConnected(utils.RALService)
}
if ec == nil || !utils.AccountableRequestTypes.Has(reqType) {
return // non refundable
}
cd := ec.AsRefundIncrements(tor)
if len(cd.Increments) == 0 {
return
}
var acnt Account
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().RaterConns,
utils.ResponderRefundIncrements,
&CallDescriptorWithAPIOpts{CallDescriptor: cd}, &acnt); err != nil {
return
}
return true, nil
}
// chrgrSProcessEvent forks CGREventWithOpts into multiples based on matching ChargerS profiles
func (cdrS *CDRServer) chrgrSProcessEvent(cgrEv *utils.CGREvent) (cgrEvs []*utils.CGREvent, err error) {
var chrgrs []*ChrgSProcessEventReply
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().ChargerSConns,
utils.ChargerSv1ProcessEvent,
cgrEv, &chrgrs); err != nil {
return
}
if len(chrgrs) == 0 {
return
}
cgrEvs = make([]*utils.CGREvent, len(chrgrs))
for i, cgrPrfl := range chrgrs {
cgrEvs[i] = cgrPrfl.CGREvent
}
return
}
// attrSProcessEvent will send the event to AttributeS if the connection is configured.
func (cdrS *CDRServer) attrSProcessEvent(cgrEv *utils.CGREvent) (err error) {
var rplyEv AttrSProcessEventReply
if cgrEv.APIOpts == nil {
cgrEv.APIOpts = make(map[string]any)
}
cgrEv.APIOpts[utils.MetaSubsys] = utils.MetaCDRs
ctx, has := cgrEv.APIOpts[utils.OptsContext]
cgrEv.APIOpts[utils.OptsContext] = utils.FirstNonEmpty(
utils.IfaceAsString(ctx),
utils.MetaCDRs)
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().AttributeSConns,
utils.AttributeSv1ProcessEvent,
cgrEv, &rplyEv); err != nil {
if err.Error() == utils.ErrNotFound.Error() {
err = nil // ignore ErrNotFound
}
return
}
if len(rplyEv.AlteredFields) != 0 {
*cgrEv = *rplyEv.CGREvent
if !has && utils.IfaceAsString(cgrEv.APIOpts[utils.OptsContext]) == utils.MetaCDRs {
delete(cgrEv.APIOpts, utils.OptsContext)
}
}
return
}
// thdSProcessEvent will send the event to ThresholdS
func (cdrS *CDRServer) thdSProcessEvent(cgrEv *utils.CGREvent) (err error) {
var tIDs []string
// we clone the CGREvent so we can add EventType without being propagated
thArgs := cgrEv.Clone()
if thArgs.APIOpts == nil {
thArgs.APIOpts = make(map[string]any)
}
thArgs.APIOpts[utils.MetaEventType] = utils.CDR
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().ThresholdSConns,
utils.ThresholdSv1ProcessEvent,
thArgs, &tIDs); err != nil &&
err.Error() == utils.ErrNotFound.Error() {
err = nil // NotFound is not considered error
}
return
}
// statSProcessEvent will send the event to StatS
func (cdrS *CDRServer) statSProcessEvent(cgrEv *utils.CGREvent) (err error) {
var reply []string
statArgs := cgrEv.Clone()
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().StatSConns,
utils.StatSv1ProcessEvent,
statArgs, &reply); err != nil &&
err.Error() == utils.ErrNotFound.Error() {
err = nil // NotFound is not considered error
}
return
}
// eeSProcessEvent will process the event with the EEs component
func (cdrS *CDRServer) eeSProcessEvent(cgrEv *CGREventWithEeIDs) (err error) {
var reply map[string]map[string]any
if err = cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().EEsConns,
utils.EeSv1ProcessEvent,
cgrEv, &reply); err != nil &&
err.Error() == utils.ErrNotFound.Error() {
err = nil // NotFound is not considered error
}
return
}
// cdrProcessingArgs holds the arguments for processing CDR events.
type cdrProcessingArgs struct {
attrS bool
chrgS bool
refund bool
ralS bool
store bool
reRate bool
export bool
thdS bool
stS bool
reprocess bool
}
// newCDRProcessingArgs initializes processing arguments from config and overrides them with provided flags.
func newCDRProcessingArgs(cfg *config.CdrsCfg, flags utils.FlagsWithParams, opts map[string]any) (*cdrProcessingArgs, error) {
args := &cdrProcessingArgs{
attrS: len(cfg.AttributeSConns) != 0,
chrgS: len(cfg.ChargerSConns) != 0,
store: cfg.StoreCdrs,
export: len(cfg.OnlineCDRExports) != 0 || len(cfg.EEsConns) != 0,
thdS: len(cfg.ThresholdSConns) != 0,
stS: len(cfg.StatSConns) != 0,
ralS: len(cfg.RaterConns) != 0,
}
var err error
if v, has := opts[utils.OptsAttributeS]; has {
if args.attrS, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaAttributes) {
args.attrS = flags.GetBool(utils.MetaAttributes)
}
if v, has := opts[utils.OptsChargerS]; has {
if args.chrgS, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaChargers) {
args.chrgS = flags.GetBool(utils.MetaChargers)
}
if flags.Has(utils.MetaStore) {
args.store = flags.GetBool(utils.MetaStore)
}
if flags.Has(utils.MetaExport) {
args.export = flags.GetBool(utils.MetaExport)
}
if v, has := opts[utils.OptsThresholdS]; has {
if args.thdS, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaThresholds) {
args.thdS = flags.GetBool(utils.MetaThresholds)
}
if v, has := opts[utils.OptsStatS]; has {
if args.stS, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaStats) {
args.stS = flags.GetBool(utils.MetaStats)
}
if v, has := opts[utils.OptsRerate]; has {
if args.reRate, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaRerate) {
args.reRate = flags.GetBool(utils.MetaRerate)
}
if args.reRate {
args.ralS = true
args.refund = true
}
if v, has := opts[utils.OptsRefund]; has {
if args.refund, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaRefund) {
args.refund = flags.GetBool(utils.MetaRefund)
}
if args.refund && !args.reRate {
args.ralS = false
}
if args.refund {
args.reprocess = true
}
if flags.Has(utils.MetaReprocess) {
args.reprocess = flags.GetBool(utils.MetaReprocess)
}
if v, has := opts[utils.OptsRALs]; has {
if args.ralS, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaRALs) {
args.ralS = flags.GetBool(utils.MetaRALs)
}
return args, nil
}
// newCDRProcessingArgsNoCfg initializes processing arguments by taking flags from provided FlagsWithParams and APIOpts, without taking them from configs.
func newCDRProcessingArgsNoCfg(flags utils.FlagsWithParams, opts map[string]any) (*cdrProcessingArgs, error) {
args := new(cdrProcessingArgs)
var err error
if v, has := opts[utils.OptsAttributeS]; has {
if args.attrS, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaAttributes) {
args.attrS = flags.GetBool(utils.MetaAttributes)
}
if v, has := opts[utils.OptsChargerS]; has {
if args.chrgS, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaChargers) {
args.chrgS = flags.GetBool(utils.MetaChargers)
}
if flags.Has(utils.MetaStore) {
args.store = flags.GetBool(utils.MetaStore)
}
if flags.Has(utils.MetaExport) {
args.export = flags.GetBool(utils.MetaExport)
}
if v, has := opts[utils.OptsThresholdS]; has {
if args.thdS, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaThresholds) {
args.thdS = flags.GetBool(utils.MetaThresholds)
}
if v, has := opts[utils.OptsStatS]; has {
if args.stS, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaStats) {
args.stS = flags.GetBool(utils.MetaStats)
}
if v, has := opts[utils.OptsRerate]; has {
if args.reRate, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaRerate) {
args.reRate = flags.GetBool(utils.MetaRerate)
}
if args.reRate {
args.ralS = true
args.refund = true
}
if v, has := opts[utils.OptsRefund]; has {
if args.refund, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaRefund) {
args.refund = flags.GetBool(utils.MetaRefund)
}
if args.refund && !args.reRate {
args.ralS = false
}
if args.refund {
args.reprocess = true
}
if flags.Has(utils.MetaReprocess) {
args.reprocess = flags.GetBool(utils.MetaReprocess)
}
if v, has := opts[utils.OptsRALs]; has {
if args.ralS, err = utils.IfaceAsBool(v); err != nil {
return nil, err
}
}
if flags.Has(utils.MetaRALs) {
args.ralS = flags.GetBool(utils.MetaRALs)
}
return args, nil
}
// processEvent processes a CGREvent based on arguments
// in case of partially executed, both error and evs will be returned
func (cdrS *CDRServer) processEvents(evs []*utils.CGREvent, args *cdrProcessingArgs) (outEvs []*utils.EventWithFlags, err error) {
if args.attrS {
for _, ev := range evs {
if err = cdrS.attrSProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), utils.ToJSON(ev), utils.AttributeS))
err = utils.ErrPartiallyExecuted
return
}
}
}
var cgrEvs []*utils.CGREvent
if args.chrgS {
for _, ev := range evs {
var chrgEvs []*utils.CGREvent
if chrgEvs, err = cdrS.chrgrSProcessEvent(ev); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), utils.ToJSON(ev), utils.ChargerS))
err = utils.ErrPartiallyExecuted
return
} else {
cgrEvs = append(cgrEvs, chrgEvs...)
}
}
} else { // ChargerS not requested, charge the original event
cgrEvs = evs
}
// Check if the unique ID was not already processed
if !args.reprocess {
for _, cgrEv := range cgrEvs {
me := MapEvent(cgrEv.Event)
if !me.HasField(utils.CGRID) { // try to compute the CGRID if missing
me[utils.CGRID] = utils.Sha1(
me.GetStringIgnoreErrors(utils.OriginID),
me.GetStringIgnoreErrors(utils.OriginHost),
)
}
uID := utils.ConcatenatedKey(
me.GetStringIgnoreErrors(utils.CGRID),
me.GetStringIgnoreErrors(utils.RunID),
)
if Cache.HasItem(utils.CacheCDRIDs, uID) && !args.reRate {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, utils.ErrExists, utils.ToJSON(cgrEv), utils.CacheS))
return nil, utils.ErrExists
}
if errCh := Cache.Set(utils.CacheCDRIDs, uID, true, nil,
cacheCommit(utils.NonTransactional), utils.NonTransactional); errCh != nil {
return nil, errCh
}
}
}
// Populate CDR list out of events
cdrs := make([]*CDR, len(cgrEvs))
if args.refund || args.ralS || args.store || args.reRate || args.export {
for i, cgrEv := range cgrEvs {
if args.refund {
if _, has := cgrEv.Event[utils.CostDetails]; !has {
// if CostDetails is not populated or is nil, look for it inside the previously stored cdr
var cgrID string // prepare CGRID to filter for previous CDR
if val, has := cgrEv.Event[utils.CGRID]; !has {
cgrID = utils.Sha1(utils.IfaceAsString(cgrEv.Event[utils.OriginID]),
utils.IfaceAsString(cgrEv.Event[utils.OriginHost]))
} else {
cgrID = utils.IfaceAsString(val)
}
var prevCDRs []*CDR // only one should be returned
if prevCDRs, _, err = cdrS.cdrDb.GetCDRs(
&utils.CDRsFilter{CGRIDs: []string{cgrID},
RunIDs: []string{utils.IfaceAsString(cgrEv.Event[utils.RunID])}}, false); err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> could not retrieve previously stored CDR, error: <%s>",
utils.CDRs, err.Error()))
err = utils.ErrPartiallyExecuted
return
} else {
cgrEv.Event[utils.CostDetails] = prevCDRs[0].CostDetails
}
}
} else if args.reRate {
// Force rerate by removing CostDetails to avoid marking as already rated.
delete(cgrEv.Event, utils.CostDetails)
}
if cdrs[i], err = NewMapEvent(cgrEv.Event).AsCDR(cdrS.cgrCfg,
cgrEv.Tenant, cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> converting event %+v to CDR",
utils.CDRs, err.Error(), utils.ToJSON(cgrEv)))
err = utils.ErrPartiallyExecuted
return
}
}
}
procFlgs := make([]utils.StringSet, len(cgrEvs)) // will save the flags for the reply here
for i := range cgrEvs {
procFlgs[i] = utils.NewStringSet(nil)
}
if args.refund {
for i, cdr := range cdrs {
if rfnd, errRfd := cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); errRfd != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> refunding CDR %+v",
utils.CDRs, errRfd.Error(), utils.ToJSON(cdr)))
} else if rfnd {
cdr.CostDetails = nil // this makes sure that the rater will recalculate (and debit) the cost
procFlgs[i].Add(utils.MetaRefund)
}
}
}
if args.ralS {
for i, cdr := range cdrs {
if !utils.AccountableRequestTypes.Has(cdr.RequestType) {
cdr.CostDetails = nil
}
for j, rtCDR := range cdrS.rateCDRWithErr(
&CDRWithAPIOpts{
CDR: cdr,
APIOpts: cgrEvs[i].APIOpts,
}) {
cgrEv := rtCDR.AsCGREvent()
cgrEv.APIOpts = cgrEvs[i].APIOpts
if j == 0 { // the first CDR will replace the events we got already as a small optimization
cdrs[i] = rtCDR
cgrEvs[i] = cgrEv
} else {
cdrs = append(cdrs, cdr)
cgrEvs = append(cgrEvs, cgrEv)
}
}
}
}
if args.store {
refundCDRCosts := func() { // will be used to refund all CDRs on errors
for _, cdr := range cdrs { // refund what we have charged since duplicates are not allowed
if _, errRfd := cdrS.refundEventCost(cdr.CostDetails,
cdr.RequestType, cdr.ToR); errRfd != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> refunding CDR %+v",
utils.CDRs, errRfd.Error(), utils.ToJSON(cdr)))
}
}
}
for _, cdr := range cdrs {
if err = cdrS.cdrDb.SetCDR(cdr, false); err != nil {
if err != utils.ErrExists || !args.reRate {
refundCDRCosts()
return
}
if err = cdrS.cdrDb.SetCDR(cdr, true); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> updating CDR %+v",
utils.CDRs, err.Error(), utils.ToJSON(cdr)))
err = utils.ErrPartiallyExecuted
return
}
}
}
}
var partiallyExecuted bool // from here actions are optional and a general error is returned
if args.export {
if len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0 {
for _, cgrEv := range cgrEvs {
evWithOpts := &CGREventWithEeIDs{
CGREvent: cgrEv,
EeIDs: cdrS.cgrCfg.CdrsCfg().OnlineCDRExports,
}
if err = cdrS.eeSProcessEvent(evWithOpts); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> exporting cdr %+v",
utils.CDRs, err.Error(), utils.ToJSON(evWithOpts)))
partiallyExecuted = true
}
}
}
}
if args.thdS {
for _, cgrEv := range cgrEvs {
if err = cdrS.thdSProcessEvent(cgrEv); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), utils.ToJSON(cgrEv), utils.ThresholdS))
partiallyExecuted = true
}
}
}
if args.stS {
for _, cgrEv := range cgrEvs {
if err = cdrS.statSProcessEvent(cgrEv); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> processing event %+v with %s",
utils.CDRs, err.Error(), utils.ToJSON(cgrEv), utils.StatS))
partiallyExecuted = true
}
}
}
if partiallyExecuted {
err = utils.ErrPartiallyExecuted
}
outEvs = make([]*utils.EventWithFlags, len(cgrEvs))
for i, cgrEv := range cgrEvs {
outEvs[i] = &utils.EventWithFlags{
Flags: procFlgs[i].AsSlice(),
Event: cgrEv.Event,
}
}
return
}
// V1ProcessCDR processes a CDR
func (cdrS *CDRServer) V1ProcessCDR(ctx *context.Context, cdr *CDRWithAPIOpts, reply *string) (err error) {
if cdr.CGRID == utils.EmptyString { // Populate CGRID if not present
cdr.ComputeCGRID()
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessCDR, cdr.CGRID, cdr.RunID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
if cdr.RequestType == utils.EmptyString {
cdr.RequestType = cdrS.cgrCfg.GeneralCfg().DefaultReqType
}
if cdr.Tenant == utils.EmptyString {
cdr.Tenant = cdrS.cgrCfg.GeneralCfg().DefaultTenant
}
if cdr.Category == utils.EmptyString {
cdr.Category = cdrS.cgrCfg.GeneralCfg().DefaultCategory
}
if cdr.Subject == utils.EmptyString { // Use account information as rating subject if missing
cdr.Subject = cdr.Account
}
if cdr.RunID == utils.EmptyString {
cdr.RunID = utils.MetaDefault
}
cgrEv := cdr.AsCGREvent()
cgrEv.APIOpts = cdr.APIOpts
procArgs, err := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), nil, nil)
if err != nil {
return fmt.Errorf("failed to configure processing args: %v", err)
}
procArgs.chrgS = procArgs.chrgS && !cdr.PreRated
procArgs.ralS = !cdr.PreRated // rate the CDR if it's not PreRated
if _, err = cdrS.processEvents([]*utils.CGREvent{cgrEv}, procArgs); err != nil {
return
}
*reply = utils.OK
return
}
// ArgV1ProcessEvent is the CGREvent with proccesing Flags
type ArgV1ProcessEvent struct {
Flags []string
utils.CGREvent
clnb bool //rpcclonable
}
// SetCloneable sets if the args should be clonned on internal connections
func (attr *ArgV1ProcessEvent) SetCloneable(rpcCloneable bool) {
attr.clnb = rpcCloneable
}
// RPCClone implements rpcclient.RPCCloner interface
func (attr *ArgV1ProcessEvent) RPCClone() (any, error) {
if !attr.clnb {
return attr, nil
}
return attr.Clone(), nil
}
// Clone creates a clone of the object
func (attr *ArgV1ProcessEvent) Clone() *ArgV1ProcessEvent {
var flags []string
if attr.Flags != nil {
flags = make([]string, len(attr.Flags))
copy(flags, attr.Flags)
}
return &ArgV1ProcessEvent{
Flags: flags,
CGREvent: *attr.CGREvent.Clone(),
}
}
// V1ProcessEvent will process the CGREvent
func (cdrS *CDRServer) V1ProcessEvent(ctx *context.Context, arg *ArgV1ProcessEvent, reply *string) (err error) {
if arg.CGREvent.ID == utils.EmptyString {
arg.CGREvent.ID = utils.GenUUID()
}
if arg.CGREvent.Tenant == utils.EmptyString {
arg.CGREvent.Tenant = cdrS.cgrCfg.GeneralCfg().DefaultTenant
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1ProcessEvent, arg.CGREvent.ID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
// Compute processing arguments based on flags and configuration.
flgs := utils.FlagsWithParamsFromSlice(arg.Flags)
procArgs, err := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), flgs, arg.APIOpts)
if err != nil {
return fmt.Errorf("failed to configure processing args: %v", err)
}
if _, err = cdrS.processEvents([]*utils.CGREvent{&arg.CGREvent}, procArgs); err != nil {
return
}
*reply = utils.OK
return nil
}
// V2ProcessEvent has the same logic with V1ProcessEvent except it adds the proccessed events to the reply
func (cdrS *CDRServer) V2ProcessEvent(ctx *context.Context, arg *ArgV1ProcessEvent, evs *[]*utils.EventWithFlags) (err error) {
if arg.ID == "" {
arg.ID = utils.GenUUID()
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV2ProcessEvent, arg.CGREvent.ID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*evs = *cachedResp.Result.(*[]*utils.EventWithFlags)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: evs, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
// Compute processing arguments based on flags and configuration.
flgs := utils.FlagsWithParamsFromSlice(arg.Flags)
procArgs, err := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), flgs, arg.APIOpts)
if err != nil {
return fmt.Errorf("failed to configure processing args: %v", err)
}
var procEvs []*utils.EventWithFlags
if procEvs, err = cdrS.processEvents([]*utils.CGREvent{&arg.CGREvent}, procArgs); err != nil {
return
}
*evs = procEvs
return nil
}
// ArgV1ProcessEvents defines the structure for holding CGREvents about to be processed by CDRsV1.ProcessEvents.
type ArgV1ProcessEvents struct {
Flags []string
CGREvents []*utils.CGREvent
APIOpts map[string]any
clnb bool //rpcclonable
}
// SetCloneable sets if the args should be clonned on internal connections
func (attr *ArgV1ProcessEvents) SetCloneable(rpcCloneable bool) {
attr.clnb = rpcCloneable
}
// RPCClone implements rpcclient.RPCCloner interface
func (attr *ArgV1ProcessEvents) RPCClone() (any, error) {
if !attr.clnb {
return attr, nil
}
return attr.Clone(), nil
}
// Clone creates a clone of the object
func (attr *ArgV1ProcessEvents) Clone() *ArgV1ProcessEvents {
cln := &ArgV1ProcessEvents{
Flags: slices.Clone(attr.Flags),
CGREvents: make([]*utils.CGREvent, 0, len(attr.CGREvents)),
}
for _, cgrEv := range attr.CGREvents {
cln.CGREvents = append(cln.CGREvents, cgrEv.Clone())
}
if attr.APIOpts == nil {
return cln
}
cln.APIOpts = make(map[string]any)
for key, value := range attr.APIOpts {
cln.APIOpts[key] = value
}
return cln
}
// V1ProcessEvents is similar to V1ProcessEvent but it can accept multiple CGREvents inside the parameter.
func (cdrS *CDRServer) V1ProcessEvents(ctx *context.Context, arg *ArgV1ProcessEvents, reply *string) (err error) {
// Populate ID and Tenant of CGREvents if missing.
for _, cgrEv := range arg.CGREvents {
if cgrEv.ID == "" {
cgrEv.ID = utils.GenUUID()
}
if cgrEv.Tenant == "" {
cgrEv.Tenant = cdrS.cgrCfg.GeneralCfg().DefaultTenant
}
}
// Compute processing arguments based on flags and configuration.
flgs := utils.FlagsWithParamsFromSlice(arg.Flags)
procArgs, err := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), flgs, arg.APIOpts)
if err != nil {
return fmt.Errorf("failed to configure processing args: %v", err)
}
if _, err = cdrS.processEvents(arg.CGREvents, procArgs); err != nil {
return
}
*reply = utils.OK
return nil
}
// V1StoreSessionCost handles storing of the cost into session_costs table
func (cdrS *CDRServer) V1StoreSessionCost(ctx *context.Context, attr *AttrCDRSStoreSMCost, reply *string) (err error) {
if attr.Cost.CGRID == "" {
return utils.NewCGRError(utils.CDRsCtx,
utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing),
"SMCost: %+v with empty CGRID")
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1StoreSessionCost, attr.Cost.CGRID, attr.Cost.RunID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
if err = cdrS.storeSMCost(attr.Cost, attr.CheckDuplicate); err != nil {
err = utils.NewErrServerError(err)
return
}
*reply = utils.OK
return nil
}
// V2StoreSessionCost will store the SessionCost into session_costs table
func (cdrS *CDRServer) V2StoreSessionCost(ctx *context.Context, args *ArgsV2CDRSStoreSMCost, reply *string) (err error) {
if args.Cost.CGRID == "" {
return utils.NewCGRError(utils.CDRsCtx,
utils.MandatoryIEMissingCaps, fmt.Sprintf("%s: CGRID", utils.MandatoryInfoMissing),
"SMCost: %+v with empty CGRID")
}
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.CDRsV1StoreSessionCost, args.Cost.CGRID, args.Cost.RunID)
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*string)
}
return cachedResp.Error
}
defer Cache.Set(utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: reply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
cc := args.Cost.CostDetails.AsCallCost(utils.EmptyString)
if args.Cost.CostDetails.AccountSummary != nil {
cc.Tenant = args.Cost.CostDetails.AccountSummary.Tenant
cc.Account = args.Cost.CostDetails.AccountSummary.ID
}
cc.Round()
roundIncrements := cc.GetRoundIncrements()
if len(roundIncrements) != 0 {
cd := cc.CreateCallDescriptor()
cd.CgrID = args.Cost.CGRID
cd.RunID = args.Cost.RunID
cd.Increments = roundIncrements
response := new(Account)
if err := cdrS.connMgr.Call(context.TODO(), cdrS.cgrCfg.CdrsCfg().RaterConns,
utils.ResponderRefundRounding,
&CallDescriptorWithAPIOpts{CallDescriptor: cd},
response); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<CDRS> RefundRounding for cc: %+v, got error: %s",
cc, err.Error()))
}
accSum := response.AsAccountSummary()
accSum.UpdateInitialValue(cc.AccountSummary)
cc.AccountSummary = accSum
}
if err = cdrS.storeSMCost(
&SMCost{
CGRID: args.Cost.CGRID,
RunID: args.Cost.RunID,
OriginHost: args.Cost.OriginHost,
OriginID: args.Cost.OriginID,
CostSource: args.Cost.CostSource,
Usage: args.Cost.Usage,
CostDetails: NewEventCostFromCallCost(cc, args.Cost.CGRID, args.Cost.RunID)},
args.CheckDuplicate); err != nil {
err = utils.NewErrServerError(err)
return
}
*reply = utils.OK
return
}
// ArgRateCDRs a cdr with extra flags
type ArgRateCDRs struct {
Flags []string
utils.RPCCDRsFilter
Tenant string
APIOpts map[string]any
}
// V1RateCDRs is used for re-/rate CDRs which are already stored within StorDB
// FixMe: add RPC caching
func (cdrS *CDRServer) V1RateCDRs(ctx *context.Context, arg *ArgRateCDRs, reply *string) (err error) {
var cdrFltr *utils.CDRsFilter
if cdrFltr, err = arg.RPCCDRsFilter.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
return utils.NewErrServerError(err)
}
var cdrs []*CDR
if cdrs, _, err = cdrS.cdrDb.GetCDRs(cdrFltr, false); err != nil {
return
}
// Compute processing arguments based on flags and configuration.
flgs := utils.FlagsWithParamsFromSlice(arg.Flags)
procArgs, err := newCDRProcessingArgs(cdrS.cgrCfg.CdrsCfg(), flgs, arg.APIOpts)
if err != nil {
return fmt.Errorf("failed to configure processing args: %v", err)
}
procArgs.ralS = true
cgrEvs := make([]*utils.CGREvent, len(cdrs))
for i, cdr := range cdrs {
cdr.Cost = -1 // the cost will be recalculated
cgrEvs[i] = cdr.AsCGREvent()
cgrEvs[i].APIOpts = arg.APIOpts
}
if _, err = cdrS.processEvents(cgrEvs, procArgs); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return
}
// V1ReprocessCDRs is used to reprocess CDRs which are already stored within StorDB
func (cdrS *CDRServer) V1ReprocessCDRs(ctx *context.Context, arg *ArgRateCDRs, reply *string) (err error) {
var cdrFltr *utils.CDRsFilter
if cdrFltr, err = arg.RPCCDRsFilter.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone); err != nil {
return utils.NewErrServerError(err)
}
var cdrs []*CDR
if cdrs, _, err = cdrS.cdrDb.GetCDRs(cdrFltr, false); err != nil {
return
}
// Compute processing arguments based on flags and configuration.
flgs := utils.FlagsWithParamsFromSlice(arg.Flags)
procArgs, err := newCDRProcessingArgsNoCfg(flgs, arg.APIOpts)
if err != nil {
return fmt.Errorf("failed to configure processing args: %v", err)
}
procArgs.reprocess = true
cgrEvs := make([]*utils.CGREvent, len(cdrs))
for i, cdr := range cdrs {
cgrEvs[i] = cdr.AsCGREvent()
cgrEvs[i].APIOpts = arg.APIOpts
}
if _, err = cdrS.processEvents(cgrEvs, procArgs); err != nil {
return utils.NewErrServerError(err)
}
*reply = utils.OK
return
}
// V1ProcessExternalCDR is used to process external CDRs
func (cdrS *CDRServer) V1ProcessExternalCDR(ctx *context.Context, eCDR *ExternalCDRWithAPIOpts, reply *string) error {
cdr, err := NewCDRFromExternalCDR(eCDR.ExternalCDR,
cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
return err
}
return cdrS.V1ProcessCDR(ctx,
&CDRWithAPIOpts{
CDR: cdr,
APIOpts: eCDR.APIOpts,
}, reply)
}
// V1GetCDRs returns CDRs from DB
func (cdrS *CDRServer) V1GetCDRs(ctx *context.Context, args utils.RPCCDRsFilterWithAPIOpts, cdrs *[]*CDR) error {
cdrsFltr, err := args.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
if err.Error() != utils.NotFoundCaps {
err = utils.NewErrServerError(err)
}
return err
}
qryCDRs, _, err := cdrS.cdrDb.GetCDRs(cdrsFltr, false)
if err != nil {
return utils.NewErrServerError(err)
}
*cdrs = qryCDRs
return nil
}
// V1GetCDRsCount counts CDRs from DB
func (cdrS *CDRServer) V1GetCDRsCount(ctx *context.Context, args *utils.RPCCDRsFilterWithAPIOpts, cnt *int64) error {
cdrsFltr, err := args.AsCDRsFilter(cdrS.cgrCfg.GeneralCfg().DefaultTimezone)
if err != nil {
if err.Error() != utils.NotFoundCaps {
err = utils.NewErrServerError(err)
}
return err
}
cdrsFltr.Count = true
_, qryCnt, err := cdrS.cdrDb.GetCDRs(cdrsFltr, false)
if err != nil {
return utils.NewErrServerError(err)
}
*cnt = qryCnt
return nil
}