Added SIPAgent

This commit is contained in:
Trial97
2020-05-13 11:35:19 +03:00
committed by Dan Christian Bogos
parent 035de962e9
commit 5bbd39a3b2
24 changed files with 1383 additions and 26 deletions

77
agents/libsip.go Normal file
View File

@@ -0,0 +1,77 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"context"
"fmt"
"net"
"strings"
"syscall"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/sipd"
"golang.org/x/sys/unix"
)
// updateDiamMsgFromNavMap will update the diameter message with items from navigable map
func updateSIPMsgFromNavMap(m sipd.Message, navMp *utils.OrderedNavigableMap) (err error) {
// write reply into message
for el := navMp.GetFirstElement(); el != nil; el = el.Next() {
val := el.Value
var nmIt utils.NMInterface
if nmIt, err = navMp.Field(val); err != nil {
return
}
itm, isNMItem := nmIt.(*config.NMItem)
if !isNMItem {
return fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(val))
}
if itm == nil {
continue // all attributes, not writable to diameter packet
}
m[strings.Join(itm.Path, utils.NestingSep)] = utils.IfaceAsString(itm.Data)
}
return
}
func reuseportControl(network, address string, c syscall.RawConn) error {
var opErr error
err := c.Control(func(fd uintptr) {
opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
})
if err != nil {
return err
}
return opErr
}
func listenTCPWithReuseablePort(network, addr string) (net.Listener, error) {
var lc net.ListenConfig
lc.Control = reuseportControl
return lc.Listen(context.Background(), network, addr)
}
func listenUDPWithReuseablePort(network, addr string) (net.PacketConn, error) {
var lc net.ListenConfig
lc.Control = reuseportControl
return lc.ListenPacket(context.Background(), network, addr)
}

399
agents/sipagent.go Normal file
View File

