implemented janusagent & added to services

This commit is contained in:
Gezim Blliku
2024-07-11 08:31:44 +02:00
committed by Dan Christian Bogos
parent 6dc612a32a
commit 80249e1074
14 changed files with 1118 additions and 5 deletions

475
agents/janusagent.go Normal file
View File

@@ -0,0 +1,475 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/cgrates/birpc"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
janus "github.com/cgrates/janusgo"
"nhooyr.io/websocket"
)
func NewJanusAgent(cgrCfg *config.CGRConfig,
connMgr *engine.ConnManager,
filterS *engine.FilterS) (*JanusAgent, error) {
jsa := &JanusAgent{
cgrCfg: cgrCfg,
connMgr: connMgr,
filterS: filterS,
}
srv, _ := birpc.NewService(jsa, "", false)
jsa.ctx = context.WithClient(context.TODO(), srv)
return jsa, nil
}
type JanusAgent struct {
cgrCfg *config.CGRConfig
connMgr *engine.ConnManager
filterS *engine.FilterS
jnsConn *janus.Gateway
adminWs *websocket.Conn
ctx *context.Context
}
// Connect will create the connection to the Janus Server
func (ja *JanusAgent) Connect() (err error) {
ja.jnsConn, err = janus.Connect(
fmt.Sprintf("ws://%s", ja.cgrCfg.JanusAgentCfg().JanusConns[0].Address))
if err != nil {
return
}
ja.adminWs, _, err = websocket.Dial(context.Background(), fmt.Sprintf("ws://%s", ja.cgrCfg.JanusAgentCfg().JanusConns[0].AdminAddress), &websocket.DialOptions{
Subprotocols: []string{utils.JanusAdminSubProto},
})
return
}
// Shutdown will close the connection to the Janus Server
func (ja *JanusAgent) Shutdown() (err error) {
if err = ja.jnsConn.Close(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error on disconnecting janus server: %s", utils.JanusAgent, err.Error()))
}
if err = ja.adminWs.CloseNow(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> Error on disconnecting janus admin: %s", utils.JanusAgent, err.Error()))
}
return
}
// ServeHTTP implements http.Handler interface
func (ja *JanusAgent) CORSOptions(w http.ResponseWriter, req *http.Request) {
janusAccessControlHeaders(w, req)
}
// CreateSession will create a new session within janusgo
func (ja *JanusAgent) CreateSession(w http.ResponseWriter, req *http.Request) {
janusAccessControlHeaders(w, req)
var msg janus.BaseMsg
if err := json.NewDecoder(req.Body).Decode(&msg); err != nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
if err := ja.authSession(strings.Split(req.RemoteAddr, ":")[0]); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
resp, err := ja.jnsConn.CreateSession(ctx, msg)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(&resp)
}
func (ja *JanusAgent) authSession(origIP string) (err error) {
authArgs := &utils.CGREvent{
Tenant: ja.cgrCfg.GeneralCfg().DefaultTenant,
ID: utils.Sha1(),
Event: map[string]any{
utils.AccountField: origIP,
utils.Destination: "echotest",
},
}
rply := new(sessions.V1AuthorizeReply)
err = ja.connMgr.Call(ja.ctx, ja.cgrCfg.JanusAgentCfg().SessionSConns,
utils.SessionSv1AuthorizeEvent,
authArgs, rply)
return
}
func (ja *JanusAgent) acntStartSession(s *janus.Session) (err error) {
initArgs := &utils.CGREvent{
Tenant: ja.cgrCfg.GeneralCfg().DefaultTenant,
ID: utils.Sha1(),
Event: map[string]any{
utils.AccountField: s.Data[utils.AccountField],
utils.OriginHost: s.Data[utils.OriginHost],
utils.OriginID: s.Data[utils.OriginID],
utils.Destination: s.Data[utils.Destination],
utils.AnswerTime: s.Data[utils.AnswerTime],
},
}
rply := new(sessions.V1InitSessionReply)
err = ja.connMgr.Call(ja.ctx, ja.cgrCfg.JanusAgentCfg().SessionSConns,
utils.SessionSv1InitiateSession,
initArgs, rply)
return
}
func (ja *JanusAgent) acntStopSession(s *janus.Session) (err error) {
terminateArgs := &utils.CGREvent{
Tenant: ja.cgrCfg.GeneralCfg().DefaultTenant,
ID: utils.Sha1(),
Event: map[string]any{
utils.AccountField: s.Data[utils.AccountField],
utils.OriginHost: s.Data[utils.OriginHost],
utils.OriginID: s.Data[utils.OriginID],
utils.Destination: s.Data[utils.Destination],
utils.AnswerTime: s.Data[utils.AnswerTime],
utils.Usage: s.Data[utils.Usage],
},
}
var rply string
err = ja.connMgr.Call(ja.ctx, ja.cgrCfg.JanusAgentCfg().SessionSConns,
utils.SessionSv1TerminateSession,
terminateArgs, &rply)
return
}
func (ja *JanusAgent) cdrSession(s *janus.Session) (err error) {
cgrEv := &utils.CGREvent{
Tenant: ja.cgrCfg.GeneralCfg().DefaultTenant,
ID: utils.Sha1(),
Event: map[string]any{
utils.AccountField: s.Data[utils.AccountField],
utils.OriginHost: s.Data[utils.OriginHost],
utils.OriginID: s.Data[utils.OriginID],
utils.Destination: s.Data[utils.Destination],
utils.AnswerTime: s.Data[utils.AnswerTime],
utils.Usage: s.Data[utils.Usage],
},
}
var rply string
err = ja.connMgr.Call(ja.ctx, ja.cgrCfg.JanusAgentCfg().SessionSConns,
utils.SessionSv1ProcessCDR,
cgrEv, &rply)
return
}
// SessioNKeepalive sends keepalive once OPTIONS are coming for the session from HTTP
func (ja *JanusAgent) SessionKeepalive(w http.ResponseWriter, r *http.Request) {
janusAccessControlHeaders(w, r)
sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64)
if err != nil {
http.Error(w, "Invalid session ID", http.StatusBadRequest)
return
}
ja.jnsConn.RLock()
session, has := ja.jnsConn.Sessions[sessionID]
ja.jnsConn.RUnlock()
if !has {
http.Error(w, "Session not found", http.StatusNotFound)
return
}
msg := janus.BaseMsg{
Session: session.ID,
Type: "keepalive",
}
var resp any
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
resp, err = session.KeepAlive(ctx, msg)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(resp)
}
// PollSession will create a long-poll request to be notified about events and incoming messages from session
func (ja *JanusAgent) PollSession(w http.ResponseWriter, req *http.Request) {
janusAccessControlHeaders(w, req)
sessionID, err := strconv.ParseUint(req.PathValue("sessionID"), 10, 64)
if err != nil {
http.Error(w, "Invalid session ID", http.StatusBadRequest)
return
}
ja.jnsConn.RLock()
session, has := ja.jnsConn.Sessions[sessionID]
ja.jnsConn.RUnlock()
if !has {
http.Error(w, "Session not found", http.StatusNotFound)
return
}
maxEvs, err := strconv.Atoi(req.URL.Query().Get("maxev"))
if err != nil {
http.Error(w, fmt.Sprintf("Invalid maxev, err: %s", err.Error()),
http.StatusBadRequest)
return
}
msg := janus.BaseMsg{
Session: session.ID,
Type: "keepalive",
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
events, err := session.LongPoll(ctx, maxEvs, msg)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
for _, evIface := range events {
upEv, isWebrtcup := evIface.(*janus.WebRTCUpMsg)
if isWebrtcup {
ja.jnsConn.RLock()
s := ja.jnsConn.Sessions[upEv.Session]
ja.jnsConn.RUnlock()
if s == nil {
continue
}
s.Data[utils.AccountField] = strings.Split(req.RemoteAddr, ":")[0]
s.Data[utils.OriginHost] = strings.Split(req.Host, ":")[0]
s.Data[utils.OriginID] = strconv.Itoa(int(s.ID))
s.Data[utils.Destination] = "echotest"
s.Data[utils.AnswerTime] = time.Now()
go func() { ja.acntStartSession(s) }()
}
}
json.NewEncoder(w).Encode(events)
}
// AttachPlugin will attach a plugin to a session
func (ja *JanusAgent) AttachPlugin(w http.ResponseWriter, r *http.Request) {
janusAccessControlHeaders(w, r)
sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64)
if err != nil {
http.Error(w, "Invalid session ID", http.StatusBadRequest)
return
}
ja.jnsConn.RLock()
session, has := ja.jnsConn.Sessions[sessionID]
ja.jnsConn.RUnlock()
if !has {
http.Error(w, "Session not found", http.StatusNotFound)
return
}
var msg janus.BaseMsg
if err := json.NewDecoder(r.Body).Decode(&msg); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
msg.Session = session.ID
var resp any
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if msg.Type == "destroy" {
answerTime, _ := utils.IfaceAsTime(session.Data[utils.AnswerTime], ja.cgrCfg.GeneralCfg().DefaultTimezone)
var totalDur time.Duration
if !answerTime.IsZero() {
totalDur = time.Since(answerTime)
}
session.Data[utils.Usage] = totalDur // toDo: lock session RW
go func() {
ja.acntStopSession(session)
ja.cdrSession(session)
}() // CGRateS accounting stop
resp, err = session.DestroySession(ctx, msg)
} else {
resp, err = session.AttachSession(ctx, msg)
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(resp)
}
// HandlePlugin will handle requests towards a plugin
func (ja *JanusAgent) HandlePlugin(w http.ResponseWriter, r *http.Request) {
janusAccessControlHeaders(w, r)
sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64)
if err != nil {
http.Error(w, "Invalid session ID", http.StatusBadRequest)
return
}
ja.jnsConn.RLock()
session, has := ja.jnsConn.Sessions[sessionID]
ja.jnsConn.RUnlock()
if !has {
http.Error(w, "Session not found", http.StatusNotFound)
return
}
handleID, err := strconv.ParseUint(r.PathValue("handleID"), 10, 64)
if err != nil {
http.Error(w, "Invalid handle ID", http.StatusBadRequest)
return
}
handle, has := session.Handles[handleID]
if !has {
if !has {
http.Error(w, "Handle not found", http.StatusNotFound)
return
}
}
rBody, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Cannot read body", http.StatusBadRequest)
return
}
var msg janus.BaseMsg
if err := json.Unmarshal(rBody, &msg); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var resp any
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// handle message, depending on it's type
switch msg.Type {
case "message":
var hMsg janus.HandlerMessageJsep
if err := json.Unmarshal(rBody, &hMsg); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
hMsg.Session = session.ID
hMsg.BaseMsg.Handle = handle.ID
hMsg.Handle = handle.ID
resp, err = handle.Message(ctx, hMsg)
case "trickle":
var hMsg janus.TrickleOne
if err := json.Unmarshal(rBody, &hMsg); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
hMsg.Session = session.ID
hMsg.Handle = handle.ID
hMsg.HandleR = handle.ID
resp, err = handle.Trickle(ctx, hMsg)
default:
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
json.NewEncoder(w).Encode(resp)
}
func (ja *JanusAgent) V1GetActiveSessionIDs(ctx *context.Context, ignParam string,
sessionIDs *[]*sessions.SessionID) error {
var sIDs []*sessions.SessionID
msg := struct {
janus.BaseMsg
AdminSecret string `json:"admin_secret"`
}{
BaseMsg: janus.BaseMsg{
Type: "list_sessions",
ID: utils.GenUUID(),
},
AdminSecret: ja.cgrCfg.JanusAgentCfg().JanusConns[0].AdminPassword,
}
byteMsg, err := json.Marshal(msg)
if err != nil {
return err
}
if err = ja.adminWs.Write(context.Background(), websocket.MessageText, byteMsg); err != nil {
return err
}
_, rpl, err := ja.adminWs.Read(context.Background())
if err != nil {
return err
}
var sucessMsg struct {
janus.SuccessMsg
Sessions []uint64 `json:"sessions"`
}
if err = json.Unmarshal(rpl, &sucessMsg); err != nil {
return err
}
for _, sId := range sucessMsg.Sessions {
sess, has := ja.jnsConn.Sessions[sId]
if !has {
continue
}
sIDs = append(sIDs, &sessions.SessionID{
OriginHost: sess.Data[utils.OriginHost].(string),
OriginID: sess.Data[utils.OriginID].(string),
})
}
if len(sIDs) == 0 {
return utils.ErrNoActiveSession
}
*sessionIDs = sIDs
return nil
}
func (ja *JanusAgent) V1DisconnectSession(ctx *context.Context, cgrEv utils.CGREvent, reply *string) (err error) {
sessionID, err := engine.NewMapEvent(cgrEv.Event).GetTInt64(utils.OriginID)
if err != nil {
return err
}
session, has := ja.jnsConn.Sessions[uint64(sessionID)]
if has {
id := utils.GenUUID()
_, err := session.DestroySession(context.Background(), janus.BaseMsg{Type: "destroy", ID: id, Session: uint64(sessionID)})
if err != nil {
return err
}
}
*reply = utils.OK
return nil
}
func (ja *JanusAgent) V1AlterSession(*context.Context, utils.CGREvent, *string) error {
return utils.ErrNotImplemented
}
func (ja *JanusAgent) V1DisconnectPeer(*context.Context, *utils.DPRArgs, *string) error {
return utils.ErrNotImplemented
}
func (ja *JanusAgent) V1WarnDisconnect(*context.Context, map[string]any, *string) error {
return utils.ErrNotImplemented
}

