DNSAgent - request processing mechanism implemented

This commit is contained in:
DanB
2019-04-18 20:05:13 +02:00
parent 8626e90cf3
commit 7e2fae7523
2 changed files with 278 additions and 0 deletions

View File

@@ -24,6 +24,7 @@ import (
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
"github.com/miekg/dns"
@@ -89,17 +90,219 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) {
)
}
}
rply.Rcode = dns.RcodeServerFailure
w.WriteMsg(rply)
*/
dnsDP := newDNSDataProvider(req, w)
reqVars := make(map[string]interface{})
reqVars[QueryType] = dns.TypeToString[req.Question[0].Qtype]
rply := new(dns.Msg)
rply.SetReply(req)
if req.Question[0].Qtype == dns.TypeNAPTR {
e164, err := e164FromNAPTR(req.Question[0].Name)
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> decoding NAPTR query: <%s>, err: %s",
utils.DNSAgent, req.Question[0].Name, err.Error()))
rply.Rcode = dns.RcodeServerFailure
dnsWriteMsg(w, rply)
return
}
reqVars[E164Address] = e164
}
rplyNM := config.NewNavigableMap(nil) // share it among different processors
var processed bool
var err error
for _, reqProcessor := range da.cgrCfg.DNSAgentCfg().RequestProcessors {
var lclProcessed bool
lclProcessed, err = da.processRequest(
reqProcessor,
newAgentRequest(
dnsDP, reqVars, rplyNM,
reqProcessor.Tenant,
da.cgrCfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(da.cgrCfg.DNSAgentCfg().Timezone,
da.cgrCfg.GeneralCfg().DefaultTimezone),
da.fltrS))
if lclProcessed {
processed = lclProcessed
}
if err != nil ||
(lclProcessed && !reqProcessor.ContinueOnSuccess) {
break
}
}
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing message: %s from %s",
utils.DNSAgent, err.Error(), req, w.RemoteAddr()))
rply.Rcode = dns.RcodeServerFailure
dnsWriteMsg(w, rply)
return
} else if !processed {
utils.Logger.Warning(
fmt.Sprintf("<%s> no request processor enabled, ignoring message %s from %s",
utils.DNSAgent, req, w.RemoteAddr()))
rply.Rcode = dns.RcodeServerFailure
dnsWriteMsg(w, rply)
return
}
}
func (da *DNSAgent) processRequest(reqProcessor *config.RequestProcessor,
agReq *AgentRequest) (processed bool, err error) {
if pass, err := da.fltrS.Pass(agReq.tenant,
reqProcessor.Filters, agReq); err != nil || !pass {
return pass, err
}
if agReq.CGRRequest, err = agReq.AsNavigableMap(reqProcessor.RequestFields); err != nil {
return
}
cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep)
var reqType string
for _, typ := range []string{
utils.MetaDryRun, utils.MetaAuth,
utils.MetaInitiate, utils.MetaUpdate,
utils.MetaTerminate, utils.MetaEvent,
utils.MetaCDRs, utils.META_NONE} {
if reqProcessor.Flags.HasKey(typ) { // request type is identified through flags
reqType = typ
break
}
}
if reqProcessor.Flags.HasKey(utils.MetaLog) {
utils.Logger.Info(
fmt.Sprintf("<%s> LOG, processorID: <%s>, message: %s",
utils.DNSAgent, reqProcessor.ID, agReq.Request.String()))
}
switch reqType {
default:
return false, fmt.Errorf("unknown request type: <%s>", reqType)
case utils.META_NONE: // do nothing on CGRateS side
case utils.MetaDryRun:
utils.Logger.Info(
fmt.Sprintf("<%s> DRY_RUN, processorID: %s, CGREvent: %s",
utils.DiameterAgent, reqProcessor.ID, utils.ToJSON(cgrEv)))
case utils.MetaAuth:
authArgs := sessions.NewV1AuthorizeArgs(
reqProcessor.Flags.HasKey(utils.MetaAttributes),
reqProcessor.Flags.HasKey(utils.MetaResources),
reqProcessor.Flags.HasKey(utils.MetaAccounts),
reqProcessor.Flags.HasKey(utils.MetaThresholds),
reqProcessor.Flags.HasKey(utils.MetaStats),
reqProcessor.Flags.HasKey(utils.MetaSuppliers),
reqProcessor.Flags.HasKey(utils.MetaSuppliersIgnoreErrors),
reqProcessor.Flags.HasKey(utils.MetaSuppliersEventCost),
*cgrEv)
var authReply sessions.V1AuthorizeReply
err = da.sS.Call(utils.SessionSv1AuthorizeEvent,
authArgs, &authReply)
if agReq.CGRReply, err = NewCGRReply(&authReply, err); err != nil {
return
}
case utils.MetaInitiate:
initArgs := sessions.NewV1InitSessionArgs(
reqProcessor.Flags.HasKey(utils.MetaAttributes),
reqProcessor.Flags.HasKey(utils.MetaResources),
reqProcessor.Flags.HasKey(utils.MetaAccounts),
reqProcessor.Flags.HasKey(utils.MetaThresholds),
reqProcessor.Flags.HasKey(utils.MetaStats), *cgrEv)
var initReply sessions.V1InitSessionReply
err = da.sS.Call(utils.SessionSv1InitiateSession,
initArgs, &initReply)
if agReq.CGRReply, err = NewCGRReply(&initReply, err); err != nil {
return
}
case utils.MetaUpdate:
updateArgs := sessions.NewV1UpdateSessionArgs(
reqProcessor.Flags.HasKey(utils.MetaAttributes),
reqProcessor.Flags.HasKey(utils.MetaAccounts), *cgrEv)
var updateReply sessions.V1UpdateSessionReply
err = da.sS.Call(utils.SessionSv1UpdateSession,
updateArgs, &updateReply)
if agReq.CGRReply, err = NewCGRReply(&updateReply, err); err != nil {
return
}
case utils.MetaTerminate:
terminateArgs := sessions.NewV1TerminateSessionArgs(
reqProcessor.Flags.HasKey(utils.MetaAccounts),
reqProcessor.Flags.HasKey(utils.MetaResources),
reqProcessor.Flags.HasKey(utils.MetaThresholds),
reqProcessor.Flags.HasKey(utils.MetaStats), *cgrEv)
var tRply string
err = da.sS.Call(utils.SessionSv1TerminateSession,
terminateArgs, &tRply)
if agReq.CGRReply, err = NewCGRReply(nil, err); err != nil {
return
}
case utils.MetaEvent:
evArgs := sessions.NewV1ProcessEventArgs(
reqProcessor.Flags.HasKey(utils.MetaResources),
reqProcessor.Flags.HasKey(utils.MetaAccounts),
reqProcessor.Flags.HasKey(utils.MetaAttributes),
reqProcessor.Flags.HasKey(utils.MetaThresholds),
reqProcessor.Flags.HasKey(utils.MetaStats),
*cgrEv)
var eventRply sessions.V1ProcessEventReply
err = da.sS.Call(utils.SessionSv1ProcessEvent,
evArgs, &eventRply)
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
cgrEv.Event[utils.Usage] = 0 // avoid further debits
} else if eventRply.MaxUsage != nil {
cgrEv.Event[utils.Usage] = *eventRply.MaxUsage // make sure the CDR reflects the debit
}
if agReq.CGRReply, err = NewCGRReply(&eventRply, err); err != nil {
return
}
case utils.MetaCDRs: // allow CDR processing
}
// separate request so we can capture the Terminate/Event also here
if reqProcessor.Flags.HasKey(utils.MetaCDRs) &&
!reqProcessor.Flags.HasKey(utils.MetaDryRun) {
var rplyCDRs string
//compose the arguments for SessionSv1ProcessCDR
argProcessCDR := &utils.CGREventWithArgDispatcher{
CGREvent: cgrEv,
}
//check if we have APIKey in event and in case it has add it in ArgDispatcher
apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey]
if hasApiKey {
argProcessCDR.ArgDispatcher = &utils.ArgDispatcher{
APIKey: utils.StringPointer(apiKeyIface.(string)),
}
}
//check if we have RouteID in event and in case it has add it in ArgDispatcher
routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID]
if hasRouteID {
if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct
argProcessCDR.ArgDispatcher = &utils.ArgDispatcher{
RouteID: utils.StringPointer(routeIDIface.(string)),
}
} else {
argProcessCDR.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string))
}
}
if err = da.sS.Call(utils.SessionSv1ProcessCDR,
argProcessCDR, &rplyCDRs); err != nil {
agReq.CGRReply.Set([]string{utils.Error}, err.Error(), false, false)
}
}
if nM, err := agReq.AsNavigableMap(reqProcessor.ReplyFields); err != nil {
return false, err
} else {
agReq.Reply.Merge(nM)
}
if reqProcessor.Flags.HasKey(utils.MetaLog) {
utils.Logger.Info(
fmt.Sprintf("<%s> LOG, Diameter reply: %s",
utils.DiameterAgent, agReq.Reply))
}
if reqType == utils.MetaDryRun {
utils.Logger.Info(
fmt.Sprintf("<%s> DRY_RUN, Diameter reply: %s",
utils.DiameterAgent, agReq.Reply))
}
return true, nil
}

