diff --git a/agents/librad.go b/agents/librad.go index dfffd5b09..641b77249 100644 --- a/agents/librad.go +++ b/agents/librad.go @@ -21,6 +21,8 @@ package agents import ( "strings" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/sessionmanager" "github.com/cgrates/cgrates/utils" "github.com/cgrates/radigo" ) @@ -53,3 +55,15 @@ func radPassesFieldFilter(pkt *radigo.Packet, fieldFilter *utils.RSRField, proce } return true } + +// radPktAsSMGEvent converts a RADIUS packet into SMGEvent +func radReqAsSMGEvent(radPkt *radigo.Packet, procVars map[string]string, + tplFlds []*config.CfgCdrField, procFlags utils.StringMap) (smgEv sessionmanager.SMGenericEvent, err error) { + return +} + +// radReplyAppendAttributes appends attributes to a RADIUS reply based on predefined template +func radReplyAppendAttributes(reply *radigo.Packet, procVars map[string]string, + tplFlds []*config.CfgCdrField, procFlags utils.StringMap) (err error) { + return +} diff --git a/agents/radagent.go b/agents/radagent.go index 0798b551b..5d1adaf95 100644 --- a/agents/radagent.go +++ b/agents/radagent.go @@ -19,6 +19,8 @@ package agents import ( "fmt" + "strconv" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" @@ -27,8 +29,14 @@ import ( ) const ( - MetaRadReqCode = "*req_code" - MetaRadReplyCode = "*reply_code" + MetaRadReqCode = "*radReqCode" + MetaRadReplyCode = "*radReplyCode" + MetaRadAuth = "*radAuth" + MetaRadAcctStart = "*radAcctStart" + MetaRadAcctUpdate = "*radAcctUpdate" + MetaRadAcctStop = "*radAcctStop" + MetaRadAcctEvent = "*radAcctEvent" + MetaCGRMaxUsage = "*cgrMaxUsage" ) func NewRadiusAgent(cgrCfg *config.CGRConfig, smg rpcclient.RpcClientConnection) (ra *RadiusAgent, err error) { @@ -39,10 +47,14 @@ func NewRadiusAgent(cgrCfg *config.CGRConfig, smg rpcclient.RpcClientConnection) } } ra = &RadiusAgent{cgrCfg: cgrCfg, smg: smg} - ra.rsAuth = radigo.NewServer(cgrCfg.RadiusAgentCfg().ListenNet, cgrCfg.RadiusAgentCfg().ListenAuth, cgrCfg.RadiusAgentCfg().ClientSecrets, dicts, - map[radigo.PacketCode]func(*radigo.Packet) (*radigo.Packet, error){radigo.AccessRequest: ra.handleAuth}, nil) - ra.rsAcct = radigo.NewServer(cgrCfg.RadiusAgentCfg().ListenNet, cgrCfg.RadiusAgentCfg().ListenAcct, cgrCfg.RadiusAgentCfg().ClientSecrets, dicts, - map[radigo.PacketCode]func(*radigo.Packet) (*radigo.Packet, error){radigo.AccountingRequest: ra.handleAcct}, nil) + ra.rsAuth = radigo.NewServer(cgrCfg.RadiusAgentCfg().ListenNet, + cgrCfg.RadiusAgentCfg().ListenAuth, cgrCfg.RadiusAgentCfg().ClientSecrets, dicts, + map[radigo.PacketCode]func(*radigo.Packet) (*radigo.Packet, error){ + radigo.AccessRequest: ra.handleAuth}, nil) + ra.rsAcct = radigo.NewServer(cgrCfg.RadiusAgentCfg().ListenNet, + cgrCfg.RadiusAgentCfg().ListenAcct, cgrCfg.RadiusAgentCfg().ClientSecrets, dicts, + map[radigo.PacketCode]func(*radigo.Packet) (*radigo.Packet, error){ + radigo.AccountingRequest: ra.handleAcct}, nil) return } @@ -59,7 +71,7 @@ func (ra *RadiusAgent) handleAuth(req *radigo.Packet) (rpl *radigo.Packet, err e req.SetAVPValues() // populate string values in AVPs utils.Logger.Debug(fmt.Sprintf("RadiusAgent handleAuth, received request: %+v", req)) procVars := map[string]string{ - MetaRadReqCode: "4", + MetaRadAuth: "true", } rpl = req.Reply() rpl.Code = radigo.AccessAccept @@ -88,8 +100,16 @@ func (ra *RadiusAgent) handleAuth(req *radigo.Packet) (rpl *radigo.Packet, err e func (ra *RadiusAgent) handleAcct(req *radigo.Packet) (rpl *radigo.Packet, err error) { req.SetAVPValues() // populate string values in AVPs utils.Logger.Debug(fmt.Sprintf("Received request: %s", utils.ToJSON(req))) - procVars := map[string]string{ - MetaRadReqCode: "4", + procVars := make(map[string]string) + if avps := req.AttributesWithName("Acct-Status-Type", ""); len(avps) != 0 { // populate accounting type + switch avps[0].StringValue() { // first AVP found will give out the type of accounting + case "Start": + procVars[MetaRadAcctStart] = "true" + case "Interim-Update": + procVars[MetaRadAcctUpdate] = "true" + case "Stop": + procVars[MetaRadAcctStop] = "true" + } } rpl = req.Reply() rpl.Code = radigo.AccountingResponse @@ -125,7 +145,43 @@ func (ra *RadiusAgent) processRequest(reqProcessor *config.RARequestProcessor, if !passesAllFilters { // Not going with this processor further return false, nil } - return + for k, v := range reqProcessor.Flags { // update processorVars with flags from processor + processorVars[k] = strconv.FormatBool(v) + } + smgEv, err := radReqAsSMGEvent(req, processorVars, reqProcessor.RequestFields, reqProcessor.Flags) + if err != nil { + return false, err + } + + var maxUsage time.Duration + if processorVars[MetaRadReqCode] == "3" { // auth attempt, make sure that MaxUsage is enough + if err = ra.smg.Call("SMGenericV2.GetMaxUsage", smgEv, &maxUsage); err != nil { + return + } + if reqUsage, has := smgEv[utils.USAGE]; !has { // usage was not requested, decide based on 0 + if maxUsage == 0 { + reply.Code = radigo.AccessReject + } + } else if reqUsage.(time.Duration) < maxUsage { + reply.Code = radigo.AccessReject + } + } else if _, has := processorVars[MetaRadAcctStart]; has { + err = ra.smg.Call("SMGenericV1.InitiateSession", smgEv, &maxUsage) + } else if _, has := processorVars[MetaRadAcctUpdate]; has { + err = ra.smg.Call("SMGenericV1.UpdateSession", smgEv, &maxUsage) + } else if _, has := processorVars[MetaRadAcctStop]; has { + var rpl string + err = ra.smg.Call("SMGenericV1.TerminateSession", smgEv, &rpl) + } + if err != nil { + return false, err + } + + processorVars[MetaCGRMaxUsage] = strconv.Itoa(int(maxUsage)) + if err := radReplyAppendAttributes(reply, processorVars, reqProcessor.ReplyFields, reqProcessor.Flags); err != nil { + return false, err + } + return true, nil } func (ra *RadiusAgent) ListenAndServe() (err error) { diff --git a/apier/v1/smgenericv1.go b/apier/v1/smgenericv1.go index 9a6aff681..d4df7ec98 100644 --- a/apier/v1/smgenericv1.go +++ b/apier/v1/smgenericv1.go @@ -27,79 +27,75 @@ import ( ) func NewSMGenericV1(sm *sessionmanager.SMGeneric) *SMGenericV1 { - return &SMGenericV1{sm: sm} + return &SMGenericV1{SMG: sm} } // Exports RPC from SMGeneric type SMGenericV1 struct { - sm *sessionmanager.SMGeneric + SMG *sessionmanager.SMGeneric } // Returns MaxUsage (for calls in seconds), -1 for no limit func (self *SMGenericV1) GetMaxUsage(ev sessionmanager.SMGenericEvent, maxUsage *float64) error { - return self.sm.BiRPCV1GetMaxUsage(nil, ev, maxUsage) + return self.SMG.BiRPCV1GetMaxUsage(nil, ev, maxUsage) } // Returns list of suppliers which can be used for the request func (self *SMGenericV1) GetLCRSuppliers(ev sessionmanager.SMGenericEvent, suppliers *[]string) error { - return self.sm.BiRPCV1GetLCRSuppliers(nil, ev, suppliers) + return self.SMG.BiRPCV1GetLCRSuppliers(nil, ev, suppliers) } // Called on session start, returns the maximum number of seconds the session can last func (self *SMGenericV1) InitiateSession(ev sessionmanager.SMGenericEvent, maxUsage *float64) error { - return self.sm.BiRPCV1InitiateSession(nil, ev, maxUsage) + return self.SMG.BiRPCV1InitiateSession(nil, ev, maxUsage) } // Interim updates, returns remaining duration from the rater func (self *SMGenericV1) UpdateSession(ev sessionmanager.SMGenericEvent, maxUsage *float64) error { - return self.sm.BiRPCV1UpdateSession(nil, ev, maxUsage) + return self.SMG.BiRPCV1UpdateSession(nil, ev, maxUsage) } // Called on session end, should stop debit loop func (self *SMGenericV1) TerminateSession(ev sessionmanager.SMGenericEvent, reply *string) error { - return self.sm.BiRPCV1TerminateSession(nil, ev, reply) + return self.SMG.BiRPCV1TerminateSession(nil, ev, reply) } // Called on individual Events (eg SMS) func (self *SMGenericV1) ChargeEvent(ev sessionmanager.SMGenericEvent, maxUsage *float64) error { - return self.sm.BiRPCV1ChargeEvent(nil, ev, maxUsage) + return self.SMG.BiRPCV1ChargeEvent(nil, ev, maxUsage) } // Called on session end, should send the CDR to CDRS func (self *SMGenericV1) ProcessCDR(ev sessionmanager.SMGenericEvent, reply *string) error { - return self.sm.BiRPCV1ProcessCDR(nil, ev, reply) + return self.SMG.BiRPCV1ProcessCDR(nil, ev, reply) } func (self *SMGenericV1) GetActiveSessions(attrs map[string]string, reply *[]*sessionmanager.ActiveSession) error { - return self.sm.BiRPCV1GetActiveSessions(nil, attrs, reply) + return self.SMG.BiRPCV1GetActiveSessions(nil, attrs, reply) } func (self *SMGenericV1) GetActiveSessionsCount(attrs map[string]string, reply *int) error { - return self.sm.BiRPCV1GetActiveSessionsCount(nil, attrs, reply) + return self.SMG.BiRPCV1GetActiveSessionsCount(nil, attrs, reply) } func (self *SMGenericV1) GetPassiveSessions(attrs map[string]string, reply *[]*sessionmanager.ActiveSession) error { - return self.sm.BiRPCV1GetPassiveSessions(nil, attrs, reply) + return self.SMG.BiRPCV1GetPassiveSessions(nil, attrs, reply) } func (self *SMGenericV1) GetPassiveSessionsCount(attrs map[string]string, reply *int) error { - return self.sm.BiRPCV1GetPassiveSessionsCount(nil, attrs, reply) + return self.SMG.BiRPCV1GetPassiveSessionsCount(nil, attrs, reply) } func (self *SMGenericV1) SetPassiveSessions(args sessionmanager.ArgsSetPassiveSessions, reply *string) error { - return self.sm.BiRPCV1SetPassiveSessions(nil, args, reply) -} - -func (self *SMGenericV1) SetGZIPpedPassiveSessions(args []byte, reply *string) error { - return self.sm.BiRPCV1SetGZIPpedPassiveSessions(nil, args, reply) + return self.SMG.BiRPCV1SetPassiveSessions(nil, args, reply) } func (self *SMGenericV1) ReplicateActiveSessions(args sessionmanager.ArgsReplicateSessions, reply *string) error { - return self.sm.BiRPCV1ReplicateActiveSessions(nil, args, reply) + return self.SMG.BiRPCV1ReplicateActiveSessions(nil, args, reply) } func (self *SMGenericV1) ReplicatePassiveSessions(args sessionmanager.ArgsReplicateSessions, reply *string) error { - return self.sm.BiRPCV1ReplicatePassiveSessions(nil, args, reply) + return self.SMG.BiRPCV1ReplicatePassiveSessions(nil, args, reply) } // rpcclient.RpcClientConnection interface diff --git a/apier/v2/smgeneric.go b/apier/v2/smgeneric.go new file mode 100644 index 000000000..15ddf49d7 --- /dev/null +++ b/apier/v2/smgeneric.go @@ -0,0 +1,44 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package v2 + +import ( + "time" + + "github.com/cgrates/cgrates/apier/v1" + "github.com/cgrates/cgrates/sessionmanager" +) + +type SMGenericV2 struct { + v1.SMGenericV1 +} + +// GetMaxUsage returns maxUsage as time.Duration/int64 +func (smgv2 *SMGenericV2) GetMaxUsage(ev sessionmanager.SMGenericEvent, maxUsage *time.Duration) error { + return smgv2.SMG.BiRPCV2GetMaxUsage(nil, ev, maxUsage) +} + +// Called on session start, returns the maximum number of seconds the session can last +func (smgv2 *SMGenericV2) InitiateSession(ev sessionmanager.SMGenericEvent, maxUsage *time.Duration) error { + return smgv2.SMG.BiRPCV2InitiateSession(nil, ev, maxUsage) +} + +// Interim updates, returns remaining duration from the rater +func (smgv2 *SMGenericV2) UpdateSession(ev sessionmanager.SMGenericEvent, maxUsage *time.Duration) error { + return smgv2.SMG.BiRPCV2UpdateSession(nil, ev, maxUsage) +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index d3ed78d20..adbb9ae6d 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -167,6 +167,7 @@ func startSmGeneric(internalSMGChan chan *sessionmanager.SMGeneric, internalRate // Register RPC handler smgRpc := v1.NewSMGenericV1(sm) server.RpcRegister(smgRpc) + server.RpcRegister(&v2.SMGenericV2{*smgRpc}) // Register BiRpc handlers if cfg.SmGenericConfig.ListenBijson != "" { smgBiRpc := v1.NewSMGenericBiRpcV1(sm) diff --git a/sessionmanager/smgeneric.go b/sessionmanager/smgeneric.go index d0ac3ca6a..535ef3229 100644 --- a/sessionmanager/smgeneric.go +++ b/sessionmanager/smgeneric.go @@ -18,7 +18,6 @@ along with this program. If not, see package sessionmanager import ( - "encoding/json" "errors" "fmt" "reflect" @@ -1016,7 +1015,6 @@ func (smg *SMGeneric) CallBiRPC(clnt rpcclient.RpcClientConnection, serviceMetho } func (smg *SMGeneric) BiRPCV1GetMaxUsage(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) error { - *maxUsage = 0 // Bug in OpenSIPS, remove in the future maxUsageDur, err := smg.GetMaxUsage(ev) if err != nil { return utils.NewErrServerError(err) @@ -1029,6 +1027,16 @@ func (smg *SMGeneric) BiRPCV1GetMaxUsage(clnt rpcclient.RpcClientConnection, ev return nil } +// BiRPCV2GetMaxUsage returns the maximum usage as duration/int64 +func (smg *SMGeneric) BiRPCV2GetMaxUsage(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *time.Duration) error { + maxUsageDur, err := smg.GetMaxUsage(ev) + if err != nil { + return utils.NewErrServerError(err) + } + *maxUsage = maxUsageDur + return nil +} + /// Returns list of suppliers which can be used for the request func (smg *SMGeneric) BiRPCV1GetLCRSuppliers(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, suppliers *[]string) error { if supls, err := smg.GetLCRSuppliers(ev); err != nil { @@ -1052,6 +1060,19 @@ func (smg *SMGeneric) BiRPCV1InitiateSession(clnt rpcclient.RpcClientConnection, return } +// BiRPCV2InitiateSession initiates a new session, returns the maximum duration the session can last +func (smg *SMGeneric) BiRPCV2InitiateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *time.Duration) (err error) { + var minMaxUsage time.Duration + if minMaxUsage, err = smg.InitiateSession(ev, clnt); err != nil { + if err != rpcclient.ErrSessionNotFound { + err = utils.NewErrServerError(err) + } + } else { + *maxUsage = minMaxUsage + } + return +} + // Interim updates, returns remaining duration from the RALs func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *float64) (err error) { var minMaxUsage time.Duration @@ -1065,6 +1086,19 @@ func (smg *SMGeneric) BiRPCV1UpdateSession(clnt rpcclient.RpcClientConnection, e return } +// BiRPCV1UpdateSession updates an existing session, returning the duration which the session can still last +func (smg *SMGeneric) BiRPCV2UpdateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, maxUsage *time.Duration) (err error) { + var minMaxUsage time.Duration + if minMaxUsage, err = smg.UpdateSession(ev, clnt); err != nil { + if err != rpcclient.ErrSessionNotFound { + err = utils.NewErrServerError(err) + } + } else { + *maxUsage = minMaxUsage + } + return +} + // Called on session end, should stop debit loop func (smg *SMGeneric) BiRPCV1TerminateSession(clnt rpcclient.RpcClientConnection, ev SMGenericEvent, reply *string) (err error) { if err = smg.TerminateSession(ev, clnt); err != nil { @@ -1174,18 +1208,6 @@ func (smg *SMGeneric) BiRPCV1SetPassiveSessions(clnt rpcclient.RpcClientConnecti return } -// BiRPCV1SetGZIPpedPassiveSessions is used to handle GZIP compressed arguments to BiRPCV1SetPassiveSessions -// eg: if CallCosts are too big, sending them over network could introduce latency -func (smg *SMGeneric) BiRPCV1SetGZIPpedPassiveSessions(clnt rpcclient.RpcClientConnection, args []byte, reply *string) (err error) { - var argsSetPSS ArgsSetPassiveSessions - if dst, err := utils.GUnZIPContent(args); err != nil { - return err - } else if err := json.Unmarshal(dst, &argsSetPSS); err != nil { - return err - } - return smg.BiRPCV1SetPassiveSessions(clnt, argsSetPSS, reply) -} - type ArgsReplicateSessions struct { Filter map[string]string Connections []*config.HaPoolConfig