From 7e2fae75232df605c1b707810ae7dcea87e1cbd3 Mon Sep 17 00:00:00 2001 From: DanB Date: Thu, 18 Apr 2019 20:05:13 +0200 Subject: [PATCH] DNSAgent - request processing mechanism implemented --- agents/dnsagent.go | 203 +++++++++++++++++++++++++++++++++++++++++++++ agents/libdns.go | 75 +++++++++++++++++ 2 files changed, 278 insertions(+) diff --git a/agents/dnsagent.go b/agents/dnsagent.go index 1ca0c9172..06ad14622 100644 --- a/agents/dnsagent.go +++ b/agents/dnsagent.go @@ -24,6 +24,7 @@ import ( "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/sessions" "github.com/cgrates/cgrates/utils" "github.com/cgrates/rpcclient" "github.com/miekg/dns" @@ -89,17 +90,219 @@ func (da *DNSAgent) handleMessage(w dns.ResponseWriter, req *dns.Msg) { ) } } + rply.Rcode = dns.RcodeServerFailure w.WriteMsg(rply) */ + + dnsDP := newDNSDataProvider(req, w) reqVars := make(map[string]interface{}) reqVars[QueryType] = dns.TypeToString[req.Question[0].Qtype] + rply := new(dns.Msg) + rply.SetReply(req) if req.Question[0].Qtype == dns.TypeNAPTR { e164, err := e164FromNAPTR(req.Question[0].Name) if err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> decoding NAPTR query: <%s>, err: %s", utils.DNSAgent, req.Question[0].Name, err.Error())) + rply.Rcode = dns.RcodeServerFailure + dnsWriteMsg(w, rply) + return } reqVars[E164Address] = e164 } + rplyNM := config.NewNavigableMap(nil) // share it among different processors + + var processed bool + var err error + for _, reqProcessor := range da.cgrCfg.DNSAgentCfg().RequestProcessors { + var lclProcessed bool + lclProcessed, err = da.processRequest( + reqProcessor, + newAgentRequest( + dnsDP, reqVars, rplyNM, + reqProcessor.Tenant, + da.cgrCfg.GeneralCfg().DefaultTenant, + utils.FirstNonEmpty(da.cgrCfg.DNSAgentCfg().Timezone, + da.cgrCfg.GeneralCfg().DefaultTimezone), + da.fltrS)) + if lclProcessed { + processed = lclProcessed + } + if err != nil || + (lclProcessed && !reqProcessor.ContinueOnSuccess) { + break + } + } + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing message: %s from %s", + utils.DNSAgent, err.Error(), req, w.RemoteAddr())) + rply.Rcode = dns.RcodeServerFailure + dnsWriteMsg(w, rply) + return + } else if !processed { + utils.Logger.Warning( + fmt.Sprintf("<%s> no request processor enabled, ignoring message %s from %s", + utils.DNSAgent, req, w.RemoteAddr())) + rply.Rcode = dns.RcodeServerFailure + dnsWriteMsg(w, rply) + return + } +} + +func (da *DNSAgent) processRequest(reqProcessor *config.RequestProcessor, + agReq *AgentRequest) (processed bool, err error) { + if pass, err := da.fltrS.Pass(agReq.tenant, + reqProcessor.Filters, agReq); err != nil || !pass { + return pass, err + } + if agReq.CGRRequest, err = agReq.AsNavigableMap(reqProcessor.RequestFields); err != nil { + return + } + cgrEv := agReq.CGRRequest.AsCGREvent(agReq.tenant, utils.NestingSep) + var reqType string + for _, typ := range []string{ + utils.MetaDryRun, utils.MetaAuth, + utils.MetaInitiate, utils.MetaUpdate, + utils.MetaTerminate, utils.MetaEvent, + utils.MetaCDRs, utils.META_NONE} { + if reqProcessor.Flags.HasKey(typ) { // request type is identified through flags + reqType = typ + break + } + } + if reqProcessor.Flags.HasKey(utils.MetaLog) { + utils.Logger.Info( + fmt.Sprintf("<%s> LOG, processorID: <%s>, message: %s", + utils.DNSAgent, reqProcessor.ID, agReq.Request.String())) + } + switch reqType { + default: + return false, fmt.Errorf("unknown request type: <%s>", reqType) + case utils.META_NONE: // do nothing on CGRateS side + case utils.MetaDryRun: + utils.Logger.Info( + fmt.Sprintf("<%s> DRY_RUN, processorID: %s, CGREvent: %s", + utils.DiameterAgent, reqProcessor.ID, utils.ToJSON(cgrEv))) + case utils.MetaAuth: + authArgs := sessions.NewV1AuthorizeArgs( + reqProcessor.Flags.HasKey(utils.MetaAttributes), + reqProcessor.Flags.HasKey(utils.MetaResources), + reqProcessor.Flags.HasKey(utils.MetaAccounts), + reqProcessor.Flags.HasKey(utils.MetaThresholds), + reqProcessor.Flags.HasKey(utils.MetaStats), + reqProcessor.Flags.HasKey(utils.MetaSuppliers), + reqProcessor.Flags.HasKey(utils.MetaSuppliersIgnoreErrors), + reqProcessor.Flags.HasKey(utils.MetaSuppliersEventCost), + *cgrEv) + var authReply sessions.V1AuthorizeReply + err = da.sS.Call(utils.SessionSv1AuthorizeEvent, + authArgs, &authReply) + if agReq.CGRReply, err = NewCGRReply(&authReply, err); err != nil { + return + } + case utils.MetaInitiate: + initArgs := sessions.NewV1InitSessionArgs( + reqProcessor.Flags.HasKey(utils.MetaAttributes), + reqProcessor.Flags.HasKey(utils.MetaResources), + reqProcessor.Flags.HasKey(utils.MetaAccounts), + reqProcessor.Flags.HasKey(utils.MetaThresholds), + reqProcessor.Flags.HasKey(utils.MetaStats), *cgrEv) + var initReply sessions.V1InitSessionReply + err = da.sS.Call(utils.SessionSv1InitiateSession, + initArgs, &initReply) + if agReq.CGRReply, err = NewCGRReply(&initReply, err); err != nil { + return + } + case utils.MetaUpdate: + updateArgs := sessions.NewV1UpdateSessionArgs( + reqProcessor.Flags.HasKey(utils.MetaAttributes), + reqProcessor.Flags.HasKey(utils.MetaAccounts), *cgrEv) + var updateReply sessions.V1UpdateSessionReply + err = da.sS.Call(utils.SessionSv1UpdateSession, + updateArgs, &updateReply) + if agReq.CGRReply, err = NewCGRReply(&updateReply, err); err != nil { + return + } + case utils.MetaTerminate: + terminateArgs := sessions.NewV1TerminateSessionArgs( + reqProcessor.Flags.HasKey(utils.MetaAccounts), + reqProcessor.Flags.HasKey(utils.MetaResources), + reqProcessor.Flags.HasKey(utils.MetaThresholds), + reqProcessor.Flags.HasKey(utils.MetaStats), *cgrEv) + var tRply string + err = da.sS.Call(utils.SessionSv1TerminateSession, + terminateArgs, &tRply) + if agReq.CGRReply, err = NewCGRReply(nil, err); err != nil { + return + } + case utils.MetaEvent: + evArgs := sessions.NewV1ProcessEventArgs( + reqProcessor.Flags.HasKey(utils.MetaResources), + reqProcessor.Flags.HasKey(utils.MetaAccounts), + reqProcessor.Flags.HasKey(utils.MetaAttributes), + reqProcessor.Flags.HasKey(utils.MetaThresholds), + reqProcessor.Flags.HasKey(utils.MetaStats), + *cgrEv) + var eventRply sessions.V1ProcessEventReply + err = da.sS.Call(utils.SessionSv1ProcessEvent, + evArgs, &eventRply) + if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { + cgrEv.Event[utils.Usage] = 0 // avoid further debits + } else if eventRply.MaxUsage != nil { + cgrEv.Event[utils.Usage] = *eventRply.MaxUsage // make sure the CDR reflects the debit + } + if agReq.CGRReply, err = NewCGRReply(&eventRply, err); err != nil { + return + } + case utils.MetaCDRs: // allow CDR processing + } + // separate request so we can capture the Terminate/Event also here + if reqProcessor.Flags.HasKey(utils.MetaCDRs) && + !reqProcessor.Flags.HasKey(utils.MetaDryRun) { + var rplyCDRs string + //compose the arguments for SessionSv1ProcessCDR + argProcessCDR := &utils.CGREventWithArgDispatcher{ + CGREvent: cgrEv, + } + //check if we have APIKey in event and in case it has add it in ArgDispatcher + apiKeyIface, hasApiKey := cgrEv.Event[utils.MetaApiKey] + if hasApiKey { + argProcessCDR.ArgDispatcher = &utils.ArgDispatcher{ + APIKey: utils.StringPointer(apiKeyIface.(string)), + } + } + //check if we have RouteID in event and in case it has add it in ArgDispatcher + routeIDIface, hasRouteID := cgrEv.Event[utils.MetaRouteID] + if hasRouteID { + if !hasApiKey { //in case we don't have APIKey, but we have RouteID we need to initialize the struct + argProcessCDR.ArgDispatcher = &utils.ArgDispatcher{ + RouteID: utils.StringPointer(routeIDIface.(string)), + } + } else { + argProcessCDR.ArgDispatcher.RouteID = utils.StringPointer(routeIDIface.(string)) + } + } + if err = da.sS.Call(utils.SessionSv1ProcessCDR, + argProcessCDR, &rplyCDRs); err != nil { + agReq.CGRReply.Set([]string{utils.Error}, err.Error(), false, false) + } + } + if nM, err := agReq.AsNavigableMap(reqProcessor.ReplyFields); err != nil { + return false, err + } else { + agReq.Reply.Merge(nM) + } + if reqProcessor.Flags.HasKey(utils.MetaLog) { + utils.Logger.Info( + fmt.Sprintf("<%s> LOG, Diameter reply: %s", + utils.DiameterAgent, agReq.Reply)) + } + if reqType == utils.MetaDryRun { + utils.Logger.Info( + fmt.Sprintf("<%s> DRY_RUN, Diameter reply: %s", + utils.DiameterAgent, agReq.Reply)) + } + return true, nil } diff --git a/agents/libdns.go b/agents/libdns.go index b2f5a95b1..9b70c3c28 100644 --- a/agents/libdns.go +++ b/agents/libdns.go @@ -20,9 +20,13 @@ package agents import ( "errors" + "fmt" + "net" "strings" + "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" + "github.com/miekg/dns" ) const ( @@ -40,3 +44,74 @@ func e164FromNAPTR(name string) (e164 string, err error) { strings.Replace(name[:i], ".", "", -1)) return } + +// newDADataProvider constructs a DataProvider for a diameter message +func newDNSDataProvider(req *dns.Msg, + w dns.ResponseWriter) config.DataProvider { + return &dnsDP{req: req, w: w, + cache: config.NewNavigableMap(nil)} +} + +// dnsDP implements engien.DataProvider, serving as dns.Msg decoder +// cache is used to cache queries within the message +type dnsDP struct { + req *dns.Msg + w dns.ResponseWriter + cache *config.NavigableMap +} + +// String is part of engine.DataProvider interface +// when called, it will display the already parsed values out of cache +func (dP *dnsDP) String() string { + return utils.ToJSON(dP.req) +} + +// AsNavigableMap is part of engine.DataProvider interface +func (dP *dnsDP) AsNavigableMap([]*config.FCTemplate) ( + nm *config.NavigableMap, err error) { + return nil, utils.ErrNotImplemented +} + +// FieldAsString is part of engine.DataProvider interface +func (dP *dnsDP) FieldAsString(fldPath []string) (data string, err error) { + var valIface interface{} + valIface, err = dP.FieldAsInterface(fldPath) + if err != nil { + return + } + return utils.IfaceAsString(valIface) +} + +// RemoteHost is part of engine.DataProvider interface +func (dP *dnsDP) RemoteHost() net.Addr { + return utils.NewNetAddr(dP.w.RemoteAddr().Network(), dP.w.RemoteAddr().String()) +} + +// FieldAsInterface is part of engine.DataProvider interface +func (dP *dnsDP) FieldAsInterface(fldPath []string) (data interface{}, err error) { + if data, err = dP.cache.FieldAsInterface(fldPath); err != nil { + if err != utils.ErrNotFound { // item found in cache + return nil, err + } + err = nil // cancel previous err + } else { + return // data was found in cache + } + data = "" + // Return Question[0] by default + if len(dP.req.Question) != 0 { + data = dP.req.Question[0] + } + dP.cache.Set(fldPath, data, false, false) + return +} + +// dnsWriteErr writes the error with code back to the client +func dnsWriteMsg(w dns.ResponseWriter, msg *dns.Msg) (err error) { + if err = w.WriteMsg(msg); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: <%s> when writing on connection", + utils.DNSAgent, err.Error())) + } + return +}