Updated SIPAgent to map ACK based on Call-ID and From tag

This commit is contained in:
Trial97
2020-06-25 17:08:34 +03:00
parent c8f51fa108
commit 129aa104db
7 changed files with 37 additions and 31 deletions

View File

@@ -45,11 +45,11 @@ func NewHTTPAgent(connMgr *engine.ConnManager, sessionConns []string,
// HTTPAgent is a handler for HTTP requests
type HTTPAgent struct {
connMgr *engine.ConnManager
filterS *engine.FilterS
dfltTenant,
reqPayload,
rplyPayload string
connMgr *engine.ConnManager
filterS *engine.FilterS
dfltTenant string
reqPayload string
rplyPayload string
reqProcessors []*config.RequestProcessor
sessionConns []string
}

View File

@@ -24,7 +24,6 @@ import (
"net/http"
"strings"
"testing"
//"github.com/cgrates/cgrates/utils"
)
func TestHttpUrlDPFieldAsInterface(t *testing.T) {

View File

@@ -21,6 +21,7 @@ package agents
import (
"fmt"
"net"
"regexp"
"sync"
"time"
@@ -43,6 +44,10 @@ const (
method = "Method"
)
var (
sipTagRgx = regexp.MustCompile(`tag=([^ ,;>]*)`)
)
// NewSIPAgent will construct a SIPAgent
func NewSIPAgent(connMgr *engine.ConnManager, cfg *config.CGRConfig,
filterS *engine.FilterS) (sa *SIPAgent, err error) {
@@ -228,10 +233,12 @@ func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte
utils.SIPAgent, err.Error(), messageStr))
return // do we need to return error in case we can't parse the message?
}
key := utils.ConcatenatedKey(sipMessage[fromHeader], sipMessage[callIDHeader])
tags := sipTagRgx.FindStringSubmatch(sipMessage[fromHeader])
// in case we get a wrong sip message ( without tag in the From header) the next line should panic
key := utils.ConcatenatedKey(sipMessage[callIDHeader], tags[1])
method := sipMessage.MethodFrom(requestHeader)
if ackMethod == method {
if sa.cfg.SIPAgentCfg().ACKInterval == 0 { // ignore ACK
if sa.cfg.SIPAgentCfg().RetransmissionTimer == 0 { // ignore ACK
return
}
sa.ackLocks.Lock()
@@ -255,7 +262,7 @@ func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte
}
// because we expext to send codes from 300-699 we wait for the ACK every time
if method != inviteMethod || // only invitest need ACK
sa.cfg.SIPAgentCfg().ACKInterval == 0 {
sa.cfg.SIPAgentCfg().RetransmissionTimer == 0 {
return // disabled ACK
}
stopChan := make(chan struct{})
@@ -265,7 +272,7 @@ func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte
go func(stopChan chan struct{}, a []byte) {
for {
select {
case <-time.After(sa.cfg.SIPAgentCfg().ACKInterval):
case <-time.After(sa.cfg.SIPAgentCfg().RetransmissionTimer):
if err = write(ans); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s sending message: %s",
@@ -285,7 +292,7 @@ func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte
func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) (sipAnswer sipingo.Message) {
if sipMessage[userAgentHeader] != "" {
sipMessage[userAgentHeader] = utils.CGRateS
sipMessage[userAgentHeader] = fmt.Sprintf("%s@%s", utils.CGRateS, utils.VERSION)
}
sipMessageIface := make(map[string]interface{})
for k, v := range sipMessage {

View File

@@ -941,7 +941,7 @@ const CGRATES_CFG_JSON = `
"listen_net": "udp", // network to listen on <udp|tcp|tcp-tls>
"sessions_conns": ["*internal"],
"timezone": "", // timezone of the events if not specified <UTC|Local|$IANA_TZ_DB>
"ack_interval": "1s", // the duration to wait to receive an ACK before resending the reply
"retransmission_timer": "1s", // the duration to wait to receive an ACK before resending the reply
"templates":{ // default message templates
"*err": [
{"tag": "Request", "path": "*rep.Request", "type": "*constant",

View File

@@ -622,12 +622,12 @@ type RateSJsonCfg struct {
// SIPAgentJsonCfg
type SIPAgentJsonCfg struct {
Enabled *bool
Listen *string
Listen_net *string
Sessions_conns *[]string
Timezone *string
Ack_interval *string
Templates map[string][]*FcTemplateJsonCfg
Request_processors *[]*ReqProcessorJsnCfg
Enabled *bool
Listen *string
Listen_net *string
Sessions_conns *[]string
Timezone *string
Retransmission_timer *string
Templates map[string][]*FcTemplateJsonCfg
Request_processors *[]*ReqProcessorJsnCfg
}

View File

@@ -26,14 +26,14 @@ import (
)
type SIPAgentCfg struct {
Enabled bool
Listen string
ListenNet string // udp or tcp
SessionSConns []string
Timezone string
ACKInterval time.Duration // timeout replies if not reaching back
Templates map[string][]*FCTemplate
RequestProcessors []*RequestProcessor
Enabled bool
Listen string
ListenNet string // udp or tcp
SessionSConns []string
Timezone string
RetransmissionTimer time.Duration // timeout replies if not reaching back
Templates map[string][]*FCTemplate
RequestProcessors []*RequestProcessor
}
func (da *SIPAgentCfg) loadFromJsonCfg(jsnCfg *SIPAgentJsonCfg, sep string) (err error) {
@@ -63,8 +63,8 @@ func (da *SIPAgentCfg) loadFromJsonCfg(jsnCfg *SIPAgentJsonCfg, sep string) (err
}
}
}
if jsnCfg.Ack_interval != nil {
if da.ACKInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Ack_interval); err != nil {
if jsnCfg.Retransmission_timer != nil {
if da.RetransmissionTimer, err = utils.ParseDurationWithNanosecs(*jsnCfg.Retransmission_timer); err != nil {
return err
}
}

View File

@@ -54,7 +54,7 @@ type Migrator struct {
dryRun bool
sameDataDB bool
sameStorDB bool
sameOutDB bool
sameOutDB bool // needed in case we set version and we use same DataDB as StorDB to store the versions without overwriting them
stats map[string]int
}