From 932e0dba05fc46f12df0ded67dc32b0051db3293 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Fri, 27 Jun 2025 21:00:23 +0300 Subject: [PATCH] ips: match a single IPAllocations object --- apis/ips.go | 22 +++---- ips/apis.go | 54 ++++++++-------- ips/ips.go | 157 ++++++++++++++++++--------------------------- ips/ips_it_test.go | 6 +- utils/consts.go | 24 +++---- utils/ips.go | 4 ++ 6 files changed, 119 insertions(+), 148 deletions(-) diff --git a/apis/ips.go b/apis/ips.go index 4c56b16b6..1c5f6f2cd 100644 --- a/apis/ips.go +++ b/apis/ips.go @@ -187,27 +187,27 @@ type IPSv1 struct { ips *ips.IPService } -// V1GetIPAllocationsForEvent returns active IPs matching the event. -func (ipS *IPSv1) V1GetIPAllocationsForEvent(ctx *context.Context, args *utils.CGREvent, reply *ips.IPAllocationsList) (err error) { - return ipS.V1GetIPAllocationsForEvent(ctx, args, reply) +// V1GetIPAllocationForEvent returns the IPAllocations object matching the event. +func (ipS *IPSv1) V1GetIPAllocationForEvent(ctx *context.Context, args *utils.CGREvent, reply *utils.IPAllocations) error { + return ipS.ips.V1GetIPAllocationForEvent(ctx, args, reply) } -// V1AuthorizeIP queries service to find if an Usage is allowed -func (ipS *IPSv1) V1AuthorizeIP(ctx *context.Context, args *utils.CGREvent, reply *utils.AllocatedIP) (err error) { +// V1AuthorizeIP checks if it's able to allocate an IP address for the given event. +func (ipS *IPSv1) V1AuthorizeIP(ctx *context.Context, args *utils.CGREvent, reply *utils.AllocatedIP) error { return ipS.ips.V1AuthorizeIP(ctx, args, reply) } -// V1AllocateIP is called when an IP requires allocation. -func (ipS *IPSv1) V1AllocateIP(ctx *context.Context, args *utils.CGREvent, reply *utils.AllocatedIP) (err error) { +// V1AllocateIP allocates an IP address for the given event. +func (ipS *IPSv1) V1AllocateIP(ctx *context.Context, args *utils.CGREvent, reply *utils.AllocatedIP) error { return ipS.ips.V1AllocateIP(ctx, args, reply) } -// V1ReleaseIP is called when we need to clear an allocation -func (ipS *IPSv1) V1ReleaseIP(ctx *context.Context, args *utils.CGREvent, reply *string) (err error) { +// V1ReleaseIP releases an allocated IP address for the given event. +func (ipS *IPSv1) V1ReleaseIP(ctx *context.Context, args *utils.CGREvent, reply *string) error { return ipS.ips.V1ReleaseIP(ctx, args, reply) } -// V1GetIPAllocations returns all IP allocations for a tenant. -func (ipS *IPSv1) V1GetIPAllocations(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *utils.IPAllocations) (err error) { +// V1GetIPAllocations returns all IP allocations for a tenantID. +func (ipS *IPSv1) V1GetIPAllocations(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *utils.IPAllocations) error { return ipS.ips.V1GetIPAllocations(ctx, arg, reply) } diff --git a/ips/apis.go b/ips/apis.go index deb96f71c..383043a01 100644 --- a/ips/apis.go +++ b/ips/apis.go @@ -29,8 +29,8 @@ import ( "github.com/cgrates/guardian" ) -// V1GetIPAllocationsForEvent returns active IPs matching the event. -func (s *IPService) V1GetIPAllocationsForEvent(ctx *context.Context, args *utils.CGREvent, reply *IPAllocationsList) (err error) { +// V1GetIPAllocationForEvent returns the IPAllocations object matching the event. +func (s *IPService) V1GetIPAllocationForEvent(ctx *context.Context, args *utils.CGREvent, reply *utils.IPAllocations) (err error) { if args == nil { return utils.NewErrMandatoryIeMissing(utils.Event) } @@ -54,14 +54,14 @@ func (s *IPService) V1GetIPAllocationsForEvent(ctx *context.Context, args *utils // RPC caching if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { - cacheKey := utils.ConcatenatedKey(utils.IPsV1GetIPAllocationsForEvent, utils.ConcatenatedKey(tnt, args.ID)) + cacheKey := utils.ConcatenatedKey(utils.IPsV1GetIPAllocationForEvent, utils.ConcatenatedKey(tnt, args.ID)) refID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic defer guardian.Guardian.UnguardIDs(refID) if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { cachedResp := itm.(*utils.CachedRPCResponse) if cachedResp.Error == nil { - *reply = *cachedResp.Result.(*IPAllocationsList) + *reply = *cachedResp.Result.(*utils.IPAllocations) } return cachedResp.Error } @@ -71,12 +71,12 @@ func (s *IPService) V1GetIPAllocationsForEvent(ctx *context.Context, args *utils } // end of RPC caching - var allocsList IPAllocationsList - if allocsList, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil { + var allocs *utils.IPAllocations + if allocs, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil { return err } - defer allocsList.unlock() - *reply = allocsList + defer allocs.Unlock() + *reply = *allocs return } @@ -122,14 +122,14 @@ func (s *IPService) V1AuthorizeIP(ctx *context.Context, args *utils.CGREvent, re } // end of RPC caching - var allocsList IPAllocationsList - if allocsList, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil { + var allocs *utils.IPAllocations + if allocs, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil { return err } - defer allocsList.unlock() + defer allocs.Unlock() var allocIP *utils.AllocatedIP - if allocIP, err = s.allocateFirstAvailable(allocsList, allocID, true); err != nil { + if allocIP, err = s.allocateFromPools(allocs, allocID, true); err != nil { if errors.Is(err, utils.ErrIPAlreadyAllocated) { return utils.ErrIPUnauthorized } @@ -182,22 +182,22 @@ func (s *IPService) V1AllocateIP(ctx *context.Context, args *utils.CGREvent, rep } // end of RPC caching - var allocsList IPAllocationsList - if allocsList, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil { + var allocs *utils.IPAllocations + if allocs, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil { return err } - defer allocsList.unlock() + defer allocs.Unlock() - var result *utils.AllocatedIP - if result, err = s.allocateFirstAvailable(allocsList, allocID, false); err != nil { + var allocIP *utils.AllocatedIP + if allocIP, err = s.allocateFromPools(allocs, allocID, false); err != nil { return err } // index it for storing - if err = s.storeMatchedIPAllocations(ctx, allocsList); err != nil { + if err = s.storeMatchedIPAllocations(ctx, allocs); err != nil { return } - *reply = *result + *reply = *allocIP return nil } @@ -243,21 +243,19 @@ func (s *IPService) V1ReleaseIP(ctx *context.Context, args *utils.CGREvent, repl } // end of RPC caching - var allocsList IPAllocationsList - if allocsList, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil { + var allocs *utils.IPAllocations + if allocs, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil { return } - defer allocsList.unlock() + defer allocs.Unlock() - for _, alloc := range allocsList { - if err = alloc.ReleaseAllocation(allocID); err != nil { - utils.Logger.Warning(fmt.Sprintf( - "<%s> failed to remove allocation from IPAllocations with ID %q: %v", utils.IPs, alloc.TenantID(), err)) - } + if err = allocs.ReleaseAllocation(allocID); err != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> failed to remove allocation from IPAllocations with ID %q: %v", utils.IPs, allocs.TenantID(), err)) } // Handle storing - if err = s.storeMatchedIPAllocations(ctx, allocsList); err != nil { + if err = s.storeMatchedIPAllocations(ctx, allocs); err != nil { return } diff --git a/ips/ips.go b/ips/ips.go index 10e04710c..436141a68 100644 --- a/ips/ips.go +++ b/ips/ips.go @@ -34,28 +34,6 @@ import ( "github.com/cgrates/guardian" ) -// IPAllocationsList is a collection of ipAllocations objects. -type IPAllocationsList []*utils.IPAllocations - -// unlock will unlock IP allocations in this slice -func (al IPAllocationsList) unlock() { - for _, allocs := range al { - allocs.Unlock() - if prfl := allocs.Config(); prfl != nil { - prfl.Unlock() - } - } -} - -// ids returns a map of IP allocation IDs which is used for caching -func (al IPAllocationsList) ids() utils.StringSet { - ids := make(utils.StringSet) - for _, allocs := range al { - ids.Add(allocs.ID) - } - return ids -} - // IPService is the service handling IP allocations type IPService struct { dm *engine.DataManager // So we can load the data in cache and index it @@ -175,30 +153,26 @@ func (s *IPService) storeIPAllocations(ctx *context.Context, allocs *utils.IPAll } // storeMatchedIPAllocations will store the list of IP allocations based on the StoreInterval -func (s *IPService) storeMatchedIPAllocations(ctx *context.Context, matched IPAllocationsList) error { +func (s *IPService) storeMatchedIPAllocations(ctx *context.Context, matched *utils.IPAllocations) error { if s.cfg.IPsCfg().StoreInterval == 0 { return nil } if s.cfg.IPsCfg().StoreInterval > 0 { s.storedIPsMux.Lock() - defer s.storedIPsMux.Unlock() + s.storedIPs.Add(matched.TenantID()) + s.storedIPsMux.Unlock() + return nil } - for _, allocs := range matched { - if s.cfg.IPsCfg().StoreInterval > 0 { - s.storedIPs.Add(allocs.TenantID()) - continue - } - if err := s.storeIPAllocations(ctx, allocs); err != nil { - return err - } + if err := s.storeIPAllocations(ctx, matched); err != nil { + return err } return nil } -// matchingIPAllocationsForEvent returns ordered list of matching IP -// allocations which are active by the time of the API call. +// matchingIPAllocationsForEvent returns the IP allocation with the highest weight +// matching the event. func (s *IPService) matchingIPAllocationsForEvent(ctx *context.Context, tnt string, - ev *utils.CGREvent, evUUID string) (al IPAllocationsList, err error) { + ev *utils.CGREvent, evUUID string) (allocs *utils.IPAllocations, err error) { var itemIDs utils.StringSet evNm := utils.MapStorage{ utils.MetaReq: ev.Event, @@ -242,8 +216,8 @@ func (s *IPService) matchingIPAllocationsForEvent(ctx *context.Context, tnt stri return nil, err } } - al = make(IPAllocationsList, 0, len(itemIDs)) - weights := make(map[string]float64) // stores sorting weights by IP allocation ID + var matchedPrfl *utils.IPProfile + var maxWeight float64 for id := range itemIDs { lkPrflID := guardian.Guardian.GuardIDs("", config.CgrConfig().GeneralCfg().LockingTimeout, @@ -254,88 +228,83 @@ func (s *IPService) matchingIPAllocationsForEvent(ctx *context.Context, tnt stri if err == utils.ErrNotFound { continue } - al.unlock() return nil, err } prfl.Lock(lkPrflID) var pass bool if pass, err = s.fltrs.Pass(ctx, tnt, prfl.FilterIDs, evNm); err != nil { prfl.Unlock() - al.unlock() return nil, err } else if !pass { prfl.Unlock() continue } - lkID := guardian.Guardian.GuardIDs(utils.EmptyString, - config.CgrConfig().GeneralCfg().LockingTimeout, - utils.IPAllocationsLockKey(prfl.Tenant, prfl.ID)) - var allocs *utils.IPAllocations - if allocs, err = s.dm.GetIPAllocations(ctx, prfl.Tenant, prfl.ID, true, true, ""); err != nil { - guardian.Guardian.UnguardIDs(lkID) - prfl.Unlock() - al.unlock() - return nil, err - } - allocs.Lock(lkID) - - // Clone profile to avoid modifying cached version during pool sorting. - profileCopy := prfl.Clone() - if err = sortPools(ctx, profileCopy, s.fltrs, evNm); err != nil { - allocs.Unlock() - prfl.Unlock() - al.unlock() - return nil, err - } - - if err = allocs.ComputeUnexported(profileCopy); err != nil { - allocs.Unlock() - prfl.Unlock() - al.unlock() - return nil, err - } var weight float64 if weight, err = engine.WeightFromDynamics(ctx, prfl.Weights, s.fltrs, tnt, evNm); err != nil { - allocs.Unlock() prfl.Unlock() - al.unlock() return nil, err } - weights[allocs.ID] = weight - al = append(al, allocs) + if matchedPrfl == nil || maxWeight < weight { + if matchedPrfl != nil { + matchedPrfl.Unlock() + } + matchedPrfl = prfl + maxWeight = weight + } else { + prfl.Unlock() + } } - - if len(al) == 0 { + if matchedPrfl == nil { return nil, utils.ErrNotFound } - - // Sort by weight (higher values first). - slices.SortFunc(al, func(a, b *utils.IPAllocations) int { - return cmp.Compare(weights[b.ID], weights[a.ID]) - }) - - if err = engine.Cache.Set(ctx, utils.CacheEventIPs, evUUID, al.ids(), nil, - true, ""); err != nil { - al.unlock() + lkID := guardian.Guardian.GuardIDs(utils.EmptyString, + config.CgrConfig().GeneralCfg().LockingTimeout, + utils.IPAllocationsLockKey(matchedPrfl.Tenant, matchedPrfl.ID)) + allocs, err = s.dm.GetIPAllocations(ctx, matchedPrfl.Tenant, matchedPrfl.ID, true, true, "") + if err != nil { + guardian.Guardian.UnguardIDs(lkID) + matchedPrfl.Unlock() + return nil, err } - return al, nil + allocs.Lock(lkID) + + // Clone profile to avoid modifying cached version during pool sorting. + prfl := matchedPrfl.Clone() + if err = sortPools(ctx, prfl, s.fltrs, evNm); err != nil { + allocs.Unlock() + return nil, err + } + if err = allocs.ComputeUnexported(prfl); err != nil { + allocs.Unlock() + return nil, err + } + + if err = engine.Cache.Set(ctx, utils.CacheEventIPs, evUUID, + + // TODO: check if we still should rely on caching previously matched + // allocations for an allocationID. Currently setting a StringSet to + // maintain previous functionality, but this doesn't seem right. + utils.StringSet{allocs.ID: struct{}{}}, + + nil, true, ""); err != nil { + allocs.Unlock() + } + return allocs, nil } -// allocateFirstAvailable attempts IP allocation across pools in priority order. +// allocateFromPools attempts IP allocation across all pools in priority order. // Continues to next pool only if current pool returns ErrIPAlreadyAllocated. -// Returns first successful allocation or the last "already allocated" error. -func (s *IPService) allocateFirstAvailable(allocs IPAllocationsList, allocID string, +// Returns first successful allocation or the last allocation error. +func (s *IPService) allocateFromPools(allocs *utils.IPAllocations, allocID string, dryRun bool) (*utils.AllocatedIP, error) { var err error - for _, alloc := range allocs { - for _, pool := range alloc.Config().Pools { - var result *utils.AllocatedIP - if result, err = alloc.AllocateIPOnPool(allocID, pool, dryRun); err == nil { - return result, nil - } - if !errors.Is(err, utils.ErrIPAlreadyAllocated) { - return nil, err - } + for _, pool := range allocs.Config().Pools { + var result *utils.AllocatedIP + if result, err = allocs.AllocateIPOnPool(allocID, pool, dryRun); err == nil { + return result, nil + } + if !errors.Is(err, utils.ErrIPAlreadyAllocated) { + return nil, err } } return nil, err diff --git a/ips/ips_it_test.go b/ips/ips_it_test.go index ce3462c18..aed735cb7 100644 --- a/ips/ips_it_test.go +++ b/ips/ips_it_test.go @@ -227,8 +227,8 @@ cgrates.org,IPs2,*string:~*req.Account:1002,;20,2s,false,POOL1,*string:~*req.Des } allocID := "api_allocation" - var evIPs IPAllocationsList - if err := client.Call(context.Background(), utils.IPsV1GetIPAllocationsForEvent, + var evIP utils.IPAllocations + if err := client.Call(context.Background(), utils.IPsV1GetIPAllocationForEvent, &utils.CGREvent{ Tenant: "cgrates.org", ID: "GetIPsForEvent1", @@ -238,7 +238,7 @@ cgrates.org,IPs2,*string:~*req.Account:1002,;20,2s,false,POOL1,*string:~*req.Des APIOpts: map[string]any{ utils.OptsIPsAllocationID: allocID, }, - }, &evIPs); err != nil { + }, &evIP); err != nil { t.Error(err) } diff --git a/utils/consts.go b/utils/consts.go index 54db7592e..d1d95ee4a 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1639,18 +1639,18 @@ const ( // IPs APIs const ( - IPsV1Ping = "IPsV1.Ping" - IPsV1GetIPAllocations = "IPsV1.GetIPAllocations" - IPsV1GetIPAllocationsForEvent = "IPsV1.GetIPAllocationsForEvent" - IPsV1AuthorizeIP = "IPsV1.AuthorizeIP" - IPsV1AllocateIP = "IPsV1.AllocateIP" - IPsV1ReleaseIP = "IPsV1.ReleaseIP" - AdminSv1SetIPProfile = "AdminSv1.SetIPProfile" - AdminSv1GetIPProfiles = "AdminSv1.GetIPProfiles" - AdminSv1RemoveIPProfile = "AdminSv1.RemoveIPProfile" - AdminSv1GetIPProfile = "AdminSv1.GetIPProfile" - AdminSv1GetIPProfileIDs = "AdminSv1.GetIPProfileIDs" - AdminSv1GetIPProfilesCount = "AdminSv1.GetIPProfilesCount" + IPsV1Ping = "IPsV1.Ping" + IPsV1GetIPAllocations = "IPsV1.GetIPAllocations" + IPsV1GetIPAllocationForEvent = "IPsV1.GetIPAllocationForEvent" + IPsV1AuthorizeIP = "IPsV1.AuthorizeIP" + IPsV1AllocateIP = "IPsV1.AllocateIP" + IPsV1ReleaseIP = "IPsV1.ReleaseIP" + AdminSv1SetIPProfile = "AdminSv1.SetIPProfile" + AdminSv1GetIPProfiles = "AdminSv1.GetIPProfiles" + AdminSv1RemoveIPProfile = "AdminSv1.RemoveIPProfile" + AdminSv1GetIPProfile = "AdminSv1.GetIPProfile" + AdminSv1GetIPProfileIDs = "AdminSv1.GetIPProfileIDs" + AdminSv1GetIPProfilesCount = "AdminSv1.GetIPProfilesCount" ) // SessionS APIs diff --git a/utils/ips.go b/utils/ips.go index 781e0d277..86b9d2521 100644 --- a/utils/ips.go +++ b/utils/ips.go @@ -550,6 +550,10 @@ func (a *IPAllocations) Unlock() { id := a.lockID a.lockID = "" guardian.Guardian.UnguardIDs(id) + + if a.prfl != nil { + a.prfl.Unlock() + } } // Config returns the IPAllocations' profile configuration.