mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-11 18:16:24 +05:00
Compare commits
3 Commits
2fe3fd6690
...
626346e19b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
626346e19b | ||
|
|
84f0338605 | ||
|
|
0400f4a177 |
@@ -83,8 +83,8 @@ func (pk *radiusDP) FieldAsInterface(fldPath []string) (data any, err error) {
|
||||
} else {
|
||||
return // data found in cache
|
||||
}
|
||||
if len(pk.req.AttributesWithName(fldPath[0], "")) != 0 {
|
||||
data = pk.req.AttributesWithName(fldPath[0], "")[0].GetStringValue()
|
||||
if attrs := pk.req.AttributesWithName(fldPath[0], ""); len(attrs) != 0 {
|
||||
data = attrs[0].GetStringValue()
|
||||
}
|
||||
pk.cache.Set(fldPath, data)
|
||||
return
|
||||
|
||||
@@ -141,3 +141,47 @@ func TestRadiusDPFieldAsString(t *testing.T) {
|
||||
t.Errorf("Expecting: flopsy, received: <%s>", data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRadiusDPFieldAsInterfaceCached(t *testing.T) {
|
||||
pkt := radigo.NewPacket(radigo.AccountingRequest, 1, dictRad, coder, "CGRateS.org")
|
||||
if err := pkt.AddAVPWithName("User-Name", "cgr1", ""); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := pkt.AddAVPWithName("Acct-Session-Time", "3600", ""); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if err := pkt.AddAVPWithName("Password", "pass123", ""); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
dp := newRADataProvider(pkt)
|
||||
|
||||
if data, err := dp.FieldAsInterface([]string{"User-Name"}); err != nil {
|
||||
t.Error(err)
|
||||
} else if data != "cgr1" {
|
||||
t.Errorf("Expecting: cgr1, received: <%v>", data)
|
||||
}
|
||||
|
||||
if data, err := dp.FieldAsInterface([]string{"Acct-Session-Time"}); err != nil {
|
||||
t.Error(err)
|
||||
} else if data != "3600" {
|
||||
t.Errorf("Expecting: 3600, received: <%v>", data)
|
||||
}
|
||||
|
||||
if data, err := dp.FieldAsInterface([]string{"Password"}); err != nil {
|
||||
t.Error(err)
|
||||
} else if data != "pass123" {
|
||||
t.Errorf("Expecting: pass123, received: <%v>", data)
|
||||
}
|
||||
|
||||
if data, err := dp.FieldAsInterface([]string{"Non-Existent-Field"}); err != nil {
|
||||
t.Error(err)
|
||||
} else if data != nil {
|
||||
t.Errorf("Expecting: nil, received: <%v>", data)
|
||||
}
|
||||
|
||||
if _, err := dp.FieldAsInterface([]string{"Field1", "Field2"}); err != utils.ErrNotFound {
|
||||
t.Errorf("Expecting: ErrNotFound, received: <%v>", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -19,6 +19,9 @@ along with this program. If not, see <https://www.gnu.org/licenses/>
|
||||
package apis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/birpc/context"
|
||||
"github.com/cgrates/cgrates/trends"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
@@ -129,6 +132,21 @@ func (adms *AdminSv1) SetTrendProfile(ctx *context.Context, arg *utils.TrendProf
|
||||
if err = adms.dm.SetTrendProfile(ctx, arg.TrendProfile); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
//generate a loadID for CacheTrendProfiles and store it in database
|
||||
loadID := time.Now().UnixNano()
|
||||
if err = adms.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheTrendProfiles: loadID}); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
// delay if needed before cache call
|
||||
if adms.cfg.GeneralCfg().CachingDelay != 0 {
|
||||
utils.Logger.Info(fmt.Sprintf("<AdminSv1.SetTrendProfile> Delaying cache call for %v", adms.cfg.GeneralCfg().CachingDelay))
|
||||
time.Sleep(adms.cfg.GeneralCfg().CachingDelay)
|
||||
}
|
||||
//handle caching for TrendProfile
|
||||
if err = adms.CallCache(ctx, utils.IfaceAsString(arg.APIOpts[utils.MetaCache]), arg.Tenant, utils.CacheTrendProfiles,
|
||||
arg.TenantID(), utils.EmptyString, nil, arg.APIOpts); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
@@ -145,6 +163,21 @@ func (adms *AdminSv1) RemoveTrendProfile(ctx *context.Context, args *utils.Tenan
|
||||
if err := adms.dm.RemoveTrendProfile(ctx, tnt, args.ID); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
// delay if needed before cache call
|
||||
if adms.cfg.GeneralCfg().CachingDelay != 0 {
|
||||
utils.Logger.Info(fmt.Sprintf("<AdminSv1.RemoveTrendProfile> Delaying cache call for %v", adms.cfg.GeneralCfg().CachingDelay))
|
||||
time.Sleep(adms.cfg.GeneralCfg().CachingDelay)
|
||||
}
|
||||
//handle caching for TrendProfile
|
||||
if err := adms.CallCache(ctx, utils.IfaceAsString(args.APIOpts[utils.MetaCache]), tnt, utils.CacheTrendProfiles,
|
||||
utils.ConcatenatedKey(tnt, args.ID), utils.EmptyString, nil, args.APIOpts); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
//generate a loadID for CacheTrendProfiles and store it in database
|
||||
loadID := time.Now().UnixNano()
|
||||
if err := adms.dm.SetLoadIDs(ctx, map[string]int64{utils.CacheTrendProfiles: loadID}); err != nil {
|
||||
return utils.APIErrorHandler(err)
|
||||
}
|
||||
*reply = utils.OK
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -477,9 +477,13 @@ const CGRATES_CFG_JSON = `
|
||||
|
||||
// SQS
|
||||
// "sqsQueueID": "cgrates_cdrs", // the queue id for SQS readers from were the events are read
|
||||
// "sqsForcePathStyle": false, // when true, force the request to use path-style addressing, i.e., http://s3.amazonaws.com/BUCKET/KEY. If false, (http://BUCKET.s3.amazonaws.com/KEY)
|
||||
// "sqsSkipTlsVerify": false, // if enabled Http Client will accept any TLS certificate
|
||||
|
||||
// S3
|
||||
// "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from were the events are read
|
||||
// "s3ForcePathStyle": false, // when true, force the request to use path-style addressing, i.e., http://s3.amazonaws.com/BUCKET/KEY. If false, (http://BUCKET.s3.amazonaws.com/KEY)
|
||||
// "s3SkipTlsVerify": false, // if enabled Http Client will accept any TLS certificate
|
||||
|
||||
// nats
|
||||
// "natsJetStream": false, // controls if the nats reader uses the JetStream
|
||||
|
||||
@@ -210,8 +210,12 @@ type EventExporterOpts struct {
|
||||
AWSSecret *string
|
||||
AWSToken *string
|
||||
SQSQueueID *string
|
||||
SQSForcePathStyle *bool
|
||||
SQSSkipTlsVerify *bool
|
||||
S3BucketID *string
|
||||
S3FolderPath *string
|
||||
S3ForcePathStyle *bool
|
||||
S3SkipTlsVerify *bool
|
||||
NATSJetStream *bool
|
||||
NATSSubject *string
|
||||
NATSJWTFile *string
|
||||
@@ -435,12 +439,24 @@ func (eeOpts *EventExporterOpts) loadFromJSONCfg(jsnCfg *EventExporterOptsJson)
|
||||
if jsnCfg.SQSQueueID != nil {
|
||||
eeOpts.SQSQueueID = jsnCfg.SQSQueueID
|
||||
}
|
||||
if jsnCfg.SQSForcePathStyle != nil {
|
||||
eeOpts.SQSForcePathStyle = jsnCfg.SQSForcePathStyle
|
||||
}
|
||||
if jsnCfg.SQSSkipTlsVerify != nil {
|
||||
eeOpts.SQSSkipTlsVerify = jsnCfg.SQSSkipTlsVerify
|
||||
}
|
||||
if jsnCfg.S3BucketID != nil {
|
||||
eeOpts.S3BucketID = jsnCfg.S3BucketID
|
||||
}
|
||||
if jsnCfg.S3FolderPath != nil {
|
||||
eeOpts.S3FolderPath = jsnCfg.S3FolderPath
|
||||
}
|
||||
if jsnCfg.S3ForcePathStyle != nil {
|
||||
eeOpts.S3ForcePathStyle = jsnCfg.S3ForcePathStyle
|
||||
}
|
||||
if jsnCfg.S3SkipTlsVerify != nil {
|
||||
eeOpts.S3SkipTlsVerify = jsnCfg.S3SkipTlsVerify
|
||||
}
|
||||
if jsnCfg.NATSJetStream != nil {
|
||||
eeOpts.NATSJetStream = jsnCfg.NATSJetStream
|
||||
}
|
||||
@@ -805,6 +821,14 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts {
|
||||
cln.SQSQueueID = new(string)
|
||||
*cln.SQSQueueID = *eeOpts.SQSQueueID
|
||||
}
|
||||
if eeOpts.SQSForcePathStyle != nil {
|
||||
cln.SQSForcePathStyle = new(bool)
|
||||
*cln.SQSForcePathStyle = *eeOpts.SQSForcePathStyle
|
||||
}
|
||||
if eeOpts.SQSSkipTlsVerify != nil {
|
||||
cln.SQSSkipTlsVerify = new(bool)
|
||||
*cln.SQSSkipTlsVerify = *eeOpts.SQSSkipTlsVerify
|
||||
}
|
||||
if eeOpts.S3BucketID != nil {
|
||||
cln.S3BucketID = new(string)
|
||||
*cln.S3BucketID = *eeOpts.S3BucketID
|
||||
@@ -813,6 +837,14 @@ func (eeOpts *EventExporterOpts) Clone() *EventExporterOpts {
|
||||
cln.S3FolderPath = new(string)
|
||||
*cln.S3FolderPath = *eeOpts.S3FolderPath
|
||||
}
|
||||
if eeOpts.S3ForcePathStyle != nil {
|
||||
cln.S3ForcePathStyle = new(bool)
|
||||
*cln.S3ForcePathStyle = *eeOpts.S3ForcePathStyle
|
||||
}
|
||||
if eeOpts.S3SkipTlsVerify != nil {
|
||||
cln.S3SkipTlsVerify = new(bool)
|
||||
*cln.S3SkipTlsVerify = *eeOpts.S3SkipTlsVerify
|
||||
}
|
||||
if eeOpts.NATSJetStream != nil {
|
||||
cln.NATSJetStream = new(bool)
|
||||
*cln.NATSJetStream = *eeOpts.NATSJetStream
|
||||
@@ -1119,12 +1151,24 @@ func (optsEes *EventExporterOpts) AsMapInterface() map[string]any {
|
||||
if optsEes.SQSQueueID != nil {
|
||||
opts[utils.SQSQueueID] = *optsEes.SQSQueueID
|
||||
}
|
||||
if optsEes.SQSForcePathStyle != nil {
|
||||
opts[utils.SQSForcePathStyle] = *optsEes.SQSForcePathStyle
|
||||
}
|
||||
if optsEes.SQSSkipTlsVerify != nil {
|
||||
opts[utils.SQSSkipTlsVerify] = *optsEes.SQSSkipTlsVerify
|
||||
}
|
||||
if optsEes.S3BucketID != nil {
|
||||
opts[utils.S3Bucket] = *optsEes.S3BucketID
|
||||
}
|
||||
if optsEes.S3FolderPath != nil {
|
||||
opts[utils.S3FolderPath] = *optsEes.S3FolderPath
|
||||
}
|
||||
if optsEes.S3ForcePathStyle != nil {
|
||||
opts[utils.S3ForcePathStyle] = *optsEes.S3ForcePathStyle
|
||||
}
|
||||
if optsEes.S3SkipTlsVerify != nil {
|
||||
opts[utils.S3SkipTlsVerify] = *optsEes.S3SkipTlsVerify
|
||||
}
|
||||
if optsEes.NATSJetStream != nil {
|
||||
opts[utils.NatsJetStream] = *optsEes.NATSJetStream
|
||||
}
|
||||
@@ -1231,8 +1275,12 @@ type EventExporterOptsJson struct {
|
||||
AWSSecret *string `json:"awsSecret"`
|
||||
AWSToken *string `json:"awsToken"`
|
||||
SQSQueueID *string `json:"sqsQueueID"`
|
||||
SQSForcePathStyle *bool `json:"sqsForcePathStyle"`
|
||||
SQSSkipTlsVerify *bool `json:"sqsSkipTlsVerify"`
|
||||
S3BucketID *string `json:"s3BucketID"`
|
||||
S3FolderPath *string `json:"s3FolderPath"`
|
||||
S3ForcePathStyle *bool `json:"s3ForcePathStyle"`
|
||||
S3SkipTlsVerify *bool `json:"s3SkipTlsVerify"`
|
||||
NATSJetStream *bool `json:"natsJetStream"`
|
||||
NATSSubject *string `json:"natsSubject"`
|
||||
NATSJWTFile *string `json:"natsJWTFile"`
|
||||
@@ -1539,6 +1587,22 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte
|
||||
} else {
|
||||
d.SQSQueueID = nil
|
||||
}
|
||||
if v2.SQSForcePathStyle != nil {
|
||||
if v1.SQSForcePathStyle == nil ||
|
||||
*v1.SQSForcePathStyle != *v2.SQSForcePathStyle {
|
||||
d.SQSForcePathStyle = v2.SQSForcePathStyle
|
||||
}
|
||||
} else {
|
||||
d.SQSForcePathStyle = nil
|
||||
}
|
||||
if v2.SQSSkipTlsVerify != nil {
|
||||
if v1.SQSSkipTlsVerify == nil ||
|
||||
*v1.SQSSkipTlsVerify != *v2.SQSSkipTlsVerify {
|
||||
d.SQSSkipTlsVerify = v2.SQSSkipTlsVerify
|
||||
}
|
||||
} else {
|
||||
d.SQSSkipTlsVerify = nil
|
||||
}
|
||||
if v2.S3BucketID != nil {
|
||||
if v1.S3BucketID == nil ||
|
||||
*v1.S3BucketID != *v2.S3BucketID {
|
||||
@@ -1555,6 +1619,22 @@ func diffEventExporterOptsJsonCfg(d *EventExporterOptsJson, v1, v2 *EventExporte
|
||||
} else {
|
||||
d.S3FolderPath = nil
|
||||
}
|
||||
if v2.S3ForcePathStyle != nil {
|
||||
if v1.S3ForcePathStyle == nil ||
|
||||
*v1.S3ForcePathStyle != *v2.S3ForcePathStyle {
|
||||
d.S3ForcePathStyle = v2.S3ForcePathStyle
|
||||
}
|
||||
} else {
|
||||
d.S3ForcePathStyle = nil
|
||||
}
|
||||
if v2.S3SkipTlsVerify != nil {
|
||||
if v1.S3SkipTlsVerify == nil ||
|
||||
*v1.S3SkipTlsVerify != *v2.S3SkipTlsVerify {
|
||||
d.S3SkipTlsVerify = v2.S3SkipTlsVerify
|
||||
}
|
||||
} else {
|
||||
d.S3SkipTlsVerify = nil
|
||||
}
|
||||
if v2.NATSJetStream != nil {
|
||||
if v1.NATSJetStream == nil ||
|
||||
*v1.NATSJetStream != *v2.NATSJetStream {
|
||||
|
||||
@@ -1218,8 +1218,12 @@ func TestDiffEventExporterOptsJsonCfg(t *testing.T) {
|
||||
AWSSecret: utils.StringPointer("aws_secret"),
|
||||
AWSToken: utils.StringPointer("aws_token"),
|
||||
SQSQueueID: utils.StringPointer("sqs_queue_id"),
|
||||
SQSForcePathStyle: utils.BoolPointer(true),
|
||||
SQSSkipTlsVerify: utils.BoolPointer(true),
|
||||
S3BucketID: utils.StringPointer("s3_bucket_id"),
|
||||
S3FolderPath: utils.StringPointer("s3_folder_path"),
|
||||
S3ForcePathStyle: utils.BoolPointer(true),
|
||||
S3SkipTlsVerify: utils.BoolPointer(true),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSSubject: utils.StringPointer("ees_nats"),
|
||||
NATSJWTFile: utils.StringPointer("/path/to/jwt"),
|
||||
@@ -1268,8 +1272,12 @@ func TestDiffEventExporterOptsJsonCfg(t *testing.T) {
|
||||
AWSSecret: utils.StringPointer("aws_secret"),
|
||||
AWSToken: utils.StringPointer("aws_token"),
|
||||
SQSQueueID: utils.StringPointer("sqs_queue_id"),
|
||||
SQSForcePathStyle: utils.BoolPointer(true),
|
||||
SQSSkipTlsVerify: utils.BoolPointer(true),
|
||||
S3BucketID: utils.StringPointer("s3_bucket_id"),
|
||||
S3FolderPath: utils.StringPointer("s3_folder_path"),
|
||||
S3ForcePathStyle: utils.BoolPointer(true),
|
||||
S3SkipTlsVerify: utils.BoolPointer(true),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSSubject: utils.StringPointer("ees_nats"),
|
||||
NATSJWTFile: utils.StringPointer("/path/to/jwt"),
|
||||
@@ -1345,8 +1353,12 @@ func TestEventExporterOptsClone(t *testing.T) {
|
||||
AWSSecret: utils.StringPointer("aws_secret"),
|
||||
AWSToken: utils.StringPointer("aws_token"),
|
||||
SQSQueueID: utils.StringPointer("sqs_queue_id"),
|
||||
SQSForcePathStyle: utils.BoolPointer(true),
|
||||
SQSSkipTlsVerify: utils.BoolPointer(true),
|
||||
S3BucketID: utils.StringPointer("s3_bucket_id"),
|
||||
S3FolderPath: utils.StringPointer("s3_folder_path"),
|
||||
S3ForcePathStyle: utils.BoolPointer(true),
|
||||
S3SkipTlsVerify: utils.BoolPointer(true),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSSubject: utils.StringPointer("ees_nats"),
|
||||
NATSJWTFile: utils.StringPointer("/path/to/jwt"),
|
||||
@@ -1397,8 +1409,12 @@ func TestEventExporterOptsClone(t *testing.T) {
|
||||
AWSSecret: utils.StringPointer("aws_secret"),
|
||||
AWSToken: utils.StringPointer("aws_token"),
|
||||
SQSQueueID: utils.StringPointer("sqs_queue_id"),
|
||||
SQSForcePathStyle: utils.BoolPointer(true),
|
||||
SQSSkipTlsVerify: utils.BoolPointer(true),
|
||||
S3BucketID: utils.StringPointer("s3_bucket_id"),
|
||||
S3FolderPath: utils.StringPointer("s3_folder_path"),
|
||||
S3ForcePathStyle: utils.BoolPointer(true),
|
||||
S3SkipTlsVerify: utils.BoolPointer(true),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSSubject: utils.StringPointer("ees_nats"),
|
||||
NATSJWTFile: utils.StringPointer("/path/to/jwt"),
|
||||
@@ -1457,8 +1473,12 @@ func TestLoadFromJSONCfg(t *testing.T) {
|
||||
AWSSecret: utils.StringPointer("aws_secret"),
|
||||
AWSToken: utils.StringPointer("aws_token"),
|
||||
SQSQueueID: utils.StringPointer("sqs_queue_id"),
|
||||
SQSForcePathStyle: utils.BoolPointer(true),
|
||||
SQSSkipTlsVerify: utils.BoolPointer(true),
|
||||
S3BucketID: utils.StringPointer("s3_bucket_id"),
|
||||
S3FolderPath: utils.StringPointer("s3_folder_path"),
|
||||
S3ForcePathStyle: utils.BoolPointer(true),
|
||||
S3SkipTlsVerify: utils.BoolPointer(true),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSSubject: utils.StringPointer("ees_nats"),
|
||||
NATSJWTFile: utils.StringPointer("/path/to/jwt"),
|
||||
@@ -1508,8 +1528,12 @@ func TestLoadFromJSONCfg(t *testing.T) {
|
||||
AWSSecret: utils.StringPointer("aws_secret"),
|
||||
AWSToken: utils.StringPointer("aws_token"),
|
||||
SQSQueueID: utils.StringPointer("sqs_queue_id"),
|
||||
SQSForcePathStyle: utils.BoolPointer(true),
|
||||
SQSSkipTlsVerify: utils.BoolPointer(true),
|
||||
S3BucketID: utils.StringPointer("s3_bucket_id"),
|
||||
S3FolderPath: utils.StringPointer("s3_folder_path"),
|
||||
S3ForcePathStyle: utils.BoolPointer(true),
|
||||
S3SkipTlsVerify: utils.BoolPointer(true),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSSubject: utils.StringPointer("ees_nats"),
|
||||
NATSJWTFile: utils.StringPointer("/path/to/jwt"),
|
||||
@@ -1662,8 +1686,12 @@ func TestEEsAsMapInterface(t *testing.T) {
|
||||
AWSSecret: utils.StringPointer("aws_secret"),
|
||||
AWSToken: utils.StringPointer("aws_token"),
|
||||
SQSQueueID: utils.StringPointer("sqs_queue_id"),
|
||||
SQSForcePathStyle: utils.BoolPointer(true),
|
||||
SQSSkipTlsVerify: utils.BoolPointer(true),
|
||||
S3BucketID: utils.StringPointer("s3_bucket_id"),
|
||||
S3FolderPath: utils.StringPointer("s3_folder_path"),
|
||||
S3ForcePathStyle: utils.BoolPointer(true),
|
||||
S3SkipTlsVerify: utils.BoolPointer(true),
|
||||
NATSJetStream: utils.BoolPointer(false),
|
||||
NATSSubject: utils.StringPointer("ees_nats"),
|
||||
NATSJWTFile: utils.StringPointer("/path/to/jwt"),
|
||||
@@ -1725,6 +1753,8 @@ func TestEEsAsMapInterface(t *testing.T) {
|
||||
"rpcReplyTimeout": "2s",
|
||||
"s3BucketID": "s3_bucket_id",
|
||||
"s3FolderPath": "s3_folder_path",
|
||||
"s3ForcePathStyle": true,
|
||||
"s3SkipTlsVerify": true,
|
||||
"serviceMethod": "service_method",
|
||||
"sqlConnMaxLifetime": "2s",
|
||||
"sqlDBName": "cgrates",
|
||||
@@ -1732,6 +1762,8 @@ func TestEEsAsMapInterface(t *testing.T) {
|
||||
"sqlMaxOpenConns": 10,
|
||||
"sqlTableName": "cdrs",
|
||||
"sqsQueueID": "sqs_queue_id",
|
||||
"sqsForcePathStyle": true,
|
||||
"sqsSkipTlsVerify": true,
|
||||
"pgSSLMode": "sslm",
|
||||
"kafkaTLS": false,
|
||||
"connIDs": []string{"testID"},
|
||||
@@ -1798,8 +1830,12 @@ func TestEescfgNewEventExporterCfg(t *testing.T) {
|
||||
AWSSecret: &str,
|
||||
AWSToken: &str,
|
||||
SQSQueueID: &str,
|
||||
SQSForcePathStyle: &bl,
|
||||
SQSSkipTlsVerify: &bl,
|
||||
S3BucketID: &str,
|
||||
S3FolderPath: &str,
|
||||
S3ForcePathStyle: &bl,
|
||||
S3SkipTlsVerify: &bl,
|
||||
NATSJetStream: &bl,
|
||||
NATSSubject: &str,
|
||||
NATSJWTFile: &str,
|
||||
@@ -1902,8 +1938,12 @@ func TestEescfgloadFromJSONCfg(t *testing.T) {
|
||||
AWSSecret: &str,
|
||||
AWSToken: &str,
|
||||
SQSQueueID: &str,
|
||||
SQSForcePathStyle: &bl,
|
||||
SQSSkipTlsVerify: &bl,
|
||||
S3BucketID: &str,
|
||||
S3FolderPath: &str,
|
||||
S3ForcePathStyle: &bl,
|
||||
S3SkipTlsVerify: &bl,
|
||||
NATSJetStream: &bl,
|
||||
NATSSubject: &str,
|
||||
NATSJWTFile: &str,
|
||||
@@ -1976,8 +2016,12 @@ func TestEescfgloadFromJSONCfg(t *testing.T) {
|
||||
AWSSecret: &str,
|
||||
AWSToken: &str,
|
||||
SQSQueueID: &str,
|
||||
SQSForcePathStyle: &bl,
|
||||
SQSSkipTlsVerify: &bl,
|
||||
S3BucketID: &str,
|
||||
S3FolderPath: &str,
|
||||
S3ForcePathStyle: &bl,
|
||||
S3SkipTlsVerify: &bl,
|
||||
NATSJetStream: &bl,
|
||||
NATSSubject: &str,
|
||||
NATSJWTFile: &str,
|
||||
|
||||
@@ -58,6 +58,25 @@
|
||||
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "s3_nuantix_test_file",
|
||||
"type": "*s3JSONMap",
|
||||
"export_path": "s3.eu-central-1.amazonaws.com",
|
||||
"opts": {
|
||||
"awsRegion": "eu-central-1",
|
||||
"awsKey": "access key ID",
|
||||
"awsSecret": "secret access key",
|
||||
"s3BucketID": "cgrates-cdrs",
|
||||
"s3ForcePathStyle": true,
|
||||
"s3SkipTlsVerify": true
|
||||
},
|
||||
"attempts": 1,
|
||||
"failed_posts_dir": "/var/spool/cgrates/failed_posts2",
|
||||
"synchronous": true,
|
||||
"fields":[
|
||||
{"tag": "RequiredTemplate","type": "*template", "value": "requiredFields"}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": "amqpv1_test_file",
|
||||
"type": "*amqpv1JSONMap",
|
||||
|
||||
38
ees/s3.go
38
ees/s3.go
@@ -20,7 +20,9 @@ package ees
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
@@ -46,14 +48,16 @@ func NewS3EE(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *S3EE {
|
||||
|
||||
// S3EE is a s3 poster
|
||||
type S3EE struct {
|
||||
awsRegion string
|
||||
awsID string
|
||||
awsKey string
|
||||
awsToken string
|
||||
bucket string
|
||||
folderPath string
|
||||
session *session.Session
|
||||
up *s3manager.Uploader
|
||||
awsRegion string
|
||||
awsID string
|
||||
awsKey string
|
||||
awsToken string
|
||||
bucket string
|
||||
folderPath string
|
||||
forcePathStyle bool
|
||||
skipTlsVerify bool
|
||||
session *session.Session
|
||||
up *s3manager.Uploader
|
||||
|
||||
cfg *config.EventExporterCfg
|
||||
em *utils.ExporterMetrics
|
||||
@@ -82,6 +86,12 @@ func (pstr *S3EE) parseOpts(opts *config.EventExporterOpts) {
|
||||
if opts.AWSToken != nil {
|
||||
pstr.awsToken = *opts.AWSToken
|
||||
}
|
||||
if opts.S3ForcePathStyle != nil {
|
||||
pstr.forcePathStyle = *opts.S3ForcePathStyle
|
||||
}
|
||||
if opts.S3SkipTlsVerify != nil {
|
||||
pstr.skipTlsVerify = *opts.S3SkipTlsVerify
|
||||
}
|
||||
}
|
||||
|
||||
func (pstr *S3EE) Cfg() *config.EventExporterCfg { return pstr.cfg }
|
||||
@@ -98,6 +108,18 @@ func (pstr *S3EE) Connect() (err error) {
|
||||
len(pstr.awsKey) != 0 {
|
||||
cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
|
||||
}
|
||||
if pstr.forcePathStyle {
|
||||
cfg.S3ForcePathStyle = aws.Bool(true) // Required for custom S3-compatible endpoints
|
||||
}
|
||||
if pstr.skipTlsVerify {
|
||||
cfg.HTTPClient = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true, // Equivalent to verify=False for self-signed certificates
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
pstr.session, err = session.NewSessionWithOptions(
|
||||
session.Options{
|
||||
Config: cfg,
|
||||
|
||||
@@ -175,3 +175,112 @@ package ees
|
||||
// t.Errorf("Expected: %q, received: %q", expected, rply)
|
||||
// }
|
||||
// }
|
||||
|
||||
// var (
|
||||
// runS3NuantixTest = flag.Bool("nuantix", false, "Run the integration test for the S3 exporter from Nuantix")
|
||||
|
||||
// sTestsS3Nuantix = []func(t *testing.T){
|
||||
// testS3LoadConfig,
|
||||
// testS3ResetDataDB,
|
||||
// testS3ResetStorDb,
|
||||
// testS3StartEngine,
|
||||
// testS3RPCConn,
|
||||
// testS3NuantixExportEvent,
|
||||
// testS3NuantixVerifyExport,
|
||||
// testStopCgrEngine,
|
||||
// }
|
||||
// )
|
||||
|
||||
// func TestS3ExportNuantix(t *testing.T) {
|
||||
// if !*runS3NuantixTest {
|
||||
// t.SkipNow()
|
||||
// }
|
||||
// s3ConfDir = "ees_cloud"
|
||||
// for _, stest := range sTestsS3Nuantix {
|
||||
// t.Run(s3ConfDir, stest)
|
||||
// }
|
||||
// }
|
||||
|
||||
// func testS3NuantixExportEvent(t *testing.T) {
|
||||
// cgrID = utils.Sha1("abcdefg", time.Unix(1383813745, 0).UTC().String())
|
||||
// t.Log(cgrID)
|
||||
// ev := &engine.CGREventWithEeIDs{
|
||||
// EeIDs: []string{"s3_nuantix_test_file"},
|
||||
// CGREvent: &utils.CGREvent{
|
||||
// Tenant: "cgrates.org",
|
||||
// ID: "dataEvent",
|
||||
// Time: utils.TimePointer(time.Now()),
|
||||
// Event: map[string]any{
|
||||
// utils.CGRID: cgrID,
|
||||
// utils.ToR: utils.MetaData,
|
||||
// utils.OriginID: "abcdefg",
|
||||
// utils.OriginHost: "192.168.1.1",
|
||||
// utils.RequestType: utils.MetaRated,
|
||||
// utils.Tenant: "AnotherTenant",
|
||||
// utils.Category: "call", //for data CDR use different Tenant
|
||||
// utils.AccountField: "1001",
|
||||
// utils.Subject: "1001",
|
||||
// utils.Destination: "1002",
|
||||
// utils.SetupTime: time.Unix(1383813745, 0).UTC(),
|
||||
// utils.AnswerTime: time.Unix(1383813746, 0).UTC(),
|
||||
// utils.Usage: 10 * time.Nanosecond,
|
||||
// utils.RunID: utils.MetaDefault,
|
||||
// utils.Cost: 0.012,
|
||||
// },
|
||||
// },
|
||||
// }
|
||||
|
||||
// var reply map[string]utils.MapStorage
|
||||
// if err := s3RPC.Call(context.Background(), utils.EeSv1ProcessEvent, ev, &reply); err != nil {
|
||||
// t.Error(err)
|
||||
// }
|
||||
// time.Sleep(2 * time.Second)
|
||||
// }
|
||||
|
||||
// func testS3NuantixVerifyExport(t *testing.T) {
|
||||
// endpoint := "s3.eu-central-1.amazonaws.com"
|
||||
// region := "eu-central-1"
|
||||
// qname := "cgrates-cdrs"
|
||||
|
||||
// key := fmt.Sprintf("%s/%s:%s.json", "", cgrID, utils.MetaDefault)
|
||||
|
||||
// var sess *session.Session
|
||||
// cfg := aws.Config{Endpoint: aws.String(endpoint)}
|
||||
// cfg.Region = aws.String(region)
|
||||
|
||||
// cfg.Credentials = credentials.NewStaticCredentials(awsKey, awsSecret, "")
|
||||
// var err error
|
||||
// cfg.S3ForcePathStyle = aws.Bool(true)
|
||||
// cfg.HTTPClient = &http.Client{
|
||||
// Transport: &http.Transport{
|
||||
// TLSClientConfig: &tls.Config{
|
||||
// InsecureSkipVerify: true,
|
||||
// },
|
||||
// },
|
||||
// }
|
||||
// sess, err = session.NewSessionWithOptions(
|
||||
// session.Options{
|
||||
// Config: cfg,
|
||||
// },
|
||||
// )
|
||||
// if err != nil {
|
||||
// t.Error(err)
|
||||
// }
|
||||
// s3Clnt := s3.New(sess)
|
||||
// s3Clnt.DeleteObject(&s3.DeleteObjectInput{})
|
||||
// file := aws.NewWriteAtBuffer([]byte{})
|
||||
// svc := s3manager.NewDownloader(sess)
|
||||
|
||||
// if _, err = svc.Download(file,
|
||||
// &s3.GetObjectInput{
|
||||
// Bucket: aws.String(qname),
|
||||
// Key: aws.String(key),
|
||||
// }); err != nil {
|
||||
// t.Fatalf("Unable to download item %v", err)
|
||||
// }
|
||||
|
||||
// expected := `{"Account":"1001","CGRID":"5a5ff7c40039976e1c2bd303dee45983267f6ed2","Category":"call","Destination":"1002","OriginID":"abcdefg","RequestType":"*rated","RunID":"*default","Subject":"1001","Tenant":"AnotherTenant","ToR":"*data"}`
|
||||
// if rply := string(file.Bytes()); rply != expected {
|
||||
// t.Errorf("Expected: %q, received: %q", expected, rply)
|
||||
// }
|
||||
// }
|
||||
|
||||
38
ees/sqs.go
38
ees/sqs.go
@@ -19,6 +19,8 @@ along with this program. If not, see <https://www.gnu.org/licenses/>
|
||||
package ees
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
@@ -44,14 +46,16 @@ func NewSQSee(cfg *config.EventExporterCfg, em *utils.ExporterMetrics) *SQSee {
|
||||
|
||||
// SQSee is a poster for sqs
|
||||
type SQSee struct {
|
||||
awsRegion string
|
||||
awsID string
|
||||
awsKey string
|
||||
awsToken string
|
||||
queueURL *string
|
||||
queueID string
|
||||
session *session.Session
|
||||
svc *sqs.SQS
|
||||
awsRegion string
|
||||
awsID string
|
||||
awsKey string
|
||||
awsToken string
|
||||
queueURL *string
|
||||
queueID string
|
||||
forcePathStyle bool
|
||||
skipTlsVerify bool
|
||||
session *session.Session
|
||||
svc *sqs.SQS
|
||||
|
||||
cfg *config.EventExporterCfg
|
||||
em *utils.ExporterMetrics
|
||||
@@ -77,6 +81,12 @@ func (pstr *SQSee) parseOpts(opts *config.EventExporterOpts) {
|
||||
if opts.AWSToken != nil {
|
||||
pstr.awsToken = *opts.AWSToken
|
||||
}
|
||||
if opts.SQSForcePathStyle != nil {
|
||||
pstr.forcePathStyle = *opts.SQSForcePathStyle
|
||||
}
|
||||
if opts.SQSSkipTlsVerify != nil {
|
||||
pstr.skipTlsVerify = *opts.SQSSkipTlsVerify
|
||||
}
|
||||
}
|
||||
|
||||
func (pstr *SQSee) Cfg() *config.EventExporterCfg { return pstr.cfg }
|
||||
@@ -93,6 +103,18 @@ func (pstr *SQSee) Connect() (err error) {
|
||||
len(pstr.awsKey) != 0 {
|
||||
cfg.Credentials = credentials.NewStaticCredentials(pstr.awsID, pstr.awsKey, pstr.awsToken)
|
||||
}
|
||||
if pstr.forcePathStyle {
|
||||
cfg.S3ForcePathStyle = aws.Bool(true) // Required for custom S3-compatible endpoints
|
||||
}
|
||||
if pstr.skipTlsVerify {
|
||||
cfg.HTTPClient = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true, // Equivalent to verify=False for self-signed certificates
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
pstr.session, err = session.NewSessionWithOptions(
|
||||
session.Options{
|
||||
Config: cfg,
|
||||
|
||||
@@ -1589,7 +1589,7 @@ func (dm *DataManager) RemoveRankingProfile(ctx *context.Context, tenant, id str
|
||||
if dm == nil {
|
||||
return utils.ErrNoDatabaseConn
|
||||
}
|
||||
oldSgs, err := dm.GetRankingProfile(ctx, tenant, id, true, false, utils.NonTransactional)
|
||||
oldSgs, err := dm.GetRankingProfile(ctx, tenant, id, false, false, utils.NonTransactional)
|
||||
if err != nil && err != utils.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
@@ -1613,7 +1613,7 @@ func (dm *DataManager) RemoveRankingProfile(ctx *context.Context, tenant, id str
|
||||
APIOpts: utils.GenerateDBItemOpts(itm.APIKey, itm.RouteID,
|
||||
dbCfg.RplCache, utils.EmptyString)}, itm)
|
||||
}
|
||||
return
|
||||
return dm.RemoveRanking(ctx, tenant, id)
|
||||
}
|
||||
func (dm *DataManager) GetRanking(ctx *context.Context, tenant, id string, cacheRead, cacheWrite bool, transactionID string) (rn *utils.Ranking, err error) {
|
||||
tntID := utils.ConcatenatedKey(tenant, id)
|
||||
|
||||
@@ -26,6 +26,8 @@ import (
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
)
|
||||
|
||||
func TestLoaderCSV(t *testing.T) {
|
||||
@@ -85,6 +87,14 @@ func TestLoaderCSV(t *testing.T) {
|
||||
#Tenant[0],Id[1],FilterIDs[2],Weights[3],TTL[4],Limit[5],AllocationMessage[6],Blocker[7],Stored[8],Thresholds[9]
|
||||
cgrates.org,ResGroup21,*string:~*req.Account:1001,;10,1s,2,call,true,true,
|
||||
cgrates.org,ResGroup22,*string:~*req.Account:dan,;10,3600s,2,premium_call,true,true,
|
||||
`
|
||||
IPCSVContent := `
|
||||
#Tenant[0],ID[1],FilterIDs[2],Weights[3],TTL[4],Stored[5],PoolID[6],PoolFilterIDs[7],PoolType[8],PoolRange[9],PoolStrategy[10],PoolMessage[11],PoolWeights[12],PoolBlockers[13]
|
||||
cgrates.org,IPs1,*string:~*req.Account:1001,;10,1s,true,,,,,,,,
|
||||
cgrates.org,IPs1,,,,,POOL1,*string:~*req.Destination:2001,*ipv4,172.16.1.1/32,*ascending,alloc_success,;15,
|
||||
cgrates.org,IPs1,,,,,POOL1,,,,,,*exists:~*req.NeedMoreWeight:;50,*exists:~*req.ShouldBlock:;true
|
||||
cgrates.org,IPs1,,,,,POOL2,*string:~*req.Destination:2002,*ipv4,192.168.122.1/32,*random,alloc_new,;25,;true
|
||||
cgrates.org,IPs2,*string:~*req.Account:1002,;20,2s,false,POOL1,*string:~*req.Destination:3001,*ipv4,127.0.0.1/32,*descending,alloc_msg,;35,;true
|
||||
`
|
||||
StatsCSVContent := `
|
||||
#Tenant[0],Id[1],FilterIDs[2],Weights[3],Blockers[4],QueueLength[5],TTL[6],MinItems[7],Stored[8],ThresholdIDs[9],Metrics[10],MetricFilterIDs[11],MetricBlockers[12]
|
||||
@@ -171,7 +181,7 @@ cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;fals
|
||||
}
|
||||
dbCM := NewDBConnManager(map[string]DataDB{utils.MetaDefault: idb}, config.CgrConfig().DbCfg())
|
||||
csvr, err := NewTpReader(dbCM, NewStringCSVStorage(utils.CSVSep,
|
||||
ResourcesCSVContent, StatsCSVContent, RankingsCSVContent, TrendsCSVContent, ThresholdsCSVContent, FiltersCSVContent,
|
||||
ResourcesCSVContent, IPCSVContent, StatsCSVContent, RankingsCSVContent, TrendsCSVContent, ThresholdsCSVContent, FiltersCSVContent,
|
||||
RoutesCSVContent, AttributesCSVContent, ChargersCSVContent, DispatcherCSVContent,
|
||||
DispatcherHostCSVContent, RateProfileCSVContent, ActionProfileCSVContent, AccountCSVContent), testTPID, "", nil, nil)
|
||||
if err != nil {
|
||||
@@ -183,6 +193,9 @@ cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;fals
|
||||
if err := csvr.LoadResourceProfiles(); err != nil {
|
||||
log.Print("error in LoadResourceProfiles:", err)
|
||||
}
|
||||
if err := csvr.LoadIPs(); err != nil {
|
||||
log.Print("error in LoadIPProfiles:")
|
||||
}
|
||||
if err := csvr.LoadStats(); err != nil {
|
||||
log.Print("error in LoadStats:", err)
|
||||
}
|
||||
@@ -246,6 +259,85 @@ cgrates.org,1001,,,,,VoiceBalance,,;10,*string:~*req.Destination:1002;true;;fals
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("load IPProfiles", func(t *testing.T) {
|
||||
eIPsProfiles := map[utils.TenantID]*utils.TPIPProfile{
|
||||
{Tenant: "cgrates.org", ID: "IPs1"}: {
|
||||
TPid: "LoaderCSVTests",
|
||||
Tenant: "cgrates.org",
|
||||
ID: "IPs1",
|
||||
FilterIDs: []string{"*string:~*req.Account:1001"},
|
||||
Weights: ";10",
|
||||
TTL: "1s",
|
||||
Stored: true,
|
||||
Pools: []*utils.TPIPPool{
|
||||
{
|
||||
ID: "POOL1",
|
||||
FilterIDs: []string{"*string:~*req.Destination:2001"},
|
||||
Type: "*ipv4",
|
||||
Range: "172.16.1.1/32",
|
||||
Strategy: "*ascending",
|
||||
Message: "alloc_success",
|
||||
Weights: ";15",
|
||||
Blockers: "",
|
||||
},
|
||||
{
|
||||
ID: "POOL1",
|
||||
FilterIDs: nil,
|
||||
Type: "",
|
||||
Range: "",
|
||||
Strategy: "",
|
||||
Message: "",
|
||||
Weights: "*exists:~*req.NeedMoreWeight:;50",
|
||||
Blockers: "*exists:~*req.ShouldBlock:;true",
|
||||
},
|
||||
{
|
||||
ID: "POOL2",
|
||||
FilterIDs: []string{"*string:~*req.Destination:2002"},
|
||||
Type: "*ipv4",
|
||||
Range: "192.168.122.1/32",
|
||||
Strategy: "*random",
|
||||
Message: "alloc_new",
|
||||
Weights: ";25",
|
||||
Blockers: ";true",
|
||||
},
|
||||
},
|
||||
},
|
||||
{Tenant: "cgrates.org", ID: "IPs2"}: {
|
||||
TPid: "LoaderCSVTests",
|
||||
Tenant: "cgrates.org",
|
||||
ID: "IPs2",
|
||||
FilterIDs: []string{"*string:~*req.Account:1002"},
|
||||
Weights: ";20",
|
||||
TTL: "2s",
|
||||
Stored: false,
|
||||
Pools: []*utils.TPIPPool{
|
||||
{
|
||||
ID: "POOL1",
|
||||
FilterIDs: []string{"*string:~*req.Destination:3001"},
|
||||
Type: "*ipv4",
|
||||
Range: "127.0.0.1/32",
|
||||
Strategy: "*descending",
|
||||
Message: "alloc_msg",
|
||||
Weights: ";35",
|
||||
Blockers: ";true",
|
||||
},
|
||||
},
|
||||
}}
|
||||
|
||||
if len(csvr.ipProfiles) != len(eIPsProfiles) {
|
||||
t.Errorf("Failed to load IPProfiles: %s", utils.ToIJSON(csvr.ipProfiles))
|
||||
}
|
||||
for key, val := range eIPsProfiles {
|
||||
if diff := cmp.Diff(val, csvr.ipProfiles[key], cmpopts.SortSlices(func(a, b *utils.TPIPPool) bool {
|
||||
if a.ID != b.ID {
|
||||
return a.ID < b.ID
|
||||
}
|
||||
return a.Weights < b.Weights
|
||||
})); diff != "" {
|
||||
t.Errorf("IPProfile mismatch (-want +got):\n%s", diff)
|
||||
}
|
||||
}
|
||||
})
|
||||
t.Run("load StatProfiles", func(t *testing.T) {
|
||||
eStats := map[utils.TenantID]*utils.TPStatProfile{
|
||||
{Tenant: "cgrates.org", ID: "TestStats"}: {
|
||||
|
||||
@@ -310,6 +310,246 @@ func ResourceProfileToAPI(rp *utils.ResourceProfile) (tpRL *utils.TPResourceProf
|
||||
return
|
||||
}
|
||||
|
||||
type IPMdls []*IPMdl
|
||||
|
||||
// CSVHeader return the header for csv fields as a slice of string
|
||||
func (tps IPMdls) CSVHeader() []string {
|
||||
return []string{
|
||||
"#" + utils.Tenant,
|
||||
utils.ID,
|
||||
utils.FilterIDs,
|
||||
utils.Weights,
|
||||
utils.TTL,
|
||||
utils.Stored,
|
||||
utils.PoolID,
|
||||
utils.PoolFilterIDs,
|
||||
utils.PoolType,
|
||||
utils.PoolRange,
|
||||
utils.PoolStrategy,
|
||||
utils.PoolMessage,
|
||||
utils.PoolWeights,
|
||||
utils.PoolBlockers,
|
||||
}
|
||||
}
|
||||
|
||||
func (tps IPMdls) AsTPIPs() []*utils.TPIPProfile {
|
||||
filterMap := make(map[string]utils.StringSet)
|
||||
mst := make(map[string]*utils.TPIPProfile)
|
||||
poolMap := make(map[string]map[string]*utils.TPIPPool)
|
||||
for _, mdl := range tps {
|
||||
tenID := (&utils.TenantID{Tenant: mdl.Tenant, ID: mdl.ID}).TenantID()
|
||||
tpip, found := mst[tenID]
|
||||
if !found {
|
||||
tpip = &utils.TPIPProfile{
|
||||
TPid: mdl.Tpid,
|
||||
Tenant: mdl.Tenant,
|
||||
ID: mdl.ID,
|
||||
Stored: mdl.Stored,
|
||||
}
|
||||
}
|
||||
// Handle Pool
|
||||
if mdl.PoolID != utils.EmptyString {
|
||||
if _, has := poolMap[tenID]; !has {
|
||||
poolMap[tenID] = make(map[string]*utils.TPIPPool)
|
||||
}
|
||||
poolID := mdl.PoolID
|
||||
if mdl.PoolFilterIDs != utils.EmptyString {
|
||||
poolID = utils.ConcatenatedKey(poolID,
|
||||
utils.NewStringSet(strings.Split(mdl.PoolFilterIDs, utils.InfieldSep)).Sha1())
|
||||
}
|
||||
pool, found := poolMap[tenID][poolID]
|
||||
if !found {
|
||||
pool = &utils.TPIPPool{
|
||||
ID: mdl.PoolID,
|
||||
Type: mdl.PoolType,
|
||||
Range: mdl.PoolRange,
|
||||
Strategy: mdl.PoolStrategy,
|
||||
Message: mdl.PoolMessage,
|
||||
Weights: mdl.PoolWeights,
|
||||
Blockers: mdl.PoolBlockers,
|
||||
}
|
||||
}
|
||||
if mdl.PoolFilterIDs != utils.EmptyString {
|
||||
poolFilterSplit := strings.Split(mdl.PoolFilterIDs, utils.InfieldSep)
|
||||
pool.FilterIDs = append(pool.FilterIDs, poolFilterSplit...)
|
||||
}
|
||||
poolMap[tenID][poolID] = pool
|
||||
}
|
||||
// Profile-level fields
|
||||
if mdl.TTL != utils.EmptyString {
|
||||
tpip.TTL = mdl.TTL
|
||||
}
|
||||
if mdl.Weights != "" {
|
||||
tpip.Weights = mdl.Weights
|
||||
}
|
||||
if mdl.Stored {
|
||||
tpip.Stored = mdl.Stored
|
||||
}
|
||||
|
||||
if mdl.FilterIDs != utils.EmptyString {
|
||||
if _, has := filterMap[tenID]; !has {
|
||||
filterMap[tenID] = make(utils.StringSet)
|
||||
}
|
||||
filterMap[tenID].AddSlice(strings.Split(mdl.FilterIDs, utils.InfieldSep))
|
||||
}
|
||||
mst[tenID] = tpip
|
||||
}
|
||||
// Build result with Pools
|
||||
result := make([]*utils.TPIPProfile, len(mst))
|
||||
i := 0
|
||||
for tntID, tpip := range mst {
|
||||
result[i] = tpip
|
||||
for _, poolData := range poolMap[tntID] {
|
||||
result[i].Pools = append(result[i].Pools, poolData)
|
||||
}
|
||||
result[i].FilterIDs = filterMap[tntID].AsSlice()
|
||||
i++
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func APItoModelIP(tp *utils.TPIPProfile) IPMdls {
|
||||
if tp == nil {
|
||||
return nil
|
||||
}
|
||||
var mdls IPMdls
|
||||
// Handle case with no pools
|
||||
if len(tp.Pools) == 0 {
|
||||
mdl := &IPMdl{
|
||||
Tpid: tp.TPid,
|
||||
Tenant: tp.Tenant,
|
||||
ID: tp.ID,
|
||||
TTL: tp.TTL,
|
||||
Stored: tp.Stored,
|
||||
Weights: tp.Weights,
|
||||
}
|
||||
for i, val := range tp.FilterIDs {
|
||||
if i != 0 {
|
||||
mdl.FilterIDs += utils.InfieldSep
|
||||
}
|
||||
mdl.FilterIDs += val
|
||||
}
|
||||
mdls = append(mdls, mdl)
|
||||
return mdls
|
||||
}
|
||||
for i, pool := range tp.Pools {
|
||||
mdl := &IPMdl{
|
||||
Tpid: tp.TPid,
|
||||
Tenant: tp.Tenant,
|
||||
ID: tp.ID,
|
||||
Stored: tp.Stored,
|
||||
}
|
||||
if i == 0 {
|
||||
// Profile-level fields only on first row
|
||||
mdl.TTL = tp.TTL
|
||||
mdl.Weights = tp.Weights
|
||||
for j, val := range tp.FilterIDs {
|
||||
if j != 0 {
|
||||
mdl.FilterIDs += utils.InfieldSep
|
||||
}
|
||||
mdl.FilterIDs += val
|
||||
}
|
||||
}
|
||||
// Pool fields on every row
|
||||
mdl.PoolID = pool.ID
|
||||
mdl.PoolType = pool.Type
|
||||
mdl.PoolRange = pool.Range
|
||||
mdl.PoolStrategy = pool.Strategy
|
||||
mdl.PoolMessage = pool.Message
|
||||
mdl.PoolWeights = pool.Weights
|
||||
mdl.PoolBlockers = pool.Blockers
|
||||
for j, val := range pool.FilterIDs {
|
||||
if j != 0 {
|
||||
mdl.PoolFilterIDs += utils.InfieldSep
|
||||
}
|
||||
mdl.PoolFilterIDs += val
|
||||
}
|
||||
mdls = append(mdls, mdl)
|
||||
}
|
||||
return mdls
|
||||
}
|
||||
|
||||
func APItoIP(tp *utils.TPIPProfile) (*utils.IPProfile, error) {
|
||||
ipp := &utils.IPProfile{
|
||||
Tenant: tp.Tenant,
|
||||
ID: tp.ID,
|
||||
Stored: tp.Stored,
|
||||
FilterIDs: make([]string, len(tp.FilterIDs)),
|
||||
Pools: make([]*utils.IPPool, len(tp.Pools)),
|
||||
}
|
||||
if tp.Weights != utils.EmptyString {
|
||||
var err error
|
||||
ipp.Weights, err = utils.NewDynamicWeightsFromString(tp.Weights, utils.InfieldSep, utils.ANDSep)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if tp.TTL != utils.EmptyString {
|
||||
var err error
|
||||
if ipp.TTL, err = utils.ParseDurationWithNanosecs(tp.TTL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
copy(ipp.FilterIDs, tp.FilterIDs)
|
||||
|
||||
for i, pool := range tp.Pools {
|
||||
ipp.Pools[i] = &utils.IPPool{
|
||||
ID: pool.ID,
|
||||
FilterIDs: pool.FilterIDs,
|
||||
Type: pool.Type,
|
||||
Range: pool.Range,
|
||||
Strategy: pool.Strategy,
|
||||
Message: pool.Message,
|
||||
}
|
||||
if pool.Weights != utils.EmptyString {
|
||||
var err error
|
||||
ipp.Pools[i].Weights, err = utils.NewDynamicWeightsFromString(pool.Weights, utils.InfieldSep, utils.ANDSep)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if pool.Blockers != utils.EmptyString {
|
||||
var err error
|
||||
ipp.Pools[i].Blockers, err = utils.NewDynamicBlockersFromString(pool.Blockers, utils.InfieldSep, utils.ANDSep)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return ipp, nil
|
||||
}
|
||||
|
||||
func IPProfileToAPI(ipp *utils.IPProfile) *utils.TPIPProfile {
|
||||
tp := &utils.TPIPProfile{
|
||||
Tenant: ipp.Tenant,
|
||||
ID: ipp.ID,
|
||||
FilterIDs: make([]string, len(ipp.FilterIDs)),
|
||||
Weights: ipp.Weights.String(utils.InfieldSep, utils.ANDSep),
|
||||
Stored: ipp.Stored,
|
||||
Pools: make([]*utils.TPIPPool, len(ipp.Pools)),
|
||||
}
|
||||
if ipp.TTL != time.Duration(0) {
|
||||
tp.TTL = ipp.TTL.String()
|
||||
}
|
||||
|
||||
copy(tp.FilterIDs, ipp.FilterIDs)
|
||||
|
||||
for i, pool := range ipp.Pools {
|
||||
tp.Pools[i] = &utils.TPIPPool{
|
||||
ID: pool.ID,
|
||||
FilterIDs: pool.FilterIDs,
|
||||
Type: pool.Type,
|
||||
Range: pool.Range,
|
||||
Blockers: pool.Blockers.String(utils.InfieldSep, utils.ANDSep),
|
||||
Weights: pool.Weights.String(utils.InfieldSep, utils.ANDSep),
|
||||
Strategy: pool.Strategy,
|
||||
Message: pool.Message,
|
||||
}
|
||||
}
|
||||
return tp
|
||||
}
|
||||
|
||||
type StatMdls []*StatMdl
|
||||
|
||||
// CSVHeader return the header for csv fields as a slice of string
|
||||
|
||||
@@ -47,6 +47,30 @@ func (ResourceMdl) TableName() string {
|
||||
return utils.TBLTPResources
|
||||
}
|
||||
|
||||
type IPMdl struct {
|
||||
PK uint `gorm:"primary_key"`
|
||||
Tpid string
|
||||
Tenant string `index:"0" re:".*"`
|
||||
ID string `index:"1" re:".*"`
|
||||
FilterIDs string `index:"2" re:".*"`
|
||||
Weights string `index:"3" re:".*"`
|
||||
TTL string `index:"4" re:".*"`
|
||||
Stored bool `index:"5" re:".*"`
|
||||
PoolID string `index:"6" re:".*"`
|
||||
PoolFilterIDs string `index:"7" re:".*"`
|
||||
PoolType string `index:"8" re:".*"`
|
||||
PoolRange string `index:"9" re:".*"`
|
||||
PoolStrategy string `index:"10" re:".*"`
|
||||
PoolMessage string `index:"11" re:".*"`
|
||||
PoolWeights string `index:"12" re:".*"`
|
||||
PoolBlockers string `index:"13" re:".*"`
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
func (IPMdl) TableName() string {
|
||||
return utils.TBLTPIPs
|
||||
}
|
||||
|
||||
type StatMdl struct {
|
||||
PK uint `gorm:"primary_key"`
|
||||
Tpid string
|
||||
|
||||
@@ -45,6 +45,7 @@ type CSVStorage struct {
|
||||
generator func() csvReaderCloser
|
||||
// file names
|
||||
resProfilesFn []string
|
||||
ipsFn []string
|
||||
statsFn []string
|
||||
trendsFn []string
|
||||
rankingsFn []string
|
||||
@@ -60,13 +61,14 @@ type CSVStorage struct {
|
||||
|
||||
// NewCSVStorage creates a CSV storege that takes the data from the paths specified
|
||||
func NewCSVStorage(sep rune,
|
||||
resProfilesFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn,
|
||||
resProfilesFn, ipsFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn,
|
||||
attributeProfilesFn, chargerProfilesFn,
|
||||
rateProfilesFn, actionProfilesFn, accountsFn []string) *CSVStorage {
|
||||
return &CSVStorage{
|
||||
sep: sep,
|
||||
generator: NewCsvFile,
|
||||
resProfilesFn: resProfilesFn,
|
||||
ipsFn: ipsFn,
|
||||
statsFn: statsFn,
|
||||
rankingsFn: rankingsFn,
|
||||
trendsFn: trendsFn,
|
||||
@@ -88,6 +90,7 @@ func NewFileCSVStorage(sep rune, dataPath string) (*CSVStorage, error) {
|
||||
return nil, fmt.Errorf("could not retrieve any folders from %q: %v", dataPath, err)
|
||||
}
|
||||
resourcesPaths := appendName(allFoldersPath, utils.ResourcesCsv)
|
||||
ipsPaths := appendName(allFoldersPath, utils.IPsCsv)
|
||||
statsPaths := appendName(allFoldersPath, utils.StatsCsv)
|
||||
rankingsPaths := appendName(allFoldersPath, utils.RankingsCsv)
|
||||
trendsPaths := appendName(allFoldersPath, utils.TrendsCsv)
|
||||
@@ -101,6 +104,7 @@ func NewFileCSVStorage(sep rune, dataPath string) (*CSVStorage, error) {
|
||||
accountsFn := appendName(allFoldersPath, utils.AccountsCsv)
|
||||
return NewCSVStorage(sep,
|
||||
resourcesPaths,
|
||||
ipsPaths,
|
||||
statsPaths,
|
||||
rankingsPaths,
|
||||
trendsPaths,
|
||||
@@ -117,11 +121,11 @@ func NewFileCSVStorage(sep rune, dataPath string) (*CSVStorage, error) {
|
||||
|
||||
// NewStringCSVStorage creates a csv storage from strings
|
||||
func NewStringCSVStorage(sep rune,
|
||||
resProfilesFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn,
|
||||
resProfilesFn, ipsFn, statsFn, rankingsFn, trendsFn, thresholdsFn, filterFn, routeProfilesFn,
|
||||
attributeProfilesFn, chargerProfilesFn, dispatcherProfilesFn, dispatcherHostsFn,
|
||||
rateProfilesFn, actionProfilesFn, accountsFn string) *CSVStorage {
|
||||
c := NewCSVStorage(sep,
|
||||
[]string{resProfilesFn}, []string{statsFn}, []string{rankingsFn}, []string{trendsFn}, []string{thresholdsFn}, []string{filterFn},
|
||||
[]string{resProfilesFn}, []string{ipsFn}, []string{statsFn}, []string{rankingsFn}, []string{trendsFn}, []string{thresholdsFn}, []string{filterFn},
|
||||
[]string{routeProfilesFn}, []string{attributeProfilesFn}, []string{chargerProfilesFn},
|
||||
[]string{rateProfilesFn},
|
||||
[]string{actionProfilesFn}, []string{accountsFn})
|
||||
@@ -147,6 +151,7 @@ func NewGoogleCSVStorage(sep rune, spreadsheetID string) (*CSVStorage, error) {
|
||||
}
|
||||
c := NewCSVStorage(sep,
|
||||
getIfExist(utils.ResourcesStr),
|
||||
getIfExist(utils.IPs),
|
||||
getIfExist(utils.Stats),
|
||||
getIfExist(utils.Rankings),
|
||||
getIfExist(utils.Trends),
|
||||
@@ -170,6 +175,7 @@ func NewGoogleCSVStorage(sep rune, spreadsheetID string) (*CSVStorage, error) {
|
||||
// NewURLCSVStorage returns a CSVStorage that can parse URLs
|
||||
func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
|
||||
var resourcesPaths []string
|
||||
var ipsPaths []string
|
||||
var statsPaths []string
|
||||
var rankingsPaths []string
|
||||
var trendsPaths []string
|
||||
@@ -185,6 +191,7 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
|
||||
for _, baseURL := range strings.Split(dataPath, utils.InfieldSep) {
|
||||
if !strings.HasSuffix(baseURL, utils.CSVSuffix) {
|
||||
resourcesPaths = append(resourcesPaths, joinURL(baseURL, utils.ResourcesCsv))
|
||||
ipsPaths = append(ipsPaths, joinURL(baseURL, utils.IPsCsv))
|
||||
statsPaths = append(statsPaths, joinURL(baseURL, utils.StatsCsv))
|
||||
rankingsPaths = append(rankingsPaths, joinURL(baseURL, utils.RankingsCsv))
|
||||
trendsPaths = append(trendsPaths, joinURL(baseURL, utils.TrendsCsv))
|
||||
@@ -201,6 +208,8 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
|
||||
switch {
|
||||
case strings.HasSuffix(baseURL, utils.ResourcesCsv):
|
||||
resourcesPaths = append(resourcesPaths, baseURL)
|
||||
case strings.HasSuffix(baseURL, utils.IPsCsv):
|
||||
ipsPaths = append(ipsPaths, baseURL)
|
||||
case strings.HasSuffix(baseURL, utils.StatsCsv):
|
||||
statsPaths = append(statsPaths, baseURL)
|
||||
case strings.HasSuffix(baseURL, utils.RankingsCsv):
|
||||
@@ -229,6 +238,7 @@ func NewURLCSVStorage(sep rune, dataPath string) *CSVStorage {
|
||||
|
||||
c := NewCSVStorage(sep,
|
||||
resourcesPaths,
|
||||
ipsPaths,
|
||||
statsPaths,
|
||||
rankingsPaths,
|
||||
trendsPaths,
|
||||
@@ -320,6 +330,18 @@ func (csvs *CSVStorage) GetTPResources(tpid, tenant, id string) ([]*utils.TPReso
|
||||
return tpResLimits.AsTPResources(), nil
|
||||
}
|
||||
|
||||
func (csvs *CSVStorage) GetTPIPs(tpid, tenant, id string) ([]*utils.TPIPProfile, error) {
|
||||
var tpIPS IPMdls
|
||||
if err := csvs.proccesData(IPMdl{}, csvs.ipsFn, func(tp any) {
|
||||
tpIP := tp.(IPMdl)
|
||||
tpIP.Tpid = tpid
|
||||
tpIPS = append(tpIPS, &tpIP)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tpIPS.AsTPIPs(), nil
|
||||
}
|
||||
|
||||
func (csvs *CSVStorage) GetTPStats(tpid, tenant, id string) ([]*utils.TPStatProfile, error) {
|
||||
var tpStats StatMdls
|
||||
if err := csvs.proccesData(StatMdl{}, csvs.statsFn, func(tp any) {
|
||||
|
||||
@@ -136,6 +136,7 @@ type LoadReader interface {
|
||||
GetTpTableIds(string, string, []string,
|
||||
map[string]string, *utils.PaginatorWithSearch) ([]string, error)
|
||||
GetTPResources(string, string, string) ([]*utils.TPResourceProfile, error)
|
||||
GetTPIPs(string, string, string) ([]*utils.TPIPProfile, error)
|
||||
GetTPStats(string, string, string) ([]*utils.TPStatProfile, error)
|
||||
GetTPRankings(tpid, tenant, id string) ([]*utils.TPRankingProfile, error)
|
||||
GetTPTrends(tpid, tenant, id string) ([]*utils.TPTrendsProfile, error)
|
||||
|
||||
@@ -36,6 +36,7 @@ type TpReader struct {
|
||||
lr LoadReader
|
||||
resProfiles map[utils.TenantID]*utils.TPResourceProfile
|
||||
sqProfiles map[utils.TenantID]*utils.TPStatProfile
|
||||
ipProfiles map[utils.TenantID]*utils.TPIPProfile
|
||||
trProfiles map[utils.TenantID]*utils.TPTrendsProfile
|
||||
rgProfiles map[utils.TenantID]*utils.TPRankingProfile
|
||||
thProfiles map[utils.TenantID]*utils.TPThresholdProfile
|
||||
@@ -67,6 +68,7 @@ func NewTpReader(db *DBConnManager, lr LoadReader, tpid, timezone string,
|
||||
|
||||
func (tpr *TpReader) Init() {
|
||||
tpr.resProfiles = make(map[utils.TenantID]*utils.TPResourceProfile)
|
||||
tpr.ipProfiles = make(map[utils.TenantID]*utils.TPIPProfile)
|
||||
tpr.sqProfiles = make(map[utils.TenantID]*utils.TPStatProfile)
|
||||
tpr.rgProfiles = make(map[utils.TenantID]*utils.TPRankingProfile)
|
||||
tpr.thProfiles = make(map[utils.TenantID]*utils.TPThresholdProfile)
|
||||
@@ -120,6 +122,26 @@ func (tpr *TpReader) LoadStats() error {
|
||||
return tpr.LoadStatsFiltered("")
|
||||
}
|
||||
|
||||
func (tpr *TpReader) LoadIPsFiltered(tag string) (err error) {
|
||||
tps, err := tpr.lr.GetTPIPs(tpr.tpid, "", tag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mapIPs := make(map[utils.TenantID]*utils.TPIPProfile)
|
||||
for _, ip := range tps {
|
||||
if err = verifyInlineFilterS(ip.FilterIDs); err != nil {
|
||||
return
|
||||
}
|
||||
mapIPs[utils.TenantID{Tenant: ip.Tenant, ID: ip.ID}] = ip
|
||||
}
|
||||
tpr.ipProfiles = mapIPs
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tpr *TpReader) LoadIPs() error {
|
||||
return tpr.LoadIPsFiltered("")
|
||||
}
|
||||
|
||||
func (tpr *TpReader) LoadRankingsFiltered(tag string) error {
|
||||
tps, err := tpr.lr.GetTPRankings(tpr.tpid, "", tag)
|
||||
if err != nil {
|
||||
@@ -330,6 +352,9 @@ func (tpr *TpReader) LoadAll() (err error) {
|
||||
if err = tpr.LoadRankings(); err != nil && err.Error() != utils.NotFoundCaps {
|
||||
return
|
||||
}
|
||||
if err = tpr.LoadIPs(); err != nil && err.Error() != utils.NotFoundCaps {
|
||||
return
|
||||
}
|
||||
if err = tpr.LoadTrends(); err != nil && err.Error() != utils.NotFoundCaps {
|
||||
return
|
||||
}
|
||||
@@ -404,6 +429,25 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
|
||||
loadIDs[utils.CacheResourceProfiles] = loadID
|
||||
loadIDs[utils.CacheResources] = loadID
|
||||
}
|
||||
if verbose {
|
||||
log.Print("IPProfiles")
|
||||
}
|
||||
for _, tpIP := range tpr.ipProfiles {
|
||||
var ip *utils.IPProfile
|
||||
if ip, err = APItoIP(tpIP); err != nil {
|
||||
return
|
||||
}
|
||||
if err = tpr.dm.SetIPProfile(context.TODO(), ip, true); err != nil {
|
||||
return
|
||||
}
|
||||
if verbose {
|
||||
log.Print("\t", ip.TenantID())
|
||||
}
|
||||
}
|
||||
if len(tpr.ipProfiles) != 0 {
|
||||
loadIDs[utils.CacheIPProfiles] = loadID
|
||||
loadIDs[utils.CacheIPAllocations] = loadID
|
||||
}
|
||||
if verbose {
|
||||
log.Print("StatQueueProfiles:")
|
||||
}
|
||||
@@ -594,6 +638,8 @@ func (tpr *TpReader) WriteToDatabase(verbose, disableReverse bool) (err error) {
|
||||
func (tpr *TpReader) ShowStatistics() {
|
||||
// resource profiles
|
||||
log.Print("ResourceProfiles: ", len(tpr.resProfiles))
|
||||
// ip profiles
|
||||
log.Print("IPProfiels", len(tpr.ipProfiles))
|
||||
// stats
|
||||
log.Print("Stats: ", len(tpr.sqProfiles))
|
||||
// thresholds
|
||||
@@ -627,7 +673,30 @@ func (tpr *TpReader) GetLoadedIds(categ string) ([]string, error) {
|
||||
i++
|
||||
}
|
||||
return keys, nil
|
||||
|
||||
case utils.IPProfilesPrefix:
|
||||
keys := make([]string, len(tpr.ipProfiles))
|
||||
i := 0
|
||||
for k := range tpr.ipProfiles {
|
||||
keys[i] = k.TenantID()
|
||||
i++
|
||||
}
|
||||
return keys, nil
|
||||
case utils.TrendProfilePrefix:
|
||||
keys := make([]string, len(tpr.trProfiles))
|
||||
i := 0
|
||||
for k := range tpr.trProfiles {
|
||||
keys[i] = k.TenantID()
|
||||
i++
|
||||
}
|
||||
return keys, nil
|
||||
case utils.RankingProfilePrefix:
|
||||
keys := make([]string, len(tpr.rgProfiles))
|
||||
i := 0
|
||||
for k := range tpr.rgProfiles {
|
||||
keys[i] = k.TenantID()
|
||||
i++
|
||||
}
|
||||
return keys, nil
|
||||
case utils.StatQueueProfilePrefix:
|
||||
keys := make([]string, len(tpr.sqProfiles))
|
||||
i := 0
|
||||
@@ -710,6 +779,17 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
|
||||
log.Print("\t", utils.ConcatenatedKey(tpRsp.Tenant, tpRsp.ID))
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
log.Print("IPProfiles")
|
||||
}
|
||||
for _, tpIP := range tpr.ipProfiles {
|
||||
if err = tpr.dm.RemoveIPProfile(context.TODO(), tpIP.Tenant, tpIP.ID, true); err != nil {
|
||||
return
|
||||
}
|
||||
if verbose {
|
||||
log.Print("\t", utils.ConcatenatedKey(tpIP.Tenant, tpIP.ID))
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
log.Print("StatQueueProfiles:")
|
||||
}
|
||||
@@ -732,6 +812,26 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
|
||||
log.Print("\t", utils.ConcatenatedKey(tpTH.Tenant, tpTH.ID))
|
||||
}
|
||||
}
|
||||
if verbose {
|
||||
log.Print("TrendProfiles:")
|
||||
}
|
||||
|
||||
for _, tpTr := range tpr.trProfiles {
|
||||
if err = tpr.dm.RemoveTrendProfile(context.TODO(), tpTr.Tenant, tpTr.ID); err != nil {
|
||||
return
|
||||
}
|
||||
log.Print("\t", utils.ConcatenatedKey(tpTr.Tenant, tpTr.ID))
|
||||
}
|
||||
if verbose {
|
||||
log.Print("RankingProfiles:")
|
||||
}
|
||||
for _, tpRnk := range tpr.rgProfiles {
|
||||
if err = tpr.dm.RemoveRankingProfile(context.TODO(), tpRnk.Tenant, tpRnk.ID); err != nil {
|
||||
return
|
||||
}
|
||||
log.Print("\t", utils.ConcatenatedKey(tpRnk.Tenant, tpRnk.ID))
|
||||
}
|
||||
|
||||
if verbose {
|
||||
log.Print("RouteProfiles:")
|
||||
}
|
||||
@@ -829,6 +929,10 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
|
||||
loadIDs[utils.CacheResourceProfiles] = loadID
|
||||
loadIDs[utils.CacheResources] = loadID
|
||||
}
|
||||
if len(tpr.ipProfiles) != 0 {
|
||||
loadIDs[utils.CacheIPProfiles] = loadID
|
||||
loadIDs[utils.CacheIPAllocations] = loadID
|
||||
}
|
||||
if len(tpr.sqProfiles) != 0 {
|
||||
loadIDs[utils.CacheStatQueueProfiles] = loadID
|
||||
loadIDs[utils.CacheStatQueues] = loadID
|
||||
@@ -837,6 +941,14 @@ func (tpr *TpReader) RemoveFromDatabase(verbose, disableReverse bool) (err error
|
||||
loadIDs[utils.CacheThresholdProfiles] = loadID
|
||||
loadIDs[utils.CacheThresholds] = loadID
|
||||
}
|
||||
if len(tpr.trProfiles) != 0 {
|
||||
loadIDs[utils.CacheTrendProfiles] = loadID
|
||||
loadIDs[utils.CacheTrends] = loadID
|
||||
}
|
||||
if len(tpr.rgProfiles) != 0 {
|
||||
loadIDs[utils.CacheRankingProfiles] = loadID
|
||||
loadIDs[utils.CacheRankings] = loadID
|
||||
}
|
||||
if len(tpr.routeProfiles) != 0 {
|
||||
loadIDs[utils.CacheRouteProfiles] = loadID
|
||||
}
|
||||
@@ -867,6 +979,9 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
|
||||
rspIDs, _ := tpr.GetLoadedIds(utils.ResourceProfilesPrefix)
|
||||
stqpIDs, _ := tpr.GetLoadedIds(utils.StatQueueProfilePrefix)
|
||||
trspfIDs, _ := tpr.GetLoadedIds(utils.ThresholdProfilePrefix)
|
||||
trnpfIDs, _ := tpr.GetLoadedIds(utils.TrendProfilePrefix)
|
||||
rnkpfIDs, _ := tpr.GetLoadedIds(utils.RankingProfilePrefix)
|
||||
ippfIDs, _ := tpr.GetLoadedIds(utils.IPProfilesPrefix)
|
||||
flrIDs, _ := tpr.GetLoadedIds(utils.FilterPrefix)
|
||||
routeIDs, _ := tpr.GetLoadedIds(utils.RouteProfilePrefix)
|
||||
apfIDs, _ := tpr.GetLoadedIds(utils.AttributeProfilePrefix)
|
||||
@@ -889,6 +1004,9 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
|
||||
utils.CacheChargerProfiles: chargerIDs,
|
||||
utils.CacheRateProfiles: ratePrfIDs,
|
||||
utils.CacheActionProfiles: actionPrfIDs,
|
||||
utils.CacheTrendProfiles: trnpfIDs,
|
||||
utils.CacheRankingProfiles: rnkpfIDs,
|
||||
utils.CacheIPProfiles: ippfIDs,
|
||||
}
|
||||
|
||||
// verify if we need to clear indexes
|
||||
@@ -896,6 +1014,9 @@ func (tpr *TpReader) ReloadCache(ctx *context.Context, caching string, verbose b
|
||||
if len(apfIDs) != 0 {
|
||||
cacheIDs = append(cacheIDs, utils.CacheAttributeFilterIndexes)
|
||||
}
|
||||
if len(ippfIDs) != 0 {
|
||||
cacheIDs = append(cacheIDs, utils.CacheIPFilterIndexes)
|
||||
}
|
||||
if len(routeIDs) != 0 {
|
||||
cacheIDs = append(cacheIDs, utils.CacheRouteFilterIndexes)
|
||||
}
|
||||
|
||||
@@ -582,6 +582,7 @@ func TestTPReaderReloadCache(t *testing.T) {
|
||||
APIOpts: map[string]any{},
|
||||
Tenant: "cgrates.org",
|
||||
ResourceProfileIDs: []string{"cgrates.org:resourceProfilesID"},
|
||||
IPProfileIDs: []string{"cgrates.org:ipProfilesID"},
|
||||
StatsQueueProfileIDs: []string{"cgrates.org:statProfilesID"},
|
||||
ThresholdProfileIDs: []string{"cgrates.org:thresholdProfilesID"},
|
||||
FilterIDs: []string{"cgrates.org:filtersID"},
|
||||
@@ -589,6 +590,8 @@ func TestTPReaderReloadCache(t *testing.T) {
|
||||
AttributeProfileIDs: []string{"cgrates.org:attributeProfilesID"},
|
||||
ChargerProfileIDs: []string{"cgrates.org:chargerProfilesID"},
|
||||
ResourceIDs: []string{"cgrates.org:resourceProfilesID"},
|
||||
TrendProfileIDs: []string{"cgrates.org:trendProfilesID"},
|
||||
RankingProfileIDs: []string{"cgrates.org:rankingProfileID"},
|
||||
StatsQueueIDs: []string{"cgrates.org:statProfilesID"},
|
||||
ThresholdIDs: []string{"cgrates.org:thresholdProfilesID"},
|
||||
|
||||
@@ -617,12 +620,21 @@ func TestTPReaderReloadCache(t *testing.T) {
|
||||
resProfiles: map[utils.TenantID]*utils.TPResourceProfile{
|
||||
{Tenant: "cgrates.org", ID: "resourceProfilesID"}: {},
|
||||
},
|
||||
ipProfiles: map[utils.TenantID]*utils.TPIPProfile{
|
||||
{Tenant: "cgrates.org", ID: "ipProfilesID"}: {},
|
||||
},
|
||||
rgProfiles: map[utils.TenantID]*utils.TPRankingProfile{
|
||||
{Tenant: "cgrates.org", ID: "rankingProfileID"}: {},
|
||||
},
|
||||
sqProfiles: map[utils.TenantID]*utils.TPStatProfile{
|
||||
{Tenant: "cgrates.org", ID: "statProfilesID"}: {},
|
||||
},
|
||||
thProfiles: map[utils.TenantID]*utils.TPThresholdProfile{
|
||||
{Tenant: "cgrates.org", ID: "thresholdProfilesID"}: {},
|
||||
},
|
||||
trProfiles: map[utils.TenantID]*utils.TPTrendsProfile{
|
||||
{Tenant: "cgrates.org", ID: "trendProfilesID"}: {},
|
||||
},
|
||||
filters: map[utils.TenantID]*utils.TPFilterProfile{
|
||||
{Tenant: "cgrates.org", ID: "filtersID"}: {},
|
||||
},
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
//go:build flaky
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
@@ -131,7 +130,22 @@ cgrates.org,Threshold2,*string:~*req.Metrics.*pdd.ID:*pdd,;10,-1,0,1s,false,,tru
|
||||
}
|
||||
|
||||
client, _ := ng.Run(t)
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// Wait for loader to finish loading all profiles (thresholds, stats, trends)
|
||||
for range 20 {
|
||||
var thPrfIDs, stPrfIDs []string
|
||||
var trPrfs *[]*utils.TrendProfile
|
||||
errTh := client.Call(context.Background(), utils.AdminSv1GetThresholdProfileIDs,
|
||||
&utils.ArgsItemIDs{Tenant: "cgrates.org"}, &thPrfIDs)
|
||||
errSt := client.Call(context.Background(), utils.AdminSv1GetStatQueueProfileIDs,
|
||||
&utils.ArgsItemIDs{Tenant: "cgrates.org"}, &stPrfIDs)
|
||||
errTr := client.Call(context.Background(), utils.AdminSv1GetTrendProfiles,
|
||||
&utils.ArgsItemIDs{Tenant: "cgrates.org"}, &trPrfs)
|
||||
if errTh == nil && errSt == nil && errTr == nil &&
|
||||
len(thPrfIDs) == 2 && len(stPrfIDs) == 2 && trPrfs != nil && len(*trPrfs) == 2 {
|
||||
break
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
var tr *utils.Trend
|
||||
t.Run("TrendSchedule", func(t *testing.T) {
|
||||
var replyTrendProfiles *[]*utils.TrendProfile
|
||||
|
||||
@@ -151,7 +151,27 @@ type TPResourceProfile struct {
|
||||
Weights string // Weight to sort the ResourceLimits
|
||||
ThresholdIDs []string // Thresholds to check after changing Limit
|
||||
}
|
||||
type TPIPPool struct {
|
||||
ID string
|
||||
FilterIDs []string
|
||||
Type string
|
||||
Range string
|
||||
Strategy string
|
||||
Message string
|
||||
Weights string
|
||||
Blockers string
|
||||
}
|
||||
|
||||
type TPIPProfile struct {
|
||||
TPid string
|
||||
Tenant string
|
||||
ID string
|
||||
FilterIDs []string
|
||||
TTL string
|
||||
Stored bool
|
||||
Weights string
|
||||
Pools []*TPIPPool
|
||||
}
|
||||
type ArgsComputeFilterIndexIDs struct {
|
||||
Tenant string
|
||||
APIOpts map[string]any
|
||||
|
||||
@@ -74,8 +74,8 @@ var (
|
||||
CacheThresholds: ThresholdPrefix,
|
||||
CacheFilters: FilterPrefix,
|
||||
CacheRouteProfiles: RouteProfilePrefix,
|
||||
CacheRankingProfiles: RankingPrefix,
|
||||
CacheRankings: RankingProfilePrefix,
|
||||
CacheRankingProfiles: RankingProfilePrefix,
|
||||
CacheRankings: RankingPrefix,
|
||||
CacheAttributeProfiles: AttributeProfilePrefix,
|
||||
CacheChargerProfiles: ChargerProfilePrefix,
|
||||
CacheRateProfiles: RateProfilePrefix,
|
||||
@@ -855,6 +855,13 @@ const (
|
||||
SortingData = "SortingData"
|
||||
ProfileID = "ProfileID"
|
||||
PoolID = "PoolID"
|
||||
PoolFilterIDs = "PoolFilterIDs"
|
||||
PoolType = "PoolType"
|
||||
PoolRange = "PoolRange"
|
||||
PoolStrategy = "PoolStrategy"
|
||||
PoolMessage = "PoolMessage"
|
||||
PoolWeights = "PoolWeights"
|
||||
PoolBlockers = "PoolBlockers"
|
||||
SortedRoutes = "SortedRoutes"
|
||||
MetaMonthly = "*monthly"
|
||||
MetaYearly = "*yearly"
|
||||
@@ -1946,6 +1953,7 @@ const (
|
||||
// Table Name
|
||||
const (
|
||||
TBLTPResources = "tp_resources"
|
||||
TBLTPIPs = "tp_ips"
|
||||
TBLTPStats = "tp_stats"
|
||||
TBLTPRankings = "tp_rankings"
|
||||
TBLTPTrends = "tp_trends"
|
||||
@@ -2855,11 +2863,15 @@ const (
|
||||
AWSToken = "awsToken"
|
||||
|
||||
// sqs
|
||||
SQSQueueID = "sqsQueueID"
|
||||
SQSQueueID = "sqsQueueID"
|
||||
SQSForcePathStyle = "sqsForcePathStyle"
|
||||
SQSSkipTlsVerify = "sqsSkipTlsVerify"
|
||||
|
||||
// s3
|
||||
S3Bucket = "s3BucketID"
|
||||
S3FolderPath = "s3FolderPath"
|
||||
S3Bucket = "s3BucketID"
|
||||
S3FolderPath = "s3FolderPath"
|
||||
S3ForcePathStyle = "s3ForcePathStyle"
|
||||
S3SkipTlsVerify = "s3SkipTlsVerify"
|
||||
|
||||
// sql
|
||||
SQLDefaultDBName = "cgrates"
|
||||
|
||||
Reference in New Issue
Block a user