diff --git a/agents/janusagent.go b/agents/janusagent.go index ef60e100b..00164f127 100644 --- a/agents/janusagent.go +++ b/agents/janusagent.go @@ -19,15 +19,25 @@ along with this program. If not, see package agents import ( + "context" + "encoding/json" "fmt" + "io/ioutil" "net/http" + "regexp" + "strconv" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" - "github.com/cgrates/cgrates/utils" janus "github.com/cgrates/janusgo" ) +var ( + janSessionPath = regexp.MustCompile(`^\/janus/\d+?$`) + janPluginHandlePath = regexp.MustCompile(`^\/janus/.*/.*`) +) + // NewJanusAgent will construct a JanusAgent func NewJanusAgent(cgrCfg *config.CGRConfig, connMgr *engine.ConnManager, @@ -60,55 +70,209 @@ func (ja *JanusAgent) Shutdown() error { } // ServeHTTP implements http.Handler interface -func (ja *JanusAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) { - dcdr, err := newJanusHTTPjsonDP(req) // dcdr will provide information from request +func (ja *JanusAgent) CORSOptions(w http.ResponseWriter, req *http.Request) { + janusAccessControlHeaders(w, req) +} + +// CreateSession will create a new session within janusgo +func (ja *JanusAgent) CreateSession(w http.ResponseWriter, req *http.Request) { + janusAccessControlHeaders(w, req) + var msg janus.BaseMsg + if err := json.NewDecoder(req.Body).Decode(&msg); err != nil { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + resp, err := ja.jnsConn.CreateSession(ctx, msg) if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error creating decoder: %s", - utils.HTTPAgent, err.Error())) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + json.NewEncoder(w).Encode(&resp) +} + +// SessioNKeepalive sends keepalive once OPTIONS are coming for the session from HTTP +func (ja *JanusAgent) SessioNKeepalive(w http.ResponseWriter, r *http.Request) { + janusAccessControlHeaders(w, r) + sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64) + if err != nil { + http.Error(w, "Invalid session ID", http.StatusBadRequest) + return + } + ja.jnsConn.RLock() + session, has := ja.jnsConn.Sessions[sessionID] + ja.jnsConn.RUnlock() + if !has { + http.Error(w, "Session not found", http.StatusNotFound) + return + } + msg := janus.BaseMsg{ + Session: session.ID, + Type: "keepalive", + } + var resp any + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + resp, err = session.KeepAlive(ctx, msg) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + + } + json.NewEncoder(w).Encode(resp) +} + +// PollSession will create a long-poll request to be notified about events and incoming messages from session +func (ja *JanusAgent) PollSession(w http.ResponseWriter, req *http.Request) { + janusAccessControlHeaders(w, req) + sessionID, err := strconv.ParseUint(req.PathValue("sessionID"), 10, 64) + if err != nil { + http.Error(w, "Invalid session ID", http.StatusBadRequest) + return + } + ja.jnsConn.RLock() + session, has := ja.jnsConn.Sessions[sessionID] + ja.jnsConn.RUnlock() + if !has { + http.Error(w, "Session not found", http.StatusNotFound) + return + } + maxEvs, err := strconv.Atoi(req.URL.Query().Get("maxev")) + if err != nil { + http.Error(w, fmt.Sprintf("Invalid maxev, err: %s", err.Error()), + http.StatusBadRequest) + return + } + msg := janus.BaseMsg{ + Session: session.ID, + Type: "keepalive", + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + events, err := session.LongPoll(ctx, maxEvs, msg) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + json.NewEncoder(w).Encode(events) +} + +// AttachPlugin will attach a plugin to a session +func (ja *JanusAgent) AttachPlugin(w http.ResponseWriter, r *http.Request) { + janusAccessControlHeaders(w, r) + sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64) + if err != nil { + http.Error(w, "Invalid session ID", http.StatusBadRequest) + return + } + ja.jnsConn.RLock() + session, has := ja.jnsConn.Sessions[sessionID] + ja.jnsConn.RUnlock() + if !has { + http.Error(w, "Session not found", http.StatusNotFound) + return + } + var msg janus.BaseMsg + if err := json.NewDecoder(r.Body).Decode(&msg); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - utils.Logger.Debug(dcdr.String()) - /* - cgrRplyNM := &utils.DataNode{Type: utils.NMMapType, Map: make(map[string]*utils.DataNode)} - rplyNM := utils.NewOrderedNavigableMap() - opts := utils.MapStorage{} - reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.RemoteHost: utils.NewLeafNode(req.RemoteAddr)}} - for _, reqProcessor := range ha.reqProcessors { - agReq := NewAgentRequest(dcdr, reqVars, cgrRplyNM, rplyNM, - opts, reqProcessor.Tenant, ha.dfltTenant, - utils.FirstNonEmpty(reqProcessor.Timezone, - config.CgrConfig().GeneralCfg().DefaultTimezone), - ha.filterS, nil) - lclProcessed, err := processRequest(context.TODO(), reqProcessor, agReq, - utils.HTTPAgent, ha.connMgr, ha.sessionConns, - agReq.filterS) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s processing request: %s", - utils.HTTPAgent, err.Error(), utils.ToJSON(agReq))) - return // FixMe with returning some error on HTTP level - } - if !lclProcessed { - continue - } - if lclProcessed && !reqProcessor.Flags.GetBool(utils.MetaContinue) { - break - } - } - encdr, err := newHAReplyEncoder(ha.rplyPayload, w) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error creating reply encoder: %s", - utils.HTTPAgent, err.Error())) - return - } - if err = encdr.Encode(rplyNM); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s encoding out %s", - utils.HTTPAgent, err.Error(), utils.ToJSON(rplyNM))) - return - } - */ + msg.Session = session.ID + + var resp any + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + if msg.Type == "destroy" { + resp, err = session.DestroySession(ctx, msg) + } else { + resp, err = session.AttachSession(ctx, msg) + } + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + + } + json.NewEncoder(w).Encode(resp) +} + +// HandlePlugin will handle requests towards a plugin +func (ja *JanusAgent) HandlePlugin(w http.ResponseWriter, r *http.Request) { + janusAccessControlHeaders(w, r) + sessionID, err := strconv.ParseUint(r.PathValue("sessionID"), 10, 64) + if err != nil { + http.Error(w, "Invalid session ID", http.StatusBadRequest) + return + } + ja.jnsConn.RLock() + session, has := ja.jnsConn.Sessions[sessionID] + ja.jnsConn.RUnlock() + if !has { + http.Error(w, "Session not found", http.StatusNotFound) + return + } + handleID, err := strconv.ParseUint(r.PathValue("handleID"), 10, 64) + if err != nil { + http.Error(w, "Invalid handle ID", http.StatusBadRequest) + return + } + handle, has := session.Handles[handleID] + if !has { + if !has { + http.Error(w, "Handle not found", http.StatusNotFound) + return + } + } + rBody, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, "Cannot read body", http.StatusBadRequest) + return + } + var msg janus.BaseMsg + if err := json.Unmarshal(rBody, &msg); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var resp any + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // handle message, depending on it's type + switch msg.Type { + case "message": + var hMsg janus.HandlerMessageJsep + if err := json.Unmarshal(rBody, &hMsg); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + hMsg.Session = session.ID + hMsg.BaseMsg.Handle = handle.ID + hMsg.Handle = handle.ID + resp, err = handle.Message(ctx, hMsg) + case "trickle": + var hMsg janus.TrickleOne + if err := json.Unmarshal(rBody, &hMsg); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + hMsg.Session = session.ID + hMsg.Handle = handle.ID + hMsg.HandleR = handle.ID + resp, err = handle.Trickle(ctx, hMsg) + default: + if err != nil { + http.Error(w, "Invalid message type", http.StatusBadRequest) + return + } + + } + + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + + } + json.NewEncoder(w).Encode(resp) } diff --git a/agents/libjanusagent.go b/agents/libjanusagent.go index 6d29f50ac..89312d2ba 100644 --- a/agents/libjanusagent.go +++ b/agents/libjanusagent.go @@ -26,6 +26,15 @@ import ( "github.com/cgrates/cgrates/utils" ) +// janusAccessControlHeaders will add the necessary access control headers +func janusAccessControlHeaders(w http.ResponseWriter, req *http.Request) { + if origin := req.Header.Get("Origin"); origin != "" { + w.Header().Set("Access-Control-Allow-Origin", origin) + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") + w.Header().Set("Access-Control-Allow-Headers", "Accept, Accept-Language, Content-Type") + } +} + // newJanusHTTPjsonDP is the constructor for janusHTTPjsonDP struct func newJanusHTTPjsonDP(req *http.Request) (utils.DataProvider, error) { jHj := &janusHTTPjsonDP{req: req, cache: utils.MapStorage{}} diff --git a/go.mod b/go.mod index b1544c6cf..f4f69fd02 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/cgrates/baningo v0.0.0-20210413080722-004ffd5e429f github.com/cgrates/birpc v1.3.1-0.20211117095917-5b0ff29f3084 github.com/cgrates/fsock v0.0.0-20240322171959-35309017b3e0 - github.com/cgrates/janusgo v0.0.0-20240425092925-a3b9aa99e793 + github.com/cgrates/janusgo v0.0.0-20240428171703-4a62a727d30d github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf github.com/cgrates/ltcache v0.0.0-20240411152156-e673692056db github.com/cgrates/radigo v0.0.0-20240123163129-491c899df727 diff --git a/go.sum b/go.sum index 94a21252c..e38bb132e 100644 --- a/go.sum +++ b/go.sum @@ -68,8 +68,8 @@ github.com/cgrates/birpc v1.3.1-0.20211117095917-5b0ff29f3084 h1:YIEepjEOjeHaFre github.com/cgrates/birpc v1.3.1-0.20211117095917-5b0ff29f3084/go.mod h1:z/PmNnDPqSQALedKJv5T8+eXIq6XHa9J0St1YsvAVns= github.com/cgrates/fsock v0.0.0-20240322171959-35309017b3e0 h1:rnSM0tG6Cl8GXjyBQXw78WEaJolKfZqHTXkOCZh+w/k= github.com/cgrates/fsock v0.0.0-20240322171959-35309017b3e0/go.mod h1:bKByLko2HF33K+PbiiToAgevrrbr96C+7Pp3HGS6oag= -github.com/cgrates/janusgo v0.0.0-20240425092925-a3b9aa99e793 h1:sXN6HgkrtnFF4KaxGaoyYXG6KhIUs0i8ic7I5bcEgos= -github.com/cgrates/janusgo v0.0.0-20240425092925-a3b9aa99e793/go.mod h1:XBQDDjrIn+RCS4PDApYjTWwdp51NbqYfUGAYtzSB5ag= +github.com/cgrates/janusgo v0.0.0-20240428171703-4a62a727d30d h1:/+5MxHHJPGA/WR1OaH3174hM2yKVqgNWh5U3zUpeH2k= +github.com/cgrates/janusgo v0.0.0-20240428171703-4a62a727d30d/go.mod h1:XBQDDjrIn+RCS4PDApYjTWwdp51NbqYfUGAYtzSB5ag= github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf h1:GbMJzvtwdX1OCEmsqSts/cRCIcIMvo8AYtC2dQExWlg= github.com/cgrates/kamevapi v0.0.0-20240307160311-26273f03eedf/go.mod h1:oEq/JbubkOD2pXHvDy4r7519NkxriONisrnVpkCaNJw= github.com/cgrates/ltcache v0.0.0-20240411152156-e673692056db h1:JRgzMS5kJ1WxaveoZ1YG/FowUDxFQXD3GjCHR7rH0Gk= diff --git a/services/janusagent.go b/services/janusagent.go index 8d644d0e2..f1278c818 100644 --- a/services/janusagent.go +++ b/services/janusagent.go @@ -72,7 +72,15 @@ func (ja *JanusAgent) Start() (err error) { if err = ja.jA.Connect(); err != nil { return } - ja.server.RegisterHttpHandler(ja.cfg.JanusAgentCfg().URL, ja.jA) + + ja.server.RegisterHttpFunc("POST "+ja.cfg.JanusAgentCfg().URL, ja.jA.CreateSession) + ja.server.RegisterHttpFunc("OPTIONS "+ja.cfg.JanusAgentCfg().URL, ja.jA.CORSOptions) + ja.server.RegisterHttpFunc(fmt.Sprintf("OPTIONS %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), ja.jA.SessioNKeepalive) + ja.server.RegisterHttpFunc(fmt.Sprintf("OPTIONS %s/{sessionID}/", ja.cfg.JanusAgentCfg().URL), ja.jA.CORSOptions) + ja.server.RegisterHttpFunc(fmt.Sprintf("GET %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), ja.jA.PollSession) + ja.server.RegisterHttpFunc(fmt.Sprintf("POST %s/{sessionID}", ja.cfg.JanusAgentCfg().URL), ja.jA.AttachPlugin) + ja.server.RegisterHttpFunc(fmt.Sprintf("POST %s/{sessionID}/{handleID}", ja.cfg.JanusAgentCfg().URL), ja.jA.HandlePlugin) + ja.started = true ja.Unlock() utils.Logger.Info(fmt.Sprintf("<%s> successfully started.", utils.JanusAgent))