Start SessionSv1.ProcessEvent API

This commit is contained in:
DanB
2025-11-13 20:19:36 +01:00
parent c51d3f27de
commit fd0ef907c8
6 changed files with 262 additions and 11 deletions

View File

@@ -0,0 +1,105 @@
{
// CGRateS Configuration file
//
// Used for SessionSv1 integration tests
"general": {
"reply_timeout": "10s"
},
"logger": {
"level": 7
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080",
},
"db": {
"db_conns": {
"*default": {
"db_type": "*internal",
"opts":{
"internalDBRewriteInterval": "0s",
"internalDBDumpInterval": "0s"
}
}
}
},
"stor_db": {
"db_type": "*internal"
},
"rates": {
"enabled": true,
},
"cdrs": {
"enabled": true,
"session_cost_retries": 1,
"chargers_conns":["*internal"],
"rates_conns": ["*internal"],
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"],
},
"resources": {
"enabled": true,
"store_interval": "-1",
},
"attributes": {
"enabled": true,
},
"thresholds": {
"enabled": true,
"store_interval": "-1",
},
"stats": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": ["*internal"],
},
"routes": {
"enabled": true,
},
"sessions": {
"enabled": true,
"session_ttl": "50ms",
"chargers_conns": ["*internal"],
"rates_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"resources_conns": ["*internal"],
"thresholds_conns": ["*internal"],
"stats_conns": ["*internal"],
"routes_conns": ["*internal"],
"attributes_conns": ["*internal"],
"alterable_fields": ["Extra1"]
},
"admins": {
"enabled": true,
},
}

View File

@@ -781,3 +781,43 @@ func (sS *SessionS) BiRPCv1ProcessCDR(ctx *context.Context,
return sS.processCDR(ctx, cgrEv, rply)
}
// BiRPCv1ProcessEvent processes an CGREvent with various subsystems
func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context,
args *utils.CGREvent, authReply *V1ProcessEventReply) (err error) {
if args == nil {
return utils.NewErrMandatoryIeMissing(utils.CGREventString)
}
if args.Event == nil {
return utils.NewErrMandatoryIeMissing(utils.Event)
}
if args.APIOpts == nil {
args.APIOpts = make(map[string]any)
}
if args.ID == "" {
args.ID = utils.GenUUID()
}
if args.Tenant == "" {
args.Tenant = sS.cfg.GeneralCfg().DefaultTenant
}
// RPC caching
if sS.cfg.CacheCfg().Partitions[utils.CacheRPCResponses].Limit != 0 {
cacheKey := utils.ConcatenatedKey(utils.SessionSv1AuthorizeEvent, args.ID)
refID := guardian.Guardian.GuardIDs("",
sS.cfg.GeneralCfg().LockingTimeout, cacheKey) // RPC caching needs to be atomic
defer guardian.Guardian.UnguardIDs(refID)
if itm, has := engine.Cache.Get(utils.CacheRPCResponses, cacheKey); has {
cachedResp := itm.(*utils.CachedRPCResponse)
if cachedResp.Error == nil {
*authReply = *cachedResp.Result.(*V1ProcessEventReply)
}
return cachedResp.Error
}
defer engine.Cache.Set(ctx, utils.CacheRPCResponses, cacheKey,
&utils.CachedRPCResponse{Result: authReply, Error: err},
nil, true, utils.NonTransactional)
}
// end of RPC caching
return
}

View File