@@ -0,0 +1,399 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"fmt"
"net"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/sessions"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/sipd"
)
const (
bufferSize = 5000
)
// NewSIPAgent will construct a SIPAgent
func NewSIPAgent(connMgr *engine.ConnManager, cfg *config.CGRConfig,
filterS *engine.FilterS) *SIPAgent {
return &SIPAgent{
connMgr: connMgr,
filterS: filterS,
cfg: cfg,
}
}
// SIPAgent is a handler for HTTP requests
type SIPAgent struct {
connMgr *engine.ConnManager
filterS *engine.FilterS
cfg *config.CGRConfig
}
// ListenAndServe will run the DNS handler doing also the connection to listen address
func (sa *SIPAgent) ListenAndServe() (err error) {
utils.Logger.Info(fmt.Sprintf("<%s> start listening on <%s:%s>",
utils.SIPAgent, sa.cfg.SIPAgentCfg().ListenNet, sa.cfg.SIPAgentCfg().Listen))
switch sa.cfg.SIPAgentCfg().ListenNet {
case utils.TCP:
sa.serveTCP()
case utils.UDP:
sa.serveUDP()
default:
return fmt.Errorf("Unecepected protocol %s", sa.cfg.SIPAgentCfg().ListenNet)
}
return nil //da.server.ListenAndServe()
}
func (sa *SIPAgent) serveUDP() {
conn, err := listenUDPWithReuseablePort(utils.UDP, sa.cfg.SIPAgentCfg().Listen)
if err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> error: %s unable to listen to: %s",
utils.SIPAgent, err.Error(), sa.cfg.SIPAgentCfg().Listen))
return
}
defer conn.Close()
buf := make([]byte, bufferSize)
for {
n, saddr, err := conn.ReadFrom(buf)
if err != nil {
continue
}
// echo response
if n < 50 {
conn.WriteTo(buf[:n], saddr)
continue
}
sipMessage := make(sipd.Message) // recreate map SIP
sipMessage.Parse(string(buf[:n]))
var sipAnswer sipd.Message
if sipAnswer, err = sa.handleMessage(sipMessage, saddr.String()); err != nil {
continue
}
if _, err = conn.WriteTo([]byte(sipAnswer.String()), saddr); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s sending message: %s",
utils.SIPAgent, err.Error(), sipAnswer))
continue
}
}
}
func (sa *SIPAgent) serveTCP() {
l, err := listenTCPWithReuseablePort(utils.TCP, sa.cfg.SIPAgentCfg().Listen)
if err != nil {
utils.Logger.Err(
fmt.Sprintf("<%s> error: %s unable to listen to: %s",
utils.SIPAgent, err.Error(), sa.cfg.SIPAgentCfg().Listen))
return
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
continue
}
go func(conn net.Conn) {
buf := make([]byte, bufferSize)
for {
n, err := conn.Read(buf)
if err != nil {
continue
}
// echo response
if n < 50 {
conn.Write(buf[:n])
continue
}
sipMessage := make(sipd.Message) // recreate map SIP
sipMessage.Parse(string(buf[:n]))
var sipAnswer sipd.Message
if sipAnswer, err = sa.handleMessage(sipMessage, conn.LocalAddr().String()); err != nil {
continue
}
if _, err = conn.Write([]byte(sipAnswer.String())); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s sending message: %s",
utils.SIPAgent, err.Error(), sipAnswer))
continue
}
}
}(conn)
}
}
func (sa *SIPAgent) handleMessage(sipMessage sipd.Message, remoteHost string) (sipAnswer sipd.Message, err error) {
if sipMessage["User-Agent"] != "" {
sipMessage["User-Agent"] = utils.CGRateS
}
sipMessageIface := make(map[string]interface{})
for k, v := range sipMessage {
sipMessageIface[k] = v
}
dp := config.NewNavigableMap(sipMessageIface)
var processed bool
cgrRplyNM := utils.NavigableMap2{}
rplyNM := utils.NewOrderedNavigableMap()
opts := utils.NewOrderedNavigableMap()
reqVars := utils.NavigableMap2{utils.RemoteHost: utils.NewNMData(remoteHost)}
for _, reqProcessor := range sa.cfg.SIPAgentCfg().RequestProcessors {
agReq := NewAgentRequest(dp, reqVars, &cgrRplyNM, rplyNM,
opts, reqProcessor.Tenant, sa.cfg.GeneralCfg().DefaultTenant,
utils.FirstNonEmpty(reqProcessor.Timezone,
config.CgrConfig().GeneralCfg().DefaultTimezone),
sa.filterS, nil, nil)
if processed, err = sa.processRequest(reqProcessor, agReq); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing request: %s",
utils.SIPAgent, err.Error(), utils.ToJSON(agReq)))
continue
}
if !processed {
continue
}
if processed && !reqProcessor.Flags.GetBool(utils.MetaContinue) {
break
}
}
if err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s processing message: %s from %s",
utils.SIPAgent, err.Error(), sipMessage, remoteHost))
return
}
if !processed {
utils.Logger.Warning(
fmt.Sprintf("<%s> no request processor enabled, ignoring message %s from %s",
utils.SIPAgent, sipMessage, remoteHost))
return
}
if err = updateSIPMsgFromNavMap(sipMessage, rplyNM); err != nil {
utils.Logger.Warning(
fmt.Sprintf("<%s> error: %s encoding out %s",
utils.SIPAgent, err.Error(), utils.ToJSON(rplyNM)))
return
}
sipMessage.PrepareReply()
return sipMessage, nil
}
// processRequest represents one processor processing the request
func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor,
agReq *AgentRequest) (processed bool, err error) {
if pass, err := sa.filterS.Pass(agReq.Tenant,
reqProcessor.Filters, agReq); err != nil || !pass {
return pass, err
}
if err = agReq.SetFields(reqProcessor.RequestFields); err != nil {
return
}
cgrEv := config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep)
opts := config.NMAsMapInterface(agReq.Opts, utils.NestingSep)
var reqType string
for _, typ := range []string{
utils.MetaDryRun, /* utils.MetaAuthorize,
utils.MetaInitiate, utils.MetaUpdate,
utils.MetaTerminate, utils.MetaMessage,
utils.MetaCDRs, */utils.MetaEvent, utils.META_NONE} {
if reqProcessor.Flags.HasKey(typ) { // request type is identified through flags
reqType = typ
break
}
}
var cgrArgs utils.ExtractedArgs
if cgrArgs, err = utils.ExtractArgsFromOpts(opts, reqProcessor.Flags.HasKey(utils.MetaDispatchers),
reqType == utils.MetaAuthorize || reqType == utils.MetaMessage || reqType == utils.MetaEvent); err != nil {
utils.Logger.Warning(fmt.Sprintf("<%s> args extraction failed because <%s>",
utils.SIPAgent, err.Error()))
err = nil // reset the error and continue the processing
}
if reqProcessor.Flags.HasKey(utils.MetaLog) {
utils.Logger.Info(
fmt.Sprintf("<%s> LOG, processorID: %s, http message: %s",
utils.SIPAgent, reqProcessor.ID, agReq.Request.String()))
}
switch reqType {
default:
return false, fmt.Errorf("unknown request type: <%s>", reqType)
case utils.META_NONE: // do nothing on CGRateS side
case utils.MetaDryRun:
utils.Logger.Info(
fmt.Sprintf("<%s> DRY_RUN, processorID: %s, CGREvent: %s",
utils.SIPAgent, reqProcessor.ID, utils.ToJSON(cgrEv)))
// case utils.MetaAuthorize:
// authArgs := sessions.NewV1AuthorizeArgs(
// reqProcessor.Flags.HasKey(utils.MetaAttributes),
// reqProcessor.Flags.ParamsSlice(utils.MetaAttributes),
// reqProcessor.Flags.HasKey(utils.MetaThresholds),
// reqProcessor.Flags.ParamsSlice(utils.MetaThresholds),
// reqProcessor.Flags.HasKey(utils.MetaStats),
// reqProcessor.Flags.ParamsSlice(utils.MetaStats),
// reqProcessor.Flags.HasKey(utils.MetaResources),
// reqProcessor.Flags.HasKey(utils.MetaAccounts),
// reqProcessor.Flags.HasKey(utils.MetaRoutes),
// reqProcessor.Flags.HasKey(utils.MetaRoutesIgnoreErrors),
// reqProcessor.Flags.HasKey(utils.MetaRoutesEventCost),
// cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.RoutePaginator,
// reqProcessor.Flags.HasKey(utils.MetaFD),
// opts,
// )
// rply := new(sessions.V1AuthorizeReply)
// err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1AuthorizeEvent,
// authArgs, rply)
// if err = agReq.setCGRReply(rply, err); err != nil {
// return
// }
// case utils.MetaInitiate:
// initArgs := sessions.NewV1InitSessionArgs(
// reqProcessor.Flags.HasKey(utils.MetaAttributes),
// reqProcessor.Flags.ParamsSlice(utils.MetaAttributes),
// reqProcessor.Flags.HasKey(utils.MetaThresholds),
// reqProcessor.Flags.ParamsSlice(utils.MetaThresholds),
// reqProcessor.Flags.HasKey(utils.MetaStats),
// reqProcessor.Flags.ParamsSlice(utils.MetaStats),
// reqProcessor.Flags.HasKey(utils.MetaResources),
// reqProcessor.Flags.HasKey(utils.MetaAccounts),
// cgrEv, cgrArgs.ArgDispatcher,
// reqProcessor.Flags.HasKey(utils.MetaFD),
// opts)
// rply := new(sessions.V1InitSessionReply)
// err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1InitiateSession,
// initArgs, rply)
// if err = agReq.setCGRReply(rply, err); err != nil {
// return
// }
// case utils.MetaUpdate:
// updateArgs := sessions.NewV1UpdateSessionArgs(
// reqProcessor.Flags.HasKey(utils.MetaAttributes),
// reqProcessor.Flags.ParamsSlice(utils.MetaAttributes),
// reqProcessor.Flags.HasKey(utils.MetaAccounts),
// cgrEv, cgrArgs.ArgDispatcher,
// reqProcessor.Flags.HasKey(utils.MetaFD),
// opts)
// rply := new(sessions.V1UpdateSessionReply)
// err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1UpdateSession,
// updateArgs, rply)
// if err = agReq.setCGRReply(rply, err); err != nil {
// return
// }
// case utils.MetaTerminate:
// terminateArgs := sessions.NewV1TerminateSessionArgs(
// reqProcessor.Flags.HasKey(utils.MetaAccounts),
// reqProcessor.Flags.HasKey(utils.MetaResources),
// reqProcessor.Flags.HasKey(utils.MetaThresholds),
// reqProcessor.Flags.ParamsSlice(utils.MetaThresholds),
// reqProcessor.Flags.HasKey(utils.MetaStats),
// reqProcessor.Flags.ParamsSlice(utils.MetaStats),
// cgrEv, cgrArgs.ArgDispatcher,
// reqProcessor.Flags.HasKey(utils.MetaFD),
// opts)
// rply := utils.StringPointer("")
// err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1TerminateSession,
// terminateArgs, rply)
// if err = agReq.setCGRReply(nil, err); err != nil {
// return
// }
// case utils.MetaMessage:
// evArgs := sessions.NewV1ProcessMessageArgs(
// reqProcessor.Flags.HasKey(utils.MetaAttributes),
// reqProcessor.Flags.ParamsSlice(utils.MetaAttributes),
// reqProcessor.Flags.HasKey(utils.MetaThresholds),
// reqProcessor.Flags.ParamsSlice(utils.MetaThresholds),
// reqProcessor.Flags.HasKey(utils.MetaStats),
// reqProcessor.Flags.ParamsSlice(utils.MetaStats),
// reqProcessor.Flags.HasKey(utils.MetaResources),
// reqProcessor.Flags.HasKey(utils.MetaAccounts),
// reqProcessor.Flags.HasKey(utils.MetaRoutes),
// reqProcessor.Flags.HasKey(utils.MetaRoutesIgnoreErrors),
// reqProcessor.Flags.HasKey(utils.MetaRoutesEventCost),
// cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.RoutePaginator,
// reqProcessor.Flags.HasKey(utils.MetaFD),
// opts)
// rply := new(sessions.V1ProcessMessageReply)
// err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1ProcessMessage,
// evArgs, rply)
// if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
// cgrEv.Event[utils.Usage] = 0 // avoid further debits
// } else if evArgs.Debit {
// cgrEv.Event[utils.Usage] = rply.MaxUsage // make sure the CDR reflects the debit
// }
// if err = agReq.setCGRReply(nil, err); err != nil {
// return
// }
case utils.MetaEvent:
evArgs := &sessions.V1ProcessEventArgs{
Flags: reqProcessor.Flags.SliceFlags(),
CGREvent: cgrEv,
ArgDispatcher: cgrArgs.ArgDispatcher,
Paginator: *cgrArgs.RoutePaginator,
Opts: opts,
}
needMaxUsage := reqProcessor.Flags.HasKey(utils.MetaAuth) ||
reqProcessor.Flags.HasKey(utils.MetaInit) ||
reqProcessor.Flags.HasKey(utils.MetaUpdate)
rply := new(sessions.V1ProcessEventReply)
err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1ProcessEvent,
evArgs, rply)
if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) {
cgrEv.Event[utils.Usage] = 0 // avoid further debits
} else if needMaxUsage {
cgrEv.Event[utils.Usage] = rply.MaxUsage // make sure the CDR reflects the debit
}
if err = agReq.setCGRReply(rply, err); err != nil {
return
}
// case utils.MetaCDRs: // allow CDR processing
}
// separate request so we can capture the Terminate/Event also here
// if reqProcessor.Flags.HasKey(utils.MetaCDRs) &&
// !reqProcessor.Flags.HasKey(utils.MetaDryRun) {
// rplyCDRs := utils.StringPointer("")
// if err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1ProcessCDR,
// &utils.CGREventWithArgDispatcher{CGREvent: cgrEv,
// ArgDispatcher: cgrArgs.ArgDispatcher},
// rplyCDRs); err != nil {
// agReq.CGRReply.Set(utils.PathItems{{Field: utils.Error}}, utils.NewNMData(err.Error()))
// }
// }
if err := agReq.SetFields(reqProcessor.ReplyFields); err != nil {
return false, err
}
if reqProcessor.Flags.HasKey(utils.MetaLog) {
utils.Logger.Info(
fmt.Sprintf("<%s> LOG, HTTP reply: %s",
utils.SIPAgent, agReq.Reply))
}
if reqType == utils.MetaDryRun {
utils.Logger.Info(
fmt.Sprintf("<%s> DRY_RUN, HTTP reply: %s",
utils.SIPAgent, agReq.Reply))
}
return true, nil
}

