diff --git a/apier/v1/cdrs.go b/apier/v1/cdrs.go index cc365f809..2756e7a99 100644 --- a/apier/v1/cdrs.go +++ b/apier/v1/cdrs.go @@ -100,6 +100,11 @@ func (cdrSv1 *CDRsV1) ProcessEvent(ctx *context.Context, arg *engine.ArgV1Proces return cdrSv1.CDRs.V1ProcessEvent(ctx, arg, reply) } +// ProcessEvent will process multiple events Event based on the flags attached. +func (cdrSv1 *CDRsV1) ProcessEvents(ctx *context.Context, arg *engine.ArgV1ProcessEvents, reply *string) error { + return cdrSv1.CDRs.V1ProcessEvents(ctx, arg, reply) +} + // ProcessExternalCDR will process a CDR in external format func (cdrSv1 *CDRsV1) ProcessExternalCDR(ctx *context.Context, cdr *engine.ExternalCDRWithAPIOpts, reply *string) error { return cdrSv1.CDRs.V1ProcessExternalCDR(ctx, cdr, reply) diff --git a/engine/cdrs.go b/engine/cdrs.go index 9bebee85d..02e2d00de 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -958,6 +958,142 @@ func (cdrS *CDRServer) V2ProcessEvent(ctx *context.Context, arg *ArgV1ProcessEve return nil } +// ArgV1ProcessEvents defines the structure for holding CGREvents about to be processed by CDRsV1.ProcessEvents. +type ArgV1ProcessEvents struct { + Flags []string + CGREvents []*utils.CGREvent + APIOpts map[string]any + clnb bool //rpcclonable +} + +// SetCloneable sets if the args should be clonned on internal connections +func (attr *ArgV1ProcessEvents) SetCloneable(rpcCloneable bool) { + attr.clnb = rpcCloneable +} + +// RPCClone implements rpcclient.RPCCloner interface +func (attr *ArgV1ProcessEvents) RPCClone() (any, error) { + if !attr.clnb { + return attr, nil + } + return attr.Clone(), nil +} + +// Clone creates a clone of the object +func (attr *ArgV1ProcessEvents) Clone() *ArgV1ProcessEvents { + cln := &ArgV1ProcessEvents{ + Flags: slices.Clone(attr.Flags), + CGREvents: make([]*utils.CGREvent, 0, len(attr.CGREvents)), + } + for _, cgrEv := range attr.CGREvents { + cln.CGREvents = append(cln.CGREvents, cgrEv.Clone()) + } + if attr.APIOpts == nil { + return cln + } + cln.APIOpts = make(map[string]any) + for key, value := range attr.APIOpts { + cln.APIOpts[key] = value + } + return cln +} + +// V1ProcessEvents is similar to V1ProcessEvent but it can accept multiple CGREvents inside the parameter. +func (cdrS *CDRServer) V1ProcessEvents(ctx *context.Context, arg *ArgV1ProcessEvents, reply *string) (err error) { + + // Populate ID and Tenant of CGREvents if missing. + for _, cgrEv := range arg.CGREvents { + if cgrEv.ID == "" { + cgrEv.ID = utils.GenUUID() + } + if cgrEv.Tenant == "" { + cgrEv.Tenant = cdrS.cgrCfg.GeneralCfg().DefaultTenant + } + } + + flgs := utils.FlagsWithParamsFromSlice(arg.Flags) + attrS := len(cdrS.cgrCfg.CdrsCfg().AttributeSConns) != 0 + if v, has := arg.APIOpts[utils.OptsAttributeS]; has { + if attrS, err = utils.IfaceAsBool(v); err != nil { + return + } + } + if flgs.Has(utils.MetaAttributes) { + attrS = flgs.GetBool(utils.MetaAttributes) + } + store := cdrS.cgrCfg.CdrsCfg().StoreCdrs + if flgs.Has(utils.MetaStore) { + store = flgs.GetBool(utils.MetaStore) + } + export := len(cdrS.cgrCfg.CdrsCfg().OnlineCDRExports) != 0 || len(cdrS.cgrCfg.CdrsCfg().EEsConns) != 0 + if flgs.Has(utils.MetaExport) { + export = flgs.GetBool(utils.MetaExport) + } + thdS := len(cdrS.cgrCfg.CdrsCfg().ThresholdSConns) != 0 + if v, has := arg.APIOpts[utils.OptsThresholdS]; has { + if thdS, err = utils.IfaceAsBool(v); err != nil { + return + } + } + if flgs.Has(utils.MetaThresholds) { + thdS = flgs.GetBool(utils.MetaThresholds) + } + stS := len(cdrS.cgrCfg.CdrsCfg().StatSConns) != 0 + if v, has := arg.APIOpts[utils.OptsStatS]; has { + if stS, err = utils.IfaceAsBool(v); err != nil { + return + } + } + if flgs.Has(utils.MetaStats) { + stS = flgs.GetBool(utils.MetaStats) + } + chrgS := len(cdrS.cgrCfg.CdrsCfg().ChargerSConns) != 0 + if v, has := arg.APIOpts[utils.OptsChargerS]; has { + if chrgS, err = utils.IfaceAsBool(v); err != nil { + return + } + } + if flgs.Has(utils.MetaChargers) { + chrgS = flgs.GetBool(utils.MetaChargers) + } + var ralS bool + if v, has := arg.APIOpts[utils.OptsRALs]; has { + if ralS, err = utils.IfaceAsBool(v); err != nil { + return + } + } + if flgs.Has(utils.MetaRALs) { + ralS = flgs.GetBool(utils.MetaRALs) + } + var reRate bool + if v, has := arg.APIOpts[utils.OptsRerate]; has { + if reRate, err = utils.IfaceAsBool(v); err != nil { + return + } + } + if flgs.Has(utils.MetaRerate) { + if reRate = flgs.GetBool(utils.MetaRerate); reRate { + ralS = true + } + } + var refund bool + if v, has := arg.APIOpts[utils.OptsRefund]; has { + if refund, err = utils.IfaceAsBool(v); err != nil { + return + } + } + if flgs.Has(utils.MetaRefund) { + refund = flgs.GetBool(utils.MetaRefund) + } + + if _, err = cdrS.processEvents(arg.CGREvents, chrgS, attrS, refund, + ralS, store, reRate, export, thdS, stS); err != nil { + return + } + *reply = utils.OK + return nil +} + // V1StoreSessionCost handles storing of the cost into session_costs table func (cdrS *CDRServer) V1StoreSessionCost(ctx *context.Context, attr *AttrCDRSStoreSMCost, reply *string) (err error) { if attr.Cost.CGRID == "" { diff --git a/utils/consts.go b/utils/consts.go index 0c2d46748..978f8d051 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1758,6 +1758,7 @@ const ( CDRsV1ProcessExternalCDR = "CDRsV1.ProcessExternalCDR" CDRsV1StoreSessionCost = "CDRsV1.StoreSessionCost" CDRsV1ProcessEvent = "CDRsV1.ProcessEvent" + CDRsV1ProcessEvents = "CDRsV1.ProcessEvents" CDRsV1Ping = "CDRsV1.Ping" CDRsV2 = "CDRsV2" CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost"