Initial SessionSv1.ProcessEvent

This commit is contained in:
DanB
2026-01-13 19:50:19 +01:00
parent 9dda6a264b
commit ec41867448
7 changed files with 155 additions and 26 deletions

View File

@@ -126,7 +126,7 @@ func (cdrS *CDRServer) accountSDebitEvent(ctx *context.Context, cgrEv *utils.CGR
utils.AccountSv1DebitAbstracts, cgrEv, acntCost); err != nil { utils.AccountSv1DebitAbstracts, cgrEv, acntCost); err != nil {
return return
} }
cgrEv.APIOpts[utils.MetaAccountSCost] = acntCost cgrEv.APIOpts[utils.MetaAccountsCost] = acntCost
return return
} }
@@ -257,10 +257,10 @@ func (cdrS *CDRServer) processEvents(ctx *context.Context, evs []*utils.CGREvent
if !acntS { if !acntS {
continue continue
} }
if ecCostIface, wasCharged := cgrEv.APIOpts[utils.MetaAccountSCost]; wasCharged { if ecCostIface, wasCharged := cgrEv.APIOpts[utils.MetaAccountsCost]; wasCharged {
ecCostMap, ok := ecCostIface.(*utils.EventCharges) ecCostMap, ok := ecCostIface.(*utils.EventCharges)
if !ok { if !ok {
return nil, fmt.Errorf("expected %s to be a *utils.EventCharges, got %T", utils.MetaAccountSCost, ecCostMap) return nil, fmt.Errorf("expected %s to be a *utils.EventCharges, got %T", utils.MetaAccountsCost, ecCostMap)
} }
// before converting into EventChargers, we must get the JSON encoding and Unmarshal it into an EventChargers // before converting into EventChargers, we must get the JSON encoding and Unmarshal it into an EventChargers
@@ -412,7 +412,7 @@ func populateCost(cgrOpts map[string]any) *utils.Decimal {
return nil return nil
} }
// check firstly in accounts // check firstly in accounts
if accCost, has := cgrOpts[utils.MetaAccountSCost]; has { if accCost, has := cgrOpts[utils.MetaAccountsCost]; has {
return accCost.(*utils.EventCharges).Concretes return accCost.(*utils.EventCharges).Concretes
} }
// after check in rates // after check in rates

View File

@@ -786,7 +786,7 @@ func TestCDRsAccountProcessEventMock(t *testing.T) {
utils.Cost: 123.0, utils.Cost: 123.0,
}, },
APIOpts: map[string]any{ APIOpts: map[string]any{
utils.MetaAccountSCost: &utils.EventCharges{}, utils.MetaAccountsCost: &utils.EventCharges{},
utils.MetaSubsys: utils.MetaAccounts, utils.MetaSubsys: utils.MetaAccounts,
}, },
} }
@@ -808,7 +808,7 @@ func TestCDRsAccountProcessEventMock(t *testing.T) {
utils.Cost: 123.0, utils.Cost: 123.0,
}, },
APIOpts: map[string]any{ APIOpts: map[string]any{
utils.MetaAccountSCost: cgrEv.APIOpts[utils.MetaAccountSCost], utils.MetaAccountsCost: cgrEv.APIOpts[utils.MetaAccountsCost],
utils.MetaSubsys: utils.MetaAccounts, utils.MetaSubsys: utils.MetaAccounts,
}, },
} }
@@ -2167,7 +2167,7 @@ func TestCDRServerAccountSRefundCharges(t *testing.T) {
utils.AccountSConnsCfg), utils.AccountSv1, rpcInternal) utils.AccountSConnsCfg), utils.AccountSv1, rpcInternal)
apiOpts := map[string]any{ apiOpts := map[string]any{
utils.MetaAccountSCost: &utils.EventCharges{}, utils.MetaAccountsCost: &utils.EventCharges{},
utils.MetaSubsys: utils.AccountSConnsCfg, utils.MetaSubsys: utils.AccountSConnsCfg,
} }
eChrgs := &utils.EventCharges{ eChrgs := &utils.EventCharges{
@@ -2243,7 +2243,7 @@ func TestCDRServerAccountSRefundChargesErr(t *testing.T) {
utils.AccountSConnsCfg), utils.AccountSv1, rpcInternal) utils.AccountSConnsCfg), utils.AccountSv1, rpcInternal)
apiOpts := map[string]any{ apiOpts := map[string]any{
utils.MetaAccountSCost: &utils.EventCharges{}, utils.MetaAccountsCost: &utils.EventCharges{},
utils.MetaSubsys: utils.AccountSConnsCfg, utils.MetaSubsys: utils.AccountSConnsCfg,
} }
eChrgs := &utils.EventCharges{ eChrgs := &utils.EventCharges{
@@ -2266,7 +2266,7 @@ func TestPopulateCost(t *testing.T) {
utils.Usage: "10s", utils.Usage: "10s",
}, },
APIOpts: map[string]any{ APIOpts: map[string]any{
utils.MetaAccountSCost: &utils.EventCharges{ utils.MetaAccountsCost: &utils.EventCharges{
Concretes: utils.NewDecimal(400, 0), Concretes: utils.NewDecimal(400, 0),
}, },
}, },
@@ -2358,7 +2358,7 @@ func TestCDRsProcessEventMockThdsEcCostIface(t *testing.T) {
APIOpts: map[string]any{ APIOpts: map[string]any{
utils.MetaAccounts: true, utils.MetaAccounts: true,
"*context": utils.MetaCDRs, "*context": utils.MetaCDRs,
utils.MetaAccountSCost: &utils.EventCharges{ utils.MetaAccountsCost: &utils.EventCharges{
Concretes: utils.NewDecimal(400, 0), Concretes: utils.NewDecimal(400, 0),
}, },
}, },
@@ -2396,7 +2396,7 @@ func TestCDRsProcessEventMockThdsEcCostIfaceMarshalErr(t *testing.T) {
APIOpts: map[string]any{ APIOpts: map[string]any{
utils.MetaAccounts: true, utils.MetaAccounts: true,
"*context": utils.MetaCDRs, "*context": utils.MetaCDRs,
utils.MetaAccountSCost: &utils.EventCharges{ utils.MetaAccountsCost: &utils.EventCharges{
Concretes: utils.NewDecimal(1, 2), Concretes: utils.NewDecimal(1, 2),
}, },
}, },
@@ -2434,7 +2434,7 @@ func TestCDRsProcessEventMockThdsEcCostIfaceUnmarshalErr(t *testing.T) {
APIOpts: map[string]any{ APIOpts: map[string]any{
utils.MetaAccounts: true, utils.MetaAccounts: true,
"*context": utils.MetaCDRs, "*context": utils.MetaCDRs,
utils.MetaAccountSCost: &utils.EventCharges{}, utils.MetaAccountsCost: &utils.EventCharges{},
}, },
} }
expErr := "PARTIALLY_EXECUTED" expErr := "PARTIALLY_EXECUTED"

View File

@@ -72,6 +72,7 @@ type SessionsOpts struct {
Routes []*DynamicBoolOpt Routes []*DynamicBoolOpt
Stats []*DynamicBoolOpt Stats []*DynamicBoolOpt
Thresholds []*DynamicBoolOpt Thresholds []*DynamicBoolOpt
Authorize []*DynamicBoolOpt
Initiate []*DynamicBoolOpt Initiate []*DynamicBoolOpt
Update []*DynamicBoolOpt Update []*DynamicBoolOpt
Terminate []*DynamicBoolOpt Terminate []*DynamicBoolOpt
@@ -100,6 +101,10 @@ type SessionsOpts struct {
TTLUsage []*DynamicDurationPointerOpt TTLUsage []*DynamicDurationPointerOpt
OriginID []*DynamicStringOpt OriginID []*DynamicStringOpt
AccountsForceUsage []*DynamicBoolOpt AccountsForceUsage []*DynamicBoolOpt
AccountsAuthorize []*DynamicBoolOpt
AccountsInitialize []*DynamicBoolOpt
AccountsUpdate []*DynamicBoolOpt
AccountsTerminate []*DynamicBoolOpt
} }
// SessionSCfg is the config section for SessionS // SessionSCfg is the config section for SessionS

View File

@@ -23,6 +23,7 @@ import (
"time" "time"
"github.com/cgrates/birpc/context" "github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/attributes"
"github.com/cgrates/cgrates/chargers" "github.com/cgrates/cgrates/chargers"
"github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils" "github.com/cgrates/cgrates/utils"
@@ -784,25 +785,25 @@ func (sS *SessionS) BiRPCv1ProcessCDR(ctx *context.Context,
// BiRPCv1ProcessEvent processes an CGREvent with various subsystems // BiRPCv1ProcessEvent processes an CGREvent with various subsystems
func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context, func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context,
args *utils.CGREvent, authReply *V1ProcessEventReply) (err error) { apiArgs *utils.CGREvent, apiRply *V1ProcessEventReply) (err error) {
if args == nil { if apiArgs == nil {
return utils.NewErrMandatoryIeMissing(utils.CGREventString) return utils.NewErrMandatoryIeMissing(utils.CGREventString)
} }
if args.Event == nil { if apiArgs.Event == nil {
return utils.NewErrMandatoryIeMissing(utils.Event) return utils.NewErrMandatoryIeMissing(utils.Event)
} }
if args.APIOpts == nil { if apiArgs.APIOpts == nil {
args.APIOpts = make(map[string]any) apiArgs.APIOpts = make(map[string]any)
} }
if args.ID == "" { if apiArgs.ID == "" {
args.ID = utils.GenUUID() apiArgs.ID = utils.GenUUID()
} }
if args.Tenant == "" { if apiArgs.Tenant == "" {
args.Tenant = sS.cfg.GeneralCfg().DefaultTenant apiArgs.Tenant = sS.cfg.GeneralCfg().DefaultTenant
} }
// RPC caching // RPC caching
if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1AuthorizeEvent, args.ID) cacheKey := utils.ConcatenatedKey(utils.SessionSv1AuthorizeEvent, apiArgs.ID)
refID := guardian.Guardian.GuardIDs("", refID := guardian.Guardian.GuardIDs("",
sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID) defer guardian.Guardian.UnguardIDs(refID)
@@ -810,14 +811,118 @@ func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context,
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse) cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil { if cachedResp.Error == nil {
*authReply = *cachedResp.Result.(*V1ProcessEventReply) *apiRply = *cachedResp.Result.(*V1ProcessEventReply)
} }
return cachedResp.Error return cachedResp.Error
} }
defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey, defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: authReply, Error: err}, &utils.CachedRPCResponse{Result: apiRply, Error: err},
nil, true, utils.NonTransactional) nil, true, utils.NonTransactional)
} }
// end of RPC caching // end of RPC caching
cgrEvs := map[string]*utils.CGREvent{
utils.MetaDefault: apiArgs,
}
cch := make(map[string]any)
// processing AttributeS first gives us the opportunity of enhancing all the other flags
// check for *attribute
if attrS, errAttrS := engine.GetBoolOpts(ctx, apiArgs.Tenant, apiArgs.AsDataProvider(), cch,
sS.fltrS, sS.cfg.SessionSCfg().Opts.Attributes,
utils.MetaAttributes); errAttrS != nil {
return errAttrS
} else {
cch[utils.MetaAttributes] = attrS
}
if cch[utils.MetaAttributes].(bool) {
apiRply.Attributes = make(map[string]*attributes.AttrSProcessEventReply)
if rplyAttr, errProc := sS.processAttributes(ctx, apiArgs); errProc != nil {
if errProc.Error() != utils.ErrNotFound.Error() {
return utils.NewErrAttributeS(errProc)
}
} else {
*apiArgs = *rplyAttr.CGREvent
apiRply.Attributes[utils.MetaDefault] = &rplyAttr
}
}
// ChargerS will multiply/alter the event before any auth/accounting/cdr taking place
if chrgS, errChrg := engine.GetBoolOpts(ctx, apiArgs.Tenant, apiArgs.AsDataProvider(), cch,
sS.fltrS, sS.cfg.SessionSCfg().Opts.Chargers,
utils.MetaChargers); errChrg != nil {
return errChrg
} else {
cch[utils.MetaChargers] = chrgS
}
if cch[utils.MetaChargers].(bool) {
delete(cgrEvs, utils.MetaDefault) // ChargerS becomes responsive of charging
var chrgrs []*chargers.ChrgSProcessEventReply
if chrgrs, err = sS.processChargerS(ctx, apiArgs); err != nil {
return
}
for _, chrgr := range chrgrs {
cgrEvs[utils.IfaceAsString(chrgr.CGREvent.APIOpts[utils.MetaRunID])] = chrgr.CGREvent
}
}
// same processing for each event
for runID, cgrEv := range cgrEvs {
cchEv := make(map[string]any)
//var partiallyExecuted bool // will be added to the final answer if true
if blkrErr, errBlkr := engine.GetBoolOpts(ctx, cgrEv.Tenant, cgrEv.AsDataProvider(),
cchEv, sS.fltrS, sS.cfg.SessionSCfg().Opts.Authorize,
utils.OptsSesBlockerError, utils.MetaBlockerErrorCfg); errBlkr != nil {
return errBlkr
} else {
cchEv[utils.OptsSesBlockerError] = blkrErr
}
// IPs Enabled
if ipS, errIPs := engine.GetBoolOpts(ctx, apiArgs.Tenant, apiArgs.AsDataProvider(), cchEv,
sS.fltrS, sS.cfg.SessionSCfg().Opts.IPs,
utils.MetaIPs); errIPs != nil {
return errIPs
} else {
cchEv[utils.MetaIPs] = ipS
}
// AccountS Enabled
if acntS, errAcnts := engine.GetBoolOpts(ctx, apiArgs.Tenant, apiArgs.AsDataProvider(), cchEv,
sS.fltrS, sS.cfg.SessionSCfg().Opts.Accounts,
utils.MetaIPs); errAcnts != nil {
return errAcnts
} else {
cchEv[utils.MetaAccounts] = acntS
}
// Auth the events
if auth, errAuth := engine.GetBoolOpts(ctx, apiArgs.Tenant, apiArgs.AsDataProvider(),
cchEv, sS.fltrS, sS.cfg.SessionSCfg().Opts.IPsAuthorize,
utils.MetaAuthorize); errAuth != nil {
return errAuth
} else {
cchEv[utils.MetaAuthorize] = auth
}
//IPsAuthorizeBool
if ipsAuthBool, errBool := engine.GetBoolOpts(ctx, apiArgs.Tenant, apiArgs.AsDataProvider(), cchEv,
sS.fltrS, sS.cfg.SessionSCfg().Opts.IPsAuthorize,
utils.MetaIPsAuthorizeCfg); errBool != nil {
return errBool
} else {
cchEv[utils.MetaIPsAuthorizeCfg] = ipsAuthBool
}
// IPAuthorization
if cchEv[utils.MetaIPsAuthorizeCfg].(bool) ||
(cchEv[utils.MetaAuthorize].(bool) && cchEv[utils.MetaIPs].(bool)) {
var authIP *utils.AllocatedIP
if authIP, err = sS.authorizeIPs(ctx, cgrEv); err != nil {
return
}
apiRply.IPsAllocation[runID] = authIP
}
}
return return
} }

