refactor StatSum and use string conn status values

This commit is contained in:
ionutboangiu
2025-11-12 18:11:32 +02:00
committed by Dan Christian Bogos
parent 02f31a5047
commit ce720a259d
6 changed files with 127 additions and 73 deletions

View File

@@ -60,7 +60,7 @@ func TestDiamConnStats(t *testing.T) {
"stats": {
"enabled": true,
"store_interval": "-1",
"string_indexed_fields": ["*req.OriginHost"]
"string_indexed_fields": ["*opts.*eventType"]
},
"thresholds": {
"enabled": true,
@@ -83,27 +83,32 @@ func TestDiamConnStats(t *testing.T) {
// LogBuffer: &bytes.Buffer{},
GracefulShutdown: true,
}
// defer fmt.Println(ng.LogBuffer)
// t.Cleanup(func() {
// fmt.Println(ng.LogBuffer)
// })
client, cfg := ng.Run(t)
setSQProfile := func(id, originHost, originRealm string, ttl time.Duration) {
t.Helper()
fltrIDs := []string{"*string:~*opts.*eventType:ConnectionStatusReport"}
if originHost != "" {
fltrIDs = append(fltrIDs, fmt.Sprintf("*string:~*req.OriginHost:%s", originHost))
}
if originRealm != "" {
fltrIDs = append(fltrIDs, fmt.Sprintf("*string:~*req.OriginRealm:%s", originRealm))
}
var reply string
if err := client.Call(context.Background(), utils.AdminSv1SetStatQueueProfile,
engine.StatQueueProfileWithAPIOpts{
StatQueueProfile: &engine.StatQueueProfile{
Tenant: "cgrates.org",
ID: id,
FilterIDs: []string{
"*string:~*opts.*eventType:ConnectionStatusReport",
fmt.Sprintf("*string:~*req.OriginHost:%s", originHost),
fmt.Sprintf("*string:~*req.OriginRealm:%s", originRealm),
},
Tenant: "cgrates.org",
ID: id,
FilterIDs: fltrIDs,
QueueLength: -1,
TTL: ttl,
Metrics: []*engine.MetricWithFilters{
{
MetricID: "*sum#~*req.ConnectionStatus",
MetricID: "*sum#~*req.ConnectionStatus{*conn_status}",
},
},
Stored: true,
@@ -162,7 +167,7 @@ func TestDiamConnStats(t *testing.T) {
if err != nil {
t.Error(err)
}
metricID := "*sum#~*req.ConnectionStatus"
metricID := "*sum#~*req.ConnectionStatus{*conn_status}"
got, ok := metrics[metricID]
if !ok {
t.Errorf("could not find metric %q", metricID)
@@ -172,11 +177,13 @@ func TestDiamConnStats(t *testing.T) {
}
}
setSQProfile("SQ_CONN_ALL", "", "", -1)
setSQProfile("SQ_CONN_1", "host1", "realm1", -1)
setSQProfile("SQ_CONN_2", "host2", "realm1", -1)
setSQProfile("SQ_CONN_3", "host3", "realm2", -1)
// no connections have been established yet, expect -1
checkConnStatusMetric("SQ_CONN_ALL", -1)
checkConnStatusMetric("SQ_CONN_1", -1)
checkConnStatusMetric("SQ_CONN_2", -1)
checkConnStatusMetric("SQ_CONN_3", -1)
@@ -186,6 +193,8 @@ func TestDiamConnStats(t *testing.T) {
connHost1 := initDiamConn("host1", "realm1")
connHost2 := initDiamConn("host2", "realm1")
connHost3 := initDiamConn("host3", "realm2")
time.Sleep(10 * time.Millisecond) // wait for stats to process
checkConnStatusMetric("SQ_CONN_ALL", 3)
checkConnStatusMetric("SQ_CONN_1", 1)
checkConnStatusMetric("SQ_CONN_2", 1)
checkConnStatusMetric("SQ_CONN_3", 1)
@@ -199,6 +208,7 @@ func TestDiamConnStats(t *testing.T) {
// Ensure periodic health check happens.
time.Sleep(100 * time.Millisecond)
checkConnStatusMetric("SQ_CONN_ALL", 0)
checkConnStatusMetric("SQ_CONN_1", 0)
checkConnStatusMetric("SQ_CONN_2", 0)
checkConnStatusMetric("SQ_CONN_3", 0)
@@ -206,9 +216,11 @@ func TestDiamConnStats(t *testing.T) {
// restart connection from host1
connHost1 = initDiamConn("host1", "realm1")
t.Cleanup(func() { connHost1.Close() })
time.Sleep(10 * time.Millisecond) // wait for stats to process
checkConnStatusMetric("SQ_CONN_ALL", 1)
checkConnStatusMetric("SQ_CONN_1", 1)
checkConnStatusMetric("SQ_CONN_2", 0)
checkConnStatusMetric("SQ_CONN_3", 0)
t.Cleanup(func() { connHost1.Close() })
// scrapePromURL(t)
}

View File

@@ -449,18 +449,8 @@ func (da *DiameterAgent) handleRAA(c diam.Conn, m *diam.Message) {
ch <- m
}
const (
// Connection status values for ConnectionStatus event field:
// -1 : connection closed/down
// 0 : duplicate connection (no metric change)
// 1 : new connection established/up
diamConnStatusDown = -1
diamConnStatusDuplicate = 0
diamConnStatusUp = 1
)
// sendConnStatusReport reports connection status changes to StatS and ThresholdS.
func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status int, localAddr, remoteAddr net.Addr) {
func (da *DiameterAgent) sendConnStatusReport(metadata *smpeer.Metadata, status string, localAddr, remoteAddr net.Addr) {
daCfg := da.cgrCfg.DiameterAgentCfg()
if len(daCfg.StatSConns) == 0 && len(daCfg.ThresholdSConns) == 0 {
return // nothing to do
@@ -515,11 +505,11 @@ func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) {
}
key := string(meta.OriginHost + utils.ConcatenatedKeySep + meta.OriginRealm)
da.peersLck.Lock()
diamConnStatus := diamConnStatusUp
connStatus := utils.ConnStatusUp
if _, exists := da.peers[key]; exists {
// Connection already exists for this peer. Set status to 0 (duplicate)
// Connection already exists for this peer. Set status to DUPLICATE
// to prevent incrementing StatS metrics.
diamConnStatus = diamConnStatusDuplicate
connStatus = utils.ConnStatusDuplicate
utils.Logger.Warning(fmt.Sprintf(
"<%s> a connection from a peer with the same ID (%q) is already registered, overwriting...",
@@ -528,7 +518,7 @@ func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) {
da.peers[key] = c
da.peersLck.Unlock()
localAddr, remoteAddr := c.LocalAddr(), c.RemoteAddr()
da.sendConnStatusReport(meta, diamConnStatus, localAddr, remoteAddr)
da.sendConnStatusReport(meta, connStatus, localAddr, remoteAddr)
go func() {
// Use hybrid approach to detect connection closure. CloseNotify() may not
// fire if the serve() goroutine is blocked in Read(), so we also perform
@@ -538,7 +528,7 @@ func (da *DiameterAgent) handleConns(peers <-chan diam.Conn) {
da.peersLck.Lock()
delete(da.peers, key)
da.peersLck.Unlock()
da.sendConnStatusReport(meta, diamConnStatusDown, localAddr, remoteAddr)
da.sendConnStatusReport(meta, utils.ConnStatusDown, localAddr, remoteAddr)
}()
closeChan := c.(diam.CloseNotifier).CloseNotify()

View File

@@ -772,41 +772,37 @@ func (m *Metric) Equal(v *Metric) bool {
}
func NewStatSum(minItems uint64, fieldName string, filterIDs []string) StatMetric {
return &StatSum{Metric: NewMetric(minItems, filterIDs),
FieldName: fieldName}
flds, _ := utils.NewRSRParsers(fieldName, utils.InfieldSep)
return &StatSum{
Metric: NewMetric(minItems, filterIDs),
Fields: flds,
}
}
type StatSum struct {
*Metric
FieldName string
Fields utils.RSRParsers
}
func (sum *StatSum) AddEvent(evID string, ev utils.DataProvider) error {
ival, err := utils.DPDynamicInterface(sum.FieldName, ev)
ival, err := sum.Fields.ParseDataProvider(ev)
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, sum.FieldName)
}
return err
}
return sum.addEvent(evID, ival)
}
func (sum *StatSum) AddOneEvent(ev utils.DataProvider) error {
ival, err := utils.DPDynamicInterface(sum.FieldName, ev)
ival, err := sum.Fields.ParseDataProvider(ev)
if err != nil {
if err == utils.ErrNotFound {
err = utils.ErrPrefix(err, sum.FieldName)
}
return err
}
return sum.addOneEvent(ival)
}
func (sum *StatSum) Clone() StatMetric {
return &StatSum{
Metric: sum.Metric.Clone(),
FieldName: sum.FieldName,
Metric: sum.Metric.Clone(),
Fields: sum.Fields,
}
}

View File

@@ -2114,7 +2114,8 @@ func TestStatSumGetFloat64Value(t *testing.T) {
t.Errorf("wrong statSum value: %v", v)
}
ev2 := &utils.CGREvent{ID: "EVENT_2"}
if err := statSum.AddEvent(ev2.ID, utils.MapStorage{utils.MetaOpts: ev2.APIOpts}); err == nil || err.Error() != "NOT_FOUND:~*opts.*cost" {
if err := statSum.AddEvent(ev2.ID, utils.MapStorage{utils.MetaOpts: ev2.APIOpts}); err == nil ||
err != utils.ErrNotFound {
t.Error(err)
}
if v := statSum.GetValue(); v != utils.DecimalNaN {
@@ -2233,26 +2234,41 @@ func TestStatSumGetStringValue2(t *testing.T) {
}
func TestStatSumGetStringValue3(t *testing.T) {
statSum := &StatSum{Metric: NewMetric(2, nil), FieldName: "~*opts.*cost"}
statSum := &StatSum{
Metric: NewMetric(2, nil),
Fields: utils.NewRSRParsersMustCompile("~*opts.*cost", utils.InfieldSep),
}
expected := &StatSum{
Metric: &Metric{
Events: map[string]*DecimalWithCompress{
"EVENT_1": {Stat: utils.NewDecimalFromStringIgnoreError("12.20000000000000"), CompressFactor: 2},
"EVENT_3": {Stat: utils.NewDecimalFromStringIgnoreError("18.300000000000000710542735760100185871124267578125"), CompressFactor: 1},
"EVENT_1": {Stat: utils.NewDecimalFromStringIgnoreError("12.2"), CompressFactor: 2},
"EVENT_3": {Stat: utils.NewDecimalFromStringIgnoreError("18.3"), CompressFactor: 1},
},
MinItems: 2,
Count: 3,
Value: utils.NewDecimalFromStringIgnoreError("42.700"),
Value: utils.NewDecimalFromStringIgnoreError("42.7"),
},
FieldName: "~*opts.*cost",
Fields: utils.NewRSRParsersMustCompile("~*opts.*cost", utils.InfieldSep),
}
expected.GetStringValue(config.CgrConfig().GeneralCfg().RoundingDecimals)
ev1 := &utils.CGREvent{ID: "EVENT_1",
APIOpts: map[string]any{utils.MetaCost: 18.2}}
ev2 := &utils.CGREvent{ID: "EVENT_1",
APIOpts: map[string]any{utils.MetaCost: 6.2}}
ev3 := &utils.CGREvent{ID: "EVENT_3",
APIOpts: map[string]any{utils.MetaCost: 18.3}}
ev1 := &utils.CGREvent{
ID: "EVENT_1",
APIOpts: map[string]any{
utils.MetaCost: utils.NewDecimal(182, 1),
},
}
ev2 := &utils.CGREvent{
ID: "EVENT_1",
APIOpts: map[string]any{
utils.MetaCost: utils.NewDecimal(62, 1),
},
}
ev3 := &utils.CGREvent{
ID: "EVENT_3",
APIOpts: map[string]any{
utils.MetaCost: utils.NewDecimal(183, 1),
},
}
if err := statSum.AddEvent(ev1.ID, utils.MapStorage{utils.MetaOpts: ev1.APIOpts}); err != nil {
t.Error(err)
}
@@ -2278,26 +2294,41 @@ func TestStatSumGetStringValue3(t *testing.T) {
}
func TestStatSumCompress(t *testing.T) {
sum := &StatSum{Metric: NewMetric(2, nil), FieldName: "~*opts.*cost"}
sum := &StatSum{
Metric: NewMetric(2, nil),
Fields: utils.NewRSRParsersMustCompile("~*opts.*cost", utils.InfieldSep),
}
expected := &StatSum{
Metric: &Metric{
Events: map[string]*DecimalWithCompress{
"EVENT_1": {Stat: utils.NewDecimalFromStringIgnoreError("18.199999999999999289457264239899814128875732421875"), CompressFactor: 1},
"EVENT_2": {Stat: utils.NewDecimalFromStringIgnoreError("6.20000000000000017763568394002504646778106689453125"), CompressFactor: 1},
"EVENT_1": {Stat: utils.NewDecimalFromStringIgnoreError("18.2"), CompressFactor: 1},
"EVENT_2": {Stat: utils.NewDecimalFromStringIgnoreError("6.2"), CompressFactor: 1},
},
MinItems: 2,
Value: utils.NewDecimalFromStringIgnoreError("24.400"),
Value: utils.NewDecimalFromStringIgnoreError("24.4"),
Count: 2,
},
FieldName: "~*opts.*cost",
Fields: utils.NewRSRParsersMustCompile("~*opts.*cost", utils.InfieldSep),
}
expected.GetStringValue(config.CgrConfig().GeneralCfg().RoundingDecimals)
ev := &utils.CGREvent{ID: "EVENT_1",
APIOpts: map[string]any{utils.MetaCost: 18.2}}
ev2 := &utils.CGREvent{ID: "EVENT_2",
APIOpts: map[string]any{utils.MetaCost: 6.2}}
ev4 := &utils.CGREvent{ID: "EVENT_1",
APIOpts: map[string]any{utils.MetaCost: 18.3}}
ev := &utils.CGREvent{
ID: "EVENT_1",
APIOpts: map[string]any{
utils.MetaCost: utils.NewDecimal(182, 1),
},
}
ev2 := &utils.CGREvent{
ID: "EVENT_2",
APIOpts: map[string]any{
utils.MetaCost: utils.NewDecimal(62, 1),
},
}
ev4 := &utils.CGREvent{
ID: "EVENT_1",
APIOpts: map[string]any{
utils.MetaCost: utils.NewDecimal(183, 1),
},
}
sum.AddEvent(ev.ID, utils.MapStorage{utils.MetaOpts: ev.APIOpts})
sum.AddEvent(ev2.ID, utils.MapStorage{utils.MetaOpts: ev2.APIOpts})
expIDs := []string{"EVENT_1", "EVENT_2"}
@@ -2319,7 +2350,7 @@ func TestStatSumCompress(t *testing.T) {
Value: utils.NewDecimalFromFloat64(24.4),
Count: 2,
},
FieldName: "~*opts.*cost",
Fields: utils.NewRSRParsersMustCompile("~*opts.*cost", utils.InfieldSep),
}
expected.GetStringValue(config.CgrConfig().GeneralCfg().RoundingDecimals)
expIDs = []string{"EVENT_3"}
@@ -3013,7 +3044,7 @@ func TestStatSumMarshal(t *testing.T) {
utils.MetaUsage: 10 * time.Second}}
statSum.AddEvent(ev.ID, utils.MapStorage{utils.MetaOpts: ev.APIOpts})
var nstatSum StatSum
expected := []byte(`{"Value":20,"Count":1,"Events":{"EVENT_1":{"Stat":20,"CompressFactor":1}},"MinItems":2,"FilterIDs":null,"FieldName":"~*opts.*cost"}`)
expected := []byte(`{"Value":20,"Count":1,"Events":{"EVENT_1":{"Stat":20,"CompressFactor":1}},"MinItems":2,"FilterIDs":null,"Fields":[{"Rules":"~*opts.*cost","Path":"~*opts.*cost"}]}`)
if b, err := jMarshaler.Marshal(statSum); err != nil {
t.Error(err)
} else if !reflect.DeepEqual(expected, b) {
@@ -3704,9 +3735,10 @@ func TestStatPDDClone(t *testing.T) {
}
func TestStatSumClone(t *testing.T) {
sum := &StatSum{Metric: NewMetric(2, nil), FieldName: "~*opts.*cost"}
sum := &StatSum{
Metric: NewMetric(2, nil),
Fields: utils.NewRSRParsersMustCompile("~*opts.*cost", utils.InfieldSep),
}
if rcv := sum.Clone(); !reflect.DeepEqual(rcv, sum) {
t.Errorf("Expecting <%+v>,\n Recevied <%+v>", sum, rcv)
}

View File

@@ -560,9 +560,12 @@ const (
EventConnectionStatusReport = "ConnectionStatusReport"
// Connection status event fields.
ConnLocalAddr = "LocalAddr"
ConnRemoteAddr = "RemoteAddr"
ConnStatus = "ConnectionStatus" // -1=down, 0=duplicate, 1=up
ConnLocalAddr = "LocalAddr"
ConnRemoteAddr = "RemoteAddr"
ConnStatus = "ConnectionStatus" // sum metric: UP=1, DOWN=-1, DUPLICATE=0
ConnStatusUp = "UP"
ConnStatusDown = "DOWN"
ConnStatusDuplicate = "DUPLICATE"
// ReplyState error constants
ErrReplyStateAuthorize = "ERR_AUTHORIZE"
@@ -757,6 +760,7 @@ const (
MetaSIPURIMethod = "*sipuri_method"
MetaSIPURIHost = "*sipuri_host"
MetaSIPURIUser = "*sipuri_user"
MetaConnStatus = "*conn_status"
E164DomainConverter = "*e164Domain"
E164Converter = "*e164"
MetaJoin = "*join"

View File

@@ -142,6 +142,8 @@ func NewDataConverter(params string) (conv DataConverter, err error) {
return splitConverter(params[len(MetaSplit)+1:]), nil
case strings.HasPrefix(params, MetaStrip):
return NewStripConverter(params)
case params == MetaConnStatus:
return ConnStatusConverter{}, nil
case strings.HasPrefix(params, MetaGigawords):
return new(GigawordsConverter), nil
default:
@@ -838,3 +840,21 @@ func (GigawordsConverter) Convert(in any) (any, error) {
totalOctects := (gigawordsValue * int64(math.Pow(2, 32))) // 2^32
return totalOctects, nil
}
// ConnStatusConverter converts connection status strings to numeric values.
// Returns 1 for UP, -1 for DOWN, and 0 for DUPLICATE.
type ConnStatusConverter struct{}
// Convert implements DataConverter interface
func (c ConnStatusConverter) Convert(in any) (any, error) {
status := IfaceAsString(in)
switch status {
case ConnStatusUp:
return 1, nil
case ConnStatusDown:
return -1, nil
case ConnStatusDuplicate:
return 0, nil
}
return 0, fmt.Errorf("unsupported connection status: %q", status)
}