ips: match a single IPAllocations object

This commit is contained in:
ionutboangiu
2025-06-27 21:00:23 +03:00
committed by Dan Christian Bogos
parent 17b70d907b
commit 932e0dba05
6 changed files with 119 additions and 148 deletions

View File

@@ -187,27 +187,27 @@ type IPSv1 struct {
ips *ips.IPService
}
// V1GetIPAllocationsForEvent returns active IPs matching the event.
func (ipS *IPSv1) V1GetIPAllocationsForEvent(ctx *context.Context, args *utils.CGREvent, reply *ips.IPAllocationsList) (err error) {
return ipS.V1GetIPAllocationsForEvent(ctx, args, reply)
// V1GetIPAllocationForEvent returns the IPAllocations object matching the event.
func (ipS *IPSv1) V1GetIPAllocationForEvent(ctx *context.Context, args *utils.CGREvent, reply *utils.IPAllocations) error {
return ipS.ips.V1GetIPAllocationForEvent(ctx, args, reply)
}
// V1AuthorizeIP queries service to find if an Usage is allowed
func (ipS *IPSv1) V1AuthorizeIP(ctx *context.Context, args *utils.CGREvent, reply *utils.AllocatedIP) (err error) {
// V1AuthorizeIP checks if it's able to allocate an IP address for the given event.
func (ipS *IPSv1) V1AuthorizeIP(ctx *context.Context, args *utils.CGREvent, reply *utils.AllocatedIP) error {
return ipS.ips.V1AuthorizeIP(ctx, args, reply)
}
// V1AllocateIP is called when an IP requires allocation.
func (ipS *IPSv1) V1AllocateIP(ctx *context.Context, args *utils.CGREvent, reply *utils.AllocatedIP) (err error) {
// V1AllocateIP allocates an IP address for the given event.
func (ipS *IPSv1) V1AllocateIP(ctx *context.Context, args *utils.CGREvent, reply *utils.AllocatedIP) error {
return ipS.ips.V1AllocateIP(ctx, args, reply)
}
// V1ReleaseIP is called when we need to clear an allocation
func (ipS *IPSv1) V1ReleaseIP(ctx *context.Context, args *utils.CGREvent, reply *string) (err error) {
// V1ReleaseIP releases an allocated IP address for the given event.
func (ipS *IPSv1) V1ReleaseIP(ctx *context.Context, args *utils.CGREvent, reply *string) error {
return ipS.ips.V1ReleaseIP(ctx, args, reply)
}
// V1GetIPAllocations returns all IP allocations for a tenant.
func (ipS *IPSv1) V1GetIPAllocations(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *utils.IPAllocations) (err error) {
// V1GetIPAllocations returns all IP allocations for a tenantID.
func (ipS *IPSv1) V1GetIPAllocations(ctx *context.Context, arg *utils.TenantIDWithAPIOpts, reply *utils.IPAllocations) error {
return ipS.ips.V1GetIPAllocations(ctx, arg, reply)
}

View File

@@ -29,8 +29,8 @@ import (
"github.com/cgrates/guardian"
)
// V1GetIPAllocationsForEvent returns active IPs matching the event.
func (s *IPService) V1GetIPAllocationsForEvent(ctx *context.Context, args *utils.CGREvent, reply *IPAllocationsList) (err error) {
// V1GetIPAllocationForEvent returns the IPAllocations object matching the event.
func (s *IPService) V1GetIPAllocationForEvent(ctx *context.Context, args *utils.CGREvent, reply *utils.IPAllocations) (err error) {
if args == nil {
return utils.NewErrMandatoryIeMissing(utils.Event)
}
@@ -54,14 +54,14 @@ func (s *IPService) V1GetIPAllocationsForEvent(ctx *context.Context, args *utils
// RPC caching
if config.CgrConfig().CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.IPsV1GetIPAllocationsForEvent, utils.ConcatenatedKey(tnt, args.ID))
cacheKey := utils.ConcatenatedKey(utils.IPsV1GetIPAllocationForEvent, utils.ConcatenatedKey(tnt, args.ID))
refID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*reply = *cachedResp.Result.(*IPAllocationsList)
*reply = *cachedResp.Result.(*utils.IPAllocations)
}
return cachedResp.Error
}
@@ -71,12 +71,12 @@ func (s *IPService) V1GetIPAllocationsForEvent(ctx *context.Context, args *utils
}
// end of RPC caching
var allocsList IPAllocationsList
if allocsList, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil {
var allocs *utils.IPAllocations
if allocs, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil {
return err
}
defer allocsList.unlock()
*reply = allocsList
defer allocs.Unlock()
*reply = *allocs
return
}
@@ -122,14 +122,14 @@ func (s *IPService) V1AuthorizeIP(ctx *context.Context, args *utils.CGREvent, re
}
// end of RPC caching
var allocsList IPAllocationsList
if allocsList, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil {
var allocs *utils.IPAllocations
if allocs, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil {
return err
}
defer allocsList.unlock()
defer allocs.Unlock()
var allocIP *utils.AllocatedIP
if allocIP, err = s.allocateFirstAvailable(allocsList, allocID, true); err != nil {
if allocIP, err = s.allocateFromPools(allocs, allocID, true); err != nil {
if errors.Is(err, utils.ErrIPAlreadyAllocated) {
return utils.ErrIPUnauthorized
}
@@ -182,22 +182,22 @@ func (s *IPService) V1AllocateIP(ctx *context.Context, args *utils.CGREvent, rep
}
// end of RPC caching
var allocsList IPAllocationsList
if allocsList, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil {
var allocs *utils.IPAllocations
if allocs, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil {
return err
}
defer allocsList.unlock()
defer allocs.Unlock()
var result *utils.AllocatedIP
if result, err = s.allocateFirstAvailable(allocsList, allocID, false); err != nil {
var allocIP *utils.AllocatedIP
if allocIP, err = s.allocateFromPools(allocs, allocID, false); err != nil {
return err
}
// index it for storing
if err = s.storeMatchedIPAllocations(ctx, allocsList); err != nil {
if err = s.storeMatchedIPAllocations(ctx, allocs); err != nil {
return
}
*reply = *result
*reply = *allocIP
return nil
}
@@ -243,21 +243,19 @@ func (s *IPService) V1ReleaseIP(ctx *context.Context, args *utils.CGREvent, repl
}
// end of RPC caching
var allocsList IPAllocationsList
if allocsList, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil {
var allocs *utils.IPAllocations
if allocs, err = s.matchingIPAllocationsForEvent(ctx, tnt, args, allocID); err != nil {
return
}
defer allocsList.unlock()
defer allocs.Unlock()
for _, alloc := range allocsList {
if err = alloc.ReleaseAllocation(allocID); err != nil {
utils.Logger.Warning(fmt.Sprintf(
"<%s> failed to remove allocation from IPAllocations with ID %q: %v", utils.IPs, alloc.TenantID(), err))
}
if err = allocs.ReleaseAllocation(allocID); err != nil {
utils.Logger.Warning(fmt.Sprintf(
"<%s> failed to remove allocation from IPAllocations with ID %q: %v", utils.IPs, allocs.TenantID(), err))
}
// Handle storing
if err = s.storeMatchedIPAllocations(ctx, allocsList); err != nil {
if err = s.storeMatchedIPAllocations(ctx, allocs); err != nil {
return
}

View File

@@ -34,28 +34,6 @@ import (
"github.com/cgrates/guardian"
)
// IPAllocationsList is a collection of ipAllocations objects.
type IPAllocationsList []*utils.IPAllocations
// unlock will unlock IP allocations in this slice
func (al IPAllocationsList) unlock() {
for _, allocs := range al {
allocs.Unlock()
if prfl := allocs.Config(); prfl != nil {
prfl.Unlock()
}
}
}
// ids returns a map of IP allocation IDs which is used for caching
func (al IPAllocationsList) ids() utils.StringSet {
ids := make(utils.StringSet)
for _, allocs := range al {
ids.Add(allocs.ID)
}
return ids
}
// IPService is the service handling IP allocations
type IPService struct {
dm *engine.DataManager // So we can load the data in cache and index it
@@ -175,30 +153,26 @@ func (s *IPService) storeIPAllocations(ctx *context.Context, allocs *utils.IPAll
}
// storeMatchedIPAllocations will store the list of IP allocations based on the StoreInterval
func (s *IPService) storeMatchedIPAllocations(ctx *context.Context, matched IPAllocationsList) error {
func (s *IPService) storeMatchedIPAllocations(ctx *context.Context, matched *utils.IPAllocations) error {
if s.cfg.IPsCfg().StoreInterval == 0 {
return nil
}
if s.cfg.IPsCfg().StoreInterval > 0 {
s.storedIPsMux.Lock()
defer s.storedIPsMux.Unlock()
s.storedIPs.Add(matched.TenantID())
s.storedIPsMux.Unlock()
return nil
}
for _, allocs := range matched {
if s.cfg.IPsCfg().StoreInterval > 0 {
s.storedIPs.Add(allocs.TenantID())
continue
}
if err := s.storeIPAllocations(ctx, allocs); err != nil {
return err
}
if err := s.storeIPAllocations(ctx, matched); err != nil {
return err
}
return nil
}
// matchingIPAllocationsForEvent returns ordered list of matching IP
// allocations which are active by the time of the API call.
// matchingIPAllocationsForEvent returns the IP allocation with the highest weight
// matching the event.
func (s *IPService) matchingIPAllocationsForEvent(ctx *context.Context, tnt string,
ev *utils.CGREvent, evUUID string) (al IPAllocationsList, err error) {
ev *utils.CGREvent, evUUID string) (allocs *utils.IPAllocations, err error) {
var itemIDs utils.StringSet
evNm := utils.MapStorage{
utils.MetaReq: ev.Event,
@@ -242,8 +216,8 @@ func (s *IPService) matchingIPAllocationsForEvent(ctx *context.Context, tnt stri
return nil, err
}
}
al = make(IPAllocationsList, 0, len(itemIDs))
weights := make(map[string]float64) // stores sorting weights by IP allocation ID
var matchedPrfl *utils.IPProfile
var maxWeight float64
for id := range itemIDs {
lkPrflID := guardian.Guardian.GuardIDs("",
config.CgrConfig().GeneralCfg().LockingTimeout,
@@ -254,88 +228,83 @@ func (s *IPService) matchingIPAllocationsForEvent(ctx *context.Context, tnt stri
if err == utils.ErrNotFound {
continue
}
al.unlock()
return nil, err
}
prfl.Lock(lkPrflID)
var pass bool
if pass, err = s.fltrs.Pass(ctx, tnt, prfl.FilterIDs, evNm); err != nil {
prfl.Unlock()
al.unlock()
return nil, err
} else if !pass {
prfl.Unlock()
continue
}
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
utils.IPAllocationsLockKey(prfl.Tenant, prfl.ID))
var allocs *utils.IPAllocations
if allocs, err = s.dm.GetIPAllocations(ctx, prfl.Tenant, prfl.ID, true, true, ""); err != nil {
guardian.Guardian.UnguardIDs(lkID)
prfl.Unlock()
al.unlock()
return nil, err
}
allocs.Lock(lkID)
// Clone profile to avoid modifying cached version during pool sorting.
profileCopy := prfl.Clone()
if err = sortPools(ctx, profileCopy, s.fltrs, evNm); err != nil {
allocs.Unlock()
prfl.Unlock()
al.unlock()
return nil, err
}
if err = allocs.ComputeUnexported(profileCopy); err != nil {
allocs.Unlock()
prfl.Unlock()
al.unlock()
return nil, err
}
var weight float64
if weight, err = engine.WeightFromDynamics(ctx, prfl.Weights, s.fltrs, tnt, evNm); err != nil {
allocs.Unlock()
prfl.Unlock()
al.unlock()
return nil, err
}
weights[allocs.ID] = weight
al = append(al, allocs)
if matchedPrfl == nil || maxWeight < weight {
if matchedPrfl != nil {
matchedPrfl.Unlock()
}
matchedPrfl = prfl
maxWeight = weight
} else {
prfl.Unlock()
}
}
if len(al) == 0 {
if matchedPrfl == nil {
return nil, utils.ErrNotFound
}
// Sort by weight (higher values first).
slices.SortFunc(al, func(a, b *utils.IPAllocations) int {
return cmp.Compare(weights[b.ID], weights[a.ID])
})
if err = engine.Cache.Set(ctx, utils.CacheEventIPs, evUUID, al.ids(), nil,
true, ""); err != nil {
al.unlock()
lkID := guardian.Guardian.GuardIDs(utils.EmptyString,
config.CgrConfig().GeneralCfg().LockingTimeout,
utils.IPAllocationsLockKey(matchedPrfl.Tenant, matchedPrfl.ID))
allocs, err = s.dm.GetIPAllocations(ctx, matchedPrfl.Tenant, matchedPrfl.ID, true, true, "")
if err != nil {
guardian.Guardian.UnguardIDs(lkID)
matchedPrfl.Unlock()
return nil, err
}
return al, nil
allocs.Lock(lkID)
// Clone profile to avoid modifying cached version during pool sorting.
prfl := matchedPrfl.Clone()
if err = sortPools(ctx, prfl, s.fltrs, evNm); err != nil {
allocs.Unlock()
return nil, err
}
if err = allocs.ComputeUnexported(prfl); err != nil {
allocs.Unlock()
return nil, err
}
if err = engine.Cache.Set(ctx, utils.CacheEventIPs, evUUID,
// TODO: check if we still should rely on caching previously matched
// allocations for an allocationID. Currently setting a StringSet to
// maintain previous functionality, but this doesn't seem right.
utils.StringSet{allocs.ID: struct{}{}},
nil, true, ""); err != nil {
allocs.Unlock()
}
return allocs, nil
}
// allocateFirstAvailable attempts IP allocation across pools in priority order.
// allocateFromPools attempts IP allocation across all pools in priority order.
// Continues to next pool only if current pool returns ErrIPAlreadyAllocated.
// Returns first successful allocation or the last "already allocated" error.
func (s *IPService) allocateFirstAvailable(allocs IPAllocationsList, allocID string,
// Returns first successful allocation or the last allocation error.
func (s *IPService) allocateFromPools(allocs *utils.IPAllocations, allocID string,
dryRun bool) (*utils.AllocatedIP, error) {
var err error
for _, alloc := range allocs {
for _, pool := range alloc.Config().Pools {
var result *utils.AllocatedIP
if result, err = alloc.AllocateIPOnPool(allocID, pool, dryRun); err == nil {
return result, nil
}
if !errors.Is(err, utils.ErrIPAlreadyAllocated) {
return nil, err
}
for _, pool := range allocs.Config().Pools {
var result *utils.AllocatedIP
if result, err = allocs.AllocateIPOnPool(allocID, pool, dryRun); err == nil {
return result, nil
}
if !errors.Is(err, utils.ErrIPAlreadyAllocated) {
return nil, err
}
}
return nil, err

View File

@@ -227,8 +227,8 @@ cgrates.org,IPs2,*string:~*req.Account:1002,;20,2s,false,POOL1,*string:~*req.Des
}
allocID := "api_allocation"
var evIPs IPAllocationsList
if err := client.Call(context.Background(), utils.IPsV1GetIPAllocationsForEvent,
var evIP utils.IPAllocations
if err := client.Call(context.Background(), utils.IPsV1GetIPAllocationForEvent,
&utils.CGREvent{
Tenant: "cgrates.org",
ID: "GetIPsForEvent1",
@@ -238,7 +238,7 @@ cgrates.org,IPs2,*string:~*req.Account:1002,;20,2s,false,POOL1,*string:~*req.Des
APIOpts: map[string]any{
utils.OptsIPsAllocationID: allocID,
},
}, &evIPs); err != nil {
}, &evIP); err != nil {
t.Error(err)
}

View File

@@ -1639,18 +1639,18 @@ const (
// IPs APIs
const (
IPsV1Ping = "IPsV1.Ping"
IPsV1GetIPAllocations = "IPsV1.GetIPAllocations"
IPsV1GetIPAllocationsForEvent = "IPsV1.GetIPAllocationsForEvent"
IPsV1AuthorizeIP = "IPsV1.AuthorizeIP"
IPsV1AllocateIP = "IPsV1.AllocateIP"
IPsV1ReleaseIP = "IPsV1.ReleaseIP"
AdminSv1SetIPProfile = "AdminSv1.SetIPProfile"
AdminSv1GetIPProfiles = "AdminSv1.GetIPProfiles"
AdminSv1RemoveIPProfile = "AdminSv1.RemoveIPProfile"
AdminSv1GetIPProfile = "AdminSv1.GetIPProfile"
AdminSv1GetIPProfileIDs = "AdminSv1.GetIPProfileIDs"
AdminSv1GetIPProfilesCount = "AdminSv1.GetIPProfilesCount"
IPsV1Ping = "IPsV1.Ping"
IPsV1GetIPAllocations = "IPsV1.GetIPAllocations"
IPsV1GetIPAllocationForEvent = "IPsV1.GetIPAllocationForEvent"
IPsV1AuthorizeIP = "IPsV1.AuthorizeIP"
IPsV1AllocateIP = "IPsV1.AllocateIP"
IPsV1ReleaseIP = "IPsV1.ReleaseIP"
AdminSv1SetIPProfile = "AdminSv1.SetIPProfile"
AdminSv1GetIPProfiles = "AdminSv1.GetIPProfiles"
AdminSv1RemoveIPProfile = "AdminSv1.RemoveIPProfile"
AdminSv1GetIPProfile = "AdminSv1.GetIPProfile"
AdminSv1GetIPProfileIDs = "AdminSv1.GetIPProfileIDs"
AdminSv1GetIPProfilesCount = "AdminSv1.GetIPProfilesCount"
)
// SessionS APIs

View File

@@ -550,6 +550,10 @@ func (a *IPAllocations) Unlock() {
id := a.lockID
a.lockID = ""
guardian.Guardian.UnguardIDs(id)
if a.prfl != nil {
a.prfl.Unlock()
}
}
// Config returns the IPAllocations' profile configuration.