From fd0ef907c8ba5fe900f181b04baacf7484109e76 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 13 Nov 2025 20:19:36 +0100 Subject: [PATCH] Start SessionSv1.ProcessEvent API --- .../sessions_procev_internal/cgrates.json | 105 ++++++++++++++++++ sessions/apis.go | 40 +++++++ sessions/libsessions.go | 12 +- sessions/procev_it_test.go | 104 +++++++++++++++++ sessions/sessions.go | 6 +- sessions/sessions_test.go | 6 +- 6 files changed, 262 insertions(+), 11 deletions(-) create mode 100644 data/conf/samples/sessions_procev_internal/cgrates.json create mode 100644 sessions/procev_it_test.go diff --git a/data/conf/samples/sessions_procev_internal/cgrates.json b/data/conf/samples/sessions_procev_internal/cgrates.json new file mode 100644 index 000000000..690bea2c6 --- /dev/null +++ b/data/conf/samples/sessions_procev_internal/cgrates.json @@ -0,0 +1,105 @@ +{ +// CGRateS Configuration file +// +// Used for SessionSv1 integration tests + + +"general": { + "reply_timeout": "10s" +}, + +"logger": { + "level": 7 +}, + +"listen": { + "rpc_json": ":2012", + "rpc_gob": ":2013", + "http": ":2080", +}, + +"db": { + "db_conns": { + "*default": { + "db_type": "*internal", + "opts":{ + "internalDBRewriteInterval": "0s", + "internalDBDumpInterval": "0s" + } + } + } +}, + +"stor_db": { + "db_type": "*internal" +}, + +"rates": { + "enabled": true, +}, + + +"cdrs": { + "enabled": true, + "session_cost_retries": 1, + "chargers_conns":["*internal"], + "rates_conns": ["*internal"], +}, + + +"chargers": { + "enabled": true, + "attributes_conns": ["*internal"], +}, + + +"resources": { + "enabled": true, + "store_interval": "-1", +}, + + +"attributes": { + "enabled": true, +}, + + +"thresholds": { + "enabled": true, + "store_interval": "-1", +}, + + +"stats": { + "enabled": true, + "store_interval": "-1", + "thresholds_conns": ["*internal"], +}, + + +"routes": { + "enabled": true, +}, + + +"sessions": { + "enabled": true, + "session_ttl": "50ms", + "chargers_conns": ["*internal"], + "rates_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "resources_conns": ["*internal"], + "thresholds_conns": ["*internal"], + "stats_conns": ["*internal"], + "routes_conns": ["*internal"], + "attributes_conns": ["*internal"], + "alterable_fields": ["Extra1"] +}, + + +"admins": { + "enabled": true, +}, + + +} diff --git a/sessions/apis.go b/sessions/apis.go index 098d03d6e..e40ae8766 100644 --- a/sessions/apis.go +++ b/sessions/apis.go @@ -781,3 +781,43 @@ func (sS *SessionS) BiRPCv1ProcessCDR(ctx *context.Context, return sS.processCDR(ctx, cgrEv, rply) } + +// BiRPCv1ProcessEvent processes an CGREvent with various subsystems +func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context, + args *utils.CGREvent, authReply *V1ProcessEventReply) (err error) { + if args == nil { + return utils.NewErrMandatoryIeMissing(utils.CGREventString) + } + if args.Event == nil { + return utils.NewErrMandatoryIeMissing(utils.Event) + } + if args.APIOpts == nil { + args.APIOpts = make(map[string]any) + } + if args.ID == "" { + args.ID = utils.GenUUID() + } + if args.Tenant == "" { + args.Tenant = sS.cfg.GeneralCfg().DefaultTenant + } + // RPC caching + if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 { + cacheKey := utils.ConcatenatedKey(utils.SessionSv1AuthorizeEvent, args.ID) + refID := guardian.Guardian.GuardIDs("", + sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic + defer guardian.Guardian.UnguardIDs(refID) + + if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has { + cachedResp := itm.(*utils.CachedRPCResponse) + if cachedResp.Error == nil { + *authReply = *cachedResp.Result.(*V1ProcessEventReply) + } + return cachedResp.Error + } + defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey, + &utils.CachedRPCResponse{Result: authReply, Error: err}, + nil, true, utils.NonTransactional) + } + // end of RPC caching + return +} diff --git a/sessions/libsessions.go b/sessions/libsessions.go index 6e88af7e1..f92289319 100644 --- a/sessions/libsessions.go +++ b/sessions/libsessions.go @@ -325,8 +325,8 @@ func getDerivedEvents(events map[string]*utils.CGREvent, derivedReply bool) map[ // V1ProcessEventReply is the reply for the ProcessEvent API type V1ProcessEventReply struct { - MaxUsage map[string]time.Duration `json:",omitempty"` - Cost map[string]float64 `json:",omitempty"` // Cost is the cost received from Rater, ignoring accounting part + AccountSUsage map[string]time.Duration `json:",omitempty"` + RateSCost map[string]float64 `json:",omitempty"` // Cost is the cost received from Rater, ignoring accounting part ResourceAllocation map[string]string `json:",omitempty"` Attributes map[string]*attributes.AttrSProcessEventReply `json:",omitempty"` RouteProfiles map[string]routes.SortedRoutesList `json:",omitempty"` @@ -338,9 +338,9 @@ type V1ProcessEventReply struct { // AsNavigableMap is part of engine.NavigableMapper interface func (v1Rply *V1ProcessEventReply) AsNavigableMap() map[string]*utils.DataNode { cgrReply := make(map[string]*utils.DataNode) - if v1Rply.MaxUsage != nil { + if v1Rply.AccountSUsage != nil { usage := &utils.DataNode{Type: utils.NMMapType, Map: make(map[string]*utils.DataNode)} - for k, v := range v1Rply.MaxUsage { + for k, v := range v1Rply.AccountSUsage { usage.Map[k] = utils.NewLeafNode(v) } cgrReply[utils.CapMaxUsage] = usage @@ -397,9 +397,9 @@ func (v1Rply *V1ProcessEventReply) AsNavigableMap() map[string]*utils.DataNode { } cgrReply[utils.CapStatQueues] = st } - if v1Rply.Cost != nil { + if v1Rply.RateSCost != nil { costs := &utils.DataNode{Type: utils.NMMapType, Map: make(map[string]*utils.DataNode)} - for k, cost := range v1Rply.Cost { + for k, cost := range v1Rply.RateSCost { costs.Map[k] = utils.NewLeafNode(cost) } } diff --git a/sessions/procev_it_test.go b/sessions/procev_it_test.go new file mode 100644 index 000000000..c4d6fa96c --- /dev/null +++ b/sessions/procev_it_test.go @@ -0,0 +1,104 @@ +//go:build integration +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package sessions + +import ( + "os" + "path" + "testing" + + "github.com/cgrates/birpc" + "github.com/cgrates/birpc/context" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" +) + +var ( + sProcEvCfgPath string + sProcEvCfgDIR string + sProcEvCfg *config.CGRConfig + sProcEvRPC *birpc.Client + + SessionsBkupTests = []func(t *testing.T){ + testSessionSProcEvInitCfg, + testSessionSProcEvResetDB, + testSessionSProcEvStartEngine, + testSessionSProcEvApierRpcConn, + + testSessionSProcEvStopCgrEngine, + } +) + +func TestSessionSProcEv(t *testing.T) { + switch *utils.DBType { + case utils.MetaInternal: + sProcEvCfgDIR = "sessions_procev_internal" + defer func() { + if err := os.RemoveAll("/tmp/internal_db"); err != nil { + t.Error(err) + } + }() + case utils.MetaMySQL: + sProcEvCfgDIR = "sessions_procev_mysql" + case utils.MetaMongo: + sProcEvCfgDIR = "sessions_backup_mongo" + case utils.MetaPostgres: + sProcEvCfgDIR = "sessions_procev_postgres" + default: + t.Fatal("Unknown Database type") + } + for _, stest := range SessionsBkupTests { + t.Run(*utils.DBType, stest) + } +} + +func testSessionSProcEvInitCfg(t *testing.T) { + var err error + sProcEvCfgPath = path.Join(*utils.DataDir, "conf", "samples", sProcEvCfgDIR) + if sProcEvCfg, err = config.NewCGRConfigFromPath(context.Background(), sProcEvCfgPath); err != nil { + t.Fatal(err) + } +} + +// Remove data in both rating and accounting db +func testSessionSProcEvResetDB(t *testing.T) { + engine.FlushDBs(t, sProcEvCfg, true) +} + +// Start CGR Engine +func testSessionSProcEvStartEngine(t *testing.T) { + if _, err := engine.StartEngine(sProcEvCfgPath, *utils.WaitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func testSessionSProcEvApierRpcConn(t *testing.T) { + sProcEvRPC = engine.NewRPCClient(t, sProcEvCfg.ListenCfg(), *utils.Encoding) +} + +func testSessionSProcEvStopCgrEngine(t *testing.T) { + if err := engine.KillEngine(1000); err != nil { + t.Error(err) + } +} diff --git a/sessions/sessions.go b/sessions/sessions.go index dcbdeac6e..dbe5f8790 100644 --- a/sessions/sessions.go +++ b/sessions/sessions.go @@ -1708,10 +1708,11 @@ func (sS *SessionS) BiRPCv1ProcessMessage(ctx *context.Context, return } +/* // BiRPCv1ProcessEvent processes one event with the right subsystems based on arguments received func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context, args *utils.CGREvent, rply *V1ProcessEventReply) (err error) { - /* + if args == nil { return utils.NewErrMandatoryIeMissing(utils.CGREventString) } @@ -2151,9 +2152,10 @@ func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context, if withErrors { err = utils.ErrPartiallyExecuted } - */ + return } +*/ // BiRPCv1SyncSessions will sync sessions on demand func (sS *SessionS) BiRPCv1SyncSessions(ctx *context.Context, diff --git a/sessions/sessions_test.go b/sessions/sessions_test.go index 3572cd8e3..74219fa08 100644 --- a/sessions/sessions_test.go +++ b/sessions/sessions_test.go @@ -1166,7 +1166,7 @@ func TestV1ProcessEventReplyAsNavigableMap(t *testing.T) { t.Errorf("Expecting \n%+v\n, received: \n%+v", expected, rply) } //max usage check - v1per.MaxUsage = map[string]time.Duration{utils.MetaDefault: 5 * time.Minute} + v1per.AccountSUsage = map[string]time.Duration{utils.MetaDefault: 5 * time.Minute} expected[utils.CapMaxUsage] = &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaDefault: utils.NewLeafNode(5 * time.Minute)}} if rply := v1per.AsNavigableMap(); !reflect.DeepEqual(expected, rply) { t.Errorf("Expecting \n%+v\n, received: \n%+v", expected, rply) @@ -1213,8 +1213,8 @@ func TestV1ProcessEventReplyAsNavigableMap(t *testing.T) { } cost := map[string]float64{"TEST1": 2.0} - v1per.Cost = cost - v1per.Cost[utils.MetaRaw] = cost["TEST1"] + v1per.RateSCost = cost + v1per.RateSCost[utils.MetaRaw] = cost["TEST1"] if rply := v1per.AsNavigableMap(); !reflect.DeepEqual(expected, rply) { t.Errorf("Expecting \n%+v\n, received: \n%+v", expected, rply) }