Implement CDRsv1.ProcessEvents API

This commit is contained in:
ionutboangiu
2024-02-16 11:03:46 -05:00
committed by Dan Christian Bogos
parent 8ab43944a2
commit 267e6109e1
3 changed files with 142 additions and 0 deletions

View File

@@ -100,6 +100,11 @@ func (cdrSv1 *CDRsV1) ProcessEvent(ctx *context.Context, arg *engine.ArgV1Proces
return cdrSv1.CDRs.V1ProcessEvent(ctx, arg, reply)
}
// ProcessEvent will process multiple events Event based on the flags attached.
func (cdrSv1 *CDRsV1) ProcessEvents(ctx *context.Context, arg *engine.ArgV1ProcessEvents, reply *string) error {
return cdrSv1.CDRs.V1ProcessEvents(ctx, arg, reply)
}
// ProcessExternalCDR will process a CDR in external format
func (cdrSv1 *CDRsV1) ProcessExternalCDR(ctx *context.Context, cdr *engine.ExternalCDRWithAPIOpts, reply *string) error {
return cdrSv1.CDRs.V1ProcessExternalCDR(ctx, cdr, reply)

View File

@@ -958,6 +958,142 @@ func (cdrS *CDRServer) V2ProcessEvent(ctx *context.Context, arg *ArgV1ProcessEve
return nil
}
// ArgV1ProcessEvents defines the structure for holding CGREvents about to be processed by CDRsV1.ProcessEvents.
type ArgV1ProcessEvents struct {
Flags []string
CGREvents []*utils.CGREvent
APIOpts map[string]any
clnb bool //rpcclonable
}
// SetCloneable sets if the args should be clonned on internal connections
func (attr *ArgV1ProcessEvents) SetCloneable(rpcCloneable bool) {
attr.clnb = rpcCloneable
}
// RPCClone implements rpcclient.RPCCloner interface
func (attr *ArgV1ProcessEvents) RPCClone() (any, error) {
if !attr.clnb {
return attr, nil
}
return attr.Clone(), nil
}
// Clone creates a clone of the object
func (attr *ArgV1ProcessEvents) Clone() *ArgV1ProcessEvents {
cln := &ArgV1ProcessEvents{
Flags: slices.Clone(attr.Flags),
CGREvents: make([]*utils.CGREvent, 0, len(attr.CGREvents)),
}
for _, cgrEv := range attr.CGREvents {
cln.CGREvents = append(cln.CGREvents, cgrEv.Clone())
}
if attr.APIOpts == nil {
return cln
}
cln.APIOpts = make(map[string]any)
for key, value := range attr.APIOpts {
cln.APIOpts[key] = value
}
return cln
}
// V1ProcessEvents is similar to V1ProcessEvent but it can accept multiple CGREvents inside the parameter.
func (cdrS *CDRServer) V1ProcessEvents(ctx *context.Context, arg *ArgV1ProcessEvents, reply *string) (err error) {
// Populate ID and Tenant of CGREvents if missing.
for _, cgrEv := range arg.CGREvents {
if cgrEv.ID == "" {
cgrEv.ID = utils.GenUUID()
}
if cgrEv.Tenant == "" {
cgrEv.Tenant = cdrS.cgrCfg.GeneralCfg().DefaultTenant
}
}
flgs := utils.FlagsWithParamsFromSlice(arg.Flags)
attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0
if v, has := arg.APIOpts[utils.OptsAttributeS]; has {
if attrS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaAttributes) {
attrS = flgs.GetBool(utils.MetaAttributes)
}
store := cdrS.cgrCfg.CdrsCfg().StoreCdrs
if flgs.Has(utils.MetaStore) {
store = flgs.GetBool(utils.MetaStore)
}
export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 || len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0
if flgs.Has(utils.MetaExport) {
export = flgs.GetBool(utils.MetaExport)
}
thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0
if v, has := arg.APIOpts[utils.OptsThresholdS]; has {
if thdS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaThresholds) {
thdS = flgs.GetBool(utils.MetaThresholds)
}
stS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0
if v, has := arg.APIOpts[utils.OptsStatS]; has {
if stS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaStats) {
stS = flgs.GetBool(utils.MetaStats)
}
chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0
if v, has := arg.APIOpts[utils.OptsChargerS]; has {
if chrgS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaChargers) {
chrgS = flgs.GetBool(utils.MetaChargers)
}
var ralS bool
if v, has := arg.APIOpts[utils.OptsRALs]; has {
if ralS, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaRALs) {
ralS = flgs.GetBool(utils.MetaRALs)
}
var reRate bool
if v, has := arg.APIOpts[utils.OptsRerate]; has {
if reRate, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaRerate) {
if reRate = flgs.GetBool(utils.MetaRerate); reRate {
ralS = true
}
}
var refund bool
if v, has := arg.APIOpts[utils.OptsRefund]; has {
if refund, err = utils.IfaceAsBool(v); err != nil {
return
}
}
if flgs.Has(utils.MetaRefund) {
refund = flgs.GetBool(utils.MetaRefund)
}
if _, err = cdrS.processEvents(arg.CGREvents, chrgS, attrS, refund,
ralS, store, reRate, export, thdS, stS); err != nil {
return
}
*reply = utils.OK
return nil
}
// V1StoreSessionCost handles storing of the cost into session_costs table
func (cdrS *CDRServer) V1StoreSessionCost(ctx *context.Context, attr *AttrCDRSStoreSMCost, reply *string) (err error) {
if attr.Cost.CGRID == "" {

View File

@@ -1758,6 +1758,7 @@ const (
CDRsV1ProcessExternalCDR = "CDRsV1.ProcessExternalCDR"
CDRsV1StoreSessionCost = "CDRsV1.StoreSessionCost"
CDRsV1ProcessEvent = "CDRsV1.ProcessEvent"
CDRsV1ProcessEvents = "CDRsV1.ProcessEvents"
CDRsV1Ping = "CDRsV1.Ping"
CDRsV2 = "CDRsV2"
CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost"