diff --git a/agents/libsip.go b/agents/libsip.go new file mode 100644 index 000000000..82129c39c --- /dev/null +++ b/agents/libsip.go @@ -0,0 +1,77 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package agents + +import ( + "context" + "fmt" + "net" + "strings" + "syscall" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/sipd" + "golang.org/x/sys/unix" +) + +// updateDiamMsgFromNavMap will update the diameter message with items from navigable map +func updateSIPMsgFromNavMap(m sipd.Message, navMp *utils.OrderedNavigableMap) (err error) { + // write reply into message + for el := navMp.GetFirstElement(); el != nil; el = el.Next() { + val := el.Value + var nmIt utils.NMInterface + if nmIt, err = navMp.Field(val); err != nil { + return + } + itm, isNMItem := nmIt.(*config.NMItem) + if !isNMItem { + return fmt.Errorf("cannot encode reply value: %s, err: not NMItems", utils.ToJSON(val)) + } + if itm == nil { + continue // all attributes, not writable to diameter packet + } + m[strings.Join(itm.Path, utils.NestingSep)] = utils.IfaceAsString(itm.Data) + } + return +} + +func reuseportControl(network, address string, c syscall.RawConn) error { + var opErr error + err := c.Control(func(fd uintptr) { + opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + }) + if err != nil { + return err + } + + return opErr +} + +func listenTCPWithReuseablePort(network, addr string) (net.Listener, error) { + var lc net.ListenConfig + lc.Control = reuseportControl + return lc.Listen(context.Background(), network, addr) +} + +func listenUDPWithReuseablePort(network, addr string) (net.PacketConn, error) { + var lc net.ListenConfig + lc.Control = reuseportControl + return lc.ListenPacket(context.Background(), network, addr) +} diff --git a/agents/sipagent.go b/agents/sipagent.go new file mode 100644 index 000000000..f8da3eea7 --- /dev/null +++ b/agents/sipagent.go @@ -0,0 +1,399 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package agents + +import ( + "fmt" + "net" + + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/sessions" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/sipd" +) + +const ( + bufferSize = 5000 +) + +// NewSIPAgent will construct a SIPAgent +func NewSIPAgent(connMgr *engine.ConnManager, cfg *config.CGRConfig, + filterS *engine.FilterS) *SIPAgent { + return &SIPAgent{ + connMgr: connMgr, + filterS: filterS, + cfg: cfg, + } +} + +// SIPAgent is a handler for HTTP requests +type SIPAgent struct { + connMgr *engine.ConnManager + filterS *engine.FilterS + cfg *config.CGRConfig +} + +// ListenAndServe will run the DNS handler doing also the connection to listen address +func (sa *SIPAgent) ListenAndServe() (err error) { + utils.Logger.Info(fmt.Sprintf("<%s> start listening on <%s:%s>", + utils.SIPAgent, sa.cfg.SIPAgentCfg().ListenNet, sa.cfg.SIPAgentCfg().Listen)) + switch sa.cfg.SIPAgentCfg().ListenNet { + case utils.TCP: + sa.serveTCP() + case utils.UDP: + sa.serveUDP() + default: + return fmt.Errorf("Unecepected protocol %s", sa.cfg.SIPAgentCfg().ListenNet) + } + return nil //da.server.ListenAndServe() +} +func (sa *SIPAgent) serveUDP() { + conn, err := listenUDPWithReuseablePort(utils.UDP, sa.cfg.SIPAgentCfg().Listen) + if err != nil { + utils.Logger.Err( + fmt.Sprintf("<%s> error: %s unable to listen to: %s", + utils.SIPAgent, err.Error(), sa.cfg.SIPAgentCfg().Listen)) + return + } + + defer conn.Close() + + buf := make([]byte, bufferSize) + + for { + n, saddr, err := conn.ReadFrom(buf) + if err != nil { + continue + } + // echo response + if n < 50 { + conn.WriteTo(buf[:n], saddr) + continue + } + + sipMessage := make(sipd.Message) // recreate map SIP + sipMessage.Parse(string(buf[:n])) + var sipAnswer sipd.Message + if sipAnswer, err = sa.handleMessage(sipMessage, saddr.String()); err != nil { + continue + } + if _, err = conn.WriteTo([]byte(sipAnswer.String()), saddr); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s sending message: %s", + utils.SIPAgent, err.Error(), sipAnswer)) + continue + } + } +} + +func (sa *SIPAgent) serveTCP() { + l, err := listenTCPWithReuseablePort(utils.TCP, sa.cfg.SIPAgentCfg().Listen) + if err != nil { + utils.Logger.Err( + fmt.Sprintf("<%s> error: %s unable to listen to: %s", + utils.SIPAgent, err.Error(), sa.cfg.SIPAgentCfg().Listen)) + return + } + + defer l.Close() + + for { + conn, err := l.Accept() + if err != nil { + continue + } + go func(conn net.Conn) { + buf := make([]byte, bufferSize) + for { + n, err := conn.Read(buf) + if err != nil { + continue + } + // echo response + if n < 50 { + conn.Write(buf[:n]) + continue + } + + sipMessage := make(sipd.Message) // recreate map SIP + sipMessage.Parse(string(buf[:n])) + var sipAnswer sipd.Message + if sipAnswer, err = sa.handleMessage(sipMessage, conn.LocalAddr().String()); err != nil { + continue + } + if _, err = conn.Write([]byte(sipAnswer.String())); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s sending message: %s", + utils.SIPAgent, err.Error(), sipAnswer)) + continue + } + } + }(conn) + } +} + +func (sa *SIPAgent) handleMessage(sipMessage sipd.Message, remoteHost string) (sipAnswer sipd.Message, err error) { + if sipMessage["User-Agent"] != "" { + sipMessage["User-Agent"] = utils.CGRateS + } + sipMessageIface := make(map[string]interface{}) + for k, v := range sipMessage { + sipMessageIface[k] = v + } + dp := config.NewNavigableMap(sipMessageIface) + var processed bool + cgrRplyNM := utils.NavigableMap2{} + rplyNM := utils.NewOrderedNavigableMap() + opts := utils.NewOrderedNavigableMap() + reqVars := utils.NavigableMap2{utils.RemoteHost: utils.NewNMData(remoteHost)} + for _, reqProcessor := range sa.cfg.SIPAgentCfg().RequestProcessors { + agReq := NewAgentRequest(dp, reqVars, &cgrRplyNM, rplyNM, + opts, reqProcessor.Tenant, sa.cfg.GeneralCfg().DefaultTenant, + utils.FirstNonEmpty(reqProcessor.Timezone, + config.CgrConfig().GeneralCfg().DefaultTimezone), + sa.filterS, nil, nil) + if processed, err = sa.processRequest(reqProcessor, agReq); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing request: %s", + utils.SIPAgent, err.Error(), utils.ToJSON(agReq))) + continue + } + if !processed { + continue + } + if processed && !reqProcessor.Flags.GetBool(utils.MetaContinue) { + break + } + } + if err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s processing message: %s from %s", + utils.SIPAgent, err.Error(), sipMessage, remoteHost)) + return + } + if !processed { + utils.Logger.Warning( + fmt.Sprintf("<%s> no request processor enabled, ignoring message %s from %s", + utils.SIPAgent, sipMessage, remoteHost)) + return + } + if err = updateSIPMsgFromNavMap(sipMessage, rplyNM); err != nil { + utils.Logger.Warning( + fmt.Sprintf("<%s> error: %s encoding out %s", + utils.SIPAgent, err.Error(), utils.ToJSON(rplyNM))) + return + } + sipMessage.PrepareReply() + return sipMessage, nil +} + +// processRequest represents one processor processing the request +func (sa *SIPAgent) processRequest(reqProcessor *config.RequestProcessor, + agReq *AgentRequest) (processed bool, err error) { + if pass, err := sa.filterS.Pass(agReq.Tenant, + reqProcessor.Filters, agReq); err != nil || !pass { + return pass, err + } + if err = agReq.SetFields(reqProcessor.RequestFields); err != nil { + return + } + cgrEv := config.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep) + opts := config.NMAsMapInterface(agReq.Opts, utils.NestingSep) + var reqType string + for _, typ := range []string{ + utils.MetaDryRun, /* utils.MetaAuthorize, + utils.MetaInitiate, utils.MetaUpdate, + utils.MetaTerminate, utils.MetaMessage, + utils.MetaCDRs, */utils.MetaEvent, utils.META_NONE} { + if reqProcessor.Flags.HasKey(typ) { // request type is identified through flags + reqType = typ + break + } + } + var cgrArgs utils.ExtractedArgs + if cgrArgs, err = utils.ExtractArgsFromOpts(opts, reqProcessor.Flags.HasKey(utils.MetaDispatchers), + reqType == utils.MetaAuthorize || reqType == utils.MetaMessage || reqType == utils.MetaEvent); err != nil { + utils.Logger.Warning(fmt.Sprintf("<%s> args extraction failed because <%s>", + utils.SIPAgent, err.Error())) + err = nil // reset the error and continue the processing + } + if reqProcessor.Flags.HasKey(utils.MetaLog) { + utils.Logger.Info( + fmt.Sprintf("<%s> LOG, processorID: %s, http message: %s", + utils.SIPAgent, reqProcessor.ID, agReq.Request.String())) + } + switch reqType { + default: + return false, fmt.Errorf("unknown request type: <%s>", reqType) + case utils.META_NONE: // do nothing on CGRateS side + case utils.MetaDryRun: + utils.Logger.Info( + fmt.Sprintf("<%s> DRY_RUN, processorID: %s, CGREvent: %s", + utils.SIPAgent, reqProcessor.ID, utils.ToJSON(cgrEv))) + // case utils.MetaAuthorize: + // authArgs := sessions.NewV1AuthorizeArgs( + // reqProcessor.Flags.HasKey(utils.MetaAttributes), + // reqProcessor.Flags.ParamsSlice(utils.MetaAttributes), + // reqProcessor.Flags.HasKey(utils.MetaThresholds), + // reqProcessor.Flags.ParamsSlice(utils.MetaThresholds), + // reqProcessor.Flags.HasKey(utils.MetaStats), + // reqProcessor.Flags.ParamsSlice(utils.MetaStats), + // reqProcessor.Flags.HasKey(utils.MetaResources), + // reqProcessor.Flags.HasKey(utils.MetaAccounts), + // reqProcessor.Flags.HasKey(utils.MetaRoutes), + // reqProcessor.Flags.HasKey(utils.MetaRoutesIgnoreErrors), + // reqProcessor.Flags.HasKey(utils.MetaRoutesEventCost), + // cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.RoutePaginator, + // reqProcessor.Flags.HasKey(utils.MetaFD), + // opts, + // ) + // rply := new(sessions.V1AuthorizeReply) + // err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1AuthorizeEvent, + // authArgs, rply) + // if err = agReq.setCGRReply(rply, err); err != nil { + // return + // } + // case utils.MetaInitiate: + // initArgs := sessions.NewV1InitSessionArgs( + // reqProcessor.Flags.HasKey(utils.MetaAttributes), + // reqProcessor.Flags.ParamsSlice(utils.MetaAttributes), + // reqProcessor.Flags.HasKey(utils.MetaThresholds), + // reqProcessor.Flags.ParamsSlice(utils.MetaThresholds), + // reqProcessor.Flags.HasKey(utils.MetaStats), + // reqProcessor.Flags.ParamsSlice(utils.MetaStats), + // reqProcessor.Flags.HasKey(utils.MetaResources), + // reqProcessor.Flags.HasKey(utils.MetaAccounts), + // cgrEv, cgrArgs.ArgDispatcher, + // reqProcessor.Flags.HasKey(utils.MetaFD), + // opts) + // rply := new(sessions.V1InitSessionReply) + // err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1InitiateSession, + // initArgs, rply) + // if err = agReq.setCGRReply(rply, err); err != nil { + // return + // } + // case utils.MetaUpdate: + // updateArgs := sessions.NewV1UpdateSessionArgs( + // reqProcessor.Flags.HasKey(utils.MetaAttributes), + // reqProcessor.Flags.ParamsSlice(utils.MetaAttributes), + // reqProcessor.Flags.HasKey(utils.MetaAccounts), + // cgrEv, cgrArgs.ArgDispatcher, + // reqProcessor.Flags.HasKey(utils.MetaFD), + // opts) + // rply := new(sessions.V1UpdateSessionReply) + // err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1UpdateSession, + // updateArgs, rply) + // if err = agReq.setCGRReply(rply, err); err != nil { + // return + // } + // case utils.MetaTerminate: + // terminateArgs := sessions.NewV1TerminateSessionArgs( + // reqProcessor.Flags.HasKey(utils.MetaAccounts), + // reqProcessor.Flags.HasKey(utils.MetaResources), + // reqProcessor.Flags.HasKey(utils.MetaThresholds), + // reqProcessor.Flags.ParamsSlice(utils.MetaThresholds), + // reqProcessor.Flags.HasKey(utils.MetaStats), + // reqProcessor.Flags.ParamsSlice(utils.MetaStats), + // cgrEv, cgrArgs.ArgDispatcher, + // reqProcessor.Flags.HasKey(utils.MetaFD), + // opts) + // rply := utils.StringPointer("") + // err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1TerminateSession, + // terminateArgs, rply) + // if err = agReq.setCGRReply(nil, err); err != nil { + // return + // } + // case utils.MetaMessage: + // evArgs := sessions.NewV1ProcessMessageArgs( + // reqProcessor.Flags.HasKey(utils.MetaAttributes), + // reqProcessor.Flags.ParamsSlice(utils.MetaAttributes), + // reqProcessor.Flags.HasKey(utils.MetaThresholds), + // reqProcessor.Flags.ParamsSlice(utils.MetaThresholds), + // reqProcessor.Flags.HasKey(utils.MetaStats), + // reqProcessor.Flags.ParamsSlice(utils.MetaStats), + // reqProcessor.Flags.HasKey(utils.MetaResources), + // reqProcessor.Flags.HasKey(utils.MetaAccounts), + // reqProcessor.Flags.HasKey(utils.MetaRoutes), + // reqProcessor.Flags.HasKey(utils.MetaRoutesIgnoreErrors), + // reqProcessor.Flags.HasKey(utils.MetaRoutesEventCost), + // cgrEv, cgrArgs.ArgDispatcher, *cgrArgs.RoutePaginator, + // reqProcessor.Flags.HasKey(utils.MetaFD), + // opts) + // rply := new(sessions.V1ProcessMessageReply) + // err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1ProcessMessage, + // evArgs, rply) + // if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { + // cgrEv.Event[utils.Usage] = 0 // avoid further debits + // } else if evArgs.Debit { + // cgrEv.Event[utils.Usage] = rply.MaxUsage // make sure the CDR reflects the debit + // } + // if err = agReq.setCGRReply(nil, err); err != nil { + // return + // } + case utils.MetaEvent: + evArgs := &sessions.V1ProcessEventArgs{ + Flags: reqProcessor.Flags.SliceFlags(), + CGREvent: cgrEv, + ArgDispatcher: cgrArgs.ArgDispatcher, + Paginator: *cgrArgs.RoutePaginator, + Opts: opts, + } + needMaxUsage := reqProcessor.Flags.HasKey(utils.MetaAuth) || + reqProcessor.Flags.HasKey(utils.MetaInit) || + reqProcessor.Flags.HasKey(utils.MetaUpdate) + rply := new(sessions.V1ProcessEventReply) + err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1ProcessEvent, + evArgs, rply) + if utils.ErrHasPrefix(err, utils.RalsErrorPrfx) { + cgrEv.Event[utils.Usage] = 0 // avoid further debits + } else if needMaxUsage { + cgrEv.Event[utils.Usage] = rply.MaxUsage // make sure the CDR reflects the debit + } + if err = agReq.setCGRReply(rply, err); err != nil { + return + } + // case utils.MetaCDRs: // allow CDR processing + } + // separate request so we can capture the Terminate/Event also here + // if reqProcessor.Flags.HasKey(utils.MetaCDRs) && + // !reqProcessor.Flags.HasKey(utils.MetaDryRun) { + // rplyCDRs := utils.StringPointer("") + // if err = sa.connMgr.Call(sa.cfg.SIPAgentCfg().SessionSConns, nil, utils.SessionSv1ProcessCDR, + // &utils.CGREventWithArgDispatcher{CGREvent: cgrEv, + // ArgDispatcher: cgrArgs.ArgDispatcher}, + // rplyCDRs); err != nil { + // agReq.CGRReply.Set(utils.PathItems{{Field: utils.Error}}, utils.NewNMData(err.Error())) + // } + // } + if err := agReq.SetFields(reqProcessor.ReplyFields); err != nil { + return false, err + } + if reqProcessor.Flags.HasKey(utils.MetaLog) { + utils.Logger.Info( + fmt.Sprintf("<%s> LOG, HTTP reply: %s", + utils.SIPAgent, agReq.Reply)) + } + if reqType == utils.MetaDryRun { + utils.Logger.Info( + fmt.Sprintf("<%s> DRY_RUN, HTTP reply: %s", + utils.SIPAgent, agReq.Reply)) + } + return true, nil +} diff --git a/agents/sipagent_it_test.go b/agents/sipagent_it_test.go new file mode 100644 index 000000000..bc4d5279c --- /dev/null +++ b/agents/sipagent_it_test.go @@ -0,0 +1,216 @@ +// +build integration + +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package agents + +import ( + "net" + "net/rpc" + "path" + "testing" + "time" + + v2 "github.com/cgrates/cgrates/apier/v2" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/sipd" +) + +var ( + saonfigDIR string + saCfgPath string + saCfg *config.CGRConfig + saRPC *rpc.Client + saConn net.Conn + + sTestsSIP = []func(t *testing.T){ + testSAitInitCfg, + testSAitResetDataDb, + testSAitResetStorDb, + testSAitStartEngine, + testSAitApierRpcConn, + testSAitTPFromFolder, + + testSAitSetAttributeProfile, + testSAitSIPRegister, + testSAitSIPInvite, + + testSAitStopCgrEngine, + } +) + +// Test start here +func TestSAit(t *testing.T) { + // engine.KillEngine(0) + switch *dbType { + case utils.MetaInternal: + saonfigDIR = "sipagent_internal" + case utils.MetaMySQL: + saonfigDIR = "sipagent_mysql" + case utils.MetaMongo: + saonfigDIR = "sipagent_mongo" + case utils.MetaPostgres: + t.SkipNow() + default: + t.Fatal("Unknown Database type") + } + for _, stest := range sTestsSIP { + t.Run(saonfigDIR, stest) + } +} + +func testSAitInitCfg(t *testing.T) { + saCfgPath = path.Join(*dataDir, "conf", "samples", saonfigDIR) + // Init config first + var err error + saCfg, err = config.NewCGRConfigFromPath(saCfgPath) + if err != nil { + t.Error(err) + } + saCfg.DataFolderPath = *dataDir // Share DataFolderPath through config towards StoreDb for Flush() + config.SetCgrConfig(saCfg) + if isDispatcherActive { + saCfg.ListenCfg().RPCJSONListen = ":6012" + } +} + +// Remove data in both rating and accounting db +func testSAitResetDataDb(t *testing.T) { + if err := engine.InitDataDb(saCfg); err != nil { + t.Fatal(err) + } +} + +// Wipe out the cdr database +func testSAitResetStorDb(t *testing.T) { + if err := engine.InitStorDb(saCfg); err != nil { + t.Fatal(err) + } +} + +// Start CGR Engine +func testSAitStartEngine(t *testing.T) { + if _, err := engine.StartEngine(saCfgPath, *waitRater); err != nil { + t.Fatal(err) + } +} + +// Connect rpc client to rater +func testSAitApierRpcConn(t *testing.T) { + var err error + saRPC, err = newRPCClient(saCfg.ListenCfg()) // We connect over JSON so we can also troubleshoot if needed + if err != nil { + t.Fatal(err) + } + if saConn, err = net.Dial(utils.TCP, "127.0.0.1:5060"); err != nil { + t.Fatal(err) + } +} + +// Load the tariff plan, creating accounts and their balances +func testSAitTPFromFolder(t *testing.T) { + attrs := &utils.AttrLoadTpFromFolder{FolderPath: path.Join(*dataDir, "tariffplans", "oldtutorial")} + var loadInst utils.LoadInstance + if err := saRPC.Call(utils.APIerSv2LoadTariffPlanFromFolder, attrs, &loadInst); err != nil { + t.Error(err) + } + if isDispatcherActive { + testRadiusitTPLoadData(t) + } + time.Sleep(time.Duration(*waitRater) * time.Millisecond) // Give time for scheduler to execute topups +} + +func testSAitStopCgrEngine(t *testing.T) { + if err := engine.KillEngine(100); err != nil { + t.Error(err) + } +} + +func testSAitSetAttributeProfile(t *testing.T) { + attrPrf := &v2.AttributeWithCache{ + ExternalAttributeProfile: &engine.ExternalAttributeProfile{ + Tenant: "cgrates.org", + ID: "ChangeDestination", + Contexts: []string{utils.ANY}, + FilterIDs: []string{"*prefix:~*req.Account:\"1001\""}, + Attributes: []*engine.ExternalAttribute{{ + Path: utils.MetaReq + utils.NestingSep + "Destination", + Value: "sip:1003@192.168.53.203:5060", + }}, + Weight: 20, + }, + } + var result string + if err := saRPC.Call(utils.APIerSv2SetAttributeProfile, attrPrf, &result); err != nil { + t.Error(err) + } else if result != utils.OK { + t.Error("Unexpected reply returned", result) + } +} + +func testSAitSIPRegister(t *testing.T) { + registerMessage := "REGISTER sip:192.168.58.203 SIP/2.0\r\nCall-ID: d72a4ed6feb4167b5adb208525879db5@0:0:0:0:0:0:0:0\r\nCSeq: 1 REGISTER\r\nFrom: \"1002\" \r\n;tag=d28739b9\r\nTo: \"1002\" \r\nVia: SIP/2.0/UDP 192.168.58.201:5060;branch=z9hG4bK-323131-311ce8716a7bf1f6094859ae516a44eb\r\nMax-Forwards: 70\r\nUser-Agent: Jitsi2.11.20200408Linux\r\nExpires: 600\r\nContact: \"1002\" ;expires=600\r\nContent-Length: 0\r\n" + if saConn == nil { + t.Fatal("connection not initialized") + } + var err error + if _, err = saConn.Write([]byte(registerMessage)); err != nil { + t.Fatal(err) + } + buffer := make([]byte, bufferSize) + if _, err = saConn.Read(buffer); err != nil { + t.Fatal(err) + } + recived := sipd.Message{} + if err = recived.Parse(string(buffer)); err != nil { + t.Fatal(err) + } + + if expected := "SIP/2.0 200 OK"; recived["Request"] != expected { + t.Errorf("Expected %q, received: %q", expected, recived["Request"]) + } +} + +func testSAitSIPInvite(t *testing.T) { + inviteMessage := "INVITE sip:1002@192.168.58.203 SIP/2.0\r\nCall-ID: 4d4d84b0cc83fc90aca41e295cd8ff43@0:0:0:0:0:0:0:0\r\nCSeq: 2 INVITE\r\nFrom: \"1001\" ;tag=99f35805\r\nTo: \r\nMax-Forwards: 70\r\nContact: \"1001\" \r\nUser-Agent: Jitsi2.11.20200408Linux\r\nContent-Type: application/sdp\r\nVia: SIP/2.0/UDP 192.168.58.201:5060;branch=z9hG4bK-393139-939e89686023b86822cb942ede452b62\r\nProxy-Authorization: Digest username=\"1001\",realm=\"192.168.58.203\",nonce=\"XruO2167ja8uRODnSv8aXqv+/hqPJiXh\",uri=\"sip:1002@192.168.58.203\",response=\"5b814c709d1541d72ea778599c2e48a4\"\r\nContent-Length: 897\r\n\r\nv=0\r\no=1001-jitsi.org 0 0 IN IP4 192.168.58.201\r\ns=-\r\nc=IN IP4 192.168.58.201\r\nt=0 0\r\nm=audio 5000 RTP/AVP 96 97 98 9 100 102 0 8 103 3 104 101\r\na=rtpmap:96 opus/48000/2\r\na=fmtp:96 usedtx=1\r\na=ptime:20\r\na=rtpmap:97 SILK/24000\r\na=rtpmap:98 SILK/16000\r\na=rtpmap:9 G722/8000\r\na=rtpmap:100 speex/32000\r\na=rtpmap:102 speex/16000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:103 iLBC/8000\r\na=rtpmap:3 GSM/8000\r\na=rtpmap:104 speex/8000\r\na=rtpmap:101 telephone-event/8000\r\na=extmap:1 urn:ietf:params:rtp-hdrext:csrc-audio-level\r\na=extmap:2 urn:ietf:params:rtp-hdrext:ssrc-audio-level\r\na=rtcp-xr:voip-metrics\r\nm=video 5002 RTP/AVP 105 99\r\na=recvonly\r\na=rtpmap:105 h264/90000\r\na=fmtp:105 profile-level-id=42E01f;packetization-mode=1\r\na=imageattr:105 send * recv [x=[1:1920],y=[1:1080]]\r\na=rtpmap:\r\n" + if saConn == nil { + t.Fatal("connection not initialized") + } + var err error + if _, err = saConn.Write([]byte(inviteMessage)); err != nil { + t.Fatal(err) + } + buffer := make([]byte, bufferSize) + if _, err = saConn.Read(buffer); err != nil { + t.Fatal(err) + } + recived := sipd.Message{} + if err = recived.Parse(string(buffer)); err != nil { + t.Fatal(err) + } + + if expected := "SIP/2.0 302 Moved Temporarily"; recived["Request"] != expected { + t.Errorf("Expected %q, received: %q", expected, recived["Request"]) + } + if expected := ""; recived["Contact"] != expected { + t.Errorf("Expected %q, received: %q", expected, recived["Contact"]) + } +} diff --git a/cmd/cgr-engine/cgr-engine.go b/cmd/cgr-engine/cgr-engine.go index 2bb60e4df..f431240ce 100644 --- a/cmd/cgr-engine/cgr-engine.go +++ b/cmd/cgr-engine/cgr-engine.go @@ -529,6 +529,7 @@ func main() { connManager, server, exitChan, internalEEsChan), services.NewRateService(cfg, filterSChan, server, exitChan, internalRateSChan), + services.NewSIPAgent(cfg, filterSChan, exitChan, connManager), ) srvManager.StartServices() // Start FilterS diff --git a/config/config.go b/config/config.go index d8b20deb4..16d5d7888 100755 --- a/config/config.go +++ b/config/config.go @@ -182,6 +182,7 @@ func NewDefaultCGRConfig() (cfg *CGRConfig, err error) { cfg.eesCfg = new(EEsCfg) cfg.eesCfg.Cache = make(map[string]*CacheParamCfg) cfg.rateSCfg = new(RateSCfg) + cfg.sipAgentCfg = new(SIPAgentCfg) cfg.ConfigReloads = make(map[string]chan struct{}) cfg.ConfigReloads[utils.CDRE] = make(chan struct{}, 1) @@ -305,6 +306,7 @@ type CGRConfig struct { ersCfg *ERsCfg // EventReader config eesCfg *EEsCfg // EventExporter config rateSCfg *RateSCfg // RateS config + sipAgentCfg *SIPAgentCfg // SIPAgent config } var posibleLoaderTypes = utils.NewStringSet([]string{utils.MetaAttributes, @@ -352,7 +354,7 @@ func (cfg *CGRConfig) loadFromJsonCfg(jsnCfg *CgrJsonCfg) (err error) { cfg.loadMailerCfg, cfg.loadSureTaxCfg, cfg.loadDispatcherSCfg, cfg.loadLoaderCgrCfg, cfg.loadMigratorCgrCfg, cfg.loadTlsCgrCfg, cfg.loadAnalyzerCgrCfg, cfg.loadApierCfg, cfg.loadErsCfg, cfg.loadEesCfg, - cfg.loadRateCfg} { + cfg.loadRateCfg, cfg.loadSIPAgentCfg} { if err = loadFunc(jsnCfg); err != nil { return } @@ -752,6 +754,15 @@ func (cfg *CGRConfig) loadRateCfg(jsnCfg *CgrJsonCfg) (err error) { return cfg.rateSCfg.loadFromJsonCfg(jsnRateCfg) } +// loadApierCfg loads the Apier section of the configuration +func (cfg *CGRConfig) loadSIPAgentCfg(jsnCfg *CgrJsonCfg) (err error) { + var jsnSIPAgentCfg *SIPAgentJsonCfg + if jsnSIPAgentCfg, err = jsnCfg.SIPAgentJsonCfg(); err != nil { + return + } + return cfg.sipAgentCfg.loadFromJsonCfg(jsnSIPAgentCfg, cfg.generalCfg.RSRSep) +} + // SureTaxCfg use locking to retrieve the configuration, possibility later for runtime reload func (cfg *CGRConfig) SureTaxCfg() *SureTaxCfg { cfg.lks[SURETAX_JSON].Lock() @@ -997,7 +1008,7 @@ func (cfg *CGRConfig) EEsCfg() *EEsCfg { return cfg.eesCfg } -// EEsCfg reads the EventExporter configuration +// EEsNoLksCfg reads the EventExporter configuration without locks func (cfg *CGRConfig) EEsNoLksCfg() *EEsCfg { return cfg.eesCfg } @@ -1008,6 +1019,13 @@ func (cfg *CGRConfig) RateSCfg() *RateSCfg { return cfg.rateSCfg } +// SIPAgentCfg reads the Apier configuration +func (cfg *CGRConfig) SIPAgentCfg() *SIPAgentCfg { + cfg.lks[SIPAgentJson].Lock() + defer cfg.lks[SIPAgentJson].Unlock() + return cfg.sipAgentCfg +} + // RPCConns reads the RPCConns configuration func (cfg *CGRConfig) RPCConns() map[string]*RPCConn { cfg.lks[RPCConnsJsonName].RLock() @@ -1103,6 +1121,8 @@ func (cfg *CGRConfig) V1GetConfigSection(args *StringWithArgDispatcher, reply *m jsonString = utils.ToJSON(cfg.ERsCfg()) case RPCConnsJsonName: jsonString = utils.ToJSON(cfg.RPCConns()) + case SIPAgentJson: + jsonString = utils.ToJSON(cfg.SIPAgentCfg()) default: return errors.New("Invalid section") } @@ -1228,6 +1248,7 @@ func (cfg *CGRConfig) getLoadFunctions() map[string]func(*CgrJsonCfg) error { ApierS: cfg.loadApierCfg, RPCConnsJsonName: cfg.loadRPCConns, RateSJson: cfg.loadRateCfg, + SIPAgentJson: cfg.loadSIPAgentCfg, } } @@ -1493,6 +1514,7 @@ func (cfg *CGRConfig) reloadSections(sections ...string) (err error) { case AnalyzerCfgJson: case ApierS: cfg.rldChans[ApierS] <- struct{}{} + case SIPAgentJson: } return } @@ -1521,7 +1543,6 @@ func (cfg *CGRConfig) AsMapInterface(separator string) map[string]interface{} { } return map[string]interface{}{ - utils.CdreProfiles: cdreProfiles, utils.LoaderCfg: loaderCfg, utils.HttpAgentCfg: httpAgentCfg, diff --git a/config/config_defaults.go b/config/config_defaults.go index 5520720d0..16c5be206 100755 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -893,4 +893,14 @@ const CGRATES_CFG_JSON = ` "enabled": false, }, +"sip_agent": { // SIP Agents, only used for redirections + "enabled": false, // enables the SIP agent: + "listen": "127.0.0.1:5060", // address where to listen for SIP requests + "listen_net": "udp", // network to listen on + "sessions_conns": ["*internal"], + "timezone": "", // timezone of the events if not specified + "request_processors": [ // request processors to be applied to SIP messages + ], +}, + }` diff --git a/config/config_json.go b/config/config_json.go index 2c2e2f9a9..d8b11cfba 100644 --- a/config/config_json.go +++ b/config/config_json.go @@ -61,15 +61,15 @@ const ( EEsJson = "ees" RateSJson = "rates" RPCConnsJsonName = "rpc_conns" + SIPAgentJson = "sip_agent" ) var ( - sortedCfgSections = []string{GENERAL_JSN, RPCConnsJsonName, DATADB_JSN, STORDB_JSN, LISTEN_JSN, TlsCfgJson, HTTP_JSN, - SCHEDULER_JSN, CACHE_JSN, FilterSjsn, RALS_JSN, - CDRS_JSN, CDRE_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN, KamailioAgentJSN, - DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, THRESHOLDS_JSON, - RouteSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson, - AnalyzerCfgJson, ApierS, EEsJson, RateSJson} + sortedCfgSections = []string{GENERAL_JSN, RPCConnsJsonName, DATADB_JSN, STORDB_JSN, LISTEN_JSN, TlsCfgJson, HTTP_JSN, SCHEDULER_JSN, + CACHE_JSN, FilterSjsn, RALS_JSN, CDRS_JSN, CDRE_JSN, ERsJson, SessionSJson, AsteriskAgentJSN, FreeSWITCHAgentJSN, + KamailioAgentJSN, DA_JSN, RA_JSN, HttpAgentJson, DNSAgentJson, ATTRIBUTE_JSN, ChargerSCfgJson, RESOURCES_JSON, STATS_JSON, + THRESHOLDS_JSON, RouteSJson, LoaderJson, MAILER_JSN, SURETAX_JSON, CgrLoaderCfgJson, CgrMigratorCfgJson, DispatcherSJson, + AnalyzerCfgJson, ApierS, EEsJson, RateSJson, SIPAgentJson} ) // Loads the json config out of io.Reader, eg other sources than file, maybe over http @@ -519,3 +519,15 @@ func (self CgrJsonCfg) RateCfgJson() (*RateSJsonCfg, error) { } return cfg, nil } + +func (self CgrJsonCfg) SIPAgentJsonCfg() (*SIPAgentJsonCfg, error) { + rawCfg, hasKey := self[SIPAgentJson] + if !hasKey { + return nil, nil + } + sipAgnt := new(SIPAgentJsonCfg) + if err := json.Unmarshal(*rawCfg, sipAgnt); err != nil { + return nil, err + } + return sipAgnt, nil +} diff --git a/config/libconfig_json.go b/config/libconfig_json.go index d92296cc5..ba668a25e 100755 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -608,3 +608,13 @@ type STIRJsonCfg struct { type RateSJsonCfg struct { Enabled *bool } + +// SIPAgentJsonCfg +type SIPAgentJsonCfg struct { + Enabled *bool + Listen *string + Listen_net *string + Sessions_conns *[]string + Timezone *string + Request_processors *[]*ReqProcessorJsnCfg +} diff --git a/config/sipagentcfg.go b/config/sipagentcfg.go new file mode 100644 index 000000000..e720ee17f --- /dev/null +++ b/config/sipagentcfg.go @@ -0,0 +1,109 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package config + +import ( + "strings" + + "github.com/cgrates/cgrates/utils" +) + +type SIPAgentCfg struct { + Enabled bool + Listen string + ListenNet string // udp or tcp + SessionSConns []string + Timezone string + RequestProcessors []*RequestProcessor +} + +func (da *SIPAgentCfg) loadFromJsonCfg(jsnCfg *SIPAgentJsonCfg, sep string) (err error) { + if jsnCfg == nil { + return nil + } + if jsnCfg.Enabled != nil { + da.Enabled = *jsnCfg.Enabled + } + if jsnCfg.Listen_net != nil { + da.ListenNet = *jsnCfg.Listen_net + } + if jsnCfg.Listen != nil { + da.Listen = *jsnCfg.Listen + } + if jsnCfg.Timezone != nil { + da.Timezone = *jsnCfg.Timezone + } + if jsnCfg.Sessions_conns != nil { + da.SessionSConns = make([]string, len(*jsnCfg.Sessions_conns)) + for idx, connID := range *jsnCfg.Sessions_conns { + // if we have the connection internal we change the name so we can have internal rpc for each subsystem + if connID == utils.MetaInternal { + da.SessionSConns[idx] = utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS) + } else { + da.SessionSConns[idx] = connID + } + } + } + if jsnCfg.Request_processors != nil { + for _, reqProcJsn := range *jsnCfg.Request_processors { + rp := new(RequestProcessor) + var haveID bool + for _, rpSet := range da.RequestProcessors { + if reqProcJsn.ID != nil && rpSet.ID == *reqProcJsn.ID { + rp = rpSet // Will load data into the one set + haveID = true + break + } + } + if err := rp.loadFromJsonCfg(reqProcJsn, sep); err != nil { + return nil + } + if !haveID { + da.RequestProcessors = append(da.RequestProcessors, rp) + } + } + } + return nil +} + +func (da *SIPAgentCfg) AsMapInterface(separator string) map[string]interface{} { + requestProcessors := make([]map[string]interface{}, len(da.RequestProcessors)) + for i, item := range da.RequestProcessors { + requestProcessors[i] = item.AsMapInterface(separator) + } + sessionSConns := make([]string, len(da.SessionSConns)) + for i, item := range da.SessionSConns { + buf := utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS) + if item == buf { + sessionSConns[i] = strings.ReplaceAll(item, utils.CONCATENATED_KEY_SEP+utils.MetaSessionS, utils.EmptyString) + } else { + sessionSConns[i] = item + } + } + + return map[string]interface{}{ + utils.EnabledCfg: da.Enabled, + utils.ListenCfg: da.Listen, + utils.ListenNetCfg: da.ListenNet, + utils.SessionSConnsCfg: sessionSConns, + utils.TimezoneCfg: da.Timezone, + utils.RequestProcessorsCfg: requestProcessors, + } + +} diff --git a/data/conf/samples/sipagent_internal/cgrates.json b/data/conf/samples/sipagent_internal/cgrates.json new file mode 100644 index 000000000..6c3b4ac8f --- /dev/null +++ b/data/conf/samples/sipagent_internal/cgrates.json @@ -0,0 +1,75 @@ +{ + +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +// +// This file contains the default configuration hardcoded into CGRateS. +// This is what you get when you load CGRateS with an empty configuration file. + + +"general": { + "log_level": 7, // control the level of messages logged (0-emerg to 7-debug) +}, + + +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "*internal", // stor database type to use: +}, + +"stor_db": { + "db_type": "*internal", // stor database type to use: +}, + +"schedulers": { + "enabled": true, + "cdrs_conns": ["*internal"], +}, + + +"sessions": { + "enabled": true, + "attributes_conns": ["*localhost"], + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "routes_conns": ["*localhost"], +}, + + +"rals": { + "enabled": true, +}, + + +"cdrs": { + "enabled": true, + "rals_conns": ["*internal"], +}, + + +"chargers": { + "enabled": true, +}, + + +"attributes": { + "enabled": true, + "indexed_selects": false, // enable profile matching exclusively on indexes +}, + + +"routes": { + "enabled": true, +}, + + +"sip_agent": { + "enabled": true, +}, + + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, +} \ No newline at end of file diff --git a/data/conf/samples/sipagent_internal/redirect.json b/data/conf/samples/sipagent_internal/redirect.json new file mode 100644 index 000000000..5cb2de3fd --- /dev/null +++ b/data/conf/samples/sipagent_internal/redirect.json @@ -0,0 +1,40 @@ +{ + "sip_agent": { + "listen_net": "tcp", // network to listen on + "request_processors": [ + { + "id": "Register", + "filters": ["*prefix:~*req.Request:REGISTER"], + "flags": ["*none"], + "request_fields":[], + "reply_fields":[ + {"tag": "Request", "path": "*rep.Request", "type": "*constant", + "value": "SIP/2.0 200 OK"}, + {"tag": "Authorization", "path": "*rep.Authorization", "type": "*remove"} + ] + }, + { + "id": "InviteRedirect", + "filters": ["*prefix:~*req.Request:INVITE"], + "flags": ["*attributes","*event"], + "request_fields":[ + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", + "value": "~*req.From", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", + "value": "~*req.To", "mandatory": true} + ], + "reply_fields":[ + {"tag": "Request", "path": "*rep.Request", "type": "*constant", + "value": "SIP/2.0 302 Moved Temporarily"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":"<"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":"~*cgrep.Attributes.Destination"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":">"} + ] + } + ] + } + +} diff --git a/data/conf/samples/sipagent_mongo/cgrates.json b/data/conf/samples/sipagent_mongo/cgrates.json new file mode 100644 index 000000000..d4dc52da9 --- /dev/null +++ b/data/conf/samples/sipagent_mongo/cgrates.json @@ -0,0 +1,79 @@ +{ + +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +// +// This file contains the default configuration hardcoded into CGRateS. +// This is what you get when you load CGRateS with an empty configuration file. + + +"general": { + "log_level": 7, // control the level of messages logged (0-emerg to 7-debug) +}, + + +"data_db": { // database used to store runtime data (eg: accounts, cdr stats) + "db_type": "*mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "10", +}, + +"stor_db": { + "db_type": "*mongo", // stor database type to use: + "db_port": 27017, // the port to reach the stordb + "db_name": "cgrates", +}, + +"schedulers": { + "enabled": true, + "cdrs_conns": ["*internal"], +}, + + +"sessions": { + "enabled": true, + "attributes_conns": ["*localhost"], + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "routes_conns": ["*localhost"], +}, + + +"rals": { + "enabled": true, +}, + + +"cdrs": { + "enabled": true, + "rals_conns": ["*internal"], +}, + + +"chargers": { + "enabled": true, +}, + + +"attributes": { + "enabled": true, + "indexed_selects": false, // enable profile matching exclusively on indexes +}, + + +"routes": { + "enabled": true, +}, + + +"sip_agent": { + "enabled": true, +}, + + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, +} \ No newline at end of file diff --git a/data/conf/samples/sipagent_mongo/redirect.json b/data/conf/samples/sipagent_mongo/redirect.json new file mode 100644 index 000000000..5cb2de3fd --- /dev/null +++ b/data/conf/samples/sipagent_mongo/redirect.json @@ -0,0 +1,40 @@ +{ + "sip_agent": { + "listen_net": "tcp", // network to listen on + "request_processors": [ + { + "id": "Register", + "filters": ["*prefix:~*req.Request:REGISTER"], + "flags": ["*none"], + "request_fields":[], + "reply_fields":[ + {"tag": "Request", "path": "*rep.Request", "type": "*constant", + "value": "SIP/2.0 200 OK"}, + {"tag": "Authorization", "path": "*rep.Authorization", "type": "*remove"} + ] + }, + { + "id": "InviteRedirect", + "filters": ["*prefix:~*req.Request:INVITE"], + "flags": ["*attributes","*event"], + "request_fields":[ + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", + "value": "~*req.From", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", + "value": "~*req.To", "mandatory": true} + ], + "reply_fields":[ + {"tag": "Request", "path": "*rep.Request", "type": "*constant", + "value": "SIP/2.0 302 Moved Temporarily"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":"<"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":"~*cgrep.Attributes.Destination"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":">"} + ] + } + ] + } + +} diff --git a/data/conf/samples/sipagent_mysql/cgrates.json b/data/conf/samples/sipagent_mysql/cgrates.json new file mode 100644 index 000000000..56f79ef18 --- /dev/null +++ b/data/conf/samples/sipagent_mysql/cgrates.json @@ -0,0 +1,72 @@ +{ + +// Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +// Copyright (C) ITsysCOM GmbH +// +// This file contains the default configuration hardcoded into CGRateS. +// This is what you get when you load CGRateS with an empty configuration file. + + +"general": { + "log_level": 7, // control the level of messages logged (0-emerg to 7-debug) +}, + + +"stor_db": { + "db_password": "CGRateS.org", +}, + + +"schedulers": { + "enabled": true, + "cdrs_conns": ["*internal"], +}, + + +"sessions": { + "enabled": true, + "attributes_conns": ["*localhost"], + "rals_conns": ["*internal"], + "cdrs_conns": ["*internal"], + "chargers_conns": ["*internal"], + "routes_conns": ["*localhost"], +}, + + +"rals": { + "enabled": true, +}, + + +"cdrs": { + "enabled": true, + "rals_conns": ["*internal"], +}, + + +"chargers": { + "enabled": true, +}, + + +"attributes": { + "enabled": true, + "indexed_selects": false, // enable profile matching exclusively on indexes +}, + + +"routes": { + "enabled": true, +}, + + +"sip_agent": { + "enabled": true, +}, + + +"apiers": { + "enabled": true, + "scheduler_conns": ["*internal"], +}, +} \ No newline at end of file diff --git a/data/conf/samples/sipagent_mysql/redirect.json b/data/conf/samples/sipagent_mysql/redirect.json new file mode 100644 index 000000000..5cb2de3fd --- /dev/null +++ b/data/conf/samples/sipagent_mysql/redirect.json @@ -0,0 +1,40 @@ +{ + "sip_agent": { + "listen_net": "tcp", // network to listen on + "request_processors": [ + { + "id": "Register", + "filters": ["*prefix:~*req.Request:REGISTER"], + "flags": ["*none"], + "request_fields":[], + "reply_fields":[ + {"tag": "Request", "path": "*rep.Request", "type": "*constant", + "value": "SIP/2.0 200 OK"}, + {"tag": "Authorization", "path": "*rep.Authorization", "type": "*remove"} + ] + }, + { + "id": "InviteRedirect", + "filters": ["*prefix:~*req.Request:INVITE"], + "flags": ["*attributes","*event"], + "request_fields":[ + {"tag": "Account", "path": "*cgreq.Account", "type": "*variable", + "value": "~*req.From", "mandatory": true}, + {"tag": "Destination", "path": "*cgreq.Destination", "type": "*variable", + "value": "~*req.To", "mandatory": true} + ], + "reply_fields":[ + {"tag": "Request", "path": "*rep.Request", "type": "*constant", + "value": "SIP/2.0 302 Moved Temporarily"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":"<"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":"~*cgrep.Attributes.Destination"}, + {"tag": "Contact", "path": "*rep.Contact", "type": "*composed", + "value":">"} + ] + } + ] + } + +} diff --git a/engine/attributes.go b/engine/attributes.go index 02736d42a..69d3e6db2 100644 --- a/engine/attributes.go +++ b/engine/attributes.go @@ -94,8 +94,11 @@ func (alS *AttributeService) attributeProfileForEvent(args *AttrArgsProcessEvent } attrIDs = aPrflIDs.Slice() } - evNm := config.NewNavigableMap(nil) - evNm.Set([]string{utils.MetaReq}, args.Event, false, false) + + evNm := config.NewNavigableMap(map[string]interface{}{ + utils.MetaReq: args.Event, + utils.MetaOpts: args.Opts, + }) for _, apID := range attrIDs { aPrfl, err := alS.dm.GetAttributeProfile(args.Tenant, apID, true, true, utils.NonTransactional) if err != nil { diff --git a/engine/filterhelpers.go b/engine/filterhelpers.go index 4ffbc6f5b..b355a8191 100644 --- a/engine/filterhelpers.go +++ b/engine/filterhelpers.go @@ -61,9 +61,9 @@ func MatchingItemIDsForEvent(ev map[string]interface{}, stringFldIDs, prefixFldI itemIDs = utils.StringMapFromSlice(sliceIDs) return } - stringFieldVals := map[string]string{utils.ANY: utils.ANY} // cache here field string values, start with default one - filterIndexTypes := []string{utils.MetaString, utils.MetaPrefix, utils.META_NONE} // the META_NONE is used for all items that do not have filters - for i, fieldIDs := range []*[]string{stringFldIDs, prefixFldIDs, &[]string{utils.ANY}} { // same routine for both string and prefix filter types + stringFieldVals := map[string]string{utils.ANY: utils.ANY} // cache here field string values, start with default one + filterIndexTypes := []string{utils.MetaString, utils.MetaPrefix, utils.META_NONE} // the META_NONE is used for all items that do not have filters + for i, fieldIDs := range []*[]string{stringFldIDs, prefixFldIDs, {utils.ANY}} { // same routine for both string and prefix filter types if fieldIDs == nil { fieldIDs = &allFieldIDs } diff --git a/engine/libengine.go b/engine/libengine.go index ab4145728..0ab025cd1 100644 --- a/engine/libengine.go +++ b/engine/libengine.go @@ -62,20 +62,18 @@ func NewRPCPool(dispatchStrategy string, keyPath, certPath, caPath string, connA } // IntRPC is the global variable that is used to comunicate with all the subsystems internally -var IntRPC *RPCClientSet +var IntRPC RPCClientSet // NewRPCClientSet initilalizates the map of connections -func NewRPCClientSet() (s *RPCClientSet) { - return &RPCClientSet{set: make(map[string]*rpcclient.RPCClient)} +func NewRPCClientSet() (s RPCClientSet) { + return make(RPCClientSet) } // RPCClientSet is a RPC ClientConnector for the internal subsystems -type RPCClientSet struct { - set map[string]*rpcclient.RPCClient -} +type RPCClientSet map[string]*rpcclient.RPCClient // AddInternalRPCClient creates and adds to the set a new rpc client using the provided configuration -func (s *RPCClientSet) AddInternalRPCClient(name string, connChan chan rpcclient.ClientConnector) { +func (s RPCClientSet) AddInternalRPCClient(name string, connChan chan rpcclient.ClientConnector) { rpc, err := rpcclient.NewRPCClient(utils.EmptyString, utils.EmptyString, false, utils.EmptyString, utils.EmptyString, utils.EmptyString, config.CgrConfig().GeneralCfg().ConnectAttempts, config.CgrConfig().GeneralCfg().Reconnects, @@ -85,25 +83,33 @@ func (s *RPCClientSet) AddInternalRPCClient(name string, connChan chan rpcclient utils.Logger.Err(fmt.Sprintf("<%s> Error adding %s to the set: %s", utils.InternalRPCSet, name, err.Error())) return } - s.set[name] = rpc + s[name] = rpc } // GetInternalChanel is used when RPCClientSet is passed as internal connection for RPCPool -func (s *RPCClientSet) GetInternalChanel() chan rpcclient.ClientConnector { +func (s RPCClientSet) GetInternalChanel() chan rpcclient.ClientConnector { connChan := make(chan rpcclient.ClientConnector, 1) connChan <- s return connChan } // Call the implementation of the rpcclient.ClientConnector interface -func (s *RPCClientSet) Call(method string, args interface{}, reply interface{}) error { +func (s RPCClientSet) Call(method string, args interface{}, reply interface{}) error { methodSplit := strings.Split(method, ".") if len(methodSplit) != 2 { return rpcclient.ErrUnsupporteServiceMethod } - conn, has := s.set[methodSplit[0]] + conn, has := s[methodSplit[0]] if !has { return rpcclient.ErrUnsupporteServiceMethod } return conn.Call(method, args, reply) } + +// func (s RPCClientSet) ReconnectInternals(subsystems ...string) (err error) { +// for _, subsystem := range subsystems { +// if err = s[subsystem].Reconnect(); err != nil { +// return +// } +// } +// } diff --git a/go.mod b/go.mod index 3823dd8b0..d120935df 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/cgrates/ltcache v0.0.0-20181016092649-92fb7fa77cca github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1 github.com/cgrates/rpcclient v0.0.0-20200326100105-a579e2c47453 + github.com/cgrates/sipd v1.0.0 github.com/creack/pty v1.1.7 github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/fiorix/go-diameter v3.0.3-0.20190716165154-f4823472d0e0+incompatible @@ -56,6 +57,7 @@ require ( go.opencensus.io v0.22.1-0.20190713072201-b4a14686f0a9 // indirect golang.org/x/net v0.0.0-20190909003024-a7b16738d86b golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 + golang.org/x/sys v0.0.0-20190904154756-749cb33beabd google.golang.org/api v0.10.0 pack.ag/amqp v0.12.2 ) diff --git a/go.sum b/go.sum index c43e2ccd6..641c33341 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,8 @@ github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1 h1:QvA6Nbwq9kTd7hsv github.com/cgrates/radigo v0.0.0-20200324152710-35e651804ad1/go.mod h1:HZbsg3Y+xw4lsfCqX9rzj429wrg0XOug6pFT3B6VHZY= github.com/cgrates/rpcclient v0.0.0-20200326100105-a579e2c47453 h1:kgIdi3qR/meiWILdmDRuDi1fFgd6A3lutGV6HLiTDyc= github.com/cgrates/rpcclient v0.0.0-20200326100105-a579e2c47453/go.mod h1:xXLqAKVvcdWeDYwHJYwDgAI3ZOg5LZYxzb72kLjsLZU= +github.com/cgrates/sipd v1.0.0 h1:9CPVaWCYBAWGVyzsEtv+VbG6kMk+sHml6kx3OvYQnkc= +github.com/cgrates/sipd v1.0.0/go.mod h1:Itz4HoJHqckX9XAbGRanB1PvXfKrIO/UtMB8JwCFsT4= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/creack/pty v1.1.7 h1:6pwm8kMQKCmgUg0ZHTm5+/YvRK0s3THD/28+T6/kk4A= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= diff --git a/packages/debian/changelog b/packages/debian/changelog index 8e805e467..5b6b9f8ef 100644 --- a/packages/debian/changelog +++ b/packages/debian/changelog @@ -1,4 +1,5 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium + [ DanB ] * [FilterS] Renamed rals_conns to apiers_conns * [FilterS] Updated *destination filter to get ReverseDestination form API @@ -58,8 +59,9 @@ cgrates (0.11.0~dev) UNRELEASED; urgency=medium * [AgentS] FieldAsInterface return data instead of NMItem * [RouteS] Add posibility to load routes with the sameID and different filters * [RouteS] Correctly populate Sorting out of models + * [AgentS] Added SIPAgent for SIP redirection - -- Alexandru Tripon Wed, 19 Feb 2020 13:25:52 +0200 + -- DanB Wed, 19 Feb 2020 13:25:52 +0200 cgrates (0.10.0) UNRELEASED; urgency=medium diff --git a/services/sipagent.go b/services/sipagent.go new file mode 100644 index 000000000..2c764fcb1 --- /dev/null +++ b/services/sipagent.go @@ -0,0 +1,134 @@ +/* +Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments +Copyright (C) ITsysCOM GmbH + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +package services + +import ( + "fmt" + "sync" + + "github.com/cgrates/cgrates/agents" + "github.com/cgrates/cgrates/config" + "github.com/cgrates/cgrates/engine" + "github.com/cgrates/cgrates/servmanager" + "github.com/cgrates/cgrates/utils" + "github.com/cgrates/rpcclient" +) + +// NewSIPAgent returns the sip Agent +func NewSIPAgent(cfg *config.CGRConfig, filterSChan chan *engine.FilterS, + exitChan chan bool, connMgr *engine.ConnManager) servmanager.Service { + return &SIPAgent{ + cfg: cfg, + filterSChan: filterSChan, + exitChan: exitChan, + connMgr: connMgr, + } +} + +// SIPAgent implements Agent interface +type SIPAgent struct { + sync.RWMutex + cfg *config.CGRConfig + filterSChan chan *engine.FilterS + exitChan chan bool + + sip *agents.SIPAgent + connMgr *engine.ConnManager + + oldListen string +} + +// Start should handle the sercive start +func (sip *SIPAgent) Start() (err error) { + if sip.IsRunning() { + return fmt.Errorf("service aleady running") + } + + filterS := <-sip.filterSChan + sip.filterSChan <- filterS + + sip.Lock() + defer sip.Unlock() + sip.oldListen = sip.cfg.SIPAgentCfg().Listen + sip.sip = agents.NewSIPAgent(sip.connMgr, sip.cfg, filterS) + go func() { + if err = sip.sip.ListenAndServe(); err != nil { + utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) + sip.exitChan <- true // stop the engine here + } + }() + return +} + +// GetIntenternalChan returns the internal connection chanel +// no chanel for SIPAgent +func (sip *SIPAgent) GetIntenternalChan() (conn chan rpcclient.ClientConnector) { + return nil +} + +// Reload handles the change of config +func (sip *SIPAgent) Reload() (err error) { + // if sip.oldListen == sip.cfg.SIPAgentCfg().Listen { + // return + // } + // if err = sip.Shutdown(); err != nil { + // return + // } + // sip.Lock() + // sip.oldListen = sip.cfg.SIPAgentCfg().Listen + // defer sip.Unlock() + // if err = sip.sip.Reload(); err != nil { + // return + // } + // go func() { + // if err := sip.sip.ListenAndServe(); err != nil { + // utils.Logger.Err(fmt.Sprintf("<%s> error: <%s>", utils.SIPAgent, err.Error())) + // sip.exitChan <- true // stop the engine here + // } + // }() + return +} + +// Shutdown stops the service +func (sip *SIPAgent) Shutdown() (err error) { + sip.Lock() + defer sip.Unlock() + // if err = sip.sip.Shutdown(); err != nil { + // return + // } + sip.sip = nil + return +} + +// IsRunning returns if the service is running +func (sip *SIPAgent) IsRunning() bool { + sip.RLock() + defer sip.RUnlock() + return sip != nil && sip.sip != nil +} + +// ServiceName returns the service name +func (sip *SIPAgent) ServiceName() string { + return utils.SIPAgent +} + +// ShouldRun returns if the service should be running +func (sip *SIPAgent) ShouldRun() bool { + return sip.cfg.SIPAgentCfg().Enabled +} diff --git a/servmanager/servmanager.go b/servmanager/servmanager.go index bf40534a8..2124d5353 100644 --- a/servmanager/servmanager.go +++ b/servmanager/servmanager.go @@ -174,6 +174,7 @@ func (srvMngr *ServiceManager) StartServices() (err error) { utils.DispatcherS: srvMngr.GetConfig().DispatcherSCfg().Enabled, utils.EventExporterS: srvMngr.GetConfig().EEsCfg().Enabled, utils.RateS: srvMngr.GetConfig().RateSCfg().Enabled, + utils.SIPAgent: srvMngr.GetConfig().SIPAgentCfg().Enabled, } { if shouldRun { go srvMngr.startService(serviceName) @@ -320,6 +321,10 @@ func (srvMngr *ServiceManager) handleReload() { } case <-srvMngr.GetConfig().GetReloadChan(config.RPCConnsJsonName): engine.Cache.Clear([]string{utils.CacheRPCConnections}) + case <-srvMngr.GetConfig().GetReloadChan(config.SIPAgentJson): + if err = srvMngr.reloadService(utils.SIPAgent); err != nil { + return + } } // handle RPC server } diff --git a/utils/consts.go b/utils/consts.go index 7ff5b3e19..7c04d1049 100755 --- a/utils/consts.go +++ b/utils/consts.go @@ -636,6 +636,7 @@ const ( RemoteHost = "RemoteHost" Local = "local" TCP = "tcp" + UDP = "udp" CGRDebitInterval = "CGRDebitInterval" Version = "Version" MetaTenant = "*tenant" @@ -1637,6 +1638,7 @@ const ( FreeSWITCHAgent = "FreeSWITCHAgent" AsteriskAgent = "AsteriskAgent" HTTPAgent = "HTTPAgent" + SIPAgent = "SIPAgent" ) // Poster