3 Commits

Author SHA1 Message Date
arberkatellari
626346e19b Add options to support non-Amazon S3 and SQS 2026-01-30 13:22:15 +01:00
armirveliaj
84f0338605 avoid redundant AttributesWithName call in radiusDP 2026-01-30 12:59:01 +01:00
gezimbll
0400f4a177 added all new subsystems to load with tpreader 2026-01-30 12:56:32 +01:00
21 changed files with 968 additions and 33 deletions

View File

@@ -83,8 +83,8 @@ func (pk *radiusDP) FieldAsInterface(fldPath []string) (data any, err error) {
} else { } else {
return // data found in cache return // data found in cache
} }
if len(pk.req.AttributesWithName(fldPath[0], "")) != 0 { if attrs := pk.req.AttributesWithName(fldPath[0], ""); len(attrs) != 0 {
data = pk.req.AttributesWithName(fldPath[0], "")[0].GetStringValue() data = attrs[0].GetStringValue()
} }
pk.cache.Set(fldPath, data) pk.cache.Set(fldPath, data)
return return

View File

@@ -141,3 +141,47 @@ func TestRadiusDPFieldAsString(t *testing.T) {
t.Errorf("Expecting: flopsy, received: <%s>", data) t.Errorf("Expecting: flopsy, received: <%s>", data)
} }
} }
// TestRadiusDPFieldAsInterfaceCached verifies that the radius data provider
// resolves RADIUS attributes by name through FieldAsInterface, returns nil
// (without error) for attributes absent from the packet, and returns
// utils.ErrNotFound for multi-element paths it cannot navigate.
func TestRadiusDPFieldAsInterfaceCached(t *testing.T) {
// Build an accounting request and populate it with three known AVPs.
pkt := radigo.NewPacket(radigo.AccountingRequest, 1, dictRad, coder, "CGRateS.org")
if err := pkt.AddAVPWithName("User-Name", "cgr1", ""); err != nil {
t.Error(err)
}
if err := pkt.AddAVPWithName("Acct-Session-Time", "3600", ""); err != nil {
t.Error(err)
}
if err := pkt.AddAVPWithName("Password", "pass123", ""); err != nil {
t.Error(err)
}
dp := newRADataProvider(pkt)
// Each present attribute should be retrievable by its name.
if data, err := dp.FieldAsInterface([]string{"User-Name"}); err != nil {
t.Error(err)
} else if data != "cgr1" {
t.Errorf("Expecting: cgr1, received: <%v>", data)
}
if data, err := dp.FieldAsInterface([]string{"Acct-Session-Time"}); err != nil {
t.Error(err)
} else if data != "3600" {
t.Errorf("Expecting: 3600, received: <%v>", data)
}
if data, err := dp.FieldAsInterface([]string{"Password"}); err != nil {
t.Error(err)
} else if data != "pass123" {
t.Errorf("Expecting: pass123, received: <%v>", data)
}
// A missing attribute yields nil data and no error (nil gets cached too).
if data, err := dp.FieldAsInterface([]string{"Non-Existent-Field"}); err != nil {
t.Error(err)
} else if data != nil {
t.Errorf("Expecting: nil, received: <%v>", data)
}
// Paths longer than one element are unsupported by radiusDP.
if _, err := dp.FieldAsInterface([]string{"Field1", "Field2"}); err != utils.ErrNotFound {
t.Errorf("Expecting: ErrNotFound, received: <%v>", err)
}
}

View File

@@ -19,6 +19,9 @@ along with this program. If not, see <https://www.gnu.org/licenses/>
package apis package apis
import ( import (
"fmt"
"time"
"github.com/cgrates/birpc/context" "github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/trends" "github.com/cgrates/cgrates/trends"
"github.com/cgrates/cgrates/utils" "github.com/cgrates/cgrates/utils"
@@ -129,6 +132,21 @@ func (adms *AdminSv1) SetTrendProfile(ctx *context.Context, arg *utils.TrendProf
if err = adms.dm.SetTrendProfile(ctx, arg.TrendProfile); err != nil { if err = adms.dm.SetTrendProfile(ctx, arg.TrendProfile); err != nil {
return utils.APIErrorHandler(err) return utils.APIErrorHandler(err)
} }
//generate a loadID for CacheTrendProfiles and store it in database
loadID := time.Now().UnixNano()
if err = adms.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheTrendProfiles: loadID}); err != nil {
return utils.APIErrorHandler(err)
}
// delay if needed before cache call
if adms.cfg.GeneralCfg().CachingDelay != 0 {
utils.Logger.Info(fmt.Sprintf("<AdminSv1.SetTrendProfile> Delaying cache call for %v", adms.cfg.GeneralCfg().CachingDelay))
time.Sleep(adms.cfg.GeneralCfg().CachingDelay)
}
//handle caching for TrendProfile
if err = adms.CallCache(ctx, utils.IfaceAsString(arg.APIOpts[utils.MetaCache]), arg.Tenant, utils.CacheTrendProfiles,
arg.TenantID(), utils.EmptyString, nil, arg.APIOpts); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK *reply = utils.OK
return nil return nil
} }
@@ -145,6 +163,21 @@ func (adms *AdminSv1) RemoveTrendProfile(ctx *context.Context, args *utils.Tenan
if err := adms.dm.RemoveTrendProfile(ctx, tnt, args.ID); err != nil { if err := adms.dm.RemoveTrendProfile(ctx, tnt, args.ID); err != nil {
return utils.APIErrorHandler(err) return utils.APIErrorHandler(err)
} }
// delay if needed before cache call
if adms.cfg.GeneralCfg().CachingDelay != 0 {
utils.Logger.Info(fmt.Sprintf("<AdminSv1.RemoveTrendProfile> Delaying cache call for %v", adms.cfg.GeneralCfg().CachingDelay))
time.Sleep(adms.cfg.GeneralCfg().CachingDelay)
}
//handle caching for TrendProfile
if err := adms.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), tnt, utils.CacheTrendProfiles,
utils.ConcatenatedKey(tnt, args.ID), utils.EmptyString, nil, args.APIOpts); err != nil {
return utils.APIErrorHandler(err)
}
//generate a loadID for CacheTrendProfiles and store it in database
loadID := time.Now().UnixNano()
if err := adms.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheTrendProfiles: loadID}); err != nil {
return utils.APIErrorHandler(err)
}
*reply = utils.OK *reply = utils.OK
return nil return nil
} }

View File