88
agents/libjanus.go Normal file
View File

@@ -0,0 +1,88 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"encoding/json"
"net/http"
"net/http/httputil"
"github.com/cgrates/cgrates/utils"
)
// janusAccessControlHeaders will add the necessary access control headers
func janusAccessControlHeaders(w http.ResponseWriter, req *http.Request) {
if origin := req.Header.Get("Origin"); origin != "" {
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE")
w.Header().Set("Access-Control-Allow-Headers", "Accept, Accept-Language, Content-Type")
}
}
// newJanusHTTPjsonDP is the constructor for janusHTTPjsonDP struct
func newJanusHTTPjsonDP(req *http.Request) (utils.DataProvider, error) {
jHj := &janusHTTPjsonDP{req: req, cache: utils.MapStorage{}}
if err := json.NewDecoder(req.Body).Decode(&jHj.reqBody); err != nil {
return nil, err
}
return jHj, nil
}
// janusHTTPjsonDP implements utils.DataProvider, serving as JSON data decoder
// decoded data is only searched once and cached
type janusHTTPjsonDP struct {
req *http.Request
reqBody map[string]interface{} // unmarshal JSON body here
cache utils.MapStorage
}
// String is part of utils.DataProvider interface
// when called, it will display the already parsed values out of cache
func (jHj *janusHTTPjsonDP) String() string {
byts, _ := httputil.DumpRequest(jHj.req, true)
return string(byts)
}
// FieldAsInterface is part of utils.DataProvider interface
func (jHj *janusHTTPjsonDP) FieldAsInterface(fldPath []string) (data any, err error) {
if len(fldPath) != 1 {
return nil, utils.ErrNotFound
}
if data, err = jHj.cache.FieldAsInterface(fldPath); err != nil {
if err != utils.ErrNotFound { // item found in cache
return
}
err = nil // cancel previous err
} else {
return // data found in cache
}
data = jHj.reqBody[fldPath[0]]
jHj.cache.Set(fldPath, data)
return
}
// FieldAsString is part of utils.DataProvider interface
func (jHj *janusHTTPjsonDP) FieldAsString(fldPath []string) (data string, err error) {
var valIface any
valIface, err = jHj.FieldAsInterface(fldPath)
if err != nil {
return
}
return utils.IfaceAsString(valIface), nil
}

