From ec41867448753e190b98bc4510a2ea3f79e14862 Mon Sep 17 00:00:00 2001 From: DanB Date: Tue, 13 Jan 2026 19:50:19 +0100 Subject: [PATCH] Initial SessionSv1.ProcessEvent --- cdrs/cdrs.go | 8 +-- cdrs/cdrs_test.go | 16 ++--- config/sessionscfg.go | 5 ++ sessions/apis.go | 129 ++++++++++++++++++++++++++++++++++++---- sessions/libsessions.go | 3 +- sessions/sessions.go | 18 ++++++ utils/consts.go | 2 +- 7 files changed, 155 insertions(+), 26 deletions(-) diff --git a/cdrs/cdrs.go b/cdrs/cdrs.go index f1d48e212..c59031679 100644 --- a/cdrs/cdrs.go +++ b/cdrs/cdrs.go @@ -126,7 +126,7 @@ func (cdrS *CDRServer) accountSDebitEvent(ctx *context.Context, cgrEv *utils.CGR utils.AccountSv1DebitAbstracts, cgrEv, acntCost); err != nil { return } - cgrEv.APIOpts[utils.MetaAccountSCost] = acntCost + cgrEv.APIOpts[utils.MetaAccountsCost] = acntCost return } @@ -257,10 +257,10 @@ func (cdrS *CDRServer) processEvents(ctx *context.Context, evs []*utils.CGREvent if !acntS { continue } - if ecCostIface, wasCharged := cgrEv.APIOpts[utils.MetaAccountSCost]; wasCharged { + if ecCostIface, wasCharged := cgrEv.APIOpts[utils.MetaAccountsCost]; wasCharged { ecCostMap, ok := ecCostIface.(*utils.EventCharges) if !ok { - return nil, fmt.Errorf("expected %s to be a *utils.EventCharges, got %T", utils.MetaAccountSCost, ecCostMap) + return nil, fmt.Errorf("expected %s to be a *utils.EventCharges, got %T", utils.MetaAccountsCost, ecCostMap) } // before converting into EventChargers, we must get the JSON encoding and Unmarshal it into an EventChargers @@ -412,7 +412,7 @@ func populateCost(cgrOpts map[string]any) *utils.Decimal { return nil } // check firstly in accounts - if accCost, has := cgrOpts[utils.MetaAccountSCost]; has { + if accCost, has := cgrOpts[utils.MetaAccountsCost]; has { return accCost.(*utils.EventCharges).Concretes } // after check in rates diff --git a/cdrs/cdrs_test.go b/cdrs/cdrs_test.go index b5e124743..fa94b75ac 100644 --- a/cdrs/cdrs_test.go +++ b/cdrs/cdrs_test.go @@ -786,7 +786,7 @@ func TestCDRsAccountProcessEventMock(t *testing.T) { utils.Cost: 123.0, }, APIOpts: map[string]any{ - utils.MetaAccountSCost: &utils.EventCharges{}, + utils.MetaAccountsCost: &utils.EventCharges{}, utils.MetaSubsys: utils.MetaAccounts, }, } @@ -808,7 +808,7 @@ func TestCDRsAccountProcessEventMock(t *testing.T) { utils.Cost: 123.0, }, APIOpts: map[string]any{ - utils.MetaAccountSCost: cgrEv.APIOpts[utils.MetaAccountSCost], + utils.MetaAccountsCost: cgrEv.APIOpts[utils.MetaAccountsCost], utils.MetaSubsys: utils.MetaAccounts, }, } @@ -2167,7 +2167,7 @@ func TestCDRServerAccountSRefundCharges(t *testing.T) { utils.AccountSConnsCfg), utils.AccountSv1, rpcInternal) apiOpts := map[string]any{ - utils.MetaAccountSCost: &utils.EventCharges{}, + utils.MetaAccountsCost: &utils.EventCharges{}, utils.MetaSubsys: utils.AccountSConnsCfg, } eChrgs := &utils.EventCharges{ @@ -2243,7 +2243,7 @@ func TestCDRServerAccountSRefundChargesErr(t *testing.T) { utils.AccountSConnsCfg), utils.AccountSv1, rpcInternal) apiOpts := map[string]any{ - utils.MetaAccountSCost: &utils.EventCharges{}, + utils.MetaAccountsCost: &utils.EventCharges{}, utils.MetaSubsys: utils.AccountSConnsCfg, } eChrgs := &utils.EventCharges{ @@ -2266,7 +2266,7 @@ func TestPopulateCost(t *testing.T) { utils.Usage: "10s", }, APIOpts: map[string]any{ - utils.MetaAccountSCost: &utils.EventCharges{ + utils.MetaAccountsCost: &utils.EventCharges{ Concretes: utils.NewDecimal(400, 0), }, }, @@ -2358,7 +2358,7 @@ func TestCDRsProcessEventMockThdsEcCostIface(t *testing.T) { APIOpts: map[string]any{ utils.MetaAccounts: true, "*context": utils.MetaCDRs, - utils.MetaAccountSCost: &utils.EventCharges{ + utils.MetaAccountsCost: &utils.EventCharges{ Concretes: utils.NewDecimal(400, 0), }, }, @@ -2396,7 +2396,7 @@ func TestCDRsProcessEventMockThdsEcCostIfaceMarshalErr(t *testing.T) { APIOpts: map[string]any{ utils.MetaAccounts: true, "*context": utils.MetaCDRs, - utils.MetaAccountSCost: &utils.EventCharges{ + utils.MetaAccountsCost: &utils.EventCharges{ Concretes: utils.NewDecimal(1, 2), }, }, @@ -2434,7 +2434,7 @@ func TestCDRsProcessEventMockThdsEcCostIfaceUnmarshalErr(t *testing.T) { APIOpts: map[string]any{ utils.MetaAccounts: true, "*context": utils.MetaCDRs, - utils.MetaAccountSCost: &utils.EventCharges{}, + utils.MetaAccountsCost: &utils.EventCharges{}, }, } expErr := "PARTIALLY_EXECUTED" diff --git a/config/sessionscfg.go b/config/sessionscfg.go index 9f11b4505..26a974381 100644 --- a/config/sessionscfg.go +++ b/config/sessionscfg.go @@ -72,6 +72,7 @@ type SessionsOpts struct { Routes []*DynamicBoolOpt Stats []*DynamicBoolOpt Thresholds []*DynamicBoolOpt + Authorize []*DynamicBoolOpt Initiate []*DynamicBoolOpt Update []*DynamicBoolOpt Terminate []*DynamicBoolOpt @@ -100,6 +101,10 @@ type SessionsOpts struct { TTLUsage []*DynamicDurationPointerOpt OriginID []*DynamicStringOpt AccountsForceUsage []*DynamicBoolOpt + AccountsAuthorize []*DynamicBoolOpt + AccountsInitialize []*DynamicBoolOpt + AccountsUpdate []*DynamicBoolOpt + AccountsTerminate []*DynamicBoolOpt } // SessionSCfg is the config section for SessionS diff --git a/sessions/apis.go b/sessions/apis.go index e40ae8766..4562306d6 100644 --- a/sessions/apis.go +++ b/sessions/apis.go @@ -23,6 +23,7 @@ import ( "time" "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/attributes" "github.com/cgrates/cgrates/chargers" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" @@ -784,25 +785,25 @@ func (sS *SessionS) BiRPCv1ProcessCDR(ctx *context.Context, // BiRPCv1ProcessEvent processes an CGREvent with various subsystems func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context, - args *utils.CGREvent, authReply *V1ProcessEventReply) (err error) { - if args == nil { + apiArgs *utils.CGREvent, apiRply *V1ProcessEventReply) (err error) { + if apiArgs == nil { return utils.NewErrMandatoryIeMissing(utils.CGREventString) } - if args.Event == nil { + if apiArgs.Event == nil { return utils.NewErrMandatoryIeMissing(utils.Event) } - if args.APIOpts == nil { - args.APIOpts = make(map[string]any) + if apiArgs.APIOpts == nil { + apiArgs.APIOpts = make(map[string]any) } - if args.ID == "" { - args.ID = utils.GenUUID() + if apiArgs.ID == "" { + apiArgs.ID = utils.GenUUID() } - if args.Tenant == "" { - args.Tenant = sS.cfg.GeneralCfg().DefaultTenant + if apiArgs.Tenant == "" { + apiArgs.Tenant = sS.cfg.GeneralCfg().DefaultTenant } // RPC caching if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { - cacheKey := utils.ConcatenatedKey(utils.SessionSv1AuthorizeEvent, args.ID) + cacheKey := utils.ConcatenatedKey(utils.SessionSv1AuthorizeEvent, apiArgs.ID) refID := guardian.Guardian.GuardIDs("", sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic defer guardian.Guardian.UnguardIDs(refID) @@ -810,14 +811,118 @@ func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context, if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) if cachedResp.Error == nil { - *authReply = *cachedResp.Result.(*V1ProcessEventReply) + *apiRply = *cachedResp.Result.(*V1ProcessEventReply) } return cachedResp.Error } defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey, - &utils.CachedRPCResponse{Result: authReply, Error: err}, + &utils.CachedRPCResponse{Result: apiRply, Error: err}, nil, true, utils.NonTransactional) } // end of RPC caching + + cgrEvs := map[string]*utils.CGREvent{ + utils.MetaDefault: apiArgs, + } + + cch := make(map[string]any) + + // processing AttributeS first gives us the opportunity of enhancing all the other flags + // check for *attribute + if attrS, errAttrS := engine.GetBoolOpts(ctx, apiArgs.Tenant, apiArgs.AsDataProvider(), cch, + sS.fltrS, sS.cfg.SessionSCfg().Opts.Attributes, + utils.MetaAttributes); errAttrS != nil { + return errAttrS + } else { + cch[utils.MetaAttributes] = attrS + } + if cch[utils.MetaAttributes].(bool) { + apiRply.Attributes = make(map[string]*attributes.AttrSProcessEventReply) + if rplyAttr, errProc := sS.processAttributes(ctx, apiArgs); errProc != nil { + if errProc.Error() != utils.ErrNotFound.Error() { + return utils.NewErrAttributeS(errProc) + } + } else { + *apiArgs = *rplyAttr.CGREvent + apiRply.Attributes[utils.MetaDefault] = &rplyAttr + } + } + + // ChargerS will multiply/alter the event before any auth/accounting/cdr taking place + if chrgS, errChrg := engine.GetBoolOpts(ctx, apiArgs.Tenant, apiArgs.AsDataProvider(), cch, + sS.fltrS, sS.cfg.SessionSCfg().Opts.Chargers, + utils.MetaChargers); errChrg != nil { + return errChrg + } else { + cch[utils.MetaChargers] = chrgS + } + if cch[utils.MetaChargers].(bool) { + delete(cgrEvs, utils.MetaDefault) // ChargerS becomes responsive of charging + var chrgrs []*chargers.ChrgSProcessEventReply + if chrgrs, err = sS.processChargerS(ctx, apiArgs); err != nil { + return + } + for _, chrgr := range chrgrs { + cgrEvs[utils.IfaceAsString(chrgr.CGREvent.APIOpts[utils.MetaRunID])] = chrgr.CGREvent + } + } + + // same processing for each event + for runID, cgrEv := range cgrEvs { + cchEv := make(map[string]any) + //var partiallyExecuted bool // will be added to the final answer if true + if blkrErr, errBlkr := engine.GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(), + cchEv, sS.fltrS, sS.cfg.SessionSCfg().Opts.Authorize, + utils.OptsSesBlockerError, utils.MetaBlockerErrorCfg); errBlkr != nil { + return errBlkr + } else { + cchEv[utils.OptsSesBlockerError] = blkrErr + } + + // IPs Enabled + if ipS, errIPs := engine.GetBoolOpts(ctx, apiArgs.Tenant, apiArgs.AsDataProvider(), cchEv, + sS.fltrS, sS.cfg.SessionSCfg().Opts.IPs, + utils.MetaIPs); errIPs != nil { + return errIPs + } else { + cchEv[utils.MetaIPs] = ipS + } + + // AccountS Enabled + if acntS, errAcnts := engine.GetBoolOpts(ctx, apiArgs.Tenant, apiArgs.AsDataProvider(), cchEv, + sS.fltrS, sS.cfg.SessionSCfg().Opts.Accounts, + utils.MetaIPs); errAcnts != nil { + return errAcnts + } else { + cchEv[utils.MetaAccounts] = acntS + } + + // Auth the events + if auth, errAuth := engine.GetBoolOpts(ctx, apiArgs.Tenant, apiArgs.AsDataProvider(), + cchEv, sS.fltrS, sS.cfg.SessionSCfg().Opts.IPsAuthorize, + utils.MetaAuthorize); errAuth != nil { + return errAuth + } else { + cchEv[utils.MetaAuthorize] = auth + } + + //IPsAuthorizeBool + if ipsAuthBool, errBool := engine.GetBoolOpts(ctx, apiArgs.Tenant, apiArgs.AsDataProvider(), cchEv, + sS.fltrS, sS.cfg.SessionSCfg().Opts.IPsAuthorize, + utils.MetaIPsAuthorizeCfg); errBool != nil { + return errBool + } else { + cchEv[utils.MetaIPsAuthorizeCfg] = ipsAuthBool + } + // IPAuthorization + if cchEv[utils.MetaIPsAuthorizeCfg].(bool) || + (cchEv[utils.MetaAuthorize].(bool) && cchEv[utils.MetaIPs].(bool)) { + var authIP *utils.AllocatedIP + if authIP, err = sS.authorizeIPs(ctx, cgrEv); err != nil { + return + } + apiRply.IPsAllocation[runID] = authIP + } + } return } diff --git a/sessions/libsessions.go b/sessions/libsessions.go index f92289319..c2fb0161f 100644 --- a/sessions/libsessions.go +++ b/sessions/libsessions.go @@ -326,8 +326,9 @@ func getDerivedEvents(events map[string]*utils.CGREvent, derivedReply bool) map[ // V1ProcessEventReply is the reply for the ProcessEvent API type V1ProcessEventReply struct { AccountSUsage map[string]time.Duration `json:",omitempty"` - RateSCost map[string]float64 `json:",omitempty"` // Cost is the cost received from Rater, ignoring accounting part + RateSCost map[string]float64 `json:",omitempty"` ResourceAllocation map[string]string `json:",omitempty"` + IPsAllocation map[string]*utils.AllocatedIP `json:",omitempty"` Attributes map[string]*attributes.AttrSProcessEventReply `json:",omitempty"` RouteProfiles map[string]routes.SortedRoutesList `json:",omitempty"` ThresholdIDs map[string][]string `json:",omitempty"` diff --git a/sessions/sessions.go b/sessions/sessions.go index dbe5f8790..c912cfc4f 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -953,6 +953,24 @@ func (sS *SessionS) processChargerS(ctx *context.Context, cgrEv *utils.CGREvent) return } +// authorizeIPs will authorize the event with IPs subsystem +func (sS *SessionS) authorizeIPs(ctx *context.Context, cgrEv *utils.CGREvent) (rply *utils.AllocatedIP, err error) { + if len(sS.cfg.SessionSCfg().IPsConns) == 0 { + err = errors.New("IPs is disabled") + return + } + var alcIP utils.AllocatedIP + if err = sS.connMgr.Call(ctx, sS.cfg.SessionSCfg().IPsConns, + utils.IPsV1AuthorizeIP, + cgrEv, &alcIP); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s could not authorize IP for event: %+v", + utils.SessionS, err.Error(), cgrEv)) + } + return &alcIP, nil + +} + // getSessions is used to return in a thread-safe manner active or passive sessions func (sS *SessionS) getSessions(originID string, pSessions bool) (ss []*Session) { ssMux := &sS.aSsMux // get the pointer so we don't copy, otherwise locks will not work diff --git a/utils/consts.go b/utils/consts.go index b74d98510..b45dc7c47 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -838,7 +838,7 @@ const ( MetaRadDMRTemplate = "*radDMRTemplate" MetaCost = "*cost" MetaRateSCost = "*rateSCost" - MetaAccountSCost = "*accountSCost" + MetaAccountsCost = "*accountsCost" MetaGroup = "*group" InternalRPCSet = "InternalRPCSet" MetaFileName = "*fileName"