216
agents/sipagent_it_test.go Normal file
View File

@@ -0,0 +1,216 @@
// +build integration
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package agents
import (
"net"
"net/rpc"
"path"
"testing"
"time"
v2 "github.com/cgrates/cgrates/apier/v2"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/sipd"
)
var (
saonfigDIR string
saCfgPath string
saCfg *config.CGRConfig
saRPC *rpc.Client
saConn net.Conn
sTestsSIP = []func(t *testing.T){
testSAitInitCfg,
testSAitResetDataDb,
testSAitResetStorDb,
testSAitStartEngine,
testSAitApierRpcConn,
testSAitTPFromFolder,
testSAitSetAttributeProfile,
testSAitSIPRegister,
testSAitSIPInvite,
testSAitStopCgrEngine,
}
)
// Test start here
func TestSAit(t *testing.T) {
// engine.KillEngine(0)
switch *dbType {
case utils.MetaInternal:
saonfigDIR = "sipagent_internal"
case utils.MetaMySQL:
saonfigDIR = "sipagent_mysql"
case utils.MetaMongo:
saonfigDIR = "sipagent_mongo"
case utils.MetaPostgres:
t.SkipNow()
default:
t.Fatal("Unknown Database type")
}
for _, stest := range sTestsSIP {
t.Run(saonfigDIR, stest)
}
}
func testSAitInitCfg(t *testing.T) {
saCfgPath = path.Join(*dataDir, "conf", "samples", saonfigDIR)
// Init config first
var err error
saCfg, err = config.NewCGRConfigFromPath(saCfgPath)
if err != nil {
t.Error(err)
}
saCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush()
config.SetCgrConfig(saCfg)
if isDispatcherActive {
saCfg.ListenCfg().RPCJSONListen = ":6012"
}
}
// Remove data in both rating and accounting db
func testSAitResetDataDb(t *testing.T) {
if err := engine.InitDataDb(saCfg); err != nil {
t.Fatal(err)
}
}
// Wipe out the cdr database
func testSAitResetStorDb(t *testing.T) {
if err := engine.InitStorDb(saCfg); err != nil {
t.Fatal(err)
}
}
// Start CGR Engine
func testSAitStartEngine(t *testing.T) {
if _, err := engine.StartEngine(saCfgPath, *waitRater); err != nil {
t.Fatal(err)
}
}
// Connect rpc client to rater
func testSAitApierRpcConn(t *testing.T) {
var err error
saRPC, err = newRPCClient(saCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed
if err != nil {
t.Fatal(err)
}
if saConn, err = net.Dial(utils.TCP, "127.0.0.1:5060"); err != nil {
t.Fatal(err)
}
}
// Load the tariff plan, creating accounts and their balances
func testSAitTPFromFolder(t *testing.T) {
attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "oldtutorial")}
var loadInst utils.LoadInstance
if err := saRPC.Call(utils.APIerSv2LoadTariffPlanFromFolder, attrs, &loadInst); err != nil {
t.Error(err)
}
if isDispatcherActive {
testRadiusitTPLoadData(t)
}
time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups
}
func testSAitStopCgrEngine(t *testing.T) {
if err := engine.KillEngine(100); err != nil {
t.Error(err)
}
}
func testSAitSetAttributeProfile(t *testing.T) {
attrPrf := &v2.AttributeWithCache{
ExternalAttributeProfile: &engine.ExternalAttributeProfile{
Tenant: "cgrates.org",
ID: "ChangeDestination",
Contexts: []string{utils.ANY},
FilterIDs: []string{"*prefix:~*req.Account:\"1001\""},
Attributes: []*engine.ExternalAttribute{{
Path: utils.MetaReq + utils.NestingSep + "Destination",
Value: "sip:1003@192.168.53.203:5060",
}},
Weight: 20,
},
}
var result string
if err := saRPC.Call(utils.APIerSv2SetAttributeProfile, attrPrf, &result); err != nil {
t.Error(err)
} else if result != utils.OK {
t.Error("Unexpected reply returned", result)
}
}
func testSAitSIPRegister(t *testing.T) {
registerMessage := "REGISTER sip:192.168.58.203 SIP/2.0\r\nCall-ID: d72a4ed6feb4167b5adb208525879db5@0:0:0:0:0:0:0:0\r\nCSeq: 1 REGISTER\r\nFrom: \"1002\" \r\n<sip:1002@192.168.58.203>;tag=d28739b9\r\nTo: \"1002\" <sip:1002@192.168.58.203>\r\nVia: SIP/2.0/UDP 192.168.58.201:5060;branch=z9hG4bK-323131-311ce8716a7bf1f6094859ae516a44eb\r\nMax-Forwards: 70\r\nUser-Agent: Jitsi2.11.20200408Linux\r\nExpires: 600\r\nContact: \"1002\" <sip:1002@192.168.58.201:5060;transport=udp;registering_acc=192_168_58_203>;expires=600\r\nContent-Length: 0\r\n"
if saConn == nil {
t.Fatal("connection not initialized")
}
var err error
if _, err = saConn.Write([]byte(registerMessage)); err != nil {
t.Fatal(err)
}
buffer := make([]byte, bufferSize)
if _, err = saConn.Read(buffer); err != nil {
t.Fatal(err)
}
recived := sipd.Message{}
if err = recived.Parse(string(buffer)); err != nil {
t.Fatal(err)
}
if expected := "SIP/2.0 200 OK"; recived["Request"] != expected {
t.Errorf("Expected %q, received: %q", expected, recived["Request"])
}
}
func testSAitSIPInvite(t *testing.T) {
inviteMessage := "INVITE sip:1002@192.168.58.203 SIP/2.0\r\nCall-ID: 4d4d84b0cc83fc90aca41e295cd8ff43@0:0:0:0:0:0:0:0\r\nCSeq: 2 INVITE\r\nFrom: \"1001\" <sip:1001@192.168.58.203>;tag=99f35805\r\nTo: <sip:1002@192.168.58.203>\r\nMax-Forwards: 70\r\nContact: \"1001\" <sip:1001@192.168.58.201:5060;transport=udp;registering_acc=192_168_58_203>\r\nUser-Agent: Jitsi2.11.20200408Linux\r\nContent-Type: application/sdp\r\nVia: SIP/2.0/UDP 192.168.58.201:5060;branch=z9hG4bK-393139-939e89686023b86822cb942ede452b62\r\nProxy-Authorization: Digest username=\"1001\",realm=\"192.168.58.203\",nonce=\"XruO2167ja8uRODnSv8aXqv+/hqPJiXh\",uri=\"sip:1002@192.168.58.203\",response=\"5b814c709d1541d72ea778599c2e48a4\"\r\nContent-Length: 897\r\n\r\nv=0\r\no=1001-jitsi.org 0 0 IN IP4 192.168.58.201\r\ns=-\r\nc=IN IP4 192.168.58.201\r\nt=0 0\r\nm=audio 5000 RTP/AVP 96 97 98 9 100 102 0 8 103 3 104 101\r\na=rtpmap:96 opus/48000/2\r\na=fmtp:96 usedtx=1\r\na=ptime:20\r\na=rtpmap:97 SILK/24000\r\na=rtpmap:98 SILK/16000\r\na=rtpmap:9 G722/8000\r\na=rtpmap:100 speex/32000\r\na=rtpmap:102 speex/16000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:103 iLBC/8000\r\na=rtpmap:3 GSM/8000\r\na=rtpmap:104 speex/8000\r\na=rtpmap:101 telephone-event/8000\r\na=extmap:1 urn:ietf:params:rtp-hdrext:csrc-audio-level\r\na=extmap:2 urn:ietf:params:rtp-hdrext:ssrc-audio-level\r\na=rtcp-xr:voip-metrics\r\nm=video 5002 RTP/AVP 105 99\r\na=recvonly\r\na=rtpmap:105 h264/90000\r\na=fmtp:105 profile-level-id=42E01f;packetization-mode=1\r\na=imageattr:105 send * recv [x=[1:1920],y=[1:1080]]\r\na=rtpmap:\r\n"
if saConn == nil {
t.Fatal("connection not initialized")
}
var err error
if _, err = saConn.Write([]byte(inviteMessage)); err != nil {
t.Fatal(err)
}
buffer := make([]byte, bufferSize)
if _, err = saConn.Read(buffer); err != nil {
t.Fatal(err)
}
recived := sipd.Message{}
if err = recived.Parse(string(buffer)); err != nil {
t.Fatal(err)
}
if expected := "SIP/2.0 302 Moved Temporarily"; recived["Request"] != expected {
t.Errorf("Expected %q, received: %q", expected, recived["Request"])
}
if expected := "<sip:1003@192.168.53.203:5060>"; recived["Contact"] != expected {
t.Errorf("Expected %q, received: %q", expected, recived["Contact"])
}
}

