diff --git a/agents/libagents.go b/agents/libagents.go index a34e55c23..b4ea0de58 100644 --- a/agents/libagents.go +++ b/agents/libagents.go @@ -114,7 +114,11 @@ func processRequest(ctx *context.Context, reqProcessor *config.RequestProcessor, case utils.MetaUpdate: updateArgs := sessions.NewV1UpdateSessionArgs( reqProcessor.Flags.GetBool(utils.MetaAttributes), + reqProcessor.Flags.GetBool(utils.MetaThresholds), + reqProcessor.Flags.GetBool(utils.MetaStats), reqProcessor.Flags.ParamsSlice(utils.MetaAttributes, utils.MetaIDs), + reqProcessor.Flags.ParamsSlice(utils.MetaThresholds, utils.MetaIDs), + reqProcessor.Flags.ParamsSlice(utils.MetaStats, utils.MetaIDs), reqProcessor.Flags.Has(utils.MetaAccounts), cgrEv, reqProcessor.Flags.Has(utils.MetaFD)) rply := new(sessions.V1UpdateSessionReply) diff --git a/agents/radagent.go b/agents/radagent.go index 7439ae772..53555663a 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -378,7 +378,11 @@ func (ra *RadiusAgent) processRequest(req *radigo.Packet, reqProcessor *config.R case utils.MetaUpdate: updateArgs := sessions.NewV1UpdateSessionArgs( reqProcessor.Flags.GetBool(utils.MetaAttributes), + reqProcessor.Flags.GetBool(utils.MetaThresholds), + reqProcessor.Flags.GetBool(utils.MetaStats), reqProcessor.Flags.ParamsSlice(utils.MetaAttributes, utils.MetaIDs), + reqProcessor.Flags.ParamsSlice(utils.MetaThresholds, utils.MetaIDs), + reqProcessor.Flags.ParamsSlice(utils.MetaStats, utils.MetaIDs), reqProcessor.Flags.Has(utils.MetaAccounts), cgrEv, reqProcessor.Flags.Has(utils.MetaFD)) rply := new(sessions.V1UpdateSessionReply) diff --git a/ers/ers.go b/ers/ers.go index 9851cb62a..9827ae16e 100644 --- a/ers/ers.go +++ b/ers/ers.go @@ -307,7 +307,11 @@ func (erS *ERService) processEvent(cgrEv *utils.CGREvent, case utils.MetaUpdate: updateArgs := sessions.NewV1UpdateSessionArgs( rdrCfg.Flags.Has(utils.MetaAttributes), + rdrCfg.Flags.Has(utils.MetaThresholds), + rdrCfg.Flags.Has(utils.MetaStats), rdrCfg.Flags.ParamsSlice(utils.MetaAttributes, utils.MetaIDs), + rdrCfg.Flags.ParamsSlice(utils.MetaThresholds, utils.MetaIDs), + rdrCfg.Flags.ParamsSlice(utils.MetaStats, utils.MetaIDs), rdrCfg.Flags.Has(utils.MetaAccounts), cgrEv, rdrCfg.Flags.Has(utils.MetaFD)) rply := new(sessions.V1UpdateSessionReply) diff --git a/general_tests/session_update_stats_it_test.go b/general_tests/session_update_stats_it_test.go new file mode 100644 index 000000000..b3e27fc74 --- /dev/null +++ b/general_tests/session_update_stats_it_test.go @@ -0,0 +1,167 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package general_tests + +import ( + "testing" + "time" + + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/sessions" + "github.com/cgrates/cgrates/utils" +) + +func TestSessionUpdateToStats(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + case utils.MetaMySQL, utils.MetaMongo, utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("Unknown Database type") + } + content := `{ + +"data_db": { + "db_type": "*internal", +}, + + +"stor_db": { + "db_type": "*internal", +}, + +"rals": { + "enabled": true, +}, + + +"schedulers": { + "enabled": true, +}, + +"cdrs": { + "enabled": true, + "chargers_conns":["*internal"], + "rals_conns": ["*internal"], +}, + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + +"attributes": { + "enabled": true, +}, + +"thresholds": { + "enabled": true, + "store_interval": "-1", +}, +"stats": { + "enabled": true, + "store_interval": "-1", +}, + +"sessions": { + "enabled": true, + "chargers_conns": ["*internal"], + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "stats_conns": ["*internal"], + "thresholds_conns": ["*internal"], + "attributes_conns": ["*internal"], +}, +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, + }` + + tpFiles := map[string]string{ + utils.ChargersCsv: `#Id,ActionsId,TimingId,Weight +#Tenant,ID,FilterIDs,ActivationInterval,RunID,AttributeIDs,Weight +cgrates.org,DEFAULT,,,DEFAULT,*none,20`, + utils.AccountActionsCsv: `#Tenant,Account,ActionPlanId,ActionTriggersId,AllowNegative,Disabled +cgrates.org,1001,PACKAGE_1001,,,`, + utils.ActionPlansCsv: `#Id,ActionsId,TimingId,Weight +PACKAGE_1001,ACT_TOPUP,*asap,10`, + utils.ActionsCsv: `#ActionsId[0],Action[1],ExtraParameters[2],Filter[3],BalanceId[4],BalanceType[5],Categories[6],DestinationIds[7],RatingSubject[8],SharedGroup[9],ExpiryTime[10],TimingIds[11],Units[12],BalanceWeight[13],BalanceBlocker[14],BalanceDisabled[15],Weight[16] +ACT_TOPUP,*topup_reset,,,balance_monetary,*monetary,,*any,,,*unlimited,,5,10,false,false,20`, + utils.DestinationRatesCsv: `#Id,DestinationId,RatesTag,RoundingMethod,RoundingDecimals,MaxCost,MaxCostStrategy +DR_VOICE,*any,RT_VOICE,*up,20,0,`, + utils.RatesCsv: `#Id,ConnectFee,Rate,RateUnit,RateIncrement,GroupIntervalStart +RT_VOICE,0,1,1s,1s,0s`, + utils.RatingPlansCsv: `#Id,DestinationRatesId,TimingTag,Weight +RP_VOICE,DR_VOICE,*any,10`, + utils.RatingProfilesCsv: `#Tenant,Category,Subject,ActivationTime,RatingPlanId,RatesFallbackSubject +cgrates.org,call,1001,2014-01-14T00:00:00Z,RP_VOICE,`, + utils.StatsCsv: `#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],QueueLength[4],TTL[5],MinItems[6],Metrics[7],MetricFilterIDs[8],Stored[9],Blocker[10],Weight[11],ThresholdIDs[12] +cgrates.org,Stat1,*string:~*req.Account:1001,,,,,*tcc;*acd;*tcd,,,,,`, + utils.ThresholdsCsv: `#Tenant[0],Id[1],FilterIDs[2],ActivationInterval[3],MaxHits[4],MinHits[5],MinSleep[6],Blocker[7],Weight[8],ActionIDs[9],Async[10] +cgrates.org,TH1,*string:~*req.Account:1001,2014-07-29T15:00:00Z,-1,0,0,false,10,,false`, + } + ng := engine.TestEngine{ + ConfigJSON: content, + TpFiles: tpFiles, + } + + client, _ := ng.Run(t) + time.Sleep(500 * time.Millisecond) + t.Run("TestUpdateSession", func(t *testing.T) { + var replyUpdate sessions.V1UpdateSessionReply + if err := client.Call(context.Background(), utils.SessionSv1UpdateSession, &sessions.V1UpdateSessionArgs{ + UpdateSession: true, + GetAttributes: true, + ProcessStats: true, + ProcessThresholds: true, + ThresholdIDs: []string{"TH1"}, + StatIDs: []string{"Stat1"}, + CGREvent: &utils.CGREvent{ + Tenant: "cgrates.org", + ID: "SessionSv1UpdateSession1", + Event: map[string]any{ + utils.OriginID: "prepaidSession", + utils.Tenant: "cgrates.org", + utils.Category: "call", + utils.ToR: utils.MetaVoice, + utils.RequestType: utils.MetaPrepaid, + utils.AccountField: "1001", + utils.Subject: "1001", + utils.Destination: "1002", + utils.SetupTime: time.Date(2023, time.February, 28, 8, 59, 50, 0, time.UTC), + utils.AnswerTime: time.Date(2023, time.February, 28, 9, 0, 0, 0, time.UTC), + utils.Usage: 5 * time.Second, + utils.BalanceFactorID: "voiceFactor", + }, + APIOpts: map[string]any{ + utils.OptsDebitInterval: 0, + }, + }, + }, &replyUpdate); err != nil { + t.Error(err) + } else if len(*replyUpdate.StatQueueIDs) != 1 || len(*replyUpdate.ThresholdIDs) != 1 { + t.Error("expected to pass event through stats or thresholds") + } + }) +} diff --git a/sessions/sessions.go b/sessions/sessions.go index 4cd94acbc..3de76339f 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -2574,26 +2574,38 @@ func (sS *SessionS) BiRPCv1InitiateSessionWithDigest(ctx *context.Context, } // NewV1UpdateSessionArgs is a constructor for update session arguments -func NewV1UpdateSessionArgs(attrs bool, attributeIDs []string, +func NewV1UpdateSessionArgs(attrs, thresholds, stats bool, attributeIDs, thresholdIDs, statIDs []string, acnts bool, cgrEv *utils.CGREvent, forceDuration bool) (args *V1UpdateSessionArgs) { args = &V1UpdateSessionArgs{ - GetAttributes: attrs, - UpdateSession: acnts, - CGREvent: cgrEv, - ForceDuration: forceDuration, + GetAttributes: attrs, + UpdateSession: acnts, + CGREvent: cgrEv, + ForceDuration: forceDuration, + ProcessThresholds: thresholds, + ProcessStats: stats, } if len(attributeIDs) != 0 { args.AttributeIDs = attributeIDs } + if len(thresholdIDs) != 0 { + args.ThresholdIDs = thresholdIDs + } + if len(statIDs) != 0 { + args.StatIDs = statIDs + } return } // V1UpdateSessionArgs contains options for session update type V1UpdateSessionArgs struct { - GetAttributes bool - UpdateSession bool - ForceDuration bool - AttributeIDs []string + GetAttributes bool + UpdateSession bool + ForceDuration bool + ProcessThresholds bool + ProcessStats bool + AttributeIDs []string + ThresholdIDs []string + StatIDs []string *utils.CGREvent } @@ -2601,10 +2613,11 @@ func (V1UpdateSessionArgs) RPCClone() {} // V1UpdateSessionReply contains options for session update reply type V1UpdateSessionReply struct { - Attributes *engine.AttrSProcessEventReply `json:",omitempty"` - MaxUsage *time.Duration `json:",omitempty"` - - needsMaxUsage bool // for gob encoding only + Attributes *engine.AttrSProcessEventReply `json:",omitempty"` + MaxUsage *time.Duration `json:",omitempty"` + ThresholdIDs *[]string `json:",omitempty"` + StatQueueIDs *[]string `json:",omitempty"` + needsMaxUsage bool // for gob encoding only } // SetMaxUsageNeeded used by agent that use the reply as NavigableMapper @@ -2725,6 +2738,24 @@ func (sS *SessionS) BiRPCv1UpdateSession(ctx *context.Context, } rply.MaxUsage = &maxUsage } + if args.ProcessThresholds { + tIDs, err := sS.processThreshold(args.CGREvent, args.ThresholdIDs, true) + if err == nil { + rply.ThresholdIDs = &tIDs + } else if err.Error() != utils.ErrNotFound.Error() { + return utils.NewErrThresholdS(err) + } + } + if args.ProcessStats { + sIDs, err := sS.processStats(args.CGREvent, args.StatIDs, false) + if err == nil { + rply.StatQueueIDs = &sIDs + } else if err.Error() != utils.ErrNotFound.Error() { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing event %+v with StatS.", + utils.SessionS, err.Error(), args.CGREvent)) + } + } return } diff --git a/sessions/sessions_test.go b/sessions/sessions_test.go index 8b79ebb1e..1c518a293 100644 --- a/sessions/sessions_test.go +++ b/sessions/sessions_test.go @@ -1111,7 +1111,7 @@ func TestSessionSNewV1UpdateSessionArgs(t *testing.T) { CGREvent: cgrEv, ForceDuration: true, } - rply := NewV1UpdateSessionArgs(true, nil, true, cgrEv, true) + rply := NewV1UpdateSessionArgs(true, false, false, nil, nil, nil, true, cgrEv, true) if !reflect.DeepEqual(expected, rply) { t.Errorf("Expecting %+v, received: %+v", expected, rply) } @@ -1121,13 +1121,13 @@ func TestSessionSNewV1UpdateSessionArgs(t *testing.T) { CGREvent: cgrEv, ForceDuration: true, } - rply = NewV1UpdateSessionArgs(false, nil, true, cgrEv, true) + rply = NewV1UpdateSessionArgs(false, false, false, nil, nil, nil, true, cgrEv, true) if !reflect.DeepEqual(expected, rply) { t.Errorf("Expecting %+v, received: %+v", expected, rply) } //test with len(AttributeIDs) != 0 attributeIDs := []string{"ATTR1", "ATTR2"} - rply = NewV1UpdateSessionArgs(false, attributeIDs, true, cgrEv, true) + rply = NewV1UpdateSessionArgs(false, false, false, attributeIDs, nil, nil, true, cgrEv, true) expected.AttributeIDs = []string{"ATTR1", "ATTR2"} if !reflect.DeepEqual(expected, rply) { t.Errorf("Expecting %+v, received: %+v", expected, rply) diff --git a/sessions/sessionscover_test.go b/sessions/sessionscover_test.go index bb72b5b57..576139ec0 100644 --- a/sessions/sessionscover_test.go +++ b/sessions/sessionscover_test.go @@ -3168,7 +3168,7 @@ func TestBiRPCv1UpdateSession1(t *testing.T) { utils.OriginID: "TEST_ID", }, } - args := NewV1UpdateSessionArgs(true, []string{}, false, + args := NewV1UpdateSessionArgs(true, false, false, []string{}, nil, nil, false, nil, true) rply := new(V1UpdateSessionReply) @@ -3185,7 +3185,7 @@ func TestBiRPCv1UpdateSession1(t *testing.T) { }, } cgrEvent.ID = "test_id" - args = NewV1UpdateSessionArgs(true, []string{}, false, + args = NewV1UpdateSessionArgs(true, false, false, []string{}, nil, nil, false, cgrEvent, true) engine.Cache = caches engine.Cache.SetWithoutReplicate(utils.CacheRPCResponses, utils.ConcatenatedKey(utils.SessionSv1UpdateSession, args.CGREvent.ID), @@ -3196,20 +3196,20 @@ func TestBiRPCv1UpdateSession1(t *testing.T) { engine.Cache = tmp cgrEvent.ID = utils.EmptyString - args = NewV1UpdateSessionArgs(true, []string{"attrr1"}, false, + args = NewV1UpdateSessionArgs(true, false, false, []string{"attrr1"}, nil, nil, false, cgrEvent, true) expected = "ATTRIBUTES_ERROR:NOT_IMPLEMENTED" if err := sessions.BiRPCv1UpdateSession(context.Background(), args, rply); err == nil || err.Error() != expected { t.Errorf("Exepected %+v, received %+v", expected, err) } - args = NewV1UpdateSessionArgs(true, []string{}, false, + args = NewV1UpdateSessionArgs(true, false, false, []string{}, nil, nil, false, cgrEvent, true) if err := sessions.BiRPCv1UpdateSession(context.Background(), args, rply); err != nil { t.Error(err) } - args = NewV1UpdateSessionArgs(false, []string{}, false, + args = NewV1UpdateSessionArgs(false, false, false, []string{}, nil, nil, false, cgrEvent, true) expected = "MANDATORY_IE_MISSING: [Subsystems]" if err := sessions.BiRPCv1UpdateSession(context.Background(), args, rply); err == nil || err.Error() != expected { @@ -3259,7 +3259,7 @@ func TestBiRPCv1UpdateSession2(t *testing.T) { utils.OptsDebitInterval: "invalid_dur_format", }, } - args := NewV1UpdateSessionArgs(false, []string{}, true, + args := NewV1UpdateSessionArgs(false, false, false, []string{}, nil, nil, true, cgrEvent, true) rply := new(V1UpdateSessionReply) expected := "RALS_ERROR:time: invalid duration \"invalid_dur_format\"" @@ -3268,7 +3268,7 @@ func TestBiRPCv1UpdateSession2(t *testing.T) { } cgrEvent.APIOpts[utils.OptsDebitInterval] = "10s" - args = NewV1UpdateSessionArgs(false, []string{}, true, + args = NewV1UpdateSessionArgs(false, false, false, []string{}, nil, nil, true, cgrEvent, true) expected = "ChargerS is disabled" if err := sessions.BiRPCv1UpdateSession(context.Background(), args, rply); err == nil || err.Error() != expected { @@ -3282,7 +3282,7 @@ func TestBiRPCv1UpdateSession2(t *testing.T) { } cgrEvent.Event[utils.Usage] = time.Minute - args = NewV1UpdateSessionArgs(false, []string{}, true, + args = NewV1UpdateSessionArgs(false, false, false, []string{}, nil, nil, true, cgrEvent, true) if err := sessions.BiRPCv1UpdateSession(context.Background(), args, rply); err != nil { t.Error(err)