From dbed1a20599a4baa0f814ddf755c516064b494b5 Mon Sep 17 00:00:00 2001 From: DanB Date: Sun, 23 Sep 2018 18:49:42 +0200 Subject: [PATCH] DiameterAgent processRequest skel --- agents/dmtagent.go | 63 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 49 insertions(+), 14 deletions(-) diff --git a/agents/dmtagent.go b/agents/dmtagent.go index ba0643637..494db2381 100644 --- a/agents/dmtagent.go +++ b/agents/dmtagent.go @@ -24,6 +24,7 @@ import ( "sync" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" "github.com/fiorix/go-diameter/diam" @@ -31,14 +32,14 @@ import ( "github.com/fiorix/go-diameter/diam/sm" ) -func NewDiameterAgent(cgrCfg *config.CGRConfig, +func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS, sessionS rpcclient.RpcClientConnection) (*DiameterAgent, error) { if sessionS != nil && reflect.ValueOf(sessionS).IsNil() { sessionS = nil } da := &DiameterAgent{ - cgrCfg: cgrCfg, sessionS: sessionS, - connMux: new(sync.Mutex)} + cgrCfg: cgrCfg, filterS: filterS, + sessionS: sessionS, connMux: new(sync.Mutex)} dictsPath := cgrCfg.DiameterAgentCfg().DictionariesPath if len(dictsPath) != 0 { if err := loadDictionaries(dictsPath, utils.DiameterAgent); err != nil { @@ -50,26 +51,27 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig, type DiameterAgent struct { cgrCfg *config.CGRConfig + filterS *engine.FilterS sessionS rpcclient.RpcClientConnection // Connection towards CGR-SessionS component connMux *sync.Mutex // Protect connection for read/write } // ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine -func (self *DiameterAgent) ListenAndServe() error { - return diam.ListenAndServe(self.cgrCfg.DiameterAgentCfg().Listen, self.handlers(), nil) +func (da *DiameterAgent) ListenAndServe() error { + return diam.ListenAndServe(da.cgrCfg.DiameterAgentCfg().Listen, da.handlers(), nil) } // Creates the message handlers -func (self *DiameterAgent) handlers() diam.Handler { +func (da *DiameterAgent) handlers() diam.Handler { settings := &sm.Settings{ - OriginHost: datatype.DiameterIdentity(self.cgrCfg.DiameterAgentCfg().OriginHost), - OriginRealm: datatype.DiameterIdentity(self.cgrCfg.DiameterAgentCfg().OriginRealm), - VendorID: datatype.Unsigned32(self.cgrCfg.DiameterAgentCfg().VendorId), - ProductName: datatype.UTF8String(self.cgrCfg.DiameterAgentCfg().ProductName), + OriginHost: datatype.DiameterIdentity(da.cgrCfg.DiameterAgentCfg().OriginHost), + OriginRealm: datatype.DiameterIdentity(da.cgrCfg.DiameterAgentCfg().OriginRealm), + VendorID: datatype.Unsigned32(da.cgrCfg.DiameterAgentCfg().VendorId), + ProductName: datatype.UTF8String(da.cgrCfg.DiameterAgentCfg().ProductName), FirmwareRevision: datatype.Unsigned32(utils.DIAMETER_FIRMWARE_REVISION), } dSM := sm.New(settings) - dSM.HandleFunc("ALL", self.handleALL) // route all commands to one dispatcher + dSM.HandleFunc("ALL", da.handleMessage) // route all commands to one dispatcher go func() { for err := range dSM.ErrorReports() { utils.Logger.Err(fmt.Sprintf("<%s> sm error: %v", utils.DiameterAgent, err)) @@ -79,7 +81,40 @@ func (self *DiameterAgent) handlers() diam.Handler { } // handleALL is the handler of all messages coming in via Diameter -func (self *DiameterAgent) handleALL(c diam.Conn, m *diam.Message) { - utils.Logger.Warning(fmt.Sprintf("<%s> received unexpected message from %s:\n%s", - utils.DiameterAgent, c.RemoteAddr(), m)) +func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) { + var processed bool + var rply *diam.Message + var err error + for _, reqProcessor := range da.cgrCfg.DiameterAgentCfg().RequestProcessors { + var lclProcessed bool + lclProcessed, err = da.processRequest( + reqProcessor, + newAgentRequest( + newDADataProvider(m), + reqProcessor.Tenant, da.cgrCfg.DefaultTenant, da.filterS), + rply) + if lclProcessed { + processed = lclProcessed + } + if err != nil || + (lclProcessed && !reqProcessor.ContinueOnSuccess) { + break + } + } + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing message: %s", + utils.DiameterAgent, err.Error(), m)) + return // FixMe with returning some error on HTTP level + } else if !processed { + utils.Logger.Warning( + fmt.Sprintf("<%s> no request processor enabled, ignoring message %s from %s", + utils.DiameterAgent, m, c.RemoteAddr())) + return + } +} + +func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor, + agReq *AgentRequest, rply *diam.Message) (processed bool, err error) { + return }