View File

@@ -529,6 +529,7 @@ func main() {
connManager, server, exitChan, internalEEsChan),
services.NewRateService(cfg, filterSChan,
server, exitChan, internalRateSChan),
services.NewSIPAgent(cfg, filterSChan, exitChan, connManager),
)
srvManager.StartServices()
// Start FilterS

View File

@@ -182,6 +182,7 @@ func NewDefaultCGRConfig() (cfg *CGRConfig, err error) {
cfg.eesCfg = new(EEsCfg)
cfg.eesCfg.Cache = make(map[string]*CacheParamCfg)
cfg.rateSCfg = new(RateSCfg)
cfg.sipAgentCfg = new(SIPAgentCfg)
cfg.ConfigReloads = make(map[string]chan struct{})
cfg.ConfigReloads[utils.CDRE] = make(chan struct{}, 1)
@@ -305,6 +306,7 @@ type CGRConfig struct {
ersCfg *ERsCfg // EventReader config
eesCfg *EEsCfg // EventExporter config
rateSCfg *RateSCfg // RateS config
sipAgentCfg *SIPAgentCfg // SIPAgent config
}
var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes,
@@ -352,7 +354,7 @@ func (cfg *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) {
cfg.loadMailerCfg, cfg.loadSureTaxCfg, cfg.loadDispatcherSCfg,
cfg.loadLoaderCgrCfg, cfg.loadMigratorCgrCfg, cfg.loadTlsCgrCfg,
cfg.loadAnalyzerCgrCfg, cfg.loadApierCfg, cfg.loadErsCfg, cfg.loadEesCfg,
cfg.loadRateCfg} {
cfg.loadRateCfg, cfg.loadSIPAgentCfg} {
if err = loadFunc(jsnCfg); err != nil {
return
}
@@ -752,6 +754,15 @@ func (cfg *CGRConfig) loadRateCfg(jsnCfg *CgrJsonCfg) (err error) {
return cfg.rateSCfg.loadFromJsonCfg(jsnRateCfg)
}
// loadApierCfg loads the Apier section of the configuration
func (cfg *CGRConfig) loadSIPAgentCfg(jsnCfg *CgrJsonCfg) (err error) {
var jsnSIPAgentCfg *SIPAgentJsonCfg
if jsnSIPAgentCfg, err = jsnCfg.SIPAgentJsonCfg(); err != nil {
return
}
return cfg.sipAgentCfg.loadFromJsonCfg(jsnSIPAgentCfg, cfg.generalCfg.RSRSep)
}
// SureTaxCfg use locking to retrieve the configuration, possibility later for runtime reload
func (cfg *CGRConfig) SureTaxCfg() *SureTaxCfg {
cfg.lks[SURETAX_JSON].Lock()
@@ -997,7 +1008,7 @@ func (cfg *CGRConfig) EEsCfg() *EEsCfg {
return cfg.eesCfg
}
// EEsCfg reads the EventExporter configuration
// EEsNoLksCfg reads the EventExporter configuration without locks
func (cfg *CGRConfig) EEsNoLksCfg() *EEsCfg {
return cfg.eesCfg
}
@@ -1008,6 +1019,13 @@ func (cfg *CGRConfig) RateSCfg() *RateSCfg {
return cfg.rateSCfg
}
// SIPAgentCfg reads the Apier configuration
func (cfg *CGRConfig) SIPAgentCfg() *SIPAgentCfg {
cfg.lks[SIPAgentJson].Lock()
defer cfg.lks[SIPAgentJson].Unlock()
return cfg.sipAgentCfg
}
// RPCConns reads the RPCConns configuration
func (cfg *CGRConfig) RPCConns() map[string]*RPCConn {
cfg.lks[RPCConnsJsonName].RLock()
@@ -1103,6 +1121,8 @@ func (cfg *CGRConfig) V1GetConfigSection(args *StringWithArgDispatcher, reply *m
jsonString = utils.ToJSON(cfg.ERsCfg())
case RPCConnsJsonName:
jsonString = utils.ToJSON(cfg.RPCConns())
case SIPAgentJson:
jsonString = utils.ToJSON(cfg.SIPAgentCfg())
default:
return errors.New("Invalid section")
}
@@ -1228,6 +1248,7 @@ func (cfg *CGRConfig) getLoadFunctions() map[string]func(*CgrJsonCfg) error {
ApierS: cfg.loadApierCfg,
RPCConnsJsonName: cfg.loadRPCConns,
RateSJson: cfg.loadRateCfg,
SIPAgentJson: cfg.loadSIPAgentCfg,
}
}
@@ -1493,6 +1514,7 @@ func (cfg *CGRConfig) reloadSections(sections ...string) (err error) {
case AnalyzerCfgJson:
case ApierS:
cfg.rldChans[ApierS] <- struct{}{}
case SIPAgentJson:
}
return
}
@@ -1521,7 +1543,6 @@ func (cfg *CGRConfig) AsMapInterface(separator string) map[string]interface{} {
}
return map[string]interface{}{
utils.CdreProfiles: cdreProfiles,
utils.LoaderCfg: loaderCfg,
utils.HttpAgentCfg: httpAgentCfg,

View File

@@ -893,4 +893,14 @@ const CGRATES_CFG_JSON = `
"enabled": false,
},
"sip_agent": { // SIP Agents, only used for redirections
"enabled": false, // enables the SIP agent: <true|false>
"listen": "127.0.0.1:5060", // address where to listen for SIP requests <x.y.z.y:1234>
"listen_net": "udp", // network to listen on <udp|tcp|tcp-tls>
"sessions_conns": ["*internal"],
"timezone": "", // timezone of the events if not specified <UTC|Local|$IANA_TZ_DB>
"request_processors": [ // request processors to be applied to SIP messages
],
},
}`

View File

@@ -61,15 +61,15 @@ const (
EEsJson = "ees"
RateSJson = "rates"
RPCConnsJsonName = "rpc_conns"
SIPAgentJson = "sip_agent"
)
var (
sortedCfgSections = []string{GENERAL_JSN, RPCConnsJsonName, DATADB_JSN, STORDB_JSN, LISTEN_JSN, TlsCfgJson, HTTP_JSN,
SCHEDULER_JSN, CACHE_JSN, FilterSjsn, RALS_JSN,
CDRS_JSN, CDRE_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN, KamailioAgentJSN,
DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON,
RouteSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson,
AnalyzerCfgJson, ApierS, EEsJson, RateSJson}
sortedCfgSections = []string{GENERAL_JSN, RPCConnsJsonName, DATADB_JSN, STORDB_JSN, LISTEN_JSN, TlsCfgJson, HTTP_JSN, SCHEDULER_JSN,
CACHE_JSN, FilterSjsn, RALS_JSN, CDRS_JSN, CDRE_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN,
KamailioAgentJSN, DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON,
THRESHOLDS_JSON, RouteSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson,
AnalyzerCfgJson, ApierS, EEsJson, RateSJson, SIPAgentJson}
)
// Loads the json config out of io.Reader, eg other sources than file, maybe over http
@@ -519,3 +519,15 @@ func (self CgrJsonCfg) RateCfgJson() (*RateSJsonCfg, error) {
}
return cfg, nil
}
func (self CgrJsonCfg) SIPAgentJsonCfg() (*SIPAgentJsonCfg, error) {
rawCfg, hasKey := self[SIPAgentJson]
if !hasKey {
return nil, nil
}
sipAgnt := new(SIPAgentJsonCfg)
if err := json.Unmarshal(*rawCfg, sipAgnt); err != nil {
return nil, err
}
return sipAgnt, nil
}

View File

@@ -608,3 +608,13 @@ type STIRJsonCfg struct {
type RateSJsonCfg struct {
Enabled *bool
}
// SIPAgentJsonCfg
type SIPAgentJsonCfg struct {
Enabled *bool
Listen *string
Listen_net *string
Sessions_conns *[]string
Timezone *string
Request_processors *[]*ReqProcessorJsnCfg
}

109
config/sipagentcfg.go Normal file
View File

@@ -0,0 +1,109 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package config
import (
"strings"
"github.com/cgrates/cgrates/utils"
)
type SIPAgentCfg struct {
Enabled bool
Listen string
ListenNet string // udp or tcp
SessionSConns []string
Timezone string
RequestProcessors []*RequestProcessor
}
func (da *SIPAgentCfg) loadFromJsonCfg(jsnCfg *SIPAgentJsonCfg, sep string) (err error) {
if jsnCfg == nil {
return nil
}
if jsnCfg.Enabled != nil {
da.Enabled = *jsnCfg.Enabled
}
if jsnCfg.Listen_net != nil {
da.ListenNet = *jsnCfg.Listen_net
}
if jsnCfg.Listen != nil {
da.Listen = *jsnCfg.Listen
}
if jsnCfg.Timezone != nil {
da.Timezone = *jsnCfg.Timezone
}
if jsnCfg.Sessions_conns != nil {
da.SessionSConns = make([]string, len(*jsnCfg.Sessions_conns))
for idx, connID := range *jsnCfg.Sessions_conns {
// if we have the connection internal we change the name so we can have internal rpc for each subsystem
if connID == utils.MetaInternal {
da.SessionSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS)
} else {
da.SessionSConns[idx] = connID
}
}
}
if jsnCfg.Request_processors != nil {
for _, reqProcJsn := range *jsnCfg.Request_processors {
rp := new(RequestProcessor)
var haveID bool
for _, rpSet := range da.RequestProcessors {
if reqProcJsn.ID != nil && rpSet.ID == *reqProcJsn.ID {
rp = rpSet // Will load data into the one set
haveID = true
break
}
}
if err := rp.loadFromJsonCfg(reqProcJsn, sep); err != nil {
return nil
}
if !haveID {
da.RequestProcessors = append(da.RequestProcessors, rp)
}
}
}
return nil
}
func (da *SIPAgentCfg) AsMapInterface(separator string) map[string]interface{} {
requestProcessors := make([]map[string]interface{}, len(da.RequestProcessors))
for i, item := range da.RequestProcessors {
requestProcessors[i] = item.AsMapInterface(separator)
}
sessionSConns := make([]string, len(da.SessionSConns))
for i, item := range da.SessionSConns {
buf := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS)
if item == buf {
sessionSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS, utils.EmptyString)
} else {
sessionSConns[i] = item
}
}
return map[string]interface{}{
utils.EnabledCfg: da.Enabled,
utils.ListenCfg: da.Listen,
utils.ListenNetCfg: da.ListenNet,
utils.SessionSConnsCfg: sessionSConns,
utils.TimezoneCfg: da.Timezone,
utils.RequestProcessorsCfg: requestProcessors,
}
}