View File

@@ -464,6 +464,12 @@ func storeDiffSection(ctx *context.Context, section string, db ConfigDB, v1, v2
return
}
return db.SetSection(ctx, section, diffHttpAgentsJsonCfg(jsn, v1.HTTPAgentCfg(), v2.HTTPAgentCfg(), v2.GeneralCfg().RSRSep))
case JanusAgentJSON:
jsn := new(JanusAgentJsonCfg)
if err = db.GetSection(ctx, section, jsn); err != nil {
return
}
return db.SetSection(ctx, section, diffJanusAgentSJsonCfg(jsn, v1.JanusAgentCfg(), v2.JanusAgentCfg(), v2.GeneralCfg().RSRSep))
case DNSAgentJSON:
jsn := new(DNSAgentJsonCfg)
if err = db.GetSection(ctx, section, jsn); err != nil {

View File

@@ -184,7 +184,8 @@ func newCGRConfig(config []byte) (cfg *CGRConfig, err error) {
ClientDictionaries: make(map[string]string),
ClientSecrets: make(map[string]string),
},
dnsAgentCfg: new(DNSAgentCfg),
dnsAgentCfg: new(DNSAgentCfg),
janusAgentCfg: new(JanusAgentCfg),
attributeSCfg: &AttributeSCfg{Opts: &AttributesOpts{
ProfileIDs: []*utils.DynamicStringSliceOpt{},
ProcessRuns: []*utils.DynamicIntOpt{},
@@ -350,6 +351,7 @@ type CGRConfig struct {
diameterAgentCfg *DiameterAgentCfg // DiameterAgent config
radiusAgentCfg *RadiusAgentCfg // RadiusAgent config
dnsAgentCfg *DNSAgentCfg // DNSAgent config
janusAgentCfg *JanusAgentCfg // JanusAgent config
attributeSCfg *AttributeSCfg // AttributeS config
chargerSCfg *ChargerSCfg // ChargerS config
resourceSCfg *ResourceSConfig // ResourceS config
@@ -524,6 +526,13 @@ func (cfg *CGRConfig) HTTPAgentCfg() HTTPAgentCfgs {
return cfg.httpAgentCfg
}
// JanusAgentCfg returns the config for JanusAgent
func (cfg *CGRConfig) JanusAgentCfg() *JanusAgentCfg {
cfg.lks[HTTPAgentJSON].Lock()
defer cfg.lks[HTTPAgentJSON].Unlock()
return cfg.janusAgentCfg
}
// FilterSCfg returns the config for FilterS
func (cfg *CGRConfig) FilterSCfg() *FilterSCfg {
cfg.lks[FilterSJSON].Lock()
@@ -1026,6 +1035,7 @@ func (cfg *CGRConfig) Clone() (cln *CGRConfig) {
sessionSCfg: cfg.sessionSCfg.Clone(),
fsAgentCfg: cfg.fsAgentCfg.Clone(),
kamAgentCfg: cfg.kamAgentCfg.Clone(),
janusAgentCfg: cfg.janusAgentCfg.Clone(),
asteriskAgentCfg: cfg.asteriskAgentCfg.Clone(),
diameterAgentCfg: cfg.diameterAgentCfg.Clone(),
radiusAgentCfg: cfg.radiusAgentCfg.Clone(),

View File

@@ -1724,6 +1724,18 @@ const CGRATES_CFG_JSON = `
],
},
"janus_agent": {
"enabled": false, // enables the Janus agent: <true|false>
"url": "/janus",
"sessions_conns": ["*internal"],
"janus_conns": [{ // instantiate connections to multiple Janus Servers
"address": "127.0.0.1:8088", // janus API address
"type": "*ws", // type of the transport to interact via janus API
"admin_address": "localhost:7188", // janus admin address used to retrive more information for sessions and handles
"admin_password": "", // secret to pass restriction to communicate to the endpoint
}],
"request_processors": [], // request processors to be applied to Janus messages
},
"templates": {
"*err": [

View File

@@ -44,6 +44,7 @@ const (
HTTPAgentJSON = "http_agent"
AttributeSJSON = "attributes"
ResourceSJSON = "resources"
JanusAgentJSON = "janus_agent"
StatSJSON = "stats"
ThresholdSJSON = "thresholds"
TPeSJSON = "tpes"
@@ -187,6 +188,7 @@ func newSections(cfg *CGRConfig) Sections {
cfg.kamAgentCfg,
cfg.diameterAgentCfg,
cfg.radiusAgentCfg,
cfg.janusAgentCfg,
&cfg.httpAgentCfg,
cfg.dnsAgentCfg,
cfg.sipAgentCfg,

File diff suppressed because one or more lines are too long

254
config/januscfg.go Normal file
View File

@@ -0,0 +1,254 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package config
import (
"slices"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/utils"
)
// JanusConn represents one connection to Janus server
type JanusConn struct {
Address string // Address to reach Janus
Type string // Connection type
AdminAddress string
AdminPassword string
}
// JanusAgentCfg the config for an Janus Agent
type JanusAgentCfg struct {
Enabled bool
URL string
SessionSConns []string
JanusConns []*JanusConn // connections towards Janus
RequestProcessors []*RequestProcessor
}
func (jacfg *JanusAgentCfg) Load(ctx *context.Context, jsnCfg ConfigDB, cfg *CGRConfig) (err error) {
jsnJACfg := new(JanusAgentJsonCfg)
if err = jsnCfg.GetSection(ctx, JanusAgentJSON, jsnJACfg); err != nil {
return
}
return jacfg.loadFromJSONCfg(jsnJACfg, cfg.generalCfg.RSRSep)
}
func (jc *JanusConn) loadFromJSONCfg(jsnCfg *JanusConnJsonCfg) (err error) {
if jsnCfg == nil {
return
}
if jsnCfg.Address != nil {
jc.Address = *jsnCfg.Address
}
if jsnCfg.Type != nil {
jc.Type = *jsnCfg.Type
}
if jsnCfg.AdminAddress != nil {
jc.AdminAddress = *jsnCfg.AdminAddress
}
if jsnCfg.AdminPassword != nil {
jc.AdminPassword = *jsnCfg.AdminPassword
}
return
}
func (jc *JanusConn) AsMapInterface() map[string]any {
mp := map[string]any{
utils.AddressCfg: jc.Address,
utils.TypeCfg: jc.Type,
utils.AdminAddressCfg: jc.AdminAddress,
utils.AdminPasswordCfg: jc.AdminPassword,
}
return mp
}
func (jc *JanusConn) Clone() (cln *JanusConn) {
cln = &JanusConn{
Address: jc.Address,
Type: jc.Type,
AdminAddress: jc.AdminAddress,
AdminPassword: jc.AdminPassword,
}
return
}
func (jaCfg *JanusAgentCfg) loadFromJSONCfg(jsnCfg *JanusAgentJsonCfg, separator string) (err error) {
if jaCfg == nil {
return
}
if jsnCfg == nil {
return
}
if jsnCfg.Enabled != nil {
jaCfg.Enabled = *jsnCfg.Enabled
}
if jsnCfg.Url != nil {
jaCfg.URL = *jsnCfg.Url
}
if jsnCfg.Sessions_conns != nil {
jaCfg.SessionSConns = updateBiRPCInternalConns(*jsnCfg.Sessions_conns, utils.MetaSessionS)
}
if jsnCfg.Janus_conns != nil {
jaCfg.JanusConns = make([]*JanusConn, len(*jsnCfg.Janus_conns))
for idx, janConnJsn := range *jsnCfg.Janus_conns {
jc := new(JanusConn)
if err = jc.loadFromJSONCfg(janConnJsn); err != nil {
return
}
jaCfg.JanusConns[idx] = jc
}
}
jaCfg.RequestProcessors, err = appendRequestProcessors(jaCfg.RequestProcessors, jsnCfg.Request_processors, separator)
return
}
// AsMapInterface returns the config as a map[string]any
func (jaCfg JanusAgentCfg) AsMapInterface(separator string) any {
mp := map[string]any{
utils.EnabledCfg: jaCfg.Enabled,
utils.URLCfg: jaCfg.URL,
}
requestProcessors := make([]map[string]any, len(jaCfg.RequestProcessors))
for i, item := range jaCfg.RequestProcessors {
requestProcessors[i] = item.AsMapInterface(separator)
}
mp[utils.RequestProcessorsCfg] = requestProcessors
if jaCfg.SessionSConns != nil {
mp[utils.SessionSConnsCfg] = getBiRPCInternalJSONConns(jaCfg.SessionSConns)
}
janConns := make([]map[string]any, len(jaCfg.JanusConns))
for i, jc := range jaCfg.JanusConns {
janConns[i] = jc.AsMapInterface()
}
mp[utils.JanusConnsCfg] = janConns
return mp
}
func (JanusAgentCfg) SName() string { return JanusAgentJSON }
func (jacfg JanusAgentCfg) CloneSection() Section { return jacfg.Clone() }
func (jaCfg *JanusAgentCfg) Clone() *JanusAgentCfg {
cln := &JanusAgentCfg{
Enabled: jaCfg.Enabled,
URL: jaCfg.URL,
}
if jaCfg.SessionSConns != nil {
cln.SessionSConns = slices.Clone(jaCfg.SessionSConns)
}
if jaCfg.JanusConns != nil {
cln.JanusConns = make([]*JanusConn, len(jaCfg.JanusConns))
for i, jc := range jaCfg.JanusConns {
cln.JanusConns[i] = jc.Clone()
}
}
if jaCfg.RequestProcessors != nil {
cln.RequestProcessors = make([]*RequestProcessor, len(jaCfg.RequestProcessors))
for i, rp := range jaCfg.RequestProcessors {
cln.RequestProcessors[i] = rp.Clone()
}
}
return cln
}
type JanusAgentJsonCfg struct {
Enabled *bool `json:"enabled"`
Url *string `json:"url"`
Sessions_conns *[]string `json:"sessions_conns"`
Janus_conns *[]*JanusConnJsonCfg `json:"janus_conns"`
Request_processors *[]*ReqProcessorJsnCfg `json:"request_processors"`
}
type JanusConnJsonCfg struct {
Address *string `json:"address"`
Type *string `json:"type"`
AdminAddress *string `json:"admin_address"`
AdminPassword *string `json:"admin_password"`
}
func diffJanusConnJsonCfg(d *JanusConnJsonCfg, v1, v2 *JanusConn) *JanusConnJsonCfg {
if d == nil {
d = new(JanusConnJsonCfg)
}
if v1.Address != v2.Address {
d.Address = utils.StringPointer(v2.Address)
}
if v1.Type != v2.Type {
d.Type = utils.StringPointer(v2.Type)
}
if v1.AdminAddress != v2.AdminAddress {
d.AdminAddress = utils.StringPointer(v2.AdminAddress)
}
if v1.AdminPassword != v2.AdminPassword {
d.AdminPassword = utils.StringPointer(v2.AdminPassword)
}
return d
}
func getJanusConnJsnCfg(d []*JanusConnJsonCfg, address string) (*JanusConnJsonCfg, int) {
for i, v := range d {
if v.Address != nil && *v.Address == address {
return v, i
}
}
return nil, -1
}
func getJanusConn(d []*JanusConn, address string) *JanusConn {
for _, v := range d {
if v.Address == address {
return v
}
}
return new(JanusConn)
}
func diffJanusConnsJsonCfg(d *[]*JanusConnJsonCfg, v1, v2 []*JanusConn) *[]*JanusConnJsonCfg {
if d == nil || *d == nil {
d = &[]*JanusConnJsonCfg{}
}
for _, val := range v2 {
dv, i := getJanusConnJsnCfg(*d, val.Address)
dv = diffJanusConnJsonCfg(dv, getJanusConn(v1, val.Address), val)
if i == -1 {
*d = append(*d, dv)
} else {
(*d)[i] = dv
}
}
return d
}
func diffJanusAgentSJsonCfg(d *JanusAgentJsonCfg, v1, v2 *JanusAgentCfg, separator string) *JanusAgentJsonCfg {
if d == nil {
d = new(JanusAgentJsonCfg)
}
if v1.Enabled != v2.Enabled {
d.Enabled = utils.BoolPointer(v2.Enabled)
}
if v1.URL != v2.URL {
d.Url = utils.StringPointer(v2.URL)
}
if !slices.Equal(v1.SessionSConns, v2.SessionSConns) {
d.Sessions_conns = utils.SliceStringPointer(getBiRPCInternalJSONConns(v2.SessionSConns))
}
d.Request_processors = diffReqProcessorsJsnCfg(d.Request_processors, v1.RequestProcessors, v2.RequestProcessors, separator)
d.Janus_conns = diffJanusConnsJsonCfg(d.Janus_conns, v1.JanusConns, v2.JanusConns)
return d
}

View File

@@ -0,0 +1,128 @@
{
"general": {
"reply_timeout": "50s"
},
"logger": {
"type": "*syslog", // controls the destination of logs <*syslog|*stdout|*kafka>
"level": 6, // system level precision for floats
},
"listen": {
"rpc_json": ":2012",
"rpc_gob": ":2013",
"http": ":2080"
},
"data_db": {
"db_type": "*internal"
},
"stor_db": {
"db_type": "*internal"
},
"cdrs": {
"enabled": true,
"chargers_conns":["*internal"]
},
"attributes": {
"enabled": true,
"stats_conns": ["*localhost"],
"resources_conns": ["*localhost"],
"accounts_conns": ["*localhost"]
},
"chargers": {
"enabled": true,
"attributes_conns": ["*internal"]
},
"resources": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": ["*internal"]
},
"stats": {
"enabled": true,
"store_interval": "-1",
"thresholds_conns": ["*internal"]
},
"thresholds": {
"enabled": true,
"store_interval": "-1"
},
"routes": {
"enabled": true,
"prefix_indexed_fields":["*req.Destination"],
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"rates_conns": ["*internal"]
},
"sessions": {
"enabled": true,
"rates_conns": ["*internal"],
"routes_conns": ["*internal"],
"resources_conns": ["*internal"],
"attributes_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"]
},
"admins": {
"enabled": true
},
"rates": {
"enabled": true
},
"actions": {
"enabled": true,
"accounts_conns": ["*localhost"]
},
"accounts": {
"enabled": true
},
"janus_agent": {
"enabled": true,
"url": "/janus",
"sessions_conns": ["*bijson_localhost"],
"janus_conns": [{
"address": "localhost:8188",
"admin_address": "localhost:7188",
"admin_password": "janusoverlord"
}]
},
"filters": {
"stats_conns": ["*internal"],
"resources_conns": ["*internal"],
"accounts_conns": ["*internal"]
},
"tpes": {
"enabled": true
}
}

4
go.mod
View File

@@ -74,6 +74,7 @@ require (
github.com/blevesearch/zap/v15 v15.0.3 // indirect
github.com/cenkalti/hub v1.0.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cgrates/janusgo v0.0.0-20240503152118-188a408d7e73
github.com/couchbase/ghistogram v0.1.0 // indirect
github.com/couchbase/moss v0.1.0 // indirect
github.com/couchbase/vellum v1.0.2 // indirect
@@ -81,7 +82,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/ishidawataru/sctp v0.0.0-20190922091402-408ec287e38c // indirect
@@ -125,4 +126,5 @@ require (
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
nhooyr.io/websocket v1.8.11
)

8
go.sum
View File

@@ -57,6 +57,8 @@ github.com/cgrates/cron v0.0.0-20201022095836-3522d5b72c70 h1:/O+Dr12jcizDiCoIG2
github.com/cgrates/cron v0.0.0-20201022095836-3522d5b72c70/go.mod h1:I9cUDn/uzkakr0hmYTjXkQqf6wagg44L2p01gSYRRz0=
github.com/cgrates/fsock v0.0.0-20230123160954-12cae14030cc h1:qKfOK61ZLktbywOLTMNWwobJsxkxszlMbuduwNgAO/c=
github.com/cgrates/fsock v0.0.0-20230123160954-12cae14030cc/go.mod h1:5A9wag324AzIlaDd7tpPDAg26ouUO1orarAq7Vxr4As=
github.com/cgrates/janusgo v0.0.0-20240503152118-188a408d7e73 h1:7AYhvpegrSkY9tLGCQsZgNl8yTjL5CaQOTr3/kYlPek=
github.com/cgrates/janusgo v0.0.0-20240503152118-188a408d7e73/go.mod h1:XBQDDjrIn+RCS4PDApYjTWwdp51NbqYfUGAYtzSB5ag=
github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf h1:GbMJzvtwdX1OCEmsqSts/cRCIcIMvo8AYtC2dQExWlg=
github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf/go.mod h1:oEq/JbubkOD2pXHvDy4r7519NkxriONisrnVpkCaNJw=
github.com/cgrates/ltcache v0.0.0-20210405185848-da943e80c1ab h1:dKdAUwrij6vYwewe1WV1+pDSagqGI5JLqjTZZyN2ANo=
@@ -152,8 +154,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o=
github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs=
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas=
@@ -477,3 +479,5 @@ gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls=
gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0=
nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=

View File

@@ -241,6 +241,7 @@ func (cgr *CGREngine) InitServices(httpPrfPath string, cpuPrfFl io.Closer, memPr
NewDNSAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep),
NewFreeswitchAgent(cgr.cfg, cgr.cM, cgr.srvDep),
NewKamailioAgent(cgr.cfg, cgr.cM, cgr.srvDep),
NewJanusAgent(cgr.cfg, cgr.iFilterSCh, cgr.server, cgr.cM, cgr.srvDep),
NewAsteriskAgent(cgr.cfg, cgr.cM, cgr.srvDep), // partial reload
NewRadiusAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload
NewDiameterAgent(cgr.cfg, cgr.iFilterSCh, cgr.cM, cgr.srvDep), // partial reload

124
services/janus.go Normal file
View File

@@ -0,0 +1,124 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"fmt"
"net/http"
"sync"
"github.com/cgrates/birpc/context"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/cores"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
)
// NewJanusAgent returns the Janus Agent
func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
server *cores.Server, connMgr *engine.ConnManager,
srvDep map[string]*sync.WaitGroup) servmanager.Service {
return &JanusAgent{
cfg: cfg,
filterSChan: filterSChan,
server: server,
connMgr: connMgr,
srvDep: srvDep,
}
}
// JanusAgent implements Service interface
type JanusAgent struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
server *cores.Server
jA *agents.JanusAgent
// we can realy stop the JanusAgent so keep a flag
// if we registerd the jandlers
started bool
connMgr *engine.ConnManager
srvDep map[string]*sync.WaitGroup
}
// Start should jandle the sercive start
func (ja *JanusAgent) Start(ctx *context.Context, _ context.CancelFunc) (err error) {
filterS := <-ja.filterSChan
ja.filterSChan <- filterS
ja.Lock()
if ja.started {
ja.Unlock()
return utils.ErrServiceAlreadyRunning
}
ja.jA, err = agents.NewJanusAgent(ja.cfg, ja.connMgr, filterS)
if err != nil {
return
}
if err = ja.jA.Connect(); err != nil {
return
}
ja.server.RegisterHttpHandler("POST "+ja.cfg.JanusAgentCfg().URL, http.HandlerFunc(ja.jA.CreateSession))
ja.server.RegisterHttpHandler("OPTIONS "+ja.cfg.JanusAgentCfg().URL, http.HandlerFunc(ja.jA.CORSOptions))
ja.server.RegisterHttpHandler(fmt.Sprintf("OPTIONS %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.SessionKeepalive))
ja.server.RegisterHttpHandler(fmt.Sprintf("OPTIONS %s/{sessionID}/", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.CORSOptions))
ja.server.RegisterHttpHandler(fmt.Sprintf("GET %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.PollSession))
ja.server.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.AttachPlugin))
ja.server.RegisterHttpHandler(fmt.Sprintf("POST %s/{sessionID}/{handleID}", ja.cfg.JanusAgentCfg().URL), http.HandlerFunc(ja.jA.HandlePlugin))
ja.started = true
ja.Unlock()
utils.Logger.Info(fmt.Sprintf("<%s> successfully started.", utils.JanusAgent))
return
}
// Reload jandles the change of config
func (ja *JanusAgent) Reload(ctx *context.Context, _ context.CancelFunc) (err error) {
return // no reload
}
// Shutdown stops the service
func (ja *JanusAgent) Shutdown() (err error) {
ja.Lock()
err = ja.jA.Shutdown()
ja.started = false
ja.Unlock()
return // no shutdown for the momment
}
// IsRunning returns if the service is running
func (ja *JanusAgent) IsRunning() bool {
ja.RLock()
defer ja.RUnlock()
return ja.started
}
// ServiceName returns the service name
func (ja *JanusAgent) ServiceName() string {
return utils.JanusAgent
}
// ShouldRun returns if the service should be running
func (ja *JanusAgent) ShouldRun() bool {
return ja.cfg.JanusAgentCfg().Enabled
}

View File

@@ -272,6 +272,7 @@ const (
MetaPassword = "*password"
MetaFiller = "*filler"
MetaHTTPPost = "*httpPost"
JanusAdminSubProto = "janus-admin-protocol"
MetaHTTPjsonCDR = "*http_json_cdr"
MetaHTTPjsonMap = "*httpJSONMap"
MetaAMQPjsonCDR = "*amqp_json_cdr"
@@ -1747,6 +1748,7 @@ const (
AsteriskAgent = "AsteriskAgent"
HTTPAgent = "HTTPAgent"
SIPAgent = "SIPAgent"
JanusAgent = "JanusAgent"
)
// Google_API
@@ -2030,6 +2032,11 @@ const (
ClientSecretsCfg = "client_secrets"
ClientDictionariesCfg = "client_dictionaries"
// JanusAgentCfg
JanusConnsCfg = "janus_conns"
AdminAddressCfg = "admin_address"
AdminPasswordCfg = "admin_password"
// AttributeSCfg
IndexedSelectsCfg = "indexed_selects"
ProfileRunsCfg = "profile_runs"