View File

@@ -20,9 +20,13 @@ package agents
import (
"errors"
"fmt"
"net"
"strings"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/miekg/dns"
)
const (
@@ -40,3 +44,74 @@ func e164FromNAPTR(name string) (e164 string, err error) {
strings.Replace(name[:i], ".", "", -1))
return
}
// newDADataProvider constructs a DataProvider for a diameter message
func newDNSDataProvider(req *dns.Msg,
w dns.ResponseWriter) config.DataProvider {
return &dnsDP{req: req, w: w,
cache: config.NewNavigableMap(nil)}
}
// dnsDP implements engien.DataProvider, serving as dns.Msg decoder
// cache is used to cache queries within the message
type dnsDP struct {
req *dns.Msg
w dns.ResponseWriter
cache *config.NavigableMap
}
// String is part of engine.DataProvider interface
// when called, it will display the already parsed values out of cache
func (dP *dnsDP) String() string {
return utils.ToJSON(dP.req)
}
// AsNavigableMap is part of engine.DataProvider interface
func (dP *dnsDP) AsNavigableMap([]*config.FCTemplate) (
nm *config.NavigableMap, err error) {
return nil, utils.ErrNotImplemented
}
// FieldAsString is part of engine.DataProvider interface
func (dP *dnsDP) FieldAsString(fldPath []string) (data string, err error) {
var valIface interface{}
valIface, err = dP.FieldAsInterface(fldPath)
if err != nil {
return
}
return utils.IfaceAsString(valIface)
}
// RemoteHost is part of engine.DataProvider interface
func (dP *dnsDP) RemoteHost() net.Addr {
return utils.NewNetAddr(dP.w.RemoteAddr().Network(), dP.w.RemoteAddr().String())
}
// FieldAsInterface is part of engine.DataProvider interface
func (dP *dnsDP) FieldAsInterface(fldPath []string) (data interface{}, err error) {
if data, err = dP.cache.FieldAsInterface(fldPath); err != nil {
if err != utils.ErrNotFound { // item found in cache
return nil, err
}
err = nil // cancel previous err
} else {
return // data was found in cache
}
data = ""
// Return Question[0] by default
if len(dP.req.Question) != 0 {
data = dP.req.Question[0]
}
dP.cache.Set(fldPath, data, false, false)
return
}
// dnsWriteErr writes the error with code back to the client
func dnsWriteMsg(w dns.ResponseWriter, msg *dns.Msg) (err error) {
if err = w.WriteMsg(msg); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: <%s> when writing on connection",
utils.DNSAgent, err.Error()))
}
return
}