View File

@@ -0,0 +1,75 @@
{
// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
// Copyright (C) ITsysCOM GmbH
//
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
"general": {
"log_level": 7, // control the level of messages logged (0-emerg to 7-debug)
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "*internal", // stor database type to use: <mysql|postgres>
},
"stor_db": {
"db_type": "*internal", // stor database type to use: <mysql|postgres>
},
"schedulers": {
"enabled": true,
"cdrs_conns": ["*internal"],
},
"sessions": {
"enabled": true,
"attributes_conns": ["*localhost"],
"rals_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"],
"routes_conns": ["*localhost"],
},
"rals": {
"enabled": true,
},
"cdrs": {
"enabled": true,
"rals_conns": ["*internal"],
},
"chargers": {
"enabled": true,
},
"attributes": {
"enabled": true,
"indexed_selects": false, // enable profile matching exclusively on indexes
},
"routes": {
"enabled": true,
},
"sip_agent": {
"enabled": true,
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}

View File

@@ -0,0 +1,40 @@
{
"sip_agent": {
"listen_net": "tcp", // network to listen on <udp|tcp|tcp-tls>
"request_processors": [
{
"id": "Register",
"filters": ["*prefix:~*req.Request:REGISTER"],
"flags": ["*none"],
"request_fields":[],
"reply_fields":[
{"tag": "Request", "path": "*rep.Request", "type": "*constant",
"value": "SIP/2.0 200 OK"},
{"tag": "Authorization", "path": "*rep.Authorization", "type": "*remove"}
]
},
{
"id": "InviteRedirect",
"filters": ["*prefix:~*req.Request:INVITE"],
"flags": ["*attributes","*event"],
"request_fields":[
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable",
"value": "~*req.From", "mandatory": true},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable",
"value": "~*req.To", "mandatory": true}
],
"reply_fields":[
{"tag": "Request", "path": "*rep.Request", "type": "*constant",
"value": "SIP/2.0 302 Moved Temporarily"},
{"tag": "Contact", "path": "*rep.Contact", "type": "*composed",
"value":"<"},
{"tag": "Contact", "path": "*rep.Contact", "type": "*composed",
"value":"~*cgrep.Attributes.Destination"},
{"tag": "Contact", "path": "*rep.Contact", "type": "*composed",
"value":">"}
]
}
]
}
}

