From 8db91ca47909f3501870186dc649387803526799 Mon Sep 17 00:00:00 2001 From: DanB Date: Mon, 8 Apr 2024 20:09:06 +0200 Subject: [PATCH] Basic service infrastructure for JanusAgent --- agents/janusagent.go | 53 +++++++++++++++++ cmd/cgr-engine/cgr-engine.go | 1 + config/config.go | 11 +++- config/config_defaults.go | 9 +++ config/config_json.go | 1 + config/janusagntcfg.go | 27 +++++++++ services/janusagent.go | 107 +++++++++++++++++++++++++++++++++++ utils/consts.go | 1 + 8 files changed, 209 insertions(+), 1 deletion(-) create mode 100644 config/janusagntcfg.go create mode 100644 services/janusagent.go diff --git a/agents/janusagent.go b/agents/janusagent.go index 406f6f482..2c2e3cbb5 100644 --- a/agents/janusagent.go +++ b/agents/janusagent.go @@ -19,6 +19,8 @@ along with this program. If not, see package agents import ( + "net/http" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" ) @@ -42,3 +44,54 @@ type JanusAgent struct { reqProcessors []*config.RequestProcessor sessionConns []string } + +// ServeHTTP implements http.Handler interface +func (ja *JanusAgent) ServeHTTP(w http.ResponseWriter, req *http.Request) { + /*dcdr, err := newHADataProvider(ha.reqPayload, req) // dcdr will provide information from request + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error creating decoder: %s", + utils.HTTPAgent, err.Error())) + return + } + cgrRplyNM := &utils.DataNode{Type: utils.NMMapType, Map: make(map[string]*utils.DataNode)} + rplyNM := utils.NewOrderedNavigableMap() + opts := utils.MapStorage{} + reqVars := &utils.DataNode{Type: utils.NMMapType, Map: map[string]*utils.DataNode{utils.RemoteHost: utils.NewLeafNode(req.RemoteAddr)}} + for _, reqProcessor := range ha.reqProcessors { + agReq := NewAgentRequest(dcdr, reqVars, cgrRplyNM, rplyNM, + opts, reqProcessor.Tenant, ha.dfltTenant, + utils.FirstNonEmpty(reqProcessor.Timezone, + config.CgrConfig().GeneralCfg().DefaultTimezone), + ha.filterS, nil) + lclProcessed, err := processRequest(context.TODO(), reqProcessor, agReq, + utils.HTTPAgent, ha.connMgr, ha.sessionConns, + agReq.filterS) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing request: %s", + utils.HTTPAgent, err.Error(), utils.ToJSON(agReq))) + return // FixMe with returning some error on HTTP level + } + if !lclProcessed { + continue + } + if lclProcessed && !reqProcessor.Flags.GetBool(utils.MetaContinue) { + break + } + } + encdr, err := newHAReplyEncoder(ha.rplyPayload, w) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error creating reply encoder: %s", + utils.HTTPAgent, err.Error())) + return + } + if err = encdr.Encode(rplyNM); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s encoding out %s", + utils.HTTPAgent, err.Error(), utils.ToJSON(rplyNM))) + return + } + */ +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 655beb164..97b0b0c9f 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -645,6 +645,7 @@ func main() { services.NewEventReaderService(cfg, filterSChan, shdChan, connManager, server, internalERsChan, anz, srvDep), services.NewSIPAgent(cfg, filterSChan, shdChan, connManager, srvDep), + services.NewJanusAgent(cfg, filterSChan, server, connManager, srvDep), ) srvManager.StartServices() // Start FilterS diff --git a/config/config.go b/config/config.go index 3cc2412cf..0f32a5880 100644 --- a/config/config.go +++ b/config/config.go @@ -174,6 +174,7 @@ func newCGRConfig(config []byte) (cfg *CGRConfig, err error) { cfg.eesCfg = new(EEsCfg) cfg.eesCfg.Cache = make(map[string]*CacheParamCfg) cfg.sipAgentCfg = new(SIPAgentCfg) + cfg.janusAgentCfg = new(JanusAgentCfg) cfg.configSCfg = new(ConfigSCfg) cfg.apiBanCfg = new(APIBanCfg) cfg.sentryPeerCfg = new(SentryPeerCfg) @@ -325,6 +326,7 @@ type CGRConfig struct { ersCfg *ERsCfg // EventReader config eesCfg *EEsCfg // EventExporter config sipAgentCfg *SIPAgentCfg // SIPAgent config + janusAgentCfg *JanusAgentCfg // JanusAgent config configSCfg *ConfigSCfg // ConfigS config apiBanCfg *APIBanCfg // APIBan config sentryPeerCfg *SentryPeerCfg //SentryPeer config @@ -1078,13 +1080,20 @@ func (cfg *CGRConfig) EEsNoLksCfg() *EEsCfg { return cfg.eesCfg } -// SIPAgentCfg reads the Apier configuration +// SIPAgentCfg reads the SIPAgent configuration func (cfg *CGRConfig) SIPAgentCfg() *SIPAgentCfg { cfg.lks[SIPAgentJson].Lock() defer cfg.lks[SIPAgentJson].Unlock() return cfg.sipAgentCfg } +// JanusAgentCfg reads the JanusAgent configuration +func (cfg *CGRConfig) JanusAgentCfg() *JanusAgentCfg { + cfg.lks[JanusAgentJson].Lock() + defer cfg.lks[JanusAgentJson].Unlock() + return cfg.janusAgentCfg +} + // RPCConns reads the RPCConns configuration func (cfg *CGRConfig) RPCConns() RPCConns { cfg.lks[RPCConnsJsonName].RLock() diff --git a/config/config_defaults.go b/config/config_defaults.go index a23cab158..3fa771930 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -1180,6 +1180,15 @@ const CGRATES_CFG_JSON = ` }, +"janus_agent": { + "enabled": false, // enables the Janus agent: + "url": "/janus", + "sessions_conns": ["*internal"], + "request_processors": [ // request processors to be applied to Janus messages + ], +}, + + "templates": { "*err": [ {"tag": "SessionId", "path": "*rep.Session-Id", "type": "*variable", diff --git a/config/config_json.go b/config/config_json.go index a0401fed8..570b24982 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -61,6 +61,7 @@ const ( EEsJson = "ees" RPCConnsJsonName = "rpc_conns" SIPAgentJson = "sip_agent" + JanusAgentJson = "janus_agent" TemplatesJson = "templates" ConfigSJson = "configs" APIBanCfgJson = "apiban" diff --git a/config/janusagntcfg.go b/config/janusagntcfg.go new file mode 100644 index 000000000..275a329cc --- /dev/null +++ b/config/janusagntcfg.go @@ -0,0 +1,27 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package config + +// JanusAgentCfg the config for an Janus Agent +type JanusAgentCfg struct { + Enabled bool + URL string + SessionSConns []string + RequestProcessors []*RequestProcessor +} diff --git a/services/janusagent.go b/services/janusagent.go new file mode 100644 index 000000000..9e895b04b --- /dev/null +++ b/services/janusagent.go @@ -0,0 +1,107 @@ +/* +Real-time Online/Offline Cjarging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope tjat it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERchanTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should jave received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/cores" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" +) + +// NewJanusAgent returns the Janus Agent +func NewJanusAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, + server *cores.Server, connMgr *engine.ConnManager, + srvDep map[string]*sync.WaitGroup) servmanager.Service { + return &JanusAgent{ + cfg: cfg, + filterSChan: filterSChan, + server: server, + connMgr: connMgr, + srvDep: srvDep, + } +} + +// JanusAgent implements Service interface +type JanusAgent struct { + sync.RWMutex + cfg *config.CGRConfig + filterSChan chan *engine.FilterS + server *cores.Server + + // we can realy stop the JanusAgent so keep a flag + // if we registerd the jandlers + started bool + connMgr *engine.ConnManager + srvDep map[string]*sync.WaitGroup +} + +// Start should jandle the sercive start +func (ja *JanusAgent) Start() (err error) { + if ja.IsRunning() { + return utils.ErrServiceAlreadyRunning + } + + filterS := <-ja.filterSChan + ja.filterSChan <- filterS + + ja.Lock() + ja.started = true + utils.Logger.Info(fmt.Sprintf("<%s> successfully started.", utils.JanusAgent)) + ja.server.RegisterHttpHandler(ja.cfg.JanusAgentCfg().URL, + agents.NewJanusAgent(ja.connMgr, ja.cfg.JanusAgentCfg().SessionSConns, filterS, + ja.cfg.JanusAgentCfg().RequestProcessors)) + ja.Unlock() + return +} + +// Reload jandles the change of config +func (ja *JanusAgent) Reload() (err error) { + return // no reload +} + +// Shutdown stops the service +func (ja *JanusAgent) Shutdown() (err error) { + ja.Lock() + ja.started = false + ja.Unlock() + return // no shutdown for the momment +} + +// IsRunning returns if the service is running +func (ja *JanusAgent) IsRunning() bool { + ja.RLock() + defer ja.RUnlock() + return ja.started +} + +// ServiceName returns the service name +func (ja *JanusAgent) ServiceName() string { + return utils.JanusAgent +} + +// ShouldRun returns if the service should be running +func (ja *JanusAgent) ShouldRun() bool { + return ja.cfg.JanusAgentCfg().Enabled +} diff --git a/utils/consts.go b/utils/consts.go index 30a51bba3..a96e47516 100644 --- a/utils/consts.go +++ b/utils/consts.go @@ -1978,6 +1978,7 @@ const ( AsteriskAgent = "AsteriskAgent" HTTPAgent = "HTTPAgent" SIPAgent = "SIPAgent" + JanusAgent = "JanusAgent" ) // Google_API