mirror of
https://github.com/cgrates/cgrates.git
synced 2026-02-12 18:46:24 +05:00
Merge pull request #2242 from Trial97/master
Updated SIPAgent to support ACK
This commit is contained in:
@@ -23,6 +23,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/sipingo"
|
||||
)
|
||||
@@ -47,3 +48,30 @@ func updateSIPMsgFromNavMap(m sipingo.Message, navMp *utils.OrderedNavigableMap)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func sipErr(m utils.DataProvider, sipMessage sipingo.Message,
|
||||
reqVars utils.NavigableMap2,
|
||||
tpl []*config.FCTemplate, tnt, tmz string,
|
||||
filterS *engine.FilterS) (a sipingo.Message, err error) {
|
||||
aReq := NewAgentRequest(
|
||||
m, reqVars,
|
||||
nil, nil, nil, nil,
|
||||
tnt, tmz, filterS, nil, nil)
|
||||
if err = aReq.SetFields(tpl); err != nil {
|
||||
return
|
||||
}
|
||||
if err = updateSIPMsgFromNavMap(sipMessage, aReq.Reply); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s encoding out %s",
|
||||
utils.SIPAgent, err.Error(), utils.ToJSON(aReq.Reply)))
|
||||
return
|
||||
}
|
||||
sipMessage.PrepareReply()
|
||||
return sipMessage, nil
|
||||
}
|
||||
|
||||
func bareSipErr(m sipingo.Message, err string) sipingo.Message {
|
||||
m[requestHeader] = err
|
||||
m.PrepareReply()
|
||||
return m
|
||||
}
|
||||
|
||||
@@ -32,17 +32,41 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
bufferSize = 5000
|
||||
bufferSize = 5000
|
||||
ackMethod = "ACK"
|
||||
inviteMethod = "INVITE"
|
||||
requestHeader = "Request"
|
||||
callIDHeader = "Call-ID"
|
||||
fromHeader = "From"
|
||||
sipServerErr = "SIP/2.0 500 Internal Server Error"
|
||||
userAgentHeader = "User-Agent"
|
||||
method = "Method"
|
||||
)
|
||||
|
||||
// NewSIPAgent will construct a SIPAgent
|
||||
func NewSIPAgent(connMgr *engine.ConnManager, cfg *config.CGRConfig,
|
||||
filterS *engine.FilterS) *SIPAgent {
|
||||
return &SIPAgent{
|
||||
filterS *engine.FilterS) (sa *SIPAgent, err error) {
|
||||
sa = &SIPAgent{
|
||||
connMgr: connMgr,
|
||||
filterS: filterS,
|
||||
cfg: cfg,
|
||||
ackMap: make(map[string]chan struct{}),
|
||||
}
|
||||
msgTemplates := sa.cfg.SIPAgentCfg().Templates
|
||||
// Inflate *template field types
|
||||
for _, procsr := range sa.cfg.SIPAgentCfg().RequestProcessors {
|
||||
if tpls, err := config.InflateTemplates(procsr.RequestFields, msgTemplates); err != nil {
|
||||
return nil, err
|
||||
} else if tpls != nil {
|
||||
procsr.RequestFields = tpls
|
||||
}
|
||||
if tpls, err := config.InflateTemplates(procsr.ReplyFields, msgTemplates); err != nil {
|
||||
return nil, err
|
||||
} else if tpls != nil {
|
||||
procsr.ReplyFields = tpls
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// SIPAgent is a handler for SIP requests
|
||||
@@ -51,10 +75,17 @@ type SIPAgent struct {
|
||||
filterS *engine.FilterS
|
||||
cfg *config.CGRConfig
|
||||
stopChan chan struct{}
|
||||
ackMap map[string]chan struct{}
|
||||
ackLocks sync.RWMutex
|
||||
}
|
||||
|
||||
// Shutdown will stop the SIPAgent server
|
||||
func (sa *SIPAgent) Shutdown() {
|
||||
sa.ackLocks.Lock()
|
||||
for _, ch := range sa.ackMap { // close all ack
|
||||
close(ch)
|
||||
}
|
||||
sa.ackLocks.Unlock()
|
||||
close(sa.stopChan)
|
||||
}
|
||||
|
||||
@@ -72,6 +103,7 @@ func (sa *SIPAgent) ListenAndServe() (err error) {
|
||||
return fmt.Errorf("Unecepected protocol %s", sa.cfg.SIPAgentCfg().ListenNet)
|
||||
}
|
||||
}
|
||||
|
||||
func (sa *SIPAgent) serveUDP(stop chan struct{}) (err error) {
|
||||
var conn net.PacketConn
|
||||
if conn, err = net.ListenPacket(utils.UDP, sa.cfg.SIPAgentCfg().Listen); err != nil {
|
||||
@@ -110,33 +142,13 @@ func (sa *SIPAgent) serveUDP(stop chan struct{}) (err error) {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(message string, conn net.PacketConn) {
|
||||
var sipMessage sipingo.Message
|
||||
if sipMessage, err = sipingo.NewMessage(message); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s parsing message: %s",
|
||||
utils.SIPAgent, err.Error(), message))
|
||||
wg.Done()
|
||||
go func(message string, saddr net.Addr, conn net.PacketConn) {
|
||||
sa.answerMessage(message, saddr.String(), func(ans []byte) (werr error) {
|
||||
_, werr = conn.WriteTo(ans, saddr)
|
||||
return
|
||||
}
|
||||
if "ACK" == sipMessage.MethodFrom("Request") {
|
||||
return
|
||||
}
|
||||
var sipAnswer sipingo.Message
|
||||
var err error
|
||||
if sipAnswer, err = sa.handleMessage(sipMessage, saddr.String()); err != nil {
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
if _, err = conn.WriteTo([]byte(sipAnswer.String()), saddr); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s sending message: %s",
|
||||
utils.SIPAgent, err.Error(), sipAnswer))
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
}) // do not log the received error because is already logged in function so for now just ignore it
|
||||
wg.Done()
|
||||
}(string(buf[:n]), conn)
|
||||
}(string(buf[:n]), saddr, conn)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -199,35 +211,81 @@ func (sa *SIPAgent) serveTCP(stop chan struct{}) (err error) {
|
||||
continue
|
||||
}
|
||||
|
||||
var sipMessage sipingo.Message // recreate map SIP
|
||||
if sipMessage, err = sipingo.NewMessage(string(buf[:n])); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s parsing message: %s",
|
||||
utils.SIPAgent, err.Error(), string(buf[:n])))
|
||||
wg.Done()
|
||||
continue
|
||||
}
|
||||
if "ACK" == sipMessage.MethodFrom("Request") {
|
||||
continue
|
||||
}
|
||||
var sipAnswer sipingo.Message
|
||||
if sipAnswer, err = sa.handleMessage(sipMessage, conn.LocalAddr().String()); err != nil {
|
||||
continue
|
||||
}
|
||||
if _, err = conn.Write([]byte(sipAnswer.String())); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s sending message: %s",
|
||||
utils.SIPAgent, err.Error(), sipAnswer))
|
||||
continue
|
||||
}
|
||||
sa.answerMessage(string(buf[:n]), conn.LocalAddr().String(), func(ans []byte) (werr error) {
|
||||
_, werr = conn.Write(ans)
|
||||
return
|
||||
}) // do not log the received error because is already logged in function so for now just ignore it
|
||||
}
|
||||
}(conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) (sipAnswer sipingo.Message, err error) {
|
||||
if sipMessage["User-Agent"] != "" {
|
||||
sipMessage["User-Agent"] = utils.CGRateS
|
||||
func (sa *SIPAgent) answerMessage(messageStr, addr string, write func(ans []byte) error) (err error) {
|
||||
var sipMessage sipingo.Message // recreate map SIP
|
||||
if sipMessage, err = sipingo.NewMessage(messageStr); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s parsing message: %s",
|
||||
utils.SIPAgent, err.Error(), messageStr))
|
||||
return // do we need to return error in case we can't parse the message?
|
||||
}
|
||||
key := utils.ConcatenatedKey(sipMessage[fromHeader], sipMessage[callIDHeader])
|
||||
method := sipMessage.MethodFrom(requestHeader)
|
||||
if ackMethod == method {
|
||||
if sa.cfg.SIPAgentCfg().ACKInterval == 0 { // ignore ACK
|
||||
return
|
||||
}
|
||||
sa.ackLocks.Lock()
|
||||
if stopChan, has := sa.ackMap[key]; has {
|
||||
close(stopChan)
|
||||
sa.ackLocks.Unlock()
|
||||
return
|
||||
}
|
||||
sa.ackLocks.Unlock() // log the message if we did not find it in the map
|
||||
}
|
||||
var sipAnswer sipingo.Message
|
||||
if sipAnswer = sa.handleMessage(sipMessage, addr); len(sipAnswer) == 0 {
|
||||
return // do not write the message if we do not have anything to reply
|
||||
}
|
||||
ans := []byte(sipAnswer.String())
|
||||
if err = write(ans); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s sending message: %s",
|
||||
utils.SIPAgent, err.Error(), sipAnswer))
|
||||
return
|
||||
}
|
||||
// because we expext to send codes from 300-699 we wait for the ACK every time
|
||||
if method != inviteMethod || // only invitest need ACK
|
||||
sa.cfg.SIPAgentCfg().ACKInterval == 0 {
|
||||
return // disabled ACK
|
||||
}
|
||||
stopChan := make(chan struct{})
|
||||
sa.ackLocks.Lock()
|
||||
sa.ackMap[key] = stopChan
|
||||
sa.ackLocks.Unlock()
|
||||
go func(stopChan chan struct{}, a []byte) {
|
||||
for {
|
||||
select {
|
||||
case <-time.After(sa.cfg.SIPAgentCfg().ACKInterval):
|
||||
if err = write(ans); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s sending message: %s",
|
||||
utils.SIPAgent, err.Error(), sipAnswer))
|
||||
return
|
||||
}
|
||||
case <-stopChan:
|
||||
sa.ackLocks.Lock()
|
||||
delete(sa.ackMap, key)
|
||||
sa.ackLocks.Unlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
}(stopChan, ans)
|
||||
return
|
||||
}
|
||||
|
||||
func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string) (sipAnswer sipingo.Message) {
|
||||
if sipMessage[userAgentHeader] != "" {
|
||||
sipMessage[userAgentHeader] = utils.CGRateS
|
||||
}
|
||||
sipMessageIface := make(map[string]interface{})
|
||||
for k, v := range sipMessage {
|
||||
@@ -240,8 +298,22 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string)
|
||||
opts := utils.NewOrderedNavigableMap()
|
||||
reqVars := utils.NavigableMap2{
|
||||
utils.RemoteHost: utils.NewNMData(remoteHost),
|
||||
"Method": utils.NewNMData(sipMessage.MethodFrom("Request")),
|
||||
method: utils.NewNMData(sipMessage.MethodFrom(requestHeader)),
|
||||
}
|
||||
// build the negative error answer
|
||||
sErr, err := sipErr(
|
||||
dp, sipMessage.Clone(), reqVars,
|
||||
sa.cfg.SIPAgentCfg().Templates[utils.MetaErr],
|
||||
sa.cfg.GeneralCfg().DefaultTenant,
|
||||
sa.cfg.GeneralCfg().DefaultTimezone,
|
||||
sa.filterS)
|
||||
if err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s building errSIP for message: %s",
|
||||
utils.SIPAgent, err.Error(), sipMessage))
|
||||
return bareSipErr(sipMessage, sipServerErr)
|
||||
}
|
||||
|
||||
for _, reqProcessor := range sa.cfg.SIPAgentCfg().RequestProcessors {
|
||||
agReq := NewAgentRequest(dp, reqVars, &cgrRplyNM, rplyNM,
|
||||
opts, reqProcessor.Tenant, sa.cfg.GeneralCfg().DefaultTenant,
|
||||
@@ -263,11 +335,11 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string)
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if err != nil { // write err message on conection 500 Server Error
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s processing message: %s from %s",
|
||||
utils.SIPAgent, err.Error(), sipMessage, remoteHost))
|
||||
return
|
||||
return sErr
|
||||
}
|
||||
if !processed {
|
||||
utils.Logger.Warning(
|
||||
@@ -275,14 +347,17 @@ func (sa *SIPAgent) handleMessage(sipMessage sipingo.Message, remoteHost string)
|
||||
utils.SIPAgent, sipMessage, remoteHost))
|
||||
return
|
||||
}
|
||||
if rplyNM.Len() == 0 { // if we do not populate the reply with any field we do not send any reply back
|
||||
return
|
||||
}
|
||||
if err = updateSIPMsgFromNavMap(sipMessage, rplyNM); err != nil {
|
||||
utils.Logger.Warning(
|
||||
fmt.Sprintf("<%s> error: %s encoding out %s",
|
||||
utils.SIPAgent, err.Error(), utils.ToJSON(rplyNM)))
|
||||
return
|
||||
return sErr
|
||||
}
|
||||
sipMessage.PrepareReply()
|
||||
return sipMessage, nil
|
||||
return sipMessage
|
||||
}
|
||||
|
||||
// processRequest represents one processor processing the request
|
||||
|
||||
@@ -164,6 +164,7 @@ func testSAitSIPRegister(t *testing.T) {
|
||||
|
||||
func testSAitSIPInvite(t *testing.T) {
|
||||
inviteMessage := "INVITE sip:1002@192.168.58.203 SIP/2.0\r\nCall-ID: 4d4d84b0cc83fc90aca41e295cd8ff43@0:0:0:0:0:0:0:0\r\nCSeq: 2 INVITE\r\nFrom: \"1001\" <sip:1001@192.168.58.203>;tag=99f35805\r\nTo: <sip:1002@192.168.58.203>\r\nMax-Forwards: 70\r\nContact: \"1001\" <sip:1001@192.168.58.201:5060;transport=udp;registering_acc=192_168_58_203>\r\nUser-Agent: Jitsi2.11.20200408Linux\r\nContent-Type: application/sdp\r\nVia: SIP/2.0/UDP 192.168.58.201:5060;branch=z9hG4bK-393139-939e89686023b86822cb942ede452b62\r\nProxy-Authorization: Digest username=\"1001\",realm=\"192.168.58.203\",nonce=\"XruO2167ja8uRODnSv8aXqv+/hqPJiXh\",uri=\"sip:1002@192.168.58.203\",response=\"5b814c709d1541d72ea778599c2e48a4\"\r\nContent-Length: 897\r\n\r\nv=0\r\no=1001-jitsi.org 0 0 IN IP4 192.168.58.201\r\ns=-\r\nc=IN IP4 192.168.58.201\r\nt=0 0\r\nm=audio 5000 RTP/AVP 96 97 98 9 100 102 0 8 103 3 104 101\r\na=rtpmap:96 opus/48000/2\r\na=fmtp:96 usedtx=1\r\na=ptime:20\r\na=rtpmap:97 SILK/24000\r\na=rtpmap:98 SILK/16000\r\na=rtpmap:9 G722/8000\r\na=rtpmap:100 speex/32000\r\na=rtpmap:102 speex/16000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:103 iLBC/8000\r\na=rtpmap:3 GSM/8000\r\na=rtpmap:104 speex/8000\r\na=rtpmap:101 telephone-event/8000\r\na=extmap:1 urn:ietf:params:rtp-hdrext:csrc-audio-level\r\na=extmap:2 urn:ietf:params:rtp-hdrext:ssrc-audio-level\r\na=rtcp-xr:voip-metrics\r\nm=video 5002 RTP/AVP 105 99\r\na=recvonly\r\na=rtpmap:105 h264/90000\r\na=fmtp:105 profile-level-id=42E01f;packetization-mode=1\r\na=imageattr:105 send * recv [x=[1:1920],y=[1:1080]]\r\na=rtpmap:\r\n"
|
||||
ack := "ACK sip:1001@192.168.56.203:6060 SIP/2.0\r\nVia: SIP/2.0/UDP 192.168.56.203;rport;branch=z9hG4bKQeB89BamX86UD\r\nMax-Forwards: 69\r\nFrom: \"1001\" <sip:1001@192.168.58.203>;tag=99f35805\r\nTo: <sip:1001@192.168.56.203:6060>\r\nCall-ID: 4d4d84b0cc83fc90aca41e295cd8ff43@0:0:0:0:0:0:0:0\r\nCSeq: 21984733 ACK\r\nContent-Length: 0\r\n"
|
||||
if saConn == nil {
|
||||
t.Fatal("connection not initialized")
|
||||
}
|
||||
@@ -186,4 +187,33 @@ func testSAitSIPInvite(t *testing.T) {
|
||||
if expected := "\"1002\" <sip:1002@cgrates.org>;q=0.7; expires=3600;cgr_cost=0.3;cgr_maxusage=60000000000,\"1002\" <sip:1002@cgrates.net>;q=0.2;cgr_cost=0.6;cgr_maxusage=60000000000,\"1002\" <sip:1002@cgrates.com>;q=0.1;cgr_cost=0.01;cgr_maxusage=60000000000"; recived["Contact"] != expected {
|
||||
t.Errorf("Expected %q, received: %q", expected, recived["Contact"])
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
buffer = make([]byte, bufferSize)
|
||||
if _, err = saConn.Read(buffer); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if recived, err = sipingo.NewMessage(string(buffer)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if expected := "SIP/2.0 302 Moved Temporarily"; recived["Request"] != expected {
|
||||
t.Errorf("Expected %q, received: %q", expected, recived["Request"])
|
||||
}
|
||||
if expected := "\"1002\" <sip:1002@cgrates.org>;q=0.7; expires=3600;cgr_cost=0.3;cgr_maxusage=60000000000,\"1002\" <sip:1002@cgrates.net>;q=0.2;cgr_cost=0.6;cgr_maxusage=60000000000,\"1002\" <sip:1002@cgrates.com>;q=0.1;cgr_cost=0.01;cgr_maxusage=60000000000"; recived["Contact"] != expected {
|
||||
t.Errorf("Expected %q, received: %q", expected, recived["Contact"])
|
||||
}
|
||||
|
||||
if _, err = saConn.Write([]byte(ack)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
buffer = make([]byte, bufferSize)
|
||||
saConn.SetDeadline(time.Now().Add(time.Second))
|
||||
if _, err = saConn.Read(buffer); err == nil {
|
||||
t.Error("Expected error received nil")
|
||||
} else if nerr, ok := err.(net.Error); !ok {
|
||||
t.Errorf("Expected net.Error received:%v", err)
|
||||
} else if !nerr.Timeout() {
|
||||
t.Errorf("Expected a timeout error received:%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -941,6 +941,13 @@ const CGRATES_CFG_JSON = `
|
||||
"listen_net": "udp", // network to listen on <udp|tcp|tcp-tls>
|
||||
"sessions_conns": ["*internal"],
|
||||
"timezone": "", // timezone of the events if not specified <UTC|Local|$IANA_TZ_DB>
|
||||
"ack_interval": "1s", // the duration to wait to receive an ACK before resending the reply
|
||||
"templates":{ // default message templates
|
||||
"*err": [
|
||||
{"tag": "Request", "path": "*rep.Request", "type": "*constant",
|
||||
"value": "SIP/2.0 500 Internal Server Error", "mandatory": true},
|
||||
],
|
||||
},
|
||||
"request_processors": [ // request processors to be applied to SIP messages
|
||||
],
|
||||
},
|
||||
|
||||
@@ -678,7 +678,7 @@ func (cfg *CGRConfig) checkConfigSanity() error {
|
||||
}
|
||||
}
|
||||
for _, connID := range cfg.filterSCfg.ApierSConns {
|
||||
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.ralsCfg.Enabled {
|
||||
if strings.HasPrefix(connID, utils.MetaInternal) && !cfg.apier.Enabled {
|
||||
return fmt.Errorf("<%s> not enabled but requested by <%s> component.", utils.ApierS, utils.FilterS)
|
||||
}
|
||||
if _, has := cfg.rpcConns[connID]; !has && !strings.HasPrefix(connID, utils.MetaInternal) {
|
||||
|
||||
@@ -627,5 +627,7 @@ type SIPAgentJsonCfg struct {
|
||||
Listen_net *string
|
||||
Sessions_conns *[]string
|
||||
Timezone *string
|
||||
Ack_interval *string
|
||||
Templates map[string][]*FcTemplateJsonCfg
|
||||
Request_processors *[]*ReqProcessorJsnCfg
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ package config
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
)
|
||||
@@ -30,6 +31,8 @@ type SIPAgentCfg struct {
|
||||
ListenNet string // udp or tcp
|
||||
SessionSConns []string
|
||||
Timezone string
|
||||
ACKInterval time.Duration // timeout replies if not reaching back
|
||||
Templates map[string][]*FCTemplate
|
||||
RequestProcessors []*RequestProcessor
|
||||
}
|
||||
|
||||
@@ -60,6 +63,21 @@ func (da *SIPAgentCfg) loadFromJsonCfg(jsnCfg *SIPAgentJsonCfg, sep string) (err
|
||||
}
|
||||
}
|
||||
}
|
||||
if jsnCfg.Ack_interval != nil {
|
||||
if da.ACKInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Ack_interval); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if jsnCfg.Templates != nil {
|
||||
if da.Templates == nil {
|
||||
da.Templates = make(map[string][]*FCTemplate)
|
||||
}
|
||||
for k, jsnTpls := range jsnCfg.Templates {
|
||||
if da.Templates[k], err = FCTemplatesFromFCTemplatesJsonCfg(jsnTpls, sep); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
if jsnCfg.Request_processors != nil {
|
||||
for _, reqProcJsn := range *jsnCfg.Request_processors {
|
||||
rp := new(RequestProcessor)
|
||||
|
||||
112
services/apiers_it_test.go
Normal file
112
services/apiers_it_test.go
Normal file
@@ -0,0 +1,112 @@
|
||||
// +build integration
|
||||
|
||||
/*
|
||||
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
|
||||
Copyright (C) ITsysCOM GmbH
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
package services
|
||||
|
||||
import (
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cgrates/cgrates/config"
|
||||
"github.com/cgrates/cgrates/engine"
|
||||
"github.com/cgrates/cgrates/servmanager"
|
||||
"github.com/cgrates/cgrates/utils"
|
||||
"github.com/cgrates/rpcclient"
|
||||
)
|
||||
|
||||
func TestApiersReload(t *testing.T) {
|
||||
cfg, err := config.NewDefaultCGRConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
utils.Newlogger(utils.MetaSysLog, cfg.GeneralCfg().NodeID)
|
||||
utils.Logger.SetLogLevel(7)
|
||||
filterSChan := make(chan *engine.FilterS, 1)
|
||||
filterSChan <- nil
|
||||
engineShutdown := make(chan bool, 1)
|
||||
chS := engine.NewCacheS(cfg, nil)
|
||||
close(chS.GetPrecacheChannel(utils.CacheThresholdProfiles))
|
||||
close(chS.GetPrecacheChannel(utils.CacheThresholds))
|
||||
close(chS.GetPrecacheChannel(utils.CacheThresholdFilterIndexes))
|
||||
|
||||
cfg.ThresholdSCfg().Enabled = true
|
||||
cfg.SchedulerCfg().Enabled = true
|
||||
server := utils.NewServer()
|
||||
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
|
||||
db := NewDataDBService(cfg, nil)
|
||||
cfg.StorDbCfg().Type = utils.INTERNAL
|
||||
stordb := NewStorDBService(cfg)
|
||||
schS := NewSchedulerService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1), nil)
|
||||
tS := NewThresholdService(cfg, db, chS, filterSChan, server, make(chan rpcclient.ClientConnector, 1))
|
||||
|
||||
apiSv1 := NewAPIerSv1Service(cfg, db, stordb, filterSChan, server, schS, new(ResponderService),
|
||||
make(chan rpcclient.ClientConnector, 1), nil)
|
||||
|
||||
apiSv2 := NewAPIerSv2Service(apiSv1, cfg, server, make(chan rpcclient.ClientConnector, 1))
|
||||
srvMngr.AddServices(apiSv1, apiSv2, schS, tS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db, stordb)
|
||||
if err = srvMngr.StartServices(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if apiSv1.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
if apiSv2.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
if db.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
if stordb.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
var reply string
|
||||
if err := cfg.V1ReloadConfigFromPath(&config.ConfigReloadWithArgDispatcher{
|
||||
Path: path.Join("/usr", "share", "cgrates", "conf", "samples", "tutmongo"),
|
||||
Section: config.ApierS,
|
||||
}, &reply); err != nil {
|
||||
t.Error(err)
|
||||
} else if reply != utils.OK {
|
||||
t.Errorf("Expecting OK ,received %s", reply)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond) //need to switch to gorutine
|
||||
if !apiSv1.IsRunning() {
|
||||
t.Errorf("Expected service to be running")
|
||||
}
|
||||
if !apiSv2.IsRunning() {
|
||||
t.Errorf("Expected service to be running")
|
||||
}
|
||||
if !db.IsRunning() {
|
||||
t.Errorf("Expected service to be running")
|
||||
}
|
||||
if !stordb.IsRunning() {
|
||||
t.Errorf("Expected service to be running")
|
||||
}
|
||||
cfg.ApierCfg().Enabled = false
|
||||
cfg.GetReloadChan(config.ApierS) <- struct{}{}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if apiSv1.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
if apiSv2.IsRunning() {
|
||||
t.Errorf("Expected service to be down")
|
||||
}
|
||||
engineShutdown <- true
|
||||
}
|
||||
@@ -167,7 +167,7 @@ func (apiService *APIerSv1Service) GetAPIerSv1() *v1.APIerSv1 {
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (apiService *APIerSv1Service) ShouldRun() bool {
|
||||
return apiService.cfg.RalsCfg().Enabled
|
||||
return apiService.cfg.ApierCfg().Enabled
|
||||
}
|
||||
|
||||
// GetDMChan returns the DataManager chanel
|
||||
|
||||
@@ -109,5 +109,5 @@ func (api *APIerSv2Service) ServiceName() string {
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (api *APIerSv2Service) ShouldRun() bool {
|
||||
return api.cfg.RalsCfg().Enabled
|
||||
return api.cfg.ApierCfg().Enabled
|
||||
}
|
||||
|
||||
@@ -46,6 +46,9 @@ func TestRateSReload(t *testing.T) {
|
||||
srvMngr := servmanager.NewServiceManager(cfg, engineShutdown)
|
||||
db := NewDataDBService(cfg, nil)
|
||||
chS := engine.NewCacheS(cfg, nil)
|
||||
close(chS.GetPrecacheChannel(utils.CacheRateProfiles))
|
||||
close(chS.GetPrecacheChannel(utils.CacheRateProfilesFilterIndexes))
|
||||
close(chS.GetPrecacheChannel(utils.CacheRateFilterIndexes))
|
||||
rS := NewRateService(cfg, chS, filterSChan, db, server, engineShutdown, make(chan rpcclient.ClientConnector, 1))
|
||||
srvMngr.AddServices(rS,
|
||||
NewLoaderService(cfg, db, filterSChan, server, engineShutdown, make(chan rpcclient.ClientConnector, 1), nil), db)
|
||||
|
||||
@@ -65,7 +65,12 @@ func (sip *SIPAgent) Start() (err error) {
|
||||
sip.Lock()
|
||||
defer sip.Unlock()
|
||||
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
|
||||
sip.sip = agents.NewSIPAgent(sip.connMgr, sip.cfg, filterS)
|
||||
sip.sip, err = agents.NewSIPAgent(sip.connMgr, sip.cfg, filterS)
|
||||
if err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: %s!",
|
||||
utils.SIPAgent, err))
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
if err = sip.sip.ListenAndServe(); err != nil {
|
||||
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error()))
|
||||
|
||||
@@ -160,7 +160,7 @@ func (db *StorDBService) ServiceName() string {
|
||||
|
||||
// ShouldRun returns if the service should be running
|
||||
func (db *StorDBService) ShouldRun() bool {
|
||||
return db.cfg.RalsCfg().Enabled || db.cfg.CdrsCfg().Enabled
|
||||
return db.cfg.RalsCfg().Enabled || db.cfg.CdrsCfg().Enabled || db.cfg.ApierCfg().Enabled
|
||||
}
|
||||
|
||||
// RegisterSyncChan used by dependent subsystems to register a chanel to reload only the storDB(thread safe)
|
||||
|
||||
Reference in New Issue
Block a user