View File

@@ -0,0 +1,79 @@
{
// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
// Copyright (C) ITsysCOM GmbH
//
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
"general": {
"log_level": 7, // control the level of messages logged (0-emerg to 7-debug)
},
"data_db": { // database used to store runtime data (eg: accounts, cdr stats)
"db_type": "*mongo", // stor database type to use: <mysql|postgres>
"db_port": 27017, // the port to reach the stordb
"db_name": "10",
},
"stor_db": {
"db_type": "*mongo", // stor database type to use: <mysql|postgres>
"db_port": 27017, // the port to reach the stordb
"db_name": "cgrates",
},
"schedulers": {
"enabled": true,
"cdrs_conns": ["*internal"],
},
"sessions": {
"enabled": true,
"attributes_conns": ["*localhost"],
"rals_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"],
"routes_conns": ["*localhost"],
},
"rals": {
"enabled": true,
},
"cdrs": {
"enabled": true,
"rals_conns": ["*internal"],
},
"chargers": {
"enabled": true,
},
"attributes": {
"enabled": true,
"indexed_selects": false, // enable profile matching exclusively on indexes
},
"routes": {
"enabled": true,
},
"sip_agent": {
"enabled": true,
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}

View File

@@ -0,0 +1,40 @@
{
"sip_agent": {
"listen_net": "tcp", // network to listen on <udp|tcp|tcp-tls>
"request_processors": [
{
"id": "Register",
"filters": ["*prefix:~*req.Request:REGISTER"],
"flags": ["*none"],
"request_fields":[],
"reply_fields":[
{"tag": "Request", "path": "*rep.Request", "type": "*constant",
"value": "SIP/2.0 200 OK"},
{"tag": "Authorization", "path": "*rep.Authorization", "type": "*remove"}
]
},
{
"id": "InviteRedirect",
"filters": ["*prefix:~*req.Request:INVITE"],
"flags": ["*attributes","*event"],
"request_fields":[
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable",
"value": "~*req.From", "mandatory": true},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable",
"value": "~*req.To", "mandatory": true}
],
"reply_fields":[
{"tag": "Request", "path": "*rep.Request", "type": "*constant",
"value": "SIP/2.0 302 Moved Temporarily"},
{"tag": "Contact", "path": "*rep.Contact", "type": "*composed",
"value":"<"},
{"tag": "Contact", "path": "*rep.Contact", "type": "*composed",
"value":"~*cgrep.Attributes.Destination"},
{"tag": "Contact", "path": "*rep.Contact", "type": "*composed",
"value":">"}
]
}
]
}
}

View File