@@ -325,8 +325,8 @@ func getDerivedEvents(events map[string]*utils.CGREvent, derivedReply bool) map[
// V1ProcessEventReply is the reply for the ProcessEvent API
type V1ProcessEventReply struct {
MaxUsage map[string]time.Duration `json:",omitempty"`
Cost map[string]float64 `json:",omitempty"` // Cost is the cost received from Rater, ignoring accounting part
AccountSUsage map[string]time.Duration `json:",omitempty"`
RateSCost map[string]float64 `json:",omitempty"` // Cost is the cost received from Rater, ignoring accounting part
ResourceAllocation map[string]string `json:",omitempty"`
Attributes map[string]*attributes.AttrSProcessEventReply `json:",omitempty"`
RouteProfiles map[string]routes.SortedRoutesList `json:",omitempty"`
@@ -338,9 +338,9 @@ type V1ProcessEventReply struct {
// AsNavigableMap is part of engine.NavigableMapper interface
func (v1Rply *V1ProcessEventReply) AsNavigableMap() map[string]*utils.DataNode {
cgrReply := make(map[string]*utils.DataNode)
if v1Rply.MaxUsage != nil {
if v1Rply.AccountSUsage != nil {
usage := &utils.DataNode{Type: utils.NMMapType, Map: make(map[string]*utils.DataNode)}
for k, v := range v1Rply.MaxUsage {
for k, v := range v1Rply.AccountSUsage {
usage.Map[k] = utils.NewLeafNode(v)
}
cgrReply[utils.CapMaxUsage] = usage
@@ -397,9 +397,9 @@ func (v1Rply *V1ProcessEventReply) AsNavigableMap() map[string]*utils.DataNode {
}
cgrReply[utils.CapStatQueues] = st
}
if v1Rply.Cost != nil {
if v1Rply.RateSCost != nil {
costs := &utils.DataNode{Type: utils.NMMapType, Map: make(map[string]*utils.DataNode)}
for k, cost := range v1Rply.Cost {
for k, cost := range v1Rply.RateSCost {
costs.Map[k] = utils.NewLeafNode(cost)
}
}

104
sessions/procev_it_test.go Normal file
View File

@@ -0,0 +1,104 @@
//go:build integration
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package sessions
import (
"os"
"path"
"testing"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
)
var (
sProcEvCfgPath string
sProcEvCfgDIR string
sProcEvCfg *config.CGRConfig
sProcEvRPC *birpc.Client
SessionsBkupTests = []func(t *testing.T){
testSessionSProcEvInitCfg,
testSessionSProcEvResetDB,
testSessionSProcEvStartEngine,
testSessionSProcEvApierRpcConn,
testSessionSProcEvStopCgrEngine,
}
)
func TestSessionSProcEv(t *testing.T) {
switch *utils.DBType {
case utils.MetaInternal:
sProcEvCfgDIR = "sessions_procev_internal"
defer func() {
if err := os.RemoveAll("/tmp/internal_db"); err != nil {
t.Error(err)
}
}()
case utils.MetaMySQL:
sProcEvCfgDIR = "sessions_procev_mysql"
case utils.MetaMongo:
sProcEvCfgDIR = "sessions_backup_mongo"
case utils.MetaPostgres:
sProcEvCfgDIR = "sessions_procev_postgres"
default:
t.Fatal("Unknown Database type")
}
for _, stest := range SessionsBkupTests {
t.Run(*utils.DBType, stest)
}
}
func testSessionSProcEvInitCfg(t *testing.T) {
var err error
sProcEvCfgPath = path.Join(*utils.DataDir, "conf", "samples", sProcEvCfgDIR)
if sProcEvCfg, err = config.NewCGRConfigFromPath(context.Background(), sProcEvCfgPath); err != nil {
t.Fatal(err)
}
}
// Remove data in both rating and accounting db
func testSessionSProcEvResetDB(t *testing.T) {
engine.FlushDBs(t, sProcEvCfg, true)
}
// Start CGR Engine
func testSessionSProcEvStartEngine(t *testing.T) {
if _, err := engine.StartEngine(sProcEvCfgPath, *utils.WaitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func testSessionSProcEvApierRpcConn(t *testing.T) {
sProcEvRPC = engine.NewRPCClient(t, sProcEvCfg.ListenCfg(), *utils.Encoding)
}
func testSessionSProcEvStopCgrEngine(t *testing.T) {
if err := engine.KillEngine(1000); err != nil {
t.Error(err)
}
}

View File

@@ -1708,10 +1708,11 @@ func (sS *SessionS) BiRPCv1ProcessMessage(ctx *context.Context,
return
}
/*
// BiRPCv1ProcessEvent processes one event with the right subsystems based on arguments received
func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context,
args *utils.CGREvent, rply *V1ProcessEventReply) (err error) {
/*
if args == nil {
return utils.NewErrMandatoryIeMissing(utils.CGREventString)
}
@@ -2151,9 +2152,10 @@ func (sS *SessionS) BiRPCv1ProcessEvent(ctx *context.Context,
if withErrors {
err = utils.ErrPartiallyExecuted
}
*/
return
}
*/
// BiRPCv1SyncSessions will sync sessions on demand
func (sS *SessionS) BiRPCv1SyncSessions(ctx *context.Context,

View File

@@ -1166,7 +1166,7 @@ func TestV1ProcessEventReplyAsNavigableMap(t *testing.T) {
t.Errorf("Expecting \n%+v\n, received: \n%+v", expected, rply)
}
//max usage check
v1per.MaxUsage = map[string]time.Duration{utils.MetaDefault: 5 * time.Minute}
v1per.AccountSUsage = map[string]time.Duration{utils.MetaDefault: 5 * time.Minute}
expected[utils.CapMaxUsage] = &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.MetaDefault: utils.NewLeafNode(5 * time.Minute)}}
if rply := v1per.AsNavigableMap(); !reflect.DeepEqual(expected, rply) {
t.Errorf("Expecting \n%+v\n, received: \n%+v", expected, rply)
@@ -1213,8 +1213,8 @@ func TestV1ProcessEventReplyAsNavigableMap(t *testing.T) {
}
cost := map[string]float64{"TEST1": 2.0}
v1per.Cost = cost
v1per.Cost[utils.MetaRaw] = cost["TEST1"]
v1per.RateSCost = cost
v1per.RateSCost[utils.MetaRaw] = cost["TEST1"]
if rply := v1per.AsNavigableMap(); !reflect.DeepEqual(expected, rply) {
t.Errorf("Expecting \n%+v\n, received: \n%+v", expected, rply)
}