View File

@@ -326,8 +326,9 @@ func getDerivedEvents(events map[string]*utils.CGREvent, derivedReply bool) map[
// V1ProcessEventReply is the reply for the ProcessEvent API // V1ProcessEventReply is the reply for the ProcessEvent API
type V1ProcessEventReply struct { type V1ProcessEventReply struct {
AccountSUsage map[string]time.Duration `json:",omitempty"` AccountSUsage map[string]time.Duration `json:",omitempty"`
RateSCost map[string]float64 `json:",omitempty"` // Cost is the cost received from Rater, ignoring accounting part RateSCost map[string]float64 `json:",omitempty"`
ResourceAllocation map[string]string `json:",omitempty"` ResourceAllocation map[string]string `json:",omitempty"`
IPsAllocation map[string]*utils.AllocatedIP `json:",omitempty"`
Attributes map[string]*attributes.AttrSProcessEventReply `json:",omitempty"` Attributes map[string]*attributes.AttrSProcessEventReply `json:",omitempty"`
RouteProfiles map[string]routes.SortedRoutesList `json:",omitempty"` RouteProfiles map[string]routes.SortedRoutesList `json:",omitempty"`
ThresholdIDs map[string][]string `json:",omitempty"` ThresholdIDs map[string][]string `json:",omitempty"`

View File

@@ -953,6 +953,24 @@ func (sS *SessionS) processChargerS(ctx *context.Context, cgrEv *utils.CGREvent)
return return
} }
// authorizeIPs will authorize the event with IPs subsystem
func (sS *SessionS) authorizeIPs(ctx *context.Context, cgrEv *utils.CGREvent) (rply *utils.AllocatedIP, err error) {
if len(sS.cfg.SessionSCfg().IPsConns) == 0 {
err = errors.New("IPs is disabled")
return
}
var alcIP utils.AllocatedIP
if err = sS.connMgr.Call(ctx, sS.cfg.SessionSCfg().IPsConns,
utils.IPsV1AuthorizeIP,
cgrEv, &alcIP); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s could not authorize IP for event: %+v",
utils.SessionS, err.Error(), cgrEv))
}
return &alcIP, nil
}
// getSessions is used to return in a thread-safe manner active or passive sessions // getSessions is used to return in a thread-safe manner active or passive sessions
func (sS *SessionS) getSessions(originID string, pSessions bool) (ss []*Session) { func (sS *SessionS) getSessions(originID string, pSessions bool) (ss []*Session) {
ssMux := &sS.aSsMux // get the pointer so we don't copy, otherwise locks will not work ssMux := &sS.aSsMux // get the pointer so we don't copy, otherwise locks will not work

View File

@@ -838,7 +838,7 @@ const (
MetaRadDMRTemplate = "*radDMRTemplate" MetaRadDMRTemplate = "*radDMRTemplate"
MetaCost = "*cost" MetaCost = "*cost"
MetaRateSCost = "*rateSCost" MetaRateSCost = "*rateSCost"
MetaAccountSCost = "*accountSCost" MetaAccountsCost = "*accountsCost"
MetaGroup = "*group" MetaGroup = "*group"
InternalRPCSet = "InternalRPCSet" InternalRPCSet = "InternalRPCSet"
MetaFileName = "*fileName" MetaFileName = "*fileName"