@@ -477,9 +477,13 @@ const CGRATES_CFG_JSON = `
// SQS // SQS
// "sqsQueueID": "cgrates_cdrs", // the queue id for SQS readers from where the events are read // "sqsQueueID": "cgrates_cdrs", // the queue id for SQS readers from where the events are read
// "sqsForcePathStyle": false, // when true, force the request to use path-style addressing, i.e., http://s3.amazonaws.com/BUCKET/KEY. If false, virtual-hosted style is used (http://BUCKET.s3.amazonaws.com/KEY)
// "sqsSkipTlsVerify": false, // if enabled, the HTTP client will accept any TLS certificate
// S3 // S3
// "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from where the events are read // "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from where the events are read
// "s3ForcePathStyle": false, // when true, force the request to use path-style addressing, i.e., http://s3.amazonaws.com/BUCKET/KEY. If false, virtual-hosted style is used (http://BUCKET.s3.amazonaws.com/KEY)
// "s3SkipTlsVerify": false, // if enabled, the HTTP client will accept any TLS certificate
// nats // nats
// "natsJetStream": false, // controls if the nats reader uses the JetStream // "natsJetStream": false, // controls if the nats reader uses the JetStream

View File

@@ -210,8 +210,12 @@ type EventExporterOpts struct {
AWSSecret *string AWSSecret *string
AWSToken *string AWSToken *string
SQSQueueID *string SQSQueueID *string
SQSForcePathStyle *bool
SQSSkipTlsVerify *bool
S3BucketID *string S3BucketID *string
S3FolderPath *string S3FolderPath *string
S3ForcePathStyle *bool
S3SkipTlsVerify *bool
NATSJetStream *bool NATSJetStream *bool
NATSSubject *string NATSSubject *string
NATSJWTFile *string NATSJWTFile *string
@@ -435,12 +439,24 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
if jsnCfg.SQSQueueID != nil { if jsnCfg.SQSQueueID != nil {
eeOpts.SQSQueueID = jsnCfg.SQSQueueID eeOpts.SQSQueueID = jsnCfg.SQSQueueID
} }
if jsnCfg.SQSForcePathStyle != nil {
eeOpts.SQSForcePathStyle = jsnCfg.SQSForcePathStyle
}
if jsnCfg.SQSSkipTlsVerify != nil {
eeOpts.SQSSkipTlsVerify = jsnCfg.SQSSkipTlsVerify
}
if jsnCfg.S3BucketID != nil { if jsnCfg.S3BucketID != nil {
eeOpts.S3BucketID = jsnCfg.S3BucketID eeOpts.S3BucketID = jsnCfg.S3BucketID
} }
if jsnCfg.S3FolderPath != nil { if jsnCfg.S3FolderPath != nil {
eeOpts.S3FolderPath = jsnCfg.S3FolderPath eeOpts.S3FolderPath = jsnCfg.S3FolderPath
} }
if jsnCfg.S3ForcePathStyle != nil {
eeOpts.S3ForcePathStyle = jsnCfg.S3ForcePathStyle
}
if jsnCfg.S3SkipTlsVerify != nil {
eeOpts.S3SkipTlsVerify = jsnCfg.S3SkipTlsVerify
}
if jsnCfg.NATSJetStream != nil { if jsnCfg.NATSJetStream != nil {
eeOpts.NATSJetStream = jsnCfg.NATSJetStream eeOpts.NATSJetStream = jsnCfg.NATSJetStream
} }
@@ -805,6 +821,14 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts {
cln.SQSQueueID = new(string) cln.SQSQueueID = new(string)
*cln.SQSQueueID = *eeOpts.SQSQueueID *cln.SQSQueueID = *eeOpts.SQSQueueID
} }
if eeOpts.SQSForcePathStyle != nil {
cln.SQSForcePathStyle = new(bool)
*cln.SQSForcePathStyle = *eeOpts.SQSForcePathStyle
}
if eeOpts.SQSSkipTlsVerify != nil {
cln.SQSSkipTlsVerify = new(bool)
*cln.SQSSkipTlsVerify = *eeOpts.SQSSkipTlsVerify
}
if eeOpts.S3BucketID != nil { if eeOpts.S3BucketID != nil {
cln.S3BucketID = new(string) cln.S3BucketID = new(string)
*cln.S3BucketID = *eeOpts.S3BucketID *cln.S3BucketID = *eeOpts.S3BucketID
@@ -813,6 +837,14 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts {
cln.S3FolderPath = new(string) cln.S3FolderPath = new(string)
*cln.S3FolderPath = *eeOpts.S3FolderPath *cln.S3FolderPath = *eeOpts.S3FolderPath
} }
if eeOpts.S3ForcePathStyle != nil {
cln.S3ForcePathStyle = new(bool)
*cln.S3ForcePathStyle = *eeOpts.S3ForcePathStyle
}
if eeOpts.S3SkipTlsVerify != nil {
cln.S3SkipTlsVerify = new(bool)
*cln.S3SkipTlsVerify = *eeOpts.S3SkipTlsVerify
}
if eeOpts.NATSJetStream != nil { if eeOpts.NATSJetStream != nil {
cln.NATSJetStream = new(bool) cln.NATSJetStream = new(bool)
*cln.NATSJetStream = *eeOpts.NATSJetStream *cln.NATSJetStream = *eeOpts.NATSJetStream
@@ -1119,12 +1151,24 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]any {
if optsEes.SQSQueueID != nil { if optsEes.SQSQueueID != nil {
opts[utils.SQSQueueID] = *optsEes.SQSQueueID opts[utils.SQSQueueID] = *optsEes.SQSQueueID
} }
if optsEes.SQSForcePathStyle != nil {
opts[utils.SQSForcePathStyle] = *optsEes.SQSForcePathStyle
}
if optsEes.SQSSkipTlsVerify != nil {
opts[utils.SQSSkipTlsVerify] = *optsEes.SQSSkipTlsVerify
}
if optsEes.S3BucketID != nil { if optsEes.S3BucketID != nil {
opts[utils.S3Bucket] = *optsEes.S3BucketID opts[utils.S3Bucket] = *optsEes.S3BucketID
} }
if optsEes.S3FolderPath != nil { if optsEes.S3FolderPath != nil {
opts[utils.S3FolderPath] = *optsEes.S3FolderPath opts[utils.S3FolderPath] = *optsEes.S3FolderPath
} }
if optsEes.S3ForcePathStyle != nil {
opts[utils.S3ForcePathStyle] = *optsEes.S3ForcePathStyle
}
if optsEes.S3SkipTlsVerify != nil {
opts[utils.S3SkipTlsVerify] = *optsEes.S3SkipTlsVerify
}
if optsEes.NATSJetStream != nil { if optsEes.NATSJetStream != nil {
opts[utils.NatsJetStream] = *optsEes.NATSJetStream opts[utils.NatsJetStream] = *optsEes.NATSJetStream
} }
@@ -1231,8 +1275,12 @@ type EventExporterOptsJson struct {
AWSSecret *string `json:"awsSecret"` AWSSecret *string `json:"awsSecret"`
AWSToken *string `json:"awsToken"` AWSToken *string `json:"awsToken"`
SQSQueueID *string `json:"sqsQueueID"` SQSQueueID *string `json:"sqsQueueID"`
SQSForcePathStyle *bool `json:"sqsForcePathStyle"`
SQSSkipTlsVerify *bool `json:"sqsSkipTlsVerify"`
S3BucketID *string `json:"s3BucketID"` S3BucketID *string `json:"s3BucketID"`
S3FolderPath *string `json:"s3FolderPath"` S3FolderPath *string `json:"s3FolderPath"`
S3ForcePathStyle *bool `json:"s3ForcePathStyle"`
S3SkipTlsVerify *bool `json:"s3SkipTlsVerify"`
NATSJetStream *bool `json:"natsJetStream"` NATSJetStream *bool `json:"natsJetStream"`
NATSSubject *string `json:"natsSubject"` NATSSubject *string `json:"natsSubject"`
NATSJWTFile *string `json:"natsJWTFile"` NATSJWTFile *string `json:"natsJWTFile"`
@@ -1539,6 +1587,22 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte
} else { } else {
d.SQSQueueID = nil d.SQSQueueID = nil
} }
if v2.SQSForcePathStyle != nil {
if v1.SQSForcePathStyle == nil ||
*v1.SQSForcePathStyle != *v2.SQSForcePathStyle {
d.SQSForcePathStyle = v2.SQSForcePathStyle
}
} else {
d.SQSForcePathStyle = nil
}
if v2.SQSSkipTlsVerify != nil {
if v1.SQSSkipTlsVerify == nil ||
*v1.SQSSkipTlsVerify != *v2.SQSSkipTlsVerify {
d.SQSSkipTlsVerify = v2.SQSSkipTlsVerify
}
} else {
d.SQSSkipTlsVerify = nil
}
if v2.S3BucketID != nil { if v2.S3BucketID != nil {
if v1.S3BucketID == nil || if v1.S3BucketID == nil ||
*v1.S3BucketID != *v2.S3BucketID { *v1.S3BucketID != *v2.S3BucketID {
@@ -1555,6 +1619,22 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte
} else { } else {
d.S3FolderPath = nil d.S3FolderPath = nil
} }
if v2.S3ForcePathStyle != nil {
if v1.S3ForcePathStyle == nil ||
*v1.S3ForcePathStyle != *v2.S3ForcePathStyle {
d.S3ForcePathStyle = v2.S3ForcePathStyle
}
} else {
d.S3ForcePathStyle = nil
}
if v2.S3SkipTlsVerify != nil {
if v1.S3SkipTlsVerify == nil ||
*v1.S3SkipTlsVerify != *v2.S3SkipTlsVerify {
d.S3SkipTlsVerify = v2.S3SkipTlsVerify
}
} else {
d.S3SkipTlsVerify = nil
}
if v2.NATSJetStream != nil { if v2.NATSJetStream != nil {
if v1.NATSJetStream == nil || if v1.NATSJetStream == nil ||
*v1.NATSJetStream != *v2.NATSJetStream { *v1.NATSJetStream != *v2.NATSJetStream {

View File

@@ -1218,8 +1218,12 @@ func TestDiffEventExporterOptsJsonCfg(t *testing.T) {
AWSSecret: utils.StringPointer("aws_secret"), AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"), AWSToken: utils.StringPointer("aws_token"),
SQSQueueID: utils.StringPointer("sqs_queue_id"), SQSQueueID: utils.StringPointer("sqs_queue_id"),
SQSForcePathStyle: utils.BoolPointer(true),
SQSSkipTlsVerify: utils.BoolPointer(true),
S3BucketID: utils.StringPointer("s3_bucket_id"), S3BucketID: utils.StringPointer("s3_bucket_id"),
S3FolderPath: utils.StringPointer("s3_folder_path"), S3FolderPath: utils.StringPointer("s3_folder_path"),
S3ForcePathStyle: utils.BoolPointer(true),
S3SkipTlsVerify: utils.BoolPointer(true),
NATSJetStream: utils.BoolPointer(false), NATSJetStream: utils.BoolPointer(false),
NATSSubject: utils.StringPointer("ees_nats"), NATSSubject: utils.StringPointer("ees_nats"),
NATSJWTFile: utils.StringPointer("/path/to/jwt"), NATSJWTFile: utils.StringPointer("/path/to/jwt"),
@@ -1268,8 +1272,12 @@ func TestDiffEventExporterOptsJsonCfg(t *testing.T) {
AWSSecret: utils.StringPointer("aws_secret"), AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"), AWSToken: utils.StringPointer("aws_token"),
SQSQueueID: utils.StringPointer("sqs_queue_id"), SQSQueueID: utils.StringPointer("sqs_queue_id"),
SQSForcePathStyle: utils.BoolPointer(true),
SQSSkipTlsVerify: utils.BoolPointer(true),
S3BucketID: utils.StringPointer("s3_bucket_id"), S3BucketID: utils.StringPointer("s3_bucket_id"),
S3FolderPath: utils.StringPointer("s3_folder_path"), S3FolderPath: utils.StringPointer("s3_folder_path"),
S3ForcePathStyle: utils.BoolPointer(true),
S3SkipTlsVerify: utils.BoolPointer(true),
NATSJetStream: utils.BoolPointer(false), NATSJetStream: utils.BoolPointer(false),
NATSSubject: utils.StringPointer("ees_nats"), NATSSubject: utils.StringPointer("ees_nats"),
NATSJWTFile: utils.StringPointer("/path/to/jwt"), NATSJWTFile: utils.StringPointer("/path/to/jwt"),
@@ -1345,8 +1353,12 @@ func TestEventExporterOptsClone(t *testing.T) {
AWSSecret: utils.StringPointer("aws_secret"), AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"), AWSToken: utils.StringPointer("aws_token"),
SQSQueueID: utils.StringPointer("sqs_queue_id"), SQSQueueID: utils.StringPointer("sqs_queue_id"),
SQSForcePathStyle: utils.BoolPointer(true),
SQSSkipTlsVerify: utils.BoolPointer(true),
S3BucketID: utils.StringPointer("s3_bucket_id"), S3BucketID: utils.StringPointer("s3_bucket_id"),
S3FolderPath: utils.StringPointer("s3_folder_path"), S3FolderPath: utils.StringPointer("s3_folder_path"),
S3ForcePathStyle: utils.BoolPointer(true),
S3SkipTlsVerify: utils.BoolPointer(true),
NATSJetStream: utils.BoolPointer(false), NATSJetStream: utils.BoolPointer(false),
NATSSubject: utils.StringPointer("ees_nats"), NATSSubject: utils.StringPointer("ees_nats"),
NATSJWTFile: utils.StringPointer("/path/to/jwt"), NATSJWTFile: utils.StringPointer("/path/to/jwt"),
@@ -1397,8 +1409,12 @@ func TestEventExporterOptsClone(t *testing.T) {
AWSSecret: utils.StringPointer("aws_secret"), AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"), AWSToken: utils.StringPointer("aws_token"),
SQSQueueID: utils.StringPointer("sqs_queue_id"), SQSQueueID: utils.StringPointer("sqs_queue_id"),
SQSForcePathStyle: utils.BoolPointer(true),
SQSSkipTlsVerify: utils.BoolPointer(true),
S3BucketID: utils.StringPointer("s3_bucket_id"), S3BucketID: utils.StringPointer("s3_bucket_id"),
S3FolderPath: utils.StringPointer("s3_folder_path"), S3FolderPath: utils.StringPointer("s3_folder_path"),
S3ForcePathStyle: utils.BoolPointer(true),
S3SkipTlsVerify: utils.BoolPointer(true),
NATSJetStream: utils.BoolPointer(false), NATSJetStream: utils.BoolPointer(false),
NATSSubject: utils.StringPointer("ees_nats"), NATSSubject: utils.StringPointer("ees_nats"),
NATSJWTFile: utils.StringPointer("/path/to/jwt"), NATSJWTFile: utils.StringPointer("/path/to/jwt"),
@@ -1457,8 +1473,12 @@ func TestLoadFromJSONCfg(t *testing.T) {
AWSSecret: utils.StringPointer("aws_secret"), AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"), AWSToken: utils.StringPointer("aws_token"),
SQSQueueID: utils.StringPointer("sqs_queue_id"), SQSQueueID: utils.StringPointer("sqs_queue_id"),
SQSForcePathStyle: utils.BoolPointer(true),
SQSSkipTlsVerify: utils.BoolPointer(true),
S3BucketID: utils.StringPointer("s3_bucket_id"), S3BucketID: utils.StringPointer("s3_bucket_id"),
S3FolderPath: utils.StringPointer("s3_folder_path"), S3FolderPath: utils.StringPointer("s3_folder_path"),
S3ForcePathStyle: utils.BoolPointer(true),
S3SkipTlsVerify: utils.BoolPointer(true),
NATSJetStream: utils.BoolPointer(false), NATSJetStream: utils.BoolPointer(false),
NATSSubject: utils.StringPointer("ees_nats"), NATSSubject: utils.StringPointer("ees_nats"),
NATSJWTFile: utils.StringPointer("/path/to/jwt"), NATSJWTFile: utils.StringPointer("/path/to/jwt"),
@@ -1508,8 +1528,12 @@ func TestLoadFromJSONCfg(t *testing.T) {
AWSSecret: utils.StringPointer("aws_secret"), AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"), AWSToken: utils.StringPointer("aws_token"),
SQSQueueID: utils.StringPointer("sqs_queue_id"), SQSQueueID: utils.StringPointer("sqs_queue_id"),
SQSForcePathStyle: utils.BoolPointer(true),
SQSSkipTlsVerify: utils.BoolPointer(true),
S3BucketID: utils.StringPointer("s3_bucket_id"), S3BucketID: utils.StringPointer("s3_bucket_id"),
S3FolderPath: utils.StringPointer("s3_folder_path"), S3FolderPath: utils.StringPointer("s3_folder_path"),
S3ForcePathStyle: utils.BoolPointer(true),
S3SkipTlsVerify: utils.BoolPointer(true),
NATSJetStream: utils.BoolPointer(false), NATSJetStream: utils.BoolPointer(false),
NATSSubject: utils.StringPointer("ees_nats"), NATSSubject: utils.StringPointer("ees_nats"),
NATSJWTFile: utils.StringPointer("/path/to/jwt"), NATSJWTFile: utils.StringPointer("/path/to/jwt"),
@@ -1662,8 +1686,12 @@ func TestEEsAsMapInterface(t *testing.T) {
AWSSecret: utils.StringPointer("aws_secret"), AWSSecret: utils.StringPointer("aws_secret"),
AWSToken: utils.StringPointer("aws_token"), AWSToken: utils.StringPointer("aws_token"),
SQSQueueID: utils.StringPointer("sqs_queue_id"), SQSQueueID: utils.StringPointer("sqs_queue_id"),
SQSForcePathStyle: utils.BoolPointer(true),
SQSSkipTlsVerify: utils.BoolPointer(true),
S3BucketID: utils.StringPointer("s3_bucket_id"), S3BucketID: utils.StringPointer("s3_bucket_id"),
S3FolderPath: utils.StringPointer("s3_folder_path"), S3FolderPath: utils.StringPointer("s3_folder_path"),
S3ForcePathStyle: utils.BoolPointer(true),
S3SkipTlsVerify: utils.BoolPointer(true),
NATSJetStream: utils.BoolPointer(false), NATSJetStream: utils.BoolPointer(false),
NATSSubject: utils.StringPointer("ees_nats"), NATSSubject: utils.StringPointer("ees_nats"),
NATSJWTFile: utils.StringPointer("/path/to/jwt"), NATSJWTFile: utils.StringPointer("/path/to/jwt"),
@@ -1725,6 +1753,8 @@ func TestEEsAsMapInterface(t *testing.T) {
"rpcReplyTimeout": "2s", "rpcReplyTimeout": "2s",
"s3BucketID": "s3_bucket_id", "s3BucketID": "s3_bucket_id",
"s3FolderPath": "s3_folder_path", "s3FolderPath": "s3_folder_path",
"s3ForcePathStyle": true,
"s3SkipTlsVerify": true,
"serviceMethod": "service_method", "serviceMethod": "service_method",
"sqlConnMaxLifetime": "2s", "sqlConnMaxLifetime": "2s",
"sqlDBName": "cgrates", "sqlDBName": "cgrates",
@@ -1732,6 +1762,8 @@ func TestEEsAsMapInterface(t *testing.T) {
"sqlMaxOpenConns": 10, "sqlMaxOpenConns": 10,
"sqlTableName": "cdrs", "sqlTableName": "cdrs",
"sqsQueueID": "sqs_queue_id", "sqsQueueID": "sqs_queue_id",
"sqsForcePathStyle": true,
"sqsSkipTlsVerify": true,
"pgSSLMode": "sslm", "pgSSLMode": "sslm",
"kafkaTLS": false, "kafkaTLS": false,
"connIDs": []string{"testID"}, "connIDs": []string{"testID"},
@@ -1798,8 +1830,12 @@ func TestEescfgNewEventExporterCfg(t *testing.T) {
AWSSecret: &str, AWSSecret: &str,
AWSToken: &str, AWSToken: &str,
SQSQueueID: &str, SQSQueueID: &str,
SQSForcePathStyle: &bl,
SQSSkipTlsVerify: &bl,
S3BucketID: &str, S3BucketID: &str,
S3FolderPath: &str, S3FolderPath: &str,
S3ForcePathStyle: &bl,
S3SkipTlsVerify: &bl,
NATSJetStream: &bl, NATSJetStream: &bl,
NATSSubject: &str, NATSSubject: &str,
NATSJWTFile: &str, NATSJWTFile: &str,
@@ -1902,8 +1938,12 @@ func TestEescfgloadFromJSONCfg(t *testing.T) {
AWSSecret: &str, AWSSecret: &str,
AWSToken: &str, AWSToken: &str,
SQSQueueID: &str, SQSQueueID: &str,
SQSForcePathStyle: &bl,
SQSSkipTlsVerify: &bl,
S3BucketID: &str, S3BucketID: &str,
S3FolderPath: &str, S3FolderPath: &str,
S3ForcePathStyle: &bl,
S3SkipTlsVerify: &bl,
NATSJetStream: &bl, NATSJetStream: &bl,
NATSSubject: &str, NATSSubject: &str,
NATSJWTFile: &str, NATSJWTFile: &str,
@@ -1976,8 +2016,12 @@ func TestEescfgloadFromJSONCfg(t *testing.T) {
AWSSecret: &str, AWSSecret: &str,
AWSToken: &str, AWSToken: &str,
SQSQueueID: &str, SQSQueueID: &str,
SQSForcePathStyle: &bl,
SQSSkipTlsVerify: &bl,
S3BucketID: &str, S3BucketID: &str,
S3FolderPath: &str, S3FolderPath: &str,
S3ForcePathStyle: &bl,
S3SkipTlsVerify: &bl,
NATSJetStream: &bl, NATSJetStream: &bl,
NATSSubject: &str, NATSSubject: &str,
NATSJWTFile: &str, NATSJWTFile: &str,

View File

@@ -58,6 +58,25 @@
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"} {"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
] ]
}, },
{
"id": "s3_nuantix_test_file",
"type": "*s3JSONMap",
"export_path": "s3.eu-central-1.amazonaws.com",
"opts": {
"awsRegion": "eu-central-1",
"awsKey": "access key ID",
"awsSecret": "secret access key",
"s3BucketID": "cgrates-cdrs",
"s3ForcePathStyle": true,
"s3SkipTlsVerify": true
},
"attempts": 1,
"failed_posts_dir": "/var/spool/cgrates/failed_posts2",
"synchronous": true,
"fields":[
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
]
},
{ {
"id": "amqpv1_test_file", "id": "amqpv1_test_file",
"type": "*amqpv1JSONMap", "type": "*amqpv1JSONMap",

View File

@@ -20,7 +20,9 @@ package ees
import ( import (
"bytes" "bytes"
"crypto/tls"
"fmt" "fmt"
"net/http"
"sync" "sync"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
@@ -46,14 +48,16 @@ func NewS3EE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *S3EE {
// S3EE is a s3 poster // S3EE is a s3 poster
type S3EE struct { type S3EE struct {
awsRegion string awsRegion string
awsID string awsID string
awsKey string awsKey string
awsToken string awsToken string
bucket string bucket string
folderPath string folderPath string
session *session.Session forcePathStyle bool
up *s3manager.Uploader skipTlsVerify bool
session *session.Session
up *s3manager.Uploader
cfg *config.EventExporterCfg cfg *config.EventExporterCfg
em *utils.ExporterMetrics em *utils.ExporterMetrics
@@ -82,6 +86,12 @@ func (pstr *S3EE) parseOpts(opts *config.EventExporterOpts) {
if opts.AWSToken != nil { if opts.AWSToken != nil {
pstr.awsToken = *opts.AWSToken pstr.awsToken = *opts.AWSToken
} }
if opts.S3ForcePathStyle != nil {
pstr.forcePathStyle = *opts.S3ForcePathStyle
}
if opts.S3SkipTlsVerify != nil {
pstr.skipTlsVerify = *opts.S3SkipTlsVerify
}
} }
func (pstr *S3EE) Cfg() *config.EventExporterCfg { return pstr.cfg } func (pstr *S3EE) Cfg() *config.EventExporterCfg { return pstr.cfg }
@@ -98,6 +108,18 @@ func (pstr *S3EE) Connect() (err error) {
len(pstr.awsKey) != 0 { len(pstr.awsKey) != 0 {
cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken) cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
} }
if pstr.forcePathStyle {
cfg.S3ForcePathStyle = aws.Bool(true) // Required for custom S3-compatible endpoints
}
if pstr.skipTlsVerify {
cfg.HTTPClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true, // Equivalent to verify=False for self-signed certificates
},
},
}
}
pstr.session, err = session.NewSessionWithOptions( pstr.session, err = session.NewSessionWithOptions(
session.Options{ session.Options{
Config: cfg, Config: cfg,

View File

@@ -175,3 +175,112 @@ package ees
// t.Errorf("Expected: %q, received: %q", expected, rply) // t.Errorf("Expected: %q, received: %q", expected, rply)
// } // }
// } // }
// var (
// runS3NuantixTest = flag.Bool("nuantix", false, "Run the integration test for the S3 exporter from Nuantix")
// sTestsS3Nuantix = []func(t *testing.T){
// testS3LoadConfig,
// testS3ResetDataDB,
// testS3ResetStorDb,
// testS3StartEngine,
// testS3RPCConn,
// testS3NuantixExportEvent,
// testS3NuantixVerifyExport,
// testStopCgrEngine,
// }
// )
// func TestS3ExportNuantix(t *testing.T) {
// if !*runS3NuantixTest {
// t.SkipNow()
// }
// s3ConfDir = "ees_cloud"
// for _, stest := range sTestsS3Nuantix {
// t.Run(s3ConfDir, stest)
// }
// }
// func testS3NuantixExportEvent(t *testing.T) {
// cgrID = utils.Sha1("abcdefg", time.Unix(1383813745, 0).UTC().String())
// t.Log(cgrID)
// ev := &engine.CGREventWithEeIDs{
// EeIDs: []string{"s3_nuantix_test_file"},
// CGREvent: &utils.CGREvent{
// Tenant: "cgrates.org",
// ID: "dataEvent",
// Time: utils.TimePointer(time.Now()),
// Event: map[string]any{
// utils.CGRID: cgrID,
// utils.ToR: utils.MetaData,
// utils.OriginID: "abcdefg",
// utils.OriginHost: "192.168.1.1",
// utils.RequestType: utils.MetaRated,
// utils.Tenant: "AnotherTenant",
// utils.Category: "call", //for data CDR use different Tenant
// utils.AccountField: "1001",
// utils.Subject: "1001",
// utils.Destination: "1002",
// utils.SetupTime: time.Unix(1383813745, 0).UTC(),
// utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
// utils.Usage: 10 * time.Nanosecond,
// utils.RunID: utils.MetaDefault,
// utils.Cost: 0.012,
// },
// },
// }
// var reply map[string]utils.MapStorage
// if err := s3RPC.Call(context.Background(), utils.EeSv1ProcessEvent, ev, &reply); err != nil {
// t.Error(err)
// }
// time.Sleep(2 * time.Second)
// }
// func testS3NuantixVerifyExport(t *testing.T) {
// endpoint := "s3.eu-central-1.amazonaws.com"
// region := "eu-central-1"
// qname := "cgrates-cdrs"
// key := fmt.Sprintf("%s/%s:%s.json", "", cgrID, utils.MetaDefault)
// var sess *session.Session
// cfg := aws.Config{Endpoint: aws.String(endpoint)}
// cfg.Region = aws.String(region)
// cfg.Credentials = credentials.NewStaticCredentials(awsKey, awsSecret, "")
// var err error
// cfg.S3ForcePathStyle = aws.Bool(true)
// cfg.HTTPClient = &http.Client{
// Transport: &http.Transport{
// TLSClientConfig: &tls.Config{
// InsecureSkipVerify: true,
// },
// },
// }
// sess, err = session.NewSessionWithOptions(
// session.Options{
// Config: cfg,
// },
// )
// if err != nil {
// t.Error(err)
// }
// s3Clnt := s3.New(sess)
// s3Clnt.DeleteObject(&s3.DeleteObjectInput{})
// file := aws.NewWriteAtBuffer([]byte{})
// svc := s3manager.NewDownloader(sess)
// if _, err = svc.Download(file,
// &s3.GetObjectInput{
// Bucket: aws.String(qname),
// Key: aws.String(key),
// }); err != nil {
// t.Fatalf("Unable to download item %v", err)
// }
// expected := `{"Account":"1001","CGRID":"5a5ff7c40039976e1c2bd303dee45983267f6ed2","Category":"call","Destination":"1002","OriginID":"abcdefg","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"AnotherTenant","ToR":"*data"}`
// if rply := string(file.Bytes()); rply != expected {
// t.Errorf("Expected: %q, received: %q", expected, rply)
// }
// }

View File

@@ -19,6 +19,8 @@ along with this program. If not, see <https://www.gnu.org/licenses/>
package ees package ees
import ( import (
"crypto/tls"
"net/http"
"sync" "sync"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
@@ -44,14 +46,16 @@ func NewSQSee(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *SQSee {
// SQSee is a poster for sqs // SQSee is a poster for sqs
type SQSee struct { type SQSee struct {
awsRegion string awsRegion string
awsID string awsID string
awsKey string awsKey string
awsToken string awsToken string
queueURL *string queueURL *string
queueID string queueID string
session *session.Session forcePathStyle bool
svc *sqs.SQS skipTlsVerify bool
session *session.Session
svc *sqs.SQS
cfg *config.EventExporterCfg cfg *config.EventExporterCfg
em *utils.ExporterMetrics em *utils.ExporterMetrics
@@ -77,6 +81,12 @@ func (pstr *SQSee) parseOpts(opts *config.EventExporterOpts) {
if opts.AWSToken != nil { if opts.AWSToken != nil {
pstr.awsToken = *opts.AWSToken pstr.awsToken = *opts.AWSToken
} }
if opts.SQSForcePathStyle != nil {
pstr.forcePathStyle = *opts.SQSForcePathStyle
}
if opts.SQSSkipTlsVerify != nil {
pstr.skipTlsVerify = *opts.SQSSkipTlsVerify
}
} }
func (pstr *SQSee) Cfg() *config.EventExporterCfg { return pstr.cfg } func (pstr *SQSee) Cfg() *config.EventExporterCfg { return pstr.cfg }
@@ -93,6 +103,18 @@ func (pstr *SQSee) Connect() (err error) {
len(pstr.awsKey) != 0 { len(pstr.awsKey) != 0 {
cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken) cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
} }
if pstr.forcePathStyle {
cfg.S3ForcePathStyle = aws.Bool(true) // Required for custom S3-compatible endpoints
}
if pstr.skipTlsVerify {
cfg.HTTPClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true, // Equivalent to verify=False for self-signed certificates
},
},
}
}
pstr.session, err = session.NewSessionWithOptions( pstr.session, err = session.NewSessionWithOptions(
session.Options{ session.Options{
Config: cfg, Config: cfg,

View File

@@ -1589,7 +1589,7 @@ func (dm *DataManager) RemoveRankingProfile(ctx *context.Context, tenant, id str
if dm == nil { if dm == nil {
return utils.ErrNoDatabaseConn return utils.ErrNoDatabaseConn
} }
oldSgs, err := dm.GetRankingProfile(ctx, tenant, id, true, false, utils.NonTransactional) oldSgs, err := dm.GetRankingProfile(ctx, tenant, id, false, false, utils.NonTransactional)
if err != nil && err != utils.ErrNotFound { if err != nil && err != utils.ErrNotFound {
return err return err
} }
@@ -1613,7 +1613,7 @@ func (dm *DataManager) RemoveRankingProfile(ctx *context.Context, tenant, id str
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID, APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
dbCfg.RplCache, utils.EmptyString)}, itm) dbCfg.RplCache, utils.EmptyString)}, itm)
} }
return return dm.RemoveRanking(ctx, tenant, id)
} }
func (dm *DataManager) GetRanking(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rn *utils.Ranking, err error) { func (dm *DataManager) GetRanking(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rn *utils.Ranking, err error) {
tntID := utils.ConcatenatedKey(tenant, id) tntID := utils.ConcatenatedKey(tenant, id)

View File

@@ -26,6 +26,8 @@ import (
"github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils" "github.com/cgrates/cgrates/utils"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
) )
func TestLoaderCSV(t *testing.T) { func TestLoaderCSV(t *testing.T) {
@@ -85,6 +87,14 @@ func TestLoaderCSV(t *testing.T) {
#Tenant[0],Id[1],FilterIDs[2],Weights[3],TTL[4],Limit[5],AllocationMessage[6],Blocker[7],Stored[8],Thresholds[9] #Tenant[0],Id[1],FilterIDs[2],Weights[3],TTL[4],Limit[5],AllocationMessage[6],Blocker[7],Stored[8],Thresholds[9]
cgrates.org,ResGroup21,*string:~*req.Account:1001,;10,1s,2,call,true,true, cgrates.org,ResGroup21,*string:~*req.Account:1001,;10,1s,2,call,true,true,
cgrates.org,ResGroup22,*string:~*req.Account:dan,;10,3600s,2,premium_call,true,true, cgrates.org,ResGroup22,*string:~*req.Account:dan,;10,3600s,2,premium_call,true,true,
`
IPCSVContent := `
#Tenant[0],ID[1],FilterIDs[2],Weights[3],TTL[4],Stored[5],PoolID[6],PoolFilterIDs[7],PoolType[8],PoolRange[9],PoolStrategy[10],PoolMessage[11],PoolWeights[12],PoolBlockers[13]
cgrates.org,IPs1,*string:~*req.Account:1001,;10,1s,true,,,,,,,,
cgrates.org,IPs1,,,,,POOL1,*string:~*req.Destination:2001,*ipv4,172.16.1.1/32,*ascending,alloc_success,;15,
cgrates.org,IPs1,,,,,POOL1,,,,,,*exists:~*req.NeedMoreWeight:;50,*exists:~*req.ShouldBlock:;true
cgrates.org,IPs1,,,,,POOL2,*string:~*req.Destination:2002,*ipv4,192.168.122.1/32,*random,alloc_new,;25,;true
cgrates.org,IPs2,*string:~*req.Account:1002,;20,2s,false,POOL1,*string:~*req.Destination:3001,*ipv4,127.0.0.1/32,*descending,alloc_msg,;35,;true
` `
StatsCSVContent := ` StatsCSVContent := `
#Tenant[0],Id[1],FilterIDs[2],Weights[3],Blockers[4],QueueLength[5],TTL[6],MinItems[7],Stored[8],ThresholdIDs[9],Metrics[10],MetricFilterIDs[11],MetricBlockers[12] #Tenant[0],Id[1],FilterIDs[2],Weights[3],Blockers[4],QueueLength[5],TTL[6],MinItems[7],Stored[8],ThresholdIDs[9],Metrics[10],MetricFilterIDs[11],MetricBlockers[12]
@@ -171,7 +181,7 @@ cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;fals
} }
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: idb}, config.CgrConfig().DbCfg()) dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: idb}, config.CgrConfig().DbCfg())
csvr, err := NewTpReader(dbCM, NewStringCSVStorage(utils.CSVSep, csvr, err := NewTpReader(dbCM, NewStringCSVStorage(utils.CSVSep,
ResourcesCSVContent, StatsCSVContent, RankingsCSVContent, TrendsCSVContent, ThresholdsCSVContent, FiltersCSVContent, ResourcesCSVContent, IPCSVContent, StatsCSVContent, RankingsCSVContent, TrendsCSVContent, ThresholdsCSVContent, FiltersCSVContent,
RoutesCSVContent, AttributesCSVContent, ChargersCSVContent, DispatcherCSVContent, RoutesCSVContent, AttributesCSVContent, ChargersCSVContent, DispatcherCSVContent,
DispatcherHostCSVContent, RateProfileCSVContent, ActionProfileCSVContent, AccountCSVContent), testTPID, "", nil, nil) DispatcherHostCSVContent, RateProfileCSVContent, ActionProfileCSVContent, AccountCSVContent), testTPID, "", nil, nil)
if err != nil { if err != nil {
@@ -183,6 +193,9 @@ cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;fals
if err := csvr.LoadResourceProfiles(); err != nil { if err := csvr.LoadResourceProfiles(); err != nil {
log.Print("error in LoadResourceProfiles:", err) log.Print("error in LoadResourceProfiles:", err)
} }
if err := csvr.LoadIPs(); err != nil {
log.Print("error in LoadIPProfiles:")
}
if err := csvr.LoadStats(); err != nil { if err := csvr.LoadStats(); err != nil {
log.Print("error in LoadStats:", err) log.Print("error in LoadStats:", err)
} }
@@ -246,6 +259,85 @@ cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;fals
} }
}) })
t.Run("load IPProfiles", func(t *testing.T) {
eIPsProfiles := map[utils.TenantID]*utils.TPIPProfile{
{Tenant: "cgrates.org", ID: "IPs1"}: {
TPid: "LoaderCSVTests",
Tenant: "cgrates.org",
ID: "IPs1",
FilterIDs: []string{"*string:~*req.Account:1001"},
Weights: ";10",
TTL: "1s",
Stored: true,
Pools: []*utils.TPIPPool{
{
ID: "POOL1",
FilterIDs: []string{"*string:~*req.Destination:2001"},
Type: "*ipv4",
Range: "172.16.1.1/32",
Strategy: "*ascending",
Message: "alloc_success",
Weights: ";15",
Blockers: "",
},
{
ID: "POOL1",
FilterIDs: nil,
Type: "",
Range: "",
Strategy: "",
Message: "",
Weights: "*exists:~*req.NeedMoreWeight:;50",
Blockers: "*exists:~*req.ShouldBlock:;true",
},
{
ID: "POOL2",
FilterIDs: []string{"*string:~*req.Destination:2002"},
Type: "*ipv4",
Range: "192.168.122.1/32",
Strategy: "*random",
Message: "alloc_new",
Weights: ";25",
Blockers: ";true",
},
},
},
{Tenant: "cgrates.org", ID: "IPs2"}: {
TPid: "LoaderCSVTests",
Tenant: "cgrates.org",
ID: "IPs2",
FilterIDs: []string{"*string:~*req.Account:1002"},
Weights: ";20",
TTL: "2s",
Stored: false,
Pools: []*utils.TPIPPool{
{
ID: "POOL1",
FilterIDs: []string{"*string:~*req.Destination:3001"},
Type: "*ipv4",
Range: "127.0.0.1/32",
Strategy: "*descending",
Message: "alloc_msg",
Weights: ";35",
Blockers: ";true",
},
},
}}
if len(csvr.ipProfiles) != len(eIPsProfiles) {
t.Errorf("Failed to load IPProfiles: %s", utils.ToIJSON(csvr.ipProfiles))
}
for key, val := range eIPsProfiles {
if diff := cmp.Diff(val, csvr.ipProfiles[key], cmpopts.SortSlices(func(a, b *utils.TPIPPool) bool {
if a.ID != b.ID {
return a.ID < b.ID
}
return a.Weights < b.Weights
})); diff != "" {
t.Errorf("IPProfile mismatch (-want +got):\n%s", diff)
}
}
})
t.Run("load StatProfiles", func(t *testing.T) { t.Run("load StatProfiles", func(t *testing.T) {
eStats := map[utils.TenantID]*utils.TPStatProfile{ eStats := map[utils.TenantID]*utils.TPStatProfile{
{Tenant: "cgrates.org", ID: "TestStats"}: { {Tenant: "cgrates.org", ID: "TestStats"}: {

View File

@@ -310,6 +310,246 @@ func ResourceProfileToAPI(rp *utils.ResourceProfile) (tpRL *utils.TPResourceProf
return return
} }
// IPMdls is the database/CSV model collection for IP profiles; each
// element corresponds to one CSV row (see IPMdl).
type IPMdls []*IPMdl

// CSVHeader returns the header for csv fields as a slice of string.
// Column order must match the `index` tags on IPMdl.
func (tps IPMdls) CSVHeader() []string {
	return []string{
		"#" + utils.Tenant,
		utils.ID,
		utils.FilterIDs,
		utils.Weights,
		utils.TTL,
		utils.Stored,
		utils.PoolID,
		utils.PoolFilterIDs,
		utils.PoolType,
		utils.PoolRange,
		utils.PoolStrategy,
		utils.PoolMessage,
		utils.PoolWeights,
		utils.PoolBlockers,
	}
}
// AsTPIPs aggregates the flat CSV/DB rows into TPIPProfile objects,
// merging all rows that share the same tenant:ID key.
//
// Merge rules applied per row:
//   - pools are grouped per profile under a key of PoolID plus, when
//     PoolFilterIDs is set, a SHA1 of that filter set — so rows with the
//     same PoolID but different filters yield separate pool entries;
//   - profile-level TTL and Weights overwrite the previous value on every
//     row where they are non-empty; Stored is only promoted to true and
//     never reset back to false by a later row;
//   - FilterIDs are accumulated into a StringSet, so duplicates collapse
//     and the final ordering is whatever StringSet.AsSlice yields
//     (NOTE(review): presumably unordered — confirm if callers rely on order).
func (tps IPMdls) AsTPIPs() []*utils.TPIPProfile {
	filterMap := make(map[string]utils.StringSet)
	mst := make(map[string]*utils.TPIPProfile)
	poolMap := make(map[string]map[string]*utils.TPIPPool)
	for _, mdl := range tps {
		tenID := (&utils.TenantID{Tenant: mdl.Tenant, ID: mdl.ID}).TenantID()
		tpip, found := mst[tenID]
		if !found {
			// first row for this profile: seed the profile-level object
			tpip = &utils.TPIPProfile{
				TPid:   mdl.Tpid,
				Tenant: mdl.Tenant,
				ID:     mdl.ID,
				Stored: mdl.Stored,
			}
		}
		// Handle Pool
		if mdl.PoolID != utils.EmptyString {
			if _, has := poolMap[tenID]; !has {
				poolMap[tenID] = make(map[string]*utils.TPIPPool)
			}
			// key pools by ID (+ filter-set hash), so the same PoolID with
			// different filters produces distinct pool entries
			poolID := mdl.PoolID
			if mdl.PoolFilterIDs != utils.EmptyString {
				poolID = utils.ConcatenatedKey(poolID,
					utils.NewStringSet(strings.Split(mdl.PoolFilterIDs, utils.InfieldSep)).Sha1())
			}
			pool, found := poolMap[tenID][poolID]
			if !found {
				// new pool entry: take all pool-level columns from this row
				pool = &utils.TPIPPool{
					ID:       mdl.PoolID,
					Type:     mdl.PoolType,
					Range:    mdl.PoolRange,
					Strategy: mdl.PoolStrategy,
					Message:  mdl.PoolMessage,
					Weights:  mdl.PoolWeights,
					Blockers: mdl.PoolBlockers,
				}
			}
			if mdl.PoolFilterIDs != utils.EmptyString {
				poolFilterSplit := strings.Split(mdl.PoolFilterIDs, utils.InfieldSep)
				pool.FilterIDs = append(pool.FilterIDs, poolFilterSplit...)
			}
			poolMap[tenID][poolID] = pool
		}
		// Profile-level fields
		if mdl.TTL != utils.EmptyString {
			tpip.TTL = mdl.TTL
		}
		if mdl.Weights != "" {
			tpip.Weights = mdl.Weights
		}
		if mdl.Stored {
			tpip.Stored = mdl.Stored
		}
		if mdl.FilterIDs != utils.EmptyString {
			if _, has := filterMap[tenID]; !has {
				filterMap[tenID] = make(utils.StringSet)
			}
			filterMap[tenID].AddSlice(strings.Split(mdl.FilterIDs, utils.InfieldSep))
		}
		mst[tenID] = tpip
	}
	// Build result with Pools
	result := make([]*utils.TPIPProfile, len(mst))
	i := 0
	for tntID, tpip := range mst {
		result[i] = tpip
		for _, poolData := range poolMap[tntID] {
			result[i].Pools = append(result[i].Pools, poolData)
		}
		result[i].FilterIDs = filterMap[tntID].AsSlice()
		i++
	}
	return result
}
// APItoModelIP converts a TPIPProfile into its flat CSV/DB model rows.
// One IPMdl row is produced per pool; profile-level fields (FilterIDs,
// Weights, TTL) are written only on the first row, matching what
// IPMdls.AsTPIPs expects when reading the rows back. A profile without
// pools yields a single row carrying only the profile-level fields.
// Returns nil for a nil input.
func APItoModelIP(tp *utils.TPIPProfile) IPMdls {
	if tp == nil {
		return nil
	}
	// Handle case with no pools: one row, profile-level fields only.
	if len(tp.Pools) == 0 {
		return IPMdls{{
			Tpid:      tp.TPid,
			Tenant:    tp.Tenant,
			ID:        tp.ID,
			TTL:       tp.TTL,
			Stored:    tp.Stored,
			Weights:   tp.Weights,
			FilterIDs: strings.Join(tp.FilterIDs, utils.InfieldSep),
		}}
	}
	mdls := make(IPMdls, 0, len(tp.Pools))
	for i, pool := range tp.Pools {
		mdl := &IPMdl{
			Tpid:   tp.TPid,
			Tenant: tp.Tenant,
			ID:     tp.ID,
			Stored: tp.Stored,
		}
		if i == 0 {
			// Profile-level fields only on first row
			mdl.TTL = tp.TTL
			mdl.Weights = tp.Weights
			mdl.FilterIDs = strings.Join(tp.FilterIDs, utils.InfieldSep)
		}
		// Pool fields on every row
		mdl.PoolID = pool.ID
		mdl.PoolType = pool.Type
		mdl.PoolRange = pool.Range
		mdl.PoolStrategy = pool.Strategy
		mdl.PoolMessage = pool.Message
		mdl.PoolWeights = pool.Weights
		mdl.PoolBlockers = pool.Blockers
		mdl.PoolFilterIDs = strings.Join(pool.FilterIDs, utils.InfieldSep)
		mdls = append(mdls, mdl)
	}
	return mdls
}
// APItoIP builds an engine IPProfile from its tariff-plan representation,
// parsing the Weights/Blockers dynamic expressions and the TTL duration.
// It returns the first parsing error encountered, with a nil profile.
func APItoIP(tp *utils.TPIPProfile) (*utils.IPProfile, error) {
	var err error
	ipp := &utils.IPProfile{
		Tenant:    tp.Tenant,
		ID:        tp.ID,
		Stored:    tp.Stored,
		FilterIDs: make([]string, len(tp.FilterIDs)),
		Pools:     make([]*utils.IPPool, len(tp.Pools)),
	}
	copy(ipp.FilterIDs, tp.FilterIDs)
	if tp.Weights != utils.EmptyString {
		if ipp.Weights, err = utils.NewDynamicWeightsFromString(tp.Weights, utils.InfieldSep, utils.ANDSep); err != nil {
			return nil, err
		}
	}
	if tp.TTL != utils.EmptyString {
		if ipp.TTL, err = utils.ParseDurationWithNanosecs(tp.TTL); err != nil {
			return nil, err
		}
	}
	for idx, tpPool := range tp.Pools {
		pl := &utils.IPPool{
			ID:        tpPool.ID,
			FilterIDs: tpPool.FilterIDs,
			Type:      tpPool.Type,
			Range:     tpPool.Range,
			Strategy:  tpPool.Strategy,
			Message:   tpPool.Message,
		}
		if tpPool.Weights != utils.EmptyString {
			if pl.Weights, err = utils.NewDynamicWeightsFromString(tpPool.Weights, utils.InfieldSep, utils.ANDSep); err != nil {
				return nil, err
			}
		}
		if tpPool.Blockers != utils.EmptyString {
			if pl.Blockers, err = utils.NewDynamicBlockersFromString(tpPool.Blockers, utils.InfieldSep, utils.ANDSep); err != nil {
				return nil, err
			}
		}
		ipp.Pools[idx] = pl
	}
	return ipp, nil
}
// IPProfileToAPI converts an engine IPProfile back into its tariff-plan
// (API) representation, serializing the dynamic Weights/Blockers and the
// TTL. A zero TTL is left as the empty string.
func IPProfileToAPI(ipp *utils.IPProfile) *utils.TPIPProfile {
	out := &utils.TPIPProfile{
		Tenant:    ipp.Tenant,
		ID:        ipp.ID,
		FilterIDs: make([]string, len(ipp.FilterIDs)),
		Weights:   ipp.Weights.String(utils.InfieldSep, utils.ANDSep),
		Stored:    ipp.Stored,
		Pools:     make([]*utils.TPIPPool, len(ipp.Pools)),
	}
	copy(out.FilterIDs, ipp.FilterIDs)
	if ipp.TTL != 0 {
		out.TTL = ipp.TTL.String()
	}
	for idx, pl := range ipp.Pools {
		out.Pools[idx] = &utils.TPIPPool{
			ID:        pl.ID,
			FilterIDs: pl.FilterIDs,
			Type:      pl.Type,
			Range:     pl.Range,
			Strategy:  pl.Strategy,
			Message:   pl.Message,
			Weights:   pl.Weights.String(utils.InfieldSep, utils.ANDSep),
			Blockers:  pl.Blockers.String(utils.InfieldSep, utils.ANDSep),
		}
	}
	return out
}
type StatMdls []*StatMdl type StatMdls []*StatMdl
// CSVHeader return the header for csv fields as a slice of string // CSVHeader return the header for csv fields as a slice of string

View File

@@ -47,6 +47,30 @@ func (ResourceMdl) TableName() string {
return utils.TBLTPResources return utils.TBLTPResources
} }
// IPMdl is the gorm/CSV model for one IP-profile row. The `index` tags
// give the CSV column positions and `re` the accepted value pattern.
// Profile-level columns (FilterIDs..Stored) may repeat across rows of the
// same Tenant/ID, while the Pool* columns describe one pool per row; see
// IPMdls.AsTPIPs for the merge semantics.
type IPMdl struct {
	PK            uint `gorm:"primary_key"` // surrogate DB key, not a CSV column
	Tpid          string
	Tenant        string `index:"0" re:".*"`
	ID            string `index:"1" re:".*"`
	FilterIDs     string `index:"2" re:".*"`
	Weights       string `index:"3" re:".*"`
	TTL           string `index:"4" re:".*"`
	Stored        bool   `index:"5" re:".*"`
	PoolID        string `index:"6" re:".*"`
	PoolFilterIDs string `index:"7" re:".*"`
	PoolType      string `index:"8" re:".*"`
	PoolRange     string `index:"9" re:".*"`
	PoolStrategy  string `index:"10" re:".*"`
	PoolMessage   string `index:"11" re:".*"`
	PoolWeights   string `index:"12" re:".*"`
	PoolBlockers  string `index:"13" re:".*"`
	CreatedAt     time.Time // row creation timestamp (gorm-managed)
}

// TableName implements gorm's table-name convention, mapping IPMdl rows
// to the tariff-plan IPs table (utils.TBLTPIPs).
func (IPMdl) TableName() string {
	return utils.TBLTPIPs
}
type StatMdl struct { type StatMdl struct {
PK uint `gorm:"primary_key"` PK uint `gorm:"primary_key"`
Tpid string Tpid string

View File

@@ -45,6 +45,7 @@ type CSVStorage struct {
generator func() csvReaderCloser generator func() csvReaderCloser
// file names // file names
resProfilesFn []string resProfilesFn []string
ipsFn []string
statsFn []string statsFn []string
trendsFn []string trendsFn []string
rankingsFn []string rankingsFn []string
@@ -60,13 +61,14 @@ type CSVStorage struct {
// NewCSVStorage creates a CSV storege that takes the data from the paths specified // NewCSVStorage creates a CSV storege that takes the data from the paths specified
func NewCSVStorage(sep rune, func NewCSVStorage(sep rune,
resProfilesFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn, resProfilesFn, ipsFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn,
attributeProfilesFn, chargerProfilesFn, attributeProfilesFn, chargerProfilesFn,
rateProfilesFn, actionProfilesFn, accountsFn []string) *CSVStorage { rateProfilesFn, actionProfilesFn, accountsFn []string) *CSVStorage {
return &CSVStorage{ return &CSVStorage{
sep: sep, sep: sep,
generator: NewCsvFile, generator: NewCsvFile,
resProfilesFn: resProfilesFn, resProfilesFn: resProfilesFn,
ipsFn: ipsFn,
statsFn: statsFn, statsFn: statsFn,
rankingsFn: rankingsFn, rankingsFn: rankingsFn,
trendsFn: trendsFn, trendsFn: trendsFn,
@@ -88,6 +90,7 @@ func NewFileCSVStorage(sep rune, dataPath string) (*CSVStorage, error) {
return nil, fmt.Errorf("could not retrieve any folders from %q: %v", dataPath, err) return nil, fmt.Errorf("could not retrieve any folders from %q: %v", dataPath, err)
} }
resourcesPaths := appendName(allFoldersPath, utils.ResourcesCsv) resourcesPaths := appendName(allFoldersPath, utils.ResourcesCsv)
ipsPaths := appendName(allFoldersPath, utils.IPsCsv)
statsPaths := appendName(allFoldersPath, utils.StatsCsv) statsPaths := appendName(allFoldersPath, utils.StatsCsv)
rankingsPaths := appendName(allFoldersPath, utils.RankingsCsv) rankingsPaths := appendName(allFoldersPath, utils.RankingsCsv)
trendsPaths := appendName(allFoldersPath, utils.TrendsCsv) trendsPaths := appendName(allFoldersPath, utils.TrendsCsv)
@@ -101,6 +104,7 @@ func NewFileCSVStorage(sep rune, dataPath string) (*CSVStorage, error) {
accountsFn := appendName(allFoldersPath, utils.AccountsCsv) accountsFn := appendName(allFoldersPath, utils.AccountsCsv)
return NewCSVStorage(sep, return NewCSVStorage(sep,
resourcesPaths, resourcesPaths,
ipsPaths,
statsPaths, statsPaths,
rankingsPaths, rankingsPaths,
trendsPaths, trendsPaths,
@@ -117,11 +121,11 @@ func NewFileCSVStorage(sep rune, dataPath string) (*CSVStorage, error) {
// NewStringCSVStorage creates a csv storage from strings // NewStringCSVStorage creates a csv storage from strings
func NewStringCSVStorage(sep rune, func NewStringCSVStorage(sep rune,
resProfilesFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn, resProfilesFn, ipsFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn,
attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn, dispatcherHostsFn, attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn, dispatcherHostsFn,
rateProfilesFn, actionProfilesFn, accountsFn string) *CSVStorage { rateProfilesFn, actionProfilesFn, accountsFn string) *CSVStorage {
c := NewCSVStorage(sep, c := NewCSVStorage(sep,
[]string{resProfilesFn}, []string{statsFn}, []string{rankingsFn}, []string{trendsFn}, []string{thresholdsFn}, []string{filterFn}, []string{resProfilesFn}, []string{ipsFn}, []string{statsFn}, []string{rankingsFn}, []string{trendsFn}, []string{thresholdsFn}, []string{filterFn},
[]string{routeProfilesFn}, []string{attributeProfilesFn}, []string{chargerProfilesFn}, []string{routeProfilesFn}, []string{attributeProfilesFn}, []string{chargerProfilesFn},
[]string{rateProfilesFn}, []string{rateProfilesFn},
[]string{actionProfilesFn}, []string{accountsFn}) []string{actionProfilesFn}, []string{accountsFn})
@@ -147,6 +151,7 @@ func NewGoogleCSVStorage(sep rune, spreadsheetID string) (*CSVStorage, error) {
} }
c := NewCSVStorage(sep, c := NewCSVStorage(sep,
getIfExist(utils.ResourcesStr), getIfExist(utils.ResourcesStr),
getIfExist(utils.IPs),
getIfExist(utils.Stats), getIfExist(utils.Stats),
getIfExist(utils.Rankings), getIfExist(utils.Rankings),
getIfExist(utils.Trends), getIfExist(utils.Trends),
@@ -170,6 +175,7 @@ func NewGoogleCSVStorage(sep rune, spreadsheetID string) (*CSVStorage, error) {
// NewURLCSVStorage returns a CSVStorage that can parse URLs // NewURLCSVStorage returns a CSVStorage that can parse URLs
func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage { func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
var resourcesPaths []string var resourcesPaths []string
var ipsPaths []string
var statsPaths []string var statsPaths []string
var rankingsPaths []string var rankingsPaths []string
var trendsPaths []string var trendsPaths []string
@@ -185,6 +191,7 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
for _, baseURL := range strings.Split(dataPath, utils.InfieldSep) { for _, baseURL := range strings.Split(dataPath, utils.InfieldSep) {
if !strings.HasSuffix(baseURL, utils.CSVSuffix) { if !strings.HasSuffix(baseURL, utils.CSVSuffix) {
resourcesPaths = append(resourcesPaths, joinURL(baseURL, utils.ResourcesCsv)) resourcesPaths = append(resourcesPaths, joinURL(baseURL, utils.ResourcesCsv))
ipsPaths = append(ipsPaths, joinURL(baseURL, utils.IPsCsv))
statsPaths = append(statsPaths, joinURL(baseURL, utils.StatsCsv)) statsPaths = append(statsPaths, joinURL(baseURL, utils.StatsCsv))
rankingsPaths = append(rankingsPaths, joinURL(baseURL, utils.RankingsCsv)) rankingsPaths = append(rankingsPaths, joinURL(baseURL, utils.RankingsCsv))
trendsPaths = append(trendsPaths, joinURL(baseURL, utils.TrendsCsv)) trendsPaths = append(trendsPaths, joinURL(baseURL, utils.TrendsCsv))
@@ -201,6 +208,8 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
switch { switch {
case strings.HasSuffix(baseURL, utils.ResourcesCsv): case strings.HasSuffix(baseURL, utils.ResourcesCsv):
resourcesPaths = append(resourcesPaths, baseURL) resourcesPaths = append(resourcesPaths, baseURL)
case strings.HasSuffix(baseURL, utils.IPsCsv):
ipsPaths = append(ipsPaths, baseURL)
case strings.HasSuffix(baseURL, utils.StatsCsv): case strings.HasSuffix(baseURL, utils.StatsCsv):
statsPaths = append(statsPaths, baseURL) statsPaths = append(statsPaths, baseURL)
case strings.HasSuffix(baseURL, utils.RankingsCsv): case strings.HasSuffix(baseURL, utils.RankingsCsv):
@@ -229,6 +238,7 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
c := NewCSVStorage(sep, c := NewCSVStorage(sep,
resourcesPaths, resourcesPaths,
ipsPaths,
statsPaths, statsPaths,
rankingsPaths, rankingsPaths,
trendsPaths, trendsPaths,
@@ -320,6 +330,18 @@ func (csvs *CSVStorage) GetTPResources(tpid, tenant, id string) ([]*utils.TPReso
return tpResLimits.AsTPResources(), nil return tpResLimits.AsTPResources(), nil
} }
// GetTPIPs loads all IP-profile rows from the configured CSV sources,
// stamps each with the given tpid and returns them aggregated into
// TPIPProfile objects. The tenant and id arguments are unused by the CSV
// backend, mirroring the other GetTP* methods.
func (csvs *CSVStorage) GetTPIPs(tpid, tenant, id string) ([]*utils.TPIPProfile, error) {
	var rows IPMdls
	appendRow := func(tp any) {
		row := tp.(IPMdl)
		row.Tpid = tpid
		rows = append(rows, &row)
	}
	if err := csvs.proccesData(IPMdl{}, csvs.ipsFn, appendRow); err != nil {
		return nil, err
	}
	return rows.AsTPIPs(), nil
}
func (csvs *CSVStorage) GetTPStats(tpid, tenant, id string) ([]*utils.TPStatProfile, error) { func (csvs *CSVStorage) GetTPStats(tpid, tenant, id string) ([]*utils.TPStatProfile, error) {
var tpStats StatMdls var tpStats StatMdls
if err := csvs.proccesData(StatMdl{}, csvs.statsFn, func(tp any) { if err := csvs.proccesData(StatMdl{}, csvs.statsFn, func(tp any) {

View File

@@ -136,6 +136,7 @@ type LoadReader interface {
GetTpTableIds(string, string, []string, GetTpTableIds(string, string, []string,
map[string]string, *utils.PaginatorWithSearch) ([]string, error) map[string]string, *utils.PaginatorWithSearch) ([]string, error)
GetTPResources(string, string, string) ([]*utils.TPResourceProfile, error) GetTPResources(string, string, string) ([]*utils.TPResourceProfile, error)
GetTPIPs(string, string, string) ([]*utils.TPIPProfile, error)
GetTPStats(string, string, string) ([]*utils.TPStatProfile, error) GetTPStats(string, string, string) ([]*utils.TPStatProfile, error)
GetTPRankings(tpid, tenant, id string) ([]*utils.TPRankingProfile, error) GetTPRankings(tpid, tenant, id string) ([]*utils.TPRankingProfile, error)
GetTPTrends(tpid, tenant, id string) ([]*utils.TPTrendsProfile, error) GetTPTrends(tpid, tenant, id string) ([]*utils.TPTrendsProfile, error)

View File

@@ -36,6 +36,7 @@ type TpReader struct {
lr LoadReader lr LoadReader
resProfiles map[utils.TenantID]*utils.TPResourceProfile resProfiles map[utils.TenantID]*utils.TPResourceProfile
sqProfiles map[utils.TenantID]*utils.TPStatProfile sqProfiles map[utils.TenantID]*utils.TPStatProfile
ipProfiles map[utils.TenantID]*utils.TPIPProfile
trProfiles map[utils.TenantID]*utils.TPTrendsProfile trProfiles map[utils.TenantID]*utils.TPTrendsProfile
rgProfiles map[utils.TenantID]*utils.TPRankingProfile rgProfiles map[utils.TenantID]*utils.TPRankingProfile
thProfiles map[utils.TenantID]*utils.TPThresholdProfile thProfiles map[utils.TenantID]*utils.TPThresholdProfile
@@ -67,6 +68,7 @@ func NewTpReader(db *DBConnManager, lr LoadReader, tpid, timezone string,
func (tpr *TpReader) Init() { func (tpr *TpReader) Init() {
tpr.resProfiles = make(map[utils.TenantID]*utils.TPResourceProfile) tpr.resProfiles = make(map[utils.TenantID]*utils.TPResourceProfile)
tpr.ipProfiles = make(map[utils.TenantID]*utils.TPIPProfile)
tpr.sqProfiles = make(map[utils.TenantID]*utils.TPStatProfile) tpr.sqProfiles = make(map[utils.TenantID]*utils.TPStatProfile)
tpr.rgProfiles = make(map[utils.TenantID]*utils.TPRankingProfile) tpr.rgProfiles = make(map[utils.TenantID]*utils.TPRankingProfile)
tpr.thProfiles = make(map[utils.TenantID]*utils.TPThresholdProfile) tpr.thProfiles = make(map[utils.TenantID]*utils.TPThresholdProfile)
@@ -120,6 +122,26 @@ func (tpr *TpReader) LoadStats() error {
return tpr.LoadStatsFiltered("") return tpr.LoadStatsFiltered("")
} }
// LoadIPsFiltered reads the IP profiles matching tag from the load
// source, validates their inline filters and indexes them by TenantID
// into tpr.ipProfiles, replacing any previously loaded set.
func (tpr *TpReader) LoadIPsFiltered(tag string) (err error) {
	var tps []*utils.TPIPProfile
	if tps, err = tpr.lr.GetTPIPs(tpr.tpid, "", tag); err != nil {
		return
	}
	profiles := make(map[utils.TenantID]*utils.TPIPProfile, len(tps))
	for _, tpIP := range tps {
		if err = verifyInlineFilterS(tpIP.FilterIDs); err != nil {
			return
		}
		profiles[utils.TenantID{Tenant: tpIP.Tenant, ID: tpIP.ID}] = tpIP
	}
	tpr.ipProfiles = profiles
	return
}
// LoadIPs loads all IP profiles from the load source, without tag
// filtering (delegates to LoadIPsFiltered with an empty tag).
func (tpr *TpReader) LoadIPs() error {
	return tpr.LoadIPsFiltered("")
}
func (tpr *TpReader) LoadRankingsFiltered(tag string) error { func (tpr *TpReader) LoadRankingsFiltered(tag string) error {
tps, err := tpr.lr.GetTPRankings(tpr.tpid, "", tag) tps, err := tpr.lr.GetTPRankings(tpr.tpid, "", tag)
if err != nil { if err != nil {
@@ -330,6 +352,9 @@ func (tpr *TpReader) LoadAll() (err error) {
if err = tpr.LoadRankings(); err != nil && err.Error() != utils.NotFoundCaps { if err = tpr.LoadRankings(); err != nil && err.Error() != utils.NotFoundCaps {
return return
} }
if err = tpr.LoadIPs(); err != nil && err.Error() != utils.NotFoundCaps {
return
}
if err = tpr.LoadTrends(); err != nil && err.Error() != utils.NotFoundCaps { if err = tpr.LoadTrends(); err != nil && err.Error() != utils.NotFoundCaps {
return return
} }
@@ -404,6 +429,25 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
loadIDs[utils.CacheResourceProfiles] = loadID loadIDs[utils.CacheResourceProfiles] = loadID
loadIDs[utils.CacheResources] = loadID loadIDs[utils.CacheResources] = loadID
} }
if verbose {
log.Print("IPProfiles")
}
for _, tpIP := range tpr.ipProfiles {
var ip *utils.IPProfile
if ip, err = APItoIP(tpIP); err != nil {
return
}
if err = tpr.dm.SetIPProfile(context.TODO(), ip, true); err != nil {
return
}
if verbose {
log.Print("\t", ip.TenantID())
}
}
if len(tpr.ipProfiles) != 0 {
loadIDs[utils.CacheIPProfiles] = loadID
loadIDs[utils.CacheIPAllocations] = loadID
}
if verbose { if verbose {
log.Print("StatQueueProfiles:") log.Print("StatQueueProfiles:")
} }
@@ -594,6 +638,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
func (tpr *TpReader) ShowStatistics() { func (tpr *TpReader) ShowStatistics() {
// resource profiles // resource profiles
log.Print("ResourceProfiles: ", len(tpr.resProfiles)) log.Print("ResourceProfiles: ", len(tpr.resProfiles))
// ip profiles
log.Print("IPProfiels", len(tpr.ipProfiles))
// stats // stats
log.Print("Stats: ", len(tpr.sqProfiles)) log.Print("Stats: ", len(tpr.sqProfiles))
// thresholds // thresholds
@@ -627,7 +673,30 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) {
i++ i++
} }
return keys, nil return keys, nil
case utils.IPProfilesPrefix:
keys := make([]string, len(tpr.ipProfiles))
i := 0
for k := range tpr.ipProfiles {
keys[i] = k.TenantID()
i++
}
return keys, nil
case utils.TrendProfilePrefix:
keys := make([]string, len(tpr.trProfiles))
i := 0
for k := range tpr.trProfiles {
keys[i] = k.TenantID()
i++
}
return keys, nil
case utils.RankingProfilePrefix:
keys := make([]string, len(tpr.rgProfiles))
i := 0
for k := range tpr.rgProfiles {
keys[i] = k.TenantID()
i++
}
return keys, nil
case utils.StatQueueProfilePrefix: case utils.StatQueueProfilePrefix:
keys := make([]string, len(tpr.sqProfiles)) keys := make([]string, len(tpr.sqProfiles))
i := 0 i := 0
@@ -710,6 +779,17 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
log.Print("\t", utils.ConcatenatedKey(tpRsp.Tenant, tpRsp.ID)) log.Print("\t", utils.ConcatenatedKey(tpRsp.Tenant, tpRsp.ID))
} }
} }
if verbose {
log.Print("IPProfiles")
}
for _, tpIP := range tpr.ipProfiles {
if err = tpr.dm.RemoveIPProfile(context.TODO(), tpIP.Tenant, tpIP.ID, true); err != nil {
return
}
if verbose {
log.Print("\t", utils.ConcatenatedKey(tpIP.Tenant, tpIP.ID))
}
}
if verbose { if verbose {
log.Print("StatQueueProfiles:") log.Print("StatQueueProfiles:")
} }
@@ -732,6 +812,26 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
log.Print("\t", utils.ConcatenatedKey(tpTH.Tenant, tpTH.ID)) log.Print("\t", utils.ConcatenatedKey(tpTH.Tenant, tpTH.ID))
} }
} }
if verbose {
log.Print("TrendProfiles:")
}
for _, tpTr := range tpr.trProfiles {
if err = tpr.dm.RemoveTrendProfile(context.TODO(), tpTr.Tenant, tpTr.ID); err != nil {
return
}
log.Print("\t", utils.ConcatenatedKey(tpTr.Tenant, tpTr.ID))
}
if verbose {
log.Print("RankingProfiles:")
}
for _, tpRnk := range tpr.rgProfiles {
if err = tpr.dm.RemoveRankingProfile(context.TODO(), tpRnk.Tenant, tpRnk.ID); err != nil {
return
}
log.Print("\t", utils.ConcatenatedKey(tpRnk.Tenant, tpRnk.ID))
}
if verbose { if verbose {
log.Print("RouteProfiles:") log.Print("RouteProfiles:")
} }
@@ -829,6 +929,10 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
loadIDs[utils.CacheResourceProfiles] = loadID loadIDs[utils.CacheResourceProfiles] = loadID
loadIDs[utils.CacheResources] = loadID loadIDs[utils.CacheResources] = loadID
} }
if len(tpr.ipProfiles) != 0 {
loadIDs[utils.CacheIPProfiles] = loadID
loadIDs[utils.CacheIPAllocations] = loadID
}
if len(tpr.sqProfiles) != 0 { if len(tpr.sqProfiles) != 0 {
loadIDs[utils.CacheStatQueueProfiles] = loadID loadIDs[utils.CacheStatQueueProfiles] = loadID
loadIDs[utils.CacheStatQueues] = loadID loadIDs[utils.CacheStatQueues] = loadID
@@ -837,6 +941,14 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
loadIDs[utils.CacheThresholdProfiles] = loadID loadIDs[utils.CacheThresholdProfiles] = loadID
loadIDs[utils.CacheThresholds] = loadID loadIDs[utils.CacheThresholds] = loadID
} }
if len(tpr.trProfiles) != 0 {
loadIDs[utils.CacheTrendProfiles] = loadID
loadIDs[utils.CacheTrends] = loadID
}
if len(tpr.rgProfiles) != 0 {
loadIDs[utils.CacheRankingProfiles] = loadID
loadIDs[utils.CacheRankings] = loadID
}
if len(tpr.routeProfiles) != 0 { if len(tpr.routeProfiles) != 0 {
loadIDs[utils.CacheRouteProfiles] = loadID loadIDs[utils.CacheRouteProfiles] = loadID
} }
@@ -867,6 +979,9 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
rspIDs, _ := tpr.GetLoadedIds(utils.ResourceProfilesPrefix) rspIDs, _ := tpr.GetLoadedIds(utils.ResourceProfilesPrefix)
stqpIDs, _ := tpr.GetLoadedIds(utils.StatQueueProfilePrefix) stqpIDs, _ := tpr.GetLoadedIds(utils.StatQueueProfilePrefix)
trspfIDs, _ := tpr.GetLoadedIds(utils.ThresholdProfilePrefix) trspfIDs, _ := tpr.GetLoadedIds(utils.ThresholdProfilePrefix)
trnpfIDs, _ := tpr.GetLoadedIds(utils.TrendProfilePrefix)
rnkpfIDs, _ := tpr.GetLoadedIds(utils.RankingProfilePrefix)
ippfIDs, _ := tpr.GetLoadedIds(utils.IPProfilesPrefix)
flrIDs, _ := tpr.GetLoadedIds(utils.FilterPrefix) flrIDs, _ := tpr.GetLoadedIds(utils.FilterPrefix)
routeIDs, _ := tpr.GetLoadedIds(utils.RouteProfilePrefix) routeIDs, _ := tpr.GetLoadedIds(utils.RouteProfilePrefix)
apfIDs, _ := tpr.GetLoadedIds(utils.AttributeProfilePrefix) apfIDs, _ := tpr.GetLoadedIds(utils.AttributeProfilePrefix)
@@ -889,6 +1004,9 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
utils.CacheChargerProfiles: chargerIDs, utils.CacheChargerProfiles: chargerIDs,
utils.CacheRateProfiles: ratePrfIDs, utils.CacheRateProfiles: ratePrfIDs,
utils.CacheActionProfiles: actionPrfIDs, utils.CacheActionProfiles: actionPrfIDs,
utils.CacheTrendProfiles: trnpfIDs,
utils.CacheRankingProfiles: rnkpfIDs,
utils.CacheIPProfiles: ippfIDs,
} }
// verify if we need to clear indexes // verify if we need to clear indexes
@@ -896,6 +1014,9 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
if len(apfIDs) != 0 { if len(apfIDs) != 0 {
cacheIDs = append(cacheIDs, utils.CacheAttributeFilterIndexes) cacheIDs = append(cacheIDs, utils.CacheAttributeFilterIndexes)
} }
if len(ippfIDs) != 0 {
cacheIDs = append(cacheIDs, utils.CacheIPFilterIndexes)
}
if len(routeIDs) != 0 { if len(routeIDs) != 0 {
cacheIDs = append(cacheIDs, utils.CacheRouteFilterIndexes) cacheIDs = append(cacheIDs, utils.CacheRouteFilterIndexes)
} }

View File

@@ -582,6 +582,7 @@ func TestTPReaderReloadCache(t *testing.T) {
APIOpts: map[string]any{}, APIOpts: map[string]any{},
Tenant: "cgrates.org", Tenant: "cgrates.org",
ResourceProfileIDs: []string{"cgrates.org:resourceProfilesID"}, ResourceProfileIDs: []string{"cgrates.org:resourceProfilesID"},
IPProfileIDs: []string{"cgrates.org:ipProfilesID"},
StatsQueueProfileIDs: []string{"cgrates.org:statProfilesID"}, StatsQueueProfileIDs: []string{"cgrates.org:statProfilesID"},
ThresholdProfileIDs: []string{"cgrates.org:thresholdProfilesID"}, ThresholdProfileIDs: []string{"cgrates.org:thresholdProfilesID"},
FilterIDs: []string{"cgrates.org:filtersID"}, FilterIDs: []string{"cgrates.org:filtersID"},
@@ -589,6 +590,8 @@ func TestTPReaderReloadCache(t *testing.T) {
AttributeProfileIDs: []string{"cgrates.org:attributeProfilesID"}, AttributeProfileIDs: []string{"cgrates.org:attributeProfilesID"},
ChargerProfileIDs: []string{"cgrates.org:chargerProfilesID"}, ChargerProfileIDs: []string{"cgrates.org:chargerProfilesID"},
ResourceIDs: []string{"cgrates.org:resourceProfilesID"}, ResourceIDs: []string{"cgrates.org:resourceProfilesID"},
TrendProfileIDs: []string{"cgrates.org:trendProfilesID"},
RankingProfileIDs: []string{"cgrates.org:rankingProfileID"},
StatsQueueIDs: []string{"cgrates.org:statProfilesID"}, StatsQueueIDs: []string{"cgrates.org:statProfilesID"},
ThresholdIDs: []string{"cgrates.org:thresholdProfilesID"}, ThresholdIDs: []string{"cgrates.org:thresholdProfilesID"},
@@ -617,12 +620,21 @@ func TestTPReaderReloadCache(t *testing.T) {
resProfiles: map[utils.TenantID]*utils.TPResourceProfile{ resProfiles: map[utils.TenantID]*utils.TPResourceProfile{
{Tenant: "cgrates.org", ID: "resourceProfilesID"}: {}, {Tenant: "cgrates.org", ID: "resourceProfilesID"}: {},
}, },
ipProfiles: map[utils.TenantID]*utils.TPIPProfile{
{Tenant: "cgrates.org", ID: "ipProfilesID"}: {},
},
rgProfiles: map[utils.TenantID]*utils.TPRankingProfile{
{Tenant: "cgrates.org", ID: "rankingProfileID"}: {},
},
sqProfiles: map[utils.TenantID]*utils.TPStatProfile{ sqProfiles: map[utils.TenantID]*utils.TPStatProfile{
{Tenant: "cgrates.org", ID: "statProfilesID"}: {}, {Tenant: "cgrates.org", ID: "statProfilesID"}: {},
}, },
thProfiles: map[utils.TenantID]*utils.TPThresholdProfile{ thProfiles: map[utils.TenantID]*utils.TPThresholdProfile{
{Tenant: "cgrates.org", ID: "thresholdProfilesID"}: {}, {Tenant: "cgrates.org", ID: "thresholdProfilesID"}: {},
}, },
trProfiles: map[utils.TenantID]*utils.TPTrendsProfile{
{Tenant: "cgrates.org", ID: "trendProfilesID"}: {},
},
filters: map[utils.TenantID]*utils.TPFilterProfile{ filters: map[utils.TenantID]*utils.TPFilterProfile{
{Tenant: "cgrates.org", ID: "filtersID"}: {}, {Tenant: "cgrates.org", ID: "filtersID"}: {},
}, },

View File

@@ -1,5 +1,4 @@
//go:build integration //go:build flaky
// +build integration
/* /*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
@@ -131,7 +130,22 @@ cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,tru
} }
client, _ := ng.Run(t) client, _ := ng.Run(t)
time.Sleep(100 * time.Millisecond) // Wait for loader to finish loading all profiles (thresholds, stats, trends)
for range 20 {
var thPrfIDs, stPrfIDs []string
var trPrfs *[]*utils.TrendProfile
errTh := client.Call(context.Background(), utils.AdminSv1GetThresholdProfileIDs,
&utils.ArgsItemIDs{Tenant: "cgrates.org"}, &thPrfIDs)
errSt := client.Call(context.Background(), utils.AdminSv1GetStatQueueProfileIDs,
&utils.ArgsItemIDs{Tenant: "cgrates.org"}, &stPrfIDs)
errTr := client.Call(context.Background(), utils.AdminSv1GetTrendProfiles,
&utils.ArgsItemIDs{Tenant: "cgrates.org"}, &trPrfs)
if errTh == nil && errSt == nil && errTr == nil &&
len(thPrfIDs) == 2 && len(stPrfIDs) == 2 && trPrfs != nil && len(*trPrfs) == 2 {
break
}
time.Sleep(100 * time.Millisecond)
}
var tr *utils.Trend var tr *utils.Trend
t.Run("TrendSchedule", func(t *testing.T) { t.Run("TrendSchedule", func(t *testing.T) {
var replyTrendProfiles *[]*utils.TrendProfile var replyTrendProfiles *[]*utils.TrendProfile

View File

@@ -151,7 +151,27 @@ type TPResourceProfile struct {
Weights string // Weight to sort the ResourceLimits Weights string // Weight to sort the ResourceLimits
ThresholdIDs []string // Thresholds to check after changing Limit ThresholdIDs []string // Thresholds to check after changing Limit
} }
type TPIPPool struct {
ID string
FilterIDs []string
Type string
Range string
Strategy string
Message string
Weights string
Blockers string
}
type TPIPProfile struct {
TPid string
Tenant string
ID string
FilterIDs []string
TTL string
Stored bool
Weights string
Pools []*TPIPPool
}
type ArgsComputeFilterIndexIDs struct { type ArgsComputeFilterIndexIDs struct {
Tenant string Tenant string
APIOpts map[string]any APIOpts map[string]any

View File

@@ -74,8 +74,8 @@ var (
CacheThresholds: ThresholdPrefix, CacheThresholds: ThresholdPrefix,
CacheFilters: FilterPrefix, CacheFilters: FilterPrefix,
CacheRouteProfiles: RouteProfilePrefix, CacheRouteProfiles: RouteProfilePrefix,
CacheRankingProfiles: RankingPrefix, CacheRankingProfiles: RankingProfilePrefix,
CacheRankings: RankingProfilePrefix, CacheRankings: RankingPrefix,
CacheAttributeProfiles: AttributeProfilePrefix, CacheAttributeProfiles: AttributeProfilePrefix,
CacheChargerProfiles: ChargerProfilePrefix, CacheChargerProfiles: ChargerProfilePrefix,
CacheRateProfiles: RateProfilePrefix, CacheRateProfiles: RateProfilePrefix,
@@ -855,6 +855,13 @@ const (
SortingData = "SortingData" SortingData = "SortingData"
ProfileID = "ProfileID" ProfileID = "ProfileID"
PoolID = "PoolID" PoolID = "PoolID"
PoolFilterIDs = "PoolFilterIDs"
PoolType = "PoolType"
PoolRange = "PoolRange"
PoolStrategy = "PoolStrategy"
PoolMessage = "PoolMessage"
PoolWeights = "PoolWeights"
PoolBlockers = "PoolBlockers"
SortedRoutes = "SortedRoutes" SortedRoutes = "SortedRoutes"
MetaMonthly = "*monthly" MetaMonthly = "*monthly"
MetaYearly = "*yearly" MetaYearly = "*yearly"
@@ -1946,6 +1953,7 @@ const (
// Table Name // Table Name
const ( const (
TBLTPResources = "tp_resources" TBLTPResources = "tp_resources"
TBLTPIPs = "tp_ips"
TBLTPStats = "tp_stats" TBLTPStats = "tp_stats"
TBLTPRankings = "tp_rankings" TBLTPRankings = "tp_rankings"
TBLTPTrends = "tp_trends" TBLTPTrends = "tp_trends"
@@ -2855,11 +2863,15 @@ const (
AWSToken = "awsToken" AWSToken = "awsToken"
// sqs // sqs
SQSQueueID = "sqsQueueID" SQSQueueID = "sqsQueueID"
SQSForcePathStyle = "sqsForcePathStyle"
SQSSkipTlsVerify = "sqsSkipTlsVerify"
// s3 // s3
S3Bucket = "s3BucketID" S3Bucket = "s3BucketID"
S3FolderPath = "s3FolderPath" S3FolderPath = "s3FolderPath"
S3ForcePathStyle = "s3ForcePathStyle"
S3SkipTlsVerify = "s3SkipTlsVerify"
// sql // sql
SQLDefaultDBName = "cgrates" SQLDefaultDBName = "cgrates"