SMGenericV2 GetMaxUsage, InitiateSession, UpdateSession API methods returning time.Duration, improvements on RadiusAgent

This commit is contained in:
DanB
2017-06-06 13:24:24 +02:00
parent 4a9ef9bd53
commit 2fd0c9a4d7
6 changed files with 177 additions and 44 deletions

View File

@@ -21,6 +21,8 @@ package agents
import (
"strings"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/sessionmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/radigo"
)
@@ -53,3 +55,15 @@ func radPassesFieldFilter(pkt *radigo.Packet, fieldFilter *utils.RSRField, proce
}
return true
}
// radPktAsSMGEvent converts a RADIUS packet into SMGEvent
func radReqAsSMGEvent(radPkt *radigo.Packet, procVars map[string]string,
tplFlds []*config.CfgCdrField, procFlags utils.StringMap) (smgEv sessionmanager.SMGenericEvent, err error) {
return
}
// radReplyAppendAttributes appends attributes to a RADIUS reply based on predefined template
func radReplyAppendAttributes(reply *radigo.Packet, procVars map[string]string,
tplFlds []*config.CfgCdrField, procFlags utils.StringMap) (err error) {
return
}

View File

@@ -19,6 +19,8 @@ package agents
import (
"fmt"
"strconv"
"time"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
@@ -27,8 +29,14 @@ import (
)
const (
MetaRadReqCode = "*req_code"
MetaRadReplyCode = "*reply_code"
MetaRadReqCode = "*radReqCode"
MetaRadReplyCode = "*radReplyCode"
MetaRadAuth = "*radAuth"
MetaRadAcctStart = "*radAcctStart"
MetaRadAcctUpdate = "*radAcctUpdate"
MetaRadAcctStop = "*radAcctStop"
MetaRadAcctEvent = "*radAcctEvent"
MetaCGRMaxUsage = "*cgrMaxUsage"
)
func NewRadiusAgent(cgrCfg *config.CGRConfig, smg rpcclient.RpcClientConnection) (ra *RadiusAgent, err error) {
@@ -39,10 +47,14 @@ func NewRadiusAgent(cgrCfg *config.CGRConfig, smg rpcclient.RpcClientConnection)
}
}
ra = &RadiusAgent{cgrCfg: cgrCfg, smg: smg}
ra.rsAuth = radigo.NewServer(cgrCfg.RadiusAgentCfg().ListenNet, cgrCfg.RadiusAgentCfg().ListenAuth, cgrCfg.RadiusAgentCfg().ClientSecrets, dicts,
map[radigo.PacketCode]func(*radigo.Packet) (*radigo.Packet, error){radigo.AccessRequest: ra.handleAuth}, nil)
ra.rsAcct = radigo.NewServer(cgrCfg.RadiusAgentCfg().ListenNet, cgrCfg.RadiusAgentCfg().ListenAcct, cgrCfg.RadiusAgentCfg().ClientSecrets, dicts,
map[radigo.PacketCode]func(*radigo.Packet) (*radigo.Packet, error){radigo.AccountingRequest: ra.handleAcct}, nil)
ra.rsAuth = radigo.NewServer(cgrCfg.RadiusAgentCfg().ListenNet,
cgrCfg.RadiusAgentCfg().ListenAuth, cgrCfg.RadiusAgentCfg().ClientSecrets, dicts,
map[radigo.PacketCode]func(*radigo.Packet) (*radigo.Packet, error){
radigo.AccessRequest: ra.handleAuth}, nil)
ra.rsAcct = radigo.NewServer(cgrCfg.RadiusAgentCfg().ListenNet,
cgrCfg.RadiusAgentCfg().ListenAcct, cgrCfg.RadiusAgentCfg().ClientSecrets, dicts,
map[radigo.PacketCode]func(*radigo.Packet) (*radigo.Packet, error){
radigo.AccountingRequest: ra.handleAcct}, nil)
return
}
@@ -59,7 +71,7 @@ func (ra *RadiusAgent) handleAuth(req *radigo.Packet) (rpl *radigo.Packet, err e
req.SetAVPValues() // populate string values in AVPs
utils.Logger.Debug(fmt.Sprintf("RadiusAgent handleAuth, received request: %+v", req))
procVars := map[string]string{
MetaRadReqCode: "4",
MetaRadAuth: "true",
}
rpl = req.Reply()
rpl.Code = radigo.AccessAccept
@@ -88,8 +100,16 @@ func (ra *RadiusAgent) handleAuth(req *radigo.Packet) (rpl *radigo.Packet, err e
func (ra *RadiusAgent) handleAcct(req *radigo.Packet) (rpl *radigo.Packet, err error) {
req.SetAVPValues() // populate string values in AVPs
utils.Logger.Debug(fmt.Sprintf("Received request: %s", utils.ToJSON(req)))
procVars := map[string]string{
MetaRadReqCode: "4",
procVars := make(map[string]string)
if avps := req.AttributesWithName("Acct-Status-Type", ""); len(avps) != 0 { // populate accounting type
switch avps[0].StringValue() { // first AVP found will give out the type of accounting
case "Start":
procVars[MetaRadAcctStart] = "true"
case "Interim-Update":
procVars[MetaRadAcctUpdate] = "true"
case "Stop":
procVars[MetaRadAcctStop] = "true"
}
}
rpl = req.Reply()
rpl.Code = radigo.AccountingResponse
@@ -125,7 +145,43 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RARequestProcessor,
if !passesAllFilters { // Not going with this processor further
return false, nil
}
return
for k, v := range reqProcessor.Flags { // update processorVars with flags from processor
processorVars[k] = strconv.FormatBool(v)
}
smgEv, err := radReqAsSMGEvent(req, processorVars, reqProcessor.RequestFields, reqProcessor.Flags)
if err != nil {
return false, err
}
var maxUsage time.Duration
if processorVars[MetaRadReqCode] == "3" { // auth attempt, make sure that MaxUsage is enough
if err = ra.smg.Call("SMGenericV2.GetMaxUsage", smgEv, &maxUsage); err != nil {
return
}
if reqUsage, has := smgEv[utils.USAGE]; !has { // usage was not requested, decide based on 0
if maxUsage == 0 {
reply.Code = radigo.AccessReject
}
} else if reqUsage.(time.Duration) < maxUsage {
reply.Code = radigo.AccessReject
}
} else if _, has := processorVars[MetaRadAcctStart]; has {
err = ra.smg.Call("SMGenericV1.InitiateSession", smgEv, &maxUsage)
} else if _, has := processorVars[MetaRadAcctUpdate]; has {
err = ra.smg.Call("SMGenericV1.UpdateSession", smgEv, &maxUsage)
} else if _, has := processorVars[MetaRadAcctStop]; has {
var rpl string
err = ra.smg.Call("SMGenericV1.TerminateSession", smgEv, &rpl)
}
if err != nil {
return false, err
}
processorVars[MetaCGRMaxUsage] = strconv.Itoa(int(maxUsage))
if err := radReplyAppendAttributes(reply, processorVars, reqProcessor.ReplyFields, reqProcessor.Flags); err != nil {
return false, err
}
return true, nil
}
func (ra *RadiusAgent) ListenAndServe() (err error) {

View File

@@ -27,79 +27,75 @@ import (
)
func NewSMGenericV1(sm *sessionmanager.SMGeneric) *SMGenericV1 {
return &SMGenericV1{sm: sm}
return &SMGenericV1{SMG: sm}
}
// Exports RPC from SMGeneric
type SMGenericV1 struct {
sm *sessionmanager.SMGeneric
SMG *sessionmanager.SMGeneric
}
// Returns MaxUsage (for calls in seconds), -1 for no limit
func (self *SMGenericV1) GetMaxUsage(ev sessionmanager.SMGenericEvent, maxUsage *float64) error {
return self.sm.BiRPCV1GetMaxUsage(nil, ev, maxUsage)
return self.SMG.BiRPCV1GetMaxUsage(nil, ev, maxUsage)
}
// Returns list of suppliers which can be used for the request
func (self *SMGenericV1) GetLCRSuppliers(ev sessionmanager.SMGenericEvent, suppliers *[]string) error {
return self.sm.BiRPCV1GetLCRSuppliers(nil, ev, suppliers)
return self.SMG.BiRPCV1GetLCRSuppliers(nil, ev, suppliers)
}
// Called on session start, returns the maximum number of seconds the session can last
func (self *SMGenericV1) InitiateSession(ev sessionmanager.SMGenericEvent, maxUsage *float64) error {
return self.sm.BiRPCV1InitiateSession(nil, ev, maxUsage)
return self.SMG.BiRPCV1InitiateSession(nil, ev, maxUsage)
}
// Interim updates, returns remaining duration from the rater
func (self *SMGenericV1) UpdateSession(ev sessionmanager.SMGenericEvent, maxUsage *float64) error {
return self.sm.BiRPCV1UpdateSession(nil, ev, maxUsage)
return self.SMG.BiRPCV1UpdateSession(nil, ev, maxUsage)
}
// Called on session end, should stop debit loop
func (self *SMGenericV1) TerminateSession(ev sessionmanager.SMGenericEvent, reply *string) error {
return self.sm.BiRPCV1TerminateSession(nil, ev, reply)
return self.SMG.BiRPCV1TerminateSession(nil, ev, reply)
}
// Called on individual Events (eg SMS)
func (self *SMGenericV1) ChargeEvent(ev sessionmanager.SMGenericEvent, maxUsage *float64) error {
return self.sm.BiRPCV1ChargeEvent(nil, ev, maxUsage)
return self.SMG.BiRPCV1ChargeEvent(nil, ev, maxUsage)
}
// Called on session end, should send the CDR to CDRS
func (self *SMGenericV1) ProcessCDR(ev sessionmanager.SMGenericEvent, reply *string) error {
return self.sm.BiRPCV1ProcessCDR(nil, ev, reply)
return self.SMG.BiRPCV1ProcessCDR(nil, ev, reply)
}
func (self *SMGenericV1) GetActiveSessions(attrs map[string]string, reply *[]*sessionmanager.ActiveSession) error {
return self.sm.BiRPCV1GetActiveSessions(nil, attrs, reply)
return self.SMG.BiRPCV1GetActiveSessions(nil, attrs, reply)
}
func (self *SMGenericV1) GetActiveSessionsCount(attrs map[string]string, reply *int) error {
return self.sm.BiRPCV1GetActiveSessionsCount(nil, attrs, reply)
return self.SMG.BiRPCV1GetActiveSessionsCount(nil, attrs, reply)
}
func (self *SMGenericV1) GetPassiveSessions(attrs map[string]string, reply *[]*sessionmanager.ActiveSession) error {
return self.sm.BiRPCV1GetPassiveSessions(nil, attrs, reply)
return self.SMG.BiRPCV1GetPassiveSessions(nil, attrs, reply)
}
func (self *SMGenericV1) GetPassiveSessionsCount(attrs map[string]string, reply *int) error {
return self.sm.BiRPCV1GetPassiveSessionsCount(nil, attrs, reply)
return self.SMG.BiRPCV1GetPassiveSessionsCount(nil, attrs, reply)
}
func (self *SMGenericV1) SetPassiveSessions(args sessionmanager.ArgsSetPassiveSessions, reply *string) error {
return self.sm.BiRPCV1SetPassiveSessions(nil, args, reply)
}
func (self *SMGenericV1) SetGZIPpedPassiveSessions(args []byte, reply *string) error {
return self.sm.BiRPCV1SetGZIPpedPassiveSessions(nil, args, reply)
return self.SMG.BiRPCV1SetPassiveSessions(nil, args, reply)
}
func (self *SMGenericV1) ReplicateActiveSessions(args sessionmanager.ArgsReplicateSessions, reply *string) error {
return self.sm.BiRPCV1ReplicateActiveSessions(nil, args, reply)
return self.SMG.BiRPCV1ReplicateActiveSessions(nil, args, reply)
}
func (self *SMGenericV1) ReplicatePassiveSessions(args sessionmanager.ArgsReplicateSessions, reply *string) error {
return self.sm.BiRPCV1ReplicatePassiveSessions(nil, args, reply)
return self.SMG.BiRPCV1ReplicatePassiveSessions(nil, args, reply)
}
// rpcclient.RpcClientConnection interface

44
apier/v2/smgeneric.go Normal file
View File

@@ -0,0 +1,44 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package v2
import (
"time"
"github.com/cgrates/cgrates/apier/v1"
"github.com/cgrates/cgrates/sessionmanager"
)
type SMGenericV2 struct {
v1.SMGenericV1
}
// GetMaxUsage returns maxUsage as time.Duration/int64
func (smgv2 *SMGenericV2) GetMaxUsage(ev sessionmanager.SMGenericEvent, maxUsage *time.Duration) error {
return smgv2.SMG.BiRPCV2GetMaxUsage(nil, ev, maxUsage)
}
// Called on session start, returns the maximum number of seconds the session can last
func (smgv2 *SMGenericV2) InitiateSession(ev sessionmanager.SMGenericEvent, maxUsage *time.Duration) error {
return smgv2.SMG.BiRPCV2InitiateSession(nil, ev, maxUsage)
}
// Interim updates, returns remaining duration from the rater
func (smgv2 *SMGenericV2) UpdateSession(ev sessionmanager.SMGenericEvent, maxUsage *time.Duration) error {
return smgv2.SMG.BiRPCV2UpdateSession(nil, ev, maxUsage)
}

View File

@@ -167,6 +167,7 @@ func startSmGeneric(internalSMGChan chan *sessionmanager.SMGeneric, internalRate
// Register RPC handler
smgRpc := v1.NewSMGenericV1(sm)
server.RpcRegister(smgRpc)
server.RpcRegister(&v2.SMGenericV2{*smgRpc})
// Register BiRpc handlers
if cfg.SmGenericConfig.ListenBijson != "" {
smgBiRpc := v1.NewSMGenericBiRpcV1(sm)

View File

@@ -18,7 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
package sessionmanager
import (
"encoding/json"
"errors"
"fmt"
"reflect"
@@ -1016,7 +1015,6 @@ func (smg *SMGeneric) CallBiRPC(clnt rpcclient.RpcClientConnection, serviceMetho
}
func (smg *SMGeneric) BiRPCV1GetMaxUsage(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) error {
*maxUsage = 0 // Bug in OpenSIPS, remove in the future
maxUsageDur, err := smg.GetMaxUsage(ev)
if err != nil {
return utils.NewErrServerError(err)
@@ -1029,6 +1027,16 @@ func (smg *SMGeneric) BiRPCV1GetMaxUsage(clnt rpcclient.RpcClientConnection, ev
return nil
}
// BiRPCV2GetMaxUsage returns the maximum usage as duration/int64
func (smg *SMGeneric) BiRPCV2GetMaxUsage(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *time.Duration) error {
maxUsageDur, err := smg.GetMaxUsage(ev)
if err != nil {
return utils.NewErrServerError(err)
}
*maxUsage = maxUsageDur
return nil
}
/// Returns list of suppliers which can be used for the request
func (smg *SMGeneric) BiRPCV1GetLCRSuppliers(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, suppliers *[]string) error {
if supls, err := smg.GetLCRSuppliers(ev); err != nil {
@@ -1052,6 +1060,19 @@ func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection,
return
}
// BiRPCV2InitiateSession initiates a new session, returns the maximum duration the session can last
func (smg *SMGeneric) BiRPCV2InitiateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *time.Duration) (err error) {
var minMaxUsage time.Duration
if minMaxUsage, err = smg.InitiateSession(ev, clnt); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
} else {
*maxUsage = minMaxUsage
}
return
}
// Interim updates, returns remaining duration from the RALs
func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) (err error) {
var minMaxUsage time.Duration
@@ -1065,6 +1086,19 @@ func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, e
return
}
// BiRPCV1UpdateSession updates an existing session, returning the duration which the session can still last
func (smg *SMGeneric) BiRPCV2UpdateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *time.Duration) (err error) {
var minMaxUsage time.Duration
if minMaxUsage, err = smg.UpdateSession(ev, clnt); err != nil {
if err != rpcclient.ErrSessionNotFound {
err = utils.NewErrServerError(err)
}
} else {
*maxUsage = minMaxUsage
}
return
}
// Called on session end, should stop debit loop
func (smg *SMGeneric) BiRPCV1TerminateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, reply *string) (err error) {
if err = smg.TerminateSession(ev, clnt); err != nil {
@@ -1174,18 +1208,6 @@ func (smg *SMGeneric) BiRPCV1SetPassiveSessions(clnt rpcclient.RpcClientConnecti
return
}
// BiRPCV1SetGZIPpedPassiveSessions is used to handle GZIP compressed arguments to BiRPCV1SetPassiveSessions
// eg: if CallCosts are too big, sending them over network could introduce latency
func (smg *SMGeneric) BiRPCV1SetGZIPpedPassiveSessions(clnt rpcclient.RpcClientConnection, args []byte, reply *string) (err error) {
var argsSetPSS ArgsSetPassiveSessions
if dst, err := utils.GUnZIPContent(args); err != nil {
return err
} else if err := json.Unmarshal(dst, &argsSetPSS); err != nil {
return err
}
return smg.BiRPCV1SetPassiveSessions(clnt, argsSetPSS, reply)
}
type ArgsReplicateSessions struct {
Filter map[string]string
Connections []*config.HaPoolConfig