@@ -0,0 +1,72 @@
{
// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
// Copyright (C) ITsysCOM GmbH
//
// This file contains the default configuration hardcoded into CGRateS.
// This is what you get when you load CGRateS with an empty configuration file.
"general": {
"log_level": 7, // control the level of messages logged (0-emerg to 7-debug)
},
"stor_db": {
"db_password": "CGRateS.org",
},
"schedulers": {
"enabled": true,
"cdrs_conns": ["*internal"],
},
"sessions": {
"enabled": true,
"attributes_conns": ["*localhost"],
"rals_conns": ["*internal"],
"cdrs_conns": ["*internal"],
"chargers_conns": ["*internal"],
"routes_conns": ["*localhost"],
},
"rals": {
"enabled": true,
},
"cdrs": {
"enabled": true,
"rals_conns": ["*internal"],
},
"chargers": {
"enabled": true,
},
"attributes": {
"enabled": true,
"indexed_selects": false, // enable profile matching exclusively on indexes
},
"routes": {
"enabled": true,
},
"sip_agent": {
"enabled": true,
},
"apiers": {
"enabled": true,
"scheduler_conns": ["*internal"],
},
}

View File

@@ -0,0 +1,40 @@
{
"sip_agent": {
"listen_net": "tcp", // network to listen on <udp|tcp|tcp-tls>
"request_processors": [
{
"id": "Register",
"filters": ["*prefix:~*req.Request:REGISTER"],
"flags": ["*none"],
"request_fields":[],
"reply_fields":[
{"tag": "Request", "path": "*rep.Request", "type": "*constant",
"value": "SIP/2.0 200 OK"},
{"tag": "Authorization", "path": "*rep.Authorization", "type": "*remove"}
]
},
{
"id": "InviteRedirect",
"filters": ["*prefix:~*req.Request:INVITE"],
"flags": ["*attributes","*event"],
"request_fields":[
{"tag": "Account", "path": "*cgreq.Account", "type": "*variable",
"value": "~*req.From", "mandatory": true},
{"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable",
"value": "~*req.To", "mandatory": true}
],
"reply_fields":[
{"tag": "Request", "path": "*rep.Request", "type": "*constant",
"value": "SIP/2.0 302 Moved Temporarily"},
{"tag": "Contact", "path": "*rep.Contact", "type": "*composed",
"value":"<"},
{"tag": "Contact", "path": "*rep.Contact", "type": "*composed",
"value":"~*cgrep.Attributes.Destination"},
{"tag": "Contact", "path": "*rep.Contact", "type": "*composed",
"value":">"}
]
}
]
}
}

View File

