Updating janusgo, AAA basic implementation for janus_agent

This commit is contained in:
DanB
2024-04-30 11:26:58 +02:00
parent bde44ccfd6
commit 6d9e151933
4 changed files with 135 additions and 5 deletions

View File

@@ -22,14 +22,18 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"regexp"
"strconv"
"strings"
"time"
bicontext "github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
janus "github.com/cgrates/janusgo"
)
@@ -82,6 +86,10 @@ func (ja *JanusAgent) CreateSession(w http.ResponseWriter, req *http.Request) {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
if err := ja.authSession(strings.Split(req.RemoteAddr, ":")[0]); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
@@ -93,8 +101,99 @@ func (ja *JanusAgent) CreateSession(w http.ResponseWriter, req *http.Request) {
json.NewEncoder(w).Encode(&resp)
}
func (ja *JanusAgent) authSession(origIP string) (err error) {
authArgs := &sessions.V1AuthorizeArgs{
GetMaxUsage: true,
GetAttributes: true,
CGREvent: &utils.CGREvent{
Tenant: ja.cgrCfg.GeneralCfg().DefaultTenant,
ID: utils.Sha1(),
Time: utils.TimePointer(time.Now()),
Event: map[string]any{
utils.AccountField: origIP,
utils.Destination: "echotest",
},
}}
rply := new(sessions.V1AuthorizeReply)
err = ja.connMgr.Call(bicontext.Background(), ja.cgrCfg.JanusAgentCfg().SessionSConns,
utils.SessionSv1AuthorizeEvent,
authArgs, rply)
return
}
func (ja *JanusAgent) acntStartSession(s *janus.Session) (err error) {
initArgs := &sessions.V1InitSessionArgs{
GetAttributes: true,
InitSession: true,
CGREvent: &utils.CGREvent{
Tenant: ja.cgrCfg.GeneralCfg().DefaultTenant,
ID: utils.Sha1(),
Time: utils.TimePointer(time.Now()),
Event: map[string]any{
utils.AccountField: s.Data[utils.AccountField],
utils.OriginHost: s.Data[utils.OriginHost],
utils.OriginID: s.Data[utils.OriginID],
utils.Destination: s.Data[utils.Destination],
utils.AnswerTime: s.Data[utils.AnswerTime],
},
},
ForceDuration: true,
}
rply := new(sessions.V1InitSessionReply)
err = ja.connMgr.Call(bicontext.Background(), ja.cgrCfg.JanusAgentCfg().SessionSConns,
utils.SessionSv1InitiateSession,
initArgs, rply)
return
}
func (ja *JanusAgent) acntStopSession(s *janus.Session) (err error) {
terminateArgs := &sessions.V1TerminateSessionArgs{
TerminateSession: true,
CGREvent: &utils.CGREvent{
Tenant: ja.cgrCfg.GeneralCfg().DefaultTenant,
ID: utils.Sha1(),
Time: utils.TimePointer(time.Now()),
Event: map[string]any{
utils.AccountField: s.Data[utils.AccountField],
utils.OriginHost: s.Data[utils.OriginHost],
utils.OriginID: s.Data[utils.OriginID],
utils.Destination: s.Data[utils.Destination],
utils.AnswerTime: s.Data[utils.AnswerTime],
utils.Usage: s.Data[utils.Usage],
},
},
ForceDuration: true,
}
var rply string
err = ja.connMgr.Call(bicontext.Background(), ja.cgrCfg.JanusAgentCfg().SessionSConns,
utils.SessionSv1TerminateSession,
terminateArgs, &rply)
return
}
func (ja *JanusAgent) cdrSession(s *janus.Session) (err error) {
cgrEv := &utils.CGREvent{
Tenant: ja.cgrCfg.GeneralCfg().DefaultTenant,
ID: utils.Sha1(),
Time: utils.TimePointer(time.Now()),
Event: map[string]any{
utils.AccountField: s.Data[utils.AccountField],
utils.OriginHost: s.Data[utils.OriginHost],
utils.OriginID: s.Data[utils.OriginID],
utils.Destination: s.Data[utils.Destination],
utils.AnswerTime: s.Data[utils.AnswerTime],
utils.Usage: s.Data[utils.Usage],
},
}
var rply string
err = ja.connMgr.Call(bicontext.Background(), ja.cgrCfg.JanusAgentCfg().SessionSConns,
utils.SessionSv1ProcessCDR,
cgrEv, &rply)
return
}
// SessioNKeepalive sends keepalive once OPTIONS are coming for the session from HTTP
func (ja *JanusAgent) SessioNKeepalive(w http.ResponseWriter, r *http.Request) {
func (ja *JanusAgent) SessionKeepalive(w http.ResponseWriter, r *http.Request) {
janusAccessControlHeaders(w, r)
sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64)
if err != nil {
@@ -156,6 +255,23 @@ func (ja *JanusAgent) PollSession(w http.ResponseWriter, req *http.Request) {
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
for _, evIface := range events {
upEv, isWebrtcup := evIface.(*janus.WebRTCUpMsg)
if isWebrtcup {
ja.jnsConn.RLock()
s := ja.jnsConn.Sessions[upEv.Session]
ja.jnsConn.RUnlock()
if s == nil {
continue
}
s.Data[utils.AccountField] = strings.Split(req.RemoteAddr, ":")[0]
s.Data[utils.OriginHost] = strings.Split(req.Host, ":")[0]
s.Data[utils.OriginID] = strconv.Itoa(int(s.ID))
s.Data[utils.Destination] = "echotest"
s.Data[utils.AnswerTime] = time.Now()
go func() { ja.acntStartSession(s) }()
}
}
json.NewEncoder(w).Encode(events)
}
@@ -185,8 +301,20 @@ func (ja *JanusAgent) AttachPlugin(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if msg.Type == "destroy" {
answerTime, _ := utils.IfaceAsTime(session.Data[utils.AnswerTime], ja.cgrCfg.GeneralCfg().DefaultTimezone)
var totalDur time.Duration
if !answerTime.IsZero() {
totalDur = time.Now().Sub(answerTime)
}
session.Data[utils.Usage] = totalDur // toDo: lock session RW
go func() {
ja.acntStopSession(session)
ja.cdrSession(session)
}() // CGRateS accounting stop
resp, err = session.DestroySession(ctx, msg)
} else {
resp, err = session.AttachSession(ctx, msg)
}
if err != nil {
@@ -224,7 +352,7 @@ func (ja *JanusAgent) HandlePlugin(w http.ResponseWriter, r *http.Request) {
return
}
}
rBody, err := ioutil.ReadAll(r.Body)
rBody, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Cannot read body", http.StatusBadRequest)
return

2
go.mod
View File

@@ -21,7 +21,7 @@ require (
github.com/cgrates/baningo v0.0.0-20210413080722-004ffd5e429f
github.com/cgrates/birpc v1.3.1-0.20211117095917-5b0ff29f3084
github.com/cgrates/fsock v0.0.0-20240322171959-35309017b3e0
github.com/cgrates/janusgo v0.0.0-20240428171703-4a62a727d30d
github.com/cgrates/janusgo v0.0.0-20240430092034-1c3db3c74595
github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf
github.com/cgrates/ltcache v0.0.0-20240411152156-e673692056db
github.com/cgrates/radigo v0.0.0-20240123163129-491c899df727

2
go.sum
View File

@@ -70,6 +70,8 @@ github.com/cgrates/fsock v0.0.0-20240322171959-35309017b3e0 h1:rnSM0tG6Cl8GXjyBQ
github.com/cgrates/fsock v0.0.0-20240322171959-35309017b3e0/go.mod h1:bKByLko2HF33K+PbiiToAgevrrbr96C+7Pp3HGS6oag=
github.com/cgrates/janusgo v0.0.0-20240428171703-4a62a727d30d h1:/+5MxHHJPGA/WR1OaH3174hM2yKVqgNWh5U3zUpeH2k=
github.com/cgrates/janusgo v0.0.0-20240428171703-4a62a727d30d/go.mod h1:XBQDDjrIn+RCS4PDApYjTWwdp51NbqYfUGAYtzSB5ag=
github.com/cgrates/janusgo v0.0.0-20240430092034-1c3db3c74595 h1:f84+l8198ey/rM8oyYvBTFiUOf9fvQgAAGtMu6tUpLI=
github.com/cgrates/janusgo v0.0.0-20240430092034-1c3db3c74595/go.mod h1:XBQDDjrIn+RCS4PDApYjTWwdp51NbqYfUGAYtzSB5ag=
github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf h1:GbMJzvtwdX1OCEmsqSts/cRCIcIMvo8AYtC2dQExWlg=
github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf/go.mod h1:oEq/JbubkOD2pXHvDy4r7519NkxriONisrnVpkCaNJw=
github.com/cgrates/ltcache v0.0.0-20240411152156-e673692056db h1:JRgzMS5kJ1WxaveoZ1YG/FowUDxFQXD3GjCHR7rH0Gk=

View File

@@ -75,7 +75,7 @@ func (ja *JanusAgent) Start() (err error) {
ja.server.RegisterHttpFunc("POST "+ja.cfg.JanusAgentCfg().URL, ja.jA.CreateSession)
ja.server.RegisterHttpFunc("OPTIONS "+ja.cfg.JanusAgentCfg().URL, ja.jA.CORSOptions)
ja.server.RegisterHttpFunc(fmt.Sprintf("OPTIONS %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), ja.jA.SessioNKeepalive)
ja.server.RegisterHttpFunc(fmt.Sprintf("OPTIONS %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), ja.jA.SessionKeepalive)
ja.server.RegisterHttpFunc(fmt.Sprintf("OPTIONS %s/{sessionID}/", ja.cfg.JanusAgentCfg().URL), ja.jA.CORSOptions)
ja.server.RegisterHttpFunc(fmt.Sprintf("GET %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), ja.jA.PollSession)
ja.server.RegisterHttpFunc(fmt.Sprintf("POST %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), ja.jA.AttachPlugin)