From 2161cda746daa5f05abb2a1642658f0054046c98 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 13 May 2020 18:06:22 +0300 Subject: [PATCH] Updated test templates for SIPAgent --- agents/libsip.go | 30 +-- agents/sipagent.go | 179 ++++++++++++------ agents/sipagent_it_test.go | 4 +- .../samples/sipagent_internal/redirect.json | 7 +- .../conf/samples/sipagent_mongo/redirect.json | 7 +- .../conf/samples/sipagent_mysql/redirect.json | 11 +- .../samples/sipagent_mysql/suppliers.json | 68 +++++++ go.mod | 2 +- go.sum | 3 +- services/sipagent.go | 37 ++-- utils/consts.go | 3 + utils/dataconverter.go | 34 ++++ 12 files changed, 257 insertions(+), 128 deletions(-) create mode 100644 data/conf/samples/sipagent_mysql/suppliers.json diff --git a/agents/libsip.go b/agents/libsip.go index 82129c39c..59369d3da 100644 --- a/agents/libsip.go +++ b/agents/libsip.go @@ -19,19 +19,15 @@ along with this program. If not, see package agents import ( - "context" "fmt" - "net" "strings" - "syscall" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/utils" "github.com/cgrates/sipd" - "golang.org/x/sys/unix" ) -// updateDiamMsgFromNavMap will update the diameter message with items from navigable map +// updateSIPMsgFromNavMap will update the diameter message with items from navigable map func updateSIPMsgFromNavMap(m sipd.Message, navMp *utils.OrderedNavigableMap) (err error) { // write reply into message for el := navMp.GetFirstElement(); el != nil; el = el.Next() { @@ -51,27 +47,3 @@ func updateSIPMsgFromNavMap(m sipd.Message, navMp *utils.OrderedNavigableMap) (e } return } - -func reuseportControl(network, address string, c syscall.RawConn) error { - var opErr error - err := c.Control(func(fd uintptr) { - opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) - }) - if err != nil { - return err - } - - return opErr -} - -func listenTCPWithReuseablePort(network, addr string) (net.Listener, error) { - var lc net.ListenConfig - lc.Control = reuseportControl - return lc.Listen(context.Background(), network, addr) -} - -func listenUDPWithReuseablePort(network, addr string) (net.PacketConn, error) { - var lc net.ListenConfig - lc.Control = reuseportControl - return lc.ListenPacket(context.Background(), network, addr) -} diff --git a/agents/sipagent.go b/agents/sipagent.go index f8da3eea7..ed56f67f4 100644 --- a/agents/sipagent.go +++ b/agents/sipagent.go @@ -21,6 +21,8 @@ package agents import ( "fmt" "net" + "sync" + "time" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/engine" @@ -45,28 +47,34 @@ func NewSIPAgent(connMgr *engine.ConnManager, cfg *config.CGRConfig, // SIPAgent is a handler for HTTP requests type SIPAgent struct { - connMgr *engine.ConnManager - filterS *engine.FilterS - cfg *config.CGRConfig + connMgr *engine.ConnManager + filterS *engine.FilterS + cfg *config.CGRConfig + stopChan chan struct{} +} + +// Shutdown will stop the SIPAgent server +func (sa *SIPAgent) Shutdown() { + close(sa.stopChan) } // ListenAndServe will run the DNS handler doing also the connection to listen address func (sa *SIPAgent) ListenAndServe() (err error) { + sa.stopChan = make(chan struct{}) utils.Logger.Info(fmt.Sprintf("<%s> start listening on <%s:%s>", utils.SIPAgent, sa.cfg.SIPAgentCfg().ListenNet, sa.cfg.SIPAgentCfg().Listen)) switch sa.cfg.SIPAgentCfg().ListenNet { case utils.TCP: - sa.serveTCP() + return sa.serveTCP(sa.stopChan) case utils.UDP: - sa.serveUDP() + return sa.serveUDP(sa.stopChan) default: return fmt.Errorf("Unecepected protocol %s", sa.cfg.SIPAgentCfg().ListenNet) } - return nil //da.server.ListenAndServe() } -func (sa *SIPAgent) serveUDP() { - conn, err := listenUDPWithReuseablePort(utils.UDP, sa.cfg.SIPAgentCfg().Listen) - if err != nil { +func (sa *SIPAgent) serveUDP(stop chan struct{}) (err error) { + var conn net.PacketConn + if conn, err = net.ListenPacket(utils.UDP, sa.cfg.SIPAgentCfg().Listen); err != nil { utils.Logger.Err( fmt.Sprintf("<%s> error: %s unable to listen to: %s", utils.SIPAgent, err.Error(), sa.cfg.SIPAgentCfg().Listen)) @@ -76,36 +84,63 @@ func (sa *SIPAgent) serveUDP() { defer conn.Close() buf := make([]byte, bufferSize) - + wg := sync.WaitGroup{} for { - n, saddr, err := conn.ReadFrom(buf) - if err != nil { - continue + select { + case <-stop: + wg.Wait() + return + default: + } + conn.SetDeadline(time.Now().Add(time.Second)) + var n int + var saddr net.Addr + if n, saddr, err = conn.ReadFrom(buf); err != nil { + if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { + continue + } + utils.Logger.Err( + fmt.Sprintf("<%s> error: %s unable to read from: %s", + utils.SIPAgent, err.Error(), saddr)) + return } // echo response if n < 50 { conn.WriteTo(buf[:n], saddr) continue } - - sipMessage := make(sipd.Message) // recreate map SIP - sipMessage.Parse(string(buf[:n])) - var sipAnswer sipd.Message - if sipAnswer, err = sa.handleMessage(sipMessage, saddr.String()); err != nil { - continue - } - if _, err = conn.WriteTo([]byte(sipAnswer.String()), saddr); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s sending message: %s", - utils.SIPAgent, err.Error(), sipAnswer)) - continue - } + wg.Add(1) + go func(message string, conn net.PacketConn) { + sipMessage := make(sipd.Message) + sipMessage.Parse(message) + var sipAnswer sipd.Message + var err error + if sipAnswer, err = sa.handleMessage(sipMessage, saddr.String()); err != nil { + wg.Done() + return + } + if _, err = conn.WriteTo([]byte(sipAnswer.String()), saddr); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s sending message: %s", + utils.SIPAgent, err.Error(), sipAnswer)) + wg.Done() + return + } + wg.Done() + }(string(buf[:n]), conn) } } -func (sa *SIPAgent) serveTCP() { - l, err := listenTCPWithReuseablePort(utils.TCP, sa.cfg.SIPAgentCfg().Listen) - if err != nil { +func (sa *SIPAgent) serveTCP(stop chan struct{}) (err error) { + var l *net.TCPListener + var addr *net.TCPAddr + if addr, err = net.ResolveTCPAddr("tcp", sa.cfg.SIPAgentCfg().Listen); err != nil { + utils.Logger.Err( + fmt.Sprintf("<%s> unable to rezolve TCP Address <%s> because: %s", + utils.SIPAgent, sa.cfg.SIPAgentCfg().Listen, err.Error())) + return + } + if l, err = net.ListenTCP(utils.TCP, addr); err != nil { utils.Logger.Err( fmt.Sprintf("<%s> error: %s unable to listen to: %s", utils.SIPAgent, err.Error(), sa.cfg.SIPAgentCfg().Listen)) @@ -114,14 +149,35 @@ func (sa *SIPAgent) serveTCP() { defer l.Close() + wg := sync.WaitGroup{} for { - conn, err := l.Accept() - if err != nil { - continue + select { + case <-stop: + wg.Wait() + return + default: + } + l.SetDeadline(time.Now().Add(time.Second)) + var conn net.Conn + if conn, err = l.Accept(); err != nil { + if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { + continue + } + utils.Logger.Err( + fmt.Sprintf("<%s> unable to accept connection because of error %s", + utils.SIPAgent, err.Error())) + return } go func(conn net.Conn) { buf := make([]byte, bufferSize) for { + select { + case <-stop: + conn.Close() + return + default: + } + conn.SetReadDeadline(time.Now().Add(time.Second)) n, err := conn.Read(buf) if err != nil { continue @@ -162,7 +218,10 @@ func (sa *SIPAgent) handleMessage(sipMessage sipd.Message, remoteHost string) (s cgrRplyNM := utils.NavigableMap2{} rplyNM := utils.NewOrderedNavigableMap() opts := utils.NewOrderedNavigableMap() - reqVars := utils.NavigableMap2{utils.RemoteHost: utils.NewNMData(remoteHost)} + reqVars := utils.NavigableMap2{ + utils.RemoteHost: utils.NewNMData(remoteHost), + "Method": utils.NewNMData(sipMessage.MethodFrom("Request")), + } for _, reqProcessor := range sa.cfg.SIPAgentCfg().RequestProcessors { agReq := NewAgentRequest(dp, reqVars, &cgrRplyNM, rplyNM, opts, reqProcessor.Tenant, sa.cfg.GeneralCfg().DefaultTenant, @@ -218,10 +277,10 @@ func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor, opts := config.NMAsMapInterface(agReq.Opts, utils.NestingSep) var reqType string for _, typ := range []string{ - utils.MetaDryRun, /* utils.MetaAuthorize, - utils.MetaInitiate, utils.MetaUpdate, - utils.MetaTerminate, utils.MetaMessage, - utils.MetaCDRs, */utils.MetaEvent, utils.META_NONE} { + utils.MetaDryRun, utils.MetaAuthorize, /* + utils.MetaInitiate, utils.MetaUpdate, + utils.MetaTerminate, utils.MetaMessage, + utils.MetaCDRs, */utils.MetaEvent, utils.META_NONE} { if reqProcessor.Flags.HasKey(typ) { // request type is identified through flags reqType = typ break @@ -247,29 +306,29 @@ func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor, utils.Logger.Info( fmt.Sprintf("<%s> DRY_RUN, processorID: %s, CGREvent: %s", utils.SIPAgent, reqProcessor.ID, utils.ToJSON(cgrEv))) - // case utils.MetaAuthorize: - // authArgs := sessions.NewV1AuthorizeArgs( - // reqProcessor.Flags.HasKey(utils.MetaAttributes), - // reqProcessor.Flags.ParamsSlice(utils.MetaAttributes), - // reqProcessor.Flags.HasKey(utils.MetaThresholds), - // reqProcessor.Flags.ParamsSlice(utils.MetaThresholds), - // reqProcessor.Flags.HasKey(utils.MetaStats), - // reqProcessor.Flags.ParamsSlice(utils.MetaStats), - // reqProcessor.Flags.HasKey(utils.MetaResources), - // reqProcessor.Flags.HasKey(utils.MetaAccounts), - // reqProcessor.Flags.HasKey(utils.MetaRoutes), - // reqProcessor.Flags.HasKey(utils.MetaRoutesIgnoreErrors), - // reqProcessor.Flags.HasKey(utils.MetaRoutesEventCost), - // cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.RoutePaginator, - // reqProcessor.Flags.HasKey(utils.MetaFD), - // opts, - // ) - // rply := new(sessions.V1AuthorizeReply) - // err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1AuthorizeEvent, - // authArgs, rply) - // if err = agReq.setCGRReply(rply, err); err != nil { - // return - // } + case utils.MetaAuthorize: + authArgs := sessions.NewV1AuthorizeArgs( + reqProcessor.Flags.HasKey(utils.MetaAttributes), + reqProcessor.Flags.ParamsSlice(utils.MetaAttributes), + reqProcessor.Flags.HasKey(utils.MetaThresholds), + reqProcessor.Flags.ParamsSlice(utils.MetaThresholds), + reqProcessor.Flags.HasKey(utils.MetaStats), + reqProcessor.Flags.ParamsSlice(utils.MetaStats), + reqProcessor.Flags.HasKey(utils.MetaResources), + reqProcessor.Flags.HasKey(utils.MetaAccounts), + reqProcessor.Flags.HasKey(utils.MetaRoutes), + reqProcessor.Flags.HasKey(utils.MetaRoutesIgnoreErrors), + reqProcessor.Flags.HasKey(utils.MetaRoutesEventCost), + cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.RoutePaginator, + reqProcessor.Flags.HasKey(utils.MetaFD), + opts, + ) + rply := new(sessions.V1AuthorizeReply) + err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1AuthorizeEvent, + authArgs, rply) + if err = agReq.setCGRReply(rply, err); err != nil { + return + } // case utils.MetaInitiate: // initArgs := sessions.NewV1InitSessionArgs( // reqProcessor.Flags.HasKey(utils.MetaAttributes), diff --git a/agents/sipagent_it_test.go b/agents/sipagent_it_test.go index bc4d5279c..fef4a6529 100644 --- a/agents/sipagent_it_test.go +++ b/agents/sipagent_it_test.go @@ -150,7 +150,7 @@ func testSAitSetAttributeProfile(t *testing.T) { Tenant: "cgrates.org", ID: "ChangeDestination", Contexts: []string{utils.ANY}, - FilterIDs: []string{"*prefix:~*req.Account:\"1001\""}, + FilterIDs: []string{"*string:~*req.Account:1001"}, Attributes: []*engine.ExternalAttribute{{ Path: utils.MetaReq + utils.NestingSep + "Destination", Value: "sip:1003@192.168.53.203:5060", @@ -184,7 +184,7 @@ func testSAitSIPRegister(t *testing.T) { t.Fatal(err) } - if expected := "SIP/2.0 200 OK"; recived["Request"] != expected { + if expected := "SIP/2.0 405 Method Not Allowed"; recived["Request"] != expected { t.Errorf("Expected %q, received: %q", expected, recived["Request"]) } } diff --git a/data/conf/samples/sipagent_internal/redirect.json b/data/conf/samples/sipagent_internal/redirect.json index 5cb2de3fd..f0cd69de2 100644 --- a/data/conf/samples/sipagent_internal/redirect.json +++ b/data/conf/samples/sipagent_internal/redirect.json @@ -9,8 +9,7 @@ "request_fields":[], "reply_fields":[ {"tag": "Request", "path": "*rep.Request", "type": "*constant", - "value": "SIP/2.0 200 OK"}, - {"tag": "Authorization", "path": "*rep.Authorization", "type": "*remove"} + "value": "SIP/2.0 405 Method Not Allowed"}, ] }, { @@ -19,9 +18,9 @@ "flags": ["*attributes","*event"], "request_fields":[ {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", - "value": "~*req.From", "mandatory": true}, + "value": "~*req.From{*sipuri_user}", "mandatory": true}, {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", - "value": "~*req.To", "mandatory": true} + "value": "~*req.To{*sipuri_user}", "mandatory": true} ], "reply_fields":[ {"tag": "Request", "path": "*rep.Request", "type": "*constant", diff --git a/data/conf/samples/sipagent_mongo/redirect.json b/data/conf/samples/sipagent_mongo/redirect.json index 5cb2de3fd..f0cd69de2 100644 --- a/data/conf/samples/sipagent_mongo/redirect.json +++ b/data/conf/samples/sipagent_mongo/redirect.json @@ -9,8 +9,7 @@ "request_fields":[], "reply_fields":[ {"tag": "Request", "path": "*rep.Request", "type": "*constant", - "value": "SIP/2.0 200 OK"}, - {"tag": "Authorization", "path": "*rep.Authorization", "type": "*remove"} + "value": "SIP/2.0 405 Method Not Allowed"}, ] }, { @@ -19,9 +18,9 @@ "flags": ["*attributes","*event"], "request_fields":[ {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", - "value": "~*req.From", "mandatory": true}, + "value": "~*req.From{*sipuri_user}", "mandatory": true}, {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", - "value": "~*req.To", "mandatory": true} + "value": "~*req.To{*sipuri_user}", "mandatory": true} ], "reply_fields":[ {"tag": "Request", "path": "*rep.Request", "type": "*constant", diff --git a/data/conf/samples/sipagent_mysql/redirect.json b/data/conf/samples/sipagent_mysql/redirect.json index 5cb2de3fd..78f9f62ff 100644 --- a/data/conf/samples/sipagent_mysql/redirect.json +++ b/data/conf/samples/sipagent_mysql/redirect.json @@ -4,24 +4,23 @@ "request_processors": [ { "id": "Register", - "filters": ["*prefix:~*req.Request:REGISTER"], + "filters": ["*notprefix:~*vars.Method:INVITE"], "flags": ["*none"], "request_fields":[], "reply_fields":[ {"tag": "Request", "path": "*rep.Request", "type": "*constant", - "value": "SIP/2.0 200 OK"}, - {"tag": "Authorization", "path": "*rep.Authorization", "type": "*remove"} + "value": "SIP/2.0 405 Method Not Allowed"}, ] }, { "id": "InviteRedirect", - "filters": ["*prefix:~*req.Request:INVITE"], + "filters": ["*prefix:~*vars.Method:INVITE"], "flags": ["*attributes","*event"], "request_fields":[ {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", - "value": "~*req.From", "mandatory": true}, + "value": "~*req.From{*sipuri_user}", "mandatory": true}, {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", - "value": "~*req.To", "mandatory": true} + "value": "~*req.To{*sipuri_user}", "mandatory": true} ], "reply_fields":[ {"tag": "Request", "path": "*rep.Request", "type": "*constant", diff --git a/data/conf/samples/sipagent_mysql/suppliers.json b/data/conf/samples/sipagent_mysql/suppliers.json new file mode 100644 index 000000000..4f20f0743 --- /dev/null +++ b/data/conf/samples/sipagent_mysql/suppliers.json @@ -0,0 +1,68 @@ +{ + "sip_agent": { + "request_processors": [ + { + "id": "NAPTRRoutesQuery", + "filters": ["*string:~*vars.Method:INVITE"], + "flags": ["*event", "*routes","*continue"], + "request_fields":[ + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", + "value": "~*req.From{*sipuri_user}", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", + "value": "~*req.To{*sipuri_user}", "mandatory": true} + ], + "reply_fields":[ + ], + }, + { + "id": "NAPTRSuppliersOneSupplier", + "filters": ["*string:~*vars.Method:INVITE", + "*gte:~*cgrep.Routes.Count:1"], + "flags": ["*none","*continue"], // do not send request to CGRateS + "reply_fields":[ + {"tag": "Request", "path": "*rep.Request", "type": "*constant", + "value": "SIP/2.0 302 Moved Temporarily"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":"\""}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":"~*cgreq.Destination"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":"\" ;q=0.7; expires=3600"}, + ], + }, + { + "id": "NAPTRSuppliersTwoSuppliers", + "filters": ["*string:~*vars.Method:INVITE", + "*gte:~*cgrep.Routes.Count:2"], + "flags": ["*none","*continue"], + "reply_fields":[ + {"tag": "Request", "path": "*rep.Request", "type": "*constant", + "value": "SIP/2.0 302 Moved Temporarily"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":"\""}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":"~*cgreq.Destination"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":"\" ;q=0.1"}, + ], + }, + ], + }, + +} diff --git a/go.mod b/go.mod index d120935df..c301a91ad 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1 github.com/cgrates/rpcclient v0.0.0-20200326100105-a579e2c47453 - github.com/cgrates/sipd v1.0.0 + github.com/cgrates/sipd v1.0.1-0.20200513092040-728b130a73d6 github.com/creack/pty v1.1.7 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/fiorix/go-diameter v3.0.3-0.20190716165154-f4823472d0e0+incompatible diff --git a/go.sum b/go.sum index 641c33341..f1a8a8271 100644 --- a/go.sum +++ b/go.sum @@ -63,8 +63,9 @@ github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1 h1:QvA6Nbwq9kTd7hsv github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1/go.mod h1:HZbsg3Y+xw4lsfCqX9rzj429wrg0XOug6pFT3B6VHZY= github.com/cgrates/rpcclient v0.0.0-20200326100105-a579e2c47453 h1:kgIdi3qR/meiWILdmDRuDi1fFgd6A3lutGV6HLiTDyc= github.com/cgrates/rpcclient v0.0.0-20200326100105-a579e2c47453/go.mod h1:xXLqAKVvcdWeDYwHJYwDgAI3ZOg5LZYxzb72kLjsLZU= -github.com/cgrates/sipd v1.0.0 h1:9CPVaWCYBAWGVyzsEtv+VbG6kMk+sHml6kx3OvYQnkc= github.com/cgrates/sipd v1.0.0/go.mod h1:Itz4HoJHqckX9XAbGRanB1PvXfKrIO/UtMB8JwCFsT4= +github.com/cgrates/sipd v1.0.1-0.20200513092040-728b130a73d6 h1:yKEwGpD79Pbss7/03mwPh9MvbBXmttiUX+iyA67PRA8= +github.com/cgrates/sipd v1.0.1-0.20200513092040-728b130a73d6/go.mod h1:yAWcE5qt60PMrWME9Ijp6IWlWwL5d+/VflUYudO1Xm0= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/creack/pty v1.1.7 h1:6pwm8kMQKCmgUg0ZHTm5+/YvRK0s3THD/28+T6/kk4A= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= diff --git a/services/sipagent.go b/services/sipagent.go index 2c764fcb1..fa2fc6e76 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -84,24 +84,21 @@ func (sip *SIPAgent) GetIntenternalChan() (conn chan rpcclient.ClientConnector) // Reload handles the change of config func (sip *SIPAgent) Reload() (err error) { - // if sip.oldListen == sip.cfg.SIPAgentCfg().Listen { - // return - // } - // if err = sip.Shutdown(); err != nil { - // return - // } - // sip.Lock() - // sip.oldListen = sip.cfg.SIPAgentCfg().Listen - // defer sip.Unlock() - // if err = sip.sip.Reload(); err != nil { - // return - // } - // go func() { - // if err := sip.sip.ListenAndServe(); err != nil { - // utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) - // sip.exitChan <- true // stop the engine here - // } - // }() + if sip.oldListen == sip.cfg.SIPAgentCfg().Listen { + return + } + if err = sip.Shutdown(); err != nil { + return + } + sip.Lock() + sip.oldListen = sip.cfg.SIPAgentCfg().Listen + sip.Unlock() + go func() { + if err := sip.sip.ListenAndServe(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) + sip.exitChan <- true // stop the engine here + } + }() return } @@ -109,9 +106,7 @@ func (sip *SIPAgent) Reload() (err error) { func (sip *SIPAgent) Shutdown() (err error) { sip.Lock() defer sip.Unlock() - // if err = sip.sip.Shutdown(); err != nil { - // return - // } + sip.sip.Shutdown() sip.sip = nil return } diff --git a/utils/consts.go b/utils/consts.go index 7c04d1049..d2f4bd5ae 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -644,6 +644,9 @@ const ( MetaDuration = "*duration" MetaLibPhoneNumber = "*libphonenumber" MetaIP2Hex = "*ip2hex" + MetaSIPURIMethod = "*sipuri_method" + MetaSIPURIHost = "*sipuri_host" + MetaSIPURIUser = "*sipuri_user" MetaReload = "*reload" MetaLoad = "*load" MetaRemove = "*remove" diff --git a/utils/dataconverter.go b/utils/dataconverter.go index 4e31bce12..eb2c3952d 100644 --- a/utils/dataconverter.go +++ b/utils/dataconverter.go @@ -26,6 +26,7 @@ import ( "strings" "time" + "github.com/cgrates/sipd" "github.com/nyaruka/phonenumbers" ) @@ -75,6 +76,12 @@ func NewDataConverter(params string) (conv DataConverter, err error) { return NewDurationConverter("") case params == MetaIP2Hex: return new(IP2HexConverter), nil + case params == MetaSIPURIHost: + return new(SIPURIHostConverter), nil + case params == MetaSIPURIUser: + return new(SIPURIUserConverter), nil + case params == MetaSIPURIMethod: + return new(SIPURIMethodConverter), nil case strings.HasPrefix(params, MetaLibPhoneNumber): if len(params) == len(MetaLibPhoneNumber) { return NewPhoneNumberConverter("") @@ -300,3 +307,30 @@ func (*IP2HexConverter) Convert(in interface{}) (out interface{}, err error) { } return "0x" + string([]byte(hx)[len(hx)-8:]), nil } + +// SIPURIHostConverter will return the +type SIPURIHostConverter struct{} + +// Convert implements DataConverter interface +func (*SIPURIHostConverter) Convert(in interface{}) (out interface{}, err error) { + val := IfaceAsString(in) + return sipd.HostFrom(val), nil +} + +// SIPURIUserConverter will return the +type SIPURIUserConverter struct{} + +// Convert implements DataConverter interface +func (*SIPURIUserConverter) Convert(in interface{}) (out interface{}, err error) { + val := IfaceAsString(in) + return sipd.NameFrom(val), nil +} + +// SIPURIMethodConverter will return the +type SIPURIMethodConverter struct{} + +// Convert implements DataConverter interface +func (*SIPURIMethodConverter) Convert(in interface{}) (out interface{}, err error) { + val := IfaceAsString(in) + return sipd.MethodFrom(val), nil +}