Implement ProcessStoredEvents API

This commit is contained in:
ionutboangiu
2023-11-15 11:31:20 -05:00
committed by Dan Christian Bogos
parent e218ca06a8
commit f5d69cb41b
6 changed files with 63 additions and 36 deletions

View File

@@ -85,7 +85,7 @@ func GetUniqueCDRID(cgrEv *utils.CGREvent) string {
return utils.UUIDSha1Prefix()
}
func NewCGREventFromCDR(cdr *CDR) *utils.CGREvent {
func (cdr *CDR) CGREvent() *utils.CGREvent {
return &utils.CGREvent{
Tenant: cdr.Tenant,
ID: utils.Sha1(),
@@ -107,3 +107,9 @@ func checkNestedFields(elem string, values []string) bool {
return false
}
type CDRFilters struct {
Tenant string
FilterIDs []string
APIOpts map[string]interface{}
}

View File

@@ -20,6 +20,7 @@ package engine
import (
"encoding/json"
"errors"
"fmt"
"net/http"
@@ -446,3 +447,27 @@ func populateCost(cgrOpts map[string]any) *utils.Decimal {
}
return nil
}
// V1ProcessStoredEvents processes stored events based on provided filters.
func (cdrS *CDRServer) V1ProcessStoredEvents(ctx *context.Context, args *CDRFilters, reply *string) error {
if args.Tenant == utils.EmptyString {
args.Tenant = cdrS.cfg.GeneralCfg().DefaultTenant
}
fltrs, err := PrepareFilters(ctx, args.FilterIDs, args.Tenant, cdrS.dm)
if err != nil {
return fmt.Errorf("preparing filters failed: %w", err)
}
cdrs, err := cdrS.db.GetCDRs(ctx, fltrs, args.APIOpts)
if err != nil {
return fmt.Errorf("retrieving CDRs failed: %w", err)
}
for _, cdr := range cdrs {
event := cdr.CGREvent()
_, err := cdrS.processEvent(ctx, event)
if err != nil && !errors.Is(err, utils.ErrPartiallyExecuted) {
return fmt.Errorf("processing event %s failed: %w", event.ID, err)
}
}
*reply = utils.OK
return nil
}

View File

@@ -908,3 +908,21 @@ func (fltr *FilterRule) FieldAsInterface(fldPath []string) (_ any, err error) {
return fltr.Values, nil
}
}
// PrepareFilters retrieves and compiles the filters identified by filterIDs for the specified tenant.
func PrepareFilters(ctx *context.Context, filterIDs []string, tenant string,
dm *DataManager) ([]*Filter, error) {
fltrs := make([]*Filter, 0, len(filterIDs))
for _, fltrID := range filterIDs {
fltr, err := dm.GetFilter(ctx, tenant, fltrID, true, true, utils.NonTransactional)
if err != nil {
return nil, fmt.Errorf("retrieving filter %s failed: %w", fltrID, err)
}
if err = fltr.Compile(); err != nil {
return nil, fmt.Errorf("compiling filter %s failed: %w", fltrID, err)
}
fltrs = append(fltrs, fltr)
}
return fltrs, nil
}

View File

@@ -239,7 +239,7 @@ func (sqls *SQLStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts ma
Event: val.Event,
}
var pass bool
dP := NewCGREventFromCDR(newCdr).AsDataProvider()
dP := newCdr.CGREvent().AsDataProvider()
for _, fltr := range excludedCdrQueryFilterTypes {
if pass, err = fltr.Pass(ctx, dP); err != nil {
return nil, err
@@ -328,7 +328,7 @@ func (sqls *SQLStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err
Event: cdr.Event,
}
var pass bool
dP := NewCGREventFromCDR(newCdr).AsDataProvider()
dP := newCdr.CGREvent().AsDataProvider()
// check if the filter pass
for _, fltr := range excludedCdrQueryFilterTypes {
if pass, err = fltr.Pass(ctx, dP); err != nil {