sessions: partially reimplement TerminateSession

to be able to release IP allocs
This commit is contained in:
ionutboangiu
2025-07-09 00:53:54 +03:00
committed by Dan Christian Bogos
parent 063fcbc138
commit 4a1f3144ba

View File

@@ -32,6 +32,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/routes"
"github.com/cgrates/guardian"
"github.com/cgrates/cgrates/utils"
)
@@ -1669,162 +1670,181 @@ func (sS *SessionS) BiRPCv1UpdateSession(ctx *context.Context,
// BiRPCv1TerminateSession will stop debit loops as well as release any used resources
func (sS *SessionS) BiRPCv1TerminateSession(ctx *context.Context,
args *utils.CGREvent, rply *string) (err error) {
/*
if args == nil {
return utils.NewErrMandatoryIeMissing(utils.CGREventString)
}
if args.Event == nil {
return utils.NewErrMandatoryIeMissing(utils.Event)
}
var withErrors bool
if args.ID == "" {
args.ID = utils.GenUUID()
}
if args.Tenant == "" {
args.Tenant = sS.cfg.GeneralCfg().DefaultTenant
}
// RPC caching
if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1TerminateSession, args.ID)
refID := guardian.Guardian.GuardIDs("",
sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if args == nil {
return utils.NewErrMandatoryIeMissing(utils.CGREventString)
}
if args.Event == nil {
return utils.NewErrMandatoryIeMissing(utils.Event)
}
if args.ID == "" {
args.ID = utils.GenUUID()
}
if args.Tenant == "" {
args.Tenant = sS.cfg.GeneralCfg().DefaultTenant
}
// RPC caching
if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1TerminateSession, args.ID)
refID := guardian.Guardian.GuardIDs("",
sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*rply = *cachedResp.Result.(*string)
}
return cachedResp.Error
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*rply = *cachedResp.Result.(*string)
}
defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: rply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
dP := args.AsDataProvider()
var resourcesRelease bool
if resourcesRelease, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.ResourcesRelease,
config.SessionsResourcesReleaseDftOpt, utils.OptsSesResourceSRelease); err != nil {
return
}
var termS bool
if termS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Terminate,
config.SessionsTerminateDftOpt, utils.OptsSesTerminate); err != nil {
return
}
if !(resourcesRelease || termS) {
return // nothing to do
return cachedResp.Error
}
defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: rply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
ev := engine.MapEvent(args.Event)
opts := engine.MapEvent(args.APIOpts)
optOriginID := GetSetOptsOriginID(ev, opts)
originID := ev.GetStringIgnoreErrors(utils.OriginID)
if termS {
if originID == "" {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
var dbtItvl time.Duration
if dbtItvl, err = engine.GetDurationOpts(ctx, args.Tenant, args, sS.fltrS, sS.cfg.SessionSCfg().Opts.DebitInterval,
config.SessionsDebitIntervalDftOpt, utils.OptsSesDebitInterval); err != nil {
return err
}
var s *Session
fib := utils.FibDuration(time.Millisecond, 0)
var isInstantEvent bool // one time charging, do not perform indexing and sTerminator
var forceDuration bool
if forceDuration, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.ForceDuration,
config.SessionsForceDurationDftOpt, utils.OptsSesForceDuration); err != nil {
return
}
for i := 0; i < sS.cfg.SessionSCfg().TerminateAttempts; i++ {
if s = sS.getRelocateSession(ctx, optOriginID,
ev.GetStringIgnoreErrors(utils.InitialOriginID),
ev.GetStringIgnoreErrors(utils.OriginID),
ev.GetStringIgnoreErrors(utils.OriginHost)); s != nil {
break
}
if i+1 < sS.cfg.SessionSCfg().TerminateAttempts { // not last iteration
time.Sleep(fib())
continue
}
isInstantEvent = true
if s, err = sS.initSession(ctx, args, sS.biJClntID(ctx.Client), ev.GetStringIgnoreErrors(utils.OriginID),
dbtItvl, isInstantEvent, forceDuration); err != nil {
return err //utils.NewErrRALs(err)
}
if _, err = sS.updateSession(ctx, s, ev, opts, isInstantEvent); err != nil {
return err
}
break
}
if !isInstantEvent {
s.UpdateSRuns(ev, sS.cfg.SessionSCfg().AlterableFields)
}
s.Lock()
if s.Chargeable, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Chargeable,
config.SessionsChargeableDftOpt, utils.OptsSesChargeable); err != nil {
return
}
s.Unlock()
if err = sS.terminateSession(ctx, s,
ev.GetDurationPtrIgnoreErrors(utils.Usage),
ev.GetDurationPtrIgnoreErrors(utils.LastUsed),
ev.GetTimePtrIgnoreErrors(utils.AnswerTime, utils.EmptyString),
isInstantEvent); err != nil {
return err //utils.NewErrRALs(err)
}
dP := args.AsDataProvider()
var ipsRelease bool
if ipsRelease, err = engine.GetBoolOpts(ctx, args.Tenant, dP, nil, sS.fltrS, sS.cfg.SessionSCfg().Opts.IPsAllocate,
utils.MetaIPs); err != nil {
return
}
// var resourcesRelease bool
// if resourcesRelease, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.ResourcesRelease,
// config.SessionsResourcesReleaseDftOpt, utils.OptsSesResourceSRelease); err != nil {
// return
// }
// var termS bool
// if termS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Terminate,
// config.SessionsTerminateDftOpt, utils.OptsSesTerminate); err != nil {
// return
// }
if !ipsRelease { // && !resourcesRelease && !termS
return // nothing to do
}
// opts := engine.MapEvent(args.APIOpts)
// optOriginID := GetSetOptsOriginID(ev, opts)
originID, _ := args.OptAsString(utils.MetaOriginID)
if originID == "" {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
// if termS {
// if originID == "" {
// return utils.NewErrMandatoryIeMissing(utils.OriginID)
// }
// var dbtItvl time.Duration
// if dbtItvl, err = engine.GetDurationOpts(ctx, args.Tenant, args, sS.fltrS, sS.cfg.SessionSCfg().Opts.DebitInterval,
// config.SessionsDebitIntervalDftOpt, utils.OptsSesDebitInterval); err != nil {
// return err
// }
// var s *Session
// fib := utils.FibDuration(time.Millisecond, 0)
// var isInstantEvent bool // one time charging, do not perform indexing and sTerminator
// var forceDuration bool
// if forceDuration, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.ForceDuration,
// config.SessionsForceDurationDftOpt, utils.OptsSesForceDuration); err != nil {
// return
// }
// for i := 0; i < sS.cfg.SessionSCfg().TerminateAttempts; i++ {
// if s = sS.getRelocateSession(ctx, optOriginID,
// ev.GetStringIgnoreErrors(utils.InitialOriginID),
// ev.GetStringIgnoreErrors(utils.OriginID),
// ev.GetStringIgnoreErrors(utils.OriginHost)); s != nil {
// break
// }
// if i+1 < sS.cfg.SessionSCfg().TerminateAttempts { // not last iteration
// time.Sleep(fib())
// continue
// }
// isInstantEvent = true
// if s, err = sS.initSession(ctx, args, sS.biJClntID(ctx.Client), ev.GetStringIgnoreErrors(utils.OriginID),
// dbtItvl, isInstantEvent, forceDuration); err != nil {
// return err //utils.NewErrRALs(err)
// }
// if _, err = sS.updateSession(ctx, s, ev, opts, isInstantEvent); err != nil {
// return err
// }
// break
// }
// if !isInstantEvent {
// s.UpdateSRuns(ev, sS.cfg.SessionSCfg().AlterableFields)
// }
// s.Lock()
// if s.Chargeable, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Chargeable,
// config.SessionsChargeableDftOpt, utils.OptsSesChargeable); err != nil {
// return
// }
// s.Unlock()
// if err = sS.terminateSession(ctx, s,
// ev.GetDurationPtrIgnoreErrors(utils.Usage),
// ev.GetDurationPtrIgnoreErrors(utils.LastUsed),
// ev.GetTimePtrIgnoreErrors(utils.AnswerTime, utils.EmptyString),
// isInstantEvent); err != nil {
// return err //utils.NewErrRALs(err)
// }
// }
// if resourcesRelease {
// if len(sS.cfg.SessionSCfg().ResourceSConns) == 0 {
// return utils.NewErrNotConnected(utils.ResourceS)
// }
// if originID == "" {
// return utils.NewErrMandatoryIeMissing(utils.OriginID)
// }
// args.APIOpts[utils.OptsResourcesUsageID] = originID
// args.APIOpts[utils.OptsResourcesUnits] = 1
// var reply string
// if err = sS.connMgr.Call(ctx, sS.cfg.SessionSCfg().ResourceSConns, utils.ResourceSv1ReleaseResources,
// args, &reply); err != nil {
// return utils.NewErrResourceS(err)
// }
// }
if ipsRelease {
if len(sS.cfg.SessionSCfg().IPsConns) == 0 {
return utils.NewErrNotConnected(utils.IPs)
}
if resourcesRelease {
if len(sS.cfg.SessionSCfg().ResourceSConns) == 0 {
return utils.NewErrNotConnected(utils.ResourceS)
}
if originID == "" {
return utils.NewErrMandatoryIeMissing(utils.OriginID)
}
args.APIOpts[utils.OptsResourcesUsageID] = originID
args.APIOpts[utils.OptsResourcesUnits] = 1
var reply string
if err = sS.connMgr.Call(ctx, sS.cfg.SessionSCfg().ResourceSConns, utils.ResourceSv1ReleaseResources,
args, &reply); err != nil {
return utils.NewErrResourceS(err)
}
args.APIOpts[utils.OptsIPsAllocationID] = originID
var reply string
if err = sS.connMgr.Call(ctx, sS.cfg.SessionSCfg().IPsConns, utils.IPsV1ReleaseIP,
args, &reply); err != nil {
return utils.NewErrIPs(err)
}
var thdS bool
if thdS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Thresholds,
config.SessionsThresholdsDftOpt, utils.MetaThresholds); err != nil {
return
}
if thdS {
_, err := sS.processThreshold(ctx, args, true)
if err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.",
utils.SessionS, err.Error(), args))
withErrors = true
}
}
var stS bool
if stS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Stats,
config.SessionsStatsDftOpt, utils.MetaStats); err != nil {
return
}
if stS {
_, err := sS.processStats(ctx, args, false)
if err != nil &&
err.Error() != utils.ErrNotFound.Error() {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing event %+v with StatS.",
utils.SessionS, err.Error(), args))
withErrors = true
}
}
if withErrors {
err = utils.ErrPartiallyExecuted
}
*/
}
// var withErrors bool
// var thdS bool
// if thdS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Thresholds,
// config.SessionsThresholdsDftOpt, utils.MetaThresholds); err != nil {
// return
// }
// if thdS {
// _, err := sS.processThreshold(ctx, args, true)
// if err != nil &&
// err.Error() != utils.ErrNotFound.Error() {
// utils.Logger.Warning(
// fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.",
// utils.SessionS, err.Error(), args))
// withErrors = true
// }
// }
// var stS bool
// if stS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Stats,
// config.SessionsStatsDftOpt, utils.MetaStats); err != nil {
// return
// }
// if stS {
// _, err := sS.processStats(ctx, args, false)
// if err != nil &&
// err.Error() != utils.ErrNotFound.Error() {
// utils.Logger.Warning(
// fmt.Sprintf("<%s> error: %s processing event %+v with StatS.",
// utils.SessionS, err.Error(), args))
// withErrors = true
// }
// }
// if withErrors {
// err = utils.ErrPartiallyExecuted
// }
*rply = utils.OK
return
}