@@ -94,8 +94,11 @@ func (alS *AttributeService) attributeProfileForEvent(args *AttrArgsProcessEvent
}
attrIDs = aPrflIDs.Slice()
}
evNm := config.NewNavigableMap(nil)
evNm.Set([]string{utils.MetaReq}, args.Event, false, false)
evNm := config.NewNavigableMap(map[string]interface{}{
utils.MetaReq: args.Event,
utils.MetaOpts: args.Opts,
})
for _, apID := range attrIDs {
aPrfl, err := alS.dm.GetAttributeProfile(args.Tenant, apID, true, true, utils.NonTransactional)
if err != nil {

View File

@@ -61,9 +61,9 @@ func MatchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldI
itemIDs = utils.StringMapFromSlice(sliceIDs)
return
}
stringFieldVals := map[string]string{utils.ANY: utils.ANY} // cache here field string values, start with default one
filterIndexTypes := []string{utils.MetaString, utils.MetaPrefix, utils.META_NONE} // the META_NONE is used for all items that do not have filters
for i, fieldIDs := range []*[]string{stringFldIDs, prefixFldIDs, &[]string{utils.ANY}} { // same routine for both string and prefix filter types
stringFieldVals := map[string]string{utils.ANY: utils.ANY} // cache here field string values, start with default one
filterIndexTypes := []string{utils.MetaString, utils.MetaPrefix, utils.META_NONE} // the META_NONE is used for all items that do not have filters
for i, fieldIDs := range []*[]string{stringFldIDs, prefixFldIDs, {utils.ANY}} { // same routine for both string and prefix filter types
if fieldIDs == nil {
fieldIDs = &allFieldIDs
}

View File

@@ -62,20 +62,18 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA
}
// IntRPC is the global variable that is used to comunicate with all the subsystems internally
var IntRPC *RPCClientSet
var IntRPC RPCClientSet
// NewRPCClientSet initilalizates the map of connections
func NewRPCClientSet() (s *RPCClientSet) {
return &RPCClientSet{set: make(map[string]*rpcclient.RPCClient)}
func NewRPCClientSet() (s RPCClientSet) {
return make(RPCClientSet)
}
// RPCClientSet is a RPC ClientConnector for the internal subsystems
type RPCClientSet struct {
set map[string]*rpcclient.RPCClient
}
type RPCClientSet map[string]*rpcclient.RPCClient
// AddInternalRPCClient creates and adds to the set a new rpc client using the provided configuration
func (s *RPCClientSet) AddInternalRPCClient(name string, connChan chan rpcclient.ClientConnector) {
func (s RPCClientSet) AddInternalRPCClient(name string, connChan chan rpcclient.ClientConnector) {
rpc, err := rpcclient.NewRPCClient(utils.EmptyString, utils.EmptyString, false,
utils.EmptyString, utils.EmptyString, utils.EmptyString,
config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects,
@@ -85,25 +83,33 @@ func (s *RPCClientSet) AddInternalRPCClient(name string, connChan chan rpcclient
utils.Logger.Err(fmt.Sprintf("<%s> Error adding %s to the set: %s", utils.InternalRPCSet, name, err.Error()))
return
}
s.set[name] = rpc
s[name] = rpc
}
// GetInternalChanel is used when RPCClientSet is passed as internal connection for RPCPool
func (s *RPCClientSet) GetInternalChanel() chan rpcclient.ClientConnector {
func (s RPCClientSet) GetInternalChanel() chan rpcclient.ClientConnector {
connChan := make(chan rpcclient.ClientConnector, 1)
connChan <- s
return connChan
}
// Call the implementation of the rpcclient.ClientConnector interface
func (s *RPCClientSet) Call(method string, args interface{}, reply interface{}) error {
func (s RPCClientSet) Call(method string, args interface{}, reply interface{}) error {
methodSplit := strings.Split(method, ".")
if len(methodSplit) != 2 {
return rpcclient.ErrUnsupporteServiceMethod
}
conn, has := s.set[methodSplit[0]]
conn, has := s[methodSplit[0]]
if !has {
return rpcclient.ErrUnsupporteServiceMethod
}
return conn.Call(method, args, reply)
}
// func (s RPCClientSet) ReconnectInternals(subsystems ...string) (err error) {
// for _, subsystem := range subsystems {
// if err = s[subsystem].Reconnect(); err != nil {
// return
// }
// }
// }

2
go.mod
View File

@@ -25,6 +25,7 @@ require (
github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca
github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1
github.com/cgrates/rpcclient v0.0.0-20200326100105-a579e2c47453
github.com/cgrates/sipd v1.0.0
github.com/creack/pty v1.1.7
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/fiorix/go-diameter v3.0.3-0.20190716165154-f4823472d0e0+incompatible
@@ -56,6 +57,7 @@ require (
go.opencensus.io v0.22.1-0.20190713072201-b4a14686f0a9 // indirect
golang.org/x/net v0.0.0-20190909003024-a7b16738d86b
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd
google.golang.org/api v0.10.0
pack.ag/amqp v0.12.2
)

2
go.sum
View File

@@ -63,6 +63,8 @@ github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1 h1:QvA6Nbwq9kTd7hsv
github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1/go.mod h1:HZbsg3Y+xw4lsfCqX9rzj429wrg0XOug6pFT3B6VHZY=
github.com/cgrates/rpcclient v0.0.0-20200326100105-a579e2c47453 h1:kgIdi3qR/meiWILdmDRuDi1fFgd6A3lutGV6HLiTDyc=
github.com/cgrates/rpcclient v0.0.0-20200326100105-a579e2c47453/go.mod h1:xXLqAKVvcdWeDYwHJYwDgAI3ZOg5LZYxzb72kLjsLZU=
github.com/cgrates/sipd v1.0.0 h1:9CPVaWCYBAWGVyzsEtv+VbG6kMk+sHml6kx3OvYQnkc=
github.com/cgrates/sipd v1.0.0/go.mod h1:Itz4HoJHqckX9XAbGRanB1PvXfKrIO/UtMB8JwCFsT4=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/creack/pty v1.1.7 h1:6pwm8kMQKCmgUg0ZHTm5+/YvRK0s3THD/28+T6/kk4A=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=

View File

@@ -1,4 +1,5 @@
cgrates (0.11.0~dev) UNRELEASED; urgency=medium
[ DanB ]
* [FilterS] Renamed rals_conns to apiers_conns
* [FilterS] Updated *destination filter to get ReverseDestination form
API
@@ -58,8 +59,9 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium
* [AgentS] FieldAsInterface return data instead of NMItem
* [RouteS] Add posibility to load routes with the sameID and different filters
* [RouteS] Correctly populate Sorting out of models
* [AgentS] Added SIPAgent for SIP redirection
-- Alexandru Tripon <alexandru.tripon@itsyscom.com> Wed, 19 Feb 2020 13:25:52 +0200
-- DanB <danb@cgrates.org> Wed, 19 Feb 2020 13:25:52 +0200
cgrates (0.10.0) UNRELEASED; urgency=medium

134
services/sipagent.go Normal file
View File

@@ -0,0 +1,134 @@
/*
Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments
Copyright (C) ITsysCOM GmbH
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
package services
import (
"fmt"
"sync"
"github.com/cgrates/cgrates/agents"
"github.com/cgrates/cgrates/config"
"github.com/cgrates/cgrates/engine"
"github.com/cgrates/cgrates/servmanager"
"github.com/cgrates/cgrates/utils"
"github.com/cgrates/rpcclient"
)
// NewSIPAgent returns the sip Agent
func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS,
exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service {
return &SIPAgent{
cfg: cfg,
filterSChan: filterSChan,
exitChan: exitChan,
connMgr: connMgr,
}
}
// SIPAgent implements Agent interface
type SIPAgent struct {
sync.RWMutex
cfg *config.CGRConfig
filterSChan chan *engine.FilterS
exitChan chan bool
sip *agents.SIPAgent
connMgr *engine.ConnManager
oldListen string
}
// Start should handle the sercive start
func (sip *SIPAgent) Start() (err error) {
if sip.IsRunning() {
return fmt.Errorf("service aleady running")
}
filterS := <-sip.filterSChan
sip.filterSChan <- filterS
sip.Lock()
defer sip.Unlock()
sip.oldListen = sip.cfg.SIPAgentCfg().Listen
sip.sip = agents.NewSIPAgent(sip.connMgr, sip.cfg, filterS)
go func() {
if err = sip.sip.ListenAndServe(); err != nil {
utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error()))
sip.exitChan <- true // stop the engine here
}
}()
return
}
// GetIntenternalChan returns the internal connection chanel
// no chanel for SIPAgent
func (sip *SIPAgent) GetIntenternalChan() (conn chan rpcclient.ClientConnector) {
return nil
}
// Reload handles the change of config
func (sip *SIPAgent) Reload() (err error) {
// if sip.oldListen == sip.cfg.SIPAgentCfg().Listen {
// return
// }
// if err = sip.Shutdown(); err != nil {
// return
// }
// sip.Lock()
// sip.oldListen = sip.cfg.SIPAgentCfg().Listen
// defer sip.Unlock()
// if err = sip.sip.Reload(); err != nil {
// return
// }
// go func() {
// if err := sip.sip.ListenAndServe(); err != nil {
// utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error()))
// sip.exitChan <- true // stop the engine here
// }
// }()
return
}
// Shutdown stops the service
func (sip *SIPAgent) Shutdown() (err error) {
sip.Lock()
defer sip.Unlock()
// if err = sip.sip.Shutdown(); err != nil {
// return
// }
sip.sip = nil
return
}
// IsRunning returns if the service is running
func (sip *SIPAgent) IsRunning() bool {
sip.RLock()
defer sip.RUnlock()
return sip != nil && sip.sip != nil
}
// ServiceName returns the service name
func (sip *SIPAgent) ServiceName() string {
return utils.SIPAgent
}
// ShouldRun returns if the service should be running
func (sip *SIPAgent) ShouldRun() bool {
return sip.cfg.SIPAgentCfg().Enabled
}

View File

@@ -174,6 +174,7 @@ func (srvMngr *ServiceManager) StartServices() (err error) {
utils.DispatcherS: srvMngr.GetConfig().DispatcherSCfg().Enabled,
utils.EventExporterS: srvMngr.GetConfig().EEsCfg().Enabled,
utils.RateS: srvMngr.GetConfig().RateSCfg().Enabled,
utils.SIPAgent: srvMngr.GetConfig().SIPAgentCfg().Enabled,
} {
if shouldRun {
go srvMngr.startService(serviceName)
@@ -320,6 +321,10 @@ func (srvMngr *ServiceManager) handleReload() {
}
case <-srvMngr.GetConfig().GetReloadChan(config.RPCConnsJsonName):
engine.Cache.Clear([]string{utils.CacheRPCConnections})
case <-srvMngr.GetConfig().GetReloadChan(config.SIPAgentJson):
if err = srvMngr.reloadService(utils.SIPAgent); err != nil {
return
}
}
// handle RPC server
}

View File

@@ -636,6 +636,7 @@ const (
RemoteHost = "RemoteHost"
Local = "local"
TCP = "tcp"
UDP = "udp"
CGRDebitInterval = "CGRDebitInterval"
Version = "Version"
MetaTenant = "*tenant"
@@ -1637,6 +1638,7 @@ const (
FreeSWITCHAgent = "FreeSWITCHAgent"
AsteriskAgent = "AsteriskAgent"
HTTPAgent = "HTTPAgent"
SIPAgent = "SIPAgent"
)
// Poster