From 4a1f3144ba7f256b25f06d24a73e8a70088277eb Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Wed, 9 Jul 2025 00:53:54 +0300 Subject: [PATCH] sessions: partially reimplement TerminateSession to be able to release IP allocs --- sessions/sessions.go | 320 +++++++++++++++++++++++-------------------- 1 file changed, 170 insertions(+), 150 deletions(-) diff --git a/sessions/sessions.go b/sessions/sessions.go index 12dac6b06..01eb374bb 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -32,6 +32,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/routes" + "github.com/cgrates/guardian" "github.com/cgrates/cgrates/utils" ) @@ -1669,162 +1670,181 @@ func (sS *SessionS) BiRPCv1UpdateSession(ctx *context.Context, // BiRPCv1TerminateSession will stop debit loops as well as release any used resources func (sS *SessionS) BiRPCv1TerminateSession(ctx *context.Context, args *utils.CGREvent, rply *string) (err error) { - /* - if args == nil { - return utils.NewErrMandatoryIeMissing(utils.CGREventString) - } - if args.Event == nil { - return utils.NewErrMandatoryIeMissing(utils.Event) - } - var withErrors bool - if args.ID == "" { - args.ID = utils.GenUUID() - } - if args.Tenant == "" { - args.Tenant = sS.cfg.GeneralCfg().DefaultTenant - } - // RPC caching - if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { - cacheKey := utils.ConcatenatedKey(utils.SessionSv1TerminateSession, args.ID) - refID := guardian.Guardian.GuardIDs("", - sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic - defer guardian.Guardian.UnguardIDs(refID) + if args == nil { + return utils.NewErrMandatoryIeMissing(utils.CGREventString) + } + if args.Event == nil { + return utils.NewErrMandatoryIeMissing(utils.Event) + } + if args.ID == "" { + args.ID = utils.GenUUID() + } + if args.Tenant == "" { + args.Tenant = sS.cfg.GeneralCfg().DefaultTenant + } + // RPC caching + if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.SessionSv1TerminateSession, args.ID) + refID := guardian.Guardian.GuardIDs("", + sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) - if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { - cachedResp := itm.(*utils.CachedRPCResponse) - if cachedResp.Error == nil { - *rply = *cachedResp.Result.(*string) - } - return cachedResp.Error + if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *rply = *cachedResp.Result.(*string) } - defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey, - &utils.CachedRPCResponse{Result: rply, Error: err}, - nil, true, utils.NonTransactional) - } - // end of RPC caching - dP := args.AsDataProvider() - var resourcesRelease bool - if resourcesRelease, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.ResourcesRelease, - config.SessionsResourcesReleaseDftOpt, utils.OptsSesResourceSRelease); err != nil { - return - } - var termS bool - if termS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Terminate, - config.SessionsTerminateDftOpt, utils.OptsSesTerminate); err != nil { - return - } - if !(resourcesRelease || termS) { - return // nothing to do + return cachedResp.Error } + defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: rply, Error: err}, + nil, true, utils.NonTransactional) + } + // end of RPC caching - ev := engine.MapEvent(args.Event) - opts := engine.MapEvent(args.APIOpts) - optOriginID := GetSetOptsOriginID(ev, opts) - originID := ev.GetStringIgnoreErrors(utils.OriginID) - if termS { - if originID == "" { - return utils.NewErrMandatoryIeMissing(utils.OriginID) - } - var dbtItvl time.Duration - if dbtItvl, err = engine.GetDurationOpts(ctx, args.Tenant, args, sS.fltrS, sS.cfg.SessionSCfg().Opts.DebitInterval, - config.SessionsDebitIntervalDftOpt, utils.OptsSesDebitInterval); err != nil { - return err - } - var s *Session - fib := utils.FibDuration(time.Millisecond, 0) - var isInstantEvent bool // one time charging, do not perform indexing and sTerminator - var forceDuration bool - if forceDuration, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.ForceDuration, - config.SessionsForceDurationDftOpt, utils.OptsSesForceDuration); err != nil { - return - } - for i := 0; i < sS.cfg.SessionSCfg().TerminateAttempts; i++ { - if s = sS.getRelocateSession(ctx, optOriginID, - ev.GetStringIgnoreErrors(utils.InitialOriginID), - ev.GetStringIgnoreErrors(utils.OriginID), - ev.GetStringIgnoreErrors(utils.OriginHost)); s != nil { - break - } - if i+1 < sS.cfg.SessionSCfg().TerminateAttempts { // not last iteration - time.Sleep(fib()) - continue - } - isInstantEvent = true - if s, err = sS.initSession(ctx, args, sS.biJClntID(ctx.Client), ev.GetStringIgnoreErrors(utils.OriginID), - dbtItvl, isInstantEvent, forceDuration); err != nil { - return err //utils.NewErrRALs(err) - } - if _, err = sS.updateSession(ctx, s, ev, opts, isInstantEvent); err != nil { - return err - } - break - } - if !isInstantEvent { - s.UpdateSRuns(ev, sS.cfg.SessionSCfg().AlterableFields) - } - s.Lock() - if s.Chargeable, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Chargeable, - config.SessionsChargeableDftOpt, utils.OptsSesChargeable); err != nil { - return - } - s.Unlock() - if err = sS.terminateSession(ctx, s, - ev.GetDurationPtrIgnoreErrors(utils.Usage), - ev.GetDurationPtrIgnoreErrors(utils.LastUsed), - ev.GetTimePtrIgnoreErrors(utils.AnswerTime, utils.EmptyString), - isInstantEvent); err != nil { - return err //utils.NewErrRALs(err) - } + dP := args.AsDataProvider() + var ipsRelease bool + if ipsRelease, err = engine.GetBoolOpts(ctx, args.Tenant, dP, nil, sS.fltrS, sS.cfg.SessionSCfg().Opts.IPsAllocate, + utils.MetaIPs); err != nil { + return + } + // var resourcesRelease bool + // if resourcesRelease, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.ResourcesRelease, + // config.SessionsResourcesReleaseDftOpt, utils.OptsSesResourceSRelease); err != nil { + // return + // } + // var termS bool + // if termS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Terminate, + // config.SessionsTerminateDftOpt, utils.OptsSesTerminate); err != nil { + // return + // } + if !ipsRelease { // && !resourcesRelease && !termS + return // nothing to do + } + + // opts := engine.MapEvent(args.APIOpts) + // optOriginID := GetSetOptsOriginID(ev, opts) + originID, _ := args.OptAsString(utils.MetaOriginID) + if originID == "" { + return utils.NewErrMandatoryIeMissing(utils.OriginID) + } + // if termS { + // if originID == "" { + // return utils.NewErrMandatoryIeMissing(utils.OriginID) + // } + // var dbtItvl time.Duration + // if dbtItvl, err = engine.GetDurationOpts(ctx, args.Tenant, args, sS.fltrS, sS.cfg.SessionSCfg().Opts.DebitInterval, + // config.SessionsDebitIntervalDftOpt, utils.OptsSesDebitInterval); err != nil { + // return err + // } + // var s *Session + // fib := utils.FibDuration(time.Millisecond, 0) + // var isInstantEvent bool // one time charging, do not perform indexing and sTerminator + // var forceDuration bool + // if forceDuration, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.ForceDuration, + // config.SessionsForceDurationDftOpt, utils.OptsSesForceDuration); err != nil { + // return + // } + // for i := 0; i < sS.cfg.SessionSCfg().TerminateAttempts; i++ { + // if s = sS.getRelocateSession(ctx, optOriginID, + // ev.GetStringIgnoreErrors(utils.InitialOriginID), + // ev.GetStringIgnoreErrors(utils.OriginID), + // ev.GetStringIgnoreErrors(utils.OriginHost)); s != nil { + // break + // } + // if i+1 < sS.cfg.SessionSCfg().TerminateAttempts { // not last iteration + // time.Sleep(fib()) + // continue + // } + // isInstantEvent = true + // if s, err = sS.initSession(ctx, args, sS.biJClntID(ctx.Client), ev.GetStringIgnoreErrors(utils.OriginID), + // dbtItvl, isInstantEvent, forceDuration); err != nil { + // return err //utils.NewErrRALs(err) + // } + // if _, err = sS.updateSession(ctx, s, ev, opts, isInstantEvent); err != nil { + // return err + // } + // break + // } + // if !isInstantEvent { + // s.UpdateSRuns(ev, sS.cfg.SessionSCfg().AlterableFields) + // } + // s.Lock() + // if s.Chargeable, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Chargeable, + // config.SessionsChargeableDftOpt, utils.OptsSesChargeable); err != nil { + // return + // } + // s.Unlock() + // if err = sS.terminateSession(ctx, s, + // ev.GetDurationPtrIgnoreErrors(utils.Usage), + // ev.GetDurationPtrIgnoreErrors(utils.LastUsed), + // ev.GetTimePtrIgnoreErrors(utils.AnswerTime, utils.EmptyString), + // isInstantEvent); err != nil { + // return err //utils.NewErrRALs(err) + // } + // } + // if resourcesRelease { + // if len(sS.cfg.SessionSCfg().ResourceSConns) == 0 { + // return utils.NewErrNotConnected(utils.ResourceS) + // } + // if originID == "" { + // return utils.NewErrMandatoryIeMissing(utils.OriginID) + // } + // args.APIOpts[utils.OptsResourcesUsageID] = originID + // args.APIOpts[utils.OptsResourcesUnits] = 1 + // var reply string + // if err = sS.connMgr.Call(ctx, sS.cfg.SessionSCfg().ResourceSConns, utils.ResourceSv1ReleaseResources, + // args, &reply); err != nil { + // return utils.NewErrResourceS(err) + // } + // } + + if ipsRelease { + if len(sS.cfg.SessionSCfg().IPsConns) == 0 { + return utils.NewErrNotConnected(utils.IPs) } - if resourcesRelease { - if len(sS.cfg.SessionSCfg().ResourceSConns) == 0 { - return utils.NewErrNotConnected(utils.ResourceS) - } - if originID == "" { - return utils.NewErrMandatoryIeMissing(utils.OriginID) - } - args.APIOpts[utils.OptsResourcesUsageID] = originID - args.APIOpts[utils.OptsResourcesUnits] = 1 - var reply string - if err = sS.connMgr.Call(ctx, sS.cfg.SessionSCfg().ResourceSConns, utils.ResourceSv1ReleaseResources, - args, &reply); err != nil { - return utils.NewErrResourceS(err) - } + args.APIOpts[utils.OptsIPsAllocationID] = originID + var reply string + if err = sS.connMgr.Call(ctx, sS.cfg.SessionSCfg().IPsConns, utils.IPsV1ReleaseIP, + args, &reply); err != nil { + return utils.NewErrIPs(err) } - var thdS bool - if thdS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Thresholds, - config.SessionsThresholdsDftOpt, utils.MetaThresholds); err != nil { - return - } - if thdS { - _, err := sS.processThreshold(ctx, args, true) - if err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", - utils.SessionS, err.Error(), args)) - withErrors = true - } - } - var stS bool - if stS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Stats, - config.SessionsStatsDftOpt, utils.MetaStats); err != nil { - return - } - if stS { - _, err := sS.processStats(ctx, args, false) - if err != nil && - err.Error() != utils.ErrNotFound.Error() { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing event %+v with StatS.", - utils.SessionS, err.Error(), args)) - withErrors = true - } - } - if withErrors { - err = utils.ErrPartiallyExecuted - } - */ + } + + // var withErrors bool + // var thdS bool + // if thdS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Thresholds, + // config.SessionsThresholdsDftOpt, utils.MetaThresholds); err != nil { + // return + // } + // if thdS { + // _, err := sS.processThreshold(ctx, args, true) + // if err != nil && + // err.Error() != utils.ErrNotFound.Error() { + // utils.Logger.Warning( + // fmt.Sprintf("<%s> error: %s processing event %+v with ThresholdS.", + // utils.SessionS, err.Error(), args)) + // withErrors = true + // } + // } + // var stS bool + // if stS, err = engine.GetBoolOpts(ctx, args.Tenant, dP, sS.fltrS, sS.cfg.SessionSCfg().Opts.Stats, + // config.SessionsStatsDftOpt, utils.MetaStats); err != nil { + // return + // } + // if stS { + // _, err := sS.processStats(ctx, args, false) + // if err != nil && + // err.Error() != utils.ErrNotFound.Error() { + // utils.Logger.Warning( + // fmt.Sprintf("<%s> error: %s processing event %+v with StatS.", + // utils.SessionS, err.Error(), args)) + // withErrors = true + // } + // } + // if withErrors { + // err = utils.ErrPartiallyExecuted + // } *rply = utils.OK return }