From 6035e42e2a94220660620f61cb0ea127337db1d7 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Wed, 24 Jun 2020 18:16:39 +0300 Subject: [PATCH 1/4] Updated SIPAgent --- agents/libsip.go | 22 ++++++++++++ agents/sipagent.go | 85 +++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 103 insertions(+), 4 deletions(-) diff --git a/agents/libsip.go b/agents/libsip.go index 0b3d63ffd..bb83c74af 100644 --- a/agents/libsip.go +++ b/agents/libsip.go @@ -23,6 +23,7 @@ import ( "strings" "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" "github.com/cgrates/cgrates/utils" "github.com/cgrates/sipingo" ) @@ -47,3 +48,24 @@ func updateSIPMsgFromNavMap(m sipingo.Message, navMp *utils.OrderedNavigableMap) } return } + +func sipErr(m utils.DataProvider, sipMessage sipingo.Message, + reqVars utils.NavigableMap2, + tpl []*config.FCTemplate, tnt, tmz string, + filterS *engine.FilterS) (a sipingo.Message, err error) { + aReq := NewAgentRequest( + m, reqVars, + nil, nil, nil, nil, + tnt, tmz, filterS, nil, nil) + if err = aReq.SetFields(tpl); err != nil { + return + } + if err = updateSIPMsgFromNavMap(sipMessage, aReq.Reply); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s encoding out %s", + utils.SIPAgent, err.Error(), utils.ToJSON(aReq.Reply))) + return + } + sipMessage.PrepareReply() + return sipMessage, nil +} diff --git a/agents/sipagent.go b/agents/sipagent.go index 68507ecea..ff96c1b9b 100644 --- a/agents/sipagent.go +++ b/agents/sipagent.go @@ -32,7 +32,11 @@ import ( ) const ( - bufferSize = 5000 + bufferSize = 5000 + ackMethod = "ACK" + requestHeader = "Request" + callIDHeader = "Call-ID" + fromHeader = "From" ) // NewSIPAgent will construct a SIPAgent @@ -42,6 +46,7 @@ func NewSIPAgent(connMgr *engine.ConnManager, cfg *config.CGRConfig, connMgr: connMgr, filterS: filterS, cfg: cfg, + ackMap: make(map[string]chan struct{}), } } @@ -51,6 +56,8 @@ type SIPAgent struct { filterS *engine.FilterS cfg *config.CGRConfig stopChan chan struct{} + ackMap map[string]chan struct{} + ackLocks sync.RWMutex } // Shutdown will stop the SIPAgent server @@ -199,6 +206,8 @@ func (sa *SIPAgent) serveTCP(stop chan struct{}) (err error) { continue } + + var sipMessage sipingo.Message // recreate map SIP if sipMessage, err = sipingo.NewMessage(string(buf[:n])); err != nil { utils.Logger.Warning( @@ -214,18 +223,82 @@ func (sa *SIPAgent) serveTCP(stop chan struct{}) (err error) { if sipAnswer, err = sa.handleMessage(sipMessage, conn.LocalAddr().String()); err != nil { continue } + if len(sipAnswer) == 0 { + continue + } if _, err = conn.Write([]byte(sipAnswer.String())); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s sending message: %s", utils.SIPAgent, err.Error(), sipAnswer)) continue } + // go func(stopChan chan struct{},){ //map[string]chan struct{} callID+frontTag + // <-config.Timer: + // write message + // } } }(conn) } } -func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) (sipAnswer sipingo.Message, err error) { +func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte) error) (err error) { + var sipMessage sipingo.Message // recreate map SIP + if sipMessage, err = sipingo.NewMessage(messageStr); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s parsing message: %s", + utils.SIPAgent, err.Error(), messageStr)) + return // do we need to return error in case we can't parse the message? + } + key := utils.ConcatenatedKey(sipMessage[fromHeader], sipMessage[callIDHeader]) + if ackMethod == sipMessage.MethodFrom(requestHeader) { + sa.ackLocks.Lock() + if stopChan, has := sa.ackMap[key]; has { + close(stopChan) + sa.ackLocks.Unlock() + return + } + sa.ackLocks.Unlock() // log the message if we did not find it in the map + } + var sipAnswer sipingo.Message + if sipAnswer, err = sa.handleMessage(sipMessage, addr); err != nil { + return + } + if len(sipAnswer) == 0 { + return // do not write the message if we do not have anything to reply + } + ans := []byte(sipAnswer.String()) + if err = write(ans); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s sending message: %s", + utils.SIPAgent, err.Error(), sipAnswer)) + return + } + stopChan := make(chan struct{}) + sa.ackLocks.Lock() + sa.ackMap[key] = stopChan + sa.ackLocks.Unlock() + go func(stopChan chan struct{}, a []byte) { + for { + select { + case <-time.After(time.Second): + if err = write(ans); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s sending message: %s", + utils.SIPAgent, err.Error(), sipAnswer)) + return + } + case <-stopChan: + sa.ackLocks.Lock() + delete(sa.ackMap, key) + sa.ackLocks.Unlock() + return + } + } + }(stopChan, ans) + return +} + +func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) (sipAnswer sipingo.Message) { if sipMessage["User-Agent"] != "" { sipMessage["User-Agent"] = utils.CGRateS } @@ -242,6 +315,7 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) utils.RemoteHost: utils.NewNMData(remoteHost), "Method": utils.NewNMData(sipMessage.MethodFrom("Request")), } + var err error for _, reqProcessor := range sa.cfg.SIPAgentCfg().RequestProcessors { agReq := NewAgentRequest(dp, reqVars, &cgrRplyNM, rplyNM, opts, reqProcessor.Tenant, sa.cfg.GeneralCfg().DefaultTenant, @@ -263,7 +337,7 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) break } } - if err != nil { + if err != nil { // write err message on conection 500 Server Error utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing message: %s from %s", utils.SIPAgent, err.Error(), sipMessage, remoteHost)) @@ -275,6 +349,9 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) utils.SIPAgent, sipMessage, remoteHost)) return } + if rplyNM.Len() == 0 { // if we do not populate the reply with any field we do not send any reply back + return + } if err = updateSIPMsgFromNavMap(sipMessage, rplyNM); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s encoding out %s", @@ -282,7 +359,7 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) return } sipMessage.PrepareReply() - return sipMessage, nil + return sipMessage } // processRequest represents one processor processing the request From 00d700b3673fc88192170d1d0afea396e253bca1 Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 25 Jun 2020 12:52:21 +0300 Subject: [PATCH 2/4] Updated SIPAgent to support ACK --- agents/libsip.go | 6 ++ agents/sipagent.go | 146 ++++++++++++++++++------------------- agents/sipagent_it_test.go | 30 ++++++++ config/config_defaults.go | 7 ++ config/libconfig_json.go | 2 + config/sipagentcfg.go | 18 +++++ services/sipagent.go | 7 +- 7 files changed, 141 insertions(+), 75 deletions(-) diff --git a/agents/libsip.go b/agents/libsip.go index bb83c74af..5eb3cf26a 100644 --- a/agents/libsip.go +++ b/agents/libsip.go @@ -69,3 +69,9 @@ func sipErr(m utils.DataProvider, sipMessage sipingo.Message, sipMessage.PrepareReply() return sipMessage, nil } + +func bareSipErr(m sipingo.Message, err string) sipingo.Message { + m[requestHeader] = err + m.PrepareReply() + return m +} diff --git a/agents/sipagent.go b/agents/sipagent.go index ff96c1b9b..dc404077a 100644 --- a/agents/sipagent.go +++ b/agents/sipagent.go @@ -32,22 +32,41 @@ import ( ) const ( - bufferSize = 5000 - ackMethod = "ACK" - requestHeader = "Request" - callIDHeader = "Call-ID" - fromHeader = "From" + bufferSize = 5000 + ackMethod = "ACK" + inviteMethod = "INVITE" + requestHeader = "Request" + callIDHeader = "Call-ID" + fromHeader = "From" + sipServerErr = "SIP/2.0 500 Internal Server Error" + userAgentHeader = "User-Agent" + method = "Method" ) // NewSIPAgent will construct a SIPAgent func NewSIPAgent(connMgr *engine.ConnManager, cfg *config.CGRConfig, - filterS *engine.FilterS) *SIPAgent { - return &SIPAgent{ + filterS *engine.FilterS) (sa *SIPAgent, err error) { + sa = &SIPAgent{ connMgr: connMgr, filterS: filterS, cfg: cfg, ackMap: make(map[string]chan struct{}), } + msgTemplates := sa.cfg.SIPAgentCfg().Templates + // Inflate *template field types + for _, procsr := range sa.cfg.SIPAgentCfg().RequestProcessors { + if tpls, err := config.InflateTemplates(procsr.RequestFields, msgTemplates); err != nil { + return nil, err + } else if tpls != nil { + procsr.RequestFields = tpls + } + if tpls, err := config.InflateTemplates(procsr.ReplyFields, msgTemplates); err != nil { + return nil, err + } else if tpls != nil { + procsr.ReplyFields = tpls + } + } + return } // SIPAgent is a handler for SIP requests @@ -62,6 +81,11 @@ type SIPAgent struct { // Shutdown will stop the SIPAgent server func (sa *SIPAgent) Shutdown() { + sa.ackLocks.Lock() + for _, ch := range sa.ackMap { // close all ack + close(ch) + } + sa.ackLocks.Unlock() close(sa.stopChan) } @@ -79,6 +103,7 @@ func (sa *SIPAgent) ListenAndServe() (err error) { return fmt.Errorf("Unecepected protocol %s", sa.cfg.SIPAgentCfg().ListenNet) } } + func (sa *SIPAgent) serveUDP(stop chan struct{}) (err error) { var conn net.PacketConn if conn, err = net.ListenPacket(utils.UDP, sa.cfg.SIPAgentCfg().Listen); err != nil { @@ -117,33 +142,13 @@ func (sa *SIPAgent) serveUDP(stop chan struct{}) (err error) { continue } wg.Add(1) - go func(message string, conn net.PacketConn) { - var sipMessage sipingo.Message - if sipMessage, err = sipingo.NewMessage(message); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s parsing message: %s", - utils.SIPAgent, err.Error(), message)) - wg.Done() + go func(message string, saddr net.Addr, conn net.PacketConn) { + sa.answerMessage(message, saddr.String(), func(ans []byte) (werr error) { + _, werr = conn.WriteTo(ans, saddr) return - } - if "ACK" == sipMessage.MethodFrom("Request") { - return - } - var sipAnswer sipingo.Message - var err error - if sipAnswer, err = sa.handleMessage(sipMessage, saddr.String()); err != nil { - wg.Done() - return - } - if _, err = conn.WriteTo([]byte(sipAnswer.String()), saddr); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s sending message: %s", - utils.SIPAgent, err.Error(), sipAnswer)) - wg.Done() - return - } + }) // do not log the received error because is already logged in function so for now just ignore it wg.Done() - }(string(buf[:n]), conn) + }(string(buf[:n]), saddr, conn) } } @@ -206,36 +211,10 @@ func (sa *SIPAgent) serveTCP(stop chan struct{}) (err error) { continue } - - - var sipMessage sipingo.Message // recreate map SIP - if sipMessage, err = sipingo.NewMessage(string(buf[:n])); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s parsing message: %s", - utils.SIPAgent, err.Error(), string(buf[:n]))) - wg.Done() - continue - } - if "ACK" == sipMessage.MethodFrom("Request") { - continue - } - var sipAnswer sipingo.Message - if sipAnswer, err = sa.handleMessage(sipMessage, conn.LocalAddr().String()); err != nil { - continue - } - if len(sipAnswer) == 0 { - continue - } - if _, err = conn.Write([]byte(sipAnswer.String())); err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error: %s sending message: %s", - utils.SIPAgent, err.Error(), sipAnswer)) - continue - } - // go func(stopChan chan struct{},){ //map[string]chan struct{} callID+frontTag - // <-config.Timer: - // write message - // } + sa.answerMessage(string(buf[:n]), conn.LocalAddr().String(), func(ans []byte) (werr error) { + _, werr = conn.Write(ans) + return + }) // do not log the received error because is already logged in function so for now just ignore it } }(conn) } @@ -250,7 +229,11 @@ func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte return // do we need to return error in case we can't parse the message? } key := utils.ConcatenatedKey(sipMessage[fromHeader], sipMessage[callIDHeader]) - if ackMethod == sipMessage.MethodFrom(requestHeader) { + method := sipMessage.MethodFrom(requestHeader) + if ackMethod == method { + if sa.cfg.SIPAgentCfg().ACKInterval == 0 { // ignore ACK + return + } sa.ackLocks.Lock() if stopChan, has := sa.ackMap[key]; has { close(stopChan) @@ -260,10 +243,7 @@ func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte sa.ackLocks.Unlock() // log the message if we did not find it in the map } var sipAnswer sipingo.Message - if sipAnswer, err = sa.handleMessage(sipMessage, addr); err != nil { - return - } - if len(sipAnswer) == 0 { + if sipAnswer = sa.handleMessage(sipMessage, addr); len(sipAnswer) == 0 { return // do not write the message if we do not have anything to reply } ans := []byte(sipAnswer.String()) @@ -273,6 +253,11 @@ func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte utils.SIPAgent, err.Error(), sipAnswer)) return } + // because we expext to send codes from 300-699 we wait for the ACK every time + if method != inviteMethod || // only invitest need ACK + sa.cfg.SIPAgentCfg().ACKInterval == 0 { + return // disabled ACK + } stopChan := make(chan struct{}) sa.ackLocks.Lock() sa.ackMap[key] = stopChan @@ -280,7 +265,7 @@ func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte go func(stopChan chan struct{}, a []byte) { for { select { - case <-time.After(time.Second): + case <-time.After(sa.cfg.SIPAgentCfg().ACKInterval): if err = write(ans); err != nil { utils.Logger.Warning( fmt.Sprintf("<%s> error: %s sending message: %s", @@ -299,8 +284,8 @@ func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte } func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) (sipAnswer sipingo.Message) { - if sipMessage["User-Agent"] != "" { - sipMessage["User-Agent"] = utils.CGRateS + if sipMessage[userAgentHeader] != "" { + sipMessage[userAgentHeader] = utils.CGRateS } sipMessageIface := make(map[string]interface{}) for k, v := range sipMessage { @@ -313,9 +298,22 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) opts := utils.NewOrderedNavigableMap() reqVars := utils.NavigableMap2{ utils.RemoteHost: utils.NewNMData(remoteHost), - "Method": utils.NewNMData(sipMessage.MethodFrom("Request")), + method: utils.NewNMData(sipMessage.MethodFrom(requestHeader)), } - var err error + // build the negative error answer + sErr, err := sipErr( + dp, sipMessage.Clone(), reqVars, + sa.cfg.SIPAgentCfg().Templates[utils.MetaErr], + sa.cfg.GeneralCfg().DefaultTenant, + sa.cfg.GeneralCfg().DefaultTimezone, + sa.filterS) + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s building errSIP for message: %s", + utils.SIPAgent, err.Error(), sipMessage)) + return bareSipErr(sipMessage, sipServerErr) + } + for _, reqProcessor := range sa.cfg.SIPAgentCfg().RequestProcessors { agReq := NewAgentRequest(dp, reqVars, &cgrRplyNM, rplyNM, opts, reqProcessor.Tenant, sa.cfg.GeneralCfg().DefaultTenant, @@ -341,7 +339,7 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) utils.Logger.Warning( fmt.Sprintf("<%s> error: %s processing message: %s from %s", utils.SIPAgent, err.Error(), sipMessage, remoteHost)) - return + return sErr } if !processed { utils.Logger.Warning( @@ -356,7 +354,7 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) utils.Logger.Warning( fmt.Sprintf("<%s> error: %s encoding out %s", utils.SIPAgent, err.Error(), utils.ToJSON(rplyNM))) - return + return sErr } sipMessage.PrepareReply() return sipMessage diff --git a/agents/sipagent_it_test.go b/agents/sipagent_it_test.go index 2a85a7181..9f5c79c19 100644 --- a/agents/sipagent_it_test.go +++ b/agents/sipagent_it_test.go @@ -164,6 +164,7 @@ func testSAitSIPRegister(t *testing.T) { func testSAitSIPInvite(t *testing.T) { inviteMessage := "INVITE sip:1002@192.168.58.203 SIP/2.0\r\nCall-ID: 4d4d84b0cc83fc90aca41e295cd8ff43@0:0:0:0:0:0:0:0\r\nCSeq: 2 INVITE\r\nFrom: \"1001\" ;tag=99f35805\r\nTo: \r\nMax-Forwards: 70\r\nContact: \"1001\" \r\nUser-Agent: Jitsi2.11.20200408Linux\r\nContent-Type: application/sdp\r\nVia: SIP/2.0/UDP 192.168.58.201:5060;branch=z9hG4bK-393139-939e89686023b86822cb942ede452b62\r\nProxy-Authorization: Digest username=\"1001\",realm=\"192.168.58.203\",nonce=\"XruO2167ja8uRODnSv8aXqv+/hqPJiXh\",uri=\"sip:1002@192.168.58.203\",response=\"5b814c709d1541d72ea778599c2e48a4\"\r\nContent-Length: 897\r\n\r\nv=0\r\no=1001-jitsi.org 0 0 IN IP4 192.168.58.201\r\ns=-\r\nc=IN IP4 192.168.58.201\r\nt=0 0\r\nm=audio 5000 RTP/AVP 96 97 98 9 100 102 0 8 103 3 104 101\r\na=rtpmap:96 opus/48000/2\r\na=fmtp:96 usedtx=1\r\na=ptime:20\r\na=rtpmap:97 SILK/24000\r\na=rtpmap:98 SILK/16000\r\na=rtpmap:9 G722/8000\r\na=rtpmap:100 speex/32000\r\na=rtpmap:102 speex/16000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:103 iLBC/8000\r\na=rtpmap:3 GSM/8000\r\na=rtpmap:104 speex/8000\r\na=rtpmap:101 telephone-event/8000\r\na=extmap:1 urn:ietf:params:rtp-hdrext:csrc-audio-level\r\na=extmap:2 urn:ietf:params:rtp-hdrext:ssrc-audio-level\r\na=rtcp-xr:voip-metrics\r\nm=video 5002 RTP/AVP 105 99\r\na=recvonly\r\na=rtpmap:105 h264/90000\r\na=fmtp:105 profile-level-id=42E01f;packetization-mode=1\r\na=imageattr:105 send * recv [x=[1:1920],y=[1:1080]]\r\na=rtpmap:\r\n" + ack := "ACK sip:1001@192.168.56.203:6060 SIP/2.0\r\nVia: SIP/2.0/UDP 192.168.56.203;rport;branch=z9hG4bKQeB89BamX86UD\r\nMax-Forwards: 69\r\nFrom: \"1001\" ;tag=99f35805\r\nTo: \r\nCall-ID: 4d4d84b0cc83fc90aca41e295cd8ff43@0:0:0:0:0:0:0:0\r\nCSeq: 21984733 ACK\r\nContent-Length: 0\r\n" if saConn == nil { t.Fatal("connection not initialized") } @@ -186,4 +187,33 @@ func testSAitSIPInvite(t *testing.T) { if expected := "\"1002\" ;q=0.7; expires=3600;cgr_cost=0.3;cgr_maxusage=60000000000,\"1002\" ;q=0.2;cgr_cost=0.6;cgr_maxusage=60000000000,\"1002\" ;q=0.1;cgr_cost=0.01;cgr_maxusage=60000000000"; recived["Contact"] != expected { t.Errorf("Expected %q, received: %q", expected, recived["Contact"]) } + + time.Sleep(time.Second) + buffer = make([]byte, bufferSize) + if _, err = saConn.Read(buffer); err != nil { + t.Fatal(err) + } + if recived, err = sipingo.NewMessage(string(buffer)); err != nil { + t.Fatal(err) + } + + if expected := "SIP/2.0 302 Moved Temporarily"; recived["Request"] != expected { + t.Errorf("Expected %q, received: %q", expected, recived["Request"]) + } + if expected := "\"1002\" ;q=0.7; expires=3600;cgr_cost=0.3;cgr_maxusage=60000000000,\"1002\" ;q=0.2;cgr_cost=0.6;cgr_maxusage=60000000000,\"1002\" ;q=0.1;cgr_cost=0.01;cgr_maxusage=60000000000"; recived["Contact"] != expected { + t.Errorf("Expected %q, received: %q", expected, recived["Contact"]) + } + + if _, err = saConn.Write([]byte(ack)); err != nil { + t.Fatal(err) + } + buffer = make([]byte, bufferSize) + saConn.SetDeadline(time.Now().Add(time.Second)) + if _, err = saConn.Read(buffer); err == nil { + t.Error("Expected error received nil") + } else if nerr, ok := err.(net.Error); !ok { + t.Errorf("Expected net.Error received:%v", err) + } else if !nerr.Timeout() { + t.Errorf("Expected a timeout error received:%v", err) + } } diff --git a/config/config_defaults.go b/config/config_defaults.go index 48474fd2f..7819643a8 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -941,6 +941,13 @@ const CGRATES_CFG_JSON = ` "listen_net": "udp", // network to listen on "sessions_conns": ["*internal"], "timezone": "", // timezone of the events if not specified + "ack_interval": "1s", // the duration to wait to receive an ACK before resending the reply + "templates":{ // default message templates + "*err": [ + {"tag": "Request", "path": "*rep.Request", "type": "*constant", + "value": "SIP/2.0 500 Internal Server Error", "mandatory": true}, + ], + }, "request_processors": [ // request processors to be applied to SIP messages ], }, diff --git a/config/libconfig_json.go b/config/libconfig_json.go index cf4b815ea..92906c256 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -627,5 +627,7 @@ type SIPAgentJsonCfg struct { Listen_net *string Sessions_conns *[]string Timezone *string + Ack_interval *string + Templates map[string][]*FcTemplateJsonCfg Request_processors *[]*ReqProcessorJsnCfg } diff --git a/config/sipagentcfg.go b/config/sipagentcfg.go index f2bf50ce4..e7a2e1538 100644 --- a/config/sipagentcfg.go +++ b/config/sipagentcfg.go @@ -20,6 +20,7 @@ package config import ( "strings" + "time" "github.com/cgrates/cgrates/utils" ) @@ -30,6 +31,8 @@ type SIPAgentCfg struct { ListenNet string // udp or tcp SessionSConns []string Timezone string + ACKInterval time.Duration // timeout replies if not reaching back + Templates map[string][]*FCTemplate RequestProcessors []*RequestProcessor } @@ -60,6 +63,21 @@ func (da *SIPAgentCfg) loadFromJsonCfg(jsnCfg *SIPAgentJsonCfg, sep string) (err } } } + if jsnCfg.Ack_interval != nil { + if da.ACKInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Ack_interval); err != nil { + return err + } + } + if jsnCfg.Templates != nil { + if da.Templates == nil { + da.Templates = make(map[string][]*FCTemplate) + } + for k, jsnTpls := range jsnCfg.Templates { + if da.Templates[k], err = FCTemplatesFromFCTemplatesJsonCfg(jsnTpls, sep); err != nil { + return + } + } + } if jsnCfg.Request_processors != nil { for _, reqProcJsn := range *jsnCfg.Request_processors { rp := new(RequestProcessor) diff --git a/services/sipagent.go b/services/sipagent.go index ba21d0708..328fc7605 100644 --- a/services/sipagent.go +++ b/services/sipagent.go @@ -65,7 +65,12 @@ func (sip *SIPAgent) Start() (err error) { sip.Lock() defer sip.Unlock() sip.oldListen = sip.cfg.SIPAgentCfg().Listen - sip.sip = agents.NewSIPAgent(sip.connMgr, sip.cfg, filterS) + sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, filterS) + if err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: %s!", + utils.SIPAgent, err)) + return + } go func() { if err = sip.sip.ListenAndServe(); err != nil { utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) From 62d3cdf6e1c583f648b960020d8c5e8008beafdd Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 25 Jun 2020 12:52:43 +0300 Subject: [PATCH 3/4] Updated rates service integration tests --- services/rates_it_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/services/rates_it_test.go b/services/rates_it_test.go index c16d22ffd..f14dfcd1e 100644 --- a/services/rates_it_test.go +++ b/services/rates_it_test.go @@ -46,6 +46,9 @@ func TestRateSReload(t *testing.T) { srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) db := NewDataDBService(cfg, nil) chS := engine.NewCacheS(cfg, nil) + close(chS.GetPrecacheChannel(utils.CacheRateProfiles)) + close(chS.GetPrecacheChannel(utils.CacheRateProfilesFilterIndexes)) + close(chS.GetPrecacheChannel(utils.CacheRateFilterIndexes)) rS := NewRateService(cfg, chS, filterSChan, db, server, engineShutdown, make(chan rpcclient.ClientConnector, 1)) srvMngr.AddServices(rS, NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db) From 34a3a53de4b025bb7ef16fcc60f0ddc0d656bd6f Mon Sep 17 00:00:00 2001 From: Trial97 Date: Thu, 25 Jun 2020 13:19:20 +0300 Subject: [PATCH 4/4] Added integration test for ApierS service without RALs enabled --- config/configsanity.go | 2 +- services/apiers_it_test.go | 112 +++++++++++++++++++++++++++++++++++++ services/apierv1.go | 2 +- services/apierv2.go | 2 +- services/stordb.go | 2 +- 5 files changed, 116 insertions(+), 4 deletions(-) create mode 100644 services/apiers_it_test.go diff --git a/config/configsanity.go b/config/configsanity.go index 32852dcf9..10cbf15e3 100644 --- a/config/configsanity.go +++ b/config/configsanity.go @@ -678,7 +678,7 @@ func (cfg *CGRConfig) checkConfigSanity() error { } } for _, connID := range cfg.filterSCfg.ApierSConns { - if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.ralsCfg.Enabled { + if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.apier.Enabled { return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ApierS, utils.FilterS) } if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) { diff --git a/services/apiers_it_test.go b/services/apiers_it_test.go new file mode 100644 index 000000000..590a481b3 --- /dev/null +++ b/services/apiers_it_test.go @@ -0,0 +1,112 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ +package services + +import ( + "path" + "testing" + "time" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +func TestApiersReload(t *testing.T) { + cfg, err := config.NewDefaultCGRConfig() + if err != nil { + t.Fatal(err) + } + utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID) + utils.Logger.SetLogLevel(7) + filterSChan := make(chan *engine.FilterS, 1) + filterSChan <- nil + engineShutdown := make(chan bool, 1) + chS := engine.NewCacheS(cfg, nil) + close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles)) + close(chS.GetPrecacheChannel(utils.CacheThresholds)) + close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes)) + + cfg.ThresholdSCfg().Enabled = true + cfg.SchedulerCfg().Enabled = true + server := utils.NewServer() + srvMngr := servmanager.NewServiceManager(cfg, engineShutdown) + db := NewDataDBService(cfg, nil) + cfg.StorDbCfg().Type = utils.INTERNAL + stordb := NewStorDBService(cfg) + schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil) + tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1)) + + apiSv1 := NewAPIerSv1Service(cfg, db, stordb, filterSChan, server, schS, new(ResponderService), + make(chan rpcclient.ClientConnector, 1), nil) + + apiSv2 := NewAPIerSv2Service(apiSv1, cfg, server, make(chan rpcclient.ClientConnector, 1)) + srvMngr.AddServices(apiSv1, apiSv2, schS, tS, + NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db, stordb) + if err = srvMngr.StartServices(); err != nil { + t.Error(err) + } + if apiSv1.IsRunning() { + t.Errorf("Expected service to be down") + } + if apiSv2.IsRunning() { + t.Errorf("Expected service to be down") + } + if db.IsRunning() { + t.Errorf("Expected service to be down") + } + if stordb.IsRunning() { + t.Errorf("Expected service to be down") + } + var reply string + if err := cfg.V1ReloadConfigFromPath(&config.ConfigReloadWithArgDispatcher{ + Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"), + Section: config.ApierS, + }, &reply); err != nil { + t.Error(err) + } else if reply != utils.OK { + t.Errorf("Expecting OK ,received %s", reply) + } + time.Sleep(10 * time.Millisecond) //need to switch to gorutine + if !apiSv1.IsRunning() { + t.Errorf("Expected service to be running") + } + if !apiSv2.IsRunning() { + t.Errorf("Expected service to be running") + } + if !db.IsRunning() { + t.Errorf("Expected service to be running") + } + if !stordb.IsRunning() { + t.Errorf("Expected service to be running") + } + cfg.ApierCfg().Enabled = false + cfg.GetReloadChan(config.ApierS) <- struct{}{} + time.Sleep(10 * time.Millisecond) + if apiSv1.IsRunning() { + t.Errorf("Expected service to be down") + } + if apiSv2.IsRunning() { + t.Errorf("Expected service to be down") + } + engineShutdown <- true +} diff --git a/services/apierv1.go b/services/apierv1.go index 197612830..91a97da8d 100644 --- a/services/apierv1.go +++ b/services/apierv1.go @@ -167,7 +167,7 @@ func (apiService *APIerSv1Service) GetAPIerSv1() *v1.APIerSv1 { // ShouldRun returns if the service should be running func (apiService *APIerSv1Service) ShouldRun() bool { - return apiService.cfg.RalsCfg().Enabled + return apiService.cfg.ApierCfg().Enabled } // GetDMChan returns the DataManager chanel diff --git a/services/apierv2.go b/services/apierv2.go index d0faf254d..6faded50e 100644 --- a/services/apierv2.go +++ b/services/apierv2.go @@ -109,5 +109,5 @@ func (api *APIerSv2Service) ServiceName() string { // ShouldRun returns if the service should be running func (api *APIerSv2Service) ShouldRun() bool { - return api.cfg.RalsCfg().Enabled + return api.cfg.ApierCfg().Enabled } diff --git a/services/stordb.go b/services/stordb.go index a08f6f2fb..6d1aeab59 100644 --- a/services/stordb.go +++ b/services/stordb.go @@ -160,7 +160,7 @@ func (db *StorDBService) ServiceName() string { // ShouldRun returns if the service should be running func (db *StorDBService) ShouldRun() bool { - return db.cfg.RalsCfg().Enabled || db.cfg.CdrsCfg().Enabled + return db.cfg.RalsCfg().Enabled || db.cfg.CdrsCfg().Enabled || db.cfg.ApierCfg().Enabled } // RegisterSyncChan used by dependent subsystems to register a chanel to reload only the storDB(thread safe)