From f5d69cb41b4cc6ebcaef94cba5e212e8eba2edc0 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 15 Nov 2023 11:31:20 -0500 Subject: [PATCH] Implement ProcessStoredEvents API --- apis/cdrs.go | 43 ++++++++++--------------------------------- engine/cdr.go | 8 +++++++- engine/cdrs.go | 25 +++++++++++++++++++++++++ engine/filters.go | 18 ++++++++++++++++++ engine/storage_sql.go | 4 ++-- utils/consts.go | 1 + 6 files changed, 63 insertions(+), 36 deletions(-) diff --git a/apis/cdrs.go b/apis/cdrs.go index a9f82281d..1a0ccf7d6 100644 --- a/apis/cdrs.go +++ b/apis/cdrs.go @@ -27,18 +27,12 @@ import ( "github.com/cgrates/cgrates/utils" ) -type CDRFilters struct { - Tenant string - FilterIDs []string - APIOpts map[string]interface{} -} - // GetCDRs retrieves a list of CDRs matching the specified filters. -func (admS AdminSv1) GetCDRs(ctx *context.Context, args *CDRFilters, reply *[]*engine.CDR) error { +func (admS AdminSv1) GetCDRs(ctx *context.Context, args *engine.CDRFilters, reply *[]*engine.CDR) error { if args.Tenant == utils.EmptyString { args.Tenant = admS.cfg.GeneralCfg().DefaultTenant } - fltrs, err := admS.prepareFilters(ctx, args.FilterIDs, args.Tenant) + fltrs, err := engine.PrepareFilters(ctx, args.FilterIDs, args.Tenant, admS.dm) if err != nil { return fmt.Errorf("preparing filters failed: %w", err) } @@ -51,11 +45,11 @@ func (admS AdminSv1) GetCDRs(ctx *context.Context, args *CDRFilters, reply *[]*e } // RemoveCDRs removes CDRs matching the specified filters. -func (admS AdminSv1) RemoveCDRs(ctx *context.Context, args *CDRFilters, reply *string) (err error) { +func (admS AdminSv1) RemoveCDRs(ctx *context.Context, args *engine.CDRFilters, reply *string) (err error) { if args.Tenant == utils.EmptyString { args.Tenant = admS.cfg.GeneralCfg().DefaultTenant } - fltrs, err := admS.prepareFilters(ctx, args.FilterIDs, args.Tenant) + fltrs, err := engine.PrepareFilters(ctx, args.FilterIDs, args.Tenant, admS.dm) if err != nil { return fmt.Errorf("preparing filters failed: %w", err) } @@ -66,29 +60,6 @@ func (admS AdminSv1) RemoveCDRs(ctx *context.Context, args *CDRFilters, reply *s return } -// prepareFilters retrieves and compiles the filters identified by filterIDs for the specified tenant. -func (admS AdminSv1) prepareFilters(ctx *context.Context, filterIDs []string, tenant string, -) ([]*engine.Filter, error) { - - fltrs := make([]*engine.Filter, 0, len(filterIDs)) - for _, fltrID := range filterIDs { - var singleFltr engine.Filter - err := admS.GetFilter(ctx, &utils.TenantIDWithAPIOpts{ - TenantID: &utils.TenantID{ - Tenant: tenant, - ID: fltrID, - }}, &singleFltr) - if err != nil { - return nil, fmt.Errorf("retrieving filter %s failed: %w", fltrID, err) - } - if err = singleFltr.Compile(); err != nil { - return nil, fmt.Errorf("compiling filter %s failed: %w", fltrID, err) - } - fltrs = append(fltrs, &singleFltr) - } - return fltrs, nil -} - // NewCDRsV1 constructs the RPC Object for CDRsV1 func NewCDRsV1(cdrS *engine.CDRServer) *CDRsV1 { return &CDRsV1{cdrS: cdrS} @@ -111,3 +82,9 @@ func (cdrSv1 *CDRsV1) ProcessEventWithGet(ctx *context.Context, args *utils.CGRE reply *[]*utils.EventsWithOpts) error { return cdrSv1.cdrS.V1ProcessEventWithGet(ctx, args, reply) } + +// ProcessStoredEvents processes stored events based on provided filters. +func (cdrSv1 *CDRsV1) ProcessStoredEvents(ctx *context.Context, args *engine.CDRFilters, + reply *string) error { + return cdrSv1.cdrS.V1ProcessStoredEvents(ctx, args, reply) +} diff --git a/engine/cdr.go b/engine/cdr.go index 671c82ab9..3fc4fed58 100644 --- a/engine/cdr.go +++ b/engine/cdr.go @@ -85,7 +85,7 @@ func GetUniqueCDRID(cgrEv *utils.CGREvent) string { return utils.UUIDSha1Prefix() } -func NewCGREventFromCDR(cdr *CDR) *utils.CGREvent { +func (cdr *CDR) CGREvent() *utils.CGREvent { return &utils.CGREvent{ Tenant: cdr.Tenant, ID: utils.Sha1(), @@ -107,3 +107,9 @@ func checkNestedFields(elem string, values []string) bool { return false } + +type CDRFilters struct { + Tenant string + FilterIDs []string + APIOpts map[string]interface{} +} diff --git a/engine/cdrs.go b/engine/cdrs.go index bae94f9e5..75cf1f870 100644 --- a/engine/cdrs.go +++ b/engine/cdrs.go @@ -20,6 +20,7 @@ package engine import ( "encoding/json" + "errors" "fmt" "net/http" @@ -446,3 +447,27 @@ func populateCost(cgrOpts map[string]any) *utils.Decimal { } return nil } + +// V1ProcessStoredEvents processes stored events based on provided filters. +func (cdrS *CDRServer) V1ProcessStoredEvents(ctx *context.Context, args *CDRFilters, reply *string) error { + if args.Tenant == utils.EmptyString { + args.Tenant = cdrS.cfg.GeneralCfg().DefaultTenant + } + fltrs, err := PrepareFilters(ctx, args.FilterIDs, args.Tenant, cdrS.dm) + if err != nil { + return fmt.Errorf("preparing filters failed: %w", err) + } + cdrs, err := cdrS.db.GetCDRs(ctx, fltrs, args.APIOpts) + if err != nil { + return fmt.Errorf("retrieving CDRs failed: %w", err) + } + for _, cdr := range cdrs { + event := cdr.CGREvent() + _, err := cdrS.processEvent(ctx, event) + if err != nil && !errors.Is(err, utils.ErrPartiallyExecuted) { + return fmt.Errorf("processing event %s failed: %w", event.ID, err) + } + } + *reply = utils.OK + return nil +} diff --git a/engine/filters.go b/engine/filters.go index 8da14b4e4..137c94a40 100644 --- a/engine/filters.go +++ b/engine/filters.go @@ -908,3 +908,21 @@ func (fltr *FilterRule) FieldAsInterface(fldPath []string) (_ any, err error) { return fltr.Values, nil } } + +// PrepareFilters retrieves and compiles the filters identified by filterIDs for the specified tenant. +func PrepareFilters(ctx *context.Context, filterIDs []string, tenant string, + dm *DataManager) ([]*Filter, error) { + + fltrs := make([]*Filter, 0, len(filterIDs)) + for _, fltrID := range filterIDs { + fltr, err := dm.GetFilter(ctx, tenant, fltrID, true, true, utils.NonTransactional) + if err != nil { + return nil, fmt.Errorf("retrieving filter %s failed: %w", fltrID, err) + } + if err = fltr.Compile(); err != nil { + return nil, fmt.Errorf("compiling filter %s failed: %w", fltrID, err) + } + fltrs = append(fltrs, fltr) + } + return fltrs, nil +} diff --git a/engine/storage_sql.go b/engine/storage_sql.go index 657bdb283..3d23aadd6 100644 --- a/engine/storage_sql.go +++ b/engine/storage_sql.go @@ -239,7 +239,7 @@ func (sqls *SQLStorage) GetCDRs(ctx *context.Context, qryFltr []*Filter, opts ma Event: val.Event, } var pass bool - dP := NewCGREventFromCDR(newCdr).AsDataProvider() + dP := newCdr.CGREvent().AsDataProvider() for _, fltr := range excludedCdrQueryFilterTypes { if pass, err = fltr.Pass(ctx, dP); err != nil { return nil, err @@ -328,7 +328,7 @@ func (sqls *SQLStorage) RemoveCDRs(ctx *context.Context, qryFltr []*Filter) (err Event: cdr.Event, } var pass bool - dP := NewCGREventFromCDR(newCdr).AsDataProvider() + dP := newCdr.CGREvent().AsDataProvider() // check if the filter pass for _, fltr := range excludedCdrQueryFilterTypes { if pass, err = fltr.Pass(ctx, dP); err != nil { diff --git a/utils/consts.go b/utils/consts.go index d49112329..9211f0b10 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1566,6 +1566,7 @@ const ( CDRsV1StoreSessionCost = "CDRsV1.StoreSessionCost" CDRsV1ProcessEvent = "CDRsV1.ProcessEvent" CDRsV1ProcessEventWithGet = "CDRsV1.ProcessEventWithGet" + CDRsV1ProcessStoredEvents = "CDRsV1.ProcessStoredEvents" CDRsV1Ping = "CDRsV1.Ping" CDRsV2 = "CDRsV2" CDRsV2StoreSessionCost = "CDRsV2.StoreSessionCost"