mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-21 15:18:44 +05:00
DiameterAgent processRequest skel
This commit is contained in:
@@ -24,6 +24,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
"github.com/fiorix/go-diameter/diam"
|
||||
@@ -31,14 +32,14 @@ import (
|
||||
"github.com/fiorix/go-diameter/diam/sm"
|
||||
)
|
||||
|
||||
func NewDiameterAgent(cgrCfg *config.CGRConfig,
|
||||
func NewDiameterAgent(cgrCfg *config.CGRConfig, filterS *engine.FilterS,
|
||||
sessionS rpcclient.RpcClientConnection) (*DiameterAgent, error) {
|
||||
if sessionS != nil && reflect.ValueOf(sessionS).IsNil() {
|
||||
sessionS = nil
|
||||
}
|
||||
da := &DiameterAgent{
|
||||
cgrCfg: cgrCfg, sessionS: sessionS,
|
||||
connMux: new(sync.Mutex)}
|
||||
cgrCfg: cgrCfg, filterS: filterS,
|
||||
sessionS: sessionS, connMux: new(sync.Mutex)}
|
||||
dictsPath := cgrCfg.DiameterAgentCfg().DictionariesPath
|
||||
if len(dictsPath) != 0 {
|
||||
if err := loadDictionaries(dictsPath, utils.DiameterAgent); err != nil {
|
||||
@@ -50,26 +51,27 @@ func NewDiameterAgent(cgrCfg *config.CGRConfig,
|
||||
|
||||
type DiameterAgent struct {
|
||||
cgrCfg *config.CGRConfig
|
||||
filterS *engine.FilterS
|
||||
sessionS rpcclient.RpcClientConnection // Connection towards CGR-SessionS component
|
||||
connMux *sync.Mutex // Protect connection for read/write
|
||||
}
|
||||
|
||||
// ListenAndServe is called when DiameterAgent is started, usually from within cmd/cgr-engine
|
||||
func (self *DiameterAgent) ListenAndServe() error {
|
||||
return diam.ListenAndServe(self.cgrCfg.DiameterAgentCfg().Listen, self.handlers(), nil)
|
||||
func (da *DiameterAgent) ListenAndServe() error {
|
||||
return diam.ListenAndServe(da.cgrCfg.DiameterAgentCfg().Listen, da.handlers(), nil)
|
||||
}
|
||||
|
||||
// Creates the message handlers
|
||||
func (self *DiameterAgent) handlers() diam.Handler {
|
||||
func (da *DiameterAgent) handlers() diam.Handler {
|
||||
settings := &sm.Settings{
|
||||
OriginHost: datatype.DiameterIdentity(self.cgrCfg.DiameterAgentCfg().OriginHost),
|
||||
OriginRealm: datatype.DiameterIdentity(self.cgrCfg.DiameterAgentCfg().OriginRealm),
|
||||
VendorID: datatype.Unsigned32(self.cgrCfg.DiameterAgentCfg().VendorId),
|
||||
ProductName: datatype.UTF8String(self.cgrCfg.DiameterAgentCfg().ProductName),
|
||||
OriginHost: datatype.DiameterIdentity(da.cgrCfg.DiameterAgentCfg().OriginHost),
|
||||
OriginRealm: datatype.DiameterIdentity(da.cgrCfg.DiameterAgentCfg().OriginRealm),
|
||||
VendorID: datatype.Unsigned32(da.cgrCfg.DiameterAgentCfg().VendorId),
|
||||
ProductName: datatype.UTF8String(da.cgrCfg.DiameterAgentCfg().ProductName),
|
||||
FirmwareRevision: datatype.Unsigned32(utils.DIAMETER_FIRMWARE_REVISION),
|
||||
}
|
||||
dSM := sm.New(settings)
|
||||
dSM.HandleFunc("ALL", self.handleALL) // route all commands to one dispatcher
|
||||
dSM.HandleFunc("ALL", da.handleMessage) // route all commands to one dispatcher
|
||||
go func() {
|
||||
for err := range dSM.ErrorReports() {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> sm error: %v", utils.DiameterAgent, err))
|
||||
@@ -79,7 +81,40 @@ func (self *DiameterAgent) handlers() diam.Handler {
|
||||
}
|
||||
|
||||
// handleALL is the handler of all messages coming in via Diameter
|
||||
func (self *DiameterAgent) handleALL(c diam.Conn, m *diam.Message) {
|
||||
utils.Logger.Warning(fmt.Sprintf("<%s> received unexpected message from %s:\n%s",
|
||||
utils.DiameterAgent, c.RemoteAddr(), m))
|
||||
func (da *DiameterAgent) handleMessage(c diam.Conn, m *diam.Message) {
|
||||
var processed bool
|
||||
var rply *diam.Message
|
||||
var err error
|
||||
for _, reqProcessor := range da.cgrCfg.DiameterAgentCfg().RequestProcessors {
|
||||
var lclProcessed bool
|
||||
lclProcessed, err = da.processRequest(
|
||||
reqProcessor,
|
||||
newAgentRequest(
|
||||
newDADataProvider(m),
|
||||
reqProcessor.Tenant, da.cgrCfg.DefaultTenant, da.filterS),
|
||||
rply)
|
||||
if lclProcessed {
|
||||
processed = lclProcessed
|
||||
}
|
||||
if err != nil ||
|
||||
(lclProcessed && !reqProcessor.ContinueOnSuccess) {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s processing message: %s",
|
||||
utils.DiameterAgent, err.Error(), m))
|
||||
return // FixMe with returning some error on HTTP level
|
||||
} else if !processed {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> no request processor enabled, ignoring message %s from %s",
|
||||
utils.DiameterAgent, m, c.RemoteAddr()))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (da *DiameterAgent) processRequest(reqProcessor *config.DARequestProcessor,
|
||||
agReq *AgentRequest, rply *diam.Message) (processed bool, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user